SObjectizer  5.8
Loading...
Searching...
No Matches
subscr_storage_map_based.cpp
Go to the documentation of this file.
1/*
2 * SObjectizer-5
3 */
4
5/*!
6 * \since
7 * v.5.5.3
8 *
9 * \file
10 * \brief A map-based storage for agent's subscriptions information.
11 */
12
13#include <so_5/impl/subscription_storage_iface.hpp>
14
15#include <algorithm>
#include <functional>
16#include <iterator>
17#include <map>
18
19#include <so_5/details/rollback_on_exception.hpp>
20
21namespace so_5
22{
23
24namespace impl
25{
26
27/*!
28 * \since
29 * v.5.5.3
30 *
31 * \brief A map-based storage for agent's subscriptions information.
32 */
34{
35
36/*!
37 * \since
38 * v.5.5.3
39 *
40 * \brief A map-based storage for agent's subscriptions information.
41 *
42 * This is a very simple implementation of subscription storage that
43 * uses std::map for storing information.
44 */
46 {
47 public :
48 storage_t();
49 ~storage_t() override;
50
51 void
53 const mbox_t & mbox_ref,
54 const std::type_index & type_index,
55 abstract_message_sink_t & message_sink,
56 const state_t & target_state,
57 const event_handler_method_t & method,
58 thread_safety_t thread_safety,
59 event_handler_kind_t handler_kind ) override;
60
61 void
63 const mbox_t & mbox,
64 const std::type_index & msg_type,
65 const state_t & target_state ) noexcept override;
66
67 void
69 const mbox_t & mbox,
70 const std::type_index & msg_type ) noexcept override;
71
72 void
73 drop_all_subscriptions() noexcept override;
74
77 mbox_id_t mbox_id,
78 const std::type_index & msg_type,
79 const state_t & current_state ) const noexcept override;
80
81 void
82 debug_dump( std::ostream & to ) const override;
83
84 void
85 drop_content() noexcept override;
86
88 query_content() const override;
89
90 void
92 subscription_storage_common::subscr_info_vector_t && info ) override;
93
94 std::size_t
95 query_subscriptions_count() const override;
96
97 private :
98 //! Type of key in subscription's map.
99 struct key_t
100 {
101 mbox_id_t m_mbox_id;
104
106 mbox_id_t mbox_id,
107 std::type_index msg_type,
108 const state_t * state )
109 : m_mbox_id( mbox_id )
110 , m_msg_type( std::move( msg_type ) )
111 , m_state( state )
112 {}
113
114 bool
115 operator<( const key_t & o ) const
116 {
117 if( m_mbox_id < o.m_mbox_id )
118 return true;
119 else if( m_mbox_id == o.m_mbox_id )
120 {
121 if( m_msg_type < o.m_msg_type )
122 return true;
123 else if( m_msg_type == o.m_msg_type )
124 return m_state < o.m_state;
125 }
126
127 return false;
128 }
129 };
130
131 //! Type of value for subscription map's item.
132 struct value_t
133 {
134 //! Reference to mbox.
135 /*!
136 * The reference must be stored because we need
137 * access to the mbox while destroying all
138 * subscriptions in the destructor.
139 */
140 const mbox_t m_mbox;
141
142 /*!
143 * Message sink used for that mbox.
144 */
146
147 /*!
148 * Event handler for that subscription.
149 */
151 };
152
153 //! Type of subscriptions map.
155
156 //! Subscription information.
158
159 void
160 destroy_all_subscriptions() noexcept;
161 };
162
163namespace
164{
165 template< class C >
166 [[nodiscard]]
167 auto
168 find( C & c,
169 const mbox_id_t & mbox_id,
170 const std::type_index & msg_type,
171 const state_t & target_state ) -> decltype( c.begin() )
172 {
173 return c.find( typename C::key_type {
174 mbox_id, msg_type, &target_state } );
175 }
176
178 {
179 const mbox_id_t m_id;
181
182 template< class K >
183 [[nodiscard]]
184 bool
185 operator()( const K & k ) const
186 {
187 return m_id == k.m_mbox_id && m_type == k.m_msg_type;
188 }
189 };
190
// Tells whether the container `s` holds another item with the same
// mbox ID and message type as the item pointed to by `it`.
//
// Only the immediate predecessor and the immediate successor of `it`
// are inspected. NOTE(review): this is sufficient only when items with
// the same mbox ID and message type are adjacent in `s`, as they are
// in a map ordered by key_t::operator<.
//
// `it` must be dereferenceable: it is dereferenced unconditionally.
191 template< class M, class IT >
192 [[nodiscard]]
193 bool
195 {
196 const is_same_mbox_msg predicate{
197 it->first.m_mbox_id, it->first.m_msg_type };
198
// There is no predecessor for the very first item.
199 if( it != s.begin() )
200 {
201 IT prev = std::prev( it );
202 if( predicate( prev->first ) )
203 return true;
204 }
205
// The successor can be checked only if `it` is not the last item.
206 IT next = std::next( it );
207 if( next != s.end() )
208 return predicate( next->first );
209
210 return false;
211 }
212
213} /* namespace anonymous */
214
217
222
// Registers an event handler for the (mbox, msg_type, target_state)
// triple.
//
// The new item is stored in m_events first. The mbox itself is
// subscribed only if no neighbouring item in m_events refers to the same
// mbox and message type. A triple that is already present is reported
// with the rc_evt_handler_already_provided error code.
223void
225 const mbox_t & mbox,
226 const std::type_index & msg_type,
227 abstract_message_sink_t & message_sink,
228 const state_t & target_state,
229 const event_handler_method_t & method,
230 thread_safety_t thread_safety,
231 event_handler_kind_t handler_kind )
232 {
233 using namespace std;
234 using namespace subscription_storage_common;
235
236 const auto mbox_id = mbox->id();
237
238 // Check that this subscription is new.
239 auto existed_position = find(
240 m_events, mbox_id, msg_type, target_state );
241
242 if( existed_position != m_events.end() )
244 rc_evt_handler_already_provided,
245 "agent is already subscribed to message, " +
246 make_subscription_description( mbox, msg_type, target_state ) );
247
248 // Insert the new subscription into the map.
249 auto ins_result = m_events.emplace(
250 key_t { mbox_id, msg_type, &target_state },
251 value_t {
252 mbox,
253 std::ref( message_sink ),
254 event_handler_data_t {
255 method,
256 thread_safety,
257 handler_kind
258 }
259 }
260 );
261
262 // Note: since v.5.5.9 the mbox subscription is initiated even for
263 // MPSC mboxes. This is important for the case of message
264 // delivery tracing.
265
266 // If there was no subscription for that mbox+msg_type pair yet,
267 // a new subscription must be created in the mbox.
268 if( !is_known_mbox_msg_pair( m_events, ins_result.first ) )
269 {
// NOTE(review): presumably the second lambda is run only when the
// first one throws, so the item inserted above is removed again if
// the mbox subscription fails -- confirm against
// do_with_rollback_on_exception.
270 so_5::details::do_with_rollback_on_exception(
271 [&] {
272 mbox->subscribe_event_handler(
273 msg_type,
274 message_sink );
275 },
276 [&] {
277 m_events.erase( ins_result.first );
278 } );
279 }
280 }
281
// Removes the subscription for the (mbox, msg_type, target_state)
// triple. Does nothing if there is no such subscription.
//
// The mbox is unsubscribed only when the removed item was the last one
// for that mbox and message type.
282void
284 const mbox_t & mbox,
285 const std::type_index & msg_type,
286 const state_t & target_state ) noexcept
287 {
288 auto existed_position = find(
289 m_events, mbox->id(), msg_type, target_state );
290 if( existed_position != m_events.end() )
291 {
292 // Note: since v.5.5.9 unsubscribe_event_handler is called for
293 // the mbox even if it is an MPSC mbox. This is necessary for
294 // message delivery tracing.
295
296 // We must destroy the mbox subscription if the agent has no
297 // more subscriptions for that mbox+msg_type pair.
298 // Detect this while existed_position is not invalidated.
299 bool must_unsubscribe_mbox =
300 !is_known_mbox_msg_pair( m_events, existed_position );
301
302 // Take a reference to the message sink before the item is erased:
303 // it is needed if unsubscribe_event_handler has to be called.
304 abstract_message_sink_t & sink = existed_position->second.m_message_sink.get();
305
306 m_events.erase( existed_position );
307
308 if( must_unsubscribe_mbox )
309 {
310 mbox->unsubscribe_event_handler( msg_type, sink );
311 }
312 }
313 }
314
// Removes every subscription for the (mbox, msg_type) pair regardless of
// the state, and then unsubscribes the mbox once. Does nothing if there
// are no such subscriptions.
315void
317 const mbox_t & mbox,
318 const std::type_index & msg_type ) noexcept
319 {
320 const is_same_mbox_msg is_same{ mbox->id(), msg_type };
321
// NOTE(review): a key with a null state pointer is used as the starting
// point of the search. This presumably relies on nullptr being ordered
// before every real state pointer by key_t::operator<, so that
// lower_bound lands on the first item of the pair -- confirm.
322 auto lower_bound = m_events.lower_bound(
323 key_t{ mbox->id(), msg_type, nullptr } );
324
// True while lower_bound points to an item of the requested pair.
325 auto need_erase = [&] {
326 return lower_bound != std::end(m_events) &&
327 is_same( lower_bound->first );
328 };
329 const bool events_found = need_erase();
330
331 if( events_found )
332 {
333 // Take a reference to the message sink before any item is erased:
334 // it is required for the unsubscribe_event_handler call.
335 abstract_message_sink_t & sink = lower_bound->second.m_message_sink.get();
336
337 // Erase all subscribed event handlers.
// The iterator is advanced before the erased item is destroyed.
338 do
339 {
340 m_events.erase( lower_bound++ );
341 }
342 while( need_erase() );
343
344 // Note: since v.5.5.9 the mbox unsubscription is initiated even for
345 // MPSC mboxes. This is important for the case of message
346 // delivery tracing.
347
348 mbox->unsubscribe_event_handler( msg_type, sink );
349 }
350 }
351
352void
357
360 mbox_id_t mbox_id,
361 const std::type_index & msg_type,
362 const state_t & current_state ) const noexcept
363 {
// Exact lookup by the (mbox_id, msg_type, current_state) key.
364 auto it = find( m_events, mbox_id, msg_type, current_state );
365
// The result points to handler data stored inside m_events;
// nullptr means there is no subscription for that key.
366 if( it != std::end( m_events ) )
367 return &(it->second.m_handler);
368 else
369 return nullptr;
370 }
371
372void
373storage_t::debug_dump( std::ostream & to ) const
374 {
375 for( const auto & e : m_events )
376 to << "{" << e.first.m_mbox_id << ", "
377 << e.first.m_msg_type.name() << ", "
378 << e.first.m_state->query_name() << "}"
379 << std::endl;
380 }
381
// Walks through m_events, erases every item and calls
// unsubscribe_event_handler once per group of consecutive items that
// share the same mbox ID and message type.
382void
384 {
385 using namespace std;
386
387 auto it = begin( m_events );
388 while( it != end( m_events ) )
389 {
// `cur` is the item to be removed, `it` already points to the next one.
390 auto cur = it++;
391
// `cur` is the last item of its group if it is the last item of the map
// or if the next item belongs to another mbox+msg_type pair. The mbox is
// unsubscribed only at that moment.
392 if( it == end( m_events ) || !is_same_mbox_msg{
393 cur->first.m_mbox_id,
394 cur->first.m_msg_type }( it->first ) )
395 {
396 cur->second.m_mbox->unsubscribe_event_handler(
397 cur->first.m_msg_type,
398 cur->second.m_message_sink.get() );
399 }
400
401 m_events.erase( cur );
402 }
403 }
404
// Just empties m_events. No unsubscribe_event_handler calls are made
// here.
405void
407 {
408 m_events.clear();
409 }
410
413 {
414 using namespace std;
415 using namespace subscription_storage_common;
416
// Makes a description (subscr_info_t) for every item of m_events, in the
// iteration order of the map. An empty vector is returned when there are
// no subscriptions.
417 subscr_info_vector_t result;
418
419 if( !m_events.empty() )
420 {
// The final size is known, so the memory is reserved in advance.
421 result.reserve( m_events.size() );
422
423 transform( begin( m_events ), end( m_events ),
424 back_inserter( result ),
425 []( const subscr_map_t::value_type & e )
426 {
427 return subscr_info_t(
428 e.second.m_mbox,
429 e.first.m_msg_type,
430 e.second.m_message_sink.get(),
431 *(e.first.m_state),
432 e.second.m_handler.m_method,
433 e.second.m_handler.m_thread_safety,
434 e.second.m_handler.m_kind );
435 } );
436 }
437
438 return result;
439 }
440
// Replaces the content of m_events with items built from `info`.
//
// The new map is built aside and swapped in as the last step, so m_events
// is not touched if building the map throws. No subscribe/unsubscribe
// calls on mboxes are made here.
441void
443 subscription_storage_common::subscr_info_vector_t && info )
444 {
445 using namespace std;
446 using namespace subscription_storage_common;
447
448 subscr_map_t events;
449 transform( begin(info), end(info),
450 inserter( events, events.begin() ),
451 []( const subscr_info_t & i )
452 {
453 return subscr_map_t::value_type {
454 key_t {
455 i.m_mbox->id(),
456 i.m_msg_type,
457 i.m_state
458 },
459 value_t {
460 i.m_mbox,
461 i.m_message_sink,
462 i.m_handler
463 } };
464 } );
465
// The old content goes to `events` and is destroyed together with it.
466 swap( m_events, events );
467 }
468
// Returns the number of items in m_events, i.e. one per
// (mbox, message type, state) subscription.
469std::size_t
471 {
472 return m_events.size();
473 }
474
475} /* namespace map_based_subscr_storage */
476
477} /* namespace impl */
478
481 {
// The returned callable creates a new map-based storage_t on every call
// and hands it over wrapped into subscription_storage_unique_ptr_t.
482 return []() {
483 return impl::subscription_storage_unique_ptr_t(
484 new impl::map_based_subscr_storage::storage_t() );
485 };
486 }
487
488} /* namespace so_5 */
Interface for message sink.
A map-based storage for agent's subscriptions information.
void drop_subscription_for_all_states(const mbox_t &mbox, const std::type_index &msg_type) noexcept override
const event_handler_data_t * find_handler(mbox_id_t mbox_id, const std::type_index &msg_type, const state_t &current_state) const noexcept override
void drop_all_subscriptions() noexcept override
Drop all subscriptions.
void drop_content() noexcept override
Drop all content.
void create_event_subscription(const mbox_t &mbox_ref, const std::type_index &type_index, abstract_message_sink_t &message_sink, const state_t &target_state, const event_handler_method_t &method, thread_safety_t thread_safety, event_handler_kind_t handler_kind) override
void setup_content(subscription_storage_common::subscr_info_vector_t &&info) override
Setup content from information from another storage object.
std::size_t query_subscriptions_count() const override
Count of subscriptions in the storage.
void drop_subscription(const mbox_t &mbox, const std::type_index &msg_type, const state_t &target_state) noexcept override
subscription_storage_common::subscr_info_vector_t query_content() const override
An interface of subscription storage.
std::enable_if< details::is_agent_method_pointer< details::method_arity::nullary, Method_Pointer >::value, state_t & >::type on_enter(Method_Pointer pfn)
Set on enter handler.
Definition agent.hpp:4042
#define SO_5_FUNC
Definition declspec.hpp:48
#define SO_5_THROW_EXCEPTION(error_code, desc)
Definition exception.hpp:74
Some reusable and low-level classes/functions which can be used in public header files.
auto find(C &c, const mbox_id_t &mbox_id, const std::type_index &msg_type, const state_t &target_state) -> decltype(c.begin())
A map-based storage for agent's subscriptions information.
Common stuff for various subscription storage implementations.
Details of SObjectizer run-time implementations.
Definition agent.cpp:905
Private part of message limit implementation.
Definition agent.cpp:33
thread_safety_t
Thread safety indicator.
Definition types.hpp:50
SO_5_FUNC subscription_storage_factory_t map_based_subscription_storage_factory()
Factory for subscription storage based on std::map.
event_handler_kind_t
Kind of an event handler.
Definition types.hpp:154
Information about event_handler and its properties.
key_t(mbox_id_t mbox_id, std::type_index msg_type, const state_t *state)
const std::reference_wrapper< abstract_message_sink_t > m_message_sink