SObjectizer-5 Extra
simple_not_mtsafe.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of Asio-based simple not thread safe
4  * environment infrastructure.
5  */
6 
7 #pragma once
8 
9 #include <so_5_extra/env_infrastructures/asio/impl/common.hpp>
10 
11 #include <so_5/rt/impl/h/st_env_infrastructure_reuse.hpp>
12 #include <so_5/details/h/sync_helpers.hpp>
13 #include <so_5/details/h/at_scope_exit.hpp>
14 #include <so_5/details/h/invoke_noexcept_code.hpp>
15 
16 #include <so_5/h/stdcpp.hpp>
17 
18 #include <asio.hpp>
19 
20 namespace so_5 {
21 
22 namespace extra {
23 
24 namespace env_infrastructures {
25 
26 namespace asio {
27 
28 namespace simple_not_mtsafe {
29 
30 namespace impl {
31 
32 //! A short name for namespace with reusable stuff.
34 
35 //! A short name for namespace with common stuff.
36 /*!
37  * \since
38  * v.1.0.2
39  */
41 
42 //
43 // shutdown_status_t
44 //
46 
47 //
48 // coop_repo_t
49 //
50 /*!
51  * \brief Implementation of coop_repository for
52  * simple not-thread-safe single-threaded environment infrastructure.
53  */
55 
56 //
57 // stats_controller_t
58 //
59 /*!
60  * \brief Implementation of stats_controller for that type of
61  * single-threaded environment.
62  */
63 using stats_controller_t =
65 
66 //
67 // event_queue_impl_t
68 //
69 /*!
70  * \brief Implementation of event_queue interface for the default dispatcher.
71  *
72  * \tparam Activity_Tracker A type for tracking work thread activity.
73  */
74 template< typename Activity_Tracker >
76  {
77  public :
78  //! Type for representation of statistical data for this event queue.
79  struct stats_t
80  {
81  //! The current size of the demands queue.
83  };
84 
85  //! Initializing constructor.
87  //! Asio's io_context to be used for dispatching.
88  outliving_reference_t<::asio::io_context> io_svc,
89  //! Actual activity tracker.
90  outliving_reference_t<Activity_Tracker> activity_tracker )
91  : m_io_svc(io_svc)
93  {}
94 
95  virtual void
96  push( execution_demand_t demand ) override
97  {
98  ::asio::post(
99  m_io_svc.get(),
100  [this, d = std::move(demand)]() mutable {
101  // Statistics must be updated.
103 
104  // The demand can be handled now.
105  // With working time tracking.
107  {
108  // In case call_handler throws.
109  const auto stopper = ::so_5::details::at_scope_exit(
110  [this]{ m_activity_tracker.get().work_stopped(); });
111 
113  }
114 
115  // If there are no pending demands then
116  // waiting must be started.
117  if( !m_stats.m_demands_count )
119  } );
120 
121  // Increment demands count only if post doesn't throw.
122  if( !m_stats.m_demands_count )
123  // Waiting must be stopped because we have received an event.
125 
127  }
128 
129  //! Notification that event queue work is started.
130  void
132  //! ID of the main working thread.
133  current_thread_id_t thread_id )
134  {
136 
137  // There are no pending demands now. We can start counting
138  // the waiting time.
140  }
141 
142  //! Get the current statistics (returns a by-value copy of m_stats; never throws).
143  stats_t
144  query_stats() const noexcept
145  {
146  return m_stats;
147  }
148 
149  private :
152 
155  };
156 
157 //
158 // disp_ds_name_parts_t
159 //
160 /*!
161  * \brief A class with the major part of the dispatcher name.
 *
 * Provides a single static accessor, disp_type_part(), that yields the
 * string literal "asio_not_mtsafe".
162  */
163 struct disp_ds_name_parts_t final
164  {
 //! Returns the dispatcher type part of the name: "asio_not_mtsafe".
165  static const char * disp_type_part() noexcept { return "asio_not_mtsafe"; }
166  };
167 
168 //
169 // default_disp_impl_basis_t
170 //
171 /*!
172  * \brief A basic part of implementation of dispatcher interface to be used in
173  * places where default dispatcher is needed.
174  */
175 template< typename Activity_Tracker >
178 
179 //
180 // default_disp_binder_t
181 //
182 /*!
183  * \brief An implementation of disp_binder interface for default dispatcher
184  * for this environment infrastructure.
185  */
186 template< typename Activity_Tracker >
187 using default_disp_binder_t =
190 
191 //
192 // default_disp_impl_t
193 //
194 /*!
195  * \brief An implementation of dispatcher interface to be used in
196  * places where default dispatcher is needed.
197  *
198  * \tparam Activity_Tracker a type of activity tracker to be used
199  * for run-time statistics.
200  *
201  * \since
202  * v.5.5.19
203  */
204 template< typename Activity_Tracker >
205 using default_disp_impl_t =
210 
211 //
212 // env_infrastructure_t
213 //
214 /*!
215  * \brief Default implementation of not-thread safe single-threaded environment
216  * infrastructure.
217  *
218  * \attention
219  * This class doesn't have any mutex inside.
220  *
221  * \tparam Activity_Tracker A type of activity tracker to be used.
222  */
223 template< typename Activity_Tracker >
226  {
227  public :
229  //! Asio's io_context to be used.
230  outliving_reference_t<::asio::io_context> io_svc,
231  //! Environment to work in.
232  environment_t & env,
233  //! Cooperation action listener.
234  coop_listener_unique_ptr_t coop_listener,
235  //! Mbox for distribution of run-time stats.
236  mbox_t stats_distribution_mbox );
237 
238  virtual void
239  launch( env_init_t init_fn ) override;
240 
241  virtual void
242  stop() override;
243 
244  virtual void
246  coop_unique_ptr_t coop ) override;
247 
248  virtual void
250  nonempty_name_t name,
251  coop_dereg_reason_t dereg_reason ) override;
252 
253  virtual void
255  coop_t * coop ) override;
256 
257  virtual bool
259  std::string coop_name ) override;
260 
261  virtual so_5::timer_id_t
263  const std::type_index & type_wrapper,
264  const message_ref_t & msg,
265  const mbox_t & mbox,
267  std::chrono::steady_clock::duration period ) override;
268 
269  virtual void
270  single_timer(
271  const std::type_index & type_wrapper,
272  const message_ref_t & msg,
273  const mbox_t & mbox,
274  std::chrono::steady_clock::duration pause ) override;
275 
276  virtual stats::controller_t &
277  stats_controller() noexcept override;
278 
279  virtual stats::repository_t &
280  stats_repository() noexcept override;
281 
282  virtual dispatcher_t &
283  query_default_dispatcher() override;
284 
286  query_coop_repository_stats() override;
287 
288  virtual timer_thread_stats_t
289  query_timer_thread_stats() override;
290 
292  make_default_disp_binder() override;
293 
294  private :
295  //! Asio's io_context to be used.
297 
298  //! Actual SObjectizer Environment.
300 
301  //! Status of shutdown procedure.
303 
304  //! Repository of registered coops.
306 
307  //! Actual activity tracker.
308  Activity_Tracker m_activity_tracker;
309 
310  //! Event queue which is necessary for the default dispatcher.
311  event_queue_impl_t< Activity_Tracker > m_event_queue;
312 
313  //! Dispatcher to be used as default dispatcher.
315 
316  //! Stats controller for this environment.
318 
319  //! Counter of cooperations which are waiting for final deregistration
320  //! step.
321  /*!
322  * It is necessary for building correct run-time stats.
323  */
325 
326  void
327  run_default_dispatcher_and_go_further( env_init_t init_fn );
328 
329  /*!
330  * \note Calls m_io_svc.stop() and m_default_disp.shutdown() if necessary.
331  */
332  void
334  };
335 
336 template< typename Activity_Tracker >
338  outliving_reference_t<::asio::io_context> io_svc,
339  environment_t & env,
340  coop_listener_unique_ptr_t coop_listener,
341  mbox_t stats_distribution_mbox )
342  : m_io_svc( io_svc )
343  , m_env( env )
346  , m_default_disp(
350  m_env,
353  {}
354 
355 template< typename Activity_Tracker >
356 void
357 env_infrastructure_t<Activity_Tracker>::launch( env_init_t init_fn )
358  {
359  // Post initial operation to Asio event loop.
360  ::asio::post( m_io_svc.get(), [this, init = std::move(init_fn)] {
362  } );
363 
364  // Launch Asio event loop.
365  m_io_svc.get().run();
366 
367  // Event loop can be finished in two cases:
368  // 1. SObjectizer has been shut down. We should do nothing in that case.
369  // 2. There is no more work for Asio. But SObjectizer is still working.
370  // In that case a normal shutdown must be initiated.
371  //
372  const auto still_working = [this]{
374  };
375 
376  if( still_working() )
377  {
378  // Initiate a shutdown operation.
379  stop();
380  // Run Asio event loop until shutdown is finished.
381  do
382  {
383  m_io_svc.get().restart();
384  m_io_svc.get().run();
385  }
386  while( still_working() );
387  }
388  }
389 
390 template< typename Activity_Tracker >
391 void
392 env_infrastructure_t<Activity_Tracker>::stop()
393  {
394  // Note: if the code below throws then we don't know the actual
395  // state of env_infrastructure. Because of that we just terminate
396  // the whole application in the case of an exception.
399  {
401  ::asio::post( m_io_svc.get(), [this] {
402  // Shutdown procedure must be started.
404 
405  // All registered cooperations must be deregistered now.
407 
409  } );
410  }
411  else
413  } );
414  }
415 
416 template< typename Activity_Tracker >
417 void
419  coop_unique_ptr_t coop )
420  {
422  }
423 
424 template< typename Activity_Tracker >
425 void
427  nonempty_name_t name,
428  coop_dereg_reason_t dereg_reason )
429  {
431  }
432 
433 template< typename Activity_Tracker >
434 void
436  coop_t * coop )
437  {
439 
440  ::asio::post( m_io_svc.get(), [this, coop] {
443  } );
444  }
445 
446 template< typename Activity_Tracker >
447 bool
449  std::string coop_name )
450  {
453  }
454 
455 template< typename Activity_Tracker >
458  const std::type_index & type_index,
459  const message_ref_t & msg,
460  const mbox_t & mbox,
463  {
464  using namespace asio_common;
465 
467  if( period != std::chrono::steady_clock::duration::zero() )
468  {
471  m_io_svc.get(),
472  type_index,
473  msg,
474  mbox,
475  period ) };
476 
477  result = timer_id_t{
479 
481  }
482  else
483  {
486  m_io_svc.get(),
487  type_index,
488  msg,
489  mbox ) };
490 
491  result = timer_id_t{
493 
495  }
496 
497  return result;
498  }
499 
500 template< typename Activity_Tracker >
501 void
503  const std::type_index & type_index,
504  const message_ref_t & msg,
505  const mbox_t & mbox,
506  std::chrono::steady_clock::duration pause )
507  {
508  using namespace asio_common;
509 
512  m_io_svc.get(),
513  type_index,
514  msg,
515  mbox ) };
516 
518  }
519 
520 template< typename Activity_Tracker >
522 env_infrastructure_t<Activity_Tracker>::stats_controller() noexcept
523  {
524  return m_stats_controller;
525  }
526 
527 template< typename Activity_Tracker >
529 env_infrastructure_t<Activity_Tracker>::stats_repository() noexcept
530  {
531  return m_stats_controller;
532  }
533 
534 template< typename Activity_Tracker >
535 dispatcher_t &
537  {
538  return m_default_disp;
539  }
540 
541 template< typename Activity_Tracker >
544  {
545  const auto stats = m_coop_repo.query_stats();
546 
552  };
553  }
554 
555 template< typename Activity_Tracker >
558  {
559  // Note: this type of environment_infrastructure doesn't support
560  // statistics for timers.
561  return { 0, 0 };
562  }
563 
564 template< typename Activity_Tracker >
567  {
570  }
571 
572 template< typename Activity_Tracker >
573 void
575  env_init_t init_fn )
576  {
577  bool default_disp_started = false;
578 
579  try
580  {
581  // Event queue must know ID of the current thread.
582  // It also must start counting the waiting time.
584 
585  // Now we can start the default dispatcher.
588 
589  // Now, if init_fn throws we must call shutdown() for
590  // the default dispatcher.
591  default_disp_started = true;
592 
593  // User-supplied init can be called now.
594  init_fn();
595  }
596  catch(...)
597  {
600 
601  throw;
602  }
603  }
604 
605 template< typename Activity_Tracker >
606 void
608  {
610  {
611  // If there is no more live coops then shutdown must be
612  // completed.
613  if( !m_coop_repo.has_live_coop() )
614  {
616  // Asio's event loop must be broken here!
617  m_io_svc.get().stop();
618 
619  // Default dispatcher can be shut down now.
621  }
622  }
623  }
624 
625 //
626 // ensure_autoshutdown_enabled
627 //
628 /*!
629  * Throws an exception if autoshutdown feature is disabled.
630  */
631 void
633  const environment_params_t & env_params )
634  {
635  if( env_params.autoshutdown_disabled() )
636  SO_5_THROW_EXCEPTION( rc_autoshutdown_must_be_enabled,
637  "autoshutdown feature must be enabled for "
638  "so_5::env_infrastructures::simple_not_mtsafe" );
639  }
640 
641 } /* namespace impl */
642 
643 //
644 // factory
645 //
646 /*!
647  * \brief A factory for creation of environment infrastructure based on
648  * Asio's event loop.
649  *
650  * \attention
651  * This environment infrastructure is not thread safe.
652  *
653  * Usage example:
654  * \code
655 int main()
656 {
657  asio::io_context io_svc;
658 
659  so_5::launch( [](so_5::environment_t & env) {
660  ... // Some initialization stuff.
661  },
662  [&io_svc](so_5::environment_params_t & params) {
663  namespace asio_env = so_5::extra::env_infrastructures::asio::simple_not_mtsafe;
664 
665  params.infrastructure_factory( asio_env::factory(io_svc) );
666  } );
667 
668  return 0;
669 }
670  * \endcode
671  */
674  {
675  using namespace impl;
676 
677  return [&io_svc](
678  environment_t & env,
679  environment_params_t & env_params,
680  mbox_t stats_distribution_mbox )
681  {
682  ensure_autoshutdown_enabled( env_params );
683 
684  environment_infrastructure_t * obj = nullptr;
685 
686  // Create environment infrastructure object depending on
687  // the work thread activity tracking flag.
688  const auto tracking = env_params.work_thread_activity_tracking();
689  if( work_thread_activity_tracking_t::on == tracking )
690  obj = new env_infrastructure_t< reusable::real_activity_tracker_t >(
691  outliving_mutable(io_svc),
692  env,
693  env_params.so5__giveout_coop_listener(),
694  std::move(stats_distribution_mbox) );
695  else
696  obj = new env_infrastructure_t< reusable::fake_activity_tracker_t >(
697  outliving_mutable(io_svc),
698  env,
699  env_params.so5__giveout_coop_listener(),
700  std::move(stats_distribution_mbox) );
701 
702  return environment_infrastructure_unique_ptr_t(
703  obj,
704  environment_infrastructure_t::default_deleter() );
705  };
706  }
707 
708 } /* namespace simple_not_mtsafe */
709 
710 } /* namespace asio */
711 
712 } /* namespace env_infrastructures */
713 
714 } /* namespace extra */
715 
716 } /* namespace so_5 */
Type for representation of statistical data for this event queue.
event_queue_impl_t(outliving_reference_t<::asio::io_context > io_svc, outliving_reference_t< Activity_Tracker > activity_tracker)
Initializing constructor.
virtual void single_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause) override
void start(current_thread_id_t thread_id)
Notification that event queue work is started.
void ensure_autoshutdown_enabled(const environment_params_t &env_params)
Implementation of event_queue interface for the default dispatcher.
default_disp_impl_t< Activity_Tracker > m_default_disp
Dispatcher to be used as default dispatcher.
event_queue_impl_t< Activity_Tracker > m_event_queue
Event queue which is necessary for the default dispatcher.
stats_controller_t m_stats_controller
Stats controller for this environment.
Ranges for error codes of each submodule.
Definition: details.hpp:14
std::size_t m_final_dereg_coop_count
Counter of cooperations which are waiting for final deregistration step.
virtual so_5::environment_infrastructure_t::coop_repository_stats_t query_coop_repository_stats() override
outliving_reference_t< ::asio::io_context > m_io_svc
Asio's io_context to be used.
environment_infrastructure_factory_t factory(::asio::io_context &io_svc)
A factory for creation of environment infrastructure based on Asio's event loop.
virtual void deregister_coop(nonempty_name_t name, coop_dereg_reason_t dereg_reason) override
Default implementation of not-thread safe single-threaded environment infrastructure.
virtual so_5::timer_id_t schedule_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause, std::chrono::steady_clock::duration period) override
env_infrastructure_t(outliving_reference_t<::asio::io_context > io_svc, environment_t &env, coop_listener_unique_ptr_t coop_listener, mbox_t stats_distribution_mbox)