2
3
5#include <so_5/disp/active_obj/pub.hpp>
7#include <so_5/event_queue.hpp>
8#include <so_5/send_functions.hpp>
10#include <so_5/details/rollback_on_exception.hpp>
12#include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
13#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
14#include <so_5/disp/reuse/make_actual_dispatcher.hpp>
16#include <so_5/disp/reuse/work_thread/work_thread.hpp>
18#include <so_5/stats/repository.hpp>
19#include <so_5/stats/messages.hpp>
20#include <so_5/stats/std_names.hpp>
45
46
47
48
57template<
class Work_Thread >
60 const so_5::mbox_t & mbox,
64 so_5::send< stats::messages::quantity< std::size_t > >(
67 stats::suffixes::work_thread_queue_size(),
75 work_thread::work_thread_no_activity_tracking_t & )
82 const so_5::mbox_t & mbox,
84 work_thread::work_thread_with_activity_tracking_t & wt )
86 so_5::send< stats::messages::work_thread_activity >(
89 stats::suffixes::work_thread_activity(),
91 wt.take_activity_stats() );
101
102
103template<
typename Work_Thread >
111 const std::string_view name_base,
115 , m_params{ std::move(params) }
117 outliving_mutable(m_env.get().stats_repository()),
119 outliving_mutable( *
this )
126 for(
auto & p: m_agent_threads )
127 p.second->shutdown();
130 for(
auto & p: m_agent_threads )
138 std::lock_guard< std::mutex > lock{ m_lock };
140 if( m_agent_threads.end() != m_agent_threads.find( &agent ) )
142 rc_disp_create_failed,
143 "thread for the agent is already exists" );
145 auto lock_factory = m_params.queue_params().lock_factory();
146 auto thread = std::make_shared< Work_Thread >(
147 acquire_work_thread( m_params, m_env.get() ),
148 std::move(lock_factory) );
151 so_5::details::do_with_rollback_on_exception(
152 [&] { m_agent_threads[ &agent ] = thread; },
153 [&thread] { shutdown_and_wait( *thread ); } );
158 agent_t & agent )
noexcept override
160 const auto eject_thread = [&] {
161 std::lock_guard< std::mutex > lock{ m_lock };
163 auto it = m_agent_threads.find( &agent );
164 auto thread = it->second;
165 m_agent_threads.erase( it );
170 shutdown_and_wait( *eject_thread() );
175 agent_t & agent )
noexcept override
177 const auto get_queue = [&] {
178 std::lock_guard< std::mutex > lock{ m_lock };
179 return m_agent_threads.find( &agent )->second->get_agent_binding();
182 agent.so_bind_to_dispatcher( *get_queue() );
187 agent_t & agent )
noexcept override
190 undo_preallocation( agent );
194 friend class disp_data_source_t;
197 using work_thread_shptr_t = std::shared_ptr< Work_Thread >;
200 using agent_thread_map_t =
201 std::map<
const agent_t *, work_thread_shptr_t >;
204
205
206
207
208 class disp_data_source_t
final :
public stats::
source_t
218 const std::string_view name_base,
220 : m_dispatcher{ disp }
222 using namespace so_5::disp::reuse;
224 m_base_prefix = make_disp_prefix(
227 &(m_dispatcher.get()) );
233 auto & disp = m_dispatcher.get();
235 std::lock_guard< std::mutex > lock{ disp.m_lock };
237 so_5::send< stats::messages::quantity< std::size_t > >(
240 stats::suffixes::agent_count(),
241 disp.m_agent_threads.size() );
243 for(
const auto & [a, wt] : disp.m_agent_threads )
244 distribute_value_for_work_thread( mbox, a, *wt );
254 std::ostringstream ss;
255 ss << m_base_prefix.c_str() <<
"/wt-"
256 << so_5::disp::reuse::ios_helpers::pointer{ agent };
258 const stats::prefix_t wt_prefix{ ss.str() };
260 send_demands_count_stats( mbox, wt_prefix, wt );
261 send_thread_activity_stats( mbox, wt_prefix, wt );
266
267
268
269
270
271
275
276
277
278
279
289
290
291
292
293
305 make( disp_binder_shptr_t binder )
noexcept
307 return { std::move( binder ) };
320 const std::string_view data_sources_name_base,
325 using dispatcher_no_activity_tracking_t =
326 impl::dispatcher_template_t<
329 using dispatcher_with_activity_tracking_t =
330 impl::dispatcher_template_t<
331 work_thread::work_thread_with_activity_tracking_t >;
333 disp_binder_shptr_t binder = so_5::disp::reuse::make_actual_dispatcher<
335 dispatcher_no_activity_tracking_t,
336 dispatcher_with_activity_tracking_t >(
337 outliving_mutable(env),
338 data_sources_name_base,
341 return impl::dispatcher_handle_maker_t::make( std::move(binder) );
Alias for namespace with traits of event queue.
A handle for active_obj dispatcher.
static dispatcher_handle_t make(disp_binder_shptr_t binder) noexcept
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
disp_data_source_t(const std::string_view name_base, outliving_reference_t< dispatcher_template_t > disp)
stats::prefix_t m_base_prefix
Basic prefix for data source names.
void distribute_value_for_work_thread(const mbox_t &mbox, const agent_t *agent, Work_Thread &wt)
outliving_reference_t< dispatcher_template_t > m_dispatcher
Dispatcher to work with.
void undo_preallocation(agent_t &agent) noexcept override
Undo resources allocation.
outliving_reference_t< environment_t > m_env
SObjectizer Environment to work in.
const disp_params_t m_params
Parameters for the dispatcher.
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for run-time monitoring.
dispatcher_template_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
~dispatcher_template_t() noexcept override
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
agent_thread_map_t m_agent_threads
A map from agents to single thread dispatchers.
std::mutex m_lock
This object lock.
void preallocate_resources(agent_t &agent) override
Allocate resources in dispatcher for new agent.
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
Interface for dispatcher binders.
Helper class for indication of long-lived reference via its type.
A holder for a data source that should be automatically registered and deregistered in the registry.
A type for storing the prefix of a data_source name.
An interface of data source.
#define SO_5_THROW_EXCEPTION(error_code, desc)
void shutdown_and_wait(T &w)
Just a helper function for consecutive calls to shutdown and wait.
void send_thread_activity_stats(const so_5::mbox_t &, const stats::prefix_t &, work_thread::work_thread_no_activity_tracking_t &)
void send_thread_activity_stats(const so_5::mbox_t &mbox, const stats::prefix_t &prefix, work_thread::work_thread_with_activity_tracking_t &wt)
void send_demands_count_stats(const so_5::mbox_t &mbox, const stats::prefix_t &prefix, Work_Thread &wt)
Active objects dispatcher implementation details.
Active objects dispatcher.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of active_obj dispatcher.
Implementation details of the dispatcher's working thread.
Reusable components for dispatchers.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.