#include <iostream>
#include <chrono>
#include <random>
using steady_clock = std::chrono::steady_clock;
using log_msg = std::string;
{
public:
std::cout << *cmd << std::endl;
} );
}
};
return logger->so_direct_mbox();
}
template< typename A >
{
}
// Helper that accumulates the pieces of a log message in a string stream.
//
// NOTE(review): the declaration line of the templated member that should
// follow `template< typename A >` is missing from this excerpt. Judging by
// its body it writes its argument `a` into m_os and returns *this, which
// is what makes chained use possible.
struct msg_maker
{
// Buffer that collects the message text; its content is obtained with
// m_os.str() by the operator<<= that accepts a msg_maker.
std::ostringstream m_os;
template< typename A >
{
// Forward the argument into the stream, keeping its value category.
m_os << std::forward< A >(a);
// Hand back this same object so further pieces can be appended.
return *this;
}
};
// Passes the text accumulated in `maker` to the mbox `to`.
//
// \param to    destination mbox.
// \param maker message builder whose m_os content is extracted as a
//              std::string.
//
// The builder is taken by non-const lvalue reference.
//
// NOTE(review): the body relies on another `<<=` overload that accepts a
// std::string on the right-hand side; that overload is not visible in
// this excerpt -- confirm that it exists and what it delivers.
inline void operator<<=(
const so_5::mbox_t & to, msg_maker & maker )
{
to <<= maker.m_os.str();
}
{
public :
shutdowner( context_t ctx, unsigned int producers_count )
, m_producers_left{ producers_count }
{
.
event( [
this](mhood_t< another_producer_finished >) {
--m_producers_left;
if( !m_producers_left )
} );
}
{
}
private :
unsigned int m_producers_left;
};
// Data carried by a request: a single piece of text.
struct request
{
// Text of the request.
std::string m_payload;
};
// Data carried by a reply: a single piece of text.
struct reply
{
// Text of the reply; producer::evt_reply appends it to its log line.
std::string m_payload;
};
{
public :
producer( context_t ctx,
std::string name,
unsigned int requests )
, m_name( std::move(name) )
, m_logger_mbox{ std::move(logger_mbox) }
, m_consumer_mbox{ std::move(consumer_mbox) }
, m_requests_left{ requests }
{
.
event( &producer::evt_reply )
.event( &producer::evt_send_next );
}
{
}
private :
unsigned int m_requests_left;
// Reaction to the send_next signal.
//
// While m_requests_left is non-zero the handler measures how long the
// send attempt takes and reports the elapsed time to the logger. When no
// requests are left it calls shutdowner::producer_finished for this agent.
//
// NOTE(review): several lines are missing from this excerpt -- the head of
// the call whose argument list ends on the first line inside `try`, and
// the `catch` clause that introduces `ex`. The comments below describe
// only what the remaining lines show.
void evt_send_next(mhood_t< send_next >)
{
if( m_requests_left )
{
// Starting point for measuring the duration of the send attempt.
const auto started_at = steady_clock::now();
try
{
// Tail of a call (head not visible): the text built here is
// "<name>_request_<requests left>".
m_name + "_request_" + std::to_string( m_requests_left ) );
// Whole milliseconds elapsed since started_at.
const auto ms = std::chrono::duration_cast<
std::chrono::milliseconds >( steady_clock::now()
- started_at ).count();
m_logger_mbox <<= msg_maker() << m_name << ": request sent in "
<< ms << "ms";
}
// Presumably the body of a catch handler (its header is not visible):
// the failure description from ex.what() is reported to the logger.
{
m_logger_mbox <<= msg_maker() << m_name << ": request NOT SENT, "
<< ex.what();
}
}
else
// Nothing left to send: report completion of this producer.
shutdowner::producer_finished( *this );
}
void evt_reply( const reply & msg )
{
m_logger_mbox <<= msg_maker() << m_name << ": reply received, "
<< msg.m_payload;
--m_requests_left;
}
};
{
public :
:
so_5::agent_t{ ctx + limit_then_drop< chain_has_requests >(1) }
, m_logger_mbox{ std::move(logger_mbox) }
{
10,
std::chrono::milliseconds(150) )
.not_empty_notificator( [this] {
} ) );
&consumer::process_requests );
}
{
return m_chain->as_mbox();
}
private :
// Reaction to the chain_has_requests signal: takes requests from m_chain,
// pauses for a random time for each of them, and then reports to the
// logger how many requests were handled.
//
// NOTE(review): the line that declares `r` and opens the call these
// arguments belong to is missing from this excerpt. handle_n( 5 ) and
// no_wait_on_empty() presumably cap the batch at 5 messages and return
// immediately when the chain is empty -- confirm against the SObjectizer
// mchain documentation.
void process_requests(mhood_t< chain_has_requests >)
{
from( m_chain ).handle_n( 5 ).no_wait_on_empty(),
// Handler for one request: the request content is not used, the
// handler only sleeps for the duration returned by random_pause().
[]( const request & req ) {
std::this_thread::sleep_for( random_pause() );
} );
// Log line of the form "=== <count> request(s) handled".
m_logger_mbox <<= msg_maker()
<< "=== " << r.handled() << " request(s) handled";
}
// Produces a pseudo-random pause length.
//
// \return a duration uniformly distributed over the closed range
//         [2ms, 25ms].
//
// The random engine is created and seeded from std::random_device only
// once per thread and reused by later calls. Constructing a random_device
// and reseeding a Mersenne Twister on every call is needlessly expensive
// and can drain the system entropy source on some platforms.
static std::chrono::milliseconds
random_pause()
{
	// thread_local keeps the engine private to the calling thread, so no
	// locking is needed even if several threads ask for a pause.
	thread_local std::mt19937 gen{ std::random_device{}() };

	// The distribution object is cheap to construct; bounds are inclusive.
	std::uniform_int_distribution< unsigned int > pause_ms{ 2u, 25u };

	return std::chrono::milliseconds( pause_ms( gen ) );
}
};
{
const unsigned int producers = 40;
producers );
const auto logger_mbox = make_logger( coop );
logger_mbox )->consumer_mbox();
auto disp = tp_disp::make_dispatcher( coop.
environment() );
const auto bind_params = tp_disp::bind_params_t{}
.fifo( tp_disp::fifo_t::individual )
.max_demands_at_once( 1 );
for( unsigned int i = 0; i != producers; ++i )
disp.binder( bind_params ),
"producer-" + std::to_string( i + 1 ),
logger_mbox,
consumer_mbox,
10u );
}
// Program entry point.
//
// \return 0 when the try block completes without an exception;
//         2 after a std::exception has been caught and reported.
//
// NOTE(review): the statement that should precede the dangling `} );`
// inside the try block is missing from this excerpt.
int main()
{
try
{
} );
return 0;
}
catch( const std::exception & x )
{
// Report the failure on the standard error stream.
std::cerr << "Exception: " << x.what() << std::endl;
}
// Reached only after an exception has been handled above.
return 2;
}
A helper header file for including all public SObjectizer stuff.
virtual bool empty() const =0
Is message chain empty?
const name_for_agent_t m_name
Optional name for the agent.
virtual void so_define_agent()
Hook on define agent for SObjectizer.
subscription_bind_t so_subscribe_self()
Initiate subscription to agent's direct mbox.
agent_t(environment_t &env)
Constructor.
const mbox_t & so_direct_mbox() const
Get the agent's direct mbox.
environment_t & so_environment() const noexcept
Access to the SObjectizer Environment to which this agent belongs.
subscription_bind_t so_subscribe(const mbox_t &mbox_ref)
Initiate subscription.
virtual void so_evt_start()
Hook on agent start inside SObjectizer.
environment_t & environment() const noexcept
Access to the SO Environment to which the cooperation is bound.
Agent * make_agent_with_binder(so_5::disp_binder_shptr_t binder, Args &&... args)
Helper method for simplification of agents creation and binding to the specified dispatcher.
disp_binder_shptr_t binder() const noexcept
Get a binder for that dispatcher.
mbox_t create_mbox()
Create an anonymous MPMC mbox.
mchain_t create_mchain(const mchain_params_t &params)
Create message chain.
void stop() noexcept
Send a shutdown signal to the Run-Time.
decltype(auto) introduce_coop(Args &&... args)
Helper method for simplification of cooperation creation and registration.
The base class for all SObjectizer exceptions.
A base class for agent signals.
std::enable_if< details::is_agent_method_pointer< details::method_arity::unary, Method_Pointer >::value, subscription_bind_t & >::type event(Method_Pointer pfn, thread_safety_t thread_safety=not_thread_safe)
Make subscription to the message.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of one_thread dispatcher.
@ throw_exception
An exception must be thrown.
@ preallocated
Storage must be preallocated once and doesn't change after that.
std::ostream & operator<<(std::ostream &to, const prefix_t &what)
Just a helper operator.
Private part of message limit implementation.
void launch(Init_Routine &&init_routine)
Launch a SObjectizer Environment with default parameters.
mchain_receive_params_t< mchain_props::msg_count_status_t::undefined > from(mchain_t chain)
A helper function for simplification of creation of mchain_receive_params instance.
void send(Target &&to, Args &&... args)
A utility function for creating and delivering a message or a signal.
disp_binder_shptr_t make_default_disp_binder(environment_t &env)
Create an instance of the default dispatcher binder.
mchain_receive_result_t receive(const mchain_receive_params_t< Msg_Count_Status > &params, Handlers &&... handlers)
Advanced version of receive from mchain.
mchain_params_t make_limited_with_waiting_mchain_params(std::size_t max_size, mchain_props::memory_usage_t memory_usage, mchain_props::overflow_reaction_t overflow_reaction, mchain_props::duration_t wait_timeout)
Create parameters for size-limited mchain with waiting on overflow.