SObjectizer-5 Extra
simple_not_mtsafe.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of Asio-based simple not thread safe
4  * environment infrastructure.
5  */
6 
7 #pragma once
8 
9 #include <so_5_extra/env_infrastructures/asio/impl/common.hpp>
10 
11 #include <so_5/impl/st_env_infrastructure_reuse.hpp>
12 #include <so_5/impl/internal_env_iface.hpp>
13 #include <so_5/details/sync_helpers.hpp>
14 #include <so_5/details/at_scope_exit.hpp>
15 #include <so_5/details/invoke_noexcept_code.hpp>
16 
17 #include <asio.hpp>
18 
19 #include <string_view>
20 
21 namespace so_5 {
22 
23 namespace extra {
24 
25 namespace env_infrastructures {
26 
27 namespace asio {
28 
29 namespace simple_not_mtsafe {
30 
31 namespace impl {
32 
33 //! A short name for namespace with reusable stuff.
35 
36 //! A short name for namespace with common stuff.
37 /*!
38  * \since
39  * v.1.0.2
40  */
42 
43 //
44 // shutdown_status_t
45 //
47 
48 //
49 // coop_repo_t
50 //
51 /*!
52  * \brief Implementation of coop_repository for
53  * simple thread-safe single-threaded environment infrastructure.
54  */
56 
57 //
58 // stats_controller_t
59 //
60 /*!
61  * \brief Implementation of stats_controller for that type of
62  * single-threaded environment.
63  */
64 using stats_controller_t =
66 
67 //
68 // event_queue_impl_t
69 //
70 /*!
71  * \brief Implementation of event_queue interface for the default dispatcher.
72  *
73  * \tparam Activity_Tracker A type for tracking work thread activity.
74  */
75 template< typename Activity_Tracker >
77  {
78  public :
79  //! Type for representation of statistical data for this event queue.
80  struct stats_t
81  {
82  //! The current size of the demands queue.
84  };
85 
86  //! Initializing constructor.
88  //! Asio's io_context to be used for dispatching.
89  outliving_reference_t<::asio::io_context> io_svc,
90  //! Actual activity tracker.
91  outliving_reference_t<Activity_Tracker> activity_tracker )
92  : m_io_svc(io_svc)
94  {}
95 
96  void
97  push( execution_demand_t demand ) override
98  {
99  ::asio::post(
100  m_io_svc.get(),
101  [this, d = std::move(demand)]() mutable {
102  // Statistics must be updated.
104 
105  // The demand can be handled now.
106  // With working time tracking.
108  {
109  // For the case if call_handler will throw.
110  const auto stopper = ::so_5::details::at_scope_exit(
111  [this]{ m_activity_tracker.get().work_stopped(); });
112 
114  }
115 
116  // If there is no any pending demands then
117  // waiting must be started.
118  if( !m_stats.m_demands_count )
120  } );
121 
122  if( !m_stats.m_demands_count )
123  // Waiting must be stopped because we have received an event.
125 
126  // Increment demands count only if post doesn't throw.
128  }
129 
130  //! Notification that event queue work is started.
131  void
133  //! ID of the main working thread.
134  current_thread_id_t thread_id )
135  {
137 
138  // There is no any pending demand now. We can start counting
139  // the waiting time.
141  }
142 
143  //! Get the current statistics.
144  stats_t
145  query_stats() const noexcept
146  {
147  return m_stats;
148  }
149 
150  private :
153 
156  };
157 
158 //
159 // disp_ds_name_parts_t
160 //
161 /*!
162  * \brief A class with major part of dispatcher name.
163  */
164 struct disp_ds_name_parts_t final
165  {
166  static constexpr std::string_view
167  disp_type_part() noexcept { return { "asio_not_mtsafe" }; }
168  };
169 
170 //
171 // default_dispatcher_t
172 //
173 /*!
174  * \brief An implementation of dispatcher to be used in
175  * places where default dispatcher is needed.
176  *
177  * \tparam Activity_Tracker a type of activity tracker to be used
178  * for run-time statistics.
179  *
180  * \since
181  * v.1.3.0
182  */
183 template< typename Activity_Tracker >
185  : public reusable::default_dispatcher_t<
189  {
194 
195  public :
197  outliving_reference_t< environment_t > env,
198  outliving_reference_t< event_queue_impl_t<Activity_Tracker> > event_queue,
199  outliving_reference_t< Activity_Tracker > activity_tracker )
201  {
202  // Event queue should be started manually.
203  // We known that the default dispatcher is created on a thread
204  // that will be used for events dispatching.
205  event_queue.get().start( this->thread_id() );
206  }
207  };
208 
209 //
210 // env_infrastructure_t
211 //
212 /*!
213  * \brief Default implementation of not-thread safe single-threaded environment
214  * infrastructure.
215  *
216  * \attention
217  * This class doesn't have any mutex inside.
218  *
219  * \tparam Activity_Tracker A type of activity tracker to be used.
220  */
221 template< typename Activity_Tracker >
224  {
225  public :
227  //! Asio's io_context to be used.
228  outliving_reference_t<::asio::io_context> io_svc,
229  //! Environment to work in.
230  environment_t & env,
231  //! Cooperation action listener.
232  coop_listener_unique_ptr_t coop_listener,
233  //! Mbox for distribution of run-time stats.
234  mbox_t stats_distribution_mbox );
235 
236  void
237  launch( env_init_t init_fn ) override;
238 
239  void
240  stop() override;
241 
242  SO_5_NODISCARD
243  coop_unique_holder_t
244  make_coop(
247 
250  coop_unique_holder_t coop ) override;
251 
252  void
254  coop_shptr_t coop ) noexcept override;
255 
256  bool
258  coop_shptr_t coop_name ) override;
259 
262  const std::type_index & type_wrapper,
263  const message_ref_t & msg,
264  const mbox_t & mbox,
266  std::chrono::steady_clock::duration period ) override;
267 
268  void
269  single_timer(
270  const std::type_index & type_wrapper,
271  const message_ref_t & msg,
272  const mbox_t & mbox,
273  std::chrono::steady_clock::duration pause ) override;
274 
276  stats_controller() noexcept override;
277 
279  stats_repository() noexcept override;
280 
282  query_coop_repository_stats() override;
283 
285  query_timer_thread_stats() override;
286 
288  make_default_disp_binder() override;
289 
290  private :
291  //! Asio's io_context to be used.
293 
294  //! Actual SObjectizer Environment.
296 
297  //! Status of shutdown procedure.
299 
300  //! Repository of registered coops.
302 
303  //! Actual activity tracker.
304  Activity_Tracker m_activity_tracker;
305 
306  //! Event queue which is necessary for the default dispatcher.
307  event_queue_impl_t< Activity_Tracker > m_event_queue;
308 
309  //! Dispatcher to be used as default dispatcher.
310  /*!
311  * \note
312  * Has an actual value only inside launch() method.
313  */
315 
316  //! Stats controller for this environment.
318 
319  //! Counter of cooperations which are waiting for final deregistration
320  //! step.
321  /*!
322  * It is necessary for building correct run-time stats.
323  */
325 
326  //! The pointer to an exception that was thrown during init phase.
327  /*!
328  * This exception is stored inside a callback posted to Asio.
329  * An then this exception will be rethrown from launch() method
330  * after the shutdown of SObjectizer.
331  */
333 
334  void
335  run_default_dispatcher_and_go_further( env_init_t init_fn );
336 
337  /*!
338  * \note Calls m_io_svc.stop() if necessary.
339  */
340  void
342  };
343 
344 template< typename Activity_Tracker >
346  outliving_reference_t<::asio::io_context> io_svc,
347  environment_t & env,
348  coop_listener_unique_ptr_t coop_listener,
349  mbox_t stats_distribution_mbox )
350  : m_io_svc( io_svc )
351  , m_env( env )
357  {}
358 
359 template< typename Activity_Tracker >
360 void
361 env_infrastructure_t<Activity_Tracker>::launch( env_init_t init_fn )
362  {
363  // Post initial operation to Asio event loop.
364  ::asio::post( m_io_svc.get(), [this, init = std::move(init_fn)] {
366  } );
367 
368  // Default dispatcher should be destroyed on exit from this function.
371  } );
372 
373  // Launch Asio event loop.
374  m_io_svc.get().run();
375 
376  // Event loop can be finished in two cases:
377  // 1. SObjectizer has been shut down. We should do nothing in that case.
378  // 2. There is no more work for Asio. But SObjectizer is still working.
379  // In that case a normal shutdown must be initiated.
380  //
381  const auto still_working = [this]{
383  };
384 
385  if( still_working() )
386  {
387  // Initiate a shutdown operation.
388  stop();
389  // Run Asio event loop until shutdown will be finished.
390  do
391  {
392  m_io_svc.get().restart();
393  m_io_svc.get().run();
394  }
395  while( still_working() );
396  }
397 
399  // Some exception was thrown during initialization.
400  // It should be rethrown.
402  }
403 
404 template< typename Activity_Tracker >
405 void
406 env_infrastructure_t<Activity_Tracker>::stop()
407  {
408  // Note: if the code below throws then we don't know the actual
409  // state of env_infrastructure. Because of that we just terminate
410  // the whole application in the case of an exception.
413  {
415  ::asio::post( m_io_svc.get(), [this] {
416  // Shutdown procedure must be started.
418 
419  // All registered cooperations must be deregistered now.
421 
423  } );
424  }
425  else
427  } );
428  }
429 
430 template< typename Activity_Tracker >
431 coop_unique_holder_t
432 env_infrastructure_t< Activity_Tracker >::make_coop(
433  coop_handle_t parent,
434  disp_binder_shptr_t default_binder )
435  {
436  return m_coop_repo.make_coop(
437  std::move(parent),
438  std::move(default_binder) );
439  }
440 
441 template< typename Activity_Tracker >
443 env_infrastructure_t< Activity_Tracker >::register_coop(
445  {
446  return m_coop_repo.register_coop( std::move(coop) );
447  }
448 
449 template< typename Activity_Tracker >
450 void
452  coop_shptr_t coop_to_dereg ) noexcept
453  {
455 
456  ::asio::post( m_io_svc.get(), [this, coop = std::move(coop_to_dereg)] {
460  } );
461  }
462 
463 template< typename Activity_Tracker >
464 bool
466  coop_shptr_t coop )
467  {
470  }
471 
472 template< typename Activity_Tracker >
475  const std::type_index & type_index,
476  const message_ref_t & msg,
477  const mbox_t & mbox,
480  {
481  using namespace asio_common;
482 
484  if( period != std::chrono::steady_clock::duration::zero() )
485  {
488  m_io_svc.get(),
489  type_index,
490  msg,
491  mbox,
492  period ) };
493 
494  result = timer_id_t{
496 
498  }
499  else
500  {
503  m_io_svc.get(),
504  type_index,
505  msg,
506  mbox ) };
507 
508  result = timer_id_t{
510 
512  }
513 
514  return result;
515  }
516 
517 template< typename Activity_Tracker >
518 void
520  const std::type_index & type_index,
521  const message_ref_t & msg,
522  const mbox_t & mbox,
523  std::chrono::steady_clock::duration pause )
524  {
525  using namespace asio_common;
526 
529  m_io_svc.get(),
530  type_index,
531  msg,
532  mbox ) };
533 
535  }
536 
537 template< typename Activity_Tracker >
539 env_infrastructure_t<Activity_Tracker>::stats_controller() noexcept
540  {
541  return m_stats_controller;
542  }
543 
544 template< typename Activity_Tracker >
546 env_infrastructure_t<Activity_Tracker>::stats_repository() noexcept
547  {
548  return m_stats_controller;
549  }
550 
551 template< typename Activity_Tracker >
554  {
555  const auto stats = m_coop_repo.query_stats();
556 
561  };
562  }
563 
564 template< typename Activity_Tracker >
567  {
568  // Note: this type of environment_infrastructure doesn't support
569  // statistics for timers.
570  return { 0, 0 };
571  }
572 
573 template< typename Activity_Tracker >
576  {
577  return { m_default_disp };
578  }
579 
580 template< typename Activity_Tracker >
581 void
583  env_init_t init_fn )
584  {
585  try
586  {
592 
593  // User-supplied init can be called now.
594  init_fn();
595  }
596  catch(...)
597  {
598  // We can't restore if the following fragment throws and exception.
600  // The current exception should be stored to be
601  // rethrown later.
603 
604  // SObjectizer's shutdown should be initiated.
605  stop();
606 
607  // NOTE: pointer to the default dispatcher will be dropped
608  // in launch() method.
609  } );
610  }
611  }
612 
613 template< typename Activity_Tracker >
614 void
616  {
618  {
619  // If there is no more live coops then shutdown must be
620  // completed.
621  if( !m_coop_repo.has_live_coop() )
622  {
624  // Asio's event loop must be broken here!
625  m_io_svc.get().stop();
626  }
627  }
628  }
629 
630 //
631 // ensure_autoshutdown_enabled
632 //
633 /*!
634  * Throws an exception if autoshutdown feature is disabled.
635  */
636 void
638  const environment_params_t & env_params )
639  {
640  if( env_params.autoshutdown_disabled() )
641  SO_5_THROW_EXCEPTION( rc_autoshutdown_must_be_enabled,
642  "autoshutdown feature must be enabled for "
643  "so_5::env_infrastructures::simple_not_mtsafe" );
644  }
645 
646 } /* namespace impl */
647 
648 //
649 // factory
650 //
651 /*!
652  * \brief A factory for creation of environment infrastructure based on
653  * Asio's event loop.
654  *
655  * \attention
656  * This environment infrastructure is not a thread safe.
657  *
658  * Usage example:
659  * \code
660 int main()
661 {
662  asio::io_context io_svc;
663 
664  so_5::launch( [](so_5::environment_t & env) {
665  ... // Some initialization stuff.
666  },
667  [&io_svc](so_5::environment_params_t & params) {
668  using asio_env = so_5::extra::env_infrastructures::asio::simple_not_mtsafe;
669 
670  params.infrastructure_factory( asio_env::factory(io_svc) );
671  } );
672 
673  return 0;
674 }
675  * \endcode
676  */
679  {
680  using namespace impl;
681 
682  return [&io_svc](
683  environment_t & env,
684  environment_params_t & env_params,
685  mbox_t stats_distribution_mbox )
686  {
687  ensure_autoshutdown_enabled( env_params );
688 
689  environment_infrastructure_t * obj = nullptr;
690 
691  // Create environment infrastructure object in dependence of
692  // work thread activity tracking flag.
693  const auto tracking = env_params.work_thread_activity_tracking();
694  if( work_thread_activity_tracking_t::on == tracking )
695  obj = new env_infrastructure_t< reusable::real_activity_tracker_t >(
696  outliving_mutable(io_svc),
697  env,
698  env_params.so5__giveout_coop_listener(),
699  std::move(stats_distribution_mbox) );
700  else
701  obj = new env_infrastructure_t< reusable::fake_activity_tracker_t >(
702  outliving_mutable(io_svc),
703  env,
704  env_params.so5__giveout_coop_listener(),
705  std::move(stats_distribution_mbox) );
706 
707  return environment_infrastructure_unique_ptr_t(
708  obj,
709  environment_infrastructure_t::default_deleter() );
710  };
711  }
712 
713 } /* namespace simple_not_mtsafe */
714 
715 } /* namespace asio */
716 
717 } /* namespace env_infrastructures */
718 
719 } /* namespace extra */
720 
721 } /* namespace so_5 */
Type for representation of statistical data for this event queue.
event_queue_impl_t(outliving_reference_t<::asio::io_context > io_svc, outliving_reference_t< Activity_Tracker > activity_tracker)
Initializing constructor.
void single_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause) override
void start(current_thread_id_t thread_id)
Notification that event queue work is started.
An implementation of dispatcher to be used in places where default dispatcher is needed.
void ensure_autoshutdown_enabled(const environment_params_t &env_params)
Implementation of event_queue interface for the default dispatcher.
event_queue_impl_t< Activity_Tracker > m_event_queue
Event queue which is necessary for the default dispatcher.
stats_controller_t m_stats_controller
Stats controller for this environment.
Ranges for error codes of each submodules.
Definition: details.hpp:13
std::size_t m_final_dereg_coop_count
Counter of cooperations which are waiting for final deregistration step.
so_5::environment_infrastructure_t::coop_repository_stats_t query_coop_repository_stats() override
outliving_reference_t< ::asio::io_context > m_io_svc
Asio&#39;s io_context to be used.
default_dispatcher_t(outliving_reference_t< environment_t > env, outliving_reference_t< event_queue_impl_t< Activity_Tracker > > event_queue, outliving_reference_t< Activity_Tracker > activity_tracker)
environment_infrastructure_factory_t factory(::asio::io_context &io_svc)
A factory for creation of environment infrastructure based on Asio&#39;s event loop.
std::shared_ptr< default_dispatcher_t< Activity_Tracker > > m_default_disp
Dispatcher to be used as default dispatcher.
so_5::timer_id_t schedule_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause, std::chrono::steady_clock::duration period) override
SO_5_NODISCARD coop_unique_holder_t make_coop(coop_handle_t parent, disp_binder_shptr_t default_binder) override
env_infrastructure_t(outliving_reference_t<::asio::io_context > io_svc, environment_t &env, coop_listener_unique_ptr_t coop_listener, mbox_t stats_distribution_mbox)
std::exception_ptr m_exception_from_init
The pointer to an exception that was thrown during init phase.