SObjectizer 5.8
Loading...
Searching...
No Matches
mt_env_infrastructure.cpp
Go to the documentation of this file.
1/*
2 * SObjectizer-5
3 */
4
5/*!
6 * \file
7 * \brief Default implementation of multithreaded environment infrastructure.
8 *
9 * \since v.5.5.19
10 */
11
12#include <so_5/impl/mt_env_infrastructure.hpp>
13
14#include <so_5/impl/run_stage.hpp>
15#include <so_5/impl/internal_env_iface.hpp>
16
17#include <so_5/environment.hpp>
18#include <so_5/send_functions.hpp>
19
20#include <so_5/disp/one_thread/pub.hpp>
21
22namespace so_5 {
23
24namespace env_infrastructures {
25
26namespace default_mt {
27
28namespace impl {
29
30//
31// coop_repo_t
32//
33coop_repo_t::coop_repo_t(
35 coop_listener_unique_ptr_t coop_listener )
36 : coop_repository_basis_t( env, std::move(coop_listener) )
38 {}
39
40void
41coop_repo_t::start()
42{
43 // A separate thread for doing the final dereg must be started.
44 m_final_dereg_thread = std::thread{ [this] { final_dereg_thread_body(); } };
45}
46
47void
48coop_repo_t::finish()
49{
50 // Deregistration of all cooperations should be initiated.
52
53 // Deregistration of all cooperations should be finished.
55
56 // Notify a dedicated thread and wait while it will be stopped.
57 {
58 std::lock_guard< std::mutex > lock{ m_final_dereg_chain_lock };
60 m_final_dereg_chain_cond.notify_one();
61 }
63}
64
65void
67 coop_shptr_t coop )
68{
69 std::lock_guard< std::mutex > lck{ m_final_dereg_chain_lock };
70
71 // Update the final_dereg_chain.
72 m_final_dereg_chain.append( std::move(coop) );
73
75 {
76 // Final deregistration thread may wait, have to wake it up.
77 m_final_dereg_chain_cond.notify_one();
78 }
79}
80
81bool
83 coop_shptr_t coop ) noexcept
84{
85 const auto result =
87
90
91 return result.m_has_live_coop;
92}
93
94void
103
104void
106{
107 std::unique_lock< std::mutex > lck{ m_lock };
108
110 [this] { return status_t::normal != m_status; } );
111}
112
113void
115{
116 std::unique_lock< std::mutex > lck{ m_lock };
117
118 // Must wait for a signal is there are cooperations in
119 // the deregistration process.
121 [this] { return 0u == m_total_coops; } );
122}
123
125coop_repo_t::query_stats()
126{
127 const auto final_dereg_coops = [this]() {
128 std::lock_guard< std::mutex > lck{ m_final_dereg_chain_lock };
130 }();
131
132 const auto basis_stats = coop_repository_basis_t::query_stats();
133
134 return {
135 basis_stats.m_total_coop_count,
136 basis_stats.m_total_agent_count,
137 final_dereg_coops
138 };
139}
140
141void
143{
144 std::unique_lock< std::mutex > lck{ m_final_dereg_chain_lock };
145
146 for( bool should_continue = true; should_continue; )
147 {
148 bool should_wait = true;
149
150 // If there are some waiting coops they have to be processed even
151 // if m_final_dereg_thread_shutdown_flag is set.
153 {
154 // There are some coops to be deregistered.
156
157 // Because the processing takes some time there is no need
158 // to sleep before new check for m_final_dereg_chain.
159 should_wait = false;
160 }
161
163 {
164 // It's time to finish the work.
165 should_continue = false;
166 }
167 else if( should_wait )
168 {
169 // No coops to deregister. Have to wait.
170 m_final_dereg_chain_cond.wait( lck );
171 }
172 }
173}
174
175void
177 std::unique_lock< std::mutex > & lck ) noexcept
178{
179 //
180 // NOTE: don't expect exceptions here.
181 //
182
183 // There are some coops to be deregistered.
184 // Have to extract the current value of final dereg chain from
185 // the coop_repo instance.
187
188 // All following actions has to be performed on unlocked mutex.
189 lck.unlock();
190
191 // Do final_deregister_coop for every item in the chain
192 // one by one.
194
195 // Have to reacquire the lock back.
196 lck.lock();
197}
198
199//
200// mt_env_infrastructure_t
201//
203 environment_t & env,
204 environment_params_t::default_disp_params_t default_disp_params,
205 timer_thread_unique_ptr_t timer_thread,
206 coop_listener_unique_ptr_t coop_listener,
207 mbox_t stats_distribution_mbox )
208 : m_env( env )
209 , m_default_dispatcher_params{ std::move(default_disp_params) }
210 , m_timer_thread( std::move(timer_thread) )
211 , m_coop_repo( outliving_mutable(env), std::move(coop_listener) )
212 , m_stats_controller( std::move(stats_distribution_mbox) )
213 {
214 }
215
216void
217mt_env_infrastructure_t::launch( env_init_t init_fn )
218 {
219 run_default_dispatcher_and_go_further( std::move(init_fn) );
220 }
221
222void
224 {
225 // Sends shutdown signal for all agents.
227 }
228
229[[nodiscard]]
232 coop_handle_t parent,
233 disp_binder_shptr_t default_binder )
234 {
235 return m_coop_repo.make_coop( std::move(parent), std::move(default_binder) );
236 }
237
241 {
242 return m_coop_repo.register_coop( std::move(coop) );
243 }
244
245void
247 coop_shptr_t coop ) noexcept
248 {
250 }
251
252bool
254 coop_shptr_t coop ) noexcept
255 {
256 return m_coop_repo.final_deregister_coop( std::move(coop) );
257 }
258
261 const std::type_index & type_wrapper,
262 const message_ref_t & msg,
263 const mbox_t & mbox,
264 std::chrono::steady_clock::duration pause,
265 std::chrono::steady_clock::duration period )
266 {
268 type_wrapper,
269 mbox,
270 msg,
271 pause,
272 period );
273 }
274
275void
277 const std::type_index & type_wrapper,
278 const message_ref_t & msg,
279 const mbox_t & mbox,
280 std::chrono::steady_clock::duration pause )
281 {
283 type_wrapper,
284 mbox,
285 msg,
286 pause,
287 std::chrono::milliseconds::zero() );
288 }
289
295
301
307
313
314namespace {
315
316//! Helper class to be used with std::visit.
318 {
319 [[nodiscard]]
320 disp_binder_shptr_t
322 {
323 return handle.binder();
324 }
325
326 [[nodiscard]]
327 disp_binder_shptr_t
329 {
330 return handle.binder();
331 }
332 };
333
334} /* namespace anonymous */
335
336disp_binder_shptr_t
341
342namespace {
343
344//! Helper class to be used with std::visit.
346 {
348
349 explicit dispatcher_maker_t( environment_t & env ) : m_env{ env }
350 {}
351
352 [[nodiscard]]
353 mt_env_infrastructure_t::default_dispatcher_holder_t
354 operator()( const so_5::disp::one_thread::disp_params_t & params ) const
355 {
356 return {
358 m_env,
359 std::string{ "DEFAULT" },
360 params )
361 };
362 }
363
364 [[nodiscard]]
365 mt_env_infrastructure_t::default_dispatcher_holder_t
366 operator()( const so_5::disp::nef_one_thread::disp_params_t & params ) const
367 {
368 return {
370 m_env,
371 std::string{ "DEFAULT" },
372 params )
373 };
374 }
375 };
376
377//! Helper class to be used with std::visit.
379 {
380 void
382 {
383 handle.reset();
384 }
385
386 void
388 {
389 handle.reset();
390 }
391 };
392
393} /* namespace anonymous */
394
395void
397 env_init_t init_fn )
398 {
400 "run_default_dispatcher",
401 [this] {
402 // Default dispatcher should be created.
403 m_default_dispatcher = std::visit(
406 },
407 [this] {
408 // Default dispatcher is no more needed.
409 std::visit( handle_reseter_t{}, m_default_dispatcher );
410 },
411 [this, init_fn] {
412 run_timer_thread_and_go_further( std::move(init_fn) );
413 } );
414 }
415
416void
418 env_init_t init_fn )
419 {
421 "run_timer",
422 [this] { m_timer_thread->start(); },
423 [this] { m_timer_thread->finish(); },
424 [this, init_fn] {
425 run_agent_core_and_go_further( std::move(init_fn) );
426 } );
427 }
428
429void
431 env_init_t init_fn )
432 {
434 "run_agent_core",
435 [this] { m_coop_repo.start(); },
436 [this] { m_coop_repo.finish(); },
437 [this, init_fn] {
439 } );
440 }
441
442void
458
459} /* namespace impl */
460
461//
462// factory
463//
464SO_5_FUNC environment_infrastructure_factory_t
466 {
467 return [](
468 environment_t & env,
469 environment_params_t & params,
470 mbox_t stats_distribution_mbox )
471 {
472 // Timer thread is necessary for that environment.
473 auto timer =
477
478 // Now the environment object can be created.
479 auto obj = new impl::mt_env_infrastructure_t(
480 env,
482 std::move(timer),
484 std::move(stats_distribution_mbox) );
485
486 return environment_infrastructure_unique_ptr_t(
487 obj,
489 };
490 }
491
492} /* namespace default_mt */
493
494} /* namespace env_infrastructures */
495
496} /* namespace so_5 */
Type of smart handle for a cooperation.
A special type that plays role of unique_ptr for coop.
Definition coop.hpp:1342
Alias for namespace with traits of event queue.
A handle for nef_one_thread dispatcher.
disp_binder_shptr_t binder() const noexcept
Get a binder for that dispatcher.
void reset() noexcept
Drop the content of handle.
Alias for namespace with traits of event queue.
A handle for one_thread dispatcher.
disp_binder_shptr_t binder() const noexcept
Get a binder for that dispatcher.
void reset() noexcept
Drop the content of handle.
void start_deregistration()
Initiate start of the cooperation deregistration.
void ready_to_deregister_notify(coop_shptr_t coop)
Notification about readiness of the cooperation deregistration.
std::condition_variable m_final_dereg_chain_cond
Notification object to inform that the chain of coops for the final deregistration isn't empty anymor...
bool final_deregister_coop(coop_shptr_t coop) noexcept
Do final actions of the cooperation deregistration.
void process_current_final_dereg_chain(std::unique_lock< std::mutex > &lck) noexcept
Method that performs the final deregistration for coops in the m_final_dereg_chain.
so_5::impl::final_dereg_chain_holder_t m_final_dereg_chain
The chain of coops for the final deregistration.
std::mutex m_final_dereg_chain_lock
Lock object for thread-safety of the chain of coops ready for the final deregistration.
coop_repo_t(outliving_reference_t< environment_t > env, coop_listener_unique_ptr_t coop_listener)
Initializing constructor.
void wait_all_coop_to_deregister()
Wait for end of all cooperations deregistration.
void final_dereg_thread_body()
Method that implements the body of final deregistration thread.
std::condition_variable m_deregistration_started_cond
Condition variable for the deregistration start indication.
bool m_final_dereg_thread_shutdown_flag
The flag for shutting down the final deregistration thread.
std::condition_variable m_deregistration_finished_cond
Condition variable for the deregistration finish indication.
environment_infrastructure_t::coop_repository_stats_t query_stats()
Get the current statistic for run-time monitoring.
std::thread m_final_dereg_thread
A separate thread for doing the final deregistration.
void wait_for_start_deregistration()
Wait for a signal about start of the cooperation deregistration.
Default implementation of multithreaded environment infrastructure.
so_5::timer_id_t schedule_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause, std::chrono::steady_clock::duration period) override
Initiate a timer (delayed or periodic message).
::so_5::stats::repository_t & stats_repository() noexcept override
Get stats repository for the environment.
const environment_params_t::default_disp_params_t m_default_dispatcher_params
Parameters for the default dispatcher.
timer_thread_stats_t query_timer_thread_stats() override
Query run-time statistics for timer (thread or manager).
void single_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause) override
Initiate a delayed message.
void stop() noexcept override
Initiate a signal for shutdown of Environment.
coop_handle_t register_coop(coop_unique_holder_t coop) override
Register new cooperation.
::so_5::stats::controller_t & stats_controller() noexcept override
Get stats controller for the environment.
timer_thread_unique_ptr_t m_timer_thread
Timer thread to be used by the environment.
bool final_deregister_coop(coop_shptr_t coop_name) noexcept override
Do final actions of the cooperation deregistration.
void launch(env_init_t init_fn) override
Do actual launch of SObjectizer's Environment.
mt_env_infrastructure_t(environment_t &env, environment_params_t::default_disp_params_t default_disp_params, timer_thread_unique_ptr_t timer_thread, coop_listener_unique_ptr_t coop_listener, mbox_t stats_distribution_mbox)
Initializing constructor.
coop_unique_holder_t make_coop(coop_handle_t parent, disp_binder_shptr_t default_binder) override
Create an instance of a new coop.
coop_repository_stats_t query_coop_repository_stats() override
Query run-time statistics for cooperation repository.
::so_5::stats::impl::std_controller_t m_stats_controller
Run-time stats controller to be used by the environment.
environment_t & m_env
SOEnv for that this infrastructure was created.
disp_binder_shptr_t make_default_disp_binder() override
Create a binder for the default dispatcher.
An interface for environment_infrastructure entity.
static environment_infrastructure_deleter_fnptr_t default_deleter()
Default deleter for environment_infrastructure object.
Parameters for the SObjectizer Environment initialization.
coop_listener_unique_ptr_t so5_giveout_coop_listener()
Get cooperation listener.
so_5::timer_thread_factory_t so5_giveout_timer_thread_factory()
Get the timer_thread factory.
const default_disp_params_t & default_disp_params() const
Get the parameters for the default dispatcher.
const error_logger_shptr_t & so5_error_logger() const
Get error logger for the environment.
SObjectizer Environment.
A basic part for various implementations of coop_repository.
std::size_t m_total_coops
Total count of coops.
std::mutex m_lock
Lock for coop repository.
coop_unique_holder_t make_coop(coop_handle_t parent, disp_binder_shptr_t default_binder)
Create an instance of a new coop.
void deregister_all_coop() noexcept
Deregisted all cooperations.
coop_repository_basis_t(outliving_reference_t< environment_t > environment, coop_listener_unique_ptr_t coop_listener)
final_deregistration_result_t final_deregister_coop(coop_shptr_t coop) noexcept
Do final actions of the cooperation deregistration.
try_switch_to_shutdown_result_t
Result of attempt to switch to shutdown state.
environment_infrastructure_t::coop_repository_stats_t query_stats()
Get the current statistic for run-time monitoring.
status_t
Enumeration of possible repository statuses.
try_switch_to_shutdown_result_t try_switch_to_shutdown() noexcept
Try to switch repository to shutdown state.
coop_handle_t register_coop(coop_unique_holder_t agent_coop)
Register cooperation.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
A public interface for control SObjectizer monitoring options.
An interface of data sources repository.
An indentificator for the timer.
Definition timers.hpp:73
virtual void finish()=0
Finish timer and wait for full stop.
virtual void schedule_anonymous(const std::type_index &type_index, const mbox_t &mbox, const message_ref_t &msg, std::chrono::steady_clock::duration pause, std::chrono::steady_clock::duration period)=0
Push anonymous delayed/periodic message to the timer queue.
virtual timer_thread_stats_t query_stats()=0
Get statistics for run-time monitoring.
virtual void start()=0
Launch timer.
virtual timer_id_t schedule(const std::type_index &type_index, const mbox_t &mbox, const message_ref_t &msg, std::chrono::steady_clock::duration pause, std::chrono::steady_clock::duration period)=0
Push delayed/periodic message to the timer queue.
#define SO_5_FUNC
Definition declspec.hpp:48
Some reusable and low-level classes/functions which can be used in public header files.
auto do_with_rollback_on_exception(Main_Action main_action, Rollback_Action rollback_action) -> decltype(main_action())
Helper function for do some action with rollback in the case of an exception.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of nef_one_thread dispatcher.
Dispatcher with single working thread.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of one_thread dispatcher.
Event dispatchers.
Default multi-threaded environment infrastructure.
SO_5_FUNC environment_infrastructure_factory_t factory()
A factory for creation the default multitheading environment infrastructure.
Various implementations of environment_infrastructure.
Details of SObjectizer run-time implementations.
Definition agent.cpp:780
void process_final_dereg_chain(coop_shptr_t head) noexcept
Helper function that does proceesing of final dereg chain.
void run_stage(const std::string &stage_name, Init_Fn &&init_fn, Deinit_Fn &&deinit_fn, Next_Stage &&next_stage)
Helper template function for doing initialization phase with rollback on failure.
Definition run_stage.hpp:31
void wrap_init_fn_call(Init_Fn init_fn)
A special wrapper for calling init function.
timer_thread_unique_ptr_t create_appropriate_timer_thread(error_logger_shptr_t error_logger, const timer_thread_factory_t &user_factory)
Helper function for timer_thread creation.
Definition timers.hpp:754
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Definition agent.cpp:33
outliving_reference_t< T > outliving_mutable(T &r)
Make outliving_reference wrapper for mutable reference.
disp_binder_shptr_t operator()(so_5::disp::one_thread::dispatcher_handle_t &handle) const
disp_binder_shptr_t operator()(so_5::disp::nef_one_thread::dispatcher_handle_t &handle) const
mt_env_infrastructure_t::default_dispatcher_holder_t operator()(const so_5::disp::one_thread::disp_params_t &params) const
mt_env_infrastructure_t::default_dispatcher_holder_t operator()(const so_5::disp::nef_one_thread::disp_params_t &params) const
Statistical data for run-time monitoring of coop repository content.
std::size_t m_total_coop_count
Count of cooperations inside the environment.
Statistics for run-time monitoring.
Definition timers.hpp:120