SObjectizer  5.5
so-5.3.0: Synchronous interaction with agents

Introduction

There is a new major change introduced in v.5.3.0 – a possibility to synchronous wait for result of message processing. An agent, called service client, could ask another agent (called service_handler) to do something by asynchronously sending message to service_handler and then synchronously waiting for result of that request.

All parameters for service request must be defined as ordinary message or signal type:

// Message for service request with parameters.
struct msg_convert : public so_5::rt::message_t
{
// String to conversion to int.
std::string m_value;
msg_convert( const std::string & v ) : m_value( v ) {}
};
// Signal for service request without parameters.
struct msg_get_status : public so_5::rt::signal_t {}

Service_handler is an ordinary agent which defines its service_request handling methods by traditional way. The only change with previous version is that event handlers could return values since v.5.3.0:

// service_handler event for msg_convert request.
int
a_my_service_t::evt_convert( const msg_convert & msg )
{
std::istringstream s( msg.m_value );
int result;
s >> result;
if( s.fail() )
throw std::invalid_argument(
"unable to convert to int: '" + msg.m_value + "'" );
return result;
}
// service_handler event for msg_get_status request.
std::string
a_my_service_t::evt_get_status()
{
return "Ready";
}

Event handler methods are subscribed as usual:

class a_my_service_t : public so_5::rt::agent_t
{
public :
virtual void
so_define_agent()
{
so_subscribe( mbox ).event( &a_my_service_t::evt_convert );
so_subscribe( mbox ).event( so_5::signal< msg_get_status >,
&a_my_service_t::evt_get_status );
}
...
};

All those mean that there is almost no differences for service_handler: it always works as ordinary agent. And all event handlers for that agent are invoked on agent’s working thread.

All requests are delivered to service_handler’s agent as ordinary messages via agent’s event queue. The mechanism for event handler invocation for service requests is the same as for traditional asynchronous messages.

To send service request it is necessary to invoke mbox’s get_one template method. It means that exactly one request of the specified type will be requested:

const so_5::rt::mbox_ref_t & mbox = ...; // Mbox representing service.
// The start part of service request initiation:
mbox->get_one< std::string >()... // other stuff will go here

Then it is necessary to declare how the waiting of service request will be performed. There are the three variants.

The first one, the most simple and the most dangerous, is wait service request result infinitely long:

// Initiate the service request and wait for result infinitely.
int v = mbox->get_one< int >().wait_forever().sync_get( new msg_convert( "42" ) );
// Invocation of service request by a signal.
std::string s = mbox->get_one< std::string >().wait_forever().sync_get< msg_get_status >();

The second one, simple and one of the most safe, is wait service request result for the specified amount of time:

// Initiate the service request and wait for 300ms.
int v = mbox->get_one< int >()
.wait_for( std::chrono::milliseconds(300) )
.sync_get( new msg_convert( "42" ) );
// Invocation of service request by a signal and wait for 5s.
std::string s = mbox->get_one< std::string >()
.wait_for( std::chrono::seconds(5) )
.sync_get< msg_get_status >();

The third one, the most complex and the most flexible, is receive std::future object associated with service request and then perform any operation user wish:

std::future< int > v = mbox->get_one< int >.async( new msg_convert( "42" ) );
std::future< std::string > s = mbox->get_one< std::string >.async< msg_get_status >();
// Do some useful stuff until conversion result get ready.
while( true )
{
... // Do something here.
// Check the future status.
if( std::future_status::ready == v.wait_for( std::chrono::seconds(0) ) )
break;
}
// Conversion result is ready and could be obtained without waiting.
std::cout << "Conversion result: " << v.get() << std::endl;
// Wait for get_status request infinitely.
std::cout << "Service status: " << s.get() << std::endl;

Deadlocks

There is no any help to user from SObjectizer to avoid or to diagnose deadlocks when synchronous interactions are used. It is the user task to guarantee that service_handler and service client are working on different threads and there is no any complex internal interactions like: agent A1 calls A2 which calls A3 which calls A1.

It is the reason why the wait_forever() in a dangerous form. If there is a deadlock the application will hang forever.

The safest way – is to use wait_for() with the reasonable timeout. Even if a deadlock is here it will be automatically destroyed after timeout expiration.

Exceptions

There is a strong possibility of exceptions appearance during synchronous interactions.

Some exceptions could be raised by SObjectizer. Those are:

In such cases SObjectizer raises object of so_5::exception_t class with the appropriate error_code inside.

The exceptions could be raised by event handlers itself. As in the sample of evt_convert event above. Those exceptions are intercepted by SObjectizer and then transferred to the std::future connected with the service request. It means that those exceptions will be reraised during std::future::get() invocation (as in the case of direct call by used and in the case of calling get() inside of wait_forever() and wait_for()). So the application exception which are went out from event handler will be propagated to service client.

The dialing with non-handled application exception is the main difference between ordinary event-handling and service_request-handling. When agent lets the exception go out during ordinary event handler invocation then SObjectizer calls agent’s so_exception_reaction() method and does appropriate actions. But when agent lets the exception go out during service_request handling then the exception is propagated to service client and so_exception_reaction() is not called.

Usage simplifications

Use of variadic templates

If the compiler supports variadic templates then make_async and make_sync_get methods could be used instead of async and sync_get. It allows to write:

mbox.get_one< int >().make_async< msg_convert >( "42" );

instead of

mbox.get_one< int >().wait_forever().make_sync_get< msg_convert >( "42" );

run_one as simplier form of get_one<void>

There could be cases where service_handler returns void. For example:

// A signal to flushing memory buffers to the disk.
struct msg_flush : public so_5::rt::signal_t {};
class a_membuffer_manager_t : public so_5::rt::agent_t
{
public :
...
virtual void
so_define_agent()
{
so_subscribe( mbox ).event( so_5::signal< msg_flush >,
&a_membuffer_manager_t::evt_flush );
...
}
...
void
evt_flush()
{
// Do buffer content flushing...
}
};

That service could be called by ordinary get_one method chain:

// Returns when flushing is done.
mbox.get_one< void >().wait_forever().sync_get< msg_flush >();

But instead of get_one<void> it is possible to use run_one method that is a synonym for get_one<void>:

// Returns when flushing is done.
mbox.run_one().wait_forever().sync_get< msg_flush >();

Or:

auto f = mbox.run_one().async< msg_flush >();
...
f.get(); // Returns when flushing is done.

Use auto for temporary proxy objects

Method chains get_one(), get_one().wait_forever() and get_one().wait_for() produces temporary proxy objects which could be captured as local variable by auto keyword. It would lead to shorter form of service calling:

auto svc = mbox.get_one< int >().wait_for( std::chrono::seconds(5) );
auto v1 = svc.make_sync_request< msg_convert >( "42" );
auto v2 = svc.make_sync_request< msg_convert >( "43" );
...