#include <so_5/all.hpp>
using namespace std::chrono_literals;
// Type of a request identifier. It is stored in request_t::m_id and echoed
// back in delivery_receipt_t::m_id.
using request_id_t = int;
/// A request message delivered to the processor agent.
struct request_t final
{
    /// Identifier of this request.
    request_id_t m_id;

    /// Textual payload of the request.
    std::string m_data;
};
/// A message that confirms the delivery of a single request.
struct delivery_receipt_t final
{
    /// Identifier of the request this receipt refers to.
    request_id_t m_id;
};
/// Agent that handles request_t messages.
///
/// Only the "normal" state is subscribed to request_t. Handling a request
/// moves the agent into the "busy" state, which is configured with a
/// 2-second time limit that leads back to "normal".
class processor_t final : public so_5::agent_t
{
    /// State in which request_t messages are handled.
    state_t st_normal{this, "normal"};

    /// State entered after a request has been handled; it has no
    /// subscription for request_t.
    state_t st_busy{this, "busy"};

public:
    /// Puts the agent into the normal state and configures both states.
    processor_t(context_t ctx)
        : so_5::agent_t{std::move(ctx)}
    {
        this >>= st_normal;

        st_normal.event(&processor_t::on_request);

        // The busy state is limited to 2 seconds; st_normal is the state
        // to switch to when that limit elapses.
        st_busy.time_limit(2s, st_normal);
    }

private:
    /// Prints the received request and switches the agent to the busy state.
    void on_request(mhood_t<request_t> cmd)
    {
        const request_t & request = *cmd;

        std::cout << "processor: on_request(" << request.m_id << ", "
                  << request.m_data << ")" << std::endl;

        this >>= st_busy;
    }
};
class custom_envelope_t final : public envelope_ns::just_envelope_t
{
const so_5::mbox_t m_to;
const request_id_t m_id;
public:
custom_envelope_t(
so_5::message_ref_t payload,
so_5::mbox_t to,
request_id_t id)
: envelope_ns::just_envelope_t{std::move(payload)}
, m_to{std::move(to)}
, m_id{id}
{}
void
access_hook(
access_context_t context,
handler_invoker_t & invoker) noexcept override
{
if(access_context_t::handler_found == context)
{
so_5::send<delivery_receipt_t>(m_to, m_id);
}
envelope_ns::just_envelope_t::access_hook(context, invoker);
}
};
class requests_generator_t final : public so_5::agent_t
{
const so_5::mbox_t m_processor;
std::map<request_id_t, std::string> m_requests;
struct resend_requests final : public so_5::signal_t {};
public:
requests_generator_t(context_t ctx, so_5::mbox_t processor)
:
so_5::agent_t{std::move(ctx)}
, m_processor{std::move(processor)}
{
so_subscribe_self()
.event(&requests_generator_t::on_delivery_receipt)
.event(&requests_generator_t::on_resend);
}
void so_evt_start() override
{
m_requests.emplace(0, "First");
m_requests.emplace(1, "Second");
m_requests.emplace(2, "Third");
m_requests.emplace(3, "Four");
send_requests();
}
private:
void on_delivery_receipt(mhood_t<delivery_receipt_t> cmd)
{
std::cout << "request delivered: " << cmd->m_id << std::endl;
m_requests.erase(cmd->m_id);
if(m_requests.empty())
so_deregister_agent_coop_normally();
}
void on_resend(mhood_t<resend_requests>)
{
std::cout << "time to resend requests, pending requests: "
<< m_requests.size() << std::endl;
send_requests();
}
void send_requests()
{
for(const auto & item : m_requests)
{
std::cout << "sending request: (" << item.first << ", "
<< item.second << ")" << std::endl;
envelope_ns::make<request_t>(item.first, item.second)
.envelope<custom_envelope_t>(so_direct_mbox(), item.first)
.send_to(m_processor);
}
so_5::send_delayed<resend_requests>(*this, 3s);
}
};
int main()
{
so_5::launch([](so_5::environment_t & env) {
env.introduce_coop([](so_5::coop_t & coop) {
auto processor = coop.make_agent<processor_t>();
coop.make_agent<requests_generator_t>(processor->so_direct_mbox());
});
});
}
An implementation of the just_envelope_t class.
Ranges of error codes for each submodule.
Various send functions that simplify sending enveloped messages.