2
3
6
7
8
15#include <so_5/types.hpp>
16#include <so_5/exception.hpp>
18#include <so_5/spinlocks.hpp>
20#include <so_5/msg_tracing.hpp>
22#include <so_5/mbox.hpp>
23#include <so_5/enveloped_msg.hpp>
25#include <so_5/impl/local_mbox_basic_subscription_info.hpp>
27#include <so_5/impl/msg_tracing_helpers.hpp>
29#include <so_5/details/invoke_noexcept_code.hpp>
44
45
46
47
48
49
50
51
55
56
57
58
59
60
61
66
67
68
86 ,
m_info{ std::move(info) }
91
92
108
109
122 using vector_iterator_type = vector_type::iterator;
123 using const_vector_iterator_type = vector_type::const_iterator;
125 using map_type = std::map<
130 using map_iterator_type = map_type::iterator;
131 using const_map_iterator_type = map_type::const_iterator;
164 ,
m_it_v{ std::move(it_v) }
168 ,
m_it_m{ std::move(it_m) }
183 return std::addressof(
**
this);
226 return !( *
this == o );
240 ,
m_it_v{ std::move(it_v) }
244 ,
m_it_m{ std::move(it_m) }
259 return std::addressof(
**
this);
302 return !( *
this == o );
321 const auto insertion_place = std::lower_bound(
326 m_vector.insert( insertion_place, std::move(new_item) );
335 m_map.emplace( std::addressof(sink_as_key), std::move(info) );
348 vector_type empty_vector;
349 map_type new_storage;
360 swap(
m_map, new_storage );
381 vector_type new_storage;
385 new_storage.reserve(
m_map.size() );
393 [&new_storage](
const map_type::value_type & info ) {
394 new_storage.emplace_back( *(info.first), info.second );
402 swap(
m_map, empty_map );
414 auto pos = std::lower_bound(
501 template<
typename... Args >
575 return m_map.empty();
593
594
595
596
597
615
616
617
618
619
620 using messages_table_t = std::map<
636
637
638
639
640
641
642template<
typename Tracing_Base >
646 ,
private Tracing_Base
649 template<
typename... Tracing_Args >
656 Tracing_Args &&... args )
658 , Tracing_Base{ std::forward< Tracing_Args >(args)... }
669 const std::type_index & type_wrapper,
672 insert_or_modify_subscriber(
687 const std::type_index & type_wrapper,
690 modify_and_remove_subscriber_if_needed(
701 std::ostringstream s;
702 s <<
"<mbox:type=MPMC:id=" <<
m_id <<
">";
716 const std::type_index & msg_type,
717 const message_ref_t & message,
718 unsigned int redirection_deep )
override
720 typename Tracing_Base::deliver_op_tracer tracer{
741 const std::type_index & msg_type,
745 insert_or_modify_subscriber(
760 const std::type_index & msg_type,
763 modify_and_remove_subscriber_if_needed(
778 template<
typename Info_Maker,
typename Info_Changer >
781 const std::type_index & type_wrapper,
784 Info_Changer changer )
786 std::unique_lock< default_rw_spinlock_t > lock(
m_lock );
795 m_subscribers.emplace( type_wrapper, std::move( container ) );
799 auto & sinks = it->second;
801 auto pos = sinks.find( subscriber );
802 if( pos != sinks.end() )
811 sinks.insert( subscriber, maker() );
815 template<
typename Info_Changer >
818 const std::type_index & type_wrapper,
820 Info_Changer changer )
822 std::unique_lock< default_rw_spinlock_t > lock(
m_lock );
827 auto & sinks = it->second;
829 auto pos = sinks.find( subscriber );
830 if( pos != sinks.end() )
848 typename Tracing_Base::deliver_op_tracer
const & tracer,
850 const std::type_index & msg_type,
851 const message_ref_t & message,
852 unsigned int redirection_deep )
859 for(
const auto & a : it->second )
869 tracer.no_subscribers();
875 typename Tracing_Base::deliver_op_tracer
const & tracer,
877 const std::type_index & msg_type,
878 const message_ref_t & message,
879 unsigned int redirection_deep )
const
881 const auto delivery_status =
882 subscriber_info.must_be_delivered(
884 [](
const message_ref_t & m ) ->
message_t & {
896 tracer.overlimit_tracer()
);
899 tracer.message_rejected(
904
905
906
907
908
909
910
911
914 const std::type_index & msg_type,
915 const message_ref_t & what )
const
921 "an attempt to deliver mutable message via MPMC mbox"
922 ", msg_type=" + std::string(msg_type.name()) );
927
928
929
930
931
932using local_mbox_without_tracing =
936
937
938
939
940
941using local_mbox_with_tracing =
Interface for message sink.
static bool special_sink_ptr_compare(const abstract_message_sink_t *a, const abstract_message_sink_t *b) noexcept
virtual void push_event(mbox_id_t mbox_id, message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep, const message_limit::impl::action_msg_tracer_t *tracer)=0
Get a message and push it to the appropriate destination.
Interface for creator of new mbox in OOP style.
virtual mbox_t create(const mbox_creation_data_t &data)=0
Creation of custom mbox.
An interface of delivery filter object.
const subscription_info_with_sink_t * operator->() const
const subscription_info_with_sink_t & operator*() const
const_map_iterator_type m_it_m
const_vector_iterator_type m_it_v
const_iterator & operator++()
const_iterator(const_vector_iterator_type it_v)
bool operator!=(const const_iterator &o) const
const_iterator(const_map_iterator_type it_m)
bool operator==(const const_iterator &o) const
const_iterator operator++(int)
iterator(map_iterator_type it_m)
bool operator==(const iterator &o) const
vector_iterator_type m_it_v
iterator(vector_iterator_type it_v)
subscription_info_with_sink_t & operator*()
bool operator!=(const iterator &o) const
subscription_info_with_sink_t * operator->()
A special container for holding subscriber_info objects.
subscriber_adaptive_container_t(const subscriber_adaptive_container_t &o)
Copy constructor.
bool is_vector() const
Is vector used as a storage.
storage_type m_storage
The current storage type to be used by container.
subscriber_adaptive_container_t & operator=(subscriber_adaptive_container_t &&o) noexcept
Move operator.
void insert_to_vector(abstract_message_sink_t &sink_as_key, subscription_info_with_sink_t &&info)
Insertion of new item to vector.
void emplace(abstract_message_sink_t &sink_as_key, Args &&... args)
friend void swap(subscriber_adaptive_container_t &a, subscriber_adaptive_container_t &b) noexcept
const_iterator begin() const
const_iterator end() const
vector_type m_vector
Container for small amount of subscriber_infos.
void insert(abstract_message_sink_t &sink_as_key, subscription_info_with_sink_t info)
subscriber_adaptive_container_t()
Default constructor.
void insert_to_map(abstract_message_sink_t &sink_as_key, subscription_info_with_sink_t &&info)
Insertion of new item to map.
subscriber_adaptive_container_t(subscriber_adaptive_container_t &&o) noexcept
Move constructor.
void switch_storage_to_map()
Switching storage from vector to map.
iterator find(abstract_message_sink_t &subscriber)
map_type m_map
Container for large amount of subscriber_infos.
iterator find_in_vector(abstract_message_sink_t &subscriber)
subscriber_adaptive_container_t & operator=(const subscriber_adaptive_container_t &o)
Copy operator.
iterator find_in_map(abstract_message_sink_t &subscriber)
void switch_storage_to_vector()
Switching storage from map to vector.
void erase(const iterator &it)
An information block about one subscription to one message type with presence of message_sink.
void set_filter(const delivery_filter_t &filter)
Set the delivery filter for the subscriber.
void drop_filter()
Drop the delivery filter for the subscriber.
subscription_info_with_sink_t(const delivery_filter_t &filter)
void drop_sink()
Inform about removal of a subscription.
abstract_message_sink_t & sink_reference() const noexcept
Get a reference to the subscribed sink.
abstract_message_sink_t * sink_pointer() const noexcept
Get a pointer to the subscribed sink.
void set_sink(abstract_message_sink_t &sink)
Inform about addition of a subscription.
subscription_info_with_sink_t(abstract_message_sink_t &sink)
A template with implementation of local mbox.
mbox_type_t type() const override
Get the type of message box.
environment_t & environment() const noexcept override
SObjectizer Environment for which the mbox is created.
mbox_id_t id() const override
Unique ID of this mbox.
void do_deliver_message_impl(typename Tracing_Base::deliver_op_tracer const &tracer, message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep)
void set_delivery_filter(const std::type_index &msg_type, const delivery_filter_t &filter, abstract_message_sink_t &subscriber) override
Set a delivery filter for message type and subscriber.
void do_deliver_message_to_subscriber(const local_mbox_details::subscription_info_with_sink_t &subscriber_info, typename Tracing_Base::deliver_op_tracer const &tracer, message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) const
void ensure_immutable_message(const std::type_index &msg_type, const message_ref_t &what) const
Ensures that message is an immutable message.
local_mbox_template(mbox_id_t id, environment_t &env, Tracing_Args &&... args)
void modify_and_remove_subscriber_if_needed(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber, Info_Changer changer)
void do_deliver_message(message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) override
Deliver message for all subscribers with respect to message limits.
void subscribe_event_handler(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber) override
Add the message handler.
void unsubscribe_event_handler(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber) noexcept override
Remove all message handlers.
std::string query_name() const override
Get the mbox name.
void insert_or_modify_subscriber(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber, Info_Maker maker, Info_Changer changer)
void drop_delivery_filter(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
Removes delivery filter for message type and subscriber.
mbox_t create_ordinary_mpsc_mbox(environment_t &env, agent_t &owner)
Create mpsc_mbox that handles message limits.
mbox_t create_limitless_mpsc_mbox(environment_t &env, agent_t &owner)
Create mpsc_mbox that ignores message limits.
named_mboxes_dictionary_t m_named_mboxes_dictionary
Named mboxes.
mbox_t introduce_named_mbox(mbox_namespace_name_t mbox_namespace, nonempty_name_t mbox_name, const std::function< mbox_t() > &mbox_factory)
Introduce named mbox with user-provided factory.
outliving_reference_t< so_5::msg_tracing::holder_t > m_msg_tracing_stuff
Data related to message delivery tracing.
mbox_core_t(outliving_reference_t< so_5::msg_tracing::holder_t > msg_tracing_stuff)
std::mutex m_dictionary_lock
Named mbox map's lock.
mbox_t create_custom_mbox(environment_t &env, ::so_5::custom_mbox_details::creator_iface_t &creator)
Create a custom mbox.
std::atomic< mbox_id_t > m_mbox_id_counter
A counter for mbox ID generation.
void destroy_mbox(const full_named_mbox_id_t &name) noexcept
Remove a reference to the named mbox.
mchain_t create_mchain(environment_t &env, const mchain_params_t &params)
Create message chain.
mbox_t create_mbox(environment_t &env)
Create local anonymous mbox.
mbox_t create_mbox(environment_t &env, nonempty_name_t mbox_name)
Create local named mbox.
mbox_id_t allocate_mbox_id() noexcept
Allocate an ID for a new custom mbox or mchain.
mbox_core_stats_t query_stats()
Get statistics for run-time monitoring.
Base class for a mbox for the case when message delivery tracing is enabled.
named_local_mbox_t(full_named_mbox_id_t full_name, const mbox_t &mbox, impl::mbox_core_t &mbox_core)
intrusive_ptr_t(T *obj) noexcept
Constructor for a raw pointer.
intrusive_ptr_t & operator=(intrusive_ptr_t &&o) noexcept
Move operator.
T & operator*() const noexcept
A class for the name of mbox_namespace.
std::string_view query_name() const noexcept(noexcept(std::string_view{m_name}))
Get the value.
Parameters for message chain.
const mchain_props::capacity_t & capacity() const
Get chain's capacity and related params.
memory_usage_t memory_usage() const
Memory allocation type for size-limited chain.
bool unlimited() const
Does the message chain have no size limit?
Implementation of demands queue for size-limited message chain with dynamically allocated storage.
Implementation of demands queue for size-limited message chain with preallocated storage.
Implementation of demands queue for size-unlimited message chain.
A base class for agent messages.
friend message_mutability_t message_mutability(const intrusive_ptr_t< message_t > &what) noexcept
Helper method for safe get of message mutability flag.
Interface of holder of message tracer and message trace filter objects.
virtual bool is_msg_tracing_enabled() const noexcept=0
Is message tracing enabled?
A class for the name which cannot be empty.
std::string giveout_value() noexcept(noexcept(std::string{ std::move(m_nonempty_name) }))
Get the value away from the object.
Helper class for indication of long-lived reference via its type.
outliving_reference_t(outliving_reference_t const &o) noexcept
Scoped guard for shared locks.
#define SO_5_THROW_EXCEPTION(error_code, desc)
Some reusable and low-level classes/functions which can be used in public header files.
auto invoke_noexcept_code(L lambda) noexcept -> decltype(lambda())
std::unique_ptr< abstract_message_box_t > make_actual_mbox(outliving_reference_t< so_5::msg_tracing::holder_t > msg_tracing_stuff, A &&... args)
Implementation details for MPMC mboxes.
Various helpers for message delivery tracing mechanism.
Details of SObjectizer run-time implementations.
mchain_t make_mchain(outliving_reference_t< so_5::msg_tracing::holder_t > tracer, const mchain_params_t &params, A &&... args)
Helper function for creation of a new mchain with respect to message tracing.
std::string default_global_mbox_namespace()
Helper function that returns name of the default global namespace for named mboxes.
Various properties and parameters of message chains.
memory_usage_t
Memory allocation for storage for size-limited chains.
@ dynamic
Storage can be allocated and deallocated dynamically.
Public part of message delivery tracing mechanism.
Private part of message limit implementation.
const int rc_nullptr_as_result_of_user_mbox_factory
nullptr returned by user-provided mbox factory.
message_delivery_mode_t
Possible modes of message/signal delivery.
mbox_type_t
Type of the message box.
@ multi_producer_multi_consumer
message_mutability_t
An enum with variants of message mutability or immutability.
delivery_possibility_t
Result of checking delivery possibility.
const int rc_mutable_msg_cannot_be_delivered_via_mpmc_mbox
An attempt to deliver mutable message via MPMC mbox.
outliving_reference_t< T > outliving_mutable(T &r)
Make outliving_reference wrapper for mutable reference.
Full name for a named mbox.
full_named_mbox_id_t(std::string mbox_namespace, std::string mbox_name)
Initializing constructor.
A collection of data required for local mbox implementation.
messages_table_t m_subscribers
Map of subscribers to messages.
data_t(mbox_id_t id, environment_t &env)
environment_t & m_env
Environment for which the mbox is created.
default_rw_spinlock_t m_lock
Object lock.
const mbox_id_t m_id
ID of this mbox.
static constexpr const std::size_t switch_to_vector
static constexpr const std::size_t switch_to_map
Predicate to be used for comparing keys in subscriber map.
bool operator()(abstract_message_sink_t *a, abstract_message_sink_t *b) const noexcept
Predicate to be used for searching information in the vector.
bool operator()(const subscribers_vector_item_t &a, const subscribers_vector_item_t &b) const noexcept
Information about one subscriber to be stored in a vector.
subscription_info_with_sink_t m_info
Information about the subscription.
subscribers_vector_item_t(abstract_message_sink_t &sink_as_key)
abstract_message_sink_t * m_sink_as_key
Pointer to sink that has to be used as search key.
subscribers_vector_item_t(abstract_message_sink_t &sink_as_key, subscription_info_with_sink_t info)
The normal initializing constructor.
Statistics from mbox_core for run-time monitoring.
named_mbox_info_t(mbox_t mbox)
mbox_t m_mbox
Real mbox for that name.
unsigned int m_external_ref_count
Reference count by external mbox_refs.
Base class for a mbox for the case when message delivery tracing is disabled.
An information which is necessary for creation of a new mbox.
mbox_creation_data_t(outliving_reference_t< environment_t > env, mbox_id_t id, outliving_reference_t< msg_tracing::holder_t > tracer)
Initializing constructor.