SObjectizer  5.5
so-5.5 In Depth - Message Delivery Filters

There are situations when an agent must filter the messages received. A good example is receiving run-time monitoring information from SObjectizer Environment (see so-5.5 In Depth - Run-Time Monitoring): there are lot of messages and an agent have to handle only few of them:

void timer_thread_stats_listener::evt_timer_quantities(
const so_5::rt::stats::messages::quantity< std::size_t > & evt )
{
// Ignore messages unrelated to timer thread.
return;
... // Processing of related to timer thread messages.
}

This kind of filtering of inappropriate messages is not efficient and has significant run-time cost. Every message the agent subscribed for is passed to agent's queue, then enqueued, then passed to agent's event handler. Only to be thrown out. Message delivery filters are more efficient message-filtering mechanism.

Message delivery filter – is a predicate which gets the message instance and returns true if message is allowed to be delivered to corresponding recipient or false if message must be thrown out.

For the example above a message delivery filter can be set as:

void timer_thread_stats_listener::so_define_agent()
{
so_set_delivery_filter(
so_environment().stats_controller().mbox(),
[]( const so_5::rt::stats::messages::quantity< std::size_t > & msg ) {
return so_5::rt::stats::prefixes::timer_thread() == msg.m_prefix;
} );
...
}

In v.5.5.5 only ordinary agents can set delivery filters. And delivery filters must be represented as lambda-function (or functional object) with just one argument – constant reference to message instance. Delivery filter lambda-function must return bool.

An agent stores all the delivery filters defined. All delivery filters are destroyed automatically at the end of agent's lifetime. A delivery filter can be explicitly dropped and destroyed by the so_drop_delivery_filter() method:

void timer_thread_stats_listener::evt_some_action()
{
so_drop_delivery_filter<
so_5::rt::stats::messages::quantity< std::size_t >
>( so_environment().stats_controller().mbox() );
...
}

If there is need to change the delivery filter it is enough to call so_set_delivery_filter() yet another time without the need of calling so_drop_delivery_filter():

// Setting the filter first time.
so_set_delivery_filter( mbox, []( const some_message & msg ) {
return ...; // some predicate
} );
... // Some messages could sent to mbox here.
// Changing the filter by new one.
so_set_delivery_filter( mbox, []( const some_message & msg ) {
return ... // another predicate
} );
... // Some messages could sent to mbox here.

Method so_set_delivery_filter() sets the reference to delivery filter to the appropriate message mbox. That mbox now knows that before message delivery for the corresponding recipient the delivery filter must be invoked to check the necessity of message delivery. If delivery filter returns false message is not stored to the recipient queue at all.

Since v.5.5.5 multi-producer multi-consumer mboxes work this way:

Please note that delivery filter is applied to a message only once – during message dispatching inside mbox. When a message is stored in an agent's event queue and then extracted for processing the message instance no more checked. It can be very important if delivery filter is replaced by new one. For example:

void my_agent::evt_demo()
{
// All actions are performed inside an event handler. It means
// that agent can handle all the messages sent only after finish
// of that event handler.
// MPMC-mbox is necessary.
const auto mbox = so_environment().create_local_mbox();
// Create event subscription for message.
so_subscribe( mbox ).event( []( const current_temperature & evt ) {
cout << "temperature is: " << evt.value() << endl;
} );
// Send the first message. Temperature is 10 degrees.
so_5::send< current_temperature >( mbox, 10 );
// Sets the first filter.
so_set_delivery_filter( mbox, []( const current_temperature & evt ) {
// Allows only messages with values outside of normal range [0,45].
return !( 0 <= evt.value() && evt.value() <= 45 );
} );
// Send the second message. Temperature is -10 degress.
so_5::send< current_temperatur >( mbox, -10 );
// Replace the filter with another.
so_set_delivery_filter( mbox, []( const current_temperature & evt ) {
// Allows only messages with values in very narrow range.
return ( 20 <= evt.value() && evt.value() <= 22 );
} );
// Send the third message. Temperature is 21 degrees.
so_5::send< current_temperature >( mbox, 21 );
}

In that case an agent will receive all three messages. The first one because there was no any filter during send operation. The second one because it passes the first delivery filter. And the third one because it passes the second delivery filter.

This approach could be a surprise for a developer if the developer thinks that delivery filter applies for messages which is already in agent queue. It is not true.

Important note. A delivery filter can be set only for MPMC-mbox. It is impossible to set a delivery filter to a MPSC (direct) mbox.

It is because main purpose of MPSC-mbox is to be very fast and efficient direct channel to an agent. Any additional checking will slow down the work of MPSC-mbox.

There are some demands to delivery filters implementations:

In the ideal case a delivery filter must be represented by a small and light side-effect free lambda-function (like in the example above).

The delivery filter setup and event subscription is a different, not tightly related operations. It is possible to set up delivery filter without subscribing to that message. It is also possible to subscribing to message without setting a delivery filter up.

In the situation when a delivery filter is set but there are no subscriptions to the message the message box stores only filter. The mbox knows that there are no subscriptions to the message and will not call delivery filter during a message dispatch.

A delivery filter and subscription for a message can be handled independently. For example:

using namespace std;
using namespace so_5;
using namespace so_5::rt;
// Message to be filtered.
using msg_sample = tuple_as_message_t< mtag<0>, int, string >;
// A signal for doing second part of example.
struct msg_second_part : public signal_t {};
// A signal for finish the example.
struct msg_shutdown : public signal_t {};
// Main example agent.
// Ordinary agent is necessary because a delivery filter can be set
// only by ordinary agent.
class a_example_t : public agent_t
{
public :
a_example_t( context_t ctx )
: agent_t( ctx )
, m_mbox( so_environment().create_local_mbox() )
{}
virtual void
so_define_agent() override
{
so_subscribe_self()
.event< msg_second_part >( &a_example_t::evt_second_part )
.event< msg_shutdown >( [this] {
so_deregister_agent_coop_normally();
} );
}
virtual void
so_evt_start() override
{
// Subscribe for the message. Without filter.
so_subscribe( m_mbox ).event( []( const msg_sample & evt ) {
cout << "[first]: " << get<0>(evt) << "-" << get<1>(evt) << endl;
} );
// Sending several messages...
// All of them will be stored to the agent's queue and handled.
send< msg_sample >( m_mbox, 0, "only-subscription" );
send< msg_sample >( m_mbox, 1, "only-subscription" );
// Setting a delivery filter for message.
so_set_delivery_filter( m_mbox, []( const msg_sample & evt ) {
return 1 == get<0>(evt);
} );
// Sending several messages...
// Only one will be stored to the agent's queue and handled.
send< msg_sample >( m_mbox, 0, "subscription-and-filter" );
send< msg_sample >( m_mbox, 1, "subscription-and-filter" );
// Take time for processing already queued messages.
send_to_agent< msg_second_part >( *this );
}
void
evt_second_part()
{
// Drop the subscription.
so_drop_subscription< msg_sample >( m_mbox );
// Sending several messages...
// No one of them will be stored to the agent's queue nor handled.
send< msg_sample >( m_mbox, 0, "only-filter" );
send< msg_sample >( m_mbox, 1, "only-filter" );
// Subscribe for the message again.
so_subscribe( m_mbox ).event( []( const msg_sample & evt ) {
cout << "[second]: " << get<0>(evt) << "-" << get<1>(evt) << endl;
} );
// Sending several messages...
// Only one will be stored to the agent's queue and handled.
send< msg_sample >( m_mbox, 0, "subscription-and-filter-2" );
send< msg_sample >( m_mbox, 1, "subscription-and-filter-2" );
// Changing the filter to new one.
so_set_delivery_filter( m_mbox, []( const msg_sample & evt ) {
return 0 == get<0>(evt);
} );
// Sending several messages...
// Only one will be stored to the agent's queue and handled.
send< msg_sample >( m_mbox, 0, "subscription-and-filter-3" );
send< msg_sample >( m_mbox, 1, "subscription-and-filter-3" );
// Dropping the filter.
so_drop_delivery_filter< msg_sample >( m_mbox );
// Sending several messages...
// All of them will be stored to the agent's queue and handled.
send< msg_sample >( m_mbox, 0, "only-subscription-2" );
send< msg_sample >( m_mbox, 1, "only-subscription-2" );
// Example could be finished.
send_to_agent< msg_shutdown >( *this );
}
private :
// Separate MPMC-mbox is necessary for delivery filter.
const mbox_t m_mbox;
};