SObjectizer-5 Extra
round_robin.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of round-robin mbox.
4  */
5 
6 #pragma once
7 
8 #include <so_5/rt/impl/h/msg_tracing_helpers.hpp>
9 #include <so_5/rt/impl/h/message_limit_internals.hpp>
10 
11 namespace so_5 {
12 
13 namespace extra {
14 
15 namespace mboxes {
16 
17 namespace round_robin {
18 
19 namespace details {
20 
21 //
22 // subscriber_info_t
23 //
24 
25 /*!
26  * \brief An information block about one subscriber.
27  */
29 {
30  //! Subscriber.
32 
33  //! Optional message limit for that subscriber.
35 
36  //! Constructor for the case when subscriber info is being
37  //! created during event subscription.
39  agent_t * agent,
40  const so_5::message_limit::control_block_t * limit )
41  : m_agent( agent )
42  , m_limit( limit )
43  {}
44 };
45 
46 //
47 // subscriber_container_t
48 //
49 
50 /*!
51  * \brief Type of container for holding subscribers for one message type.
52  */
54  {
55  public :
57 
//! Reports whether no subscribers are currently stored in this container.
/*!
 * Forwards to m_subscribers.empty(); the container itself is not modified.
 *
 * \return true when the subscribers storage holds no entries.
 */
58  bool
59  empty() const
60  {
61  return m_subscribers.empty();
62  }
63 
64  void
66  agent_t * agent,
67  const so_5::message_limit::control_block_t * limit )
68  {
69  m_subscribers.emplace_back( agent, limit );
70  }
71 
73  end()
74  {
75  return m_subscribers.end();
76  }
77 
80  {
81  return std::find_if( std::begin(m_subscribers), std::end(m_subscribers),
82  [&]( const auto & info ) {
83  return info.m_agent == subscriber;
84  } );
85  }
86 
87  void
88  erase( storage_t::iterator it )
89  {
90  m_subscribers.erase(it);
92  }
93 
94  const subscriber_info_t &
96  {
97  return m_subscribers[ m_current_subscriber ];
98  }
99 
100  void
102  {
103  ++m_current_subscriber;
105  }
106 
107  private :
110 
111  void
113  {
114  if( m_current_subscriber >= m_subscribers.size() )
115  m_current_subscriber = 0;
116  }
117  };
118 
119 //
120 // data_t
121 //
122 
123 /*!
124  * \brief Common part of round-robin mbox implementation.
125  *
126  * This part depends only on the Lock type, not on the tracing facilities.
127  *
128  * \tparam Lock type of lock object to be used.
129  */
130 template< typename Lock >
131 struct data_t
132  {
133  //! Initializing constructor.
134  data_t( mbox_id_t id )
135  : m_id{ id }
136  {}
137 
138  //! ID of this mbox.
140 
141  //! Object lock.
142  /*!
143  * \note
144  * Declared as mutable because it will be used in const-methods.
145  */
146  mutable Lock m_lock;
147 
148  /*!
149  * \brief Map from message type to subscribers.
150  */
151  using messages_table_t = std::map<
152  std::type_index,
154 
155  //! Map of subscribers to messages.
156  /*!
157  * \note
158  * Declared as mutable because it will be used in const-methods.
159  */
161  };
162 
163 //
164 // mbox_template_t
165 //
166 
167 //! A template with implementation of round-robin mbox.
168 /*!
169  * \tparam Lock_Type type of lock to be used for thread safety.
170  * \tparam Tracing_Base base class with implementation of message
171  * delivery tracing methods. Expected to be tracing_enabled_base or
172  * tracing_disabled_base from so_5::impl::msg_tracing_helpers namespace.
173  */
174 template<
175  typename Lock_Type,
176  typename Tracing_Base >
178  : public abstract_message_box_t
179  , private data_t< Lock_Type >
180  , private Tracing_Base
181  {
182  using data_type = data_t< Lock_Type >;
183 
184  public:
185  //! Initializing constructor.
186  template< typename... Tracing_Args >
188  //! ID of this mbox.
189  mbox_id_t id,
190  //! Optional parameters for Tracing_Base's constructor.
191  Tracing_Args &&... args )
192  : data_type{ id }
194  {}
195 
//! Returns the ID of this mbox.
/*!
 * The value is read from the m_id member inherited from data_t, which
 * the data_t constructor initializes from its `id` argument.
 */
196  virtual mbox_id_t
197  id() const override
198  {
199  return this->m_id;
200  }
201 
202  virtual void
204  const std::type_index & type_wrapper,
205  const so_5::message_limit::control_block_t * limit,
206  agent_t * subscriber ) override
207  {
208  std::lock_guard< Lock_Type > lock( this->m_lock );
209 
210  auto it = this->m_subscribers.find( type_wrapper );
211  if( it == this->m_subscribers.end() )
212  {
213  // There isn't such message type yet.
216 
218  }
219  else
220  {
221  auto & agents = it->second;
222 
223  auto pos = agents.find( subscriber );
224  if( pos == agents.end() )
225  // There is no subscriber in the container.
226  // It must be added.
228  }
229  }
230 
//! Removes a subscriber from the list of receivers for one message type.
/*!
 * NOTE(review): the line carrying the method name is absent from this
 * listing; the parameter list matches the unsubscribe_event_handlers
 * entry in the page's symbol index -- confirm against the real header.
 *
 * The whole operation runs while m_lock is held. Nothing happens if
 * there is no entry for \a type_wrapper or if \a subscriber is not found
 * in that entry's container.
 *
 * \param type_wrapper message type whose subscription is being removed.
 * \param subscriber agent to be removed from the subscribers container.
 */
231  virtual void
233  const std::type_index & type_wrapper,
234  agent_t * subscriber ) override
235  {
236  std::lock_guard< Lock_Type > lock( this->m_lock );
237 
238  auto it = this->m_subscribers.find( type_wrapper );
239  if( it != this->m_subscribers.end() )
240  {
241  auto & agents = it->second;
242 
243  auto pos = agents.find( subscriber );
244  if( pos != agents.end() )
245  {
246  agents.erase( pos );
247  }
248 
	// Drop the map entry for this message type once its container
	// of subscribers is empty.
249  if( agents.empty() )
250  this->m_subscribers.erase( it );
251  }
252  }
253 
254  virtual std::string
255  query_name() const override
256  {
258  s << "<mbox:type=RRMPSC:id=" << this->m_id << ">";
259 
260  return s.str();
261  }
262 
263  virtual mbox_type_t
264  type() const override
265  {
267  }
268 
269  virtual void
271  const std::type_index & msg_type,
272  const message_ref_t & message,
273  unsigned int overlimit_reaction_deep ) const override
274  {
276  *this, // as Tracing_Base
277  *this, // as abstract_message_box_t
278  "deliver_message",
280 
282  tracer,
283  msg_type,
284  message,
287  }
288 
289  virtual void
291  const std::type_index & msg_type,
292  const message_ref_t & message,
293  unsigned int overlimit_reaction_deep ) const override
294  {
296  *this, // as Tracing_Base
297  *this, // as abstract_message_box_t
298  "deliver_service_request",
300 
302  tracer,
303  msg_type,
304  message,
306  }
307 
308  void
310  const std::type_index & msg_type,
311  const message_ref_t & message,
312  unsigned int overlimit_reaction_deep ) override
313  {
315  *this, // as Tracing_Base
316  *this, // as abstract_message_box_t
317  "deliver_enveloped_msg",
319 
321  tracer,
322  msg_type,
323  message,
326  }
327 
328  virtual void
330  const std::type_index & /*msg_type*/,
331  const delivery_filter_t & /*filter*/,
332  agent_t & /*subscriber*/ ) override
333  {
336  "set_delivery_filter is called for MPSC-mbox" );
337  }
338 
//! Intentionally empty override: the body contains no statements.
/*!
 * NOTE(review): the line carrying the method name is absent from this
 * listing; the signature matches the drop_delivery_filter entry in the
 * page's symbol index -- confirm against the real header.
 *
 * Both parameters are unnamed (their names are commented out) because
 * they are not used.
 */
339  virtual void
341  const std::type_index & /*msg_type*/,
342  agent_t & /*subscriber*/ ) noexcept override
343  {
344  // Nothing to do.
345  }
346 
347  private :
348  void
350  typename Tracing_Base::deliver_op_tracer const & tracer,
351  const std::type_index & msg_type,
352  const message_ref_t & message,
353  invocation_type_t invocation_type,
354  unsigned int overlimit_reaction_deep ) const
355  {
356  std::lock_guard< Lock_Type > lock( this->m_lock );
357 
358  auto it = this->m_subscribers.find( msg_type );
359  if( it != this->m_subscribers.end() )
360  {
361  const auto & agent_info = it->second.current_subscriber();
363 
365  agent_info,
366  tracer,
367  msg_type,
368  message,
371  }
372  else
374  }
375 
376  void
378  const subscriber_info_t & agent_info,
379  typename Tracing_Base::deliver_op_tracer const & tracer,
380  const std::type_index & msg_type,
381  const message_ref_t & message,
382  invocation_type_t invocation_type,
383  unsigned int overlimit_reaction_deep ) const
384  {
385  using namespace so_5::message_limit::impl;
386 
388  this->m_id,
390  *(agent_info.m_agent),
392  msg_type,
393  message,
396  [&] {
398 
400  *(agent_info.m_agent),
402  this->m_id,
403  msg_type,
404  message );
405  } );
406  }
407 
408  void
410  typename Tracing_Base::deliver_op_tracer const & tracer,
411  const std::type_index & msg_type,
412  const message_ref_t & message,
413  unsigned int overlimit_reaction_deep ) const
414  {
415  using namespace so_5::message_limit::impl;
416 
418  [&] {
419  std::lock_guard< Lock_Type > lock( this->m_lock );
420 
421  auto it = this->m_subscribers.find( msg_type );
422  if( it == this->m_subscribers.end() )
423  {
425 
428  "no service handlers (no subscribers for message)" );
429  }
430 
431  const auto & agent_info = it->second.current_subscriber();
433 
435  tracer,
436  agent_info,
437  msg_type,
438  message,
440  } );
441  }
442 
443  void
445  typename Tracing_Base::deliver_op_tracer const & tracer,
446  const subscriber_info_t & agent_info,
447  const std::type_index & msg_type,
448  const message_ref_t & message,
449  unsigned int overlimit_reaction_deep ) const
450  {
451  using namespace so_5::message_limit::impl;
452 
454  this->m_id,
456  *(agent_info.m_agent),
458  msg_type,
459  message,
462  [&] {
464 
466  *(agent_info.m_agent),
468  this->m_id,
469  msg_type,
470  message );
471  } );
472  }
473  };
474 
475 } /* namespace details */
476 
477 //
478 // make_mbox
479 //
480 /*!
481  * \brief Create an implementation of round-robin mbox.
482  *
483  * Usage example:
484  * \code
485  so_5::environment_t & env = ...;
486  const so_5::mbox_t rrmbox = so_5::extra::mboxes::round_robin::make_mbox<>( env );
487  ...
488  so_5::send< some_message >( rrmbox, ... );
489  * \endcode
490  *
491  * \tparam Lock_Type type of lock to be used for thread safety.
492  */
493 template< typename Lock_Type = std::mutex >
494 mbox_t
496  {
497  return env.make_custom_mbox(
498  []( const mbox_creation_data_t & data ) {
499  mbox_t result;
500 
502  {
503  using T = details::mbox_template_t<
504  Lock_Type,
506 
507  result = mbox_t( new T( data.m_id, data.m_tracer.get() ) );
508  }
509  else
510  {
511  using T = details::mbox_template_t<
512  Lock_Type,
514 
515  result = mbox_t( new T( data.m_id ) );
516  }
517 
518  return result;
519  } );
520  }
521 
522 } /* namespace round_robin */
523 
524 } /* namespace mboxes */
525 
526 } /* namespace extra */
527 
528 } /* namespace so_5 */
virtual void drop_delivery_filter(const std::type_index &, agent_t &) noexcept override
virtual void do_deliver_service_request(const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep) const override
Ranges for error codes of each submodule.
Definition: details.hpp:14
data_t(mbox_id_t id)
Initializing constructor.
virtual void do_deliver_message(const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep) const override
void do_deliver_service_request_to_subscriber(typename Tracing_Base::deliver_op_tracer const &tracer, const subscriber_info_t &agent_info, const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep) const
void do_deliver_service_request_impl(typename Tracing_Base::deliver_op_tracer const &tracer, const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep) const
void do_deliver_message_to_subscriber(const subscriber_info_t &agent_info, typename Tracing_Base::deliver_op_tracer const &tracer, const std::type_index &msg_type, const message_ref_t &message, invocation_type_t invocation_type, unsigned int overlimit_reaction_deep) const
A template with implementation of round-robin mbox.
void do_deliver_enveloped_msg(const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep) override
subscriber_info_t(agent_t *agent, const so_5::message_limit::control_block_t *limit)
Constructor for the case when subscriber info is being created during event subscription.
Definition: round_robin.hpp:38
virtual void unsubscribe_event_handlers(const std::type_index &type_wrapper, agent_t *subscriber) override
mbox_t make_mbox(environment_t &env)
Create an implementation of round-robin mbox.
void do_deliver_message_impl(typename Tracing_Base::deliver_op_tracer const &tracer, const std::type_index &msg_type, const message_ref_t &message, invocation_type_t invocation_type, unsigned int overlimit_reaction_deep) const
Common part of round-robin mbox implementation.
Type of container for holding subscribers for one message type.
Definition: round_robin.hpp:53
void emplace_back(agent_t *agent, const so_5::message_limit::control_block_t *limit)
Definition: round_robin.hpp:65
virtual void subscribe_event_handler(const std::type_index &type_wrapper, const so_5::message_limit::control_block_t *limit, agent_t *subscriber) override
const so_5::message_limit::control_block_t * m_limit
Optional message limit for that subscriber.
Definition: round_robin.hpp:34
messages_table_t m_subscribers
Map of subscribers to messages.
virtual void set_delivery_filter(const std::type_index &, const delivery_filter_t &, agent_t &) override
mbox_template_t(mbox_id_t id, Tracing_Args &&... args)
Initializing constructor.
An information block about one subscriber.
Definition: round_robin.hpp:28