#include <so_5/all.hpp>
#include <sstream>
using namespace std::chrono_literals;
struct msg_acquisition_started final : public so_5::signal_t {};
struct msg_acquisition_finished final : public so_5::signal_t {};
struct msg_acquired_data final : public so_5::message_t
{
const std::string m_data;
explicit msg_acquired_data( std::string data )
: m_data{ std::move(data) }
{}
};
class status_listener final : public so_5::agent_t
{
public:
using so_5::agent_t::agent_t;
void so_define_agent() override
{
so_subscribe_self()
.event( [](mhood_t<msg_acquisition_started>) {
std::cout << "acquisition: started" << std::endl;
} )
.event( [](mhood_t<msg_acquisition_finished>) {
std::cout << "acquisition: finished" << std::endl;
} )
;
}
};
class data_consumer final : public so_5::agent_t
{
public:
using so_5::agent_t::agent_t;
void so_define_agent() override
{
so_subscribe_self().event( &data_consumer::evt_data );
}
private:
void evt_data( mhood_t<msg_acquired_data> cmd )
{
std::cout << "data received: '" << cmd->m_data << "'" << std::endl;
}
};
class data_producer final : public so_5::agent_t
{
struct msg_acquire final : public so_5::signal_t {};
const so_5::mbox_t m_data_mbox;
so_5::timer_id_t m_acquisition_timer;
int m_data_index{};
public:
data_producer( context_t ctx, so_5::mbox_t data_mbox )
:
so_5::agent_t{ std::move(ctx) }
, m_data_mbox{ std::move(data_mbox) }
{}
void so_define_agent() override
{
so_subscribe_self()
.event( &data_producer::evt_acquire )
;
}
void so_evt_start() override
{
m_acquisition_timer = so_5::send_periodic< msg_acquire >(
*this,
0ms,
100ms );
}
private:
void evt_acquire( mhood_t<msg_acquire> )
{
so_5::send< msg_acquisition_started >( m_data_mbox );
std::ostringstream data_ss;
data_ss << "index:" << m_data_index;
++m_data_index;
std::this_thread::sleep_for( 10ms );
so_5::send< msg_acquired_data >( m_data_mbox, data_ss.str() );
so_5::send< msg_acquisition_finished >( m_data_mbox );
}
};
int main()
{
so_5::launch( []( so_5::environment_t & env ) {
env.introduce_coop( []( so_5::coop_t & coop ) {
auto * listener = coop.make_agent< status_listener >();
auto * consumer = coop.make_agent< data_consumer >();
.
add< msg_acquisition_started >( listener->so_direct_mbox() )
.add< msg_acquisition_finished >( listener->so_direct_mbox() )
.add< msg_acquired_data >( consumer->so_direct_mbox() )
.
make( coop.environment() );
coop.make_agent_with_binder< data_producer >(
coop.environment() ).binder(),
data_mbox );
} );
std::this_thread::sleep_for( 500ms );
env.stop();
} );
}