#include <algorithm>
#include <chrono>
#include <iostream>
#include <iterator>
#include <memory>
#include <queue>
#include <random>
#include <string>
#include <thread>
#include <utility>
/// Returns a uniformly distributed value from the closed range [left, right].
///
/// @param left  Smallest value that may be returned.
/// @param right Largest value that may be returned (inclusive).
/// @return A pseudo-random value v with left <= v <= right.
///
/// Precondition: left <= right (required by std::uniform_int_distribution).
unsigned int random_value( unsigned int left, unsigned int right )
{
	// One engine per thread, seeded once from std::random_device on first use.
	// Building a random_device and reseeding a Mersenne Twister on every call
	// is expensive and gives no better randomness; thread_local keeps the
	// function safe to call from several threads without locking.
	thread_local std::mt19937 gen{ std::random_device{}() };
	return std::uniform_int_distribution< unsigned int >{ left, right }( gen );
}
/// Simulates a long-running computation by blocking the calling thread.
///
/// @param pause How long to block, in milliseconds.
void imitate_hard_work( unsigned int pause )
{
	const std::chrono::milliseconds delay{ pause };
	std::this_thread::sleep_for( delay );
}
// Clock used for every timestamp stored in request_metadata.
using clock_type = std::chrono::steady_clock;
// NOTE(review): presumably the upper bound of a request dimension
// (generate_next_dimension() tops out at the same value); the constant is
// not referenced by the code visible in this view.
constexpr unsigned int max_dimension{ 10000 };
// Timestamps collected while a request travels from the generator to a
// processor and back. Each stage writes its own field through the shared
// pointer that accompanies the request.
//
// NOTE(review): the processor code in this file also assigns
// m_processor_prio through this struct, but no such field is declared here -
// confirm whether a declaration is missing.
struct request_metadata
{
	// Written by the generator right after the metadata object is created.
	clock_type::time_point m_generated_at{};
	// Written by the acceptor when the request is pushed into a queue.
	clock_type::time_point m_queued_at{};
	// Written by a processor immediately before the simulated work.
	clock_type::time_point m_processing_started_at{};
	// Written by a processor immediately after the simulated work.
	clock_type::time_point m_processing_finished_at{};
};

// Shared handle so that the request and its result refer to the same
// metadata object.
using request_metadata_shptr = std::shared_ptr< request_metadata >;
// NOTE(review): the line that opens this type declaration is not visible in
// this view; the constructor below names the type generation_request.
// Holds one generation request: its id, the requested dimension and the
// shared timing metadata that accompanies the request.
{
const unsigned int m_id;
const unsigned int m_dimension;
request_metadata_shptr m_metadata;
// The metadata handle is taken by value and moved into the member.
generation_request(
unsigned int id,
unsigned int dimension,
request_metadata_shptr metadata )
: m_id( id )
, m_dimension( dimension )
, m_metadata( std::move(metadata) )
{}
};
// NOTE(review): the line that opens this type declaration is not visible in
// this view; the constructor below names the type generation_result.
// Reports a processed request: the id of the request and the timing metadata
// that travelled with it.
{
	// Id copied from the generation_request that was processed.
	const unsigned int m_id;
	// Same metadata object the request carried (shared ownership).
	request_metadata_shptr m_metadata;

	generation_result(
		unsigned int id,
		request_metadata_shptr metadata )
		:	m_id( id )
		// `metadata` is a by-value sink parameter: move it into the member
		// instead of copying, which avoids an extra atomic reference-count
		// increment/decrement and matches generation_request's constructor.
		,	m_metadata( std::move( metadata ) )
	{}
};
// NOTE(review): the line that opens this type declaration is not visible in
// this view; the constructor below names the type generation_rejected.
// Carries the id of a generation_request that was not accepted for processing.
{
const unsigned int m_id;
generation_rejected( unsigned int id )
: m_id( id )
{}
};
// NOTE(review): several lines of this class are missing from this view (the
// class header, parts of the constructor and the headers of the two member
// functions below), so the text does not compile as shown. The constructor
// and handler names identify the class as request_generator.
{
public :
// Constructor; only part of its parameter and initializer lists is visible.
// It stores the interaction mbox it is given in m_interaction_mbox.
request_generator(
context_t ctx,
, m_interaction_mbox( std::move( interaction_mbox ) )
{}
// Subscription block (enclosing function header not visible): registers
// evt_produce_next, evt_generation_result and evt_generation_rejected as
// event handlers. The objects the two `.` chains start from are not visible.
{
.
event( &request_generator::evt_produce_next );
.
event( &request_generator::evt_generation_result )
.event( &request_generator::evt_generation_rejected );
}
// Start-up block (enclosing function header not visible): sends the first
// produce_next to this agent; evt_produce_next re-sends it with a delay.
{
so_5::send< produce_next >( *this );
}
private :
// Id of the most recently generated request; pre-incremented for each new one.
unsigned int m_last_id = 0;
// Remaining quotas for the three dimension ranges used by
// generate_next_dimension(); all zero means "refill on next call".
unsigned int m_x = 0;
unsigned int m_y = 0;
unsigned int m_z = 0;
// Handler for produce_next: creates one new generation_request, sends it to
// the interaction mbox, logs it, and schedules the next produce_next for
// this agent after a random pause of 0..100 ms.
void evt_produce_next(mhood_t< produce_next >)
{
	// Next sequential id and the workload size for this request.
	auto request_id = ++m_last_id;
	auto request_dimension = generate_next_dimension();

	// The creation time is stamped before the request leaves this agent.
	auto request_meta = std::make_shared< request_metadata >();
	request_meta->m_generated_at = clock_type::now();

	so_5::send< generation_request >(
			m_interaction_mbox,
			request_id,
			request_dimension,
			std::move( request_meta ) );

	std::cout << "generated {" << request_id << "}, dimension: "
			<< request_dimension << std::endl;

	// Re-arm the generator.
	const std::chrono::milliseconds next_pause( random_value( 0, 100 ) );
	so_5::send_delayed< produce_next >( *this, next_pause );
}
void evt_generation_result( const generation_result & evt )
{
auto ms =
[]( const clock_type::time_point a,
const clock_type::time_point b )
{
return std::to_string( std::chrono::duration_cast<
std::chrono::milliseconds >( b - a ).count() ) + "ms";
};
const auto & meta = *evt.m_metadata;
const auto & d1 = ms( meta.m_generated_at, meta.m_queued_at );
const auto & d2 = ms( meta.m_queued_at,
meta.m_processing_started_at );
const auto & d3 = ms( meta.m_processing_started_at,
meta.m_processing_finished_at );
std::cout << "result {" << evt.m_id << "}: "
<< "in route: " << d1 << ", waiting(p"
<< "): " << d2
<< ", processing(p"
<< "): " << d3
<< std::endl;
}
// Handler for generation_rejected: logs the id of the refused request.
void evt_generation_rejected( const generation_rejected & evt )
{
	const auto rejected_id = evt.m_id;
	std::cout << "*** REJECTION: " << rejected_id << std::endl;
}
// Picks the dimension for the next request.
//
// Dimensions come from three ranges: [100, 2999], [3000, 7999] and
// [8000, 10000]. The counters m_x, m_y and m_z hold how many values may
// still be drawn from each range in the current cycle (60 / 30 / 10). A range
// is chosen with probability proportional to its remaining quota; when all
// quotas reach zero they are refilled.
//
// Returns a dimension in [100, 10000].
unsigned int generate_next_dimension()
{
	if( 0 == m_x + m_y + m_z )
	{
		m_x = 60; m_y = 30; m_z = 10;
	}

	// Consumes one unit of the chosen quota and draws a value from [l, r].
	auto make_result =
		[]( unsigned int & param, unsigned int l, unsigned int r ) {
			param -= 1;
			return random_value( l, r );
		};

	// random_value() is inclusive on both ends, so the selector must come from
	// [0, total - 1]. Drawing from [0, total] let v == total fall through to
	// the last branch even when m_z was already 0, wrapping the unsigned
	// counter and corrupting all later draws. `total` is never 0 here because
	// of the refill above.
	const auto total = m_x + m_y + m_z;
	const auto v = random_value( 0, total - 1 );
	if( v < m_x )
		return make_result( m_x, 100, 2999 );
	else if( v < m_x + m_y )
		return make_result( m_y, 3000, 7999 );
	else
		return make_result( m_z, 8000, 10000 );
}
};
// Scheduling state used by both request_acceptor and request_scheduler; each
// of them holds a request_scheduling_data reference.
struct request_scheduling_data
{
// FIFO queue of pending requests.
using request_queue = std::queue< generation_request_holder >;
// Bookkeeping for one processor: its waiting requests and an idle flag.
struct priority_data
{
request_queue m_requests;
// Starts as true; the scheduler clears it when it hands a request to the
// processor and sets it again when the processor asks for work.
bool m_processor_is_free = true;
};
// NOTE(review): the accessor declared here is incomplete in this view (its
// name, parameter list and body are missing). Other code in this file calls
// info_at( priority ) and indexes m_processors, and reads a m_processor
// member of priority_data; none of those declarations are visible here.
priority_data &
{
}
};
// NOTE(review): two type bodies follow whose opening declarations, members
// and constructor signatures are not visible in this view. Each only shows a
// constructor initializer storing a priority in m_priority. They are
// presumably processor_can_be_loaded and ask_for_work, whose m_priority is
// read by request_scheduler - confirm against the full file.
{
: m_priority( priority )
{}
};
// Second of the two priority-carrying types (see the note above).
{
: m_priority( priority )
{}
};
// NOTE(review): the class header, parts of the constructor and the header of
// the subscription function are missing from this view. The constructor and
// handler names identify the class as request_acceptor.
{
public :
// Constructor: keeps the interaction mbox and a reference to the shared
// scheduling data. The agent context is extended with a message limit:
// limit_then_transform( 10, ... ) turns a generation_request into a
// generation_rejected carrying the same id, addressed to m_interaction_mbox
// (presumably once more than 10 requests are pending - confirm against the
// SObjectizer message-limit documentation).
request_acceptor(
context_t ctx,
request_scheduling_data & data )
+ limit_then_transform( 10,
[this]( const generation_request & req ) {
return make_transformed< generation_rejected >(
m_interaction_mbox,
req.m_id );
} ) )
, m_interaction_mbox( std::move( interaction_mbox ) )
, m_data( data )
{}
// Subscription block (enclosing function header not visible): registers
// evt_request as an event handler.
{
.
event( &request_acceptor::evt_request );
}
private :
// Scheduling state owned elsewhere; only referenced here.
request_scheduling_data & m_data;
// Handler for generation_request: puts the request into the queue of one
// processor, or answers with generation_rejected when that queue already
// holds 100 requests.
// NOTE(review): several lines are missing from this view - the declarations
// of `pos` and `step`, and the tail of the processor_can_be_loaded send - so
// the body does not compile as shown.
void evt_request( mhood_t< generation_request > evt )
{
// Remainder of the statement that derives the queue index from the dimension.
static_cast< std::size_t >( evt->m_dimension / step );
auto & info = m_data.m_processors[ pos ];
// Each queue is capped at 100 pending requests.
if( info.m_requests.size() < 100 )
{
// An empty queue with an idle processor starts a processor_can_be_loaded
// notification on the interaction mbox.
if( info.m_requests.empty() && info.m_processor_is_free )
so_5::send< processor_can_be_loaded >(
m_interaction_mbox,
info.m_requests.push( evt.make_holder() );
// Record when the request entered the queue.
evt->m_metadata->m_queued_at = clock_type::now();
}
else
so_5::send< generation_rejected >( m_interaction_mbox,
evt->m_id );
}
};
// NOTE(review): the class header and parts of the constructor are missing
// from this view. The constructor name identifies the class as
// request_scheduler.
{
public :
// Constructor: keeps the interaction mbox and a reference to the shared
// scheduling data.
request_scheduler(
context_t ctx,
request_scheduling_data & data )
, m_interaction_mbox( std::move( interaction_mbox ) )
, m_data( data )
{}
// Subscribes this agent to the interaction mbox: evt_processor_can_be_loaded
// and evt_ask_for_work are registered as its event handlers.
void so_define_agent() override
{
so_subscribe( m_interaction_mbox )
.event( &request_scheduler::evt_processor_can_be_loaded )
.event( &request_scheduler::evt_ask_for_work );
}
// Start-up hook of the scheduler.
// NOTE(review): most of the body is missing from this view. What remains
// shows a call that takes *this and a binder obtained from something built
// with so_environment(), and an inner block that calls
// create_processor_agent( coop, p ) - presumably once per priority; confirm
// against the full file.
void so_evt_start() override
{
*this,
so_environment() ).binder(),
{
create_processor_agent( coop, p );
} );
} );
}
private :
// Scheduling state owned elsewhere; only referenced here.
request_scheduling_data & m_data;
// Handler for processor_can_be_loaded: if the processor for the given
// priority is currently idle, try to hand it a request.
void evt_processor_can_be_loaded( const processor_can_be_loaded & evt )
{
	const auto & prio = evt.m_priority;

	// A busy processor will ask for work itself when it finishes.
	if( !m_data.info_at( prio ).m_processor_is_free )
		return;

	try_schedule_work_to( prio );
}
// Handler for ask_for_work: marks the processor for the given priority as
// idle and immediately tries to hand it another request.
void evt_ask_for_work( const ask_for_work & evt )
{
	const auto & prio = evt.m_priority;

	auto & slot = m_data.info_at( prio );
	slot.m_processor_is_free = true;

	try_schedule_work_to( prio );
}
// Creates the processor agent for one priority and records its direct mbox
// in the scheduling data for that priority.
// NOTE(review): the parameter list, the header of the local agent class and
// the start of the statement that creates `processor` are missing from this
// view, so the body does not compile as shown.
void create_processor_agent(
{
// Local agent class (its header line is not visible).
{
public :
processor_t(
context_t ctx,
:
// The agent context is combined with the priority and a message limit of
// one generation_request (limit_then_abort).
so_5::agent_t{ ctx + priority
+ limit_then_abort< generation_request >( 1 ) }
{
// Handler for generation_request on the agent's own mbox: timestamps the
// start, performs the simulated work, timestamps the end, then reports the
// result and asks the scheduler for more work.
so_subscribe_self().event(
[priority, interaction_mbox]
( mhood_t<generation_request> cmd ) {
cmd->m_metadata->m_processing_started_at =
clock_type::now();
// NOTE(review): m_processor_prio is not declared in the request_metadata
// struct visible in this file - confirm the declaration exists.
cmd->m_metadata->m_processor_prio = priority;
// The simulated work lasts dimension / 10 milliseconds.
imitate_hard_work( cmd->m_dimension / 10 );
cmd->m_metadata->m_processing_finished_at =
clock_type::now();
so_5::send< generation_result >(
interaction_mbox,
cmd->m_id,
cmd->m_metadata );
so_5::send< ask_for_work >(
interaction_mbox,
priority );
} );
}
};
// Tail of the statement that creates `processor` (its beginning is missing).
priority,
std::cref(m_interaction_mbox) );
// Remember where requests for this priority have to be sent.
m_data.info_at( priority ).m_processor = processor->so_direct_mbox();
}
// NOTE(review): the function header is missing from this view; this is
// presumably the body of try_schedule_work_to( priority ), which the two
// scheduler handlers call. Some statements inside are incomplete as well
// (the send of `req` and the condition that guards ++deep).
//
// Looks for a queued request to hand to the processor of `priority`. The
// loop runs while fewer than max_deep (5) steps have been taken and stops as
// soon as a request is dispatched or the inner else branch breaks out.
{
// Data of the processor that is about to receive work.
auto & free_processor_info = m_data.info_at( priority );
const auto max_deep = 5;
auto deep = 0;
do
{
// Queue currently being inspected.
auto & info = m_data.info_at( priority );
if( !info.m_requests.empty() )
{
// Take the oldest request from the queue ...
auto req = info.m_requests.front();
info.m_requests.pop();
// ... and send it to the processor (start of this statement is missing).
free_processor_info.m_processor,
req );
free_processor_info.m_processor_is_free = false;
break;
}
else
{
// NOTE(review): the `if` condition that belongs to this block is missing.
{
++deep;
}
else
break;
}
}
while( deep < max_deep );
}
};
// NOTE(review): the header of this function and the beginnings of most of
// its statements are missing from this view. The remaining text shows a
// request_scheduling_data object created with std::make_unique, and two
// calls that each receive prio_disp.binder(), mbox and *data - presumably
// the creation of the acceptor and scheduler agents; confirm against the
// full file.
{
std::make_unique< request_scheduling_data >() );
prio_disp.binder(),
mbox, *data );
prio_disp.binder(),
mbox, *data );
} );
}
// Program entry point.
// Returns 0 on normal completion and 2 when a std::exception is caught (the
// message is written to std::cerr).
// NOTE(review): the try block only returns in this view; the call that
// starts the application is presumably missing.
int main()
{
	try
	{
		return 0;
	}
	catch( const std::exception & ex )
	{
		std::cerr << "Exception: " << ex.what() << std::endl;
		return 2;
	}
}