#include <so_5/all.hpp>
using namespace std::literals::chrono_literals;
class service_provider_t final : public so_5::agent_t
{
const std::chrono::steady_clock::duration m_reply_delay;
public:
struct ask_service final : public so_5::signal_t {};
struct service_ack final : public so_5::signal_t {};
service_provider_t(
context_t ctx,
const std::string & service_name,
std::chrono::steady_clock::duration reply_delay)
: so_5::agent_t(std::move(ctx))
, m_reply_delay(reply_delay)
{
const auto service_mbox = so_environment().create_mbox(service_name);
so_subscribe(service_mbox).event(
[this,service_mbox](mhood_t<ask_service>) {
so_5::send_delayed<service_ack>(
service_mbox,
m_reply_delay);
});
}
};
/// Agent that issues requests to three named services and waits for either
/// an acknowledgement or a timeout from each of them.
///
/// NOTE(review): `asyncop` is not declared in the visible part of this file;
/// presumably it is a namespace alias for the so5extra time-limited async
/// operation (so_5::extra::async_op::time_limited) — confirm.
class customer_t final : public so_5::agent_t
{
    /// Message delivered when a service did not answer in time.
    struct service_timedout final : public so_5::message_t
    {
        /// Name of the service that failed to reply.
        const std::string m_service_name;

        service_timedout(std::string name) : m_service_name(std::move(name)) {}
    };

    /// Signal that tells the agent to finish its work.
    struct finish final : public so_5::signal_t {};

public:
    /// @param ctx Agent context, forwarded to so_5::agent_t.
    customer_t(context_t ctx)
        : so_5::agent_t(std::move(ctx))
    {
        // On `finish` the whole cooperation is deregistered.
        so_subscribe_self().event([this](mhood_t<finish>) {
            so_deregister_agent_coop_normally();
        });
    }

    /// Starts one asynchronous operation per service.
    void so_evt_start() override
    {
        for(const char * name : {"alpha", "beta", "gamma"})
            initiate_async_op_for(name);
    }

private:
    /// Number of operations that have finished, by ack or by timeout.
    int m_acks_received = 0;

    /// Prepares an async operation for the service, activates it with a
    /// 250ms timeout, and then sends the request.
    ///
    /// @param service_name Name of the service mbox; it is also passed to
    ///                     activate() and printed by both handlers.
    void initiate_async_op_for(const std::string & service_name)
    {
        const auto target = so_environment().create_mbox(service_name);

        asyncop::make<service_timedout>(*this)
            .completed_on(
                target,
                so_default_state(),
                [this, service_name](mhood_t<service_provider_t::service_ack>) {
                    std::cout << "ack from a service provider: "
                        << service_name << std::endl;
                    on_ack_or_timeout();
                })
            .timeout_handler(
                so_default_state(),
                [this](mhood_t<service_timedout> cmd) {
                    std::cout << "*** no reply from service provider: "
                        << cmd->m_service_name << std::endl;
                    on_ack_or_timeout();
                })
            .activate(250ms, service_name);

        // The request is sent only after the operation has been activated.
        so_5::send<service_provider_t::ask_service>(target);
    }

    /// Counts one more finished operation; after the third one a delayed
    /// `finish` signal (200ms) is sent to this agent.
    void on_ack_or_timeout()
    {
        m_acks_received += 1;
        if(m_acks_received == 3)
            so_5::send_delayed<finish>(*this, 200ms);
    }
};
int main()
{
try
{
so_5::launch([](so_5::environment_t & env) {
using namespace so_5::disp::thread_pool;
env.introduce_coop(
bind_params_t{}.fifo(fifo_t::individual)),
[](so_5::coop_t & coop) {
coop.make_agent<service_provider_t>("alpha", 100ms);
coop.make_agent<service_provider_t>("beta", 200ms);
coop.make_agent<service_provider_t>("gamma", 300ms);
coop.make_agent<customer_t>();
});
});
return 0;
}
catch(const std::exception & ex)
{
std::cerr << "Exception caught: " << ex.what() << std::endl;
}
return 2;
}
Ranges for error codes of each submodule.
Implementation of time-limited asynchronous one-time operation.