SObjectizer 5.8
Loading...
Searching...
No Matches
adv_thread_pool/pub.hpp
Go to the documentation of this file.
1/*
2 * SObjectizer-5
3 */
4
5/*!
6 * \since
7 * v.5.4.0
8 *
9 * \file
10 * \brief Public interface of advanced thread pool dispatcher.
11 */
12
13#pragma once
14
15#include <so_5/declspec.hpp>
16
17#include <so_5/disp_binder.hpp>
18
19#include <so_5/disp/mpmc_queue_traits/pub.hpp>
20
21#include <so_5/disp/reuse/work_thread_activity_tracking.hpp>
22#include <so_5/disp/reuse/work_thread_factory_params.hpp>
23#include <so_5/disp/reuse/default_thread_pool_size.hpp>
24
25#include <string_view>
26#include <utility>
27#include <thread>
28
29namespace so_5
30{
31
32namespace disp
33{
34
35namespace adv_thread_pool
36{
37
38/*!
39 * \brief Alias for namespace with traits of event queue.
40 *
41 * \since
42 * v.5.5.11
43 */
44namespace queue_traits = so_5::disp::mpmc_queue_traits;
45
46//
47// disp_params_t
48//
49/*!
50 * \brief Parameters for %adv_thread_pool dispatcher.
51 *
52 * \since
53 * v.5.5.11
54 */
58 {
59 using activity_tracking_mixin_t = so_5::disp::reuse::
61 using thread_factory_mixin_t = so_5::disp::reuse::
63
64 public :
65 //! Default constructor.
67
68 friend inline void
69 swap( disp_params_t & a, disp_params_t & b ) noexcept
70 {
71 using std::swap;
72
73 swap(
74 static_cast< activity_tracking_mixin_t & >(a),
75 static_cast< activity_tracking_mixin_t & >(b) );
76
77 swap(
78 static_cast< work_thread_factory_mixin_t & >(a),
79 static_cast< work_thread_factory_mixin_t & >(b) );
80
83 }
84
85 //! Setter for thread count.
87 thread_count( std::size_t count )
88 {
89 m_thread_count = count;
90 return *this;
91 }
92
93 //! Getter for thread count.
94 std::size_t
96 {
97 return m_thread_count;
98 }
99
100 //! Setter for queue parameters.
103 {
104 m_queue_params = std::move(p);
105 return *this;
106 }
107
108 //! Tuner for queue parameters.
109 /*!
110 * Accepts lambda-function or functional object which tunes
111 * queue parameters.
112 \code
113 using namespace so_5::disp::adv_thread_pool;
114 auto disp = make_dispatcher( env,
115 "workers_disp",
116 disp_params_t{}
117 .thread_count( 10 )
118 .tune_queue_params(
119 []( queue_traits::queue_params_t & p ) {
120 p.lock_factory( queue_traits::simple_lock_factory() );
121 } ) );
122 \endcode
123 */
124 template< typename L >
127 {
129 return *this;
130 }
131
132 //! Getter for queue parameters.
133 const queue_traits::queue_params_t &
135 {
136 return m_queue_params;
137 }
138
139 private :
140 //! Count of working threads.
141 /*!
142 * Value 0 means that the actual thread count will be detected automatically.
143 */
144 std::size_t m_thread_count = { 0 };
145 //! Queue parameters.
147 };
148
149//
150// fifo_t
151//
152/*!
153 * \brief Type of FIFO mechanism for agent's demands.
154 *
155 * \since
156 * v.5.4.0
157 */
158enum class fifo_t
159 {
160 //! A FIFO for demands for all agents from the same cooperation.
161 /*!
162 * It means that agents from the same cooperation for which this
163 * FIFO mechanism is used will work on the same thread.
164 *
165 * If the same disp_binder with fifo_t::cooperation is used for
166 * several cooperations then each coop will have a separate
167 * event queue (thus agents from different coops may work on
168 * different worker threads).
169 */
171 //! A FIFO for demands only for one agent.
172 /*!
173 * It means that FIFO is only supported for the concrete agent.
174 * If several agents from a cooperation have this FIFO type they
175 * will process demands independently and on different threads.
176 */
178 };
179
180//
181// bind_params_t
182//
183/*!
184 * \brief Parameters for binding agents to %adv_thread_pool dispatcher.
185 *
186 * \since
187 * v.5.5.11
188 */
190 {
191 public :
192 //! Set FIFO type.
195 {
196 m_fifo = v;
197 return *this;
198 }
199
200 //! Get FIFO type.
201 fifo_t
203 {
204 return m_fifo;
205 }
206
207 private :
208 //! FIFO type.
210 };
211
212//
213// default_thread_pool_size
214//
215using so_5::disp::reuse::default_thread_pool_size;
216
217namespace impl {
218
220
221//
222// basic_dispatcher_iface_t
223//
224/*!
225 * \brief The very basic interface of %adv_thread_pool dispatcher.
226 *
227 * This class contains a minimum that is necessary for implementation
228 * of dispatcher_handle class.
229 *
230 * \since
231 * v.5.6.0
232 */
234 : public std::enable_shared_from_this<actual_dispatcher_iface_t>
235 {
236 public :
237 virtual ~basic_dispatcher_iface_t() noexcept = default;
238
239 [[nodiscard]]
240 virtual disp_binder_shptr_t
241 binder( bind_params_t params ) = 0;
242 };
243
244using basic_dispatcher_iface_shptr_t =
245 std::shared_ptr< basic_dispatcher_iface_t >;
246
248
249} /* namespace impl */
250
251//
252// dispatcher_handle_t
253//
254
255/*!
256 * \since
257 * v.5.6.0
258 *
259 * \brief A handle for %adv_thread_pool dispatcher.
260 */
261class [[nodiscard]] dispatcher_handle_t
262 {
264
265 //! A reference to actual implementation of a dispatcher.
266 impl::basic_dispatcher_iface_shptr_t m_dispatcher;
267
269 impl::basic_dispatcher_iface_shptr_t dispatcher ) noexcept
270 : m_dispatcher{ std::move(dispatcher) }
271 {}
272
273 //! Is this handle empty? Returns true when m_dispatcher holds no dispatcher.
274 bool
275 empty() const noexcept { return !m_dispatcher; }
276
277 public :
278 dispatcher_handle_t() noexcept = default;
279
280 //! Get a binder for that dispatcher.
281 /*!
282 * Usage example:
283 * \code
284 * using namespace so_5::disp::adv_thread_pool;
285 *
286 * so_5::environment_t & env = ...;
287 * auto disp = make_dispatcher( env );
288 * bind_params_t params;
289 * params.fifo( fifo_t::individual );
290 *
291 * env.introduce_coop( [&]( so_5::coop_t & coop ) {
292 * coop.make_agent_with_binder< some_agent_type >(
293 * disp.binder( params ),
294 * ... );
295 *
296 * coop.make_agent_with_binder< another_agent_type >(
297 * disp.binder( params ),
298 * ... );
299 *
300 * ...
301 * } );
302 * \endcode
303 *
304 * \attention
305 * An attempt to call this method on an empty handle is undefined behavior (UB).
306 */
307 [[nodiscard]]
308 disp_binder_shptr_t
310 bind_params_t params ) const
311 {
312 return m_dispatcher->binder( params );
313 }
314
315 //! Create a binder for that dispatcher.
316 /*!
317 * This method allows parameters tuning via lambda-function
318 * or other functional objects.
319 *
320 * Usage example:
321 * \code
322 * using namespace so_5::disp::adv_thread_pool;
323 *
324 * so_5::environment_t & env = ...;
325 * env.introduce_coop( [&]( so_5::coop_t & coop ) {
326 * coop.make_agent_with_binder< some_agent_type >(
327 * // Create dispatcher instance.
328 * make_dispatcher( env )
329 * // Make and tune binder for that dispatcher.
330 * .binder( []( auto & params ) {
331 * params.fifo( fifo_t::individual );
332 * } ),
333 * ... );
334 * \endcode
335 *
336 * \attention
337 * An attempt to call this method on an empty handle is undefined behavior (UB).
338 */
339 template< typename Setter >
340 [[nodiscard]]
341 std::enable_if_t<
342 std::is_invocable_v< Setter, bind_params_t& >,
343 disp_binder_shptr_t >
345 //! Function for the parameters tuning.
346 Setter && params_setter ) const
347 {
349 params_setter( p );
350
351 return this->binder( p );
352 }
353
354 //! Get a binder for this dispatcher using default-constructed binding params (bind_params_t{}).
355 /*!
356 * \attention
357 * Calling this method on an empty handle is undefined behavior: it delegates to binder(bind_params_t), which dereferences m_dispatcher.
358 */
359 [[nodiscard]]
360 disp_binder_shptr_t
361 binder() const
362 {
363 return this->binder( bind_params_t{} );
364 }
365
366 //! Is this handle empty? NOTE: converts to true when the handle is EMPTY (holds no dispatcher), which is the opposite of the usual smart-pointer convention; the conversion is also implicit.
367 operator bool() const noexcept { return empty(); }
368
369 //! Does this handle contain a reference to dispatcher? NOTE: returns true when the handle is NOT empty.
370 bool
371 operator!() const noexcept { return !empty(); }
372
373 //! Drop the reference to the dispatcher held by this handle; afterwards empty() returns true.
374 void
375 reset() noexcept { m_dispatcher.reset(); }
376 };
377
378//
379// make_dispatcher
380//
381/*!
382 * \brief Create an instance of %adv_thread_pool dispatcher.
383 *
384 * \par Usage sample
385\code
386using namespace so_5::disp::adv_thread_pool;
387auto disp = make_dispatcher(
388 env,
389 "db_workers_pool",
390 disp_params_t{}
391 .thread_count( 16 )
392 .tune_queue_params( []( queue_traits::queue_params_t & params ) {
393 params.lock_factory( queue_traits::simple_lock_factory() );
394 } ) );
395auto coop = env.make_coop(
396 // The main dispatcher for that coop will be
397 // this instance of adv_thread_pool dispatcher.
398 disp.binder() );
399\endcode
400 *
401 * \since
402 * v.5.6.0
403 */
406 //! SObjectizer Environment to work in.
407 environment_t & env,
408 //! Value for creating names of data sources for
409 //! run-time monitoring.
410 const std::string_view data_sources_name_base,
411 //! Parameters for the dispatcher.
412 disp_params_t disp_params );
413
414//
415// make_dispatcher
416//
417/*!
418 * \brief Create an instance of %adv_thread_pool dispatcher.
419 *
420 * \par Usage sample
421\code
422auto disp = so_5::disp::adv_thread_pool::make_dispatcher(
423 env,
424 "req_processors",
425 16 );
426auto coop = env.make_coop(
427 // The main dispatcher for that coop will be
428 // this instance of adv_thread_pool dispatcher.
429 disp.binder() );
430\endcode
431 *
432 * \since
433 * v.5.6.0
434 */
437 //! SObjectizer Environment to work in.
438 environment_t & env,
439 //! Value for creating names of data sources for
440 //! run-time monitoring.
441 const std::string_view data_sources_name_base,
442 //! Count of working threads.
443 std::size_t thread_count )
444 {
445 return make_dispatcher(
446 env,
447 data_sources_name_base,
448 disp_params_t{}.thread_count( thread_count ) );
449 }
450
451/*!
452 * \brief Create an instance of %adv_thread_pool dispatcher.
453 *
454 * \par Usage sample
455\code
456auto disp = so_5::disp::adv_thread_pool::make_dispatcher( env, 16 );
457
458auto coop = env.make_coop(
459 // The main dispatcher for that coop will be
460 // this instance of adv_thread_pool dispatcher.
461 disp.binder() );
462\endcode
463 *
464 * \since
465 * v.5.6.0
466 */
469 //! SObjectizer Environment to work in.
470 environment_t & env,
471 //! Count of working threads.
472 std::size_t thread_count )
473 {
474 return make_dispatcher( env, std::string_view{}, thread_count );
475 }
476
477//
478// make_dispatcher
479//
480/*!
481 * \brief Create an instance of %adv_thread_pool dispatcher with the default
482 * count of work threads.
483 *
484 * Count of work threads will be detected by default_thread_pool_size()
485 * function.
486 *
487 * \par Usage sample
488\code
489auto disp = so_5::disp::adv_thread_pool::make_dispatcher( env );
490
491auto coop = env.make_coop(
492 // The main dispatcher for that coop will be
493 // this instance of adv_thread_pool dispatcher.
494 disp.binder() );
495\endcode
496 *
497 * \since
498 * v.5.6.0
499 */
502 //! SObjectizer Environment to work in.
503 environment_t & env )
504 {
506 }
507
508} /* namespace adv_thread_pool */
509
510} /* namespace disp */
511
512} /* namespace so_5 */
A base class for agents.
Definition agent.hpp:673
void so_bind_to_dispatcher(event_queue_t &queue) noexcept
Binding agent to the dispatcher.
Definition agent.cpp:872
Parameters for binding agents to adv_thread_pool dispatcher.
bind_params_t & fifo(fifo_t v)
Set FIFO type.
Alias for namespace with traits of event queue.
std::size_t thread_count() const
Getter for thread count.
disp_params_t & thread_count(std::size_t count)
Setter for thread count.
disp_params_t & tune_queue_params(L tunner)
Tuner for queue parameters.
std::size_t m_thread_count
Count of working threads.
queue_traits::queue_params_t m_queue_params
Queue parameters.
const queue_traits::queue_params_t & queue_params() const
Getter for queue parameters.
disp_params_t & set_queue_params(queue_traits::queue_params_t p)
Setter for queue parameters.
friend void swap(disp_params_t &a, disp_params_t &b) noexcept
A handle for adv_thread_pool dispatcher.
disp_binder_shptr_t binder(bind_params_t params) const
Get a binder for that dispatcher.
std::enable_if_t< std::is_invocable_v< Setter, bind_params_t & >, disp_binder_shptr_t > binder(Setter &&params_setter) const
Create a binder for that dispatcher.
disp_binder_shptr_t binder() const
Get a binder for that dispatcher with default binding params.
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
bool empty() const noexcept
Is this handle empty?
impl::basic_dispatcher_iface_shptr_t m_dispatcher
A reference to actual implementation of a dispatcher.
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
void reset() noexcept
Drop the content of handle.
operator bool() const noexcept
Is this handle empty?
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
const bind_params_t m_params
Binding parameters.
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
actual_binder_t(actual_dispatcher_iface_shptr_t disp, bind_params_t params) noexcept
actual_dispatcher_iface_shptr_t m_disp
Dispatcher to be used.
void preallocate_resources(agent_t &agent) override
Allocate resources in dispatcher for new agent.
void undo_preallocation(agent_t &agent) noexcept override
Undo resources allocation.
virtual event_queue_t * query_resources_for_agent(agent_t &agent) noexcept=0
Get resources allocated for an agent.
virtual void undo_preallocation_for_agent(agent_t &agent) noexcept=0
Undo preallocation of resources for a new agent.
virtual void unbind_agent(agent_t &agent) noexcept=0
Unbind agent from the dispatcher.
virtual void preallocate_resources_for_agent(agent_t &agent, const bind_params_t &params)=0
Preallocate all necessary resources for a new agent.
event_queue_t * query_resources_for_agent(agent_t &agent) noexcept override
Get resources allocated for an agent.
void unbind_agent(agent_t &agent) noexcept override
Unbind agent from the dispatcher.
void undo_preallocation_for_agent(agent_t &agent) noexcept override
Undo preallocation of resources for a new agent.
actual_dispatcher_implementation_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
dispatcher_template_t< Work_Thread > m_impl
Real dispatcher.
void preallocate_resources_for_agent(agent_t &agent, const bind_params_t &params) override
Preallocate all necessary resources for a new agent.
The very basic interface of adv_thread_pool dispatcher.
virtual disp_binder_shptr_t binder(bind_params_t params)=0
static dispatcher_handle_t make(actual_dispatcher_iface_shptr_t disp) noexcept
Container for storing parameters for MPMC queue.
queue_params_t & operator=(queue_params_t &&o) noexcept
Move operator.
friend void swap(queue_params_t &a, queue_params_t &b) noexcept
friend void swap(work_thread_activity_tracking_flag_mixin_t &a, work_thread_activity_tracking_flag_mixin_t &b) noexcept
Mixin that holds optional work thread factory.
friend void swap(work_thread_factory_mixin_t &a, work_thread_factory_mixin_t &b) noexcept
Interface for dispatcher binders.
SObjectizer Environment.
An interface of event queue for agent.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
T & get() const noexcept
#define SO_5_FUNC
Definition declspec.hpp:48
void adjust_thread_count(disp_params_t &params)
Sets the thread count to the default value if the user did not specify an actual thread count.
Internal implementation details of advanced thread pool dispatcher.
Advanced thread pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, std::size_t thread_count)
Create an instance of adv_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, std::size_t thread_count)
Create an instance of adv_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env)
Create an instance of adv_thread_pool dispatcher with the default count of work threads.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance of adv_thread_pool dispatcher.
fifo_t
Type of FIFO mechanism for agent's demands.
@ cooperation
A FIFO for demands for all agents from the same cooperation.
@ individual
A FIFO for demands only for one agent.
Various stuff related to MPMC event queue implementation and tuning.
Reusable components for dispatchers.
std::size_t default_thread_pool_size()
A helper function for detecting default thread count for thread pool.
std::unique_ptr< Disp_Iface_Type > make_actual_dispatcher(outliving_reference_t< environment_t > env, const std::string_view name_base, Disp_Params_Type disp_params, Args &&...args)
Helper function for creation of dispatcher instance with respect to work thread activity tracking flag.
Event dispatchers.
Private part of message limit implementation.
Definition agent.cpp:33
outliving_reference_t< T > outliving_mutable(T &r)
Make outliving_reference wrapper for mutable reference.