SObjectizer  5.8
Loading...
Searching...
No Matches
agent.cpp
Go to the documentation of this file.
1/*
2 SObjectizer 5.
3*/
4
5#include <so_5/agent.hpp>
6#include <so_5/mbox.hpp>
7#include <so_5/enveloped_msg.hpp>
8#include <so_5/environment.hpp>
9#include <so_5/send_functions.hpp>
10
11#include <so_5/impl/internal_env_iface.hpp>
12#include <so_5/impl/coop_private_iface.hpp>
13
14#include <so_5/impl/delivery_filter_storage.hpp>
15#include <so_5/impl/msg_tracing_helpers.hpp>
16#include <so_5/impl/process_unhandled_exception.hpp>
17#include <so_5/impl/std_message_sinks.hpp>
18#include <so_5/impl/subscription_storage_iface.hpp>
19
20#include <so_5/impl/enveloped_msg_details.hpp>
21
22#include <so_5/details/abort_on_fatal_error.hpp>
23
24#include <so_5/spinlocks.hpp>
25
26#include <algorithm>
27#include <sstream>
28#include <cstdint>
29#include <cstdlib>
30#include <limits>
31
32namespace so_5
33{
34
35//
36// agent_identity_t::pointer_only_t
37//
40 {
41 static constexpr std::array<char, 16> hex_symbols{
42 '0', '1', '2', '3', '4', '5', '6', '7',
43 '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
44 };
45
46 std::array<char, c_string_size> result;
47 auto it = std::copy(
48 c_string_prefix.begin(),
49 c_string_prefix.end(),
50 result.begin() );
51
52 // NOTE: this code won't compile if there is no std::uintptr_t type,
53 // but we don't care about such platforms at the moment.
54 std::uintptr_t ptr_as_uint =
55 reinterpret_cast<std::uintptr_t>(m_pointer_value);
56
57 // Handle by 4 bits portions from the most significant bit.
58 // Leading zeros are not skipped (for the simplicity of implementation).
59 constexpr unsigned int half_octet = 4u;
60 unsigned bits_to_process = sizeof(void *) * (half_octet * 2u);
61 while( bits_to_process >= half_octet )
62 {
63 const auto index = static_cast<std::size_t>(
64 (ptr_as_uint >> (bits_to_process - half_octet)) & 0x0Fu
65 );
66
67 *(it++) = hex_symbols[ index ];
68
69 bits_to_process -= half_octet;
70 }
71
72 it = std::copy( c_string_suffix.begin(), c_string_suffix.end(), it );
73 *it = 0;
74
75 return result;
76 }
77
78namespace
79{
80
81[[nodiscard]]
82unsigned int
83ensure_valid_agent_name_length( std::size_t length )
84 {
85 if( 0u == length )
86 SO_5_THROW_EXCEPTION( rc_empty_agent_name,
87 "Name of an agent can't be empty" );
88
89 constexpr std::size_t max_unit_value =
90 std::numeric_limits<unsigned int>::max();
91 if( max_unit_value < length )
92 SO_5_THROW_EXCEPTION( rc_empty_agent_name,
93 "Name of an agent is too long (length should fit "
94 "into unsigned int)" );
95
96 return static_cast<unsigned int>( length );
97 }
98
99} /* namespace anonymous */
100
101//
102// name_for_agent_t
103//
107
108name_for_agent_t::name_for_agent_t( std::string_view value )
110 {
111 m_value.reset( new char[ value.size() ] );
112 std::copy( std::begin(value), std::end(value), m_value.get() );
113 }
114
116 : m_length{ other.m_length }
117 {
118 if( other.has_value() )
119 {
120 m_value.reset( new char[ m_length ] );
121 std::copy(
122 other.m_value.get(),
123 other.m_value.get() + m_length,
124 m_value.get() );
125 }
126 }
127
130 {
131 name_for_agent_t tmp{ other };
132 swap( *this, tmp );
133 return *this;
134 }
135
137 : m_value{ std::exchange( other.m_value, std::unique_ptr< char[] >{} ) }
138 , m_length{ std::exchange( other.m_length, 0u ) }
139 {}
140
143 {
144 name_for_agent_t tmp{ std::move(other) };
145 swap( *this, tmp );
146 return *this;
147 }
148
149SO_5_FUNC void
151 {
152 using std::swap;
153 swap( a.m_value, b.m_value );
154 swap( a.m_length, b.m_length );
155 }
156
158
161 {
162 std::string_view result;
163 if( has_value() )
164 result = std::string_view{ m_value.get(), m_length };
165 return result;
166 }
167
168bool
170 {
171 return 0u != m_length;
172 }
173
174namespace
175{
176
177/*!
178 * \since
179 * v.5.4.0
180 *
181 */
182std::string
184 {
185 std::ostringstream ss;
186 ss << "<state:target=" << agent << ":this=" << st << ">";
187 return ss.str();
188 }
189
190} /* namespace anonymous */
191
192// NOTE: Implementation of state_t is moved to that file in v.5.4.0.
193
194//
195// state_t::time_limit_t
196//
197/// Information of time_limit for a state.
199{
200public:
201 /// Type of signal to be used for controlling timeouts.
202 using msg_timeout = so_5::details::msg_state_timeout;
203
204 /// Type of clock to be used.
206
207 /// Initializing constructor.
209 /// Value of the time limit.
210 duration_t limit,
211 /// The target state to switch to after the time limit has been exceeded.
212 const state_t & state_to_switch ) noexcept
213 : m_limit{ limit }
215 {}
216
217 /// Change parameters of time limit.
218 ///
219 /// \note
220 /// This method only changes m_limit and m_state_to_switch but
221 /// doesn't change m_activation_data. If time_limit is activated
222 /// it has to be deactivated explicitly via deactivate() method.
223 void
225 duration_t limit,
226 const state_t & state_to_switch ) noexcept
227 {
228 m_limit = limit;
229 m_state_to_switch = state_to_switch;
230 }
231
232 /// React to activation of the state for which time_limit is defined.
233 ///
234 /// Sends delayed msg_timeout message and sets the m_activation_data.
235 ///
236 /// \attention
237 /// This method is marked as noexcept because it has to be called
238 /// in a noexcept context. But it calls methods that can throw
239 /// (like so_5::send_periodic). This is a consequence of the current
240 /// design of SObjectizer-5 (there are no non-throwing ways to send
241 /// a message yet).
242 void
244 const agent_t::state_time_limit_handling_data_t & info ) noexcept
245 {
246 activate( info );
247 }
248
249 /// React to deactivation of the state for which time_limit is defined.
250 ///
251 /// Resets the value of m_activation_data and cancels the delayed
252 /// msg_timeout message.
253 void
255 {
256 // We call reset() always even if time_limit is not activated,
257 // just for simplicity of the implementation.
258 m_activation_data.reset();
259 }
260
261 /// Sends the delayed msg_timeout signal.
262 ///
263 /// Updates the value of m_activation_data.
264 ///
265 /// It's assumed that this method will be called only if
266 /// time_limit is not active yet.
267 void
270 {
271 const auto limit_exceeded_at = steady_clock::now() + m_limit;
272 m_activation_data.emplace(
273 so_5::send_periodic< msg_timeout >(
274 info.timeout_mbox(),
275 m_limit,
276 steady_clock::duration::zero() ),
277 limit_exceeded_at );
278 }
279
280 /// Deactivates the time limit.
281 ///
282 /// This method can be called even if the time limit is not active.
283 ///
284 /// Resets the value of m_activation_data and cancels the delayed
285 /// msg_timeout signal.
286 void
288 {
289 // We call reset() always even if time_limit is not activated,
290 // just for simplicity of the implementation.
291 m_activation_data.reset();
292 }
293
294 /// \retval true if timeout has been exceeded and the agent has to
295 /// be switched to a new state.
296 /// \retval false if timeout has not been exceeded yet (or if time limit
297 /// isn't activated).
298 [[nodiscard]]
299 bool
301 const steady_clock::time_point current_time ) const noexcept
302 {
303 if( m_activation_data.has_value() )
304 {
305 return m_activation_data->m_expiration_point <= current_time;
306 }
307
308 return false;
309 }
310
311 /// \return Reference to the state to switch after expiration of the timeout.
312 [[nodiscard]]
313 const state_t &
314 state_to_switch() const noexcept
315 {
316 return m_state_to_switch.get();
317 }
318
319private:
320 /// Information for active timeout.
321 ///
322 /// \note
323 /// Destruction of an instance of activation_data_t will lead
324 /// to destruction of m_timer and this will lead to cancelling
325 /// of the delayed message.
326 ///
327 /// \since v.5.8.5
329 {
330 /// ID of delayed timeout signal.
332
333 /// Time point of timeout expiration.
335
337 timer_id_t timer,
338 steady_clock::time_point expiration_point )
339 : m_timer{ std::move(timer) }
341 {}
342 };
343
344 /// The current duration of the timeout.
345 ///
346 /// Will be changed on the next call to state_t::time_limit().
348
349 /// The target state to switch after the timeout.
351
352 /// Information required to serve timeout when it's activated.
353 ///
354 /// Empty value means that the timeout isn't activated.
356};
357
358//
359// state_t
360//
361
363 agent_t * target_agent,
364 std::string state_name,
365 state_t * parent_state,
366 std::size_t nested_level,
367 history_t state_history )
368 : m_target_agent{ target_agent }
370 , m_parent_state{ parent_state }
371 , m_initial_substate{ nullptr }
372 , m_state_history{ state_history }
373 , m_last_active_substate{ nullptr }
375 , m_substate_count{ 0 }
376{
377 if( parent_state )
378 {
379 // We should check the depth of nested states.
380 if( m_nested_level >= max_deep )
381 SO_5_THROW_EXCEPTION( rc_state_nesting_is_too_deep,
382 "max nesting deep for agent states is " +
383 std::to_string( max_deep ) );
384
385 // Now we can safely mark parent state as composite.
386 parent_state->m_substate_count += 1;
387 }
388}
389
391 agent_t * agent )
393{
394}
395
397 agent_t * agent,
398 history_t state_history )
399 : state_t{ agent, std::string(), nullptr, 0, state_history }
400{
401}
402
404 agent_t * agent,
405 std::string state_name )
407{}
408
410 agent_t * agent,
411 std::string state_name,
412 history_t state_history )
413 : state_t{ agent, std::move(state_name), nullptr, 0, state_history }
414{}
415
420
422 initial_substate_of parent,
423 std::string state_name )
425{}
426
428 initial_substate_of parent,
429 std::string state_name,
430 history_t state_history )
431 : state_t{
437{
438 if( m_parent_state->m_initial_substate )
439 SO_5_THROW_EXCEPTION( rc_initial_substate_already_defined,
440 "initial substate for state " + m_parent_state->query_name() +
441 " is already defined: " +
442 m_parent_state->m_initial_substate->query_name() );
443
445}
446
448 substate_of parent )
450{}
451
453 substate_of parent,
454 std::string state_name )
456{}
457
459 substate_of parent,
460 std::string state_name,
461 history_t state_history )
462 : state_t{
468{}
469
486
488{
489}
490
491bool
492state_t::operator == ( const state_t & state ) const noexcept
493{
494 return &state == this;
495}
496
497std::string
499{
500 auto getter = [this]() -> std::string {
501 if( m_state_name.empty() )
502 return create_anonymous_state_name( m_target_agent, this );
503 else
504 return m_state_name;
505 };
506
507 if( m_parent_state )
508 return m_parent_state->query_name() + "." + getter();
509 else
510 return getter();
511}
512
513namespace {
514
515#if defined(__clang__)
516#pragma clang diagnostic push
517#pragma clang diagnostic ignored "-Wexit-time-destructors"
518#pragma clang diagnostic ignored "-Wglobal-constructors"
519#endif
520
521/*!
522 * \since
523 * v.5.4.0
524 *
525 * \brief A special object for the state in which agent is awaiting
526 * for deregistration after unhandled exception.
527 *
528 * This object will be shared between all agents.
529 */
531 nullptr, "<AWAITING_DEREGISTRATION_AFTER_UNHANDLED_EXCEPTION>" );
532
533/*!
534 * \since
535 * v.5.5.21
536 *
537 * \brief A special object to be used as the state for making
538 * subscriptions for deadletter handlers.
539 *
540 * This object will be shared between all agents.
541 */
543 nullptr, "<DEADLETTER_STATE>" );
544
545#if defined(__clang__)
546#pragma clang diagnostic pop
547#endif
548
549} /* namespace anonymous */
550
551bool
552state_t::is_target( const agent_t * agent ) const noexcept
553{
554 if( m_target_agent )
555 return m_target_agent == agent;
556 else if( this == &awaiting_deregistration_state )
557 return true;
558 else
559 return false;
560}
561
562void
567
568state_t &
570 duration_t timeout,
571 const state_t & state_to_switch )
572{
573 if( duration_t::zero() == timeout )
574 SO_5_THROW_EXCEPTION( rc_invalid_time_limit_for_state,
575 "zero can't be used as time limit for state: " +
576 query_name() );
577
579
580 // Existing time_limit_t object will be reused, but if there is no
581 // such object yet it has to be created.
582 //
583 // NOTE: by changing m_time_limit we can provide only basic
584 // exception guarantee.
585 if( !m_time_limit )
586 {
587 m_time_limit = std::make_unique< time_limit_t >(
588 timeout, state_to_switch );
589 }
590 else
591 {
592 // NOTE: if the state is active and there was old time_limit
593 // (with already sent instance of msg_timeout) then the old
594 // delayed msg_timeout signal will be automatically cancelled.
595 m_time_limit->deactivate();
596 m_time_limit->change( timeout, state_to_switch );
597 }
598
599 // If this state is active then new time limit must be activated.
600 if( is_active() )
601 {
602 // If this action fails then m_time_limit should be reset
603 // to nullptr.
604 so_5::details::do_with_rollback_on_exception(
605 [this] {
606 m_time_limit->activate(
607 m_target_agent->m_state_time_limit_handling_data );
608 },
609 [this] {
610 m_time_limit.reset();
611 } );
612 }
613
614 return *this;
615}
616
617state_t &
619{
620 m_time_limit.reset();
621
622 return *this;
623}
624
625const state_t *
627{
628 const state_t * s = this;
629 while( 0 != s->m_substate_count )
630 {
632 // Note: for states with shallow history m_last_active_substate
633 // can point to composite substate. This substate must be
634 // processed usual way with checking for substate count, presence
635 // of initial substate and so on...
637 else if( !s->m_initial_substate )
638 SO_5_THROW_EXCEPTION( rc_no_initial_substate,
639 "there is no initial substate for composite state: " +
640 query_name() );
641 else
643 }
644
645 return s;
646}
647
648void
650{
651 auto p = m_parent_state;
652
653 // This pointer will be used to update states with shallow history.
654 // This pointer will be changed on every iteration.
655 auto c = this;
656
657 while( p )
658 {
663
664 c = p;
665 p = p->m_parent_state;
666 }
667}
668
669void
671{
672 m_time_limit->on_state_activation(
673 m_target_agent->m_state_time_limit_handling_data );
674}
675
676void
678{
679 m_time_limit->on_state_deactivation();
680}
681
682//
683// agent_t::state_time_limit_handling_data_t
684//
688
690
691bool
693{
694 return static_cast<bool>(m_timeout_mbox);
695}
696
697void
699 mbox_t timeout_mbox )
700{
701 m_timeout_mbox = std::move(timeout_mbox);
702}
703
704mbox_t
706{
707 return m_timeout_mbox;
708}
709
710namespace
711{
712
713/*!
714 * \brief Helper for creation of the direct mbox for an agent.
715 *
716 * If there is a custom direct mbox factory in \a tuning_options
717 * then the return value of that factory is used. Otherwise the
718 * \a standard_mbox is returned.
719 */
720[[nodiscard]]
721mbox_t
724 const agent_tuning_options_t & tuning_options,
725 mbox_t standard_mbox )
726 {
727 mbox_t result{ std::move(standard_mbox) };
728
729 const auto & factory =
730 tuning_options.query_custom_direct_mbox_factory();
731 if( factory )
732 {
733 result = factory( agent_ptr, std::move(result) );
734
735 if( mbox_type_t::multi_producer_single_consumer
736 != result->type() )
737 {
739 rc_mpsc_mbox_expected,
740 "MPSC mbox is expected as the direct mbox "
741 "for an agent" );
742 }
743 }
744
745 return result;
746 }
747
748/*!
749 * \brief Helper for selection of subscription storage factory.
750 *
751 * If a factory is specified in @a tuning_options explicitly then it's
752 * used. Otherwise the default subscription storage factory from
753 * SObjectizer Environment will be used.
754 *
755 * \since v.5.8.2
756 */
757[[nodiscard]]
762 {
764 return tuning_options.query_subscription_storage_factory();
765 else
766 return impl::internal_env_iface_t{ env }.default_subscription_storage_factory();
767 }
768
769} /* namespace anonymous */
770
771//
772// agent_t
773//
774
776 environment_t & env )
778{
779}
780
782 environment_t & env,
783 agent_tuning_options_t options )
784 : agent_t( context_t( env, std::move( options ) ) )
785{
786}
787
823
825{
826 // Sometimes it is possible that agent is destroyed without
827 // correct deregistration from SO Environment.
829}
830
831void
833{
834 // Default implementation does nothing.
835}
836
837void
839{
840 // Default implementation does nothing.
841}
842
843bool
844agent_t::so_is_active_state( const state_t & state_to_check ) const noexcept
845{
846 state_t::state_path_t current_path{ *m_current_state_ptr };
847
848 return current_path.end() != std::find(
849 current_path.begin(), current_path.end(), &state_to_check );
850}
851
852void
854 agent_state_listener_t & state_listener )
855{
856 m_state_listener_controller.add(
857 impl::state_listener_controller_t::wrap_nondestroyable(
858 state_listener ) );
859}
860
861void
863 agent_state_listener_unique_ptr_t state_listener )
864{
865 m_state_listener_controller.add(
866 impl::state_listener_controller_t::wrap_destroyable(
867 std::move( state_listener ) ) );
868}
869
872{
873 if( m_agent_coop )
875 else
876 // This is very strange case. So it would be better to abort.
877 return abort_on_exception;
878}
879
880void
885
886const mbox_t &
888{
889 return m_direct_mbox;
890}
891
892mbox_t
894{
895 return impl::internal_env_iface_t{ so_environment() }.create_ordinary_mpsc_mbox(
896 *self_ptr() );
897}
898
899const state_t &
901{
902 return st_default;
903}
904
905namespace impl {
906
908{
911
912public :
914 agent_t & agent )
915 : m_agent( agent )
917 {
918 if( agent_t::agent_status_t::state_switch_in_progress
919 == agent.m_current_status )
921 rc_another_state_switch_in_progress,
922 "an attempt to switch agent state when another state "
923 "switch operation is in progress for the same agent" );
924
926 }
931};
932
933} /* namespace impl */
934
935void
937 const state_t & new_state )
938{
940
941 do_change_agent_state( new_state );
942}
943
944void
952
953void
961
962void
964{
966 m_working_thread_id,
967 so_5::query_current_thread_id() );
968
970
972}
973
974void
976{
977 // Default implementation does nothing.
978}
979
980bool
985
987agent_t::so_environment() const noexcept
988{
989 return m_env;
990}
991
992[[nodiscard]]
995{
996 if( !m_agent_coop )
998 rc_agent_has_no_cooperation,
999 "agent_t::so_coop() can be completed because agent is not bound "
1000 "to any cooperation" );
1001
1003}
1004
1005void
1007 event_queue_t & queue ) noexcept
1008{
1009 // Since v.5.5.24 we should use event_queue_hook to get an
1010 // actual event_queue.
1011 auto * actual_queue = impl::internal_env_iface_t{ m_env }
1012 .event_queue_on_bind( this, &queue );
1013
1014 std::lock_guard< default_rw_spinlock_t > queue_lock{ m_event_queue_lock };
1015
1016 // Cooperation usage counter should be incremented.
1017 // It will be decremented during final agent event execution.
1019
1020 // A starting demand must be sent first.
1021 actual_queue->push_evt_start(
1022 execution_demand_t(
1023 this,
1024 message_limit::control_block_t::none(),
1025 0,
1026 typeid(void),
1027 message_ref_t(),
1028 &agent_t::demand_handler_on_start ) );
1029
1030 // Only then pointer to the queue could be stored.
1031 m_event_queue = actual_queue;
1032}
1033
1036 execution_demand_t & d )
1037{
1038 enum class demand_type_t {
1039 message, enveloped_msg, other
1040 };
1041
1042 // We can't use message_kind_t here because there are special
1043 // demands like demands for so_evt_start/so_evt_finish.
1044 // Because of that a pointer to demand handler will be analyzed.
1045 const auto demand_type =
1046 (d.m_demand_handler == &agent_t::demand_handler_on_message ?
1047 demand_type_t::message :
1048 (d.m_demand_handler == &agent_t::demand_handler_on_enveloped_msg ?
1049 demand_type_t::enveloped_msg : demand_type_t::other));
1050
1051 if( demand_type_t::other != demand_type )
1052 {
1053 // Try to find handler for the demand.
1054 auto handler = d.m_receiver->m_handler_finder(
1055 d, "create_execution_hint" );
1056 if( demand_type_t::message == demand_type )
1057 {
1058 if( handler )
1059 return execution_hint_t(
1060 d,
1061 [handler](
1062 execution_demand_t & demand,
1063 current_thread_id_t thread_id ) {
1064 process_message(
1065 thread_id,
1066 demand,
1067 handler->m_thread_safety,
1068 handler->m_method );
1069 },
1070 handler->m_thread_safety );
1071 else
1072 // Handler not found.
1074 }
1075 else
1076 {
1077 // Execution hint for enveloped message is
1078 // very similar to hint for service request.
1079 return execution_hint_t(
1080 d,
1081 [handler](
1082 execution_demand_t & demand,
1083 current_thread_id_t thread_id ) {
1084 process_enveloped_msg(
1085 thread_id,
1086 demand,
1087 handler );
1088 },
1089 handler ? handler->m_thread_safety :
1090 // If there is no real handler then
1091 // there will only be actions from
1092 // envelope.
1093 // These actions should be thread safe.
1094 thread_safe );
1095 }
1096 }
1097 else
1098 // This is demand_handler_on_start or demand_handler_on_finish.
1099 return execution_hint_t(
1100 d,
1101 []( execution_demand_t & demand,
1102 current_thread_id_t thread_id ) {
1103 demand.call_handler( thread_id );
1104 },
1106}
1107
1108void
1110{
1112 m_agent_coop->handle(), dereg_reason );
1113}
1114
1115void
1120
1123{
1124 if( !m_agent_coop )
1126 rc_agent_has_no_cooperation,
1127 "agent_t::so_this_coop_disp_binder() can be completed "
1128 "because agent is not bound to any cooperation" );
1129
1130 return m_agent_coop->coop_disp_binder();
1131}
1132
1134agent_t::so_agent_name() const noexcept
1135{
1136 if( m_name.has_value() )
1137 return { m_name.as_string_view() };
1138 else
1139 return { this };
1140}
1141
1142void
1144{
1146 m_subscriptions->drop_all_subscriptions();
1147}
1148
1149agent_ref_t
1151{
1152 agent_ref_t agent_ref( this );
1153 return agent_ref;
1154}
1155
1156void
1158{
1159 m_agent_coop = &coop;
1160}
1161
1162void
1164{
1165 event_queue_t * actual_queue = nullptr;
1166 {
1167 std::lock_guard< default_rw_spinlock_t > queue_lock{ m_event_queue_lock };
1168
1169 // Since v.5.5.8 shutdown is done in two simple steps:
1170 // - remove actual value from m_event_queue;
1171 // - pushing final demand to actual event queue.
1172 //
1173 // No new demands will be sent to the agent, but all the subscriptions
1174 // remain. They will be destroyed at the very end of agent's lifetime.
1175
1176 if( m_event_queue )
1177 {
1178 // This pointer will be used later.
1179 actual_queue = m_event_queue;
1180
1181 // Final event must be pushed to queue.
1182 so_5::details::invoke_noexcept_code( [&] {
1183 m_event_queue->push_evt_finish(
1184 execution_demand_t(
1185 this,
1186 message_limit::control_block_t::none(),
1187 0,
1188 typeid(void),
1189 message_ref_t(),
1190 &agent_t::demand_handler_on_finish ) );
1191
1192 // No more events will be stored to the queue.
1193 m_event_queue = nullptr;
1194 } );
1195
1196 }
1197 else
1198 so_5::details::abort_on_fatal_error( [&] {
1199 SO_5_LOG_ERROR( so_environment(), log_stream )
1200 {
1201 log_stream << "Unexpected error: m_event_queue contains "
1202 "nullptr. Unable to push demand_handler_on_finish for "
1203 "the agent (" << this << "). Application will be aborted"
1204 << std::endl;
1205 }
1206 } );
1207 }
1208
1209 // Since v.5.8.5 pending demands can be skipped.
1212
1213 if( actual_queue )
1214 // Since v.5.5.24 we should utilize event_queue via
1215 // event_queue_hook.
1217 .event_queue_on_unbind( this, actual_queue );
1218}
1219
1220void
1222 const mbox_t & mbox_ref,
1223 std::type_index msg_type,
1224 const state_t & target_state,
1225 const event_handler_method_t & method,
1226 thread_safety_t thread_safety,
1227 event_handler_kind_t handler_kind )
1228{
1229 // Since v.5.4.0 there is no need for locking agent's mutex
1230 // because this operation can be performed only on agent's
1231 // working thread.
1232
1233 ensure_operation_is_on_working_thread( "so_create_event_subscription" );
1234
1235 // A new subscription can't be made if agent is already deactivated.
1236 if( is_agent_deactivated() )
1238 so_5::rc_agent_deactivated,
1239 "new subscription can't made for deactivated agent" );
1240
1241 m_subscriptions->create_event_subscription(
1242 mbox_ref,
1243 msg_type,
1244 detect_sink_for_message_type( msg_type ),
1245 target_state,
1246 method,
1247 thread_safety,
1248 handler_kind );
1249}
1250
1251void
1252agent_t::so_create_deadletter_subscription(
1253 const mbox_t & mbox,
1254 const std::type_index & msg_type,
1255 const event_handler_method_t & method,
1256 thread_safety_t thread_safety )
1257{
1258 ensure_operation_is_on_working_thread( "so_create_deadletter_subscription" );
1259
1260 // A new deadletter handler can't be made if agent is already deactivated.
1261 if( is_agent_deactivated() )
1263 so_5::rc_agent_deactivated,
1264 "new deadletter handler can't be set for deactivated agent" );
1265
1266 m_subscriptions->create_event_subscription(
1267 mbox,
1268 msg_type,
1269 detect_sink_for_message_type( msg_type ),
1270 deadletter_state,
1271 method,
1272 thread_safety,
1273 event_handler_kind_t::final_handler );
1274}
1275
1276void
1278 const mbox_t & mbox,
1279 const std::type_index & msg_type )
1280{
1281 // Since v.5.4.0 there is no need for locking agent's mutex
1282 // because this operation can be performed only on agent's
1283 // working thread.
1284
1285 ensure_operation_is_on_working_thread( "do_drop_deadletter_handler" );
1286
1287 m_subscriptions->drop_subscription( mbox, msg_type, deadletter_state );
1288}
1289
1292 const std::type_index & msg_type )
1293{
1294 auto * result = m_message_sinks->find_or_create( msg_type );
1295
1296 if( !result )
1298 so_5::rc_message_has_no_limit_defined,
1299 std::string( "message type without "
1300 "predefined limit for that type, type: " ) +
1301 msg_type.name() );
1302
1303 return *result;
1304}
1305
1306void
1308 const mbox_t & mbox,
1309 const std::type_index & msg_type,
1310 const state_t & target_state )
1311{
1312 // Since v.5.4.0 there is no need for locking agent's mutex
1313 // because this operation can be performed only on agent's
1314 // working thread.
1315
1316 ensure_operation_is_on_working_thread( "do_drop_subscription" );
1317
1318 m_subscriptions->drop_subscription( mbox, msg_type, target_state );
1319}
1320
1321void
1323 const mbox_t & mbox,
1324 const std::type_index & msg_type )
1325{
1326 // Since v.5.4.0 there is no need for locking agent's mutex
1327 // because this operation can be performed only on agent's
1328 // working thread.
1329
1331 "do_drop_subscription_for_all_states" );
1332
1333 m_subscriptions->drop_subscription_for_all_states( mbox, msg_type );
1334}
1335
1336bool
1338 const mbox_t & mbox,
1339 const std::type_index & msg_type,
1340 const state_t & target_state ) const noexcept
1341{
1342 return nullptr != m_subscriptions->find_handler(
1343 mbox->id(), msg_type, target_state );
1344}
1345
1346bool
1348 const mbox_t & mbox,
1349 const std::type_index & msg_type ) const noexcept
1350{
1351 return nullptr != m_subscriptions->find_handler(
1352 mbox->id(), msg_type, deadletter_state );
1353}
1354
1355namespace {
1356
1357/*!
1358 * \brief A helper function to select the actual demand handler
1359 * depending on the message kind.
1360 *
1361 * \since
1362 * v.5.5.23
1363 */
1364inline demand_handler_pfn_t
1366 const agent_t & agent,
1367 const message_ref_t & msg )
1368{
1369 demand_handler_pfn_t result = &agent_t::demand_handler_on_message;
1370 if( msg )
1371 {
1372 switch( message_kind( *msg ) )
1373 {
1374 case message_t::kind_t::classical_message : // Already has value.
1375 break;
1376
1377 case message_t::kind_t::user_type_message : // Already has value.
1378 break;
1379
1380 case message_t::kind_t::enveloped_msg :
1381 result = &agent_t::demand_handler_on_enveloped_msg;
1382 break;
1383
1384 case message_t::kind_t::signal :
1385 so_5::details::abort_on_fatal_error( [&] {
1386 SO_5_LOG_ERROR( agent.so_environment(), log_stream )
1387 {
1388 log_stream << "message that has data and message_kind_t::signal!"
1389 "Signals can't have data. Application will be aborted!"
1390 << std::endl;
1391 }
1392 } );
1393 break;
1394 }
1395 }
1396
1397 return result;
1398}
1399
1400} /* namespace anonymous */
1401
1402void
1404 const message_limit::control_block_t * limit,
1405 mbox_id_t mbox_id,
1406 const std::type_index & msg_type,
1407 const message_ref_t & message )
1408{
1409 const auto handler = select_demand_handler_for_message( *this, message );
1410
1411 read_lock_guard_t< default_rw_spinlock_t > queue_lock{ m_event_queue_lock };
1412
1413 if( m_event_queue )
1414 m_event_queue->push(
1415 execution_demand_t(
1416 this,
1417 limit,
1418 mbox_id,
1419 msg_type,
1420 message,
1421 handler ) );
1422}
1423
1424void
1425agent_t::demand_handler_on_start(
1426 current_thread_id_t working_thread_id,
1427 execution_demand_t & d )
1428{
1430
1432 d.m_receiver->m_working_thread_id,
1433 working_thread_id );
1434
1435 try
1436 {
1438 }
1439 catch( const std::exception & x )
1440 {
1441 impl::process_unhandled_exception(
1442 working_thread_id, x, *(d.m_receiver) );
1443 }
1444 catch( ... ) // Since v.5.5.24.3
1445 {
1446 impl::process_unhandled_unknown_exception(
1447 working_thread_id, *(d.m_receiver) );
1448 }
1449}
1450
1451void
1453{
1454 // Nothing more to do.
1455 // Just lock coop's binding_lock. If cooperation is not finished yet
1456 // it would stop the current thread.
1457 std::lock_guard< std::mutex > binding_lock{ m_agent_coop->m_lock };
1458}
1459
1460demand_handler_pfn_t
1462{
1463 return &agent_t::demand_handler_on_start;
1464}
1465
1466void
1467agent_t::demand_handler_on_finish(
1468 current_thread_id_t working_thread_id,
1469 execution_demand_t & d )
1470{
1471 {
1472 // Sentinel must finish its work before decrementing
1473 // reference count to cooperation.
1475 d.m_receiver->m_working_thread_id,
1476 working_thread_id );
1477
1478 try
1479 {
1481 }
1482 catch( const std::exception & x )
1483 {
1484 impl::process_unhandled_exception(
1485 working_thread_id, x, *(d.m_receiver) );
1486 }
1487 catch( ... ) // Since v.5.5.24.3
1488 {
1489 impl::process_unhandled_unknown_exception(
1490 working_thread_id, *(d.m_receiver) );
1491 }
1492
1493 // Since v.5.5.15 agent should be returned in default state.
1495 }
1496
1497 // Cooperation should receive notification about agent deregistration.
1500}
1501
1502demand_handler_pfn_t
1504{
1505 return &agent_t::demand_handler_on_finish;
1506}
1507
1508void
1509agent_t::demand_handler_on_message(
1510 current_thread_id_t working_thread_id,
1511 execution_demand_t & d )
1512{
1514
1515 auto handler = d.m_receiver->m_handler_finder(
1516 d, "demand_handler_on_message" );
1517 if( handler )
1518 process_message(
1519 working_thread_id,
1520 d,
1521 handler->m_thread_safety,
1522 handler->m_method );
1523}
1524
1525demand_handler_pfn_t
1527{
1528 return &agent_t::demand_handler_on_message;
1529}
1530
1531void
1532agent_t::demand_handler_on_enveloped_msg(
1533 current_thread_id_t working_thread_id,
1534 execution_demand_t & d )
1535{
1537
1538 auto handler = d.m_receiver->m_handler_finder(
1539 d, "demand_handler_on_enveloped_msg" );
1540 process_enveloped_msg( working_thread_id, d, handler );
1541}
1542
1543demand_handler_pfn_t
1545{
1546 return &agent_t::demand_handler_on_enveloped_msg;
1547}
1548
// Runs an event handler method for a demand.
//
// working_thread_id - ID of the thread on which the demand is processed.
// thread_safety - thread-safety flag of the handler being invoked.
// method - the handler to call; it receives d.m_message_ref.
//
// A std::exception thrown by `method` is routed to
// impl::process_unhandled_exception(); any other exception is routed to
// impl::process_unhandled_unknown_exception().
// NOTE(review): the demand parameter line (listing line 1552), the
// condition of the early return (1557-1558) and the declaration that the
// braced initializer below belongs to (1564) are not present in this
// listing.
1549void
1550agent_t::process_message(
1551 current_thread_id_t working_thread_id,
1553 thread_safety_t thread_safety,
1554 event_handler_method_t method )
1555{
1556 // Since v.5.8.5 pending demands may be skipped after dereg.
1559 {
1560 // Demand has to be ignored.
1561 return;
1562 }
1563
// The arguments below initialize an object whose declaration line is not
// visible here: the receiver's m_working_thread_id and the thread ID value
// to be used for the duration of the call.
1565 d.m_receiver->m_working_thread_id,
1566 // v.5.7.3
1567 // If event_handler is thread_safe-handler then null_thread_id
1568 // has to be used instead of the actual working_thread_id.
1569 so_5::thread_safe == thread_safety
1570 ? null_current_thread_id()
1571 : working_thread_id
1572 };
1573
1574 try
1575 {
1576 method( d.m_message_ref );
1577 }
1578 catch( const std::exception & x )
1579 {
1580 impl::process_unhandled_exception(
1581 working_thread_id, x, *(d.m_receiver) );
1582 }
1583 catch( ... ) // Since v.5.5.24.3
1584 {
1585 impl::process_unhandled_unknown_exception(
1586 working_thread_id, *(d.m_receiver) );
1587 }
1588}
1589
// Processes a demand whose message is an envelope.
//
// working_thread_id - ID of the thread on which the demand is processed.
// handler_data - result of the handler search; may be null, in which case
//   nothing is done after the early-return check.
//
// Two paths exist when a handler is present: the whole envelope is passed
// to the handler via process_message(), or the envelope's access_hook() is
// called with access_context_t::handler_found and an invoker object.
// NOTE(review): the demand parameter line (listing line 1593), the
// condition of the early return (1597-1598), the condition selecting
// between the two paths (1610) and the invoker declaration (1626) are not
// present in this listing.
1590void
1591agent_t::process_enveloped_msg(
1592 current_thread_id_t working_thread_id,
1594 const impl::event_handler_data_t * handler_data )
1595{
1596 // Since v.5.8.5 pending demands may be skipped after dereg.
1599 {
1600 // Demand has to be ignored.
1601 return;
1602 }
1603
1604 using namespace enveloped_msg::impl;
1605
1606 if( handler_data )
1607 {
1608 // If this is intermediate_handler then we should pass the
1609 // whole envelope to it.
1611 // Just call process_message() in that case because
1612 // process_message() already does what we need (including
1613 // setting working_thread_id and handling of exceptions).
1614 process_message(
1615 working_thread_id,
1616 d,
1617 handler_data->m_thread_safety,
1618 handler_data->m_method );
1619 else
1620 // For a final_handler the payload should be extracted
1621 // from the envelope and the extracted payload should go
1622 // to the handler.
1623 // We don't expect exceptions here and can't restore after them.
1624 so_5::details::invoke_noexcept_code( [&] {
1625 auto & envelope = message_to_envelope( d.m_message_ref );
1627 working_thread_id,
1628 d,
1629 *handler_data
1630 };
1631 envelope.access_hook(
1632 so_5::enveloped_msg::access_context_t::handler_found,
1633 invoker );
1634 } );
1635 }
1636}
1637
// Verifies that the caller runs on the agent's working thread.
//
// operation_name - text placed at the start of the error description.
//
// When so_5::query_current_thread_id() differs from m_working_thread_id an
// error description is composed (with "<NONE>" shown when the agent has no
// working thread) and reported with the error code
// rc_operation_enabled_only_on_agent_working_thread.
// NOTE(review): the line with the function name (listing line 1639) and
// the statement that raises the error (1657) are not present in this
// listing; presumably the error is raised as an exception - confirm.
1638void
1640 const char * operation_name ) const
1641{
1642 if( so_5::query_current_thread_id() != m_working_thread_id )
1643 {
1644 std::ostringstream s;
1645
1646 s << operation_name
1647 << ": operation is enabled only on agent's working thread; "
1648 << "working_thread_id: ";
1649
1650 if( m_working_thread_id == null_current_thread_id() )
1651 s << "<NONE>";
1652 else
1653 s << m_working_thread_id;
1654
1655 s << ", current_thread_id: " << so_5::query_current_thread_id();
1656
1658 so_5::rc_operation_enabled_only_on_agent_working_thread,
1659 s.str() );
1660 }
1661}
1662
// Drops every delivery filter and releases the filter storage.
// Does nothing when m_delivery_filters is empty.
// NOTE(review): the line carrying the function name (listing line 1664)
// is not present in this listing.
1663void
1665{
1666 if( m_delivery_filters )
1667 {
// First ask the storage to drop its filters, then destroy the storage.
1668 m_delivery_filters->drop_all();
1669 m_delivery_filters.reset();
1670 }
1671}
1672
// Sets a delivery filter for a message type on an mbox.
//
// mbox - the mbox the filter is attached to.
// msg_type - the message type the filter applies to.
// filter - the filter object; ownership is moved into the filter storage.
//
// The call is allowed only on the agent's working thread and only while
// the agent is not deactivated (error code rc_agent_deactivated otherwise).
// The filter storage is created on first use.
// NOTE(review): the line with the function name (listing line 1674) and
// the statement that raises the deactivation error (1683) are not present
// in this listing.
1673void
1675 const mbox_t & mbox,
1676 const std::type_index & msg_type,
1677 delivery_filter_unique_ptr_t filter )
1678{
1679 ensure_operation_is_on_working_thread( "set_delivery_filter" );
1680
1681 // A new delivery filter can't be set if agent is already deactivated.
1682 if( is_agent_deactivated() )
1684 so_5::rc_agent_deactivated,
1685 "new delivery filter can't be set for deactivated agent" );
1686
1687 // The message sink for that message type has to be obtained with
1688 // respect to message limits.
1689 auto & target_sink = detect_sink_for_message_type( msg_type );
1690
1691 if( !m_delivery_filters )
1692 m_delivery_filters.reset( new impl::delivery_filter_storage_t() );
1693
1694 m_delivery_filters->set_delivery_filter(
1695 mbox,
1696 msg_type,
1697 std::move(filter),
1698 outliving_mutable( target_sink ) );
1699}
1700
// Drops the delivery filter for a message type on an mbox.
//
// mbox - the mbox the filter was attached to.
// msg_type - the message type the filter applied to.
//
// Does nothing (after the thread check) when no filter storage exists.
// NOTE(review): the operation name passed to the thread check below is
// "set_delivery_filter" although this code drops a filter - this looks
// like a copy-paste slip that would produce a misleading error text;
// confirm before changing.
// NOTE(review): this function is declared noexcept, so an exception raised
// by the thread check (if it raises one) could not propagate to the
// caller - confirm that this is intended.
// NOTE(review): the line carrying the function name (listing line 1702)
// is not present in this listing.
1701void
1703 const mbox_t & mbox,
1704 const std::type_index & msg_type ) noexcept
1705{
1706 ensure_operation_is_on_working_thread( "set_delivery_filter" );
1707
1708 if( m_delivery_filters )
1709 m_delivery_filters->drop_delivery_filter( mbox, msg_type );
1710}
1711
// Handler search without tracing: the context marker parameter is unused.
// Looks for a handler for the current state first and falls back to the
// deadletter handlers; returns the search result (null when both searches
// fail).
// NOTE(review): the head of the signature (listing lines 1712-1714) is not
// present in this listing.
1715 const char * /*context_marker*/ )
1716{
1717 auto search_result = find_event_handler_for_current_state( d );
1718 if( !search_result )
1719 // Since v.5.5.21 we should check for deadletter handler for that demand.
1720 search_result = find_deadletter_handler( d );
1721
1722 return search_result;
1723}
1724
// Handler search with tracing of the outcome.
// Performs the same two-step search as the non-tracing variant (current
// state first, then deadletter handlers) and passes the demand, the
// context marker and the result to a trace call before returning.
// A deadletter hit is reported by one call and returns early; every other
// outcome is reported by the second call at the end.
// NOTE(review): the head of the signature (listing lines 1725-1727) and
// the names of both trace calls (1740, 1751) are not present in this
// listing.
1728 const char * context_marker )
1729{
1730 auto search_result = find_event_handler_for_current_state( d );
1731
1732 if( !search_result )
1733 {
1734 // Since v.5.5.21 we should check for deadletter handler for that demand.
1735 search_result = find_deadletter_handler( d );
1736
1737 if( search_result )
1738 {
1739 // Deadletter handler found. This must be reflected in trace.
1741 d,
1742 context_marker,
1743 search_result );
1744
1745 return search_result;
1746 }
1747 }
1748
1749 // This trace will be made if an event_handler is found for the
1750 // current state or not found at all (including deadletter handlers).
1752 d,
1753 context_marker,
1754 search_result );
1755
1756 return search_result;
1757}
1758
// Searches the receiver's subscriptions for a handler matching the
// demand's mbox ID and message type, starting at the receiver's current
// state and moving to parent_state() after every miss. Returns the first
// handler found, or nullptr when the chain of parents is exhausted.
// NOTE(review): the head of the signature (listing lines 1759-1760) is not
// present in this listing.
1761 execution_demand_t & d )
1762{
1763 const impl::event_handler_data_t * search_result = nullptr;
1764 const state_t * s = &d.m_receiver->so_current_state();
1765
1766 do {
1767 search_result = d.m_receiver->m_subscriptions->find_handler(
1768 d.m_mbox_id,
1769 d.m_msg_type,
1770 *s );
1771
// No handler in this state: continue with the enclosing state, if any.
1772 if( !search_result )
1773 s = s->parent_state();
1774
1775 } while( search_result == nullptr && s != nullptr );
1776
1777 return search_result;
1778}
1779
// Single lookup in the receiver's subscriptions by the demand's mbox ID
// and message type; returns whatever find_handler() returns.
// NOTE(review): the head of the signature (listing lines 1780-1781) and
// the last argument of the call (1787) are not present in this listing;
// presumably the last argument is the special deadletter state - confirm.
1782 execution_demand_t & demand )
1783{
1784 return demand.m_receiver->m_subscriptions->find_handler(
1785 demand.m_mbox_id,
1786 demand.m_msg_type,
1788}
1789
// Validates a requested state change and performs it.
//
// state_to_be_set - the requested state. It must belong to this agent
//   (is_target( this )), otherwise the error code rc_agent_unknown_state
//   is reported.
//
// The state actually entered is state_to_be_set.actual_state_to_enter().
// If it equals the current state, no switch is made and the state listener
// is not notified.
// NOTE(review): the line with the function name (listing line 1791), the
// first half of the deactivation check (1796) and both statements that
// raise the errors (1799, 1825) are not present in this listing.
1790void
1792 const state_t & state_to_be_set )
1793{
1794 // The agent can't leave awaiting_deregistration_state if it's
1795 // in that state already.
1797 state_to_be_set != awaiting_deregistration_state )
1798 {
1800 rc_agent_deactivated,
1801 "unable to switch agent to another state because the "
1802 "agent is already deactivated" );
1803 }
1804
1805 if( state_to_be_set.is_target( this ) )
1806 {
1807 // Since v.5.5.18 we must check nested state switch operations.
1808 // This object will drop pointer to the current state.
1809 impl::state_switch_guard_t switch_op_guard( *this );
1810
1811 auto actual_new_state = state_to_be_set.actual_state_to_enter();
1812 if( !( *actual_new_state == *m_current_state_ptr ) )
1813 {
1814 // New state differs from the current one.
1815 // Actual state switch must be performed.
1816 do_state_switch( *actual_new_state );
1817
1818 // State listener should be informed.
1819 m_state_listener_controller.changed(
1820 *this,
1821 *m_current_state_ptr );
1822 }
1823 }
1824 else
1826 rc_agent_unknown_state,
1827 "unable to switch agent to alien state "
1828 "(the state that doesn't belong to this agent)" );
1829}
1830
// Performs the switch from the current state to state_to_be_set.
//
// Both states are expanded into paths (fill_path) and the first index at
// which the paths differ is located. on_exit handlers are then called from
// the current state's nesting level back to that index, and on_enter
// handlers from that index forward to the target's nesting level.
// m_current_state_ptr is updated before every handler call and finally set
// to the target state.
// NOTE(review): the line with the function name (listing line 1832), two
// calls inside the lambda (1858, 1874) and the last statement of the body
// (1891) are not present in this listing.
1831void
1833 const state_t & state_to_be_set ) noexcept
1834{
1835 state_t::path_t old_path;
1836 state_t::path_t new_path;
1837
1838 // Since v.5.5.22 we will change the value of m_current_state_ptr
1839 // during state change procedure.
1840 auto current_st = m_current_state_ptr;
1841
1842 current_st->fill_path( old_path );
1843 state_to_be_set.fill_path( new_path );
1844
1845 // Find the first item which is different in the paths.
1846 std::size_t first_diff = 0;
1847 for(; first_diff < std::min(
1848 current_st->nested_level(),
1849 state_to_be_set.nested_level() );
1850 ++first_diff )
1851 if( old_path[ first_diff ] != new_path[ first_diff ] )
1852 break;
1853
1854 // Do call for on_exit and on_enter for states.
1855 // on_exit and on_enter should not throw exceptions.
1856 so_5::details::invoke_noexcept_code( [&] {
1857
1859 *this, *current_st );
1860
// The index is unsigned, so the decrement is done by hand and the loop is
// left explicitly at 0 instead of letting the counter wrap around.
1861 for( std::size_t i = current_st->nested_level();
1862 i >= first_diff; )
1863 {
1864 // Modify current state before calling on_exit handler.
1865 m_current_state_ptr = old_path[ i ];
1866 // Perform on_exit actions.
1867 old_path[ i ]->call_on_exit();
1868 if( i )
1869 --i;
1870 else
1871 break;
1872 }
1873
1875 *this, state_to_be_set );
1876
1877 for( std::size_t i = first_diff;
1878 i <= state_to_be_set.nested_level();
1879 ++i )
1880 {
1881 // Modify current state before calling on_enter handler.
1882 m_current_state_ptr = new_path[ i ];
1883
1884 // Perform on_enter actions.
1885 new_path[ i ]->call_on_enter();
1886 }
1887 } );
1888
1889 // Now the current state for the agent can be changed.
1890 m_current_state_ptr = &state_to_be_set;
1892}
1893
// Switches the agent to st_default via so_change_state() unless the agent
// is already in st_default or is deactivated.
// NOTE(review): the line carrying the function name (listing line 1895)
// is not present in this listing.
1894void
1896{
1897 if( !( st_default == so_current_state() ||
1898 is_agent_deactivated() ) )
1899 {
1900 // The agent must be returned to the default state.
1901 // All on_exit handlers must be called at this point.
1902 so_change_state( st_default );
1903 }
1904}
1905
1906bool
1911
// One-time preparation of the data used for state time limits.
//
// When m_state_time_limit_handling_data is not yet defined: an mbox is
// created, agent_t::evt_state_time_limit is subscribed to it as a
// deadletter handler, and the mbox is moved into
// m_state_time_limit_handling_data. Later calls find the data defined and
// do nothing after the first statement.
// NOTE(review): the line with the function name (listing line 1913), the
// head of the first statement (1915) and the expression that creates the
// mbox (1926, 1928) are not present in this listing.
1912void
1914{
1916 "state_t::time_limit" );
1917
1918 // NOTE: the agent can be in awaiting_deregistration_state, but we don't
1919 // check it because an attempt to make a deadletter subscription will
1920 // fail in that case.
1921
1922 if( !m_state_time_limit_handling_data.is_defined() )
1923 {
1924 // We need a new special mbox for msg_timeout message.
1925 auto unique_mbox =
1927 // A new MPSC mbox will be used for that.
1929 // New MPSC mbox will be directly connected
1930 // to this agent.
1931 *this );
1932
1933 // We need a special deadletter handler.
1934 so_subscribe_deadletter_handler(
1935 unique_mbox,
1936 &agent_t::evt_state_time_limit );
1937
1938 // Now we can store time_limit_handling_data in the agent.
1939 m_state_time_limit_handling_data.make_defined(
1940 std::move(unique_mbox) );
1941 }
1942}
1943
// Event handler for the state-timeout signal.
//
// Takes the current time once, walks the path of the current state and,
// for the first state whose time limit reports is_limit_exceeded( now ),
// switches the agent to that limit's state_to_switch() and stops.
// States without a time limit are skipped.
// NOTE(review): the lines with the function name and its parameter
// (listing lines 1945-1946) are not present in this listing.
1944void
1947{
1948 const auto now = state_t::time_limit_t::steady_clock::now();
1949
1950 // We should have all active states in a row.
1951 state_t::state_path_t current_state_path{ *m_current_state_ptr };
1952
1953 // NOTE: we're going from the outer-most to the inner-most state.
1954 // It means that if A is the parent for B and A's timeout exceeded
1955 // then the timeout for B is ignored (even if it's exceeded too).
1956 for( const state_t * st : current_state_path )
1957 {
1958 if( st->m_time_limit )
1959 {
1960 if( st->m_time_limit->is_limit_exceeded( now ) )
1961 {
1962 // We have to switch the agent to a new state.
1963 so_change_state( st->m_time_limit->state_to_switch() );
1964 break;
1965 }
1966 }
1967 }
1968
1969 // NOTE: it's possible that we haven't found an exceeded timeout.
1970 // This could happen if msg_timeout waited too long in the event_queue and
1971 // the agent changed its state during that time.
1972}
1973
1974} /* namespace so_5 */
Interface for message sink.
agent_tuning_options_t & options()
Read-Write access to agent options.
environment_t & env() const
Access to SObjectizer Environment.
Helper class for holding agent's identity (name or pointer).
agent_identity_t(const agent_t *pointer) noexcept
Initializing constructor for case when agent has no user specified name.
Interface of the agent state listener.
Type for holding information necessary for handling time limits for agent states.
Definition agent.hpp:3019
bool is_defined() const noexcept
Is the data for handling time limits defined?
Definition agent.cpp:692
A base class for agents.
Definition agent.hpp:673
static demand_handler_pfn_t get_demand_handler_on_start_ptr() noexcept
Definition agent.cpp:1461
void do_change_agent_state(const state_t &state_to_be_set)
Perform actual operations related to state switch.
Definition agent.cpp:1791
void so_initiate_agent_definition()
A correct initiation of so_define_agent method call.
Definition agent.cpp:963
void evt_state_time_limit(mhood_t< so_5::details::msg_state_timeout >)
Special event handler to process state time limits.
Definition agent.cpp:1945
static const impl::event_handler_data_t * find_deadletter_handler(execution_demand_t &demand)
Search for event handler between deadletter handlers.
Definition agent.cpp:1781
static demand_handler_pfn_t get_demand_handler_on_message_ptr() noexcept
Definition agent.cpp:1526
bool is_agent_deactivated() const noexcept
Is agent already deactivated.
Definition agent.cpp:1907
void so_switch_to_awaiting_deregistration_state()
Switching agent to special state in case of unhandled exception.
Definition agent.cpp:881
const state_t & so_current_state() const
Access to the current agent state.
Definition agent.hpp:967
agent_ref_t create_ref()
Make an agent reference.
Definition agent.cpp:1150
bool so_is_active_state(const state_t &state_to_check) const noexcept
Is a state activated?
Definition agent.cpp:844
void ensure_operation_is_on_working_thread(const char *operation_name) const
Enables operation only if it is performed on agent's working thread.
Definition agent.cpp:1639
bool so_was_defined() const
Is method define_agent already called?
Definition agent.cpp:981
agent_status_t
Enumeration of possible agent statuses.
Definition agent.hpp:2834
@ shutdown_with_skipping_pending_demands
Agent was shutdown and all pending demands have to be skipped.
@ defined
Agent is defined.
@ state_switch_in_progress
State switch operation is in progress.
void destroy_all_subscriptions_and_filters() noexcept
Destroy all agent's subscriptions.
Definition agent.cpp:1143
void shutdown_agent() noexcept
Agent shutdown driver.
Definition agent.cpp:1163
void ensure_binding_finished()
Ensures that all agents from cooperation are bound to dispatchers.
Definition agent.cpp:1452
coop_t * m_agent_coop
Agent belongs to this cooperation.
Definition agent.hpp:2953
agent_status_t m_current_status
Current agent status.
Definition agent.hpp:2851
event_queue_t * m_event_queue
A pointer to event_queue.
Definition agent.hpp:2933
virtual void so_define_agent()
Hook on define agent for SObjectizer.
Definition agent.cpp:975
void drop_all_delivery_filters() noexcept
Drops all delivery filters.
Definition agent.cpp:1664
bool do_check_subscription_presence(const mbox_t &mbox, const std::type_index &msg_type, const state_t &target_state) const noexcept
Check the presence of a subscription.
Definition agent.cpp:1337
void do_state_switch(const state_t &state_to_be_set) noexcept
Actual action for switching agent state.
Definition agent.cpp:1832
void return_to_default_state_if_possible() noexcept
Return agent to the default state.
Definition agent.cpp:1895
agent_t(environment_t &env)
Constructor.
Definition agent.cpp:775
void do_set_delivery_filter(const mbox_t &mbox, const std::type_index &msg_type, delivery_filter_unique_ptr_t filter)
Set a delivery filter.
Definition agent.cpp:1674
mbox_t so_make_new_direct_mbox()
Create a new direct mbox for that agent.
Definition agent.cpp:893
static execution_hint_t so_create_execution_hint(execution_demand_t &demand)
Create execution hint for the specified demand.
Definition agent.cpp:1035
void so_change_state(const state_t &new_state)
Change the current state of the agent.
Definition agent.cpp:936
void push_event(const message_limit::control_block_t *limit, mbox_id_t mbox_id, const std::type_index &msg_type, const message_ref_t &message)
Push event into the event queue.
Definition agent.cpp:1403
coop_handle_t so_coop() const
Get a handle of agent's coop.
Definition agent.cpp:994
void so_deregister_agent_coop_normally()
A helper method for deregistering agent's coop in case of normal deregistration.
Definition agent.cpp:1116
void do_drop_delivery_filter(const mbox_t &mbox, const std::type_index &msg_type) noexcept
Drop a delivery filter.
Definition agent.cpp:1702
virtual exception_reaction_t so_exception_reaction() const noexcept
A reaction from SObjectizer to an exception from agent's event.
Definition agent.cpp:871
void so_deactivate_agent()
Deactivate the agent.
Definition agent.cpp:945
disp_binder_shptr_t so_this_coop_disp_binder() const
Returns the dispatcher binder that is used as the default binder for the agent's coop.
Definition agent.cpp:1122
abstract_message_sink_t & detect_sink_for_message_type(const std::type_index &msg_type)
Helper function that returns a message sink to be used for subscriptions for specified message type.
Definition agent.cpp:1291
const demands_handling_on_dereg_t m_demands_handling_on_dereg
What to do with pending demands on deregistration.
Definition agent.hpp:3085
void so_drop_all_subscriptions_and_filters()
Dropping all of the agent's subscriptions and filters.
Definition agent.cpp:954
virtual void so_evt_finish()
Hook of agent finish in SObjectizer.
Definition agent.cpp:838
static demand_handler_pfn_t get_demand_handler_on_finish_ptr() noexcept
Definition agent.cpp:1503
void so_add_nondestroyable_listener(agent_state_listener_t &state_listener)
Add a state listener to the agent.
Definition agent.cpp:853
static const impl::event_handler_data_t * handler_finder_msg_tracing_disabled(execution_demand_t &demand, const char *context_marker)
Handler finder for the case when message delivery tracing is disabled.
Definition agent.cpp:1713
const state_t * m_current_state_ptr
Current agent state.
Definition agent.hpp:2826
const priority_t m_priority
Priority of the agent.
Definition agent.hpp:2969
agent_identity_t so_agent_name() const noexcept
Get an optional name of the agent.
Definition agent.cpp:1134
static const impl::event_handler_data_t * handler_finder_msg_tracing_enabled(execution_demand_t &demand, const char *context_marker)
Handler finder for the case when message delivery tracing is enabled.
Definition agent.cpp:1726
void bind_to_coop(coop_t &coop)
Bind agent to the cooperation.
Definition agent.cpp:1157
static const impl::event_handler_data_t * find_event_handler_for_current_state(execution_demand_t &demand)
Actual search for event handler with respect to parent-child relationship between agent states.
Definition agent.cpp:1760
const mbox_t & so_direct_mbox() const
Get the agent's direct mbox.
Definition agent.cpp:887
void so_destroy_deadletter_subscription(const mbox_t &mbox, const std::type_index &msg_type)
Destroy a subscription for a deadletter handler.
Definition agent.cpp:1277
bool do_check_deadletter_presence(const mbox_t &mbox, const std::type_index &msg_type) const noexcept
Check the presence of a deadletter handler.
Definition agent.cpp:1347
void do_drop_subscription_for_all_states(const mbox_t &mbox, const std::type_index &msg_type)
Remove subscription for all states.
Definition agent.cpp:1322
agent_t(context_t ctx)
Constructor which simplifies agent construction with or without agent's tuning options.
Definition agent.cpp:788
static agent_tuning_options_t tuning_options()
Create tuning options object with default values.
Definition agent.hpp:1136
void so_add_destroyable_listener(agent_state_listener_unique_ptr_t state_listener)
Add a state listener to the agent.
Definition agent.cpp:862
environment_t & so_environment() const noexcept
Access to the SObjectizer Environment to which this agent belongs.
Definition agent.cpp:987
const state_t & so_default_state() const
Access to the agent's default state.
Definition agent.cpp:900
virtual ~agent_t()
Definition agent.cpp:824
void so_bind_to_dispatcher(event_queue_t &queue) noexcept
Binding agent to the dispatcher.
Definition agent.cpp:1006
environment_t & m_env
SObjectizer Environment to which the agent belongs.
Definition agent.hpp:2897
void so_deregister_agent_coop(int dereg_reason)
A helper method for deregistering agent's coop.
Definition agent.cpp:1109
void do_drop_subscription(const mbox_t &mbox, const std::type_index &msg_type, const state_t &target_state)
Remove subscription for the state specified.
Definition agent.cpp:1307
agent_t(environment_t &env, agent_tuning_options_t tuning_options)
Constructor which allows specification of agent's tuning options.
Definition agent.cpp:781
handler_finder_t m_handler_finder
Function for searching event handler.
Definition agent.hpp:2874
void define_state_time_limit_handling_data_if_needed()
Initialize data for handling time limit of agent's states.
Definition agent.cpp:1913
virtual void so_evt_start()
Hook on agent start inside SObjectizer.
Definition agent.cpp:832
static demand_handler_pfn_t get_demand_handler_on_enveloped_msg_ptr() noexcept
Definition agent.cpp:1544
void so_create_event_subscription(const mbox_t &mbox_ref, std::type_index type_index, const state_t &target_state, const event_handler_method_t &method, thread_safety_t thread_safety, event_handler_kind_t handler_kind)
Create a subscription for an event.
Definition agent.cpp:1221
A collector for agent tuning options.
so_5::priority_t query_priority() const noexcept
Get priority value.
bool is_user_provided_subscription_storage_factory() const noexcept
Does a user provide a specific subscription_storage_factory?
demands_handling_on_dereg_t demands_handling_on_dereg() const noexcept
Type of smart handle for a cooperation.
Agent cooperation.
Definition coop.hpp:389
exception_reaction_t exception_reaction() const noexcept
Get the current exception reaction flag for that cooperation.
Definition coop.hpp:758
coop_handle_t handle() noexcept
Get handle for this coop.
Definition coop.hpp:432
An implementation of handler_invoker interface.
SObjectizer Environment.
void deregister_coop(coop_handle_t coop, int reason) noexcept
Deregister the cooperation.
An interface of event queue for agent.
A hint for a dispatcher for execution of event for the concrete execution_demand.
static execution_hint_t create_empty_execution_hint(execution_demand_t &demand)
A special class for accessing private members of agent_coop.
static void decrement_usage_count(coop_t &coop)
static void increment_usage_count(coop_t &coop) noexcept
A helper class for accessing the functionality of environment-class which is specific for SObjectizer...
event_queue_t * event_queue_on_bind(agent_t *agent, event_queue_t *original_queue) noexcept
Call the event_queue_hook when an agent is being bound to a particular event_queue.
void event_queue_on_unbind(agent_t *agent, event_queue_t *queue) noexcept
Call the event_queue_hook when an agent is being unbound from its event_queue.
bool is_msg_tracing_enabled() const
Is message delivery tracing enabled?
internal_env_iface_t(environment_t &env)
Initializing constructor.
mbox_t create_limitless_mpsc_mbox(agent_t &single_consumer)
Create multi-producer/single-consumer mbox that ignores message limits.
agent_t::agent_status_t m_previous_status
Definition agent.cpp:910
state_switch_guard_t(agent_t &agent)
Definition agent.cpp:913
A base class for agent messages.
Definition message.hpp:47
Type for holding agent name.
unsigned int m_length
Name length.
name_for_agent_t & operator=(name_for_agent_t &&other) noexcept
Definition agent.cpp:142
name_for_agent_t(const name_for_agent_t &)
Definition agent.cpp:115
name_for_agent_t(name_for_agent_t &&other) noexcept
Definition agent.cpp:136
bool has_value() const noexcept
Does this object have a value?
Definition agent.cpp:169
name_for_agent_t & operator=(const name_for_agent_t &)
Definition agent.cpp:129
std::string_view as_string_view() const
Get the value as a string_view.
Definition agent.cpp:160
name_for_agent_t()
Default constructor makes a null value.
Definition agent.cpp:104
name_for_agent_t(std::string_view value)
Initializing constructor.
Definition agent.cpp:108
Wrapper around a pointer to partially constructed agent.
Scoped guard for shared locks.
Helper class to simplify iteration over a state's path.
Definition state.hpp:1841
state_path_t(const state_t &state) noexcept
Initializing constructor.
Definition state.hpp:1853
Information of time_limit for a state.
Definition agent.cpp:199
void change(duration_t limit, const state_t &state_to_switch) noexcept
Definition agent.cpp:224
const state_t & state_to_switch() const noexcept
Definition agent.cpp:314
void on_state_activation(const agent_t::state_time_limit_handling_data_t &info) noexcept
Definition agent.cpp:243
bool is_limit_exceeded(const steady_clock::time_point current_time) const noexcept
Definition agent.cpp:300
std::optional< activation_data_t > m_activation_data
Definition agent.cpp:355
std::reference_wrapper< const state_t > m_state_to_switch
The target state to switch after the timeout.
Definition agent.cpp:350
void activate(const agent_t::state_time_limit_handling_data_t &info)
Definition agent.cpp:268
time_limit_t(duration_t limit, const state_t &state_to_switch) noexcept
Initializing constructor.
Definition agent.cpp:208
void on_state_deactivation() noexcept
Definition agent.cpp:254
bool is_target(const agent_t *agent) const noexcept
Is agent owner of this state?
Definition agent.cpp:552
state_t & time_limit(duration_t timeout, const state_t &state_to_switch)
Set up a time limit for the state.
Definition agent.cpp:569
std::enable_if< details::is_agent_method_pointer< details::method_arity::nullary, Method_Pointer >::value, state_t & >::type on_enter(Method_Pointer pfn)
Set on enter handler.
Definition agent.hpp:4042
state_t & drop_time_limit()
Drop time limit for the state if defined.
Definition agent.cpp:618
history_t m_state_history
Type of state history.
Definition state.hpp:1643
const state_t * parent_state() const noexcept
Get a parent state if exists.
Definition state.hpp:1719
state_t(initial_substate_of parent, std::string state_name, history_t state_history)
Constructor for the case when state is the initial substate of some parent state.
Definition agent.cpp:427
state_t(substate_of parent)
Constructor for the case when state is a substate of some parent state.
Definition agent.cpp:447
bool is_active() const noexcept
Is this state or any of its substates activated?
Definition agent.hpp:3918
state_t(state_t &&other)
Move constructor.
Definition agent.cpp:470
state_t(agent_t *target_agent, std::string state_name, state_t *parent_state, std::size_t nested_level, history_t state_history)
Fully initializing constructor.
Definition agent.cpp:362
state_t(agent_t *agent, std::string state_name, history_t state_history)
Definition agent.cpp:409
state_t * m_parent_state
Parent state.
Definition state.hpp:1624
state_t(substate_of parent, std::string state_name, history_t state_history)
Constructor for the case when state is a substate of some parent state.
Definition agent.cpp:458
const state_t * actual_state_to_enter() const
Find actual state to be activated for agent.
Definition agent.cpp:626
state_t(initial_substate_of parent)
Constructor for the case when state is the initial substate of some parent state.
Definition agent.cpp:416
const state_t * m_initial_substate
The initial substate.
Definition state.hpp:1636
bool operator==(const state_t &state) const noexcept
Definition agent.cpp:492
state_t(initial_substate_of parent, std::string state_name)
Constructor for the case when state is the initial substate of some parent state.
Definition agent.cpp:421
size_t m_substate_count
Number of substates.
Definition state.hpp:1673
state_t(agent_t *agent, history_t state_history)
Definition agent.cpp:396
bool operator!=(const state_t &state) const noexcept
Definition state.hpp:360
agent_t *const m_target_agent
Owner of this state.
Definition state.hpp:1605
std::string query_name() const
Get textual name of the state.
Definition agent.cpp:498
state_t(substate_of parent, std::string state_name)
Constructor for the case when state is a substate of some parent state.
Definition agent.cpp:452
const state_t * m_last_active_substate
Last active substate.
Definition state.hpp:1654
void handle_time_limit_on_enter() const
A special handler of time limit to be used on entering into state.
Definition agent.cpp:670
state_t(agent_t *agent, std::string state_name)
Definition agent.cpp:403
void activate() const
Switch agent to that state.
Definition agent.cpp:563
history_t
Type of history for state.
Definition state.hpp:173
@ none
State has no history.
@ deep
State has deep history.
@ shallow
State has shallow history.
void update_history_in_parent_states() const
A helper method which is used during a state change to update states with history.
Definition agent.cpp:649
void handle_time_limit_on_exit() const
A special handler of time limit to be used on exiting from state.
Definition agent.cpp:677
state_t(agent_t *agent)
Definition agent.cpp:390
An identifier for the timer.
Definition timers.hpp:82
#define SO_5_FUNC
Definition declspec.hpp:48
#define SO_5_LOG_ERROR(logger, var_name)
A special macro for helping error logging.
#define SO_5_THROW_EXCEPTION(error_code, desc)
Definition exception.hpp:74
demand_handler_pfn_t select_demand_handler_for_message(const agent_t &agent, const message_ref_t &msg)
A helper function to select actual demand handler in dependency of message kind.
Definition agent.cpp:1365
mbox_t make_direct_mbox_with_respect_to_custom_factory(partially_constructed_agent_ptr_t agent_ptr, const agent_tuning_options_t &tuning_options, mbox_t standard_mbox)
Helper for creation of the direct mbox for an agent.
Definition agent.cpp:722
unsigned int ensure_valid_agent_name_length(std::size_t length)
Definition agent.cpp:83
std::string create_anonymous_state_name(const agent_t *agent, const state_t *st)
Definition agent.cpp:183
const state_t deadletter_state(nullptr, "<DEADLETTER_STATE>")
A special object to be used as state for make subscriptions for deadletter handlers.
subscription_storage_factory_t detect_subscription_storage_factory_to_use(environment_t &env, const agent_tuning_options_t &tuning_options)
Helper for selection of subscription storage factory.
Definition agent.cpp:759
const state_t awaiting_deregistration_state(nullptr, "<AWAITING_DEREGISTRATION_AFTER_UNHANDLED_EXCEPTION>")
A special object for the state in which the agent is awaiting deregistration after an unhandled exception.
Enumeration of cooperation deregistration reasons.
Definition coop.hpp:39
const int normal
Normal deregistration.
Definition coop.hpp:46
Some reusable and low-level classes/functions which can be used in public header files.
Internal namespace with details of agent_t implementation.
Definition agent.hpp:462
Various helpers for message delivery tracing mechanism.
void safe_trace_state_leaving(const agent_t &state_owner, const state_t &state)
Helper for tracing the fact of leaving a state.
void safe_trace_state_entering(const agent_t &state_owner, const state_t &state)
Helper for tracing the fact of entering into a state.
void trace_deadletter_handler_search_result(const execution_demand_t &demand, const char *context_marker, const event_handler_data_t *search_result)
Helper for tracing the result of search of deadletter handler.
void trace_event_handler_search_result(const execution_demand_t &demand, const char *context_marker, const event_handler_data_t *search_result)
Helper for tracing the result of event handler search.
Details of SObjectizer run-time implementations.
Definition agent.cpp:905
All stuff related to message limits.
Definition message.hpp:862
Private part of message limit implementation.
Definition agent.cpp:33
exception_reaction_t
A reaction of SObjectizer to an exception from agent event.
Definition agent.hpp:65
@ abort_on_exception
Execution of the application must be aborted immediately.
Definition agent.hpp:67
demands_handling_on_dereg_t
How pending demands should be handled on deregistration.
@ skip
Pending demands have to be skipped.
const thread_safety_t not_thread_safe
Shorthand for thread unsafety indicator.
Definition types.hpp:62
const thread_safety_t thread_safe
Shorthand for thread safety indicator.
Definition types.hpp:69
thread_safety_t
Thread safety indicator.
Definition types.hpp:50
@ user_type_message
Message is a user type message.
SO_5_FUNC void swap(name_for_agent_t &a, name_for_agent_t &b) noexcept
Definition agent.cpp:150
event_handler_kind_t
Kind of an event handler.
Definition types.hpp:154
Type for case when agent has no user-provided name.
SO_5_FUNC std::array< char, c_string_size > make_c_string() const noexcept
Make a c-string with text representation of a value.
Definition agent.cpp:39
A description of event execution demand.
mbox_id_t m_mbox_id
ID of mbox.
agent_t * m_receiver
Receiver of demand.
const message_limit::control_block_t * m_limit
Optional message limit for that message.
A helper class for temporary setting and then dropping the ID of the current working thread.
Definition agent.hpp:474
Information about event_handler and its properties.
thread_safety_t m_thread_safety
Is event handler thread safe or not.
event_handler_kind_t m_kind
Kind of this event handler.
Helper for marking initial substate of composite state.
Definition state.hpp:57
A control block for one message limit.
Definition message.hpp:976
static void decrement(const control_block_t *limit)
Definition message.hpp:1028
activation_data_t(timer_id_t timer, steady_clock::time_point expiration_point)
Definition agent.cpp:336
steady_clock::time_point m_expiration_point
Time point of timeout expiration.
Definition agent.cpp:334
timer_id_t m_timer
ID of delayed timeout signal.
Definition agent.cpp:331
Helper for marking a substate of composite state.
Definition state.hpp:89