SObjectizer  5.8
Loading...
Searching...
No Matches
subscr_storage_vector_based.cpp
Go to the documentation of this file.
1/*
2 * SObjectizer-5
3 */
4
5/*!
6 * \since
7 * v.5.5.3
8 *
9 * \file
10 * \brief A vector-based storage for agent's subscriptions information.
11 */
12
13#include <so_5/impl/subscription_storage_iface.hpp>
14
15#include <algorithm>
16#include <vector>
17#include <iterator>
18
19#include <so_5/details/rollback_on_exception.hpp>
20
21namespace so_5
22{
23
24namespace impl
25{
26
27/*!
28 * \since
29 * v.5.5.3
30 *
31 * \brief A vector-based storage for agent's subscriptions information.
32 */
34{
35
36/*!
37 * \since
38 * v.5.5.3
39 *
40 * \brief A vector-based storage for agent's subscriptions information.
41 *
42 * This is a very simple implementation of subscription storage which
43 * uses std::vector for storing information.
44 *
45 * All manipulations are performed by a very simple linear search inside
46 * that vector. For agents with few subscriptions this will be the most
47 * efficient approach.
48 */
50 {
51 public :
53 std::size_t initial_capacity );
54 ~storage_t() override;
55
56 virtual void
58 const mbox_t & mbox_ref,
59 const std::type_index & type_index,
60 abstract_message_sink_t & message_sink,
61 const state_t & target_state,
62 const event_handler_method_t & method,
63 thread_safety_t thread_safety,
64 event_handler_kind_t handler_kind ) override;
65
66 virtual void
68 const mbox_t & mbox,
69 const std::type_index & msg_type,
70 const state_t & target_state ) noexcept override;
71
72 void
74 const mbox_t & mbox,
75 const std::type_index & msg_type ) noexcept override;
76
77 void
78 drop_all_subscriptions() noexcept override;
79
82 mbox_id_t mbox_id,
83 const std::type_index & msg_type,
84 const state_t & current_state ) const noexcept override;
85
86 void
87 debug_dump( std::ostream & to ) const override;
88
89 void
90 drop_content() noexcept override;
91
93 query_content() const override;
94
95 void
97 subscription_storage_common::subscr_info_vector_t && info ) override;
98
99 std::size_t
100 query_subscriptions_count() const override;
101
102 private :
106
107 //! A helper predicate for searching the same
108 //! mbox and message type pairs.
110 {
111 const mbox_id_t m_id;
113
114 bool
115 operator()( const info_t & info ) const
116 {
117 return m_id == info.m_mbox->id() &&
118 m_type == info.m_msg_type;
119 }
120 };
121
122 //! Subscription information.
124
125 void
126 destroy_all_subscriptions() noexcept;
127 };
128
129namespace
130{
131 template< class Container >
132 auto
133 find( Container & c,
134 const mbox_id_t & mbox_id,
135 const std::type_index & msg_type,
136 const state_t & target_state ) -> decltype( c.begin() )
137 {
138 using namespace std;
139
140 return find_if( begin( c ), end( c ),
141 [&]( typename Container::value_type const & o ) {
142 return ( o.m_mbox->id() == mbox_id &&
143 o.m_msg_type == msg_type &&
144 o.m_state == &target_state );
145 } );
146 }
147
148} /* namespace anonymous */
149
151 std::size_t initial_capacity )
152 {
153 m_events.reserve( initial_capacity );
154 }
155
160
161void
163 const mbox_t & mbox,
164 const std::type_index & msg_type,
165 abstract_message_sink_t & message_sink,
166 const state_t & target_state,
167 const event_handler_method_t & method,
168 thread_safety_t thread_safety,
169 event_handler_kind_t handler_kind )
170 {
171 using namespace std;
172 using namespace subscription_storage_common;
173
174 const auto mbox_id = mbox->id();
175
176 // Check that this subscription is new.
177 bool has_subscriptions_from_that_mbox = false;
178 for( auto it = m_events.begin(), it_end = m_events.end();
179 it != it_end; ++it )
180 {
181 if( it->m_mbox->id() == mbox_id &&
182 it->m_msg_type == msg_type )
183 {
184 has_subscriptions_from_that_mbox = true;
185 if( it->m_state == std::addressof(target_state) )
187 rc_evt_handler_already_provided,
188 "agent is already subscribed to message, " +
189 make_subscription_description(
190 mbox,
191 msg_type,
192 target_state ) );
193 }
194 }
195
196 // If we're here then there is no such subscription yet.
197
198 // Just add subscription to the end.
199 m_events.emplace_back(
200 mbox,
201 msg_type,
202 message_sink,
203 target_state,
204 method,
205 thread_safety,
206 handler_kind );
207
208 // Note: since v.5.5.9 mbox subscription is initiated even for
209 // MPSC mboxes. It is important for the case of message
210 // delivery tracing.
211 if( !has_subscriptions_from_that_mbox )
212 {
213 // Mbox must create subscription.
214 so_5::details::do_with_rollback_on_exception(
215 [&] {
216 mbox->subscribe_event_handler(
217 msg_type,
218 message_sink );
219 },
220 [&] {
221 m_events.pop_back();
222 } );
223 }
224 }
225
226void
228 const mbox_t & mbox,
229 const std::type_index & msg_type,
230 const state_t & target_state ) noexcept
231 {
232 using namespace std;
233
234 const auto mbox_id = mbox->id();
235
236 // Try to find a subscription. And calculate number of subscriptions
237 // from the same mbox for same msg_type.
238 std::size_t number_of_subscriptions{};
239 subscr_info_vector_t::iterator it = m_events.begin();
240 subscr_info_vector_t::iterator it_end = m_events.end();
241
242 for(; it != it_end; ++it )
243 {
244 if( it->m_mbox->id() == mbox_id &&
245 it->m_msg_type == msg_type )
246 {
247 ++number_of_subscriptions;
248 if( it->m_state == std::addressof(target_state) )
249 // Subscription found.
250 break;
251 }
252 }
253 if( it != it_end )
254 {
255 // This value may be necessary for unsubscription.
256 abstract_message_sink_t & message_sink = it->m_message_sink.get();
257
258 // The item is no longer needed.
259 it = m_events.erase( it );
260 --number_of_subscriptions;
261
262 // Note: since v.5.5.9 unsubscribe_event_handler is called for
263 // the mbox even if it is an MPSC mbox. It is necessary for the case
264 // of message delivery tracing.
265
266 // We have to check presence of other subscriptions from this mbox
267 // for the same msg_type.
268 if( !number_of_subscriptions )
269 {
270 // Maybe there are subscriptions in the right part of m_events?
271 if( m_events.end() != std::find_if( it, m_events.end(),
272 is_same_mbox_msg{ mbox_id, msg_type } ) )
273 number_of_subscriptions = 1;
274 }
275
276 // If there are no more subscriptions to that mbox then
277 // the mbox must remove information about that agent.
278 if( !number_of_subscriptions )
279 {
280 // If we are here then there are no more references
281 // to the mbox. And mbox must not hold reference
282 // to the agent.
283 mbox->unsubscribe_event_handler( msg_type, message_sink );
284 }
285 }
286 }
287
288void
290 const mbox_t & mbox,
291 const std::type_index & msg_type ) noexcept
292 {
293 using namespace std;
294
295 const auto predicate = is_same_mbox_msg{ mbox->id(), msg_type };
296 if( auto it =
297 find_if( begin( m_events ), end( m_events ), predicate );
298 it != end( m_events ) )
299 {
300 // There are subscriptions to be removed.
301 // Have to store message_sink reference because it has to
302 // be passed to unsubscribe_event_handler.
303 auto & message_sink = it->m_message_sink.get();
304
305 // Remove all items that match the predicate.
306 m_events.erase(
307 remove_if( it, end( m_events ), predicate ),
308 end( m_events ) );
309
310 // Note: since v.5.5.9 mbox unsubscription is initiated even for
311 // MPSC mboxes. It is important for the case of message
312 // delivery tracing.
313 mbox->unsubscribe_event_handler( msg_type, message_sink );
314 }
315 }
316
317void
322
325 mbox_id_t mbox_id,
326 const std::type_index & msg_type,
327 const state_t & current_state ) const noexcept
328 {
329 auto it = find( m_events, mbox_id, msg_type, current_state );
330
331 if( it != std::end( m_events ) )
332 return &(it->m_handler);
333 else
334 return nullptr;
335 }
336
337void
338storage_t::debug_dump( std::ostream & to ) const
339 {
340 for( const auto & e : m_events )
341 to << "{" << e.m_mbox->id() << ", "
342 << e.m_msg_type.name() << ", "
343 << e.m_state->query_name() << "}"
344 << std::endl;
345 }
346
347void
349 {
350 if( m_events.empty() )
351 // Nothing to do for an empty subscription list.
352 return;
353
354 using namespace std;
355
356 // Step one.
357 //
358 // Sort all event_info to have all subscriptions for the
359 // same (mbox, msg_type) one after another.
360 sort( begin( m_events ), end( m_events ),
361 []( const auto & a, const auto & b )
362 {
363 return a.m_mbox->id() < b.m_mbox->id() ||
364 ( a.m_mbox->id() == b.m_mbox->id() &&
365 a.m_msg_type < b.m_msg_type );
366 } );
367
368 // Step two.
369 //
370 // Destroy all subscriptions for unique (mbox, msg_type).
371 const auto total_items = m_events.size();
372 for( std::size_t i = 0u; i < total_items; )
373 {
374 auto & current_info = m_events[ i ];
375 current_info.m_mbox->unsubscribe_event_handler(
376 current_info.m_msg_type,
377 current_info.m_message_sink );
378
379 // We should skip all consecutive items with the same
380 // (mbox, msg_type) pairs.
381 std::size_t j = 1u;
382 for( ; (i+j) < total_items; ++j )
383 {
384 const auto & next_info = m_events[ i+j ];
385 if( current_info.m_mbox->id() != next_info.m_mbox->id() ||
386 current_info.m_msg_type != next_info.m_msg_type )
387 break;
388 }
389
390 i += j;
391 }
392
393 // Third step.
394 //
395 // Cleanup subscription vector.
397 }
398
399void
401 {
402 m_events.clear();
403 }
404
407 {
408 return m_events;
409 }
410
411void
413 subscription_storage_common::subscr_info_vector_t && info )
414 {
415 m_events = std::move( info );
416 }
417
418std::size_t
420 {
421 return m_events.size();
422 }
423
424} /* namespace vector_based_subscr_storage */
425
426} /* namespace impl */
427
431 {
432 return [initial_capacity]() {
433 return impl::subscription_storage_unique_ptr_t(
434 new impl::vector_based_subscr_storage::storage_t(
435 initial_capacity ) );
436 };
437 }
438
439} /* namespace so_5 */
Interface for message sink.
An interface of subscription storage.
A vector-based storage for agent's subscriptions information.
void drop_all_subscriptions() noexcept override
Drop all subscriptions.
void setup_content(subscription_storage_common::subscr_info_vector_t &&info) override
Setup content from information from another storage object.
const event_handler_data_t * find_handler(mbox_id_t mbox_id, const std::type_index &msg_type, const state_t &current_state) const noexcept override
subscription_storage_common::subscr_info_vector_t query_content() const override
void drop_subscription_for_all_states(const mbox_t &mbox, const std::type_index &msg_type) noexcept override
virtual void create_event_subscription(const mbox_t &mbox_ref, const std::type_index &type_index, abstract_message_sink_t &message_sink, const state_t &target_state, const event_handler_method_t &method, thread_safety_t thread_safety, event_handler_kind_t handler_kind) override
subscr_info_vector_t m_events
Subscription information.
std::size_t query_subscriptions_count() const override
Count of subscriptions in the storage.
virtual void drop_subscription(const mbox_t &mbox, const std::type_index &msg_type, const state_t &target_state) noexcept override
std::enable_if< details::is_agent_method_pointer< details::method_arity::nullary, Method_Pointer >::value, state_t & >::type on_enter(Method_Pointer pfn)
Set on enter handler.
Definition agent.hpp:4042
#define SO_5_FUNC
Definition declspec.hpp:48
#define SO_5_THROW_EXCEPTION(error_code, desc)
Definition exception.hpp:74
Some reusable and low-level classes/functions which can be used in public header files.
Common stuff for various subscription storage implementations.
auto find(Container &c, const mbox_id_t &mbox_id, const std::type_index &msg_type, const state_t &target_state) -> decltype(c.begin())
A vector-based storage for agent's subscriptions information.
Details of SObjectizer run-time implementations.
Definition agent.cpp:905
Private part of message limit implementation.
Definition agent.cpp:33
thread_safety_t
Thread safety indicator.
Definition types.hpp:50
SO_5_FUNC subscription_storage_factory_t vector_based_subscription_storage_factory(std::size_t initial_capacity)
Factory for subscription storage based on unsorted std::vector.
event_handler_kind_t
Kind of an event handler.
Definition types.hpp:154
Information about event_handler and its properties.