SObjectizer  5.8
Loading...
Searching...
No Matches
simple_not_mtsafe_st_env_infrastructure.cpp
Go to the documentation of this file.
1/*
2 * SObjectizer-5
3 */
4
5/*!
6 * \file
7 * \brief A very simple not-multithreaded-safe single thread
8 * environment infrastructure.
9 *
10 * \since v.5.5.19
11 */
12
13#include <so_5/impl/st_env_infrastructure_reuse.hpp>
14
15#include <so_5/impl/run_stage.hpp>
16#include <so_5/impl/internal_env_iface.hpp>
17
18#include <so_5/impl/final_dereg_chain_helpers.hpp>
19
20#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
21
22#include <so_5/environment.hpp>
23#include <so_5/env_infrastructures.hpp>
24
25#include <so_5/details/at_scope_exit.hpp>
26#include <so_5/details/sync_helpers.hpp>
27
28namespace so_5 {
29
30namespace env_infrastructures {
31
32namespace simple_not_mtsafe {
33
34namespace impl {
35
36//! A short name for namespace with run-time stats stuff.
37namespace stats = ::so_5::stats;
38
39//! A short name for namespace with reusable stuff.
40namespace reusable = ::so_5::env_infrastructures::st_reusable_stuff;
41
42//
43// shutdown_status_t
44//
45using shutdown_status_t = reusable::shutdown_status_t;
46
47//
48// event_queue_impl_t
49//
50/*!
51 * \brief Implementation of event_queue interface for this type of
52 * environment infrastructure.
53 *
54 * \since v.5.5.19
55 */
56class event_queue_impl_t final : public so_5::event_queue_t
57 {
58 public :
59 //! Type for representation of statistical data for this event queue.
60 struct stats_t
61 {
62 //! The current size of the demands queue.
64 };
65
66 void
67 push( execution_demand_t demand ) override
68 {
69 m_demands.push_back( std::move(demand) );
70 }
71
72 /*!
73 * \note
74 * Delegates the work to the push() method.
75 */
76 void
78 {
79 this->push( std::move(demand) );
80 }
81
82 /*!
83 * \note
84 * Delegates the work to the push() method.
85 *
86 * \attention
87 * Terminates the whole application if the push() throws.
88 */
89 void
90 push_evt_finish( execution_demand_t demand ) noexcept override
91 {
92 this->push( std::move(demand) );
93 }
94
97 {
98 return { m_demands.size() };
99 }
100
101 //! Type for result of extraction operation.
102 enum class pop_result_t
103 {
104 extracted,
106 };
107
109 pop( execution_demand_t & receiver ) noexcept
110 {
111 if( !m_demands.empty() )
112 {
113 receiver = std::move(m_demands.front());
114 m_demands.pop_front();
116 }
117
119 }
120
121 private :
123 };
124
125//
126// coop_repo_t
127//
128/*!
129 * \brief Implementation of coop_repository for
130 * simple thread-safe single-threaded environment infrastructure.
131 *
132 * \since v.5.5.19
133 */
134using coop_repo_t = reusable::coop_repo_t;
135
136//
137// disp_ds_name_parts_t
138//
139/*!
140 * \brief A special class for generation of names for dispatcher data sources.
141 *
142 * \since v.5.5.19
143 */
145 {
146 static constexpr const char *
147 disp_type_part() noexcept { return "not_mtsafe_st_env"; }
148 };
149
150//
151// default_dispatcher_t
152//
153/*!
154 * \brief An implementation of dispatcher to be used in
155 * places where default dispatcher is needed.
156 *
157 * \tparam Activity_Tracker a type of activity tracker to be used
158 * for run-time statistics.
159 *
160 * \since v.5.5.19
161 */
162template< typename Activity_Tracker >
163using default_dispatcher_t =
164 reusable::default_dispatcher_t<
165 event_queue_impl_t,
166 Activity_Tracker,
168
169//
170// stats_controller_t
171//
172/*!
173 * \brief Implementation of stats_controller for that type of
174 * single-threaded environment.
175 *
176 * \since v.5.5.19
177 */
178using stats_controller_t =
179 reusable::stats_controller_t< so_5::details::no_lock_holder_t >;
180
181//
182// env_infrastructure_t
183//
184/*!
185 * \brief Default implementation of not-thread safe single-threaded environment
186 * infrastructure.
187 *
188 * \attention
189 * This class doesn't have any mutex inside.
190 *
191 * \tparam Activity_Tracker A type for tracking activity of main working thread.
192 *
193 * \since v.5.5.19
194 */
195template< typename Activity_Tracker >
198 {
199 public :
201 //! Environment to work in.
202 environment_t & env,
203 //! Factory for timer manager.
204 timer_manager_factory_t timer_factory,
205 //! Error logger necessary for timer_manager.
206 error_logger_shptr_t error_logger,
207 //! Cooperation action listener.
208 coop_listener_unique_ptr_t coop_listener,
209 //! Mbox for distribution of run-time stats.
210 mbox_t stats_distribution_mbox );
211
212 void
213 launch( env_init_t init_fn ) override;
214
215 void
216 stop() noexcept override;
217
218 [[nodiscard]]
220 make_coop(
221 coop_handle_t parent,
222 disp_binder_shptr_t default_binder ) override;
223
226 coop_unique_holder_t coop ) override;
227
228 void
230 coop_shptr_t coop ) noexcept override;
231
232 bool
234 coop_shptr_t coop_name ) noexcept override;
235
238 const std::type_index & type_wrapper,
239 const message_ref_t & msg,
240 const mbox_t & mbox,
241 std::chrono::steady_clock::duration pause,
242 std::chrono::steady_clock::duration period ) override;
243
244 void
246 const std::type_index & type_wrapper,
247 const message_ref_t & msg,
248 const mbox_t & mbox,
249 std::chrono::steady_clock::duration pause ) override;
250
251 stats::controller_t &
252 stats_controller() noexcept override;
253
254 stats::repository_t &
255 stats_repository() noexcept override;
256
259
261 query_timer_thread_stats() override;
262
264 make_default_disp_binder() override;
265
266 private :
268
269 /*!
270 * \brief The chain of coops for the final deregistration.
271 *
272 * \since v.5.8.0
273 */
275
276 //! Status of shutdown procedure.
277 shutdown_status_t m_shutdown_status{ shutdown_status_t::not_started };
278
279 //! A collector for elapsed timers.
280 reusable::direct_delivery_elapsed_timers_collector_t m_timers_collector;
281
282 //! A timer manager to be used.
284
285 //! Queue for execution_demands which must be handled on the main thread.
286 event_queue_impl_t m_event_queue;
287
288 //! Repository of registered coops.
289 coop_repo_t m_coop_repo;
290
291 //! Actual activity tracker for main working thread.
292 Activity_Tracker m_activity_tracker;
293
294 //! Dispatcher to be used as default dispatcher.
295 /*!
296 * \note
297 * Has an actual value only inside launch() method.
298 */
300
301 //! Stats controller for this environment.
302 stats_controller_t m_stats_controller;
303
304 void
306 env_init_t init_fn );
307
308 void
310 env_init_t init_fn );
311
312 void
313 run_main_loop() noexcept;
314
315 void
317
318 void
320
321 void
323
324 void
325 try_handle_next_demand() noexcept;
326 };
327
328template< typename Activity_Tracker >
330 environment_t & env,
331 timer_manager_factory_t timer_factory,
332 error_logger_shptr_t error_logger,
333 coop_listener_unique_ptr_t coop_listener,
334 mbox_t stats_distribution_mbox )
335 : m_env( env )
344 {}
345
346template< typename Activity_Tracker >
347void
348env_infrastructure_t< Activity_Tracker >::launch( env_init_t init_fn )
349 {
351 }
352
353template< typename Activity_Tracker >
354void
355env_infrastructure_t< Activity_Tracker >::stop() noexcept
356 {
358 {
360 }
361 }
362
363template< typename Activity_Tracker >
365env_infrastructure_t< Activity_Tracker >::make_coop(
366 coop_handle_t parent,
367 disp_binder_shptr_t default_binder )
368 {
369 return m_coop_repo.make_coop(
370 std::move(parent),
372 }
373
374template< typename Activity_Tracker >
378 {
380 }
381
382template< typename Activity_Tracker >
383void
385 coop_shptr_t coop ) noexcept
386 {
388 }
389
390template< typename Activity_Tracker >
391bool
393 coop_shptr_t coop ) noexcept
394 {
397 }
398
399template< typename Activity_Tracker >
402 const std::type_index & type_wrapper,
403 const message_ref_t & msg,
404 const mbox_t & mbox,
405 std::chrono::steady_clock::duration pause,
406 std::chrono::steady_clock::duration period )
407 {
410 mbox,
411 msg,
412 pause,
413 period );
414
415 return timer;
416 }
417
418template< typename Activity_Tracker >
419void
421 const std::type_index & type_wrapper,
422 const message_ref_t & msg,
423 const mbox_t & mbox,
424 std::chrono::steady_clock::duration pause )
425 {
428 mbox,
429 msg,
430 pause,
432 }
433
434template< typename Activity_Tracker >
435stats::controller_t &
436env_infrastructure_t< Activity_Tracker >::stats_controller() noexcept
437 {
438 return m_stats_controller;
439 }
440
441template< typename Activity_Tracker >
442stats::repository_t &
443env_infrastructure_t< Activity_Tracker >::stats_repository() noexcept
444 {
445 return m_stats_controller;
446 }
447
448template< typename Activity_Tracker >
460
461template< typename Activity_Tracker >
464 {
466 }
467
468template< typename Activity_Tracker >
471 {
472 return { m_default_disp };
473 }
474
475template< typename Activity_Tracker >
476void
478 env_init_t init_fn )
479 {
480 ::so_5::impl::run_stage(
481 "run_default_dispatcher",
482 [this] {
488 },
489 [this] {
491 },
492 [this, init_func=std::move(init_fn)] {
494 } );
495 }
496
497template< typename Activity_Tracker >
498void
500 env_init_t init_fn )
501 {
502 /*
503 If init_fn throws an exception we can found ourselves
504 in a situation where there are some working coops.
505 Those coops should be correctly deregistered. It means
506 that we should usual main loop even in the case of
507 an exception from init_fn. But this main loop should
508 work only until all coops will be deregistered.
509
510 To do that we will catch an exception from init and
511 initiate shutdown even before the call to run_main_loop().
512 Then we call run_main_loop() and wail for its completion.
513 Then we reraise the exception caught.
514
515 Note that in this scheme run_main_loop() should be
516 noexcept function because otherwise we will loose the
517 initial exception from init_fn.
518 */
520 try
521 {
523 }
524 catch( ... )
525 {
526 // We can't restore if there will be an exception.
528 // Store the content of the exception to reraise it later.
530 // Execution should be stopped.
531 stop();
532 } );
533 }
534
535 // We don't expect exceptions from the main loop.
538 } );
539
540 // If there was an exception from init_fn this exception
541 // should be rethrown.
544 }
545
546template< typename Activity_Tracker >
547void
548env_infrastructure_t< Activity_Tracker >::run_main_loop() noexcept
549 {
550 // Assume that waiting for new demands is started.
551 // This call is necessary because if there is a demand
552 // in event queue then m_activity_tracker.wait_stopped() will be
553 // called without previous m_activity_tracker.wait_started().
555
556 for(;;)
557 {
558 // The first step: all pending final deregs must be processed.
560
561 // There can be pending shutdown operation. It must be handled.
564 break;
565
566 // The next step: all timers must be converted to events.
568
569 // The last step: an attempt to process demand.
570 // Or sleep for some time until next demand arrived.
572 }
573 }
574
575template< typename Activity_Tracker >
576void
578 {
579 // This loop is necessary because it is possible that new
580 // final dereg demand will be added during processing of
581 // the current final dereg demand.
582 while( !m_final_dereg_chain.empty() )
583 {
586 }
587 }
588
589template< typename Activity_Tracker >
590void
592 {
594 {
595 // Shutdown procedure must be started.
597
598 // All registered cooperations must be deregistered now.
600 }
601
603 {
604 // If there is no more live coops then shutdown must be completed.
607 }
608 }
609
610template< typename Activity_Tracker >
611void
613 {
614 // All expired timers must be collected.
615 // NOTE: a direct delivery happens in this environment infrastructure.
617 }
618
619template< typename Activity_Tracker >
620void
621env_infrastructure_t< Activity_Tracker >::try_handle_next_demand() noexcept
622 {
624 const auto pop_result = m_event_queue.pop( demand );
625 // If there is no demands we must go to sleep for some time...
627 {
628 // ... but we should go to sleep only if there is no
629 // pending final deregistration actions.
631 {
632 // We must try to sleep for next timer but only if
633 // there is any timer.
634 if( !m_timer_manager->empty() )
635 {
636 // Tracking time for 'waiting' state must be turned on.
638
639 const auto sleep_time =
641 // We can use very large value here.
642 std::chrono::hours(24) );
643
645 }
646 else
647 // There are no demands and there are no timers.
648 // Environment's work must be finished.
649 stop();
650 }
651 }
652 else
653 {
654 // Tracking time for 'waiting' must be turned off, but
655 // tracking time for 'working' must be tuned on and then off again.
659 [this]{ m_activity_tracker.work_stopped(); } );
660
661 // There is at least one demand to process.
663 }
664 }
665
666//
667// ensure_autoshutdown_enabled
668//
669/*!
670 * Throws an exception if autoshutdown feature is disabled.
671 *
672 * \since v.5.5.19
673 */
674void
676 const environment_params_t & env_params )
677 {
678 if( env_params.autoshutdown_disabled() )
679 SO_5_THROW_EXCEPTION( rc_autoshutdown_must_be_enabled,
680 "autoshutdown feature must be enabled for "
681 "so_5::env_infrastructures::simple_not_mtsafe" );
682 }
683
684} /* namespace impl */
685
686//
687// simple_not_mtsafe_st_env_infrastructure_factory
688//
691 {
692 using namespace impl;
693
694 return [infrastructure_params](
695 environment_t & env,
696 environment_params_t & env_params,
697 mbox_t stats_distribution_mbox )
698 {
699 ensure_autoshutdown_enabled( env_params );
700
701 environment_infrastructure_t * obj = nullptr;
702
703 auto timer_manager_factory =
704 infrastructure_params.timer_manager();
705
706 // Create environment infrastructure object in dependence of
707 // work thread activity tracking flag.
708 const auto tracking = env_params.work_thread_activity_tracking();
709 if( work_thread_activity_tracking_t::on == tracking )
710 obj = new env_infrastructure_t< reusable::real_activity_tracker_t >(
711 env,
712 std::move(timer_manager_factory),
713 env_params.so5_error_logger(),
714 env_params.so5_giveout_coop_listener(),
715 std::move(stats_distribution_mbox) );
716 else
717 obj = new env_infrastructure_t< reusable::fake_activity_tracker_t >(
718 env,
719 std::move(timer_manager_factory),
720 env_params.so5_error_logger(),
721 env_params.so5_giveout_coop_listener(),
722 std::move(stats_distribution_mbox) );
723
724 return environment_infrastructure_unique_ptr_t(
725 obj,
726 environment_infrastructure_t::default_deleter() );
727 };
728 }
729
730} /* namespace simple_not_mtsafe */
731
732} /* namespace env_infrastructures */
733
734} /* namespace so_5 */
Type of smart handle for a cooperation.
A special type that plays role of unique_ptr for coop.
Definition coop.hpp:1342
A class to be used as mixin without any real mutex instance inside.
Default implementation of not-thread safe single-threaded environment infrastructure.
bool final_deregister_coop(coop_shptr_t coop_name) noexcept override
Do final actions of the cooperation deregistration.
coop_unique_holder_t make_coop(coop_handle_t parent, disp_binder_shptr_t default_binder) override
Create an instance of a new coop.
void launch(env_init_t init_fn) override
Do actual launch of SObjectizer's Environment.
so_5::environment_infrastructure_t::coop_repository_stats_t query_coop_repository_stats() override
Query run-time statistics for cooperation repository.
so_5::impl::final_dereg_chain_holder_t m_final_dereg_chain
The chain of coops for the final deregistration.
event_queue_impl_t m_event_queue
Queue for execution_demands which must be handled on the main thread.
stats::repository_t & stats_repository() noexcept override
Get stats repository for the environment.
Activity_Tracker m_activity_tracker
Actual activity tracker for main working thread.
reusable::direct_delivery_elapsed_timers_collector_t m_timers_collector
A collector for elapsed timers.
timer_thread_stats_t query_timer_thread_stats() override
Query run-time statistics for timer (thread or manager).
std::shared_ptr< default_dispatcher_t< Activity_Tracker > > m_default_disp
Dispatcher to be used as default dispatcher.
env_infrastructure_t(environment_t &env, timer_manager_factory_t timer_factory, error_logger_shptr_t error_logger, coop_listener_unique_ptr_t coop_listener, mbox_t stats_distribution_mbox)
void stop() noexcept override
Initiate a signal for shutdown of Environment.
so_5::timer_id_t schedule_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause, std::chrono::steady_clock::duration period) override
Initiate a timer (delayed or periodic message).
stats::controller_t & stats_controller() noexcept override
Get stats controller for the environment.
void single_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause) override
Initiate a delayed message.
coop_handle_t register_coop(coop_unique_holder_t coop) override
Register new cooperation.
disp_binder_shptr_t make_default_disp_binder() override
Create a binder for the default dispatcher.
void push(execution_demand_t demand) override
Enqueue new event to the queue.
An implementation of dispatcher to be used in places where default dispatcher is needed.
An interface for environment_infrastructure entity.
Parameters for the SObjectizer Environment initialization.
SObjectizer Environment.
An interface of event queue for agent.
Helper class for holding the current chain of coops for the final deregistration.
A public interface for control SObjectizer monitoring options.
An interface of data sources repository.
An indentificator for the timer.
Definition timers.hpp:82
#define SO_5_FUNC
Definition declspec.hpp:48
#define SO_5_THROW_EXCEPTION(error_code, desc)
Definition exception.hpp:74
Some reusable and low-level classes/functions which can be used in public header files.
Simple single-threaded environment infrastructure without thread safety.
SO_5_FUNC environment_infrastructure_factory_t factory(params_t &&params)
A factory for creation of simple not-thread-safe single-thread environment infrastructure object.
Various reusable stuff which can be used in implementation of single-threaded environment infrastruct...
shutdown_status_t
A short name for namespace with run-time stats stuff.
Various implementations of environment_infrastructure.
Details of SObjectizer run-time implementations.
Definition agent.cpp:905
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Definition agent.cpp:33
Statistical data for run-time monitoring of coop repository content.
A description of event execution demand.
Statistics for run-time monitoring.
Definition timers.hpp:136