SObjectizer  5.8
Loading...
Searching...
No Matches
mchain_details.hpp
Go to the documentation of this file.
1/*
2 * SObjectizer-5
3 */
4
5/*!
6 * \file
7 * \brief Implementation details for message chains.
8 *
9 * \since v.5.5.13
10 */
11
12#pragma once
13
14#include <so_5/mchain.hpp>
15#include <so_5/mchain_select_ifaces.hpp>
16#include <so_5/environment.hpp>
17
18#include <so_5/ret_code.hpp>
19#include <so_5/exception.hpp>
20#include <so_5/error_logger.hpp>
21
22#include <so_5/details/abort_on_fatal_error.hpp>
23#include <so_5/details/at_scope_exit.hpp>
24#include <so_5/details/safe_cv_wait_for.hpp>
25
26#include <deque>
27#include <vector>
28#include <mutex>
29#include <condition_variable>
30
31namespace so_5 {
32
33namespace mchain_props {
34
35namespace details {
36
37//
38// ensure_queue_not_empty
39//
40/*!
41 * \brief Helper function which throws an exception if queue is empty.
42 *
43 * \since v.5.5.13
44 */
45template< typename Q >
46void
48 {
49 if( queue.is_empty() )
52 "an attempt to get message from empty demand queue" );
53 }
54
55//
56// ensure_queue_not_full
57//
58/*!
59 * \brief Helper function which throws an exception if queue is full.
60 *
61 * \since v.5.5.13
62 */
63template< typename Q >
64void
66 {
67 if( queue.is_full() )
70 "an attempt to push a message to full demand queue" );
71 }
72
73//
74// unlimited_demand_queue
75//
76/*!
77 * \brief Implementation of demands queue for size-unlimited message chain.
78 *
79 * \since v.5.5.13
80 */
82 {
83 public :
84 /*!
85 * \note This constructor is necessary just for a convinience.
86 */
88
89 //! Is queue full?
90 /*!
91 * \note Unlimited queue can't be null. Because of that this
92 * method always returns \a false.
93 */
94 [[nodiscard]]
95 bool
96 is_full() const noexcept { return false; }
97
98 //! Is queue empty?
99 [[nodiscard]]
100 bool
101 is_empty() const noexcept { return m_queue.empty(); }
102
103 //! Access to front item from queue.
104 [[nodiscard]]
105 demand_t &
107 {
108 ensure_queue_not_empty( *this );
109 return m_queue.front();
110 }
111
112 //! Remove the front item from queue.
113 void
115 {
116 ensure_queue_not_empty( *this );
117 m_queue.pop_front();
118 }
119
120 //! Add a new item to the end of the queue.
121 void
122 push_back( demand_t && demand )
123 {
124 m_queue.push_back( std::move(demand) );
125 }
126
127 //! Size of the queue.
128 [[nodiscard]]
129 std::size_t
130 size() const noexcept { return m_queue.size(); }
131
132 private :
133 //! Queue's storage.
135 };
136
137//
138// limited_dynamic_demand_queue
139//
140/*!
141 * \brief Implementation of demands queue for size-limited message chain with
142 * dynamically allocated storage.
143 *
144 * \since v.5.5.13
145 */
147 {
148 public :
149 //! Initializing constructor.
151 const capacity_t & capacity )
153 {}
154
155 //! Is queue full?
156 [[nodiscard]]
157 bool
158 is_full() const noexcept { return m_max_size == m_queue.size(); }
159
160 //! Is queue empty?
161 [[nodiscard]]
162 bool
163 is_empty() const noexcept { return m_queue.empty(); }
164
165 //! Access to front item from queue.
166 [[nodiscard]]
167 demand_t &
169 {
170 ensure_queue_not_empty( *this );
171 return m_queue.front();
172 }
173
174 //! Remove the front item from queue.
175 void
177 {
178 ensure_queue_not_empty( *this );
179 m_queue.pop_front();
180 }
181
182 //! Add a new item to the end of the queue.
183 void
184 push_back( demand_t && demand )
185 {
186 ensure_queue_not_full( *this );
187 m_queue.push_back( std::move(demand) );
188 }
189
190 //! Size of the queue.
191 [[nodiscard]]
192 std::size_t
193 size() const noexcept { return m_queue.size(); }
194
195 private :
196 //! Queue's storage.
198 //! Maximum size of the queue.
200 };
201
202//
203// limited_preallocated_demand_queue
204//
205/*!
206 * \brief Implementation of demands queue for size-limited message chain with
207 * preallocated storage.
208 *
209 * \since v.5.5.13
210 */
212 {
213 public :
214 //! Initializing constructor.
216 const capacity_t & capacity )
219 , m_head{ 0 }
220 , m_size{ 0 }
221 {}
222
223 //! Is queue full?
224 [[nodiscard]]
225 bool
226 is_full() const noexcept { return m_max_size == m_size; }
227
228 //! Is queue empty?
229 [[nodiscard]]
230 bool
231 is_empty() const noexcept { return 0 == m_size; }
232
233 //! Access to front item from queue.
234 [[nodiscard]]
235 demand_t &
237 {
238 ensure_queue_not_empty( *this );
239 return m_storage[ m_head ];
240 }
241
242 //! Remove the front item from queue.
243 void
245 {
246 ensure_queue_not_empty( *this );
247 m_storage[ m_head ] = demand_t{};
248 m_head = (m_head + 1) % m_max_size;
249 --m_size;
250 }
251
252 //! Add a new item to the end of the queue.
253 void
254 push_back( demand_t && demand )
255 {
256 ensure_queue_not_full( *this );
257 auto index = (m_head + m_size) % m_max_size;
258 m_storage[ index ] = std::move(demand);
259 ++m_size;
260 }
261
262 //! Size of the queue.
263 [[nodiscard]]
264 std::size_t
265 size() const noexcept { return m_size; }
266
267 private :
268 //! Queue's storage.
270 //! Maximum size of the queue.
272
273 //! Index of the queue head.
275 //! The current size of the queue.
277 };
278
279//
280// status
281//
282/*!
283 * \brief Status of the message chain.
284 *
285 * \since v.5.5.13
286 */
287enum class status
288 {
289 //! Bag is open and can be used for message sending.
290 open,
291 //! Bag is closed. New messages cannot be sent to it.
292 closed
293 };
294
295} /* namespace details */
296
297//
298// mchain_template
299//
300/*!
301 * \brief Template-based implementation of message chain.
302 *
303 * \tparam Queue type of demand queue for message chain.
304 * \tparam Tracing_Base type with message tracing implementation details.
305 *
306 * \since v.5.5.13
307 */
308template< typename Queue, typename Tracing_Base >
311 , private Tracing_Base
312 {
313 public :
314 //! Initializing constructor.
315 template< typename... Tracing_Args >
317 //! SObjectizer Environment for which message chain is created.
318 so_5::environment_t & env,
319 //! Mbox ID for this chain.
320 mbox_id_t id,
321 //! Chain parameters.
322 const mchain_params_t & params,
323 //! Arguments for Tracing_Base's constructor.
324 Tracing_Args &&... tracing_args )
326 , m_env( env )
327 , m_id( id )
331 , m_queue( params.capacity() )
332 {}
333
334 mbox_id_t
335 id() const override
336 {
337 return m_id;
338 }
339
340 void
342 const std::type_index & /*msg_type*/,
343 abstract_message_sink_t & /*subscriber*/ ) override
344 {
347 "mchain doesn't support subscription" );
348 }
349
350 void
352 const std::type_index & /*msg_type*/,
353 abstract_message_sink_t & /*subscriber*/ ) noexcept override
354 {}
355
356 std::string
357 query_name() const override
358 {
360 s << "<mchain:id=" << m_id << ">";
361
362 return s.str();
363 }
364
366 type() const override
367 {
369 }
370
371 void
373 message_delivery_mode_t delivery_mode,
374 const std::type_index & msg_type,
375 const message_ref_t & message,
376 unsigned int /*redirection_deep*/ ) override
377 {
378 switch( delivery_mode )
379 {
382 msg_type,
383 message );
384 break;
385
388 msg_type,
389 message );
390 break;
391 }
392 }
393
394 /*!
395 * \attention Will throw an exception because delivery
396 * filter is not applicable to mchains.
397 */
398 void
400 const std::type_index & /*msg_type*/,
401 const delivery_filter_t & /*filter*/,
402 abstract_message_sink_t & /*subscriber*/ ) override
403 {
406 "set_delivery_filter is called for mchain" );
407 }
408
409 void
411 const std::type_index & /*msg_type*/,
412 abstract_message_sink_t & /*subscriber*/ ) noexcept override
413 {}
414
415 [[nodiscard]]
418 demand_t & dest,
419 duration_t empty_queue_timeout ) override
420 {
422
423 // If queue is empty we must wait for some time.
425 if( queue_empty )
426 {
427 if( details::status::closed == m_status )
428 // Waiting for new messages has no sence because
429 // chain is closed.
431
432 auto predicate = [this, &queue_empty]() -> bool {
434 return !queue_empty ||
436 };
437
438 // Count of sleeping thread must be incremented before
439 // going to sleep and decremented right after.
442 [this] { --m_threads_to_wakeup; } );
443
444 // Wait until arrival of any message or closing of chain.
446 lock,
449 predicate );
450 }
451
452 // If queue is still empty nothing can be extracted and
453 // we must stop operation.
454 if( queue_empty )
455 return details::status::open == m_status ?
456 // The chain is still open so there must be this result
458 // The chain is closed and there must be different result
460
462 }
463
464 bool
465 empty() const override
466 {
467 return m_queue.is_empty();
468 }
469
470 std::size_t
471 size() const override
472 {
473 return m_queue.size();
474 }
475
477 environment() const noexcept override
478 {
479 return m_env;
480 }
481
482 protected :
483 [[nodiscard]]
486 demand_t & dest,
487 select_case_t & select_case ) override
488 {
490
491 const bool queue_empty = m_queue.is_empty();
492 if( queue_empty )
493 {
494 if( details::status::closed == m_status )
495 // There is no need to wait for something.
497
498 // In other cases select_tail must be modified.
501
503 }
504 else
506 }
507
508 [[nodiscard]]
511 const std::type_index & msg_type,
512 const message_ref_t & message,
513 mchain_props::select_case_t & select_case ) override
514 {
516 *this, // as tracing base.
517 *this, // as chain.
518 msg_type,
519 message };
520
522
523 // Message cannot be stored to closed chain.
524 if( details::status::closed == m_status )
526
527 if( m_queue.is_full() )
528 {
529 // The select_case should be stored until there will
530 // be a free space in the chain (or chain will be closed).
533
535 }
536 else
537 {
538 // Just store a new message to the queue.
540 tracer,
541 msg_type,
542 message );
544 }
545 }
546
547 void
549 select_case_t & select_case ) noexcept override
550 {
552
554 select_case_t * prev = nullptr;
555 while( c )
556 {
557 select_case_t * const next = c->query_next();
558 if( c == &select_case )
559 {
560 if( prev )
561 prev->set_next( next );
562 else
564
565 return;
566 }
567
568 prev = c;
569 c = next;
570 }
571 }
572
573 void
574 actual_close( close_mode_t mode ) override
575 {
577
578 if( details::status::closed == m_status )
579 return;
580
582
583 const bool was_full = m_queue.is_full();
584
586 {
587 while( !m_queue.is_empty() )
588 {
590 *this, m_queue.front() );
592 }
593 }
594
595 // Since v.5.7.0 select operations must be notified
596 // always, even if the mchain is not empty.
598
600 // Someone is waiting on empty chain for new messages.
601 // It must be informed that no new messages will be here.
603
604 if( was_full )
605 // Someone can wait on full chain for free place for new message.
606 // It must be informed that the chain is closed.
608 }
609
610 private :
611 //! SObjectizer Environment for which message chain is created.
613
614 //! Status of the chain.
616
617 //! Mbox ID for chain.
618 const mbox_id_t m_id;
619
620 //! Chain capacity.
622
623 //! Optional notificator for 'not_empty' condition.
625
626 //! Optional notificator for 'empty' condition.
627 /*!
628 * \since v.5.8.5
629 */
631
632 //! Chain's demands queue.
633 Queue m_queue;
634
635 //! Chain's lock.
637
638 //! Condition variable for waiting on empty queue.
640 //! Condition variable for waiting on full queue.
642
643 /*!
644 * \brief Count of threads sleeping on empty mchain.
645 *
646 * This value is incremented before sleeping on m_underflow_cond and
647 * decremented just after a return from this sleep.
648 *
649 * \since v.5.5.16
650 */
652
653 /*!
654 * \brief A queue of multi-chain selects in which this chain is used.
655 *
656 * \since v.5.5.16
657 */
659
660 //! Actual implementation of pushing message to the queue.
661 /*!
662 * \note
663 * This implementation must be used for ordinary delivery operations.
664 * For delivery operations from timer thread another method must be
665 * called (see try_to_store_message_to_queue_nonblocking_mode()).
666 */
667 void
669 const std::type_index & msg_type,
670 const message_ref_t & message )
671 {
673 *this, // as tracing base.
674 *this, // as chain.
675 msg_type,
676 message };
677
679
680 // Message cannot be stored to closed chain.
681 if( details::status::closed == m_status )
682 return;
683
684 // If queue full and waiting on full queue is enabled we
685 // must wait for some time until there will be some space in
686 // the queue.
687 bool queue_full = m_queue.is_full();
689 {
691 lock,
694 [this, &queue_full] {
696 return !queue_full ||
698 } );
699
700 // Message cannot be stored to closed chain.
701 //
702 // NOTE: this additional check is necessary after
703 // wait for overflow_timeout because the chain can
704 // be closed during that wait.
705 if( details::status::closed == m_status )
706 return;
707 }
708
709 // If queue still full we must perform some reaction.
710 if( queue_full )
711 {
714 {
715 // New message must be simply ignored.
717 return;
718 }
720 {
721 // The oldest message must be simply removed.
724 }
726 {
730 "an attempt to push message to full mchain "
731 "with overflow_reaction_t::throw_exception policy" );
732 }
733 else
734 {
738 log_stream << "overflow_reaction_t::abort_app "
739 "will be performed for mchain (id="
740 << m_id << "), msg_type: "
741 << msg_type.name()
742 << ". Application will be aborted"
743 << std::endl;
744 }
745 } );
746 }
747 }
748
750 tracer,
751 msg_type,
752 message );
753 }
754
755 /*!
756 * \brief An implementation of storing another message to
757 * chain for the case of delated/periodic messages.
758 *
759 * This implementation handles overloaded chains differently:
760 * - there is no waiting on overloaded chain (even if such waiting
761 * is specified in mchain params);
762 * - overflow_reaction_t::throw_exception is replaced by
763 * overflow_reaction_t::drop_newest.
764 *
765 * These defferences are necessary because the context of timer
766 * thread is very special: there can't be any long-time operation
767 * (like waiting for free space on overloaded chain) and there can't
768 * be an exception about mchain's overflow.
769 *
770 * \since v.5.5.18
771 */
772 void
774 const std::type_index & msg_type,
775 const message_ref_t & message )
776 {
778 *this, // as tracing base.
779 *this, // as chain.
780 msg_type,
781 message };
782
784
785 // Message cannot be stored to closed chain.
786 if( details::status::closed == m_status )
787 return;
788
789 bool queue_full = m_queue.is_full();
790 // NOTE: there is no awaiting on full mchain.
791 // If queue full we must perform some reaction.
792 if( queue_full )
793 {
797 {
798 // New message must be simply ignored.
800 return;
801 }
803 {
804 // The oldest message must be simply removed.
807 }
808 else
809 {
813 log_stream << "overflow_reaction_t::abort_app "
814 "will be performed for mchain (id="
815 << m_id << "), msg_type: "
816 << msg_type.name()
817 << ". Application will be aborted"
818 << std::endl;
819 }
820 } );
821 }
822 }
823
825 tracer,
826 msg_type,
827 message );
828 }
829
830 /*!
831 * \brief Implementation of extract operation for the case when
832 * message queue is not empty.
833 *
834 * \attention This helper method must be called when chain object
835 * is locked in some hi-level method.
836 *
837 * \since v.5.5.16
838 */
841 demand_t & dest )
842 {
843 // If queue was full then someone can wait on it.
844 const bool queue_was_full = m_queue.is_full();
845 dest = std::move( m_queue.front() );
847
848 this->trace_extracted_demand( *this, dest );
849
850 // Since v.5.8.5 there could be an empty notificator
851 // that has to be used if the queue becomes empty.
852 if( m_queue.is_empty() )
853 {
856 [this] { m_empty_notificator(); } );
857 }
858
859 if( queue_was_full )
860 {
861 // Since v.5.7.0 waiting select_cases should be
862 // notified too because they are send_cases.
864
866 }
867
869 }
870
871 /*!
872 * \since v.5.5.16
873 */
874 void
876 {
877 if( m_select_tail )
878 {
879 auto old = m_select_tail;
880 m_select_tail = nullptr;
881 old->notify();
882 }
883 }
884
885 /*!
886 * \brief A reusable method with implementation of
887 * last part of storing a message into chain.
888 *
889 * \note
890 * Intended to be called from try_to_store_message_to_queue_ordinary_mode()
891 * and try_to_store_message_to_queue_nonblocking_mode().
892 *
893 * \since v.5.5.18
894 */
895 void
897 typename Tracing_Base::deliver_op_tracer & tracer,
898 const std::type_index & msg_type,
899 const message_ref_t & message )
900 {
901 const bool was_empty = m_queue.is_empty();
902
904
906
907 // If chain was empty then multi-chain cases must be notified.
908 // And if not_empty_notificator is defined then it must be used too.
909 if( was_empty )
910 {
913 [this] { m_not_empty_notificator(); } );
914
916 }
917
918 // Should be wake up some sleeping thread?
920 // Someone is waiting on empty queue.
922 }
923 };
924
925} /* namespace mchain_props */
926
927} /* namespace so_5 */
An interface of message chain.
Definition mchain.hpp:459
Interface for message sink.
static bool special_sink_ptr_compare(const abstract_message_sink_t *a, const abstract_message_sink_t *b) noexcept
A base class for agents.
Definition agent.hpp:673
Interface for creator of new mbox in OOP style.
An interface of delivery filter object.
Definition mbox.hpp:62
SObjectizer Environment.
Mixin to be used in implementation of MPSC mbox with message limits.
Definition mpsc_mbox.hpp:42
agent_t & query_owner_reference() const noexcept
Definition mpsc_mbox.hpp:48
abstract_message_sink_t & message_sink_to_use(const local_mbox_details::subscription_info_with_sink_t &info) const noexcept
Definition mpsc_mbox.hpp:55
limitful_mpsc_mbox_mixin_t(outliving_reference_t< agent_t > owner)
Definition mpsc_mbox.hpp:62
Mixin to be used in implementation of MPSC mbox without message limits.
Definition mpsc_mbox.hpp:77
message_sink_without_message_limit_t m_actual_sink
Actual message sink to be used.
Definition mpsc_mbox.hpp:79
agent_t & query_owner_reference() const noexcept
Definition mpsc_mbox.hpp:84
abstract_message_sink_t & message_sink_to_use(const local_mbox_details::subscription_info_with_sink_t &) noexcept
Definition mpsc_mbox.hpp:91
limitless_mpsc_mbox_mixin_t(outliving_reference_t< agent_t > owner)
Definition mpsc_mbox.hpp:98
A special container for holding subscriber_info objects.
subscriber_adaptive_container_t(const subscriber_adaptive_container_t &o)
Copy constructor.
storage_type m_storage
The current storage type to be used by container.
subscriber_adaptive_container_t & operator=(subscriber_adaptive_container_t &&o) noexcept
Move operator.
void insert_to_vector(abstract_message_sink_t &sink_as_key, subscription_info_with_sink_t &&info)
Insertion of new item to vector.
void emplace(abstract_message_sink_t &sink_as_key, Args &&... args)
friend void swap(subscriber_adaptive_container_t &a, subscriber_adaptive_container_t &b) noexcept
vector_type m_vector
Container for small amount of subscriber_infos.
void insert(abstract_message_sink_t &sink_as_key, subscription_info_with_sink_t info)
void insert_to_map(abstract_message_sink_t &sink_as_key, subscription_info_with_sink_t &&info)
Insertion of new item to map.
subscriber_adaptive_container_t(subscriber_adaptive_container_t &&o) noexcept
Move constructor.
void switch_storage_to_map()
Switching storage from vector to map.
iterator find(abstract_message_sink_t &subscriber)
map_type m_map
Container for large amount of subscriber_infos.
iterator find_in_vector(abstract_message_sink_t &subscriber)
subscriber_adaptive_container_t & operator=(const subscriber_adaptive_container_t &o)
Copy operator.
iterator find_in_map(abstract_message_sink_t &subscriber)
void switch_storage_to_vector()
Switching storage from map to vector.
An information block about one subscription to one message type with presence of message_sink.
abstract_message_sink_t & sink_reference() const noexcept
Get a reference to the subscribed sink.
A template with implementation of local mbox.
mbox_type_t type() const override
Get the type of message box.
environment_t & environment() const noexcept override
SObjectizer Environment for which the mbox is created.
mbox_id_t id() const override
Unique ID of this mbox.
void do_deliver_message_impl(typename Tracing_Base::deliver_op_tracer const &tracer, message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep)
void set_delivery_filter(const std::type_index &msg_type, const delivery_filter_t &filter, abstract_message_sink_t &subscriber) override
Set a delivery filter for message type and subscriber.
void do_deliver_message_to_subscriber(const local_mbox_details::subscription_info_with_sink_t &subscriber_info, typename Tracing_Base::deliver_op_tracer const &tracer, message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) const
void ensure_immutable_message(const std::type_index &msg_type, const message_ref_t &what) const
Ensures that message is an immutable message.
local_mbox_template(mbox_id_t id, environment_t &env, Tracing_Args &&... args)
void modify_and_remove_subscriber_if_needed(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber, Info_Changer changer)
void do_deliver_message(message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) override
Deliver message for all subscribers with respect to message limits.
void subscribe_event_handler(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber) override
Add the message handler.
void unsubscribe_event_handler(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber) noexcept override
Remove all message handlers.
std::string query_name() const override
Get the mbox name.
void insert_or_modify_subscriber(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber, Info_Maker maker, Info_Changer changer)
void drop_delivery_filter(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
Removes delivery filter for message type and subscriber.
mbox_t create_ordinary_mpsc_mbox(environment_t &env, agent_t &owner)
Create mpsc_mbox that handles message limits.
mbox_t create_limitless_mpsc_mbox(environment_t &env, agent_t &owner)
Create mpsc_mbox that ignores message limits.
mbox_t introduce_named_mbox(mbox_namespace_name_t mbox_namespace, nonempty_name_t mbox_name, const std::function< mbox_t() > &mbox_factory)
Introduce named mbox with user-provided factory.
mbox_core_t(outliving_reference_t< so_5::msg_tracing::holder_t > msg_tracing_stuff)
Definition mbox_core.cpp:27
mbox_t create_custom_mbox(environment_t &env, ::so_5::custom_mbox_details::creator_iface_t &creator)
Create a custom mbox.
void destroy_mbox(const full_named_mbox_id_t &name) noexcept
Remove a reference to the named mbox.
mchain_t create_mchain(environment_t &env, const mchain_params_t &params)
Create message chain.
mbox_t create_mbox(environment_t &env)
Create local anonymous mbox.
Definition mbox_core.cpp:35
mbox_t create_mbox(environment_t &env, nonempty_name_t mbox_name)
Create local named mbox.
Definition mbox_core.cpp:46
mbox_id_t allocate_mbox_id() noexcept
Allocate an ID for a new custom mbox or mchain.
mbox_core_stats_t query_stats()
Get statistics for run-time monitoring.
void push_event(mbox_id_t mbox_id, message_delivery_mode_t, const std::type_index &msg_type, const message_ref_t &message, unsigned int, const message_limit::impl::action_msg_tracer_t *tracer) override
Get a message and push it to the appropriate destination.
mpsc_mbox_template_t(mbox_id_t id, environment_t &env, outliving_reference_t< agent_t > owner, Tracing_Args &&... tracing_args)
mbox_type_t type() const override
Get the type of message box.
void modify_and_remove_subscription_if_needed(const std::type_index &msg_type, Info_Changer changer)
Helper for modification and deletion of subscription info.
const mbox_id_t m_id
ID of this mbox.
void unsubscribe_event_handler(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
Remove all message handlers.
void do_delivery(const std::type_index &msg_type, const message_ref_t &message, typename Tracing_Base::deliver_op_tracer const &tracer, L l)
Helper method to do delivery actions under locked object.
subscriptions_map_t m_subscriptions
Information about the current subscriptions.
default_rw_spinlock_t m_lock
Protection of object from modification.
void drop_delivery_filter(const std::type_index &msg_type, abstract_message_sink_t &) noexcept override
Removes delivery filter for message type and subscriber.
void set_delivery_filter(const std::type_index &msg_type, const delivery_filter_t &filter, abstract_message_sink_t &subscriber) override
Set a delivery filter for message type and subscriber.
std::string query_name() const override
Get the mbox name.
void do_deliver_message(message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) override
Deliver message for all subscribers with respect to message limits.
void subscribe_event_handler(const std::type_index &msg_type, abstract_message_sink_t &subscriber) override
Add the message handler.
mbox_id_t id() const override
Unique ID of this mbox.
environment_t & environment() const noexcept override
SObjectizer Environment for which the mbox is created.
void insert_or_modify_subscription(const std::type_index &msg_type, Info_Maker maker, Info_Changer changer)
Helper for performing insertion or modification of subscription info.
environment_t & m_env
Environment in that the mbox was created.
Base class for a mbox for the case when message delivery tracing is enabled.
void set_delivery_filter(const std::type_index &msg_type, const delivery_filter_t &filter, abstract_message_sink_t &subscriber) override
Set a delivery filter for message type and subscriber.
mbox_id_t id() const override
Unique ID of this mbox.
void subscribe_event_handler(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber) override
Add the message handler.
named_local_mbox_t(full_named_mbox_id_t full_name, const mbox_t &mbox, impl::mbox_core_t &mbox_core)
mbox_type_t type() const override
Get the type of message box.
void do_deliver_message(message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) override
Deliver message for all subscribers with respect to message limits.
impl::mbox_core_ref_t m_mbox_core
An utility for this mbox.
environment_t & environment() const noexcept override
SObjectizer Environment for which the mbox is created.
std::string query_name() const override
Get the mbox name.
void unsubscribe_event_handler(const std::type_index &type_wrapper, abstract_message_sink_t &subscriber) noexcept override
Remove all message handlers.
void drop_delivery_filter(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
Removes delivery filter for message type and subscriber.
const full_named_mbox_id_t m_name
Mbox name.
A class for the name of mbox_namespace.
Parameters for message chain.
Definition mchain.hpp:741
Parameters for defining chain size.
Definition mchain.hpp:229
Implementation of demands queue for size-limited message chain with dynamically allocated storage.
limited_dynamic_demand_queue(const capacity_t &capacity)
Initializing constructor.
std::size_t size() const noexcept
Size of the queue.
void pop_front()
Remove the front item from queue.
void push_back(demand_t &&demand)
Add a new item to the end of the queue.
const std::size_t m_max_size
Maximum size of the queue.
demand_t & front()
Access to front item from queue.
Implementation of demands queue for size-limited message chain with preallocated storage.
std::size_t size() const noexcept
Size of the queue.
void push_back(demand_t &&demand)
Add a new item to the end of the queue.
const std::size_t m_max_size
Maximum size of the queue.
limited_preallocated_demand_queue(const capacity_t &capacity)
Initializing constructor.
demand_t & front()
Access to front item from queue.
Implementation of demands queue for size-unlimited message chain.
demand_t & front()
Access to front item from queue.
void push_back(demand_t &&demand)
Add a new item to the end of the queue.
std::deque< demand_t > m_queue
Queue's storage.
void pop_front()
Remove the front item from queue.
std::size_t size() const noexcept
Size of the queue.
bool is_empty() const noexcept
Is queue empty?
bool is_full() const noexcept
Is queue full?
Template-based implementation of message chain.
void unsubscribe_event_handler(const std::type_index &, abstract_message_sink_t &) noexcept override
Remove all message handlers.
std::string query_name() const override
Get the mbox name.
mbox_type_t type() const override
Get the type of message box.
environment_t & environment() const noexcept override
SObjectizer Environment for which the mbox is created.
const empty_notification_func_t m_empty_notificator
Optional notificator for 'empty' condition.
void set_delivery_filter(const std::type_index &, const delivery_filter_t &, abstract_message_sink_t &) override
const capacity_t m_capacity
Chain capacity.
std::size_t size() const override
Count of messages in the chain.
void try_to_store_message_to_queue_nonblocking_mode(const std::type_index &msg_type, const message_ref_t &message)
An implementation of storing another message to chain for the case of delated/periodic messages.
void try_to_store_message_to_queue_ordinary_mode(const std::type_index &msg_type, const message_ref_t &message)
Actual implementation of pushing message to the queue.
const not_empty_notification_func_t m_not_empty_notificator
Optional notificator for 'not_empty' condition.
void subscribe_event_handler(const std::type_index &, abstract_message_sink_t &) override
Add the message handler.
extraction_status_t extract_demand_from_not_empty_queue(demand_t &dest)
Implementation of extract operation for the case when message queue is not empty.
details::status m_status
Status of the chain.
select_case_t * m_select_tail
A queue of multi-chain selects in which this chain is used.
bool empty() const override
Is message chain empty?
std::condition_variable m_overflow_cond
Condition variable for waiting on full queue.
Queue m_queue
Chain's demands queue.
void do_deliver_message(message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int) override
Deliver message for all subscribers with respect to message limits.
std::size_t m_threads_to_wakeup
Count of threads sleeping on empty mchain.
void remove_from_select(select_case_t &select_case) noexcept override
Removement of mchain from multi chain select.
mchain_props::push_status_t push(const std::type_index &msg_type, const message_ref_t &message, mchain_props::select_case_t &select_case) override
An attempt to push a new message into the mchain.
mbox_id_t id() const override
Unique ID of this mbox.
const mbox_id_t m_id
Mbox ID for chain.
environment_t & m_env
SObjectizer Environment for which message chain is created.
extraction_status_t extract(demand_t &dest, duration_t empty_queue_timeout) override
void actual_close(close_mode_t mode) override
Close the chain.
void complete_store_message_to_queue(typename Tracing_Base::deliver_op_tracer &tracer, const std::type_index &msg_type, const message_ref_t &message)
A reusable method with implementation of last part of storing a message into chain.
std::condition_variable m_underflow_cond
Condition variable for waiting on empty queue.
mchain_template(so_5::environment_t &env, mbox_id_t id, const mchain_params_t &params, Tracing_Args &&... tracing_args)
Initializing constructor.
extraction_status_t extract(demand_t &dest, select_case_t &select_case) override
An extraction attempt as a part of multi chain select.
void drop_delivery_filter(const std::type_index &, abstract_message_sink_t &) noexcept override
Removes delivery filter for message type and subscriber.
Base class for representation of one case in multi chain select.
Interface of holder of message tracer and message trace filter objects.
A class for the name which cannot be empty.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
#define SO_5_LOG_ERROR(logger, var_name)
A special macro for helping error logging.
#define SO_5_THROW_EXCEPTION(error_code, desc)
Definition exception.hpp:74
Some reusable and low-level classes/functions which can be used in public header files.
std::unique_ptr< abstract_message_box_t > make_actual_mbox(outliving_reference_t< so_5::msg_tracing::holder_t > msg_tracing_stuff, A &&... args)
void ensure_sink_for_same_owner(agent_t &actual_owner, abstract_message_sink_t &sink)
Helper the ensures that sink can be used with agent.
Implementation details for MPMC mboxes.
Various helpers for message delivery tracing mechanism.
Details of SObjectizer run-time implementations.
Definition agent.cpp:905
Implementation details.
Definition mchain.hpp:37
void ensure_queue_not_empty(Q &&queue)
Helper function which throws an exception if queue is empty.
status
Status of the message chain.
@ closed
Bag is closed. New messages cannot be sent to it.
@ open
Bag is open and can be used for message sending.
void ensure_queue_not_full(Q &&queue)
Helper function which throws an exception if queue is full.
Various properties and parameters of message chains.
Definition mchain.hpp:28
close_mode_t
What to do with chain's content at close.
Definition mchain.hpp:410
extraction_status_t
Result of extraction of message from a message chain.
Definition mchain.hpp:371
push_status_t
Result of attempt of pushing a message into a message chain.
Definition mchain.hpp:389
Public part of message delivery tracing mechanism.
Private part of message limit implementation.
Definition agent.cpp:33
message_delivery_mode_t
Possible modes of message/signal delivery.
Definition types.hpp:172
mbox_type_t
Type of the message box.
Definition mbox.hpp:163
Full name for a named mbox.
Definition mbox_core.hpp:69
A coolection of data required for local mbox implementation.
messages_table_t m_subscribers
Map of subscribers to messages.
data_t(mbox_id_t id, environment_t &env)
environment_t & m_env
Environment for which the mbox is created.
default_rw_spinlock_t m_lock
Object lock.
const mbox_id_t m_id
ID of this mbox.
bool operator()(abstract_message_sink_t *a, abstract_message_sink_t *b) const noexcept
bool operator()(const subscribers_vector_item_t &a, const subscribers_vector_item_t &b) const noexcept
subscription_info_with_sink_t m_info
Information about the subscription.
abstract_message_sink_t * m_sink_as_key
Pointer to sink that has to be used as search key.
subscribers_vector_item_t(abstract_message_sink_t &sink_as_key, subscription_info_with_sink_t info)
The normal initializing constructor.
Statistics from mbox_core for run-time monitoring.
Definition mbox_core.hpp:49
Base class for a mbox for the case when message delivery tracing is disabled.
Description of one demand in message chain.
Definition mchain.hpp:144