SObjectizer-5 Extra
Loading...
Searching...
No Matches
inflight_limit.hpp
Go to the documentation of this file.
1/*!
2 * \file
3 * \brief Implementation of proxy mbox with inflight limit.
4 *
5 * \since v.1.5.2
6 */
7
8#pragma once
9
10#include <so_5_extra/mboxes/proxy.hpp>
11
12#include <so_5_extra/error_ranges.hpp>
13
14#include <so_5_extra/enveloped_msg/just_envelope.hpp>
15
16#include <so_5/impl/msg_tracing_helpers.hpp>
17
18#include <so_5/environment.hpp>
19
20#include <atomic>
21
22namespace so_5 {
23
25
26/*!
27 * \brief Type to be used for limit and counter of inflight messages.
28 *
29 * \since v.1.5.2
30 */
31using underlying_counter_t = unsigned int;
32
33} /* namespace extra::mboxes::inflight_limit */
34
35namespace impl::msg_tracing_helpers::details {
36
37// Special extension for inflight_limit specific data.
39
41 {
42 so_5::extra::mboxes::inflight_limit::underlying_counter_t m_limit;
44 };
45
46inline void
48 {
49 s << "[inflight_limit=" << info.m_limit << ",inflight_current="
50 << info.m_current_number << "]";
51 }
52
53inline void
55 actual_trace_data_t & /*d*/,
57 {
58 // Nothing to do.
59 }
60
61} /* namespace extra_inflight_limit_specifics */
62
63} /* namespace impl::msg_tracing_helpers::details */
64
65namespace extra {
66
67namespace mboxes {
68
69namespace inflight_limit {
70
71namespace errors {
72
73/*!
74 * \brief An attempt to use a message type that differs from mbox's message
75 * type.
76 *
77 * Type of message to be used with inflight_limit_mbox
78 * is fixed as part of inflight_limit_mbox type.
79 * An attempt to use different message type (for subscription, delivery or
80 * setting delivery filter) will lead to an exception with this error code.
81 *
82 * \since v.1.5.2
83 */
86
87/*!
88 * \brief Null pointer to underlying mbox.
89 *
90 * A inflight_limit_mbox uses underlying mbox and delegates all actions to that mbox.
91 * Because of that underlying mbox can't be nullptr.
92 *
93 * \since v.1.5.2
94 */
97
98} /* namespace impl */
99
100namespace impl {
101
102/*!
103 * \brief Separate type for holding inflight message counter as a separate object.
104 *
105 * It's expected that an instance of instances_counter_t will be created in
106 * dynamic memory and shared between entities via shared_ptr.
107 *
108 * \since v.1.5.2
109 */
111 {
112 //! Counter of inflight instances.
114 };
115
116/*!
117 * \brief An alias for shared_ptr to instances_counter.
118 *
119 * \since v.1.5.2
120 */
122
123/*!
124 * \brief Helper class for incrementing/decrementing number of messages in
125 * RAII style.
126 *
127 * An instance always increments the counter in the constructor. The result
128 * value is stored inside counter_incrementer_t instance. That value is available
129 * via value() method.
130 *
131 * The destructor decrements the counter if there weren't a call to
132 * do_not_decrement_in_destructor().
133 *
134 * The intended usage scenario is:
135 *
136 * - create an instance of counter_incrementer_t;
137 * - check the counter via value() method;
138 * - if limit wasn't exceeded then create an appropriate envelope for a message and
139 * call do_not_decrement_in_destructor(). In such a case the envelope will
140 * decrement the number of inflight messages;
141 * - if limit was exceeded then just stop the operation and the destructor of
142 * counter_incrementer_t will decrement number of messages automatically.
143 *
144 * \since v.1.5.2
145 */
147 {
149 const underlying_counter_t m_value;
150
152
153 public:
155 outliving_reference_t< instances_counter_t > counter ) noexcept
156 : m_counter{ counter.get() }
158 {}
159
165
166 void
171
172 [[nodiscard]]
174 value() const noexcept
175 {
176 return m_value;
177 }
178 };
179
180/*!
181 * \brief Type of envelope to be used by inflight_limit_mbox.
182 *
183 * \attention
184 * The envelope expects that the number of messages is already incremented before
185 * the creation of the envelope. That number is always decremented in the
186 * destructor.
187 *
188 * \since v.1.5.2
189 */
190class special_envelope_t final : public so_5::extra::enveloped_msg::just_envelope_t
191 {
192 using base_type_t = so_5::extra::enveloped_msg::just_envelope_t;
193
195
196 public:
197 //! Initializing constructor.
199 message_ref_t payload,
200 instances_counter_shptr_t counter )
203 {}
204
206 {
207 // Counter should always be decremented because it was
208 // incremented before the creation of envelope instance.
209 (m_counter->m_instances)--;
210 }
211 };
212
213/*!
214 * \brief Helper type that tells that underlying mbox isn't nullptr.
215 *
216 * \since v.1.5.2
217 */
219 {
222
224
225 not_null_underlying_mbox_t( so_5::mbox_t value )
226 : m_value{ std::move(value) }
227 {}
228
229 public:
230 [[nodiscard]]
231 const so_5::mbox_t &
232 value() const noexcept { return m_value; }
233 };
234
235//! Ensure that underlying mbox is not nullptr.
236/*!
237 * \throw so_5::exception_t if \a mbox is nullptr.
238 */
239[[nodiscard]]
242 so_5::mbox_t mbox )
243 {
244 if( !mbox )
245 SO_5_THROW_EXCEPTION(
247 "nullptr is used as underlying mbox" );
248
249 return { std::move(mbox) };
250 }
251
252/*!
253 * \brief Actual implementation of inflight_limit_mbox.
254 *
255 * \tparam Tracing_Base base class with implementation of message
256 * delivery tracing methods.
257 *
258 * \since v.1.5.2
259 */
260template< typename Tracing_Base >
261class actual_mbox_t final
262 : public so_5::abstract_message_box_t
263 , private Tracing_Base
264 {
265 //! Actual underlying mbox to be used for all calls.
266 /*!
267 * \attention Should not be nullptr.
268 */
270
271 //! Type of a message for that mbox is created.
273
274 //! The limit of inflight messages.
275 const underlying_counter_t m_limit;
276
277 //! Counter for inflight instances.
279
280 public:
281 /*!
282 * \brief Initializing constructor.
283 *
284 * \tparam Tracing_Args parameters for Tracing_Base constructor
285 * (can be empty list if Tracing_Base have only the default constructor).
286 */
287 template< typename... Tracing_Args >
289 //! Destination mbox.
290 const not_null_underlying_mbox_t & dest_mbox,
291 //! Type of a message for that mbox.
292 std::type_index msg_type,
293 //! The limit of inflight messages.
294 underlying_counter_t limit,
295 //! Optional parameters for Tracing_Base's constructor.
296 Tracing_Args &&... args )
297 : Tracing_Base{ std::forward< Tracing_Args >(args)... }
298 , m_underlying_mbox{ dest_mbox.value() }
299 , m_msg_type{ std::move(msg_type) }
300 , m_limit{ limit }
301 , m_instances_counter{ std::make_shared< instances_counter_t >() }
302 {}
303
305 id() const override
306 {
307 return m_underlying_mbox->id();
308 }
309
310 void
312 const std::type_index & msg_type,
313 abstract_message_sink_t & subscriber ) override
314 {
315 ensure_expected_msg_type(
316 msg_type,
317 "an attempt to subscribe with different message type" );
318
319 m_underlying_mbox->subscribe_event_handler(
320 msg_type, subscriber );
321 }
322
323 void
325 const std::type_index & msg_type,
326 abstract_message_sink_t & subscriber ) noexcept override
327 {
328 // Since v.1.6.0 we can't throw. So perform the main
329 // action only if types are the same.
330 if( msg_type != m_msg_type )
331 {
332 m_underlying_mbox->unsubscribe_event_handler(
333 msg_type, subscriber );
334 }
335 }
336
337 std::string
338 query_name() const override
339 {
340 return m_underlying_mbox->query_name();
341 }
342
344 type() const override
345 {
346 return m_underlying_mbox->type();
347 }
348
349 void
351 message_delivery_mode_t delivery_mode,
352 const std::type_index & msg_type,
353 const message_ref_t & message,
354 unsigned int redirection_deep ) override
355 {
356 ensure_expected_msg_type(
357 msg_type,
358 "an attempt to deliver message of a different message type" );
359
360 typename Tracing_Base::deliver_op_tracer tracer{
361 *this, // as Tracing_base
362 *this, // as abstract_message_box_t
363 "deliver_message",
364 delivery_mode,
365 msg_type, message, redirection_deep };
366
367 // Step 1: increment the counter and check that the limit
368 // isn't exceeded yet.
369 counter_incrementer_t incrementer{
370 outliving_mutable( *m_instances_counter )
371 };
372 if( incrementer.value() <= m_limit )
373 {
374 // NOTE: if there will be an exception then the number
375 // of instance will be decremented by incrementer.
376 message_ref_t our_envelope{
377 std::make_unique< special_envelope_t >(
378 std::move(message),
379 m_instances_counter )
380 };
381
382 // incrementer shouldn't control the number of instances
383 // anymore.
384 incrementer.do_not_decrement_in_destructor();
385
386 // Our envelope object has to be sent.
387 m_underlying_mbox->do_deliver_message(
388 delivery_mode,
389 msg_type,
390 our_envelope,
391 redirection_deep );
392 }
393 else
394 {
395 using namespace so_5::impl::msg_tracing_helpers::details::
396 extra_inflight_limit_specifics;
397
398 tracer.make_trace(
399 "too_many_inflight_messages",
400 limit_info{ m_limit, incrementer.value() } );
401 }
402 }
403
404 void
406 const std::type_index & msg_type,
407 const delivery_filter_t & filter,
408 abstract_message_sink_t & subscriber ) override
409 {
410 ensure_expected_msg_type(
411 msg_type,
412 "an attempt to set delivery_filter for different "
413 "message type" );
414
415 m_underlying_mbox->set_delivery_filter(
416 msg_type,
417 filter,
418 subscriber );
419 }
420
421 void
423 const std::type_index & msg_type,
424 abstract_message_sink_t & subscriber ) noexcept override
425 {
426 // Because drop_delivery_filter is noexcept we just ignore
427 // an errornous call with a different message type.
428 if( msg_type == m_msg_type )
429 {
430 m_underlying_mbox->drop_delivery_filter(
431 msg_type,
432 subscriber );
433 }
434 }
435
437 environment() const noexcept override
438 {
439 return m_underlying_mbox->environment();
440 }
441
442 private:
443 /*!
444 * Throws an error if msg_type differs from expected message type.
445 */
446 void
448 const std::type_index & msg_type,
449 std::string_view error_description ) const
450 {
451 if( msg_type != m_msg_type )
452 SO_5_THROW_EXCEPTION(
453 errors::rc_different_message_type,
454 error_description );
455 }
456 };
457
458/*!
459 * \brief Check for compatibility between mbox type and message type.
460 *
461 * Throws if mutable message is used with MPMC mbox.
462 *
463 * \since v.1.5.2
464 */
465template< typename Msg_Type >
466void
468 //! NOTE: it's expected to be not-null.
469 const so_5::mbox_t & underlying_mbox )
470 {
471 // Use of mutable message type for MPMC mbox should be prohibited.
472 if constexpr( is_mutable_message< Msg_Type >::value )
473 {
474 switch( underlying_mbox->type() )
475 {
479 "an attempt to make MPMC mbox for mutable message, "
480 "msg_type=" + std::string(typeid(Msg_Type).name()) );
481 break;
482
484 break;
485 }
486 }
487 }
488
489} /* namespace impl */
490
491/*!
492 * \brief Create an instance of inflight_limit_mbox.
493 *
494 * Usage example:
495 *
496 * \code
497 * namespace mbox_ns = so_5::extra::mboxes::inflight_limit;
498 *
499 * so_5::environment_t & env = ...;
500 *
501 * // Create an inflight_limit_mbox with underlying MPMC mbox for immutable message.
502 * auto my_mbox = mbox_ns::make_mbox<my_msg>(env.create_mbox(), 15u);
503 *
504 * // Create an inflight_limit_mbox with underlying MPSC mbox for mutable message.
505 * class demo_agent : public so_5::agent_t
506 * {
507 * const so_5::mbox_t my_mbox_;
508 * public:
509 * demo_agent(context_t ctx)
510 * : so_5::agent_t{std::move(ctx)}
511 * , my_mbox{ mbox_ns::make_mbox< so_5::mutable_msg<my_msg> >(so_direct_mbox(), 4u) }
512 * {...}
513 * ...
514 * };
515 * \endcode
516 *
517 * \tparam Msg_Type type of message to be used with a new mbox.
518 *
519 * \since v.1.5.2
520 */
521template< typename Msg_Type >
522[[nodiscard]]
523mbox_t
525 //! Actual destination mbox.
527 //! The limit of inflight messages.
529 {
531 std::move(dest_mbox) );
532
533 // Use of mutable message type for MPMC mbox should be prohibited.
536
537 auto & env = underlying_mbox.value()->environment();
538
539 return env.make_custom_mbox(
541 {
543
545 {
546 using T = impl::actual_mbox_t<
548
549 result = mbox_t{ new T{
554 } };
555 }
556 else
557 {
558 using T = impl::actual_mbox_t<
560
561 result = mbox_t{ new T{
565 } };
566 }
567
568 return result;
569 } );
570 }
571
572} /* namespace inflight_limit */
573
574} /* namespace mboxes */
575
576} /* namespace extra */
577
578} /* namespace so_5 */
A very simple implementation of envelope which do nothing except holding a payload.
void subscribe_event_handler(const std::type_index &msg_type, abstract_message_sink_t &subscriber) override
const std::type_index m_msg_type
Type of a message for that mbox is created.
actual_mbox_t(const not_null_underlying_mbox_t &dest_mbox, std::type_index msg_type, underlying_counter_t limit, Tracing_Args &&... args)
Initializing constructor.
instances_counter_shptr_t m_instances_counter
Counter for inflight instances.
so_5::environment_t & environment() const noexcept override
void unsubscribe_event_handler(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
const so_5::mbox_t m_underlying_mbox
Actual underlying mbox to be used for all calls.
const underlying_counter_t m_limit
The limit of inflight messages.
void ensure_expected_msg_type(const std::type_index &msg_type, std::string_view error_description) const
void set_delivery_filter(const std::type_index &msg_type, const delivery_filter_t &filter, abstract_message_sink_t &subscriber) override
void do_deliver_message(message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const message_ref_t &message, unsigned int redirection_deep) override
void drop_delivery_filter(const std::type_index &msg_type, abstract_message_sink_t &subscriber) noexcept override
Helper class for incrementing/decrementing number of messages in RAII style.
Helper type that tells that underlying mbox isn't nullptr.
friend not_null_underlying_mbox_t ensure_underlying_mbox_not_null(so_5::mbox_t)
Ensure that underlying mbox is not nullptr.
special_envelope_t(message_ref_t payload, instances_counter_shptr_t counter)
Initializing constructor.
const int mboxes_inflight_limit_errors
Starting point for errors of mboxes::inflight_limit submodule.
const int rc_different_message_type
An attempt to use a message type that differs from mbox's message type.
const int rc_nullptr_as_underlying_mbox
Null pointer to underlying mbox.
void ensure_valid_message_type_for_underlying_mbox(const so_5::mbox_t &underlying_mbox)
Check for compatibility between mbox type and message type.
mbox_t make_mbox(mbox_t dest_mbox, underlying_counter_t inflight_limit)
Create an instance of inflight_limit_mbox.
void make_trace_to_1(std::ostream &s, extra_inflight_limit_specifics::limit_info info)
void fill_trace_data_1(actual_trace_data_t &, extra_inflight_limit_specifics::limit_info)
Ranges for error codes of each submodules.
Definition details.hpp:13
Separate type for holding inflight message counter as a separate object.
std::atomic< underlying_counter_t > m_instances
Counter of inflight instances.
so_5::extra::mboxes::inflight_limit::underlying_counter_t m_current_number
so_5::extra::mboxes::inflight_limit::underlying_counter_t m_limit