#if defined( _MSC_VER )
#if defined( __clang__ )
#pragma clang diagnostic ignored "-Wreserved-id-macro"
#endif
#define _CRT_SECURE_NO_WARNINGS
#endif
#include <iostream>
#include <ctime>
#include <sstream>
#include <queue>
{
std::string m_id;
std::time_t m_deadline;
msg_request(
std::string id,
std::time_t deadline,
: m_id( std::move( id ) )
, m_deadline( deadline )
, m_reply_to( reply_to )
{}
};
{
std::string m_id;
std::string m_result;
std::time_t m_started_at;
msg_positive_reply(
std::string id,
std::string result,
std::time_t started_at )
: m_id( std::move( id ) )
, m_result( std::move( result ) )
, m_started_at( started_at )
{}
};
{
std::string m_id;
std::time_t m_deadline;
msg_negative_reply(
std::string id,
std::time_t deadline )
: m_id( std::move( id ) )
, m_deadline( deadline )
{}
};
std::string time_to_string( std::time_t t )
{
char r[ 32 ];
std::strftime( r, sizeof(r) - 1, "%H:%M:%S", std::localtime(&t) );
return r;
}
{
public :
, m_processor_mbox( std::move(processor_mbox) )
{}
{
.
event( &a_generator_t::evt_positive_reply )
.
event( &a_generator_t::evt_negative_reply );
}
{
unsigned int delays[] = { 1, 4, 5, 3, 9, 15, 12 };
const std::time_t now = std::time(nullptr);
int i = 0;
for( auto d : delays )
{
std::ostringstream idstream;
idstream << "i=" << i << ";d=" << d;
const std::string id = idstream.str();
const std::time_t deadline = now + d;
so_5::send< msg_request >( m_processor_mbox,
id,
deadline,
std::cout << "sent: [" << id << "], deadline: "
<< time_to_string( deadline ) << std::endl;
++m_expected_replies;
++i;
}
}
private :
unsigned int m_expected_replies = 0;
void evt_positive_reply( const msg_positive_reply & evt )
{
std::cout
<< time_to_string( std::time(nullptr) ) << " - OK: ["
<< evt.m_id << "], started_at: "
<< time_to_string( evt.m_started_at )
<< ", result: " << evt.m_result
<< std::endl;
count_reply();
}
void evt_negative_reply( const msg_negative_reply & evt )
{
std::cout
<< time_to_string( std::time(nullptr) ) << " - FAIL: ["
<< evt.m_id << "], deadline: "
<< time_to_string( evt.m_deadline )
<< std::endl;
count_reply();
}
void count_reply()
{
--m_expected_replies;
if( !m_expected_replies )
}
};
{
public :
a_collector_t( context_t ctx ) :
so_5::agent_t( ctx )
{}
{
}
{
this >>= st_performer_is_free;
st_performer_is_free
.event( &a_collector_t::evt_first_request );
st_performer_is_busy
.event( &a_collector_t::evt_yet_another_request )
.event( &a_collector_t::evt_select_next_job )
.event( &a_collector_t::evt_check_deadline );
}
private :
struct request_comparator_t
{
bool operator()(
const msg_request_holder_t & a,
const msg_request_holder_t & b ) const
{
return a->m_deadline > b->m_deadline;
}
};
const state_t st_performer_is_free{ this };
const state_t st_performer_is_busy{ this };
std::priority_queue<
msg_request_holder_t,
std::vector< msg_request_holder_t >,
request_comparator_t >
m_pending_requests;
void evt_first_request( mhood_t< msg_request > evt )
{
this >>= st_performer_is_busy;
}
void evt_yet_another_request( mhood_t< msg_request > evt )
{
const std::time_t now = std::time(nullptr);
if( now < evt->m_deadline )
{
m_pending_requests.push( evt.make_holder() );
so_5::send_delayed< msg_check_deadline >(
*this,
std::chrono::seconds( evt->m_deadline - now ) );
}
else
{
send_negative_reply( *evt );
}
}
void evt_select_next_job(mhood_t< msg_select_next_job >)
{
if( m_pending_requests.empty() )
this >>= st_performer_is_free;
else
{
auto & request = m_pending_requests.top();
m_pending_requests.pop();
}
}
void evt_check_deadline(mhood_t< msg_check_deadline >)
{
const std::time_t now = std::time(nullptr);
while( !m_pending_requests.empty() )
{
auto & request = m_pending_requests.top();
if( now >= request->m_deadline )
{
send_negative_reply( *request );
m_pending_requests.pop();
}
else
break;
}
}
void send_negative_reply( const msg_request & request )
{
so_5::send< msg_negative_reply >(
request.m_reply_to,
request.m_id,
request.m_deadline );
}
};
{
public :
, m_collector_mbox( std::move(collector_mbox) )
{}
{
}
private :
void evt_request( const msg_request & evt )
{
const std::time_t started_at = std::time(nullptr);
std::this_thread::sleep_for( std::chrono::seconds(4) );
so_5::send< msg_positive_reply >(
evt.m_reply_to,
evt.m_id,
"-=<" + evt.m_id + ">=-",
started_at );
so_5::send< a_collector_t::msg_select_next_job >( m_collector_mbox );
}
};
{
collector->so_direct_mbox() );
collector->set_performer_mbox( performer->so_direct_mbox() );
c.
make_agent< a_generator_t >( collector->so_direct_mbox() );
});
}
int main()
{
try
{
return 0;
}
catch( const std::exception & x )
{
std::cerr << "*** Exception caught: " << x.what() << std::endl;
}
return 2;
}