SObjectizer  5.8
Loading...
Searching...
No Matches
prio_one_thread/reuse/work_thread.hpp
Go to the documentation of this file.
1/*
2 SObjectizer 5.
3*/
4
5/*!
6 * \file
7 * \brief A working thread for dispatcher with one common working
8 * thread and support of demands priority.
9 *
10 * \since v.5.5.8
11 */
12
13#pragma once
14
15#include <so_5/current_thread_id.hpp>
16
17#include <so_5/disp/abstract_work_thread.hpp>
18
19#include <so_5/stats/work_thread_activity.hpp>
20#include <so_5/stats/impl/activity_tracking.hpp>
21
22#include <so_5/details/at_scope_exit.hpp>
23
24#include <so_5/impl/thread_join_stuff.hpp>
25
26#include <thread>
27
28namespace so_5 {
29
30namespace disp {
31
32namespace prio_one_thread {
33
34namespace reuse {
35
37
38//
39// common_data_t
40//
41/*!
42 * \brief A common data for all work thread implementations.
43 *
44 * \since v.5.5.18
45 */
46template< typename Demand_Queue >
48 {
49 //! Demands queue to work for.
50 Demand_Queue & m_queue;
51
52 //! Thread object.
54
55 //! ID of the work thread.
56 /*!
57 * \note Receives actual value only after successful start
58 * of the thread.
59 */
61
63 work_thread_holder_t thread_holder,
64 Demand_Queue & queue )
65 : m_queue( queue )
67 {}
68 };
69
70//
71// no_activity_tracking_impl_t
72//
73/*!
74 * \brief A part of implementation of work thread without activity tracking.
75 *
76 * \since v.5.5.18
77 */
78template< typename Demand_Queue >
79class no_activity_tracking_impl_t : protected common_data_t< Demand_Queue >
80 {
81 using base_type_t = common_data_t< Demand_Queue >;
82
83 public :
85 work_thread_holder_t thread_holder,
86 Demand_Queue & queue )
88 {}
89
90 protected :
91 void
92 work_started() { /* Nothing to do. */ }
93
94 void
95 work_finished() { /* Nothing to do. */ }
96
97 void
98 wait_started() { /* Nothing to do. */ }
99
100 void
101 wait_finished() { /* Nothing to do. */ }
102 };
103
104//
105// with_activity_tracking_impl_t
106//
107/*!
108 * \brief A part of implementation of work thread with activity tracking.
109 *
110 * \since v.5.5.18
111 */
112template< typename Demand_Queue >
113class with_activity_tracking_impl_t : protected common_data_t< Demand_Queue >
114 {
115 using base_type_t = common_data_t< Demand_Queue >;
116
117 public :
119 work_thread_holder_t thread_holder,
120 Demand_Queue & queue )
122 {}
123
134
135 protected :
136 //! Statistics for work activity.
140
141 //! Statistics for wait activity.
145
146 void
148
149 void
151
152 void
154
155 void
157 };
158
159} /* namespace work_thread_details */
160
161//
162// work_thread_template_t
163//
164/*!
165 * \brief A working thread for dispatcher with one common working
166 * thread and support of demands priority.
167 *
168 * \since v.5.5.8, v.5.5.18
169 */
170template<
171 typename Demand_Queue,
172 template<class> class Work_Thread >
173class work_thread_template_t : public Work_Thread< Demand_Queue >
174 {
175 using base_type_t = Work_Thread< Demand_Queue >;
176
177 public :
178 //! Initializing constructor.
180 work_thread_holder_t thread_holder,
181 Demand_Queue & queue )
183 {}
184
185 void
187 {
188 this->m_thread_holder.unchecked_get().start( [this]() { body(); } );
189 }
190
191 void
197
198 [[nodiscard]]
200 thread_id() const
201 {
202 return this->m_thread_id;
203 }
204
205 private :
206 void
208 {
210
211 try
212 {
213 for(;;)
214 {
215 auto d = this->pop_demand();
216 this->call_handler( *d );
217 }
218 }
219 catch( const typename Demand_Queue::shutdown_ex_t & )
220 {}
221 }
222
223 [[nodiscard]]
224 auto
225 pop_demand() -> decltype(std::declval<Demand_Queue>().pop())
226 {
227 this->wait_started();
229 [this] { this->wait_finished(); } );
230
231 return this->m_queue.pop();
232 }
233
234 void
236 {
237 this->work_started();
239 [this] { this->work_finished(); } );
240
242 }
243 };
244
245//
246// work_thread_no_activity_tracking_t
247//
248template< typename Demand_Queue >
253
254//
255// work_thread_with_activity_tracking_t
256//
257template< typename Demand_Queue >
262
263} /* namespace reuse */
264
265} /* namespace prio_one_thread */
266
267} /* namespace disp */
268
269} /* namespace so_5 */
A base class for agents.
Definition agent.hpp:673
An analog of std::lock_guard for MPSC queue lock.
Container for storing parameters for MPSC queue.
An analog of std::unique_lock for MPSC queue lock.
disp_params_t & set_queue_params(queue_traits::queue_params_t p)
Setter for queue parameters.
const queue_traits::queue_params_t & queue_params() const
Getter for queue parameters.
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
disp_binder_shptr_t binder() const noexcept
Get a binder for that dispatcher.
A demand queue for dispatcher with one common working thread and round-robin processing of prioritise...
void agent_unbound(priority_t priority)
Notification about detachment of an agent from the queue.
demand_queue_t(queue_traits::lock_unique_ptr_t lock, const quotes_t &quotes)
queue_for_one_priority_t m_priorities[static_cast< std::size_t >(priority_t::p_max)+1]
Subqueues for priorities.
void agent_bound(priority_t priority)
Notification about attachment of yet another agent to the queue.
event_queue_t & event_queue_by_priority(priority_t priority)
Get queue for the priority specified.
void cleanup_queue(queue_for_one_priority_t &queue_info)
Destroy all demands in the queue specified.
void push(queue_for_one_priority_t *subqueue, demand_unique_ptr_t demand)
Push a new demand to the queue.
void add_demand_to_queue(queue_for_one_priority_t &queue, demand_unique_ptr_t demand)
Add a new demand to the tail of the queue specified.
disp_data_source_t(const std::string_view name_base, outliving_reference_t< dispatcher_template_t > disp)
void distribute_value_for_priority(const mbox_t &mbox, priority_t priority, std::size_t quote, std::size_t agents_count, std::size_t demands_count)
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
dispatcher_template_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params, const quotes_t &quotes)
void preallocate_resources(agent_t &) override
Allocate resources in dispatcher for new agent.
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for run-time monitoring.
size_t query(priority_t prio) const
Get the quote for a priority.
Definition quotes.hpp:89
quotes_t & set(priority_t prio, std::size_t quote)
Set a new quote for a priority.
Definition quotes.hpp:76
static void ensure_quote_not_zero(std::size_t value)
Definition quotes.hpp:99
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::internal_lock > m_working_stats
Statistics for work activity.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::internal_lock > m_waiting_stats
Statistics for wait activity.
A working thread for dispatcher with one common working thread and support of demands priority.
auto pop_demand() -> decltype(std::declval< Demand_Queue >().pop())
work_thread_template_t(work_thread_holder_t thread_holder, Demand_Queue &queue)
Initializing constructor.
Mixin that holds optional work thread factory.
An analog of unique_ptr for abstract_work_thread.
Interface for dispatcher binders.
SObjectizer Environment.
An interface of event queue for agent.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
Base for the case of internal stats lock.
A holder for data-souce that should be automatically registered and deregistered in registry.
A type for storing prefix of data_source name.
Definition prefix.hpp:32
An interface of data source.
#define SO_5_FUNC
Definition declspec.hpp:48
#define SO_5_THROW_EXCEPTION(error_code, desc)
Definition exception.hpp:74
Various stuff related to MPSC event queue implementation and tuning.
void send_thread_activity_stats(const so_5::mbox_t &, const stats::prefix_t &, so_5::disp::prio_one_thread::reuse::work_thread_no_activity_tracking_t< demand_queue_t > &)
Implementation details for dispatcher with round-robin policy of handling prioritized events.
Dispatcher which handles events of different priorities in round-robin manner.
dispatcher_handle_t make_dispatcher(environment_t &env, const quotes_t &quotes)
Create an instance of quoted_round_robin dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, const quotes_t &quotes)
Create an instance of quoted_round_robin dispatcher.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, const quotes_t &quotes, disp_params_t params)
Create an instance of quoted_round_robin dispatcher.
Reusable code for dispatchers with one working thread for events of all priorities.
Dispatcher with one working thread for events of all priorities.
Reusable components for dispatchers.
Event dispatchers.
Helpers for working with priorities.
Definition priority.hpp:73
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Definition agent.cpp:33
priority_t
Definition of supported priorities.
Definition priority.hpp:28
A description of event execution demand.