2
3
5#include <so_5/disp/one_thread/pub.hpp>
7#include <so_5/environment.hpp>
8#include <so_5/send_functions.hpp>
10#include <so_5/stats/repository.hpp>
11#include <so_5/stats/messages.hpp>
12#include <so_5/stats/std_names.hpp>
14#include <so_5/stats/impl/activity_tracking.hpp>
16#include <so_5/disp/reuse/work_thread/work_thread.hpp>
18#include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
19#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
20#include <so_5/disp/reuse/make_actual_dispatcher.hpp>
22#include <so_5/details/rollback_on_exception.hpp>
44template<
typename Work_Thread >
59 Work_Thread & work_thread,
60 std::atomic< std::size_t > & agents_bound )
69 const common_data_t< work_thread::work_thread_no_activity_tracking_t > & )
75 const common_data_t< work_thread::work_thread_with_activity_tracking_t > & data )
77 so_5::send< stats::messages::work_thread_activity >(
80 stats::suffixes::work_thread_activity(),
81 data.m_work_thread.thread_id(),
82 data.m_work_thread.take_activity_stats() );
88
89
90template<
typename Work_Thread >
91class data_source_t
final
96 using actual_work_thread_type_t = Work_Thread;
99 actual_work_thread_type_t & work_thread,
100 std::atomic< std::size_t > & agents_bound,
101 const std::string_view name_base,
102 const void * pointer_to_disp )
103 : data_source_details::common_data_t< Work_Thread >{
104 work_thread, agents_bound }
107 using namespace so_5::disp::reuse;
109 this->m_base_prefix = make_disp_prefix(
114 this->m_work_thread_prefix = make_disp_working_thread_prefix(
122 so_5::send< stats::messages::quantity< std::size_t > >(
125 stats::suffixes::agent_count(),
126 this->m_agents_bound.load( std::memory_order_acquire ) );
128 so_5::send< stats::messages::quantity< std::size_t > >(
130 this->m_work_thread_prefix,
131 stats::suffixes::work_thread_queue_size(),
132 this->m_work_thread.demands_count() );
134 data_source_details::track_activity( mbox, *
this );
143
144
145
146
147
148
149
150
151
152
153template<
typename Work_Thread >
159 const std::string_view name_base,
162 acquire_work_thread( params, env.get() ),
163 params.queue_params().lock_factory() }
165 outliving_mutable(env.get().stats_repository()),
171 m_work_thread.start();
176 m_work_thread.shutdown();
177 m_work_thread.wait();
197 agent_t & agent )
noexcept override
199 agent.so_bind_to_dispatcher( *(m_work_thread.get_agent_binding()) );
215
216
217
218
219
223
224
225
226
227
239 make( disp_binder_shptr_t binder )
noexcept
241 return { std::move( binder ) };
253 const std::string_view data_sources_name_base,
258 using dispatcher_no_activity_tracking_t =
259 impl::actual_dispatcher_t<
262 using dispatcher_with_activity_tracking_t =
263 impl::actual_dispatcher_t<
264 work_thread::work_thread_with_activity_tracking_t >;
266 disp_binder_shptr_t binder = so_5::disp::reuse::make_actual_dispatcher<
268 dispatcher_no_activity_tracking_t,
269 dispatcher_with_activity_tracking_t >(
270 outliving_mutable(env),
271 data_sources_name_base,
274 return impl::dispatcher_handle_maker_t::make( std::move(binder) );
Alias for namespace with traits of event queue.
A handle for one_thread dispatcher.
void preallocate_resources(agent_t &) override
Allocate resources in dispatcher for new agent.
virtual void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
virtual void unbind(agent_t &) noexcept override
Unbind agent from dispatcher.
Work_Thread m_work_thread
Working thread for the dispatcher.
void undo_preallocation(agent_t &) noexcept override
Undo resources allocation.
stats::auto_registered_source_holder_t< data_source_t< Work_Thread > > m_data_source
Data source for run-time monitoring.
~actual_dispatcher_t() noexcept override
std::atomic< std::size_t > m_agents_bound
Count of agents bound to this dispatcher.
actual_dispatcher_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
data_source_t(actual_work_thread_type_t &work_thread, std::atomic< std::size_t > &agents_bound, const std::string_view name_base, const void *pointer_to_disp)
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
static dispatcher_handle_t make(disp_binder_shptr_t binder) noexcept
Interface for dispatcher binders.
Helper class for indication of long-lived reference via its type.
A holder for a data source that should be automatically registered and deregistered in the registry.
A type for storing prefix of data_source name.
An interface of data source.
void track_activity(const mbox_t &, const common_data_t< work_thread::work_thread_no_activity_tracking_t > &)
void track_activity(const mbox_t &mbox, const common_data_t< work_thread::work_thread_with_activity_tracking_t > &data)
Implementation details for dispatcher with single working thread.
Dispatcher with single working thread.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of one_thread dispatcher.
Implementation details of the dispatcher's working thread.
Reusable components for dispatchers.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
stats::prefix_t m_work_thread_prefix
Prefix for working thread-related data.
Work_Thread & m_work_thread
Working thread of the dispatcher.
common_data_t(Work_Thread &work_thread, std::atomic< std::size_t > &agents_bound)
stats::prefix_t m_base_prefix
Prefix for dispatcher-related data.
std::atomic< std::size_t > & m_agents_bound
Count of agents bound to the dispatcher.