SObjectizer-5 Extra
pub.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of Asio's Thread Pool dispatcher.
4  *
5  * \since
6  * v.1.0.2
7  */
8 
9 #pragma once
10 
11 #include <so_5_extra/error_ranges.hpp>
12 
13 #include <so_5/disp_binder.hpp>
14 #include <so_5/send_functions.hpp>
15 
16 #include <so_5/disp/reuse/work_thread_activity_tracking.hpp>
17 #include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
18 
19 #include <so_5/stats/repository.hpp>
20 #include <so_5/stats/messages.hpp>
21 #include <so_5/stats/std_names.hpp>
22 #include <so_5/stats/impl/activity_tracking.hpp>
23 
24 #include <so_5/details/invoke_noexcept_code.hpp>
25 #include <so_5/details/rollback_on_exception.hpp>
26 #include <so_5/details/abort_on_fatal_error.hpp>
27 
28 #include <so_5/outliving.hpp>
29 
30 #include <asio/io_context.hpp>
31 #include <asio/io_context_strand.hpp>
32 #include <asio/post.hpp>
33 
34 namespace so_5 {
35 
36 namespace extra {
37 
38 namespace disp {
39 
40 namespace asio_thread_pool {
41 
42 namespace errors {
43 
44 //! Asio IoService is not set for asio_thread_pool dispatcher.
47 
48 } /* namespace errors */
49 
50 //
51 // disp_params_t
52 //
53 /*!
54  * \brief Parameters for %asio_thread_pool dispatcher.
55  *
56  * \since
57  * v.1.0.2
58  */
61  {
64 
65  public :
66  //! Default constructor.
67  disp_params_t() = default;
68 
69  friend inline void
71  disp_params_t & a, disp_params_t & b ) noexcept
72  {
73  using std::swap;
74 
75  swap(
76  static_cast< activity_tracking_mixin_t & >(a),
77  static_cast< activity_tracking_mixin_t & >(b) );
78 
79  swap( a.m_thread_count, b.m_thread_count );
80  swap( a.m_io_context, b.m_io_context );
81  }
82 
83  //! Setter for thread count.
85  thread_count( std::size_t count )
86  {
87  m_thread_count = count;
88  return *this;
89  }
90 
91  //! Getter for thread count.
92  std::size_t
93  thread_count() const
94  {
95  return m_thread_count;
96  }
97 
98  //! Use external Asio io_context object with dispatcher.
99  /*!
100  * Usage example:
101  * \code
102  * int main() {
103  * asio::io_context svc;
104  * so_5::launch( [&](so_5::environment_t & env) {
105  * namespace asio_tp = so_5::extra::disp::asio_thread_pool;
106  * auto disp = asio_tp::create_private_disp(
107  * env, "asio_tp",
108  * asio_tp::disp_params_t{}.use_external_io_context(
109  * so_5::outliving_mutable(svc) ) );
110  * ...
111  * } );
112  * }
113  * \endcode
114  */
115  disp_params_t &
117  ::asio::io_context & service )
118  {
119  m_io_context = std::shared_ptr< ::asio::io_context >(
120  std::addressof( service ),
121  // Empty deleter.
122  [](::asio::io_context *) {} );
123  return *this;
124  }
125 
126  //! Use external Asio io_context object with dispatcher.
127  /*!
128  * \note
129  * Ownership of this io_context object must be shared with
130  * others.
131  */
132  disp_params_t &
134  std::shared_ptr< ::asio::io_context > service )
135  {
136  m_io_context = std::move(service);
137  return *this;
138  }
139 
140  //! Use own Asio io_context object.
141  /*!
142  * Note this object will be dynamically created at the start
143  * of the dispatcher. And will be destroyed with the dispatcher object.
144  *
145  * A created io_context can be accessed later via io_context() method.
146  */
147  disp_params_t &
149  {
150  m_io_context = std::make_shared< ::asio::io_context >();
151  return *this;
152  }
153 
154  //! Get the io_context.
156  io_context() const noexcept
157  {
158  return m_io_context;
159  }
160 
161  private :
162  //! Count of working threads.
163  /*!
164  * Value 0 means that actual thread will be detected automatically.
165  */
167 
168  //! Asio's io_context which must be used with this dispatcher.
170  };
171 
172 namespace impl {
173 
175 
176 //
177 // basic_dispatcher_iface_t
178 //
179 /*!
180  * \brief The very basic interface of %asio_thread_pool dispatcher.
181  *
182  * This class contains a minimum that is necessary for implementation
183  * of dispatcher_handle class.
184  *
185  * \since
186  * v.1.3.0
187  */
190  {
191  public :
192  virtual ~basic_dispatcher_iface_t() noexcept = default;
193 
194  //! Create a binder for that dispatcher.
195  /*!
196  * The binder will use an external strand object.
197  */
198  [[nodiscard]]
199  virtual disp_binder_shptr_t
201 
202  //! Create a binder for that dispatcher.
203  /*!
204  * The binder will use an internal (automatically created)
205  * strand object.
206  */
207  [[nodiscard]]
208  virtual disp_binder_shptr_t
210 
211  //! Get reference to io_context from that dispatcher.
212  virtual ::asio::io_context &
213  io_context() const noexcept = 0;
214  };
215 
218 
220 
221 } /* namespace impl */
222 
223 //
224 // dispatcher_handle_t
225 //
226 
227 /*!
228  * \brief A handle for %asio_thread_pool dispatcher.
229  *
230  * \since
231  * v.1.3.0
232  */
233 class [[nodiscard]] dispatcher_handle_t
234  {
236 
237  //! A reference to actual implementation of a dispatcher.
239 
241  impl::basic_dispatcher_iface_shptr_t dispatcher ) noexcept
243  {}
244 
245  //! Is this handle empty?
246  bool
247  empty() const noexcept { return !m_dispatcher; }
248 
249  public :
250  dispatcher_handle_t() noexcept = default;
251 
252  //! Get a binder for that dispatcher.
253  /*!
254  * This method requires a reference to manually created strand
255  * object for protection of agents bound via binder returned.
256  * A user should create this strand object and ensure the right
257  * lifetime of it.
258  *
259  * Usage example:
260  * \code
261  * using namespace so_5::extra::disp::asio_thread_pool;
262  *
263  * asio::io_context io_ctx;
264  * asio::io_context::strand agents_strand{ io_ctx };
265  *
266  * so_5::environment_t & env = ...;
267  * auto disp = make_dispatcher( env, "my_disp", io_ctx );
268  *
269  * env.introduce_coop( [&]( so_5::coop_t & coop ) {
270  * coop.make_agent_with_binder< some_agent_type >(
271  * disp.binder( agents_strand ),
272  * ... );
273  *
274  * coop.make_agent_with_binder< another_agent_type >(
275  * disp.binder( agents_strand ),
276  * ... );
277  *
278  * ...
279  * } );
280  * \endcode
281  *
282  * \attention
283  * An attempt to call this method on empty handle is UB.
284  */
285  [[nodiscard]]
288  ::asio::io_context::strand & strand ) const
289  {
290  return m_dispatcher->binder_with_external_strand( strand );
291  }
292 
293  //! Get a binder for that dispatcher.
294  /*!
295  * This method requires creates an internal strand object by itself.
296  *
297  * Usage example:
298  * \code
299  * using namespace so_5::extra::disp::asio_thread_pool;
300  *
301  * asio::io_context io_ctx;
302  *
303  * so_5::environment_t & env = ...;
304  * auto disp = make_dispatcher( env, "my_disp", io_ctx );
305  *
306  * env.introduce_coop( [&]( so_5::coop_t & coop ) {
307  * // This agent will use its own strand object.
308  * coop.make_agent_with_binder< some_agent_type >(
309  * disp.binder(),
310  * ... );
311  *
312  * // This agent will use its own strand object.
313  * coop.make_agent_with_binder< another_agent_type >(
314  * disp.binder(),
315  * ... );
316  * ...
317  * } );
318  * \endcode
319  *
320  * \attention
321  * An attempt to call this method on empty handle is UB.
322  *
323  * \since
324  * v.1.3.0
325  */
326  [[nodiscard]]
328  binder() const
329  {
330  return m_dispatcher->binder_with_own_strand();
331  }
332 
333  //! Get reference to io_context from that dispatcher.
334  /*!
335  * \attention
336  * An attempt to call this method on empty handle is UB.
337  */
338  [[nodiscard]]
339  ::asio::io_context &
340  io_context() noexcept
341  {
342  return m_dispatcher->io_context();
343  }
344 
345  //! Is this handle empty?
346  operator bool() const noexcept { return empty(); }
347 
348  //! Does this handle contain a reference to dispatcher?
349  bool
350  operator!() const noexcept { return !empty(); }
351 
352  //! Drop the content of handle.
353  void
354  reset() noexcept { m_dispatcher.reset(); }
355  };
356 
357 namespace impl {
358 
359 //
360 // demands_counter_t
361 //
362 /*!
363  * \brief Type of atomic counter for counting waiting demands.
364  *
365  * \since
366  * v.1.0.2
367  */
369 
370 //
371 // actual_dispatcher_iface_t
372 //
373 /*!
374  * \brief An actual interface of thread pool dispatcher.
375  *
376  * \since
377  * v.1.3.0
378  */
380  {
381  public :
382  //! Notification about binding of yet another agent.
383  virtual void
384  agent_bound() noexcept = 0;
385 
386  //! Notification about unbinding of an agent.
387  virtual void
388  agent_unbound() noexcept = 0;
389 
390  //! Get a reference for counter of pending demands.
391  virtual demands_counter_t &
392  demands_counter() noexcept = 0;
393  };
394 
395 //
396 // actual_dispatcher_shptr_t
397 //
400 
401 //
402 // thread_local_ptr_holder_t
403 //
404 /*!
405  * \brief A helper for declaration of static and thread_local pointer
406  * in a header file.
407  *
408  * If non-template class will define a static member in a header file
409  * then there is a possibility to get a link-time error about multiple
410  * definition of that member. But if a static member is defined for
411  * template class then there won't be such problem.
412  *
413  * A typical usage intended to be:
414  * \code
415  * class some_useful_class_t : public thread_local_ptr_holder_t<some_useful_class_t> {
416  * ...
417  * };
418  * \endcode
419  *
420  * \since
421  * v.1.0.2
422  */
423 template< class T >
425  {
426  private :
427  //! Value of the pointer which need to be stored.
428  static thread_local T * m_ptr;
429 
430  protected :
431  //! Access to the current value of the pointer.
432  static T *
433  ptr() noexcept { return m_ptr; }
434 
435  //! Setter for the pointer.
436  static void
437  set_ptr( T * p ) noexcept { m_ptr = p; }
438  };
439 
440 template< class T >
441 thread_local T * thread_local_ptr_holder_t<T>::m_ptr = nullptr;
442 
443 //
444 // work_thread_t
445 //
446 /*!
447  * \brief Base type for implementations of work thread wrappers.
448  *
449  * Work thread wrapper creates an instance of some type on the stack
450  * of the new thread. Then the pointer of this instance is stored in
451  * thread_local variable (as a pointer to work_thread_t). This pointer
452  * then can be retrieved later by demand handlers to get access to
453  * some dispatcher-specific data.
454  *
455  * It is assumed that there will be two derived classes:
456  * 1. One for the case when thread activity should not be tracked.
457  * 2. Another for the case when thread activity must be tracked.
458  *
459  * These derived classes will reuse some functionality from
460  * work_thread_t. And should implement on_demand() method for
461  * actual demands processing.
462  *
463  * \since
464  * v.1.0.2
465  */
467  {
468  private :
469  //! ID of the work thread.
470  /*!
471  * Gets its value in the constructor and doesn't changed later.
472  */
474 
475  protected :
476  // Constructor and destructor are accessible for derived classes only.
479  {}
480 
481  // Just to make compilers happy.
482  virtual ~work_thread_t() = default;
483 
484  //! Actual processing of the demand.
485  /*!
486  * Must be implemented in derived classes.
487  */
488  virtual void
489  on_demand( execution_demand_t demand ) noexcept = 0;
490 
491  //! ID of the work thread.
493  thread_id() const noexcept
494  {
495  return m_thread_id;
496  }
497 
498  public :
499  //! Lauch processing of demand on the context of current thread.
500  /*!
501  * Creates an instance of Derived class, stores pointer to it into
502  * a thread_local static variable, then calls io_svc.run() method.
503  *
504  * \attention
505  * Terminates the whole application if an exception will be thrown.
506  *
507  * \tparam Derived Type of an object to be created on the stack.
508  * \tparam Args Types of arguments for Derived's constructor.
509  */
510  template< typename Derived, typename... Args >
511  static void
513  //! SObjectizer Environment for which work thread was created.
514  environment_t & env,
515  //! Asio IoService to be run on the context of that thread.
516  ::asio::io_context & io_svc,
517  //! Arguments to Derived's constructor.
518  Args &&... args )
519  {
520  // We don't expect any errors here.
521  // But if something happens then there is no way to
522  // recover and the whole application should be aborted.
523  try
524  {
526  // actual_handler must be accessible via thread_local variable.
528 
529  // Prevent return from io_context::run() if there is no
530  // more Asio's events.
531  auto work = ::asio::make_work_guard( io_svc );
532  io_svc.run();
533  }
534  catch( const std::exception & x )
535  {
538  log_stream << "An exception caught in work thread "
539  "of so_5::extra::disp::asio_thread_pool dispatcher."
540  " Exception: "
541  << x.what() << std::endl;
542  }
543  } );
544  }
545  catch( ... )
546  {
549  log_stream << "An unknown exception caught in work thread "
550  "of so_5::extra::disp::asio_thread_pool dispatcher."
551  << std::endl;
552  }
553  } );
554  }
555  }
556 
557  //! An interface method for passing a demand to processing.
558  static void
559  handle_demand( execution_demand_t demand )
560  {
561  ptr()->on_demand( std::move(demand) );
562  }
563  };
564 
565 //
566 // work_thread_without_activity_tracking_t
567 //
568 /*!
569  * \brief An implementation of work thread stuff for the case when
570  * thread activity tracking is not needed.
571  *
572  * \since
573  * v.1.0.2
574  */
575 class work_thread_without_activity_tracking_t final : public work_thread_t
576  {
577  public :
579  ~work_thread_without_activity_tracking_t() override = default;
580 
581  protected :
582  virtual void
583  on_demand( execution_demand_t demand ) noexcept override
584  {
585  demand.call_handler( thread_id() );
586  }
587  };
588 
589 //
590 // work_thread_activity_collector_t
591 //
592 /*!
593  * \brief Type of collector of work thread activity data.
594  *
595  * Objects of this class store also an ID of work thread. This ID is
596  * necessary for so_5::stats::messages::work_thread_activity message.
597  * Because of that a work thread must call setup_thread_id() method
598  * before use of activity collector.
599  *
600  * \since
601  * v.1.0.2
602  */
604  {
605  private :
606  //! ID of thread for which activity stats is collected.
608 
609  //! Collected activity stats.
613 
614  public :
615  /*!
616  * \brief Setup ID of the current work thread.
617  *
618  * \attention
619  * Must be called as soon as possible after the start of the work thread.
620  */
621  void
622  setup_thread_id( current_thread_id_t tid )
623  {
624  m_thread_id = std::move(tid);
625  }
626 
627  /*!
628  * \brief Get the ID of the thread.
629  *
630  * \attention
631  * Returns actual value only after call to setup_thread_id.
632  */
634  thread_id() const noexcept { return m_thread_id; }
635 
636  /*!
637  * \brief Mark start point of new activity.
638  */
639  void
640  activity_started() noexcept
641  {
642  m_work_activity.start();
643  }
644 
645  /*!
646  * \brief Mark completion of the current activity.
647  */
648  void
649  activity_finished() noexcept
650  {
651  m_work_activity.stop();
652  }
653 
654  /*!
655  * \brief Get the current stats.
656  */
659  {
660  ::so_5::stats::work_thread_activity_stats_t result;
661  result.m_working_stats = m_work_activity.take_stats();
662 
663  return result;
664  }
665  };
666 
667 //
668 // work_thread_with_activity_tracking_t
669 //
670 /*!
671  * \brief An implementation of work thread stuff for the case when
672  * thread activity tracking must be used.
673  *
674  * \since
675  * v.1.0.2
676  */
677 class work_thread_with_activity_tracking_t final : public work_thread_t
678  {
679  private :
680  //! Activity statistics.
682 
683  public :
685  outliving_reference_t< work_thread_activity_collector_t > activity_stats )
687  {
688  // Collector must receive ID of this thread.
689  m_activity_stats.get().setup_thread_id( thread_id() );
690  }
691 
692  ~work_thread_with_activity_tracking_t() override = default;
693 
694  protected :
695  virtual void
696  on_demand( execution_demand_t demand ) noexcept override
697  {
698  m_activity_stats.get().activity_started();
699 
700  demand.call_handler( thread_id() );
701 
702  m_activity_stats.get().activity_finished();
703  }
704  };
705 
706 //
707 // class basic_binder_impl_t
708 //
709 /*!
710  * \brief Basic part of implementation of a binder for %asio_thread_pool
711  * dispatcher.
712  *
713  * \since
714  * v.1.3.0
715  */
717  : public disp_binder_t
718  , public event_queue_t
719  {
720  public :
721  //! Initializing constructor.
723  //! The actual dispatcher to be used with that binder.
724  actual_dispatcher_shptr_t dispatcher )
726  {}
727 
728  void
730  agent_t & /*agent*/ ) override
731  {
732  // There is no need to do something.
733  }
734 
735  void
737  agent_t & /*agent*/ ) noexcept override
738  {
739  // There is no need to do something.
740  }
741 
742  void
744  agent_t & agent ) noexcept override
745  {
746  // Dispatcher should know about yet another agent bound.
747  m_dispatcher->agent_bound();
748  // Agent should receive its event_queue.
749  agent.so_bind_to_dispatcher( *this );
750  }
751 
752  void
754  agent_t & /*agent*/ ) noexcept override
755  {
756  // Dispatcher should know about yet another agent unbound.
757  m_dispatcher->agent_bound();
758  }
759 
760  protected :
761  //! The actual dispatcher.
763  };
764 
765 //
766 // binder_template_t
767 //
768 /*!
769  * \brief An implementation of a binder for %asio_thread_pool dispatcher.
770  *
771  * This binder is also an event_queue for the agents bound via that binder.
772  *
773  * There is no such thing as event_queue for %asio_thread_pool dispacher.
774  * All execution demands will be stored inside Asio IoServce and dispatched
775  * for execution via asio::post mechanism. But SObjectizer requires
776  * an implementation of event_queue which must be used for agents bound
777  * to %asio_thread_pool dispatcher. This class implements this event_queue
778  * concepts.
779  *
780  * This templates implements CRTP and should be parametrized by
781  * derived type. The derived type should provide method:
782  * \code
783  * ::asio::io_context::strand & strand() noexcept;
784  * \endcode
785  *
786  * \since
787  * v.1.3.0
788  */
789 template< typename Derived >
791  : public basic_binder_impl_t
792  {
793  auto &
794  self_reference() noexcept
795  {
796  return static_cast< Derived & >( *this );
797  }
798 
799  public :
800  using basic_binder_impl_t::basic_binder_impl_t;
801 
802  void
803  push( execution_demand_t demand ) override
804  {
806 
807  // Another demand will wait for processing.
808  ++counter;
809 
811  [d = std::move(demand), &counter]() mutable {
812  // Another demand will be processed.
813  --counter;
814 
815  // Delegate processing of the demand to actual
816  // work thread.
818  } );
819  }
820  };
821 
822 //
823 // binder_with_external_strand_t
824 //
825 /*!
826  * \brief An implementation of binder that uses an external strand object.
827  *
828  * \since
829  * v.1.3.0
830  */
831 class binder_with_external_strand_t final
832  : public binder_template_t< binder_with_external_strand_t >
833  {
834  using base_type = binder_template_t< binder_with_external_strand_t >;
835 
836  public :
838  actual_dispatcher_shptr_t dispatcher,
839  outliving_reference_t< ::asio::io_context::strand > strand )
841  , m_strand{ strand }
842  {}
843 
844  ::asio::io_context::strand &
845  strand() noexcept { return m_strand.get(); }
846 
847  private :
848  //! Strand to be used with this event_queue.
850  };
851 
852 //
853 // binder_with_own_strand_t
854 //
855 /*!
856  * \brief An implementation of binder that uses an own strand object.
857  *
858  * This own strand object will be a part of the binder object.
859  *
860  * \since
861  * v.1.3.0
862  */
863 class binder_with_own_strand_t final
864  : public binder_template_t< binder_with_own_strand_t >
865  {
866  using base_type = binder_template_t< binder_with_own_strand_t >;
867 
868  public :
870  actual_dispatcher_shptr_t dispatcher )
873  {}
874 
875  ::asio::io_context::strand &
876  strand() noexcept { return m_strand; }
877 
878  private :
879  //! Strand to be used with this event_queue.
881  };
882 
883 //
884 // basic_dispatcher_skeleton_t
885 //
886 /*!
887  * \brief Basic stuff for all implementations of dispatcher.
888  *
889  * Derived classes should implement the following virtual methods:
890  * - data_source();
891  * - launch_work_threads();
892  * - wait_work_threads().
893  *
894  * \since
895  * v.1.0.2
896  */
898  {
899  protected :
901  friend class disp_data_source_t;
902 
903  public:
905  disp_params_t params )
908  {
909  }
910 
911  [[nodiscard]]
914  ::asio::io_context::strand & strand ) override
915  {
916  return { std::make_shared< binder_with_external_strand_t >(
917  shared_from_this(),
918  outliving_mutable(strand) )
919  };
920  }
921 
922  [[nodiscard]]
925  {
926  return { std::make_shared< binder_with_own_strand_t >(
927  shared_from_this() )
928  };
929  }
930 
931  ::asio::io_context &
932  io_context() const noexcept override { return *m_io_context; }
933 
934  void
935  agent_bound() noexcept override
936  {
937  ++m_agents_bound;
938  }
939 
940  void
941  agent_unbound() noexcept override
942  {
943  --m_agents_bound;
944  }
945 
947  demands_counter() noexcept override
948  {
949  return m_demands_counter;
950  }
951 
952  protected :
953  void
955  environment_t & env,
956  std::string_view data_sources_name_base )
957  {
958  data_source().set_data_sources_name_base( data_sources_name_base );
959  data_source().start( outliving_mutable(env.stats_repository()) );
960 
961  ::so_5::details::do_with_rollback_on_exception(
962  [&] { launch_work_threads(env); },
963  [this] { data_source().stop(); } );
964  }
965 
966  void
968  {
969  ::so_5::details::invoke_noexcept_code( [this] {
970  // Stopping Asio IO service.
971  m_io_context->stop();
972  } );
973  }
974 
975  void
977  {
978  ::so_5::details::invoke_noexcept_code( [this] {
979  // Waiting for complete stop of all work threads.
980  wait_work_threads();
981  // Stopping data source.
982  data_source().stop();
983  } );
984  }
985 
986  protected :
987  /*!
988  * \brief Get the count of work threads to be created.
989  */
990  std::size_t
991  thread_count() const noexcept { return m_thread_count; }
992 
993  /*!
994  * \brief Get access to actual data source instance for that
995  * dispatcher.
996  */
997  virtual disp_data_source_t &
998  data_source() noexcept = 0;
999 
1000 
1001 #if defined(__clang__)
1002 #pragma clang diagnostic push
1003 #pragma clang diagnostic ignored "-Wnon-virtual-dtor"
1004 #endif
1005 
1006  /*!
1007  * \brief Data source for run-time monitoring of whole dispatcher.
1008  *
1009  * \since
1010  * v.1.0.2
1011  */
1013  : public ::so_5::stats::source_t
1014  {
1015  //! Dispatcher to work with.
1017 
1018  //! Basic prefix for data sources.
1020 
1021  //! Data source repository.
1022  /*!
1023  * Will receive actual value in start() method.
1024  *
1025  * \since
1026  * v.1.3.0
1027  */
1029 
1030  protected :
1031  //! Access to data source prefix for derived classes.
1032  const ::so_5::stats::prefix_t &
1033  base_prefix() const noexcept { return m_base_prefix; }
1034 
1035  public :
1037  : m_dispatcher( disp )
1038  {}
1039 
1040  virtual void
1041  distribute( const mbox_t & mbox ) override
1042  {
1043  const auto agents_count = m_dispatcher.m_agents_bound.load(
1044  std::memory_order_acquire );
1045 
1046  const auto demands_count = m_dispatcher.m_demands_counter.load(
1047  std::memory_order_acquire );
1048 
1049  send< ::so_5::stats::messages::quantity< std::size_t > >(
1050  mbox,
1051  m_base_prefix,
1052  ::so_5::stats::suffixes::agent_count(),
1053  agents_count );
1054 
1055  // Note: because there is no way to detect on which thread a
1056  // demand will be handled, the total number of waiting
1057  // demands is destributed for the whole dispatcher.
1058  send< ::so_5::stats::messages::quantity< std::size_t > >(
1059  mbox,
1060  m_base_prefix,
1061  ::so_5::stats::suffixes::work_thread_queue_size(),
1062  demands_count );
1063  }
1064 
1065  void
1067  std::string_view name_base )
1068  {
1069  using namespace ::so_5::disp::reuse;
1070 
1071  m_base_prefix = make_disp_prefix(
1072  "ext-asio-tp",
1073  name_base,
1074  &m_dispatcher );
1075  }
1076 
1077  void
1078  start( ::so_5::stats::repository_t & repo )
1079  {
1080  repo.add( *this );
1081  m_stats_repo = &repo;
1082  }
1083 
1084  void
1085  stop() noexcept
1086  {
1087  m_stats_repo->remove( *this );
1088  }
1089  };
1090 
1091 #if defined(__clang__)
1092 #pragma clang diagnostic pop
1093 #endif
1094 
1095  private:
1096  //! Count of work threads.
1098 
1099  //! IO Service to work with.
1101 
1102  //! Count of agents bound to that dispatcher.
1104 
1105  //! Count of waiting demands.
1107 
1108  //! Start all working threads.
1109  virtual void
1111  //! SObjectizer Environment for which threads will be created.
1112  environment_t & env ) = 0;
1113 
1114  //! Wait for finish of all threads.
1115  /*!
1116  * It is a blocking call. The current thread will be stopped until
1117  * all work thread will finish their work.
1118  */
1119  virtual void
1120  wait_work_threads() noexcept = 0;
1121  };
1122 
1123 //
1124 // dispatcher_skeleton_without_thread_activity_tracking_t
1125 //
1126 /*!
1127  * \brief Extension of basic dispatcher skeleton for the case when
1128  * work thread activity is not collected.
1129  *
1130  * This class contains disp_data_source_t instance and implements
1131  * virtual method data_source() for accessing this instance.
1132  *
1133  * It also provides static method run_work_thread() which must be called
1134  * at the beginnig of work thread.
1135  *
1136  * \since
1137  * v.1.0.2
1138  */
1141  {
1142  public :
1144  disp_params_t params )
1146  {}
1147 
1148  protected :
1149  virtual disp_data_source_t &
1150  data_source() noexcept { return m_data_source; }
1151 
1152  //! Implementation of main function for a work thread.
1153  static void
1155  environment_t & env,
1156  ::asio::io_context & io_svc,
1158  std::size_t /*index*/ )
1159  {
1160  work_thread_t::run< work_thread_without_activity_tracking_t >(
1161  env, io_svc );
1162  }
1163 
1164  private :
1165  //! Actual data source instance.
1167  };
1168 
1169 //
1170 // dispatcher_skeleton_with_thread_activity_tracking_t
1171 //
1172 /*!
1173  * \brief Extension of basic dispatcher skeleton for the case when
1174  * work thread activity must be collected.
1175  *
1176  * This class defines its own actual_disp_data_source_t type and
1177  * contains an instance of that type. There is also implementation
1178  * of data_source() virtual method for accessing this instance.
1179  *
1180  * It provides static method run_work_thread() which must be called
1181  * at the beginnig of work thread.
1182  *
1183  * \since
1184  * v.1.0.2
1185  */
1188  {
1189 #if defined(__clang__)
1190 #pragma clang diagnostic push
1191 #pragma clang diagnostic ignored "-Wnon-virtual-dtor"
1192 #endif
1193  /*!
1194  * \brief Actual data source type for dispatcher with
1195  * work thread activity tracking.
1196  *
1197  * \since
1198  * v.1.0.2
1199  */
1201  : public disp_data_source_t
1202  {
1203  private :
1204  //! Collectors for run-time stats for every thread.
1205  std::vector<
1208 
1209  public :
1212  std::size_t thread_count )
1213  : disp_data_source_t( disp )
1215  {
1216  for( auto & c : m_collectors )
1217  c = std::make_unique< work_thread_activity_collector_t >();
1218  }
1219 
1220  virtual void
1221  distribute( const mbox_t & mbox ) override
1222  {
1223  disp_data_source_t::distribute( mbox );
1224 
1225  for( std::size_t i = 0; i != m_collectors.size(); ++i )
1226  distribute_stats_for_work_thread_at( mbox, i );
1227  }
1228 
1229  /*!
1230  * \note \a index is not checked for validity!
1231  */
1233  collector_at( std::size_t index ) noexcept
1234  {
1235  return *(m_collectors[index]);
1236  }
1237 
1238  private :
1239  void
1241  const mbox_t & mbox,
1242  std::size_t index )
1243  {
1244  std::ostringstream ss;
1245  ss << base_prefix().c_str() << "/wt-" << index;
1246 
1247  const ::so_5::stats::prefix_t prefix{ ss.str() };
1248  auto & collector = collector_at( index );
1249 
1250  so_5::send< ::so_5::stats::messages::work_thread_activity >(
1251  mbox,
1252  prefix,
1253  ::so_5::stats::suffixes::work_thread_activity(),
1254  collector.thread_id(),
1255  collector.take_activity_stats() );
1256  }
1257  };
1258 
1259 #if defined(__clang__)
1260 #pragma clang diagnostic pop
1261 #endif
1262 
1263  public :
1265  disp_params_t params )
1268  {}
1269 
1270  protected :
1271  virtual disp_data_source_t &
1272  data_source() noexcept override { return m_actual_data_source; }
1273 
1274  //! Implementation of main function for a work thread.
1275  static void
1277  //! SObjectizer Environment for which the work thread is created.
1278  environment_t & env,
1279  //! Asio IoService to be used.
1280  ::asio::io_context & io_svc,
1281  //! Dispatcher who owns this thread.
1283  //! Ordinal number of this thread.
1284  std::size_t index )
1285  {
1286  work_thread_t::run< work_thread_with_activity_tracking_t >(
1287  env,
1288  io_svc,
1289  outliving_mutable(
1290  self.m_actual_data_source.collector_at(index) ) );
1291  }
1292 
1293  private :
1294  //! Data source instance.
1296  };
1297 
1298 //
1299 // dispatcher_template_t
1300 //
1301 /*!
1302  * \brief Template-based implementation of dispatcher.
1303  *
1304  * Implements virual methods launch_work_threads() and wait_work_threads()
1305  * from basic_dispatcher_skeleton_t.
1306  *
1307  * \tparam Traits Traits-type to be used.
1308  * \tparam Basic_Skeleton A specific skeleton to be used as base type.
1309  * It expected to be dispatcher_skeleton_with_thread_activity_tracking_t or
1310  * dispatcher_skeleton_without_thread_activity_tracking_t.
1311  *
1312  * \since
1313  * v.1.0.2
1314  */
1315 template<
1316  typename Traits,
1317  typename Basic_Skeleton >
1318 class dispatcher_template_t final : public Basic_Skeleton
1319  {
1320  public:
1322  //! SObjectizer Environment to work in.
1323  outliving_reference_t< environment_t > env,
1324  //! Value for creating names of data sources for
1325  //! run-time monitoring.
1326  std::string_view data_sources_name_base,
1327  //! Parameters for the dispatcher.
1328  disp_params_t params )
1329  : Basic_Skeleton{ std::move(params) }
1330  {
1331  this->start( env.get(), data_sources_name_base );
1332  }
1333 
1334  ~dispatcher_template_t() noexcept override
1335  {
1336  this->shutdown();
1337  this->wait();
1338  }
1339 
1340  private:
1341  //! An alias for actual thread type.
1342  using thread_t = typename Traits::thread_type;
1343  //! An alias for unique_ptr to thread.
1345 
1346  //! Working threads.
1348 
1349  virtual void
1351  environment_t & env ) override
1352  {
1353  using namespace std;
1354 
1355  m_threads.resize( this->thread_count() );
1356 
1358  for( std::size_t i = 0u; i != this->thread_count(); ++i )
1359  {
1360  m_threads[ i ] = this->make_work_thread( env, i );
1361  }
1362  },
1363  [&] {
1364  ::so_5::details::invoke_noexcept_code( [&] {
1365  this->io_context().stop();
1366 
1367  // Shutdown all started threads.
1368  for( auto & t : m_threads )
1369  if( t )
1370  {
1371  t->join();
1372  t.reset();
1373  }
1374  else
1375  // No more started threads.
1376  break;
1377  } );
1378  } );
1379  }
1380 
1381  virtual void
1382  wait_work_threads() noexcept override
1383  {
1384  for( auto & t : m_threads )
1385  {
1386  t->join();
1387  t.reset();
1388  }
1389  }
1390 
1393  environment_t & env,
1394  std::size_t index )
1395  {
1396  Basic_Skeleton * self = this;
1397  return std::make_unique< thread_t >(
1398  [&env, io_svc = &this->io_context(), self, index]()
1399  {
1401  } );
1402  }
1403  };
1404 
1405 //
1406 // dispatcher_handle_maker_t
1407 //
1409  {
1410  public :
1411  [[nodiscard]]
1412  static dispatcher_handle_t
1413  make( actual_dispatcher_shptr_t disp ) noexcept
1414  {
1415  return { std::move( disp ) };
1416  }
1417  };
1418 
1419 } /* namespace impl */
1420 
1421 //
1422 // default_thread_pool_size
1423 //
1424 /*!
1425  * \brief A helper function for detecting default thread count for
1426  * thread pool.
1427  *
1428  * \since
1429  * v.1.0.2
1430  */
1431 inline std::size_t
1433  {
1434  auto c = std::thread::hardware_concurrency();
1435  if( !c )
1436  c = 2;
1437 
1438  return c;
1439  }
1440 
1441 //
1442 // default_traits_t
1443 //
1444 /*!
1445  * \brief Default traits of %asio_thread_pool dispatcher.
1446  *
1447  * \since
1448  * v.1.0.2
1449  */
1451  {
1452  //! Type of thread.
1454  };
1455 
1456 //
1457 // make_dispatcher
1458 //
1459 /*!
1460  * \brief A function for creation an instance of %asio_thread_pool dispatcher.
1461  *
1462  * Usage examples:
1463  * \code
1464  * // Dispatcher which uses own Asio IoContext and default traits.
1465  * namespace asio_tp = so_5::extra::disp::asio_thread_pool;
1466  * asio_tp::disp_params_t params;
1467  * params.use_own_io_context(); // Asio IoContext object will be created here.
1468  * // This object will be accessible later via
1469  * // private_dispatcher_t::io_context() method.
1470  * auto disp = asio_tp::make_dispatcher(
1471  * env,
1472  * "my_asio_tp",
1473  * std::move(disp_params) );
1474  *
1475  *
1476  * // Dispatcher which uses external Asio IoContext and default traits.
1477  * asio::io_context & io_svc = ...;
1478  * namespace asio_tp = so_5::extra::disp::asio_thread_pool;
1479  * asio_tp::disp_params_t params;
1480  * params.use_external_io_context( io_svc );
1481  * auto disp = asio_tp::make_dispatcher(
1482  * env,
1483  * "my_asio_tp",
1484  * std::move(disp_params) );
1485  *
1486  *
1487  * // Dispatcher which uses own Asio IoContext and custom traits.
1488  * struct my_traits
1489  * {
1490  * using thread_type = my_custom_thread_type;
1491  * };
1492  * namespace asio_tp = so_5::extra::disp::asio_thread_pool;
1493  * asio_tp::disp_params_t params;
1494  * params.use_own_io_context();
1495  * auto disp = asio_tp::make_dispatcher< my_traits >(
1496  * env,
1497  * "my_asio_tp",
1498  * std::move(disp_params) );
1499  * \endcode
1500  *
1501  * \par Requirements for traits type
1502  * Traits type must define a type which looks like:
1503  * \code
1504  * struct traits
1505  * {
1506  * // Name of type to be used for thread class.
1507  * using thread_type = ...;
1508  * };
1509  * \endcode
1510  *
1511  * \par Requirements for custom thread type
1512  * By default std::thread is used as a class for working with threads.
1513  * But user can specify its own custom thread type via \a Traits::thread_type
1514  * parameter. A custom thread type must be a class which looks like:
1515  * \code
1516  * class custom_thread_type {
1517  * public :
1518  * // Must provide this constructor.
1519  * // F -- is a type of functional object which can be converted
1520  * // into std::function<void()>.
1521  * template<typename F>
1522  * custom_thread_type(F && f) {...}
1523  *
1524  * // Destructor must join thread if it is not joined yet.
1525  * ~custom_thread_type() noexcept {...}
1526  *
1527  * // The same semantic like std::thread::join.
1528  * void join() noexcept {...}
1529  * };
1530  * \endcode
1531  * This class doesn't need to be DefaultConstructible, CopyConstructible,
1532  * MoveConstructible, Copyable or Moveable.
1533  *
1534  * \tparam Traits Type with traits for a dispatcher. For the requirements
1535  * for \a Traits type see the section "Requirements for traits type" above.
1536  *
1537  * \since
1538  * v.1.0.2
1539  */
1540 template< typename Traits = default_traits_t >
1541 [[nodiscard]]
1542 inline dispatcher_handle_t
1544  //! SObjectizer Environment to work in.
1545  environment_t & env,
1546  //! Value for creating names of data sources for
1547  //! run-time monitoring.
1548  const std::string_view data_sources_name_base,
1549  //! Parameters for the dispatcher.
1550  disp_params_t disp_params )
1551  {
1552  const auto io_svc_ptr = disp_params.io_context();
1553  if( !io_svc_ptr )
1556  "io_context is not set in disp_params" );
1557 
1558  if( !disp_params.thread_count() )
1560 
1563  // Type of result pointer.
1565  // Actual type of dispatcher without thread activity tracking.
1567  Traits,
1569  // Actual type of dispatcher with thread activity tracking.
1571  Traits,
1575  std::move(disp_params) );
1576 
1578  }
1579 
1580 } /* namespace asio_thread_pool */
1581 
1582 } /* namespace disp */
1583 
1584 } /* namespace extra */
1585 
1586 } /* namespace so_5 */
An implementation of a binder for asio_thread_pool dispatcher.
Definition: pub.hpp:790
A helper for declaration of static and thread_local pointer in a header file.
Definition: pub.hpp:424
::so_5::stats::prefix_t m_base_prefix
Basic prefix for data sources.
Definition: pub.hpp:1019
Parameters for asio_thread_pool dispatcher.
Definition: pub.hpp:59
::asio::io_context & io_context() noexcept
Get reference to io_context from that dispatcher.
Definition: pub.hpp:340
static void run_work_thread(environment_t &env, ::asio::io_context &io_svc, dispatcher_skeleton_without_thread_activity_tracking_t &, std::size_t)
Implementation of main function for a work thread.
Definition: pub.hpp:1154
virtual void launch_work_threads(environment_t &env) override
Definition: pub.hpp:1350
outliving_reference_t< ::asio::io_context::strand > m_strand
Strand to be used with this event_queue.
Definition: pub.hpp:849
std::shared_ptr< ::asio::io_context > io_context() const noexcept
Get the io_context.
Definition: pub.hpp:156
std::size_t thread_count() const noexcept
Get the count of work threads to be created.
Definition: pub.hpp:991
A handle for asio_thread_pool dispatcher.
Definition: pub.hpp:233
void bind(agent_t &agent) noexcept override
Definition: pub.hpp:743
::so_5::stats::activity_tracking_stuff::stats_collector_t< ::so_5::stats::activity_tracking_stuff::internal_lock > m_work_activity
Collected activity stats.
Definition: pub.hpp:612
std::vector< std::unique_ptr< work_thread_activity_collector_t > > m_collectors
Collectors for run-time stats for every thread.
Definition: pub.hpp:1207
Basic stuff for all implementations of dispatcher.
Definition: pub.hpp:897
virtual void agent_unbound() noexcept=0
Notification about unbinding of an agent.
void agent_bound() noexcept override
Notification about binding of yet another agent.
Definition: pub.hpp:935
static void set_ptr(T *p) noexcept
Setter for the pointer.
Definition: pub.hpp:437
void activity_finished() noexcept
Mark completion of the current activity.
Definition: pub.hpp:649
void reset() noexcept
Drop the content of handle.
Definition: pub.hpp:354
::asio::io_context & io_context() const noexcept override
Get reference to io_context from that dispatcher.
Definition: pub.hpp:932
Base type for implementations of work thread wrappers.
Definition: pub.hpp:466
void push(execution_demand_t demand) override
Definition: pub.hpp:803
const std::shared_ptr< ::asio::io_context > m_io_context
IO Service to work with.
Definition: pub.hpp:1100
Template-based implementation of dispatcher.
Definition: pub.hpp:1318
disp_binder_shptr_t binder(::asio::io_context::strand &strand) const
Get a binder for that dispatcher.
Definition: pub.hpp:287
disp_params_t & thread_count(std::size_t count)
Setter for thread count.
Definition: pub.hpp:85
Basic part of implementation of a binder for asio_thread_pool dispatcher.
Definition: pub.hpp:716
An actual interface of thread pool dispatcher.
Definition: pub.hpp:379
virtual disp_data_source_t & data_source() noexcept=0
Get access to actual data source instance for that dispatcher.
const std::size_t m_thread_count
Count of work threads.
Definition: pub.hpp:1097
virtual void on_demand(execution_demand_t demand) noexcept override
Actual processing of the demand.
Definition: pub.hpp:583
static thread_local T * m_ptr
Value of the pointer which need to be stored.
Definition: pub.hpp:428
static void run(environment_t &env, ::asio::io_context &io_svc, Args &&... args)
Lauch processing of demand on the context of current thread.
Definition: pub.hpp:512
std::size_t thread_count() const
Getter for thread count.
Definition: pub.hpp:93
Ranges for error codes of each submodules.
Definition: details.hpp:13
void setup_thread_id(current_thread_id_t tid)
Setup ID of the current work thread.
Definition: pub.hpp:622
std::shared_ptr< ::asio::io_context > m_io_context
Asio&#39;s io_context which must be used with this dispatcher.
Definition: pub.hpp:169
disp_binder_shptr_t binder_with_external_strand(::asio::io_context::strand &strand) override
Create a binder for that dispatcher.
Definition: pub.hpp:913
static T * ptr() noexcept
Access to the current value of the pointer.
Definition: pub.hpp:433
std::vector< thread_unique_ptr_t > m_threads
Working threads.
Definition: pub.hpp:1347
virtual void on_demand(execution_demand_t demand) noexcept=0
Actual processing of the demand.
void start(environment_t &env, std::string_view data_sources_name_base)
Definition: pub.hpp:954
actual_dispatcher_shptr_t m_dispatcher
The actual dispatcher.
Definition: pub.hpp:762
outliving_reference_t< work_thread_activity_collector_t > m_activity_stats
Activity statistics.
Definition: pub.hpp:681
demands_counter_t m_demands_counter
Count of waiting demands.
Definition: pub.hpp:1106
virtual demands_counter_t & demands_counter() noexcept=0
Get a reference for counter of pending demands.
disp_params_t & use_own_io_context()
Use own Asio io_context object.
Definition: pub.hpp:148
disp_params_t & use_external_io_context(::asio::io_context &service)
Use external Asio io_context object with dispatcher.
Definition: pub.hpp:116
static void run_work_thread(environment_t &env, ::asio::io_context &io_svc, dispatcher_skeleton_with_thread_activity_tracking_t &self, std::size_t index)
Implementation of main function for a work thread.
Definition: pub.hpp:1276
disp_binder_shptr_t binder_with_own_strand() override
Create a binder for that dispatcher.
Definition: pub.hpp:924
::so_5::stats::repository_t * m_stats_repo
Data source repository.
Definition: pub.hpp:1028
basic_binder_impl_t(actual_dispatcher_shptr_t dispatcher)
Initializing constructor.
Definition: pub.hpp:722
virtual void launch_work_threads(environment_t &env)=0
Start all working threads.
demands_counter_t & demands_counter() noexcept override
Get a reference for counter of pending demands.
Definition: pub.hpp:947
static void handle_demand(execution_demand_t demand)
An interface method for passing a demand to processing.
Definition: pub.hpp:559
std::size_t default_thread_pool_size()
A helper function for detecting default thread count for thread pool.
Definition: pub.hpp:1432
Data source for run-time monitoring of whole dispatcher.
Definition: pub.hpp:1012
friend void swap(disp_params_t &a, disp_params_t &b) noexcept
Definition: pub.hpp:70
bool empty() const noexcept
Is this handle empty?
Definition: pub.hpp:247
virtual disp_data_source_t & data_source() noexcept override
Get access to actual data source instance for that dispatcher.
Definition: pub.hpp:1272
work_thread_with_activity_tracking_t(outliving_reference_t< work_thread_activity_collector_t > activity_stats)
Definition: pub.hpp:684
::asio::io_context::strand & strand() noexcept
Definition: pub.hpp:876
operator bool() const noexcept
Is this handle empty?
Definition: pub.hpp:346
disp_params_t & use_external_io_context(std::shared_ptr< ::asio::io_context > service)
Use external Asio io_context object with dispatcher.
Definition: pub.hpp:133
Default traits of asio_thread_pool dispatcher.
Definition: pub.hpp:1450
virtual disp_binder_shptr_t binder_with_own_strand()=0
Create a binder for that dispatcher.
impl::basic_dispatcher_iface_shptr_t m_dispatcher
A reference to actual implementation of a dispatcher.
Definition: pub.hpp:238
dispatcher_template_t(outliving_reference_t< environment_t > env, std::string_view data_sources_name_base, disp_params_t params)
Definition: pub.hpp:1321
std::atomic< std::size_t > m_agents_bound
Count of agents bound to that dispatcher.
Definition: pub.hpp:1103
current_thread_id_t thread_id() const noexcept
Get the ID of the thread.
Definition: pub.hpp:634
virtual disp_binder_shptr_t binder_with_external_strand(::asio::io_context::strand &)=0
Create a binder for that dispatcher.
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
Definition: pub.hpp:350
static dispatcher_handle_t make(actual_dispatcher_shptr_t disp) noexcept
Definition: pub.hpp:1413
const ::so_5::stats::prefix_t & base_prefix() const noexcept
Access to data source prefix for derived classes.
Definition: pub.hpp:1033
The very basic interface of asio_thread_pool dispatcher.
Definition: pub.hpp:188
Extension of basic dispatcher skeleton for the case when work thread activity must be collected...
Definition: pub.hpp:1186
Type of collector of work thread activity data.
Definition: pub.hpp:603
virtual void agent_bound() noexcept=0
Notification about binding of yet another agent.
virtual ::asio::io_context & io_context() const noexcept=0
Get reference to io_context from that dispatcher.
virtual disp_data_source_t & data_source() noexcept
Get access to actual data source instance for that dispatcher.
Definition: pub.hpp:1150
::asio::io_context::strand m_strand
Strand to be used with this event_queue.
Definition: pub.hpp:880
current_thread_id_t m_thread_id
ID of the work thread.
Definition: pub.hpp:473
Extension of basic dispatcher skeleton for the case when work thread activity is not collected...
Definition: pub.hpp:1139
::so_5::stats::work_thread_activity_stats_t take_activity_stats() noexcept
Get the current stats.
Definition: pub.hpp:658
binder_with_own_strand_t(actual_dispatcher_shptr_t dispatcher)
Definition: pub.hpp:869
virtual void wait_work_threads() noexcept=0
Wait for finish of all threads.
disp_params_t()=default
Default constructor.
virtual void on_demand(execution_demand_t demand) noexcept override
Actual processing of the demand.
Definition: pub.hpp:696
const int rc_io_context_is_not_set
Asio IoService is not set for asio_thread_pool dispatcher.
Definition: pub.hpp:45
current_thread_id_t thread_id() const noexcept
ID of the work thread.
Definition: pub.hpp:493
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t disp_params)
A function for creation an instance of asio_thread_pool dispatcher.
Definition: pub.hpp:1543
void activity_started() noexcept
Mark start point of new activity.
Definition: pub.hpp:640
void agent_unbound() noexcept override
Notification about unbinding of an agent.
Definition: pub.hpp:941
current_thread_id_t m_thread_id
ID of thread for which activity stats is collected.
Definition: pub.hpp:607
binder_with_external_strand_t(actual_dispatcher_shptr_t dispatcher, outliving_reference_t< ::asio::io_context::strand > strand)
Definition: pub.hpp:837
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
Definition: pub.hpp:240
std::size_t m_thread_count
Count of working threads.
Definition: pub.hpp:166
disp_binder_shptr_t binder() const
Get a binder for that dispatcher.
Definition: pub.hpp:328
void undo_preallocation(agent_t &) noexcept override
Definition: pub.hpp:736
basic_dispatcher_skeleton_t & m_dispatcher
Dispatcher to work with.
Definition: pub.hpp:1016
thread_unique_ptr_t make_work_thread(environment_t &env, std::size_t index)
Definition: pub.hpp:1392