SObjectizer  5.8
Loading...
Searching...
No Matches
prio_dedicated_threads/one_per_prio/pub.cpp
Go to the documentation of this file.
1/*
2 SObjectizer 5.
3*/
4
5/*!
6 * \file
7 * \brief Functions for creating and binding of the dispatcher with
8 * dedicated threads per priority.
9 *
10 * \since
11 * v.5.5.8
12 */
13
14#include <so_5/disp/prio_dedicated_threads/one_per_prio/pub.hpp>
15
16#include <so_5/disp/reuse/work_thread/work_thread.hpp>
17
18#include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
19#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
20#include <so_5/disp/reuse/make_actual_dispatcher.hpp>
21
22#include <so_5/stats/repository.hpp>
23#include <so_5/stats/messages.hpp>
24#include <so_5/stats/std_names.hpp>
25
26#include <so_5/send_functions.hpp>
27
28#include <so_5/details/invoke_noexcept_code.hpp>
29
30#include <algorithm>
31
32namespace so_5 {
33
34namespace disp {
35
37
38namespace one_per_prio {
39
40namespace impl {
41
42namespace stats = so_5::stats;
43
44namespace {
45
46void
48 const so_5::mbox_t &,
49 const stats::prefix_t &,
50 so_5::disp::reuse::work_thread::work_thread_no_activity_tracking_t & )
51 {
52 /* Nothing to do */
53 }
54
55void
57 const so_5::mbox_t & mbox,
58 const stats::prefix_t & prefix,
59 so_5::disp::reuse::work_thread::work_thread_with_activity_tracking_t & wt )
60 {
61 so_5::send< stats::messages::work_thread_activity >(
62 mbox,
63 prefix,
64 stats::suffixes::work_thread_activity(),
65 wt.thread_id(),
66 wt.take_activity_stats() );
67 }
68
69} /* namespace anonymous */
70
71//
72// dispatcher_template_t
73//
74/*!
75 * \brief An actual implementation of dispatcher with dedicated thread
76 * for every priority in form of a template class.
77 *
78 * \since
79 * v.5.5.8, v.5.5.18, v.5.6.0
80 */
81template< typename Work_Thread >
83 {
84 public:
87 const std::string_view name_base,
88 disp_params_t params )
93 }
94 {
97 }
98
99 ~dispatcher_template_t() noexcept override
100 {
101 for( auto & t : m_threads )
102 t->shutdown();
103
104 for( auto & t : m_threads )
105 t->wait();
106 }
107
108 void
110 agent_t & /*agent*/ ) override
111 {
112 // Nothing to do.
113 }
114
115 void
117 agent_t & /*agent*/ ) noexcept override
118 {
119 // Nothing to do.
120 }
121
122 void
124 agent_t & agent ) noexcept override
125 {
126 const auto priority = agent.so_priority();
127
130
132 }
133
134 void
136 agent_t & agent ) noexcept override
137 {
138 const auto priority = agent.so_priority();
139
141 }
142
143 private:
144 friend class disp_data_source_t;
145
146 /*!
147 * \brief Data source for run-time monitoring of whole dispatcher.
148 *
149 * \since
150 * v.5.5.8
151 */
152 class disp_data_source_t final : public stats::source_t
153 {
154 //! Dispatcher to work with.
156
157 //! Basic prefix for data sources.
159
160 public :
162 const std::string_view name_base,
164 : m_dispatcher{ disp }
165 , m_base_prefix{ so_5::disp::reuse::make_disp_prefix(
166 "pdt-opp",
167 name_base,
168 &(disp.get()) )
169 }
170 {}
171
172 void
173 distribute( const mbox_t & mbox ) override
174 {
175 std::size_t agents_count = 0;
176
177 auto & disp = m_dispatcher.get();
178
179 so_5::prio::for_each_priority( [&]( priority_t p ) {
180 auto agents = disp.m_agents_per_priority[
181 to_size_t(p) ].load( std::memory_order_acquire );
182
183 agents_count += agents;
184
185 distribute_value_for_work_thread(
186 mbox,
187 p,
188 agents,
189 *(disp.m_threads[ to_size_t(p) ]) );
190 } );
191
192 so_5::send< stats::messages::quantity< std::size_t > >(
193 mbox,
194 m_base_prefix,
195 stats::suffixes::agent_count(),
196 agents_count );
197 }
198
199 private:
200 void
202 const mbox_t & mbox,
203 priority_t priority,
204 std::size_t agents_count,
205 Work_Thread & wt )
206 {
207 std::ostringstream ss;
208 ss << m_base_prefix.c_str() << "/wt-p" << to_size_t(priority);
209
210 const stats::prefix_t prefix{ ss.str() };
211
212 so_5::send< stats::messages::quantity< std::size_t > >(
213 mbox,
214 prefix,
215 stats::suffixes::work_thread_queue_size(),
216 wt.demands_count() );
217
218 so_5::send< stats::messages::quantity< std::size_t > >(
219 mbox,
220 prefix,
221 stats::suffixes::agent_count(),
222 agents_count );
223
224 send_thread_activity_stats( mbox, prefix, wt );
225 }
226 };
227
228 //! Data source for run-time monitoring.
229 stats::auto_registered_source_holder_t< disp_data_source_t >
231
232 //! Working threads for every priority.
234
235 //! Counters for agent count for every priority.
238
239 //! Allocate work threads for dispatcher.
240 void
256
257 //! Start all working threads.
258 void
260 {
261 using namespace std;
262 using namespace so_5::details;
263 using namespace so_5::prio;
264
265 // This helper vector will be used for shutdown of
266 // started threads in the case of an exception.
268 // Initially all items must be NULL.
270
272 for( std::size_t i = 0; i != total_priorities_count; ++i )
273 {
276
277 m_threads[ i ]->start();
278
279 // Thread successfully started. Pointer to it
280 // must be used on rollback.
281 started_threads[ i ] = m_threads[ i ].get();
282 }
283 },
284 [&] {
286 // Shutdown all started threads.
287 for( auto t : started_threads )
288 if( t )
289 {
290 t->shutdown();
291 t->wait();
292 }
293 else
294 // No more started threads.
295 break;
296 } );
297 } );
298 }
299 };
300
301//
302// dispatcher_handle_maker_t
303//
305 {
306 public :
308 make( disp_binder_shptr_t binder ) noexcept
309 {
310 return { std::move( binder ) };
311 }
312 };
313
314} /* namespace impl */
315
316//
317// make_dispatcher
318//
321 environment_t & env,
322 const std::string_view data_sources_name_base,
323 disp_params_t params )
324 {
325 using namespace so_5::disp::reuse;
326
327 using dispatcher_no_activity_tracking_t =
329 work_thread::work_thread_no_activity_tracking_t >;
330
331 using dispatcher_with_activity_tracking_t =
333 work_thread::work_thread_with_activity_tracking_t >;
334
335 disp_binder_shptr_t binder = so_5::disp::reuse::make_actual_dispatcher<
336 disp_binder_t,
337 dispatcher_no_activity_tracking_t,
338 dispatcher_with_activity_tracking_t >(
339 outliving_mutable(env),
340 data_sources_name_base,
341 std::move(params) );
342
343 return impl::dispatcher_handle_maker_t::make( std::move(binder) );
344 }
345
346} /* namespace one_per_prio */
347
348} /* namespace prio_dedicated_threads */
349
350} /* namespace disp */
351
352} /* namespace so_5 */
A base class for agents.
Definition agent.hpp:673
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
disp_data_source_t(const std::string_view name_base, outliving_reference_t< dispatcher_template_t > disp)
void distribute_value_for_work_thread(const mbox_t &mbox, priority_t priority, std::size_t agents_count, Work_Thread &wt)
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for run-time monitoring.
std::vector< std::unique_ptr< Work_Thread > > m_threads
Working threads for every priority.
dispatcher_template_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
void allocate_work_threads(environment_t &env, const disp_params_t &params)
Allocate work threads for dispatcher.
void preallocate_resources(agent_t &) override
Allocate resources in dispatcher for new agent.
Interface for dispatcher binders.
SObjectizer Environment.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
A holder for data-souce that should be automatically registered and deregistered in registry.
A type for storing prefix of data_source name.
Definition prefix.hpp:32
An interface of data source.
#define SO_5_FUNC
Definition declspec.hpp:48
void send_thread_activity_stats(const so_5::mbox_t &, const stats::prefix_t &, work_thread::work_thread_no_activity_tracking_t &)
void send_thread_activity_stats(const so_5::mbox_t &mbox, const stats::prefix_t &prefix, work_thread::work_thread_with_activity_tracking_t &wt)
Implementation details for dispatcher with one thread per priority.
Dispatcher which creates exactly one thread per priority.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of one_per_prio dispatcher.
Dispatchers with dedicated threads for every priority.
Implemetation details of dispatcher's working thread.
Reusable components for dispatchers.
Event dispatchers.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Definition agent.cpp:33
priority_t
Definition of supported priorities.
Definition priority.hpp:28