2
3
6
7
8
9
10
11
15#include <so_5/atomic_refcounted.hpp>
17#include <so_5/send_functions.hpp>
19#include <so_5/stats/repository.hpp>
20#include <so_5/stats/messages.hpp>
21#include <so_5/stats/std_names.hpp>
23#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
36
37
38
39
40
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
91
92
93
94
95
96using queue_description_holder_ref_t =
100
101
102
103
104
105inline queue_description_holder_ref_t
109 std::size_t agent_count )
111 queue_description_holder_ref_t result(
new queue_description_holder_t() );
113 std::ostringstream ss;
114 ss << prefix.c_str() <<
"/cq/" << coop_id;
116 result->m_desc.m_prefix = stats::prefix_t{ ss.str() };
117 result->m_desc.m_agent_count = agent_count;
118 result->m_desc.m_queue_size = 0;
124
125
126
127
128
129
130
131inline queue_description_holder_ref_t
136 queue_description_holder_ref_t result(
new queue_description_holder_t() );
138 std::ostringstream ss;
139 ss << prefix.c_str() <<
"/aq/"
140 << so_5::disp::reuse::ios_helpers::pointer{ agent };
142 result->m_desc.m_prefix = stats::prefix_t{ ss.str() };
143 result->m_desc.m_agent_count = 1;
144 result->m_desc.m_queue_size = 0;
150
151
152
153
154
155
173
174
175
176
177
178
182 const so_5::current_thread_id_t & thread_id,
188
189
190
191
192
193
194
195
208
209
210
211
212
229 const std::string_view disp_type,
231 const std::string_view name_basic,
234 const void * disp_pointer )
236 m_prefix = make_disp_prefix( disp_type, name_basic, disp_pointer );
242 const mbox_t & mbox )
override
249 so_5::send< stats::messages::quantity< std::size_t > >(
252 stats::suffixes::disp_thread_count(),
253 collector.thread_count() );
255 so_5::send< stats::messages::quantity< std::size_t > >(
258 stats::suffixes::agent_count(),
259 collector.agent_count() );
261 collector.for_each_thread_activity(
262 [
this, &mbox](
const so_5::current_thread_id_t & thread_id,
264 so_5::send< stats::messages::work_thread_activity >(
266 make_work_thread_prefix( thread_id ),
267 stats::suffixes::work_thread_activity(),
272 collector.for_each_queue(
274 so_5::send< stats::messages::quantity< std::size_t > >(
277 stats::suffixes::agent_count(),
278 queue.m_agent_count );
280 so_5::send< stats::messages::quantity< std::size_t > >(
283 stats::suffixes::work_thread_queue_size(),
284 queue.m_queue_size );
297
298
299
300
307 const so_5::current_thread_id_t & thread_id,
315
316
317
318
329
330
331
332
333
334
335
336
337
340#if defined(__clang__
)
341#pragma clang diagnostic push
342#pragma clang diagnostic ignored "-Wnon-virtual-dtor"
351 wt_activity_info_container_t & wt_activity_holder )
355 m_wt_activity.clear();
361 auto holder = m_queue_desc_head;
362 m_queue_desc_head.reset();
366 auto current = holder;
367 holder = holder->m_next;
368 current->m_next.reset();
371 m_queue_desc_tail.reset();
376 std::size_t thread_count )
override
378 m_thread_count = thread_count;
385 m_agent_count += info->m_desc.m_agent_count;
387 if( m_queue_desc_tail )
389 m_queue_desc_tail->m_next = info;
390 m_queue_desc_tail = info;
394 m_queue_desc_head = info;
395 m_queue_desc_tail = info;
401 const so_5::current_thread_id_t & thread_id,
405 m_wt_activity.emplace_back( thread_id, stats );
411 return m_thread_count;
417 return m_agent_count;
420 template<
typename Lambda >
434 template<
typename Lambda >
453#if defined(__clang__
)
454#pragma clang diagnostic pop
460 std::ostringstream ss;
461 ss << m_prefix <<
"/wt-"
462 << so_5::raw_id_from_current_thread_id( tid );
464 return stats::prefix_t{ ss.str() };
#define SO_5_CHECK_INVARIANT(what, data)
The base class for the object with a reference counting.
Parameters for binding agents to adv_thread_pool dispatcher.
bind_params_t & fifo(fifo_t v)
Set FIFO type.
fifo_t query_fifo() const
Get FIFO type.
Alias for namespace with traits of event queue.
std::size_t thread_count() const
Getter for thread count.
disp_params_t & thread_count(std::size_t count)
Setter for thread count.
disp_params_t & tune_queue_params(L tunner)
Tuner for queue parameters.
std::size_t m_thread_count
Count of working threads.
queue_traits::queue_params_t m_queue_params
Queue parameters.
const queue_traits::queue_params_t & queue_params() const
Getter for queue parameters.
disp_params_t()
Default constructor.
disp_params_t & set_queue_params(queue_traits::queue_params_t p)
Setter for queue parameters.
friend void swap(disp_params_t &a, disp_params_t &b) noexcept
A handle for adv_thread_pool dispatcher.
disp_binder_shptr_t binder(bind_params_t params) const
Get a binder for that dispatcher.
bool empty() const noexcept
Is this handle empty?
impl::basic_dispatcher_iface_shptr_t m_dispatcher
A reference to actual implementation of a dispatcher.
dispatcher_handle_t() noexcept=default
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
const bind_params_t m_params
Binding parameters.
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
actual_binder_t(actual_dispatcher_iface_shptr_t disp, bind_params_t params) noexcept
actual_dispatcher_iface_shptr_t m_disp
Dispatcher to be used.
void preallocate_resources(agent_t &agent) override
Allocate resources in dispatcher for new agent.
void undo_preallocation(agent_t &agent) noexcept override
Undo resources allocation.
An actual interface of thread-pool dispatcher.
virtual event_queue_t * query_resources_for_agent(agent_t &agent) noexcept=0
Get resources allocated for an agent.
virtual void undo_preallocation_for_agent(agent_t &agent) noexcept=0
Undo preallocation of resources for a new agent.
virtual void unbind_agent(agent_t &agent) noexcept=0
Unbind agent from the dispatcher.
virtual void preallocate_resources_for_agent(agent_t &agent, const bind_params_t ¶ms)=0
Preallocate all necessary resources for a new agent.
event_queue_t * query_resources_for_agent(agent_t &agent) noexcept override
Get resources allocated for an agent.
void unbind_agent(agent_t &agent) noexcept override
Unbind agent from the dispatcher.
disp_binder_shptr_t binder(bind_params_t params) override
void undo_preallocation_for_agent(agent_t &agent) noexcept override
Undo preallocation of resources for a new agent.
actual_dispatcher_implementation_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
dispatcher_template_t< Work_Thread > m_impl
Real dispatcher.
void preallocate_resources_for_agent(agent_t &agent, const bind_params_t ¶ms) override
Preallocate all necessary resources for a new agent.
~actual_dispatcher_implementation_t() noexcept override
execution_demand_t peek_front()
Get the information about the front demand.
agent_queue_t * intrusive_queue_giveout_next() noexcept
Give away a pointer to the next agent_queue.
void push(execution_demand_t demand) override
Push next demand to queue.
std::size_t size() const noexcept
Get the current size of the queue.
bool empty() const
Is empty queue?
agent_queue_t * m_intrusive_queue_next
The next item in intrusive queue of agent_queues.
void intrusive_queue_set_next(agent_queue_t *next) noexcept
Set a pointer to the next agent_queue.
static constexpr const unsigned int thread_safe_worker
spinlock_t m_lock
Object's lock.
bool active() const
Is active queue?
agent_queue_t(outliving_reference_t< dispatcher_queue_t > disp_queue, const bind_params_t &)
Constructor.
unsigned int m_workers
Count of active workers.
demand_t * m_tail_demand
Tail of the demand's queue.
bool worker_finished(unsigned int type_of_worker)
Signal about finishing of worker of the specified type.
void push_evt_finish(execution_demand_t demand) noexcept override
std::atomic< std::size_t > m_size
Current size of the queue.
~agent_queue_t() override
static constexpr const unsigned int not_thread_safe_worker
bool is_there_not_thread_safe_worker() const
Check the presence of thread unsafe worker.
void delete_head() noexcept
Helper method for deleting queue's head object.
bool worker_started(unsigned int type_of_worker)
Remove the front demand.
bool m_active
Is this queue activated?
demand_t m_head_demand
Head of the demand's queue.
spinlock_t & lock() noexcept
Access to the queue's lock.
void push_evt_start(execution_demand_t demand) override
dispatcher_queue_t & m_disp_queue
bool is_there_any_worker() const
Check the presence of any worker at the moment.
The very basic interface of adv_thread_pool dispatcher.
virtual ~basic_dispatcher_iface_t() noexcept=default
virtual disp_binder_shptr_t binder(bind_params_t params)=0
static dispatcher_handle_t make(actual_dispatcher_iface_shptr_t disp) noexcept
Part of implementation of work thread without activity tracing.
no_activity_tracking_impl_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
void take_activity_stats(L)
Part of implementation of work thread with activity tracing.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::external_lock<> > m_waiting_stats_collector
A collector for waiting stats.
activity_tracking_traits::lock_t m_stats_lock
Lock for activity statistics.
void take_activity_stats(L lambda)
with_activity_tracking_impl_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::external_lock<> > m_work_activity_collector
A collector for work activity.
void process_queue(agent_queue_t &queue)
Processing of demands from agent queue.
so_5::current_thread_id_t thread_id() const
Get ID of work thread.
void start()
Launch work thread.
work_thread_template_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
agent_queue_t * pop_agent_queue() noexcept
An attempt of extraction of non-empty agent queue.
void body()
Thread body method.
An interface for somethine like condition variable for waiting on MPMC queue lock.
Container for storing parameters for MPMC queue.
Multi-producer/Multi-consumer queue of pointers to event queues.
T * m_tail
The current tail of the intrusive queue.
T * try_switch_to_another(T *current) noexcept
Switch the current non-empty queue to another one if it is possible.
void try_wakeup_someone_if_possible() noexcept
An attempt to wakeup another sleeping thread if it's necessary and possible.
const std::size_t m_max_thread_count
Maximum count of working threads to be used with that mpmc_queue.
so_5::disp::mpmc_queue_traits::lock_unique_ptr_t m_lock
Object's lock.
std::size_t m_queue_size
The current size of the intrusive queue.
void schedule(T *queue) noexcept
Schedule execution of demands from the queue.
T * m_head
The current head of the intrusive queue.
queue_of_queues_t(const so_5::disp::mpmc_queue_traits::queue_params_t &queue_params, std::size_t thread_count)
void shutdown() noexcept
Initiate shutdown for working threads.
void pop_and_notify_one_waiting_customer() noexcept
bool m_shutdown
Shutdown flag.
const std::size_t m_next_thread_wakeup_threshold
Threshold for wake up next working thread if there are non-empty agent queues.
so_5::disp::mpmc_queue_traits::condition_unique_ptr_t allocate_condition()
T * pop(so_5::disp::mpmc_queue_traits::condition_t &condition) noexcept
Get next active queue.
void push_to_queue(T *new_tail) noexcept
Helper method that pushes a new item to the end of the queue.
bool m_wakeup_in_progress
Is some working thread in wakeup process now?
T * pop_head() noexcept
Helper method that extracts the head item from the queue.
std::vector< so_5::disp::mpmc_queue_traits::condition_t * > m_waiting_customers
Waiting threads.
Actual type of statical information collector.
std::size_t m_agent_count
void for_each_thread_activity(Lambda lambda) const
std::size_t m_thread_count
std::size_t thread_count() const
void for_each_queue(Lambda lambda) const
std::size_t agent_count() const
intrusive_ptr_t< queue_description_holder_t > m_queue_desc_head
collector_t(wt_activity_info_container_t &wt_activity_holder)
virtual void set_thread_count(std::size_t thread_count) override
Informs consumer about actual actual thread count.
wt_activity_info_container_t & m_wt_activity
virtual void add_work_thread_activity(const so_5::current_thread_id_t &thread_id, const stats::work_thread_activity_stats_t &stats) override
Informs consumer about yet another working thread activity.
virtual void add_queue(const intrusive_ptr_t< queue_description_holder_t > &info) override
Informs counsumer about yet another event queue.
intrusive_ptr_t< queue_description_holder_t > m_queue_desc_tail
Type of data source for thread-pool-like dispatchers.
wt_activity_info_container_t m_wt_activity
Container for collecting work activity stats from working threads.
data_source_t(stats_supplier_t &supplier)
Initializing constructor.
const stats::prefix_t & prefix() const
Basic prefix for data source names.
stats::prefix_t make_work_thread_prefix(const so_5::current_thread_id_t &tid)
stats::prefix_t m_prefix
Prefix for data-source names.
void distribute(const mbox_t &mbox) override
Distribution of statistical information.
void set_data_sources_name_base(const std::string_view disp_type, const std::string_view name_basic, const void *disp_pointer)
Setting of data-source basic name.
stats_supplier_t & m_supplier
Statistical information supplier.
An interface of collector of information about thread-pool-like dispatcher state.
virtual void add_work_thread_activity(const so_5::current_thread_id_t &thread_id, const so_5::stats::work_thread_activity_stats_t &stats)=0
Informs consumer about yet another working thread activity.
virtual void set_thread_count(std::size_t value)=0
Informs consumer about actual actual thread count.
virtual void add_queue(const intrusive_ptr_t< queue_description_holder_t > &queue_desc)=0
Informs counsumer about yet another event queue.
An interface of supplier of information about thread-pool-like dispatcher state.
virtual void supply(stats_consumer_t &consumer)=0
virtual ~stats_supplier_t()
Mixin with thread activity tracking flag.
Mixin that holds optional work thread factory.
void undo_preallocation_for_agent(agent_t &agent) noexcept
Undo preallocation of resources for a new agent.
void preallocate_resources_for_agent(agent_t &agent, const Bind_Params ¶ms)
Preallocate all necessary resources for a new agent.
void bind_agent_with_cooperation_fifo(agent_ref_t agent, const Bind_Params ¶ms)
Creation event queue for an agent with individual FIFO.
void bind_agent_with_inidividual_fifo(agent_ref_t agent, const Bind_Params ¶ms)
Creation event queue for an agent with individual FIFO.
const std::size_t m_thread_count
Count of working threads.
virtual void supply(tp_stats::stats_consumer_t &consumer) override
Implementation of stats_supplier-related stuff.
agent_map_t m_agents
Information of agents.
tp_stats::stats_supplier_t & stats_supplier()
Helper method for casting to stats_supplier-object.
void shutdown_then_wait() noexcept
Dispatcher_Queue m_queue
Queue for active agent's queues.
void start(environment_t &env)
event_queue_t * query_resources_for_agent(agent_t &agent) noexcept
Get resources allocated for an agent.
std::vector< std::unique_ptr< Work_Thread > > m_threads
Pool of work threads.
void unbind_agent(agent_t &agent) noexcept
Unbind agent from the dispatcher.
dispatcher_t(const dispatcher_t &)=delete
dispatcher_t(environment_t &env, const so_5::disp::reuse::work_thread_factory_mixin_t< Dispatcher_Params > &disp_params, const std::string_view name_base, std::size_t thread_count, const so_5::disp::mpmc_queue_traits::queue_params_t &queue_params)
Constructor.
agent_queue_ref_t make_new_agent_queue(const Bind_Params ¶ms)
Helper method for creating event queue for agents/cooperations.
std::mutex m_lock
Object's lock.
dispatcher_t & operator=(const dispatcher_t &)=delete
cooperation_map_t m_cooperations
Information about cooperations.
stats::manually_registered_source_holder_t< tp_stats::data_source_t > m_data_source
Data source for the run-time monitoring.
An analog of unique_ptr for abstract_work_thread.
Interface for dispatcher binders.
An interface of event queue for agent.
Template class for smart reference wrapper on the atomic_refcounted_t.
Helper class for indication of long-lived reference via its type.
An addition to auto_registered_source_holder for the cases where manual registration of data_source s...
A type for storing prefix of data_source name.
An interface of data source.
void adjust_thread_count(disp_params_t ¶ms)
Sets the thread count to default value if used do not specify actual thread count.
Internal implementation details of advanced thread pool dispatcher.
Advanced thread pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, std::size_t thread_count)
Create an instance of adv_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, std::size_t thread_count)
Create an instance of adv_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env)
Create an instance of adv_thread_pool dispatcher with the default count of work threads.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of adv_thread_pool dispatcher.
fifo_t
Type of FIFO mechanism for agent's demands.
@ cooperation
A FIFO for demands for all agents from the same cooperation.
@ individual
A FIFO for demands only for one agent.
Various stuff related to MPMC event queue implementation and tuning.
Helper tools for implementation of run-time monitoring for thread-pool-like dispatchers.
queue_description_holder_ref_t make_queue_desc_holder(const stats::prefix_t &prefix, coop_id_t coop_id, std::size_t agent_count)
Helper function for creating queue_description_holder object.
queue_description_holder_ref_t make_queue_desc_holder(const stats::prefix_t &prefix, const void *agent)
Helper function for creating queue_description_holder object.
Reusable components for dispatchers.
std::size_t default_thread_pool_size()
A helper function for detecting default thread count for thread pool.
Reusable implementation of some thread pool dispatcher functionality which can be used in other threa...
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Adaptation of common implementation of thread-pool-like dispatcher to the specific of this thread-poo...
static bool is_individual_fifo(const bind_params_t ¶ms) noexcept
static constexpr std::string_view dispatcher_type_name() noexcept
static void wait_for_queue_emptyness(agent_queue_t &) noexcept
Actual demand in event queue.
demand_t * m_next
Next item in queue.
execution_demand_t m_demand
Actual demand.
demand_t(execution_demand_t &&original)
Main data for work_thread.
common_data_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
work_thread_holder_t m_thread_holder
Actual thread.
so_5::current_thread_id_t m_thread_id
ID of thread.
dispatcher_queue_t * m_disp_queue
Dispatcher's queue.
so_5::disp::mpmc_queue_traits::condition_unique_ptr_t m_condition
Waiting object for long wait.
Activity stats for a particular work thread.
so_5::current_thread_id_t m_thread_id
wt_activity_info_t(const so_5::current_thread_id_t &thread_id, const stats::work_thread_activity_stats_t &stats)
stats::work_thread_activity_stats_t m_stats
A holder of one event queue information block.
intrusive_ptr_t< queue_description_holder_t > m_next
Next item in the chain of queues descriptions.
queue_description_t m_desc
Actual description for the event queue.
Description of one event queue.
stats::prefix_t m_prefix
Prefix for data-sources related to that queue.
std::size_t m_agent_count
Count of agents bound to that queue.
std::size_t m_queue_size
Current queue size.
tp_stats::queue_description_holder_ref_t m_queue_desc
Description of that queue for run-time monitoring.
void update_queue_stats()
Update queue description with current information.
bool cooperation_fifo() const
Does agent use cooperation FIFO?
agent_data_t(agent_queue_ref_t queue, const stats::prefix_t &data_source_name_prefix, const agent_t *agent_ptr)
Constructor for the case when agent uses individual FIFO.
agent_queue_ref_t m_queue
Event queue for the agent.
agent_data_t(agent_queue_ref_t queue)
Constructor for the case when agent uses cooperation FIFO.
Data for one cooperation.
tp_stats::queue_description_holder_ref_t m_queue_desc
Description of that queue for run-time monitoring.
agent_queue_ref_t m_queue
Event queue for the cooperation.
cooperation_data_t(agent_queue_ref_t queue, std::size_t agents, const stats::prefix_t &data_source_name_prefix, coop_id_t coop_id)
void update_queue_stats()
Update queue information for run-time monitoring.
std::size_t m_agents
Count of agents form that cooperation.
A description of event execution demand.
Various traits of activity tracking implementation.
Stats for a work thread activity.