#include <so_5/all.hpp>
#include <asio.hpp>
#include <asio/ip/tcp.hpp>
#include <sstream>
using asio::ip::tcp;
using hires_clock = std::chrono::high_resolution_clock;
/*!
 * Agent that resolves host names asynchronously with Asio.
 *
 * It handles resolve messages sent to its direct mbox and answers with
 * either resolve_successed or resolve_failed sent to the mbox given in
 * the request. The index from the request is echoed back in the reply so
 * the requester can match replies with requests.
 */
class resolver_t final : public so_5::agent_t
{
public :
	//! Request to resolve a host name.
	struct resolve final : public so_5::message_t
	{
		//! Mbox the reply has to be sent to.
		so_5::mbox_t m_reply_to;
		//! Host name to be resolved.
		std::string m_what;
		//! Requester-defined index, returned unchanged in the reply.
		std::size_t m_index;

		resolve( so_5::mbox_t reply_to, std::string what, std::size_t index )
			: m_reply_to(std::move(reply_to))
			, m_what(std::move(what))
			, m_index(index)
		{}
	};

	//! Positive reply: the host name was resolved.
	struct resolve_successed final : public so_5::message_t
	{
		//! Index taken from the corresponding request.
		std::size_t m_index;
		//! Address of the first endpoint reported by the resolver.
		asio::ip::address m_result;

		resolve_successed( std::size_t index, asio::ip::address result )
			: m_index(index)
			, m_result(std::move(result))
		{}
	};

	//! Negative reply: the host name was not resolved.
	struct resolve_failed final : public so_5::message_t
	{
		//! Index taken from the corresponding request.
		std::size_t m_index;
		//! Textual description of the error.
		std::string m_description;

		resolve_failed( std::size_t index, std::string description )
			: m_index(index)
			, m_description(std::move(description))
		{}
	};

	/*!
	 * \param ctx SObjectizer context for the agent.
	 * \param io_context Asio context the internal resolver is bound to.
	 */
	resolver_t( context_t ctx, asio::io_context & io_context )
		: so_5::agent_t( std::move(ctx) )
		, m_resolver( io_context )
	{
		so_subscribe_self().event( &resolver_t::on_resolve );
	}

private :
	//! Asio resolver that performs the actual lookups.
	tcp::resolver m_resolver;

	//! Starts an asynchronous lookup for the requested host name.
	void
	on_resolve( const resolve & msg )
	{
		// The completion handler captures only copies of the reply mbox
		// and the index, so it does not touch the agent itself.
		m_resolver.async_resolve(
				msg.m_what,
				// No service name is needed, only the address matters.
				std::string(),
				tcp::resolver::numeric_service |
						tcp::resolver::address_configured,
				[reply_to = msg.m_reply_to, index = msg.m_index](
					const asio::error_code & ec,
					tcp::resolver::results_type results )
				{
					handle_resolve_result( reply_to, index, ec, results );
				} );
	}

	//! Converts the outcome of a lookup into a reply message.
	static void
	handle_resolve_result(
		const so_5::mbox_t & reply_to,
		std::size_t index,
		const asio::error_code & ec,
		const tcp::resolver::results_type & results )
	{
		if( !ec )
		{
			// Only the first endpoint is reported. Dereferencing begin()
			// relies on Asio passing a non-empty range on success.
			so_5::send< resolve_successed >( reply_to, index,
					results.begin()->endpoint().address() );
		}
		else
		{
			// "category:value" alone is hard to interpret, so the
			// human-readable message is appended.
			std::ostringstream s;
			s << ec << " (" << ec.message() << ")";
			so_5::send< resolve_failed >( reply_to, index, s.str() );
		}
	}
};
class resolve_request_manager_t final : public so_5::agent_t
{
struct resolve_timeout final : public so_5::message_t
{
std::size_t m_index;
resolve_timeout(std::size_t index) : m_index(index) {}
};
public :
resolve_request_manager_t(
context_t ctx,
so_5::mbox_t resolver,
std::vector< std::string > host_names )
:
so_5::agent_t( std::move(ctx) )
, m_resolver( std::move(resolver) )
, m_data( make_data( std::move(host_names) ) )
{
so_subscribe_self()
.event( &resolve_request_manager_t::on_resolve_successed )
.event( &resolve_request_manager_t::on_resolve_failed )
.event( &resolve_request_manager_t::on_resolve_timeout );
}
virtual void
so_evt_start() override
{
initiate_some_requests();
}
private :
struct host_t
{
{
not_processed_yet,
in_progress,
resolved,
resolving_failed
};
std::string m_name;
status_t m_status = status_t::not_processed_yet;
hires_clock::time_point m_started_at;
so_5::timer_id_t m_timeout_timer;
host_t() = default;
host_t( std::string name ) : m_name( std::move(name) ) {}
};
using data_container_t = std::vector< host_t >;
const so_5::mbox_t m_resolver;
data_container_t m_data;
std::size_t m_first_unprocessed = 0u;
std::size_t m_in_progress_now = 0u;
static data_container_t
make_data( std::vector< std::string > host_names )
{
data_container_t result;
result.reserve( host_names.size() );
std::transform( std::begin(host_names), std::end(host_names),
std::back_inserter(result),
[]( auto & name ) { return host_t( std::move(name) ); } );
return result;
}
static auto
ms( const hires_clock::duration duration )
{
return std::chrono::duration_cast< std::chrono::milliseconds >(
duration ).count();
}
void
initiate_some_requests()
{
static constexpr std::size_t total_in_progress = 3u;
if( m_first_unprocessed == m_data.size() && 0 == m_in_progress_now )
so_deregister_agent_coop_normally();
else
{
while( m_first_unprocessed < m_data.size() &&
m_in_progress_now < total_in_progress )
send_next_unprocessed();
}
}
void
send_next_unprocessed()
{
auto & item = m_data[ m_first_unprocessed ];
item.m_status = host_t::status_t::in_progress;
item.m_started_at = hires_clock::now();
so_5::send< resolver_t::resolve >( m_resolver,
so_direct_mbox(),
item.m_name,
m_first_unprocessed );
item.m_timeout_timer = so_5::send_periodic< resolve_timeout >(
*this,
std::chrono::seconds(15),
std::chrono::seconds::zero(),
m_first_unprocessed );
++m_first_unprocessed;
++m_in_progress_now;
}
void
on_resolve_successed( mhood_t<resolver_t::resolve_successed> cmd )
{
handle_result(
*cmd,
[this]( const auto & result, auto & item, const auto duration ) {
item.m_status = host_t::status_t::resolved;
std::cout << item.m_name << " -> " << result.m_result
<< " (" << duration << "ms)"
<< std::endl;
} );
}
void
on_resolve_failed( mhood_t<resolver_t::resolve_failed> cmd )
{
handle_result(
*cmd,
[this]( const auto & result, auto & item, const auto duration ) {
item.m_status = host_t::status_t::resolving_failed;
std::cout << item.m_name << " FAILURE: " << result.m_description
<< " (" << duration << "ms)"
<< std::endl;
} );
}
void
on_resolve_timeout( mhood_t<resolve_timeout> cmd )
{
handle_result(
*cmd,
[this]( const auto & , auto & item, const auto duration ) {
item.m_status = host_t::status_t::resolving_failed;
std::cout << item.m_name << " FAILURE: TIMEOUT"
<< " (" << duration << "ms)"
<< std::endl;
} );
}
template< typename R, typename L >
void
handle_result( const R & msg, L && lambda )
{
const auto result_at = hires_clock::now();
auto & item = m_data[ msg.m_index ];
item.m_timeout_timer.release();
if( host_t::status_t::in_progress != item.m_status )
return;
--m_in_progress_now;
lambda( msg, item, ms( result_at - item.m_started_at ) );
initiate_some_requests();
}
};
/*!
 * Collects host names from command-line arguments.
 *
 * \param argc Number of items in argv, including the program name.
 * \param argv Command-line arguments. Items [1, argc) are host names.
 * \return Host names in the order they were given.
 * \throws std::runtime_error if no host names were passed.
 */
[[nodiscard]]
auto
argv_to_host_name_list( int argc, char ** argv )
{
	// "argc < 2" instead of "argc == 1": with argc == 0 the range
	// [argv + 1, argv + argc) would be inverted.
	if( argc < 2 )
		throw std::runtime_error( "a list of host names must be passed in "
				"command line" );

	return std::vector< std::string >( argv + 1, argv + argc );
}
/*!
 * Creates the dispatcher used by the resolver agent.
 *
 * The parameters are prepared with use_own_io_context() and the string
 * "asio_disp" is passed to make_dispatcher() together with them.
 *
 * NOTE(review): `asio_disp` is presumably a namespace alias for the Asio
 * one-thread dispatcher; its declaration is not visible in this part of
 * the file -- confirm that it is declared before this function.
 *
 * \param env SObjectizer environment the dispatcher is created for.
 * \return Whatever asio_disp::make_dispatcher() returns.
 */
[[nodiscard]]
auto
make_asio_disp( so_5::environment_t & env )
{
	asio_disp::disp_params_t dispatcher_settings;
	dispatcher_settings.use_own_io_context();

	return asio_disp::make_dispatcher(
			env,
			"asio_disp",
			std::move(dispatcher_settings) );
}
int main( int argc, char ** argv )
{
try
{
auto hosts = argv_to_host_name_list( argc, argv );
so_5::launch( [&](so_5::environment_t & env) {
env.introduce_coop( [&]( so_5::coop_t & coop ) {
auto asio_disp = make_asio_disp( env );
auto resolver = coop.make_agent_with_binder< resolver_t >(
asio_disp.binder(),
std::ref(asio_disp.io_context()) );
coop.make_agent< resolve_request_manager_t >(
resolver->so_direct_mbox(),
std::move(hosts) );
} );
} );
}
catch( const std::exception & x )
{
std::cout << "Exception caught: " << x.what() << std::endl;
}
return 0;
}
Implementation of Asio's One Thread dispatcher.
Ranges of error codes for each submodule.