SObjectizer  5.8
Loading...
Searching...
No Matches
so_5/mchain_empty_notificator/main.cpp
/*
* An example of using mchain with empty_notificator.
*
* There is a producer that tries to send a message to a mchain, but
* only if the mchain is empty. The emptiness of the mchain is detected
* by an empty_notificator.
*/
#include <iostream>
#include <chrono>
#include <so_5/all.hpp>
//
// Stuff for logging.
//
// Type of a message that carries one line of text for the logger agent.
using log_msg = std::string;
// Creates a logger agent inside the specified coop and returns the mbox
// to which log_msg messages have to be sent.
//
// Every received log_msg is printed to std::cout as a separate line.
so_5::mbox_t make_logger( so_5::coop_t & coop )
{
    class logger_actor final : public so_5::agent_t {
    public:
        // Inherit the constructors of so_5::agent_t: the agent is created
        // by coop.make_agent_with_binder(), it is not default-constructible.
        using so_5::agent_t::agent_t;

        void so_define_agent() override {
            so_subscribe_self().event( [](mhood_t<log_msg> cmd ) {
                std::cout << *cmd << std::endl;
            } );
        }
    };

    // Logger will work on its own working thread.
    auto logger = coop.make_agent_with_binder< logger_actor >(
            so_5::disp::one_thread::make_dispatcher(
                    coop.environment() ).binder() );

    return logger->so_direct_mbox();
}
// Sends `payload` to the mbox `to` as a log_msg.
//
// The argument is perfectly forwarded to so_5::send, so anything that
// log_msg can be constructed from is accepted.
template< typename Payload >
inline void operator<<=( const so_5::mbox_t & to, Payload && payload )
{
    so_5::send< log_msg >( to, std::forward< Payload >(payload) );
}
// A tiny helper for composing a log line from several pieces.
//
// Every piece passed via operator<< is appended to the internal string
// stream; the accumulated text is available as m_os.str().
struct msg_maker
{
    // Accumulator for the text of the message.
    std::ostringstream m_os;

    // Appends one more piece to the message and returns the maker itself,
    // so calls can be chained.
    template< typename Piece >
    msg_maker & operator<<( Piece && piece )
    {
        m_os << std::forward< Piece >(piece);
        return *this;
    }
};
// Sends the text accumulated in `maker` to the mbox `to` as a log_msg.
inline void operator<<=( const so_5::mbox_t & to, msg_maker & maker )
{
    auto text = maker.m_os.str();
    // Delegate to the generic operator<<= that performs the actual send.
    to <<= std::move( text );
}
//
// Implementation of producers.
//
// A request that the producer sends into the mchain for processing.
struct request
{
    // Textual content of the request.
    std::string m_payload;
};
/*
* Producer agent will send N requests and then closes mchain.
*/
class producer final : public so_5::agent_t
{
// This signal initiates next send attempt.
struct send_next final : public so_5::signal_t {};
// This signal indicates that the mchain is empty.
struct mchain_is_empty final : public so_5::signal_t {};
public :
producer( context_t ctx,
std::string name,
so_5::mbox_t logger_mbox,
unsigned int requests )
: so_5::agent_t{ ctx }
, m_name( std::move(name) )
, m_logger_mbox{ std::move(logger_mbox) }
, m_attempts_left{ requests }
{
// Create the target mchain.
m_target_mchain = so_environment().create_mchain(
// Just one message inside, there is no need for more.
1,
.empty_notificator(
[m = so_direct_mbox()]() {
// Tell the agent that the mchain is empty now.
} )
);
}
// Get the target mchain.
[[nodiscard]]
so_5::mchain_t target_mchain() const
{
return m_target_mchain;
}
void so_define_agent() override
{
.event( &producer::evt_send_next )
.event( &producer::evt_mchain_is_empty )
;
}
void so_evt_start() override
{
// Initiate request sending loop.
}
private :
const std::string m_name;
const so_5::mbox_t m_logger_mbox;
// How many attempts remains.
unsigned int m_attempts_left;
// The target mchain to be used.
so_5::mchain_t m_target_mchain;
// Indicator of mchain's emptiness.
// The mchain is empty at the beginning.
bool m_target_is_empty = true;
// An event for next attempt to send another requests.
void evt_send_next(mhood_t< send_next >)
{
if( m_target_is_empty )
{
m_logger_mbox <<= ( msg_maker() << m_name
<< ": sending a message..." );
// Assume that mchain won't be empty after the sent.
m_target_is_empty = false;
m_target_mchain,
m_name + "_request_" + std::to_string( m_attempts_left ) );
}
else
{
m_logger_mbox <<= ( msg_maker{} << m_name
<< ": message is not sent because mchain is full" );
}
--m_attempts_left;
if( m_attempts_left )
{
// Next try after a timeout.
*this,
std::chrono::milliseconds{ 50 } );
}
else
{
// Nothing to do, it's time to close the target mchain.
m_target_mchain );
m_logger_mbox <<= ( msg_maker{} << m_name
<< ": target mchain is closed" );
}
}
void evt_mchain_is_empty( mhood_t<mchain_is_empty> )
{
m_logger_mbox <<= ( msg_maker{} << m_name
<< ": mchain_is_empty received" );
m_target_is_empty = true;
}
};
// Runs the example: launches SObjectizer with a logger and a producer,
// then reads requests from the producer's mchain on the current thread
// until the mchain is closed.
void run_example()
{
    so_5::mbox_t logger_mbox;
    so_5::mchain_t chain_to_use;

    // Launch SObjectizer without blocking the current thread.
    so_5::wrapped_env_t sobjectizer{
        // The init-function has to be completed before the constructor
        // returns, because logger_mbox and chain_to_use are set inside it
        // and are used right after that.
        so_5::wrapped_env_t::wait_init_completion,
        [&logger_mbox, &chain_to_use]( so_5::environment_t & env )
        {
            chain_to_use = env.introduce_coop(
                    [&logger_mbox]( so_5::coop_t & coop ) {
                        // Logger will work on its own context.
                        logger_mbox = make_logger( coop );

                        auto * a_producer = coop.make_agent< producer >(
                                "Alice",
                                logger_mbox,
                                5 );

                        return a_producer->target_mchain();
                    });
        }
    };

    // Loop for reading messages from chain_to_use.
    // The loop is finished when the producer closes the chain.
    auto status = so_5::mchain_props::extraction_status_t::no_messages;
    while( so_5::mchain_props::extraction_status_t::chain_closed != status )
    {
        // Take a pause.
        std::this_thread::sleep_for( std::chrono::milliseconds{ 75 } );

        // Try to get a message from the chain.
        // There is no waiting if the chain is empty.
        status = so_5::receive(
                so_5::from( chain_to_use )
                    .handle_all()
                    .no_wait_on_empty(),
                [&logger_mbox]( const request & req )
                {
                    logger_mbox <<= "Bob: start handling of received request";
                    logger_mbox <<= ( msg_maker{} << "Bob: request payload: "
                            << req.m_payload );
                    logger_mbox <<= "Bob: finish handling of received request";
                } ).status();
    }

    // SObjectizer will be shutdown automatically.
}
// Entry point: runs the example and translates an exception derived from
// std::exception into an error message and exit code 2.
int main()
{
    int exit_code = 0;

    try
    {
        run_example();
    }
    catch( const std::exception & ex )
    {
        std::cerr << "Exception: " << ex.what() << std::endl;
        exit_code = 2;
    }

    return exit_code;
}
A helper header file for including all public SObjectizer stuff.
A base class for agents.
Definition agent.hpp:673
const name_for_agent_t m_name
Optional name for the agent.
Definition agent.hpp:2999
virtual void so_define_agent()
Hook on define agent for SObjectizer.
Definition agent.cpp:975
subscription_bind_t so_subscribe_self()
Initiate subscription to agent's direct mbox.
Definition agent.hpp:1461
agent_t(environment_t &env)
Constructor.
Definition agent.cpp:775
const mbox_t & so_direct_mbox() const
Get the agent's direct mbox.
Definition agent.cpp:887
environment_t & so_environment() const noexcept
Access to the SObjectizer Environment to which this agent belongs.
Definition agent.cpp:987
virtual void so_evt_start()
Hook on agent start inside SObjectizer.
Definition agent.cpp:832
Agent cooperation.
Definition coop.hpp:389
environment_t & environment() const noexcept
Access to SO Environment for which cooperation is bound.
Definition coop.hpp:453
Agent * make_agent_with_binder(so_5::disp_binder_shptr_t binder, Args &&... args)
Helper method for simplification of agents creation and binding to the specified dispatcher.
Definition coop.hpp:827
Agent * make_agent(Args &&... args)
Helper method for simplification of agents creation.
Definition coop.hpp:792
disp_binder_shptr_t binder() const noexcept
Get a binder for that dispatcher.
SObjectizer Environment.
mchain_t create_mchain(const mchain_params_t &params)
Create message chain.
A base class for agent signals.
Definition message.hpp:275
std::enable_if< details::is_agent_method_pointer< details::method_arity::unary, Method_Pointer >::value, subscription_bind_t & >::type event(Method_Pointer pfn, thread_safety_t thread_safety=not_thread_safe)
Make subscription to the message.
Definition agent.hpp:3657
A wrapped environment.
static constexpr wait_init_completion_t wait_init_completion
Special indicator that tells that synchronous mode has to be used for calling init-function.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of one_thread dispatcher.
status
Status of the message chain.
@ throw_exception
An exception must be thrown.
@ chain_closed
Message cannot be extracted because chain is closed.
@ no_messages
No available messages in the chain.
@ preallocated
Storage must be preallocated once and doesn't change after that.
std::ostream & operator<<(std::ostream &to, const prefix_t &what)
Just a helper operator.
Definition prefix.hpp:139
Private part of message limit implementation.
Definition agent.cpp:33
mchain_receive_params_t< mchain_props::msg_count_status_t::undefined > from(mchain_t chain)
A helper function for simplification of creation of mchain_receive_params instance.
Definition mchain.hpp:1595
void send(Target &&to, Args &&... args)
A utility function for creating and delivering a message or a signal.
void send_delayed(Target &&target, std::chrono::steady_clock::duration pause, Args &&... args)
A utility function for creating and delivering a delayed message to the specified destination.
constexpr exceptions_enabled_t exceptions_enabled
Value that indicates that exceptions are enabled.
void close_retain_content(Exceptions_Control exceptions_control, const mchain_t &ch) noexcept(noexcept(details::should_terminate_if_throws_t< Exceptions_Control >::value))
Helper function for closing a message chain with retaining all its content.
Definition mchain.hpp:720
mchain_receive_result_t receive(const mchain_receive_params_t< Msg_Count_Status > &params, Handlers &&... handlers)
Advanced version of receive from mchain.
Definition mchain.hpp:1883
mchain_params_t make_limited_without_waiting_mchain_params(std::size_t max_size, mchain_props::memory_usage_t memory_usage, mchain_props::overflow_reaction_t overflow_reaction)
Create parameters for size-limited mchain without waiting on overflow.
Definition mchain.hpp:883
STL namespace.