#include <iostream>
#include <chrono>
using log_msg = std::string;
{
public:
std::cout << *cmd << std::endl;
} );
}
};
return logger->so_direct_mbox();
}
template< typename A >
{
}
// Small helper for composing a log line piece by piece.
//
// Values are streamed into the internal std::ostringstream with operator<<;
// the accumulated text is available through m_os.str().
struct msg_maker
{
	// Buffer that accumulates the text of the message.
	std::ostringstream m_os;

	// Appends one value to the buffer.
	//
	// Accepts anything that can be streamed into std::ostringstream and
	// returns *this so that calls can be chained:
	//   msg_maker{} << name << ": text"
	template< typename A >
	msg_maker & operator<<( A && a )
	{
		m_os << std::forward< A >(a);
		return *this;
	}
};
// Delivers the text collected in a msg_maker to the given mbox.
//
// The buffered string is extracted from maker.m_os and then passed on
// through operator<<= applied to the mbox and a std::string.
inline void operator<<=(
	const so_5::mbox_t & to, msg_maker & maker )
{
	std::string text = maker.m_os.str();
	to <<= std::move( text );
}
// Request data with a single text payload.
// A handler later in this file takes `const request &` and logs m_payload.
struct request
{
// Text carried by the request.
std::string m_payload;
};
{
public :
producer( context_t ctx,
std::string name,
unsigned int requests )
, m_name(
std::move(name) )
, m_logger_mbox{
std::move(logger_mbox) }
, m_attempts_left{ requests }
{
5,
.empty_notificator(
} )
);
}
[[nodiscard]]
{
return m_target_mchain;
}
{
this >>= st_sending_enabled;
st_sending_enabled
.event( &producer::evt_send_next )
;
st_sending_blocked
.event( &producer::evt_send_next_when_blocked )
.event( &producer::evt_mchain_is_empty )
;
}
{
*this,
std::chrono::milliseconds::zero(),
std::chrono::milliseconds{ 20 });
}
private :
state_t st_sending_enabled{ this, "sending_enabled" };
state_t st_sending_blocked{ this, "sending_blocked" };
unsigned int m_attempts_left;
// Handler for send_next; st_sending_enabled subscribes this method.
// NOTE(review): several lines of this body appear to be missing from this
// extract (the statement that produces `result` and the calls that take
// m_target_mchain are cut off). The comments below describe only what is
// visible here; confirm against the complete file.
void evt_send_next(mhood_t< send_next >)
{
m_target_mchain,
// Text built from the producer name and the current attempts counter.
m_name + "_request_" + std::to_string( m_attempts_left ) ),
// Callback that logs the store and decrements the attempts counter
// (presumably the on-success handler of the send attempt -- TODO confirm).
[this]() {
m_logger_mbox <<= ( msg_maker() << m_name
<< ": message stored to target mbox" );
--m_attempts_left;
} ) );
// Nothing was sent: log it and switch to st_sending_blocked.
if( !result.was_sent() )
{
m_logger_mbox <<= ( msg_maker{} << m_name
<< ": message is not sent because mchain is full" );
this >>= st_sending_blocked;
}
else
{
// The attempts counter has reached zero.
// NOTE(review): the call receiving m_target_mchain is cut off in this
// extract -- presumably it closes the chain; verify.
if( !m_attempts_left )
{
m_target_mchain );
}
}
}
// Handler for send_next while the agent is in st_sending_blocked:
// no send is attempted, only a note is written to the logger mbox.
void evt_send_next_when_blocked(mhood_t< send_next >)
{
	msg_maker report;
	report << m_name;
	report << ": message can't be sent because mchain is full";
	m_logger_mbox <<= report;
}
// Handler for mchain_is_empty: logs the notification and then switches
// the agent back to st_sending_enabled.
void evt_mchain_is_empty( mhood_t<mchain_is_empty> )
{
	msg_maker report;
	report << m_name;
	report << ": mchain_is_empty received";
	m_logger_mbox <<= report;

	this >>= st_sending_enabled;
}
};
void run_example()
{
{
chain_to_use = env.introduce_coop(
logger_mbox = make_logger( coop );
"Alice",
logger_mbox,
20 );
return a_producer->target_mchain();
});
}
};
{
.handle_all()
.no_wait_on_empty(),
[&logger_mbox]( const request & req )
{
logger_mbox <<= "Bob: start handling of received request";
logger_mbox <<= ( msg_maker{} << "Bob: request payload: "
<< req.m_payload );
std::this_thread::sleep_for( std::chrono::milliseconds{ 75 } );
logger_mbox <<= "Bob: finish handling of received request";
}
}
// Program entry point.
//
// Runs the example and returns 0 when it completes. A std::exception that
// escapes run_example() is reported on std::cerr and the exit code is 2.
int main()
{
	int exit_code = 2;

	try
	{
		run_example();
		exit_code = 0;
	}
	catch( const std::exception & ex )
	{
		std::cerr << "Exception: " << ex.what() << std::endl;
	}

	return exit_code;
}
A helper header file for including all public SObjectizer stuff.
const name_for_agent_t m_name
Optional name for the agent.
virtual void so_define_agent()
Hook on define agent for SObjectizer.
subscription_bind_t so_subscribe_self()
Initiate subscription to agent's direct mbox.
agent_t(environment_t &env)
Constructor.
const mbox_t & so_direct_mbox() const
Get the agent's direct mbox.
environment_t & so_environment() const noexcept
Access to the SObjectizer Environment to which this agent belongs.
virtual void so_evt_start()
Hook on agent start inside SObjectizer.
environment_t & environment() const noexcept
Access to the SO Environment to which the cooperation is bound.
Agent * make_agent_with_binder(so_5::disp_binder_shptr_t binder, Args &&... args)
Helper method for simplification of agents creation and binding to the specified dispatcher.
Agent * make_agent(Args &&... args)
Helper method for simplification of agents creation.
disp_binder_shptr_t binder() const noexcept
Get a binder for that dispatcher.
mchain_t create_mchain(const mchain_params_t &params)
Create message chain.
A class for holding an instance of a message.
A base class for agent signals.
std::enable_if< details::is_agent_method_pointer< details::method_arity::unary, Method_Pointer >::value, subscription_bind_t & >::type event(Method_Pointer pfn, thread_safety_t thread_safety=not_thread_safe)
Make subscription to the message.
An identifier for the timer.
void release() noexcept
Release the timer event.
static constexpr wait_init_completion_t wait_init_completion
Special indicator that tells that synchronous mode has to be used for calling init-function.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of one_thread dispatcher.
status
Status of the message chain.
@ throw_exception
An exception must be thrown.
@ chain_closed
Message cannot be extracted because chain is closed.
@ no_messages
No available messages in the chain.
@ preallocated
Storage must be preallocated once and doesn't change after that.
std::ostream & operator<<(std::ostream &to, const prefix_t &what)
Just a helper operator.
Private part of message limit implementation.
mchain_receive_params_t< mchain_props::msg_count_status_t::undefined > from(mchain_t chain)
A helper function for simplification of creation of mchain_receive_params instance.
mchain_select_params_t< mchain_props::msg_count_status_t::undefined > from_all()
Helper function for creation of mchain_select_params instance with default values.
mchain_select_result_t select(const mchain_select_params_t< Msg_Count_Status > &params, Cases &&... cases)
An advanced form of multi chain select.
timer_id_t send_periodic(Target &&target, std::chrono::steady_clock::duration pause, std::chrono::steady_clock::duration period, Args &&... args)
A utility function for creating and delivering a periodic message to the specified destination.
void send(Target &&to, Args &&... args)
A utility function for creating and delivering a message or a signal.
mchain_props::select_case_unique_ptr_t send_case(mchain_t chain, message_holder_t< Msg, Ownership > msg, On_Success_Handler &&handler)
A helper for creation of select_case object for one send-case of a multi chain select.
constexpr exceptions_enabled_t exceptions_enabled
Value that indicates that exceptions are enabled.
void close_retain_content(Exceptions_Control exceptions_control, const mchain_t &ch) noexcept(noexcept(details::should_terminate_if_throws_t< Exceptions_Control >::value))
Helper function for closing a message chain with retaining all its content.
mchain_receive_result_t receive(const mchain_receive_params_t< Msg_Count_Status > &params, Handlers &&... handlers)
Advanced version of receive from mchain.
mchain_params_t make_limited_without_waiting_mchain_params(std::size_t max_size, mchain_props::memory_usage_t memory_usage, mchain_props::overflow_reaction_t overflow_reaction)
Create parameters for size-limited mchain without waiting on overflow.