2
3
6
7
8
9
10
14#include <so_5/mchain.hpp>
15#include <so_5/mchain_select_ifaces.hpp>
16#include <so_5/environment.hpp>
18#include <so_5/ret_code.hpp>
19#include <so_5/exception.hpp>
20#include <so_5/error_logger.hpp>
22#include <so_5/details/abort_on_fatal_error.hpp>
23#include <so_5/details/at_scope_exit.hpp>
24#include <so_5/details/safe_cv_wait_for.hpp>
29#include <condition_variable>
41
42
43
44
52 "an attempt to get message from empty demand queue" );
59
60
61
62
70 "an attempt to push a message to full demand queue" );
77
78
79
80
85
86
91
92
93
96 is_full()
const noexcept {
return false; }
101 is_empty()
const noexcept {
return m_queue.empty(); }
108 ensure_queue_not_empty( *
this );
109 return m_queue.front();
116 ensure_queue_not_empty( *
this );
124 m_queue.push_back( std::move(demand) );
130 size()
const noexcept {
return m_queue.size(); }
141
142
143
144
145
158 is_full()
const noexcept {
return m_max_size == m_queue.size(); }
163 is_empty()
const noexcept {
return m_queue.empty(); }
170 ensure_queue_not_empty( *
this );
171 return m_queue.front();
178 ensure_queue_not_empty( *
this );
186 ensure_queue_not_full( *
this );
187 m_queue.push_back( std::move(demand) );
193 size()
const noexcept {
return m_queue.size(); }
206
207
208
209
210
226 is_full()
const noexcept {
return m_max_size == m_size; }
238 ensure_queue_not_empty( *
this );
239 return m_storage[ m_head ];
246 ensure_queue_not_empty( *
this );
247 m_storage[ m_head ] = demand_t{};
248 m_head = (m_head + 1) % m_max_size;
256 ensure_queue_not_full( *
this );
257 auto index = (m_head + m_size) % m_max_size;
258 m_storage[ index ] = std::move(demand);
265 size()
const noexcept {
return m_size; }
283
284
285
286
301
302
303
304
305
306
307
308template<
typename Queue,
typename Tracing_Base >
311 ,
private Tracing_Base
315 template<
typename... Tracing_Args >
324 Tracing_Args &&... tracing_args )
347 "mchain doesn't support subscription" );
360 s <<
"<mchain:id=" <<
m_id <<
">";
374 const std::type_index & msg_type,
375 const message_ref_t & message,
376 unsigned int )
override
395
396
397
406 "set_delivery_filter is called for mchain" );
419 duration_t empty_queue_timeout )
override
511 const std::type_index & msg_type,
512 const message_ref_t & message,
628
629
644
645
646
647
648
649
650
654
655
656
657
662
663
664
665
666
669 const std::type_index & msg_type,
670 const message_ref_t & message )
730 "an attempt to push message to full mchain "
731 "with overflow_reaction_t::throw_exception policy" );
738 log_stream <<
"overflow_reaction_t::abort_app "
739 "will be performed for mchain (id="
740 <<
m_id <<
"), msg_type: "
742 <<
". Application will be aborted"
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
774 const std::type_index & msg_type,
775 const message_ref_t & message )
813 log_stream <<
"overflow_reaction_t::abort_app "
814 "will be performed for mchain (id="
815 <<
m_id <<
"), msg_type: "
817 <<
". Application will be aborted"
831
832
833
834
835
836
837
838
872
873
886
887
888
889
890
891
892
893
894
897 typename Tracing_Base::deliver_op_tracer & tracer,
898 const std::type_index & msg_type,
899 const message_ref_t & message )
An interface of message chain.
Interface for message sink.
static bool special_sink_ptr_compare(const abstract_message_sink_t *a, const abstract_message_sink_t *b) noexcept
Interface for creator of new mbox in OOP style.
An interface of delivery filter object.
Mixin to be used in implementation of MPSC mbox with message limits.
agent_t & query_owner_reference() const noexcept
abstract_message_sink_t & message_sink_to_use(const local_mbox_details::subscription_info_with_sink_t &info) const noexcept
limitful_mpsc_mbox_mixin_t(outliving_reference_t< agent_t > owner)
Mixin to be used in implementation of MPSC mbox without message limits.
message_sink_without_message_limit_t m_actual_sink
Actual message sink to be used.
agent_t & query_owner_reference() const noexcept
abstract_message_sink_t & message_sink_to_use(const local_mbox_details::subscription_info_with_sink_t &) noexcept
limitless_mpsc_mbox_mixin_t(outliving_reference_t< agent_t > owner)
const subscription_info_with_sink_t * operator->() const
const subscription_info_with_sink_t & operator*() const
const_map_iterator_type m_it_m
const_vector_iterator_type m_it_v
const_iterator & operator++()
const_iterator(const_vector_iterator_type it_v)
bool operator!=(const const_iterator &o) const
bool operator==(const const_iterator &o) const
const_iterator operator++(int)
bool operator==(const iterator &o) const
vector_iterator_type m_it_v
iterator(vector_iterator_type it_v)
subscription_info_with_sink_t & operator*()
bool operator!=(const iterator &o) const
subscription_info_with_sink_t * operator->()
A special container for holding subscriber_info objects.
subscriber_adaptive_container_t(const subscriber_adaptive_container_t &o)
Copy constructor.
bool is_vector() const
Is vector used as a storage.
storage_type m_storage
The current storage type to be used by container.
subscriber_adaptive_container_t & operator=(subscriber_adaptive_container_t &&o) noexcept
Move operator.
void insert_to_vector(abstract_message_sink_t &sink_as_key, subscription_info_with_sink_t &&info)
Insertion of new item to vector.
void emplace(abstract_message_sink_t &sink_as_key, Args &&... args)
friend void swap(subscriber_adaptive_container_t &a, subscriber_adaptive_container_t &b) noexcept
const_iterator begin() const
const_iterator end() const
vector_type m_vector
Container for small amount of subscriber_infos.
void insert(abstract_message_sink_t &sink_as_key, subscription_info_with_sink_t info)
subscriber_adaptive_container_t()
Default constructor.
void insert_to_map(abstract_message_sink_t &sink_as_key, subscription_info_with_sink_t &&info)
Insertion of new item to map.
subscriber_adaptive_container_t(subscriber_adaptive_container_t &&o) noexcept
Move constructor.
void switch_storage_to_map()
Switching storage from vector to map.
iterator find(abstract_message_sink_t &subscriber)
map_type m_map
Container for large amount of subscriber_infos.
iterator find_in_vector(abstract_message_sink_t &subscriber)
subscriber_adaptive_container_t & operator=(const subscriber_adaptive_container_t &o)
Copy operator.
iterator find_in_map(abstract_message_sink_t &subscriber)
void switch_storage_to_vector()
Switching storage from map to vector.
void erase(const iterator &it)
An information block about one subscription to one message type with presence of message_sink.
abstract_message_sink_t & sink_reference() const noexcept
Get a reference to the subscribed sink.
A template with implementation of local mbox.
mbox_type_t type() const override
Get the type of message box.
environment_t & environment() const noexcept override
SObjectizer Environment for which the mbox is created.
mbox_id_t id() const override
Unique ID of this mbox.
void do_deliver_message_impl(typename Tracing_Base::deliver_op_tracer const &tracer, message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep)
void set_delivery_filter(const std::type_index &msg_type, const delivery_filter_t &filter, abstract_message_sink_t &subscriber) override
Set a delivery filter for message type and subscriber.
void do_deliver_message_to_subscriber(const local_mbox_details::subscription_info_with_sink_t &subscriber_info, typename Tracing_Base::deliver_op_tracer const &tracer, message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) const
void ensure_immutable_message(const std::type_index &msg_type, const message_ref_t &what) const
Ensures that message is an immutable message.
local_mbox_template(mbox_id_t id, environment_t &env, Tracing_Args &&... args)
void modify_and_remove_subscriber_if_needed(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber, Info_Changer changer)
void do_deliver_message(message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) override
Deliver message for all subscribers with respect to message limits.
void subscribe_event_handler(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber) override
Add the message handler.
void unsubscribe_event_handler(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber) noexcept override
Remove all message handlers.
std::string query_name() const override
Get the mbox name.
void insert_or_modify_subscriber(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber, Info_Maker maker, Info_Changer changer)
void drop_delivery_filter(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
Removes delivery filter for message type and subscriber.
mbox_t create_ordinary_mpsc_mbox(environment_t &env, agent_t &owner)
Create mpsc_mbox that handles message limits.
mbox_t create_limitless_mpsc_mbox(environment_t &env, agent_t &owner)
Create mpsc_mbox that ignores message limits.
mbox_t introduce_named_mbox(mbox_namespace_name_t mbox_namespace, nonempty_name_t mbox_name, const std::function< mbox_t() > &mbox_factory)
Introduce named mbox with user-provided factory.
mbox_core_t(outliving_reference_t< so_5::msg_tracing::holder_t > msg_tracing_stuff)
mbox_t create_custom_mbox(environment_t &env, ::so_5::custom_mbox_details::creator_iface_t &creator)
Create a custom mbox.
void destroy_mbox(const full_named_mbox_id_t &name) noexcept
Remove a reference to the named mbox.
mchain_t create_mchain(environment_t &env, const mchain_params_t &params)
Create message chain.
mbox_t create_mbox(environment_t &env)
Create local anonymous mbox.
mbox_t create_mbox(environment_t &env, nonempty_name_t mbox_name)
Create local named mbox.
mbox_id_t allocate_mbox_id() noexcept
Allocate an ID for a new custom mbox or mchain.
mbox_core_stats_t query_stats()
Get statistics for run-time monitoring.
void push_event(mbox_id_t mbox_id, message_delivery_mode_t, const std::type_index &msg_type, const message_ref_t &message, unsigned int, const message_limit::impl::action_msg_tracer_t *tracer) override
Get a message and push it to the appropriate destination.
mpsc_mbox_template_t(mbox_id_t id, environment_t &env, outliving_reference_t< agent_t > owner, Tracing_Args &&... tracing_args)
mbox_type_t type() const override
Get the type of message box.
void modify_and_remove_subscription_if_needed(const std::type_index &msg_type, Info_Changer changer)
Helper for modification and deletion of subscription info.
const mbox_id_t m_id
ID of this mbox.
void unsubscribe_event_handler(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
Remove all message handlers.
void do_delivery(const std::type_index &msg_type, const message_ref_t &message, typename Tracing_Base::deliver_op_tracer const &tracer, L l)
Helper method to do delivery actions under locked object.
subscriptions_map_t m_subscriptions
Information about the current subscriptions.
default_rw_spinlock_t m_lock
Protection of object from modification.
void drop_delivery_filter(const std::type_index &msg_type, abstract_message_sink_t &) noexcept override
Removes delivery filter for message type and subscriber.
void set_delivery_filter(const std::type_index &msg_type, const delivery_filter_t &filter, abstract_message_sink_t &subscriber) override
Set a delivery filter for message type and subscriber.
std::string query_name() const override
Get the mbox name.
void do_deliver_message(message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) override
Deliver message for all subscribers with respect to message limits.
void subscribe_event_handler(const std::type_index &msg_type, abstract_message_sink_t &subscriber) override
Add the message handler.
mbox_id_t id() const override
Unique ID of this mbox.
environment_t & environment() const noexcept override
SObjectizer Environment for which the mbox is created.
void insert_or_modify_subscription(const std::type_index &msg_type, Info_Maker maker, Info_Changer changer)
Helper for performing insertion or modification of subscription info.
environment_t & m_env
Environment in which the mbox was created.
Base class for a mbox for the case when message delivery tracing is enabled.
mbox_t m_mbox
Actual mbox.
void set_delivery_filter(const std::type_index &msg_type, const delivery_filter_t &filter, abstract_message_sink_t &subscriber) override
Set a delivery filter for message type and subscriber.
mbox_id_t id() const override
Unique ID of this mbox.
void subscribe_event_handler(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber) override
Add the message handler.
named_local_mbox_t(full_named_mbox_id_t full_name, const mbox_t &mbox, impl::mbox_core_t &mbox_core)
mbox_type_t type() const override
Get the type of message box.
void do_deliver_message(message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) override
Deliver message for all subscribers with respect to message limits.
impl::mbox_core_ref_t m_mbox_core
An utility for this mbox.
~named_local_mbox_t() override
environment_t & environment() const noexcept override
SObjectizer Environment for which the mbox is created.
friend class impl::mbox_core_t
std::string query_name() const override
Get the mbox name.
void unsubscribe_event_handler(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber) noexcept override
Remove all message handlers.
void drop_delivery_filter(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
Removes delivery filter for message type and subscriber.
const full_named_mbox_id_t m_name
Mbox name.
A class for the name of mbox_namespace.
Parameters for message chain.
Parameters for defining chain size.
Implementation of demands queue for size-limited message chain with dynamically allocated storage.
bool is_full() const noexcept
Is queue full?
limited_dynamic_demand_queue(const capacity_t &capacity)
Initializing constructor.
std::size_t size() const noexcept
Size of the queue.
std::deque< demand_t > m_queue
Queue's storage.
bool is_empty() const noexcept
Is queue empty?
void pop_front()
Remove the front item from queue.
void push_back(demand_t &&demand)
Add a new item to the end of the queue.
const std::size_t m_max_size
Maximum size of the queue.
demand_t & front()
Access to front item from queue.
Implementation of demands queue for size-limited message chain with preallocated storage.
std::size_t m_size
The current size of the queue.
std::size_t size() const noexcept
Size of the queue.
bool is_empty() const noexcept
Is queue empty?
void pop_front()
Remove the front item from queue.
void push_back(demand_t &&demand)
Add a new item to the end of the queue.
bool is_full() const noexcept
Is queue full?
std::vector< demand_t > m_storage
Queue's storage.
const std::size_t m_max_size
Maximum size of the queue.
limited_preallocated_demand_queue(const capacity_t &capacity)
Initializing constructor.
demand_t & front()
Access to front item from queue.
std::size_t m_head
Index of the queue head.
Implementation of demands queue for size-unlimited message chain.
unlimited_demand_queue(const capacity_t &)
demand_t & front()
Access to front item from queue.
void push_back(demand_t &&demand)
Add a new item to the end of the queue.
std::deque< demand_t > m_queue
Queue's storage.
void pop_front()
Remove the front item from queue.
std::size_t size() const noexcept
Size of the queue.
bool is_empty() const noexcept
Is queue empty?
bool is_full() const noexcept
Is queue full?
Template-based implementation of message chain.
void unsubscribe_event_handler(const std::type_index &, abstract_message_sink_t &) noexcept override
Remove all message handlers.
std::string query_name() const override
Get the mbox name.
mbox_type_t type() const override
Get the type of message box.
environment_t & environment() const noexcept override
SObjectizer Environment for which the mbox is created.
const empty_notification_func_t m_empty_notificator
Optional notificator for 'empty' condition.
void set_delivery_filter(const std::type_index &, const delivery_filter_t &, abstract_message_sink_t &) override
const capacity_t m_capacity
Chain capacity.
std::size_t size() const override
Count of messages in the chain.
void notify_multi_chain_select_ops() noexcept
void try_to_store_message_to_queue_nonblocking_mode(const std::type_index &msg_type, const message_ref_t &message)
An implementation of storing another message to chain for the case of delayed/periodic messages.
void try_to_store_message_to_queue_ordinary_mode(const std::type_index &msg_type, const message_ref_t &message)
Actual implementation of pushing message to the queue.
const not_empty_notification_func_t m_not_empty_notificator
Optional notificator for 'not_empty' condition.
void subscribe_event_handler(const std::type_index &, abstract_message_sink_t &) override
Add the message handler.
extraction_status_t extract_demand_from_not_empty_queue(demand_t &dest)
Implementation of extract operation for the case when message queue is not empty.
details::status m_status
Status of the chain.
select_case_t * m_select_tail
A queue of multi-chain selects in which this chain is used.
bool empty() const override
Is message chain empty?
std::condition_variable m_overflow_cond
Condition variable for waiting on full queue.
Queue m_queue
Chain's demands queue.
void do_deliver_message(message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int) override
Deliver message for all subscribers with respect to message limits.
std::size_t m_threads_to_wakeup
Count of threads sleeping on empty mchain.
void remove_from_select(select_case_t &select_case) noexcept override
Removal of mchain from multi chain select.
mchain_props::push_status_t push(const std::type_index &msg_type, const message_ref_t &message, mchain_props::select_case_t &select_case) override
An attempt to push a new message into the mchain.
mbox_id_t id() const override
Unique ID of this mbox.
const mbox_id_t m_id
Mbox ID for chain.
std::mutex m_lock
Chain's lock.
environment_t & m_env
SObjectizer Environment for which message chain is created.
extraction_status_t extract(demand_t &dest, duration_t empty_queue_timeout) override
void actual_close(close_mode_t mode) override
Close the chain.
void complete_store_message_to_queue(typename Tracing_Base::deliver_op_tracer &tracer, const std::type_index &msg_type, const message_ref_t &message)
A reusable method with implementation of last part of storing a message into chain.
std::condition_variable m_underflow_cond
Condition variable for waiting on empty queue.
mchain_template(so_5::environment_t &env, mbox_id_t id, const mchain_params_t &params, Tracing_Args &&... tracing_args)
Initializing constructor.
extraction_status_t extract(demand_t &dest, select_case_t &select_case) override
An extraction attempt as a part of multi chain select.
void drop_delivery_filter(const std::type_index &, abstract_message_sink_t &) noexcept override
Removes delivery filter for message type and subscriber.
Base class for representation of one case in multi chain select.
Interface of holder of message tracer and message trace filter objects.
A class for the name which cannot be empty.
Helper class for indication of long-lived reference via its type.
#define SO_5_LOG_ERROR(logger, var_name)
A special macro for helping error logging.
#define SO_5_THROW_EXCEPTION(error_code, desc)
Some reusable and low-level classes/functions which can be used in public header files.
std::unique_ptr< abstract_message_box_t > make_actual_mbox(outliving_reference_t< so_5::msg_tracing::holder_t > msg_tracing_stuff, A &&... args)
void ensure_sink_for_same_owner(agent_t &actual_owner, abstract_message_sink_t &sink)
Helper that ensures that sink can be used with agent.
Implementation details for MPMC mboxes.
Various helpers for message delivery tracing mechanism.
Details of SObjectizer run-time implementations.
void ensure_queue_not_empty(Q &&queue)
Helper function which throws an exception if queue is empty.
status
Status of the message chain.
@ closed
Bag is closed. New messages cannot be sent to it.
@ open
Bag is open and can be used for message sending.
void ensure_queue_not_full(Q &&queue)
Helper function which throws an exception if queue is full.
Various properties and parameters of message chains.
close_mode_t
What to do with chain's content at close.
extraction_status_t
Result of extraction of message from a message chain.
push_status_t
Result of attempt of pushing a message into a message chain.
Public part of message delivery tracing mechanism.
Private part of message limit implementation.
message_delivery_mode_t
Possible modes of message/signal delivery.
mbox_type_t
Type of the message box.
Full name for a named mbox.
A collection of data required for local mbox implementation.
messages_table_t m_subscribers
Map of subscribers to messages.
data_t(mbox_id_t id, environment_t &env)
environment_t & m_env
Environment for which the mbox is created.
default_rw_spinlock_t m_lock
Object lock.
const mbox_id_t m_id
ID of this mbox.
static constexpr const std::size_t switch_to_vector
static constexpr const std::size_t switch_to_map
Predicate to be used for comparing keys in subscriber map.
bool operator()(abstract_message_sink_t *a, abstract_message_sink_t *b) const noexcept
Predicate to be used for searching information in the vector.
bool operator()(const subscribers_vector_item_t &a, const subscribers_vector_item_t &b) const noexcept
Information about one subscriber to be stored in a vector.
subscription_info_with_sink_t m_info
Information about the subscription.
subscribers_vector_item_t(abstract_message_sink_t &sink_as_key)
abstract_message_sink_t * m_sink_as_key
Pointer to sink that has to be used as search key.
subscribers_vector_item_t(abstract_message_sink_t &sink_as_key, subscription_info_with_sink_t info)
The normal initializing constructor.
Statistics from mbox_core for run-time monitoring.
Base class for a mbox for the case when message delivery tracing is disabled.
Description of one demand in message chain.