// Subscription override taking a message type and a subscriber sink.
// NOTE(review): the line naming this function, the braces and some arguments
// are not visible in this excerpt.
//
// Delegates to insert_or_modify_subscriber() and supplies:
//  - code that returns a subscriber_info_t built from
//    subscriber_info_t::subscription_present_t{} (presumably the factory used
//    when the subscriber is not yet known -- confirm against the full source);
//  - a by-reference lambda that calls subscription_defined() on a
//    subscriber_info_t.
238 const std::type_index & msg_type,
239 abstract_message_sink_t & subscriber )
override
241 insert_or_modify_subscriber(
245 return subscriber_info_t{
246 subscriber_info_t::subscription_present_t{}
249 [&]( subscriber_info_t & info ) {
250 info.subscription_defined();
// Unsubscription override (declared noexcept) taking a message type and a
// subscriber sink.
// NOTE(review): the line naming this function and the braces are not visible
// in this excerpt.
//
// Delegates to modify_and_remove_subscriber_if_needed(), passing a
// capture-less lambda that calls subscription_dropped() on the
// subscriber_info_t it receives.
256 const std::type_index & msg_type,
257 abstract_message_sink_t & subscriber )
noexcept override
259 modify_and_remove_subscriber_if_needed(
262 []( subscriber_info_t & info ) {
263 info.subscription_dropped();
// Delivery override: receives the delivery mode, message type, message
// reference and redirection depth.
// NOTE(review): the line naming this function, the braces and part of the
// argument lists are not visible in this excerpt.
//
// Visible steps, in order:
//  1. construct a Tracing_Base::deliver_op_tracer (its last arguments are
//     msg_type, message and redirection_deep);
//  2. call ensure_immutable_message( msg_type, message );
//  3. call do_deliver_message_impl().
284 message_delivery_mode_t delivery_mode,
285 const std::type_index & msg_type,
286 const message_ref_t & message,
287 unsigned int redirection_deep )
override
289 typename Tracing_Base::deliver_op_tracer tracer{
294 msg_type, message, redirection_deep };
// Validation is performed before the delivery implementation is invoked.
296 ensure_immutable_message( msg_type, message );
298 do_deliver_message_impl(
// Override that associates a delivery filter with a subscriber for a message
// type.
// NOTE(review): the line naming this function, the braces and some arguments
// are not visible in this excerpt.
//
// Delegates to insert_or_modify_subscriber() and supplies:
//  - code that returns subscriber_info_t{ filter } (presumably the factory
//    used for a not-yet-known subscriber -- confirm against the full source);
//  - a by-reference lambda that calls set_filter( filter ) on a
//    subscriber_info_t.
308 const std::type_index & msg_type,
309 const delivery_filter_t & filter,
310 abstract_message_sink_t & subscriber )
override
312 insert_or_modify_subscriber(
316 return subscriber_info_t{ filter };
318 [&]( subscriber_info_t & info ) {
319 info.set_filter( filter );
// Helper that adds a subscriber record or updates one, then offers the
// retained message (if any) to that subscriber.
// NOTE(review): the line naming this function, the template header, at least
// one parameter (the body uses `maker`, which is not declared in the visible
// lines) and the braces / `else` keywords are not visible in this excerpt.
352 const std::type_index & msg_type,
353 abstract_message_sink_t & subscriber,
355 Info_Changer changer )
// The lock is held for the remainder of the visible body.
357 std::lock_guard< lock_t<Config> > lock( m_lock );
// Accessed through operator[]; presumably this creates the table entry for
// msg_type when it is absent -- depends on the container type, confirm.
361 auto & table_item =
this->m_data.m_messages_table[ msg_type ];
// Subscribers are keyed by the address of the sink object.
363 auto it_subscriber = table_item.m_subscribers.find(
364 std::addressof(subscriber) );
365 if( it_subscriber == table_item.m_subscribers.end() )
// Unknown subscriber: a new record is created from maker().
367 it_subscriber = table_item.m_subscribers.emplace(
368 std::addressof(subscriber), maker() ).first;
// NOTE(review): presumably the alternative branch for an already-known
// subscriber; the branching keyword is not visible here.
371 changer( it_subscriber->second );
// When a retained message is stored for this type it is offered to the
// subscriber.
376 if( table_item.m_retained_msg )
377 try_deliver_retained_message_to(
379 table_item.m_retained_msg,
381 it_subscriber->second );
// Helper that applies `changer` to an existing subscriber record and erases
// the record when it reports empty() afterwards.
// NOTE(review): the line naming this function, the template header and the
// braces are not visible in this excerpt.
387 const std::type_index & msg_type,
388 abstract_message_sink_t & subscriber,
389 Info_Changer changer )
// The lock is held for the remainder of the visible body.
391 std::lock_guard< lock_t<Config> > lock( m_lock );
// Uses find() rather than operator[], so the lookup itself adds no entry.
393 auto it_table_item =
this->m_data.m_messages_table.find( msg_type );
394 if( it_table_item !=
this->m_data.m_messages_table.end() )
396 auto & table_item = it_table_item->second;
// Subscribers are keyed by the address of the sink object.
398 auto it_subscriber = table_item.m_subscribers.find(
399 std::addressof(subscriber) );
400 if( it_subscriber != table_item.m_subscribers.end() )
403 changer( it_subscriber->second );
// The record is dropped once the changer leaves it empty.
407 if( it_subscriber->second.empty() )
408 table_item.m_subscribers.erase( it_subscriber );
// Delivery implementation: stores the message as the retained one for its
// type, then hands it to each current subscriber.
// NOTE(review): the line naming this function, the braces, the arguments of
// the inner call and any `else` keyword are not visible in this excerpt.
415 typename Tracing_Base::deliver_op_tracer
const & tracer,
416 message_delivery_mode_t delivery_mode,
417 const std::type_index & msg_type,
418 const message_ref_t & message,
419 unsigned int redirection_deep )
// The lock is held for the remainder of the visible body.
421 std::lock_guard< lock_t<Config> > lock( m_lock );
// Accessed through operator[]; presumably this creates the table entry for
// msg_type when it is absent -- depends on the container type, confirm.
425 auto & table_item =
this->m_data.m_messages_table[ msg_type ];
// The retained message is assigned before the subscribers container is
// examined.
428 table_item.m_retained_msg = message;
430 auto & subscribers = table_item.m_subscribers;
431 if( !subscribers.empty() )
432 for(
const auto & kv : subscribers )
433 do_deliver_message_to_subscriber(
// NOTE(review): presumably the branch taken when there are no subscribers;
// the branching keyword is not visible here.
442 tracer.no_subscribers();
// Const member that decides whether one subscriber receives a message and
// either pushes the event to the sink or reports the rejection to the tracer.
// NOTE(review): the line naming this function, the braces, most call
// arguments, the lambda body and any `else` keyword are not visible in this
// excerpt.
447 abstract_message_sink_t & subscriber,
448 const subscriber_info_t & subscriber_info,
449 typename Tracing_Base::deliver_op_tracer
const & tracer,
450 message_delivery_mode_t delivery_mode,
451 const std::type_index & msg_type,
452 const message_ref_t & message,
453 unsigned int redirection_deep )
const
// The decision comes from the subscriber's own record; one argument is a
// lambda mapping a message_ref_t to a message_t reference.
455 const auto delivery_status =
456 subscriber_info.must_be_delivered(
459 [](
const message_ref_t & msg ) -> message_t & {
463 if( delivery_possibility_t::must_be_delivered == delivery_status )
465 using namespace so_5::message_limit::impl;
// The final visible argument of push_event() is the tracer's overlimit
// tracer.
467 subscriber.push_event(
473 tracer.overlimit_tracer() );
// NOTE(review): presumably the branch for any other delivery_status; the
// branching keyword is not visible here.
476 tracer.message_rejected(
477 std::addressof(subscriber), delivery_status );
// Offers a retained message to a single subscriber.
// NOTE(review): the line naming this function, the braces and most call
// arguments are not visible in this excerpt.
//
// Visible behaviour: uses a redirection depth of 0, constructs a delivery
// tracer with the label "deliver_message_on_subscription" and the ordinary
// delivery mode, then calls do_deliver_message_to_subscriber() with the
// ordinary delivery mode.
487 const std::type_index & msg_type,
488 const message_ref_t & retained_msg,
489 abstract_message_sink_t & subscriber,
490 const subscriber_info_t & subscriber_info )
494 const unsigned int redirection_deep = 0;
496 typename Tracing_Base::deliver_op_tracer tracer{
499 "deliver_message_on_subscription",
500 message_delivery_mode_t::ordinary,
505 do_deliver_message_to_subscriber(
509 message_delivery_mode_t::ordinary,
// Const member that rejects messages which are not immutable.
// NOTE(review): the line naming this function and the braces are not visible
// in this excerpt.
//
// When message_mutability( what ) differs from
// message_mutability_t::immutable_message, SO_5_THROW_EXCEPTION is invoked
// with the code so_5::rc_mutable_msg_cannot_be_delivered_via_mpmc_mbox and a
// description that includes msg_type.name().
524 const std::type_index & msg_type,
525 const message_ref_t & what )
const
527 if( message_mutability_t::immutable_message !=
528 message_mutability( what ) )
529 SO_5_THROW_EXCEPTION(
530 so_5::rc_mutable_msg_cannot_be_delivered_via_mpmc_mbox,
531 "an attempt to deliver mutable message via MPMC mbox"
532 ", msg_type=" + std::string(msg_type.name()) );