264 mbox_t notification_mbox,
266 mbox_type_t mbox_type,
268 Tracing_Args &&... args )
269 : Tracing_Base{ std::forward< Tracing_Args >(args)... }
270 , m_data{ env, id, std::move(notification_mbox), mbox_type }
273 if constexpr( is_mutable_message< Msg_Type >::value )
277 case mbox_type_t::multi_producer_multi_consumer:
278 SO_5_THROW_EXCEPTION(
279 so_5::rc_mutable_msg_cannot_be_delivered_via_mpmc_mbox,
280 "an attempt to make MPMC mbox for mutable message, "
281 "msg_type=" + std::string(
typeid(Msg_Type).name()) );
284 case mbox_type_t::multi_producer_single_consumer:
// Fragment of an overriding member function that registers a subscription
// for `subscriber`. The line with the function name is not part of this
// excerpt (presumably subscribe_event_handler -- confirm against the file).
298 const std::type_index & msg_type,
299 abstract_message_sink_t & subscriber )
override
// The message type is checked first; the string below is the error
// description handed to the check.
301 ensure_expected_msg_type(
303 "an attempt to subscribe with different message type" );
// Create a record for this subscriber or update the one already stored.
305 insert_or_modify_subscriber(
// Value for a new record: a subscriber_info_t built with the
// subscription_present_t tag.
308 return subscriber_info_t{
309 subscriber_info_t::subscription_present_t{}
// Update for an existing record: mark the subscription as defined.
312 []( subscriber_info_t & info ) {
313 info.subscription_defined();
// One more subscription is counted (presumably inside the post-action
// callable passed to insert_or_modify_subscriber -- confirm).
316 this->m_data.m_subscriptions_count += 1u;
// Fragment of an overriding, noexcept member function that removes a
// subscription of `subscriber`. The line with the function name is not part
// of this excerpt (presumably unsubscribe_event_handler -- confirm).
322 const std::type_index & msg_type,
323 abstract_message_sink_t & subscriber )
noexcept override
// The work is done only when msg_type equals typeid(Msg_Type); a plain
// comparison is used here instead of ensure_expected_msg_type. No branch
// for a mismatching type is visible in this excerpt.
327 if( msg_type ==
typeid(Msg_Type) )
329 modify_and_remove_subscriber_if_needed(
// Update for the stored record: mark the subscription as dropped.
331 []( subscriber_info_t & info ) {
332 info.subscription_dropped();
// One subscription less is counted (presumably inside the post-action
// callable passed to modify_and_remove_subscriber_if_needed -- confirm).
335 this->m_data.m_subscriptions_count -= 1u;
// Fragment of an overriding member function that delivers `message`.
// The line with the function name is not part of this excerpt
// (presumably do_deliver_message -- confirm).
370 message_delivery_mode_t delivery_mode,
371 const std::type_index & msg_type,
372 const message_ref_t & message,
373 unsigned int redirection_deep )
override
// The message type is checked first; the string below is the error
// description handed to the check.
375 ensure_expected_msg_type(
377 "an attempt to deliver with different message type" );
// Tracer object for this delivery attempt; the visible arguments are the
// message type, the message itself and the redirection depth.
379 typename Tracing_Base::deliver_op_tracer tracer{
384 msg_type, message, redirection_deep };
// The actual delivery is handed over to the implementation helper.
392 do_deliver_message_impl(
// Fragment of an overriding member function that attaches a delivery filter
// to `subscriber`. The line with the function name is not part of this
// excerpt (presumably set_delivery_filter -- confirm).
402 const std::type_index & msg_type,
403 const delivery_filter_t & filter,
404 abstract_message_sink_t & subscriber )
override
// The message type is checked first; the two adjacent literals below form
// the error description handed to the check.
406 ensure_expected_msg_type(
408 "an attempt to set delivery_filter with "
409 "different message type" );
// Create a record for this subscriber or update the one already stored.
411 insert_or_modify_subscriber(
// Value for a new record: a subscriber_info_t built from the filter.
414 return subscriber_info_t{ filter };
// Update for an existing record: store the filter in it. The filter is
// captured by reference.
416 [&filter]( subscriber_info_t & info ) {
417 info.set_filter( filter );
// Fragment of an overriding, noexcept member function that removes the
// delivery filter of `subscriber`. The line with the function name is not
// part of this excerpt (presumably drop_delivery_filter -- confirm).
424 const std::type_index & msg_type,
425 abstract_message_sink_t & subscriber )
noexcept override
// NOTE(review): this function is declared noexcept, yet it calls
// ensure_expected_msg_type with an error description. If that check
// reports a mismatch by throwing (its definition is not visible here),
// the exception would escape a noexcept function and terminate the
// program. The other noexcept function in this file that takes msg_type
// uses a plain `msg_type == typeid(Msg_Type)` comparison instead --
// confirm which behaviour is intended.
427 ensure_expected_msg_type(
429 "an attempt to drop delivery_filter with "
430 "different message type" );
// Update the stored record and let the helper remove it when it becomes
// empty.
432 modify_and_remove_subscriber_if_needed(
// Callable applied to the stored record (its body is not part of this
// excerpt).
434 []( subscriber_info_t & info ) {
// Fragment of a helper that either adds a record for `subscriber` or
// modifies the existing one, then runs a post-action and, when the first
// subscription appears, sends msg_first_subscriber. The line with the
// function name is not part of this excerpt (presumably
// insert_or_modify_subscriber -- confirm); a parameter between
// `subscriber` and `changer` is also not shown (the body calls `maker()`).
473 abstract_message_sink_t & subscriber,
475 Info_Changer changer,
476 Post_Action post_action )
// The mbox lock is taken before the subscribers container is touched.
478 std::lock_guard< Lock_Type > lock( m_lock );
// Records are keyed by the address of the subscriber.
480 auto it_subscriber =
this->m_data.m_subscribers.find(
481 std::addressof(subscriber) );
482 if( it_subscriber ==
this->m_data.m_subscribers.end() )
// No record yet: check that a new item may be added, then store the
// value produced by maker().
486 ensure_new_item_can_be_added_to_subscribers();
488 this->m_data.m_subscribers.emplace(
489 std::addressof(subscriber), maker() );
// Record already present: let the caller-supplied changer update it
// (presumably the else-branch of the check above; the lines in between
// are not shown).
493 changer( it_subscriber->second );
// The remaining work is wrapped into invoke_noexcept_code.
496 so_5::details::invoke_noexcept_code( [
this, &post_action]()
// Snapshot of the subscriptions counter. post_action is captured by
// reference and is presumably invoked between this snapshot and the
// comparison below (that line is not shown).
500 const auto old_subscribers_count =
501 this->m_data.m_subscriptions_count;
// Notify only when the counter has grown and is now exactly 1.
504 if( old_subscribers_count <
this->m_data.m_subscriptions_count &&
505 1u ==
this->m_data.m_subscriptions_count )
508 so_5::send< msg_first_subscriber >(
509 this->m_data.m_notification_mbox );
// Fragment of a helper that modifies the record of `subscriber` (if one
// exists), erases the record when it becomes empty, then runs a post-action
// and, when the last subscription disappears, sends msg_last_subscriber.
// The line with the function name is not part of this excerpt (presumably
// modify_and_remove_subscriber_if_needed -- confirm).
519 abstract_message_sink_t & subscriber,
520 Info_Changer changer,
521 Post_Action post_action )
// The mbox lock is taken before the subscribers container is touched.
523 std::lock_guard< Lock_Type > lock( m_lock );
// Records are keyed by the address of the subscriber.
525 auto it_subscriber =
this->m_data.m_subscribers.find(
526 std::addressof(subscriber) );
// Nothing is changed here for a subscriber without a record.
527 if( it_subscriber !=
this->m_data.m_subscribers.end() )
// Let the caller-supplied changer update the record.
530 changer( it_subscriber->second );
// A record that reports empty() after the change is erased.
534 if( it_subscriber->second.empty() )
535 this->m_data.m_subscribers.erase( it_subscriber );
// The remaining work is wrapped into invoke_noexcept_code.
538 so_5::details::invoke_noexcept_code( [
this, &post_action]()
// Snapshot of the subscriptions counter. post_action is captured by
// reference and is presumably invoked between this snapshot and the
// comparison below (that line is not shown).
543 const auto old_subscribers_count =
544 this->m_data.m_subscriptions_count;
// Notify only when the counter has shrunk and is now exactly 0.
547 if( old_subscribers_count >
this->m_data.m_subscriptions_count &&
548 0u ==
this->m_data.m_subscriptions_count )
551 so_5::send< msg_last_subscriber >(
552 this->m_data.m_notification_mbox );
// Fragment of the delivery implementation helper: under the mbox lock it
// walks over all subscriber records and tries to deliver the message to
// each of them. The line with the function name is not part of this excerpt
// (presumably do_deliver_message_impl -- confirm).
560 typename Tracing_Base::deliver_op_tracer
const & tracer,
561 message_delivery_mode_t delivery_mode,
562 const std::type_index & msg_type,
563 const message_ref_t & message,
564 unsigned int redirection_deep )
// The lock is taken before the subscribers container is read.
566 std::lock_guard< Lock_Type > lock( m_lock );
568 auto & subscribers =
this->m_data.m_subscribers;
// With at least one record present, every record is visited and handed
// to the per-subscriber delivery helper.
569 if( !subscribers.empty() )
570 for(
const auto & kv : subscribers )
571 do_deliver_message_to_subscriber(
// The tracer is told that there are no subscribers (presumably the
// else-branch of the emptiness check; that line is not shown).
580 tracer.no_subscribers();
// Fragment of a const helper that delivers one message to one subscriber:
// the subscriber's record decides whether the message must be delivered,
// and the message is then either pushed to the subscriber or reported to
// the tracer as rejected. The line with the function name is not part of
// this excerpt (presumably do_deliver_message_to_subscriber -- confirm).
585 abstract_message_sink_t & subscriber,
586 const subscriber_info_t & subscriber_info,
587 typename Tracing_Base::deliver_op_tracer
const & tracer,
588 message_delivery_mode_t delivery_mode,
589 const std::type_index & msg_type,
590 const message_ref_t & message,
591 unsigned int redirection_deep )
const
// Ask the subscriber's record for the delivery decision.
593 const auto delivery_status =
594 subscriber_info.must_be_delivered(
// One of the arguments is a callable that turns a message_ref_t into a
// message_t reference (its body is not part of this excerpt).
597 [](
const message_ref_t & msg ) -> message_t & {
// Delivery is performed only for the must_be_delivered status.
601 if( delivery_possibility_t::must_be_delivered == delivery_status )
// Names from so_5::message_limit::impl are pulled into this scope.
603 using namespace so_5::message_limit::impl;
// The message is pushed to the subscriber; the visible last argument
// is the tracer's overlimit tracer.
605 subscriber.push_event(
611 tracer.overlimit_tracer() );
// The rejection is reported to the tracer together with the
// subscriber's address and the status (presumably the else-branch of
// the check above; that line is not shown).
614 tracer.message_rejected(
615 std::addressof(subscriber), delivery_status );