2
3
6
7
8
9
10
11
12
14#include <so_5/disp/prio_dedicated_threads/one_per_prio/pub.hpp>
16#include <so_5/disp/reuse/work_thread/work_thread.hpp>
18#include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
19#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
20#include <so_5/disp/reuse/make_actual_dispatcher.hpp>
22#include <so_5/stats/repository.hpp>
23#include <so_5/stats/messages.hpp>
24#include <so_5/stats/std_names.hpp>
26#include <so_5/send_functions.hpp>
28#include <so_5/details/invoke_noexcept_code.hpp>
57 const so_5::mbox_t & mbox,
61 so_5::send< stats::messages::work_thread_activity >(
64 stats::suffixes::work_thread_activity(),
66 wt.take_activity_stats() );
75
76
77
78
79
80
81template<
typename Work_Thread >
87 const std::string_view name_base,
124 agent_t & agent )
noexcept override
136 agent_t & agent )
noexcept override
144 friend class disp_data_source_t;
147
148
149
150
151
152 class disp_data_source_t
final :
public stats::
source_t
162 const std::string_view name_base,
164 : m_dispatcher{ disp }
165 , m_base_prefix{ so_5::disp::reuse::make_disp_prefix(
175 std::size_t agents_count = 0;
177 auto & disp = m_dispatcher.get();
179 so_5::prio::for_each_priority( [&]( priority_t p ) {
180 auto agents = disp.m_agents_per_priority[
181 to_size_t(p) ].load( std::memory_order_acquire );
183 agents_count += agents;
185 distribute_value_for_work_thread(
189 *(disp.m_threads[ to_size_t(p) ]) );
192 so_5::send< stats::messages::quantity< std::size_t > >(
195 stats::suffixes::agent_count(),
204 std::size_t agents_count,
207 std::ostringstream ss;
208 ss << m_base_prefix.c_str() <<
"/wt-p" << to_size_t(priority);
210 const stats::prefix_t prefix{ ss.str() };
212 so_5::send< stats::messages::quantity< std::size_t > >(
215 stats::suffixes::work_thread_queue_size(),
216 wt.demands_count() );
218 so_5::send< stats::messages::quantity< std::size_t > >(
221 stats::suffixes::agent_count(),
224 send_thread_activity_stats( mbox, prefix, wt );
308 make( disp_binder_shptr_t binder )
noexcept
310 return { std::move( binder ) };
322 const std::string_view data_sources_name_base,
327 using dispatcher_no_activity_tracking_t =
331 using dispatcher_with_activity_tracking_t =
333 work_thread::work_thread_with_activity_tracking_t >;
335 disp_binder_shptr_t binder = so_5::disp::reuse::make_actual_dispatcher<
337 dispatcher_no_activity_tracking_t,
338 dispatcher_with_activity_tracking_t >(
339 outliving_mutable(env),
340 data_sources_name_base,
343 return impl::dispatcher_handle_maker_t::make( std::move(binder) );
Alias for namespace with traits of event queue.
A handle for prio_dedicated_threads::one_per_prio dispatcher.
static dispatcher_handle_t make(disp_binder_shptr_t binder) noexcept
stats::prefix_t m_base_prefix
Basic prefix for data sources.
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
outliving_reference_t< dispatcher_template_t > m_dispatcher
Dispatcher to work with.
disp_data_source_t(const std::string_view name_base, outliving_reference_t< dispatcher_template_t > disp)
void distribute_value_for_work_thread(const mbox_t &mbox, priority_t priority, std::size_t agents_count, Work_Thread &wt)
void launch_work_threads()
Start all working threads.
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for run-time monitoring.
std::vector< std::unique_ptr< Work_Thread > > m_threads
Working threads for every priority.
dispatcher_template_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
void allocate_work_threads(environment_t &env, const disp_params_t &params)
Allocate work threads for dispatcher.
void undo_preallocation(agent_t &) noexcept override
Undo resources allocation.
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
void preallocate_resources(agent_t &) override
Allocate resources in dispatcher for new agent.
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
~dispatcher_template_t() noexcept override
Interface for dispatcher binders.
Helper class for indication of long-lived reference via its type.
A holder for data-source that should be automatically registered and deregistered in registry.
A type for storing prefix of data_source name.
An interface of data source.
void send_thread_activity_stats(const so_5::mbox_t &, const stats::prefix_t &, work_thread::work_thread_no_activity_tracking_t &)
void send_thread_activity_stats(const so_5::mbox_t &mbox, const stats::prefix_t &prefix, work_thread::work_thread_with_activity_tracking_t &wt)
Implementation details for dispatcher with one thread per priority.
Dispatcher which creates exactly one thread per priority.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of one_per_prio dispatcher.
Dispatchers with dedicated threads for every priority.
Implementation details of dispatcher's working thread.
Reusable components for dispatchers.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
priority_t
Definition of supported priorities.