#include <iostream>
#include <random>
#include <mutex>
#include <memory>
#include <string>
#include <vector>
#include <chrono>
#include <thread>

#include <so_5/all.hpp>

// Mutex which serializes trace output to std::cout.
std::mutex g_trace_mutex;

// Helper class for RAII-style locking of the trace stream.
class locker_t
{
public :
    locker_t( std::mutex & m ) : m_lock( m ) {}
    locker_t( locker_t && o ) : m_lock( std::move( o.m_lock ) ) {}
    operator bool() const { return true; }
private :
    std::unique_lock< std::mutex > m_lock;
};

// The lock is held for the whole output statement: the locker_t
// object lives until the end of the if-statement.
#define TRACE() \
if( auto l = locker_t{ g_trace_mutex } ) std::cout
// Mixin which adds a simple random number generator to an agent.
class random_generator_mixin_t
{
public :
    random_generator_mixin_t()
    {
        // Seed every instance from random_device so that different
        // agents do not produce identical sequences.
        m_random_engine.seed( std::random_device{}() );
    }

    int
    random( int l, int h )
    {
        return std::uniform_int_distribution<>( l, h )( m_random_engine );
    }

private :
    std::default_random_engine m_random_engine;
};
// A message with an application-level request.
struct application_request : public so_5::message_t
{
    std::string m_to;
    std::string m_from;
    std::string m_payload;
    std::string m_attributes;
    std::string m_generator;

    application_request(
        std::string to,
        std::string from,
        std::string payload,
        std::string attributes,
        std::string generator )
        :   m_to( std::move( to ) )
        ,   m_from( std::move( from ) )
        ,   m_payload( std::move( payload ) )
        ,   m_attributes( std::move( attributes ) )
        ,   m_generator( std::move( generator ) )
    {}
};
// Signal for starting the next turn in generators and processors.
struct msg_next_turn : public so_5::signal_t {};

// Agent which generates a stream of application_request messages.
class a_generator_t final : public so_5::agent_t,
    private random_generator_mixin_t
{
public :
    a_generator_t(
        context_t ctx,
        std::string name,
        const std::vector< so_5::mbox_t > & workers_mboxes )
        :   so_5::agent_t( ctx )
        ,   m_name( std::move( name ) )
        ,   m_workers_mboxes( workers_mboxes )
    {}

    virtual void so_define_agent() override
    {
        so_subscribe_self().event< msg_next_turn >(
                &a_generator_t::evt_next_turn );
    }

    virtual void so_evt_start() override
    {
        // Start the first turn immediately.
        so_5::send< msg_next_turn >( *this );
    }

private :
    const std::string m_name;
    const std::vector< so_5::mbox_t > m_workers_mboxes;
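
    // One turn of the generator: produce a random number of requests,
    // push them to randomly chosen receivers, then schedule the next
    // turn after a random pause.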
    void evt_next_turn()
    {
        const int requests = random( 1, 100 );
        TRACE() << "GEN(" << m_name << ") turn started, requests="
                << requests << std::endl;

        std::vector< so_5::mbox_t > live_workers( m_workers_mboxes );
        int sent = 0;
        while( sent < requests && !live_workers.empty() )
        {
            if( generate_next_request( live_workers ) )
                ++sent;
        }

        const auto next_turn_pause = std::chrono::milliseconds( random( 0, 50 ) );
        TRACE() << "GEN(" << m_name << ") requests generated="
                << sent << ", will sleep for "
                << next_turn_pause.count() << "ms" << std::endl;
        so_5::send_delayed< msg_next_turn >( *this, next_turn_pause );
    }
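
    // Tries to send one request to a randomly chosen worker.
    // On failure the worker is removed from the list for this turn.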
    bool generate_next_request( std::vector< so_5::mbox_t > & workers )
    {
        auto it = workers.begin();
        if( workers.size() > 1 )
            std::advance( it, random( 0, static_cast< int >( workers.size() ) - 1 ) );

        auto request = std::unique_ptr< application_request >(
                new application_request(
                        "Mr.Alexander Graham Bell",
                        "Mr.Thomas A. Watson",
                        "Mr. Watson - Come here - I want to see you",
                        "BestEffort,InMemory,NormalPriority",
                        m_name ) );

        auto result = push_request_to_receiver( *it, std::move( request ) );
        if( !result )
            workers.erase( it );

        return result;
    }
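
    // Performs a synchronous request to the receiver with a short
    // timeout; returns false if the request was rejected or not
    // answered in time.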
    bool push_request_to_receiver(
        const so_5::mbox_t & to,
        std::unique_ptr< application_request > req )
    {
        try
        {
            return to->get_one< bool >()
                    .wait_for( std::chrono::milliseconds( 10 ) )
                    .sync_get( std::move( req ) );
        }
        catch( const std::exception & x )
        {
            TRACE() << "GEN(" << m_name << ") failed to push request: "
                    << x.what() << std::endl;
        }

        return false;
    }
};
// Agent which accumulates requests until its capacity is exhausted.
class a_receiver_t final : public so_5::agent_t
{
public :
    // Signal for taking all accumulated requests away.
    struct msg_take_requests : public so_5::signal_t {};

    a_receiver_t(
        context_t ctx,
        std::string name,
        std::size_t max_receiver_capacity )
        :   so_5::agent_t( ctx )
        ,   m_name( std::move( name ) )
        ,   max_capacity( max_receiver_capacity )
    {
        m_requests.reserve( max_capacity );
    }

    virtual void so_define_agent() override
    {
        this >>= st_not_full;

        st_not_full
            .event( &a_receiver_t::evt_store_request )
            .event< msg_take_requests >( &a_receiver_t::evt_take_requests );

        st_overload
            .event( &a_receiver_t::evt_reject_request )
            .event< msg_take_requests >( &a_receiver_t::evt_take_requests );
    }

private :
    // Receiver states: accepting new requests or overloaded.
    const so_5::state_t st_not_full{ this, "not_full" };
    const so_5::state_t st_overload{ this, "overload" };

    const std::string m_name;
    const std::size_t max_capacity;

    std::vector< application_request > m_requests;
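
    // Handler for the not_full state: stores the request and switches
    // to st_overload once the capacity limit is reached (the bool
    // result is what the generator's synchronous push receives).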
    bool evt_store_request( const application_request & what )
    {
        m_requests.push_back( what );

        if( m_requests.size() < max_capacity )
            return true;
        else
        {
            this >>= st_overload;
            return false;
        }
    }

    bool evt_reject_request( const application_request & what )
    {
        TRACE() << "REC(" << m_name << ") reject request from "
                << what.m_generator << std::endl;
        return false;
    }

    std::vector< application_request >
    evt_take_requests()
    {
        std::vector< application_request > result;
        result.swap( m_requests );

        TRACE() << "REC(" << m_name << ") takes requests off, count: "
                << result.size() << std::endl;

        m_requests.reserve( max_capacity );
        this >>= st_not_full;

        return result;
    }
};
// Agent which takes requests from a receiver and processes them.
class a_processor_t final : public so_5::agent_t,
    private random_generator_mixin_t
{
public :
    a_processor_t(
        context_t ctx,
        std::string name,
        const so_5::mbox_t & receiver )
        :   so_5::agent_t( ctx )
        ,   m_name( std::move( name ) )
        ,   m_receiver( receiver )
    {}

    virtual void so_define_agent() override
    {
        so_subscribe_self().event< msg_next_turn >(
                &a_processor_t::evt_next_turn );
    }

    virtual void so_evt_start() override
    {
        // Start the first processing turn immediately.
        so_5::send< msg_next_turn >( *this );
    }

private :
    const std::string m_name;
    const so_5::mbox_t m_receiver;
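
    // One processing turn: take accumulated requests from the
    // receiver; if there are none, retry after a short pause,
    // otherwise process them and immediately start the next turn.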
    void evt_next_turn()
    {
        auto requests = take_requests();

        if( requests.empty() )
        {
            TRACE() << "PRO(" << m_name << ") no request received, sleeping"
                    << std::endl;
            so_5::send_delayed< msg_next_turn >(
                    *this, std::chrono::milliseconds( 25 ) );
        }
        else
        {
            process_requests( requests );
            so_5::send< msg_next_turn >( *this );
        }
    }
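
    // Performs a synchronous request to the receiver; on timeout or
    // any other failure an empty vector is returned.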
    std::vector< application_request >
    take_requests()
    {
        try
        {
            return so_5::request_value<
                        std::vector< application_request >,
                        a_receiver_t::msg_take_requests >
                    ( m_receiver, std::chrono::milliseconds( 20 ) );
        }
        catch( const std::exception & x )
        {
            TRACE() << "PRO(" << m_name << ") failed to take requests: "
                    << x.what() << std::endl;
        }

        return std::vector< application_request >();
    }
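
    // Simulates processing of the taken requests by sleeping for a
    // random amount of time proportional to the batch size.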
    void process_requests( const std::vector< application_request > & requests )
    {
        TRACE() << "PRO(" << m_name << ") start processing, requests="
                << requests.size() << std::endl;

        const auto processing_time = std::chrono::microseconds(
                requests.size() * static_cast< unsigned int >( random( 150, 1500 ) ) );
        std::this_thread::sleep_for( processing_time );

        TRACE() << "PRO(" << m_name << ") processing took: "
                << processing_time.count() / 1000.0 << "ms" << std::endl;
    }
};
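
// Creates five receiver/processor pairs (one cooperation per pair)
// and returns the mboxes of the receivers for use by the generators.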
std::vector< so_5::mbox_t >
create_processing_coops( so_5::environment_t & env )
{
    std::vector< so_5::mbox_t > result;

    std::size_t capacities[] = { 25, 35, 40, 15, 20 };

    int i = 0;
    for( auto c : capacities )
    {
        env.introduce_coop( [&]( so_5::coop_t & coop ) {
            // The receiver answers synchronous requests from generators
            // and processors, so it needs its own working thread.
            auto receiver_disp = so_5::disp::one_thread::create_private_disp( env );
            auto receiver = coop.make_agent_with_binder< a_receiver_t >(
                    receiver_disp->binder(),
                    "r" + std::to_string(i), c );

            const auto receiver_mbox = receiver->so_direct_mbox();
            result.push_back( receiver_mbox );

            // The processor sleeps inside process_requests(), so it
            // also gets a dedicated working thread.
            auto processor_disp = so_5::disp::one_thread::create_private_disp( env );
            coop.make_agent_with_binder< a_processor_t >(
                    processor_disp->binder(),
                    "p" + std::to_string(i), receiver_mbox );
        } );
        ++i;
    }

    return result;
}
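
// Creates the processing cooperations and three generators, lets the
// example run for ten seconds and then stops the environment.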
void init( so_5::environment_t & env )
{
    auto receivers = create_processing_coops( env );

    for( int i = 0; i != 3; ++i )
        env.introduce_coop( [&]( so_5::coop_t & coop ) {
            coop.make_agent< a_generator_t >(
                    "g" + std::to_string(i), receivers );
        } );

    std::this_thread::sleep_for( std::chrono::seconds( 10 ) );
    env.stop();
}
int main()
{
    try
    {
        so_5::launch( init );
    }
    catch( const std::exception & ex )
    {
        std::cerr << "Error: " << ex.what() << std::endl;
        return 1;
    }

    return 0;
}