SObjectizer 5.8
Loading...
Searching...
No Matches
subscr_storage_hash_table_based.cpp
Go to the documentation of this file.
1/*
2 * SObjectizer-5
3 */
4
5/*!
6 * \since
7 * v.5.5.3
8 *
9 * \file
10 * \brief A hash_table-based storage for agent's subscriptions information.
11 */
12
13#include <so_5/impl/subscription_storage_iface.hpp>
14
15#include <algorithm>
16#include <map>
17#include <unordered_map>
18#include <iterator>
19
20#include <so_5/details/rollback_on_exception.hpp>
21
22namespace so_5
23{
24
25namespace impl
26{
27
28/*!
29 * \since
30 * v.5.4.0
31 *
32 * \brief A hash_table-based storage for agent's subscriptions information.
33 */
35{
36
37//! Subscription key type.
38struct key_t
39{
40 //! Unique ID of mbox.
41 mbox_id_t m_mbox_id;
42 //! Message type.
43 std::type_index m_msg_type;
44 //! State of agent.
45 const state_t * m_state;
46
47 //! Default constructor.
48 inline key_t()
50 , m_msg_type( typeid(void) )
51 , m_state( nullptr )
52 {}
53
54 //! Constructor for the case when it is necessary to
55 //! find all keys with (mbox_id, msg_type) prefix.
56 inline key_t(
57 mbox_id_t mbox_id,
58 std::type_index msg_type )
59 : m_mbox_id( mbox_id )
60 , m_msg_type( msg_type )
61 , m_state( nullptr )
62 {}
63
64 //! Initializing constructor.
65 inline key_t(
66 mbox_id_t mbox_id,
67 std::type_index msg_type,
68 const state_t & state )
69 : m_mbox_id( mbox_id )
70 , m_msg_type( msg_type )
71 , m_state( &state )
72 {}
73
74 inline bool
75 operator<( const key_t & o ) const noexcept
76 {
77 if( m_mbox_id < o.m_mbox_id )
78 return true;
79 else if( m_mbox_id == o.m_mbox_id )
80 {
81 if( m_msg_type < o.m_msg_type )
82 return true;
83 else if( m_msg_type == o.m_msg_type )
84 return m_state < o.m_state;
85 }
86
87 return false;
88 }
89
90 inline bool
91 operator==( const key_t & o ) const noexcept
92 {
93 return m_mbox_id == o.m_mbox_id &&
95 m_state == o.m_state;
96 }
97
98 inline bool
99 is_same_mbox_msg_pair( const key_t & o ) const noexcept
100 {
101 return m_mbox_id == o.m_mbox_id &&
103 }
104};
105
106//
107// hash_t
108//
109/*!
110 * \since
111 * v.5.4.0
112 *
113 * \brief A special class for calculating hash value via pointer to key.
114 */
115struct hash_t
116 {
117 [[nodiscard]]
118 std::size_t
119 operator()( const key_t * ptr ) const noexcept
120 {
121 // These details have been borrowed from the documentation for the
122 // boost::hash_combine function:
123 // http://www.boost.org/doc/libs/1_46_1/doc/html/hash/reference.html#boost.hash_combine
124 // The key fields are folded in order: mbox id, message type, state pointer.
125 const auto h1 =
126 std::hash< so_5::mbox_id_t >()( ptr->m_mbox_id );
127 const auto h2 = h1 ^
128 (std::hash< std::type_index >()( ptr->m_msg_type ) +
129 0x9e3779b9 + (h1 << 6) + (h1 >> 2));
130
131 return h2 ^ (std::hash< const state_t * >()(
132 ptr->m_state ) +
133 0x9e3779b9 + (h2 << 6) + (h2 >> 2));
134 }
135 };
136
137//
138// equal_to_t
139//
140/*!
141 * \since
142 * v.5.4.0
143 *
144 * \brief A special class for checking equality via pointer to key.
145 */
147 {
148 [[nodiscard]]
149 bool
150 operator()( const key_t * a, const key_t * b ) const
151 {
152 return (*a) == (*b);
153 }
154 };
155
156namespace
157{
158 template< class S >
159 [[nodiscard]]
160 bool
162 S & s,
163 typename S::iterator it )
164 {
165 if( it != s.begin() )
166 {
167 typename S::iterator prev = it;
168 --prev;
169 if( it->first.is_same_mbox_msg_pair( prev->first ) )
170 return true;
171 }
172
173 typename S::iterator next = it;
174 ++next;
175 if( next != s.end() )
176 return it->first.is_same_mbox_msg_pair( next->first );
177
178 return false;
179 }
180
181} /* namespace anonymous */
182
183/*!
184 * \since
185 * v.5.4.0
186 *
187 * \brief A storage for agent's subscriptions information.
188 *
189 * The motivation for this class is the following:
190 *
191 * Initially an agent had only one subscription map where the key
192 * was a compound key: (mbox_id, msg_type, state).
193 *
194 * This map was used for both kinds of operations: subscription
195 * creation/removal and event handler lookup. But this data structure was
196 * very inefficient for event handler lookup, especially when an agent
197 * receives the same message from the same mbox in different states: in that
198 * case most lookup operations share the same (mbox_id, msg_type) values.
199 *
200 * This class uses std::map and std::unordered_map for different purposes:
201 * - std::map is used for subscription management (strictly ordered data
202 * structure makes subscription management a lot easier);
203 * - std::unordered_map is used for event handler lookup operations
204 * (hash_table is more efficient especially for the cases where the
205 * only difference in lookup keys is in state field).
206 */
208 {
209 public :
210 storage_t();
211 ~storage_t() override;
212
213 virtual void
215 const mbox_t & mbox_ref,
216 const std::type_index & type_index,
217 abstract_message_sink_t & message_sink,
218 const state_t & target_state,
219 const event_handler_method_t & method,
220 thread_safety_t thread_safety,
221 event_handler_kind_t handler_kind ) override;
222
223 virtual void
225 const mbox_t & mbox_ref,
226 const std::type_index & type_index,
227 const state_t & target_state ) noexcept override;
228
229 void
231 const mbox_t & mbox_ref,
232 const std::type_index & type_index ) noexcept override;
233
234 void
235 drop_all_subscriptions() noexcept override;
236
239 mbox_id_t mbox_id,
240 const std::type_index & msg_type,
241 const state_t & current_state ) const noexcept override;
242
243 void
244 debug_dump( std::ostream & to ) const override;
245
246 void
247 drop_content() noexcept override;
248
249 subscription_storage_common::subscr_info_vector_t
250 query_content() const override;
251
252 void
254 subscription_storage_common::subscr_info_vector_t && info ) override;
255
256 std::size_t
257 query_subscriptions_count() const override;
258
259 private :
260 //! Information about mbox and message_sink used for
261 //! subscription to that mbox.
263 {
264 mbox_t m_mbox;
265 std::reference_wrapper< abstract_message_sink_t > m_message_sink;
266 };
267
268 //! Type of subscription map.
269 using map_t = std::map< key_t, mbox_with_sink_info_t >;
270
271 //! Map of subscriptions.
272 /*!
273 * It is important to have the right order of elements in this map:
274 * all subscriptions for the same (mbox, message) pair must be grouped
275 * together.
276 */
277 map_t m_map;
278
279 //! Type of event handlers hash table.
280 using hash_table_t = std::unordered_map<
281 const key_t *,
283 hash_t,
284 equal_to_t >;
285
286 //! Hash table of event handlers.
287 hash_table_t m_hash_table;
288
289 void
290 destroy_all_subscriptions() noexcept;
291 };
292
295
300
301void
303 const mbox_t & mbox,
304 const std::type_index & type_index,
305 abstract_message_sink_t & message_sink,
306 const state_t & target_state,
307 const event_handler_method_t & method,
308 thread_safety_t thread_safety,
309 event_handler_kind_t handler_kind )
310 {
311 using namespace subscription_storage_common;
312
313 key_t key{ mbox->id(), type_index, target_state };
314
315 auto insertion_result = m_map.emplace(
316 key,
317 mbox_with_sink_info_t{ mbox, std::ref(message_sink) } );
318
319 if( !insertion_result.second )
322 "agent is already subscribed to message, " +
323 make_subscription_description( mbox, type_index, target_state ) );
324
326 [&] {
327 m_hash_table.emplace( &(insertion_result.first->first),
328 event_handler_data_t( method, thread_safety, handler_kind ) );
329 },
330 [&] { m_map.erase( insertion_result.first ); } );
331
332 auto mbox_msg_known = is_known_mbox_msg_pair(
333 m_map, insertion_result.first );
334 if( !mbox_msg_known )
335 {
336 // Mbox must create subscription.
338 [&] {
340 type_index, message_sink );
341 },
342 [&] {
343 m_hash_table.erase( &(insertion_result.first->first) );
344 m_map.erase( insertion_result.first );
345 } );
346 }
347 }
348
349void
351 const mbox_t & mbox_ref,
352 const std::type_index & type_index,
353 const state_t & target_state ) noexcept
354 {
355 key_t key( mbox_ref->id(), type_index, target_state );
356
357 auto it = m_map.find( key );
358
359 if( m_map.end() != it )
360 {
361 bool mbox_msg_known = is_known_mbox_msg_pair( m_map, it );
362
363 // A reference to message_sink has to be stored for a case when
364 // unsubscribe_event_handler has to be called.
365 abstract_message_sink_t & sink = it->second.m_message_sink.get();
366
367 m_hash_table.erase( &(it->first) );
368 m_map.erase( it );
369
370 if( !mbox_msg_known )
371 {
372 mbox_ref->unsubscribe_event_handler( type_index, sink );
373 }
374 }
375 }
376
377void
379 const mbox_t & mbox_ref,
380 const std::type_index & type_index ) noexcept
381 {
382 const key_t key( mbox_ref->id(), type_index );
383
384 auto it = m_map.lower_bound( key );
385 auto need_erase = [&] {
386 return it != m_map.end() &&
387 key.is_same_mbox_msg_pair( it->first );
388 };
389 const bool found = need_erase();
390 if( found )
391 {
392 // A reference to message_sink is necessary for calling
393 // unsubscribe_event_handler. Store it here before removing
394 // items from m_hash_table and m_map.
395 abstract_message_sink_t & sink = it->second.m_message_sink.get();
396
397 do
398 {
399 m_hash_table.erase( &(it->first) );
400 m_map.erase( it++ );
401 }
402 while( need_erase() );
403
404 mbox_ref->unsubscribe_event_handler( type_index, sink );
405 }
406 }
407
408void
413
416 mbox_id_t mbox_id,
417 const std::type_index & msg_type,
418 const state_t & current_state ) const noexcept
419 {
420 key_t k( mbox_id, msg_type, current_state );
421 auto it = m_hash_table.find( &k );
422 if( it != m_hash_table.end() )
423 return &(it->second);
424 else
425 return nullptr;
426 }
427
428void
429storage_t::debug_dump( std::ostream & to ) const
430 { // Writes one "{mbox_id, msg_type name, state name}" line per entry of m_map.
431 for( const auto & v : m_map )
432 to << "{" << v.first.m_mbox_id << ", "
433 << v.first.m_msg_type.name() << ", "
434 << v.first.m_state->query_name() << "}"
435 << std::endl; // std::endl also flushes the stream after every entry.
436 }
437
438void
440 {
441 {
442 const map_t::value_type * previous = nullptr;
443 for( auto & i : m_map )
444 {
445 // Optimisation: for several consecutive keys with
446 // the same (mbox, msg_type) pair it is necessary to
447 // call unsubscribe_event_handler only once.
448 if( !previous ||
449 !previous->first.is_same_mbox_msg_pair( i.first ) )
450 {
452 i.first.m_msg_type,
453 i.second.m_message_sink.get() );
454 }
455
456 previous = &i;
457 }
458 }
459
461 }
462
463void
465 {
466 m_hash_table.clear();
467
468 m_map.clear();
469 }
470
471subscription_storage_common::subscr_info_vector_t
473 {
474 using namespace std;
475 using namespace subscription_storage_common;
476
477 subscr_info_vector_t events;
478
479 if( !m_hash_table.empty() )
480 {
481 events.reserve( m_hash_table.size() );
482
483 transform( begin(m_hash_table), end(m_hash_table),
484 back_inserter(events),
485 [this]( const hash_table_t::value_type & i )
486 {
487 auto map_item = m_map.find( *(i.first) );
488
489 return subscr_info_t {
490 map_item->second.m_mbox,
491 map_item->first.m_msg_type,
492 map_item->second.m_message_sink.get(),
493 *(map_item->first.m_state),
494 i.second.m_method,
495 i.second.m_thread_safety,
496 i.second.m_kind
497 };
498 } );
499 }
500
501 return events;
502 }
503
504void
506 subscription_storage_common::subscr_info_vector_t && info )
507 {
508 using namespace std;
509 using namespace subscription_storage_common;
510
511 map_t fresh_map;
512 hash_table_t fresh_table;
513
514 for_each( begin(info), end(info),
515 [&]( const subscr_info_t & i )
516 {
518
519 auto ins_result = fresh_map.emplace(
520 k,
522 i.m_mbox,
524 } );
525
526 fresh_table.emplace( &(ins_result.first->first), i.m_handler );
527 } );
528
529 swap( m_map, fresh_map );
530 swap( m_hash_table, fresh_table );
531 }
532
533std::size_t
535 {
536 return m_hash_table.size();
537 }
538
539} /* namespace hash_table_subscr_storage */
540
541} /* namespace impl */
542
543SO_5_FUNC subscription_storage_factory_t
545 {
546 return []() {
547 return impl::subscription_storage_unique_ptr_t(
549 };
550 }
551
552} /* namespace so_5 */
virtual void unsubscribe_event_handler(const std::type_index &type_index, abstract_message_sink_t &subscriber) noexcept=0
Remove all message handlers.
virtual void subscribe_event_handler(const std::type_index &type_index, abstract_message_sink_t &subscriber)=0
Add the message handler.
virtual mbox_id_t id() const =0
Unique ID of this mbox.
Interface for message sink.
A storage for agent's subscriptions information.
void drop_subscription_for_all_states(const mbox_t &mbox_ref, const std::type_index &type_index) noexcept override
void setup_content(subscription_storage_common::subscr_info_vector_t &&info) override
Setup content from information from another storage object.
virtual void create_event_subscription(const mbox_t &mbox_ref, const std::type_index &type_index, abstract_message_sink_t &message_sink, const state_t &target_state, const event_handler_method_t &method, thread_safety_t thread_safety, event_handler_kind_t handler_kind) override
const event_handler_data_t * find_handler(mbox_id_t mbox_id, const std::type_index &msg_type, const state_t &current_state) const noexcept override
virtual void drop_subscription(const mbox_t &mbox_ref, const std::type_index &type_index, const state_t &target_state) noexcept override
std::size_t query_subscriptions_count() const override
Count of subscriptions in the storage.
subscription_storage_common::subscr_info_vector_t query_content() const override
void drop_all_subscriptions() noexcept override
Drop all subscriptions.
An interface of subscription storage.
T * operator->() const noexcept
std::string query_name() const
Get textual name of the state.
Definition agent.cpp:409
#define SO_5_FUNC
Definition declspec.hpp:48
#define SO_5_THROW_EXCEPTION(error_code, desc)
Definition exception.hpp:74
Some reusable and low-level classes/functions which can be used in public header files.
auto do_with_rollback_on_exception(Main_Action main_action, Rollback_Action rollback_action) -> decltype(main_action())
Helper function for do some action with rollback in the case of an exception.
A hash_table-based storage for agent's subscriptions information.
Common stuff for various subscription storage implementations.
std::string make_subscription_description(const mbox_t &mbox_ref, std::type_index msg_type, const state_t &state)
A helper function for creating subscription description.
Details of SObjectizer run-time implementations.
Definition agent.cpp:780
Private part of message limit implementation.
Definition agent.cpp:33
thread_safety_t
Thread safety indicator.
Definition types.hpp:50
SO_5_FUNC subscription_storage_factory_t hash_table_based_subscription_storage_factory()
Factory for default subscription storage based on std::unordered_map.
mbox_id_t null_mbox_id()
Default value for null mbox_id.
Definition types.hpp:39
const int rc_evt_handler_already_provided
A handler for that event/mbox/state is already registered.
Definition ret_code.hpp:105
event_handler_kind_t
Kind of an event handler.
Definition types.hpp:154
Information about event_handler and its properties.
thread_safety_t m_thread_safety
Is event handler thread safe or not.
event_handler_method_t m_method
Method for handling event.
event_handler_data_t(event_handler_method_t method, thread_safety_t thread_safety, event_handler_kind_t kind)
event_handler_kind_t m_kind
Kind of this event handler.
A special class for checking equality via pointer to key.
A special class for calculating hash value via pointer to key.
std::size_t operator()(const key_t *ptr) const noexcept
key_t(mbox_id_t mbox_id, std::type_index msg_type, const state_t &state)
Initializing constructor.
key_t(mbox_id_t mbox_id, std::type_index msg_type)
std::reference_wrapper< abstract_message_sink_t > m_message_sink
Message sink used for subscription.
subscr_info_t(mbox_t mbox, std::type_index msg_type, abstract_message_sink_t &message_sink, const state_t &state, const event_handler_method_t &method, thread_safety_t thread_safety, event_handler_kind_t handler_kind)