11 #include <so_5_extra/error_ranges.hpp> 13 #include <so_5/disp_binder.hpp> 14 #include <so_5/send_functions.hpp> 16 #include <so_5/disp/reuse/work_thread_activity_tracking.hpp> 17 #include <so_5/disp/reuse/data_source_prefix_helpers.hpp> 19 #include <so_5/stats/repository.hpp> 20 #include <so_5/stats/messages.hpp> 21 #include <so_5/stats/std_names.hpp> 22 #include <so_5/stats/impl/activity_tracking.hpp> 24 #include <so_5/details/invoke_noexcept_code.hpp> 25 #include <so_5/details/rollback_on_exception.hpp> 26 #include <so_5/details/abort_on_fatal_error.hpp> 28 #include <so_5/outliving.hpp> 30 #include <asio/io_context.hpp> 31 #include <asio/io_context_strand.hpp> 32 #include <asio/post.hpp> 79 swap( a.m_thread_count, b.m_thread_count );
80 swap( a.m_io_context, b.m_io_context );
87 m_thread_count = count;
95 return m_thread_count;
117 ::asio::io_context & service )
119 m_io_context = std::shared_ptr< ::asio::io_context >(
120 std::addressof( service ),
122 [](::asio::io_context *) {} );
134 std::shared_ptr< ::asio::io_context > service )
136 m_io_context = std::move(service);
150 m_io_context = std::make_shared< ::asio::io_context >();
241 impl::basic_dispatcher_iface_shptr_t dispatcher )
noexcept 247 empty()
const noexcept {
return !m_dispatcher; }
290 return m_dispatcher->binder_with_external_strand( strand );
330 return m_dispatcher->binder_with_own_strand();
342 return m_dispatcher->io_context();
354 reset()
noexcept { m_dispatcher.reset(); }
489 on_demand( execution_demand_t demand )
noexcept = 0;
510 template<
typename Derived,
typename... Args >
516 ::asio::io_context & io_svc,
538 log_stream <<
"An exception caught in work thread " 539 "of so_5::extra::disp::asio_thread_pool dispatcher." 549 log_stream <<
"An unknown exception caught in work thread " 550 "of so_5::extra::disp::asio_thread_pool dispatcher." 561 ptr()->on_demand( std::move(demand) );
583 on_demand( execution_demand_t demand )
noexcept override 585 demand.call_handler( thread_id() );
624 m_thread_id = std::move(tid);
642 m_work_activity.start();
651 m_work_activity.stop();
660 ::so_5::stats::work_thread_activity_stats_t result;
661 result.m_working_stats = m_work_activity.take_stats();
685 outliving_reference_t< work_thread_activity_collector_t > activity_stats )
689 m_activity_stats.get().setup_thread_id( thread_id() );
696 on_demand( execution_demand_t demand )
noexcept override 698 m_activity_stats.get().activity_started();
700 demand.call_handler( thread_id() );
702 m_activity_stats.get().activity_finished();
724 actual_dispatcher_shptr_t dispatcher )
744 agent_t & agent )
noexcept override 747 m_dispatcher->agent_bound();
749 agent.so_bind_to_dispatcher( *
this );
757 m_dispatcher->agent_bound();
789 template<
typename Derived >
796 return static_cast< Derived & >( *
this );
803 push( execution_demand_t demand )
override 831 class binder_with_external_strand_t
final 832 :
public binder_template_t< binder_with_external_strand_t >
838 actual_dispatcher_shptr_t dispatcher,
839 outliving_reference_t< ::asio::io_context::strand > strand )
845 strand()
noexcept {
return m_strand.get(); }
863 class binder_with_own_strand_t
final 864 :
public binder_template_t< binder_with_own_strand_t >
870 actual_dispatcher_shptr_t dispatcher )
916 return { std::make_shared< binder_with_external_strand_t >(
918 outliving_mutable(strand) )
926 return { std::make_shared< binder_with_own_strand_t >(
932 io_context()
const noexcept override {
return *m_io_context; }
949 return m_demands_counter;
956 std::string_view data_sources_name_base )
958 data_source().set_data_sources_name_base( data_sources_name_base );
959 data_source().start( outliving_mutable(env.stats_repository()) );
961 ::so_5::details::do_with_rollback_on_exception(
962 [&] { launch_work_threads(env); },
963 [
this] { data_source().stop(); } );
969 ::so_5::details::invoke_noexcept_code( [
this] {
971 m_io_context->stop();
978 ::so_5::details::invoke_noexcept_code( [
this] {
982 data_source().stop();
1001 #if defined(__clang__
) 1002 #pragma clang diagnostic push 1003 #pragma clang diagnostic ignored "-Wnon-virtual-dtor" 1043 const auto agents_count = m_dispatcher.m_agents_bound.load(
1044 std::memory_order_acquire );
1046 const auto demands_count = m_dispatcher.m_demands_counter.load(
1047 std::memory_order_acquire );
1049 send< ::so_5::stats::messages::quantity< std::size_t > >(
1052 ::so_5::stats::suffixes::agent_count(),
1058 send< ::so_5::stats::messages::quantity< std::size_t > >(
1061 ::so_5::stats::suffixes::work_thread_queue_size(),
1067 std::string_view name_base )
1069 using namespace ::so_5::disp::reuse;
1071 m_base_prefix = make_disp_prefix(
1078 start( ::so_5::stats::repository_t & repo )
1081 m_stats_repo = &repo;
1087 m_stats_repo->remove( *
this );
1091 #if defined(__clang__
) 1092 #pragma clang diagnostic pop 1112 environment_t & env ) = 0;
1155 environment_t & env,
1156 ::asio::io_context & io_svc,
1160 work_thread_t::run< work_thread_without_activity_tracking_t >(
1189 #if defined(__clang__
) 1190 #pragma clang diagnostic push 1191 #pragma clang diagnostic ignored "-Wnon-virtual-dtor" 1212 std::size_t thread_count )
1216 for(
auto & c : m_collectors )
1217 c = std::make_unique< work_thread_activity_collector_t >();
1223 disp_data_source_t::distribute( mbox );
1225 for( std::size_t i = 0; i != m_collectors.size(); ++i )
1226 distribute_stats_for_work_thread_at( mbox, i );
1235 return *(m_collectors[index]);
1241 const mbox_t & mbox,
1244 std::ostringstream ss;
1245 ss << base_prefix().c_str() <<
"/wt-" << index;
1247 const ::so_5::stats::prefix_t prefix{ ss.str() };
1248 auto & collector = collector_at( index );
1250 so_5::send< ::so_5::stats::messages::work_thread_activity >(
1253 ::so_5::stats::suffixes::work_thread_activity(),
1254 collector.thread_id(),
1255 collector.take_activity_stats() );
1259 #if defined(__clang__
) 1260 #pragma clang diagnostic pop 1278 environment_t & env,
1280 ::asio::io_context & io_svc,
1286 work_thread_t::run< work_thread_with_activity_tracking_t >(
1290 self.m_actual_data_source.collector_at(index) ) );
1317 typename Basic_Skeleton >
1323 outliving_reference_t< environment_t > env,
1326 std::string_view data_sources_name_base,
1342 using thread_t =
typename Traits::thread_type;
1351 environment_t & env )
override 1353 using namespace std;
1413 make( actual_dispatcher_shptr_t disp )
noexcept 1415 return { std::move( disp ) };
1434 auto c = std::thread::hardware_concurrency();
1545 environment_t & env,
1548 const std::string_view data_sources_name_base,
1556 "io_context is not set in disp_params" );
dispatcher_handle_t() noexcept=default
::asio::io_context & io_context() noexcept
Get a reference to the io_context of that dispatcher.
outliving_reference_t< ::asio::io_context::strand > m_strand
Strand to be used with this event_queue.
A handle for asio_thread_pool dispatcher.
void reset() noexcept
Drop the content of the handle.
disp_binder_shptr_t binder(::asio::io_context::strand &strand) const
Get a binder for that dispatcher.
Ranges for error codes of each submodule.
::asio::io_context::strand & strand() noexcept
bool empty() const noexcept
Is this handle empty?
::asio::io_context::strand & strand() noexcept
operator bool() const noexcept
Is this handle empty?
impl::basic_dispatcher_iface_shptr_t m_dispatcher
A reference to actual implementation of a dispatcher.
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
static dispatcher_handle_t make(actual_dispatcher_shptr_t disp) noexcept
::asio::io_context::strand m_strand
Strand to be used with this event_queue.
binder_with_own_strand_t(actual_dispatcher_shptr_t dispatcher)
binder_with_external_strand_t(actual_dispatcher_shptr_t dispatcher, outliving_reference_t< ::asio::io_context::strand > strand)
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
disp_binder_shptr_t binder() const
Get a binder for that dispatcher.