SObjectizer-5 Extra
Loading...
Searching...
No Matches
round_robin.hpp
Go to the documentation of this file.
1/*!
2 * \file
3 * \brief Implementation of round-robin mbox.
4 */
5
6#pragma once
7
8#include <so_5_extra/error_ranges.hpp>
9
10#include <so_5/impl/msg_tracing_helpers.hpp>
11
12namespace so_5 {
13
14namespace extra {
15
16namespace mboxes {
17
18namespace round_robin {
19
20namespace errors {
21
22/*!
23 * \brief An attempt to set a delivery filter on a round_robin mbox.
24 *
25 * \since
26 * v.1.0.1
27 */
30
31} /* namespace errors */
32
33namespace details {
34
35//
36// subscriber_info_t
37//
38
39/*!
40 * \brief An information block about one subscriber.
41 */
43{
44 //! Subscriber: the message sink this record refers to (kept as a reference).
46
47 //! Constructor for the case when subscriber info is being
48 //! created during event subscription.
//! Stores a reference to \a sink in m_sink.
50 so_5::abstract_message_sink_t & sink )
51 : m_sink( sink )
52 {}
53};
54
55//
56// subscriber_container_t
57//
58
59/*!
60 * \brief Type of container for holding subscribers for one message type.
61 */
63 {
64 public :
66
//! Tells whether the container currently holds no subscribers.
/*!
 * \return the result of m_subscribers.empty().
 */
67 bool
68 empty() const noexcept
69 {
70 return m_subscribers.empty();
71 }
72
//! Appends a new subscriber record for \a sink to the end of m_subscribers.
//! The record is constructed in place from \a sink.
73 void
75 so_5::abstract_message_sink_t & sink )
76 {
77 m_subscribers.emplace_back( sink );
78 }
79
//! Past-the-end iterator of the subscriber storage.
//! Results of find() are compared against this value to detect
//! that a subscriber is not present.
81 end() noexcept
82 {
83 return m_subscribers.end();
84 }
85
88 {
// Linear search for the record that refers to `sink`.
// Subscribers are matched by object identity: the address of the stored
// sink is compared with the address of `sink`. The result of std::find_if
// is returned unchanged, so a missing subscriber yields
// std::end(m_subscribers).
89 return std::find_if( std::begin(m_subscribers), std::end(m_subscribers),
90 [&]( const auto & info ) {
91 return std::addressof( info.m_sink.get() ) ==
92 std::addressof( sink );
93 } );
94 }
95
//! Removes the subscriber record pointed to by \a it from m_subscribers.
96 void
97 erase( storage_t::iterator it ) noexcept
98 {
99 m_subscribers.erase(it);
// NOTE(review): source line 100 is missing from this listing; presumably it
// re-validates m_current_subscriber after the removal -- confirm against
// the original header.
101 }
102
//! Access to the record of the current subscriber.
/*!
 * Returns the element of m_subscribers at index m_current_subscriber.
 * No emptiness or range check is performed here.
 */
103 const subscriber_info_t &
104 current_subscriber() const noexcept
105 {
106 return m_subscribers[ m_current_subscriber ];
107 }
108
109 void
111 {
// Move the index of the current subscriber one position forward.
112 ++m_current_subscriber;
// NOTE(review): source line 113 is missing from this listing; presumably the
// index is wrapped back into the valid range there -- confirm against the
// original header.
114 }
115
116 private :
119
120 void
122 {
// Wrap-around: once the index reaches or passes the number of stored
// subscribers it restarts from the first one (index 0).
123 if( m_current_subscriber >= m_subscribers.size() )
124 m_current_subscriber = 0;
125 }
126 };
127
128//
129// data_t
130//
131
132/*!
133 * \brief Common part of round-robin mbox implementation.
134 *
135 * This part depends only on Lock type but not on tracing facilities.
136 *
137 * \tparam Lock type of lock object to be used.
138 */
139template< typename Lock >
140struct data_t
141 {
142 //! Initializing constructor.
//! Stores \a env in m_env and \a id in m_id.
144 environment_t & env,
145 mbox_id_t id )
146 : m_env{ env }
147 , m_id{ id }
148 {}
149
150 //! SObjectizer Environment to work in.
152
153 //! ID of this mbox.
155
156 //! Object lock.
//! mbox_template_t acquires it via std::lock_guard before touching
//! m_subscribers.
157 Lock m_lock;
158
159 /*!
160 * \brief Map from message type to subscribers.
161 */
165
166 //! Subscribers of every message type (the table is looked up by message type).
168 };
169
170//
171// mbox_template_t
172//
173
174//! A template with implementation of round-robin mbox.
175/*!
176 * \tparam Lock_Type type of lock to be used for thread safety.
177 * \tparam Tracing_Base base class with implementation of message
178 * delivery tracing methods. Expected to be tracing_enabled_base or
179 * tracing_disabled_base from so_5::impl::msg_tracing_helpers namespace.
180 */
181template<
182 typename Lock_Type,
183 typename Tracing_Base >
186 , private data_t< Lock_Type >
187 , private Tracing_Base
188 {
189 using data_type = data_t< Lock_Type >;
190
191 public:
192 //! Initializing constructor.
//! Passes \a env and \a id on to the data_t base part.
193 template< typename... Tracing_Args >
195 //! SObjectizer Environment to work in.
196 environment_t & env,
197 //! ID of this mbox.
198 mbox_id_t id,
199 //! Optional parameters for Tracing_Base's constructor.
200 Tracing_Args &&... args )
201 : data_type{ env, id }
// NOTE(review): source line 202 is missing from this listing; presumably it
// initializes Tracing_Base from `args` -- confirm against the original header.
203 {}
204
//! Returns the ID of this mbox (the m_id value of the data_t base part).
206 id() const override
207 {
208 return this->m_id;
209 }
210
//! Registers \a subscriber as a receiver of messages of type \a msg_type.
/*!
 * The whole operation is performed while m_lock is held.
 * A subscriber that is already registered for \a msg_type is not
 * added a second time.
 */
211 void
213 const std::type_index & msg_type,
214 so_5::abstract_message_sink_t & subscriber ) override
215 {
216 std::lock_guard< Lock_Type > lock( this->m_lock );
217
218 auto it = this->m_subscribers.find( msg_type );
219 if( it == this->m_subscribers.end() )
220 {
221 // No subscribers are registered for this message type yet.
// NOTE(review): source lines 222, 223 and 225 are missing from this listing;
// presumably a new container holding `subscriber` is created and stored
// for msg_type here -- confirm against the original header.
224
226 }
227 else
228 {
229 auto & sinks = it->second;
230
231 auto pos = sinks.find( subscriber );
232 if( pos == sinks.end() )
233 // There is no such subscriber in the container.
234 // It must be added.
// NOTE(review): the statement performing the addition (source line 235)
// is missing from this listing.
236 }
237 }
238
//! Removes \a subscriber from the receivers of messages of type \a msg_type.
/*!
 * The whole operation is performed while m_lock is held.
 * Nothing is changed when there is no entry for \a msg_type; a subscriber
 * that is not found in the entry is simply not erased.
 */
239 void
241 const std::type_index & msg_type,
242 so_5::abstract_message_sink_t & subscriber ) noexcept override
243 {
244 std::lock_guard< Lock_Type > lock( this->m_lock );
245
246 auto it = this->m_subscribers.find( msg_type );
247 if( it != this->m_subscribers.end() )
248 {
249 auto & sinks = it->second;
250
251 auto pos = sinks.find( subscriber );
252 if( pos != sinks.end() )
253 {
254 sinks.erase( pos );
255 }
256
// Drop the whole entry for this message type once it has no
// subscribers left.
257 if( sinks.empty() )
258 this->m_subscribers.erase( it );
259 }
260 }
261
//! Builds the textual name of this mbox.
/*!
 * The name has the form "<mbox:type=RRMPSC:id=N>" where N is the
 * streamed value of m_id.
 */
262 std::string
263 query_name() const override
264 {
// NOTE(review): the declaration of `s` (source line 265) is missing from
// this listing; presumably it is a std::ostringstream -- confirm.
266 s << "<mbox:type=RRMPSC:id=" << this->m_id << ">";
267
268 return s.str();
269 }
270
// NOTE(review): both the return type (source line 271) and the return
// statement (source line 274) are missing from this listing, so the mbox
// type reported by this method cannot be confirmed from here.
272 type() const override
273 {
275 }
276
//! Entry point for message delivery through this mbox.
/*!
 * Prepares a tracing object for the operation named "deliver_message"
 * and passes it on together with \a msg_type and \a message.
 */
277 void
279 so_5::message_delivery_mode_t delivery_mode,
280 const std::type_index & msg_type,
281 const so_5::message_ref_t & message,
282 unsigned int redirection_deep ) override
283 {
// The same object is passed twice: once viewed as its Tracing_Base part
// and once as the mbox itself.
285 *this, // as Tracing_Base
286 *this, // as abstract_message_box_t
287 "deliver_message",
290
// NOTE(review): source lines 284, 288-289, 291, 293 and 296 are missing from
// this listing; presumably the call below targets do_deliver_message_impl
// and also forwards delivery_mode and redirection_deep -- confirm.
292 tracer,
294 msg_type,
295 message,
297 }
298
//! Delivery filters are not supported by the round-robin mbox.
/*!
 * All parameters are unused (their names are commented out).
 */
299 void
301 const std::type_index & /*msg_type*/,
302 const so_5::delivery_filter_t & /*filter*/,
303 so_5::abstract_message_sink_t & /*subscriber*/ ) override
304 {
305 using namespace so_5::extra::mboxes::round_robin::errors;
306
// NOTE(review): source lines 307-308 are missing from this listing;
// presumably an error with a code from the `errors` namespace is raised
// here using the description below -- confirm against the original header.
309 "set_delivery_filter is called for round_robin-mbox" );
310 }
311
//! Intentionally does nothing; both parameters are unused.
312 void
314 const std::type_index & /*msg_type*/,
315 so_5::abstract_message_sink_t & /*subscriber*/ ) noexcept override
316 {
317 // Nothing to do.
318 }
319
//! Returns the SObjectizer Environment stored in m_env.
321 environment() const noexcept override
322 {
323 return this->m_env;
324 }
325
326 private :
//! Delivery step that looks up the subscribers of \a msg_type.
/*!
 * m_lock is held for the whole duration of the call.
 */
327 void
329 typename Tracing_Base::deliver_op_tracer const & tracer,
330 so_5::message_delivery_mode_t delivery_mode,
331 const std::type_index & msg_type,
332 const so_5::message_ref_t & message,
333 unsigned int redirection_deep )
334 {
335 std::lock_guard< Lock_Type > lock( this->m_lock );
336
337 auto it = this->m_subscribers.find( msg_type );
338 if( it != this->m_subscribers.end() )
339 {
// There is an entry for this message type.
// NOTE(review): source lines 340-341, 343-344, 346 and 349 are missing from
// this listing; presumably the current subscriber of the entry is picked,
// the round-robin position is advanced and the message is handed to
// do_deliver_message_to_subscriber -- confirm against the original header.
342
345 tracer,
347 msg_type,
348 message,
350 }
351 else
// NOTE(review): the statement of the else-branch (source line 352) is
// missing from this listing; presumably the absence of subscribers is
// reported via `tracer` -- confirm.
353 }
354
//! Final delivery step for one particular subscriber.
355 void
357 const subscriber_info_t & subscriber_info,
358 typename Tracing_Base::deliver_op_tracer const & tracer,
359 so_5::message_delivery_mode_t delivery_mode,
360 const std::type_index & msg_type,
361 const so_5::message_ref_t & message,
362 unsigned int redirection_deep )
363 {
364 using namespace so_5::message_limit::impl;
365
// NOTE(review): source lines 366, 368 and 371-372 are missing from this
// listing; presumably the message is pushed to the sink referenced by
// subscriber_info, with this mbox's ID, msg_type and message among the
// arguments -- confirm against the original header.
367 this->m_id,
369 msg_type,
370 message,
373 }
374 };
375
376} /* namespace details */
377
378//
379// make_mbox
380//
381/*!
382 * \brief Create an implementation of round-robin mbox.
383 *
384 * Usage example:
385 * \code
386 so_5::environment_t & env = ...;
387 const so_5::mbox_t rrmbox = so_5::extra::mboxes::round_robin::make_mbox<>( env );
388 ...
389 so_5::send< some_message >( rrmbox, ... );
390 * \endcode
391 *
392 * \tparam Lock_Type type of lock to be used for thread safety.
393 */
394template< typename Lock_Type = std::mutex >
395mbox_t
397 {
// The mbox is created via the environment's make_custom_mbox(); the lambda
// receives the creation data (environment, mbox ID, tracer) and returns
// the new mbox.
398 return env.make_custom_mbox(
399 []( const mbox_creation_data_t & data ) {
401
// NOTE(review): the declaration of `result` (source line 400) and the
// branch condition (source line 402) are missing from this listing;
// presumably the first branch is taken when message delivery tracing
// is enabled -- confirm against the original header.
403 {
// Variant whose constructor additionally receives data.m_tracer.
404 using T = details::mbox_template_t<
405 Lock_Type,
407
409 data.m_env.get(),
410 data.m_id,
411 data.m_tracer )
412 };
413 }
414 else
415 {
// Variant constructed from the environment and the mbox ID only.
416 using T = details::mbox_template_t<
417 Lock_Type,
419
421 data.m_env.get(),
422 data.m_id )
423 };
424 }
425
426 return result;
427 } );
428 }
429
430} /* namespace round_robin */
431
432} /* namespace mboxes */
433
434} /* namespace extra */
435
436} /* namespace so_5 */
A template with implementation of round-robin mbox.
void unsubscribe_event_handler(const std::type_index &msg_type, so_5::abstract_message_sink_t &subscriber) noexcept override
void set_delivery_filter(const std::type_index &, const so_5::delivery_filter_t &, so_5::abstract_message_sink_t &) override
void drop_delivery_filter(const std::type_index &, so_5::abstract_message_sink_t &) noexcept override
void do_deliver_message(so_5::message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const so_5::message_ref_t &message, unsigned int redirection_deep) override
void do_deliver_message_to_subscriber(const subscriber_info_t &subscriber_info, typename Tracing_Base::deliver_op_tracer const &tracer, so_5::message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const so_5::message_ref_t &message, unsigned int redirection_deep)
void do_deliver_message_impl(typename Tracing_Base::deliver_op_tracer const &tracer, so_5::message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const so_5::message_ref_t &message, unsigned int redirection_deep)
void subscribe_event_handler(const std::type_index &msg_type, so_5::abstract_message_sink_t &subscriber) override
so_5::environment_t & environment() const noexcept override
mbox_template_t(environment_t &env, mbox_id_t id, Tracing_Args &&... args)
Initializing constructor.
Type of container for holding subscribers for one message type.
const subscriber_info_t & current_subscriber() const noexcept
void emplace_back(so_5::abstract_message_sink_t &sink)
storage_t::iterator find(so_5::abstract_message_sink_t &sink) noexcept
const int mboxes_round_robin_errors
Starting point for errors of mboxes::round_robin submodule.
const int rc_delivery_filter_cannot_be_used_on_round_robin_mbox
An attempt to set a delivery filter on a round_robin mbox.
mbox_t make_mbox(environment_t &env)
Create an implementation of round-robin mbox.
Ranges for error codes of each submodule.
Definition details.hpp:13
Common part of round-robin mbox implementation.
data_t(environment_t &env, mbox_id_t id)
Initializing constructor.
messages_table_t m_subscribers
Map of subscribers to messages.
environment_t & m_env
SObjectizer Environment to work in.
An information block about one subscriber.
subscriber_info_t(so_5::abstract_message_sink_t &sink)
Constructor for the case when subscriber info is being created during event subscription.
std::reference_wrapper< so_5::abstract_message_sink_t > m_sink
Subscriber.