#include <iostream>
#include <cstdint>
using namespace std;
template< typename M >
template< typename M, typename... Args >
stage_result_t< M >
make_result( Args &&... args )
{
return stage_result_t< M >::make(forward< Args >(args)...);
}
template< typename M >
stage_result_t< M >
make_empty()
{
return stage_result_t< M >();
}
template< typename In, typename Out >
struct handler_traits_t
{
using input = In;
using output = stage_result_t< Out >;
};
template< typename In >
struct handler_traits_t< In, void >
{
using input = In;
using output = void;
};
template< typename In, typename Out >
class stage_handler_t
{
public :
using traits = handler_traits_t< In, Out >;
using func_type = function< typename traits::output(const typename traits::input &) >;
stage_handler_t( func_type handler )
: m_handler( move(handler) )
{}
template< typename Callable >
stage_handler_t( Callable handler ) : m_handler( handler ) {}
typename traits::output
operator()( const typename traits::input & a ) const
{
return m_handler( a );
}
private :
func_type m_handler;
};
template< typename In, typename Out >
class a_stage_point_t final :
public agent_t {
public :
a_stage_point_t(
context_t ctx,
stage_handler_t< In, Out > handler,
, m_handler{ move( handler ) }
, m_next{ move(next_stage) }
{}
void so_define_agent() override
{
if( m_next )
so_subscribe_self().event( [this]( const In & evt ) {
auto r = m_handler( evt );
if( r )
} );
else
so_subscribe_self().event( [this]( const In & evt ) {
m_handler( evt );
} );
}
private :
const stage_handler_t< In, Out > m_handler;
};
template< typename In >
class a_stage_point_t< In, void > final :
public agent_t {
public :
a_stage_point_t(
context_t ctx,
stage_handler_t< In, void > handler,
, m_handler{ move( handler ) }
{
if( next_stage )
throw std::runtime_error( "sink point cannot have next stage" );
}
void so_define_agent() override
{
so_subscribe_self().event( [this]( const In & evt ) {
m_handler( evt );
} );
}
private :
const stage_handler_t< In, void > m_handler;
};
template< typename In >
class a_broadcaster_t final :
public agent_t {
public :
a_broadcaster_t(
context_t ctx,
vector< mbox_t > && next_stages )
, m_next_stages( move(next_stages) )
{}
void so_define_agent() override
{
so_subscribe_self().event( &a_broadcaster_t::evt_broadcast );
}
private :
const vector< mbox_t > m_next_stages;
{
for( const auto & mbox : m_next_stages )
}
};
using stage_builder_t = function< mbox_t(coop_t &, mbox_t) >;
template< typename In, typename Out >
struct stage_t
{
stage_builder_t m_builder;
};
template< typename R, typename A >
struct callable_traits_typedefs_t
{
using arg_type = A;
using result_type = R;
};
template< typename T >
struct lambda_traits_t;
template< typename M, typename A, typename T >
struct lambda_traits_t< stage_result_t< M >(T::*)(const A &) const >
: public callable_traits_typedefs_t< M, A >
{};
template< typename A, typename T >
struct lambda_traits_t< void (T::*)(const A &) const >
: public callable_traits_typedefs_t< void, A >
{};
template< typename M, typename A, typename T >
struct lambda_traits_t< stage_result_t< M >(T::*)(const A &) >
: public callable_traits_typedefs_t< M, A >
{};
template< typename A, typename T >
struct lambda_traits_t< void (T::*)(const A &) >
: public callable_traits_typedefs_t< void, A >
{};
template< typename T >
struct callable_traits_t
: public lambda_traits_t< decltype(&T::operator()) >
{};
template< typename M, typename A >
struct callable_traits_t< stage_result_t< M >(*)(const A &) >
: public callable_traits_typedefs_t< M, A >
{};
template< typename A >
struct callable_traits_t< void(*)(const A &) >
: public callable_traits_typedefs_t< void, A >
{};
template<
typename Callable,
typename In = typename callable_traits_t< Callable >::arg_type,
typename Out = typename callable_traits_t< Callable >::result_type >
stage_t< In, Out >
stage( Callable handler )
{
stage_builder_t builder{
[h = std::move(handler)](
{
return coop.make_agent< a_stage_point_t<In, Out> >(
std::move(h),
std::move(next_stage) )
->so_direct_mbox();
}
};
return { std::move(builder) };
}
template< typename In, typename Out, typename... Rest >
void
move_sink_builder_to(
vector< stage_builder_t > & receiver,
stage_t< In, Out > && first,
Rest &&... rest )
{
receiver.emplace_back( move( first.m_builder ) );
if constexpr( 0u != sizeof...(rest) )
move_sink_builder_to<In>( receiver, forward< Rest >(rest)... );
}
template< typename In, typename Out, typename... Rest >
vector< stage_builder_t >
collect_sink_builders( stage_t< In, Out > && first, Rest &&... stages )
{
vector< stage_builder_t > receiver;
receiver.reserve( 1 + sizeof...(stages) );
move_sink_builder_to<In>(
receiver,
move(first),
std::forward<Rest>(stages)... );
return receiver;
}
template< typename In, typename Out, typename... Rest >
stage_t< In, void >
broadcast( stage_t< In, Out > && first, Rest &&... stages )
{
stage_builder_t builder{
[broadcasts = collect_sink_builders(
move(first), forward< Rest >(stages)...)]
{
vector< mbox_t > mboxes;
mboxes.reserve( broadcasts.size() );
for( const auto & b : broadcasts )
mboxes.emplace_back( b( coop,
mbox_t{} ) );
return coop.make_agent< a_broadcaster_t<In> >( std::move(mboxes) )
->so_direct_mbox();
}
};
return { std::move(builder) };
}
template< typename In, typename Out1, typename Out2 >
stage_t< In, Out2 >
operator|(
stage_t< In, Out1 > &&
prev,
stage_t< Out1, Out2 > &&
next )
{
return {
stage_builder_t{
{
auto m =
next.m_builder( coop, std::move(next_stage) );
return prev.m_builder( coop, std::move(m) );
}
}
};
}
template< typename In, typename Out, typename... Args >
make_pipeline(
stage_t< In, Out > && sink,
Args &&... args )
{
}
struct raw_measure
{
int m_meter_id;
uint8_t m_high_bits;
uint8_t m_low_bits;
};
struct raw_value
{
raw_measure m_data;
};
struct valid_raw_value
{
raw_measure m_data;
};
struct calculated_measure
{
int m_meter_id;
float m_measure;
};
struct sensor_value
{
calculated_measure m_data;
};
struct suspicional_value
{
calculated_measure m_data;
};
struct alarm_detected
{
int m_meter_id;
};
stage_result_t< valid_raw_value >
validation( const raw_value & v )
{
if( 0x7 >= v.m_data.m_high_bits )
return make_result< valid_raw_value >( v.m_data );
else
return make_empty< valid_raw_value >();
}
stage_result_t< sensor_value >
conversion( const valid_raw_value & v )
{
return make_result< sensor_value >(
calculated_measure{ v.m_data.m_meter_id,
0.5f * ((static_cast< uint16_t >( v.m_data.m_high_bits ) << 8) +
v.m_data.m_low_bits) } );
}
void
archivation( const sensor_value & v )
{
clog << "archiving (" << v.m_data.m_meter_id << ","
<< v.m_data.m_measure << ")" << endl;
}
void
distribution( const sensor_value & v )
{
clog << "distributing (" << v.m_data.m_meter_id << ","
<< v.m_data.m_measure << ")" << endl;
}
stage_result_t< suspicional_value >
range_checking( const sensor_value & v )
{
if( v.m_data.m_measure >= 45.0f )
return make_result< suspicional_value >( v.m_data );
else
return make_empty< suspicional_value >();
}
class alarm_detector
{
using clock = chrono::steady_clock;
public :
stage_result_t< alarm_detected >
operator()( const suspicional_value & v )
{
if( m_previous )
if( *m_previous + chrono::milliseconds(25) > clock::now() )
{
m_previous = std::nullopt;
return make_result< alarm_detected >( v.m_data.m_meter_id );
}
m_previous = clock::now();
return make_empty< alarm_detected >();
}
private :
optional< clock::time_point > m_previous;
};
void
alarm_initiator( const alarm_detected & v )
{
clog << "=== alarm (" << v.m_meter_id << ") ===" << endl;
}
void
alarm_distribution( ostream & to, const alarm_detected & v )
{
to << "alarm_distribution (" << v.m_meter_id << ")" << endl;
}
class a_parent_t final :
public agent_t {
public :
a_parent_t( context_t ctx ) :
agent_t( ctx )
{}
void so_define_agent() override
{
so_subscribe_self().event(
}
void so_evt_start() override
{
auto pipeline = make_pipeline( *this,
stage(validation) | stage(conversion) | broadcast(
stage(archivation),
stage(distribution),
stage(range_checking) | stage(alarm_detector{}) | broadcast(
stage(alarm_initiator),
stage( []( const alarm_detected & v ) {
alarm_distribution( cerr, v );
} )
)
) );
send_delayed< shutdown >( *this, chrono::seconds(1) );
for( uint8_t i = 0; i < static_cast< uint8_t >(250); i += 10 )
send_delayed< raw_value >(
pipeline,
chrono::milliseconds( i ),
raw_measure{ 0, 0, i } );
}
};
int main()
{
try
{
} );
}
catch( const exception & x )
{
cerr << "Exception: " << x.what() << endl;
}
}