SObjectizer 5.8
Loading...
Searching...
No Matches
so_5/producer_consumer_mchain/main.cpp
/*
* An example of using mchain for solving producer/consumer problem.
*
* Several producers will send requests to the single consumer. All requests
* will be sent to size-limited mchain with a timeout on overflow.
* This timeout will slowdown producers.
*
* The consumer will periodically process requests from mchain and sends
* replies back. A very simple not-empty notificator will be used for
* mchain to inform the consumer about presense of new requests.
*/
#include <iostream>
#include <chrono>
#include <random>
#include <so_5/all.hpp>
using steady_clock = std::chrono::steady_clock;
//
// Stuff for logging.
//
using log_msg = std::string;
so_5::mbox_t make_logger( so_5::coop_t & coop )
{
class logger_actor final : public so_5::agent_t {
public:
void so_define_agent() override {
so_subscribe_self().event( [](mhood_t<log_msg> cmd ) {
std::cout << *cmd << std::endl;
} );
}
};
// Logger will work on its own working thread.
auto logger = coop.make_agent_with_binder< logger_actor >(
coop.environment() ).binder() );
return logger->so_direct_mbox();
}
template< typename A >
inline void operator<<=( const so_5::mbox_t & to, A && a )
{
so_5::send< log_msg >( to, std::forward< A >(a) );
}
struct msg_maker
{
std::ostringstream m_os;
template< typename A >
msg_maker & operator<<( A && a )
{
m_os << std::forward< A >(a);
return *this;
}
};
inline void operator<<=( const so_5::mbox_t & to, msg_maker & maker )
{
to <<= maker.m_os.str();
}
//
// Implementation of example shutdowner.
//
/*
* Shutdowner will shutdown the SObjectizer Environment when all
* producers send finish signals.
*/
class shutdowner final : public so_5::agent_t
{
struct another_producer_finished final : public so_5::signal_t {};
public :
shutdowner( context_t ctx, unsigned int producers_count )
: so_5::agent_t{ ctx }
, m_producers_left{ producers_count }
{
so_subscribe( so_environment().create_mbox( "shutdowner" ) )
.event( [this](mhood_t< another_producer_finished >) {
--m_producers_left;
if( !m_producers_left )
} );
}
static void producer_finished( const so_5::agent_t & p )
{
p.so_environment().create_mbox( "shutdowner" ) );
}
private :
unsigned int m_producers_left;
};
//
// Implementation of producers.
//
// A request to be sent for processing.
struct request
{
so_5::mbox_t m_who;
std::string m_payload;
};
// A repsonse from consumer.
struct reply
{
std::string m_payload;
};
/*
* Producer agent will send N requests and then initiate finish signal.
*/
class producer final : public so_5::agent_t
{
// This signal allows to send next request for consumer.
struct send_next final : public so_5::signal_t {};
public :
producer( context_t ctx,
std::string name,
so_5::mbox_t logger_mbox,
so_5::mbox_t consumer_mbox,
unsigned int requests )
: so_5::agent_t{ ctx }
, m_name( std::move(name) )
, m_logger_mbox{ std::move(logger_mbox) }
, m_consumer_mbox{ std::move(consumer_mbox) }
, m_requests_left{ requests }
{
.event( &producer::evt_reply )
.event( &producer::evt_send_next );
}
void so_evt_start() override
{
// Initiate request sending loop.
}
private :
const std::string m_name;
const so_5::mbox_t m_logger_mbox;
const so_5::mbox_t m_consumer_mbox;
unsigned int m_requests_left;
// An event for next attempt to send another requests.
void evt_send_next(mhood_t< send_next >)
{
if( m_requests_left )
{
// Send can wait on full mchain. Mark the start time to
// calculate send call duration later.
const auto started_at = steady_clock::now();
try
{
// Send another request.
// Note: this call can wait on full mchain.
so_5::send< request >( m_consumer_mbox,
m_name + "_request_" + std::to_string( m_requests_left ) );
// How much time the send take?
const auto ms = std::chrono::duration_cast<
std::chrono::milliseconds >( steady_clock::now()
- started_at ).count();
m_logger_mbox <<= msg_maker() << m_name << ": request sent in "
<< ms << "ms";
}
catch( const so_5::exception_t & ex )
{
// Log the reason of send request failure.
m_logger_mbox <<= msg_maker() << m_name << ": request NOT SENT, "
<< ex.what();
// Initiate next send attempt.
}
}
else
// No more requests to send. Shutdowner must known about it.
shutdowner::producer_finished( *this );
}
void evt_reply( const reply & msg )
{
m_logger_mbox <<= msg_maker() << m_name << ": reply received, "
<< msg.m_payload;
--m_requests_left;
}
};
//
// Implementation of consumer.
//
class consumer final : public so_5::agent_t
{
// This signal will be sent by not_empty_notificator when
// the first message is stored to the empty mchain.
struct chain_has_requests final : public so_5::signal_t {};
public :
consumer( context_t ctx, so_5::mbox_t logger_mbox )
: so_5::agent_t{ ctx + limit_then_drop< chain_has_requests >(1) }
, m_logger_mbox{ std::move(logger_mbox) }
{
// Appropriate mchain must be created.
// No more than 10 requests in the chain.
10,
// Preallocated storage for the chain.
// Throw an exception on overload.
// Wait no more than 0.5s on overflow.
std::chrono::milliseconds(150) )
// A custom notificator must be used for chain.
.not_empty_notificator( [this] {
} ) );
&consumer::process_requests );
}
// A mchain will look like an ordinary mbox from outside of consumer.
so_5::mbox_t consumer_mbox() const
{
return m_chain->as_mbox();
}
private :
const so_5::mbox_t m_logger_mbox;
so_5::mchain_t m_chain;
void process_requests(mhood_t< chain_has_requests >)
{
auto r = receive(
// Handle no more than 5 requests at once.
// No wait if queue is empty.
from( m_chain ).handle_n( 5 ).no_wait_on_empty(),
[]( const request & req ) {
// Imitation of some hardwork before sending a reply back.
std::this_thread::sleep_for( random_pause() );
so_5::send< reply >( req.m_who, req.m_payload + "#handled" );
} );
m_logger_mbox <<= msg_maker()
<< "=== " << r.handled() << " request(s) handled";
if( !m_chain->empty() )
// Not all messages from chain have been processed.
// Initiate new processing by sending the signal to itself.
}
static std::chrono::milliseconds
random_pause()
{
std::random_device rd;
std::mt19937 gen{ rd() };
return std::chrono::milliseconds(
std::uniform_int_distribution< unsigned int >{2u, 25u}(gen) );
}
};
void fill_demo_coop( so_5::coop_t & coop )
{
const unsigned int producers = 40;
// Shutdowner will work on the default dispatcher.
coop.make_agent_with_binder< shutdowner >(
producers );
// Logger will work on its own context.
const auto logger_mbox = make_logger( coop );
// Consumer agent must be created first.
// It will work on its own working thread.
auto consumer_mbox = coop.make_agent_with_binder< consumer >(
coop.environment() ).binder(),
logger_mbox )->consumer_mbox();
// All producers will work on thread pool dispatcher.
namespace tp_disp = so_5::disp::thread_pool;
auto disp = tp_disp::make_dispatcher( coop.environment() );
// All agents on this dispatcher will have independent event queues.
const auto bind_params = tp_disp::bind_params_t{}
.fifo( tp_disp::fifo_t::individual )
.max_demands_at_once( 1 );
// All producers can be created now.
for( unsigned int i = 0; i != producers; ++i )
coop.make_agent_with_binder< producer >(
disp.binder( bind_params ),
"producer-" + std::to_string( i + 1 ),
logger_mbox,
consumer_mbox,
10u );
}
int main()
{
try
{
env.introduce_coop( fill_demo_coop );
} );
return 0;
}
catch( const std::exception & x )
{
std::cerr << "Exception: " << x.what() << std::endl;
}
return 2;
}
A helper header file for including all public SObjectizer stuff.
virtual bool empty() const =0
Is message chain empty?
A base class for agents.
Definition agent.hpp:673
const name_for_agent_t m_name
Optional name for the agent.
Definition agent.hpp:2948
virtual void so_define_agent()
Hook on define agent for SObjectizer.
Definition agent.cpp:841
subscription_bind_t so_subscribe_self()
Initiate subscription to agent's direct mbox.
Definition agent.hpp:1416
agent_t(environment_t &env)
Constructor.
Definition agent.cpp:646
const mbox_t & so_direct_mbox() const
Get the agent's direct mbox.
Definition agent.cpp:762
environment_t & so_environment() const noexcept
Access to the SObjectizer Environment which this agent is belong.
Definition agent.cpp:853
subscription_bind_t so_subscribe(const mbox_t &mbox_ref)
Initiate subscription.
Definition agent.hpp:1359
virtual void so_evt_start()
Hook on agent start inside SObjectizer.
Definition agent.cpp:701
Agent cooperation.
Definition coop.hpp:389
environment_t & environment() const noexcept
Access to SO Environment for which cooperation is bound.
Definition coop.hpp:453
Agent * make_agent_with_binder(so_5::disp_binder_shptr_t binder, Args &&... args)
Helper method for simplification of agents creation and binding to the specified dispatcher.
Definition coop.hpp:827
disp_binder_shptr_t binder() const noexcept
Get a binder for that dispatcher.
SObjectizer Environment.
mbox_t create_mbox()
Create an anonymous MPMC mbox.
mchain_t create_mchain(const mchain_params_t &params)
Create message chain.
void stop() noexcept
Send a shutdown signal to the Run-Time.
decltype(auto) introduce_coop(Args &&... args)
Helper method for simplification of cooperation creation and registration.
The base class for all SObjectizer exceptions.
Definition exception.hpp:34
A base class for agent signals.
Definition message.hpp:275
std::enable_if< details::is_agent_method_pointer< details::method_arity::unary, Method_Pointer >::value, subscription_bind_t & >::type event(Method_Pointer pfn, thread_safety_t thread_safety=not_thread_safe)
Make subscription to the message.
Definition agent.hpp:3490
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of one_thread dispatcher.
Thread pool dispatcher.
@ throw_exception
An exception must be thrown.
@ preallocated
Storage must be preallocated once and doesn't change after that.
std::ostream & operator<<(std::ostream &to, const prefix_t &what)
Just a helper operator.
Definition prefix.hpp:139
Private part of message limit implementation.
Definition agent.cpp:33
void launch(Init_Routine &&init_routine)
Launch a SObjectizer Environment with default parameters.
Definition api.hpp:142
mchain_receive_params_t< mchain_props::msg_count_status_t::undefined > from(mchain_t chain)
A helper function for simplification of creation of mchain_receive_params instance.
Definition mchain.hpp:1540
void send(Target &&to, Args &&... args)
A utility function for creating and delivering a message or a signal.
disp_binder_shptr_t make_default_disp_binder(environment_t &env)
Create an instance of the default dispatcher binder.
mchain_receive_result_t receive(const mchain_receive_params_t< Msg_Count_Status > &params, Handlers &&... handlers)
Advanced version of receive from mchain.
Definition mchain.hpp:1828
mchain_params_t make_limited_with_waiting_mchain_params(std::size_t max_size, mchain_props::memory_usage_t memory_usage, mchain_props::overflow_reaction_t overflow_reaction, mchain_props::duration_t wait_timeout)
Create parameters for size-limited mchain with waiting on overflow.
Definition mchain.hpp:890