SObjectizer 5.8
Loading...
Searching...
No Matches
common_implementation.hpp
Go to the documentation of this file.
1/*
2 * SObjectizer-5
3 */
4
5/*!
6 * \file
7 * \brief Reusable common implementation for thread-pool-like dispatchers.
8 *
9 * \since v.5.5.4
10 */
11
12#pragma once
13
14#include <so_5/event_queue.hpp>
15
16#include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
17#include <so_5/disp/reuse/queue_of_queues.hpp>
18#include <so_5/disp/reuse/thread_pool_stats.hpp>
19
20#include <so_5/details/rollback_on_exception.hpp>
21
22#include <mutex>
23
24namespace so_5 {
25
26namespace disp {
27
28namespace thread_pool {
29
30namespace common_implementation {
31
32namespace stats = so_5::stats;
33namespace tp_stats = so_5::disp::reuse::thread_pool_stats;
34
35//
36// dispatcher_t
37//
38/*!
39 * \brief Reusable common implementation for thread-pool-like dispatchers.
40 *
41 * \tparam Work_Thread type of worker thread to be used.
42 *
43 * \tparam Dispatcher_Queue type of dispatcher queue. A single instance of
44 * dispatcher queue will be used for scheduling separate agent queues.
45 * It's expected to be so_5::disp::reuse::queue_of_queues_t.
46 *
47 * \tparam Bind_Params type of bind_params_t with parameters for binding
48 * just one agent to the dispatcher.
49 *
50 * \tparam Adaptations type with static method. This method adapts the
51 * behavior of Dispatcher_Queue (and corresponding agent queues) to the
52 * needs of the dispatcher. See so_5::disp::thread_pool::impl::adaptation_t
53 * or so_5::disp::adv_thread_pool::impl::adaptation_t as examples.
54 *
55 * \since v.5.5.4
56 */
57template<
58 typename Work_Thread,
59 typename Dispatcher_Queue,
60 typename Bind_Params,
61 typename Adaptations >
62class dispatcher_t final
63 : public tp_stats::stats_supplier_t
64 {
65 private :
66 //! A short alias for agent queue type.
67 using agent_queue_t = typename Dispatcher_Queue::item_t;
68
69 using agent_queue_ref_t = so_5::intrusive_ptr_t< agent_queue_t >;
70
71 //! Data for one cooperation.
73 {
74 //! Event queue for the cooperation.
75 agent_queue_ref_t m_queue;
76
77 //! Count of agents from that cooperation.
78 /*!
79 * When this counter is zero then cooperation data
80 * must be destroyed.
81 */
82 std::size_t m_agents;
83
84 /*!
85 * \brief Description of that queue for run-time monitoring.
86 *
87 * \since v.5.5.4
88 */
89 tp_stats::queue_description_holder_ref_t m_queue_desc;
90
92 agent_queue_ref_t queue,
93 std::size_t agents,
94 const stats::prefix_t & data_source_name_prefix,
95 coop_id_t coop_id )
96 : m_queue( std::move( queue ) )
97 , m_agents( agents )
100 data_source_name_prefix,
101 coop_id,
102 agents ) )
103 {}
104
105 /*!
106 * \brief Update queue information for run-time monitoring.
107 *
108 * \since v.5.5.4
109 */
110 void
116 };
117
118 //! Map from cooperation ID to the cooperation data.
119 using cooperation_map_t = std::map< coop_id_t, cooperation_data_t >;
120
121 //! Data for one agent.
123 {
124 //! Event queue for the agent.
125 /*!
126 * It could be individual queue or queue for the whole
127 * cooperation (to which agent is belonging).
128 */
129 agent_queue_ref_t m_queue;
130
131 /*!
132 * \brief Description of that queue for run-time monitoring.
133 *
134 * \note This description is created only if agent
135 * uses individual FIFO.
136 *
137 * \since v.5.5.4
138 *
139 */
140 tp_stats::queue_description_holder_ref_t m_queue_desc;
141
142 //! Constructor for the case when agent uses cooperation FIFO.
144 agent_queue_ref_t queue )
145 : m_queue( std::move( queue ) )
146 {}
147
148 //! Constructor for the case when agent uses individual FIFO.
149 /*!
150 * In this case a queue_description object must be created.
151 */
153 agent_queue_ref_t queue,
154 const stats::prefix_t & data_source_name_prefix,
155 const agent_t * agent_ptr )
156 : m_queue( std::move( queue ) )
157 , m_queue_desc(
159 data_source_name_prefix,
160 agent_ptr ) )
161 {}
162
163 /*!
164 * \brief Does agent use cooperation FIFO?
165 *
166 * \since v.5.5.4
167 */
168 [[nodiscard]]
169 bool
171 {
172 return !m_queue_desc;
173 }
174
175 /*!
176 * \brief Update queue description with current information.
177 *
178 * \attention Must be called only if !cooperation_fifo().
179 *
180 * \since v.5.5.4
181 */
182 void
188 };
189
190 //! Map from agent pointer to the agent data.
191 using agent_map_t = std::map< agent_t *, agent_data_t >;
192
193 public :
194 dispatcher_t( const dispatcher_t & ) = delete;
195 dispatcher_t & operator=( const dispatcher_t & ) = delete;
196
197 //! Constructor.
198 template< typename Dispatcher_Params >
200 environment_t & env,
201 const so_5::disp::reuse::work_thread_factory_mixin_t< Dispatcher_Params >
202 & disp_params,
203 const std::string_view name_base,
204 std::size_t thread_count,
205 const so_5::disp::mpmc_queue_traits::queue_params_t & queue_params )
206 : m_queue{ queue_params, thread_count }
207 , m_thread_count( thread_count )
209 {
210 m_threads.reserve( thread_count );
211
212 for( std::size_t i = 0; i != m_thread_count; ++i )
213 {
214 // Since v.5.7.3 an instance of the actual work thread
215 // has to be acquired via factory.
216 auto work_thread_holder = acquire_work_thread(
217 disp_params, env );
218
219 m_threads.emplace_back( std::unique_ptr< Work_Thread >(
220 new Work_Thread{
221 outliving_mutable(m_queue),
222 std::move(work_thread_holder)
223 } ) );
224 }
225
227 Adaptations::dispatcher_type_name(),
228 name_base,
229 this );
230 }
231
232 void
234 {
237
238 for( auto & t : m_threads )
239 t->start();
240 }
241
242 void
244 {
245 m_queue.shutdown();
246
247 for( auto & t : m_threads )
248 t->join();
249
251 }
252
253 //! Preallocate all necessary resources for a new agent.
254 void
256 agent_t & agent,
257 const Bind_Params & params )
258 {
259 std::lock_guard< std::mutex > lock{ m_lock };
260
261 if( Adaptations::is_individual_fifo( params ) )
263 agent_ref_t{ &agent }, params );
264 else
266 agent_ref_t{ &agent }, params );
267 }
268
269 //! Undo preallocation of resources for a new agent.
270 void
272 agent_t & agent ) noexcept
273 {
274 std::lock_guard< std::mutex > lock{ m_lock };
275
276 auto it = m_agents.find( &agent );
277 if( it != m_agents.end() )
278 {
279 if( it->second.cooperation_fifo() )
280 {
281 auto it_coop = m_cooperations.find(
282 agent.so_coop().id() );
283 if( it_coop != m_cooperations.end() &&
284 0 == --(it_coop->second.m_agents) )
285 {
286 // agent_queue object can be destroyed
287 // only when it is empty.
288 Adaptations::wait_for_queue_emptyness(
289 *(it_coop->second.m_queue) );
290
291 m_cooperations.erase( it_coop );
292 }
293 }
294 else
295 // agent_queue object can be destroyed
296 // only when it is empty.
297 Adaptations::wait_for_queue_emptyness(
298 *(it->second.m_queue) );
299
300 m_agents.erase( it );
301 }
302 }
303
304 //! Get resources allocated for an agent.
305 [[nodiscard]]
308 {
309 std::lock_guard< std::mutex > lock{ m_lock };
310
311 auto it = m_agents.find( &agent );
312 if( it->second.cooperation_fifo() )
313 return m_cooperations.find( agent.so_coop().id() )->
314 second.m_queue.get();
315 else
316 return it->second.m_queue.get();
317 }
318
319 //! Unbind agent from the dispatcher.
320 void
321 unbind_agent( agent_t & agent ) noexcept
322 {
324 }
325
326 private :
327 //! Queue for active agent's queues.
328 Dispatcher_Queue m_queue;
329
330 //! Count of working threads.
331 const std::size_t m_thread_count;
332
333 //! Pool of work threads.
334 std::vector< std::unique_ptr< Work_Thread > > m_threads;
335
336 //! Object's lock.
337 std::mutex m_lock;
338
339 //! Information about cooperations.
340 /*!
341 * Information to this map is added only if an agent is
342 * using cooperation FIFO mechanism.
343 */
344 cooperation_map_t m_cooperations;
345
346 //! Information of agents.
347 agent_map_t m_agents;
348
349 /*!
350 * \since
351 * v.5.5.4
352 *
353 * \brief Data source for the run-time monitoring.
354 */
357
358 //! Creation of event queue for an agent with individual FIFO.
359 void
361 agent_ref_t agent,
362 const Bind_Params & params )
363 {
364 auto queue = make_new_agent_queue( params );
365
366 m_agents.emplace(
367 agent.get(),
369 queue,
371 agent.get() } );
372 }
373
374 //! Creation of event queue for an agent with cooperation FIFO.
375 /*!
376 * If the data for the agent's cooperation is not created yet
377 * it will be created.
378 */
379 void
381 agent_ref_t agent,
382 const Bind_Params & params )
383 {
384 const auto id = agent->so_coop().id();
385
386 auto it = m_cooperations.find( id );
387 if( it == m_cooperations.end() )
388 it = m_cooperations.emplace(
389 id,
392 1,
394 id ) )
395 .first;
396 else
397 it->second.m_agents += 1;
398
399 so_5::details::do_with_rollback_on_exception(
400 [&] {
401 m_agents.emplace(
402 agent.get(),
403 agent_data_t{ it->second.m_queue } );
404 },
405 [&] {
406 // Rollback m_cooperations modification.
407 if( 0 == --(it->second.m_agents) )
408 m_cooperations.erase( it );
409 } );
410 }
411
412 //! Helper method for creating event queue for agents/cooperations.
413 agent_queue_ref_t
415 const Bind_Params & params )
416 {
417 return agent_queue_ref_t(
418 new agent_queue_t{ outliving_mutable(m_queue), params } );
419 }
420
421 /*!
422 * \since
423 * v.5.5.4
424 *
425 * \brief Helper method for casting to stats_supplier-object.
426 */
427 tp_stats::stats_supplier_t &
429 {
430 return *this;
431 }
432
433 /*!
434 * \since
435 * v.5.5.4
436 *
437 * \brief Implementation of stats_supplier-related stuff.
438 */
439 virtual void
440 supply( tp_stats::stats_consumer_t & consumer ) override
441 {
442 // Statistics must be collected on a locked object.
443 std::lock_guard< std::mutex > lock{ m_lock };
444
445 consumer.set_thread_count( m_threads.size() );
446
447 for( auto & t : m_threads )
448 {
449 using stats_t = so_5::stats::work_thread_activity_stats_t;
450
451 Work_Thread & wt = *t;
452 wt.take_activity_stats(
453 [&wt, &consumer]( const stats_t & st ) {
454 consumer.add_work_thread_activity( wt.thread_id(), st );
455 } );
456 }
457
458 for( auto & q : m_cooperations )
459 {
460 auto & s = q.second;
461 s.update_queue_stats();
462 consumer.add_queue( s.m_queue_desc );
463 }
464
465 for( auto & a : m_agents )
466 {
467 auto & s = a.second;
468 if( !s.cooperation_fifo() )
469 {
470 s.update_queue_stats();
471 consumer.add_queue( s.m_queue_desc );
472 }
473 }
474 }
475 };
476
477} /* namespace common_implementation */
478
479} /* namespace thread_pool */
480
481} /* namespace disp */
482
483} /* namespace so_5 */
#define SO_5_CHECK_INVARIANT(what, data)
A base class for agents.
Definition agent.hpp:673
static execution_hint_t so_create_execution_hint(execution_demand_t &demand)
Create execution hint for the specified demand.
Definition agent.cpp:901
coop_handle_t so_coop() const
Get a handle of agent's coop.
Definition agent.cpp:860
void so_bind_to_dispatcher(event_queue_t &queue) noexcept
Binding agent to the dispatcher.
Definition agent.cpp:872
The base class for the object with a reference counting.
auto id() const noexcept
Get the ID of the coop.
Parameters for binding agents to adv_thread_pool dispatcher.
bind_params_t & fifo(fifo_t v)
Set FIFO type.
Alias for namespace with traits of event queue.
std::size_t thread_count() const
Getter for thread count.
disp_params_t & thread_count(std::size_t count)
Setter for thread count.
disp_params_t & tune_queue_params(L tunner)
Tuner for queue parameters.
std::size_t m_thread_count
Count of working threads.
queue_traits::queue_params_t m_queue_params
Queue parameters.
const queue_traits::queue_params_t & queue_params() const
Getter for queue parameters.
disp_params_t & set_queue_params(queue_traits::queue_params_t p)
Setter for queue parameters.
friend void swap(disp_params_t &a, disp_params_t &b) noexcept
A handle for adv_thread_pool dispatcher.
disp_binder_shptr_t binder(bind_params_t params) const
Get a binder for that dispatcher.
std::enable_if_t< std::is_invocable_v< Setter, bind_params_t & >, disp_binder_shptr_t > binder(Setter &&params_setter) const
Create a binder for that dispatcher.
disp_binder_shptr_t binder() const
Get a binder for that dispatcher with default binding params.
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
bool empty() const noexcept
Is this handle empty?
impl::basic_dispatcher_iface_shptr_t m_dispatcher
A reference to actual implementation of a dispatcher.
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
void reset() noexcept
Drop the content of handle.
operator bool() const noexcept
Is this handle empty?
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
const bind_params_t m_params
Binding parameters.
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
actual_binder_t(actual_dispatcher_iface_shptr_t disp, bind_params_t params) noexcept
actual_dispatcher_iface_shptr_t m_disp
Dispatcher to be used.
void preallocate_resources(agent_t &agent) override
Allocate resources in dispatcher for new agent.
void undo_preallocation(agent_t &agent) noexcept override
Undo resources allocation.
virtual event_queue_t * query_resources_for_agent(agent_t &agent) noexcept=0
Get resources allocated for an agent.
virtual void undo_preallocation_for_agent(agent_t &agent) noexcept=0
Undo preallocation of resources for a new agent.
virtual void unbind_agent(agent_t &agent) noexcept=0
Unbind agent from the dispatcher.
virtual void preallocate_resources_for_agent(agent_t &agent, const bind_params_t &params)=0
Preallocate all necessary resources for a new agent.
event_queue_t * query_resources_for_agent(agent_t &agent) noexcept override
Get resources allocated for an agent.
void unbind_agent(agent_t &agent) noexcept override
Unbind agent from the dispatcher.
void undo_preallocation_for_agent(agent_t &agent) noexcept override
Undo preallocation of resources for a new agent.
actual_dispatcher_implementation_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
dispatcher_template_t< Work_Thread > m_impl
Real dispatcher.
void preallocate_resources_for_agent(agent_t &agent, const bind_params_t &params) override
Preallocate all necessary resources for a new agent.
execution_demand_t peek_front()
Get the information about the front demand.
agent_queue_t * intrusive_queue_giveout_next() noexcept
Give away a pointer to the next agent_queue.
void push(execution_demand_t demand) override
Push next demand to queue.
std::size_t size() const noexcept
Get the current size of the queue.
agent_queue_t * m_intrusive_queue_next
The next item in intrusive queue of agent_queues.
void intrusive_queue_set_next(agent_queue_t *next) noexcept
Set a pointer to the next agent_queue.
agent_queue_t(outliving_reference_t< dispatcher_queue_t > disp_queue, const bind_params_t &)
Constructor.
bool worker_finished(unsigned int type_of_worker)
Signal about finishing of worker of the specified type.
void push_evt_finish(execution_demand_t demand) noexcept override
std::atomic< std::size_t > m_size
Current size of the queue.
bool is_there_not_thread_safe_worker() const
Check the presence of thread unsafe worker.
void delete_head() noexcept
Helper method for deleting queue's head object.
bool worker_started(unsigned int type_of_worker)
Remove the front demand.
spinlock_t & lock() noexcept
Access to the queue's lock.
void push_evt_start(execution_demand_t demand) override
bool is_there_any_worker() const
Check the presence of any worker at the moment.
The very basic interface of adv_thread_pool dispatcher.
virtual disp_binder_shptr_t binder(bind_params_t params)=0
static dispatcher_handle_t make(actual_dispatcher_iface_shptr_t disp) noexcept
no_activity_tracking_impl_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::external_lock<> > m_waiting_stats_collector
A collector for waiting stats.
with_activity_tracking_impl_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::external_lock<> > m_work_activity_collector
A collector for work activity.
void process_queue(agent_queue_t &queue)
Processing of demands from agent queue.
work_thread_template_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
agent_queue_t * pop_agent_queue() noexcept
An attempt of extraction of non-empty agent queue.
An interface for something like a condition variable for waiting on MPMC queue lock.
virtual void notify() noexcept=0
Notification for waiting customer.
virtual void wait() noexcept=0
Waiting on condition.
An interface for lock for MPMC queue.
virtual condition_unique_ptr_t allocate_condition()=0
Create condition object for another MPMC queue's customer.
Container for storing parameters for MPMC queue.
queue_params_t & operator=(queue_params_t &&o) noexcept
Move operator.
friend void swap(queue_params_t &a, queue_params_t &b) noexcept
std::size_t next_thread_wakeup_threshold() const
Getter for thread wakeup threshold value.
const lock_factory_t & lock_factory() const
Getter for lock factory.
Multi-producer/Multi-consumer queue of pointers to event queues.
T * m_tail
The current tail of the intrusive queue.
T * try_switch_to_another(T *current) noexcept
Switch the current non-empty queue to another one if it is possible.
void try_wakeup_someone_if_possible() noexcept
An attempt to wakeup another sleeping thread if it's necessary and possible.
const std::size_t m_max_thread_count
Maximum count of working threads to be used with that mpmc_queue.
so_5::disp::mpmc_queue_traits::lock_unique_ptr_t m_lock
Object's lock.
std::size_t m_queue_size
The current size of the intrusive queue.
void schedule(T *queue) noexcept
Schedule execution of demands from the queue.
T * m_head
The current head of the intrusive queue.
queue_of_queues_t(const so_5::disp::mpmc_queue_traits::queue_params_t &queue_params, std::size_t thread_count)
void shutdown() noexcept
Initiate shutdown for working threads.
const std::size_t m_next_thread_wakeup_threshold
Threshold for wake up next working thread if there are non-empty agent queues.
so_5::disp::mpmc_queue_traits::condition_unique_ptr_t allocate_condition()
T * pop(so_5::disp::mpmc_queue_traits::condition_t &condition) noexcept
Get next active queue.
void push_to_queue(T *new_tail) noexcept
Helper method that pushes a new item to the end of the queue.
bool m_wakeup_in_progress
Is some working thread in wakeup process now?
T * pop_head() noexcept
Helper method that extracts the head item from the queue.
std::vector< so_5::disp::mpmc_queue_traits::condition_t * > m_waiting_customers
Waiting threads.
Type of data source for thread-pool-like dispatchers.
const stats::prefix_t & prefix() const
Basic prefix for data source names.
void set_data_sources_name_base(const std::string_view disp_type, const std::string_view name_basic, const void *disp_pointer)
Setting of data-source basic name.
An interface of collector of information about thread-pool-like dispatcher state.
virtual void add_work_thread_activity(const so_5::current_thread_id_t &thread_id, const so_5::stats::work_thread_activity_stats_t &stats)=0
Informs consumer about yet another working thread activity.
virtual void set_thread_count(std::size_t value)=0
Informs consumer about the actual thread count.
virtual void add_queue(const intrusive_ptr_t< queue_description_holder_t > &queue_desc)=0
Informs consumer about yet another event queue.
An interface of supplier of information about thread-pool-like dispatcher state.
friend void swap(work_thread_activity_tracking_flag_mixin_t &a, work_thread_activity_tracking_flag_mixin_t &b) noexcept
Mixin that holds optional work thread factory.
friend void swap(work_thread_factory_mixin_t &a, work_thread_factory_mixin_t &b) noexcept
void undo_preallocation_for_agent(agent_t &agent) noexcept
Undo preallocation of resources for a new agent.
void preallocate_resources_for_agent(agent_t &agent, const Bind_Params &params)
Preallocate all necessary resources for a new agent.
void bind_agent_with_cooperation_fifo(agent_ref_t agent, const Bind_Params &params)
Creation of event queue for an agent with cooperation FIFO.
void bind_agent_with_inidividual_fifo(agent_ref_t agent, const Bind_Params &params)
Creation of event queue for an agent with individual FIFO.
virtual void supply(tp_stats::stats_consumer_t &consumer) override
Implementation of stats_supplier-related stuff.
tp_stats::stats_supplier_t & stats_supplier()
Helper method for casting to stats_supplier-object.
Dispatcher_Queue m_queue
Queue for active agent's queues.
event_queue_t * query_resources_for_agent(agent_t &agent) noexcept
Get resources allocated for an agent.
std::vector< std::unique_ptr< Work_Thread > > m_threads
Pool of work threads.
void unbind_agent(agent_t &agent) noexcept
Unbind agent from the dispatcher.
dispatcher_t(environment_t &env, const so_5::disp::reuse::work_thread_factory_mixin_t< Dispatcher_Params > &disp_params, const std::string_view name_base, std::size_t thread_count, const so_5::disp::mpmc_queue_traits::queue_params_t &queue_params)
Constructor.
agent_queue_ref_t make_new_agent_queue(const Bind_Params &params)
Helper method for creating event queue for agents/cooperations.
dispatcher_t & operator=(const dispatcher_t &)=delete
cooperation_map_t m_cooperations
Information about cooperations.
stats::manually_registered_source_holder_t< tp_stats::data_source_t > m_data_source
Data source for the run-time monitoring.
An analog of unique_ptr for abstract_work_thread.
work_thread_holder_t(work_thread_holder_t &&o) noexcept
Interface for dispatcher binders.
SObjectizer Environment.
stats::repository_t & stats_repository()
Access to repository of data sources for run-time monitoring.
An interface of event queue for agent.
bool is_thread_safe() const
Is thread safe handler?
void exec(current_thread_id_t working_thread_id) const
Call event handler directly.
Template class for smart reference wrapper on the atomic_refcounted_t.
intrusive_ptr_t(T *obj) noexcept
Constructor for a raw pointer.
T * operator->() const noexcept
T * get() const noexcept
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
T & get() const noexcept
Base for the case of externals stats lock.
An addition to auto_registered_source_holder for the cases where manual registration of data_source s...
void start(outliving_reference_t< repository_t > repo)
A type for storing prefix of data_source name.
Definition prefix.hpp:32
#define SO_5_FUNC
Definition declspec.hpp:48
Some reusable and low-level classes/functions which can be used in public header files.
void adjust_thread_count(disp_params_t &params)
Sets the thread count to the default value if the user does not specify the actual thread count.
Internal implementation details of advanced thread pool dispatcher.
Advanced thread pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, std::size_t thread_count)
Create an instance of adv_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, std::size_t thread_count)
Create an instance of adv_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env)
Create an instance of adv_thread_pool dispatcher with the default count of work threads.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of adv_thread_pool dispatcher.
fifo_t
Type of FIFO mechanism for agent's demands.
@ cooperation
A FIFO for demands for all agents from the same cooperation.
@ individual
A FIFO for demands only for one agent.
Various stuff related to MPMC event queue implementation and tuning.
Helper tools for implementation of run-time monitoring for thread-pool-like dispatchers.
queue_description_holder_ref_t make_queue_desc_holder(const stats::prefix_t &prefix, coop_id_t coop_id, std::size_t agent_count)
Helper function for creating queue_description_holder object.
queue_description_holder_ref_t make_queue_desc_holder(const stats::prefix_t &prefix, const void *agent)
Helper function for creating queue_description_holder object.
Reusable components for dispatchers.
std::size_t default_thread_pool_size()
A helper function for detecting default thread count for thread pool.
std::unique_ptr< Disp_Iface_Type > make_actual_dispatcher(outliving_reference_t< environment_t > env, const std::string_view name_base, Disp_Params_Type disp_params, Args &&...args)
Helper function for creation of dispatcher instance with respect to work thread activity tracking fla...
Reusable implementation of some thread pool dispatcher functionality which can be used in other threa...
Thread pool dispatcher.
Event dispatchers.
Details of SObjectizer run-time implementations.
Definition agent.cpp:780
void ensure_join_from_different_thread(current_thread_id_t thread_to_be_joined)
Ensures that join will be called from different thread.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Definition agent.cpp:33
current_thread_id_t query_current_thread_id()
Get the ID of the current thread.
outliving_reference_t< T > outliving_mutable(T &r)
Make outliving_reference wrapper for mutable reference.
Adaptation of common implementation of thread-pool-like dispatcher to the specific of this thread-poo...
static bool is_individual_fifo(const bind_params_t &params) noexcept
static constexpr std::string_view dispatcher_type_name() noexcept
static void wait_for_queue_emptyness(agent_queue_t &) noexcept
common_data_t(outliving_reference_t< dispatcher_queue_t > queue, work_thread_holder_t thread_holder)
so_5::disp::mpmc_queue_traits::condition_unique_ptr_t m_condition
Waiting object for long wait.
queue_description_t m_desc
Actual description for the event queue.
std::size_t m_agent_count
Count of agents bound to that queue.
tp_stats::queue_description_holder_ref_t m_queue_desc
Description of that queue for run-time monitoring.
void update_queue_stats()
Update queue description with current information.
agent_data_t(agent_queue_ref_t queue, const stats::prefix_t &data_source_name_prefix, const agent_t *agent_ptr)
Constructor for the case when agent uses individual FIFO.
agent_data_t(agent_queue_ref_t queue)
Constructor for the case when agent uses cooperation FIFO.
tp_stats::queue_description_holder_ref_t m_queue_desc
Description of that queue for run-time monitoring.
cooperation_data_t(agent_queue_ref_t queue, std::size_t agents, const stats::prefix_t &data_source_name_prefix, coop_id_t coop_id)
A description of event execution demand.
agent_t * m_receiver
Receiver of demand.
Various traits of activity tracking implementation.
activity_stats_t m_working_stats
Stats for processed events.
activity_stats_t m_waiting_stats
Stats for waiting periods.