2
3
6
7
8
9
10
11
12
19#include <so_5/execution_demand.hpp>
20#include <so_5/event_queue.hpp>
22#include <so_5/priority.hpp>
24#include <so_5/disp/mpsc_queue_traits/pub.hpp>
26#if defined(__clang__
) && (__clang_major__
>= 16
)
27#pragma clang diagnostic push
28#pragma clang diagnostic ignored "-Wunsafe-buffer-usage"
47
48
49
50
71
72
73
74
81
82
83
84
87 friend struct queue_for_one_priority_t;
104
105
106
112
113
118 demand_unique_ptr_t what{
new demand_t{
119 std::move( exec_demand ) } };
121 m_demand_queue->push(
this, std::move( what ) );
125
126
127
131 this->push( std::move(demand) );
135
136
137
138
139
140
144 this->push( std::move(demand) );
163 queue_traits::lock_unique_ptr_t lock )
167 for(
auto & q : m_priorities )
168 q.m_demand_queue =
this;
172 for(
auto & q : m_priorities )
192
193
206 demand_unique_ptr_t result{ m_current_priority->m_head };
208 m_current_priority->m_head = result->m_next;
209 result->m_next =
nullptr;
218 while( m_current_priority > m_priorities )
220 --m_current_priority;
221 if( m_current_priority->m_head )
235 return m_priorities[ to_size_t(priority) ];
242 ++(m_priorities[ to_size_t(priority) ].m_agents_count);
249 --(m_priorities[ to_size_t(priority) ].m_agents_count);
254 template<
class Lambda >
275
276
277
278
291 demand_unique_ptr_t t{ h };
302 demand_unique_ptr_t demand )
306 add_demand_to_queue( *subqueue, std::move( demand ) );
324 demand_unique_ptr_t demand )
329 queue.m_tail->m_next = demand.release();
335 queue.m_head = demand.release();
339 ++(queue.m_demands_count);
353#if defined(__clang__
) && (__clang_major__
>= 16
)
354#pragma clang diagnostic pop
An analog of std::lock_guard for MPSC queue lock.
void notify_one() noexcept
Container for storing parameters for MPSC queue.
An analog of std::unique_lock for MPSC queue lock.
void wait_for_notify() noexcept
Alias for namespace with traits of event queue.
const queue_traits::queue_params_t & queue_params() const noexcept
Getter for queue parameters.
disp_params_t()=default
Default constructor.
queue_traits::queue_params_t m_queue_params
Queue parameters.
friend void swap(disp_params_t &a, disp_params_t &b) noexcept
disp_params_t & tune_queue_params(L tunner)
Tuner for queue parameters.
disp_params_t & set_queue_params(queue_traits::queue_params_t p)
Setter for queue parameters.
A handle for prio_one_thread::strictly_ordered dispatcher.
void reset() noexcept
Drop the content of handle.
disp_binder_shptr_t binder() const noexcept
Get a binder for that dispatcher.
operator bool() const noexcept
Is this handle empty?
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
dispatcher_handle_t() noexcept=default
dispatcher_handle_t(disp_binder_shptr_t binder) noexcept
bool empty() const noexcept
Is this handle empty?
disp_binder_shptr_t m_binder
Binder for the dispatcher.
This exception is thrown when pop is called after stop.
A demand queue with support of demands priorities.
demand_unique_ptr_t pop()
Pop demand from the queue.
queue_for_one_priority_t m_priorities[so_5::prio::total_priorities_count]
Subqueues for priorities.
queue_for_one_priority_t * m_current_priority
Pointer to the current subqueue.
queue_traits::lock_unique_ptr_t m_lock
Queue lock.
void push(queue_for_one_priority_t *subqueue, demand_unique_ptr_t demand)
Push a new demand to the queue.
bool m_shutdown
Shutdown flag.
void handle_stats_for_each_prio(Lambda handler)
void agent_bound(priority_t priority)
Notification about attachment of yet another agent to the queue.
void add_demand_to_queue(queue_for_one_priority_t &queue, demand_unique_ptr_t demand)
Add a new demand to the tail of the queue specified.
void stop()
Set the shutdown signal.
void cleanup_queue(queue_for_one_priority_t &queue_info)
Destroy all demands in the queue specified.
void agent_unbound(priority_t priority)
Notification about detachment of an agent from the queue.
demand_queue_t(queue_traits::lock_unique_ptr_t lock)
event_queue_t & event_queue_by_priority(priority_t priority)
Get queue for the priority specified.
static dispatcher_handle_t make(disp_binder_shptr_t binder) noexcept
Data source for run-time monitoring of whole dispatcher.
void distribute_value_for_priority(const mbox_t &mbox, priority_t priority, std::size_t agents_count, std::size_t demands_count)
outliving_reference_t< dispatcher_template_t > m_dispatcher
Dispatcher to work with.
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
stats::prefix_t m_base_prefix
Basic prefix for data sources.
disp_data_source_t(const std::string_view name_base, outliving_reference_t< dispatcher_template_t > disp)
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
demand_queue_t m_demand_queue
Demand queue for the dispatcher.
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
dispatcher_template_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
void undo_preallocation(agent_t &) noexcept override
Undo resources allocation.
void preallocate_resources(agent_t &) override
Allocate resources in dispatcher for new agent.
Work_Thread m_work_thread
Working thread for the dispatcher.
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for run-time monitoring.
~dispatcher_template_t() noexcept override
Mixin with thread activity tracking flag.
Mixin that holds optional work thread factory.
Interface for dispatcher binders.
An interface of event queue for agent.
Helper class for indication of long-lived reference via its type.
A holder for data-souce that should be automatically registered and deregistered in registry.
A type for storing prefix of data_source name.
An interface of data source.
Various stuff related to MPSC event queue implementation and tuning.
void send_thread_activity_stats(const so_5::mbox_t &, const stats::prefix_t &, so_5::disp::prio_one_thread::reuse::work_thread_no_activity_tracking_t< demand_queue_t > &)
Reusable code for dispatchers with one working thread for events of all priorities.
Implementation details for dispatcher which handles prioritized events in strict order.
Dispatcher which handles events in strict order (from highest priority to lowest).
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base)
Create an instance of strictly_ordered dispatcher.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of strictly_ordered dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env)
Create an instance of strictly_ordered dispatcher.
Dispatcher with one working thread for events of all priorities.
Reusable components for dispatchers.
Helpers for working with priorities.
const unsigned int total_priorities_count
Total count of priorities.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
priority_t
Definition of supported priorities.
Description of queue for one priority.
std::atomic< std::size_t > m_demands_count
Count of demands in the queue.
demand_t * m_head
Head of the queue.
void push_evt_finish(execution_demand_t demand) noexcept override
demand_queue_t * m_demand_queue
Pointer to main demand queue.
demand_t * m_tail
Tail of the queue.
void push(execution_demand_t exec_demand) override
Enqueue new event to the queue.
std::atomic< std::size_t > m_agents_count
Count of agents attached to that queue.
void push_evt_start(execution_demand_t demand) override
Statistic about one subqueue.
std::size_t m_agents_count
std::size_t m_demands_count
demand_t(execution_demand_t &&source)
Initializing constructor.
demand_t * m_next
Next demand in the queue.
A description of event execution demand.