SObjectizer  5.8
Loading...
Searching...
No Matches
one_thread/pub.cpp
Go to the documentation of this file.
1/*
2 SObjectizer 5.
3*/
4
5#include <so_5/disp/one_thread/pub.hpp>
6
7#include <so_5/environment.hpp>
8#include <so_5/send_functions.hpp>
9
10#include <so_5/stats/repository.hpp>
11#include <so_5/stats/messages.hpp>
12#include <so_5/stats/std_names.hpp>
13
14#include <so_5/stats/impl/activity_tracking.hpp>
15
16#include <so_5/disp/reuse/work_thread/work_thread.hpp>
17
18#include <so_5/disp/reuse/actual_work_thread_factory_to_use.hpp>
19#include <so_5/disp/reuse/data_source_prefix_helpers.hpp>
20#include <so_5/disp/reuse/make_actual_dispatcher.hpp>
21
22#include <so_5/details/rollback_on_exception.hpp>
23
24#include <atomic>
25
26namespace so_5
27{
28
29namespace disp
30{
31
32namespace one_thread
33{
34
35namespace impl
36{
37
38namespace work_thread = so_5::disp::reuse::work_thread;
39namespace stats = so_5::stats;
40
42{
43
44template< typename Work_Thread >
46 {
47 //! Prefix for dispatcher-related data.
49 //! Prefix for working thread-related data.
51
52 //! Working thread of the dispatcher.
53 Work_Thread & m_work_thread;
54
55 //! Count of agents bound to the dispatcher.
57
59 Work_Thread & work_thread,
60 std::atomic< std::size_t > & agents_bound )
63 {}
64 };
65
66inline void
68 const mbox_t &,
69 const common_data_t< work_thread::work_thread_no_activity_tracking_t > & )
70 {}
71
72inline void
74 const mbox_t & mbox,
75 const common_data_t< work_thread::work_thread_with_activity_tracking_t > & data )
76 {
77 so_5::send< stats::messages::work_thread_activity >(
78 mbox,
79 data.m_base_prefix,
80 stats::suffixes::work_thread_activity(),
81 data.m_work_thread.thread_id(),
82 data.m_work_thread.take_activity_stats() );
83 }
84
85} /* namespace data_source_details */
86
87/*!
88 * \brief Data source for one-thread dispatcher.
89 */
90template< typename Work_Thread >
91class data_source_t final
92 : public stats::source_t
93 , protected data_source_details::common_data_t< Work_Thread >
94 {
95 public :
96 using actual_work_thread_type_t = Work_Thread;
97
99 actual_work_thread_type_t & work_thread,
100 std::atomic< std::size_t > & agents_bound,
101 const std::string_view name_base,
102 const void * pointer_to_disp )
103 : data_source_details::common_data_t< Work_Thread >{
104 work_thread, agents_bound }
105 {
106 // Names of data sources must be formed.
107 using namespace so_5::disp::reuse;
108
109 this->m_base_prefix = make_disp_prefix(
110 "ot", // ot -- one_thread
111 name_base,
112 pointer_to_disp );
113
114 this->m_work_thread_prefix = make_disp_working_thread_prefix(
115 this->m_base_prefix,
116 0 );
117 }
118
119 void
120 distribute( const mbox_t & mbox ) override
121 {
122 so_5::send< stats::messages::quantity< std::size_t > >(
123 mbox,
124 this->m_base_prefix,
125 stats::suffixes::agent_count(),
126 this->m_agents_bound.load( std::memory_order_acquire ) );
127
128 so_5::send< stats::messages::quantity< std::size_t > >(
129 mbox,
130 this->m_work_thread_prefix,
131 stats::suffixes::work_thread_queue_size(),
132 this->m_work_thread.demands_count() );
133
134 data_source_details::track_activity( mbox, *this );
135 }
136 };
137
138//
139// actual_dispatcher_t
140//
141
142/*!
143 * \brief A dispatcher with the single working thread and an event queue.
144 *
145 * \note
146 * This class implements disp_binder_t. It makes possible to use instance
147 * of that class as dispatcher binder (this avoids creation of small
148 * binder objects).
149 *
150 * \since
151 * v.5.6.0
152 */
153template< typename Work_Thread >
154class actual_dispatcher_t final : public disp_binder_t
155 {
156 public:
159 const std::string_view name_base,
160 disp_params_t params )
161 : m_work_thread{
162 acquire_work_thread( params, env.get() ),
163 params.queue_params().lock_factory() }
164 , m_data_source{
165 outliving_mutable(env.get().stats_repository()),
166 m_work_thread,
167 m_agents_bound,
168 name_base,
169 this }
170 {
171 m_work_thread.start();
172 }
173
174 ~actual_dispatcher_t() noexcept override
175 {
176 m_work_thread.shutdown();
177 m_work_thread.wait();
178 }
179
180 // Implementation of methods, inherited from disp_binder.
181 void
183 agent_t & /*agent*/ ) override
184 {
185 // Nothing to do.
186 }
187
188 void
190 agent_t & /*agent*/ ) noexcept override
191 {
192 // Nothing to do.
193 }
194
195 virtual void
197 agent_t & agent ) noexcept override
198 {
199 agent.so_bind_to_dispatcher( *(m_work_thread.get_agent_binding()) );
200 ++m_agents_bound;
201 }
202
203 virtual void
205 agent_t & /*agent*/ ) noexcept override
206 {
207 --m_agents_bound;
208 }
209
210 private:
211 //! Working thread for the dispatcher.
212 Work_Thread m_work_thread;
213
214 /*!
215 * \since
216 * v.5.5.4
217 *
218 * \brief Count of agents bound to this dispatcher.
219 */
221
222 /*!
223 * \brief Data source for run-time monitoring.
224 *
225 * \since
226 * v.5.5.4, v.5.6.0
227 */
228 stats::auto_registered_source_holder_t< data_source_t< Work_Thread > >
230 };
231
232//
233// dispatcher_handle_maker_t
234//
236 {
237 public :
239 make( disp_binder_shptr_t binder ) noexcept
240 {
241 return { std::move( binder ) };
242 }
243 };
244
245} /* namespace impl */
246
247//
248// make_dispatcher
249//
252 environment_t & env,
253 const std::string_view data_sources_name_base,
254 disp_params_t params )
255 {
256 using namespace so_5::disp::reuse;
257
258 using dispatcher_no_activity_tracking_t =
259 impl::actual_dispatcher_t<
260 work_thread::work_thread_no_activity_tracking_t >;
261
262 using dispatcher_with_activity_tracking_t =
263 impl::actual_dispatcher_t<
264 work_thread::work_thread_with_activity_tracking_t >;
265
266 disp_binder_shptr_t binder = so_5::disp::reuse::make_actual_dispatcher<
267 disp_binder_t,
268 dispatcher_no_activity_tracking_t,
269 dispatcher_with_activity_tracking_t >(
270 outliving_mutable(env),
271 data_sources_name_base,
272 std::move(params) );
273
274 return impl::dispatcher_handle_maker_t::make( std::move(binder) );
275 }
276
277} /* namespace one_thread */
278
279} /* namespace disp */
280
281} /* namespace so_5 */
A base class for agents.
Definition agent.hpp:673
Alias for namespace with traits of event queue.
A handle for one_thread dispatcher.
void preallocate_resources(agent_t &) override
Allocate resources in dispatcher for new agent.
virtual void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
virtual void unbind(agent_t &) noexcept override
Unbind agent from dispatcher.
Work_Thread m_work_thread
Working thread for the dispatcher.
void undo_preallocation(agent_t &) noexcept override
Undo resources allocation.
stats::auto_registered_source_holder_t< data_source_t< Work_Thread > > m_data_source
Data source for run-time monitoring.
std::atomic< std::size_t > m_agents_bound
Count of agents bound to this dispatcher.
actual_dispatcher_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
data_source_t(actual_work_thread_type_t &work_thread, std::atomic< std::size_t > &agents_bound, const std::string_view name_base, const void *pointer_to_disp)
void distribute(const mbox_t &mbox) override
Send appropriate notification about the current value.
static dispatcher_handle_t make(disp_binder_shptr_t binder) noexcept
Interface for dispatcher binders.
SObjectizer Environment.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
A holder for data-source that should be automatically registered and deregistered in registry.
A type for storing prefix of data_source name.
Definition prefix.hpp:32
An interface of data source.
#define SO_5_FUNC
Definition declspec.hpp:48
void track_activity(const mbox_t &, const common_data_t< work_thread::work_thread_no_activity_tracking_t > &)
void track_activity(const mbox_t &mbox, const common_data_t< work_thread::work_thread_with_activity_tracking_t > &data)
Implementation details for dispatcher with single working thread.
Dispatcher with single working thread.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of one_thread dispatcher.
Implementation details of dispatcher's working thread.
Reusable components for dispatchers.
Event dispatchers.
All stuff related to run-time monitoring and statistics.
Private part of message limit implementation.
Definition agent.cpp:33
stats::prefix_t m_work_thread_prefix
Prefix for working thread-related data.
Work_Thread & m_work_thread
Working thread of the dispatcher.
common_data_t(Work_Thread &work_thread, std::atomic< std::size_t > &agents_bound)
stats::prefix_t m_base_prefix
Prefix for dispatcher-related data.
std::atomic< std::size_t > & m_agents_bound
Count of agents bound to the dispatcher.