SObjectizer 5.8
Loading...
Searching...
No Matches
basic_event_queue.hpp
Go to the documentation of this file.
1/*
2 * SObjectizer-5
3 */
4
5/*!
6 * \file
7 * \brief A reusable implementation of agent_queue that can be used by
8 * various thread-pool dispatchers.
9 *
10 * \since v.5.8.0
11 */
12
13#pragma once
14
15#include <so_5/disp/reuse/queue_of_queues.hpp>
16
17#include <so_5/event_queue.hpp>
18#include <so_5/outliving.hpp>
19#include <so_5/spinlocks.hpp>
20
21namespace so_5
22{
23
24namespace disp
25{
26
27namespace thread_pool
28{
29
30namespace impl
31{
32
33using spinlock_t = so_5::default_spinlock_t;
34
35//
36// basic_event_queue_t
37//
39 : public event_queue_t
40 {
41 protected :
42 //! Actual demand in event queue.
44 {
45 //! Next item in queue.
46 /*!
47 * \note
48 * It's a dynamically allocated object that has to be deallocated
49 * manually during the destruction of the queue.
50 */
52
54 : m_next( nullptr )
55 {}
57 : execution_demand_t( std::move( original ) )
58 , m_next( nullptr )
59 {}
60 };
61
62 public :
64 std::size_t max_demands_at_once )
65 : m_max_demands_at_once( max_demands_at_once )
67 {}
68
70 {
73 }
74
75 /*!
76 * \brief Helper method that implements pushing of a new preallocated
77 * demand to the queue.
78 *
79 * It's intended to be used in implementation of push(),
80 * push_evt_start() and push_evt_finish() methods.
81 *
82 * \since v.5.8.0
83 */
84 void
85 push_preallocated( std::unique_ptr< demand_t > tail_demand ) noexcept
86 {
87 const bool was_empty = [&]() noexcept {
88 std::lock_guard< spinlock_t > lock( m_lock );
89
90 const bool queue_was_empty = (nullptr == m_head_demand.m_next);
91
92 m_tail_demand->m_next = tail_demand.release();
94
95 ++m_size;
96
97 return queue_was_empty;
98 }();
99
100 // Scheduling of the queue must be done when queue lock
101 // is unlocked.
102 if( was_empty )
104 }
105
106 //! Push next demand to queue (moves it into a newly allocated demand_t and delegates to push_preallocated()).
107 void
108 push( execution_demand_t demand ) override
109 {
110 std::unique_ptr< demand_t > tail_demand{
111 new demand_t( std::move( demand ) ) }; // The allocation is done before push_preallocated() takes the queue lock.
112
113 push_preallocated( std::move(tail_demand) ); // Ownership of the new item goes to the queue.
114 }
115
116 //! Push evt_start demand to the queue.
117 void
119 {
120 // Just delegate the work.
121 this->push( std::move(demand) );
122 }
123
124 //! Push evt_finish demand to the queue.
125 /*!
126 * \attention
127 * This method is noexcept but it delegates to push(), which allocates,
128 * and that allocation can throw. In that case the whole application
129 * will be terminated.
130 */
131 void
132 push_evt_finish( execution_demand_t demand ) noexcept override
133 {
134 // Just delegate the work to the ordinary push().
135 this->push( std::move(demand) );
136 }
137
138 //! Get the front demand from queue.
139 /*!
140 * \attention This method must be called only on non-empty queue.
141 */
144 {
145 return *(m_head_demand.m_next);
146 }
147
148 /*!
149 * \brief Queue emptiness indication.
150 *
151 * \since v.5.5.15.1
152 */
153 enum class emptyness_t
154 {
155 empty,
157 };
158
159 /*!
160 * \brief Indication of possibility of continuation of demands processing.
161 *
162 * \since v.5.5.15.1
163 */
165 {
166 //! Next demand can be processed.
167 enabled,
169 };
170
171 /*!
172 * \brief A result of erasing of the front demand from queue.
173 *
174 * \since v.5.5.15.1
175 */
177 {
178 //! Can demands processing be continued?
180 //! Is event queue empty?
182 };
183
184 //! Remove the front demand.
185 /*!
186 * \note Returns processing_continuation_t::disabled if
187 * \a demands_processed is not less than m_max_demands_at_once or if
188 * the event queue is empty.
189 */
192 //! Count of consecutively processed demands from that queue.
193 std::size_t demands_processed )
194 {
195 // Actual deletion of old head must be performed
196 // when m_lock will be released.
197 std::unique_ptr< demand_t > old_head;
198 {
199 std::lock_guard< spinlock_t > lock( m_lock );
200
201 old_head = remove_head();
202
203 const auto emptyness = m_head_demand.m_next ?
205
206 if( emptyness_t::empty == emptyness )
208
209 return pop_result_t{
210 detect_continuation( emptyness, demands_processed ),
211 emptyness };
212 }
213 }
214
215 /*!
216 * \brief Wait until the queue becomes empty.
217 *
218 * It is necessary because there is a possibility that
219 * after processing of demand_handler_on_finish the cooperation
220 * will be destroyed and agents will be unbound from the dispatcher
221 * before the return from demand_handler_on_finish.
222 *
223 * Without waiting for queue emptiness it could lead to
224 * a dangling pointer to agent_queue in a worker thread.
225 */
226 void
228 {
229 bool empty = false;
230 while( !empty )
231 {
232 {
233 std::lock_guard< spinlock_t > lock( m_lock );
234 empty = (nullptr == m_head_demand.m_next);
235 }
236
237 if( !empty )
238 std::this_thread::yield();
239 }
240 }
241
242 /*!
243 * \brief Get the current size of the queue (the value of the atomic m_size counter).
244 *
245 * \since v.5.5.4
246 */
247 [[nodiscard]]
248 std::size_t
249 size() const noexcept
250 {
251 return m_size.load( std::memory_order_acquire ); // Atomic read; m_lock is not acquired here.
252 }
253
254 protected:
255 /*!
256 * \brief Perform scheduling of processing of this event queue
257 *
258 * This method should be overridden in a derived class to call
259 * schedule() method for an appropriate dispatcher queue.
260 * See so_5::disp::reuse::queue_of_queues_t::schedule().
261 */
262 virtual void
264
265 private :
266 //! Maximum count of demands to be processed consecutively.
267 const std::size_t m_max_demands_at_once;
268
269 //! Object's lock.
270 spinlock_t m_lock;
271
272 //! Head of the demand's queue.
273 /*!
274 * Never contains actual demand. Only m_next field is used.
275 */
277 //! Tail of the demand's queue.
278 /*!
279 * Must point to m_head_demand if queue is empty or to the very
280 * last queue item otherwise.
281 */
283
284 /*!
285 * \brief Current size of the queue.
286 *
287 * \since v.5.5.4
288 */
289 std::atomic< std::size_t > m_size = { 0 };
290
291 //! Helper method for deleting queue's head object.
292 inline std::unique_ptr< demand_t >
293 remove_head() noexcept
294 {
295 std::unique_ptr< demand_t > to_be_deleted{ m_head_demand.m_next };
297
298 --m_size;
299
300 return to_be_deleted;
301 }
302
303 //! Can processing be continued?
306 emptyness_t emptyness,
307 const std::size_t processed )
308 {
309 return emptyness_t::not_empty == emptyness &&
310 processed < m_max_demands_at_once ?
313 }
314 };
315
316} /* namespace impl */
317
318} /* namespace thread_pool */
319
320} /* namespace disp */
321
322} /* namespace so_5 */
A base class for agents.
Definition agent.hpp:673
void so_bind_to_dispatcher(event_queue_t &queue) noexcept
Binding agent to the dispatcher.
Definition agent.cpp:872
The base class for the object with a reference counting.
Container for storing parameters for MPMC queue.
queue_params_t & operator=(queue_params_t &&o) noexcept
Move operator.
friend void swap(queue_params_t &a, queue_params_t &b) noexcept
Parameters for binding agents to nef_thread_pool dispatcher.
bind_params_t & max_demands_at_once(std::size_t v)
Set maximum count of demands to be processed at once.
std::size_t m_max_demands_at_once
Maximum count of demands to be processed at once.
std::size_t query_max_demands_at_once() const
Get maximum count of demands to be processed at once.
Alias for namespace with traits of event queue.
const queue_traits::queue_params_t & queue_params() const
Getter for queue parameters.
std::size_t m_thread_count
Count of working threads.
disp_params_t & tune_queue_params(L tunner)
Tuner for queue parameters.
disp_params_t & set_queue_params(queue_traits::queue_params_t p)
Setter for queue parameters.
queue_traits::queue_params_t m_queue_params
Queue parameters.
disp_params_t & thread_count(std::size_t count)
Setter for thread count.
friend void swap(disp_params_t &a, disp_params_t &b) noexcept
std::size_t thread_count() const
Getter for thread count.
disp_params_t()=default
Default constructor.
A handle for nef_thread_pool dispatcher.
impl::basic_dispatcher_iface_shptr_t m_dispatcher
A reference to actual implementation of a dispatcher.
disp_binder_shptr_t binder() const
Get a binder for that dispatcher with default binding params.
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
bool empty() const noexcept
Is this handle empty?
void reset() noexcept
Drop the content of handle.
disp_binder_shptr_t binder(bind_params_t params) const
Get a binder for that dispatcher.
operator bool() const noexcept
Is this handle empty?
std::enable_if_t< std::is_invocable_v< Setter, bind_params_t & >, disp_binder_shptr_t > binder(Setter &&params_setter) const
Create a binder for that dispatcher.
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
actual_binder_t(actual_dispatcher_iface_shptr_t disp, bind_params_t params) noexcept
const bind_params_t m_params
Binding parameters.
actual_dispatcher_iface_shptr_t m_disp
Dispatcher to be used.
void preallocate_resources(agent_t &agent) override
Allocate resources in dispatcher for new agent.
void undo_preallocation(agent_t &agent) noexcept override
Undo resources allocation.
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
An actual interface of nef-thread-pool dispatcher.
virtual event_queue_t * query_resources_for_agent(agent_t &agent) noexcept=0
Get resources allocated for an agent.
virtual void undo_preallocation_for_agent(agent_t &agent) noexcept=0
Undo preallocation of resources for a new agent.
virtual void unbind_agent(agent_t &agent) noexcept=0
Unbind agent from the dispatcher.
virtual void preallocate_resources_for_agent(agent_t &agent, const bind_params_t &params)=0
Preallocate all necessary resources for a new agent.
void undo_preallocation_for_agent(agent_t &agent) noexcept override
Undo preallocation of resources for a new agent.
event_queue_t * query_resources_for_agent(agent_t &agent) noexcept override
Get resources allocated for an agent.
void preallocate_resources_for_agent(agent_t &agent, const bind_params_t &params) override
Preallocate all necessary resources for a new agent.
actual_dispatcher_implementation_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
dispatcher_template_t< Work_Thread > m_impl
Real dispatcher.
void unbind_agent(agent_t &agent) noexcept override
Unbind agent from the dispatcher.
void schedule_on_disp_queue() noexcept override
Perform scheduling of processing of this event queue.
dispatcher_queue_t & m_disp_queue
Dispatcher queue with which the agent queue has to be used.
agent_queue_with_preallocated_finish_demand_t * intrusive_queue_giveout_next() noexcept
Give away a pointer to the next agent_queue.
std::unique_ptr< base_type_t::demand_t > m_finish_demand
A preallocated demand for evt_finish.
agent_queue_with_preallocated_finish_demand_t * m_intrusive_queue_next
The next item in intrusive queue of agent_queues.
agent_queue_with_preallocated_finish_demand_t(outliving_reference_t< dispatcher_queue_t > disp_queue, const bind_params_t &params)
Initializing constructor.
void intrusive_queue_set_next(agent_queue_with_preallocated_finish_demand_t *next) noexcept
Set a pointer to the next agent_queue.
The very basic interface of thread_pool dispatcher.
virtual disp_binder_shptr_t binder(bind_params_t params)=0
static dispatcher_handle_t make(actual_dispatcher_iface_shptr_t disp) noexcept
Multi-producer/Multi-consumer queue of pointers to event queues.
void schedule(T *queue) noexcept
Schedule execution of demands from the queue.
friend void swap(work_thread_activity_tracking_flag_mixin_t &a, work_thread_activity_tracking_flag_mixin_t &b) noexcept
Mixin that holds optional work thread factory.
friend void swap(work_thread_factory_mixin_t &a, work_thread_factory_mixin_t &b) noexcept
execution_demand_t & front()
Get the front demand from queue.
void push_evt_start(execution_demand_t demand) override
Push evt_start demand to the queue.
virtual void schedule_on_disp_queue() noexcept=0
Perform scheduling of processing of this event queue.
std::atomic< std::size_t > m_size
Current size of the queue.
std::unique_ptr< demand_t > remove_head() noexcept
Helper method for deleting queue's head object.
void push(execution_demand_t demand) override
Push next demand to queue.
processing_continuation_t
Indication of possibility of continuation of demands processing.
processing_continuation_t detect_continuation(emptyness_t emptyness, const std::size_t processed)
Can processing be continued?
const std::size_t m_max_demands_at_once
Maximum count of demands to be processed consecutively.
void push_evt_finish(execution_demand_t demand) noexcept override
Push evt_finish demand to the queue.
pop_result_t pop(std::size_t demands_processed)
Remove the front demand.
demand_t * m_tail_demand
Tail of the demand's queue.
void push_preallocated(std::unique_ptr< demand_t > tail_demand) noexcept
Helper method that implements pushing of a new preallocated demand to the queue.
void wait_for_emptyness() noexcept
Wait while queue becomes empty.
demand_t m_head_demand
Head of the demand's queue.
std::size_t size() const noexcept
Get the current size of the queue.
Part of implementation of work thread without activity tracking.
no_activity_tracking_impl_t(outliving_reference_t< Disp_Queue > queue, work_thread_holder_t thread_holder)
Initializing constructor.
with_activity_tracking_impl_t(outliving_reference_t< Disp_Queue > queue, work_thread_holder_t thread_holder)
Initializing constructor.
activity_tracking_traits::lock_t m_stats_lock
Lock for activity statistics.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::external_lock<> > m_waiting_stats_collector
A collector for waiting stats.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::external_lock<> > m_work_activity_collector
A collector for work activity.
agent_queue_t * pop_agent_queue() noexcept
An attempt of extraction of non-empty agent queue.
work_thread_template_t(outliving_reference_t< disp_queue_t > queue, work_thread_holder_t thread_holder)
Initializing constructor.
agent_queue_t::emptyness_t process_queue(agent_queue_t &queue)
Processing of demands from agent queue.
void do_queue_processing(agent_queue_t *current_queue)
Starts processing of demands from the queue specified.
so_5::current_thread_id_t thread_id() const
Get ID of work thread.
An analog of unique_ptr for abstract_work_thread.
work_thread_holder_t(work_thread_holder_t &&o) noexcept
Interface for dispatcher binders.
SObjectizer Environment.
An interface of event queue for agent.
Template class for smart reference wrapper on the atomic_refcounted_t.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
T & get() const noexcept
Base for the case of an external stats lock.
#define SO_5_FUNC
Definition declspec.hpp:48
Various stuff related to MPMC event queue implementation and tuning.
void adjust_thread_count(disp_params_t &params)
Sets the thread count to the default value if the user does not specify an actual thread count.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance nef_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, std::size_t thread_count)
Create an instance of nef_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, std::size_t thread_count)
Create an instance of nef_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env)
Create an instance of nef_thread_pool dispatcher with the default count of working threads.
Reusable components for dispatchers.
std::size_t default_thread_pool_size()
A helper function for detecting default thread count for thread pool.
std::unique_ptr< Disp_Iface_Type > make_actual_dispatcher(outliving_reference_t< environment_t > env, const std::string_view name_base, Disp_Params_Type disp_params, Args &&...args)
Helper function for creation of dispatcher instance with respect to work thread activity tracking fla...
Reusable implementation of some thread pool dispatcher functionality which can be used in other threa...
Internal implementation details of thread pool dispatcher.
Thread pool dispatcher.
Event dispatchers.
Details of SObjectizer run-time implementations.
Definition agent.cpp:780
void ensure_join_from_different_thread(current_thread_id_t thread_to_be_joined)
Ensures that join will be called from different thread.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Definition agent.cpp:33
current_thread_id_t query_current_thread_id()
Get the ID of the current thread.
outliving_reference_t< T > outliving_mutable(T &r)
Make outliving_reference wrapper for mutable reference.
Adaptation of common implementation of thread-pool-like dispatcher to the specific of this thread-poo...
static bool is_individual_fifo(const bind_params_t &) noexcept
static constexpr std::string_view dispatcher_type_name() noexcept
static void wait_for_queue_emptyness(agent_queue_with_preallocated_finish_demand_t &queue) noexcept
A result of erasing of the front demand from queue.
processing_continuation_t m_continuation
Can demands processing be continued?
common_data_t(outliving_reference_t< Disp_Queue > queue, work_thread_holder_t thread_holder)
Part of common_data_t that doesn't depend on a template parameter.
so_5::disp::mpmc_queue_traits::condition_unique_ptr_t m_condition
Waiting object for long wait.
common_data_template_independent_t(work_thread_holder_t thread_holder, so_5::disp::mpmc_queue_traits::condition_unique_ptr_t condition)
A description of event execution demand.
Various traits of activity tracking implementation.
activity_stats_t m_working_stats
Stats for processed events.
activity_stats_t m_waiting_stats
Stats for waiting periods.