2
3
6
7
8
9
10
11
15#include <so_5/current_thread_id.hpp>
17#include <so_5/disp/abstract_work_thread.hpp>
19#include <so_5/stats/work_thread_activity.hpp>
20#include <so_5/stats/impl/activity_tracking.hpp>
22#include <so_5/details/at_scope_exit.hpp>
24#include <so_5/impl/thread_join_stuff.hpp>
42
43
44
45
46template<
typename Demand_Queue >
57
58
59
64 Demand_Queue & queue )
74
75
76
77
78template<
typename Demand_Queue >
86 Demand_Queue & queue )
108
109
110
111
112template<
typename Demand_Queue >
120 Demand_Queue & queue )
165
166
167
168
169
171 typename Demand_Queue,
172 template<
class>
class Work_Thread >
175 using base_type_t = Work_Thread< Demand_Queue >;
181 Demand_Queue & queue )
An analog of std::lock_guard for MPSC queue lock.
void notify_one() noexcept
Container for storing parameters for MPSC queue.
An analog of std::unique_lock for MPSC queue lock.
Alias for namespace with traits of event queue.
disp_params_t & set_queue_params(queue_traits::queue_params_t p)
Setter for queue parameters.
queue_traits::queue_params_t m_queue_params
Queue parameters.
const queue_traits::queue_params_t & queue_params() const
Getter for queue parameters.
disp_params_t & tune_queue_params(L tunner)
Tuner for queue parameters.
disp_params_t()
Default constructor.
friend void swap(disp_params_t &a, disp_params_t &b) noexcept
A handle for prio_one_thread::strictly_ordered dispatcher.
disp_binder_shptr_t m_binder
Binder for the dispatcher.
void reset() noexcept
Drop the content of handle.
bool empty() const noexcept
Is this handle empty?
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
dispatcher_handle_t(disp_binder_shptr_t binder) noexcept
operator bool() const noexcept
Is this handle empty?
disp_binder_shptr_t binder() const noexcept
Get a binder for that dispatcher.
dispatcher_handle_t() noexcept=default
This exception is thrown when pop is called after stop.
A demand queue for dispatcher with one common working thread and round-robin processing of prioritise...
void agent_unbound(priority_t priority)
Notification about detachment of an agent from the queue.
demand_unique_ptr_t pop()
Pop demand from the queue.
queue_for_one_priority_t * m_current_priority
Pointer to the current subqueue.
demand_queue_t(queue_traits::lock_unique_ptr_t lock, const quotes_t "es)
queue_for_one_priority_t m_priorities[static_cast< std::size_t >(priority_t::p_max)+1]
Subqueues for priorities.
void agent_bound(priority_t priority)
Notification about attachment of yet another agent to the queue.
event_queue_t & event_queue_by_priority(priority_t priority)
Get queue for the priority specified.
bool m_shutdown
Shutdown flag.
queue_traits::lock_unique_ptr_t m_lock
Queue lock.
void stop()
Set the shutdown signal.
void handle_stats_for_each_prio(Lambda handler)
std::size_t m_total_demands_count
Total count of demands in the queue.
void switch_to_lower_priority()
void cleanup_queue(queue_for_one_priority_t &queue_info)
Destroy all demands in the queue specified.
void push(queue_for_one_priority_t *subqueue, demand_unique_ptr_t demand)
Push a new demand to the queue.
void add_demand_to_queue(queue_for_one_priority_t &queue, demand_unique_ptr_t demand)
Add a new demand to the tail of the queue specified.
static dispatcher_handle_t make(disp_binder_shptr_t binder) noexcept
Data source for run-time monitoring of whole dispatcher.
stats::prefix_t m_base_prefix
Basic prefix for data sources.
disp_data_source_t(const std::string_view name_base, outliving_reference_t< dispatcher_template_t > disp)
outliving_reference_t< dispatcher_template_t > m_dispatcher
Dispatcher to work with.
void distribute_value_for_priority(const mbox_t &mbox, priority_t priority, std::size_t quote, std::size_t agents_count, std::size_t demands_count)
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
void undo_preallocation(agent_t &) noexcept override
Undo resources allocation.
dispatcher_template_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params, const quotes_t "es)
void preallocate_resources(agent_t &) override
Allocate resources in dispatcher for new agent.
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
~dispatcher_template_t() noexcept override
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for run-time monitoring.
Work_Thread m_work_thread
Working thread for the dispatcher.
demand_queue_t m_demand_queue
Demand queue for the dispatcher.
A storage of quotes for priorities.
size_t query(priority_t prio) const
Get the quote for a priority.
quotes_t & set(priority_t prio, std::size_t quote)
Set a new quote for a priority.
quotes_t(std::size_t default_value)
static void ensure_quote_not_zero(std::size_t value)
A part of implementation of work thread without activity tracking.
no_activity_tracking_impl_t(work_thread_holder_t thread_holder, Demand_Queue &queue)
A part of implementation of work thread with activity tracking.
with_activity_tracking_impl_t(work_thread_holder_t thread_holder, Demand_Queue &queue)
so_5::stats::work_thread_activity_stats_t take_activity_stats()
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::internal_lock > m_working_stats
Statictics for work activity.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::internal_lock > m_waiting_stats
Statictics for wait activity.
A working thread for dispatcher with one common working thread and support of demands priority.
auto pop_demand() -> decltype(std::declval< Demand_Queue >().pop())
void call_handler(so_5::execution_demand_t &demand)
so_5::current_thread_id_t thread_id() const
work_thread_template_t(work_thread_holder_t thread_holder, Demand_Queue &queue)
Initializing constructor.
Mixin with thread activity tracking flag.
Mixin that holds optional work thread factory.
An analog of unique_ptr for abstract_work_thread.
Interface for dispatcher binders.
An interface of event queue for agent.
Helper class for indication of long-lived reference via its type.
Base for the case of internal stats lock.
Helper for collecting activity stats.
A holder for data-souce that should be automatically registered and deregistered in registry.
A type for storing prefix of data_source name.
An interface of data source.
#define SO_5_THROW_EXCEPTION(error_code, desc)
Various stuff related to MPSC event queue implementation and tuning.
void send_thread_activity_stats(const so_5::mbox_t &, const stats::prefix_t &, so_5::disp::prio_one_thread::reuse::work_thread_no_activity_tracking_t< demand_queue_t > &)
Implementation details for dispatcher with round-robin policy of handling prioritized events.
Dispatcher which handles events of different priorities in round-robin maner.
dispatcher_handle_t make_dispatcher(environment_t &env, const quotes_t "es)
Create an instance of quoted_round_robin dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, const quotes_t "es)
Create an instance of quoted_round_robin dispatcher.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, const quotes_t "es, disp_params_t params)
Create an instance of quoted_round_robin dispatcher.
Reusable code for dispatchers with one working thread for events of all priorities.
Dispatcher with one working thread for events of all priorities.
Reusable components for dispatchers.
Helpers for working with priorities.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
priority_t
Definition of supported priorities.
Description of queue for one priority.
demand_t * m_head
Head of the queue.
void push_evt_start(execution_demand_t demand) override
std::size_t m_demands_processed
Count of processed demands on the current iterations.
void push_evt_finish(execution_demand_t demand) noexcept override
std::size_t m_quote
A quote for this subqueue.
std::atomic< std::size_t > m_agents_count
Count of agents attached to that queue.
void push(execution_demand_t exec_demand) override
Enqueue new event to the queue.
demand_t * m_tail
Tail of the queue.
std::atomic< std::size_t > m_demands_count
Count of demands in the queue.
demand_queue_t * m_demand_queue
Pointer to main demand queue.
Statistic about one subqueue.
std::size_t m_agents_count
std::size_t m_demands_count
demand_t * m_next
Next demand in the queue.
demand_t(execution_demand_t &&source)
Initializing constructor.
A common data for all work thread implementations.
so_5::current_thread_id_t m_thread_id
ID of the work thread.
work_thread_holder_t m_thread_holder
Thread object.
common_data_t(work_thread_holder_t thread_holder, Demand_Queue &queue)
Demand_Queue & m_queue
Demands queue to work for.
A description of event execution demand.
Stats for a work thread activity.