SObjectizer  5.5
so-5.5 In Depth - Message Chains

Purpose

The main purpose of the message chain (or just mchain) mechanism is to provide a way for the SObjectizer part and the non-SObjectizer part of an application to interact. Passing messages from the non-SObjectizer part to agents is very simple: the usual message passing via mboxes is used. For example:

class worker : public so_5::agent_t {
public:
    ...
    virtual void so_define_agent() override {
        // A mbox to be used to receive requests.
        auto mbox = so_environment().create_mbox("requests");
        // Subscription to messages from that mbox...
        so_subscribe(mbox).event(&worker::request_handler1)
            .event(&worker::request_handler2)
            .event(&worker::request_handler3)
            ...;
    }
    ...
};
...
// Somewhere in the main thread...
so_5::wrapped_env_t sobj;
launch_sobjectizer(...);
auto mbox = sobj.environment().create_mbox("requests");
...
// Sending requests to the SObjectizer-part of the application.
so_5::send<request1>(mbox, ...);
so_5::send<request2>(mbox, ...);
...

But how can the non-SObjectizer part of the application receive messages back from an agent?

Message chain is the answer.

To an agent a message chain looks almost like an mbox. An agent can send messages to an mchain exactly the same way as to an mbox. So an mchain can be passed to an agent and the agent will use it as the destination for reply messages. On the other side of the mchain there will be a non-SObjectizer message handler. This handler receives messages from the mchain by using special API functions and handles them appropriately.

class worker : public so_5::agent_t {
public:
    worker(context_t ctx, so_5::mbox_t reply_to)
        : so_5::agent_t{ctx}, m_reply_to{std::move(reply_to)}
    {...}
    ...
    virtual void so_define_agent() override {
        // A mbox to be used to receive requests.
        auto mbox = so_environment().create_mbox("requests");
        // Subscription to messages from that mbox...
        so_subscribe(mbox).event(&worker::request_handler1)
            .event(&worker::request_handler2)
            .event(&worker::request_handler3)
            ...;
    }
private:
    // A mbox for sending responses.
    const so_5::mbox_t m_reply_to;
    ...
    // Request handler.
    void request_handler1(const request1 & r) {
        ... // Some processing.
        // Sending response back as a message.
        so_5::send<reply1>(m_reply_to, ...);
    }
    ...
};
...
// Somewhere in the main thread...
so_5::wrapped_env_t sobj;
// Chain for receiving responses.
so_5::mchain_t reply_ch = sobj.environment().create_mchain(
    // Parameters for the simplest chain without any limitations.
    so_5::make_unlimited_mchain_params());
launch_sobjectizer(
    // Passing mchain as a parameter.
    // Cast it to mbox to look exactly like ordinary mbox.
    reply_ch->as_mbox(), ...);
auto mbox = sobj.environment().create_mbox("requests");
...
// Sending requests to the SObjectizer-part of the application.
// And receive responses.
so_5::send<request1>(mbox, ...);
receive(reply_ch, so_5::infinite_wait, [](const reply1 &) {...});
so_5::send<request2>(mbox, ...);
receive(reply_ch, so_5::infinite_wait, [](const reply2 &) {...});
...

More Details

Types of Mchains

There are two types of mchains:

  1. Unlimited mchains. These mchains have no limit on the number of messages stored inside the mchain. This number is limited only by the amount of available memory and by the common sense of a developer.
  2. Limited mchains. These mchains have a strict limit on the number of messages. It is impossible to just push yet another message into a full size-limited mchain.

The type of an mchain is specified at creation time. Once an mchain is created its type cannot be changed.

The type of an mchain is specified by the content of the so_5::mchain_params_t instance passed to the so_5::environment_t::create_mchain method. There are three helper functions which return properly initialized mchain_params_t instances:

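so_5::environment_t & env = ...;
// Create size-unlimited mchain.
so_5::mchain_t m1 = env.create_mchain(
    so_5::make_unlimited_mchain_params());
// Create size-limited mchain without waiting on attempt of pushing new message
// to the full mchain.
so_5::mchain_t m2 = env.create_mchain(
    so_5::make_limited_without_waiting_mchain_params(
        200,
        so_5::mchain_props::memory_usage_t::dynamic,
        so_5::mchain_props::overflow_reaction_t::drop_newest));
// Create size-limited mchain with waiting on attempt of pushing new message
// to the full mchain.
so_5::mchain_t m3 = env.create_mchain(
    so_5::make_limited_with_waiting_mchain_params(
        200,
        so_5::mchain_props::memory_usage_t::dynamic,
        so_5::mchain_props::overflow_reaction_t::drop_newest,
        std::chrono::milliseconds(500)));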

The type so_5::mchain_t is like the type so_5::mbox_t: it is an alias for a smart intrusive pointer to so_5::abstract_message_chain_t. It means that an mchain created by create_mchain will be destroyed automatically after the destruction of the last mchain_t object pointing to it.

More About Size-limited Mchain

Size-limited mchains differ from size-unlimited mchains in one important way: a size-limited mchain can't contain more messages than its maximum capacity. So there must be some reaction to an attempt to add another message to a full mchain.

A size-limited mchain can be created with waiting: on an attempt to push a new message to the full mchain the sender waits for some time. If there is free space in the mchain after that waiting then the new message is stored into the mchain. Otherwise an appropriate overload reaction is performed.

A size-limited mchain can also be created without any waiting on a full mchain. If there is no free space in the mchain then an appropriate overload reaction is performed immediately.

There are four overload reactions which can be selected for an mchain at the moment of mchain creation:

  1. Dropping the new message. It means that the new message will simply be ignored.
  2. Removing the oldest message from the mchain. It means that the oldest message in the mchain will be removed and will not be processed.
  3. Throwing an exception. An exception so_5::exception_t with error code so_5::rc_msg_chain_overflow will be raised as the result of an attempt to push a new message to the full mchain.
  4. Aborting the whole application by calling std::abort().

All those variants are described as items of enumeration so_5::mchain_props::overflow_reaction_t.
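
The four reactions can be illustrated with a small standalone model. This is only a sketch that uses the standard library: toy_limited_chain and toy_overflow_reaction are hypothetical names invented for this illustration, they are not SObjectizer types, and the real mchain implementation is certainly different (it is thread-safe, for a start).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <deque>
#include <stdexcept>
#include <utility>

// Hypothetical enumeration which mirrors the four reactions listed above.
enum class toy_overflow_reaction {
    drop_newest, remove_oldest, throw_exception, abort_app
};

// Hypothetical single-threaded model of a size-limited chain without waiting.
template<class T>
class toy_limited_chain {
public:
    toy_limited_chain(std::size_t capacity, toy_overflow_reaction reaction)
        : m_capacity(capacity), m_reaction(reaction) {}

    // Returns true if the new message was stored.
    bool push(T msg) {
        if (m_queue.size() >= m_capacity) {
            switch (m_reaction) {
            case toy_overflow_reaction::drop_newest:
                return false;                      // the new message is ignored
            case toy_overflow_reaction::remove_oldest:
                if (m_queue.empty()) return false; // zero capacity: nothing to remove
                m_queue.pop_front();               // the oldest message is lost
                break;
            case toy_overflow_reaction::throw_exception:
                throw std::overflow_error("toy chain is full");
            case toy_overflow_reaction::abort_app:
                std::abort();
            }
        }
        m_queue.push_back(std::move(msg));
        assert(m_queue.size() <= m_capacity);
        return true;
    }

    const std::deque<T> & content() const { return m_queue; }

private:
    std::size_t m_capacity;
    toy_overflow_reaction m_reaction;
    std::deque<T> m_queue;
};
```

With capacity 2 and drop_newest a third push leaves the first two messages in place. With remove_oldest the third push evicts the first message and the chain holds the second and the third.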

There is yet another important property which must be specified for size-limited mchain at the creation time: the type of memory usage.

Memory for storing messages inside mchain can be used dynamically: it would be allocated when mchain grows and deallocated when mchain shrinks.

Or memory for mchain could be preallocated and there will be fixed-size buffer for messages. Size of that buffer will not change during growth and shrinking of the mchain.

Types of memory usage are described as items of enumeration so_5::mchain_props::memory_usage_t.
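
The idea of preallocated storage can be sketched as a fixed-size ring buffer. This is an illustration under stated assumptions only: toy_ring is a hypothetical name, the sketch requires a default-constructible message type, and it says nothing about how SObjectizer actually lays out the memory of a preallocated mchain.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical fixed-capacity ring buffer. The storage is allocated once
// in the constructor and never grows or shrinks afterwards.
template<class T>
class toy_ring {
public:
    explicit toy_ring(std::size_t capacity)
        : m_storage(capacity), m_head(0), m_size(0) {}

    bool empty() const { return m_size == 0; }
    bool full() const { return m_size == m_storage.size(); }
    std::size_t size() const { return m_size; }

    // Returns false if there is no free slot.
    bool push(T value) {
        if (full()) return false;
        m_storage[(m_head + m_size) % m_storage.size()] = std::move(value);
        ++m_size;
        assert(m_size <= m_storage.size());
        return true;
    }

    // Extracts the oldest item. Returns false if the buffer is empty.
    bool pop(T & out) {
        if (empty()) return false;
        out = std::move(m_storage[m_head]);
        m_head = (m_head + 1) % m_storage.size();
        --m_size;
        return true;
    }

private:
    std::vector<T> m_storage;
    std::size_t m_head;
    std::size_t m_size;
};
```

A dynamic variant would use a container such as std::deque instead, trading the single up-front allocation for memory that follows the current number of messages.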

All those traits of size-limited mchains are controlled by the mchain_params_t object passed to the create_mchain method. The simplest and recommended way of preparing the corresponding mchain_params_t is to use the make_limited_without_waiting_mchain_params and make_limited_with_waiting_mchain_params helper functions:

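so_5::environment_t & env = ...;
so_5::mchain_t chain1 = env.create_mchain(
    so_5::make_limited_without_waiting_mchain_params(
        // No more than 200 messages in the chain.
        200,
        // Memory will be allocated dynamically.
        so_5::mchain_props::memory_usage_t::dynamic,
        // New messages will be ignored on chain's overflow.
        so_5::mchain_props::overflow_reaction_t::drop_newest));
so_5::mchain_t chain2 = env.create_mchain(
    so_5::make_limited_with_waiting_mchain_params(
        // No more than 200 messages in the chain.
        200,
        // Memory will be preallocated.
        so_5::mchain_props::memory_usage_t::preallocated,
        // The oldest message will be removed on chain's overflow.
        so_5::mchain_props::overflow_reaction_t::remove_oldest,
        // But before removing the oldest message there will be 500ms timeout.
        std::chrono::milliseconds(500)));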

How To Receive And Handle Messages From Mchain?

There are two variants of the so_5::receive function which receive and handle messages from an mchain.

The first variant of receive takes a mchain_t, a timeout and a list of message handlers:

...
receive(ch, std::chrono::milliseconds(500),
    [](const reply1 & r) {...},
    [](const reply2 & r) {...},
    ...
    [](const replyN & r) {...});

It checks the mchain and waits for no more than 500ms if the mchain is empty. If the mchain is not empty it extracts just one message from the mchain and tries to find an appropriate handler for it. If a handler is found it is called. If a handler is not found then the extracted message is discarded without any processing.

If the mchain is empty even after the specified timeout then receive will do nothing.

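There are two special values which can be used as timeout:

  1. so_5::no_wait. The receive returns immediately if the mchain is empty.
  2. so_5::infinite_wait. The receive waits until a message arrives or the mchain is closed.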

There is also a more advanced version of so_5::receive which can receive and handle more than one message from an mchain. It takes a so_5::mchain_receive_params_t object with a list of conditions and returns control as soon as any of those conditions becomes true. For example:

// Handle 3 messages.
// If there are fewer than 3 messages in the mchain then wait for them.
// The receive returns control when 3 messages have been processed or when the mchain
// is closed explicitly.
receive( from(chain).handle_n( 3 ),
    handlers... );
// Handle 3 messages but wait no more than 200ms on empty mchain.
// If mchain is empty for more than 200ms then receive returns control even
// if less than 3 messages were handled.
receive( from(chain).handle_n( 3 ).empty_timeout( milliseconds(200) ),
    handlers... );
// Handle all messages which are going to the mchain. But handling will be finished
// if a pause between arrivals of messages exceeds 500ms.
receive( from(chain).empty_timeout( milliseconds(500) ),
    handlers... );
// Handle all messages for 2s.
// The receive returns after 2s or when the mchain is closed explicitly.
receive( from(chain).total_time( seconds(2) ),
    handlers... );
// The same as previous but the return can also occur after extraction of 1000 messages.
receive( from(chain).extract_n( 1000 ).total_time( seconds(2) ),
    handlers... );

There is a difference between the number of extracted messages and the number of handled messages. Every message taken from an mchain increases the count of extracted messages. But the count of handled messages is increased only if a handler for that message type is found and called. It means that the number of extracted messages can be greater than the number of handled messages.
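
The difference between the two counters can be shown on a toy model. This is a minimal sketch built on the standard library only: toy_message, toy_handlers, toy_receive_result and drain are hypothetical names introduced for the illustration and are not part of SObjectizer. Message types are identified by std::type_index here, which is an assumption of the sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <typeindex>
#include <typeinfo>
#include <utility>

// Hypothetical type-erased message: the dynamic type plus an opaque payload.
struct toy_message {
    std::type_index type;
    std::shared_ptr<void> payload;
};

template<class T>
toy_message make_toy_message(T value) {
    return toy_message{std::type_index(typeid(T)),
                       std::make_shared<T>(std::move(value))};
}

// Hypothetical set of handlers, at most one handler per message type.
class toy_handlers {
public:
    template<class T>
    toy_handlers & on(std::function<void(const T &)> h) {
        m_handlers[std::type_index(typeid(T))] =
            [h](const void * p) { h(*static_cast<const T *>(p)); };
        return *this;
    }

    // Returns true if a handler for the message type was found and called.
    bool try_handle(const toy_message & m) const {
        auto it = m_handlers.find(m.type);
        if (it == m_handlers.end()) return false;
        it->second(m.payload.get());
        return true;
    }

private:
    std::map<std::type_index, std::function<void(const void *)>> m_handlers;
};

struct toy_receive_result {
    std::size_t extracted = 0;
    std::size_t handled = 0;
};

// Extracts every message from the queue. A message without a handler is
// counted as extracted but not as handled, and is then discarded.
toy_receive_result drain(std::deque<toy_message> & queue,
                         const toy_handlers & handlers) {
    toy_receive_result r;
    while (!queue.empty()) {
        toy_message m = std::move(queue.front());
        queue.pop_front();
        assert(m.payload != nullptr);
        ++r.extracted;
        if (handlers.try_handle(m)) ++r.handled;
    }
    return r;
}
```

If the queue holds two int messages and one double message, and only a handler for int is registered, drain reports 3 extracted and 2 handled messages.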

Both variants of receive return an object of type so_5::mchain_receive_result_t. Methods of that object return the number of extracted messages, the number of handled messages, and the status of the receive operation.

The usage of advanced version of receive could look like:

so_5::send< a_supervisor::ask_status >( req_mbox );
auto r = receive(
    from( chain ).empty_timeout( milliseconds(200) ),
    []( a_supervisor::status_idle ) {
        cout << "status: IDLE" << endl;
    },
    []( a_supervisor::status_in_progress ) {
        cout << "status: IN PROGRESS" << endl;
    },
    []( a_supervisor::status_finished v ) {
        cout << "status: finished in " << v.m_ms << "ms" << endl;
    } );
if( !r.handled() )
    cout << "--- no response from supervisor ---" << endl;

Message Handlers

Message handlers which are passed to the receive family of functions must be lambda functions or functional objects with one of the following formats:

return_type handler(const message_type&);
return_type handler(message_type);

For example:

struct classical_message : public so_5::message_t { ... };
struct user_message { ... };
...
receive(chain, so_5::infinite_wait,
    [](const classical_message & m) {...},
    [](const user_message & m) {...},
    [](const int & m) {...},
    [](long m) {...});

A handler for a signal of type signal_type can be created with the help of the so_5::handler<signal_type>() function:

struct stopped : public so_5::signal_t {};
struct waiting : public so_5::signal_t {};
...
receive(chain, so_5::infinite_wait,
    [](const classical_message & m) {...},
    [](const user_message & m) {...},
    so_5::handler<stopped>( []{...} ),
    so_5::handler<waiting>( []{...} ));

NOTE! All message handlers must handle different message types. It is an error if some handlers are defined for the same message type.

Usage Of Mchains With Send Functions

All traditional send functions like so_5::send, so_5::send_delayed and so_5::send_periodic work with mchains the same way they work with mboxes. This makes it possible to write code in the traditional way:

...
// Sending a message.
so_5::send<my_message>(ch, ... ); // some args for my_message's constructor.
// Sending a delayed message.
so_5::send_delayed<my_message>(ch, std::chrono::milliseconds(200),
    ... ); // some args for my_message's constructor
// Sending a periodic message.
auto timer_id = so_5::send_periodic<my_message>(ch,
    std::chrono::milliseconds(200),
    std::chrono::milliseconds(250),
    ... ); // some args for my_message's constructor

The functions for performing service requests, so_5::request_value and so_5::request_future, work with mchains too. It means that an agent can make a service request to the non-SObjectizer part of an application and receive the result of that request as usual:

// Somewhere in SObjectizer part of an application...
class worker : public so_5::agent_t {
    ...
    const so_5::mchain_t m_request_ch;
    void some_event_handler() {
        auto f = so_5::request_future<response, request>(m_request_ch,
            ... ); // some args for request's constructor
        ...
        handle_response(f.get());
    }
};
...
// Somewhere in non-SObjectizer part of an application...
const so_5::mchain_t request_ch = ...;
...
receive(from(request_ch).handle_n(1000),
    [](const request & req) -> response {...},
    ...);

Mchains as Mboxes

The method so_5::abstract_message_chain_t::as_mbox() represents an mchain as an almost ordinary mbox. This method returns so_5::mbox_t and this mbox can be used for sending messages and performing service requests to the mchain.

The method as_mbox() can be useful if there is a need to hide the existence of the mchain. For example, an agent inside the SObjectizer part of an application can receive an mbox and think that there is another agent on the opposite side:

// SObjectizer part of an application.
class worker : public so_5::agent_t {
public:
    worker(context_t ctx, so_5::mbox_t request_mbox)
        : so_5::agent_t{ctx}, m_request_mbox{std::move(request_mbox)}
    {}
    ...
private:
    const so_5::mbox_t m_request_mbox;
    ...
    void some_event_handler() {
        auto f = so_5::request_future<response, request>(m_request_mbox,
            ... ); // some args for request's constructor
        ...
        handle_response(f.get());
    }
};
...
// Non-SObjectizer part of an application.
so_5::mchain_t ch = env.create_mchain(...);
env.introduce_coop([&](so_5::coop_t & coop) {
    coop.make_agent<worker>(
        // Passing mchain as mbox to worker agent.
        ch->as_mbox());
    ...
} );
...
receive(from(ch).handle_n(1000),
    [](const request & req) -> response {...},
    ...);

The only difference between an ordinary mbox created by so_5::environment_t::create_mbox() and an mbox created by as_mbox() is that subscriptions cannot be created for the latter. It means that the agent worker from the example above can't subscribe its event handlers to messages from m_request_mbox.

The only case in which an mchain has to be passed to the SObjectizer part directly as an mchain (without casting it to an mbox by as_mbox()) is when the mchain must be closed explicitly in the SObjectizer part of an application:

class data_producer : public so_5::agent_t {
public:
    data_producer(context_t ctx, so_5::mchain_t data_ch)
        : so_5::agent_t{ctx}, m_data_ch{std::move(data_ch)}
    {}
    ...
private:
    const so_5::mchain_t m_data_ch;
    ...
    void read_data_event() {
        auto data = read_next_data();
        if(data_read())
            so_5::send<data_sample>(m_data_ch, data);
        else {
            // Data source is closed. No more data can be received.
            // Close the data chain and finish our work.
            m_data_ch->close();
            so_deregister_coop_normally();
        }
    }
};
...
// Non-SObjectizer part of an application.
so_5::mchain_t ch = env.create_mchain(...);
env.introduce_coop([&](so_5::coop_t & coop) {
    coop.make_agent<data_producer>(
        // Passing mchain as mchain to data_producer agent.
        ch);
    ...
} );
...
// Receive and handle all data samples until the chain is closed.
receive(from(ch), [](const data_sample & data) {...});

Non-Empty Notificator For Mchain

It is possible to specify a function which will be called automatically when a message is stored into an empty mchain:

so_5::environment_t & env = ...;
so_5::mchain_t ch = env.create_mchain(
    so_5::make_unlimited_mchain_params().not_empty_notificator( []{ ... } ));

This feature can be used in GUI applications, for example. Some widget creates an mchain and needs to know when there are messages in that mchain. The widget can add a notificator to the mchain and send some GUI message/signal to itself from that notificator. One of the receive functions is then called while that GUI message/signal is being processed.
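
The notification rule (the function is called only when a message arrives into an empty chain) can be modelled in a few lines. This is just a sketch: toy_notifying_chain is a hypothetical single-threaded class written for this illustration, not the SObjectizer implementation, and in a real GUI application the callback would post an event to the widget instead of doing the work itself.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical queue which calls a notificator on the transition
// from the empty state to the non-empty state.
template<class T>
class toy_notifying_chain {
public:
    explicit toy_notifying_chain(std::function<void()> notificator)
        : m_notificator(std::move(notificator)) {}

    void push(T msg) {
        const bool was_empty = m_queue.empty();
        m_queue.push_back(std::move(msg));
        assert(!m_queue.empty());
        // Only the first message after an empty state triggers the call.
        if (was_empty && m_notificator) m_notificator();
    }

    // Extracts the oldest message. Returns false if the queue is empty.
    bool pop(T & out) {
        if (m_queue.empty()) return false;
        out = std::move(m_queue.front());
        m_queue.pop_front();
        return true;
    }

private:
    std::function<void()> m_notificator;
    std::deque<T> m_queue;
};
```

Pushing three messages in a row triggers the notificator once. It is triggered again only after the queue has been drained and a new message arrives.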

Mchain Has Multi-Producer/Single-Consumer Queue

The mchain mechanism introduced in v.5.5.13 uses an MPSC (multi-producer/single-consumer) queue inside. It means that receive on an mchain should be called from one thread only.

Nothing will be damaged if receive is called for one mchain from several threads at the same time: no messages will be lost or processed more than once. But those threads may work inefficiently (thread starvation, for example).