2
3
6
7
8
9
10
14#include <so_5/event_queue.hpp>
16#include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
17#include <so_5/disp/reuse/queue_of_queues.hpp>
18#include <so_5/disp/reuse/thread_pool_stats.hpp>
20#include <so_5/details/rollback_on_exception.hpp>
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
59 typename Dispatcher_Queue,
61 typename Adaptations >
62class dispatcher_t
final
67 using agent_queue_t =
typename Dispatcher_Queue::item_t;
79
80
81
85
86
87
88
92 agent_queue_ref_t queue,
94 const stats::
prefix_t & data_source_name_prefix,
100 data_source_name_prefix
,
106
107
108
109
126
127
128
132
133
134
135
136
137
138
139
144 agent_queue_ref_t queue )
145 :
m_queue( std::move( queue ) )
150
151
153 agent_queue_ref_t queue,
154 const stats::
prefix_t & data_source_name_prefix,
156 :
m_queue( std::move( queue ) )
159 data_source_name_prefix
,
164
165
166
167
176
177
178
179
180
181
195 dispatcher_t &
operator=(
const dispatcher_t & ) =
delete;
198 template<
typename Dispatcher_Params >
203 const std::string_view name_base,
204 std::size_t thread_count,
206 :
m_queue{ queue_params, thread_count }
216 auto work_thread_holder = acquire_work_thread(
219 m_threads.emplace_back( std::unique_ptr< Work_Thread >(
222 std::move(work_thread_holder)
227 Adaptations::dispatcher_type_name()
,
257 const Bind_Params & params )
259 std::lock_guard< std::mutex > lock{
m_lock };
261 if( Adaptations::is_individual_fifo( params ) )
263 agent_ref_t
{ &agent
}, params
);
266 agent_ref_t
{ &agent
}, params
);
274 std::lock_guard< std::mutex > lock{
m_lock };
279 if( it->second.cooperation_fifo() )
284 0 == --(it_coop->second.m_agents) )
288 Adaptations::wait_for_queue_emptyness(
289 *(it_coop->second.m_queue) );
297 Adaptations::wait_for_queue_emptyness(
298 *(it->second.m_queue) );
309 std::lock_guard< std::mutex > lock{
m_lock };
312 if( it->second.cooperation_fifo() )
314 second.m_queue.get();
316 return it->second.m_queue.get();
334 std::vector< std::unique_ptr< Work_Thread > >
m_threads;
341
342
343
350
351
352
353
354
362 const Bind_Params & params )
376
377
378
382 const Bind_Params & params )
397 it->second.m_agents += 1;
407 if( 0 == --(it->second.m_agents) )
415 const Bind_Params & params )
417 return agent_queue_ref_t(
418 new agent_queue_t{ outliving_mutable(
m_queue), params } );
422
423
424
425
426
434
435
436
437
438
443 std::lock_guard< std::mutex > lock{
m_lock };
451 Work_Thread & wt = *t;
452 wt.take_activity_stats(
453 [&wt, &consumer](
const stats_t & st ) {
461 s.update_queue_stats();
468 if( !s.cooperation_fifo() )
470 s.update_queue_stats();
#define SO_5_CHECK_INVARIANT(what, data)
static execution_hint_t so_create_execution_hint(execution_demand_t &demand)
Create execution hint for the specified demand.
coop_handle_t so_coop() const
Get a handle of agent's coop.
void so_bind_to_dispatcher(event_queue_t &queue) noexcept
Binding agent to the dispatcher.
The base class for the object with a reference counting.
auto id() const noexcept
Get the ID of the coop.
Parameters for binding agents to adv_thread_pool dispatcher.
bind_params_t & fifo(fifo_t v)
Set FIFO type.
fifo_t query_fifo() const
Get FIFO type.
Alias for namespace with traits of event queue.
std::size_t thread_count() const
Getter for thread count.
disp_params_t & thread_count(std::size_t count)
Setter for thread count.
disp_params_t & tune_queue_params(L tunner)
Tuner for queue parameters.
std::size_t m_thread_count
Count of working threads.
queue_traits::queue_params_t m_queue_params
Queue parameters.
const queue_traits::queue_params_t & queue_params() const
Getter for queue parameters.
disp_params_t()
Default constructor.
disp_params_t & set_queue_params(queue_traits::queue_params_t p)
Setter for queue parameters.
friend void swap(disp_params_t &a, disp_params_t &b) noexcept
A handle for adv_thread_pool dispatcher.
disp_binder_shptr_t binder(bind_params_t params) const
Get a binder for that dispatcher.
std::enable_if_t< std::is_invocable_v< Setter, bind_params_t & >, disp_binder_shptr_t > binder(Setter &&params_setter) const
Create a binder for that dispatcher.
disp_binder_shptr_t binder() const
Get a binder for that dispatcher with default binding params.
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
bool empty() const noexcept
Is this handle empty?
impl::basic_dispatcher_iface_shptr_t m_dispatcher
A reference to actual implementation of a dispatcher.
dispatcher_handle_t() noexcept=default
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
void reset() noexcept
Drop the content of handle.
operator bool() const noexcept
Is this handle empty?
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
const bind_params_t m_params
Binding parameters.
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
actual_binder_t(actual_dispatcher_iface_shptr_t disp, bind_params_t params) noexcept
actual_dispatcher_iface_shptr_t m_disp
Dispatcher to be used.
void preallocate_resources(agent_t &agent) override
Allocate resources in dispatcher for new agent.
void undo_preallocation(agent_t &agent) noexcept override
Undo resources allocation.
An actual interface of thread-pool dispatcher.
virtual event_queue_t * query_resources_for_agent(agent_t &agent) noexcept=0
Get resources allocated for an agent.
virtual void undo_preallocation_for_agent(agent_t &agent) noexcept=0
Undo preallocation of resources for a new agent.
virtual void unbind_agent(agent_t &agent) noexcept=0
Unbind agent from the dispatcher.
virtual void preallocate_resources_for_agent(agent_t &agent, const bind_params_t &params)=0
Preallocate all necessary resources for a new agent.
event_queue_t * query_resources_for_agent(agent_t &agent) noexcept override
Get resources allocated for an agent.
void unbind_agent(agent_t &agent) noexcept override
Unbind agent from the dispatcher.
disp_binder_shptr_t binder(bind_params_t params) override
void undo_preallocation_for_agent(agent_t &agent) noexcept override
Undo preallocation of resources for a new agent.
actual_dispatcher_implementation_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
dispatcher_template_t< Work_Thread > m_impl
Real dispatcher.
void preallocate_resources_for_agent(agent_t &agent, const bind_params_t &params) override
Preallocate all necessary resources for a new agent.
~actual_dispatcher_implementation_t() noexcept override
execution_demand_t peek_front()
Get the information about the front demand.
agent_queue_t * intrusive_queue_giveout_next() noexcept
Give away a pointer to the next agent_queue.
void push(execution_demand_t demand) override
Push next demand to queue.
std::size_t size() const noexcept
Get the current size of the queue.
bool empty() const
Is empty queue?
agent_queue_t * m_intrusive_queue_next
The next item in intrusive queue of agent_queues.
void intrusive_queue_set_next(agent_queue_t *next) noexcept
Set a pointer to the next agent_queue.
static constexpr const unsigned int thread_safe_worker
spinlock_t m_lock
Object's lock.
bool active() const
Is active queue?
agent_queue_t(outliving_reference_t< dispatcher_queue_t > disp_queue, const bind_params_t &)
Constructor.
unsigned int m_workers
Count of active workers.
demand_t * m_tail_demand
Tail of the demand's queue.
bool worker_finished(unsigned int type_of_worker)
Signal about finishing of worker of the specified type.
void push_evt_finish(execution_demand_t demand) noexcept override
std::atomic< std::size_t > m_size
Current size of the queue.
~agent_queue_t() override
static constexpr const unsigned int not_thread_safe_worker
bool is_there_not_thread_safe_worker() const
Check the presence of thread unsafe worker.
void delete_head() noexcept
Helper method for deleting queue's head object.
bool worker_started(unsigned int type_of_worker)
Remove the front demand.
bool m_active
Is this queue activated?
demand_t m_head_demand
Head of the demand's queue.
spinlock_t & lock() noexcept
Access to the queue's lock.
void push_evt_start(execution_demand_t demand) override
dispatcher_queue_t & m_disp_queue
bool is_there_any_worker() const
Check the presence of any worker at the moment.
The very basic interface of adv_thread_pool dispatcher.
virtual ~basic_dispatcher_iface_t() noexcept=default
virtual disp_binder_shptr_t binder(bind_params_t params)=0
static dispatcher_handle_t make(actual_dispatcher_iface_shptr_t disp) noexcept
Part of implementation of work thread without activity tracing.
no_activity_tracking_impl_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
void take_activity_stats(L)
Part of implementation of work thread with activity tracing.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::external_lock<> > m_waiting_stats_collector
A collector for waiting stats.
activity_tracking_traits::lock_t m_stats_lock
Lock for activity statistics.
void take_activity_stats(L lambda)
with_activity_tracking_impl_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::external_lock<> > m_work_activity_collector
A collector for work activity.
void process_queue(agent_queue_t &queue)
Processing of demands from agent queue.
so_5::current_thread_id_t thread_id() const
Get ID of work thread.
void start()
Launch work thread.
work_thread_template_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
agent_queue_t * pop_agent_queue() noexcept
An attempt of extraction of non-empty agent queue.
void body()
Thread body method.
An interface for something like a condition variable for waiting on an MPMC queue lock.
virtual void notify() noexcept=0
Notification for waiting customer.
virtual void wait() noexcept=0
Waiting on condition.
An interface for lock for MPMC queue.
virtual condition_unique_ptr_t allocate_condition()=0
Create condition object for another MPMC queue's customer.
Container for storing parameters for MPMC queue.
queue_params_t & operator=(queue_params_t &&o) noexcept
Move operator.
friend void swap(queue_params_t &a, queue_params_t &b) noexcept
std::size_t next_thread_wakeup_threshold() const
Getter for thread wakeup threshold value.
const lock_factory_t & lock_factory() const
Getter for lock factory.
Multi-producer/Multi-consumer queue of pointers to event queues.
T * m_tail
The current tail of the intrusive queue.
T * try_switch_to_another(T *current) noexcept
Switch the current non-empty queue to another one if it is possible.
void try_wakeup_someone_if_possible() noexcept
An attempt to wakeup another sleeping thread if it's necessary and possible.
const std::size_t m_max_thread_count
Maximum count of working threads to be used with that mpmc_queue.
so_5::disp::mpmc_queue_traits::lock_unique_ptr_t m_lock
Object's lock.
std::size_t m_queue_size
The current size of the intrusive queue.
void schedule(T *queue) noexcept
Schedule execution of demands from the queue.
T * m_head
The current head of the intrusive queue.
queue_of_queues_t(const so_5::disp::mpmc_queue_traits::queue_params_t &queue_params, std::size_t thread_count)
void shutdown() noexcept
Initiate shutdown for working threads.
void pop_and_notify_one_waiting_customer() noexcept
bool m_shutdown
Shutdown flag.
const std::size_t m_next_thread_wakeup_threshold
Threshold for waking up the next working thread if there are non-empty agent queues.
so_5::disp::mpmc_queue_traits::condition_unique_ptr_t allocate_condition()
T * pop(so_5::disp::mpmc_queue_traits::condition_t &condition) noexcept
Get next active queue.
void push_to_queue(T *new_tail) noexcept
Helper method that pushes a new item to the end of the queue.
bool m_wakeup_in_progress
Is some working thread in wakeup process now?
T * pop_head() noexcept
Helper method that extracts the head item from the queue.
std::vector< so_5::disp::mpmc_queue_traits::condition_t * > m_waiting_customers
Waiting threads.
Type of data source for thread-pool-like dispatchers.
const stats::prefix_t & prefix() const
Basic prefix for data source names.
void set_data_sources_name_base(const std::string_view disp_type, const std::string_view name_basic, const void *disp_pointer)
Setting of data-source basic name.
An interface of collector of information about thread-pool-like dispatcher state.
virtual void add_work_thread_activity(const so_5::current_thread_id_t &thread_id, const so_5::stats::work_thread_activity_stats_t &stats)=0
Informs consumer about yet another working thread activity.
virtual void set_thread_count(std::size_t value)=0
Informs consumer about the actual thread count.
virtual void add_queue(const intrusive_ptr_t< queue_description_holder_t > &queue_desc)=0
Informs consumer about yet another event queue.
An interface of supplier of information about thread-pool-like dispatcher state.
Mixin with thread activity tracking flag.
friend void swap(work_thread_activity_tracking_flag_mixin_t &a, work_thread_activity_tracking_flag_mixin_t &b) noexcept
Mixin that holds optional work thread factory.
friend void swap(work_thread_factory_mixin_t &a, work_thread_factory_mixin_t &b) noexcept
void undo_preallocation_for_agent(agent_t &agent) noexcept
Undo preallocation of resources for a new agent.
void preallocate_resources_for_agent(agent_t &agent, const Bind_Params &params)
Preallocate all necessary resources for a new agent.
void bind_agent_with_cooperation_fifo(agent_ref_t agent, const Bind_Params &params)
Creation of an event queue for an agent with cooperation FIFO.
void bind_agent_with_inidividual_fifo(agent_ref_t agent, const Bind_Params &params)
Creation of an event queue for an agent with individual FIFO.
const std::size_t m_thread_count
Count of working threads.
virtual void supply(tp_stats::stats_consumer_t &consumer) override
Implementation of stats_supplier-related stuff.
agent_map_t m_agents
Information of agents.
tp_stats::stats_supplier_t & stats_supplier()
Helper method for casting to stats_supplier-object.
void shutdown_then_wait() noexcept
Dispatcher_Queue m_queue
Queue for active agent's queues.
void start(environment_t &env)
event_queue_t * query_resources_for_agent(agent_t &agent) noexcept
Get resources allocated for an agent.
std::vector< std::unique_ptr< Work_Thread > > m_threads
Pool of work threads.
void unbind_agent(agent_t &agent) noexcept
Unbind agent from the dispatcher.
dispatcher_t(const dispatcher_t &)=delete
dispatcher_t(environment_t &env, const so_5::disp::reuse::work_thread_factory_mixin_t< Dispatcher_Params > &disp_params, const std::string_view name_base, std::size_t thread_count, const so_5::disp::mpmc_queue_traits::queue_params_t &queue_params)
Constructor.
agent_queue_ref_t make_new_agent_queue(const Bind_Params &params)
Helper method for creating event queue for agents/cooperations.
std::mutex m_lock
Object's lock.
dispatcher_t & operator=(const dispatcher_t &)=delete
cooperation_map_t m_cooperations
Information about cooperations.
stats::manually_registered_source_holder_t< tp_stats::data_source_t > m_data_source
Data source for the run-time monitoring.
An analog of unique_ptr for abstract_work_thread.
work_thread_holder_t(work_thread_holder_t &&o) noexcept
Interface for dispatcher binders.
stats::repository_t & stats_repository()
Access to repository of data sources for run-time monitoring.
An interface of event queue for agent.
bool is_thread_safe() const
Is thread safe handler?
void exec(current_thread_id_t working_thread_id) const
Call event handler directly.
Template class for smart reference wrapper on the atomic_refcounted_t.
intrusive_ptr_t(T *obj) noexcept
Constructor for a raw pointer.
T * operator->() const noexcept
Helper class for indication of long-lived reference via its type.
Base for the case of externals stats lock.
Helper for collecting activity stats.
so_5::stats::activity_stats_t take_stats()
An addition to auto_registered_source_holder for the cases where manual registration of data_source s...
Data_Source & get() noexcept
void start(outliving_reference_t< repository_t > repo)
A type for storing prefix of data_source name.
Some reusable and low-level classes/functions which can be used in public header files.
void adjust_thread_count(disp_params_t &params)
Sets the thread count to the default value if the user does not specify an actual thread count.
Internal implementation details of advanced thread pool dispatcher.
Advanced thread pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, std::size_t thread_count)
Create an instance of adv_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, std::size_t thread_count)
Create an instance of adv_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env)
Create an instance of adv_thread_pool dispatcher with the default count of work threads.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of adv_thread_pool dispatcher.
fifo_t
Type of FIFO mechanism for agent's demands.
@ cooperation
A FIFO for demands for all agents from the same cooperation.
@ individual
A FIFO for demands only for one agent.
Various stuff related to MPMC event queue implementation and tuning.
Helper tools for implementation of run-time monitoring for thread-pool-like dispatchers.
queue_description_holder_ref_t make_queue_desc_holder(const stats::prefix_t &prefix, coop_id_t coop_id, std::size_t agent_count)
Helper function for creating queue_description_holder object.
queue_description_holder_ref_t make_queue_desc_holder(const stats::prefix_t &prefix, const void *agent)
Helper function for creating queue_description_holder object.
Reusable components for dispatchers.
std::size_t default_thread_pool_size()
A helper function for detecting default thread count for thread pool.
std::unique_ptr< Disp_Iface_Type > make_actual_dispatcher(outliving_reference_t< environment_t > env, const std::string_view name_base, Disp_Params_Type disp_params, Args &&...args)
Helper function for creation of dispatcher instance with respect to work thread activity tracking fla...
Reusable implementation of some thread pool dispatcher functionality which can be used in other threa...
Details of SObjectizer run-time implementations.
void ensure_join_from_different_thread(current_thread_id_t thread_to_be_joined)
Ensures that join will be called from different thread.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
current_thread_id_t query_current_thread_id()
Get the ID of the current thread.
outliving_reference_t< T > outliving_mutable(T &r)
Make outliving_reference wrapper for mutable reference.
Adaptation of common implementation of thread-pool-like dispatcher to the specific of this thread-poo...
static bool is_individual_fifo(const bind_params_t &params) noexcept
static constexpr std::string_view dispatcher_type_name() noexcept
static void wait_for_queue_emptyness(agent_queue_t &) noexcept
Actual demand in event queue.
demand_t * m_next
Next item in queue.
execution_demand_t m_demand
Actual demand.
demand_t(execution_demand_t &&original)
Main data for work_thread.
common_data_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
work_thread_holder_t m_thread_holder
Actual thread.
so_5::current_thread_id_t m_thread_id
ID of thread.
dispatcher_queue_t * m_disp_queue
Dispatcher's queue.
so_5::disp::mpmc_queue_traits::condition_unique_ptr_t m_condition
Waiting object for long wait.
queue_description_t m_desc
Actual description for the event queue.
std::size_t m_agent_count
Count of agents bound to that queue.
std::size_t m_queue_size
Current queue size.
tp_stats::queue_description_holder_ref_t m_queue_desc
Description of that queue for run-time monitoring.
void update_queue_stats()
Update queue description with current information.
bool cooperation_fifo() const
Does agent use cooperation FIFO?
agent_data_t(agent_queue_ref_t queue, const stats::prefix_t &data_source_name_prefix, const agent_t *agent_ptr)
Constructor for the case when agent uses individual FIFO.
agent_queue_ref_t m_queue
Event queue for the agent.
agent_data_t(agent_queue_ref_t queue)
Constructor for the case when agent uses cooperation FIFO.
Data for one cooperation.
tp_stats::queue_description_holder_ref_t m_queue_desc
Description of that queue for run-time monitoring.
agent_queue_ref_t m_queue
Event queue for the cooperation.
cooperation_data_t(agent_queue_ref_t queue, std::size_t agents, const stats::prefix_t &data_source_name_prefix, coop_id_t coop_id)
void update_queue_stats()
Update queue information for run-time monitoring.
std::size_t m_agents
Count of agents from that cooperation.
A description of event execution demand.
agent_t * m_receiver
Receiver of demand.
Various traits of activity tracking implementation.
Stats for a work thread activity.
activity_stats_t m_working_stats
Stats for processed events.
activity_stats_t m_waiting_stats
Stats for waiting periods.