#if defined( _MSC_VER )
#if defined( __clang__ )
#pragma clang diagnostic ignored "-Wreserved-id-macro"
#endif
#define _CRT_SECURE_NO_WARNINGS
#endif
#include <algorithm>
#include <chrono>
#include <ctime>
#include <iostream>
#include <iterator>
#include <list>
#include <random>
#include <tuple>
// Returns a uniformly distributed value from the closed range
// [left, right].
//
// The random engine is seeded once per thread and then reused, so repeated
// calls do not pay for constructing std::random_device and re-seeding
// std::mt19937 every time. The engine is thread_local, so callers on
// different threads never touch the same engine state.
//
// If the bounds arrive in reverse order they are normalized first, because
// std::uniform_int_distribution requires its lower bound not to exceed the
// upper one.
unsigned int random_value( unsigned int left, unsigned int right )
{
    thread_local std::mt19937 gen{ std::random_device{}() };

    const unsigned int low = std::min( left, right );
    const unsigned int high = std::max( left, right );

    return std::uniform_int_distribution< unsigned int >{ low, high }( gen );
}
// Simulates a time-consuming operation: blocks the calling thread for a
// pause whose length in milliseconds is obtained from random_value( 25, 125 ).
void imitate_hard_work()
{
    const std::chrono::milliseconds pause{ random_value( 25, 125 ) };
    std::this_thread::sleep_for( pause );
}
// Clock used for every timestamp in this file.
using clock_type = std::chrono::system_clock;

// Formats the time elapsed since `previous_point` as "<N>ms".
//
// The clock is sampled exactly once, and that same sample is used both for
// the comparison and for the subtraction. Sampling twice would let the two
// readings disagree (system_clock may be adjusted between them), which could
// produce a negative count even though the guard had passed.
//
// Returns "0ms" when `previous_point` is not in the past relative to the
// sampled time.
std::string ms_from_time( const clock_type::time_point & previous_point )
{
    using namespace std::chrono;

    const auto now = clock_type::now();
    if( now <= previous_point )
        return "0ms";

    const auto elapsed = duration_cast< milliseconds >( now - previous_point );
    return std::to_string( elapsed.count() ) + "ms";
}
// A single log record: the author of the message and its text.
// The field names match those read by the logging handler further down
// (`evt.m_who`, `evt.m_what`) -- presumably that handler receives msg_log;
// confirm against the original source.
struct msg_log
{
// Name of the entity that produced the message.
std::string m_who;
// Text of the message.
std::string m_what;
};
// Helper for emitting a log line on behalf of `who` with the text `what`.
//
// NOTE(review): in this view the body is empty and only two parameters are
// declared, while the call sites in this file pass a mailbox as the first
// argument (e.g. log( logger_mbox, "board.receiver", ... )). The mailbox
// parameter and the statement that delivers the record appear to be
// missing -- confirm against the original source.
void log(
std::string who,
std::string what )
{
}
// NOTE(review): fragment of a class definition; the class header, the
// enclosing member function and the opening of the event handler are not
// visible in this view. Only the handler body below is complete.
{
{
public :
{
// Render the current local time as "YYYY.MM.DD HH:MM:SS".
char local_time_sz[ 32 ];
auto t = clock_type::to_time_t( clock_type::now() );
// std::localtime returns a pointer to shared static storage.
// NOTE(review): fine if this handler only ever runs on one thread -- confirm.
std::strftime( local_time_sz, sizeof local_time_sz,
"%Y.%m.%d %H:%M:%S", std::localtime( &t ) );
// Output format: "[<time>] {<who>}: <what>", flushed after every record.
std::cout << "[" << local_time_sz << "] {" << evt.m_who
<< "}: " << evt.m_what << std::endl;
} );
}
};
// NOTE(review): tail of a function whose signature and first statements are
// not visible in this view. A later fragment calls create_logger_coop( env )
// and uses the result as the logger mailbox, so this is presumably that
// function -- confirm.
{
// Hands back the direct mailbox of agent `a`.
return a->so_direct_mbox();
} );
}
// Integral type used for story identifiers.
using story_id_type = unsigned long;
// Common base for the news-board messages: stores a timestamp supplied by
// the creator of the message.
// NOTE(review): the `struct ...` header line of this definition is not
// visible in this view; the name is taken from the constructor below.
{
// Timestamp passed in at construction; immutable afterwards.
const clock_type::time_point m_timestamp;
news_board_message_base( clock_type::time_point timestamp )
: m_timestamp( std::move(timestamp) )
{}
};
// Base for request messages: extends news_board_message_base with a
// reply-to value.
// NOTE(review): the `m_reply_to` member declaration and the `reply_to`
// constructor parameter are used below but are not visible in this view
// (the parameter list ends with a dangling comma) -- confirm against the
// original source.
struct news_board_request_base : public news_board_message_base
{
news_board_request_base(
clock_type::time_point timestamp,
: news_board_message_base( std::move(timestamp) )
, m_reply_to( std::move(reply_to) )
{}
};
// Request to publish a new story, carrying its title and content.
// NOTE(review): the constructor forwards `reply_to` to the base class, but
// no such parameter is visible in this view -- it appears to be missing.
struct msg_publish_story_req final : public news_board_request_base
{
// Title of the story to publish.
const std::string m_title;
// Body of the story to publish.
const std::string m_content;
msg_publish_story_req(
clock_type::time_point timestamp,
std::string title,
std::string content )
: news_board_request_base(
std::move(timestamp), std::move(reply_to) )
, m_title( std::move(title) )
, m_content( std::move(content) )
{}
};
struct msg_publish_story_resp final : public news_board_message_base
{
story_id_type m_id;
msg_publish_story_resp(
clock_type::time_point timestamp,
story_id_type id )
: news_board_message_base( std::move(timestamp) )
, m_id( id )
{}
};
// Request for updates, carrying the story identifier `m_last_id`.
// NOTE(review): the constructor forwards `reply_to` to the base class, but
// no such parameter is visible in this view -- it appears to be missing.
struct msg_updates_req final : public news_board_request_base
{
// Identifier supplied by the requester; the directory handler in this file
// looks up stories with identifiers greater than this value.
story_id_type m_last_id;
msg_updates_req(
clock_type::time_point timestamp,
story_id_type last_id )
: news_board_request_base(
std::move(timestamp), std::move(reply_to) )
, m_last_id( last_id )
{}
};
// Reply to an updates request: a list of (story id, story title) pairs
// plus the timestamp that is handed to the base class.
struct msg_updates_resp final : public news_board_message_base
{
    // One entry per story: its identifier and a string (the title).
    using story_list = std::list< std::tuple< story_id_type, std::string > >;

    // Stories reported to the requester; immutable once constructed.
    const story_list m_updates;

    msg_updates_resp( clock_type::time_point timestamp, story_list updates )
        : news_board_message_base( std::move( timestamp ) )
        , m_updates( std::move( updates ) )
    {}
};
// Request for the content of the story identified by `m_id`.
// NOTE(review): the constructor forwards `reply_to` to the base class, but
// no such parameter is visible in this view -- it appears to be missing.
struct msg_story_content_req final : public news_board_request_base
{
// Identifier of the requested story.
const story_id_type m_id;
msg_story_content_req(
clock_type::time_point timestamp,
story_id_type id )
: news_board_request_base(
std::move(timestamp), std::move(reply_to) )
, m_id( id )
{}
};
// Positive reply to a story-content request: carries the story body plus
// the timestamp that is handed to the base class.
struct msg_story_content_resp_ack final : public news_board_message_base
{
    // Body of the requested story; immutable once constructed.
    const std::string m_content;

    msg_story_content_resp_ack( clock_type::time_point timestamp, std::string content )
        : news_board_message_base( std::move( timestamp ) )
        , m_content( std::move( content ) )
    {}
};
// Negative reply to a story-content request. It has no payload of its own;
// only the timestamp is passed on to the base class.
struct msg_story_content_resp_nack final : public news_board_message_base
{
    msg_story_content_resp_nack( clock_type::time_point timestamp )
        : news_board_message_base( std::move( timestamp ) )
    {}
};
// State of the news board: the stored stories and the most recently issued
// story identifier.
struct news_board_data
{
    // Title and body of a single story.
    struct story_info
    {
        std::string m_title;
        std::string m_content;
    };

    // Stories keyed (and therefore ordered) by identifier.
    using story_map = std::map< story_id_type, story_info >;

    // All stories currently held by the board.
    story_map m_stories;

    // Last identifier handed out; starts at zero.
    story_id_type m_last_id{ 0 };
};
// NOTE(review): fragment of an agent class; the class header, the
// constructor's name and parameter list, the first parameter of
// subscribe_event and its body are not visible in this view. Other code in
// this file instantiates `one_event_handler_t` and calls subscribe_event on
// it, so this is presumably that class -- confirm.
{
public :
:
// The base agent is initialised from `ctx + prio`.
so_5::agent_t{ ctx + prio }
{}
// Registers `handler` as an event handler.
// NOTE(review): the body is empty in this view.
template< typename Lambda >
void subscribe_event(
Lambda && handler )
{
}
};
// Adds to `coop` an agent that accepts publish requests and stores the new
// stories in `board_data`.
// NOTE(review): several lines are missing from this view: part of the
// parameter list (coop, board_mbox, logger_mbox are used but not declared
// here), the arguments of make_agent, the handler's lambda header, and the
// call that sends the reply.
void define_news_receiver_agent(
news_board_data & board_data,
{
auto receiver = coop.
make_agent< one_event_handler_t >(
receiver->subscribe_event( board_mbox,
{
// Allocate the next identifier and store the story under it.
auto story_id = ++(board_data.m_last_id);
board_data.m_stories.emplace( story_id,
news_board_data::story_info{ cmd->m_title, cmd->m_content } );
log( logger_mbox,
"board.receiver",
"new story published, id=" + std::to_string( story_id ) +
", title=" + cmd->m_title );
imitate_hard_work();
// Arguments of the reply: destination, original timestamp, new story id.
cmd->m_reply_to,
cmd->m_timestamp,
story_id );
// Keep at most 40 stories: once the count exceeds 40, drop the entry with
// the smallest identifier (the first element of the ordered map).
if( 40 < board_data.m_stories.size() )
{
auto removed_id = board_data.m_stories.begin()->first;
board_data.m_stories.erase( board_data.m_stories.begin() );
log( logger_mbox,
"board.receiver",
"old story removed, id=" + std::to_string( removed_id ) );
}
} );
}
// Adds to `coop` an agent that answers update requests with the list of
// stories newer than the requester's last known identifier.
// NOTE(review): several lines are missing from this view: part of the
// parameter list (coop, board_mbox, logger_mbox are used but not declared
// here), the arguments of make_agent, the handler's lambda header, and the
// call that sends the reply.
void define_news_directory_agent(
news_board_data & board_data,
{
auto directory = coop.
make_agent< one_event_handler_t >(
directory->subscribe_event( board_mbox,
{
log( logger_mbox,
"board.directory",
"request for updates received, last_id=" +
std::to_string( cmd->m_last_id ) );
imitate_hard_work();
// Collect (id, title) for every story whose identifier is strictly greater
// than cmd->m_last_id; upper_bound gives the first such entry.
msg_updates_resp::story_list new_stories;
std::transform(
board_data.m_stories.upper_bound( cmd->m_last_id ),
std::end( board_data.m_stories ),
std::back_inserter( new_stories ),
[]( const news_board_data::story_map::value_type & v ) {
return std::make_tuple( v.first, v.second.m_title );
} );
log( logger_mbox,
"board.directory",
std::to_string( new_stories.size() ) + " new stories found" );
// Arguments of the reply: destination, original timestamp, collected list.
cmd->m_reply_to,
cmd->m_timestamp,
std::move(new_stories) );
} );
}
// Adds to `coop` an agent that answers story-content requests: it looks the
// story up by identifier and replies with its content, or with a negative
// reply when the story is not present.
// NOTE(review): several lines are missing from this view: part of the
// parameter list (coop, board_mbox, logger_mbox are used but not declared
// here), the arguments of make_agent, the handler's lambda header, and the
// calls that send the two replies.
void define_story_extractor_agent(
news_board_data & board_data,
{
auto extractor = coop.
make_agent< one_event_handler_t >(
extractor->subscribe_event( board_mbox,
{
log( logger_mbox,
"board.extractor",
"request for story content received, id=" +
std::to_string( cmd->m_id ) );
imitate_hard_work();
auto it = board_data.m_stories.find( cmd->m_id );
if( it != board_data.m_stories.end() )
{
log( logger_mbox,
"board.extractor",
"story {" + std::to_string( cmd->m_id ) + "} found" );
// Arguments of the positive reply: destination, original timestamp, content.
cmd->m_reply_to,
cmd->m_timestamp,
it->second.m_content );
}
else
{
log( logger_mbox,
"board.extractor",
"story {" + std::to_string( cmd->m_id ) + "} NOT found" );
// Arguments of the negative reply: destination and original timestamp only.
cmd->m_reply_to,
cmd->m_timestamp );
}
} );
}
// NOTE(review): fragment of a function whose signature and opening
// statements are not visible in this view (the declarations of board_mbox,
// coop, board_data, p2 and p3 are missing). A later fragment calls
// create_board_coop( env, logger_mbox ) and uses the result as the board
// mailbox, so this is presumably that function -- confirm.
{
// Values 20 and 30 are assigned to p2 and p3 via chained set() calls.
.set( p2, 20 )
.set( p3, 30 )
).binder(),
{
std::make_unique< news_board_data >() );
// All three board agents are given the same news_board_data instance and
// the same pair of mailboxes.
define_news_receiver_agent(
coop, *board_data, board_mbox, logger_mbox );
define_news_directory_agent(
coop, *board_data, board_mbox, logger_mbox );
define_story_extractor_agent(
coop, *board_data, board_mbox, logger_mbox );
} );
return board_mbox;
}
// NOTE(review): leading part of the story_publisher class. The class
// header, the rest of the constructor's parameter list and base-class
// initialiser, the signatures of the two unnamed member functions below,
// the declarations of m_name / m_board_mbox / m_logger_mbox, and the
// send calls are not visible in this view.
{
public :
story_publisher(
context_t ctx,
std::string publisher_name,
, m_name( std::move(publisher_name) )
, m_board_mbox( std::move(board_mbox) )
, m_logger_mbox( std::move(logger_mbox) )
{}
// Initial state is st_await_new_story; each state gets exactly one handler.
{
this >>= st_await_new_story;
st_await_new_story.event(
&story_publisher::evt_time_for_new_story );
st_await_publish_response.event(
&story_publisher::evt_publish_response );
}
// Schedules the first "time for new story" notification.
{
initiate_time_for_new_story_signal();
}
private :
// State in which the agent waits for the moment to create the next story.
const state_t st_await_new_story{
this };
// State in which the agent waits for the reply to its publish request.
const state_t st_await_publish_response{
this };
// Number of stories created so far; used to number story titles.
unsigned int m_stories_counter = 0;
// Uses a pause taken from random_value( 100, 1500 ) milliseconds.
// NOTE(review): the call that consumes these arguments is not visible.
void initiate_time_for_new_story_signal()
{
*this,
std::chrono::milliseconds{ random_value( 100, 1500 ) } );
}
// Builds the title and content of the next story, logs it, and switches to
// st_await_publish_response.
// NOTE(review): the handler's signature and the call that sends the publish
// request are not visible in this view.
{
auto story_number = ++m_stories_counter;
std::string title = "A story from " + m_name + " #" +
std::to_string( story_number );
std::string content = "This is a content from a story '" +
title +
"' provided by " +
m_name;
log( m_logger_mbox, m_name, "Publish new story: " + title );
clock_type::now(),
std::move(title),
std::move(content) );
this >>= st_await_publish_response;
}
// Handles the reply to a publish request: logs the identifier received and
// how long ago the timestamp carried by the reply was taken, then returns
// to st_await_new_story and schedules the next story.
void evt_publish_response( const msg_publish_story_resp & resp )
{
    std::string report = "Publish finished, id=";
    report += std::to_string( resp.m_id );
    report += ", publish took ";
    report += ms_from_time( resp.m_timestamp );
    log( m_logger_mbox, m_name, std::move( report ) );

    this >>= st_await_new_story;
    initiate_time_for_new_story_signal();
}
};
// Creates the story publishers, named "publisher1" .. "publisher5".
// NOTE(review): the parameter list, the cooperation-creation call and the
// agent-creation call inside the loop are not visible in this view; only
// the arguments of the latter remain.
void create_publisher_coop(
{
{
// Five iterations; names are 1-based (i+1).
for( int i = 0; i != 5; ++i )
"publisher" + std::to_string(i+1),
board_mbox,
logger_mbox );
} );
}
// NOTE(review): leading part of the news_reader class. The class header,
// the rest of the constructor's parameter list and base-class initialiser,
// the signatures of the unnamed member functions below, the declarations of
// st_sleeping / m_name / m_board_mbox / m_logger_mbox, and the send calls
// are not visible in this view.
{
public :
news_reader(
context_t ctx,
std::string reader_name,
, m_name( std::move(reader_name) )
, m_board_mbox( std::move(board_mbox) )
, m_logger_mbox( std::move(logger_mbox) )
{}
// Initial state is st_sleeping. st_await_story_content gets two handlers:
// one for the positive reply and one for the negative reply.
{
this >>= st_sleeping;
st_sleeping.event(
&news_reader::evt_time_for_updates );
st_await_updates.event(
&news_reader::evt_updates_received );
st_await_story_content.event(
&news_reader::evt_story_content );
st_await_story_content.event(
&news_reader::evt_story_not_found );
}
// Schedules the first "time for updates" notification.
{
initiate_time_for_updates_signal();
}
private :
// State in which the agent waits for the reply to an updates request.
const state_t st_await_updates{
this };
// State in which the agent waits for the reply to a story-content request.
const state_t st_await_story_content{
this };
// Identifier of the last story processed; sent with each updates request.
story_id_type m_last_id = 0;
// Stories still to be read; the first element is the one currently requested.
msg_updates_resp::story_list m_stories_to_read;
// Uses a pause taken from random_value( 500, 2500 ) milliseconds.
// NOTE(review): the call that consumes these arguments is not visible.
void initiate_time_for_updates_signal()
{
*this,
std::chrono::milliseconds{ random_value( 500, 2500 ) } );
}
// Reacts by requesting updates.
// NOTE(review): the handler's signature is not visible in this view.
{
request_updates();
}
// Handles the reply to an updates request.
//
// With no updates the agent goes back to st_sleeping and schedules the next
// check. Otherwise it switches to st_await_story_content, queues at most
// the last three entries of the reply (keeping their relative order) at the
// front of m_stories_to_read, and requests the first queued story.
void evt_updates_received( const msg_updates_resp & resp )
{
    const auto & updates = resp.m_updates;

    log( m_logger_mbox,
        m_name,
        std::to_string( updates.size() ) +
            " updates received, took " +
            ms_from_time( resp.m_timestamp ) );

    if( updates.empty() )
    {
        this >>= st_sleeping;
        initiate_time_for_updates_signal();
        return;
    }

    this >>= st_await_story_content;

    // Walk backwards from the newest entry; push_front restores the
    // original order of the entries taken.
    unsigned int taken = 0;
    auto pos = updates.rbegin();
    while( pos != updates.rend() && taken != 3 )
    {
        m_stories_to_read.push_front( *pos );
        ++pos;
        ++taken;
    }

    request_story_content();
}
// Handles a positive story-content reply: logs the id and title of the
// first queued story together with the received content and the time
// elapsed since the reply's timestamp, then moves on to the next story.
void evt_story_content( const msg_story_content_resp_ack & resp )
{
    // The reply is matched against the first element of the pending list.
    const auto & current = m_stories_to_read.front();

    std::string report = "read story {";
    report += std::to_string( std::get<0>( current ) );
    report += "} '";
    report += std::get<1>( current );
    report += "': \"";
    report += resp.m_content;
    report += "\", took ";
    report += ms_from_time( resp.m_timestamp );

    log( m_logger_mbox, m_name, std::move( report ) );

    remove_current_story_and_read_next();
}
// Handles a negative story-content reply: logs the id and title of the
// first queued story and the time elapsed since the reply's timestamp,
// then moves on to the next story.
void evt_story_not_found( const msg_story_content_resp_nack & resp )
{
    // The reply is matched against the first element of the pending list.
    const auto & current = m_stories_to_read.front();

    std::string report = "unable to read story {";
    report += std::to_string( std::get<0>( current ) );
    report += "} '";
    report += std::get<1>( current );
    report += "', took ";
    report += ms_from_time( resp.m_timestamp );

    log( m_logger_mbox, m_name, std::move( report ) );

    remove_current_story_and_read_next();
}
// Logs the current m_last_id and then switches to st_await_updates.
// NOTE(review): the call that sends the updates request is not visible in
// this view; only its trailing arguments (current time, m_last_id) remain.
void request_updates()
{
log( m_logger_mbox,
m_name,
"requesting updates, last_id=" + std::to_string( m_last_id ) );
clock_type::now(),
m_last_id );
this >>= st_await_updates;
}
// Logs the identifier of the first queued story.
// NOTE(review): the call that sends the content request is not visible in
// this view; only its arguments (m_board_mbox, current time, id) remain.
void request_story_content()
{
// Reads the first element of m_stories_to_read, so the list must not be
// empty when this is called.
auto id = std::get<0>( *std::begin(m_stories_to_read) );
log( m_logger_mbox,
m_name,
"requesting story {" + std::to_string( id ) + "}" );
m_board_mbox,
clock_type::now(),
id );
}
// Records the id of the first queued story as m_last_id and removes that
// story from the queue. If the queue is then empty a new updates request
// is issued; otherwise the next queued story is requested.
void remove_current_story_and_read_next()
{
    m_last_id = std::get<0>( m_stories_to_read.front() );
    m_stories_to_read.pop_front();

    if( m_stories_to_read.empty() )
        request_updates();
    else
        request_story_content();
}
};
// Creates the news readers, named "reader1" .. "reader50".
// NOTE(review): the parameter list, the cooperation-creation call and the
// agent-creation call inside the loop are not visible in this view; only
// the arguments of the latter remain.
void create_reader_coop(
{
{
// Fifty iterations; names are 1-based (i+1).
for( int i = 0; i != 50; ++i )
"reader" + std::to_string(i+1),
board_mbox,
logger_mbox );
} );
}
// NOTE(review): body of a function whose signature (including the
// declaration of `env`) is not visible in this view.
// Order matters: the logger mailbox is created first because the board,
// the publishers and the readers all receive it; the board mailbox is
// created next because publishers and readers receive it as well.
{
auto logger_mbox = create_logger_coop( env );
auto board_mbox = create_board_coop( env, logger_mbox );
create_publisher_coop( env, board_mbox, logger_mbox );
create_reader_coop( env, board_mbox, logger_mbox );
}
// Program entry point.
// Returns 0 on normal completion; if a std::exception escapes the try
// block its message is written to std::cerr and 2 is returned.
// NOTE(review): in this view the try block contains only `return 0;` --
// the statement that starts the application appears to be missing.
int main()
{
try
{
return 0;
}
catch( const std::exception & x )
{
std::cerr << "Exception: " << x.what() << std::endl;
}
return 2;
}
A helper header file for including all public SObjectizer stuff.
const name_for_agent_t m_name
Optional name for the agent.
virtual void so_define_agent()
Hook on define agent for SObjectizer.
subscription_bind_t so_subscribe_self()
Initiate subscription to agent's direct mbox.
agent_t(environment_t &env)
Constructor.
const mbox_t & so_direct_mbox() const
Get the agent's direct mbox.
subscription_bind_t so_subscribe(const mbox_t &mbox_ref)
Initiate subscription.
virtual void so_evt_start()
Hook on agent start inside SObjectizer.
Agent * make_agent(Args &&... args)
Helper method for simplification of agents creation.
T * take_under_control(std::unique_ptr< T > resource)
Take a user resource under cooperation control.
A storage of quotes for priorities.
quotes_t & set(priority_t prio, std::size_t quote)
Set a new quote for a priority.
mbox_t create_mbox()
Create an anonymous MPMC mbox.
decltype(auto) introduce_coop(Args &&... args)
Helper method for simplification of cooperation creation and registration.
A base class for agent messages.
A message wrapped to be used as type of argument for event handlers.
A base class for agent signals.
Class for the representing agent state.
std::enable_if< details::is_agent_method_pointer< details::method_arity::unary, Method_Pointer >::value, subscription_bind_t & >::type event(Method_Pointer pfn, thread_safety_t thread_safety=not_thread_safe)
Make subscription to the message.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of active_group dispatcher.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of one_thread dispatcher.
Dispatcher which handles events of different priorities in round-robin manner.
Helpers for working with priorities.
Private part of message limit implementation.
void launch(Init_Routine &&init_routine)
Launch a SObjectizer Environment with default parameters.
priority_t
Definition of supported priorities.
void send(Target &&to, Args &&... args)
A utility function for creating and delivering a message or a signal.
void send_delayed(Target &&target, std::chrono::steady_clock::duration pause, Args &&... args)
A utility function for creating and delivering a delayed message to the specified destination.