SObjectizer  5.8
Loading...
Searching...
No Matches
active_obj/pub.cpp
Go to the documentation of this file.
1/*
2 SObjectizer 5
3*/
4
5#include <so_5/disp/active_obj/pub.hpp>
6
7#include <so_5/event_queue.hpp>
8#include <so_5/send_functions.hpp>
9
10#include <so_5/details/rollback_on_exception.hpp>
11
12#include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
13#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
14#include <so_5/disp/reuse/make_actual_dispatcher.hpp>
15
16#include <so_5/disp/reuse/work_thread/work_thread.hpp>
17
18#include <so_5/stats/repository.hpp>
19#include <so_5/stats/messages.hpp>
20#include <so_5/stats/std_names.hpp>
21
22#include <map>
23#include <mutex>
24#include <algorithm>
25
26namespace so_5
27{
28
29namespace disp
30{
31
32namespace active_obj
33{
34
35namespace impl
36{
37
38namespace work_thread = so_5::disp::reuse::work_thread;
39namespace stats = so_5::stats;
40
41namespace
42{
43
/*!
 * \brief Just a helper function for consecutive calls to shutdown and wait.
 *
 * Calls \c w.shutdown() first and \c w.wait() second, in that order.
 *
 * \tparam T Any type that provides \c shutdown() and \c wait() methods.
 *
 * \param w Object to be shut down and then waited for.
 *
 * \since
 * v.5.5.4
 */
template< class T >
void
shutdown_and_wait( T & w )
	{
		w.shutdown();
		w.wait();
	}
56
57template< class Work_Thread >
58void
60 const so_5::mbox_t & mbox,
61 const stats::prefix_t & prefix,
62 Work_Thread & wt )
63 {
64 so_5::send< stats::messages::quantity< std::size_t > >(
65 mbox,
66 prefix,
67 stats::suffixes::work_thread_queue_size(),
68 wt.demands_count() );
69 }
70
71void
73 const so_5::mbox_t &,
74 const stats::prefix_t &,
75 work_thread::work_thread_no_activity_tracking_t & )
76 {
77 /* Nothing to do */
78 }
79
80void
82 const so_5::mbox_t & mbox,
83 const stats::prefix_t & prefix,
84 work_thread::work_thread_with_activity_tracking_t & wt )
85 {
86 so_5::send< stats::messages::work_thread_activity >(
87 mbox,
88 prefix,
89 stats::suffixes::work_thread_activity(),
90 wt.thread_id(),
91 wt.take_activity_stats() );
92 }
93
94} /* anonymous */
95
96//
97// dispatcher_template_t
98//
99
100/*!
101 * \brief Implementation of active object dispatcher in form of template class.
102 */
103template< typename Work_Thread >
104class dispatcher_template_t final : public disp_binder_t
105 {
106 public:
108 //! SObjectizer Environment to work in.
110 //! Base part of data sources names.
111 const std::string_view name_base,
112 //! Dispatcher's parameters.
113 disp_params_t params )
114 : m_env{ env }
115 , m_params{ std::move(params) }
116 , m_data_source{
117 outliving_mutable(m_env.get().stats_repository()),
118 name_base,
119 outliving_mutable( *this )
120 }
121 {}
122
123 ~dispatcher_template_t() noexcept override
124 {
125 // All working threads should receive stop signal.
126 for( auto & p: m_agent_threads )
127 p.second->shutdown();
128
129 // All working threads should be joined.
130 for( auto & p: m_agent_threads )
131 p.second->wait();
132 }
133
134 void
136 agent_t & agent ) override
137 {
138 std::lock_guard< std::mutex > lock{ m_lock };
139
140 if( m_agent_threads.end() != m_agent_threads.find( &agent ) )
142 rc_disp_create_failed,
143 "thread for the agent is already exists" );
144
145 auto lock_factory = m_params.queue_params().lock_factory();
146 auto thread = std::make_shared< Work_Thread >(
147 acquire_work_thread( m_params, m_env.get() ),
148 std::move(lock_factory) );
149
150 thread->start();
151 so_5::details::do_with_rollback_on_exception(
152 [&] { m_agent_threads[ &agent ] = thread; },
153 [&thread] { shutdown_and_wait( *thread ); } );
154 }
155
156 void
158 agent_t & agent ) noexcept override
159 {
160 const auto eject_thread = [&] {
161 std::lock_guard< std::mutex > lock{ m_lock };
162
163 auto it = m_agent_threads.find( &agent );
164 auto thread = it->second;
165 m_agent_threads.erase( it );
166
167 return thread;
168 };
169
170 shutdown_and_wait( *eject_thread() );
171 }
172
173 void
175 agent_t & agent ) noexcept override
176 {
177 const auto get_queue = [&] {
178 std::lock_guard< std::mutex > lock{ m_lock };
179 return m_agent_threads.find( &agent )->second->get_agent_binding();
180 };
181
182 agent.so_bind_to_dispatcher( *get_queue() );
183 }
184
185 void
187 agent_t & agent ) noexcept override
188 {
189 // We should perform the same actions as for undo_preallocation.
190 undo_preallocation( agent );
191 }
192
193 private:
194 friend class disp_data_source_t;
195
196 //! An alias for shared pointer to work thread object.
197 using work_thread_shptr_t = std::shared_ptr< Work_Thread >;
198
199 //! Typedef for mapping from agents to their working threads.
200 using agent_thread_map_t =
201 std::map< const agent_t *, work_thread_shptr_t >;
202
203 /*!
204 * \brief Data source for run-time monitoring of whole dispatcher.
205 * \since
206 * v.5.5.4
207 */
208 class disp_data_source_t final : public stats::source_t
209 {
210 //! Dispatcher to work with.
211 outliving_reference_t< dispatcher_template_t > m_dispatcher;
212
213 //! Basic prefix for data source names.
215
216 public :
218 const std::string_view name_base,
219 outliving_reference_t< dispatcher_template_t > disp )
220 : m_dispatcher{ disp }
221 {
222 using namespace so_5::disp::reuse;
223
224 m_base_prefix = make_disp_prefix(
225 "ao", // ao -- active_objects
226 name_base,
227 &(m_dispatcher.get()) );
228 }
229
230 void
231 distribute( const mbox_t & mbox ) override
232 {
233 auto & disp = m_dispatcher.get();
234
235 std::lock_guard< std::mutex > lock{ disp.m_lock };
236
237 so_5::send< stats::messages::quantity< std::size_t > >(
238 mbox,
239 m_base_prefix,
240 stats::suffixes::agent_count(),
241 disp.m_agent_threads.size() );
242
243 for( const auto & [a, wt] : disp.m_agent_threads )
244 distribute_value_for_work_thread( mbox, a, *wt );
245 }
246
247 private:
248 void
250 const mbox_t & mbox,
251 const agent_t * agent,
252 Work_Thread & wt )
253 {
254 std::ostringstream ss;
255 ss << m_base_prefix.c_str() << "/wt-"
256 << so_5::disp::reuse::ios_helpers::pointer{ agent };
257
258 const stats::prefix_t wt_prefix{ ss.str() };
259
260 send_demands_count_stats( mbox, wt_prefix, wt );
261 send_thread_activity_stats( mbox, wt_prefix, wt );
262 }
263 };
264
265 /*!
266 * \brief SObjectizer Environment to work in.
267 *
268 * It is necessary for calling work_thread_factory.
269 *
270 * \since v.5.7.3
271 */
273
274 /*!
275 * \brief Parameters for the dispatcher.
276 *
277 * \since
278 * v.5.5.10
279 */
281
282 //! This object lock.
284
285 //! A map from agents to single thread dispatchers.
287
288 /*!
289 * \brief Data source for run-time monitoring.
290 *
291 * \since
292 * v.5.5.4, v.5.6.0
293 */
294 stats::auto_registered_source_holder_t< disp_data_source_t >
296};
297
298//
299// dispatcher_handle_maker_t
300//
302 {
303 public :
305 make( disp_binder_shptr_t binder ) noexcept
306 {
307 return { std::move( binder ) };
308 }
309 };
310
311} /* namespace impl */
312
313//
314// make_dispatcher
315//
316[[nodiscard]]
319 environment_t & env,
320 const std::string_view data_sources_name_base,
321 disp_params_t params )
322 {
323 using namespace so_5::disp::reuse;
324
325 using dispatcher_no_activity_tracking_t =
326 impl::dispatcher_template_t<
327 work_thread::work_thread_no_activity_tracking_t >;
328
329 using dispatcher_with_activity_tracking_t =
330 impl::dispatcher_template_t<
331 work_thread::work_thread_with_activity_tracking_t >;
332
333 disp_binder_shptr_t binder = so_5::disp::reuse::make_actual_dispatcher<
334 disp_binder_t,
335 dispatcher_no_activity_tracking_t,
336 dispatcher_with_activity_tracking_t >(
337 outliving_mutable(env),
338 data_sources_name_base,
339 std::move(params) );
340
341 return impl::dispatcher_handle_maker_t::make( std::move(binder) );
342 }
343
344} /* namespace active_obj */
345
346} /* namespace disp */
347
348} /* namespace so_5 */
A base class for agents.
Definition agent.hpp:673
Alias for namespace with traits of event queue.
A handle for active_obj dispatcher.
static dispatcher_handle_t make(disp_binder_shptr_t binder) noexcept
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
disp_data_source_t(const std::string_view name_base, outliving_reference_t< dispatcher_template_t > disp)
stats::prefix_t m_base_prefix
Basic prefix for data source names.
void distribute_value_for_work_thread(const mbox_t &mbox, const agent_t *agent, Work_Thread &wt)
outliving_reference_t< dispatcher_template_t > m_dispatcher
Dispatcher to work with.
void undo_preallocation(agent_t &agent) noexcept override
Undo resources allocation.
outliving_reference_t< environment_t > m_env
SObjectizer Environment to work in.
const disp_params_t m_params
Parameters for the dispatcher.
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for run-time monitoring.
dispatcher_template_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
agent_thread_map_t m_agent_threads
A map from agents to single thread dispatchers.
void preallocate_resources(agent_t &agent) override
Allocate resources in dispatcher for new agent.
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
Interface for dispatcher binders.
SObjectizer Environment.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
A holder for a data source that should be automatically registered and deregistered in the registry.
A type for storing prefix of data_source name.
Definition prefix.hpp:32
An interface of data source.
#define SO_5_FUNC
Definition declspec.hpp:48
#define SO_5_THROW_EXCEPTION(error_code, desc)
Definition exception.hpp:74
void shutdown_and_wait(T &w)
Just a helper function for consecutive calls to shutdown and wait.
void send_thread_activity_stats(const so_5::mbox_t &, const stats::prefix_t &, work_thread::work_thread_no_activity_tracking_t &)
void send_thread_activity_stats(const so_5::mbox_t &mbox, const stats::prefix_t &prefix, work_thread::work_thread_with_activity_tracking_t &wt)
void send_demands_count_stats(const so_5::mbox_t &mbox, const stats::prefix_t &prefix, Work_Thread &wt)
Active objects dispatcher implementation details.
Active objects dispatcher.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of active_obj dispatcher.
Implementation details of dispatcher's working thread.
Reusable components for dispatchers.
Event dispatchers.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Definition agent.cpp:33