#include <iostream>
#include <chrono>
#include <random>
using steady_clock = std::chrono::steady_clock;
using log_msg = std::string;
{
public:
std::cout << *cmd << std::endl;
} );
}
};
return logger->so_direct_mbox();
}
template< typename A >
{
so_5::send< log_msg >( to, std::forward< A >(a) );
}
struct msg_maker
{
std::ostringstream m_os;
template< typename A >
{
m_os << std::forward< A >(a);
return *this;
}
};
inline void operator<<=(
const so_5::mbox_t & to, msg_maker & maker )
{
to <<= maker.m_os.str();
}
{
public :
shutdowner( context_t ctx, unsigned int producers_count )
, m_producers_left{ producers_count }
{
.
event( [
this](mhood_t< another_producer_finished >) {
--m_producers_left;
if( !m_producers_left )
} );
}
{
so_5::send< another_producer_finished >(
}
private :
unsigned int m_producers_left;
};
struct request
{
std::string m_payload;
};
struct reply
{
std::string m_payload;
};
{
public :
producer( context_t ctx,
std::string name,
unsigned int requests )
, m_name( std::move(name) )
, m_logger_mbox{ std::move(logger_mbox) }
, m_consumer_mbox{ std::move(consumer_mbox) }
, m_requests_left{ requests }
{
.
event( &producer::evt_reply )
.event( &producer::evt_send_next );
}
{
so_5::send< send_next >( *this );
}
private :
const std::string m_name;
unsigned int m_requests_left;
void evt_send_next(mhood_t< send_next >)
{
if( m_requests_left )
{
const auto started_at = steady_clock::now();
try
{
so_5::send< request >( m_consumer_mbox,
m_name + "_request_" + std::to_string( m_requests_left ) );
const auto ms = std::chrono::duration_cast<
std::chrono::milliseconds >( steady_clock::now()
- started_at ).count();
m_logger_mbox <<= msg_maker() << m_name << ": request sent in "
<< ms << "ms";
}
{
m_logger_mbox <<= msg_maker() << m_name << ": request NOT SENT, "
<< ex.what();
so_5::send< send_next >( *this );
}
}
else
shutdowner::producer_finished( *this );
}
void evt_reply( const reply & msg )
{
m_logger_mbox <<= msg_maker() << m_name << ": reply received, "
<< msg.m_payload;
--m_requests_left;
so_5::send< send_next >( *this );
}
};
{
public :
:
so_5::agent_t{ ctx + limit_then_drop< chain_has_requests >(1) }
, m_logger_mbox{ std::move(logger_mbox) }
{
10,
std::chrono::milliseconds(150) )
.not_empty_notificator( [this] {
so_5::send< chain_has_requests >( *this );
} ) );
&consumer::process_requests );
}
{
return m_chain->as_mbox();
}
private :
void process_requests(mhood_t< chain_has_requests >)
{
from( m_chain ).handle_n( 5 ).no_wait_on_empty(),
[]( const request & req ) {
std::this_thread::sleep_for( random_pause() );
so_5::send< reply >( req.m_who, req.m_payload + "#handled" );
} );
m_logger_mbox <<= msg_maker()
<< "=== " << r.handled() << " request(s) handled";
so_5::send< chain_has_requests >( *this );
}
static std::chrono::milliseconds
random_pause()
{
std::random_device rd;
std::mt19937 gen{ rd() };
return std::chrono::milliseconds(
std::uniform_int_distribution< unsigned int >{2u, 25u}(gen) );
}
};
{
const unsigned int producers = 40;
producers );
const auto logger_mbox = make_logger( coop );
logger_mbox )->consumer_mbox();
const auto bind_params = tp_disp::bind_params_t{}
.fifo( tp_disp::fifo_t::individual )
.max_demands_at_once( 1 );
for( unsigned int i = 0; i != producers; ++i )
disp.binder( bind_params ),
"producer-" + std::to_string( i + 1 ),
logger_mbox,
consumer_mbox,
10u );
}
int main()
{
try
{
} );
return 0;
}
catch( const std::exception & x )
{
std::cerr << "Exception: " << x.what() << std::endl;
}
return 2;
}