SObjectizer-5 Extra
Loading...
Searching...
No Matches
retained_msg.hpp
Go to the documentation of this file.
1/*!
2 * \file
3 * \brief Implementation of mbox which holds last sent message.
4 *
5 * \since
6 * v.1.0.3
7 */
8
9#pragma once
10
11#include <so_5/version.hpp>
12
13#if SO_5_VERSION < SO_5_VERSION_MAKE(5u, 8u, 0u)
14#error "SObjectizer-5.8.0 of newest is required"
15#endif
16
17#include <so_5_extra/error_ranges.hpp>
18
19#include <so_5/impl/msg_tracing_helpers.hpp>
20#include <so_5/impl/local_mbox_basic_subscription_info.hpp>
21
22#include <so_5/details/sync_helpers.hpp>
23
24#include <so_5/mbox.hpp>
25
26#include <memory>
27
28namespace so_5 {
29
30namespace extra {
31
32namespace mboxes {
33
34namespace retained_msg {
35
36namespace errors {
37
38// Note: this namespace is empty for now.
39
40} /* namespace errors */
41
42namespace details {
43
44/*!
45 * \brief A helper type which is a collection of type parameters.
46 *
47 * This type is used to simplify code of retained_msg_mbox internals.
48 * Instead of writing something like:
49 * \code
50 * template< typename Traits >
51 * class ... {...};
52 *
53 * template< typename Traits, typename Lock_Type >
54 * class ... {...};
55 * \endcode
56 * config_type allows you to write the following instead:
57 * \code
58 * template< typename Config_Type >
59 * class ... {...};
60 *
61 * template< typename Config_Type >
62 * class ... {...};
63 * \endcode
64 *
65 * \tparam Traits traits type to be used.
66 *
67 * \tparam Lock_Type type of object to be used for thread-safety (like
68 * std::mutex or so_5::null_mutex_t).
69 *
70 * \since
71 * v.1.0.3
72 */
73template<
74 typename Traits,
75 typename Lock_Type >
77 {
78 using traits_type = Traits;
79 using lock_type = Lock_Type;
80 };
81
82/*!
83 * \name Type extractors for config_type
84 * \{
85 */
86template< typename Config_Type >
87using traits_t = typename Config_Type::traits_type;
88
89template< typename Config_Type >
90using lock_t = typename Config_Type::lock_type;
91/*!
92 * \}
93 */
94
95/*!
96 * \brief An information block about one subscriber.
97 *
98 * \since v.1.0.3, v.1.5.1
99 */
100using subscriber_info_t =
102
103//
104// messages_table_item_t
105//
106/*!
107 * \brief A type of item of message table for retained message mbox.
108 *
109 * For each message type it is necessary to store:
110 * - a list of subscribers for that message;
111 * - the last message sent.
112 *
113 * This type is intended to be used as a container for such data.
114 *
115 * \since
116 * v.1.0.3
117 */
119 {
120 //! A special comparator for sinks with respect to
121 //! a sink's priority.
123 {
124 bool operator()(
125 abstract_message_sink_t * a,
126 abstract_message_sink_t * b ) const noexcept
127 {
128 return abstract_message_sink_t::special_sink_ptr_compare( a, b );
129 }
130 };
131
132 //! Type of subscribers map.
137 >;
138
139 //! Subscribers.
140 /*!
141 * Can be empty. This is the case when the first message was sent
142 * while there were no subscribers yet.
143 */
145
146 //! Retained message.
147 /*!
148 * Can be nullptr. It means that there have been no attempts to send
149 * a message of this type yet.
150 */
152 };
153
154//
155// template_independent_mbox_data_t
156//
157/*!
158 * \brief A mixin with actual data which is necessary for implementation
159 * of retained mbox.
160 *
161 * This data type doesn't depend on any template parameters.
162 *
163 * \since
164 * v.1.0.3
165 */
167 {
168 //! SObjectizer Environment to work in.
170
171 //! ID of the mbox.
173
174 //! Type of messages table.
177
178 //! Table of current subscriptions and messages.
180
182 environment_t & env,
183 mbox_id_t id )
184 : m_env{ env }
185 , m_id{id}
186 {}
187 };
188
189//
190// actual_mbox_t
191//
192
193/*!
194 * \brief An actual implementation of retained message mbox.
195 *
196 * \tparam Config type with main definitions for this message box type.
197 *
198 * \tparam Tracing_Base base class with implementation of message
199 * delivery tracing methods.
200 *
201 * \since
202 * v.1.0.3
203 */
204template<
205 typename Config,
206 typename Tracing_Base >
207class actual_mbox_t final
208 : public abstract_message_box_t
209 , private Tracing_Base
210 {
211 public:
212 /*!
213 * \brief Initializing constructor.
214 *
215 * \tparam Tracing_Args parameters for Tracing_Base constructor
216 * (can be an empty list if Tracing_Base has only the default constructor).
217 */
218 template< typename... Tracing_Args >
220 //! SObjectizer Environment to work in.
221 environment_t & env,
222 //! ID of this mbox.
223 mbox_id_t id,
224 //! Optional parameters for Tracing_Base's constructor.
225 Tracing_Args &&... args )
226 : Tracing_Base{ std::forward< Tracing_Args >(args)... }
227 , m_data{ env, id }
228 {}
229
231 id() const override
232 {
233 return this->m_data.m_id;
234 }
235
236 void
238 const std::type_index & msg_type,
239 abstract_message_sink_t & subscriber ) override
240 {
241 insert_or_modify_subscriber(
242 msg_type,
243 subscriber,
244 [&] {
245 return subscriber_info_t{
246 subscriber_info_t::subscription_present_t{}
247 };
248 },
249 [&]( subscriber_info_t & info ) {
250 info.subscription_defined();
251 } );
252 }
253
254 void
256 const std::type_index & msg_type,
257 abstract_message_sink_t & subscriber ) noexcept override
258 {
259 modify_and_remove_subscriber_if_needed(
260 msg_type,
261 subscriber,
262 []( subscriber_info_t & info ) {
263 info.subscription_dropped();
264 } );
265 }
266
267 std::string
268 query_name() const override
269 {
270 std::ostringstream s;
271 s << "<mbox:type=RETAINED_MPMC:id=" << this->m_data.m_id << ">";
272
273 return s.str();
274 }
275
277 type() const override
278 {
279 return mbox_type_t::multi_producer_multi_consumer;
280 }
281
282 void
284 message_delivery_mode_t delivery_mode,
285 const std::type_index & msg_type,
286 const message_ref_t & message,
287 unsigned int redirection_deep ) override
288 {
289 typename Tracing_Base::deliver_op_tracer tracer{
290 *this, // as Tracing_base
291 *this, // as abstract_message_box_t
292 "deliver_message",
293 delivery_mode,
294 msg_type, message, redirection_deep };
295
296 ensure_immutable_message( msg_type, message );
297
298 do_deliver_message_impl(
299 tracer,
300 delivery_mode,
301 msg_type,
302 message,
303 redirection_deep );
304 }
305
306 void
308 const std::type_index & msg_type,
309 const delivery_filter_t & filter,
310 abstract_message_sink_t & subscriber ) override
311 {
312 insert_or_modify_subscriber(
313 msg_type,
314 subscriber,
315 [&] {
316 return subscriber_info_t{ filter };
317 },
318 [&]( subscriber_info_t & info ) {
319 info.set_filter( filter );
320 } );
321 }
322
323 void
325 const std::type_index & msg_type,
326 abstract_message_sink_t & subscriber ) noexcept override
327 {
328 modify_and_remove_subscriber_if_needed(
329 msg_type,
330 subscriber,
331 []( subscriber_info_t & info ) {
332 info.drop_filter();
333 } );
334 }
335
337 environment() const noexcept override
338 {
339 return this->m_data.m_env;
340 }
341
342 private :
343 //! Data of this message mbox.
345
346 //! Object lock.
348
349 template< typename Info_Maker, typename Info_Changer >
350 void
352 const std::type_index & msg_type,
353 abstract_message_sink_t & subscriber,
354 Info_Maker maker,
355 Info_Changer changer )
356 {
357 std::lock_guard< lock_t<Config> > lock( m_lock );
358
359 // If there is no item for this message type it will be
360 // created automatically.
361 auto & table_item = this->m_data.m_messages_table[ msg_type ];
362
363 auto it_subscriber = table_item.m_subscribers.find(
364 std::addressof(subscriber) );
365 if( it_subscriber == table_item.m_subscribers.end() )
366 // There is no subscriber yet. It must be added.
367 it_subscriber = table_item.m_subscribers.emplace(
368 std::addressof(subscriber), maker() ).first;
369 else
370 // Subscriber is known. It must be updated.
371 changer( it_subscriber->second );
372
373 // If there is a retained message then delivery attempt
374 // must be performed.
375 // NOTE: an exception at this stage doesn't remove the new subscription.
376 if( table_item.m_retained_msg )
377 try_deliver_retained_message_to(
378 msg_type,
379 table_item.m_retained_msg,
380 subscriber,
381 it_subscriber->second );
382 }
383
384 template< typename Info_Changer >
385 void
387 const std::type_index & msg_type,
388 abstract_message_sink_t & subscriber,
389 Info_Changer changer )
390 {
391 std::lock_guard< lock_t<Config> > lock( m_lock );
392
393 auto it_table_item = this->m_data.m_messages_table.find( msg_type );
394 if( it_table_item != this->m_data.m_messages_table.end() )
395 {
396 auto & table_item = it_table_item->second;
397
398 auto it_subscriber = table_item.m_subscribers.find(
399 std::addressof(subscriber) );
400 if( it_subscriber != table_item.m_subscribers.end() )
401 {
402 // Subscriber is found and must be modified.
403 changer( it_subscriber->second );
404
405 // If info about subscriber becomes empty after
406 // modification then subscriber info must be removed.
407 if( it_subscriber->second.empty() )
408 table_item.m_subscribers.erase( it_subscriber );
409 }
410 }
411 }
412
413 void
415 typename Tracing_Base::deliver_op_tracer const & tracer,
416 message_delivery_mode_t delivery_mode,
417 const std::type_index & msg_type,
418 const message_ref_t & message,
419 unsigned int redirection_deep )
420 {
421 std::lock_guard< lock_t<Config> > lock( m_lock );
422
423 // If there is no item for this message type it will be
424 // created automatically.
425 auto & table_item = this->m_data.m_messages_table[ msg_type ];
426
427 // Message must be stored as retained.
428 table_item.m_retained_msg = message;
429
430 auto & subscribers = table_item.m_subscribers;
431 if( !subscribers.empty() )
432 for( const auto & kv : subscribers )
433 do_deliver_message_to_subscriber(
434 *(kv.first),
435 kv.second,
436 tracer,
437 delivery_mode,
438 msg_type,
439 message,
440 redirection_deep );
441 else
442 tracer.no_subscribers();
443 }
444
445 void
447 abstract_message_sink_t & subscriber,
448 const subscriber_info_t & subscriber_info,
449 typename Tracing_Base::deliver_op_tracer const & tracer,
450 message_delivery_mode_t delivery_mode,
451 const std::type_index & msg_type,
452 const message_ref_t & message,
453 unsigned int redirection_deep ) const
454 {
455 const auto delivery_status =
456 subscriber_info.must_be_delivered(
457 subscriber,
458 message,
459 []( const message_ref_t & msg ) -> message_t & {
460 return *msg;
461 } );
462
463 if( delivery_possibility_t::must_be_delivered == delivery_status )
464 {
465 using namespace so_5::message_limit::impl;
466
467 subscriber.push_event(
468 this->id(),
469 delivery_mode,
470 msg_type,
471 message,
472 redirection_deep,
473 tracer.overlimit_tracer() );
474 }
475 else
476 tracer.message_rejected(
477 std::addressof(subscriber), delivery_status );
478 }
479
480 /*!
481 * \brief An attempt to deliver retained message to the new subscriber.
482 *
483 * This attempt will be performed only if there is a retained message.
484 */
485 void
487 const std::type_index & msg_type,
488 const message_ref_t & retained_msg,
489 abstract_message_sink_t & subscriber,
490 const subscriber_info_t & subscriber_info )
491 {
492 if( retained_msg )
493 {
494 const unsigned int redirection_deep = 0;
495
496 typename Tracing_Base::deliver_op_tracer tracer{
497 *this, // as Tracing_base
498 *this, // as abstract_message_box_t
499 "deliver_message_on_subscription",
500 message_delivery_mode_t::ordinary,
501 msg_type,
502 retained_msg,
503 redirection_deep };
504
505 do_deliver_message_to_subscriber(
506 subscriber,
507 subscriber_info,
508 tracer,
509 message_delivery_mode_t::ordinary,
510 msg_type,
511 retained_msg,
512 redirection_deep );
513 }
514 }
515
516 /*!
517 * \brief Ensures that message is an immutable message.
518 *
519 * Checks mutability flag and throws an exception if message is
520 * a mutable one.
521 */
522 void
524 const std::type_index & msg_type,
525 const message_ref_t & what ) const
526 {
527 if( message_mutability_t::immutable_message !=
528 message_mutability( what ) )
529 SO_5_THROW_EXCEPTION(
530 so_5::rc_mutable_msg_cannot_be_delivered_via_mpmc_mbox,
531 "an attempt to deliver mutable message via MPMC mbox"
532 ", msg_type=" + std::string(msg_type.name()) );
533 }
534 };
535
536} /* namespace details */
537
538//
539//
540// default_traits_t
541//
542/*!
543 * \brief Default traits for retained message mbox.
544 */
546
547//
548// make_mbox
549//
550/*!
551 * \brief Create an instance of retained message mbox.
552 *
553 * Simple usage example:
554 * \code
555 * so_5::environment_t & env = ...;
556 * const so_5::mbox_t retained_mbox = so_5::extra::mboxes::retained_msg::make_mbox<>(env);
557 * so_5::send<Some_Message>(retained_mbox, ...);
558 * \endcode
559 * An instance of the default implementation of retained message mbox will be created.
560 * This instance will be protected by std::mutex.
561 *
562 * If you want to use retained_mbox in a single-threaded environment
563 * without multithreading protection then so_5::null_mutex_t (or any
564 * similar null-mutex implementation) can be used:
565 * \code
566 * so_5::environment_t & env = ...;
567 * const so_5::mbox_t retained_mbox =
568 * so_5::extra::mboxes::retained_msg::make_mbox<
569 * so_5::extra::mboxes::retained_msg::default_traits_t,
570 * so_5::null_mutex_t>(env);
571 * so_5::send<Some_Message>(retained_mbox, ...);
572 * \endcode
573 *
574 * If you want to use your own mutex-like object (with an interface that
575 * allows your mutex-like class to be used with std::lock_guard) then you can
576 * do it in a similar way:
577 * \code
578 * so_5::environment_t & env = ...;
579 * const so_5::mbox_t retained_mbox =
580 * so_5::extra::mboxes::retained_msg::make_mbox<
581 * so_5::extra::mboxes::retained_msg::default_traits_t,
582 * Your_Own_Mutex_Class>(env);
583 * so_5::send<Some_Message>(retained_mbox, ...);
584 * \endcode
585 *
586 * \tparam Traits type with traits of mbox implementation.
587 *
588 * \tparam Lock_Type a type of mutex to be used for protection of
589 * retained message mbox content. This must be a DefaultConstructible
590 * type with an interface that allows Lock_Type to be used with std::lock_guard.
591 *
592 * \since
593 * v.1.0.3
594 */
595template<
596 typename Traits = default_traits_t,
597 typename Lock_Type = std::mutex >
598mbox_t
600 {
602
603 return env.make_custom_mbox(
604 []( const mbox_creation_data_t & data )
605 {
607
609 {
610 using T = details::actual_mbox_t<
613
614 result = mbox_t{ new T{
616 }
617 };
618 }
619 else
620 {
621 using T = details::actual_mbox_t<
624 result = mbox_t{ new T{ data.m_env.get(), data.m_id } };
625 }
626
627 return result;
628 } );
629 }
630
631} /* namespace retained_msg */
632
633} /* namespace mboxes */
634
635} /* namespace extra */
636
637} /* namespace so_5 */
void ensure_immutable_message(const std::type_index &msg_type, const message_ref_t &what) const
Ensures that message is an immutable message.
void do_deliver_message_to_subscriber(abstract_message_sink_t &subscriber, const subscriber_info_t &subscriber_info, typename Tracing_Base::deliver_op_tracer const &tracer, message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) const
void set_delivery_filter(const std::type_index &msg_type, const delivery_filter_t &filter, abstract_message_sink_t &subscriber) override
void try_deliver_retained_message_to(const std::type_index &msg_type, const message_ref_t &retained_msg, abstract_message_sink_t &subscriber, const subscriber_info_t &subscriber_info)
An attempt to deliver retained message to the new subscriber.
template_independent_mbox_data_t m_data
Data of this message mbox.
void modify_and_remove_subscriber_if_needed(const std::type_index &msg_type, abstract_message_sink_t &subscriber, Info_Changer changer)
void subscribe_event_handler(const std::type_index &msg_type, abstract_message_sink_t &subscriber) override
void do_deliver_message_impl(typename Tracing_Base::deliver_op_tracer const &tracer, message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep)
void drop_delivery_filter(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
so_5::environment_t & environment() const noexcept override
void insert_or_modify_subscriber(const std::type_index &msg_type, abstract_message_sink_t &subscriber, Info_Maker maker, Info_Changer changer)
void do_deliver_message(message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) override
void unsubscribe_event_handler(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
actual_mbox_t(environment_t &env, mbox_id_t id, Tracing_Args &&... args)
Initializing constructor.
mbox_t make_mbox(environment_t &env)
Create an instance of retained message mbox.
Ranges for error codes of each submodules.
Definition details.hpp:13
Default traits for retained message mbox.
A helper type which is a collection of type parameters.
A special comparator for sinks with respect to a sink's priority.
bool operator()(abstract_message_sink_t *a, abstract_message_sink_t *b) const noexcept
A type of item of message table for retained message mbox.
A mixin with actual data which is necessary for implementation of retained mbox.
messages_table_t m_messages_table
Table of current subscriptions and messages.