SObjectizer 5.8
Loading...
Searching...
No Matches
prio_one_thread/quoted_round_robin/pub.cpp
Go to the documentation of this file.
1/*
2 SObjectizer 5.
3*/
4
5/*!
6 * \file
7 * \brief Functions for creating and binding of the single thread dispatcher
8 * with priority support (quoted round robin policy).
9 *
10 * \since
11 * v.5.5.8
12 */
13
14#include <so_5/disp/prio_one_thread/quoted_round_robin/pub.hpp>
15
16#include <so_5/disp/prio_one_thread/quoted_round_robin/impl/demand_queue.hpp>
17#include <so_5/disp/prio_one_thread/reuse/work_thread.hpp>
18
19#include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
20#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
21#include <so_5/disp/reuse/make_actual_dispatcher.hpp>
22
23#include <so_5/stats/repository.hpp>
24#include <so_5/stats/messages.hpp>
25#include <so_5/stats/std_names.hpp>
26
27#include <so_5/send_functions.hpp>
28
29namespace so_5 {
30
31namespace disp {
32
33namespace prio_one_thread {
34
36
37namespace impl {
38
39namespace stats = so_5::stats;
40
41namespace {
42
43void
45 const so_5::mbox_t &,
46 const stats::prefix_t &,
47 so_5::disp::prio_one_thread::reuse::work_thread_no_activity_tracking_t<
48 demand_queue_t > & )
49 {
50 /* Nothing to do */
51 }
52
53void
54send_thread_activity_stats(
55 const so_5::mbox_t & mbox,
56 const stats::prefix_t & prefix,
57 so_5::disp::prio_one_thread::reuse::work_thread_with_activity_tracking_t<
58 demand_queue_t > & wt )
59 {
61 mbox,
62 prefix,
66 }
67
68} /* namespace anonymous */
69
70//
71// dispatcher_template_t
72//
73/*!
74 * \brief An implementation of dispatcher with one working thread and support
75 * of demand priorities (quoted round robin policy) in form of template class.
76 *
77 * \since
78 * v.5.5.8, v.5.5.18, v.5.6.0
79 */
80template< typename Work_Thread >
81class dispatcher_template_t final : public disp_binder_t
82 {
83 friend class disp_data_source_t;
84
85 public:
88 const std::string_view name_base,
89 disp_params_t params,
90 const quotes_t & quotes )
93 quotes }
99 name_base,
100 outliving_mutable(*this)
101 }
102 {
103 m_work_thread.start();
104 }
105
106 ~dispatcher_template_t() noexcept override
107 {
109 m_work_thread.join();
110 }
111
112 void
114 agent_t & /*agent*/ ) override
115 {
116 // Nothing to do.
117 }
118
119 void
121 agent_t & /*agent*/ ) noexcept override
122 {
123 // Nothing to do.
124 }
125
126 void
128 agent_t & agent ) noexcept override
129 {
130 const auto priority = agent.so_priority();
131
134
136 }
137
138 void
140 agent_t & agent ) noexcept override
141 {
142 const auto priority = agent.so_priority();
143
145 }
146
147 private:
148 /*!
149 * \brief Data source for run-time monitoring of whole dispatcher.
150 *
151 * \since
152 * v.5.5.8
153 */
154 class disp_data_source_t : public stats::source_t
155 {
156 //! Dispatcher to work with.
157 outliving_reference_t< dispatcher_template_t > m_dispatcher;
158
159 //! Basic prefix for data sources.
161
162 public :
164 const std::string_view name_base,
165 outliving_reference_t< dispatcher_template_t > disp )
166 : m_dispatcher{ disp }
168 "pot-qrr",
169 name_base,
170 &(disp.get()) )
171 }
172 {}
173
174 void
175 distribute( const mbox_t & mbox ) override
176 {
177 auto & disp = m_dispatcher.get();
178
179 std::size_t agents_count = 0;
180
181 disp.m_demand_queue.handle_stats_for_each_prio(
182 [&]( const demand_queue_t::queue_stats_t & stat ) {
184 mbox,
185 stat.m_priority,
186 stat.m_quote,
189
190 agents_count += stat.m_agents_count;
191 } );
192
194 mbox,
197 agents_count );
198
199 send_thread_activity_stats(
200 mbox,
202 disp.m_work_thread );
203 }
204
205 private:
206 void
208 const mbox_t & mbox,
209 priority_t priority,
210 std::size_t quote,
211 std::size_t agents_count,
212 std::size_t demands_count )
213 {
214 std::ostringstream ss;
215 ss << m_base_prefix.c_str() << "/p" << to_size_t(priority);
216
217 const stats::prefix_t prefix{ ss.str() };
218
220 mbox,
221 prefix,
223 quote );
224
226 mbox,
227 prefix,
229 agents_count );
230
232 mbox,
233 prefix,
235 demands_count );
236 }
237 };
238
239 //! Demand queue for the dispatcher.
241
242 //! Working thread for the dispatcher.
243 Work_Thread m_work_thread;
244
245 //! Data source for run-time monitoring.
248 };
249
250//
251// dispatcher_handle_maker_t
252//
254 {
255 public :
257 make( disp_binder_shptr_t binder ) noexcept
258 {
259 return { std::move( binder ) };
260 }
261 };
262
263} /* namespace impl */
264
265//
266// make_dispatcher
267//
270 environment_t & env,
271 const std::string_view data_sources_name_base,
272 const quotes_t & quotes,
273 disp_params_t params )
274 {
275 using namespace so_5::disp::reuse;
276 using namespace so_5::disp::prio_one_thread::reuse;
277
278 using dispatcher_no_activity_tracking_t =
279 impl::dispatcher_template_t<
280 work_thread_no_activity_tracking_t< impl::demand_queue_t > >;
281
282 using dispatcher_with_activity_tracking_t =
283 impl::dispatcher_template_t<
284 work_thread_with_activity_tracking_t< impl::demand_queue_t > >;
285
286 disp_binder_shptr_t binder = so_5::disp::reuse::make_actual_dispatcher<
288 dispatcher_no_activity_tracking_t,
289 dispatcher_with_activity_tracking_t >(
291 data_sources_name_base,
292 std::move(params),
293 quotes );
294
295 return impl::dispatcher_handle_maker_t::make( std::move(binder) );
296 }
297
298} /* namespace quoted_round_robin */
299
300} /* namespace prio_one_thread */
301
302} /* namespace disp */
303
304} /* namespace so_5 */
A base class for agents.
Definition agent.hpp:673
priority_t so_priority() const noexcept
Get the priority of the agent.
Definition agent.hpp:2555
void so_bind_to_dispatcher(event_queue_t &queue) noexcept
Binding agent to the dispatcher.
Definition agent.cpp:872
const lock_factory_t & lock_factory() const
Getter for lock factory.
const queue_traits::queue_params_t & queue_params() const
Getter for queue parameters.
A demand queue for dispatcher with one common working thread and round-robin processing of prioritise...
void agent_unbound(priority_t priority)
Notification about detachment of an agent from the queue.
demand_queue_t(queue_traits::lock_unique_ptr_t lock, const quotes_t &quotes)
void agent_bound(priority_t priority)
Notification about attachment of yet another agent to the queue.
event_queue_t & event_queue_by_priority(priority_t priority)
Get queue for the priority specified.
disp_data_source_t(const std::string_view name_base, outliving_reference_t< dispatcher_template_t > disp)
void distribute_value_for_priority(const mbox_t &mbox, priority_t priority, std::size_t quote, std::size_t agents_count, std::size_t demands_count)
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
dispatcher_template_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params, const quotes_t &quotes)
void preallocate_resources(agent_t &) override
Allocate resources in dispatcher for new agent.
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for run-time monitoring.
Interface for dispatcher binders.
SObjectizer Environment.
stats::repository_t & stats_repository()
Access to repository of data sources for run-time monitoring.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
T & get() const noexcept
A holder for a data source that should be automatically registered and deregistered in the registry.
A type for storing prefix of data_source name.
Definition prefix.hpp:32
constexpr const char * c_str() const noexcept
Access to prefix value.
Definition prefix.hpp:80
prefix_t(const std::string &value) noexcept(noexcept(value.c_str()))
Initializing constructor.
Definition prefix.hpp:73
An interface of data source.
#define SO_5_FUNC
Definition declspec.hpp:48
void send_thread_activity_stats(const so_5::mbox_t &, const stats::prefix_t &, so_5::disp::prio_one_thread::reuse::work_thread_no_activity_tracking_t< demand_queue_t > &)
Implementation details for dispatcher with round-robin policy of handling prioritized events.
Dispatcher which handles events of different priorities in round-robin manner.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, const quotes_t &quotes, disp_params_t params)
Create an instance of quoted_round_robin dispatcher.
Reusable code for dispatchers with one working thread for events of all priorities.
Dispatcher with one working thread for events of all priorities.
Reusable components for dispatchers.
work_thread_holder_t acquire_work_thread(const work_thread_factory_mixin_t< Params > &params, environment_t &env)
Helper function for acquiring a new worker thread from an appropriate work thread factory.
so_5::stats::prefix_t make_disp_prefix(const std::string_view disp_type, const std::string_view data_sources_name_base, const void *disp_this_pointer)
Create basic prefix for dispatcher data source names.
std::unique_ptr< Disp_Iface_Type > make_actual_dispatcher(outliving_reference_t< environment_t > env, const std::string_view name_base, Disp_Params_Type disp_params, Args &&...args)
Helper function for creation of dispatcher instance with respect to work thread activity tracking flag.
Event dispatchers.
Declarations of messages used by run-time monitoring and statistics.
Definition messages.hpp:36
Predefined suffixes of data-sources.
Definition std_names.hpp:55
SO_5_FUNC suffix_t agent_count()
Suffix for data source with count of agents bound to some entity.
Definition std_names.cpp:66
SO_5_FUNC suffix_t work_thread_queue_size()
Suffix for data source with count of demands in a working thread event queue.
Definition std_names.cpp:78
SO_5_FUNC suffix_t work_thread_activity()
Suffix for data source with work thread activity statistics.
Definition std_names.cpp:84
SO_5_FUNC suffix_t demand_quote()
Suffix for data source with size of quote for demands processing.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Definition agent.cpp:33
std::size_t to_size_t(priority_t priority)
Helper function for conversion from priority to size_t.
Definition priority.hpp:48
priority_t
Definition of supported priorities.
Definition priority.hpp:28
void send(Target &&to, Args &&... args)
A utility function for creating and delivering a message or a signal.
outliving_reference_t< T > outliving_mutable(T &r)
Make outliving_reference wrapper for mutable reference.
A message with value of some quantity.
Definition messages.hpp:60
Information about one work thread activity.
Definition messages.hpp:108