SObjectizer-5 Extra
Loading...
Searching...
No Matches
pub.hpp
Go to the documentation of this file.
1/*!
2 * \file
3 * \brief Implementation of Asio's One Thread dispatcher.
4 *
5 * \since
6 * v.1.4.1
7 */
8
9#pragma once
10
11#include <so_5_extra/error_ranges.hpp>
12
13#include <so_5/disp_binder.hpp>
14#include <so_5/send_functions.hpp>
15
16#include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
17#include <so_5/disp/reuse/work_thread_activity_tracking.hpp>
18#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
19#include <so_5/disp/reuse/work_thread_factory_params.hpp>
20
21#include <so_5/stats/repository.hpp>
22#include <so_5/stats/messages.hpp>
23#include <so_5/stats/std_names.hpp>
24#include <so_5/stats/impl/activity_tracking.hpp>
25
26#include <so_5/details/invoke_noexcept_code.hpp>
27#include <so_5/details/rollback_on_exception.hpp>
28#include <so_5/details/abort_on_fatal_error.hpp>
29
30#include <so_5/impl/thread_join_stuff.hpp>
31
32#include <so_5/outliving.hpp>
33
34#include <asio/io_context.hpp>
35#include <asio/post.hpp>
36
37namespace so_5 {
38
39namespace extra {
40
41namespace disp {
42
43namespace asio_one_thread {
44
45namespace errors {
46
47//! Asio io_context is not set for asio_one_thread dispatcher.
50
51} /* namespace errors */
52
53/*!
54 * \brief An alias for shared-pointer to io_context object.
55 *
56 * \since v.1.4.1
57 */
59
60//
61// disp_params_t
62//
63/*!
64 * \brief Parameters for %asio_one_thread dispatcher.
65 */
69 {
74
75 public :
76 //! Default constructor.
77 disp_params_t() = default;
78
79 friend inline void
81 disp_params_t & a, disp_params_t & b ) noexcept
82 {
83 using std::swap;
84
85 swap(
87 static_cast< activity_tracking_mixin_t & >(b) );
88
89 swap(
90 static_cast< thread_factory_mixin_t & >(a),
91 static_cast< thread_factory_mixin_t & >(b) );
92
93 swap( a.m_io_context, b.m_io_context );
94 }
95
96 //! Use external Asio io_context object with dispatcher.
97 /*!
98 * Usage example:
99 * \code
100 * int main() {
101 * asio::io_context svc;
102 * so_5::launch( [&](so_5::environment_t & env) {
103 * namespace asio_ot = so_5::extra::disp::asio_one_thread;
104 * auto disp = asio_ot::make_dispatcher(
105 * env, "asio_ot",
106 * asio_ot::disp_params_t{}.use_external_io_context(svc) );
107 * ...
108 * } );
109 * }
110 * \endcode
111 */
114 ::asio::io_context & service )
115 {
116 m_io_context = std::shared_ptr< ::asio::io_context >(
117 std::addressof( service ),
118 // Empty deleter.
119 [](::asio::io_context *) {} );
120 return *this;
121 }
122
123 //! Use external Asio io_context object with dispatcher.
124 /*!
125 * \note
126 * Ownership of this io_context object must be shared with
127 * others.
128 */
131 std::shared_ptr< ::asio::io_context > service )
132 {
133 m_io_context = std::move(service);
134 return *this;
135 }
136
137 //! Use own Asio io_context object.
138 /*!
139 * Note this object is dynamically created right inside this call and is
140 * held via shared_ptr. Normally it is destroyed with the dispatcher object.
141 *
142 * A created io_context can be accessed later via io_context() method.
143 */
146 {
147 m_io_context = std::make_shared< ::asio::io_context >();
148 return *this;
149 }
150
151 //! Get the io_context.
153 io_context() const noexcept
154 {
155 return m_io_context;
156 }
157
158 private :
159 //! Asio's io_context which must be used with this dispatcher.
161 };
162
163namespace impl {
164
165//
166// actual_disp_binder_t
167//
168/*!
169 * \brief An actual interface of disp_binder for asio_one_thread dispatcher.
170 *
171 * That binder should allow getting a reference to the io_context object.
172 *
173 * \since v.1.4.1
174 */
176 : public disp_binder_t
177 {
178 public :
179 [[nodiscard]]
180 virtual asio::io_context &
181 io_context() noexcept = 0;
182 };
183
184//
185// actual_disp_binder_shptr_t
186//
187/*!
188 * \brief An alias for shared-pointer to actual_disp_binder.
189 *
190 * \since v.1.4.1
191 */
194
196
197} /* namespace impl */
198
199//
200// dispatcher_handle_t
201//
202
203/*!
204 * \brief A handle for %asio_one_thread dispatcher.
205 *
206 * \since
207 * v.1.4.1
208 */
209class [[nodiscard]] dispatcher_handle_t
210 {
212
213 //! A reference to actual implementation of a dispatcher.
215
217 impl::actual_disp_binder_shptr_t binder ) noexcept
218 : m_binder{ std::move(binder) }
219 {}
220
221 //! Is this handle empty?
222 [[nodiscard]]
223 bool
224 empty() const noexcept { return !m_binder; }
225
226 public :
227 dispatcher_handle_t() noexcept = default;
228
229 //! Get a binder for that dispatcher.
230 /*!
231 * Usage example:
232 * \code
233 * using namespace so_5::extra::disp::asio_one_thread;
234 *
235 * asio::io_context io_ctx;
236 *
237 * so_5::environment_t & env = ...;
238 * auto disp = make_dispatcher( env, "my_disp", disp_params_t{}.use_external_io_context( io_ctx ) );
239 *
240 * env.introduce_coop( [&]( so_5::coop_t & coop ) {
241 * coop.make_agent_with_binder< some_agent_type >(disp.binder(), ...);
242 * ...
243 * } );
244 * \endcode
245 *
246 * \attention
247 * An attempt to call this method on empty handle is UB.
248 */
249 [[nodiscard]]
251 binder() const
252 {
253 return m_binder;
254 }
255
256 //! Get reference to io_context from that dispatcher.
257 /*!
258 * \attention
259 * An attempt to call this method on empty handle is UB.
260 */
261 [[nodiscard]]
262 ::asio::io_context &
263 io_context() noexcept
264 {
265 return m_binder->io_context();
266 }
267
268 //! Is this handle empty? Note: converts to true for an EMPTY handle.
269 [[nodiscard]]
270 operator bool() const noexcept { return empty(); }
271
272 //! Does this handle contain a reference to dispatcher? Note: returns true for a NON-empty handle.
273 [[nodiscard]]
274 bool
275 operator!() const noexcept { return !empty(); }
276
277 //! Drop the content of handle.
278 void
279 reset() noexcept { m_binder.reset(); }
280 };
281
282namespace impl {
283
284//
285// demands_counter_t
286//
287/*!
288 * \brief Type of atomic counter for counting waiting demands.
289 *
290 * \since
291 * v.1.4.1
292 */
294
296
297/*!
298 * \brief A type of holder of data common to all worker thread
299 * implementations.
300 *
301 * \since v.1.4.1
302 */
304 {
305 //! Asio's context to be used.
307
308 //! Thread object.
310
311 //! ID of the work thread.
312 /*!
313 * \note Receives actual value only after successful start
314 * of the thread.
315 */
317
318 //! Counter of waiting demands.
320
322 io_context_shptr_t io_context,
323 ::so_5::disp::work_thread_holder_t thread_holder )
326 {}
327
328 [[nodiscard]]
330 io_context() const noexcept { return *(this->m_io_context); }
331
332 [[nodiscard]]
334 demands_counter() noexcept { return this->m_demands_counter; }
335 };
336
337/*!
338 * \brief Base class for implementation of worker thread without
339 * thread activity tracking.
340 *
341 * Methods work_started() and work_finished() are empty.
342 *
343 * \since v.1.4.1
344 */
346 {
347 using base_type_t = common_data_t;
348
349 public :
351 io_context_shptr_t io_context,
352 ::so_5::disp::work_thread_holder_t thread_holder )
353 : base_type_t(
355 {}
356
357 using base_type_t::io_context;
358
359 using base_type_t::demands_counter;
360
361 protected :
362 void
363 work_started() { /* Nothing to do. */ }
364
365 void
366 work_finished() { /* Nothing to do. */ }
367 };
368
369/*!
370 * \brief Base class for implementation of worker thread with
371 * thread activity tracking.
372 *
373 * Methods work_started() and work_finished() perform actual activity
374 * tracking.
375 *
376 * This class also provides public method take_activity_stats() to
377 * retrieve activity statistics.
378 *
379 * \since v.1.4.1
380 */
382 {
383 using base_type_t = common_data_t;
384
385 public :
387 io_context_shptr_t io_context,
388 ::so_5::disp::work_thread_holder_t thread_holder )
389 : base_type_t(
391 {}
392
393 using base_type_t::io_context;
394
395 using base_type_t::demands_counter;
396
397 [[nodiscard]]
400 {
401 so_5::stats::work_thread_activity_stats_t result;
402
403 result.m_working_stats = m_working_stats.take_stats();
404
405 return result;
406 }
407
408 protected :
409 //! Statistics for work activity.
413
414 void
415 work_started() { m_working_stats.start(); }
416
417 void
418 work_finished() { m_working_stats.stop(); }
419 };
420
421} /* namespace work_thread_details */
422
423//
424// work_thread_template_t
425//
426/*!
427 * \brief An implementation of worker thread in form of the template class.
428 *
429 * Work_Thread parameter is expected to be
430 * work_thread_details::no_activity_tracking_impl_t or
431 * work_thread_details::with_activity_tracking_impl_t.
432 *
433 * This class also plays the role of event_queue, because there
434 * is no real event-queue to be controlled by this class. All
435 * demands are delegated to io_context object.
436 *
437 * \since v.1.4.1
438 */
439template< typename Work_Thread >
441 : public Work_Thread
442 , public event_queue_t
443 {
444 using base_type_t = Work_Thread;
445
446 //! SObjectizer Environment to work in.
447 /*!
448 * We have to store that reference to have an ability to
449 * log error messages in thread's body.
450 */
452
453 public :
454 //! Initializing constructor.
456 environment_t & env,
457 io_context_shptr_t io_context,
458 ::so_5::disp::work_thread_holder_t thread_holder )
459 : base_type_t(
461 , m_env( env )
462 {}
463
464 //! Starts a new thread.
465 void
467 {
469 [this]() { body(); } );
470 }
471
472 void
474 {
475 this->io_context().stop();
476 }
477
478 void
485
487 thread_id() const
488 {
489 return this->m_thread_id;
490 }
491
492 void
493 push( execution_demand_t demand ) override
494 {
495 // Demand count statistics should be updated.
496 ++(this->demands_counter());
497
498 // If posting a demand fails the count of demands
499 // should be decremented.
501 [&] {
502 asio::post( this->io_context(),
503 [d = std::move(demand), this]() mutable {
504 this->handle_demand( std::move(d) );
505 } );
506 },
507 [this] {
508 --this->demands_counter();
509 } );
510 }
511
512 void
513 push_evt_start( execution_demand_t demand ) override
514 {
515 // Just delegate to the ordinary push().
516 this->push( std::move(demand) );
517 }
518
519 void
520 push_evt_finish( execution_demand_t demand ) noexcept override
521 {
522 // Just delegate to the ordinary push() despite
523 // the fact that push() isn't noexcept.
524 this->push( std::move(demand) );
525 }
526
527 private :
528 void
530 {
532
533 // We don't expect any errors here.
534 // But if something happens then there is no way to
535 // recover and the whole application should be aborted.
536 try
537 {
538 // Prevent return from io_context::run() if there is no
539 // more Asio's events.
540 auto work = ::asio::make_work_guard( this->io_context() );
541 this->io_context().run();
542 }
543 catch( const std::exception & x )
544 {
547 log_stream << "An exception caught in work thread "
548 "of so_5::extra::disp::asio_one_thread dispatcher."
549 " Exception: "
550 << x.what() << std::endl;
551 }
552 } );
553 }
554 catch( ... )
555 {
558 log_stream << "An unknown exception caught in work thread "
559 "of so_5::extra::disp::asio_one_thread dispatcher."
560 << std::endl;
561 }
562 } );
563 }
564 }
565
566 void
567 handle_demand( execution_demand_t demand ) noexcept
568 {
569 // Demand count statistics should be updated.
570 --(this->demands_counter());
571
572 this->work_started();
574 [this] { this->work_finished(); } );
575
577 }
578 };
579
580//
581// work_thread_no_activity_tracking_t
582//
583using work_thread_no_activity_tracking_t =
586
587inline void
589 const so_5::mbox_t &,
590 const so_5::stats::prefix_t &,
591 work_thread_no_activity_tracking_t & )
592 {
593 /* Nothing to do */
594 }
595
596//
597// work_thread_with_activity_tracking_t
598//
599using work_thread_with_activity_tracking_t =
602
603inline void
604send_thread_activity_stats(
605 const so_5::mbox_t & mbox,
606 const so_5::stats::prefix_t & prefix,
607 work_thread_with_activity_tracking_t & wt )
608 {
609 so_5::send< so_5::stats::messages::work_thread_activity >(
610 mbox,
611 prefix,
612 so_5::stats::suffixes::work_thread_activity(),
613 wt.thread_id(),
614 wt.take_activity_stats() );
615 }
616
617//
618// dispatcher_template_t
619//
620/*!
621 * \brief An implementation of the dispatcher in the form of template class.
622 *
623 * This dispatcher launches worker thread in the constructor and
624 * stops and joins it in the destructor.
625 *
626 * \since v.1.4.1
627 */
628template< typename Work_Thread >
630 {
631 friend class disp_data_source_t;
632
633 public:
653
655 {
658 }
659
660 void
662 agent_t & /*agent*/ ) override
663 {
664 // Nothing to do.
665 }
666
667 void
669 agent_t & /*agent*/ ) noexcept override
670 {
671 // Nothing to do.
672 }
673
674 void
676 agent_t & agent ) noexcept override
677 {
680 }
681
682 void
684 agent_t & /*agent*/ ) noexcept override
685 {
687 }
688
689 [[nodiscard]]
692 {
693 return m_work_thread.io_context();
694 }
695
696 private:
697
698 /*!
699 * \brief Data source for run-time monitoring of whole dispatcher.
700 *
701 * \since
702 * v.1.4.1
703 */
705 {
706 //! Dispatcher to work with.
708
709 //! Basic prefix for data sources.
711
712 public :
723
724 void
746
747 private:
748 };
749
750 //! Working thread for the dispatcher.
751 Work_Thread m_work_thread;
752
753 //! Data source for run-time monitoring.
756
757 //! Count of agents bound to this dispatcher.
759 };
760
761//
762// dispatcher_handle_maker_t
763//
764/*!
765 * \brief A factory class for creation of dispatcher_handle instances.
766 *
767 * \since v.1.4.1
768 */
770 {
771 public :
772 [[nodiscard]]
774 make( actual_disp_binder_shptr_t binder ) noexcept
775 {
776 return { std::move( binder ) };
777 }
778 };
779
780//
781// create_dispatcher
782//
783/*!
784 * \brief The actual implementation of dispatcher creation procedure.
785 *
786 * \tparam Traits Type of traits to be used for a new dispatcher.
787 * \tparam Thread_Init_Args Types of arguments to be passed as additional
788 * parameters to the constructor of Traits::thread_type.
789 *
790 * \since v.1.4.1
791 */
792template<
793 typename Traits,
794 typename... Thread_Init_Args >
795[[nodiscard]]
798 //! SObjectizer environment to work in.
799 environment_t & env,
800 //! Short name for this instance to be used in thread activity stats.
801 //! Can be empty string. In this case name will be generated automatically.
802 const std::string_view data_sources_name_base,
803 //! Parameters for this dispatcher instance.
804 disp_params_t params,
805 //! Parameters for initialization of a custom thread.
806 Thread_Init_Args && ...thread_init_args )
807 {
808 using namespace so_5::disp::reuse;
809
810 const auto io_svc_ptr = params.io_context();
811 if( !io_svc_ptr )
814 "io_context is not set in disp_params" );
815
819
823
827 // Type of result pointer.
829 // Actual type of dispatcher without thread activity tracking.
831 // Actual type of dispatcher with thread activity tracking.
835 std::move(params),
837
839 }
840
841} /* namespace impl */
842
843//
844// default_traits_t
845//
846/*!
847 * \brief Default traits of %asio_one_thread dispatcher.
848 *
849 * \note
850 * This type is empty in v.1.5.0.
851 * It is left empty intentionally to have a possibility to extend it later, in
852 * some future version.
853 *
854 * \since
855 * v.1.4.1
856 */
858 {
859 };
860
861//
862// make_dispatcher
863//
864/*!
865 * \brief A function for creating an instance of %asio_one_thread dispatcher.
866 *
867 * Usage examples:
868 * \code
869 * // Dispatcher which uses own Asio IoContext and default traits.
870 * namespace asio_disp = so_5::extra::disp::asio_one_thread;
871 * asio_disp::disp_params_t params;
872 * params.use_own_io_context(); // Asio IoContext object will be created here.
873 * // This object will be accessible later via
874 * // dispatcher_handle_t::io_context() method.
875 * auto disp = asio_disp::make_dispatcher(
876 * env,
877 * "my_asio_disp",
878 * std::move(params) );
879 * \endcode
880 *
881 * \code
882 * // Dispatcher which uses external Asio IoContext and default traits.
883 * asio::io_context & io_svc = ...;
884 * namespace asio_disp = so_5::extra::disp::asio_one_thread;
885 * asio_disp::disp_params_t params;
886 * params.use_external_io_context( io_svc );
887 * auto disp = asio_disp::make_dispatcher(
888 * env,
889 * "my_asio_disp",
890 * std::move(params) );
891 * \endcode
892 *
893 * \par Requirements for traits type
894 * The Traits-type is empty in v.1.5.0. There was a possibility to specify
895 * a custom thread type in previous versions of so_5_extra, but since v.1.5.0
896 * custom threads are supported via standard SObjectizer's mechanism based
897 * on `abstract_work_thread_t`/`abstract_work_thread_factory_t` interfaces.
898 * But the Traits-type might be extended by some content in future versions.
899 *
900 * \tparam Traits Type with traits for a dispatcher. For the requirements
901 * for \a Traits type see the section "Requirements for traits type" above.
902 *
903 * \since
904 * v.1.4.1
905 */
906template< typename Traits = default_traits_t >
909 //! SObjectizer environment to work in.
910 environment_t & env,
911 //! Short name for this instance to be used in thread activity stats.
912 //! Can be empty string. In this case name will be generated automatically.
913 const std::string_view data_sources_name_base,
914 //! Parameters for this dispatcher instance.
915 disp_params_t params )
916 {
917 return impl::create_dispatcher< Traits >(
918 env,
920 std::move(params) );
921 }
922
923} /* namespace asio_one_thread */
924
925} /* namespace disp */
926
927} /* namespace extra */
928
929} /* namespace so_5 */
Parameters for asio_one_thread dispatcher.
Definition pub.hpp:69
disp_params_t & use_external_io_context(std::shared_ptr< ::asio::io_context > service)
Use external Asio io_context object with dispatcher.
Definition pub.hpp:130
disp_params_t & use_own_io_context()
Use own Asio io_context object.
Definition pub.hpp:145
std::shared_ptr< ::asio::io_context > m_io_context
Asio's io_context which must be used with this dispatcher.
Definition pub.hpp:160
disp_params_t & use_external_io_context(::asio::io_context &service)
Use external Asio io_context object with dispatcher.
Definition pub.hpp:113
disp_params_t()=default
Default constructor.
std::shared_ptr< ::asio::io_context > io_context() const noexcept
Get the io_context.
Definition pub.hpp:153
friend void swap(disp_params_t &a, disp_params_t &b) noexcept
Definition pub.hpp:80
A handle for asio_one_thread dispatcher.
Definition pub.hpp:210
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
Definition pub.hpp:275
void reset() noexcept
Drop the content of handle.
Definition pub.hpp:279
disp_binder_shptr_t binder() const
Get a binder for that dispatcher.
Definition pub.hpp:251
operator bool() const noexcept
Is this handle empty?
Definition pub.hpp:270
::asio::io_context & io_context() noexcept
Get reference to io_context from that dispatcher.
Definition pub.hpp:263
dispatcher_handle_t(impl::actual_disp_binder_shptr_t binder) noexcept
Definition pub.hpp:216
impl::actual_disp_binder_shptr_t m_binder
A reference to actual implementation of a dispatcher.
Definition pub.hpp:214
bool empty() const noexcept
Is this handle empty?
Definition pub.hpp:224
An actual interface of disp_binder for asio_one_thread dispatcher.
Definition pub.hpp:177
virtual asio::io_context & io_context() noexcept=0
A factory class for creation of dispatcher_handle instances.
Definition pub.hpp:770
static dispatcher_handle_t make(actual_disp_binder_shptr_t binder) noexcept
Definition pub.hpp:774
Data source for run-time monitoring of whole dispatcher.
Definition pub.hpp:705
Work_Thread m_work_thread
Working thread for the dispatcher.
Definition pub.hpp:751
std::atomic< std::size_t > m_agents_bound
Count of agents bound to this dispatcher.
Definition pub.hpp:758
dispatcher_template_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
Definition pub.hpp:634
Base class for implementation of worker thread without thread activity tracking.
Definition pub.hpp:346
no_activity_tracking_impl_t(io_context_shptr_t io_context, ::so_5::disp::work_thread_holder_t thread_holder)
Definition pub.hpp:350
Base class for implementation of worker thread with thread activity tracking.
Definition pub.hpp:382
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::internal_lock > m_working_stats
Statistics for work activity.
Definition pub.hpp:412
with_activity_tracking_impl_t(io_context_shptr_t io_context, ::so_5::disp::work_thread_holder_t thread_holder)
Definition pub.hpp:386
An implementation of worker thread in form of the template class.
Definition pub.hpp:443
void push(execution_demand_t demand) override
Definition pub.hpp:493
void handle_demand(execution_demand_t demand) noexcept
Definition pub.hpp:567
void push_evt_start(execution_demand_t demand) override
Definition pub.hpp:513
void push_evt_finish(execution_demand_t demand) noexcept override
Definition pub.hpp:520
work_thread_template_t(environment_t &env, io_context_shptr_t io_context, ::so_5::disp::work_thread_holder_t thread_holder)
Initializing constructor.
Definition pub.hpp:455
environment_t & m_env
SObjectizer Environment to work in.
Definition pub.hpp:451
const int rc_io_context_is_not_set
Asio io_context is not set for asio_one_thread dispatcher.
Definition pub.hpp:48
void send_thread_activity_stats(const so_5::mbox_t &, const so_5::stats::prefix_t &, work_thread_no_activity_tracking_t &)
Definition pub.hpp:588
dispatcher_handle_t create_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params, Thread_Init_Args &&...thread_init_args)
The actual implementation of dispatcher creation procedure.
Definition pub.hpp:797
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
A function for creating an instance of asio_one_thread dispatcher.
Definition pub.hpp:908
const int asio_one_thread_errors
Starting point for errors of asio_one_thread submodule.
Ranges for error codes of each submodules.
Definition details.hpp:13
Default traits of asio_one_thread dispatcher.
Definition pub.hpp:858
A type of holder of data common to all worker thread implementations.
Definition pub.hpp:304
common_data_t(io_context_shptr_t io_context, ::so_5::disp::work_thread_holder_t thread_holder)
Definition pub.hpp:321
so_5::current_thread_id_t m_thread_id
ID of the work thread.
Definition pub.hpp:316
demands_counter_t m_demands_counter
Counter of waiting demands.
Definition pub.hpp:319
::so_5::disp::work_thread_holder_t m_thread_holder
Thread object.
Definition pub.hpp:309
io_context_shptr_t m_io_context
Asio's context to be used.
Definition pub.hpp:306