SObjectizer-5 Extra
round_robin.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of round-robin mbox.
4  */
5 
6 #pragma once
7 
8 #include <so_5/impl/msg_tracing_helpers.hpp>
9 #include <so_5/impl/message_limit_internals.hpp>
10 
11 namespace so_5 {
12 
13 namespace extra {
14 
15 namespace mboxes {
16 
17 namespace round_robin {
18 
19 namespace details {
20 
21 //
22 // subscriber_info_t
23 //
24 
25 /*!
26  * \brief An information block about one subscriber.
27  */
29 {
30  //! Subscriber.
32 
33  //! Optional message limit for that subscriber.
35 
36  //! Constructor for the case when subscriber info is being
37  //! created during event subscription.
39  agent_t * agent,
40  const so_5::message_limit::control_block_t * limit )
41  : m_agent( agent )
42  , m_limit( limit )
43  {}
44 };
45 
46 //
47 // subscriber_container_t
48 //
49 
50 /*!
51  * \brief Type of container for holding subscribers for one message type.
52  */
54  {
55  public :
57 
58  bool
59  empty() const noexcept
60  {
61  return m_subscribers.empty();
62  }
63 
64  void
66  agent_t * agent,
67  const so_5::message_limit::control_block_t * limit )
68  {
69  m_subscribers.emplace_back( agent, limit );
70  }
71 
73  end() noexcept
74  {
75  return m_subscribers.end();
76  }
77 
79  find( agent_t * subscriber ) noexcept
80  {
81  return std::find_if( std::begin(m_subscribers), std::end(m_subscribers),
82  [&]( const auto & info ) {
83  return info.m_agent == subscriber;
84  } );
85  }
86 
87  void
88  erase( storage_t::iterator it ) noexcept
89  {
90  m_subscribers.erase(it);
92  }
93 
94  const subscriber_info_t &
95  current_subscriber() const noexcept
96  {
97  return m_subscribers[ m_current_subscriber ];
98  }
99 
100  void
102  {
103  ++m_current_subscriber;
105  }
106 
107  private :
110 
111  void
113  {
114  if( m_current_subscriber >= m_subscribers.size() )
115  m_current_subscriber = 0;
116  }
117  };
118 
119 //
120 // data_t
121 //
122 
123 /*!
124  * \brief Common part of round-robin mbox implementation.
125  *
126  * This part depends only on Lock type but not on tracing facilities.
127  *
128  * \tparam Lock type of lock object to be used.
129  */
130 template< typename Lock >
131 struct data_t
132  {
133  //! Initializing constructor.
135  environment_t & env,
136  mbox_id_t id )
137  : m_env{ env }
138  , m_id{ id }
139  {}
140 
141  //! SObjectizer Environment to work in.
143 
144  //! ID of this mbox.
146 
147  //! Object lock.
148  Lock m_lock;
149 
150  /*!
151  * \brief Map from message type to subscribers.
152  */
153  using messages_table_t = std::map<
154  std::type_index,
156 
157  //! Map of subscribers to messages.
159  };
160 
161 //
162 // mbox_template_t
163 //
164 
165 //! A template with implementation of round-robin mbox.
166 /*!
167  * \tparam Lock_Type type of lock to be used for thread safety.
168  * \tparam Tracing_Base base class with implementation of message
169  * delivery tracing methods. Expected to be tracing_enabled_base or
170  * tracing_disabled_base from so_5::impl::msg_tracing_helpers namespace.
171  */
172 template<
173  typename Lock_Type,
174  typename Tracing_Base >
176  : public abstract_message_box_t
177  , private data_t< Lock_Type >
178  , private Tracing_Base
179  {
180  using data_type = data_t< Lock_Type >;
181 
182  public:
183  //! Initializing constructor.
184  template< typename... Tracing_Args >
186  //! SObjectizer Environment to work in.
187  environment_t & env,
188  //! ID of this mbox.
189  mbox_id_t id,
190  //! Optional parameters for Tracing_Base's constructor.
191  Tracing_Args &&... args )
192  : data_type{ env, id }
194  {}
195 
196  mbox_id_t
197  id() const override
198  {
199  return this->m_id;
200  }
201 
202  void
204  const std::type_index & type_wrapper,
205  const so_5::message_limit::control_block_t * limit,
206  agent_t & subscriber ) override
207  {
208  std::lock_guard< Lock_Type > lock( this->m_lock );
209 
210  auto it = this->m_subscribers.find( type_wrapper );
211  if( it == this->m_subscribers.end() )
212  {
213  // There isn't such message type yet.
216 
218  }
219  else
220  {
221  auto & agents = it->second;
222 
223  auto pos = agents.find( &subscriber );
224  if( pos == agents.end() )
225  // There is no subscriber in the container.
226  // It must be added.
228  }
229  }
230 
231  void
233  const std::type_index & type_wrapper,
234  agent_t & subscriber ) override
235  {
236  std::lock_guard< Lock_Type > lock( this->m_lock );
237 
238  auto it = this->m_subscribers.find( type_wrapper );
239  if( it != this->m_subscribers.end() )
240  {
241  auto & agents = it->second;
242 
243  auto pos = agents.find( &subscriber );
244  if( pos != agents.end() )
245  {
246  agents.erase( pos );
247  }
248 
249  if( agents.empty() )
250  this->m_subscribers.erase( it );
251  }
252  }
253 
254  std::string
255  query_name() const override
256  {
258  s << "<mbox:type=RRMPSC:id=" << this->m_id << ">";
259 
260  return s.str();
261  }
262 
264  type() const override
265  {
267  }
268 
269  void
271  const std::type_index & msg_type,
272  const message_ref_t & message,
273  unsigned int overlimit_reaction_deep ) override
274  {
276  *this, // as Tracing_Base
277  *this, // as abstract_message_box_t
278  "deliver_message",
280 
282  tracer,
283  msg_type,
284  message,
286  }
287 
288  void
290  const std::type_index & /*msg_type*/,
291  const delivery_filter_t & /*filter*/,
292  agent_t & /*subscriber*/ ) override
293  {
296  "set_delivery_filter is called for MPSC-mbox" );
297  }
298 
299  void
301  const std::type_index & /*msg_type*/,
302  agent_t & /*subscriber*/ ) noexcept override
303  {
304  // Nothing to do.
305  }
306 
308  environment() const noexcept override
309  {
310  return this->m_env;
311  }
312 
313  private :
314  void
316  typename Tracing_Base::deliver_op_tracer const & tracer,
317  const std::type_index & msg_type,
318  const message_ref_t & message,
319  unsigned int overlimit_reaction_deep )
320  {
321  std::lock_guard< Lock_Type > lock( this->m_lock );
322 
323  auto it = this->m_subscribers.find( msg_type );
324  if( it != this->m_subscribers.end() )
325  {
326  const auto & agent_info = it->second.current_subscriber();
328 
330  agent_info,
331  tracer,
332  msg_type,
333  message,
335  }
336  else
338  }
339 
340  void
342  const subscriber_info_t & agent_info,
343  typename Tracing_Base::deliver_op_tracer const & tracer,
344  const std::type_index & msg_type,
345  const message_ref_t & message,
346  unsigned int overlimit_reaction_deep )
347  {
348  using namespace so_5::message_limit::impl;
349 
351  this->m_id,
352  *(agent_info.m_agent),
354  msg_type,
355  message,
358  [&] {
360 
362  *(agent_info.m_agent),
364  this->m_id,
365  msg_type,
366  message );
367  } );
368  }
369  };
370 
371 } /* namespace details */
372 
373 //
374 // make_mbox
375 //
376 /*!
377  * \brief Create an implementation of round-robin mbox.
378  *
379  * Usage example:
380  * \code
381  so_5::environment_t & env = ...;
382  const so_5::mbox_t rrmbox = so_5::extra::mboxes::round_robin::make_mbox<>( env );
383  ...
384  so_5::send< some_message >( rrmbox, ... );
385  * \endcode
386  *
387  * \tparam Lock_Type type of lock to be used for thread safety.
388  */
389 template< typename Lock_Type = std::mutex >
390 mbox_t
392  {
393  return env.make_custom_mbox(
394  []( const mbox_creation_data_t & data ) {
395  mbox_t result;
396 
398  {
399  using T = details::mbox_template_t<
400  Lock_Type,
402 
404  data.m_env.get(),
405  data.m_id,
406  data.m_tracer.get() )
407  };
408  }
409  else
410  {
411  using T = details::mbox_template_t<
412  Lock_Type,
414 
416  data.m_env.get(),
417  data.m_id )
418  };
419  }
420 
421  return result;
422  } );
423  }
424 
425 } /* namespace round_robin */
426 
427 } /* namespace mboxes */
428 
429 } /* namespace extra */
430 
431 } /* namespace so_5 */
void subscribe_event_handler(const std::type_index &type_wrapper, const so_5::message_limit::control_block_t *limit, agent_t &subscriber) override
environment_t & m_env
SObjectizer Environment to work in.
const subscriber_info_t & current_subscriber() const noexcept
Definition: round_robin.hpp:95
void do_deliver_message_to_subscriber(const subscriber_info_t &agent_info, typename Tracing_Base::deliver_op_tracer const &tracer, const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep)
Ranges for error codes of each submodule.
Definition: details.hpp:13
void set_delivery_filter(const std::type_index &, const delivery_filter_t &, agent_t &) override
void do_deliver_message(const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep) override
void unsubscribe_event_handlers(const std::type_index &type_wrapper, agent_t &subscriber) override
A template with implementation of round-robin mbox.
subscriber_info_t(agent_t *agent, const so_5::message_limit::control_block_t *limit)
Constructor for the case when subscriber info is being created during event subscription.
Definition: round_robin.hpp:38
so_5::environment_t & environment() const noexcept override
mbox_t make_mbox(environment_t &env)
Create an implementation of round-robin mbox.
Common part of round-robin mbox implementation.
Type of container for holding subscribers for one message type.
Definition: round_robin.hpp:53
void do_deliver_message_impl(typename Tracing_Base::deliver_op_tracer const &tracer, const std::type_index &msg_type, const message_ref_t &message, unsigned int overlimit_reaction_deep)
void emplace_back(agent_t *agent, const so_5::message_limit::control_block_t *limit)
Definition: round_robin.hpp:65
storage_t::iterator find(agent_t *subscriber) noexcept
Definition: round_robin.hpp:79
const so_5::message_limit::control_block_t * m_limit
Optional message limit for that subscriber.
Definition: round_robin.hpp:34
messages_table_t m_subscribers
Map of subscribers to messages.
data_t(environment_t &env, mbox_id_t id)
Initializing constructor.
mbox_template_t(environment_t &env, mbox_id_t id, Tracing_Args &&... args)
Initializing constructor.
void drop_delivery_filter(const std::type_index &, agent_t &) noexcept override
An information block about one subscriber.
Definition: round_robin.hpp:28