2
3
6
7
8
9
10
11
15#include <so_5/environment_infrastructure.hpp>
17#include <so_5/impl/coop_repository_basis.hpp>
18#include <so_5/impl/mbox_iface_for_timers.hpp>
20#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
22#include <so_5/stats/impl/activity_tracking.hpp>
23#include <so_5/stats/impl/st_env_stuff.hpp>
24#include <so_5/stats/controller.hpp>
25#include <so_5/stats/repository.hpp>
26#include <so_5/stats/prefix.hpp>
27#include <so_5/stats/messages.hpp>
28#include <so_5/stats/std_names.hpp>
30#include <so_5/send_functions.hpp>
31#include <so_5/env_infrastructures.hpp>
33#include <so_5/timers.hpp>
47
48
49
50
67
68
69
70
71
72struct fake_activity_tracker_t
final
86
87
88
89
90
91
92
93
94
95
96class real_activity_tracker_t
final
130 const current_thread_id_t &,
131 fake_activity_tracker_t & )
138 const so_5::mbox_t & mbox,
140 const current_thread_id_t & thread_id,
141 real_activity_tracker_t & activity_tracker )
155
156
157
158
159
160class coop_repo_t
final
169 coop_listener_unique_ptr_t coop_listener )
180 std::lock_guard< std::mutex > l{
m_lock };
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204template<
typename Event_Queue_Type >
249 agent_t & agent )
noexcept override
274
275
276
277
278
279
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
318 typename Event_Queue_Type,
319 typename Activity_Tracker,
320 typename Data_Source_Name_Parts >
330 std::move(event_queue) }
333 outliving_mutable( *
this )
347
348
349
350
351 class disp_data_source_t
final :
public stats::
source_t
368 Data_Source_Name_Parts::disp_type_part(),
382 const auto evt_queue_stats =
388 evt_queue_stats.m_demands_count );
390 send_thread_activity_stats(
410
411
412
413
414class actual_elapsed_timers_collector_t
final
425 std::type_index type_index,
427 message_ref_t message )
435 using demands_container_t = std::deque<
demand_t >;
443 std::type_index type_index,
445 message_ref_t msg )
override
448 std::move(type_index),
454
455
463
464
481 demands_container_t demands;
491
492
493
494
495
496
497
498
499
500
501
502class direct_delivery_elapsed_timers_collector_t
final
508 std::type_index type_index,
510 message_ref_t msg )
override
521
522
523
524
525
526
527
528
529
530template<
typename Lock_Holder >
531class stats_controller_t
final
535 ,
protected Lock_Holder
541 mbox_t distribution_mbox,
543 mbox_t next_turn_mbox )
549 virtual const mbox_t &
558 this->lock_and_perform( [&]{
576 this->lock_and_perform( [&] {
581 virtual std::chrono::steady_clock::duration
583 std::chrono::steady_clock::duration period )
override
585 return this->lock_and_perform( [&] {
598 this->lock_and_perform( [&] {
606 this->lock_and_perform( [&] {
615 this->lock_and_perform( [&] {
648
649
657
658
662
663
664
673
674
676 static std::chrono::steady_clock::duration
679 return std::chrono::milliseconds{1};
683 std::chrono::steady_clock::duration
686 auto started_at = std::chrono::steady_clock::now();
702 return std::chrono::steady_clock::now() - started_at;
708 std::chrono::steady_clock::duration pause,
714 outliving_mutable( *
this ),
void so_bind_to_dispatcher(event_queue_t &queue) noexcept
Binding agent to the dispatcher.
Type of smart handle for a cooperation.
A special type that plays role of unique_ptr for coop.
A class to be used as mixin with actual std::mutex instance inside.
Interface for dispatcher binders.
Default implementation of multithreaded environment infrastructure.
std::shared_ptr< default_dispatcher_t< Activity_Tracker > > m_default_disp
Dispatcher to be used as default dispatcher.
stats::repository_t & stats_repository() noexcept override
Get stats repository for the environment.
void single_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause) override
Initiate a delayed message.
reusable::actual_elapsed_timers_collector_t m_timers_collector
A collector for elapsed timers.
so_5::timer_id_t schedule_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause, std::chrono::steady_clock::duration period) override
Initiate a timer (delayed or periodic message).
shutdown_status_t m_shutdown_status
Status of shutdown procedure.
void launch(env_init_t init_fn) override
Do actual launch of SObjectizer's Environment.
coop_handle_t register_coop(coop_unique_holder_t coop) override
Register new cooperation.
void stop() noexcept override
Initiate a signal for shutdown of Environment.
so_5::impl::final_dereg_chain_holder_t m_final_dereg_chain
The chain of coops for the final deregistration.
void perform_shutdown_related_actions_if_needed(std::unique_lock< std::mutex > &acquired_lock) noexcept
main_thread_sync_objects_t m_sync_objects
All sync objects to be shared between different parts.
void run_user_supplied_init_and_do_main_loop(env_init_t init_fn)
void try_handle_next_demand(std::unique_lock< std::mutex > &acquired_lock) noexcept
timer_thread_stats_t query_timer_thread_stats() override
Query run-time statistics for timer (thread or manager).
void ready_to_deregister_notify(coop_shptr_t coop) noexcept override
void process_final_deregs_if_any(std::unique_lock< std::mutex > &acquired_lock) noexcept
so_5::environment_infrastructure_t::coop_repository_stats_t query_coop_repository_stats() override
Query run-time statistics for cooperation repository.
void run_main_loop() noexcept
coop_unique_holder_t make_coop(coop_handle_t parent, disp_binder_shptr_t default_binder) override
Create an instance of a new coop.
void run_default_dispatcher_and_go_further(env_init_t init_fn)
bool final_deregister_coop(coop_shptr_t coop) noexcept override
Do final actions of the cooperation deregistration.
timer_manager_unique_ptr_t m_timer_manager
A timer manager to be used.
stats_controller_t m_stats_controller
Stats controller for this environment.
event_queue_impl_t m_event_queue
Queue for execution_demands which must be handled on the main thread.
env_infrastructure_t(environment_t &env, timer_manager_factory_t timer_factory, error_logger_shptr_t error_logger, coop_listener_unique_ptr_t coop_listener, mbox_t stats_distribution_mbox)
disp_binder_shptr_t make_default_disp_binder() override
Create a binder for the default dispatcher.
void handle_expired_timers_if_any(std::unique_lock< std::mutex > &acquired_lock) noexcept
coop_repo_t m_coop_repo
Repository of registered coops.
stats::controller_t & stats_controller() noexcept override
Get stats controller for the environment.
Activity_Tracker m_activity_tracker
Actual activity tracker for main working thread.
pop_result_t
Type for result of extraction operation.
void push(execution_demand_t demand) override
event_queue_impl_t(main_thread_sync_objects_t &sync_objects)
void push_evt_start(execution_demand_t demand) override
pop_result_t pop(execution_demand_t &receiver) noexcept
stats_t query_stats() const
std::deque< execution_demand_t > m_demands
main_thread_sync_objects_t & m_sync_objects
void push_evt_finish(execution_demand_t demand) noexcept override
Parameters for simple thread-safe single-thread environment.
const timer_manager_factory_t & timer_manager() const
Getter for timer_manager factory.
bool empty() const noexcept
demands_container_t m_demands
Collected demands.
virtual void accept(std::type_index type_index, mbox_t mbox, message_ref_t msg) override
Accept and store info about elapsed timer.
bool has_live_coop()
Is there any live coop?
coop_repo_t(outliving_reference_t< environment_t > env, coop_listener_unique_ptr_t coop_listener)
Initializing constructor.
A basic part of implementation of dispatcher to be used in places where default dispatcher is needed.
std::atomic< std::size_t > m_agents_bound
Counter of agents bound to that dispatcher.
outliving_reference_t< Event_Queue_Type > m_event_queue
Event queue for that dispatcher.
default_dispatcher_basis_t(outliving_reference_t< Event_Queue_Type > event_queue)
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
Event_Queue_Type & event_queue() const noexcept
void undo_preallocation(agent_t &) noexcept override
Undo resources allocation.
std::size_t agents_bound() const noexcept
void handle_demand(execution_demand_t &demand)
current_thread_id_t thread_id() const noexcept
current_thread_id_t m_thread_id
ID of the main thread.
void unbind(agent_t &) noexcept override
Unbind agent from dispatcher.
void preallocate_resources(agent_t &) override
Allocate resources in dispatcher for new agent.
outliving_reference_t< default_dispatcher_t > m_dispatcher
Dispatcher to work with.
stats::prefix_t m_base_prefix
Basic prefix for data sources.
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
disp_data_source_t(outliving_reference_t< default_dispatcher_t > disp)
An implementation of dispatcher to be used in places where default dispatcher is needed.
outliving_reference_t< Activity_Tracker > m_activity_tracker
Activity tracker.
Activity_Tracker & activity_tracker() noexcept
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for spreading run-time stats.
default_dispatcher_t(outliving_reference_t< environment_t > env, outliving_reference_t< Event_Queue_Type > event_queue, outliving_reference_t< Activity_Tracker > activity_tracker)
virtual void accept(std::type_index type_index, mbox_t mbox, message_ref_t msg) override
Accept and store info about elapsed timer.
stats::work_thread_activity_stats_t take_activity_stats()
stats::activity_tracking_stuff::stats_collector_t< stats::activity_tracking_stuff::null_lock > m_waiting
void wait_start_if_not_started()
stats::activity_tracking_stuff::stats_collector_t< stats::activity_tracking_stuff::null_lock > m_working
static std::chrono::steady_clock::duration very_small_timeout()
const mbox_t m_next_turn_mbox
Mbox for delayed messages for initiation of next turn.
const mbox_t m_distribution_mbox
Mbox for sending messages with run-time statistics.
status_t m_status
Current status of stats_controller.
virtual void add(stats::source_t &what) override
Registration of new data source.
virtual void turn_on() override
Turn the monitoring on.
virtual std::chrono::steady_clock::duration set_distribution_period(std::chrono::steady_clock::duration period) override
Set distribution period.
std::mutex m_lock
Object's lock.
status_t
Status of stats_controller.
void send_next_message(std::chrono::steady_clock::duration pause, const int run_id)
Helper method for sending next instance of next_turn message.
virtual const mbox_t & mbox() const override
Get the mbox for receiving monitoring information.
stats_controller_t(mbox_t distribution_mbox, mbox_t next_turn_mbox)
Initializing constructor.
virtual void on_next_turn(int run_id) override
std::chrono::steady_clock::duration distribute_current_data()
Actual distribution of the current statistics.
virtual void turn_off() override
Turn the monitoring off.
int m_run_id
ID of stats distribution.
virtual void remove(stats::source_t &what) noexcept override
Deregistration of previously registered data source.
std::chrono::steady_clock::duration m_distribution_period
stats::source_t * m_tail
Tail of data sources list.
stats::source_t * m_head
Head of data sources list.
An interface for environment_infrastructure entity.
static environment_infrastructure_deleter_fnptr_t default_deleter()
Default deleter for environment_infrastructure object.
Parameters for the SObjectizer Environment initialization.
coop_listener_unique_ptr_t so5_giveout_coop_listener()
Get cooperation listener.
work_thread_activity_tracking_t work_thread_activity_tracking() const
Get activity tracking flag for the whole SObjectizer Environment.
const error_logger_shptr_t & so5_error_logger() const
Get error logger for the environment.
stats::repository_t & stats_repository()
Access to repository of data sources for run-time monitoring.
An interface of event queue for agent.
A basic part for various implementations of coop_repository.
std::size_t m_total_coops
Total count of coops.
std::mutex m_lock
Lock for coop repository.
coop_unique_holder_t make_coop(coop_handle_t parent, disp_binder_shptr_t default_binder)
Create an instance of a new coop.
void deregister_all_coop() noexcept
Deregister all cooperations.
coop_repository_basis_t(outliving_reference_t< environment_t > environment, coop_listener_unique_ptr_t coop_listener)
final_deregistration_result_t final_deregister_coop(coop_shptr_t coop) noexcept
Do final actions of the cooperation deregistration.
std::size_t m_registrations_in_progress
Count of coops that are currently being registered.
environment_infrastructure_t::coop_repository_stats_t query_stats()
Get the current statistic for run-time monitoring.
coop_handle_t register_coop(coop_unique_holder_t agent_coop)
Register cooperation.
Helper class for holding the current chain of coops for the final deregistration.
std::size_t size() const noexcept
void append(coop_shptr_t coop) noexcept
bool empty() const noexcept
coop_shptr_t giveout_current_chain() noexcept
Helper class for accessing protected members from mbox interface.
mbox_iface_for_timers_t(const mbox_t &mb)
void deliver_message_from_timer(const std::type_index &msg_type, const message_ref_t &message)
intrusive_ptr_t(intrusive_ptr_t &&o) noexcept
Move constructor.
Helper class for indication of long-lived reference via its type.
Helper for collecting activity stats.
so_5::stats::activity_stats_t take_stats()
void start_if_not_started()
A helper method for safe start if start method hasn't been called yet.
A holder for a data source that should be automatically registered and deregistered in the registry.
A public interface for controlling SObjectizer monitoring options.
static std::chrono::steady_clock::duration default_distribution_period()
Default distribution period.
An interface for initiation of next turn in stats distribution.
static mbox_t make(environment_t &env)
Helper to simplify the creation of mboxes of that type.
A type for storing prefix of data_source name.
An interface of data sources repository.
static source_t * source_list_next(const source_t &what) noexcept
Helper method for accessing next data source in the list.
static void source_list_add(source_t &what, source_t *&head, source_t *&tail) noexcept
Helper method for adding data source to existing list.
static void source_list_remove(source_t &what, source_t *&head, source_t *&tail) noexcept
Helper method for removing data source from existing list.
An interface of data source.
virtual void distribute(const mbox_t &)=0
Send appropriate notification about the current value.
An identifier for the timer.
An interface for collector of elapsed timers.
virtual void schedule_anonymous(const std::type_index &type_index, const mbox_t &mbox, const message_ref_t &msg, std::chrono::steady_clock::duration pause, std::chrono::steady_clock::duration period)=0
Push anonymous delayed/periodic message to the timer queue.
virtual std::chrono::steady_clock::duration timeout_before_nearest_timer(std::chrono::steady_clock::duration default_timer)=0
Calculate time before the nearest timer (if any).
virtual void process_expired_timers()=0
Translation of expired timers into message sends.
virtual timer_id_t schedule(const std::type_index &type_index, const mbox_t &mbox, const message_ref_t &msg, std::chrono::steady_clock::duration pause, std::chrono::steady_clock::duration period)=0
Push delayed/periodic message to the timer queue.
virtual timer_thread_stats_t query_stats()=0
Get statistics for run-time monitoring.
Some reusable and low-level classes/functions which can be used in public header files.
Reusable components for dispatchers.
auto unlock_do_and_lock_again(std::unique_lock< std::mutex > &acquired_lock, Action &&action) -> decltype(action())
main_thread_status_t
A short name for namespace with run-time stats stuff.
void wakeup_if_waiting(main_thread_sync_objects_t &sync_objects)
Simple single-threaded environment infrastructure with thread safety.
SO_5_FUNC environment_infrastructure_factory_t factory(params_t &&params)
A factory for creation of simple thread-safe single-thread environment infrastructure object.
Various reusable stuff which can be used in implementation of single-threaded environment infrastruct...
shutdown_status_t
A short name for namespace with run-time stats stuff.
@ must_be_started
Shutdown must be started as soon as possible.
@ completed
Shutdown completed and work of environment must be finished.
@ not_started
Shutdown is not started yet.
@ in_progress
Shutdown is initiated but not finished yet.
void send_thread_activity_stats(const so_5::mbox_t &mbox, const stats::prefix_t &prefix, const current_thread_id_t &thread_id, real_activity_tracker_t &activity_tracker)
void send_thread_activity_stats(const mbox_t &, const stats::prefix_t &, const current_thread_id_t &, fake_activity_tracker_t &)
Various implementations of environment_infrastructure.
Details of SObjectizer run-time implementations.
void process_final_dereg_chain(coop_shptr_t head) noexcept
Helper function that does processing of final dereg chain.
void wrap_init_fn_call(Init_Fn init_fn)
A special wrapper for calling init function.
Internal implementation of run-time monitoring and statistics related stuff.
Declarations of messages used by run-time monitoring and statistics.
Predefined suffixes of data-sources.
SO_5_FUNC suffix_t agent_count()
Suffix for data source with count of agents bound to some entity.
SO_5_FUNC suffix_t work_thread_queue_size()
Suffix for data source with count of demands in a working thread event queue.
SO_5_FUNC suffix_t work_thread_activity()
Suffix for data source with work thread activity statistics.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
void send(Target &&to, Args &&... args)
A utility function for creating and delivering a message or a signal.
work_thread_activity_tracking_t
Values for dispatcher's work thread activity tracking.
current_thread_id_t query_current_thread_id()
Get the ID of the current thread.
outliving_reference_t< T > outliving_mutable(T &r)
Make outliving_reference wrapper for mutable reference.
A special class for generation of names for dispatcher data sources.
static constexpr const char * disp_type_part() noexcept
Type for representation of statistical data for this event queue.
std::size_t m_demands_count
The current size of the demands queue.
A bunch of sync objects which need to be shared between various parts of env_infrastructure.
main_thread_status_t m_status
The current status of the main thread.
std::condition_variable m_wakeup_condition
A condition to sleep on when no activities to handle.
std::mutex m_lock
Main lock for environment infrastructure.
Type of demand created from elapsed timer.
demand_t(std::type_index type_index, mbox_t mbox, message_ref_t message)
std::type_index m_type_index
void wait_start_if_not_started()
Statistical data for run-time monitoring of coop repository content.
std::size_t m_total_coop_count
Count of cooperations inside the environment.
std::size_t m_total_agent_count
Count of registered agents.
A description of event execution demand.
void call_handler(current_thread_id_t thread_id)
Helper method to simplify demand execution.
A special class for cases where lock is not needed at all.
Notification about finish of stats distribution.
Notification about start of new stats distribution.
A message with value of some quantity.
Information about one work thread activity.
Stats for a work thread activity.
activity_stats_t m_working_stats
Stats for processed events.
activity_stats_t m_waiting_stats
Stats for waiting periods.
Statistics for run-time monitoring.