SObjectizer 5.8
Loading...
Searching...
No Matches
so_5/mchain_multi_consumers/main.cpp
/*
* An example for demonstration of using receive from several threads.
*/
#include <iostream>
#include <string>
// Main SObjectizer header file.
#include <so_5/all.hpp>
using namespace std;
/*
 * Returns the number of consumer (worker) threads to start.
 *
 * The result is derived from std::thread::hardware_concurrency():
 *  - more than 2 hardware threads: one less than that, because one thread
 *    is used by the producer;
 *  - 0, 1 or 2 hardware threads: exactly 2, because at least two consumer
 *    threads are required.
 *
 * The result is never less than 2.
 */
std::size_t get_workers_count()
{
    // hardware_concurrency() is only a hint and returns 0 when the value
    // is not computable; that case must not lead to zero consumers.
    const auto hw_threads = std::thread::hardware_concurrency();

    if( hw_threads > 2 )
        // Decrement count of threads because one thread will be producer.
        return hw_threads - 1;

    // We must have at least two consumer threads.
    return 2;
}
void demo()
{
// Type of message to be received from every consumer thread as a result.
struct consumer_result
{
thread::id m_id;
size_t m_values_received;
uint64_t m_sum;
};
// A SObjectizer instance.
// Message chain to be used for values speading between worker threads.
auto values_ch = so_5::create_mchain( sobj,
// Chain is size-limited with blocking of sender on overload.
// Wait on overloaded mchain for 5min.
chrono::minutes{5},
// No more than 300 messages in chain.
300u,
// Space for mchain will be preallocated.
// What to do on overflow.
// This value has no sence because we use too large time limit.
// Because of that use hardest case.
// Message chain to be used for results from consumers.
// A very simple chain will be created for that.
auto results_ch = create_mchain( sobj );
// Create workers.
const auto workers_count = get_workers_count();
vector< thread > workers;
workers.reserve( workers_count );
for( size_t i = 0; i != workers_count; ++i )
workers.emplace_back( thread{ [&values_ch, &results_ch] {
// Receive all data from input chain.
size_t received = 0u;
uint64_t sum = 0u;
receive( from( values_ch ).handle_all(),
[&sum, &received]( unsigned int v ) {
++received;
sum += v;
} );
// Send result back.
so_5::send< consumer_result >( results_ch,
this_thread::get_id(), received, sum );
} } );
cout << "Workers created: " << workers_count << endl;
// Send a bunch of values for consumers.
for( unsigned int i = 0; i != 10000; ++i )
so_5::send< unsigned int >( values_ch, i );
// No more values will be sent.
// Receive responses from consumers.
// Exactly workers_count results expected.
from( results_ch ).handle_n( workers_count ),
[]( const consumer_result & r ) {
cout << "Thread: " << r.m_id
<< ", values: " << r.m_values_received
<< ", sum: " << r.m_sum
<< endl;
} );
// All consumer threads must be finished.
for_each( begin(workers), end(workers), []( thread & t ) { t.join(); } );
// SObjectizer will be stopped automatically.
}
/*
 * Program entry point.
 *
 * Runs demo() and reports any std::exception-derived error to stderr.
 * Returns 0 on success and 1 if demo() failed with an exception, so the
 * failure is visible to the calling shell/script.
 */
int main()
{
    try
    {
        demo();
    }
    catch( const std::exception & ex )
    {
        std::cerr << "Error: " << ex.what() << std::endl;
        // Signal the failure through the exit code as well.
        return 1;
    }

    return 0;
}
A helper header file for including all public SObjectizer stuff.
A wrapped environment.
@ abort_app
Application must be aborted.
@ preallocated
Storage must be preallocated once and doesn't change after that.
mchain_receive_params_t< mchain_props::msg_count_status_t::undefined > from(mchain_t chain)
A helper function for simplification of creation of mchain_receive_params instance.
Definition mchain.hpp:1540
mchain_t create_mchain(environment_t &env)
Create size-unlimited chain.
void send(Target &&to, Args &&... args)
A utility function for creating and delivering a message or a signal.
constexpr exceptions_enabled_t exceptions_enabled
Value that indicates that exceptions are enabled.
void close_retain_content(Exceptions_Control exceptions_control, const mchain_t &ch) noexcept(noexcept(details::should_terminate_if_throws_t< Exceptions_Control >::value))
Helper function for closing a message chain with retaining all its content.
Definition mchain.hpp:708
mchain_receive_result_t receive(const mchain_receive_params_t< Msg_Count_Status > &params, Handlers &&... handlers)
Advanced version of receive from mchain.
Definition mchain.hpp:1828