SObjectizer 5.8
Loading...
Searching...
No Matches
nef_thread_pool/pub.hpp
Go to the documentation of this file.
1/*
2 * SObjectizer-5
3 */
4
5/*!
6 * \file
7 * \brief Public interface of thread pool dispatcher that
8 * provides noexcept guarantee for scheduling evt_finish demand.
9 *
10 * \since v.5.8.0
11 */
12
13#pragma once
14
15#include <so_5/declspec.hpp>
16
17#include <so_5/disp_binder.hpp>
18
19#include <so_5/disp/mpmc_queue_traits/pub.hpp>
20
21#include <so_5/disp/reuse/work_thread_activity_tracking.hpp>
22#include <so_5/disp/reuse/work_thread_factory_params.hpp>
23#include <so_5/disp/reuse/default_thread_pool_size.hpp>
24
25#include <string_view>
26#include <thread>
27#include <utility>
28
29namespace so_5
30{
31
32namespace disp
33{
34
35namespace nef_thread_pool
36{
37
38/*!
39 * \brief Alias for namespace with traits of event queue.
40 *
41 * \since v.5.8.0
42 */
43namespace queue_traits = so_5::disp::mpmc_queue_traits;
44
45//
46// disp_params_t
47//
48/*!
49 * \brief Parameters for %nef_thread_pool dispatcher.
50 *
51 * \since v.5.8.0
52 */
56 {
57 using activity_tracking_mixin_t = so_5::disp::reuse::
59 using thread_factory_mixin_t = so_5::disp::reuse::
61
62 public :
63 //! Default constructor.
64 disp_params_t() = default;
65
66 friend inline void
68 disp_params_t & a, disp_params_t & b ) noexcept
69 {
70 using std::swap;
71
72 swap(
73 static_cast< activity_tracking_mixin_t & >(a),
74 static_cast< activity_tracking_mixin_t & >(b) );
75
76 swap(
77 static_cast< work_thread_factory_mixin_t & >(a),
78 static_cast< work_thread_factory_mixin_t & >(b) );
79
82 }
83
84 //! Setter for thread count.
86 thread_count( std::size_t count )
87 {
88 m_thread_count = count;
89 return *this;
90 }
91
92 //! Getter for thread count.
93 std::size_t
95 {
96 return m_thread_count;
97 }
98
99 //! Setter for queue parameters.
102 {
103 m_queue_params = std::move(p);
104 return *this;
105 }
106
107 //! Tuner for queue parameters.
108 /*!
109 * Accepts lambda-function or functional object which tunes
110 * queue parameters.
111 \code
112 using namespace so_5::disp::nef_thread_pool;
113 auto disp = make_dispatcher( env,
114 "workers_disp",
115 disp_params_t{}
116 .thread_count( 10 )
117 .tune_queue_params(
118 []( queue_traits::queue_params_t & p ) {
119 p.lock_factory( queue_traits::simple_lock_factory() );
120 } ) );
121 \endcode
122 */
123 template< typename L >
126 {
128 return *this;
129 }
130
131 //! Getter for queue parameters.
132 const queue_traits::queue_params_t &
134 {
135 return m_queue_params;
136 }
137
138 private :
139 //! Count of working threads.
140 /*!
141 * Value 0 means that actual thread will be detected automatically.
142 */
143 std::size_t m_thread_count = { 0 };
144 //! Queue parameters.
146 };
147
148//
149// bind_params_t
150//
151/*!
152 * \brief Parameters for binding agents to %nef_thread_pool dispatcher.
153 *
154 * \since v.5.8.0
155 */
157 {
158 public :
159 //! Set maximum count of demands to be processed at once.
161 max_demands_at_once( std::size_t v )
162 {
164 return *this;
165 }
166
167 //! Get maximum count of demands to do processed at once.
168 [[nodiscard]]
169 std::size_t
171 {
173 }
174
175 private :
176 //! Maximum count of demands to be processed at once.
177 std::size_t m_max_demands_at_once = { 1 };
178 };
179
180//
181// default_thread_pool_size
182//
183using so_5::disp::reuse::default_thread_pool_size;
184
185namespace impl {
186
188
189//
190// basic_dispatcher_iface_t
191//
192/*!
193 * \brief The very basic interface of %thread_pool dispatcher.
194 *
195 * This class contains a minimum that is necessary for implementation
196 * of dispatcher_handle class.
197 *
198 * \since v.5.8.0
199 */
201 : public std::enable_shared_from_this<actual_dispatcher_iface_t>
202 {
203 public :
204 virtual ~basic_dispatcher_iface_t() noexcept = default;
205
206 [[nodiscard]]
207 virtual disp_binder_shptr_t
208 binder( bind_params_t params ) = 0;
209 };
210
211using basic_dispatcher_iface_shptr_t =
212 std::shared_ptr< basic_dispatcher_iface_t >;
213
215
216} /* namespace impl */
217
218//
219// dispatcher_handle_t
220//
221
222/*!
223 * \brief A handle for %nef_thread_pool dispatcher.
224 *
225 * \since v.5.8.0
226 */
227class [[nodiscard]] dispatcher_handle_t
228 {
230
231 //! A reference to actual implementation of a dispatcher.
232 impl::basic_dispatcher_iface_shptr_t m_dispatcher;
233
235 impl::basic_dispatcher_iface_shptr_t dispatcher ) noexcept
236 : m_dispatcher{ std::move(dispatcher) }
237 {}
238
239 //! Is this handle empty?
240 bool
241 empty() const noexcept { return !m_dispatcher; }
242
243 public :
244 dispatcher_handle_t() noexcept = default;
245
246 //! Get a binder for that dispatcher.
247 /*!
248 * Usage example:
249 * \code
250 * using namespace so_5::disp::nef_thread_pool;
251 *
252 * so_5::environment_t & env = ...;
253 * auto disp = make_dispatcher( env );
254 * bind_params_t params;
255 * params.max_demands_at_once( 10u );
256 *
257 * env.introduce_coop( [&]( so_5::coop_t & coop ) {
258 * coop.make_agent_with_binder< some_agent_type >(
259 * disp.binder( params ),
260 * ... );
261 *
262 * coop.make_agent_with_binder< another_agent_type >(
263 * disp.binder( params ),
264 * ... );
265 *
266 * ...
267 * } );
268 * \endcode
269 *
270 * \attention
271 * An attempt to call this method on empty handle is UB.
272 */
273 [[nodiscard]]
274 disp_binder_shptr_t
276 bind_params_t params ) const
277 {
278 return m_dispatcher->binder( params );
279 }
280
281 //! Create a binder for that dispatcher.
282 /*!
283 * This method allows parameters tuning via lambda-function
284 * or other functional objects.
285 *
286 * Usage example:
287 * \code
288 * using namespace so_5::disp::nef_thread_pool;
289 *
290 * so_5::environment_t & env = ...;
291 * env.introduce_coop( [&]( so_5::coop_t & coop ) {
292 * coop.make_agent_with_binder< some_agent_type >(
293 * // Create dispatcher instance.
294 * make_dispatcher( env )
295 * // Make and tune binder for that dispatcher.
296 * .binder( []( auto & params ) {
297 * params.max_demands_at_once( 10u );
298 * } ),
299 * ... );
300 * \endcode
301 *
302 * \attention
303 * An attempt to call this method on empty handle is UB.
304 */
305 template< typename Setter >
306 [[nodiscard]]
307 std::enable_if_t<
308 std::is_invocable_v< Setter, bind_params_t& >,
309 disp_binder_shptr_t >
311 //! Function for the parameters tuning.
312 Setter && params_setter ) const
313 {
315 params_setter( p );
316
317 return this->binder( p );
318 }
319
320 //! Get a binder for that dispatcher with default binding params.
321 /*!
322 * \attention
323 * An attempt to call this method on empty handle is UB.
324 */
325 [[nodiscard]]
326 disp_binder_shptr_t
327 binder() const
328 {
329 return this->binder( bind_params_t{} );
330 }
331
332 //! Is this handle empty?
333 operator bool() const noexcept { return empty(); }
334
335 //! Does this handle contain a reference to dispatcher?
336 bool
337 operator!() const noexcept { return !empty(); }
338
339 //! Drop the content of handle.
340 void
341 reset() noexcept { m_dispatcher.reset(); }
342 };
343
344//
345// make_dispatcher
346//
347/*!
348 * \brief Create an instance %nef_thread_pool dispatcher.
349 *
350 * \par Usage sample
351\code
352using namespace so_5::disp::nef_thread_pool;
353auto disp = make_dispatcher(
354 env,
355 "db_workers_pool",
356 disp_params_t{}
357 .thread_count( 16 )
358 .tune_queue_params( []( queue_traits::queue_params_t & params ) {
359 params.lock_factory( queue_traits::simple_lock_factory() );
360 } ) );
361auto coop = env.make_coop(
362 // The main dispatcher for that coop will be
363 // this instance of nef_thread_pool dispatcher.
364 disp.binder() );
365\endcode
366 *
367 * \since v.5.8.0
368 */
369[[nodiscard]]
372 //! SObjectizer Environment to work in.
373 environment_t & env,
374 //! Value for creating names of data sources for
375 //! run-time monitoring.
376 const std::string_view data_sources_name_base,
377 //! Parameters for the dispatcher.
378 disp_params_t disp_params );
379
380//
381// make_dispatcher
382//
383/*!
384 * \brief Create an instance of %nef_thread_pool dispatcher.
385 *
386 * \par Usage sample
387\code
388auto disp = so_5::disp::nef_thread_pool::make_dispatcher(
389 env,
390 "db_workers_pool",
391 16 );
392auto coop = env.make_coop(
393 // The main dispatcher for that coop will be
394 // this instance of nef_thread_pool dispatcher.
395 disp.binder() );
396\endcode
397 *
398 * \since v.5.8.0
399 */
400[[nodiscard]]
403 //! SObjectizer Environment to work in.
404 environment_t & env,
405 //! Value for creating names of data sources for
406 //! run-time monitoring.
407 const std::string_view data_sources_name_base,
408 //! Count of working threads.
409 std::size_t thread_count )
410 {
411 return make_dispatcher(
412 env,
413 data_sources_name_base,
414 disp_params_t{}.thread_count( thread_count ) );
415 }
416
417/*!
418 * \brief Create an instance of %nef_thread_pool dispatcher.
419 *
420 * \par Usage sample
421\code
422auto disp = so_5::disp::nef_thread_pool::make_dispatcher( env, 16 );
423
424auto coop = env.make_coop(
425 // The main dispatcher for that coop will be
426 // this instance of nef_thread_pool dispatcher.
427 disp.binder() );
428\endcode
429 *
430 * \since v.5.8.0
431 */
432[[nodiscard]]
435 //! SObjectizer Environment to work in.
436 environment_t & env,
437 //! Count of working threads.
438 std::size_t thread_count )
439 {
440 return make_dispatcher( env, std::string_view{}, thread_count );
441 }
442
443//
444// make_dispatcher
445//
446/*!
447 * \brief Create an instance of %nef_thread_pool dispatcher with the default
448 * count of working threads.
449 *
450 * Count of work threads will be detected by default_thread_pool_size()
451 * function.
452 *
453 * \par Usage sample
454\code
455auto disp = so_5::disp::nef_thread_pool::make_instance( env );
456
457auto coop = env.make_coop(
458 // The main dispatcher for that coop will be
459 // this instance of nef_thread_pool dispatcher.
460 disp.binder() );
461\endcode
462 *
463 * \since v.5.8.0
464 */
465[[nodiscard]]
468 //! SObjectizer Environment to work in.
469 environment_t & env )
470 {
471 return make_dispatcher(
472 env,
473 std::string_view{},
475 }
476
477} /* namespace nef_thread_pool */
478
479} /* namespace disp */
480
481} /* namespace so_5 */
A base class for agents.
Definition agent.hpp:673
void so_bind_to_dispatcher(event_queue_t &queue) noexcept
Binding agent to the dispatcher.
Definition agent.cpp:872
The base class for the object with a reference counting.
Container for storing parameters for MPMC queue.
queue_params_t & operator=(queue_params_t &&o) noexcept
Move operator.
friend void swap(queue_params_t &a, queue_params_t &b) noexcept
Parameters for binding agents to nef_thread_pool dispatcher.
bind_params_t & max_demands_at_once(std::size_t v)
Set maximum count of demands to be processed at once.
std::size_t m_max_demands_at_once
Maximum count of demands to be processed at once.
std::size_t query_max_demands_at_once() const
Get maximum count of demands to do processed at once.
Alias for namespace with traits of event queue.
const queue_traits::queue_params_t & queue_params() const
Getter for queue parameters.
std::size_t m_thread_count
Count of working threads.
disp_params_t & tune_queue_params(L tunner)
Tuner for queue parameters.
disp_params_t & set_queue_params(queue_traits::queue_params_t p)
Setter for queue parameters.
queue_traits::queue_params_t m_queue_params
Queue parameters.
disp_params_t & thread_count(std::size_t count)
Setter for thread count.
friend void swap(disp_params_t &a, disp_params_t &b) noexcept
std::size_t thread_count() const
Getter for thread count.
disp_params_t()=default
Default constructor.
A handle for nef_thread_pool dispatcher.
impl::basic_dispatcher_iface_shptr_t m_dispatcher
A reference to actual implementation of a dispatcher.
disp_binder_shptr_t binder() const
Get a binder for that dispatcher with default binding params.
bool operator!() const noexcept
Does this handle contain a reference to dispatcher?
bool empty() const noexcept
Is this handle empty?
void reset() noexcept
Drop the content of handle.
disp_binder_shptr_t binder(bind_params_t params) const
Get a binder for that dispatcher.
operator bool() const noexcept
Is this handle empty?
std::enable_if_t< std::is_invocable_v< Setter, bind_params_t & >, disp_binder_shptr_t > binder(Setter &&params_setter) const
Create a binder for that dispatcher.
dispatcher_handle_t(impl::basic_dispatcher_iface_shptr_t dispatcher) noexcept
void bind(agent_t &agent) noexcept override
Bind agent to dispatcher.
actual_binder_t(actual_dispatcher_iface_shptr_t disp, bind_params_t params) noexcept
const bind_params_t m_params
Binding parameters.
actual_dispatcher_iface_shptr_t m_disp
Dispatcher to be used.
void preallocate_resources(agent_t &agent) override
Allocate resources in dispatcher for new agent.
void undo_preallocation(agent_t &agent) noexcept override
Undo resources allocation.
void unbind(agent_t &agent) noexcept override
Unbind agent from dispatcher.
An actual interface of nef-thread-pool dispatcher.
virtual event_queue_t * query_resources_for_agent(agent_t &agent) noexcept=0
Get resources allocated for an agent.
virtual void undo_preallocation_for_agent(agent_t &agent) noexcept=0
Undo preallocation of resources for a new agent.
virtual void unbind_agent(agent_t &agent) noexcept=0
Unbind agent from the dispatcher.
virtual void preallocate_resources_for_agent(agent_t &agent, const bind_params_t &params)=0
Preallocate all necessary resources for a new agent.
void undo_preallocation_for_agent(agent_t &agent) noexcept override
Undo preallocation of resources for a new agent.
event_queue_t * query_resources_for_agent(agent_t &agent) noexcept override
Get resources allocated for an agent.
void preallocate_resources_for_agent(agent_t &agent, const bind_params_t &params) override
Preallocate all necessary resources for a new agent.
actual_dispatcher_implementation_t(outliving_reference_t< environment_t > env, const std::string_view name_base, disp_params_t params)
dispatcher_template_t< Work_Thread > m_impl
Real dispatcher.
void unbind_agent(agent_t &agent) noexcept override
Unbind agent from the dispatcher.
void schedule_on_disp_queue() noexcept override
Perform scheduling of processing of this event queue.
dispatcher_queue_t & m_disp_queue
Dispatcher queue with that the agent queue has to be used.
agent_queue_with_preallocated_finish_demand_t * intrusive_queue_giveout_next() noexcept
Give away a pointer to the next agent_queue.
std::unique_ptr< base_type_t::demand_t > m_finish_demand
A preallocated demand for evt_finish.
agent_queue_with_preallocated_finish_demand_t * m_intrusive_queue_next
The next item in intrusive queue of agent_queues.
agent_queue_with_preallocated_finish_demand_t(outliving_reference_t< dispatcher_queue_t > disp_queue, const bind_params_t &params)
Initializing constructor.
void intrusive_queue_set_next(agent_queue_with_preallocated_finish_demand_t *next) noexcept
Set a pointer to the next agent_queue.
The very basic interface of thread_pool dispatcher.
virtual disp_binder_shptr_t binder(bind_params_t params)=0
static dispatcher_handle_t make(actual_dispatcher_iface_shptr_t disp) noexcept
Multi-producer/Multi-consumer queue of pointers to event queues.
void schedule(T *queue) noexcept
Schedule execution of demands from the queue.
friend void swap(work_thread_activity_tracking_flag_mixin_t &a, work_thread_activity_tracking_flag_mixin_t &b) noexcept
Mixin that holds optional work thread factory.
friend void swap(work_thread_factory_mixin_t &a, work_thread_factory_mixin_t &b) noexcept
void push_preallocated(std::unique_ptr< demand_t > tail_demand) noexcept
Helper method that implements pushing of a new preallocated demand to the queue.
void wait_for_emptyness() noexcept
Wait while queue becomes empty.
Interface for dispatcher binders.
SObjectizer Environment.
An interface of event queue for agent.
Template class for smart reference wrapper on the atomic_refcounted_t.
Helper class for indication of long-lived reference via its type.
Definition outliving.hpp:98
T & get() const noexcept
#define SO_5_FUNC
Definition declspec.hpp:48
Various stuff related to MPMC event queue implementation and tuning.
void adjust_thread_count(disp_params_t &params)
Sets the thread count to default value if used do not specify actual thread count.
SO_5_FUNC dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, disp_params_t params)
Create an instance nef_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, const std::string_view data_sources_name_base, std::size_t thread_count)
Create an instance of nef_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env, std::size_t thread_count)
Create an instance of nef_thread_pool dispatcher.
dispatcher_handle_t make_dispatcher(environment_t &env)
Create an instance of nef_thread_pool dispatcher with the default count of working threads.
Reusable components for dispatchers.
std::size_t default_thread_pool_size()
A helper function for detecting default thread count for thread pool.
std::unique_ptr< Disp_Iface_Type > make_actual_dispatcher(outliving_reference_t< environment_t > env, const std::string_view name_base, Disp_Params_Type disp_params, Args &&...args)
Helper function for creation of dispatcher instance with respect to work thread activity tracking fla...
Reusable implementation of some thread pool dispatcher functionality which can be used in other threa...
Internal implementation details of thread pool dispatcher.
Thread pool dispatcher.
Event dispatchers.
Private part of message limit implementation.
Definition agent.cpp:33
outliving_reference_t< T > outliving_mutable(T &r)
Make outliving_reference wrapper for mutable reference.
Adaptation of common implementation of thread-pool-like dispatcher to the specific of this thread-poo...
static bool is_individual_fifo(const bind_params_t &) noexcept
static constexpr std::string_view dispatcher_type_name() noexcept
static void wait_for_queue_emptyness(agent_queue_with_preallocated_finish_demand_t &queue) noexcept
A description of event execution demand.