2
3
6
7
8
9
10
11
15#include <so_5/stats/prefix.hpp>
17#include <so_5/declspec.hpp>
26
27
28
29
30
31
36
37
38
39
40
45
46
47
48
49
58
59
60
61
62
67
68
69
70
71
72
77
78
79
80
81
86
87
88
89
90
95
96
97
98
99
100
105
106
107
108
109
114
115
116
117
118
123
124
125
126
127
132
133
134
135
136
141
142
143
144
145
150
151
152
153
154
155
156
Alias for namespace with traits of event queue.
const queue_traits::queue_params_t & queue_params() const
Getter for queue parameters.
disp_params_t & tune_queue_params(L tunner)
Tuner for queue parameters.
queue_traits::queue_params_t m_queue_params
Queue parameters.
friend void swap(disp_params_t &a, disp_params_t &b) noexcept
disp_params_t & set_queue_params(queue_traits::queue_params_t p)
Setter for queue parameters.
disp_params_t()=default
Default constructor.
A handle for active_group dispatcher.
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
dispatcher_handle_t() noexcept=default
impl::basic_dispatcher_iface_shptr_t m_dispatcher
A reference to actual implementation of a dispatcher.
bool empty() const noexcept
Is this handle empty?
void reset() noexcept
Drop the content of handle.
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
operator bool() const noexcept
Is this handle empty?
disp_binder_shptr_t binder(nonempty_name_t group_name) const
Get a binder for that dispatcher.
const std::string m_group_name
Name of group for new agents.
void preallocate_resources(agent_t &) override
Allocate resources in dispatcher for new agent.
void unbind(agent_t &) noexcept override
Unbind agent from dispatcher.
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
actual_dispatcher_iface_shptr_t m_disp
Dispatcher to be used.
void undo_preallocation(agent_t &) noexcept override
Undo resources allocation.
actual_binder_t(actual_dispatcher_iface_shptr_t disp, nonempty_name_t group_name) noexcept
An actual interface of active group dispatcher.
virtual so_5::event_queue_t * query_thread_for_group(const std::string &group_name) noexcept=0
Get the event_queue for the specified active group.
virtual void release_thread_for_group(const std::string &group_name) noexcept=0
Release the thread for the specified active group.
virtual void allocate_thread_for_group(const std::string &group_name)=0
Create a new thread for a group if it necessary.
The very basic interface of active_group dispatcher.
virtual ~basic_dispatcher_iface_t() noexcept=default
virtual disp_binder_shptr_t binder(nonempty_name_t group_name)=0
static dispatcher_handle_t make(actual_dispatcher_iface_shptr_t disp) noexcept
outliving_reference_t< dispatcher_template_t > m_dispatcher
Dispatcher to work with.
void distribute_value_for_work_thread(const so_5::mbox_t &mbox, const std::string &group_name, const thread_with_refcounter_t &wt)
disp_data_source_t(const std::string_view name_base, outliving_reference_t< dispatcher_template_t > disp)
stats::prefix_t m_base_prefix
Basic prefix for data sources.
void distribute(const so_5::mbox_t &mbox) override
Send appropriate notification about the current value.
void release_thread_for_group(const std::string &group_name) noexcept override
Release the thread for the specified active group.
void allocate_thread_for_group(const std::string &group_name) override
Create a new thread for a group if it necessary.
outliving_reference_t< environment_t > m_env
SObjectizer Environment to work in.
std::mutex m_lock
This object lock.
so_5::event_queue_t * query_thread_for_group(const std::string &group_name) noexcept override
Get the event_queue for the specified active group.
active_group_map_t m_groups
A map of dispatchers for active groups.
const disp_params_t m_params
Parameters for the dispatcher.
dispatcher_template_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
~dispatcher_template_t() noexcept override
work_thread_shptr_t search_and_try_remove_group_from_map(const std::string &group_name) noexcept
Helper function for searching and erasing agent's thread from map of active threads.
stats::auto_registered_source_holder_t< disp_data_source_t > m_data_source
Data source for run-time monitoring.
disp_binder_shptr_t binder(nonempty_name_t group_name) override
Container for storing parameters for MPSC queue.
A part of demand queue implementation for the case when activity tracking is not used.
no_activity_tracking_impl_t(queue_traits::lock_unique_ptr_t lock)
std::size_t demands_count(const demands_counter_t &external_counter)
Get the count of demands in the queue.
void clear()
Clear demands queue.
void stop_service()
Stop demands processing.
virtual void push(execution_demand_t demand) override
Enqueue new event to the queue.
extraction_result_t pop(demand_container_t &demands, demands_counter_t &external_counter)
Try to extract demands from the queue.
void push_evt_finish(execution_demand_t demand) noexcept override
void start_service()
Start demands processing.
void push_evt_start(execution_demand_t demand) override
queue_template_t(queue_traits::lock_unique_ptr_t lock)
A part of demand queue implementation for the case when activity tracking is used.
so_5::stats::activity_tracking_stuff::stats_collector_t< so_5::stats::activity_tracking_stuff::external_lock< queue_traits::lock_t, so_5::stats::activity_tracking_stuff::no_lock_at_start_stop_policy > > m_waiting_stats
so_5::stats::activity_stats_t take_activity_stats()
with_activity_tracking_impl_t(queue_traits::lock_unique_ptr_t lock)
Part of implementation of work thread with activity tracking.
so_5::stats::work_thread_activity_stats_t take_activity_stats()
Get the activity stats.
so_5::stats::activity_stats_t m_activity_stats
Activity statistics.
const so_5::stats::clock_type_t::time_point * m_activity_started_at
Pointer to time_point object related to the current activity.
activity_tracking_impl_t(work_thread_holder_t thread_holder, queue_traits::lock_factory_t queue_lock_factory)
activity_tracking_traits::lock_t m_stats_lock
Lock for manipulation of activity stats.
void serve_demands_block(demand_container_t &demands)
Main method for serving block of demands.
Part of implementation of work thread without activity tracking.
void serve_demands_block(demand_container_t &demands)
Main method for serving block of demands.
no_activity_tracking_impl_t(work_thread_holder_t thread_holder, queue_traits::lock_factory_t queue_lock_factory)
void shutdown()
Send the shutdown signal to the working thread.
event_queue_t & event_queue()
Get the underlying event_queue object.
std::size_t demands_count()
Get the count of demands in the queue.
void body()
Main thread body.
void wait()
Wait the full stop of the working thread.
void start()
Start the working thread.
work_thread_template_t(work_thread_holder_t thread_holder, queue_traits::lock_factory_t queue_lock_factory)
event_queue_t * get_agent_binding()
Get a binding information for an agent.
so_5::current_thread_id_t thread_id() const
Get ID of work thread.
Mixin with thread activity tracking flag.
Mixin that holds optional work thread factory.
An analog of unique_ptr for abstract_work_thread.
Interface for dispatcher binders.
An interface of event queue for agent.
A base class for agent messages.
A class for the name which cannot be empty.
Helper class for indication of long-lived reference via its type.
Base for the case of externals stats lock.
external_lock(Lock_Type &lock)
Base for the case of internal stats lock.
Helper for collecting activity stats.
so_5::stats::activity_stats_t take_stats()
so_5::stats::activity_stats_t m_work_activity
A statistics for work activity.
Lock_Holder & lock_holder()
stats_collector_t(Args &&...args)
void start_if_not_started()
A helper method for safe start if start method hasn't been called yet.
so_5::stats::clock_type_t::time_point m_work_started_at
A time point when current activity started.
bool m_is_in_working
A flag for indicating work activity.
A holder for data-souce that should be automatically registered and deregistered in registry.
A type for storing prefix of data_source name.
bool operator<(const prefix_t &o) const noexcept
Is less than?
constexpr prefix_t() noexcept
Default constructor creates empty prefix.
constexpr bool empty() const noexcept
Is prefix empty?
static constexpr const std::size_t max_buffer_size
Max size of buffer for prefix value (including 0-symbol at the end).
constexpr std::string_view as_string_view() const noexcept(noexcept(std::string_view{std::declval< const char * >()}))
Access to prefix value as string_view.
constexpr const char * c_str() const noexcept
Access to prefix value.
constexpr prefix_t(const char *value) noexcept
Initializing constructor.
bool operator!=(const prefix_t &o) const noexcept
Is not equal?
static constexpr const std::size_t max_length
Max length of prefix (not including 0-symbol at the end).
char m_value[max_buffer_size]
Actual value.
prefix_t(const std::string &value) noexcept(noexcept(value.c_str()))
Initializing constructor.
bool operator==(const prefix_t &o) const noexcept
Is equal?
An interface of data source.
A type for representing the suffix of data_source name.
constexpr bool operator<(const suffix_t &o) const noexcept
Compares suffixes by pointer value.
constexpr bool operator==(const suffix_t &o) const noexcept
Compares suffixes by pointer values.
constexpr const char * c_str() const noexcept
Access to suffix value.
const char * m_value
Actual value.
constexpr std::string_view as_string_view() const noexcept(noexcept(std::string_view{std::declval< const char * >()}))
Access to prefix value as string_view.
constexpr suffix_t(const char *value) noexcept
Initializing constructor.
constexpr bool operator!=(const suffix_t &o) const noexcept
Compares suffixes by pointer value.
#define SO_5_THROW_EXCEPTION(error_code, desc)
#define SO_5_EXPORT_IMPORT_FOR_QUANTITY_MSG
Helpers for manipulation with standard C++ I/O streams.
Some reusable and low-level classes/functions which can be used in public header files.
void shutdown_and_wait(T &w)
Just a helper function for consequetive call to shutdown and wait.
void send_thread_activity_stats(const so_5::mbox_t &, const stats::prefix_t &, work_thread::work_thread_no_activity_tracking_t &)
void send_thread_activity_stats(const so_5::mbox_t &mbox, const stats::prefix_t &prefix, work_thread::work_thread_with_activity_tracking_t &wt)
Active groups dispatcher implemetation details.
Active groups dispatcher.
dispatcher_handle_t make_dispatcher(so_5::environment_t &env, const std::string_view data_sources_name_base)
Create an instance of active_group dispatcher.
dispatcher_handle_t make_dispatcher(so_5::environment_t &env)
Create an instance of active_group dispatcher.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of active_group dispatcher.
Various stuff related to MPSC event queue implementation and tuning.
status_t
Thread status flag.
@ working
1 - thread execution should be continued.
@ stopped
0 - thread execution should be stopped.
Implemetation details of dispatcher's working thread.
extraction_result_t
Type for result of demand extraction.
@ shutting_down
Demand has not been extracted because of shutdown.
@ demand_extracted
Demand has been extracted.
@ no_demands
Demand has not been extracted because the demand queue is empty.
Reusable components for dispatchers.
abstract_work_thread_factory_shptr_t actual_work_thread_factory_to_use(const work_thread_factory_mixin_t< Params > ¶ms, const environment_t &env) noexcept
Helper to detect actual work thread factory to be used.
work_thread_holder_t acquire_work_thread(const work_thread_factory_mixin_t< Params > ¶ms, environment_t &env)
Helper function for acquiring a new worker thread from an appropriate work thread factory.
so_5::stats::prefix_t make_disp_prefix(const std::string_view disp_type, const std::string_view data_sources_name_base, const void *disp_this_pointer)
Create basic prefix for dispatcher data source names.
void modify_disp_params(so_5::environment_t &env, Disp_Params_Type ¶ms)
Helper functions to adjust some dispatcher parameters with respect to settings from environment.
so_5::stats::prefix_t make_disp_working_thread_prefix(const so_5::stats::prefix_t &disp_prefix, std::size_t thread_number)
Create prefix for dispatcher's working thread data source.
std::unique_ptr< Disp_Iface_Type > make_actual_dispatcher(outliving_reference_t< environment_t > env, const std::string_view name_base, Disp_Params_Type disp_params, Args &&...args)
Helper function for creation of dispatcher instance with respect to work thread activity tracking fla...
Details of SObjectizer run-time implementations.
void ensure_join_from_different_thread(current_thread_id_t thread_to_be_joined)
Ensures that join will be called from different thread.
std::unique_ptr< Common_Disp_Iface_Type > create_appropriate_disp(outliving_reference_t< Env > env, const std::string_view name_base, Disp_Params disp_params, Args &&...args)
Helper function for creation of dispatcher with respect to activity tracking flag in dispatcher param...
void update_stats_from_current_time(activity_stats_t &value_to_update, clock_type_t::time_point activity_started_at)
Helper function for simplification of current stats update.
void update_stats_from_duration(activity_stats_t &value_to_update, clock_type_t::duration last_duration)
Helper function for simplification of current stats update.
duration_t calc_avg_time(std::uint_fast64_t count, duration_t previous, duration_t last)
A function for calculating average value.
Declarations of messages used by run-time monitoring and statistics.
Predefined prefixes of data-sources.
SO_5_FUNC prefix_t coop_repository()
Prefix of data sources with statistics for cooperations and agents repository.
SO_5_FUNC prefix_t timer_thread()
Prefix of data sources with statistics for timer thread.
SO_5_FUNC prefix_t mbox_repository()
Prefix of data sources with statistics for mboxes repository.
Predefined suffixes of data-sources.
SO_5_FUNC suffix_t coop_final_dereg_count()
Suffix for data source with count of cooperations waiting the final deregistration step and removemen...
SO_5_FUNC suffix_t agent_count()
Suffix for data source with count of agents bound to some entity.
SO_5_FUNC suffix_t work_thread_queue_size()
Suffix for data source with count of demands in a working thread event queue.
SO_5_FUNC suffix_t named_mbox_count()
Suffix for data source with count of named mboxes.
SO_5_FUNC suffix_t disp_active_group_count()
Suffix for data source with count of active groups in an active_group dispatcher.
SO_5_FUNC suffix_t work_thread_activity()
Suffix for data source with work thread activity statistics.
SO_5_FUNC suffix_t disp_thread_count()
Suffix for data source with count of work threads for dispatcher.
SO_5_FUNC suffix_t timer_periodic_count()
Suffix for data source with count of periodic timers.
SO_5_FUNC suffix_t coop_count()
Suffix for data source with count of cooperations.
SO_5_FUNC suffix_t timer_single_shot_count()
Suffix for data source with count of single-shot timers.
SO_5_FUNC suffix_t demand_quote()
Suffix for data source with size of quote for demands processing.
All stuff related to run-time monitoring and statistics.
std::ostream & operator<<(std::ostream &to, const prefix_t &what)
Just a helper operator.
std::ostream & operator<<(std::ostream &to, const suffix_t &what)
Just a helper operator.
std::ostream & operator<<(std::ostream &to, const activity_stats_t &what)
Helper for printing value of activity_stats.
Private part of message limit implementation.
Auxiliary class for the working agent counting.
work_thread_shptr_t m_thread
Common data for all implementations of demand_queue.
queue_traits::lock_unique_ptr_t m_lock
demand_container_t m_demands
Demand queue.
common_data_t(queue_traits::lock_unique_ptr_t lock)
Initializing constructor.
bool m_in_service
Service flag.
Common data for all work thread implementations.
std::atomic< status_t > m_status
Thread status flag.
common_data_t(work_thread_holder_t thread_holder, queue_traits::lock_factory_t queue_lock_factory)
Demand_Queue m_queue
Demands queue.
demands_counter_t m_demands_count
A counter for calculating count of demands in the queue.
work_thread_holder_t m_thread_holder
Working thread.
so_5::current_thread_id_t m_thread_id
ID of working thread.
A description of event execution demand.
Statistics of some activity.
std::optional< duration_t > m_current_activity_time
duration_t m_avg_time
Average time for one event.
duration_t m_total_time
Total time spent for events in that period of time.
std::uint_fast64_t m_count
Count of events in that period of time.
Default locking policy for stats_collector_t.
An analog of std::lock_guard but without actual locking actions.
A custom locking policy for stats_collector_t.
A special class for cases where lock is not needed at all.
Various traits of activity tracking implementation.
Notification about finish of stats distribution.
Notification about start of new stats distribution.
A message with value of some quantity.
suffix_t m_suffix
Suffix of data_source name.
T m_value
Actual quantity value.
prefix_t m_prefix
Prefix of data_source name.
quantity(const prefix_t &prefix, const suffix_t &suffix, T value)
Initializing constructor.
Information about one work thread activity.
work_thread_activity_stats_t m_stats
Actual value.
work_thread_activity(const prefix_t &prefix, const suffix_t &suffix, const so_5::current_thread_id_t &thread_id, work_thread_activity_stats_t stats)
prefix_t m_prefix
Prefix of data_source name.
suffix_t m_suffix
Suffix of data_source name.
so_5::current_thread_id_t m_thread_id
ID of the thread.
Stats for a work thread activity.
activity_stats_t m_working_stats
Stats for processed events.
activity_stats_t m_waiting_stats
Stats for waiting periods.