SObjectizer-5 Extra
first_last_subscriber_notification.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of mbox that informs about the first and the
4  * last subscriptions.
5  *
6  * \since v.1.5.2
7  */
8 
9 #pragma once
10 
11 #include <so_5/version.hpp>
12 
13 #if SO_5_VERSION < SO_5_VERSION_MAKE(5u, 7u, 4u)
14 #error "SObjectizer-5.7.4 or newest is required"
15 #endif
16 
17 #include <so_5_extra/error_ranges.hpp>
18 
19 #include <so_5/impl/agent_ptr_compare.hpp>
20 #include <so_5/impl/message_limit_internals.hpp>
21 #include <so_5/impl/msg_tracing_helpers.hpp>
22 #include <so_5/impl/local_mbox_basic_subscription_info.hpp>
23 
24 #include <so_5/details/sync_helpers.hpp>
25 #include <so_5/details/invoke_noexcept_code.hpp>
26 
27 #include <so_5/mbox.hpp>
28 #include <so_5/send_functions.hpp>
29 
30 #include <string_view>
31 
32 namespace so_5 {
33 
34 namespace extra {
35 
36 namespace mboxes {
37 
39 
40 namespace errors {
41 
42 /*!
43  * \brief An attempt to use a message type that differs from mbox's message
44  * type.
45  *
46  * Type of message to be used with first_last_subscriber_notification_mbox
47  * is fixed as part of first_last_subscriber_notification_mbox type.
48  * An attempt to use different message type (for subscription, delivery or
49  * setting delivery filter) will lead to an exception with this error code.
50  *
51  * \since v.1.5.2
52  */
55 
56 /*!
57  * \brief An attempt to add a new subscriber for MPSC mbox when another
58  * subscriber already exists.
59  *
60  * When an instance of first_last_subscriber_notification_mbox is created as
61  * MPSC mbox then only one subscriber can be added. An attempt to add another
62  * subscriber will lead to this error.
63  *
64  * \since v.1.5.2
65  */
68 
69 } /* namespace errors */
70 
71 /*!
72  * \brief Signal to be sent when the first subscriber arrives.
73  *
74  * Usage example:
75  * \code
76  * namespace mbox_ns = so_5::extra::mboxes::first_last_subscriber_notification;
77  *
78  * class my_producer final : public so_5::agent_t
79  * {
80  * public:
81  * // Message with published data.
82  * struct msg_data final : public so_5::message_t {...};
83  *
84  * private:
85  * state_t st_no_consumers{ this };
86  * state_t st_consumers_waiting{ this };
87  * ...
88  * const so_5::mbox_t publishing_mbox_;
89  * ...
90  * public:
91  * my_producer( context_t ctx )
92  * : so_5::agent_t{ std::move(ctx) }
93  * // New mbox for publishing produced data has to be created.
94  * , publishing_mbox_{ mbox_ns::make_mbox<msg_data>(
95  * so_environment(),
96  * // agent's direct mbox as the target for notifications.
97  * so_direct_mbox(),
98  * so_5::mbox_type_t::multi_producer_multi_consumer )
99  * }
100  * {...}
101  *
102  * void so_define_agent() override
103  * {
104  * st_consumers_waiting
105  * .on_enter( [this]{ turn_data_acquisition_on(); } )
106  * .on_exit( [this]{ turn_data_acquisition_off(); } )
107  * .event( &my_producer::evt_last_subscriber )
108  * ;
109  *
110  * st_no_consumers
111  * .event( &my_producer::evt_first_subscriber )
112  * ;
113  * ...
114  *
115  * st_no_consumers.activate();
116  * }
117  *
118  * ...
119  * private:
120  * void evt_first_subscriber( mhood_t< msg_first_subscriber > )
121  * {
122  * st_consumers_waiting.activate();
123  * }
124  *
125  * void evt_last_subscriber( mhood_t< msg_last_subscriber > )
126  * {
127  * st_no_consumers.activate();
128  * }
129  * ...
130  * };
131  * \endcode
132  *
133  * \since v.1.5.2
134  */
135 struct msg_first_subscriber final : public so_5::signal_t {};
136 
137 /*!
138  * \brief Signal to be sent when the last subscriber is gone.
139  *
140  * See msg_first_subscriber for usage example.
141  *
142  * \since v.1.5.2
143  */
144 struct msg_last_subscriber final : public so_5::signal_t {};
145 
146 namespace details {
147 
148 /*!
149  * \brief An information block about one subscriber.
150  *
151  * \since v.1.5.2
152  */
154 
155 //
156 // template_independent_mbox_data_t
157 //
158 /*!
159  * \brief A mixin with actual data which is necessary for implementation
160  * of actual mbox.
161  *
162  * This data type doesn't depend on any template parameters.
163  *
164  * \since v.1.5.2
165  */
167  {
168  //! A special comparator for agents with respect to
169  //! agent's priority.
171  {
172  bool operator()( agent_t * a, agent_t * b ) const noexcept
173  {
174  return ::so_5::impl::special_agent_ptr_compare( *a, *b );
175  }
176  };
177 
178  //! Type of subscribers map.
179  using subscribers_map_t =
181 
182  //! SObjectizer Environment to work in.
184 
185  //! ID of the mbox.
187 
188  //! Mbox for notifications about the first/last subscribers.
190 
191  //! Type of this mbox (MPMC or MPSC).
193 
194  //! Subscribers.
195  /*!
196  * Can be empty.
197  */
199 
200  //! Number of actual subscriptions.
201  /*!
202  * \note
203  * There could be cases when a delivery filter is set, but subscription
204  * isn't made yet. Such cases shouldn't be treated as subscriptions.
205  * So we have to store the number of actual subscriptions separately
206  * and don't rely on the size of m_subscribers.
207  */
209 
211  environment_t & env,
212  mbox_id_t id,
213  mbox_t notification_mbox,
214  mbox_type_t mbox_type )
215  : m_env{ env }
216  , m_id{ id }
219  {}
220  };
221 
222 //
223 // actual_mbox_t
224 //
225 
226 /*!
227  * \brief An actual implementation of first/last subscriber message mbox.
228  *
229  * \tparam Msg_Type type of message to be used with this mbox.
230  *
231  * \tparam Lock_Type type of lock object to be used for thread safety.
232  *
233  * \tparam Tracing_Base base class with implementation of message
234  * delivery tracing methods.
235  *
236  * \since v.1.5.2
237  */
238 template<
239  typename Msg_Type,
240  typename Lock_Type,
241  typename Tracing_Base >
243  : public abstract_message_box_t
244  , private Tracing_Base
245  {
246  public:
247  /*!
248  * \brief Initializing constructor.
249  *
250  * \tparam Tracing_Args parameters for Tracing_Base constructor
251  * (can be an empty list if Tracing_Base has only the default constructor).
252  */
253  template< typename... Tracing_Args >
255  //! SObjectizer Environment to work in.
256  environment_t & env,
257  //! ID of this mbox.
258  mbox_id_t id,
259  //! Mbox for notifications about the first/last subscriber.
260  mbox_t notification_mbox,
261  //! Type of this mbox (MPSC or MPMC).
262  mbox_type_t mbox_type,
263  //! Optional parameters for Tracing_Base's constructor.
264  Tracing_Args &&... args )
267  {
268  // Use of mutable message type for MPMC mbox should be prohibited.
269  if constexpr( is_mutable_message< Msg_Type >::value )
270  {
271  switch( mbox_type )
272  {
276  "an attempt to make MPMC mbox for mutable message, "
277  "msg_type=" + std::string(typeid(Msg_Type).name()) );
278  break;
279 
281  break;
282  }
283  }
284  }
285 
286  mbox_id_t
287  id() const override
288  {
289  return this->m_data.m_id;
290  }
291 
292  void
294  const std::type_index & msg_type,
295  const so_5::message_limit::control_block_t * limit,
296  agent_t & subscriber ) override
297  {
299  msg_type,
300  "an attempt to subscribe with different message type" );
301 
303  subscriber,
304  [limit] {
305  return subscriber_info_t{ limit };
306  },
307  [limit]( subscriber_info_t & info ) {
308  info.set_limit( limit );
309  },
310  [this]() {
311  this->m_data.m_subscriptions_count += 1u;
312  } );
313  }
314 
315  void
317  const std::type_index & msg_type,
318  agent_t & subscriber ) override
319  {
321  msg_type,
322  "an attempt to unsubscribe with different message type" );
323 
325  subscriber,
326  []( subscriber_info_t & info ) {
327  info.drop_limit();
328  },
329  [this]() {
330  this->m_data.m_subscriptions_count -= 1u;
331  } );
332  }
333 
334  std::string
335  query_name() const override
336  {
338  s << "<mbox:type=FIRST_LAST_SUBSCR_NOTIFY";
339 
340  switch( this->m_data.m_mbox_type )
341  {
343  s << "(MPMC)";
344  break;
345 
347  s << "(MPSC)";
348  break;
349  }
350 
351  s << ":id=" << this->m_data.m_id << ">";
352 
353  return s.str();
354  }
355 
357  type() const override
358  {
359  return this->m_data.m_mbox_type;
360  }
361 
362  void
364  const std::type_index & msg_type,
365  const message_ref_t & message,
366  unsigned int overlimit_reaction_deep ) override
367  {
369  msg_type,
370  "an attempt to deliver with different message type" );
371 
373  *this, // as Tracing_base
374  *this, // as abstract_message_box_t
375  "deliver_message",
377 
378  // NOTE: we don't check for message mutability because
379  // it's impossible to create MPMC mbox for mutable message.
380  // If MPMC mbox was created for immutable message, but a user
381  // tries to send a mutable message then it will be a message
382  // of a different type and the corresponding exception will
383  // be thrown earlier.
385  tracer,
386  msg_type,
387  message,
389  }
390 
391  void
393  const std::type_index & msg_type,
394  const delivery_filter_t & filter,
395  agent_t & subscriber ) override
396  {
398  msg_type,
399  "an attempt to set delivery_filter with "
400  "different message type" );
401 
403  subscriber,
404  [flt = &filter] {
405  return subscriber_info_t{ flt };
406  },
407  [flt = &filter]( subscriber_info_t & info ) {
408  info.set_filter( *flt );
409  },
410  []() { /* nothing to do */ } );
411  }
412 
413  void
415  const std::type_index & msg_type,
416  agent_t & subscriber ) noexcept override
417  {
419  msg_type,
420  "an attempt to drop delivery_filter with "
421  "different message type" );
422 
424  subscriber,
425  []( subscriber_info_t & info ) {
426  info.drop_filter();
427  },
428  []() { /* nothing to do */ } );
429  }
430 
432  environment() const noexcept override
433  {
434  return this->m_data.m_env;
435  }
436 
437  private :
438  //! Data of this message mbox.
440 
441  //! Object lock.
442  Lock_Type m_lock;
443 
444  /*!
445  * Throws an error if msg_type differs from Msg_Type.
446  */
447  static void
449  const std::type_index & msg_type,
450  std::string_view error_description )
451  {
452  if( msg_type != typeid(Msg_Type) )
455  //FIXME: we have to create std::string object because
456  //so_5::exception_t::raise expects std::string.
457  //This should be fixed after resolving:
458  //https://github.com/Stiffstream/sobjectizer/issues/46
460  }
461 
462  template<
463  typename Info_Maker,
464  typename Info_Changer,
465  typename Post_Action >
466  void
468  agent_t & subscriber,
469  Info_Maker maker,
470  Info_Changer changer,
471  Post_Action post_action )
472  {
474 
476  if( it_subscriber == this->m_data.m_subscribers.end() )
477  {
478  // There is no subscriber yet. It must be added if
479  // it's possible.
481 
483  &subscriber, maker() );
484  }
485  else
486  // Subscriber is known. It must be updated.
488 
489  // All following actions shouldn't throw.
491  {
492  // post_action can increment number of actual subscribers so
493  // we have to store the old value before calling post_action.
494  const auto old_subscribers_count =
496  post_action();
497 
499  1u == this->m_data.m_subscriptions_count )
500  {
501  // We've got the first subscriber.
503  this->m_data.m_notification_mbox );
504  }
505  } );
506  }
507 
508  template<
509  typename Info_Changer,
510  typename Post_Action >
511  void
513  agent_t & subscriber,
514  Info_Changer changer,
515  Post_Action post_action )
516  {
518 
519  auto it_subscriber = this->m_data.m_subscribers.find(
520  &subscriber );
521  if( it_subscriber != this->m_data.m_subscribers.end() )
522  {
523  // Subscriber is found and must be modified.
525 
526  // If info about subscriber becomes empty after
527  // modification then subscriber info must be removed.
528  if( it_subscriber->second.empty() )
530 
531  // All following actions shouldn't throw.
533  {
534  // post_action can decrement number of actual
535  // subscribers so we have to store the old value before
536  // calling post_action.
537  const auto old_subscribers_count =
539  post_action();
540 
542  0u == this->m_data.m_subscriptions_count )
543  {
544  // We've lost the last subscriber.
546  this->m_data.m_notification_mbox );
547  }
548  } );
549  }
550  }
551 
552  void
554  typename Tracing_Base::deliver_op_tracer const & tracer,
555  const std::type_index & msg_type,
556  const message_ref_t & message,
557  unsigned int overlimit_reaction_deep )
558  {
560 
561  auto & subscribers = this->m_data.m_subscribers;
562  if( !subscribers.empty() )
563  for( const auto & kv : subscribers )
565  *(kv.first),
566  kv.second,
567  tracer,
568  msg_type,
569  message,
571  else
573  }
574 
575  void
577  agent_t & subscriber,
578  const subscriber_info_t & subscriber_info,
579  typename Tracing_Base::deliver_op_tracer const & tracer,
580  const std::type_index & msg_type,
581  const message_ref_t & message,
582  unsigned int overlimit_reaction_deep ) const
583  {
584  const auto delivery_status =
586  subscriber,
587  message,
588  []( const message_ref_t & msg ) -> message_t & {
589  return *msg;
590  } );
591 
593  {
594  using namespace so_5::message_limit::impl;
595 
597  this->m_data.m_id,
598  subscriber,
600  msg_type,
601  message,
604  [&] {
606 
608  subscriber,
610  this->m_data.m_id,
611  msg_type,
612  message );
613  } );
614  }
615  else
618  }
619 
620  void
622  {
623  // If this mbox is MPSC mbox then a new item can be
624  // added to subscribers container only if that container
625  // is empty.
626  // This is true even if new item will hold only delivery_filter,
627  // but not a subscription. It's because there is no sense
628  // to have a delivery_filter for MPSC mbox without having
629  // a subscription.
631  this->m_data.m_mbox_type) &&
632  !this->m_data.m_subscribers.empty() )
633  {
636  "subscriber already exists for MPSC mbox, new "
637  "subscriber can't be added" );
638  }
639  }
640  };
641 
642 } /* namespace details */
643 
644 //
645 // make_mbox
646 //
647 /*!
648  * \brief Create an instance of first_last_subscriber_notification mbox.
649  *
650  * Usage examples:
651  *
652  * Create a MPMC mbox with std::mutex as Lock_Type (this mbox can safely be
653  * used in multi-threaded environments):
654  * \code
655  * namespace mbox_ns = so_5::extra::mboxes::first_last_subscriber_notification;
656  * so_5::environment_t & env = ...;
657  * auto notification_mbox = env.create_mbox();
658  * auto mbox = mbox_ns::make_mbox<my_message>(
659  * env,
660  * notification_mbox,
661  * so_5::mbox_type_t::multi_producer_multi_consumer);
662  * \endcode
663  *
664  * Create a MPSC mbox with std::mutex as Lock_Type (this mbox can safely be
665  * used in multi-threaded environments):
666  * \code
667  * namespace mbox_ns = so_5::extra::mboxes::first_last_subscriber_notification;
668  * so_5::environment_t & env = ...;
669  * auto notification_mbox = env.create_mbox();
670  * auto mbox = mbox_ns::make_mbox<my_message>(
671  * env,
672  * notification_mbox,
673  * so_5::mbox_type_t::multi_producer_single_consumer);
674  * \endcode
675  *
676  * Create a MPMC mbox with so_5::null_mutex_t as Lock_Type (this mbox can only
677  * be used in single-threaded environments):
678  * \code
679  * namespace mbox_ns = so_5::extra::mboxes::first_last_subscriber_notification;
680  * so_5::environment_t & env = ...;
681  * auto notification_mbox = env.create_mbox();
682  * auto mbox = mbox_ns::make_mbox<my_message, so_5::null_mutex_t>(
683  * env,
684  * notification_mbox,
685  * so_5::mbox_type_t::multi_producer_multi_consumer);
686  * \endcode
687  *
688  * \attention
689  * This type of mbox terminates the whole application if an attempt
690  * to send a notification (in form of msg_first_subscriber and msg_last_subscriber
691  * signals) throws.
692  *
693  * \tparam Msg_Type type of message to be used with a new mbox.
694  *
695  * \tparam Lock_Type type of lock to be used for thread safety. It can be
696  * std::mutex or so_5::null_mutex_t (or any other type which can be used
697  * with std::lock_guard).
698  *
699  * \since v.1.5.2
700  */
701 template<
702  typename Msg_Type,
703  typename Lock_Type = std::mutex >
704 [[nodiscard]]
705 mbox_t
707  //! SObjectizer Environment to work in.
708  environment_t & env,
709  //! Mbox for notifications about the first/last subscriber.
711  //! Type of this mbox (MPSC or MPMC).
713  {
714  return env.make_custom_mbox(
716  {
717  mbox_t result;
718 
720  {
721  using T = details::actual_mbox_t<
722  Msg_Type,
723  Lock_Type,
725 
726  result = mbox_t{ new T{
727  data.m_env.get(),
728  data.m_id,
730  mbox_type,
731  data.m_tracer.get()
732  } };
733  }
734  else
735  {
736  using T = details::actual_mbox_t<
737  Msg_Type,
738  Lock_Type,
740  result = mbox_t{ new T{
741  data.m_env.get(),
742  data.m_id,
744  mbox_type
745  } };
746  }
747 
748  return result;
749  } );
750  }
751 
752 //
753 // make_multi_consumer_mbox
754 //
755 /*!
756  * \brief Create an instance of first_last_subscriber_notification MPMC mbox.
757  *
758  * Usage examples:
759  *
760  * Create a MPMC mbox with std::mutex as Lock_Type (this mbox can safely be
761  * used in multi-threaded environments):
762  * \code
763  * namespace mbox_ns = so_5::extra::mboxes::first_last_subscriber_notification;
764  * so_5::environment_t & env = ...;
765  * auto notification_mbox = env.create_mbox();
766  * auto mbox = mbox_ns::make_multi_consumer_mbox<my_message>(
767  * env,
768  * notification_mbox);
769  * \endcode
770  *
771  * \note
772  * It's just a thin wrapper around make_mbox() template function.
773  *
774  * \sa make_mbox
775  *
776  * \since v.1.5.2
777  */
778 template<
779  typename Msg_Type,
780  typename Lock_Type = std::mutex >
781 [[nodiscard]]
782 mbox_t
784  //! SObjectizer Environment to work in.
785  environment_t & env,
786  //! Mbox for notifications about the first/last subscriber.
788 {
789  return make_mbox< Msg_Type, Lock_Type >(
790  env,
793 }
794 
795 //
796 // make_single_consumer_mbox
797 //
798 /*!
799  * \brief Create an instance of first_last_subscriber_notification MPSC mbox.
800  *
801  * Usage examples:
802  *
803  * Create a MPSC mbox with std::mutex as Lock_Type (this mbox can safely be
804  * used in multi-threaded environments):
805  * \code
806  * namespace mbox_ns = so_5::extra::mboxes::first_last_subscriber_notification;
807  * so_5::environment_t & env = ...;
808  * auto notification_mbox = env.create_mbox();
809  * auto mbox = mbox_ns::make_single_consumer_mbox<my_message>(
810  * env,
811  * notification_mbox);
812  * \endcode
813  *
814  * \note
815  * It's just a thin wrapper around make_mbox() template function.
816  *
817  * \sa make_mbox
818  *
819  * \since v.1.5.2
820  */
821 template<
822  typename Msg_Type,
823  typename Lock_Type = std::mutex >
824 [[nodiscard]]
825 mbox_t
827  //! SObjectizer Environment to work in.
828  environment_t & env,
829  //! Mbox for notifications about the first/last subscriber.
831 {
832  return make_mbox< Msg_Type, Lock_Type >(
833  env,
836 }
837 
838 } /* namespace first_last_subscriber_notification */
839 
840 } /* namespace mboxes */
841 
842 } /* namespace extra */
843 
844 } /* namespace so_5 */
void insert_or_modify_subscriber(agent_t &subscriber, Info_Maker maker, Info_Changer changer, Post_Action post_action)
mbox_t make_multi_consumer_mbox(environment_t &env, mbox_t notification_mbox)
Create an instance of first_last_subscriber_notification MPMC mbox.
const int rc_subscriber_already_exists_for_mpsc_mbox
An attempt to add a new subscriber for MPSC mbox when another subscriber already exists.
void do_deliver_message_impl(typename Tracing_Base::deliver_op_tracer const &tracer, const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep)
Ranges for error codes of each submodule.
Definition: details.hpp:13
void do_deliver_message_to_subscriber(agent_t &subscriber, const subscriber_info_t &subscriber_info, typename Tracing_Base::deliver_op_tracer const &tracer, const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep) const
void drop_delivery_filter(const std::type_index &msg_type, agent_t &subscriber) noexcept override
mbox_t make_mbox(environment_t &env, mbox_t notification_mbox, mbox_type_t mbox_type)
Create an instance of first_last_subscriber_notification mbox.
void modify_and_remove_subscriber_if_needed(agent_t &subscriber, Info_Changer changer, Post_Action post_action)
const int rc_different_message_type
An attempt to use a message type that differs from mbox's message type.
void subscribe_event_handler(const std::type_index &msg_type, const so_5::message_limit::control_block_t *limit, agent_t &subscriber) override
void set_delivery_filter(const std::type_index &msg_type, const delivery_filter_t &filter, agent_t &subscriber) override
void unsubscribe_event_handlers(const std::type_index &msg_type, agent_t &subscriber) override
mbox_t make_single_consumer_mbox(environment_t &env, mbox_t notification_mbox)
Create an instance of first_last_subscriber_notification MPSC mbox.
actual_mbox_t(environment_t &env, mbox_id_t id, mbox_t notification_mbox, mbox_type_t mbox_type, Tracing_Args &&... args)
Initializing constructor.
void do_deliver_message(const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep) override
static void ensure_expected_msg_type(const std::type_index &msg_type, std::string_view error_description)
template_independent_mbox_data_t(environment_t &env, mbox_id_t id, mbox_t notification_mbox, mbox_type_t mbox_type)