SObjectizer 5.8
Loading...
Searching...
No Matches
thread_pool/impl/disp.hpp
Go to the documentation of this file.
1/*
2 * SObjectizer-5
3 */
4
5/*!
6 * \file
7 * \brief An implementation of thread pool dispatcher.
8 *
9 * \since v.5.4.0
10 */
11
12#pragma once
13
14#include <so_5/disp/thread_pool/impl/basic_event_queue.hpp>
15#include <so_5/disp/thread_pool/impl/work_thread_template.hpp>
16
17#include <so_5/disp/thread_pool/pub.hpp>
18
19namespace so_5
20{
21
22namespace disp
23{
24
25namespace thread_pool
26{
27
28namespace impl
29{
30
31class agent_queue_t;
32
33//
34// dispatcher_queue_t
35//
36using dispatcher_queue_t = so_5::disp::reuse::queue_of_queues_t< agent_queue_t >;
37
38//
39// agent_queue_t
40//
41/*!
42 * \brief Event queue for the agent (or cooperation).
43 *
44 * \note
45 * This class isn't final since v.5.8.0
46 *
47 * \since v.5.4.0
48 */
50 : public basic_event_queue_t
51 , private so_5::atomic_refcounted_t
52 {
53 friend class so_5::intrusive_ptr_t< agent_queue_t >;
54
55 public :
56 //! Initializing constructor.
58 //! Dispatcher queue to work with.
59 outliving_reference_t< dispatcher_queue_t > disp_queue,
60 //! Parameters for the queue.
61 const bind_params_t & params )
64 }
65 , m_disp_queue{ disp_queue.get() }
66 {}
67
68 /*!
69 * \brief Give away a pointer to the next agent_queue.
70 *
71 * \note
72 * This method is a part of interface required by
73 * so_5::disp::reuse::queue_of_queues_t.
74 *
75 * \since v.5.8.0
76 */
77 [[nodiscard]]
80 {
81 auto * r = m_intrusive_queue_next;
82 m_intrusive_queue_next = nullptr;
83 return r;
84 }
85
86 /*!
87 * \brief Set a pointer to the next agent_queue.
88 *
89 * \note
90 * This method is a part of interface required by
91 * so_5::disp::reuse::queue_of_queues_t.
92 *
93 * \since v.5.8.0
94 */
95 void
97 {
99 }
100
101 protected:
102 void
103 schedule_on_disp_queue() noexcept override
104 {
106 }
107
108 private :
109 //! Dispatcher queue with that the agent queue has to be used.
110 dispatcher_queue_t & m_disp_queue;
111
112 /*!
113 * \brief The next item in intrusive queue of agent_queues.
114 *
115 * This field is necessary to implement interface required by
116 * so_5::disp::reuse::queue_of_queues_t.
117 *
118 * \since v.5.8.0
119 */
121 };
122
123//
124// adaptation_t
125//
126/*!
127 * \brief Adaptation of common implementation of thread-pool-like dispatcher
128 * to the specific of this thread-pool dispatcher.
129 *
130 * \since v.5.5.4
131 */
133 {
134 [[nodiscard]]
135 static constexpr std::string_view
137 {
138 return { "tp" }; // thread_pool.
139 }
140
141 [[nodiscard]]
142 static bool
143 is_individual_fifo( const bind_params_t & params ) noexcept
144 {
145 return fifo_t::individual == params.query_fifo();
146 }
147
148 static void
150 {
152 }
153 };
154
155//
156// dispatcher_template_t
157//
158/*!
159 * \brief Template for dispatcher.
160 *
161 * This template depends on work_thread type (with or without activity
162 * tracking).
163 *
164 * \since v.5.5.18
165 */
166template< typename Work_Thread >
167using dispatcher_template_t =
168 common_implementation::dispatcher_t<
169 Work_Thread,
170 dispatcher_queue_t,
172 adaptation_t >;
173
174} /* namespace impl */
175
176} /* namespace thread_pool */
177
178} /* namespace disp */
179
180} /* namespace so_5 */
A base class for agents.
Definition agent.hpp:673
void so_bind_to_dispatcher(event_queue_t &queue) noexcept
Binding agent to the dispatcher.
Definition agent.cpp:872
The base class for the object with a reference counting.
Container for storing parameters for MPMC queue.
queue_params_t & operator=(queue_params_t &&o) noexcept
Move operator.
friend void swap(queue_params_t &a, queue_params_t &b) noexcept
Multi-producer/Multi-consumer queue of pointers to event queues.
void schedule(T *queue) noexcept
Schedule execution of demands from the queue.
friend void swap(work_thread_activity_tracking_flag_mixin_t &a, work_thread_activity_tracking_flag_mixin_t &b) noexcept
Mixin that holds optional work thread factory.
friend void swap(work_thread_factory_mixin_t &a, work_thread_factory_mixin_t &b) noexcept
Parameters for binding agents to thread_pool dispatcher.
std::size_t m_max_demands_at_once
Maximum count of demands to be processed at once.
bind_params_t & max_demands_at_once(std::size_t v)
Set maximum count of demands to be processed at once.
fifo_t query_fifo() const
Get FIFO type.
bind_params_t & fifo(fifo_t v)
Set FIFO type.
std::size_t query_max_demands_at_once() const
Get maximum count of demands to do processed at once.
Alias for namespace with traits of event queue.
queue_traits::queue_params_t m_queue_params
Queue parameters.
disp_params_t & thread_count(std::size_t count)
Setter for thread count.
disp_params_t()=default
Default constructor.
disp_params_t & tune_queue_params(L tunner)
Tuner for queue parameters.
const queue_traits::queue_params_t & queue_params() const
Getter for queue parameters.
disp_params_t & set_queue_params(queue_traits::queue_params_t p)
Setter for queue parameters.
friend void swap(disp_params_t &a, disp_params_t &b) noexcept
std::size_t m_thread_count
Count of working threads.
std::size_t thread_count() const
Getter for thread count.
A handle for thread_pool dispatcher.
void reset() noexcept
Drop the content of handle.
disp_binder_shptr_t binder() const
Get a binder for that dispatcher with default binding params.
bool empty() const noexcept
Is this handle empty?
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
impl::basic_dispatcher_iface_shptr_t m_dispatcher
A reference to actual implementation of a dispatcher.
operator bool() const noexcept
Is this handle empty?
disp_binder_shptr_t binder(bind_params_t params) const
Get a binder for that dispatcher.
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
std::enable_if_t< std::is_invocable_v< Setter, bind_params_t & >, disp_binder_shptr_t > binder(Setter &&params_setter) const
Create a binder for that dispatcher.
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
void undo_preallocation(agent_t &agent) noexcept override
Undo resources allocation.
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
void preallocate_resources(agent_t &agent) override
Allocate resources in dispatcher for new agent.
const bind_params_t m_params
Binding parameters.
actual_binder_t(actual_dispatcher_iface_shptr_t disp, bind_params_t params) noexcept
actual_dispatcher_iface_shptr_t m_disp
Dispatcher to be used.
An actual interface of thread-pool dispatcher.
virtual void undo_preallocation_for_agent(agent_t &agent) noexcept=0
Undo preallocation of resources for a new agent.
virtual event_queue_t * query_resources_for_agent(agent_t &agent) noexcept=0
Get resources allocated for an agent.
virtual void preallocate_resources_for_agent(agent_t &agent, const bind_params_t &params)=0
Preallocate all necessary resources for a new agent.
virtual void unbind_agent(agent_t &agent) noexcept=0
Unbind agent from the dispatcher.
void undo_preallocation_for_agent(agent_t &agent) noexcept override
Undo preallocation of resources for a new agent.
dispatcher_template_t< Work_Thread > m_impl
Real dispatcher.
disp_binder_shptr_t binder(bind_params_t params) override
void preallocate_resources_for_agent(agent_t &agent, const bind_params_t &params) override
Preallocate all necessary resources for a new agent.
void unbind_agent(agent_t &agent) noexcept override
Unbind agent from the dispatcher.
actual_dispatcher_implementation_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
event_queue_t * query_resources_for_agent(agent_t &agent) noexcept override
Get resources allocated for an agent.
Event queue for the agent (or cooperation).
agent_queue_t * m_intrusive_queue_next
The next item in intrusive queue of agent_queues.
void intrusive_queue_set_next(agent_queue_t *next) noexcept
Set a pointer to the next agent_queue.
agent_queue_t * intrusive_queue_giveout_next() noexcept
Give away a pointer to the next agent_queue.
void schedule_on_disp_queue() noexcept override
Perform scheduling of processing of this event queue.
dispatcher_queue_t & m_disp_queue
Dispatcher queue with that the agent queue has to be used.
agent_queue_t(outliving_reference_t< dispatcher_queue_t > disp_queue, const bind_params_t &params)
Initializing constructor.
The very basic interface of thread_pool dispatcher.
virtual disp_binder_shptr_t binder(bind_params_t params)=0
void wait_for_emptyness() noexcept
Wait while queue becomes empty.
static dispatcher_handle_t make(actual_dispatcher_iface_shptr_t disp) noexcept
Interface for dispatcher binders.
SObjectizer Environment.
An interface of event queue for agent.
Template class for smart reference wrapper on the atomic_refcounted_t.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
T & get() const noexcept
#define SO_5_FUNC
Definition declspec.hpp:48
Various stuff related to MPMC event queue implementation and tuning.
Reusable components for dispatchers.
std::size_t default_thread_pool_size()
A helper function for detecting default thread count for thread pool.
std::unique_ptr< Disp_Iface_Type > make_actual_dispatcher(outliving_reference_t< environment_t > env, const std::string_view name_base, Disp_Params_Type disp_params, Args &&...args)
Helper function for creation of dispatcher instance with respect to work thread activity tracking fla...
void adjust_thread_count(disp_params_t &params)
Sets the thread count to default value if used do not specify actual thread count.
Reusable implementation of some thread pool dispatcher functionality which can be used in other threa...
Internal implementation details of thread pool dispatcher.
Thread pool dispatcher.
fifo_t
Type of FIFO mechanism for agent's demands.
@ cooperation
A FIFO for demands for all agents from the same cooperation.
@ individual
A FIFO for demands only for one agent.
dispatcher_handle_t make_dispatcher(environment_t &env, std::size_t thread_count)
Create an instance of thread_pool dispatcher.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, std::size_t thread_count)
Create an instance of thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env)
Create an instance of thread_pool dispatcher with the default count of working threads.
Event dispatchers.
Private part of message limit implementation.
Definition agent.cpp:33
outliving_reference_t< T > outliving_mutable(T &r)
Make outliving_reference wrapper for mutable reference.
Adaptation of common implementation of thread-pool-like dispatcher to the specific of this thread-poo...
static constexpr std::string_view dispatcher_type_name() noexcept
static void wait_for_queue_emptyness(agent_queue_t &queue) noexcept
static bool is_individual_fifo(const bind_params_t &params) noexcept