#include <chrono>
#include <deque>
#include <iostream>
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <thread>
#include <vector>
// Global mutex serializing access to std::cout across agent threads (see TRACE()).
std::mutex g_trace_mutex;
// RAII helper that holds a mutex for the duration of one TRACE() statement.
// `operator bool` always yields true so an instance can live in an
// `if( auto l = ... )` condition, scoping the lock to that full statement.
class locker_t
{
public :
// Acquires `m` immediately; released when the locker is destroyed.
// `explicit` prevents accidental implicit conversion from a mutex.
explicit locker_t( std::mutex & m ) : m_lock( m ) {}
// Move transfers lock ownership; marked noexcept so moves are never
// replaced by (deleted) copies and the type stays cheaply movable.
locker_t( locker_t && o ) noexcept : m_lock( std::move( o.m_lock ) ) {}
// Always true: the object exists only to hold the lock inside an if-condition.
operator bool() const { return true; }
private :
std::unique_lock< std::mutex > m_lock;
};
// Serialized trace output: the locker in the if-condition keeps
// g_trace_mutex locked for the entire chained `<<` expression, so lines
// from concurrent agents never interleave.
#define TRACE() \
if( auto l = locker_t{ g_trace_mutex } ) std::cout
// Mixin that supplies uniformly distributed random integers to agent classes.
class random_generator_mixin_t
{
public :
random_generator_mixin_t()
{
// Seed from hardware entropy. The original no-argument seed() reset
// the engine to the fixed default seed, so every instance produced an
// identical sequence.
m_random_engine.seed( std::random_device{}() );
}
// Returns a uniformly distributed integer in the inclusive range [l, h].
int
random( int l, int h )
{
return std::uniform_int_distribution<>( l, h )( m_random_engine );
}
private :
std::default_random_engine m_random_engine;
};
// Message describing one application-level request passed from generator
// agents to collectors/performers. All fields are plain strings so the
// object can be copied into the collector's waiting queue.
// NOTE(review): the struct header line and the `from` constructor
// parameter were reconstructed — the surviving 4-argument constructor
// never initialized m_from and did not match the 5-argument call site
// in generate_next_request(); confirm against the original sources.
struct application_request
{
std::string m_to;
std::string m_from;
std::string m_payload;
std::string m_attributes;
std::string m_generator;
application_request(
std::string to,
std::string from,
std::string payload,
std::string attributes,
std::string generator )
: m_to( std::move( to ) )
, m_from( std::move( from ) )
, m_payload( std::move( payload ) )
, m_attributes( std::move( attributes ) )
, m_generator( std::move( generator ) )
{}
};
// NOTE(review): lines appear to be missing here — the class header (likely
// `class a_generator_t final : public so_5::agent_t,`) and the
// msg_next_turn signal declaration were lost; only the trailing
// base-class clause survives.
private random_generator_mixin_t
{
public :
// NOTE(review): the constructor's agent_t base-class initializer line
// appears to be missing before the member initializers below.
a_generator_t(
context_t ctx,
std::string name,
const std::vector< so_5::mbox_t > & workers_mboxes )
, m_name(
std::move( name ) )
, m_workers_mboxes( workers_mboxes )
{}
// NOTE(review): the function headers (likely so_define_agent() with its
// subscription call, and so_evt_start()) appear to be missing around the
// two bodies below.
{
&a_generator_t::evt_next_turn );
}
{
so_5::send< msg_next_turn >( *this );
}
private :
// Generator name used in trace output.
const std::string m_name;
// Mboxes of the collector agents this generator pushes requests to.
const std::vector< so_5::mbox_t > m_workers_mboxes;
void evt_next_turn()
{
const int requests = random( 1, 100 );
TRACE() << "GEN(" << m_name << ") turn started, requests="
<< requests << std::endl;
std::vector< so_5::mbox_t > live_workers( m_workers_mboxes );
int sent = 0;
while( sent < requests && !live_workers.empty() )
{
if( generate_next_request( live_workers ) )
++sent;
}
const auto next_turn_pause = std::chrono::milliseconds( random(0, 50) );
TRACE() << "GEN(" << m_name << ") requests generated="
<< sent << ", will sleep for "
<< next_turn_pause.count() << "ms" << std::endl;
so_5::send_delayed< msg_next_turn >( *this, next_turn_pause );
}
// Picks a random worker mbox from `workers` and tries to push one request
// to it; on failure the worker is removed from the candidate list for the
// rest of the turn. Returns true when the request was accepted.
bool generate_next_request(
std::vector< so_5::mbox_t > & workers )
{
auto it = workers.begin();
// Only spend a random() call when there is an actual choice to make.
if( workers.size() > 1 )
std::advance( it, random( 0, static_cast< int >(workers.size()) - 1 ) );
// NOTE(review): this call passes five arguments while the visible
// application_request constructor takes four and never sets m_from —
// a `from` parameter line appears to have been lost from the struct;
// confirm against the original sources.
auto request = std::unique_ptr< application_request >(
new application_request(
"Mr.Alexander Graham Bell",
"Mr.Thomas A. Watson",
"Mr. Watson - Come here - I want to see you",
"BestEffort,InMemory,NormalPriority",
m_name ) );
auto result = push_request_to_receiver( *it, std::move( request ) );
if( !result )
workers.erase( it );
return result;
}
// Attempts a synchronous delivery of `req` to a receiver with a 10ms
// timeout; exceptions are logged via TRACE and swallowed so the generator
// can drop the worker and carry on.
// NOTE(review): the receiver-mbox parameter and the head of the delivery
// expression appear to be missing — the call site above passes two
// arguments, the statement below begins with `.wait_for`, and the success
// path (`return true` / returning the sync_get result) is not visible.
bool push_request_to_receiver(
std::unique_ptr< application_request > req )
{
try
{
.wait_for( std::chrono::milliseconds( 10 ) )
.sync_get( std::move( req ) );
}
catch( const std::exception & x )
{
TRACE()<< "GEN(" << m_name << ") failed to push request: "
<< x.what() << std::endl;
}
return false;
}
};
// NOTE(review): the a_collector_t class header (likely
// `class a_collector_t final : public so_5::agent_t`) appears to be
// missing above this brace, along with the msg_select_next_job signal
// declaration, the set_performer_mbox() setter and the m_performer_mbox
// member that the handlers below reference.
{
public :
// NOTE(review): the constructor's agent_t base-class initializer line
// appears to be missing before the member initializers below.
a_collector_t(
context_t ctx,
std::string name,
std::size_t max_receiver_capacity,
std::size_t max_concurrent_jobs )
, m_name(
std::move( name ) )
, max_capacity( max_receiver_capacity )
, m_available_concurrent_performers( max_concurrent_jobs )
{
}
// NOTE(review): function headers appear to be missing around the two
// bodies below — the first is probably set_performer_mbox(), the second
// so_define_agent() with its subscription chain.
{
}
{
.
event( &a_collector_t::evt_receive_job )
.
event< msg_select_next_job >( &a_collector_t::evt_select_next_job );
}
private :
// Collector name used in trace output.
const std::string m_name;
// Maximum number of requests allowed to wait in m_requests.
const std::size_t max_capacity;
// FIFO of accepted-but-not-yet-dispatched requests.
std::deque< application_request > m_requests;
// Number of performer slots currently free.
std::size_t m_available_concurrent_performers;
// Accepts one incoming request: dispatch immediately when a performer
// slot is free, otherwise queue it, otherwise reject it.
// Returns true when the request was accepted (dispatched or queued).
bool evt_receive_job( const application_request & what )
{
if( m_available_concurrent_performers )
{
// A performer is idle: hand the request over right away.
--m_available_concurrent_performers;
so_5::send< application_request >( m_performer_mbox, what );
return true;
}
if( m_requests.size() < max_capacity )
{
// No free performer, but there is room in the waiting queue.
m_requests.push_back( what );
return true;
}
// Queue is full: log the rejection and report delivery failure.
TRACE() << "COL(" << m_name << ") reject request from "
<< what.m_generator << std::endl;
return false;
}
// A performer finished its job: free the slot, then immediately reuse it
// for the oldest queued request, if any is waiting.
void evt_select_next_job()
{
++m_available_concurrent_performers;
if( m_requests.empty() )
return;
--m_available_concurrent_performers;
so_5::send< application_request >(
m_performer_mbox, m_requests.front() );
m_requests.pop_front();
}
};
// NOTE(review): the a_performer_t class header (likely
// `class a_performer_t final : public so_5::agent_t,`) appears to be
// missing above, as does the m_collector_mbox member declaration used by
// the initializer list and evt_perform_job below.
private random_generator_mixin_t
{
public :
// NOTE(review): the constructor's collector_mbox parameter line and the
// agent_t base-class initializer appear to be missing here.
a_performer_t(
context_t ctx,
std::string name,
, m_name(
std::move( name ) )
, m_collector_mbox(
std::move(collector_mbox) )
{}
// NOTE(review): the so_define_agent() header and the subscription call
// wrapping the handler pointer below appear to be missing.
{
&a_performer_t::evt_perform_job,
}
private :
// Performer name used in trace output.
const std::string m_name;
// Handles one request: performs the (simulated) work, then signals the
// collector that this performer slot is free for the next job.
void evt_perform_job( const application_request & job )
{
process_request( job );
so_5::send< a_collector_t::msg_select_next_job >( m_collector_mbox );
}
// Simulates processing of one request by sleeping for a random
// 150–1500 microsecond interval, tracing before and after.
void process_request( const application_request & )
{
// NOTE(review): this trace statement is truncated mid-expression — the
// continuation (likely the current thread id and std::endl) appears to
// be missing.
TRACE() << "PER(" << m_name << ") start processing; thread="
const auto processing_time = std::chrono::microseconds(
random( 150, 1500 ) );
std::this_thread::sleep_for( processing_time );
// NOTE(review): the thread id appears to be missing here as well; only
// the elapsed-time part of the message survives.
TRACE() << "PER(" << m_name << ") finish processing; thread="
<< processing_time.count() / 1000.0 << "ms" << std::endl;
}
};
// Creates one collector/performer cooperation per capacity entry and
// returns the collectors' mboxes for use by the generator agents.
// NOTE(review): the function name/parameter line (likely
// `create_processing_coops( so_5::environment_t & env )`) and several
// introduce_coop / make_agent lines appear to be missing below; only
// fragments of the coop-building lambda survive.
std::vector< so_5::mbox_t >
{
std::vector< so_5::mbox_t > result;
// Per-collector waiting-queue capacities; one cooperation per entry.
std::size_t capacities[] = { 25, 35, 40, 15, 20 };
const std::size_t concurrent_slots = 5;
int i = 0;
for( auto c : capacities )
{
// NOTE(review): fragments of env.introduce_coop(...) and of the
// make_agent calls creating the collector ("r<i>") and performer
// ("p<i>") agents follow; their surrounding lines were lost.
env,
concurrent_slots );
"r" + std::to_string(i), c, concurrent_slots );
auto collector_mbox = collector->so_direct_mbox();
result.push_back( collector_mbox );
"p" + std::to_string(i), collector_mbox );
collector->set_performer_mbox( performer->so_direct_mbox() );
} );
++i;
}
return result;
}
// NOTE(review): the enclosing function header (likely an init/demo
// function taking `so_5::environment_t & env`) appears to be missing,
// along with the introduce_coop / make_agent lines that create the three
// generator agents ("g0".."g2") and the dispatcher-binding boilerplate.
{
auto receivers = create_processing_coops( env );
// Fragment of dispatcher parameters: each agent gets its own event queue.
p.fifo( fifo_t::individual );
} ),
for( int i = 0; i != 3; ++i )
"g" + std::to_string(i), receivers );
} );
// Let the demo run for a while before shutting down.
std::this_thread::sleep_for( std::chrono::seconds( 10 ) );
}
// Entry point: runs the demo and maps any escaped exception to exit code 1.
int main()
{
try
{
// NOTE(review): the try body is empty — the call that launches the
// SObjectizer environment (e.g. so_5::launch(...)) appears to be
// missing here; confirm against the original sources.
}
catch( const std::exception & ex )
{
std::cerr << "Error: " << ex.what() << std::endl;
return 1;
}
return 0;
}