2
3
5#include <so_5/environment.hpp>
9#include <so_5/impl/internal_env_iface.hpp>
10#include <so_5/impl/coop_private_iface.hpp>
12#include <so_5/impl/mbox_core.hpp>
13#include <so_5/impl/layer_core.hpp>
14#include <so_5/impl/stop_guard_repo.hpp>
15#include <so_5/impl/std_msg_tracer_holder.hpp>
17#include <so_5/impl/run_stage.hpp>
19#include <so_5/stats/impl/std_controller.hpp>
20#include <so_5/stats/impl/ds_mbox_core_stats.hpp>
21#include <so_5/stats/impl/ds_agent_core_stats.hpp>
22#include <so_5/stats/impl/ds_timer_thread_stats.hpp>
24#include <so_5/env_infrastructures.hpp>
26#include <so_5/details/rollback_on_exception.hpp>
89 swap( a.m_timer_thread_factory, b.m_timer_thread_factory );
90 swap( a.m_so_layers, b.m_so_layers );
91 swap( a.m_coop_listener, b.m_coop_listener );
92 swap( a.m_event_exception_logger, b.m_event_exception_logger );
97 swap( a.m_error_logger, b.m_error_logger );
98 swap( a.m_message_delivery_tracer, b.m_message_delivery_tracer );
99 swap( a.m_message_delivery_tracer_filter, b.m_message_delivery_tracer_filter );
101 swap( a.m_default_disp_params, b.m_default_disp_params );
105 swap( a.m_queue_locks_defaults_manager, b.m_queue_locks_defaults_manager );
107 swap( a.m_infrastructure_factory, b.m_infrastructure_factory );
109 swap( a.m_event_queue_hook, b.m_event_queue_hook );
111 swap( a.m_work_thread_factory, b.m_work_thread_factory );
113 swap( a.m_default_subscription_storage_factory, b.m_default_subscription_storage_factory );
118 so_5::timer_thread_factory_t factory )
120 m_timer_thread_factory = std::move( factory );
126 coop_listener_unique_ptr_t coop_listener )
128 m_coop_listener = std::move( coop_listener );
134 event_exception_logger_unique_ptr_t logger )
136 if(
nullptr != logger.get() )
137 m_event_exception_logger = std::move( logger );
144 const std::type_index & type,
145 layer_unique_ptr_t layer_ptr )
147 m_so_layers[ type ] = layer_ref_t( layer_ptr.release() );
154
155
156
157
163 impl::mbox_core_t & mbox_repository,
188
189
190
191
192
198 queue_locks_defaults_manager_unique_ptr_t result( std::move(current) );
201 result = make_defaults_manager_for_combined_locks();
210
211
212
213
214
215
225 return original_queue;
237
238
239
240
241
248 event_queue_hook_unique_ptr_t result( std::move(current) );
251 result = make_event_queue_hook< default_event_queue_hook_t >(
252 &event_queue_hook_t::default_deleter );
258
259
260
261
262
263
264
265
266
273 so_5::disp::abstract_work_thread_factory_shptr_t result{
274 std::move(user_provided_factory)
278 result = so_5::disp::make_std_work_thread_factory();
284
285
286
287
288
289
290
291
292
299 if( !user_provided_factory )
300 user_provided_factory = default_subscription_storage_factory();
302 return user_provided_factory;
311
312
313
314
318
319
320
321
322
323
324
328
329
330
331
332
333
334
335
342
343
344
345
349
350
351
352
353
354
355
362
363
364
365
369
370
371
372
373
374
378
379
380
381
382
383
384
385
386
387
388
392
393
394
395
399
400
401
402
406
407
408
409
410
411
412
413
417
418
419
420
421
422
423
424
425
429
430
431
432
433
434
435
436
440
441
442
443
447
448
449
450
451
452
453
526 return m_impl->m_mbox_core->create_mbox( *
this );
533 return m_impl->m_mbox_core->create_mbox( *
this, std::move(nonempty_name) );
540 const std::function< mbox_t() > & mbox_factory )
542 return m_impl->m_mbox_core->introduce_named_mbox(
543 std::move(mbox_namespace),
544 std::move(mbox_name),
552 return m_impl->m_mbox_core->create_mchain( *
this, params );
557 event_exception_logger_unique_ptr_t logger )
561 std::lock_guard< std::mutex > lock{
562 m_impl->m_event_exception_logger_lock };
565 swap( m_impl->m_event_exception_logger, logger );
567 m_impl->m_event_exception_logger->on_install( std::move( logger ) );
575 return m_impl->m_infrastructure->make_coop(
577 so_make_default_disp_binder() );
583 disp_binder_shptr_t disp_binder )
585 return m_impl->m_infrastructure->make_coop(
587 std::move(disp_binder) );
595 return m_impl->m_infrastructure->make_coop(
597 so_make_default_disp_binder() );
604 disp_binder_shptr_t disp_binder )
606 return m_impl->m_infrastructure->make_coop(
608 std::move(disp_binder) );
615 return m_impl->m_infrastructure->register_coop( std::move( agent_coop ) );
624 using duration = std::chrono::steady_clock::duration;
625 if( params.m_pause < duration::zero() )
627 so_5::rc_negative_value_for_pause,
628 "an attempt to call schedule_timer() with negative pause value" );
629 if( params.m_period < duration::zero() )
631 so_5::rc_negative_value_for_period,
632 "an attempt to call schedule_timer() with negative period value" );
638 if( std::chrono::steady_clock::duration::zero() != params.m_period )
640 so_5::rc_mutable_msg_cannot_be_periodic,
641 "unable to schedule periodic timer for mutable message,"
642 " msg_type=" + std::string(params.m_msg_type.name()) );
644 else if( mbox_type_t::multi_producer_multi_consumer == params.m_mbox->type() )
646 so_5::rc_mutable_msg_cannot_be_delivered_via_mpmc_mbox,
647 "unable to schedule timer for mutable message and "
648 "MPMC mbox, msg_type=" + std::string(params.m_msg_type.name()) );
651 return m_impl->m_infrastructure->schedule_timer(
664 using duration = std::chrono::steady_clock::duration;
665 if( params.m_pause < duration::zero() )
667 so_5::rc_negative_value_for_pause,
668 "an attempt to call single_timer() with negative pause value" );
671 if( message_mutability_t::mutable_message == message_mutability(params.m_msg) &&
672 mbox_type_t::multi_producer_multi_consumer == params.m_mbox->type() )
674 so_5::rc_mutable_msg_cannot_be_delivered_via_mpmc_mbox,
675 "unable to schedule single timer for mutable message and "
676 "MPMC mbox, msg_type=" + std::string(params.m_msg_type.name()) );
678 m_impl->m_infrastructure->single_timer(
687 const std::type_index & type )
const
689 return m_impl->m_layer_core.query_layer( type );
694 const std::type_index & type,
695 const layer_ref_t & layer )
697 m_impl->m_layer_core.add_extra_layer( type, layer );
712 catch(
const std::exception & x )
715 rc_environment_error,
716 std::string(
"some unexpected error during "
717 "environment launching: " ) + x.what() );
725 const auto action = m_impl->m_stop_guards.initiate_stop();
726 if( impl::stop_guard_repository_t::action_t::do_actual_stop == action )
727 m_impl->m_infrastructure->stop();
732 const std::exception & event_exception,
735 std::lock_guard< std::mutex > lock{ m_impl->m_event_exception_logger_lock };
737 m_impl->m_event_exception_logger->log_exception( event_exception, coop );
743 return m_impl->m_exception_reaction;
749 return *(m_impl->m_error_logger);
755 return m_impl->m_infrastructure->stats_controller();
761 return m_impl->m_infrastructure->stats_repository();
767 return m_impl->m_work_thread_factory;
773 return m_impl->m_work_thread_activity_tracking;
779 return m_impl->m_infrastructure->make_default_disp_binder();
785 return m_impl->m_autoshutdown_disabled;
792 return m_impl->m_mbox_core->create_custom_mbox( *
this, creator );
797 stop_guard_shptr_t guard,
800 const auto result = m_impl->m_stop_guards.setup_guard( std::move(guard) );
801 if( stop_guard_t::setup_result_t::stop_already_in_progress == result
802 && stop_guard_t::what_if_stop_in_progress_t::throw_exception ==
803 reaction_on_stop_in_progress )
806 rc_cannot_set_stop_guard_when_stop_is_started,
807 "stop_guard can't be set because the stop operation is "
808 "already in progress" );
816 stop_guard_shptr_t guard )
818 const auto action = m_impl->m_stop_guards.remove_guard( std::move(guard) );
819 if( impl::stop_guard_repository_t::action_t::do_actual_stop == action )
820 m_impl->m_infrastructure->stop();
827 if( !m_impl->m_msg_tracing_stuff.is_msg_tracing_enabled() )
829 rc_msg_tracing_disabled,
830 "msg_tracing's filter can't be changed when msg_tracing "
833 m_impl->m_msg_tracing_stuff.change_filter( std::move(filter) );
840 "run_stats_controller",
844 [
this] { m_impl->m_infrastructure->stats_controller().turn_off(); },
853 [
this] { m_impl->m_layer_core.start(); },
854 [
this] { m_impl->m_layer_core.finish(); },
860 class autoshutdown_guard_t
final
869 bool autoshutdown_disabled )
875 m_guard_coop = env.register_coop( env.make_coop() );
881 if( !m_autoshutdown_disabled )
882 m_env.deregister_coop( m_guard_coop, dereg_reason::normal );
891 m_impl->m_infrastructure->launch(
895 autoshutdown_guard_t guard{
897 m_impl->m_autoshutdown_disabled };
911 return m_env.m_impl->m_mbox_core->create_ordinary_mpsc_mbox(
920 return m_env.m_impl->m_mbox_core->create_limitless_mpsc_mbox(
927 coop_shptr_t coop )
noexcept
929 m_env.m_impl->m_infrastructure->ready_to_deregister_notify( std::move(coop) );
934 coop_shptr_t coop )
noexcept
936 bool any_cooperation_alive =
937 m_env.m_impl->m_infrastructure->final_deregister_coop(
940 if( !any_cooperation_alive && !
m_env.m_impl->m_autoshutdown_disabled )
947 return m_env.m_impl->m_msg_tracing_stuff.is_msg_tracing_enabled();
953 if( !is_msg_tracing_enabled() )
955 "msg_tracer cannot be accessed because msg_tracing is disabled" );
957 return m_env.m_impl->m_msg_tracing_stuff;
963 return m_env.m_impl->m_msg_tracing_stuff;
969 return m_env.m_impl->m_queue_locks_defaults_manager->
970 mpsc_queue_lock_factory();
976 return m_env.m_impl->m_queue_locks_defaults_manager->
977 mpmc_queue_lock_factory();
986 return m_env.m_impl->m_event_queue_hook->on_bind( agent, original_queue );
994 m_env.m_impl->m_event_queue_hook->on_unbind( agent, queue );
1000 return m_env.m_impl->m_mbox_core->allocate_mbox_id();
1007 return m_env.m_impl->m_default_subscription_storage_factory;
autoshutdown_guard_t(environment_t &env, bool autoshutdown_disabled)
coop_handle_t m_guard_coop
const bool m_autoshutdown_disabled
A bunch of data sources for core objects.
stats::auto_registered_source_holder_t< stats::impl::ds_timer_thread_stats_t > m_timer_thread
Data source for timer thread.
stats::auto_registered_source_holder_t< stats::impl::ds_agent_core_stats_t > m_coop_repository
Data source for cooperations repository.
stats::auto_registered_source_holder_t< stats::impl::ds_mbox_core_stats_t > m_mbox_repository
Data source for mboxes repository.
core_data_sources_t(outliving_reference_t< stats::repository_t > ds_repository, impl::mbox_core_t &mbox_repository, so_5::environment_infrastructure_t &infrastructure)
event_queue_t * on_bind(agent_t *, event_queue_t *original_queue) noexcept override
A reaction to binding of an agent to some event_queue.
void on_unbind(agent_t *, event_queue_t *) noexcept override
A reaction to unbinding of an agent from some event_queue.
Type of smart handle for a cooperation.
A special type that plays role of unique_ptr for coop.
Interface for creator of new mbox in OOP style.
An interface for environment_infrastructure entity.
Parameters for the SObjectizer Environment initialization.
environment_params_t & event_exception_logger(event_exception_logger_unique_ptr_t logger)
Set exception logger object.
bool m_autoshutdown_disabled
Is the automatic shutdown that happens when no cooperations remain disabled?
void add_layer(const std::type_index &type, layer_unique_ptr_t layer_ptr)
Add an additional layer.
bool autoshutdown_disabled() const
Is autoshutdown disabled?
environment_params_t(environment_params_t &&other)
Move constructor.
friend SO_5_FUNC void swap(environment_params_t &a, environment_params_t &b) noexcept
Swap operation.
environment_params_t & coop_listener(coop_listener_unique_ptr_t coop_listener)
Set cooperation listener object.
exception_reaction_t exception_reaction() const noexcept
Get exception reaction flag value.
environment_params_t()
Constructor.
work_thread_activity_tracking_t work_thread_activity_tracking() const
Get activity tracking flag for the whole SObjectizer Environment.
environment_params_t & operator=(environment_params_t &&other) noexcept
Move operator.
environment_params_t & timer_thread(so_5::timer_thread_factory_t factory)
Set the timer_thread factory.
exception_reaction_t m_exception_reaction
Exception reaction flag for the whole SO Environment.
work_thread_activity_tracking_t m_work_thread_activity_tracking
Work thread activity tracking for the whole Environment.
disp_binder_shptr_t so_make_default_disp_binder()
Get binding to the default dispatcher.
mbox_t create_mbox()
Create an anonymous MPMC mbox.
error_logger_t & error_logger() const
Get the error_logger object.
void imp_run_layers_and_go_further()
Run layers and call next run stage.
void add_extra_layer(const std::type_index &type, const layer_ref_t &layer)
Add an additional layer.
so_5::timer_id_t so_schedule_timer(const low_level_api::schedule_timer_params_t params)
Schedule timer event.
stats::repository_t & stats_repository()
Access to repository of data sources for run-time monitoring.
void change_message_delivery_tracer_filter(so_5::msg_tracing::filter_shptr_t filter)
Change the current msg_tracing's filter to a new one.
void install_exception_logger(event_exception_logger_unique_ptr_t logger)
Set up an exception logger.
mbox_t introduce_named_mbox(mbox_namespace_name_t mbox_namespace, nonempty_name_t mbox_name, const std::function< mbox_t() > &mbox_factory)
Introduce named mbox with user-provided factory.
void run()
Run the SObjectizer Run-Time.
mchain_t create_mchain(const mchain_params_t &params)
Create message chain.
exception_reaction_t exception_reaction() const noexcept
An exception reaction for the whole SO Environment.
environment_t & self_ref()
Auxiliary method for getting a reference to itself.
coop_unique_holder_t make_coop(coop_handle_t parent)
Create a new cooperation that will be a child for specified parent coop.
so_5::disp::abstract_work_thread_factory_shptr_t work_thread_factory() const noexcept
Access to the global work thread factory.
void call_exception_logger(const std::exception &event_exception, const coop_handle_t &coop) noexcept
Call event exception logger for logging an exception.
environment_t(environment_params_t &&so_environment_params)
stats::controller_t & stats_controller()
Access to controller of run-time monitoring.
layer_t * query_layer(const std::type_index &type) const
Access to an additional layer.
void imp_run_infrastructure()
Launch environment infrastructure and wait for finish.
void imp_run_stats_controller_and_go_further()
Run controller for run-time monitoring and call next run stage.
coop_unique_holder_t make_coop()
Create a cooperation.
void stop() noexcept
Send a shutdown signal to the Run-Time.
work_thread_activity_tracking_t work_thread_activity_tracking() const
Get activity tracking flag for the whole SObjectizer Environment.
mbox_t do_make_custom_mbox(custom_mbox_details::creator_iface_t &creator)
Actual creation of a custom mbox.
void remove_stop_guard(stop_guard_shptr_t guard)
Remove stop_guard and complete the stop operation if necessary.
coop_handle_t register_coop(coop_unique_holder_t agent_coop)
Register a cooperation.
coop_unique_holder_t make_coop(disp_binder_shptr_t disp_binder)
Create a cooperation with specified dispatcher binder.
mbox_t create_mbox(nonempty_name_t mbox_name)
Create named MPMC mbox.
void so_single_timer(const low_level_api::single_timer_params_t params)
Schedule a single shot timer event.
stop_guard_t::setup_result_t setup_stop_guard(stop_guard_shptr_t guard, stop_guard_t::what_if_stop_in_progress_t reaction_on_stop_in_progress=stop_guard_t::what_if_stop_in_progress_t::throw_exception)
Set up a new stop_guard.
bool autoshutdown_disabled() const
Get autoshutdown_disabled flag.
coop_unique_holder_t make_coop(coop_handle_t parent, disp_binder_shptr_t disp_binder)
Create a new cooperation that will be a child for specified parent coop.
An interface for logging error messages.
Interface of event_queue_hook object.
An interface of event queue for agent.
The base class for all SObjectizer exceptions.
A helper class for accessing the functionality of environment-class which is specific for SObjectizer...
mbox_id_t allocate_mbox_id() noexcept
Allocate a new ID for a new custom mbox or mchain.
event_queue_t * event_queue_on_bind(agent_t *agent, event_queue_t *original_queue) noexcept
Call the event_queue_hook when an agent is being bound to a particular event_queue.
so_5::disp::mpmc_queue_traits::lock_factory_t default_mpmc_queue_lock_factory() const
Get default lock_factory for MPMC queues.
so_5::msg_tracing::holder_t & msg_tracing_stuff() const
Get access to message delivery tracer stuff holder.
subscription_storage_factory_t default_subscription_storage_factory() const noexcept(noexcept(subscription_storage_factory_t{}=subscription_storage_factory_t{}))
Get the default subscription storage factory.
environment_t & m_env
Environment instance to work with.
void event_queue_on_unbind(agent_t *agent, event_queue_t *queue) noexcept
Call the event_queue_hook when an agent is being unbound from its event_queue.
mbox_t create_ordinary_mpsc_mbox(agent_t &single_consumer)
Create multi-producer/single-consumer mbox that handles message limits.
bool is_msg_tracing_enabled() const
Is message delivery tracing enabled?
so_5::msg_tracing::holder_t & msg_tracing_stuff_nonchecked() const noexcept
Get access to message delivery tracer stuff holder.
so_5::disp::mpsc_queue_traits::lock_factory_t default_mpsc_queue_lock_factory() const
Get default lock_factory for MPSC queues.
void final_deregister_coop(coop_shptr_t coop) noexcept
Do the final actions of a cooperation deregistration.
mbox_t create_limitless_mpsc_mbox(agent_t &single_consumer)
Create multi-producer/single-consumer mbox that ignores message limits.
void ready_to_deregister_notify(coop_shptr_t coop) noexcept
Notification about readiness to the deregistration.
An utility class for working with layers.
Repository of stop_guards.
An interface of the additional SObjectizer Environment layer.
A class for the name of mbox_namespace.
Parameters for message chain.
friend message_mutability_t message_mutability(const intrusive_ptr_t< message_t > &what) noexcept
Helper method for safely getting the message mutability flag.
Interface of holder of message tracer and message trace filter objects.
Standard implementation of message tracer holder.
A class for the name which cannot be empty.
Helper class for indication of long-lived reference via its type.
A holder for a data source that should be automatically registered in and deregistered from the registry.
A public interface for controlling SObjectizer monitoring options.
A data source for distributing information about mbox_core.
A data source for distributing information about timer_thread.
An interface of data sources repository.
An interface of stop_guard entity.
setup_result_t
Type for result of setting up a new stop_guard.
what_if_stop_in_progress_t
An identifier for the timer.
#define SO_5_THROW_EXCEPTION(error_code, desc)
so_5::subscription_storage_factory_t ensure_subscription_storage_factory_exists(subscription_storage_factory_t user_provided_factory)
Helper function for creation of the default subscription storage factory.
so_5::disp::abstract_work_thread_factory_shptr_t ensure_work_thread_factory_exists(so_5::disp::abstract_work_thread_factory_shptr_t user_provided_factory)
Helper function for creation of the default global work thread factory.
queue_locks_defaults_manager_unique_ptr_t ensure_locks_defaults_manager_exists(queue_locks_defaults_manager_unique_ptr_t current)
Helper function for creation of appropriate manager object if necessary.
event_queue_hook_unique_ptr_t ensure_event_queue_hook_exists(event_queue_hook_unique_ptr_t current)
Helper function for creation of appropriate event_queue_hook object if necessary.
Details of SObjectizer run-time implementations.
Implementation details of message delivery tracing mechanism.
Public part of message delivery tracing mechanism.
Internal implementation of run-time monitoring and statistics related stuff.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
exception_reaction_t
A reaction of SObjectizer to an exception from agent event.
@ abort_on_exception
Execution of the application must be aborted immediately.
message_mutability_t
An enum with variants of message mutability or immutability.
work_thread_activity_tracking_t
Values for dispatcher's work thread activity tracking.
@ unspecified
Tracking mode is specified elsewhere.
Internal details of SObjectizer Environment object.
so_5::msg_tracing::impl::std_holder_t m_msg_tracing_stuff
Holder of stuff related to message delivery tracing.
const exception_reaction_t m_exception_reaction
An exception reaction for the whole SO Environment.
const bool m_autoshutdown_disabled
Is the automatic shutdown that happens when no cooperations remain disabled?
core_data_sources_t m_core_data_sources
Data sources for core objects.
impl::layer_core_t m_layer_core
An utility for layers.
event_exception_logger_unique_ptr_t m_event_exception_logger
Logger for exceptions thrown from event-handlers.
impl::mbox_core_ref_t m_mbox_core
An utility for mboxes.
impl::stop_guard_repository_t m_stop_guards
A repository of stop_guards.
environment_infrastructure_unique_ptr_t m_infrastructure
A specific infrastructure for environment.
internals_t(environment_t &env, environment_params_t &&params)
Constructor.
error_logger_shptr_t m_error_logger
Error logger object for this environment.
std::mutex m_event_exception_logger_lock
Lock object for protection of exception logger object.
event_queue_hook_unique_ptr_t m_event_queue_hook
Actual event_queue_hook.
queue_locks_defaults_manager_unique_ptr_t m_queue_locks_defaults_manager
Manager for defaults of queue locks.
work_thread_activity_tracking_t m_work_thread_activity_tracking
Work thread activity tracking for the whole Environment.
so_5::disp::abstract_work_thread_factory_shptr_t m_work_thread_factory
Actual global work thread factory.
subscription_storage_factory_t m_default_subscription_storage_factory
Factory to be used as default subscription storage factory.
const message_ref_t & m_msg
Message to be sent after timeout.