#include <iostream>
#include <utility>
namespace example
{
/// Header part of an operation: a numeric id plus a piece of header text.
struct operation_header_t
{
    /// Identifier of the operation.
    int m_id;
    /// Text carried in the header.
    std::string m_header_data;

    /// Stores `id` and moves `header_data` into the object.
    operation_header_t( int id, std::string header_data )
        : m_id( id )
        , m_header_data( std::move( header_data ) )
    {
    }
};
/// Payload part of an operation: a single piece of payload text.
struct operation_payload_t
{
    /// Text carried in the payload.
    std::string m_payload_data;

    /// Moves `payload_data` into the object. The constructor is explicit,
    /// so a std::string does not convert to a payload implicitly.
    explicit operation_payload_t( std::string payload_data )
        : m_payload_data( std::move( payload_data ) )
    {
    }
};
/// A complete operation, made of a header and a payload.
struct operation_t
{
    /// Header part (id and header text).
    operation_header_t m_header;
    /// Payload part (payload text).
    operation_payload_t m_payload;

    /// Builds the header from `id` and `header_data` and the payload from
    /// `payload_data`; both strings are moved into their sub-objects.
    operation_t( int id, std::string header_data, std::string payload_data )
        : m_header( id, std::move( header_data ) )
        , m_payload( std::move( payload_data ) )
    {
    }
};
/// Data extracted from an operation_t: the header's id and a copy of the
/// payload text.
struct handle_payload_t
{
    /// Copy of the operation's header id.
    int m_id;
    /// Copy of the operation's payload text.
    std::string m_data;

    /// Copies the id from `op.m_header` and the text from `op.m_payload`.
    /// The source operation is left untouched.
    handle_payload_t( const operation_t & op )
        : m_id( op.m_header.m_id )
        , m_data( op.m_payload.m_payload_data )
    {
    }
};
// Body of an agent class whose header line is not visible in this listing.
// NOTE(review): presumably a_op_registrator_t, the type passed to make_agent
// further below -- confirm against the complete source.
{
public:
// Registers a handler for operation_header_t messages through
// so_subscribe_self().
void so_define_agent() override
{
so_subscribe_self().event( [](mhood_t<operation_header_t> cmd) {
// Print the operation id and its header text to std::cout.
std::cout << "registering OP: " << cmd->m_id << " '"
<< cmd->m_header_data << "'" << std::endl;
} );
}
};
// Body of the a_op_processor_t class; the class header line is not visible in
// this listing.
{
// Name printed in front of every log line produced by this processor.
const std::string m_processor_name;
// Duration the handler sleeps for each handled message.
const std::chrono::milliseconds m_processing_time;
public:
// Stores the processor name (moved in) and the processing time.
// NOTE(review): the member-initializer list starts with ',' here, so its first
// entry (presumably the base-class initializer consuming `ctx`) is missing
// from this listing -- confirm against the complete source.
a_op_processor_t(
context_t ctx,
std::string processor_name,
std::chrono::milliseconds processing_time )
, m_processor_name{ std::move(processor_name) }
, m_processing_time{ processing_time }
{}
// Registers a handler for handle_payload_t messages (received through
// mutable_mhood_t) via so_subscribe_self().
void so_define_agent() override
{
so_subscribe_self().event(
[this](mutable_mhood_t<handle_payload_t> cmd) {
// Report the start of processing with the operation id and data.
std::cout << m_processor_name << " processing started. OP: "
<< cmd->m_id << " '" << cmd->m_data << "'" << std::endl;
// Block the calling thread for the configured processing time.
std::this_thread::sleep_for( m_processing_time );
// Report the end of processing.
std::cout << m_processor_name << " processing finished. OP: "
<< cmd->m_id << std::endl;
} );
}
};
// Body of the a_op_initiator_t class; the class header line is not visible in
// this listing.
// NOTE(review): m_destination and msg_time_to_generate are used below but
// their declarations are not visible here -- confirm against the complete
// source.
{
// Name embedded in the text of every generated operation.
const std::string m_initiator_name;
// Value passed twice (lines "m_generation_period,") in so_evt_start().
const std::chrono::milliseconds m_generation_period;
// Id given to the next generated operation; starts at `base_id`.
int m_current_id;
public:
// Stores the initiator name (moved in), the generation period and the base id.
// NOTE(review): the member-initializer list starts with ',' and refers to
// `destination`, which is not among the visible parameters; the base-class
// initializer and at least one parameter appear to be missing from this
// listing.
a_op_initiator_t(
context_t ctx,
std::string initiator_name,
int base_id,
std::chrono::milliseconds generation_period )
, m_destination{ std::move(destination) }
, m_initiator_name{ std::move(initiator_name) }
, m_generation_period{ generation_period }
, m_current_id{ base_id }
{}
// Registers evt_time_to_generate as an event handler via so_subscribe_self().
void so_define_agent() override
{
so_subscribe_self().event( &a_op_initiator_t::evt_time_to_generate );
}
// NOTE(review): only the argument lines of a call survive here; the head of
// the call is missing. Presumably a periodic send of msg_time_to_generate to
// this agent with m_generation_period as both arguments -- confirm.
void so_evt_start() override
{
*this,
m_generation_period,
m_generation_period );
}
private:
// Handler for msg_time_to_generate.
// NOTE(review): the head of the call is missing; the surviving arguments are
// the destination, the current id (post-incremented, so the value before the
// increment is passed) and two strings built from the initiator name.
void evt_time_to_generate( mhood_t<msg_time_to_generate> )
{
m_destination,
m_current_id++,
"from: " + m_initiator_name,
"data generated by: " + m_initiator_name );
}
};
/// State used to pick one of a fixed set of destination mboxes.
struct distribution_data_t
{
    /// Number of destination mboxes held by this object.
    static constexpr std::size_t handler_count = 3;
    using mbox_array_t = std::array< so_5::mbox_t, handler_count >;

    /// The destination mboxes.
    mbox_array_t m_destinations;
    /// Atomic counter, initially 0.
    std::atomic< std::size_t > m_current{ 0u };

    /// Takes ownership of `destinations`.
    ///
    /// The parameter is a by-value sink, so it is moved into the member
    /// rather than copied a second time.
    explicit distribution_data_t( mbox_array_t destinations )
        : m_destinations( std::move( destinations ) )
    {
    }
};
// Body of a function whose signature is not visible in this listing.
// NOTE(review): presumably make_coop, which main() calls with `env`. Several
// call heads are missing below, so only the surviving argument lines are
// described -- confirm against the complete source.
{
// Arguments of a truncated call: destination, name "Robert", base id 0 and a
// 125 ms period (presumably creation of an a_op_initiator_t agent).
destination,
"Robert",
0,
std::chrono::milliseconds{ 125 } );
// Same shape: name "Garry", base id 1'000'000 and a 210 ms period.
destination,
"Garry",
1'000'000,
std::chrono::milliseconds{ 210 } );
// Creates an a_op_registrator_t agent and takes its direct mbox.
// NOTE(review): the left-hand side of this statement is missing; the result is
// presumably stored in `registrator_mbox`, which is captured below.
coop.
make_agent< a_op_registrator_t >()->so_direct_mbox();
// Shared distribution state holding three direct mboxes. The surviving
// arguments name the agents "Alice", "Bob" and "Eve" with 150/250/200 ms
// durations (presumably a_op_processor_t agents).
auto distribution_data = std::make_shared< distribution_data_t >(
distribution_data_t::mbox_array_t{
"Alice",
std::chrono::milliseconds{ 150 } )->so_direct_mbox(),
"Bob",
std::chrono::milliseconds{ 250 } )->so_direct_mbox(),
"Eve",
std::chrono::milliseconds{ 200 } )->so_direct_mbox()
} );
// Lambda taking an operation_t and passing registrator_mbox together with the
// message's header to a truncated call.
[registrator_mbox]( const operation_t & msg ) {
registrator_mbox,
msg.m_header );
} );
// Lambda taking an operation_t and choosing one destination mbox for it.
[distribution_data]( const operation_t & msg ) {
// Pre-increment the shared atomic counter and reduce it modulo the number of
// destinations; since the counter starts at 0 the first index used is 1.
const auto index = ( ++(distribution_data->m_current) ) %
distribution_data->m_destinations.size();
// The selected mbox and the whole message are passed to a truncated call.
distribution_data->m_destinations[ index ],
msg );
} );
} );
}
}
// Program entry point. Returns 0 on success, or 1 after printing the message
// of any std::exception that escapes the try block.
int main()
{
using namespace example;
try
{
// NOTE(review): the head of a call and the lambda headers are missing from
// this listing; two brace-enclosed bodies separated by ',' remain.
{
// Calls make_coop with `env`, then blocks this thread for two seconds.
make_coop( env );
std::this_thread::sleep_for( std::chrono::seconds{ 2 } );
},
{
// NOTE(review): the argument of message_delivery_tracer is missing here.
params.message_delivery_tracer(
} );
}
catch( const std::exception & ex )
{
// Report the failure on std::cerr and signal it through the exit code.
std::cerr << "Error: " << ex.what() << std::endl;
return 1;
}
return 0;
}
A helper header file for including all public SObjectizer stuff.
agent_t(environment_t &env)
Constructor.
environment_t & environment() const noexcept
Access to the SO Environment to which the cooperation is bound.
Agent * make_agent(Args &&... args)
Helper method for simplification of agents creation.
T * take_under_control(std::unique_ptr< T > resource)
Take a user resource under cooperation control.
Parameters for the SObjectizer Environment initialization.
mbox_t create_mbox()
Create an anonymous MPMC mbox.
void stop() noexcept
Send a shutdown signal to the Run-Time.
decltype(auto) introduce_coop(Args &&... args)
Helper method for simplification of cooperation creation and registration.
Helper class for managing multiple sink bindings.
A base class for agent signals.
An identifier for the timer.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of active_obj dispatcher.
SO_5_FUNC tracer_unique_ptr_t std_cerr_tracer()
Factory for tracer which uses std::cerr stream.
void launch(Init_Routine &&init_routine)
Launch a SObjectizer Environment with default parameters.
timer_id_t send_periodic(Target &&target, std::chrono::steady_clock::duration pause, std::chrono::steady_clock::duration period, Args &&... args)
A utility function for creating and delivering a periodic message to the specified destination.
void send(Target &&to, Args &&... args)
A utility function for creating and delivering a message or a signal.
transformed_message_t< Msg > make_transformed(mbox_t mbox, Args &&... args)
Helper function for creation of an instance of transformed_message_t.
void bind_transformer(Binding &binding, const so_5::mbox_t &src_mbox, Transformer &&transformer)
Helper function to add transform_then_redirect msink to a binding object.