SObjectizer 5.8
Loading...
Searching...
No Matches
subscr_storage_flat_set_based.cpp
Go to the documentation of this file.
1/*
2 * SObjectizer-5
3 */
4
5/*!
6 * \file
7 * \brief A flat-set based storage for agent's subscriptions information.
8 *
9 * \since v.5.8.2
10 */
11
12#include <so_5/impl/subscription_storage_iface.hpp>
13
14#include <so_5/details/rollback_on_exception.hpp>
15
16#include <algorithm>
17#include <iterator>
18#include <tuple>
19#include <vector>
20
21namespace so_5
22{
23
24namespace impl
25{
26
27/*!
28 * \brief A flat-set storage for agent's subscriptions information.
29 *
30 * \since v.5.8.2
31 *
32 */
34{
35
36/*!
37 * \brief A flat-set based storage for agent's subscriptions information.
38 *
39 * This implementation uses a sorted vector (aka "flat_set") of subscr_info_t.
40 * Binary search is used for searching subscription, create and drop subscription
41 * operations.
42 *
43 * An std::vector is used as underlying storage. It will grow as necessary. Initial
44 * capacity is specified in the constructor.
45 *
46 * \since v.5.8.2
47 */
49 {
50 public :
52 std::size_t initial_capacity );
53 ~storage_t() override;
54
55 virtual void
57 const mbox_t & mbox_ref,
58 const std::type_index & type_index,
59 abstract_message_sink_t & message_sink,
60 const state_t & target_state,
61 const event_handler_method_t & method,
62 thread_safety_t thread_safety,
63 event_handler_kind_t handler_kind ) override;
64
65 virtual void
67 const mbox_t & mbox,
68 const std::type_index & msg_type,
69 const state_t & target_state ) noexcept override;
70
71 void
73 const mbox_t & mbox,
74 const std::type_index & msg_type ) noexcept override;
75
76 void
77 drop_all_subscriptions() noexcept override;
78
81 mbox_id_t mbox_id,
82 const std::type_index & msg_type,
83 const state_t & current_state ) const noexcept override;
84
85 void
86 debug_dump( std::ostream & to ) const override;
87
88 void
89 drop_content() noexcept override;
90
91 subscription_storage_common::subscr_info_vector_t
92 query_content() const override;
93
94 void
96 subscription_storage_common::subscr_info_vector_t && info ) override;
97
98 std::size_t
99 query_subscriptions_count() const override;
100
101 private :
103 using subscr_info_vector_t =
104 subscription_storage_common::subscr_info_vector_t;
105
106 //! A helper predicate for searching the same mbox and message type pairs.
107 /*!
108 * This predicate is useful when the pointer to target state has be
109 * be ignored. For example, when we have to remove all subscriptions
110 * for all states.
111 */
113 {
114 const mbox_id_t m_id;
115 const std::type_index & m_type;
116
117 [[nodiscard]] bool
118 operator()( const info_t & info ) const noexcept
119 {
120 return m_id == info.m_mbox->id() &&
121 m_type == info.m_msg_type;
122 }
123 };
124
125 /*!
126 * @brief Helper type for storing only key information about a subscription.
127 *
128 * For fast search in a vector of subscriptions we have to deal with
129 * only a few of key fields of subscr_info_t. This helper type allows to
130 * agregate all those fields in a (rather) small object.
131 */
133 {
134 mbox_id_t m_mbox_id;
135 const std::type_index & m_msg_type;
136 const state_t * m_state;
137 };
138
139 //! A helper predicate for searching specific subscription
140 /*!
141 * This predicate is intended to be used for adding new subscription
142 * and removing of an existing subscription.
143 *
144 * It has several operator() to be used for comparison if different
145 * applications.
146 */
148 {
149 [[nodiscard]] bool
150 operator()( const key_info_t & a, const key_info_t & b ) const noexcept
151 {
152 if( a.m_mbox_id < b.m_mbox_id )
153 return true;
154 else if( a.m_mbox_id == b.m_mbox_id )
155 {
156 if( a.m_msg_type < b.m_msg_type )
157 return true;
158 else if( a.m_msg_type == b.m_msg_type )
159 {
160 // NOTE: it's UB to compare two arbitrary pointers.
161 using ptr_comparator_t = std::less< const state_t * >;
162 return ptr_comparator_t{}( a.m_state, b.m_state );
163 }
164 }
165
166 return false;
167 }
168
169 [[nodiscard]] bool
170 operator()( const info_t & a, const key_info_t & b ) const noexcept
171 {
172 return (*this)(
174 b );
175 }
176
177 [[nodiscard]] bool
178 operator()( const info_t & a, const info_t & b ) const noexcept
179 {
180 return (*this)(
183 }
184 };
185
186 //! Subscription information.
187 subscr_info_vector_t m_events;
188
189 void
190 destroy_all_subscriptions() noexcept;
191
192 /*!
193 * @brief Helper for checking presence of subscriptions for the same
194 * message from the same mbox.
195 *
196 * Accepts a valid iterator and returns true if there are at least one
197 * item around it (one to the left or/and one to the right) that
198 * has the same mbox_id and msg_type.
199 *
200 * This helper handles case when @a it points to the first or
201 * the right item of m_events.
202 *
203 * @attention
204 * The @a it should not be "past the end" iterator.
205 */
206 [[nodiscard]] bool
208 subscr_info_vector_t::iterator it,
209 const is_same_mbox_msg_t & predicate ) const noexcept;
210 };
211
212namespace
213{
214
215/*!
216 * @brief Helper to check if two objects are for the same subscription.
217 *
218 * @note
219 * Only mbox_id, msg_type and target_state are compared.
220 */
221[[nodiscard]] bool
224 const subscription_storage_common::subscr_info_t & b ) noexcept
225{
226 return a.m_mbox->id() == b.m_mbox->id()
228 && a.m_state == b.m_state
229 ;
230}
231
232/*!
233 * @brief Helper to check if subscription information the same.
234 *
235 * @note
236 * Only mbox_id, msg_type and target_state are compared.
237 */
238[[nodiscard]] bool
241 mbox_id_t mbox_id,
242 const std::type_index & msg_type,
243 const state_t * target_state ) noexcept
244{
245 return a.m_mbox->id() == mbox_id
246 && a.m_msg_type == msg_type
247 && a.m_state == target_state
248 ;
249}
250
251} /* namespace anonymous */
252
254 std::size_t initial_capacity )
255 {
256 m_events.reserve( initial_capacity );
257 }
258
263
264void
266 const mbox_t & mbox,
267 const std::type_index & msg_type,
268 abstract_message_sink_t & message_sink,
269 const state_t & target_state,
270 const event_handler_method_t & method,
271 thread_safety_t thread_safety,
272 event_handler_kind_t handler_kind )
273 {
274 using namespace std;
275 using namespace subscription_storage_common;
276
277 info_t info_to_store{
278 mbox,
279 msg_type,
280 message_sink,
281 target_state,
282 method,
283 thread_safety,
284 handler_kind
285 };
286
287 // Check that this subscription is new.
288 auto it = std::lower_bound(
289 m_events.begin(), m_events.end(),
290 info_to_store,
292 if( it != m_events.end() && is_equal( *it, info_to_store ) )
293 // Subscription already exists. It's an error!
296 "agent is already subscribed to message, " +
297 make_subscription_description( mbox, msg_type, target_state ) );
298
299 // Just add subscription to the storage.
300 it = m_events.insert( it, std::move(info_to_store) );
301
302 // Need to check is there existing information for (mbox_id, msg_type)
303 // pair. If it's already here then there is no need to call
304 // subscribe_event_handler for the mbox.
305 const bool info_for_mbox_msg_type_exists =
307 it,
308 is_same_mbox_msg_t{ mbox->id(), msg_type } );
309
310 // Note: since v.5.5.9 mbox subscription is initiated even if
311 // it is MPSC mboxes. It is important for the case of message
312 // delivery tracing.
313
314 if( !info_for_mbox_msg_type_exists )
315 {
316 // Mbox must create subscription.
318 [&] {
320 msg_type,
321 message_sink );
322 },
323 [&] {
324 m_events.erase( it );
325 } );
326 }
327 }
328
329void
331 const mbox_t & mbox,
332 const std::type_index & msg_type,
333 const state_t & target_state ) noexcept
334 {
335 using namespace std;
336
337 auto existed_position = std::lower_bound(
338 m_events.begin(), m_events.end(),
339 key_info_t{ mbox->id(), msg_type, std::addressof(target_state) },
341 if( existed_position != m_events.end()
342 && is_equal( *existed_position,
343 mbox->id(), msg_type, std::addressof(target_state) ) )
344 {
345 // This value may be necessary for unsubscription.
346 abstract_message_sink_t & message_sink =
347 existed_position->m_message_sink.get();
348
349 // Need to check is there existing information for (mbox_id, msg_type)
350 // pair. If it's already here then there is no need to call
351 // subscribe_event_handler for the mbox.
352 const bool info_for_mbox_msg_type_exists =
354 existed_position,
355 is_same_mbox_msg_t{ mbox->id(), msg_type } );
356
357 // Item is no more needed.
358 m_events.erase( existed_position );
359
360 // Note v.5.5.9 unsubscribe_event_handler is called for
361 // mbox even if it is MPSC mbox. It is necessary for the case
362 // of message delivery tracing.
363
364 // If there is no more subscriptions to that mbox then
365 // the mbox must remove information about that agent.
366 if( !info_for_mbox_msg_type_exists )
367 {
368 // If we are here then there is no more references
369 // to the mbox. And mbox must not hold reference
370 // to the agent.
371 mbox->unsubscribe_event_handler( msg_type, message_sink );
372 }
373 }
374 }
375
376void
378 const mbox_t & mbox,
379 const std::type_index & msg_type ) noexcept
380 {
381 using namespace std;
382
383 const auto predicate = is_same_mbox_msg_t{ mbox->id(), msg_type };
384 if( auto it = std::lower_bound( m_events.begin(), m_events.end(),
385 // NOTE: use NULL instead of actual pointer to a state.
386 key_info_t{ mbox->id(), msg_type, nullptr },
388 it != m_events.end() && predicate( *it ) )
389 {
390 // There are subscriptions to be removed.
391 // Have to store message_sink reference because it has to
392 // be passed to unsubscribe_event_handler.
393 auto & message_sink = it->m_message_sink.get();
394
395 // Remove all items that match the predicate.
396 m_events.erase(
397 remove_if( it, end( m_events ), predicate ),
398 end( m_events ) );
399
400 // Note: since v.5.5.9 mbox unsubscription is initiated even if
401 // it is MPSC mboxes. It is important for the case of message
402 // delivery tracing.
403 mbox->unsubscribe_event_handler( msg_type, message_sink );
404 }
405 }
406
407void
412
415 mbox_id_t mbox_id,
416 const std::type_index & msg_type,
417 const state_t & current_state ) const noexcept
418 {
419 auto existed_position = std::lower_bound(
420 m_events.begin(), m_events.end(),
421 key_info_t{ mbox_id, msg_type, std::addressof(current_state) },
423 if( existed_position != m_events.end()
424 && is_equal( *existed_position,
425 mbox_id, msg_type, std::addressof(current_state) ) )
426 {
427 return std::addressof(existed_position->m_handler);
428 }
429 else
430 return nullptr;
431 }
432
433void
434storage_t::debug_dump( std::ostream & to ) const
435 {
436 for( const auto & e : m_events )
437 to << "{" << e.m_mbox->id() << ", "
438 << e.m_msg_type.name() << ", "
439 << e.m_state->query_name() << "}"
440 << std::endl;
441 }
442
443void
445 {
446 if( m_events.empty() )
447 // Nothing to do at empty subscription list.
448 return;
449
450 // Destroy all subscriptions for unique (mbox, msg_type).
451 const auto total_items = m_events.size();
452 for( std::size_t i = 0u; i < total_items; )
453 {
454 auto & current_info = m_events[ i ];
456 current_info.m_msg_type,
457 current_info.m_message_sink );
458
459 // We should skip all consequtive items with the same
460 // (mbox, msg_type) pairs.
461 std::size_t j = 1u;
462 for( ; (i+j) < total_items; ++j )
463 {
464 const auto & next_info = m_events[ i+j ];
465 if( current_info.m_mbox->id() != next_info.m_mbox->id() ||
466 current_info.m_msg_type != next_info.m_msg_type )
467 break;
468 }
469
470 i += j;
471 }
472
473 // Cleanup subscription vector.
475 }
476
477bool
479 subscr_info_vector_t::iterator it,
480 const is_same_mbox_msg_t & predicate ) const noexcept
481 {
482 bool result = false;
483 if( it != m_events.begin() )
484 result = predicate( *(std::prev(it)) );
485
486 if( !result )
487 {
488 if( auto next = std::next(it); next != m_events.end() )
489 result = predicate( *(std::next(it)) );
490 }
491
492 return result;
493 }
494
495void
497 {
498 m_events.clear();
499 }
500
501subscription_storage_common::subscr_info_vector_t
503 {
504 return m_events;
505 }
506
507void
509 subscription_storage_common::subscr_info_vector_t && info )
510 {
511 m_events = std::move( info );
512 std::sort( m_events.begin(), m_events.end(), key_info_comparator_t{} );
513 }
514
515std::size_t
517 {
518 return m_events.size();
519 }
520
521} /* namespace flat_set_based_subscr_storage */
522
523} /* namespace impl */
524
525SO_5_FUNC subscription_storage_factory_t
527 std::size_t initial_capacity )
528 {
529 return [initial_capacity]() {
530 return impl::subscription_storage_unique_ptr_t(
532 initial_capacity ) );
533 };
534 }
535
536} /* namespace so_5 */
virtual void unsubscribe_event_handler(const std::type_index &type_index, abstract_message_sink_t &subscriber) noexcept=0
Remove all message handlers.
virtual void subscribe_event_handler(const std::type_index &type_index, abstract_message_sink_t &subscriber)=0
Add the message handler.
virtual mbox_id_t id() const =0
Unique ID of this mbox.
Interface for message sink.
A flat-set based storage for agent's subscriptions information.
subscription_storage_common::subscr_info_vector_t query_content() const override
virtual void create_event_subscription(const mbox_t &mbox_ref, const std::type_index &type_index, abstract_message_sink_t &message_sink, const state_t &target_state, const event_handler_method_t &method, thread_safety_t thread_safety, event_handler_kind_t handler_kind) override
void setup_content(subscription_storage_common::subscr_info_vector_t &&info) override
Setup content from information from another storage object.
const event_handler_data_t * find_handler(mbox_id_t mbox_id, const std::type_index &msg_type, const state_t &current_state) const noexcept override
virtual void drop_subscription(const mbox_t &mbox, const std::type_index &msg_type, const state_t &target_state) noexcept override
std::size_t query_subscriptions_count() const override
Count of subscriptions in the storage.
bool check_presence_of_mbox_msg_type_info_around_it(subscr_info_vector_t::iterator it, const is_same_mbox_msg_t &predicate) const noexcept
Helper for checking presence of subscriptions for the same message from the same mbox.
void drop_subscription_for_all_states(const mbox_t &mbox, const std::type_index &msg_type) noexcept override
void drop_all_subscriptions() noexcept override
Drop all subscriptions.
An interface of subscription storage.
T * operator->() const noexcept
std::string query_name() const
Get textual name of the state.
Definition agent.cpp:409
#define SO_5_FUNC
Definition declspec.hpp:48
#define SO_5_THROW_EXCEPTION(error_code, desc)
Definition exception.hpp:74
Some reusable and low-level classes/functions which can be used in public header files.
auto do_with_rollback_on_exception(Main_Action main_action, Rollback_Action rollback_action) -> decltype(main_action())
Helper function for do some action with rollback in the case of an exception.
bool is_equal(const subscription_storage_common::subscr_info_t &a, const subscription_storage_common::subscr_info_t &b) noexcept
Helper to check if two objects are for the same subscription.
bool is_equal(const subscription_storage_common::subscr_info_t &a, mbox_id_t mbox_id, const std::type_index &msg_type, const state_t *target_state) noexcept
Helper to check if subscription information the same.
A flat-set storage for agent's subscriptions information.
Common stuff for various subscription storage implementations.
std::string make_subscription_description(const mbox_t &mbox_ref, std::type_index msg_type, const state_t &state)
A helper function for creating subscription description.
Details of SObjectizer run-time implementations.
Definition agent.cpp:780
Private part of message limit implementation.
Definition agent.cpp:33
SO_5_FUNC subscription_storage_factory_t flat_set_based_subscription_storage_factory(std::size_t initial_capacity)
Factory for subscription storage based on sorted std::vector.
thread_safety_t
Thread safety indicator.
Definition types.hpp:50
const int rc_evt_handler_already_provided
A handler for that event/mbox/state is already registered.
Definition ret_code.hpp:105
event_handler_kind_t
Kind of an event handler.
Definition types.hpp:154
Information about event_handler and its properties.
A helper predicate for searching the same mbox and message type pairs.
bool operator()(const key_info_t &a, const key_info_t &b) const noexcept
Helper type for storing only key information about a subscription.
std::reference_wrapper< abstract_message_sink_t > m_message_sink
Message sink used for subscription.
subscr_info_t(mbox_t mbox, std::type_index msg_type, abstract_message_sink_t &message_sink, const state_t &state, const event_handler_method_t &method, thread_safety_t thread_safety, event_handler_kind_t handler_kind)