SObjectizer-5 Extra
simple_mtsafe.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of Asio-based simple thread safe
4  * environment infrastructure.
5  */
6 
7 #pragma once
8 
9 #include <so_5_extra/env_infrastructures/asio/impl/common.hpp>
10 
11 #include <so_5/impl/st_env_infrastructure_reuse.hpp>
12 #include <so_5/impl/internal_env_iface.hpp>
13 #include <so_5/details/sync_helpers.hpp>
14 #include <so_5/details/at_scope_exit.hpp>
15 #include <so_5/details/invoke_noexcept_code.hpp>
16 
17 #include <asio.hpp>
18 
19 #include <string_view>
20 
21 namespace so_5 {
22 
23 namespace extra {
24 
25 namespace env_infrastructures {
26 
27 namespace asio {
28 
29 namespace simple_mtsafe {
30 
31 namespace impl {
32 
33 //! A short name for namespace with reusable stuff.
35 
36 //! A short name for namespace with common stuff.
38 
39 //
40 // shutdown_status_t
41 //
43 
44 //
45 // coop_repo_t
46 //
47 /*!
48  * \brief Implementation of coop_repository for
49  * simple thread-safe single-threaded environment infrastructure.
50  */
52 
53 //
54 // stats_controller_t
55 //
56 /*!
57  * \brief Implementation of stats_controller for that type of
58  * single-threaded environment.
59  */
60 using stats_controller_t =
62 
63 //
64 // event_queue_impl_t
65 //
66 /*!
67  * \brief Implementation of event_queue interface for the default dispatcher.
68  *
69  * \tparam Activity_Tracker A type for tracking work thread activity.
70  */
71 template< typename Activity_Tracker >
73  {
75 
76  public :
77  //! Type for representation of statistical data for this event queue.
78  struct stats_t
79  {
80  //! The current size of the demands queue.
82  };
83 
84  //! Initializing constructor.
86  //! Asio's io_context to be used for dispatching.
87  outliving_reference_t<::asio::io_context> io_svc,
88  //! Actual activity tracker.
89  outliving_reference_t<Activity_Tracker> activity_tracker )
90  : m_io_svc(io_svc)
93  {
94  }
95 
96  virtual void
97  push( execution_demand_t demand ) override
98  {
99  // Statistics must be updated.
101 
102  // Now we can schedule processing of the demand.
103  // If ::asio::post fails then statistics must be reverted.
105  [&] {
106  ::asio::post(
107  m_io_svc.get(),
108  [this, d = std::move(demand)]() mutable {
109  // Statistics must be updated.
111 
112  // Update wait statistics.
114  const auto wait_starter = ::so_5::details::at_scope_exit(
115  [this]{ m_activity_tracker.get().wait_started(); } );
116 
117  // The demand can be handled now.
118  // With working time tracking.
120  {
121  // In case call_handler throws.
122  const auto stopper = ::so_5::details::at_scope_exit(
123  [this]{ m_activity_tracker.get().work_stopped(); });
124 
126  }
127  } );
128  },
129  [this] {
131  } );
132  }
133 
134  //! Notification that event queue work is started.
135  void
137  //! ID of the main working thread.
138  current_thread_id_t thread_id )
139  {
141 
142  // There are no pending demands now. We can start counting
143  // the waiting time.
145  }
146 
147  //! Get the current statistics.
148  stats_t
149  query_stats() const noexcept
150  {
152  }
153 
154  private :
157 
159 
161  };
162 
163 //
164 // disp_ds_name_parts_t
165 //
166 /*!
167  * \brief Provides the dispatcher-type part ("asio_mtsafe") of the dispatcher name.
168  */
169 struct disp_ds_name_parts_t final
170  {
171  static constexpr std::string_view
172  disp_type_part() noexcept { return { "asio_mtsafe" }; }
173  };
174 
175 //
176 // default_dispatcher_t
177 //
178 /*!
179  * \brief An implementation of dispatcher to be used in
180  * places where default dispatcher is needed.
181  *
182  * \tparam Activity_Tracker a type of activity tracker to be used
183  * for run-time statistics.
184  *
185  * \since
186  * v.1.3.0
187  */
188 template< typename Activity_Tracker >
190  : public reusable::default_dispatcher_t<
194  {
199 
200  public :
202  outliving_reference_t< environment_t > env,
203  outliving_reference_t< event_queue_impl_t<Activity_Tracker> > event_queue,
204  outliving_reference_t< Activity_Tracker > activity_tracker )
206  {
207  // Event queue should be started manually.
208  // We know that the default dispatcher is created on a thread
209  // that will be used for events dispatching.
210  event_queue.get().start( this->thread_id() );
211  }
212  };
213 
214 //
215 // env_infrastructure_t
216 //
217 /*!
218  * \brief Default implementation of thread-safe single-threaded environment
219  * infrastructure.
220  *
221  * \attention
222  * This object doesn't have any mutexes. All synchronization is done by
223  * delegating mutating operations to Asio's context (asio::post and
224  * asio::dispatch are used).
225  *
226  * \tparam Activity_Tracker A type of activity tracker to be used.
227  */
228 template< typename Activity_Tracker >
231  {
232  public :
234  //! Asio's io_context to be used.
235  outliving_reference_t<::asio::io_context> io_svc,
236  //! Environment to work in.
237  environment_t & env,
238  //! Cooperation action listener.
239  coop_listener_unique_ptr_t coop_listener,
240  //! Mbox for distribution of run-time stats.
241  mbox_t stats_distribution_mbox );
242 
243  void
244  launch( env_init_t init_fn ) override;
245 
246  void
247  stop() override;
248 
249  [[nodiscard]]
251  make_coop(
254 
257  coop_unique_holder_t coop ) override;
258 
259  void
261  coop_shptr_t coop ) noexcept override;
262 
263  bool
265  coop_shptr_t coop_name ) override;
266 
269  const std::type_index & type_wrapper,
270  const message_ref_t & msg,
271  const mbox_t & mbox,
273  std::chrono::steady_clock::duration period ) override;
274 
275  void
276  single_timer(
277  const std::type_index & type_wrapper,
278  const message_ref_t & msg,
279  const mbox_t & mbox,
280  std::chrono::steady_clock::duration pause ) override;
281 
283  stats_controller() noexcept override;
284 
286  stats_repository() noexcept override;
287 
289  query_coop_repository_stats() override;
290 
292  query_timer_thread_stats() override;
293 
295  make_default_disp_binder() override;
296 
297  private :
298  //! Asio's io_context to be used.
300 
301  //! Actual SObjectizer Environment.
303 
304  //! Status of shutdown procedure.
306 
307  //! Repository of registered coops.
309 
310  //! Actual activity tracker.
311  Activity_Tracker m_activity_tracker;
312 
313  //! Event queue which is necessary for the default dispatcher.
314  event_queue_impl_t< Activity_Tracker > m_event_queue;
315 
316  //! Dispatcher to be used as default dispatcher.
317  /*!
318  * \note
319  * Has an actual value only inside launch() method.
320  */
322 
323  //! Stats controller for this environment.
325 
326  //! Counter of cooperations which are waiting for final deregistration
327  //! step.
328  /*!
329  * It is necessary for building correct run-time stats.
330  */
332 
333  //! The pointer to an exception that was thrown during init phase.
334  /*!
335  * This exception is stored inside a callback posted to Asio.
336  * And then this exception will be rethrown from launch() method
337  * after the shutdown of SObjectizer.
338  */
340 
341  void
342  run_default_dispatcher_and_go_further( env_init_t init_fn );
343 
344  /*!
345  * \note Calls m_io_svc.stop() and m_default_disp.shutdown() if necessary.
346  *
347  * \attention Must be called only for locked object!
348  */
349  void
351  };
352 
353 template< typename Activity_Tracker >
355  outliving_reference_t<::asio::io_context> io_svc,
356  environment_t & env,
357  coop_listener_unique_ptr_t coop_listener,
358  mbox_t stats_distribution_mbox )
359  : m_io_svc( io_svc )
360  , m_env( env )
368  {}
369 
370 template< typename Activity_Tracker >
371 void
372 env_infrastructure_t<Activity_Tracker>::launch( env_init_t init_fn )
373  {
374  // Post initial operation to Asio event loop.
375  ::asio::post( m_io_svc.get(), [this, init = std::move(init_fn)] {
377  } );
378 
379  // Default dispatcher should be destroyed on exit from this function.
382  } );
383 
384  // Tell that there is a work to do.
385  auto work = ::asio::make_work_guard( m_io_svc.get() );
386 
387  // Launch Asio event loop.
388  m_io_svc.get().run();
389 
391  // Some exception was thrown during initialization.
392  // It should be rethrown.
394  }
395 
396 template< typename Activity_Tracker >
397 void
398 env_infrastructure_t<Activity_Tracker>::stop()
399  {
400  // NOTE: if the code below throws then we don't know the actual
401  // state of env_infrastructure. Because of that we just terminate
402  // the whole application in the case of an exception.
408  {
409  // All registered cooperations must be deregistered now.
410  ::asio::dispatch( m_io_svc.get(),
411  [this] {
413 
415 
417  } );
418  }
419  else
420  // Check for shutdown completeness must be performed only
421  // on the main Asio's thread.
422  ::asio::dispatch( m_io_svc.get(), [this] {
424  } );
425  } );
426  }
427 
428 template< typename Activity_Tracker >
430 env_infrastructure_t< Activity_Tracker >::make_coop(
433  {
434  return m_coop_repo.make_coop(
435  std::move(parent),
436  std::move(default_binder) );
437  }
438 
439 template< typename Activity_Tracker >
441 env_infrastructure_t< Activity_Tracker >::register_coop(
443  {
444  return m_coop_repo.register_coop( std::move(coop) );
445  }
446 
447 template< typename Activity_Tracker >
448 void
450  coop_shptr_t coop_to_dereg ) noexcept
451  {
453 
454  ::asio::post( m_io_svc.get(), [this, coop = std::move(coop_to_dereg)] {
458  } );
459  }
460 
461 template< typename Activity_Tracker >
462 bool
464  coop_shptr_t coop )
465  {
468  }
469 
470 template< typename Activity_Tracker >
473  const std::type_index & type_index,
474  const message_ref_t & msg,
475  const mbox_t & mbox,
478  {
479  using namespace asio_common;
480 
481  // We do not control shutdown_status_t here. Because it seems
482  // to be safe to call schedule_timer after call to stop().
483  // New timer will simply be ignored during shutdown process.
485  if( period != std::chrono::steady_clock::duration::zero() )
486  {
489  m_io_svc.get(),
490  type_index,
491  msg,
492  mbox,
493  period ) };
494 
495  result = timer_id_t{
497 
499  }
500  else
501  {
504  m_io_svc.get(),
505  type_index,
506  msg,
507  mbox ) };
508 
509  result = timer_id_t{
511 
513  }
514 
515  return result;
516  }
517 
518 template< typename Activity_Tracker >
519 void
521  const std::type_index & type_index,
522  const message_ref_t & msg,
523  const mbox_t & mbox,
524  std::chrono::steady_clock::duration pause )
525  {
526  using namespace asio_common;
527 
528  // We do not control shutdown_status_t here. Because it seems
529  // to be safe to call schedule_timer after call to stop().
530  // New timer will simply be ignored during shutdown process.
531 
534  m_io_svc.get(),
535  type_index,
536  msg,
537  mbox ) };
538 
540  }
541 
542 template< typename Activity_Tracker >
544 env_infrastructure_t<Activity_Tracker>::stats_controller() noexcept
545  {
546  return m_stats_controller;
547  }
548 
549 template< typename Activity_Tracker >
551 env_infrastructure_t<Activity_Tracker>::stats_repository() noexcept
552  {
553  return m_stats_controller;
554  }
555 
556 template< typename Activity_Tracker >
559  {
560  const auto stats = m_coop_repo.query_stats();
561 
566  };
567  }
568 
569 template< typename Activity_Tracker >
572  {
573  // NOTE: this type of environment_infrastructure doesn't support
574  // statistics for timers.
575  return { 0, 0 };
576  }
577 
578 template< typename Activity_Tracker >
581  {
582  return { m_default_disp };
583  }
584 
585 template< typename Activity_Tracker >
586 void
588  env_init_t init_fn )
589  {
590  try
591  {
597 
598  // User-supplied init can be called now.
599  init_fn();
600  }
601  catch(...)
602  {
603  // We can't recover if the following fragment throws an exception.
605  // The current exception should be stored to be
606  // rethrown later.
608 
609  // SObjectizer's shutdown should be initiated.
610  stop();
611 
612  // NOTE: pointer to the default dispatcher will be dropped
613  // in launch() method.
614  } );
615  }
616  }
617 
618 template< typename Activity_Tracker >
619 void
621  {
623  {
624  // If there is no more live coops then shutdown must be
625  // completed.
626  if( !m_coop_repo.has_live_coop() )
627  {
629  // Asio's event loop must be broken here!
630  m_io_svc.get().stop();
631  }
632  }
633  }
634 
635 } /* namespace impl */
636 
637 //
638 // factory
639 //
640 /*!
641  * \brief A factory for creation of environment infrastructure based on
642  * Asio's event loop.
643  *
644  * \attention
645  * This environment infrastructure is thread safe.
646  *
647  * Usage example:
648  * \code
649 int main()
650 {
651  asio::io_context io_svc;
652 
653  so_5::launch( [](so_5::environment_t & env) {
654  ... // Some initialization stuff.
655  },
656  [&io_svc](so_5::environment_params_t & params) {
657  namespace asio_env = so_5::extra::env_infrastructures::asio::simple_mtsafe;
658 
659  params.infrastructure_factory( asio_env::factory(io_svc) );
660  } );
661 
662  return 0;
663 }
664  * \endcode
665  */
668  {
669  using namespace impl;
670 
671  return [&io_svc](
672  environment_t & env,
673  environment_params_t & env_params,
674  mbox_t stats_distribution_mbox )
675  {
676  environment_infrastructure_t * obj = nullptr;
677 
678  // Create environment infrastructure object depending on the
679  // work thread activity tracking flag.
680  const auto tracking = env_params.work_thread_activity_tracking();
681  if( work_thread_activity_tracking_t::on == tracking )
682  obj = new env_infrastructure_t< reusable::real_activity_tracker_t >(
683  outliving_mutable(io_svc),
684  env,
685  env_params.so5__giveout_coop_listener(),
686  std::move(stats_distribution_mbox) );
687  else
688  obj = new env_infrastructure_t< reusable::fake_activity_tracker_t >(
689  outliving_mutable(io_svc),
690  env,
691  env_params.so5__giveout_coop_listener(),
692  std::move(stats_distribution_mbox) );
693 
694  return environment_infrastructure_unique_ptr_t(
695  obj,
696  environment_infrastructure_t::default_deleter() );
697  };
698  }
699 
700 } /* namespace simple_mtsafe */
701 
702 } /* namespace asio */
703 
704 } /* namespace env_infrastructures */
705 
706 } /* namespace extra */
707 
708 } /* namespace so_5 */
Type for representation of statistical data for this event queue.
env_infrastructure_t(outliving_reference_t<::asio::io_context > io_svc, environment_t &env, coop_listener_unique_ptr_t coop_listener, mbox_t stats_distribution_mbox)
std::shared_ptr< default_dispatcher_t< Activity_Tracker > > m_default_disp
Dispatcher to be used as default dispatcher.
environment_infrastructure_factory_t factory(::asio::io_context &io_svc)
A factory for creation of environment infrastructure based on Asio's event loop.
Implementation of event_queue interface for the default dispatcher.
std::exception_ptr m_exception_from_init
The pointer to an exception that was thrown during init phase.
so_5::timer_id_t schedule_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause, std::chrono::steady_clock::duration period) override
An implementation of dispatcher to be used in places where default dispatcher is needed.
default_dispatcher_t(outliving_reference_t< environment_t > env, outliving_reference_t< event_queue_impl_t< Activity_Tracker > > event_queue, outliving_reference_t< Activity_Tracker > activity_tracker)
Ranges for error codes of each submodules.
Definition: details.hpp:13
stats_t query_stats() const noexcept
Get the current statistics.
void start(current_thread_id_t thread_id)
Notification that event queue work is started.
std::atomic< std::size_t > m_final_dereg_coop_count
Counter of cooperations which are waiting for final deregistration step.
coop_unique_holder_t make_coop(coop_handle_t parent, disp_binder_shptr_t default_binder) override
event_queue_impl_t(outliving_reference_t<::asio::io_context > io_svc, outliving_reference_t< Activity_Tracker > activity_tracker)
Initializing constructor.
so_5::environment_infrastructure_t::coop_repository_stats_t query_coop_repository_stats() override
event_queue_impl_t< Activity_Tracker > m_event_queue
Event queue which is necessary for the default dispatcher.
stats_controller_t m_stats_controller
Stats controller for this environment.
Default implementation of thread-safe single-threaded environment infrastructure.
std::atomic< shutdown_status_t > m_shutdown_status
Status of shutdown procedure.
outliving_reference_t< ::asio::io_context > m_io_svc
Asio's io_context to be used.
void single_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause) override