SObjectizer 5.8
Loading...
Searching...
No Matches
so_5/introduce_named_mbox/main.cpp
/*
* Simple usage of environment_t::introduce_named_mbox()
* and unique_subscribers mbox.
*/
#include <so_5/all.hpp>
#include <iostream>
#include <memory>
// Data type to be passed between agents.
struct data_t
{
std::string m_stage{ "initial" };
std::string m_prefix;
std::string m_payload;
std::string m_suffix;
};
std::ostream & operator<<( std::ostream & to, const data_t & what )
{
return (to << "(<" << what.m_stage << ">:[" << what.m_prefix
<< "]= '" << what.m_payload << "' =[" << what.m_suffix << "])");
}
// Messages to be used for interaction between agents.
// Because all messages are similar we use class templates for them.
struct preprocess_tag {};
struct process_tag {};
struct postprocess_tag {};
template< typename Tag >
struct msg_handle_data final : public so_5::message_t
{
std::unique_ptr< data_t > m_data;
const so_5::mbox_t m_reply_to;
msg_handle_data(
std::unique_ptr< data_t > data,
so_5::mbox_t reply_to )
: m_data{ std::move(data) }
, m_reply_to{ std::move(reply_to) }
{}
};
template< typename Tag >
struct msg_handling_finished final : public so_5::message_t
{
std::unique_ptr< data_t > m_data;
explicit msg_handling_finished(
std::unique_ptr< data_t > data )
: m_data{ std::move(data) }
{}
};
using msg_preprocess_data = msg_handle_data< preprocess_tag >;
using msg_preprocess_finished = msg_handling_finished< preprocess_tag >;
using msg_process_data = msg_handle_data< process_tag >;
using msg_process_finished = msg_handling_finished< process_tag >;
using msg_postprocess_data = msg_handle_data< postprocess_tag >;
using msg_postprocess_finished = msg_handling_finished< postprocess_tag >;
// Helper function that makes a named unique-subscriber mbox.
[[nodiscard]]
make_processing_mbox( so_5::environment_t & env )
{
so_5::mbox_namespace_name_t{ "sample.introduce_named_mbox" },
"processing",
[&env]() {
} );
}
// Type of agent for coordination of data processing.
class processing_manager_t final : public so_5::agent_t
{
const so_5::mbox_t m_processing_mbox;
public:
processing_manager_t( context_t ctx )
: so_5::agent_t{ std::move(ctx) }
, m_processing_mbox{ make_processing_mbox( so_environment() ) }
{}
void so_define_agent() override
{
.event( &processing_manager_t::evt_preprocess_finished )
.event( &processing_manager_t::evt_process_finished )
.event( &processing_manager_t::evt_postprocess_finished )
;
}
void so_evt_start() override
{
// Data to be processed by several workers.
auto data = std::make_unique< data_t >();
data->m_payload = "Hello, World";
std::cout << "data to be processed: " << *data << std::endl;
// Initiate processing.
m_processing_mbox,
std::move(data),
}
private:
void evt_preprocess_finished(
mutable_mhood_t< msg_preprocess_finished > cmd )
{
std::cout << "preprocessed data: " << *(cmd->m_data) << std::endl;
// Initiate the next stage.
m_processing_mbox,
std::move(cmd->m_data),
}
void evt_process_finished(
mutable_mhood_t< msg_process_finished > cmd )
{
std::cout << "processed data: " << *(cmd->m_data) << std::endl;
// Initiate the next stage.
m_processing_mbox,
std::move(cmd->m_data),
}
void evt_postprocess_finished(
mutable_mhood_t< msg_postprocess_finished > cmd )
{
std::cout << "postprocessed data: " << *(cmd->m_data) << std::endl;
// Finish the example.
}
};
// Type of agent for preprocessing of the data.
class preprocessor_t final : public so_5::agent_t
{
const so_5::mbox_t m_processing_mbox;
public:
preprocessor_t( context_t ctx )
: so_5::agent_t{ std::move(ctx) }
, m_processing_mbox{ make_processing_mbox( so_environment() ) }
{}
void so_define_agent() override
{
so_subscribe( m_processing_mbox )
.event( []( mutable_mhood_t<msg_preprocess_data> cmd ) {
cmd->m_data->m_stage = "preprocessed";
cmd->m_data->m_prefix = "-=#";
cmd->m_data->m_suffix = "#=-";
cmd->m_reply_to,
std::move(cmd->m_data) );
} );
}
};
// Type of agent for processing of the data.
class processor_t final : public so_5::agent_t
{
const so_5::mbox_t m_processing_mbox;
public:
processor_t( context_t ctx )
: so_5::agent_t{ std::move(ctx) }
, m_processing_mbox{ make_processing_mbox( so_environment() ) }
{}
void so_define_agent() override
{
so_subscribe( m_processing_mbox )
.event( []( mutable_mhood_t<msg_process_data> cmd ) {
cmd->m_data->m_stage = "processed";
// Reverse the content of the data.
cmd->m_data->m_payload = std::string{
cmd->m_data->m_payload.rbegin(),
cmd->m_data->m_payload.rend()
};
cmd->m_reply_to,
std::move(cmd->m_data) );
} );
}
};
// Type of agent for processing of the data.
class postprocessor_t final : public so_5::agent_t
{
const so_5::mbox_t m_processing_mbox;
public:
postprocessor_t( context_t ctx )
: so_5::agent_t{ std::move(ctx) }
, m_processing_mbox{ make_processing_mbox( so_environment() ) }
{}
void so_define_agent() override
{
so_subscribe( m_processing_mbox )
.event( []( mutable_mhood_t<msg_postprocess_data> cmd ) {
cmd->m_data->m_stage = "postprocessed";
// Add prefix and suffix to the payload.
cmd->m_data->m_payload = cmd->m_data->m_prefix + " " +
cmd->m_data->m_payload + " " + cmd->m_data->m_suffix;
cmd->m_reply_to,
std::move(cmd->m_data) );
} );
}
};
int main()
{
try
{
env.introduce_coop( []( so_5::coop_t & coop ) {
// Fill the coop with manager and workers.
coop.make_agent< processing_manager_t >();
coop.make_agent< preprocessor_t >();
coop.make_agent< processor_t >();
coop.make_agent< postprocessor_t >();
} );
} );
return 0;
}
catch( const std::exception & x )
{
std::cerr << "Exception caught: " << x.what() << std::endl;
}
return 2;
}
A helper header file for including all public SObjectizer stuff.
A base class for agents.
Definition agent.hpp:673
virtual void so_define_agent()
Hook on define agent for SObjectizer.
Definition agent.cpp:841
subscription_bind_t so_subscribe_self()
Initiate subscription to agent's direct mbox.
Definition agent.hpp:1416
void so_deregister_agent_coop_normally()
A helper method for deregistering agent's coop in case of normal deregistration.
Definition agent.cpp:982
const mbox_t & so_direct_mbox() const
Get the agent's direct mbox.
Definition agent.cpp:762
subscription_bind_t so_subscribe(const mbox_t &mbox_ref)
Initiate subscription.
Definition agent.hpp:1359
virtual void so_evt_start()
Hook on agent start inside SObjectizer.
Definition agent.cpp:701
Agent cooperation.
Definition coop.hpp:389
Agent * make_agent(Args &&... args)
Helper method for simplification of agents creation.
Definition coop.hpp:792
SObjectizer Environment.
mbox_t introduce_named_mbox(mbox_namespace_name_t mbox_namespace, nonempty_name_t mbox_name, const std::function< mbox_t() > &mbox_factory)
Introduce named mbox with user-provided factory.
decltype(auto) introduce_coop(Args &&... args)
Helper method for simplification of cooperation creation and registration.
A class for the name of mbox_namespace.
A base class for agent messages.
Definition message.hpp:47
std::enable_if< details::is_agent_method_pointer< details::method_arity::unary, Method_Pointer >::value, subscription_bind_t & >::type event(Method_Pointer pfn, thread_safety_t thread_safety=not_thread_safe)
Make subscription to the message.
Definition agent.hpp:3490
Private part of message limit implementation.
Definition agent.cpp:33
void launch(Init_Routine &&init_routine)
Launch a SObjectizer Environment with default parameters.
Definition api.hpp:142
mbox_t make_unique_subscribers_mbox(so_5::environment_t &env)
Factory function for creation of a new instance of unique_subscribers mbox.
void send(Target &&to, Args &&... args)
A utility function for creating and delivering a message or a signal.