2
3
6
7
8
9
10
11
15#include <so_5/types.hpp>
16#include <so_5/exception.hpp>
17#include <so_5/spinlocks.hpp>
19#include <so_5/mbox.hpp>
20#include <so_5/event_queue.hpp>
21#include <so_5/message_limit.hpp>
23#include <so_5/impl/local_mbox_basic_subscription_info.hpp>
24#include <so_5/impl/message_sink_without_message_limit.hpp>
25#include <so_5/impl/msg_tracing_helpers.hpp>
36
37
38
39
40
69
70
71
72
73
74
75
108
109
110
111
112
113
114
124 "unexpected type of message_sink is used for subscription "
125 "to agent's direct mbox" );
127 if( std::addressof(actual_owner) != p->owner_pointer() )
130 "the only one consumer can create subscription to mpsc_mbox" );
140
141
142
143
144
145
146
147
148
149
150
151
153 typename Tracing_Base,
154 typename Limits_Handling_Mixin >
155class mpsc_mbox_template_t
final
157 ,
private Limits_Handling_Mixin
158 ,
protected Tracing_Base
161 template<
typename... Tracing_Args >
166 Tracing_Args &&... tracing_args )
167 : Limits_Handling_Mixin{ owner }
168 , Tracing_Base{ std::forward< Tracing_Args >( tracing_args )... }
181 const std::type_index & msg_type,
184 std::lock_guard< default_rw_spinlock_t > lock{
m_lock };
186 ensure_sink_for_same_owner(
187 this->query_owner_reference(),
190 insert_or_modify_subscription(
193 return subscription_info_t
{ subscriber
};
195 [&]( subscription_info_t & info ) {
202 const std::type_index & msg_type,
205 std::lock_guard< default_rw_spinlock_t > lock{
m_lock };
212 std::addressof(subscriber) );
216 if( std::addressof(
this->query_owner_reference()) !=
221 modify_and_remove_subscription_if_needed(
223 []( subscription_info_t & info ) {
231 std::ostringstream s;
232 s <<
"<mbox:type=MPSC:id=" <<
m_id <<
">";
246 const std::type_index & msg_type,
247 const message_ref_t & message,
248 unsigned int redirection_deep )
override
250 typename Tracing_Base::deliver_op_tracer tracer{
263 [&](
const subscription_info_t & info )
271 this->message_sink_to_use( info ).push_event(
277 tracer.overlimit_tracer() );
283 const std::type_index & msg_type,
287 std::lock_guard< default_rw_spinlock_t > lock{
m_lock };
289 ensure_sink_for_same_owner(
290 this->query_owner_reference(),
293 insert_or_modify_subscription(
296 return subscription_info_t
{ filter
};
298 [&]( subscription_info_t & info ) {
305 const std::type_index & msg_type,
308 std::lock_guard< default_rw_spinlock_t > lock{
m_lock };
310 modify_and_remove_subscription_if_needed(
312 []( subscription_info_t & info ) {
325
326
327
328
332
333
334
335
336
337 using subscriptions_map_t = std::map<
339 subscription_info_t >;
342
343
347
348
349
350
351
352
353
354
355
356
357
358
359
363
364
365
366
370
371
372
373
377
378
379
380
381
382 template<
typename Info_Maker,
typename Info_Changer >
385 const std::type_index & msg_type,
387 Info_Changer changer )
398 changer( it->second );
403
404
405
406
407
408 template<
typename Info_Changer >
411 const std::type_index & msg_type,
412 Info_Changer changer )
417 changer( it->second );
424
425
426
427
428
429
430
431 template<
typename L >
435 const std::type_index & msg_type,
437 const message_ref_t & message,
439 typename Tracing_Base::deliver_op_tracer
const & tracer,
450 const auto delivery_status =
451 it->second.must_be_delivered(
453 [](
const message_ref_t & m ) ->
message_t & {
463 tracer.message_rejected(
472 tracer.no_subscribers();
478
479
480
481
482using ordinary_mpsc_mbox_without_tracing_t =
483 mpsc_mbox_template_t<
489
490
491
492
493using ordinary_mpsc_mbox_with_tracing_t =
494 mpsc_mbox_template_t<
500
501
502
503
504
505using limitless_mpsc_mbox_without_tracing_t =
506 mpsc_mbox_template_t<
512
513
514
515
516
517using limitless_mpsc_mbox_with_tracing_t =
518 mpsc_mbox_template_t<
Interface for message sink.
static bool special_sink_ptr_compare(const abstract_message_sink_t *a, const abstract_message_sink_t *b) noexcept
virtual void push_event(mbox_id_t mbox_id, message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep, const message_limit::impl::action_msg_tracer_t *tracer)=0
Get a message and push it to the appropriate destination.
Interface for creator of new mbox in OOP style.
virtual mbox_t create(const mbox_creation_data_t &data)=0
Creation of custom mbox.
An interface of delivery filter object.
Mixin to be used in implementation of MPSC mbox with message limits.
agent_t & query_owner_reference() const noexcept
abstract_message_sink_t & message_sink_to_use(const local_mbox_details::subscription_info_with_sink_t &info) const noexcept
limitful_mpsc_mbox_mixin_t(outliving_reference_t< agent_t > owner)
Mixin to be used in implementation of MPSC mbox without message limits.
message_sink_without_message_limit_t m_actual_sink
Actual message sink to be used.
agent_t & query_owner_reference() const noexcept
abstract_message_sink_t & message_sink_to_use(const local_mbox_details::subscription_info_with_sink_t &) noexcept
limitless_mpsc_mbox_mixin_t(outliving_reference_t< agent_t > owner)
const subscription_info_with_sink_t * operator->() const
const subscription_info_with_sink_t & operator*() const
const_map_iterator_type m_it_m
const_vector_iterator_type m_it_v
const_iterator & operator++()
const_iterator(const_vector_iterator_type it_v)
bool operator!=(const const_iterator &o) const
const_iterator(const_map_iterator_type it_m)
bool operator==(const const_iterator &o) const
const_iterator operator++(int)
iterator(map_iterator_type it_m)
bool operator==(const iterator &o) const
vector_iterator_type m_it_v
iterator(vector_iterator_type it_v)
subscription_info_with_sink_t & operator*()
bool operator!=(const iterator &o) const
subscription_info_with_sink_t * operator->()
A special container for holding subscriber_info objects.
subscriber_adaptive_container_t(const subscriber_adaptive_container_t &o)
Copy constructor.
bool is_vector() const
Is vector used as a storage.
storage_type m_storage
The current storage type to be used by container.
subscriber_adaptive_container_t & operator=(subscriber_adaptive_container_t &&o) noexcept
Move operator.
void insert_to_vector(abstract_message_sink_t &sink_as_key, subscription_info_with_sink_t &&info)
Insertion of new item to vector.
void emplace(abstract_message_sink_t &sink_as_key, Args &&... args)
friend void swap(subscriber_adaptive_container_t &a, subscriber_adaptive_container_t &b) noexcept
const_iterator begin() const
const_iterator end() const
vector_type m_vector
Container for small amount of subscriber_infos.
void insert(abstract_message_sink_t &sink_as_key, subscription_info_with_sink_t info)
subscriber_adaptive_container_t()
Default constructor.
void insert_to_map(abstract_message_sink_t &sink_as_key, subscription_info_with_sink_t &&info)
Insertion of new item to map.
subscriber_adaptive_container_t(subscriber_adaptive_container_t &&o) noexcept
Move constructor.
void switch_storage_to_map()
Switching storage from vector to map.
iterator find(abstract_message_sink_t &subscriber)
map_type m_map
Container for large amount of subscriber_infos.
iterator find_in_vector(abstract_message_sink_t &subscriber)
subscriber_adaptive_container_t & operator=(const subscriber_adaptive_container_t &o)
Copy operator.
iterator find_in_map(abstract_message_sink_t &subscriber)
void switch_storage_to_vector()
Switching storage from map to vector.
void erase(const iterator &it)
An information block about one subscription to one message type with presence of message_sink.
void set_filter(const delivery_filter_t &filter)
Set the delivery filter for the subscriber.
void drop_filter()
Drop the delivery filter for the subscriber.
subscription_info_with_sink_t(const delivery_filter_t &filter)
void drop_sink()
Inform about removal of a subscription.
abstract_message_sink_t & sink_reference() const noexcept
Get a reference to the subscribed sink.
abstract_message_sink_t * sink_pointer() const noexcept
Get a pointer to the subscribed sink.
bool empty() const noexcept
void set_sink(abstract_message_sink_t &sink)
Inform about addition of a subscription.
subscription_info_with_sink_t(abstract_message_sink_t &sink)
A template with implementation of local mbox.
mbox_type_t type() const override
Get the type of message box.
environment_t & environment() const noexcept override
SObjectizer Environment for which the mbox is created.
mbox_id_t id() const override
Unique ID of this mbox.
void do_deliver_message_impl(typename Tracing_Base::deliver_op_tracer const &tracer, message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep)
void set_delivery_filter(const std::type_index &msg_type, const delivery_filter_t &filter, abstract_message_sink_t &subscriber) override
Set a delivery filter for message type and subscriber.
void do_deliver_message_to_subscriber(const local_mbox_details::subscription_info_with_sink_t &subscriber_info, typename Tracing_Base::deliver_op_tracer const &tracer, message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) const
void ensure_immutable_message(const std::type_index &msg_type, const message_ref_t &what) const
Ensures that message is an immutable message.
local_mbox_template(mbox_id_t id, environment_t &env, Tracing_Args &&... args)
void modify_and_remove_subscriber_if_needed(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber, Info_Changer changer)
void do_deliver_message(message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) override
Deliver message for all subscribers with respect to message limits.
void subscribe_event_handler(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber) override
Add the message handler.
void unsubscribe_event_handler(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber) noexcept override
Remove all message handlers.
std::string query_name() const override
Get the mbox name.
void insert_or_modify_subscriber(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber, Info_Maker maker, Info_Changer changer)
void drop_delivery_filter(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
Removes delivery filter for message type and subscriber.
mbox_t create_ordinary_mpsc_mbox(environment_t &env, agent_t &owner)
Create mpsc_mbox that handles message limits.
mbox_t create_limitless_mpsc_mbox(environment_t &env, agent_t &owner)
Create mpsc_mbox that ignores message limits.
named_mboxes_dictionary_t m_named_mboxes_dictionary
Named mboxes.
mbox_t introduce_named_mbox(mbox_namespace_name_t mbox_namespace, nonempty_name_t mbox_name, const std::function< mbox_t() > &mbox_factory)
Introduce named mbox with user-provided factory.
outliving_reference_t< so_5::msg_tracing::holder_t > m_msg_tracing_stuff
Data related to message delivery tracing.
mbox_core_t(outliving_reference_t< so_5::msg_tracing::holder_t > msg_tracing_stuff)
std::mutex m_dictionary_lock
Named mbox map's lock.
mbox_t create_custom_mbox(environment_t &env, ::so_5::custom_mbox_details::creator_iface_t &creator)
Create a custom mbox.
std::atomic< mbox_id_t > m_mbox_id_counter
A counter for mbox ID generation.
void destroy_mbox(const full_named_mbox_id_t &name) noexcept
Remove a reference to the named mbox.
mchain_t create_mchain(environment_t &env, const mchain_params_t &params)
Create message chain.
mbox_t create_mbox(environment_t &env)
Create local anonymous mbox.
mbox_t create_mbox(environment_t &env, nonempty_name_t mbox_name)
Create local named mbox.
mbox_id_t allocate_mbox_id() noexcept
Allocate an ID for a new custom mbox or mchain.
mbox_core_stats_t query_stats()
Get statistics for run-time monitoring.
A base class for message sinks to be used by agents.
void push_event(mbox_id_t mbox_id, message_delivery_mode_t, const std::type_index &msg_type, const message_ref_t &message, unsigned int, const message_limit::impl::action_msg_tracer_t *tracer) override
Get a message and push it to the appropriate destination.
mpsc_mbox_template_t(mbox_id_t id, environment_t &env, outliving_reference_t< agent_t > owner, Tracing_Args &&... tracing_args)
mbox_type_t type() const override
Get the type of message box.
void modify_and_remove_subscription_if_needed(const std::type_index &msg_type, Info_Changer changer)
Helper for modification and deletion of subscription info.
const mbox_id_t m_id
ID of this mbox.
void unsubscribe_event_handler(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
Remove all message handlers.
void do_delivery(const std::type_index &msg_type, const message_ref_t &message, typename Tracing_Base::deliver_op_tracer const &tracer, L l)
Helper method to do delivery actions under locked object.
subscriptions_map_t m_subscriptions
Information about the current subscriptions.
default_rw_spinlock_t m_lock
Protection of object from modification.
void drop_delivery_filter(const std::type_index &msg_type, abstract_message_sink_t &) noexcept override
Removes delivery filter for message type and subscriber.
void set_delivery_filter(const std::type_index &msg_type, const delivery_filter_t &filter, abstract_message_sink_t &subscriber) override
Set a delivery filter for message type and subscriber.
std::string query_name() const override
Get the mbox name.
void do_deliver_message(message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) override
Deliver message for all subscribers with respect to message limits.
void subscribe_event_handler(const std::type_index &msg_type, abstract_message_sink_t &subscriber) override
Add the message handler.
mbox_id_t id() const override
Unique ID of this mbox.
environment_t & environment() const noexcept override
SObjectizer Environment for which the mbox is created.
void insert_or_modify_subscription(const std::type_index &msg_type, Info_Maker maker, Info_Changer changer)
Helper for performing insertion or modification of subscription info.
environment_t & m_env
Environment in which the mbox was created.
Base class for a mbox for the case when message delivery tracing is enabled.
mbox_t m_mbox
Actual mbox.
void set_delivery_filter(const std::type_index &msg_type, const delivery_filter_t &filter, abstract_message_sink_t &subscriber) override
Set a delivery filter for message type and subscriber.
mbox_id_t id() const override
Unique ID of this mbox.
void subscribe_event_handler(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber) override
Add the message handler.
named_local_mbox_t(full_named_mbox_id_t full_name, const mbox_t &mbox, impl::mbox_core_t &mbox_core)
mbox_type_t type() const override
Get the type of message box.
void do_deliver_message(message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) override
Deliver message for all subscribers with respect to message limits.
impl::mbox_core_ref_t m_mbox_core
A utility for this mbox.
~named_local_mbox_t() override
environment_t & environment() const noexcept override
SObjectizer Environment for which the mbox is created.
std::string query_name() const override
Get the mbox name.
void unsubscribe_event_handler(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber) noexcept override
Remove all message handlers.
void drop_delivery_filter(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
Removes delivery filter for message type and subscriber.
const full_named_mbox_id_t m_name
Mbox name.
intrusive_ptr_t(T *obj) noexcept
Constructor for a raw pointer.
intrusive_ptr_t & operator=(intrusive_ptr_t &&o) noexcept
Move operator.
T & operator*() const noexcept
A class for the name of mbox_namespace.
std::string_view query_name() const noexcept(noexcept(std::string_view{m_name}))
Get the value.
Parameters for message chain.
const mchain_props::capacity_t & capacity() const
Get chain's capacity and related params.
memory_usage_t memory_usage() const
Memory allocation type for size-limited chain.
bool unlimited() const
Does the message chain have no size limit?
Implementation of demands queue for size-limited message chain with dynamically allocated storage.
Implementation of demands queue for size-limited message chain with preallocated storage.
Implementation of demands queue for size-unlimited message chain.
A base class for agent messages.
friend message_mutability_t message_mutability(const intrusive_ptr_t< message_t > &what) noexcept
Helper method for safe get of message mutability flag.
Interface of holder of message tracer and message trace filter objects.
virtual bool is_msg_tracing_enabled() const noexcept=0
Is message tracing enabled?
A class for the name which cannot be empty.
std::string giveout_value() noexcept(noexcept(std::string{ std::move(m_nonempty_name) }))
Get the value away from the object.
Helper class for indication of long-lived reference via its type.
outliving_reference_t(outliving_reference_t const &o) noexcept
Scoped guard for shared locks.
read_lock_guard_t(Lock &l)
#define SO_5_THROW_EXCEPTION(error_code, desc)
Some reusable and low-level classes/functions which can be used in public header files.
auto invoke_noexcept_code(L lambda) noexcept -> decltype(lambda())
std::unique_ptr< abstract_message_box_t > make_actual_mbox(outliving_reference_t< so_5::msg_tracing::holder_t > msg_tracing_stuff, A &&... args)
void ensure_sink_for_same_owner(agent_t &actual_owner, abstract_message_sink_t &sink)
Helper that ensures that the sink can be used with the agent.
Implementation details for MPMC mboxes.
Various helpers for message delivery tracing mechanism.
Details of SObjectizer run-time implementations.
mchain_t make_mchain(outliving_reference_t< so_5::msg_tracing::holder_t > tracer, const mchain_params_t &params, A &&... args)
Helper function for creation of a new mchain with respect to message tracing.
std::string default_global_mbox_namespace()
Helper function that returns name of the default global namespace for named mboxes.
Various properties and parameters of message chains.
memory_usage_t
Memory allocation for storage for size-limited chains.
@ dynamic
Storage can be allocated and deallocated dynamically.
Public part of message delivery tracing mechanism.
Private part of message limit implementation.
const int rc_nullptr_as_result_of_user_mbox_factory
nullptr returned by user-provided mbox factory.
message_delivery_mode_t
Possible modes of message/signal delivery.
mbox_type_t
Type of the message box.
@ multi_producer_single_consumer
@ multi_producer_multi_consumer
message_mutability_t
An enum with variants of message mutability or immutability.
delivery_possibility_t
Result of checking delivery possibility.
const int rc_illegal_subscriber_for_mpsc_mbox
An attempt to create illegal subscription to mpsc_mbox.
const int rc_mutable_msg_cannot_be_delivered_via_mpmc_mbox
An attempt to deliver mutable message via MPMC mbox.
outliving_reference_t< T > outliving_mutable(T &r)
Make outliving_reference wrapper for mutable reference.
Full name for a named mbox.
full_named_mbox_id_t(std::string mbox_namespace, std::string mbox_name)
Initializing constructor.
A collection of data required for local mbox implementation.
messages_table_t m_subscribers
Map of subscribers to messages.
data_t(mbox_id_t id, environment_t &env)
environment_t & m_env
Environment for which the mbox is created.
default_rw_spinlock_t m_lock
Object lock.
const mbox_id_t m_id
ID of this mbox.
static constexpr const std::size_t switch_to_vector
static constexpr const std::size_t switch_to_map
Predicate to be used for comparing keys in subscriber map.
bool operator()(abstract_message_sink_t *a, abstract_message_sink_t *b) const noexcept
Predicate to be used for searching information in the vector.
bool operator()(const subscribers_vector_item_t &a, const subscribers_vector_item_t &b) const noexcept
Information about one subscriber to be stored in a vector.
subscription_info_with_sink_t m_info
Information about the subscription.
subscribers_vector_item_t(abstract_message_sink_t &sink_as_key)
abstract_message_sink_t * m_sink_as_key
Pointer to sink that has to be used as search key.
subscribers_vector_item_t(abstract_message_sink_t &sink_as_key, subscription_info_with_sink_t info)
The normal initializing constructor.
Statistics from mbox_core for run-time monitoring.
named_mbox_info_t(mbox_t mbox)
mbox_t m_mbox
Real mbox for that name.
unsigned int m_external_ref_count
Reference count by external mbox_refs.
Base class for a mbox for the case when message delivery tracing is disabled.
An information which is necessary for creation of a new mbox.
mbox_creation_data_t(outliving_reference_t< environment_t > env, mbox_id_t id, outliving_reference_t< msg_tracing::holder_t > tracer)
Initializing constructor.