2
3
4
5
6
7
11#include <so_5_extra/error_ranges.hpp>
13#include <so_5/disp_binder.hpp>
14#include <so_5/send_functions.hpp>
16#include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
17#include <so_5/disp/reuse/work_thread_activity_tracking.hpp>
18#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
19#include <so_5/disp/reuse/work_thread_factory_params.hpp>
21#include <so_5/stats/repository.hpp>
22#include <so_5/stats/messages.hpp>
23#include <so_5/stats/std_names.hpp>
24#include <so_5/stats/impl/activity_tracking.hpp>
26#include <so_5/details/invoke_noexcept_code.hpp>
27#include <so_5/details/rollback_on_exception.hpp>
28#include <so_5/details/abort_on_fatal_error.hpp>
30#include <so_5/outliving.hpp>
32#include <asio/io_context.hpp>
33#include <asio/io_context_strand.hpp>
34#include <asio/post.hpp>
56
57
58
59
60
88 swap( a.m_thread_count, b.m_thread_count );
89 swap( a.m_io_context, b.m_io_context );
96 m_thread_count = count;
104 return m_thread_count;
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
126 ::asio::io_context & service )
128 m_io_context = std::shared_ptr< ::asio::io_context >(
129 std::addressof( service ),
131 [](::asio::io_context *) {} );
137
138
139
140
143 std::shared_ptr< ::asio::io_context > service )
145 m_io_context = std::move(service);
151
152
153
154
155
159 m_io_context = std::make_shared< ::asio::io_context >();
173
174
189
190
191
192
193
194
195
196
237
238
239
240
241
250 impl::basic_dispatcher_iface_shptr_t dispatcher )
noexcept
257 empty()
const noexcept {
return !m_dispatcher; }
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
300 return m_dispatcher->binder_with_external_strand( strand );
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
340 return m_dispatcher->binder_with_own_strand();
345
346
347
352 return m_dispatcher->io_context();
366 reset()
noexcept { m_dispatcher.reset(); }
375
376
377
378
379
386
387
388
389
390
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
483
484
498
499
513
514
515
516
517
518
519
520
521
522 template<
typename Derived,
typename... Args >
528 ::asio::io_context & io_svc,
550 log_stream <<
"An exception caught in work thread "
551 "of so_5::extra::disp::asio_thread_pool dispatcher."
561 log_stream <<
"An unknown exception caught in work thread "
562 "of so_5::extra::disp::asio_thread_pool dispatcher."
573 ptr()->on_demand( std::move(demand) );
581
582
583
584
585
586
595 on_demand( execution_demand_t demand )
noexcept override
597 demand.call_handler( thread_id() );
605
606
607
608
609
610
611
612
613
614
628
629
630
631
632
636 m_thread_id = std::move(tid);
640
641
642
643
644
649
650
654 m_work_activity.start();
658
659
663 m_work_activity.stop();
667
668
672 ::so_5::stats::work_thread_activity_stats_t result;
673 result.m_working_stats = m_work_activity.take_stats();
683
684
685
686
687
688
697 outliving_reference_t< work_thread_activity_collector_t > activity_stats )
708 on_demand( execution_demand_t demand )
noexcept override
710 m_activity_stats.get().activity_started();
712 demand.call_handler( thread_id() );
714 m_activity_stats.get().activity_finished();
722
723
724
725
726
727
736 actual_dispatcher_shptr_t dispatcher )
756 agent_t & agent )
noexcept override
759 m_dispatcher->agent_bound();
761 agent.so_bind_to_dispatcher( *
this );
769 m_dispatcher->agent_bound();
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801template<
typename Derived >
808 return static_cast< Derived & >( *
this );
815 push( execution_demand_t demand )
override
853
854
855
856
857
858class binder_with_external_strand_t
final
859 :
public binder_template_t< binder_with_external_strand_t >
865 actual_dispatcher_shptr_t dispatcher,
866 outliving_reference_t< ::asio::io_context::strand > strand )
872 strand()
noexcept {
return m_strand.get(); }
883
884
885
886
887
888
889
890class binder_with_own_strand_t
final
891 :
public binder_template_t< binder_with_own_strand_t >
897 actual_dispatcher_shptr_t dispatcher )
914
915
916
917
918
919
920
921
922
923
932 ::so_5::environment_t & env,
949 return { std::make_shared< binder_with_external_strand_t >(
951 outliving_mutable(strand) )
959 return { std::make_shared< binder_with_own_strand_t >(
965 io_context()
const noexcept override {
return *m_io_context; }
982 return m_demands_counter;
989 std::string_view data_sources_name_base )
991 data_source().set_data_sources_name_base( data_sources_name_base );
992 data_source().start( env.stats_repository() );
994 ::so_5::details::do_with_rollback_on_exception(
995 [&] { launch_work_threads(env); },
996 [
this] { data_source().stop(); } );
1002 ::so_5::details::invoke_noexcept_code( [
this] {
1004 m_io_context->stop();
1011 ::so_5::details::invoke_noexcept_code( [
this] {
1013 wait_work_threads();
1015 data_source().stop();
1021
1022
1028
1029
1030
1036
1037
1038
1039
1044#if defined(__clang__
)
1045#pragma clang diagnostic push
1046#pragma clang diagnostic ignored "-Wnon-virtual-dtor"
1050
1051
1052
1053
1054
1066
1067
1068
1069
1070
1086 const auto agents_count = m_dispatcher.m_agents_bound.load(
1087 std::memory_order_acquire );
1089 const auto demands_count = m_dispatcher.m_demands_counter.load(
1090 std::memory_order_acquire );
1092 send< ::so_5::stats::messages::quantity< std::size_t > >(
1095 ::so_5::stats::suffixes::agent_count(),
1101 send< ::so_5::stats::messages::quantity< std::size_t > >(
1104 ::so_5::stats::suffixes::work_thread_queue_size(),
1110 std::string_view name_base )
1112 using namespace ::so_5::disp::reuse;
1114 m_base_prefix = make_disp_prefix(
1121 start( ::so_5::stats::repository_t & repo )
1124 m_stats_repo = &repo;
1130 m_stats_repo->remove( *
this );
1134#if defined(__clang__
)
1135#pragma clang diagnostic pop
1146
1147
1148
1149
1162 environment_t & env ) = 0;
1166
1167
1168
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1194 environment_t & env,
1206 environment_t & env,
1207 ::asio::io_context & io_svc,
1211 work_thread_t::run< work_thread_without_activity_tracking_t >(
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1240#if defined(__clang__
)
1241#pragma clang diagnostic push
1242#pragma clang diagnostic ignored "-Wnon-virtual-dtor"
1245
1246
1247
1248
1249
1250
1263 std::size_t thread_count )
1267 for(
auto & c : m_collectors )
1268 c = std::make_unique< work_thread_activity_collector_t >();
1274 disp_data_source_t::distribute( mbox );
1276 for( std::size_t i = 0; i != m_collectors.size(); ++i )
1277 distribute_stats_for_work_thread_at( mbox, i );
1281
1282
1286 return *(m_collectors[index]);
1292 const mbox_t & mbox,
1295 std::ostringstream ss;
1296 ss << base_prefix().c_str() <<
"/wt-" << index;
1298 const ::so_5::stats::prefix_t prefix{ ss.str() };
1299 auto & collector = collector_at( index );
1301 so_5::send< ::so_5::stats::messages::work_thread_activity >(
1304 ::so_5::stats::suffixes::work_thread_activity(),
1305 collector.thread_id(),
1306 collector.take_activity_stats() );
1310#if defined(__clang__
)
1311#pragma clang diagnostic pop
1316 environment_t & env,
1330 environment_t & env,
1332 ::asio::io_context & io_svc,
1338 work_thread_t::run< work_thread_with_activity_tracking_t >(
1342 self.m_actual_data_source.collector_at(index) ) );
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1369 typename Basic_Skeleton >
1401 environment_t & env )
override
1403 using namespace std;
1469 make( actual_dispatcher_shptr_t disp )
noexcept
1471 return { std::move( disp ) };
1481
1482
1483
1484
1485
1486
1490 auto c = std::thread::hardware_concurrency();
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1564 environment_t & env,
1567 const std::string_view data_sources_name_base,
1575 "io_context is not set in disp_params" );
A handle for asio_thread_pool dispatcher.
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
void reset() noexcept
Drop the content of handle.
operator bool() const noexcept
Is this handle empty?
dispatcher_handle_t() noexcept=default
::asio::io_context & io_context() noexcept
Get reference to io_context from that dispatcher.
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
impl::basic_dispatcher_iface_shptr_t m_dispatcher
A reference to actual implementation of a dispatcher.
bool empty() const noexcept
Is this handle empty?
disp_binder_shptr_t binder(::asio::io_context::strand &strand) const
Get a binder for that dispatcher.
disp_binder_shptr_t binder() const
Get a binder for that dispatcher.
binder_with_external_strand_t(actual_dispatcher_shptr_t dispatcher, outliving_reference_t< ::asio::io_context::strand > strand)
outliving_reference_t< ::asio::io_context::strand > m_strand
Strand to be used with this event_queue.
::asio::io_context::strand & strand() noexcept
binder_with_own_strand_t(actual_dispatcher_shptr_t dispatcher)
::asio::io_context::strand m_strand
Strand to be used with this event_queue.
::asio::io_context::strand & strand() noexcept
static dispatcher_handle_t make(actual_dispatcher_shptr_t disp) noexcept
Ranges for error codes of each submodules.