SObjectizer-5 Extra
time_limited.hpp
Go to the documentation of this file.
1 /*!
2  * \file
3  * \brief Implementation of time-limited asynchronous one-time operation.
4  *
5  * \since
6  * v.1.0.4
7  */
8 
9 #pragma once
10 
11 #include <so_5_extra/async_op/details.hpp>
12 #include <so_5_extra/async_op/errors.hpp>
13 
14 #include <so_5/details/invoke_noexcept_code.hpp>
15 
16 #include <so_5/impl/internal_env_iface.hpp>
17 
18 #include <so_5/agent.hpp>
19 #include <so_5/send_functions.hpp>
20 
21 #include <so_5/timers.hpp>
22 
23 #include <so_5/outliving.hpp>
24 
25 #include <chrono>
26 #include <vector>
27 
28 namespace so_5 {
29 
30 namespace extra {
31 
32 namespace async_op {
33 
34 namespace time_limited {
35 
36 //! Enumeration for status of operation.
37 enum class status_t
38  {
39  //! Status of operation is unknown because the
40  //! operation data has been moved to another proxy-object.
42  //! Operation is not activated yet.
44  //! Operation is activated.
45  activated,
46  //! Operation is completed.
47  completed,
48  //! Operation is cancelled.
49  cancelled,
50  //! Operation is timed-out.
51  timedout
52  };
53 
54 namespace details {
55 
56 /*!
57  * \brief Container of all data related to async operation.
58  *
59  * \attention
60  * There can be cyclic references from op_data_t instance to
61  * completion/timeout handlers and back from completion/timeout
62  * handlers to op_data_t. Because of that op_data_t can't
63  * perform cleanup in its destructor because the destructor
64  * will not be called until these cyclic references exist.
65  * It requires special attention: content of op_data_t must
66  * be cleared by owners of op_data_t instances.
67  *
68  * \since
69  * v.1.0.4
70  */
71 class op_data_t : protected ::so_5::atomic_refcounted_t
72  {
73  private :
74  friend class ::so_5::intrusive_ptr_t<op_data_t>;
75 
76  //! Description of one completion handler subscription.
78  {
79  //! Mbox from that a message is expected.
81 
82  //! State for that a subscription should be created.
84 
85  //! Subscription type.
86  /*!
87  * \note
88  * This is a subscription type. Not a type which will
89  * be passed to the event handler.
90  */
92 
93  //! Event handler.
95 
96  //! Initializing constructor.
98  so_5::mbox_t mbox,
99  const so_5::state_t & state,
100  std::type_index subscription_type,
101  so_5::event_handler_method_t handler )
102  : m_mbox( std::move(mbox) )
105  , m_handler( std::move(handler) )
106  {}
107  };
108 
109  //! Description of one timeout handler subscription.
111  {
112  //! State for that a subscription should be created.
114 
115  //! Event handler.
117 
118  //! Initializing constructor.
120  const so_5::state_t & state,
121  so_5::event_handler_method_t handler )
123  , m_handler( std::move(handler) )
124  {}
125  };
126 
127  //! Owner of async operation.
129 
130  //! Type of timeout message.
132 
133  //! The status of the async operation.
135 
136  //! Subscriptions for completion handlers which should be created on
137  //! activation.
139 
140  //! Subscriptions for timeout handlers which should be created on
141  //! activation.
143 
144  //! A default timeout handler which will be used as
145  //! deadletter handler for timeout message/signal.
146  /*!
147  * \note
148  * Can be nullptr. If so the default timeout handler will
149  * be created during activation procedure.
150  */
152 
153  //! A mbox to which timeout message/signal will be sent.
154  /*!
155  * \note
156  * It will be limitless MPSC-mbox.
157  */
159 
160  //! An ID of timeout message/signal.
161  /*!
162  * Will be used for cancellation of async operation.
163  */
165 
166  //! Create subscriptions for all defined completion handlers.
167  /*!
168  * This method will rollback all subscriptions made in case of
169  * an exception. It means that if an exception is thrown during
170  * subscription then all already subscribed completion handlers
171  * will be removed.
172  */
173  void
175  {
176  std::size_t i = 0;
177  ::so_5::details::do_with_rollback_on_exception( [&] {
178  for(; i != m_completion_handlers.size(); ++i )
179  {
180  auto & ch = m_completion_handlers[ i ];
181  m_owner.get().so_create_event_subscription(
182  ch.m_mbox,
183  ch.m_subscription_type,
184  ch.m_state.get(),
185  ch.m_handler,
186  ::so_5::thread_safety_t::unsafe,
187  ::so_5::event_handler_kind_t::final_handler );
188  }
189  },
190  [&] {
191  drop_completion_handlers_subscriptions_up_to( i );
192  } );
193  }
194 
195  //! Removes subscription for the first N completion handlers.
  //!
  //! Walks m_completion_handlers from index 0 up to (but not including)
  //! \a upper_border and destroys the owner's event subscription identified
  //! by the stored (mbox, subscription type, state) of every visited item.
  //! It is used with the index of the failed item when subscription creation
  //! is rolled back, and with the full container size when all completion
  //! subscriptions are dropped.
  //!
  //! No bounds check is performed: \a upper_border must not exceed
  //! m_completion_handlers.size().
  //!
  //! NOTE(review): the line carrying the method name is absent from this
  //! listing; call sites refer to it as
  //! drop_completion_handlers_subscriptions_up_to.
196  void
198  //! Upper bound in m_completion_handlers (not included).
199  std::size_t upper_border ) noexcept
200  {
201  for( std::size_t i = 0; i != upper_border; ++i )
202  {
203  const auto & ch = m_completion_handlers[ i ];
  // The stored handler itself is not needed for unsubscription.
204  m_owner.get().so_destroy_event_subscription(
205  ch.m_mbox,
206  ch.m_subscription_type,
207  ch.m_state.get() );
208  }
209  }
210 
211  //! Create subscriptions for all defined timeout handlers
212  //! (including default handler).
213  /*!
214  * This method will rollback all subscriptions made in case of
215  * an exception. It means that if an exception is thrown during
216  * subscription then all already subscribed timeout handlers
217  * will be removed.
  *
  * The visible part subscribes the default timeout handler under
  * do_with_rollback_on_exception.
  *
  * NOTE(review): the rollback action visible here removes only the
  * default (deadletter) subscription. Listing lines 220 and 222 are
  * missing from this view, so it cannot be confirmed from here how the
  * regular timeout-handler subscriptions are undone when subscribing
  * the default handler throws. Verify against the full source that the
  * rollback really matches the guarantee described above.
218  */
219  void
221  {
223  ::so_5::details::do_with_rollback_on_exception( [&] {
224  do_subscribe_default_timeout_handler();
225  },
226  [&] {
227  do_unsubscribe_default_timeout_handler();
228  } );
229  }
230 
231  //! An implementation of subscription of timeout handlers.
232  /*!
233  * Default timeout handler is not subscribed by this method.
234  */
235  void
237  {
238  std::size_t i = 0;
239  ::so_5::details::do_with_rollback_on_exception( [&] {
240  for(; i != m_timeout_handlers.size(); ++i )
241  {
242  auto & th = m_timeout_handlers[ i ];
243  m_owner.get().so_create_event_subscription(
244  m_timeout_mbox,
245  m_timeout_msg_type,
246  th.m_state,
247  th.m_handler,
248  ::so_5::thread_safety_t::unsafe,
249  ::so_5::event_handler_kind_t::final_handler );
250  }
251  },
252  [&] {
253  drop_timeout_handlers_subscriptions_up_to( i );
254  } );
255  }
256 
257  //! An implementation of subscription of default timeout handler.
  //!
  //! Registers m_default_timeout_handler on the owner agent as the
  //! deadletter handler for the pair (m_timeout_mbox, m_timeout_msg_type).
  //! The handler is registered with thread_safety_t::unsafe, i.e. as a
  //! not-thread-safe event handler.
  //!
  //! NOTE(review): the line carrying the method name is absent from this
  //! listing; the name is taken from the visible call site.
258  void
260  {
261  m_owner.get().so_create_deadletter_subscription(
262  m_timeout_mbox,
263  m_timeout_msg_type,
264  m_default_timeout_handler,
265  ::so_5::thread_safety_t::unsafe );
266  }
267 
268  //! An implementation of unsubscription of the first N timeout handlers.
269  /*!
270  * The default timeout handler is not unsubscribed by this method.
  *
  * For every item of m_timeout_handlers with index in
  * [0, upper_border) the owner's event subscription for
  * (m_timeout_mbox, m_timeout_msg_type, item state) is destroyed.
  * It is used with the index of the failed item during rollback and
  * with the full container size when all timeout handlers are dropped.
  *
  * No bounds check is performed: \a upper_border must not exceed
  * m_timeout_handlers.size().
  *
  * NOTE(review): the line carrying the method name is absent from this
  * listing; call sites refer to it as
  * drop_timeout_handlers_subscriptions_up_to.
271  */
272  void
274  //! Upper bound in m_timeout_handlers (not included).
275  std::size_t upper_border ) noexcept
276  {
277  for( std::size_t i = 0; i != upper_border; ++i )
278  {
279  const auto & th = m_timeout_handlers[ i ];
  // All timeout handlers share the same mbox and message type;
  // only the state differs between subscriptions.
280  m_owner.get().so_destroy_event_subscription(
281  m_timeout_mbox,
282  m_timeout_msg_type,
283  th.m_state.get() );
284  }
285  }
286 
287  //! Remove subscriptions for all completion handlers.
288  void
290  {
291  drop_completion_handlers_subscriptions_up_to(
292  m_completion_handlers.size() );
293  }
294 
295  //! Remove subscriptions for all timeout handlers
296  //! (including the default timeout handler).
297  void
299  {
302  }
303 
304  //! Actual unsubscription for the default timeout handler.
  //!
  //! Destroys the owner's deadletter subscription for the pair
  //! (m_timeout_mbox, m_timeout_msg_type). Subscriptions of ordinary
  //! timeout handlers are not touched here.
  //!
  //! NOTE(review): the line carrying the method name (and any
  //! qualifiers such as noexcept) is absent from this listing.
305  void
307  {
308  m_owner.get().so_destroy_deadletter_subscription(
309  m_timeout_mbox,
310  m_timeout_msg_type );
311  }
312 
313  //! Actual unsubscription for timeout handlers.
314  /*!
315  * The default timeout handler is not unsubscribed by this method.
316  */
317  void
319  {
320  drop_timeout_handlers_subscriptions_up_to(
321  m_timeout_handlers.size() );
322  }
323 
324  //! Performs total cleanup of the operation data.
325  /*!
326  * All subscriptions are removed.
327  * The delayed message is released.
328  *
329  * Content of m_completion_handlers, m_timeout_handlers and
330  * m_default_timeout_handler will be erased.
331  */
332  void
334  {
335  // All subscriptions must be removed.
338 
339  // Timer is no more needed.
340  m_timeout_timer_id.release();
341 
342  // Information about completion and timeout handlers
343  // is no more needed.
344  m_completion_handlers.clear();
345  m_timeout_handlers.clear();
346  m_default_timeout_handler = ::so_5::event_handler_method_t{};
347  }
348 
349  //! Creates the default timeout handler if it is not set
350  //! by the user.
351  void
353  {
354  if( !m_default_timeout_handler )
355  {
356  m_default_timeout_handler =
357  [op = ::so_5::intrusive_ptr_t<op_data_t>(this)](
358  ::so_5::message_ref_t & /*msg*/ )
359  {
360  op->timedout();
361  };
362  }
363  }
364 
365  //! Cleans the operation data and sets the status specified.
366  void
368  //! A new status to be set.
369  status_t status )
370  {
372  m_status = status;
373  }
374 
375  public :
376  //! Initializing constructor.
378  //! Owner of the async operation.
379  ::so_5::outliving_reference_t< ::so_5::agent_t > owner,
380  //! Type of the timeout message/signal.
381  const std::type_index & timeout_msg_type )
382  : m_owner( owner )
384  // Timeout mbox must be created right now.
385  // It will be used during timeout_handlers processing.
386  , m_timeout_mbox(
389  &m_owner.get(),
390  // No message limits for this mbox.
391  nullptr ) )
392  {}
393 
394  //! Access to timeout mbox.
  //!
  //! \return a const reference to the mbox stored in m_timeout_mbox
  //! (the mbox the timeout message/signal is sent to). The mbox is
  //! created in the constructor, so the reference is usable right
  //! after construction of the operation data.
395  [[nodiscard]] const ::so_5::mbox_t &
396  timeout_mbox() const noexcept
397  {
398  return m_timeout_mbox;
399  }
400 
401  //! Access to type of timeout message/signal.
  //!
  //! \return a const reference to the std::type_index stored in
  //! m_timeout_msg_type. It is the type used for subscriptions of
  //! timeout handlers on the timeout mbox.
402  [[nodiscard]] const std::type_index &
403  timeout_msg_type() const noexcept
404  {
405  return m_timeout_msg_type;
406  }
407 
408  //! Access to owner of the async operation.
  //!
  //! \return a non-const reference to the agent held in m_owner,
  //! i.e. the agent on which all subscriptions of this operation
  //! are created and destroyed.
409  [[nodiscard]] ::so_5::agent_t &
410  owner() noexcept
411  {
412  return m_owner.get();
413  }
414 
415  //! Access to SObjectizer Environment in that the owner is working.
  //!
  //! \return the result of so_environment() called on the owner agent.
416  [[nodiscard]] ::so_5::environment_t &
417  environment() noexcept
418  {
419  return m_owner.get().so_environment();
420  }
421 
422  //! Reserve a space for storage of completion handlers.
423  void
425  //! A required capacity.
426  std::size_t capacity )
427  {
428  m_completion_handlers.reserve( capacity );
429  }
430 
431  //! Reserve a space for storage of timeout handlers.
432  void
434  //! A required capacity.
435  std::size_t capacity )
436  {
437  m_timeout_handlers.reserve( capacity );
438  }
439 
440  //! Performs activation actions.
441  /*!
442  * The default timeout handler is created if necessary.
443  *
444  * Subscriptions for completion handlers and timeout handler are created.
445  * If an exception is thrown during subscription then all already
446  * subscribed completion and timeout handler will be unsubscribed.
447  */
448  void
450  {
452 
454 
455  ::so_5::details::do_with_rollback_on_exception(
456  [&] {
457  create_timeout_handlers_subscriptions();
458  },
459  [&] {
460  drop_all_completion_handlers_subscriptions();
461  } );
462  }
463 
464  //! Change the status of async operation.
465  void
467  //! A new status to be set.
468  status_t status ) noexcept
469  {
470  m_status = status;
471  }
472 
473  //! Get the current status of async operation.
  //!
  //! \return the value currently stored in m_status. No synchronization
  //! is performed; the value is read as is.
474  [[nodiscard]] status_t
475  current_status() const noexcept
476  {
477  return m_status;
478  }
479 
480  //! Add a new completion handler for the async operation.
481  void
483  //! A mbox from which the completion message/signal is expected.
484  const ::so_5::mbox_t & mbox,
485  //! A state for completion handler.
486  const ::so_5::state_t & state,
487  //! A type of the completion message/signal.
488  const std::type_index msg_type,
489  //! Completion handler itself.
490  ::so_5::event_handler_method_t evt_handler )
491  {
492  m_completion_handlers.emplace_back(
493  mbox,
494  state,
495  msg_type,
496  std::move(evt_handler) );
497  }
498 
499  //! Add a new timeout handler for the async operation.
500  void
502  //! A state for timeout handler.
503  const ::so_5::state_t & state,
504  //! Timeout handler itself.
505  ::so_5::event_handler_method_t evt_handler )
506  {
507  m_timeout_handlers.emplace_back(
508  state,
509  std::move(evt_handler) );
510  }
511 
512  //! Set the default timeout handler.
513  /*!
514  * \note
515  * If there already is the default timeout handler then the old
516  * one will be replaced by new handler.
517  */
518  void
520  //! Timeout handler itself.
521  ::so_5::event_handler_method_t evt_handler )
522  {
523  m_default_timeout_handler = std::move(evt_handler);
524  }
525 
526  //! Set the ID of timeout message/signal timer.
527  void
529  //! Timer ID of delayed timeout message/signal.
530  ::so_5::timer_id_t id )
531  {
532  m_timeout_timer_id = std::move(id);
533  }
534 
535  //! Mark the async operation as completed.
536  void
537  completed() noexcept
538  {
540  }
541 
542  //! Mark the async operation as timedout.
543  void
544  timedout() noexcept
545  {
547  }
548 
549  //! Mark the async operation as cancelled.
550  void
551  cancelled() noexcept
552  {
554  }
555  };
556 
557 /*!
558  * \brief An alias for smart pointer to operation data.
559  *
560  * \tparam Operation_Data Type of actual operation data representation.
561  * It is expected to be op_data_t or some of its derivatives.
562  */
563 template< typename Operation_Data >
565 
566 } /* namespace details */
567 
568 /*!
569  * \brief An object that allows to cancel async operation.
570  *
571  * Usage example:
572  * \code
573  * namespace asyncop = so_5::extra::async_op::time_limited;
574  * class demo : public so_5::agent_t {
575  * asyncop::cancellation_point_t<> cp_;
576  * ...
577  * void initiate_async_op() {
578  * auto op = asyncop::make<timeout>(*this);
579  * op.completed_on(...);
580  * op.timeout_handler(...);
581  * cp_ = op.activate(std::chrono::milliseconds(300), ...);
582  * }
583  * void on_operation_aborted(mhood_t<op_aborted_notify> cmd) {
584  * // Operation aborted. There is no need to wait for
585  * // completion of operation.
586  * cp_.cancel();
587  * }
588  * };
589  * \endcode
590  *
591  * \note
592  * This class is DefaultConstructible and Moveable, but not Copyable.
593  *
594  * \attention
595  * Objects of this class are not thread safe. It means that cancellation
596  * point should be used only by agent which created it. And the cancellation
597  * point can't be used inside thread-safe event handlers of that agent.
598  *
599  * \tparam Operation_Data Type of actual operation data representation.
600  * Please note that this template parameter is intended to be used for
601  * debugging and testing purposes only.
602  *
603  * \since
604  * v.1.0.4
605  */
606 template< typename Operation_Data = details::op_data_t >
608  {
609  private :
610  template<typename Msg, typename Op_Data> friend class definition_point_t;
611 
612  //! Actual data for async op.
613  /*!
614  * \note
615  * This can be a nullptr if the default constructor was used,
616  * or if operation is already cancelled, or if the content of
617  * the cancellation_point was moved to another object.
618  */
620 
621  //! Initializing constructor to be used by definition_point.
623  //! Actual data for async operation.
624  //! Can't be null.
625  details::op_shptr_t< Operation_Data > op )
626  : m_op( std::move(op) )
627  {}
628 
629  public :
630  cancellation_point_t() = default;
631 
632  cancellation_point_t( const cancellation_point_t & ) = delete;
634 
636  operator=( const cancellation_point_t & ) = delete;
637 
639  operator=( cancellation_point_t && ) = default;
640 
641  //! Get the status of the operation.
642  /*!
643  * \note
644  * The value status_t::unknown_moved_away can be returned if
645  * the actual data of the async operation was moved to another object
646  * (like another cancellation_point_t). Or after a call to
647  * cleanup() method.
  *
  * \return the status stored in the operation data when this
  * cancellation point still refers to operation data.
648  */
649  [[nodiscard]] status_t
650  status() const noexcept
651  {
  // Operation data is present: report the status it stores.
652  if( m_op)
653  return m_op->current_status();
  // NOTE(review): the fallback return for an empty m_op (listing line 654)
  // is absent from this view; per the note above it is presumably
  // status_t::unknown_moved_away. Confirm against the full source.
655  }
656 
657  //! Can the async operation be cancelled via this cancellation point?
658  /*!
659  * \return true only if the cancellation_point holds actual async
660  * operation's data and the current status of that operation is
  * exactly status_t::activated. Any other status (and an empty
  * cancellation point) yields false.
661  */
662  [[nodiscard]] bool
663  is_cancellable() const noexcept
664  {
  // The check of m_op goes first, so current_status() is never called
  // through an empty pointer.
665  return m_op && status_t::activated == m_op->current_status();
666  }
667 
668  //! An attempt to cancel the async operation.
669  /*!
670  * \note
671  * Operation will be cancelled only if (true == is_cancellable()).
672  *
673  * It is safe to call cancel() if the operation is already
674  * cancelled or completed, or timed out. In that case the call to
675  * cancel() will have no effect.
  *
  * The reference to the operation data is kept after cancellation;
  * this method does not reset it.
676  */
677  void
678  cancel() noexcept
679  {
  // The guard also covers an empty cancellation point.
680  if( is_cancellable() )
681  {
  // Delegate the actual work to the operation data.
682  m_op->cancelled();
683  }
684  }
685 
686  //! Throw out a reference to the async operation data.
687  /*!
688  * A cancellation_point holds a reference to the async operation
689  * data. It means that the async operation data will be destroyed
690  * only when the cancellation_point will be destroyed. For example,
691  * in that case:
692  * \code
693  * namespace asyncop = so_5::extra::async_op::time_limited;
694  * class demo : public so_5::agent_t {
695  * ...
696  * asyncop::cancellation_point_t<> cp_;
697  * ...
698  * void initiate_async_op() {
699  * cp_ = asyncop::make<timeout_msg>(*this)
700  * ...
701  * .activate(...);
702  * ...
703  * }
704  * ...
705  * void on_some_interruption() {
706  * // Cancel asyncop.
707  * cp_.cancel();
708  * // Async operation is cancelled, but the async operation
709  * // data is still in memory. The data will be deallocated
710  * // only when cp_ receives new value or when cp_ will be
711  * // destroyed (e.g. after destruction of demo agent).
712  * }
713  * \endcode
714  *
715  * A call to cleanup() method removes the reference to the async
716  * operation data. It means that if the operation is already
717  * completed, timed out or cancelled, then the operation data
718  * fill be deallocated.
719  *
720  * \note
721  * If the operation is still in progress then a call to cleanup()
722  * doesn't break the operation. You need to call cancel() manually
723  * before calling cleanup() to cancel the operation.
724  */
725  void
726  cleanup() noexcept
727  {
  // Only this cancellation point's reference is dropped; nothing is
  // cancelled here. After that m_op is empty, so is_cancellable()
  // returns false and cancel() becomes a no-op for this object.
728  m_op.reset();
729  }
730  };
731 
732 namespace details
733 {
734 
735 /*!
736  * \brief A basic part of implementation of definition_point.
737  *
738  * This part is independent from timeout message/signal type.
739  *
740  * \note
741  * This is Moveable class. But not DefaultConstructible.
742  * And not Copyable.
743  *
744  * \note
745  * Most of methods are declared as protected to be used only in
746  * derived class.
747  *
748  * \since
749  * v.1.0.4
750  */
751 template< typename Operation_Data >
753  {
754  protected :
755  //! Actual async operation data.
756  /*!
757  * \note This pointer can be nullptr after activation or
758  * when the object is moved away.
759  */
761 
762  //! Initializing constructor.
764  //! Actual async operation data.
765  //! This pointer can't be nullptr.
766  details::op_shptr_t< Operation_Data > op )
767  : m_op( std::move(op) )
768  {}
769 
774 
779 
780  //! Can the async_op be activated?
781  /*!
782  * The async operation can't be activated if it is already activated.
783  * Or if the content of the definition_point is moved to another
784  * definition_point.
  *
  * \return true if this object still holds operation data (m_op is
  * not empty), false otherwise. No status of the operation is
  * inspected here.
785  */
786  [[nodiscard]] bool
787  is_activable() const noexcept
788  {
  // Relies on the conversion of the smart pointer to bool.
789  return m_op;
790  }
791 
792  //! Throws an exception if the async operation can't be activated.
793  void
795  {
796  if( !is_activable() )
799  "An attempt to do something on non-activable async_op" );
800  }
801 
802  //! Reserve a space for storage of completion handlers.
803  void
805  //! A required capacity.
806  std::size_t capacity )
807  {
809 
811  }
812 
813  //! Reserve a space for storage of timeout handlers.
814  void
816  //! A required capacity.
817  std::size_t capacity )
818  {
820 
822  }
823 
824  //! The actual implementation of addition of completion handler.
825  /*!
826  * \tparam Msg_Target It can be a mbox, or a reference to an agent.
827  * In the case if Msg_Target is a reference to an agent the agent's
828  * direct mbox will be used as message source.
829  *
830  * \tparam Event_Handler Type of actual handler for message/signal.
831  * It can be a pointer to agent's method or lambda (or another
832  * type of functional object).
833  */
834  template<
835  typename Msg_Target,
836  typename Event_Handler >
837  void
839  //! A source from which a completion message/signal is expected.
840  //! It can be a mbox or an agent (in that case the agent's direct
841  //! mbox will be used).
842  Msg_Target && msg_target,
843  //! A state for that a completion handler is defined.
844  const ::so_5::state_t & state,
845  //! Completion handler itself.
846  Event_Handler && evt_handler )
847  {
849 
850  const auto mbox = ::so_5::extra::async_op::details::target_to_mbox(
851  msg_target );
852 
853  auto evt_handler_info =
855  mbox,
856  m_op->owner(),
858 
860  [op = m_op,
862  ::so_5::message_ref_t & msg )
863  {
864  op->completed();
865  user_handler( msg );
866  };
867 
869  mbox,
870  state,
872  std::move(actual_handler) );
873  }
874 
875  //! The actual implementation of addition of timeout handler.
876  /*!
877  * \tparam Event_Handler Type of actual handler for message/signal.
878  * It can be a pointer to agent's method or lambda (or another
879  * type of functional object).
880  */
881  template< typename Event_Handler >
882  void
884  //! A state for that a timeout handler is defined.
885  const ::so_5::state_t & state,
886  //! Timeout handler itself.
887  Event_Handler && evt_handler )
888  {
890 
892  state,
895  }
896 
897  //! The actual implementation of addition of the default timeout handler.
898  /*!
899  * \tparam Event_Handler Type of actual handler for message/signal.
900  * It can be a pointer to agent's method or lambda (or another
901  * type of functional object).
902  */
903  template< typename Event_Handler >
904  void
906  //! The default timeout handler itself.
907  Event_Handler && evt_handler )
908  {
910 
914  }
915 
916  private :
917  //! A helper method for creation a wrapper for a timeout handler.
918  template< typename Event_Handler >
921  //! Timeout handler to be wrapped.
923  {
924  auto evt_handler_info =
926  m_op->timeout_mbox(),
927  m_op->owner(),
932  std::string(
933  "An attempt to register timeout handler for "
934  "different message type. Expected type: " )
935  + m_op->timeout_msg_type().name()
936  + ", actual type: " + evt_handler_info.m_msg_type.name() );
937 
938  return
939  [op = m_op,
941  ::so_5::message_ref_t & msg )
942  {
943  op->timedout();
944  user_handler( msg );
945  };
946  }
947  };
948 
949 } /* namespace details */
950 
951 /*!
952  * \brief An interface for definition of async operation.
953  *
954  * Object of this type is usually created by make() function and
955  * is used for definition of async operation. Completion and timeout
956  * handlers are set for async operation by using definition_point object.
957  *
958  * Then a user calls activate() method and the definition_point transfers
959  * the async operation data into cancellation_point object. It means that
960  * after a call to activate() method the definition_point object should
961  * not be used. Because it doesn't hold any async operation anymore.
962  *
963  * A simple usage without storing the cancellation_point:
964  * \code
965  * namespace asyncop = so_5::extra::async_op::time_limited;
966  * class demo : public so_5::agent_t {
967  * struct timeout final : public so_5::signal_t {};
968  * ...
969  * void initiate_async_op() {
970  * // Create a definition_point for a new async operation...
971  * asyncop::make<timeout>(*this)
972  * // ...then set up completion handler(s)...
973  * .completed_on(
974  * *this,
975  * so_default_state(),
976  * &demo::on_first_completion_msg )
977  * .completed_on(
978  * some_external_mbox_,
979  * some_user_defined_state_,
980  * [this](mhood_t<another_completion_msg> cmd) {...})
981  * // ...then set up timeout handler(s)...
982  * .timeout_handler(
983  * so_default_state(),
984  * &demo::on_timeout )
985  * .timeout_handler(
986  * some_user_defined_state_,
987  * [this](mhood_t<timeout> cmd) {...})
988  * // ...and now we can activate the operation.
989  * .activate(std::chrono::milliseconds(300));
990  * ...
991  * }
992  * };
993  * \endcode
994  * \note
995  * There is no need to hold definition_point object after activation
996  * of the async operation. This object can be safely discarded.
997  *
998  * A more complex example using cancellation_point for
999  * cancelling the operation.
1000  * \code
1001  * namespace asyncop = so_5::extra::async_op::time_limited;
1002  * class demo : public so_5::agent_t {
1003  * struct timeout final : public so_5::signal_t {};
1004  * ...
1005  * // Cancellation point for the async operation.
1006  * asyncop::cancellation_point_t<> cp_;
1007  * ...
1008  * void initiate_async_op() {
1009  * // Create a definition_point for a new async operation
1010  * // and store the cancellation point after activation.
1011  * cp_ = asyncop::make<timeout>(*this)
1012  * // ...then set up completion handler(s)...
1013  * .completed_on(
1014  * *this,
1015  * so_default_state(),
1016  * &demo::on_first_completion_msg )
1017  * .completed_on(
1018  * some_external_mbox_,
1019  * some_user_defined_state_,
1020  * [this](mhood_t<another_completion_msg> cmd) {...})
1021  * // ...then set up timeout handler(s)...
1022  * .timeout_handler(
1023  * so_default_state(),
1024  * &demo::on_timeout )
1025  * .timeout_handler(
1026  * some_user_defined_state_,
1027  * [this](mhood_t<timeout> cmd) {...})
1028  * // ...and now we can activate the operation.
1029  * .activate(std::chrono::milliseconds(300));
1030  * ...
1031  * }
1032  * ...
1033  * void on_abortion_signal(mhood_t<abort_signal>) {
1034  * // The current async operation should be cancelled.
1035  * cp_.cancel();
1036  * }
1037  * };
1038  * \endcode
1039  *
1040  * \note
1041  * This class is Moveable, but not DefaultConstructible nor Copyable.
1042  *
1043  * \attention
1044  * Objects of this class are not thread safe. It means that a definition
1045  * point should be used only by agent which created it. And the definition
1046  * point can't be used inside thread-safe event handlers of that agent.
1047  *
1048  * \since
1049  * v.1.0.4
1050  */
1051 template<
1052  typename Message,
1053  typename Operation_Data = details::op_data_t >
1055  : protected details::msg_independent_part_of_definition_point_t< Operation_Data >
1056  {
1057  //! A short alias for base type.
1058  using base_type_t =
1060 
1061  public :
1062  //! Initializing constructor.
1064  //! The owner of the async operation.
1065  ::so_5::outliving_reference_t< ::so_5::agent_t > owner )
1066  : base_type_t(
1068  new Operation_Data( owner, typeid(Message) ) ) )
1069  {}
1070 
1072  {
1073  // If operation data is still here then it means that
1074  // there wasn't call to `activate()` and we should cancel
1075  // all described handlers.
1076  // This will lead to deallocation of operation data.
1077  if( this->m_op )
1078  this->m_op->cancelled();
1079  }
1080 
1081  definition_point_t( const definition_point_t & ) = delete;
1083  operator=( const definition_point_t & ) = delete;
1084 
1085  definition_point_t( definition_point_t && ) = default;
1087  operator=( definition_point_t && ) = default;
1088 
1089  using base_type_t::is_activable;
1090 
1091  /*!
1092  * \brief Reserve a space for storage of completion handlers.
1093  *
1094  * Usage example:
1095  * \code
1096  * namespace asyncop = so_5::extra::async_op::time_limited;
1097  * auto op = asyncop::make<timeout>(some_agent);
1098  * // Reserve space for four completion handlers.
1099  * op.reserve_completion_handlers_capacity(4);
1100  * op.completed_on(...);
1101  * op.completed_on(...);
1102  * op.completed_on(...);
1103  * op.completed_on(...);
1104  * op.activate(...);
1105  * \endcode
1106  */
1109  //! A required capacity.
1110  std::size_t capacity ) &
1111  {
1113  return *this;
1114  }
1115 
1116  //! Just a proxy for actual version of %reserve_completion_handlers_capacity.
1117  template< typename... Args >
1118  auto
1120  {
1121  return std::move(this->reserve_completion_handlers_capacity(
1122  std::forward<Args>(args)... ));
1123  }
1124 
1125  /*!
1126  * \brief Reserve a space for storage of timeout handlers.
1127  *
1128  * Usage example:
1129  * \code
1130  * namespace asyncop = so_5::extra::async_op::time_limited;
1131  * auto op = asyncop::make<timeout>(some_agent);
1132  * // Reserve space for eight timeout handlers.
1133  * op.reserve_timeout_handlers_capacity(8);
1134  * op.timeout_handler(...);
1135  * op.timeout_handler(...);
1136  * ...
1137  * op.activate(...);
1138  * \endcode
1139  */
1142  //! A required capacity.
1143  std::size_t capacity ) &
1144  {
1146  return *this;
1147  }
1148 
1149  //! Just a proxy for actual version of %reserve_timeout_handlers_capacity.
1150  template< typename... Args >
1151  auto
1153  {
1154  return std::move(this->reserve_timeout_handlers_capacity(
1155  std::forward<Args>(args)... ));
1156  }
1157 
1158  /*!
1159  * \brief Add a completion handler for the async operation.
1160  *
1161  * Usage example:
1162  * \code
1163  * namespace asyncop = so_5::extra::async_op::time_limited;
1164  * class demo : public so_5::agent_t {
1165  * ...
1166  * void initiate_async_op() {
1167  * asyncop::make<timeout>(*this)
1168  * .completed_on(
1169  * *this,
1170  * so_default_state(),
1171  * &demo::some_event )
1172  * .completed_on(
1173  * some_mbox_,
1174  * some_agent_state_,
1175  * [this](mhood_t<some_msg> cmd) {...})
1176  * ...
1177  * .activate(...);
1178  * }
1179  * };
1180  * \endcode
1181  *
1182  * \note
1183  * The completion handler will be stored inside async operation
1184  * data. Actual subscription for it will be made during activation
1185  * of the async operation.
1186  *
1187  * \tparam Msg_Target It can be a mbox, or a reference to an agent.
1188  * In the case if Msg_Target is a reference to an agent the agent's
1189  * direct mbox will be used as message source.
1190  *
1191  * \tparam Event_Handler Type of actual handler for message/signal.
1192  * It can be a pointer to agent's method or lambda (or another
1193  * type of functional object).
1194  */
1195  template<
1196  typename Msg_Target,
1197  typename Event_Handler >
1200  //! A source from which a completion message is expected.
1201  //! If \a msg_target is a reference to an agent then
1202  //! the agent's direct mbox will be used.
1203  Msg_Target && msg_target,
1204  //! A state for which that completion handler will be subscribed.
1205  const ::so_5::state_t & state,
1206  //! The completion handler itself.
1207  Event_Handler && evt_handler ) &
1208  {
1209  this->completed_on_impl(
1211  state,
1213 
1214  return *this;
1215  }
1216 
1217  //! Just a proxy for the main version of %completed_on.
  //!
  //! This overload is selected for rvalue definition points (e.g. the
  //! temporary in a call chain). Inside the body the call goes through
  //! `this`, which is an lvalue, so the lvalue-qualified overload does
  //! the actual work; its result is then turned back into an rvalue by
  //! std::move so the chain can continue on a temporary.
  //!
  //! NOTE(review): the line with the return type is absent from this
  //! listing.
1218  template< typename... Args >
1220  completed_on( Args && ...args ) &&
1221  {
1222  return std::move(this->completed_on(std::forward<Args>(args)...));
1223  }
1224 
1225  /*!
1226  * \brief Add a timeout handler for the async operation.
1227  *
1228  * Usage example:
1229  * \code
1230  * namespace asyncop = so_5::extra::async_op::time_limited;
1231  * class demo : public so_5::agent_t {
1232  * ...
1233  * void initiate_async_op() {
1234  * asyncop::make<timeout>(*this)
1235  * .timeout_handler(
1236  * so_default_state(),
1237  * &demo::some_event )
1238  * .timeout_handler(
1239  * some_agent_state_,
1240  * [this](mhood_t<timeout> cmd) {...})
1241  * ...
1242  * .activate(...);
1243  * }
1244  * };
1245  * \endcode
1246  *
1247  * \note
1248  * The timeout handler will be stored inside async operation
1249  * data. Actual subscription for it will be made during activation
1250  * of the async operation.
1251  *
1252  * \tparam Event_Handler Type of actual handler for message/signal.
1253  * It can be a pointer to agent's method or lambda (or another
1254  * type of functional object).
1255  * Please notice that Event_Handler must receive a message/signal
1256  * of type \a Message. It means that Event_Handler can have one of the
1257  * following formats:
1258  * \code
1259  * ret_type handler(Message);
1260  * ret_type handler(const Message &);
1261  * ret_type handler(mhood_t<Message>); // This is the recommended format.
1262  * \endcode
1263  */
1264  template< typename Event_Handler >
1267  const ::so_5::state_t & state,
1268  Event_Handler && evt_handler ) &
1269  {
1270  this->timeout_handler_impl(
1271  state,
1273 
1274  return *this;
1275  }
1276 
1277  //! Just a proxy for the main version of %timeout_handler.
1278  template< typename... Args >
1280  timeout_handler( Args && ...args ) &&
1281  {
1282  return std::move(this->timeout_handler(
1283  std::forward<Args>(args)...));
1284  }
1285 
1286  /*!
1287  * \brief Add the default timeout handler for the async operation.
1288  *
1289  * The default timeout handler will be called if no timeout
1290  * handler was called for timeout message/signal.
1291  * Deadletter handlers from SObjectizer-5.5.21 are used for
1292  * implementation of the default timeout handler for async
1293  * operation.
1294  *
1295  * There can be only one default timeout handler.
1296  * If the default timeout handler is already specified a new call
1297  * to default_timeout_handler() will replace the old handler with
1298  * new one (the old default timeout handler will be lost).
1299  *
1300  * Usage example:
1301  * \code
1302  * namespace asyncop = so_5::extra::async_op::time_limited;
1303  * class demo : public so_5::agent_t {
1304  * ...
1305  * void initiate_async_op() {
1306  * asyncop::make<timeout>(*this)
1307  * .default_timeout_handler(
1308  * &demo::some_deadletter_handler )
1309  * ...
1310  * .activate(...);
1311  * }
1312  * };
1313  * \endcode
1314  *
1315  * \note
1316  * The default timeout handler will be stored inside async operation
1317  * data. Actual subscription for it will be made during activation
1318  * of the async operation.
1319  *
1320  * \tparam Event_Handler Type of actual handler for message/signal.
1321  * It can be a pointer to agent's method or lambda (or another
1322  * type of functional object).
1323  * Please notice that Event_Handler must receive a message/signal
1324  * of type \a Message. It means that Event_Handler can have one of the
1325  * following formats:
1326  * \code
1327  * ret_type handler(Message);
1328  * ret_type handler(const Message &);
1329  * ret_type handler(mhood_t<Message>); // This is the recommended format.
1330  * \endcode
1331  */
1332  template< typename Event_Handler >
1335  //! The default timeout handler itself.
1336  Event_Handler && evt_handler ) &
1337  {
1340 
1341  return *this;
1342  }
1343 
1344  //! Just a proxy for the main version of %default_timeout_handler.
1345  template< typename... Args >
1347  default_timeout_handler( Args && ...args ) &&
1348  {
1349  return std::move(this->default_timeout_handler(
1350  std::forward<Args>(args)...));
1351  }
1352 
1353  /*!
1354  * \brief Activation of the async operation.
1355  *
1356  * All subscriptions for completion and timeout handlers will be made
1357  * here. Then the timeout message/signal will be sent. And then
1358  * a cancellation_point for that async operation will be returned.
1359  *
1360  * An example of activation of the async operation in the case
1361  * when \a Message is a signal:
1362  * \code
1363  * namespace asyncop = so_5::extra::async_op::time_limited;
1364  * class demo : public so_5::agent_t {
1365  * struct timeout final : public so_5::signal_t {};
1366  * ...
1367  * void initiate_async_op() {
1368  * asyncop::make<timeout>(*this)
1369  * ... // Set up completion and timeout handlers...
1370  * // Activation with just one argument - timeout duration.
1371  * .activate(std::chrono::milliseconds(200));
1372  * ...
1373  * }
1374  * };
1375  * \endcode
1376  *
1377  * An example of activation of the async operation in the case
1378  * when \a Message is a message:
1379  * \code
1380  * namespace asyncop = so_5::extra::async_op::time_limited;
1381  * class demo : public so_5::agent_t {
1382  * struct timeout final : public so_5::message_t {
1383  * int id_;
1384  * const std::string reply_payload_;
1385  * const so_5::mbox_t reply_mbox_;
1386  *
1387  * timeout(int id, std::string payload, so_5::mbox_t mbox)
1388  * : id_(id), reply_payload_(std::move(payload)), reply_mbox_(std::move(mbox))
1389  * {}
1390  * };
1391  * ...
1392  * void initiate_async_op() {
1393  * asyncop::make<timeout>(*this)
1394  * ... // Set up completion and timeout handlers...
1395  * // Activation with several arguments.
1396  * .activate(
1397  * // This is timeout duration.
1398  * std::chrono::milliseconds(200),
1399  * // All these arguments will be passed to the constructor of timeout message.
1400  * current_op_id,
1401  * op_reply_payload,
1402  * op_result_consumer_mbox);
1403  * ...
1404  * }
1405  * };
1406  * \endcode
1407  *
1408  * \note
1409  * There is no need to store the cancellation_point returned if you
1410  * don't want to cancel async operation. The return value of
1411  * activate() can be safely discarded in that case.
1412  *
1413  * \attention
1414  * If cancellation_point is stored then it should be used only on
1415  * the working context of the agent for which the async operation is
1416  * created. And the cancellation_point should not be used in the
1417  * thread-safe event handlers of that agent. It is because
1418  * cancellation_point is not a thread safe object.
1419  *
1420  * \note
1421  * If an exception is thrown during the activation procedure all
1422  * completion and timeout handlers which were subscribed before
1423  * the exception will be unsubscribed. And the async operation
1424  * data will be deleted. It means that after an exception in
1425  * activate() method the definition_point can't be used anymore.
1426  */
1427  template< typename... Args >
1428  cancellation_point_t< Operation_Data >
1430  //! The maximum duration for the async operation.
1431  //! Note: this should be non-negative value.
1432  std::chrono::steady_clock::duration timeout,
1433  //! Arguments for the construction of timeout message/signal
1434  //! of the type \a Message.
1435  //! These arguments will be passed to so_5::send_delayed
1436  //! function.
1437  Args && ...args ) &
1438  {
1439  this->ensure_activable();
1440 
1441  // From this point the definition_point will be detached
1442  // from operation data. It means that is_activable() will
1443  // return false.
1444  auto op = std::move(this->m_op);
1445 
1448  op->setup_timer_id(
1450  op->timeout_mbox(),
1451  timeout,
1453  std::forward<Args>(args)... ) );
1455  },
1456  [&] {
1457  op->cancelled();
1458  } );
1459 
1461  }
1462 
1463  //! Rvalue-qualified proxy: forwards every argument unchanged to the lvalue-qualified %activate and returns its result.
1464  template< typename... Args >
1465  auto
1466  activate( Args && ...args ) &&
1467  {
1468  return this->activate( std::forward<Args>(args)... );
1469  }
1470  };
1471 
1472 //
1473 // make
1474 //
1475 /*!
1476  * \brief A factory for creating definition points of async operations.
1477  *
1478  * Instead of creating a definition_point_t object by hand, this
1479  * helper function should be used.
1480  *
1481  * Usage examples:
1482  * \code
1483  * namespace asyncop = so_5::extra::async_op::time_limited;
1484  * class demo : public so_5::agent_t {
1485  * struct timeout final : public so_5::signal_t {};
1486  * ...
1487  * void initiate_async_op() {
1488  * // Create a definition_point for a new async operation...
1489  * asyncop::make<timeout>(*this)
1490  * // ...then set up completion handler(s)...
1491  * .completed_on(
1492  * *this,
1493  * so_default_state(),
1494  * &demo::on_first_completion_msg )
1495  * .completed_on(
1496  * some_external_mbox_,
1497  * some_user_defined_state_,
1498  * [this](mhood_t<another_completion_msg> cmd) {...})
1499  * // ...then set up timeout handler(s)...
1500  * .timeout_handler(
1501  * so_default_state(),
1502  * &demo::on_timeout )
1503  * .timeout_handler(
1504  * some_user_defined_state_,
1505  * [this](mhood_t<timeout> cmd) {...})
1506  * // ...and now we can activate the operation.
1507  * .activate(std::chrono::milliseconds(300));
1508  * ...
1509  * }
1510  * };
1511  * \endcode
1512  *
1513  * \since
1514  * v.1.0.4
1515  */
1516 template< typename Message >
1519  //! Agent for which this async operation will be created.
1520  ::so_5::agent_t & owner )
1521  {
1523  }
1524 
1525 } /* namespace time_limited */
1526 
1527 } /* namespace async_op */
1528 
1529 } /* namespace extra */
1530 
1531 } /* namespace so_5 */
void do_subscribe_timeout_handlers()
An implementation of subscription of timeout handlers.
void do_activation_actions()
Performs activation actions.
An object that allows to cancel async operation.
cancellation_point_t< Operation_Data > activate(std::chrono::steady_clock::duration timeout, Args &&...args) &
Activation of the async operation.
definition_point_t & reserve_completion_handlers_capacity(std::size_t capacity) &
Reserve a space for storage of completion handlers.
void drop_all_completion_handlers_subscriptions() noexcept
Remove subscriptions for all completion handlers.
::so_5::timer_id_t m_timeout_timer_id
An ID of timeout message/signal.
void timedout() noexcept
Mark the async operation as timedout.
void create_timeout_handlers_subscriptions()
Create subscriptions for all defined timeout handlers (including default handler).
msg_independent_part_of_definition_point_t & operator=(const msg_independent_part_of_definition_point_t &)=delete
completion_handler_subscription_t(so_5::mbox_t mbox, const so_5::state_t &state, std::type_index subscription_type, so_5::event_handler_method_t handler)
Initializing constructor.
auto activate(Args &&...args) &&
Just a proxy for the main version of activate.
timeout_handler_subscription_t(const so_5::state_t &state, so_5::event_handler_method_t handler)
Initializing constructor.
const ::so_5::mbox_t & timeout_mbox() const noexcept
Access to timeout mbox.
op_data_t(::so_5::outliving_reference_t< ::so_5::agent_t > owner, const std::type_index &timeout_msg_type)
Initializing constructor.
void completed() noexcept
Mark the async operation as completed.
void make_default_timeout_handler_if_necessary()
Creates the default timeout handler if it is not set by the user.
void cleanup() noexcept
Throw out a reference to the async operation data.
definition_point_t< Message > make(::so_5::agent_t &owner)
A factory for creation definition points of async operations.
::so_5::outliving_reference_t< const ::so_5::state_t > m_state
State for that a subscription should be created.
details::op_shptr_t< Operation_Data > m_op
Actual data for async op.
void drop_all_timeout_handlers_subscriptions() noexcept
Remove subscriptions for all timeout handlers (including the default timeout handler).
cancellation_point_t & operator=(const cancellation_point_t &)=delete
definition_point_t & operator=(const definition_point_t &)=delete
void cancel() noexcept
An attempt to cancel the async operation.
void reserve_completion_handlers_capacity(std::size_t capacity)
Reserve a space for storage of completion handlers.
void timeout_handler_impl(const ::so_5::state_t &state, Event_Handler &&evt_handler)
The actual implementation of addition of timeout handler.
void change_status(status_t status) noexcept
Change the status of async operation.
Ranges for error codes of each submodules.
Definition: details.hpp:13
status_t
Enumeration for status of operation.
std::vector< completion_handler_subscription_t > m_completion_handlers
Subscriptions for completion handlers which should be created on activation.
void cancelled() noexcept
Mark the async operation as cancelled.
void reserve_completion_handlers_capacity_impl(std::size_t capacity)
Reserve a space for storage of completion handlers.
definition_point_t & reserve_timeout_handlers_capacity(std::size_t capacity) &
Reserve a space for storage of timeout handlers.
cancellation_point_t(details::op_shptr_t< Operation_Data > op)
Initializing constructor to be used by definition_point.
const ::so_5::mbox_t m_timeout_mbox
A mbox to which timeout message/signal will be sent.
void do_unsubscribe_default_timeout_handler() noexcept
Actual unsubscription for the default timeout handler.
void do_subscribe_default_timeout_handler()
An implementation of subscription of default timeout handler.
definition_point_t && default_timeout_handler(Args &&...args) &&
Just a proxy for the main version of default_timeout_handler.
definition_point_t && completed_on(Args &&...args) &&
Just a proxy for the main version of completed_on.
void ensure_activable() const
Throws an exception if the async operation can't be activated.
msg_independent_part_of_definition_point_t(const msg_independent_part_of_definition_point_t &)=delete
definition_point_t(const definition_point_t &)=delete
status_t status() const noexcept
Get the status of the operation.
definition_point_t & timeout_handler(const ::so_5::state_t &state, Event_Handler &&evt_handler) &
Add a timeout handler for the async operation.
definition_point_t & operator=(definition_point_t &&)=default
auto reserve_completion_handlers_capacity(Args &&...args) &&
Just a proxy for actual version of reserve_completion_handlers_capacity.
void default_timeout_handler_impl(Event_Handler &&evt_handler)
The actual implementation of addition of the default timeout handler.
cancellation_point_t(cancellation_point_t &&)=default
definition_point_t & default_timeout_handler(Event_Handler &&evt_handler) &
Add the default timeout handler for the async operation.
Status of operation is unknown because the operation data has been moved to another proxy-object...
::so_5::agent_t & owner() noexcept
Access to owner of the async operation.
std::vector< timeout_handler_subscription_t > m_timeout_handlers
Subscriptions for timeout handlers which should be created on activation.
void add_completion_handler(const ::so_5::mbox_t &mbox, const ::so_5::state_t &state, const std::type_index msg_type, ::so_5::event_handler_method_t evt_handler)
Add a new completion handler for the async operation.
auto reserve_timeout_handlers_capacity(Args &&...args) &&
Just a proxy for actual version of reserve_timeout_handlers_capacity.
void completed_on_impl(Msg_Target &&msg_target, const ::so_5::state_t &state, Event_Handler &&evt_handler)
The actual implementation of addition of completion handler.
void add_timeout_handler(const ::so_5::state_t &state, ::so_5::event_handler_method_t evt_handler)
Add a new timeout handler for the async operation.
::so_5::event_handler_method_t m_default_timeout_handler
A default timeout handler which will be used as deadletter handler for timeout message/signal.
cancellation_point_t(const cancellation_point_t &)=delete
void reserve_timeout_handlers_capacity_impl(std::size_t capacity)
Reserve a space for storage of timeout handlers.
definition_point_t && timeout_handler(Args &&...args) &&
Just a proxy for the main version of timeout_handler.
const std::type_index m_timeout_msg_type
Type of timeout message.
const std::type_index & timeout_msg_type() const noexcept
Access to type of timeout message/signal.
msg_independent_part_of_definition_point_t & operator=(msg_independent_part_of_definition_point_t &&)=default
::so_5::event_handler_method_t create_actual_timeout_handler(Event_Handler &&evt_handler)
A helper method for creation a wrapper for a timeout handler.
::so_5::environment_t & environment() noexcept
Access to SObjectizer Environment in that the owner is working.
status_t current_status() const noexcept
Get the current status of async operation.
status_t m_status
The status of the async operation.
void drop_completion_handlers_subscriptions_up_to(std::size_t upper_border) noexcept
Removes subscription for the first N completion handlers.
void create_completion_handlers_subscriptions()
Create subscriptions for all defined completion handlers.
definition_point_t(::so_5::outliving_reference_t< ::so_5::agent_t > owner)
Initializing constructor.
definition_point_t & completed_on(Msg_Target &&msg_target, const ::so_5::state_t &state, Event_Handler &&evt_handler) &
Add a completion handler for the async operation.
msg_independent_part_of_definition_point_t(details::op_shptr_t< Operation_Data > op)
Initializing constructor.
::so_5::outliving_reference_t< const ::so_5::state_t > m_state
State for that a subscription should be created.
msg_independent_part_of_definition_point_t(msg_independent_part_of_definition_point_t &&)=default
Container of all data related to async operation.
bool is_cancellable() const noexcept
Can the async operation be cancelled via this cancellation point?
void setup_timer_id(::so_5::timer_id_t id)
Set the ID of timeout message/signal timer.
cancellation_point_t & operator=(cancellation_point_t &&)=default
void clean_with_status(status_t status)
Cleans the operation data and sets the status specified.
details::op_shptr_t< Operation_Data > m_op
Actual async operation data.
void do_cancellation_actions() noexcept
Performs total cleanup of the operation data.
void drop_timeout_handlers_subscriptions_up_to(std::size_t upper_border) noexcept
An implementation of unsubscription of the first N timeout handlers.
void default_timeout_handler(::so_5::event_handler_method_t evt_handler)
Set the default timeout handler.
::so_5::outliving_reference_t<::so_5::agent_t > m_owner
Owner of async operation.
void do_unsubscribe_timeout_handlers() noexcept
Actual unsubscription for timeout handlers.
void reserve_timeout_handlers_capacity(std::size_t capacity)
Reserve a space for storage of timeout handlers.