#include <so_5/all.hpp>
#include <vector>
// Brings std::chrono names (e.g. milliseconds) and the duration literals
// used below (ms, s) into scope for this translation unit.
using namespace std::chrono;
// A list of mboxes; each element is handed to one data producer as its
// distribution mbox and is subscribed to by every data listener.
using data_sources_container_t = std::vector< so_5::mbox_t >;
/// Agent that periodically publishes an incrementing integer value.
///
/// The agent arms a periodic timer on start. On every tick it prints the
/// current value, sends it (together with the agent's name) as a data_t
/// message to the distribution mbox and then increments the value.
class data_producer_t final : public so_5::agent_t
{
    /// Signal the agent sends to itself to trigger production of a value.
    struct send_next_t final : public so_5::signal_t {};

public:
    /// Message carrying one produced value and the name of its producer.
    struct data_t final : public so_5::message_t
    {
        const std::string m_name;
        int m_value;

        data_t( std::string name, int value )
            : m_name( std::move(name) )
            , m_value( value )
        {}

        /// Streams the message as "[<name>]=<value>".
        friend std::ostream & operator<<(std::ostream & to, const data_t & d)
        {
            to << "[" << d.m_name << "]=" << d.m_value;
            return to;
        }
    };

    /// @param ctx               Agent context forwarded to so_5::agent_t.
    /// @param pause             Delay before the first value and period
    ///                          between subsequent values.
    /// @param distribution_mbox Mbox every produced data_t is sent to.
    data_producer_t(
        context_t ctx,
        milliseconds pause,
        so_5::mbox_t distribution_mbox )
        : so_5::agent_t( std::move(ctx) )
        // The name is derived from the pause, e.g. "data_215ms".
        , m_name( "data_" + std::to_string(pause.count()) + "ms" )
        , m_pause( pause )
        , m_distribution_mbox( std::move(distribution_mbox) )
    {
        so_subscribe_self().event( &data_producer_t::on_send_next );
    }

    /// Starts the periodic send_next_t signal addressed to this agent.
    void so_evt_start() override
    {
        // m_pause is used both as the initial delay and as the period.
        m_timer = so_5::send_periodic<send_next_t>( *this, m_pause, m_pause );
    }

private:
    /// Name printed in log lines and stored in every data_t.
    const std::string m_name;
    /// Initial delay and repetition period of the send_next_t signal.
    const milliseconds m_pause;
    /// Destination for produced data_t messages.
    const so_5::mbox_t m_distribution_mbox;
    /// Id of the periodic timer started in so_evt_start().
    so_5::timer_id_t m_timer;
    /// Next value to be produced; starts at 0.
    int m_value{ 0 };

    /// Prints, publishes and then advances the current value.
    void on_send_next( mhood_t<send_next_t> )
    {
        const int produced = m_value;

        std::cout << m_name << ": produce next value: " << produced << std::endl;
        so_5::send< data_t >( m_distribution_mbox, m_name, produced );

        ++m_value;
    }
};
/// Agent that prints every data_t message arriving at a set of mboxes.
///
/// On start the agent subscribes to data_producer_t::data_t on each mbox
/// it was given and schedules a delayed finish_t signal for itself; when
/// that signal arrives the agent deregisters its cooperation.
class data_listener_t final : public so_5::agent_t
{
    /// Delayed signal telling the listener to stop.
    struct finish_t final : public so_5::signal_t {};

public:
    /// @param ctx         Agent context forwarded to so_5::agent_t.
    /// @param name        Name shown in this listener's log lines.
    /// @param data_mboxes Mboxes to subscribe to for data_t messages.
    data_listener_t(
        context_t ctx,
        std::string name,
        data_sources_container_t data_mboxes )
        : so_5::agent_t( std::move(ctx) )
        , m_name( std::move(name) )
        , m_data_mboxes( std::move(data_mboxes) )
    {
        so_subscribe_self().event( &data_listener_t::on_finish );
    }

    /// Subscribes to all data mboxes and schedules the finish signal.
    void so_evt_start() override
    {
        for( const auto & source : m_data_mboxes )
        {
            so_subscribe( source ).event( &data_listener_t::on_data );
        }

        // The listener asks itself to finish 2 seconds after start.
        so_5::send_delayed< finish_t >( *this, 2s );

        std::cout << "listener(" << m_name << ") started" << std::endl;
    }

    /// Reports that the listener has finished.
    void so_evt_finish() override
    {
        std::cout << "listener(" << m_name << ") finished" << std::endl;
    }

private:
    /// Name shown in log lines.
    const std::string m_name;
    /// Mboxes this listener subscribes to in so_evt_start().
    const data_sources_container_t m_data_mboxes;

    /// Deregisters the listener's cooperation.
    void on_finish( mhood_t<finish_t> )
    {
        so_deregister_agent_coop_normally();
    }

    /// Prints a received data_t message.
    void on_data( mhood_t<data_producer_t::data_t> cmd )
    {
        const auto & payload = *cmd;

        std::cout << "listener(" << m_name << ") data received: "
            << payload << std::endl;
    }
};
/// Agent that drives the example.
///
/// On start it creates a child cooperation with one data_producer_t per
/// data mbox, schedules the creation of three listeners at different
/// moments and schedules its own finish. Each listener is created in a
/// separate child cooperation.
class example_manager_t final : public so_5::agent_t
{
    /// Request to create a listener with the given name.
    struct make_listener_t final : public so_5::message_t
    {
        const std::string m_name;

        make_listener_t( std::string name ) : m_name( std::move(name) ) {}
    };

    /// Delayed signal telling the manager to finish the example.
    struct finish_t final : public so_5::signal_t {};

public:
    /// @param ctx         Agent context forwarded to so_5::agent_t.
    /// @param data_mboxes Mboxes shared by producers and listeners.
    example_manager_t(
        context_t ctx,
        data_sources_container_t data_mboxes )
        : so_5::agent_t( std::move(ctx) )
        , m_data_mboxes( std::move(data_mboxes) )
    {
        so_subscribe_self()
            .event( &example_manager_t::on_make_listener )
            .event( &example_manager_t::on_finish );
    }

    /// Creates the producers and schedules listener creation and finish.
    void so_evt_start() override
    {
        // All producers live in a single child cooperation. The first one
        // gets a 215ms pause; each following one gets 175ms more.
        so_5::introduce_child_coop( *this,
            [this]( so_5::coop_t & coop ) {
                auto pause = 215ms;
                for( const auto & mbox : m_data_mboxes )
                {
                    coop.make_agent< data_producer_t >( pause, mbox );
                    pause += 175ms;
                }
            } );

        // Listeners are created at 0.5s, 1s and 1.5s after start.
        so_5::send_delayed< make_listener_t >( *this, 500ms, "first" );
        so_5::send_delayed< make_listener_t >( *this, 1000ms, "second" );
        so_5::send_delayed< make_listener_t >( *this, 1500ms, "third" );

        // The manager finishes itself 4 seconds after start.
        so_5::send_delayed< finish_t >( *this, 4s );
    }

private:
    /// Mboxes passed to every producer and every listener.
    const data_sources_container_t m_data_mboxes;

    /// Creates a new listener in its own child cooperation.
    void on_make_listener( mhood_t<make_listener_t> cmd )
    {
        so_5::introduce_child_coop( *this,
            [this, &cmd]( so_5::coop_t & coop ) {
                coop.make_agent< data_listener_t >(
                        cmd->m_name,
                        m_data_mboxes );
            } );
    }

    /// Deregisters the manager's cooperation.
    void on_finish( mhood_t<finish_t> )
    {
        so_deregister_agent_coop_normally();
    }
};
int main()
{
so_5::launch( []( so_5::environment_t & env ) {
env.introduce_coop( [&]( so_5::coop_t & coop ) {
data_sources_container_t mboxes{
coop.make_agent< example_manager_t >( std::move(mboxes) );
} );
} );
return 0;
}
// Ranges for error codes of each submodule.
// Implementation of an mbox which holds the last sent message.