#include <iostream>
#include <cstdint>
template< typename M >
// Creates a non-empty stage result: a new M is constructed from the
// perfectly-forwarded arguments and handed over to stage_result_t< M >.
template< typename M, typename... Args >
stage_result_t< M >
make_result( Args &&... args )
{
	using result_type = stage_result_t< M >;

	return result_type( new M( std::forward< Args >(args)... ) );
}
// Creates an empty (value-initialized) stage result for message type M.
// The stage agents test the result with `if( r )` before forwarding it,
// so an empty result is what a stage returns when it has nothing to pass on.
template< typename M >
stage_result_t< M >
make_empty()
{
	using result_type = stage_result_t< M >;

	return result_type();
}
// Compile-time description of a stage handler's signature.
//   input  - the type the handler receives (taken by const reference by
//            stage_handler_t);
//   output - the type the handler returns; for a value-producing stage this
//            is stage_result_t< Out >.
template< typename In, typename Out >
struct handler_traits_t
{
using input = In;
using output = stage_result_t< Out >;
};
// Specialization for handlers whose Out is void: such a handler returns
// plain void rather than a stage_result_t.
template< typename In >
struct handler_traits_t< In, void >
{
using input = In;
using output = void;
};
template< typename In, typename Out >
class stage_handler_t
{
public :
using traits = handler_traits_t< In, Out >;
using func_type = function< typename traits::output(const typename traits::input &) >;
stage_handler_t( func_type
handler )
{}
template< typename Callable >
typename traits::output
operator()( const typename traits::input & a ) const
{
return m_handler( a );
}
private :
func_type m_handler;
};
// Agent that represents one value-producing stage of the pipeline: it
// receives messages of type In on its own mbox, runs the stage handler and,
// when there is a next stage, passes the handler's result on to it.
//
// NOTE(review): this class looks truncated in this view. The constructor's
// parameter list ends with a dangling comma and the initializer list starts
// with ", m_next{...}", so at least the `next_stage` parameter and the
// initializers for the base class and m_handler appear to be missing. The
// m_next member is used below but not declared among the visible members.
// Restore from the original file before relying on this code.
template< typename In, typename Out >
class a_stage_point_t :
public agent_t {
public :
a_stage_point_t(
context_t ctx,
stage_handler_t< In, Out >
handler,
, m_next{ move(next_stage) }
{}
// Subscribes the agent to In messages. Which event handler is installed is
// decided once, here, depending on whether a next stage is present.
virtual void so_define_agent() override
{
if( m_next )
so_subscribe_self().event( [=]( const In & evt ) {
auto r = m_handler( evt );
// An empty result means the stage produced nothing: do not forward.
if( r )
m_next->deliver_message( move( r ) );
} );
else
// No next stage: the handler is still invoked, its result is discarded.
so_subscribe_self().event( [=]( const In & evt ) {
m_handler( evt );
} );
}
private :
const stage_handler_t< In, Out > m_handler;
};
// Specialization for a terminal (sink) stage: the handler returns void, so
// there is nothing to forward and a next stage is not allowed.
//
// NOTE(review): the constructor is truncated in this view. The parameter
// list ends with a dangling comma, `next_stage` is used in the body without
// a visible declaration, and no initializer list (base class, m_handler) is
// present. Restore from the original file.
template< typename In >
class a_stage_point_t< In, void > :
public agent_t {
public :
a_stage_point_t(
context_t ctx,
stage_handler_t< In, void >
handler,
{
// A sink cannot pass anything further; a non-empty next stage is rejected
// with std::runtime_error.
if( next_stage )
throw std::runtime_error( "sink point cannot have next stage" );
}
// Subscribes the agent to In messages; each one is simply given to the
// handler.
virtual void so_define_agent() override
{
so_subscribe_self().event( [=]( const In & evt ) {
m_handler( evt );
} );
}
private :
const stage_handler_t< In, void > m_handler;
};
// Agent that fans a message out to several next stages: every mbox stored in
// m_next_stages gets the same message.
//
// NOTE(review): this class is truncated in this view. The constructor's
// initializer list starts with ", m_next_stages(...)" (the base-class
// initializer appears to be missing), and the brace block after the
// m_next_stages member has no function header. It is presumably the body of
// evt_broadcast, the member referenced in so_define_agent(), and `evt` would
// then be its parameter. Confirm against the original file.
template< typename In >
class a_broadcaster_t :
public agent_t {
public :
a_broadcaster_t(
context_t ctx,
vector< mbox_t > && next_stages )
, m_next_stages( move(next_stages) )
{}
// Routes messages arriving at the agent's own mbox to evt_broadcast.
virtual void so_define_agent() override
{
so_subscribe_self().event( &a_broadcaster_t::evt_broadcast );
}
private :
// Destinations of the broadcast; fixed at construction.
const vector< mbox_t > m_next_stages;
{
// One reference to the incoming message is obtained and the same reference
// is delivered to every destination.
auto msg = evt.make_reference();
for( const auto & mbox : m_next_stages )
mbox->deliver_message( msg );
}
};
// Empty tag type that marks the beginning of a pipeline description.
// It carries no data; only its type matters for selecting operator|.
struct source_t {};

// The single tag object used in pipeline expressions: `src | stage(...)`.
static const source_t src{};
// Deferred constructor of a stage. Called with the cooperation that will own
// the stage's agent and the mbox of the next stage; returns the mbox of the
// newly built stage. The operator| overloads below pass an empty mbox_t{}
// for broadcast sinks, i.e. when there is no next stage.
using stage_builder_t = function< mbox_t(coop_t &, mbox_t) >;
// Description of a (possibly composite) pipeline fragment that accepts In
// and produces Out. The template parameters exist only for compile-time
// checking of stage compatibility in operator|; the only data carried is the
// builder that creates the fragment's agents on demand.
template< typename In, typename Out >
struct stage_t
{
stage_builder_t m_builder;
};
// Common base for the callable-introspection traits below: publishes the
// argument type A and the result type R of a stage callable.
// Note the parameter order: result first, argument second.
template< typename R, typename A >
struct callable_traits_typedefs_t
{
using arg_type = A;
using result_type = R;
};
// Extracts argument/result types from a pointer to a call operator.
// Only the listed shapes are supported; any other type leaves the primary
// template undefined, so instantiation fails at compile time.
template< typename T >
struct lambda_traits_t;
// `stage_result_t< M > operator()(const A &) const` -> result_type is M
// (the wrapped message type, not stage_result_t< M > itself).
template< typename M, typename A, typename T >
struct lambda_traits_t< stage_result_t< M >(T::*)(const A &) const >
: public callable_traits_typedefs_t< M, A >
{};
// `void operator()(const A &) const` -> result_type is void.
template< typename A, typename T >
struct lambda_traits_t< void (T::*)(const A &) const >
: public callable_traits_typedefs_t< void, A >
{};
// Same as above for a non-const call operator returning stage_result_t< M >.
template< typename M, typename A, typename T >
struct lambda_traits_t< stage_result_t< M >(T::*)(const A &) >
: public callable_traits_typedefs_t< M, A >
{};
// Same as above for a non-const call operator returning void.
template< typename A, typename T >
struct lambda_traits_t< void (T::*)(const A &) >
: public callable_traits_typedefs_t< void, A >
{};
// Entry point for callable introspection. For a class type T (lambda or
// functor) the types are taken from T::operator() via lambda_traits_t;
// this requires operator() to be non-overloaded and non-template, otherwise
// `&T::operator()` is ambiguous.
template< typename T >
struct callable_traits_t
: public lambda_traits_t< decltype(&T::operator()) >
{};
// Plain function pointer returning stage_result_t< M >: result_type is M.
template< typename M, typename A >
struct callable_traits_t< stage_result_t< M >(*)(const A &) >
: public callable_traits_typedefs_t< M, A >
{};
// Plain function pointer returning void: result_type is void.
template< typename A >
struct callable_traits_t< void(*)(const A &) >
: public callable_traits_typedefs_t< void, A >
{};
// Wraps a callable into a stage_handler_t. In and Out default to the
// argument and result types detected by callable_traits_t< Callable >, so
// they normally do not have to be spelled out by the caller.
//
// NOTE(review): the declarator line between the return type and the opening
// brace is missing in this view. Judging by the `stage(...)` call sites and
// the use of `handler` in the body it is presumably
// `stage( Callable handler )` - confirm against the original file.
template<
typename Callable,
typename In = typename callable_traits_t< Callable >::arg_type,
typename Out = typename callable_traits_t< Callable >::result_type >
stage_handler_t< In, Out >
{
return stage_handler_t< In, Out >{ move(
handler) };
}
// Result of broadcast(): the builders of all pipeline fragments that must
// receive the same In message. In is used only for compile-time matching in
// operator|.
template< typename In >
struct broadcast_sinks_t
{
vector< stage_builder_t > m_builders;
};
template< typename In, typename Out >
void
move_sink_builder_to(
stage_t< In, Out > && first,
vector< stage_builder_t > & receiver )
{
receiver.emplace_back( move( first.m_builder ) );
}
template< typename In, typename Out1, typename Out2, typename... Rest >
void
move_sink_builder_to(
stage_t< In, Out1 > && first,
vector< stage_builder_t > & receiver,
stage_t< In, Out2 > && second,
Rest &&... rest )
{
receiver.emplace_back( move( first.m_builder ) );
move_sink_builder_to( move(second), receiver, forward< Rest >(rest)... );
}
template< typename In, typename Out >
vector< stage_builder_t >
collect_sink_builders( stage_t< In, Out > && first )
{
vector< stage_builder_t > receiver;
move_sink_builder_to( move(first), receiver );
return receiver;
}
template< typename In, typename Out1, typename Out2, typename... Rest >
vector< stage_builder_t >
collect_sink_builders(
stage_t< In, Out1 > && first,
stage_t< In, Out2 > && second,
Rest &&... stages )
{
vector< stage_builder_t > receiver;
receiver.reserve( 2 + sizeof...(stages) );
move_sink_builder_to( move(first), receiver, move(second), forward< Rest >(stages)... );
return receiver;
}
// Groups several pipeline fragments that all accept In into one broadcast
// description. The builders are moved out of the passed stages.
template< typename In, typename Out, typename... Rest >
broadcast_sinks_t< In >
broadcast( stage_t< In, Out > && first, Rest &&... stages )
{
	auto builders = collect_sink_builders(
			std::move(first),
			std::forward< Rest >(stages)... );

	return broadcast_sinks_t< In >{ std::move(builders) };
}
// `src | stage(...)`: starts a pipeline. Produces a stage_t whose builder
// creates an a_stage_point_t< In, Out > agent in the given cooperation and
// returns that agent's direct mbox.
//
// NOTE(review): the builder body is truncated in this view. The lambda
// introducer (which would declare `coop` and the next-stage parameter) is
// missing, and the argument list of make_agent is cut off before the
// `return`. Restore from the original file.
template< typename In, typename Out >
stage_t< In, Out >
operator|(
const source_t & ,
stage_handler_t< In, Out > &&
handler )
{
return stage_t< In, Out >{
stage_builder_t{
{
auto stage_agent = coop.
make_agent< a_stage_point_t< In, Out > >(
return stage_agent->so_direct_mbox();
}
}
};
}
// `prev | stage(...)`: appends a stage to an existing fragment. The output
// type of `prev` (Out1) must be the input type of `next`. The resulting
// builder first creates the agent for the new (later) stage and then asks
// `prev` to build itself with that agent's mbox as its next stage, so the
// returned mbox is the entry point of the whole fragment.
//
// NOTE(review): the lambda introducer that would declare/capture `coop`,
// `next_stage`, `prev` and `next` is missing in this view; restore from the
// original file.
template< typename In, typename Out1, typename Out2 >
stage_t< In, Out2 >
operator|(
stage_t< In, Out1 > &&
prev,
stage_handler_t< Out1, Out2 > &&
next )
{
return stage_t< In, Out2 >{
stage_builder_t{
{
auto stage_agent = coop.make_agent< a_stage_point_t< Out1, Out2 > >(
move(
next), move(next_stage) );
return prev.m_builder( coop, stage_agent->so_direct_mbox() );
}
}
};
}
// `prev | broadcast(...)`: terminates a fragment with a fan-out. The result
// has Out = void, so nothing can be appended after a broadcast.
//
// NOTE(review): the lambda introducer that would declare/capture `coop`,
// `prev` and `broadcasts` is missing in this view; restore from the original
// file.
template< typename In, typename Broadcast_In >
stage_t< In, void >
operator|(
stage_t< In, Broadcast_In > &&
prev,
broadcast_sinks_t< Broadcast_In > && broadcasts )
{
return stage_t< In, void >{
stage_builder_t{
{
// Build every broadcast branch first and remember its entry mbox.
vector< mbox_t > mboxes;
mboxes.reserve( broadcasts.m_builders.size() );
for( const auto & b : broadcasts.m_builders )
// Each branch is built with an empty mbox as its next stage.
mboxes.emplace_back( b( coop,
mbox_t{} ) );
// The broadcaster agent becomes the next stage of `prev`.
auto broadcaster = coop.make_agent< a_broadcaster_t< Broadcast_In > >(
move(mboxes) );
return prev.m_builder( coop, broadcaster->so_direct_mbox() );
}
}
};
}
// Entry point that turns a pipeline description (`sink`) into running
// agents.
//
// NOTE(review): this function is truncated in this view - the return type
// and the first parameter are missing and the body is empty. The call site
// in this file passes `*this` (an agent) as the first argument and later
// uses the result as a message destination, so it presumably returns the
// mbox of the first stage - confirm against the original file.
template< typename In, typename Out, typename... Args >
make_pipeline(
stage_t< In, Out > && sink,
Args &&... args )
{
}
// Unprocessed reading from a meter.
struct raw_measure
{
// Identifier of the meter the reading belongs to.
int m_meter_id;
// Upper byte of the 16-bit reading; validation() accepts only values <= 0x7.
uint8_t m_high_bits;
// Lower byte of the 16-bit reading; conversion() combines it with
// m_high_bits as (high << 8) + low.
uint8_t m_low_bits;
};
// Message carrying a raw, not yet validated reading (input of validation()).
// NOTE(review): the declaration header that should precede this brace
// (presumably `struct raw_value`, possibly with a base class) is missing in
// this view - restore from the original file.
{
raw_measure m_data;
raw_value( raw_measure data )
: m_data( data )
{}
};
// Message carrying a raw reading that passed validation() (input of
// conversion()).
// NOTE(review): the declaration header that should precede this brace
// (presumably `struct valid_raw_value`, possibly with a base class) is
// missing in this view - restore from the original file.
{
raw_measure m_data;
valid_raw_value( raw_measure data )
: m_data( data )
{}
};
// Reading converted to a floating-point value by conversion().
struct calculated_measure
{
// Identifier of the meter the value belongs to.
int m_meter_id;
// Converted value: half of the 16-bit raw reading (see conversion()).
float m_measure;
};
// Message carrying a converted reading (output of conversion(); input of
// archivation(), distribution() and range_checking()).
// NOTE(review): the declaration header that should precede this brace
// (presumably `struct sensor_value`, possibly with a base class) is missing
// in this view - restore from the original file.
{
calculated_measure m_data;
sensor_value( calculated_measure data )
: m_data( data )
{}
};
// Message carrying a converted reading that range_checking() flagged as
// suspicious (input of alarm_detector).
// NOTE(review): the declaration header that should precede this brace
// (presumably `struct suspicion_value`, possibly with a base class) is
// missing in this view - restore from the original file.
{
calculated_measure m_data;
suspicion_value( calculated_measure data )
: m_data( data )
{}
};
// Message reporting an alarm for a particular meter (output of
// alarm_detector; input of alarm_initiator() and alarm_distribution()).
// NOTE(review): the declaration header that should precede this brace
// (presumably `struct alarm_detected`, possibly with a base class) is
// missing in this view - restore from the original file.
{
int m_meter_id;
alarm_detected( int meter_id )
: m_meter_id( meter_id )
{}
};
// Pipeline stage: lets a raw reading through only when its high byte does
// not exceed 0x7. Accepted readings are re-wrapped as valid_raw_value;
// rejected ones produce an empty result, which stops further processing.
stage_result_t< valid_raw_value >
validation( const raw_value & v )
{
	const bool high_bits_acceptable = v.m_data.m_high_bits <= 0x7;
	if( !high_bits_acceptable )
		return make_empty< valid_raw_value >();

	return make_result< valid_raw_value >( v.m_data );
}
// Pipeline stage: turns a validated raw reading into a sensor_value.
// The two bytes are combined into one integer (high byte is the most
// significant) and the measure is half of that integer.
stage_result_t< sensor_value >
conversion( const valid_raw_value & v )
{
	const auto & raw = v.m_data;

	// Integer promotion makes this an int; no precision is lost here.
	const auto combined_bits =
			(static_cast< uint16_t >( raw.m_high_bits ) << 8) + raw.m_low_bits;

	return make_result< sensor_value >(
			calculated_measure{ raw.m_meter_id, 0.5f * combined_bits } );
}
// Terminal stage: reports the value being archived on the standard log
// stream as "archiving (<meter id>,<measure>)" and flushes the stream.
void
archivation( const sensor_value & v )
{
	const auto & measure = v.m_data;

	std::clog << "archiving (" << measure.m_meter_id << ","
			<< measure.m_measure << ")" << std::endl;
}
// Terminal stage: reports the value being distributed on the standard log
// stream as "distributing (<meter id>,<measure>)" and flushes the stream.
void
distribution( const sensor_value & v )
{
	const auto & measure = v.m_data;

	std::clog << "distributing (" << measure.m_meter_id << ","
			<< measure.m_measure << ")" << std::endl;
}
// Pipeline stage: a measure of 45.0 or more is treated as suspicious and is
// passed on as suspicion_value; anything else yields an empty result.
stage_result_t< suspicion_value >
range_checking( const sensor_value & v )
{
	// Kept as a `>=` test (not rewritten as `<`) so that a NaN measure is
	// still classified as "not suspicious".
	const bool is_suspicious = v.m_data.m_measure >= 45.0f;
	if( !is_suspicious )
		return make_empty< suspicion_value >();

	return make_result< suspicion_value >( v.m_data );
}
// Stateful pipeline stage that raises an alarm when a suspicious value
// arrives less than 25 ms after a remembered one.
//
// After an alarm the remembered value is forgotten, so the next suspicious
// value only starts a new observation window. The meter id of the remembered
// value is not compared with the new one.
class alarm_detector
{
	using clock = std::chrono::steady_clock;

public :
	// Returns alarm_detected for v's meter when the remembered timestamp is
	// still inside the 25 ms window; otherwise remembers the current time and
	// returns an empty result.
	stage_result_t< alarm_detected >
	operator()( const suspicion_value & v )
	{
		// The clock is consulted only when there is something to compare with.
		const bool inside_window = m_has_value &&
				( m_previous + std::chrono::milliseconds(25) > clock::now() );

		if( inside_window )
		{
			m_has_value = false;
			return make_result< alarm_detected >( v.m_data.m_meter_id );
		}

		m_previous = clock::now();
		m_has_value = true;

		return make_empty< alarm_detected >();
	}

private :
	// Arrival time of the remembered suspicious value; meaningful only while
	// m_has_value is true.
	clock::time_point m_previous;
	bool m_has_value = false;
};
// Terminal stage: announces the alarm on the standard log stream as
// "=== alarm (<meter id>) ===" and flushes the stream.
void
alarm_initiator( const alarm_detected & v )
{
	const int meter = v.m_meter_id;

	std::clog << "=== alarm (" << meter << ") ===" << std::endl;
}
void
alarm_distribution( ostream & to, const alarm_detected & v )
{
to << "alarm_distribution (" << v.m_meter_id << ")" << endl;
}
// Agent that builds the demo pipeline, feeds it with test readings and shuts
// the example down.
//
// NOTE(review): this class is truncated in this view:
//  - the class header (presumably `class a_parent_t : public agent_t`) and
//    the declaration of the `shutdown` signal are missing;
//  - so_define_agent() contains only the tail of a subscription call;
//  - the make_pipeline(...) call in so_evt_start() is not closed properly
//    (it ends with "),").
// Restore from the original file before building.
{
public :
a_parent_t( context_t ctx ) :
agent_t( ctx )
{}
virtual void so_define_agent() override
{
// Remainder of a subscription whose handler deregisters the agent's
// cooperation; it is presumably bound to the `shutdown` signal sent from
// so_evt_start().
[this] { so_deregister_agent_coop_normally(); } );
}
virtual void so_evt_start() override
{
// Pipeline layout:
//   validation -> conversion -> broadcast to
//     * archivation
//     * distribution
//     * range_checking -> alarm_detector -> broadcast to
//         * alarm_initiator
//         * a lambda forwarding to alarm_distribution( cerr, ... )
auto pipeline = make_pipeline( *this,
src | stage(validation) | stage(conversion) | broadcast(
src | stage(archivation),
src | stage(distribution),
src | stage(range_checking) | stage(alarm_detector{}) | broadcast(
src | stage(alarm_initiator),
src | stage( []( const alarm_detected & v ) {
alarm_distribution( cerr, v );
} )
)
),
// Ask for shutdown one second after start.
send_delayed< shutdown >( *this, chrono::seconds(1) );
// Test data: 25 readings for meter 0 with low byte 0, 10, ..., 240; each is
// delayed by the same number of milliseconds as its low-byte value. The
// loop bound fits in uint8_t, and i stops at 250 without wrapping.
for( uint8_t i = 0; i < static_cast< uint8_t >(250); i += 10 )
send_delayed< raw_value >(
so_environment(),
pipeline,
chrono::milliseconds( i ),
raw_measure{ 0, 0, i } );
}
};
// Program entry point. Any std::exception escaping the try block is reported
// on the standard error stream as "Exception: <what>". There is no explicit
// return statement, so the process exit code is 0 even after an exception
// was caught.
//
// NOTE(review): the body of the try block is truncated in this view - only
// the closing "} );" of a call remains (presumably the call that launches
// the environment and registers a_parent_t). Restore from the original file.
int main()
{
try
{
} );
}
catch( const exception & x )
{
cerr << "Exception: " << x.what() << endl;
}
}