#include <chrono>
#include <cstdlib>
#include <ctime>
#include <deque>
#include <iostream>
#include <sstream>
#include <string>
#include <random>
struct log_message
{
std::string m_what;
};
{
public :
a_logger_t( context_t ctx )
+ limit_then_abort< log_message >( 100 ) )
, m_started_at( std::chrono::steady_clock::now() )
{}
{
[this]( const log_message & evt ) {
std::cout << "[+" << time_delta()
<< "] -- " << evt.m_what << std::endl;
} );
}
private :
const std::chrono::steady_clock::time_point m_started_at;
std::string time_delta() const
{
auto now = std::chrono::steady_clock::now();
std::ostringstream ss;
ss << double(
std::chrono::duration_cast< std::chrono::milliseconds >(
now - m_started_at ).count()
) / 1000.0 << "ms";
return ss.str();
}
};
{
public :
a_stats_listener_t(
context_t ctx,
, m_logger( std::move( logger ) )
{}
{
auto & controller = so_environment().stats_controller();
so_set_delivery_filter(
controller.mbox(),
} );
so_default_state()
.event(
controller.mbox(),
&a_stats_listener_t::evt_quantity )
.event( controller.mbox(),
so_5::send< log_message >( m_logger, "--- DISTRIBUTION STARTED ---" );
} )
.event( controller.mbox(),
so_5::send< log_message >( m_logger, "--- DISTRIBUTION FINISHED ---" );
} );
}
{
std::chrono::milliseconds( 330 ) );
}
private :
void evt_quantity(
{
std::ostringstream ss;
so_5::send< log_message >( m_logger, ss.str() );
}
};
{
public :
a_generator_t(
context_t ctx,
std::vector< so_5::mbox_t > workers )
, m_logger( std::move( logger ) )
, m_workers( std::move( workers ) )
, m_turn_pause( 600 )
{}
void so_define_agent() override
{
so_default_state()
.event( &a_generator_t::evt_next_turn );
}
void so_evt_start() override
{
so_5::send< msg_next_turn >( *this );
}
private :
const std::vector< so_5::mbox_t > m_workers;
const std::chrono::milliseconds m_turn_pause;
void evt_next_turn(mhood_t< msg_next_turn >)
{
generate_new_requests( random( 100, 200 ) );
so_5::send_delayed< msg_next_turn >( *this, m_turn_pause );
}
void generate_new_requests( unsigned int requests )
{
const auto size = m_workers.size();
for( unsigned int i = 0; i != requests; ++i )
so_5::send< msg_start_thinking >( m_workers[ i % size ] );
so_5::send< log_message >( m_logger,
std::to_string( requests ) + " requests are sent" );
}
static unsigned int
random( unsigned int left, unsigned int right )
{
std::random_device rd;
std::mt19937 gen{ rd() };
return std::uniform_int_distribution< unsigned int >{left, right}(gen);
}
};
{
public :
a_worker_t( context_t ctx )
+ limit_then_drop< msg_start_thinking >( 50 ) )
{}
void so_define_agent() override
{
so_default_state().event( [](mhood_t< msg_start_thinking >) {
std::this_thread::sleep_for(
std::chrono::milliseconds( 10 ) );
} );
}
};
{
env,
"stats_listener" ).
binder(),
logger->so_direct_mbox() );
env,
"workers",
3 );
std::vector< so_5::mbox_t > workers;
for( int i = 0; i != 5; ++i )
{
worker_disp.binder( worker_binding_params ) );
workers.push_back( w->so_direct_mbox() );
}
env, "generator" );
generator_disp.binder(),
logger->so_direct_mbox(),
std::move( workers ) );
});
std::this_thread::sleep_for( std::chrono::seconds(50) );
}
int main()
{
try
{
}
catch( const std::exception & ex )
{
std::cerr << "Error: " << ex.what() << std::endl;
return 1;
}
return 0;
}