2
3
6
7
8
9
10
11
15#include <so_5/mbox.hpp>
17#include <so_5/ret_code.hpp>
18#include <so_5/outliving.hpp>
32
33
34
35
36
68
69
70
71
72
73
74
75
76
77
78
83
84
85
86
87
88
89
108 "call to subscribe_event_handler() is illegal for "
109 "next_turn_mbox_t" );
123 return "<next_turn_mbox>";
135 const std::type_index & msg_type,
136 const message_ref_t & message,
137 unsigned int )
override
139 static const auto & next_turn_msg_type =
142 if( msg_type != next_turn_msg_type )
144 "only next_turn_handler_t::next_turn expected in "
145 "next_turn_mbox_t::do_deliver_message" );
147 const auto & actual_message =
148 dynamic_cast<
const next_turn_handler_t::next_turn & >(
151 actual_message.m_handler.get().on_next_turn(
152 actual_message.m_run_id );
162 "call to set_delivery_filter() is illegal for "
163 "next_turn_mbox_t" );
172 "call to drop_delivery_filter() is illegal for "
173 "next_turn_mbox_t" );
177
178
179
180
181
192 return {
new next_turn_mbox_t{env} };
Interface for message sink.
Type of smart handle for a cooperation.
A special type that plays role of unique_ptr for coop.
An interface of delivery filter object.
Interface for dispatcher binders.
Default implementation of multithreaded environment infrastructure.
std::shared_ptr< default_dispatcher_t< Activity_Tracker > > m_default_disp
Dispatcher to be used as default dispatcher.
stats::repository_t & stats_repository() noexcept override
Get stats repository for the environment.
void single_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause) override
Initiate a delayed message.
reusable::actual_elapsed_timers_collector_t m_timers_collector
A collector for elapsed timers.
so_5::timer_id_t schedule_timer(const std::type_index &type_wrapper, const message_ref_t &msg, const mbox_t &mbox, std::chrono::steady_clock::duration pause, std::chrono::steady_clock::duration period) override
Initiate a timer (delayed or periodic message).
shutdown_status_t m_shutdown_status
Status of shutdown procedure.
void launch(env_init_t init_fn) override
Do actual launch of SObjectizer's Environment.
coop_handle_t register_coop(coop_unique_holder_t coop) override
Register new cooperation.
void stop() noexcept override
Initiate a signal for shutdown of Environment.
so_5::impl::final_dereg_chain_holder_t m_final_dereg_chain
The chain of coops for the final deregistration.
void perform_shutdown_related_actions_if_needed(std::unique_lock< std::mutex > &acquired_lock) noexcept
main_thread_sync_objects_t m_sync_objects
All sync objects to be shared between different parts.
void run_user_supplied_init_and_do_main_loop(env_init_t init_fn)
void try_handle_next_demand(std::unique_lock< std::mutex > &acquired_lock) noexcept
timer_thread_stats_t query_timer_thread_stats() override
Query run-time statistics for timer (thread or manager).
void ready_to_deregister_notify(coop_shptr_t coop) noexcept override
void process_final_deregs_if_any(std::unique_lock< std::mutex > &acquired_lock) noexcept
so_5::environment_infrastructure_t::coop_repository_stats_t query_coop_repository_stats() override
Query run-time statistics for cooperation repository.
void run_main_loop() noexcept
coop_unique_holder_t make_coop(coop_handle_t parent, disp_binder_shptr_t default_binder) override
Create an instance of a new coop.
void run_default_dispatcher_and_go_further(env_init_t init_fn)
bool final_deregister_coop(coop_shptr_t coop) noexcept override
Do final actions of the cooperation deregistration.
timer_manager_unique_ptr_t m_timer_manager
A timer manager to be used.
stats_controller_t m_stats_controller
Stats controller for this environment.
event_queue_impl_t m_event_queue
Queue for execution_demands which must be handled on the main thread.
env_infrastructure_t(environment_t &env, timer_manager_factory_t timer_factory, error_logger_shptr_t error_logger, coop_listener_unique_ptr_t coop_listener, mbox_t stats_distribution_mbox)
disp_binder_shptr_t make_default_disp_binder() override
Create a binder for the default dispatcher.
void handle_expired_timers_if_any(std::unique_lock< std::mutex > &acquired_lock) noexcept
coop_repo_t m_coop_repo
Repository of registered coops.
stats::controller_t & stats_controller() noexcept override
Get stats controller for the environment.
Activity_Tracker m_activity_tracker
Actual activity tracker for main working thread.
pop_result_t
Type for result of extraction operation.
void push(execution_demand_t demand) override
event_queue_impl_t(main_thread_sync_objects_t &sync_objects)
void push_evt_start(execution_demand_t demand) override
pop_result_t pop(execution_demand_t &receiver) noexcept
stats_t query_stats() const
std::deque< execution_demand_t > m_demands
main_thread_sync_objects_t & m_sync_objects
void push_evt_finish(execution_demand_t demand) noexcept override
bool empty() const noexcept
demands_container_t m_demands
Collected demands.
virtual void accept(std::type_index type_index, mbox_t mbox, message_ref_t msg) override
Accept and store info about elapsed timer.
bool has_live_coop()
Is there any live coop?
coop_repo_t(outliving_reference_t< environment_t > env, coop_listener_unique_ptr_t coop_listener)
Initializing constructor.
A basic part of implementation of dispatcher to be used in places where default dispatcher is needed.
std::atomic< std::size_t > m_agents_bound
Counter of agents bound to that dispatcher.
outliving_reference_t< Event_Queue_Type > m_event_queue
Event queue for that dispatcher.
default_dispatcher_basis_t(outliving_reference_t< Event_Queue_Type > event_queue)
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
Event_Queue_Type & event_queue() const noexcept
void undo_preallocation(agent_t &) noexcept override
Undo resources allocation.
std::size_t agents_bound() const noexcept
void handle_demand(execution_demand_t &demand)
current_thread_id_t thread_id() const noexcept
current_thread_id_t m_thread_id
ID of the main thread.
void unbind(agent_t &) noexcept override
Unbind agent from dispatcher.
void preallocate_resources(agent_t &) override
Allocate resources in dispatcher for new agent.
outliving_reference_t< default_dispatcher_t > m_dispatcher
Dispatcher to work with.
stats::prefix_t m_base_prefix
Basic prefix for data sources.
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
disp_data_source_t(outliving_reference_t< default_dispatcher_t > disp)
An implementation of dispatcher to be used in places where default dispatcher is needed.
outliving_reference_t< Activity_Tracker > m_activity_tracker
Activity tracker.
Activity_Tracker & activity_tracker() noexcept
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for spreading run-time stats.
default_dispatcher_t(outliving_reference_t< environment_t > env, outliving_reference_t< Event_Queue_Type > event_queue, outliving_reference_t< Activity_Tracker > activity_tracker)
virtual void accept(std::type_index type_index, mbox_t mbox, message_ref_t msg) override
Accept and store info about elapsed timer.
stats::work_thread_activity_stats_t take_activity_stats()
stats::activity_tracking_stuff::stats_collector_t< stats::activity_tracking_stuff::null_lock > m_waiting
void wait_start_if_not_started()
stats::activity_tracking_stuff::stats_collector_t< stats::activity_tracking_stuff::null_lock > m_working
static std::chrono::steady_clock::duration very_small_timeout()
const mbox_t m_next_turn_mbox
Mbox for delayed messages for initiation of next turn.
const mbox_t m_distribution_mbox
Mbox for sending messages with run-time statistics.
status_t m_status
Current status of stats_controller.
virtual void add(stats::source_t &what) override
Registration of new data source.
virtual void turn_on() override
Turn the monitoring on.
virtual std::chrono::steady_clock::duration set_distribution_period(std::chrono::steady_clock::duration period) override
Set distribution period.
std::mutex m_lock
Object's lock.
status_t
Status of stats_controller.
void send_next_message(std::chrono::steady_clock::duration pause, const int run_id)
Helper method for sending next instance of next_turn message.
virtual const mbox_t & mbox() const override
Get the mbox for receiving monitoring information.
stats_controller_t(mbox_t distribution_mbox, mbox_t next_turn_mbox)
Initializing constructor.
virtual void on_next_turn(int run_id) override
std::chrono::steady_clock::duration distribute_current_data()
Actual distribution of the current statistics.
virtual void turn_off() override
Turn the monitoring off.
int m_run_id
ID of stats distribution.
virtual void remove(stats::source_t &what) noexcept override
Deregistration of previously registered data source.
std::chrono::steady_clock::duration m_distribution_period
stats::source_t * m_tail
Tail of data sources list.
stats::source_t * m_head
Head of data sources list.
An interface for environment_infrastructure entity.
An interface of event queue for agent.
A basic part for various implementations of coop_repository.
Helper class for holding the current chain of coops for the final deregistration.
Helper class for accessing protected members from mbox interface.
mbox_iface_for_timers_t(const mbox_t &mb)
void deliver_message_from_timer(const std::type_index &msg_type, const message_ref_t &message)
abstract_message_box_t & m_mb
A base class for agent messages.
Helper class for indication of long-lived reference via its type.
Helper for collecting activity stats.
A holder for a data source that should be automatically registered and deregistered in the registry.
A public interface for controlling SObjectizer monitoring options.
An interface for initiation of next turn in stats distribution.
virtual ~next_turn_handler_t()
virtual void on_next_turn(int run_id)=0
mbox_id_t id() const override
Unique ID of this mbox.
void subscribe_event_handler(const std::type_index &, abstract_message_sink_t &) override
Add the message handler.
mbox_type_t type() const override
Get the type of message box.
void drop_delivery_filter(const std::type_index &, abstract_message_sink_t &) noexcept override
Removes delivery filter for message type and subscriber.
std::string query_name() const override
Get the mbox name.
environment_t & environment() const noexcept override
next_turn_mbox_t(environment_t &env)
void unsubscribe_event_handler(const std::type_index &, abstract_message_sink_t &) noexcept override
Remove all message handlers.
void set_delivery_filter(const std::type_index &, const delivery_filter_t &, abstract_message_sink_t &) override
Set a delivery filter for message type and subscriber.
static mbox_t make(environment_t &env)
Helper to simplify the creation of mboxes of this type.
void do_deliver_message(message_delivery_mode_t, const std::type_index &msg_type, const message_ref_t &message, unsigned int) override
Deliver message for all subscribers with respect to message limits.
environment_t & m_env
Environment for which that mbox is created.
A type for storing prefix of data_source name.
An interface of data sources repository.
An interface of data source.
An identifier for the timer.
An interface for collector of elapsed timers.
#define SO_5_THROW_EXCEPTION(error_code, desc)
auto unlock_do_and_lock_again(std::unique_lock< std::mutex > &acquired_lock, Action &&action) -> decltype(action())
main_thread_status_t
A short name for namespace with run-time stats stuff.
void wakeup_if_waiting(main_thread_sync_objects_t &sync_objects)
Simple single-threaded environment infrastructure with thread safety.
SO_5_FUNC environment_infrastructure_factory_t factory(params_t &&params)
A factory for creation of simple thread-safe single-thread environment infrastructure object.
Various reusable stuff which can be used in implementation of single-threaded environment infrastruct...
shutdown_status_t
A short name for namespace with run-time stats stuff.
@ must_be_started
Shutdown must be started as soon as possible.
@ completed
Shutdown completed and work of environment must be finished.
@ not_started
Shutdown is not started yet.
@ in_progress
Shutdown is initiated but not finished yet.
void send_thread_activity_stats(const so_5::mbox_t &mbox, const stats::prefix_t &prefix, const current_thread_id_t &thread_id, real_activity_tracker_t &activity_tracker)
void send_thread_activity_stats(const mbox_t &, const stats::prefix_t &, const current_thread_id_t &, fake_activity_tracker_t &)
Various implementations of environment_infrastructure.
Details of SObjectizer run-time implementations.
Internal implementation of run-time monitoring and statistics related stuff.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
message_delivery_mode_t
Possible modes of message/signal delivery.
mbox_type_t
Type of the message box.
@ multi_producer_single_consumer
A special class for generation of names for dispatcher data sources.
static constexpr const char * disp_type_part() noexcept
Type for representation of statistical data for this event queue.
std::size_t m_demands_count
The current size of the demands queue.
A bunch of sync objects which need to be shared between various parts of env_infrastructure.
main_thread_status_t m_status
The current status of the main thread.
std::condition_variable m_wakeup_condition
A condition to sleep on when no activities to handle.
std::mutex m_lock
Main lock for environment infrastructure.
Type of demand created from elapsed timer.
demand_t(std::type_index type_index, mbox_t mbox, message_ref_t message)
std::type_index m_type_index
void wait_start_if_not_started()
Statistical data for run-time monitoring of coop repository content.
A description of event execution demand.
A special class for cases where lock is not needed at all.
next_turn(outliving_reference_t< next_turn_handler_t > handler, int run_id)
outliving_reference_t< next_turn_handler_t > m_handler
Who must do next turn.
int m_run_id
ID of stats distribution.
Stats for a work thread activity.
Statistics for run-time monitoring.