#include <chrono>
#include <cstdlib>
#include <ctime>
#include <deque>
#include <iostream>
#include <sstream>
#include <string>
#include <random>
struct log_message
{
std::string m_what;
};
{
public :
a_logger_t( context_t ctx )
+ limit_then_abort< log_message >( 100 ) )
, m_started_at( std::chrono::steady_clock::now() )
{}
{
[this]( const log_message & evt ) {
std::cout << "[+" << time_delta()
<< "] -- " << evt.m_what << std::endl;
} );
}
private :
const std::chrono::steady_clock::time_point m_started_at;
std::string time_delta() const
{
auto now = std::chrono::steady_clock::now();
std::ostringstream ss;
ss << double(
std::chrono::duration_cast< std::chrono::milliseconds >(
now - m_started_at ).count()
) / 1000.0 << "ms";
return ss.str();
}
};
{
public :
a_stats_listener_t(
context_t ctx,
, m_logger( std::move( logger ) )
{}
{
controller.mbox(),
return suffixes::work_thread_queue_size() == msg.m_suffix;
} );
controller.mbox(),
&a_stats_listener_t::evt_quantity )
.
event( controller.mbox(),
so_5::send< log_message >( m_logger, "--- DISTRIBUTION STARTED ---" );
} )
.
event( controller.mbox(),
so_5::send< log_message >( m_logger, "--- DISTRIBUTION FINISHED ---" );
} );
}
{
std::chrono::milliseconds( 330 ) );
}
private :
void evt_quantity(
{
std::ostringstream ss;
}
};
{
public :
a_generator_t(
context_t ctx,
std::vector< so_5::mbox_t > workers )
, m_logger( std::move( logger ) )
, m_workers( std::move( workers ) )
, m_turn_pause( 600 )
{}
{
.
event( &a_generator_t::evt_next_turn );
}
{
}
private :
const std::vector< so_5::mbox_t > m_workers;
const std::chrono::milliseconds m_turn_pause;
{
generate_new_requests( random( 100, 200 ) );
}
void generate_new_requests( unsigned int requests )
{
const auto size = m_workers.size();
for( unsigned int i = 0; i != requests; ++i )
std::to_string( requests ) + " requests are sent" );
}
static unsigned int
random( unsigned int left, unsigned int right )
{
std::random_device rd;
std::mt19937 gen{ rd() };
return std::uniform_int_distribution< unsigned int >{left, right}(gen);
}
};
{
public :
a_worker_t( context_t ctx )
+ limit_then_drop< msg_start_thinking >( 50 ) )
{}
{
std::this_thread::sleep_for(
std::chrono::milliseconds( 10 ) );
} );
}
};
{
env,
"stats_listener" ).
binder(),
logger->so_direct_mbox() );
env,
"workers",
3 );
std::vector< so_5::mbox_t > workers;
for( int i = 0; i != 5; ++i )
{
worker_disp.binder( worker_binding_params ) );
workers.push_back( w->so_direct_mbox() );
}
env, "generator" );
generator_disp.binder(),
logger->so_direct_mbox(),
std::move( workers ) );
});
std::this_thread::sleep_for( std::chrono::seconds(50) );
}
int main()
{
try
{
}
catch( const std::exception & ex )
{
std::cerr << "Error: " << ex.what() << std::endl;
return 1;
}
return 0;
}
A helper header file for including all public SObjectizer stuff.
virtual void so_define_agent()
Hook on define agent for SObjectizer.
environment_t & so_environment() const noexcept
Access to the SObjectizer Environment which this agent is belong.
const state_t & so_default_state() const
Access to the agent's default state.
virtual void so_evt_start()
Hook on agent start inside SObjectizer.
void so_set_delivery_filter(const mbox_t &mbox, delivery_filter_unique_ptr_t filter)
Set a delivery filter.
Agent * make_agent_with_binder(so_5::disp_binder_shptr_t binder, Args &&... args)
Helper method for simplification of agents creation and binding to the specified dispatcher.
Agent * make_agent(Args &&... args)
Helper method for simplification of agents creation.
disp_binder_shptr_t binder() const noexcept
Get a binder for that dispatcher.
Parameters for binding agents to thread_pool dispatcher.
bind_params_t & fifo(fifo_t v)
Set FIFO type.
stats::controller_t & stats_controller()
Access to controller of run-time monitoring.
void stop() noexcept
Send a shutdown signal to the Run-Time.
decltype(auto) introduce_coop(Args &&... args)
Helper method for simplification of cooperation creation and registration.
A message wrapped to be used as type of argument for event handlers.
A base class for agent signals.
const state_t & event(Args &&... args) const
Helper for subscription of event handler in this state.
virtual void turn_on()=0
Turn the monitoring on.
virtual std::chrono::steady_clock::duration set_distribution_period(std::chrono::steady_clock::duration period)=0
Set distribution period.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of active_obj dispatcher.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of one_thread dispatcher.
@ individual
A FIFO for demands only for one agent.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance thread_pool dispatcher.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
void launch(Init_Routine &&init_routine)
Launch a SObjectizer Environment with default parameters.
void send(Target &&to, Args &&... args)
A utility function for creating and delivering a message or a signal.
void send_delayed(Target &&target, std::chrono::steady_clock::duration pause, Args &&... args)
A utility function for creating and delivering a delayed message to the specified destination.
Notification about finish of stats distribution.
Notification about start of new stats distribution.
A message with value of some quantity.
suffix_t m_suffix
Suffix of data_source name.
T m_value
Actual quantity value.
prefix_t m_prefix
Prefix of data_source name.