SObjectizer-5 Extra
retained_msg.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of mbox which holds last sent message.
4  *
5  * \since
6  * v.1.0.3
7  */
8 
9 #pragma once
10 
11 #include <so_5_extra/error_ranges.hpp>
12 
13 #include <so_5/impl/agent_ptr_compare.hpp>
14 #include <so_5/impl/message_limit_internals.hpp>
15 #include <so_5/impl/msg_tracing_helpers.hpp>
16 
17 #include <so_5/details/sync_helpers.hpp>
18 
19 #include <so_5/mbox.hpp>
20 
21 #include <memory>
22 
23 namespace so_5 {
24 
25 namespace extra {
26 
27 namespace mboxes {
28 
29 namespace retained_msg {
30 
31 namespace errors {
32 
33 /*!
34  * \brief An attempt perform service request via retained message mbox.
35  *
36  * \since
37  * v.1.0.3
38  */
41 
42 
43 } /* namespace errors */
44 
45 namespace details {
46 
47 /*!
48  * \brief A helper type which is a collection of type parameters.
49  *
50  * This type is used to simplify code of last_msg_mbox internals.
51  * Instead of writting something like:
52  * \code
53  * template< typename Traits >
54  * class ... {...};
55  *
56  * template< typename Traits, typename Lock_Type >
57  * class ... {...};
58  * \endcode
59  * this config_type allows to write like that:
60  * \code
61  * template< typename Config_Type >
62  * class ... {...};
63  *
64  * template< typename Config_Type >
65  * class ... {...};
66  * \endcode
67  *
68  * \tparam Traits traits type to be used.
69  *
70  * \tparam Lock_Type type of object to be used for thread-safety (like
71  * std::mutex or so_5::null_mutex_t).
72  *
73  * \since
74  * v.1.0.3
75  */
76 template<
77  typename Traits,
78  typename Lock_Type >
80  {
81  using traits_type = Traits;
82  using lock_type = Lock_Type;
83  };
84 
85 /*!
86  * \name Type extractors for config_type
87  * \{
88  */
89 template< typename Config_Type >
90 using traits_t = typename Config_Type::traits_type;
91 
92 template< typename Config_Type >
93 using lock_t = typename Config_Type::lock_type;
94 /*!
95  * \}
96  */
97 
98 /*!
99  * \brief An information block about one subscriber.
100  *
101  * \since
102  * v.1.0.3
103  */
105 {
106  /*!
107  * \brief Current status of the subscriber.
108  */
109  enum class state_t
110  {
111  nothing,
113  only_filter,
115  };
116 
117  //! Optional message limit for that subscriber.
119 
120  /*!
121  * \brief Delivery filter for that message for that subscriber.
122  */
124 
125  /*!
126  * \brief Current state of the subscriber parameters.
127  */
129 
130 public :
131  //! Constructor for the case when subscriber info is being
132  //! created during event subscription.
134  const so_5::message_limit::control_block_t * limit )
135  : m_limit( limit )
136  , m_filter( nullptr )
138  {}
139 
140  //! Constructor for the case when subscriber info is being
141  //! created during event subscription.
143  const delivery_filter_t * filter )
144  : m_limit( nullptr )
145  , m_filter( filter )
147  {}
148 
149  bool
150  empty() const noexcept
151  {
152  return state_t::nothing == m_state;
153  }
154 
156  limit() const noexcept
157  {
158  return m_limit;
159  }
160 
161  //! Set the message limit for the subscriber.
162  /*!
163  * Setting the message limit means that there are subscriptions
164  * for the agent.
165  *
166  * \note The message limit can be nullptr.
167  */
168  void
169  set_limit( const message_limit::control_block_t * limit ) noexcept
170  {
171  m_limit = limit;
172 
176  }
177 
178  //! Drop the message limit for the subscriber.
179  /*!
180  * Dropping the message limit means that there is no more
181  * subscription for the agent.
182  */
183  void
184  drop_limit() noexcept
185  {
186  m_limit = nullptr;
187 
190  }
191 
192  //! Set the delivery filter for the subscriber.
193  void
194  set_filter( const delivery_filter_t & filter ) noexcept
195  {
196  m_filter = &filter;
197 
201  }
202 
203  //! Drop the delivery filter for the subscriber.
204  void
205  drop_filter() noexcept
206  {
207  m_filter = nullptr;
208 
211  }
212 
213  //! Must a message be delivered to the subscriber?
217  message_t & msg ) const noexcept
218  {
219  // For the case when there are actual subscriptions.
220  // We assume that will be in 99.9% cases.
221  auto need_deliver = delivery_possibility_t::must_be_delivered;
222 
223  if( state_t::only_filter == m_state )
224  // Only filter, no actual subscriptions.
225  // No message delivery for that case.
226  need_deliver = delivery_possibility_t::no_subscription;
227  else if( state_t::subscriptions_and_filter == m_state )
228  // Delivery must be checked by delivery filter.
229  need_deliver = m_filter->check( subscriber, msg ) ?
230  delivery_possibility_t::must_be_delivered :
231  delivery_possibility_t::disabled_by_delivery_filter;
232 
233  return need_deliver;
234  }
235 };
236 
237 //
238 // messages_table_item_t
239 //
240 /*!
241  * \brief A type of item of message table for retained message mbox.
242  *
243  * For each message type is necessary to store:
244  * - a list of subscriber for that message;
245  * - the last message sent.
246  *
247  * This type is intended to be used as a container for such data.
248  *
249  * \since
250  * v.1.0.3
251  */
253  {
254  //! A special coparator for agents with respect to
255  //! agent's priority.
257  {
258  bool operator()( agent_t * a, agent_t * b ) const noexcept
259  {
260  return ::so_5::impl::special_agent_ptr_compare( *a, *b );
261  }
262  };
263 
264  //! Type of subscribers map.
265  using subscribers_map_t =
267 
268  //! Subscribers.
269  /*!
270  * Can be empty. This is for case when the first message was sent
271  * when there is no subscribers yet.
272  */
274 
275  //! Retained message.
276  /*!
277  * Can be nullptr. It means that there is no any attempts to send
278  * a message of this type.
279  */
281  };
282 
283 //
284 // template_independent_mbox_data_t
285 //
286 /*!
287  * \brief A mixin with actual data which is necessary for implementation
288  * of retained mbox.
289  *
290  * This data type doesn't depend on any template parameters.
291  *
292  * \since
293  * v.1.0.3
294  */
296  {
297  //! SObjectizer Environment to work in.
299 
300  //! ID of the mbox.
302 
303  //! Type of messages table.
304  using messages_table_t =
306 
307  //! Table of current subscriptions and messages.
309 
311  environment_t & env,
312  mbox_id_t id )
313  : m_env{ env }
314  , m_id{id}
315  {}
316  };
317 
318 //
319 // actual_mbox_t
320 //
321 
322 /*!
323  * \brief An actual implementation of retained message mbox.
324  *
325  * \tparam Config type with main definitions for this message box type.
326  *
327  * \tparam Tracing_Base base class with implementation of message
328  * delivery tracing methods.
329  *
330  * \since
331  * v.1.0.3
332  */
333 template<
334  typename Config,
335  typename Tracing_Base >
337  : public abstract_message_box_t
338  , private Tracing_Base
339  {
340  public:
341  /*!
342  * \brief Initializing constructor.
343  *
344  * \tparam Tracing_Args parameters for Tracing_Base constructor
345  * (can be empty list if Tracing_Base have only the default constructor).
346  */
347  template< typename... Tracing_Args >
349  //! SObjectizer Environment to work in.
350  environment_t & env,
351  //! ID of this mbox.
352  mbox_id_t id,
353  //! Optional parameters for Tracing_Base's constructor.
354  Tracing_Args &&... args )
356  , m_data{ env, id }
357  {}
358 
359  mbox_id_t
360  id() const override
361  {
362  return this->m_data.m_id;
363  }
364 
365  void
367  const std::type_index & msg_type,
368  const so_5::message_limit::control_block_t * limit,
369  agent_t & subscriber ) override
370  {
372  msg_type,
373  subscriber,
374  [&] {
375  return subscriber_info_t{ limit };
376  },
377  [&]( subscriber_info_t & info ) {
378  info.set_limit( limit );
379  } );
380  }
381 
382  void
384  const std::type_index & msg_type,
385  agent_t & subscriber ) override
386  {
388  msg_type,
389  subscriber,
390  []( subscriber_info_t & info ) {
391  info.drop_limit();
392  } );
393  }
394 
395  std::string
396  query_name() const override
397  {
399  s << "<mbox:type=RETAINED_MPMC:id=" << this->m_data.m_id << ">";
400 
401  return s.str();
402  }
403 
405  type() const override
406  {
408  }
409 
410  void
412  const std::type_index & msg_type,
413  const message_ref_t & message,
414  unsigned int overlimit_reaction_deep ) override
415  {
417  *this, // as Tracing_base
418  *this, // as abstract_message_box_t
419  "deliver_message",
421 
423 
425  tracer,
426  msg_type,
427  message,
429  }
430 
431  void
433  const std::type_index & msg_type,
434  const delivery_filter_t & filter,
435  agent_t & subscriber ) override
436  {
438  msg_type,
439  subscriber,
440  [&] {
441  return subscriber_info_t{ &filter };
442  },
443  [&]( subscriber_info_t & info ) {
445  } );
446  }
447 
448  void
450  const std::type_index & msg_type,
451  agent_t & subscriber ) noexcept override
452  {
454  msg_type,
455  subscriber,
456  []( subscriber_info_t & info ) {
457  info.drop_filter();
458  } );
459  }
460 
462  environment() const noexcept override
463  {
464  return this->m_data.m_env;
465  }
466 
467  private :
468  //! Data of this message mbox.
470 
471  //! Object lock.
473 
474  template< typename Info_Maker, typename Info_Changer >
475  void
477  const std::type_index & msg_type,
478  agent_t & subscriber,
479  Info_Maker maker,
480  Info_Changer changer )
481  {
483 
484  // If there is no item for this message type it will be
485  // created automatically.
486  auto & table_item = this->m_data.m_messages_table[ msg_type ];
487 
490  // There is no subscriber yet. It must be added.
492  &subscriber, maker() ).first;
493  else
494  // Subscriber is known. It must be updated.
496 
497  // If there is a retained message then delivery attempt
498  // must be performed.
499  // NOTE: an exception at this stage doesn't remove new subscription.
502  msg_type,
504  subscriber,
506  }
507 
508  template< typename Info_Changer >
509  void
511  const std::type_index & msg_type,
512  agent_t & subscriber,
513  Info_Changer changer )
514  {
516 
518  if( it_table_item != this->m_data.m_messages_table.end() )
519  {
520  auto & table_item = it_table_item->second;
521 
523  &subscriber );
525  {
526  // Subscriber is found and must be modified.
528 
529  // If info about subscriber becomes empty after
530  // modification then subscriber info must be removed.
531  if( it_subscriber->second.empty() )
533  }
534  }
535  }
536 
537  void
539  typename Tracing_Base::deliver_op_tracer const & tracer,
540  const std::type_index & msg_type,
541  const message_ref_t & message,
542  unsigned int overlimit_reaction_deep )
543  {
545 
546  // If there is no item for this message type it will be
547  // created automatically.
548  auto & table_item = this->m_data.m_messages_table[ msg_type ];
549 
550  // Message must be stored as retained.
552 
554  if( !subscribers.empty() )
555  for( const auto & kv : subscribers )
557  *(kv.first),
558  kv.second,
559  tracer,
560  msg_type,
561  message,
563  else
565  }
566 
567  void
569  agent_t & subscriber,
570  const subscriber_info_t & subscriber_info,
571  typename Tracing_Base::deliver_op_tracer const & tracer,
572  const std::type_index & msg_type,
573  const message_ref_t & message,
574  unsigned int overlimit_reaction_deep ) const
575  {
576  const auto delivery_status =
578  subscriber,
579  *(message.get()) );
580 
582  {
583  using namespace so_5::message_limit::impl;
584 
586  this->m_data.m_id,
587  subscriber,
589  msg_type,
590  message,
593  [&] {
595 
597  subscriber,
599  this->m_data.m_id,
600  msg_type,
601  message );
602  } );
603  }
604  else
607  }
608 
609  /*!
610  * \brief An attempt to deliver retained message to the new subscriber.
611  *
612  * This attempt will be performed only if there is the retained message.
613  */
614  void
616  const std::type_index & msg_type,
617  const message_ref_t & retained_msg,
618  agent_t & subscriber,
619  const subscriber_info_t & subscriber_info )
620  {
621  if( retained_msg )
622  {
623  const unsigned int overlimit_reaction_deep = 0;
624 
626  *this, // as Tracing_base
627  *this, // as abstract_message_box_t
628  "deliver_message_on_subscription",
629  msg_type,
630  retained_msg,
632 
634  subscriber,
636  tracer,
637  msg_type,
638  retained_msg,
640  }
641  }
642 
643  /*!
644  * \brief Ensures that message is an immutable message.
645  *
646  * Checks mutability flag and throws an exception if message is
647  * a mutable one.
648  */
649  void
651  const std::type_index & msg_type,
652  const message_ref_t & what ) const
653  {
658  "an attempt to deliver mutable message via MPMC mbox"
659  ", msg_type=" + std::string(msg_type.name()) );
660  }
661  };
662 
663 } /* namespace details */
664 
665 //
666 //
667 // default_traits_t
668 //
669 /*!
670  * \brief Default traits for retained message mbox.
671  */
673 
674 //
675 // make_mbox
676 //
677 /*!
678  * \brief Create an instance of retained message mbox.
679  *
680  * Simple usage example:
681  * \code
682  * so_5::environment_t & env = ...;
683  * const so_5::mbox_t retained_mbox = so_5::extra::mboxes::retained_msg::make_mbox<>(env);
684  * so_5::send<Some_Message>(retained_mbox, ...);
685  * \endcode
686  * An instance of default implementation retained message mbox will be created.
687  * This instance will be protected by std::mutex.
688  *
689  * If you want to use retained_mbox in a single-threaded environment
690  * without a multithreaded protection then so_5::null_mutex_t (or any
691  * similar null-mutex implementation) can be used:
692  * \code
693  * so_5::environment_t & env = ...
694  * const so_5::mbox_t retained_mbox =
695  * so_5::extra::mboxes::retained_msg::make_mbox<
696  * so_5::extra::mboxes::retained_msg::default_traits_t,
697  * so_5::null_mutex_t>(env);
698  * so_5::send<Some_Message>(retained_mbox, ...);
699  * \endcode
700  *
701  * If you want to use your own mutex-like object (with interface which
702  * allows to use your mutex-like class with std::lock_guard) then you can
703  * do it similar way:
704  * \code
705  * so_5::environment_t & env = ...
706  * const so_5::mbox_t retained_mbox =
707  * so_5::extra::mboxes::retained_msg::make_mbox<
708  * so_5::extra::mboxes::retained_msg::default_traits_t,
709  * Your_Own_Mutex_Class>(env);
710  * so_5::send<Some_Message>(retained_mbox, ...);
711  * \endcode
712  *
713  * \tparam Traits type with traits of mbox implementation.
714  *
715  * \tparam Lock_Type a type of mutex to be used for protection of
716  * retained message mbox content. This must be a DefaultConstructible
717  * type with interface which allows to use Lock_Type with std::lock_guard.
718  *
719  * \since
720  * v.1.0.3
721  */
722 template<
723  typename Traits = default_traits_t,
724  typename Lock_Type = std::mutex >
725 mbox_t
727  {
729 
730  return env.make_custom_mbox(
731  []( const mbox_creation_data_t & data )
732  {
733  mbox_t result;
734 
736  {
737  using T = details::actual_mbox_t<
738  config_type,
740 
741  result = mbox_t{ new T{
743  }
744  };
745  }
746  else
747  {
748  using T = details::actual_mbox_t<
749  config_type,
751  result = mbox_t{ new T{ data.m_env.get(), data.m_id } };
752  }
753 
754  return result;
755  } );
756  }
757 
758 } /* namespace retained_msg */
759 
760 } /* namespace mboxes */
761 
762 } /* namespace extra */
763 
764 } /* namespace so_5 */
void drop_limit() noexcept
Drop the message limit for the subscriber.
void drop_filter() noexcept
Drop the delivery filter for the subscriber.
A type of item of message table for retained message mbox.
void set_filter(const delivery_filter_t &filter) noexcept
Set the delivery filter for the subscriber.
mbox_t make_mbox(environment_t &env)
Create an instance of retained message mbox.
const message_limit::control_block_t * limit() const noexcept
void insert_or_modify_subscriber(const std::type_index &msg_type, agent_t &subscriber, Info_Maker maker, Info_Changer changer)
void try_deliver_retained_message_to(const std::type_index &msg_type, const message_ref_t &retained_msg, agent_t &subscriber, const subscriber_info_t &subscriber_info)
An attempt to deliver retained message to the new subscriber.
subscriber_info_t(const delivery_filter_t *filter)
Constructor for the case when subscriber info is being created during event subscription.
template_independent_mbox_data_t m_data
Data of this message mbox.
Ranges for error codes of each submodules.
Definition: details.hpp:13
void unsubscribe_event_handlers(const std::type_index &msg_type, agent_t &subscriber) override
so_5::environment_t & environment() const noexcept override
A mixin with actual data which is necessary for implementation of retained mbox.
void drop_delivery_filter(const std::type_index &msg_type, agent_t &subscriber) noexcept override
messages_table_t m_messages_table
Table of current subscriptions and messages.
An actual implementation of retained message mbox.
A special coparator for agents with respect to agent&#39;s priority.
const so_5::message_limit::control_block_t * m_limit
Optional message limit for that subscriber.
delivery_possibility_t must_be_delivered(agent_t &subscriber, message_t &msg) const noexcept
Must a message be delivered to the subscriber?
void modify_and_remove_subscriber_if_needed(const std::type_index &msg_type, agent_t &subscriber, Info_Changer changer)
A helper type which is a collection of type parameters.
void set_delivery_filter(const std::type_index &msg_type, const delivery_filter_t &filter, agent_t &subscriber) override
void do_deliver_message_to_subscriber(agent_t &subscriber, const subscriber_info_t &subscriber_info, typename Tracing_Base::deliver_op_tracer const &tracer, const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep) const
void subscribe_event_handler(const std::type_index &msg_type, const so_5::message_limit::control_block_t *limit, agent_t &subscriber) override
Default traits for retained message mbox.
state_t m_state
Current state of the subscriber parameters.
void set_limit(const message_limit::control_block_t *limit) noexcept
Set the message limit for the subscriber.
void ensure_immutable_message(const std::type_index &msg_type, const message_ref_t &what) const
Ensures that message is an immutable message.
An information block about one subscriber.
actual_mbox_t(environment_t &env, mbox_id_t id, Tracing_Args &&... args)
Initializing constructor.
void do_deliver_message_impl(typename Tracing_Base::deliver_op_tracer const &tracer, const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep)
const delivery_filter_t * m_filter
Delivery filter for that message for that subscriber.
environment_t & m_env
SObjectizer Environment to work in.
const int rc_service_request_via_retained_msg_mbox
An attempt perform service request via retained message mbox.
void do_deliver_message(const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep) override