SObjectizer-5 Extra
Loading...
Searching...
No Matches
pub.hpp
Go to the documentation of this file.
1/*!
2 * \file
3 * \brief Implementation of Asio's Thread Pool dispatcher.
4 *
5 * \since
6 * v.1.0.2
7 */
8
9#pragma once
10
11#include <so_5_extra/error_ranges.hpp>
12
13#include <so_5/disp_binder.hpp>
14#include <so_5/send_functions.hpp>
15
16#include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
17#include <so_5/disp/reuse/work_thread_activity_tracking.hpp>
18#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
19#include <so_5/disp/reuse/work_thread_factory_params.hpp>
20
21#include <so_5/stats/repository.hpp>
22#include <so_5/stats/messages.hpp>
23#include <so_5/stats/std_names.hpp>
24#include <so_5/stats/impl/activity_tracking.hpp>
25
26#include <so_5/details/invoke_noexcept_code.hpp>
27#include <so_5/details/rollback_on_exception.hpp>
28#include <so_5/details/abort_on_fatal_error.hpp>
29
30#include <so_5/outliving.hpp>
31
32#include <asio/io_context.hpp>
33#include <asio/io_context_strand.hpp>
34#include <asio/post.hpp>
35
36namespace so_5 {
37
38namespace extra {
39
40namespace disp {
41
43
44namespace errors {
45
46//! Asio IoService is not set for asio_thread_pool dispatcher.
49
50} /* namespace errors */
51
52//
53// disp_params_t
54//
55/*!
56 * \brief Parameters for %asio_thread_pool dispatcher.
57 *
58 * \since
59 * v.1.0.2
60 */
64 {
69
70 public :
71 //! Default constructor.
72 disp_params_t() = default;
73
74 friend inline void
76 disp_params_t & a, disp_params_t & b ) noexcept
77 {
78 using std::swap;
79
80 swap(
82 static_cast< activity_tracking_mixin_t & >(b) );
83
84 swap(
85 static_cast< thread_factory_mixin_t & >(a),
86 static_cast< thread_factory_mixin_t & >(b) );
87
88 swap( a.m_thread_count, b.m_thread_count );
89 swap( a.m_io_context, b.m_io_context );
90 }
91
92 //! Setter for thread count.
94 thread_count( std::size_t count )
95 {
96 m_thread_count = count;
97 return *this;
98 }
99
100 //! Getter for thread count.
101 std::size_t
103 {
104 return m_thread_count;
105 }
106
107 //! Use external Asio io_context object with dispatcher.
108 /*!
109 * Usage example:
110 * \code
111 * int main() {
112 * asio::io_context svc;
113 * so_5::launch( [&](so_5::environment_t & env) {
114 * namespace asio_tp = so_5::extra::disp::asio_thread_pool;
115 * auto disp = asio_tp::create_private_disp(
116 * env, "asio_tp",
117 * asio_tp::disp_params_t{}.use_external_io_context(
118 * so_5::outliving_mutable(svc) ) );
119 * ...
120 * } );
121 * }
122 * \endcode
123 */
126 ::asio::io_context & service )
127 {
128 m_io_context = std::shared_ptr< ::asio::io_context >(
129 std::addressof( service ),
130 // Empty deleter.
131 [](::asio::io_context *) {} );
132 return *this;
133 }
134
135 //! Use external Asio io_context object with dispatcher.
136 /*!
137 * \note
138 * Ownership of this io_context object must be shared with
139 * others.
140 */
143 std::shared_ptr< ::asio::io_context > service )
144 {
145 m_io_context = std::move(service);
146 return *this;
147 }
148
149 //! Use own Asio io_context object.
150 /*!
151 * Note this object will be dynamically created at the start
152 * of the dispatcher. And will be destroyed with the dispatcher object.
153 *
154 * A created io_context can be accessed later via io_context() method.
155 */
158 {
159 m_io_context = std::make_shared< ::asio::io_context >();
160 return *this;
161 }
162
163 //! Get the io_context.
165 io_context() const noexcept
166 {
167 return m_io_context;
168 }
169
170 private :
171 //! Count of working threads.
172 /*!
173 * Value 0 means that actual thread will be detected automatically.
174 */
176
177 //! Asio's io_context which must be used with this dispatcher.
179 };
180
181namespace impl {
182
184
185//
186// basic_dispatcher_iface_t
187//
188/*!
189 * \brief The very basic interface of %asio_thread_pool dispatcher.
190 *
191 * This class contains a minimum that is necessary for implementation
192 * of dispatcher_handle class.
193 *
194 * \since
195 * v.1.3.0
196 */
199 {
200 public :
201 virtual ~basic_dispatcher_iface_t() noexcept = default;
202
203 //! Create a binder for that dispatcher.
204 /*!
205 * The binder will use an external strand object.
206 */
207 [[nodiscard]]
208 virtual disp_binder_shptr_t
210
211 //! Create a binder for that dispatcher.
212 /*!
213 * The binder will use an internal (automatically created)
214 * strand object.
215 */
216 [[nodiscard]]
217 virtual disp_binder_shptr_t
219
220 //! Get reference to io_context from that dispatcher.
221 virtual ::asio::io_context &
222 io_context() const noexcept = 0;
223 };
224
227
229
230} /* namespace impl */
231
232//
233// dispatcher_handle_t
234//
235
236/*!
237 * \brief A handle for %asio_thread_pool dispatcher.
238 *
239 * \since
240 * v.1.3.0
241 */
242class [[nodiscard]] dispatcher_handle_t
243 {
245
246 //! A reference to actual implementation of a dispatcher.
248
250 impl::basic_dispatcher_iface_shptr_t dispatcher ) noexcept
252 {}
253
254 //! Is this handle empty?
255 [[nodiscard]]
256 bool
257 empty() const noexcept { return !m_dispatcher; }
258
259 public :
260 dispatcher_handle_t() noexcept = default;
261
262 //! Get a binder for that dispatcher.
263 /*!
264 * This method requires a reference to manually created strand
265 * object for protection of agents bound via binder returned.
266 * A user should create this strand object and ensure the right
267 * lifetime of it.
268 *
269 * Usage example:
270 * \code
271 * using namespace so_5::extra::disp::asio_thread_pool;
272 *
273 * asio::io_context io_ctx;
274 * asio::io_context::strand agents_strand{ io_ctx };
275 *
276 * so_5::environment_t & env = ...;
277 * auto disp = make_dispatcher( env, "my_disp", io_ctx );
278 *
279 * env.introduce_coop( [&]( so_5::coop_t & coop ) {
280 * coop.make_agent_with_binder< some_agent_type >(
281 * disp.binder( agents_strand ),
282 * ... );
283 *
284 * coop.make_agent_with_binder< another_agent_type >(
285 * disp.binder( agents_strand ),
286 * ... );
287 *
288 * ...
289 * } );
290 * \endcode
291 *
292 * \attention
293 * An attempt to call this method on empty handle is UB.
294 */
295 [[nodiscard]]
298 ::asio::io_context::strand & strand ) const
299 {
300 return m_dispatcher->binder_with_external_strand( strand );
301 }
302
303 //! Get a binder for that dispatcher.
304 /*!
305 * This method requires creates an internal strand object by itself.
306 *
307 * Usage example:
308 * \code
309 * using namespace so_5::extra::disp::asio_thread_pool;
310 *
311 * asio::io_context io_ctx;
312 *
313 * so_5::environment_t & env = ...;
314 * auto disp = make_dispatcher( env, "my_disp", io_ctx );
315 *
316 * env.introduce_coop( [&]( so_5::coop_t & coop ) {
317 * // This agent will use its own strand object.
318 * coop.make_agent_with_binder< some_agent_type >(
319 * disp.binder(),
320 * ... );
321 *
322 * // This agent will use its own strand object.
323 * coop.make_agent_with_binder< another_agent_type >(
324 * disp.binder(),
325 * ... );
326 * ...
327 * } );
328 * \endcode
329 *
330 * \attention
331 * An attempt to call this method on empty handle is UB.
332 *
333 * \since
334 * v.1.3.0
335 */
336 [[nodiscard]]
338 binder() const
339 {
340 return m_dispatcher->binder_with_own_strand();
341 }
342
343 //! Get reference to io_context from that dispatcher.
344 /*!
345 * \attention
346 * An attempt to call this method on empty handle is UB.
347 */
348 [[nodiscard]]
349 ::asio::io_context &
350 io_context() noexcept
351 {
352 return m_dispatcher->io_context();
353 }
354
355 //! Is this handle empty?
356 [[nodiscard]]
357 operator bool() const noexcept { return empty(); }
358
359 //! Does this handle contain a reference to dispatcher?
360 [[nodiscard]]
361 bool
362 operator!() const noexcept { return !empty(); }
363
364 //! Drop the content of handle.
365 void
366 reset() noexcept { m_dispatcher.reset(); }
367 };
368
369namespace impl {
370
371//
372// demands_counter_t
373//
374/*!
375 * \brief Type of atomic counter for counting waiting demands.
376 *
377 * \since
378 * v.1.0.2
379 */
381
382//
383// actual_dispatcher_iface_t
384//
385/*!
386 * \brief An actual interface of thread pool dispatcher.
387 *
388 * \since
389 * v.1.3.0
390 */
392 {
393 public :
394 //! Notification about binding of yet another agent.
395 virtual void
396 agent_bound() noexcept = 0;
397
398 //! Notification about unbinding of an agent.
399 virtual void
400 agent_unbound() noexcept = 0;
401
402 //! Get a reference for counter of pending demands.
403 virtual demands_counter_t &
404 demands_counter() noexcept = 0;
405 };
406
407//
408// actual_dispatcher_shptr_t
409//
412
413//
414// thread_local_ptr_holder_t
415//
416/*!
417 * \brief A helper for declaration of static and thread_local pointer
418 * in a header file.
419 *
420 * If non-template class will define a static member in a header file
421 * then there is a possibility to get a link-time error about multiple
422 * definition of that member. But if a static member is defined for
423 * template class then there won't be such problem.
424 *
425 * A typical usage intended to be:
426 * \code
427 * class some_useful_class_t : public thread_local_ptr_holder_t<some_useful_class_t> {
428 * ...
429 * };
430 * \endcode
431 *
432 * \since
433 * v.1.0.2
434 */
435template< class T >
437 {
438 private :
439 //! Value of the pointer which need to be stored.
440 static thread_local T * m_ptr;
441
442 protected :
443 //! Access to the current value of the pointer.
444 static T *
445 ptr() noexcept { return m_ptr; }
446
447 //! Setter for the pointer.
448 static void
449 set_ptr( T * p ) noexcept { m_ptr = p; }
450 };
451
452template< class T >
453thread_local T * thread_local_ptr_holder_t<T>::m_ptr = nullptr;
454
455//
456// work_thread_t
457//
458/*!
459 * \brief Base type for implementations of work thread wrappers.
460 *
461 * Work thread wrapper creates an instance of some type on the stack
462 * of the new thread. Then the pointer of this instance is stored in
463 * thread_local variable (as a pointer to work_thread_t). This pointer
464 * then can be retrieved later by demand handlers to get access to
465 * some dispatcher-specific data.
466 *
467 * It is assumed that there will be two derived classes:
468 * 1. One for the case when thread activity should not be tracked.
469 * 2. Another for the case when thread activity must be tracked.
470 *
471 * These derived classes will reuse some functionality from
472 * work_thread_t. And should implement on_demand() method for
473 * actual demands processing.
474 *
475 * \since
476 * v.1.0.2
477 */
479 {
480 private :
481 //! ID of the work thread.
482 /*!
483 * Gets its value in the constructor and doesn't changed later.
484 */
486
487 protected :
488 // Constructor and destructor are accessible for derived classes only.
492
493 // Just to make compilers happy.
494 virtual ~work_thread_t() = default;
495
496 //! Actual processing of the demand.
497 /*!
498 * Must be implemented in derived classes.
499 */
500 virtual void
501 on_demand( execution_demand_t demand ) noexcept = 0;
502
503 //! ID of the work thread.
505 thread_id() const noexcept
506 {
507 return m_thread_id;
508 }
509
510 public :
511 //! Lauch processing of demand on the context of current thread.
512 /*!
513 * Creates an instance of Derived class, stores pointer to it into
514 * a thread_local static variable, then calls io_svc.run() method.
515 *
516 * \attention
517 * Terminates the whole application if an exception will be thrown.
518 *
519 * \tparam Derived Type of an object to be created on the stack.
520 * \tparam Args Types of arguments for Derived's constructor.
521 */
522 template< typename Derived, typename... Args >
523 static void
525 //! SObjectizer Environment for which work thread was created.
526 environment_t & env,
527 //! Asio IoService to be run on the context of that thread.
528 ::asio::io_context & io_svc,
529 //! Arguments to Derived's constructor.
530 Args &&... args )
531 {
532 // We don't expect any errors here.
533 // But if something happens then there is no way to
534 // recover and the whole application should be aborted.
535 try
536 {
538 // actual_handler must be accessible via thread_local variable.
540
541 // Prevent return from io_context::run() if there is no
542 // more Asio's events.
543 auto work = ::asio::make_work_guard( io_svc );
544 io_svc.run();
545 }
546 catch( const std::exception & x )
547 {
550 log_stream << "An exception caught in work thread "
551 "of so_5::extra::disp::asio_thread_pool dispatcher."
552 " Exception: "
553 << x.what() << std::endl;
554 }
555 } );
556 }
557 catch( ... )
558 {
561 log_stream << "An unknown exception caught in work thread "
562 "of so_5::extra::disp::asio_thread_pool dispatcher."
563 << std::endl;
564 }
565 } );
566 }
567 }
568
569 //! An interface method for passing a demand to processing.
570 static void
571 handle_demand( execution_demand_t demand )
572 {
573 ptr()->on_demand( std::move(demand) );
574 }
575 };
576
577//
578// work_thread_without_activity_tracking_t
579//
580/*!
581 * \brief An implementation of work thread stuff for the case when
582 * thread activity tracking is not needed.
583 *
584 * \since
585 * v.1.0.2
586 */
587class work_thread_without_activity_tracking_t final : public work_thread_t
588 {
589 public :
592
593 protected :
594 virtual void
595 on_demand( execution_demand_t demand ) noexcept override
596 {
597 demand.call_handler( thread_id() );
598 }
599 };
600
601//
602// work_thread_activity_collector_t
603//
604/*!
605 * \brief Type of collector of work thread activity data.
606 *
607 * Objects of this class store also an ID of work thread. This ID is
608 * necessary for so_5::stats::messages::work_thread_activity message.
609 * Because of that a work thread must call setup_thread_id() method
610 * before use of activity collector.
611 *
612 * \since
613 * v.1.0.2
614 */
616 {
617 private :
618 //! ID of thread for which activity stats is collected.
620
621 //! Collected activity stats.
625
626 public :
627 /*!
628 * \brief Setup ID of the current work thread.
629 *
630 * \attention
631 * Must be called as soon as possible after the start of the work thread.
632 */
633 void
634 setup_thread_id( current_thread_id_t tid )
635 {
636 m_thread_id = std::move(tid);
637 }
638
639 /*!
640 * \brief Get the ID of the thread.
641 *
642 * \attention
643 * Returns actual value only after call to setup_thread_id.
644 */
646 thread_id() const noexcept { return m_thread_id; }
647
648 /*!
649 * \brief Mark start point of new activity.
650 */
651 void
653 {
654 m_work_activity.start();
655 }
656
657 /*!
658 * \brief Mark completion of the current activity.
659 */
660 void
662 {
663 m_work_activity.stop();
664 }
665
666 /*!
667 * \brief Get the current stats.
668 */
671 {
672 ::so_5::stats::work_thread_activity_stats_t result;
673 result.m_working_stats = m_work_activity.take_stats();
674
675 return result;
676 }
677 };
678
679//
680// work_thread_with_activity_tracking_t
681//
682/*!
683 * \brief An implementation of work thread stuff for the case when
684 * thread activity tracking must be used.
685 *
686 * \since
687 * v.1.0.2
688 */
690 {
691 private :
692 //! Activity statistics.
694
695 public :
697 outliving_reference_t< work_thread_activity_collector_t > activity_stats )
699 {
700 // Collector must receive ID of this thread.
702 }
703
705
706 protected :
707 virtual void
708 on_demand( execution_demand_t demand ) noexcept override
709 {
710 m_activity_stats.get().activity_started();
711
712 demand.call_handler( thread_id() );
713
714 m_activity_stats.get().activity_finished();
715 }
716 };
717
718//
719// class basic_binder_impl_t
720//
721/*!
722 * \brief Basic part of implementation of a binder for %asio_thread_pool
723 * dispatcher.
724 *
725 * \since
726 * v.1.3.0
727 */
729 : public disp_binder_t
730 , public event_queue_t
731 {
732 public :
733 //! Initializing constructor.
735 //! The actual dispatcher to be used with that binder.
736 actual_dispatcher_shptr_t dispatcher )
738 {}
739
740 void
742 agent_t & /*agent*/ ) override
743 {
744 // There is no need to do something.
745 }
746
747 void
749 agent_t & /*agent*/ ) noexcept override
750 {
751 // There is no need to do something.
752 }
753
754 void
756 agent_t & agent ) noexcept override
757 {
758 // Dispatcher should know about yet another agent bound.
759 m_dispatcher->agent_bound();
760 // Agent should receive its event_queue.
761 agent.so_bind_to_dispatcher( *this );
762 }
763
764 void
766 agent_t & /*agent*/ ) noexcept override
767 {
768 // Dispatcher should know about yet another agent unbound.
769 m_dispatcher->agent_bound();
770 }
771
772 protected :
773 //! The actual dispatcher.
775 };
776
777//
778// binder_template_t
779//
780/*!
781 * \brief An implementation of a binder for %asio_thread_pool dispatcher.
782 *
783 * This binder is also an event_queue for the agents bound via that binder.
784 *
785 * There is no such thing as event_queue for %asio_thread_pool dispacher.
786 * All execution demands will be stored inside Asio IoServce and dispatched
787 * for execution via asio::post mechanism. But SObjectizer requires
788 * an implementation of event_queue which must be used for agents bound
789 * to %asio_thread_pool dispatcher. This class implements this event_queue
790 * concepts.
791 *
792 * This templates implements CRTP and should be parametrized by
793 * derived type. The derived type should provide method:
794 * \code
795 * ::asio::io_context::strand & strand() noexcept;
796 * \endcode
797 *
798 * \since
799 * v.1.3.0
800 */
801template< typename Derived >
803 : public basic_binder_impl_t
804 {
805 auto &
806 self_reference() noexcept
807 {
808 return static_cast< Derived & >( *this );
809 }
810
811 public :
812 using basic_binder_impl_t::basic_binder_impl_t;
813
814 void
815 push( execution_demand_t demand ) override
816 {
818
819 // Another demand will wait for processing.
820 ++counter;
821
823 [d = std::move(demand), &counter]() mutable {
824 // Another demand will be processed.
825 --counter;
826
827 // Delegate processing of the demand to actual
828 // work thread.
830 } );
831 }
832
833 void
834 push_evt_start( execution_demand_t demand ) override
835 {
836 // Just delegate to the ordinary push().
837 this->push( std::move(demand) );
838 }
839
840 void
841 push_evt_finish( execution_demand_t demand ) noexcept override
842 {
843 // Just delegate to the ordinary push() despite
844 // the fact that push() isn't noexcept.
845 this->push( std::move(demand) );
846 }
847 };
848
849//
850// binder_with_external_strand_t
851//
852/*!
853 * \brief An implementation of binder that uses an external strand object.
854 *
855 * \since
856 * v.1.3.0
857 */
858class binder_with_external_strand_t final
859 : public binder_template_t< binder_with_external_strand_t >
860 {
861 using base_type = binder_template_t< binder_with_external_strand_t >;
862
863 public :
865 actual_dispatcher_shptr_t dispatcher,
866 outliving_reference_t< ::asio::io_context::strand > strand )
868 , m_strand{ strand }
869 {}
870
872 strand() noexcept { return m_strand.get(); }
873
874 private :
875 //! Strand to be used with this event_queue.
877 };
878
879//
880// binder_with_own_strand_t
881//
882/*!
883 * \brief An implementation of binder that uses an own strand object.
884 *
885 * This own strand object will be a part of the binder object.
886 *
887 * \since
888 * v.1.3.0
889 */
890class binder_with_own_strand_t final
891 : public binder_template_t< binder_with_own_strand_t >
892 {
893 using base_type = binder_template_t< binder_with_own_strand_t >;
894
895 public :
897 actual_dispatcher_shptr_t dispatcher )
900 {}
901
903 strand() noexcept { return m_strand; }
904
905 private :
906 //! Strand to be used with this event_queue.
908 };
909
910//
911// basic_dispatcher_skeleton_t
912//
913/*!
914 * \brief Basic stuff for all implementations of dispatcher.
915 *
916 * Derived classes should implement the following virtual methods:
917 * - data_source();
918 * - launch_work_threads();
919 * - wait_work_threads().
920 *
921 * \since
922 * v.1.0.2
923 */
925 {
926 protected :
927 class disp_data_source_t;
928 friend class disp_data_source_t;
929
930 public:
932 ::so_5::environment_t & env,
933 disp_params_t params )
938 params,
939 env )
940 )
941 {
942 }
943
944 [[nodiscard]]
947 ::asio::io_context::strand & strand ) override
948 {
949 return { std::make_shared< binder_with_external_strand_t >(
950 shared_from_this(),
951 outliving_mutable(strand) )
952 };
953 }
954
955 [[nodiscard]]
958 {
959 return { std::make_shared< binder_with_own_strand_t >(
960 shared_from_this() )
961 };
962 }
963
964 ::asio::io_context &
965 io_context() const noexcept override { return *m_io_context; }
966
967 void
968 agent_bound() noexcept override
969 {
970 ++m_agents_bound;
971 }
972
973 void
974 agent_unbound() noexcept override
975 {
976 --m_agents_bound;
977 }
978
980 demands_counter() noexcept override
981 {
982 return m_demands_counter;
983 }
984
985 protected :
986 void
988 environment_t & env,
989 std::string_view data_sources_name_base )
990 {
991 data_source().set_data_sources_name_base( data_sources_name_base );
992 data_source().start( env.stats_repository() );
993
994 ::so_5::details::do_with_rollback_on_exception(
995 [&] { launch_work_threads(env); },
996 [this] { data_source().stop(); } );
997 }
998
999 void
1001 {
1002 ::so_5::details::invoke_noexcept_code( [this] {
1003 // Stopping Asio IO service.
1004 m_io_context->stop();
1005 } );
1006 }
1007
1008 void
1010 {
1011 ::so_5::details::invoke_noexcept_code( [this] {
1012 // Waiting for complete stop of all work threads.
1013 wait_work_threads();
1014 // Stopping data source.
1015 data_source().stop();
1016 } );
1017 }
1018
1019 protected :
1020 /*!
1021 * \brief Get the count of work threads to be created.
1022 */
1023 [[nodiscard]]
1024 std::size_t
1025 thread_count() const noexcept { return m_thread_count; }
1026
1027 /*!
1028 * \brief Get access to actual data source instance for that
1029 * dispatcher.
1030 */
1031 [[nodiscard]]
1032 virtual disp_data_source_t &
1033 data_source() noexcept = 0;
1034
1035 /*!
1036 * \brief Get access to thread factory to be used for that dispatcher.
1037 *
1038 * \since v.1.5.0
1039 */
1040 [[nodiscard]]
1042 thread_factory() const noexcept { return m_thread_factory; }
1043
1044#if defined(__clang__)
1045#pragma clang diagnostic push
1046#pragma clang diagnostic ignored "-Wnon-virtual-dtor"
1047#endif
1048
1049 /*!
1050 * \brief Data source for run-time monitoring of whole dispatcher.
1051 *
1052 * \since
1053 * v.1.0.2
1054 */
1056 : public ::so_5::stats::source_t
1057 {
1058 //! Dispatcher to work with.
1060
1061 //! Basic prefix for data sources.
1063
1064 //! Data source repository.
1065 /*!
1066 * Will receive actual value in start() method.
1067 *
1068 * \since
1069 * v.1.3.0
1070 */
1072
1073 protected :
1074 //! Access to data source prefix for derived classes.
1075 const ::so_5::stats::prefix_t &
1076 base_prefix() const noexcept { return m_base_prefix; }
1077
1078 public :
1082
1083 virtual void
1084 distribute( const mbox_t & mbox ) override
1085 {
1086 const auto agents_count = m_dispatcher.m_agents_bound.load(
1087 std::memory_order_acquire );
1088
1089 const auto demands_count = m_dispatcher.m_demands_counter.load(
1090 std::memory_order_acquire );
1091
1092 send< ::so_5::stats::messages::quantity< std::size_t > >(
1093 mbox,
1094 m_base_prefix,
1095 ::so_5::stats::suffixes::agent_count(),
1096 agents_count );
1097
1098 // Note: because there is no way to detect on which thread a
1099 // demand will be handled, the total number of waiting
1100 // demands is destributed for the whole dispatcher.
1101 send< ::so_5::stats::messages::quantity< std::size_t > >(
1102 mbox,
1103 m_base_prefix,
1104 ::so_5::stats::suffixes::work_thread_queue_size(),
1105 demands_count );
1106 }
1107
1108 void
1110 std::string_view name_base )
1111 {
1112 using namespace ::so_5::disp::reuse;
1113
1114 m_base_prefix = make_disp_prefix(
1115 "ext-asio-tp",
1116 name_base,
1117 &m_dispatcher );
1118 }
1119
1120 void
1121 start( ::so_5::stats::repository_t & repo )
1122 {
1123 repo.add( *this );
1124 m_stats_repo = &repo;
1125 }
1126
1127 void
1128 stop() noexcept
1129 {
1130 m_stats_repo->remove( *this );
1131 }
1132 };
1133
1134#if defined(__clang__)
1135#pragma clang diagnostic pop
1136#endif
1137
1138 private:
1139 //! Count of work threads.
1141
1142 //! IO Service to work with.
1144
1145 /*!
1146 * \brief Thread factory to be used with this dispatcher.
1147 *
1148 * \since v.1.5.0
1149 */
1151
1152 //! Count of agents bound to that dispatcher.
1154
1155 //! Count of waiting demands.
1157
1158 //! Start all working threads.
1159 virtual void
1161 //! SObjectizer Environment for which threads will be created.
1162 environment_t & env ) = 0;
1163
1164 //! Wait for finish of all threads.
1165 /*!
1166 * It is a blocking call. The current thread will be stopped until
1167 * all work thread will finish their work.
1168 */
1169 virtual void
1170 wait_work_threads() noexcept = 0;
1171 };
1172
1173//
1174// dispatcher_skeleton_without_thread_activity_tracking_t
1175//
1176/*!
1177 * \brief Extension of basic dispatcher skeleton for the case when
1178 * work thread activity is not collected.
1179 *
1180 * This class contains disp_data_source_t instance and implements
1181 * virtual method data_source() for accessing this instance.
1182 *
1183 * It also provides static method run_work_thread() which must be called
1184 * at the beginnig of work thread.
1185 *
1186 * \since
1187 * v.1.0.2
1188 */
1191 {
1192 public :
1198
1199 protected :
1200 virtual disp_data_source_t &
1201 data_source() noexcept { return m_data_source; }
1202
1203 //! Implementation of main function for a work thread.
1204 static void
1206 environment_t & env,
1207 ::asio::io_context & io_svc,
1209 std::size_t /*index*/ )
1210 {
1211 work_thread_t::run< work_thread_without_activity_tracking_t >(
1212 env, io_svc );
1213 }
1214
1215 private :
1216 //! Actual data source instance.
1218 };
1219
1220//
1221// dispatcher_skeleton_with_thread_activity_tracking_t
1222//
1223/*!
1224 * \brief Extension of basic dispatcher skeleton for the case when
1225 * work thread activity must be collected.
1226 *
1227 * This class defines its own actual_disp_data_source_t type and
1228 * contains an instance of that type. There is also implementation
1229 * of data_source() virtual method for accessing this instance.
1230 *
1231 * It provides static method run_work_thread() which must be called
1232 * at the beginnig of work thread.
1233 *
1234 * \since
1235 * v.1.0.2
1236 */
1239 {
1240#if defined(__clang__)
1241#pragma clang diagnostic push
1242#pragma clang diagnostic ignored "-Wnon-virtual-dtor"
1243#endif
1244 /*!
1245 * \brief Actual data source type for dispatcher with
1246 * work thread activity tracking.
1247 *
1248 * \since
1249 * v.1.0.2
1250 */
1252 : public disp_data_source_t
1253 {
1254 private :
1255 //! Collectors for run-time stats for every thread.
1256 std::vector<
1259
1260 public :
1263 std::size_t thread_count )
1264 : disp_data_source_t( disp )
1266 {
1267 for( auto & c : m_collectors )
1268 c = std::make_unique< work_thread_activity_collector_t >();
1269 }
1270
1271 virtual void
1272 distribute( const mbox_t & mbox ) override
1273 {
1274 disp_data_source_t::distribute( mbox );
1275
1276 for( std::size_t i = 0; i != m_collectors.size(); ++i )
1277 distribute_stats_for_work_thread_at( mbox, i );
1278 }
1279
1280 /*!
1281 * \note \a index is not checked for validity!
1282 */
1284 collector_at( std::size_t index ) noexcept
1285 {
1286 return *(m_collectors[index]);
1287 }
1288
1289 private :
1290 void
1292 const mbox_t & mbox,
1293 std::size_t index )
1294 {
1295 std::ostringstream ss;
1296 ss << base_prefix().c_str() << "/wt-" << index;
1297
1298 const ::so_5::stats::prefix_t prefix{ ss.str() };
1299 auto & collector = collector_at( index );
1300
1301 so_5::send< ::so_5::stats::messages::work_thread_activity >(
1302 mbox,
1303 prefix,
1304 ::so_5::stats::suffixes::work_thread_activity(),
1305 collector.thread_id(),
1306 collector.take_activity_stats() );
1307 }
1308 };
1309
1310#if defined(__clang__)
1311#pragma clang diagnostic pop
1312#endif
1313
1314 public :
1321
1322 protected :
1323 virtual disp_data_source_t &
1324 data_source() noexcept override { return m_actual_data_source; }
1325
1326 //! Implementation of main function for a work thread.
1327 static void
1329 //! SObjectizer Environment for which the work thread is created.
1330 environment_t & env,
1331 //! Asio IoService to be used.
1332 ::asio::io_context & io_svc,
1333 //! Dispatcher who owns this thread.
1335 //! Ordinal number of this thread.
1336 std::size_t index )
1337 {
1338 work_thread_t::run< work_thread_with_activity_tracking_t >(
1339 env,
1340 io_svc,
1341 outliving_mutable(
1342 self.m_actual_data_source.collector_at(index) ) );
1343 }
1344
1345 private :
1346 //! Data source instance.
1348 };
1349
1350//
1351// dispatcher_template_t
1352//
1353/*!
1354 * \brief Template-based implementation of dispatcher.
1355 *
1356 * Implements virual methods launch_work_threads() and wait_work_threads()
1357 * from basic_dispatcher_skeleton_t.
1358 *
1359 * \tparam Traits Traits-type to be used.
1360 * \tparam Basic_Skeleton A specific skeleton to be used as base type.
1361 * It expected to be dispatcher_skeleton_with_thread_activity_tracking_t or
1362 * dispatcher_skeleton_without_thread_activity_tracking_t.
1363 *
1364 * \since
1365 * v.1.0.2
1366 */
1367template<
1368 typename Traits,
1369 typename Basic_Skeleton >
1370class dispatcher_template_t final : public Basic_Skeleton
1371 {
1372 public:
1374 //! SObjectizer Environment to work in.
1376 //! Value for creating names of data sources for
1377 //! run-time monitoring.
1379 //! Parameters for the dispatcher.
1382 {
1383 this->start( env.get(), data_sources_name_base );
1384 }
1385
1387 {
1388 this->shutdown();
1389 this->wait();
1390 }
1391
1392 private:
1393 //! An alias for thread_holder.
1395
1396 //! Working threads.
1398
1399 virtual void
1401 environment_t & env ) override
1402 {
1403 using namespace std;
1404
1405 m_threads.resize( this->thread_count() );
1406
1408 for( std::size_t i = 0u; i != this->thread_count(); ++i )
1409 {
1410 m_threads[ i ] = this->make_work_thread( env, i );
1411 }
1412 },
1413 [&] {
1415 this->io_context().stop();
1416
1417 // Shutdown all started threads.
1418 for( auto & t : m_threads )
1419 if( t )
1420 {
1421 t.unchecked_get().join();
1422 t = thread_holder_t{};
1423 }
1424 else
1425 // No more started threads.
1426 break;
1427 } );
1428 } );
1429 }
1430
1431 virtual void
1432 wait_work_threads() noexcept override
1433 {
1434 for( auto & t : m_threads )
1435 {
1436 t.unchecked_get().join();
1437 t = thread_holder_t{};
1438 }
1439 }
1440
1444 std::size_t index )
1445 {
1446 Basic_Skeleton * self = this;
1448 this->thread_factory()->acquire( env ),
1449 this->thread_factory()
1450 };
1452 [&env, io_svc = &this->io_context(), self, index]()
1453 {
1455 } );
1456
1457 return work_thread;
1458 }
1459 };
1460
1461//
1462// dispatcher_handle_maker_t
1463//
1465 {
1466 public :
1467 [[nodiscard]]
1468 static dispatcher_handle_t
1469 make( actual_dispatcher_shptr_t disp ) noexcept
1470 {
1471 return { std::move( disp ) };
1472 }
1473 };
1474
1475} /* namespace impl */
1476
1477//
1478// default_thread_pool_size
1479//
1480/*!
1481 * \brief A helper function for detecting default thread count for
1482 * thread pool.
1483 *
1484 * \since
1485 * v.1.0.2
1486 */
1487inline std::size_t
1489 {
1490 auto c = std::thread::hardware_concurrency();
1491 if( !c )
1492 c = 2;
1493
1494 return c;
1495 }
1496
1497//
1498// default_traits_t
1499//
1500/*!
1501 * \brief Default traits of %asio_thread_pool dispatcher.
1502 *
1503 * \note
1504 * This type is empty in v.1.5.0.
1505 * It is left empty intentionally to have a possibility to extend it later, in
1506 * some future version.
1507 *
1508 * \since
1509 * v.1.0.2
1510 */
1512 {
1513 };
1514
1515//
1516// make_dispatcher
1517//
1518/*!
1519 * \brief A function for creation an instance of %asio_thread_pool dispatcher.
1520 *
1521 * Usage examples:
1522 * \code
1523 * // Dispatcher which uses own Asio IoContext and default traits.
1524 * namespace asio_tp = so_5::extra::disp::asio_thread_pool;
1525 * asio_tp::disp_params_t params;
1526 * params.use_own_io_context(); // Asio IoContext object will be created here.
1527 * // This object will be accessible later via
1528 * // dispatcher_handle_t::io_context() method.
1529 * auto disp = asio_tp::make_dispatcher(
1530 * env,
1531 * "my_asio_tp",
1532 * std::move(disp_params) );
1533 *
1534 *
1535 * // Dispatcher which uses external Asio IoContext and default traits.
1536 * asio::io_context & io_svc = ...;
1537 * namespace asio_tp = so_5::extra::disp::asio_thread_pool;
1538 * asio_tp::disp_params_t params;
1539 * params.use_external_io_context( io_svc );
1540 * auto disp = asio_tp::make_dispatcher(
1541 * env,
1542 * "my_asio_tp",
1543 * std::move(disp_params) );
1544 * \endcode
1545 *
1546 * \par Requirements for traits type
1547 * The Traits-type is empty in v.1.5.0. There was a possibitily to specify
1548 * a custom thread type in previous versions of so_5_extra, but since v.1.5.0
1549 * custom threads are supported via standard SObjectizer's mechanism based
1550 * on `abstract_work_thread_t`/`abstract_work_thread_factory_t` interfaces.
1551 * But the Traits-type might be extended by some content in future versions.
1552 *
1553 * \tparam Traits Type with traits for a dispatcher. For the requirements
1554 * for \a Traits type see the section "Requirements for traits type" above.
1555 *
1556 * \since
1557 * v.1.0.2
1558 */
1559template< typename Traits = default_traits_t >
1560[[nodiscard]]
1563 //! SObjectizer Environment to work in.
1564 environment_t & env,
1565 //! Value for creating names of data sources for
1566 //! run-time monitoring.
1567 const std::string_view data_sources_name_base,
1568 //! Parameters for the dispatcher.
1569 disp_params_t disp_params )
1570 {
1571 const auto io_svc_ptr = disp_params.io_context();
1572 if( !io_svc_ptr )
1575 "io_context is not set in disp_params" );
1576
1577 if( !disp_params.thread_count() )
1579
1582 // Type of result pointer.
1584 // Actual type of dispatcher without thread activity tracking.
1586 Traits,
1588 // Actual type of dispatcher with thread activity tracking.
1590 Traits,
1594 std::move(disp_params) );
1595
1597 }
1598
1599} /* namespace asio_thread_pool */
1600
1601} /* namespace disp */
1602
1603} /* namespace extra */
1604
1605} /* namespace so_5 */
Parameters for asio_thread_pool dispatcher.
Definition pub.hpp:64
std::size_t m_thread_count
Count of working threads.
Definition pub.hpp:175
std::shared_ptr< ::asio::io_context > io_context() const noexcept
Get the io_context.
Definition pub.hpp:165
std::shared_ptr< ::asio::io_context > m_io_context
Asio's io_context which must be used with this dispatcher.
Definition pub.hpp:178
disp_params_t & use_external_io_context(::asio::io_context &service)
Use external Asio io_context object with dispatcher.
Definition pub.hpp:125
disp_params_t & use_external_io_context(std::shared_ptr< ::asio::io_context > service)
Use external Asio io_context object with dispatcher.
Definition pub.hpp:142
disp_params_t & use_own_io_context()
Use own Asio io_context object.
Definition pub.hpp:157
disp_params_t & thread_count(std::size_t count)
Setter for thread count.
Definition pub.hpp:94
friend void swap(disp_params_t &a, disp_params_t &b) noexcept
Definition pub.hpp:75
std::size_t thread_count() const
Getter for thread count.
Definition pub.hpp:102
A handle for asio_thread_pool dispatcher.
Definition pub.hpp:243
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
Definition pub.hpp:362
void reset() noexcept
Drop the content of handle.
Definition pub.hpp:366
operator bool() const noexcept
Is this handle empty?
Definition pub.hpp:357
::asio::io_context & io_context() noexcept
Get reference to io_context from that dispatcher.
Definition pub.hpp:350
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
Definition pub.hpp:249
impl::basic_dispatcher_iface_shptr_t m_dispatcher
A reference to actual implementation of a dispatcher.
Definition pub.hpp:247
bool empty() const noexcept
Is this handle empty?
Definition pub.hpp:257
disp_binder_shptr_t binder(::asio::io_context::strand &strand) const
Get a binder for that dispatcher.
Definition pub.hpp:297
disp_binder_shptr_t binder() const
Get a binder for that dispatcher.
Definition pub.hpp:338
An actual interface of thread pool dispatcher.
Definition pub.hpp:392
virtual demands_counter_t & demands_counter() noexcept=0
Get a reference for counter of pending demands.
virtual void agent_unbound() noexcept=0
Notification about unbinding of an agent.
virtual void agent_bound() noexcept=0
Notification about binding of yet another agent.
Basic part of implementation of a binder for asio_thread_pool dispatcher.
Definition pub.hpp:731
actual_dispatcher_shptr_t m_dispatcher
The actual dispatcher.
Definition pub.hpp:774
void bind(agent_t &agent) noexcept override
Definition pub.hpp:755
void undo_preallocation(agent_t &) noexcept override
Definition pub.hpp:748
basic_binder_impl_t(actual_dispatcher_shptr_t dispatcher)
Initializing constructor.
Definition pub.hpp:734
The very basic interface of asio_thread_pool dispatcher.
Definition pub.hpp:199
virtual disp_binder_shptr_t binder_with_own_strand()=0
Create a binder for that dispatcher.
virtual::asio::io_context & io_context() const noexcept=0
Get reference to io_context from that dispatcher.
virtual disp_binder_shptr_t binder_with_external_strand(::asio::io_context::strand &)=0
Create a binder for that dispatcher.
::so_5::stats::prefix_t m_base_prefix
Basic prefix for data sources.
Definition pub.hpp:1062
const ::so_5::stats::prefix_t & base_prefix() const noexcept
Access to data source prefix for derived classes.
Definition pub.hpp:1076
Basic stuff for all implementations of dispatcher.
Definition pub.hpp:925
virtual void launch_work_threads(environment_t &env)=0
Start all working threads.
demands_counter_t m_demands_counter
Count of waiting demands.
Definition pub.hpp:1156
::asio::io_context & io_context() const noexcept override
Get reference to io_context from that dispatcher.
Definition pub.hpp:965
void agent_unbound() noexcept override
Notification about unbinding of an agent.
Definition pub.hpp:974
::so_5::disp::abstract_work_thread_factory_shptr_t thread_factory() const noexcept
Get access to thread factory to be used for that dispatcher.
Definition pub.hpp:1042
virtual void wait_work_threads() noexcept=0
Wait for finish of all threads.
const std::shared_ptr< ::asio::io_context > m_io_context
IO Service to work with.
Definition pub.hpp:1143
disp_binder_shptr_t binder_with_own_strand() override
Create a binder for that dispatcher.
Definition pub.hpp:957
void agent_bound() noexcept override
Notification about binding of yet another agent.
Definition pub.hpp:968
std::size_t thread_count() const noexcept
Get the count of work threads to be created.
Definition pub.hpp:1025
const ::so_5::disp::abstract_work_thread_factory_shptr_t m_thread_factory
Thread factory to be used with this dispatcher.
Definition pub.hpp:1150
virtual disp_data_source_t & data_source() noexcept=0
Get access to actual data source instance for that dispatcher.
disp_binder_shptr_t binder_with_external_strand(::asio::io_context::strand &strand) override
Create a binder for that dispatcher.
Definition pub.hpp:946
basic_dispatcher_skeleton_t(::so_5::environment_t &env, disp_params_t params)
Definition pub.hpp:931
void start(environment_t &env, std::string_view data_sources_name_base)
Definition pub.hpp:987
std::atomic< std::size_t > m_agents_bound
Count of agents bound to that dispatcher.
Definition pub.hpp:1153
demands_counter_t & demands_counter() noexcept override
Get a reference for counter of pending demands.
Definition pub.hpp:980
An implementation of a binder for asio_thread_pool dispatcher.
Definition pub.hpp:804
void push_evt_start(execution_demand_t demand) override
Definition pub.hpp:834
void push(execution_demand_t demand) override
Definition pub.hpp:815
void push_evt_finish(execution_demand_t demand) noexcept override
Definition pub.hpp:841
binder_with_external_strand_t(actual_dispatcher_shptr_t dispatcher, outliving_reference_t< ::asio::io_context::strand > strand)
Definition pub.hpp:864
outliving_reference_t< ::asio::io_context::strand > m_strand
Strand to be used with this event_queue.
Definition pub.hpp:876
binder_with_own_strand_t(actual_dispatcher_shptr_t dispatcher)
Definition pub.hpp:896
::asio::io_context::strand m_strand
Strand to be used with this event_queue.
Definition pub.hpp:907
static dispatcher_handle_t make(actual_dispatcher_shptr_t disp) noexcept
Definition pub.hpp:1469
Extension of basic dispatcher skeleton for the case when work thread activity must be collected.
Definition pub.hpp:1239
static void run_work_thread(environment_t &env, ::asio::io_context &io_svc, dispatcher_skeleton_with_thread_activity_tracking_t &self, std::size_t index)
Implementation of main function for a work thread.
Definition pub.hpp:1328
virtual disp_data_source_t & data_source() noexcept override
Get access to actual data source instance for that dispatcher.
Definition pub.hpp:1324
Extension of basic dispatcher skeleton for the case when work thread activity is not collected.
Definition pub.hpp:1191
virtual disp_data_source_t & data_source() noexcept
Get access to actual data source instance for that dispatcher.
Definition pub.hpp:1201
static void run_work_thread(environment_t &env, ::asio::io_context &io_svc, dispatcher_skeleton_without_thread_activity_tracking_t &, std::size_t)
Implementation of main function for a work thread.
Definition pub.hpp:1205
std::vector< thread_holder_t > m_threads
Working threads.
Definition pub.hpp:1397
dispatcher_template_t(outliving_reference_t< environment_t > env, std::string_view data_sources_name_base, disp_params_t params)
Definition pub.hpp:1373
thread_holder_t make_work_thread(environment_t &env, std::size_t index)
Definition pub.hpp:1442
virtual void launch_work_threads(environment_t &env) override
Definition pub.hpp:1400
A helper for declaration of static and thread_local pointer in a header file.
Definition pub.hpp:437
static thread_local T * m_ptr
Value of the pointer which need to be stored.
Definition pub.hpp:440
static T * ptr() noexcept
Access to the current value of the pointer.
Definition pub.hpp:445
static void set_ptr(T *p) noexcept
Setter for the pointer.
Definition pub.hpp:449
Type of collector of work thread activity data.
Definition pub.hpp:616
void activity_started() noexcept
Mark start point of new activity.
Definition pub.hpp:652
::so_5::stats::work_thread_activity_stats_t take_activity_stats() noexcept
Get the current stats.
Definition pub.hpp:670
void setup_thread_id(current_thread_id_t tid)
Setup ID of the current work thread.
Definition pub.hpp:634
void activity_finished() noexcept
Mark completion of the current activity.
Definition pub.hpp:661
current_thread_id_t thread_id() const noexcept
Get the ID of the thread.
Definition pub.hpp:646
current_thread_id_t m_thread_id
ID of thread for which activity stats is collected.
Definition pub.hpp:619
::so_5::stats::activity_tracking_stuff::stats_collector_t< ::so_5::stats::activity_tracking_stuff::internal_lock > m_work_activity
Collected activity stats.
Definition pub.hpp:624
Base type for implementations of work thread wrappers.
Definition pub.hpp:479
virtual void on_demand(execution_demand_t demand) noexcept=0
Actual processing of the demand.
current_thread_id_t m_thread_id
ID of the work thread.
Definition pub.hpp:485
current_thread_id_t thread_id() const noexcept
ID of the work thread.
Definition pub.hpp:505
static void run(environment_t &env, ::asio::io_context &io_svc, Args &&... args)
Lauch processing of demand on the context of current thread.
Definition pub.hpp:524
static void handle_demand(execution_demand_t demand)
An interface method for passing a demand to processing.
Definition pub.hpp:571
outliving_reference_t< work_thread_activity_collector_t > m_activity_stats
Activity statistics.
Definition pub.hpp:693
virtual void on_demand(execution_demand_t demand) noexcept override
Actual processing of the demand.
Definition pub.hpp:708
virtual void on_demand(execution_demand_t demand) noexcept override
Actual processing of the demand.
Definition pub.hpp:595
const int rc_io_context_is_not_set
Asio IoService is not set for asio_thread_pool dispatcher.
Definition pub.hpp:47
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t disp_params)
A function for creation an instance of asio_thread_pool dispatcher.
Definition pub.hpp:1562
std::size_t default_thread_pool_size()
A helper function for detecting default thread count for thread pool.
Definition pub.hpp:1488
const int asio_thread_pool_errors
Starting point for errors of asio_thread_pool submodule.
Ranges for error codes of each submodules.
Definition details.hpp:13
Default traits of asio_thread_pool dispatcher.
Definition pub.hpp:1512