SObjectizer  5.8
Loading...
Searching...
No Matches
strictly_ordered/impl/demand_queue.hpp
Go to the documentation of this file.
1/*
2 SObjectizer 5.
3*/
4
5/*!
6 * \file
7 * \brief A demand queue for a dispatcher with one common working
8 * thread and support for demand priorities.
9 *
10 * \since
11 * v.5.5.8
12 */
13
14#pragma once
15
16#include <memory>
17#include <atomic>
18
19#include <so_5/execution_demand.hpp>
20#include <so_5/event_queue.hpp>
21
22#include <so_5/priority.hpp>
23
24#include <so_5/disp/mpsc_queue_traits/pub.hpp>
25
26#if defined(__clang__) && (__clang_major__ >= 16)
27#pragma clang diagnostic push
28#pragma clang diagnostic ignored "-Wunsafe-buffer-usage"
29#endif
30
31namespace so_5 {
32
33namespace disp {
34
35namespace prio_one_thread {
36
37namespace strictly_ordered {
38
39namespace impl {
40
41namespace queue_traits = so_5::disp::mpsc_queue_traits;
42
43//
44// demand_t
45//
46/*!
47 * \brief A single execution demand.
48 *
49 * \since v.5.5.8
50 */
51struct demand_t final : public execution_demand_t
52 {
53 //! Next demand in the queue.
54 /*!
55 * \note
56 * It's a dynamically allocated object that has to be deallocated
57 * manually during the destruction of the queue.
58 */
59 demand_t * m_next = nullptr;
60
61 //! Initializing constructor.
65 };
66
67//
68// demand_unique_ptr_t
69//
70/*!
71 * \brief An alias for unique_ptr to demand.
72 *
73 * \since v.5.5.8
74 */
76
77//
78// demand_queue_t
79//
80/*!
81 * \brief A demand queue with support of demands priorities.
82 *
83 * \since v.5.5.8
84 */
86 {
87 friend struct queue_for_one_priority_t;
88
89 //! Description of queue for one priority.
91 : public event_queue_t
92 {
93 //! Pointer to main demand queue.
95
96 //! Head of the queue.
97 /*! Null if queue is empty. */
98 demand_t * m_head = nullptr;
99 //! Tail of the queue.
100 /*! Null if queue is empty. */
101 demand_t * m_tail = nullptr;
102
103 /*!
104 * \name Information for run-time monitoring.
105 * \{
106 */
107 //! Count of agents attached to that queue.
109 //! Count of demands in the queue.
111 /*!
112 * \}
113 */
114
/*!
 * \brief Store a new execution demand in this subqueue.
 *
 * The demand is moved into a dynamically allocated demand_t and the
 * resulting object is handed over to the main demand queue together
 * with a pointer to this subqueue.
 *
 * \param exec_demand The demand to be stored; it is moved from.
 */
115 void
116 push( execution_demand_t exec_demand ) override
117 {
// The new demand_t is held by demand_unique_ptr_t (documented in this
// file as an alias for unique_ptr to demand) until it is passed on.
118 demand_unique_ptr_t what{ new demand_t{
119 std::move( exec_demand ) } };
120
// `this` tells the main queue which subqueue receives the demand.
121 m_demand_queue->push( this, std::move( what ) );
122 }
123
124 /*!
125 * \note
126 * Delegates the work to the push() method.
127 */
128 void
130 {
131 this->push( std::move(demand) );
132 }
133
134 /*!
135 * \note
136 * Delegates the work to the push() method.
137 *
138 * \attention
139 * Terminates the whole application if the push() throws.
140 */
141 void
142 push_evt_finish( execution_demand_t demand ) noexcept override
143 {
// push() allocates a demand_t with new; since this method is noexcept,
// an exception escaping from push() results in std::terminate.
144 this->push( std::move(demand) );
145 }
146 };
147
148 public :
149 //! This exception is thrown when pop is called after stop.
151 {};
152
153 //! Statistic about one subqueue.
160
162 //! Lock to be used for queue protection.
163 queue_traits::lock_unique_ptr_t lock )
164 : m_lock{ std::move(lock) }
165 {
166 // Every subqueue must have a valid pointer to main demand queue.
167 for( auto & q : m_priorities )
168 q.m_demand_queue = this;
169 }
171 {
172 for( auto & q : m_priorities )
173 cleanup_queue( q );
174 }
175
176 //! Set the shutdown signal.
177 void
179 {
180 queue_traits::lock_guard_t lock{ *m_lock };
181
182 m_shutdown = true;
183
184 if( !m_current_priority )
185 // There could be a sleeping working thread.
186 // It must be notified.
187 lock.notify_one();
188 }
189
190 //! Pop demand from the queue.
191 /*!
192 * \throw shutdown_ex_t in the case when queue is shut down.
193 */
194 [[nodiscard]]
197 {
198 queue_traits::unique_lock_t lock{ *m_lock };
199
200 while( !m_shutdown && !m_current_priority )
202
203 if( m_shutdown )
204 throw shutdown_ex_t();
205
206 demand_unique_ptr_t result{ m_current_priority->m_head };
207
208 m_current_priority->m_head = result->m_next;
209 result->m_next = nullptr;
210 --(m_current_priority->m_demands_count);
211
213 {
214 // The queue has become empty.
216
217 // A non-empty subqueue with lower priority needs to be found.
218 while( m_current_priority > m_priorities )
219 {
220 --m_current_priority;
221 if( m_current_priority->m_head )
222 return result;
223 }
224
225 m_current_priority = nullptr;
226 }
227
228 return result;
229 }
230
231 //! Get queue for the priority specified.
234 {
235 return m_priorities[ to_size_t(priority) ];
236 }
237
238 //! Notification about attachment of yet another agent to the queue.
239 void
241 {
242 ++(m_priorities[ to_size_t(priority) ].m_agents_count);
243 }
244
245 //! Notification about detachment of an agent from the queue.
246 void
248 {
249 --(m_priorities[ to_size_t(priority) ].m_agents_count);
250 }
251
252 //! A special method for handling statistical data for
253 //! every subqueue.
254 template< class Lambda >
255 void
265
266 private :
267 //! Queue lock.
269
270 //! Shutdown flag.
271 bool m_shutdown = false;
272
273 //! Pointer to the current subqueue.
274 /*!
275 * This pointer will point to the non-empty subqueue with the
276 * highest priority demand. If there is no such demand then
277 * this pointer will be nullptr.
278 */
280
281 //! Subqueues for priorities.
283
284 //! Destroy all demands in the queue specified.
285 void
287 {
288 auto h = queue_info.m_head;
289 while( h )
290 {
291 demand_unique_ptr_t t{ h };
292 h = h->m_next;
293 }
294 }
295
296 //! Push a new demand to the queue.
297 void
299 //! Subqueue for the demand.
300 queue_for_one_priority_t * subqueue,
301 //! Demand to be pushed.
302 demand_unique_ptr_t demand )
303 {
304 queue_traits::lock_guard_t lock{ *m_lock };
305
306 add_demand_to_queue( *subqueue, std::move( demand ) );
307
308 if( !m_current_priority )
309 {
310 // Queue was empty. A sleeping working thread must
311 // be notified.
312 m_current_priority = subqueue;
313 lock.notify_one();
314 }
315 else if( m_current_priority < subqueue )
316 // New demand has greater priority than the previous.
317 m_current_priority = subqueue;
318 }
319
320 //! Add a new demand to the tail of the queue specified.
321 void
324 demand_unique_ptr_t demand )
325 {
326 if( queue.m_tail )
327 {
328 // Queue is not empty. Tail will be modified.
329 queue.m_tail->m_next = demand.release();
330 queue.m_tail = queue.m_tail->m_next;
331 }
332 else
333 {
334 // Queue is empty. The whole description will be modified.
335 queue.m_head = demand.release();
336 queue.m_tail = queue.m_head;
337 }
338
339 ++(queue.m_demands_count);
340 }
341 };
342
343} /* namespace impl */
344
345} /* namespace strictly_ordered */
346
347} /* namespace prio_one_thread */
348
349} /* namespace disp */
350
351} /* namespace so_5 */
352
353#if defined(__clang__) && (__clang_major__ >= 16)
354#pragma clang diagnostic pop
355#endif
A base class for agents.
Definition agent.hpp:673
An analog of std::lock_guard for MPSC queue lock.
Container for storing parameters for MPSC queue.
An analog of std::unique_lock for MPSC queue lock.
const queue_traits::queue_params_t & queue_params() const noexcept
Getter for queue parameters.
disp_params_t & set_queue_params(queue_traits::queue_params_t p)
Setter for queue parameters.
disp_binder_shptr_t binder() const noexcept
Get a binder for that dispatcher.
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
queue_for_one_priority_t m_priorities[so_5::prio::total_priorities_count]
Subqueues for priorities.
queue_for_one_priority_t * m_current_priority
Pointer to the current subqueue.
void push(queue_for_one_priority_t *subqueue, demand_unique_ptr_t demand)
Push a new demand to the queue.
void agent_bound(priority_t priority)
Notification about attachment of yet another agent to the queue.
void add_demand_to_queue(queue_for_one_priority_t &queue, demand_unique_ptr_t demand)
Add a new demand to the tail of the queue specified.
void cleanup_queue(queue_for_one_priority_t &queue_info)
Destroy all demands in the queue specified.
void agent_unbound(priority_t priority)
Notification about detachment of an agent from the queue.
event_queue_t & event_queue_by_priority(priority_t priority)
Get queue for the priority specified.
void distribute_value_for_priority(const mbox_t &mbox, priority_t priority, std::size_t agents_count, std::size_t demands_count)
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
disp_data_source_t(const std::string_view name_base, outliving_reference_t< dispatcher_template_t > disp)
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
dispatcher_template_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
void preallocate_resources(agent_t &) override
Allocate resources in dispatcher for new agent.
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for run-time monitoring.
Mixin that holds optional work thread factory.
Interface for dispatcher binders.
SObjectizer Environment.
An interface of event queue for agent.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
A holder for a data source that should be automatically registered and deregistered in the registry.
A type for storing prefix of data_source name.
Definition prefix.hpp:32
An interface of data source.
#define SO_5_FUNC
Definition declspec.hpp:48
Various stuff related to MPSC event queue implementation and tuning.
void send_thread_activity_stats(const so_5::mbox_t &, const stats::prefix_t &, so_5::disp::prio_one_thread::reuse::work_thread_no_activity_tracking_t< demand_queue_t > &)
Reusable code for dispatchers with one working thread for events of all priorities.
Implementation details for dispatcher which handles prioritized events in strict order.
Dispatcher which handles events in strict order (from highest priority to lowest).
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base)
Create an instance of strictly_ordered dispatcher.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of strictly_ordered dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env)
Create an instance of strictly_ordered dispatcher.
Dispatcher with one working thread for events of all priorities.
Reusable components for dispatchers.
Event dispatchers.
Helpers for working with priorities.
Definition priority.hpp:73
const unsigned int total_priorities_count
Total count of priorities.
Definition priority.hpp:105
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Definition agent.cpp:33
priority_t
Definition of supported priorities.
Definition priority.hpp:28
A description of event execution demand.