SObjectizer-5 Extra
Loading...
Searching...
No Matches
so_5_extra
mboxes
round_robin.hpp
Go to the documentation of this file.
1
/*!
2
* \file
3
* \brief Implementation of round-robin mbox.
4
*/
5
6
#
pragma
once
7
8
#
include
<
so_5_extra
/
error_ranges
.
hpp
>
9
10
#
include
<
so_5
/
impl
/
msg_tracing_helpers
.
hpp
>
11
12
namespace
so_5
{
13
14
namespace
extra
{
15
16
namespace
mboxes
{
17
18
namespace
round_robin
{
19
20
namespace
errors
{
21
22
/*!
23
* \brief An attempt to set delivery filter to round_robin mbox.
24
*
25
* \since
26
* v.1.0.1
27
*/
28
const
int
rc_delivery_filter_cannot_be_used_on_round_robin_mbox
=
29
so_5
::
extra
::
errors
::
mboxes_round_robin_errors
;
30
31
}
/* namespace errors */
32
33
namespace
details
{
34
35
//
36
// subscriber_info_t
37
//
38
39
/*!
40
* \brief An information block about one subscriber.
41
*/
42
struct
subscriber_info_t
43
{
44
//! Subscriber.
45
std
::
reference_wrapper
<
so_5
::
abstract_message_sink_t
>
m_sink
;
46
47
//! Constructor for the case when subscriber info is being
48
//! created during event subscription.
49
explicit
subscriber_info_t
(
50
so_5::abstract_message_sink_t & sink )
51
:
m_sink
(
sink
)
52
{}
53
};
54
55
//
56
// subscriber_container_t
57
//
58
59
/*!
60
* \brief Type of container for holding subscribers for one message type.
61
*/
62
class
subscriber_container_t
63
{
64
public
:
65
using
storage_t
=
std
::
vector
<
subscriber_info_t
>;
66
67
bool
68
empty
()
const
noexcept
69
{
70
return
m_subscribers.empty();
71
}
72
73
void
74
emplace_back
(
75
so_5::abstract_message_sink_t & sink )
76
{
77
m_subscribers.emplace_back( sink );
78
}
79
80
storage_t
::
iterator
81
end
()
noexcept
82
{
83
return
m_subscribers.end();
84
}
85
86
storage_t
::
iterator
87
find
(
so_5
::
abstract_message_sink_t
&
sink
)
noexcept
88
{
89
return
std::find_if( std::begin(m_subscribers), std::end(m_subscribers),
90
[&](
const
auto
& info ) {
91
return
std::addressof( info.m_sink.get() ) ==
92
std::addressof( sink );
93
} );
94
}
95
96
void
97
erase
( storage_t::iterator it )
noexcept
98
{
99
m_subscribers.erase(it);
100
ensure_valid_current_subscriber_index
(
)
;
101
}
102
103
const
subscriber_info_t
&
104
current_subscriber
()
const
noexcept
105
{
106
return
m_subscribers[ m_current_subscriber ];
107
}
108
109
void
110
switch_current_subscriber
()
noexcept
111
{
112
++m_current_subscriber;
113
ensure_valid_current_subscriber_index
(
)
;
114
}
115
116
private
:
117
storage_t
m_subscribers
;
118
storage_t
::
size_type
m_current_subscriber
{ 0 };
119
120
void
121
ensure_valid_current_subscriber_index
()
noexcept
122
{
123
if
( m_current_subscriber >= m_subscribers.size() )
124
m_current_subscriber = 0;
125
}
126
};
127
128
//
129
// data_t
130
//
131
132
/*!
133
* \brief Common part of round-robin mbox implementation.
134
*
135
* This part depends only on Lock type but not on tracing facilities.
136
*
137
* \tparam Lock type of lock object to be used.
138
*/
139
template
<
typename
Lock >
140
struct
data_t
141
{
142
//! Initializing constructor.
143
data_t
(
144
environment_t & env,
145
mbox_id_t id )
146
:
m_env
{
env
}
147
,
m_id
{
id
}
148
{}
149
150
//! SObjectizer Environment to work in.
151
environment_t
&
m_env
;
152
153
//! ID of this mbox.
154
const
mbox_id_t
m_id
;
155
156
//! Object lock.
157
Lock
m_lock
;
158
159
/*!
160
* \brief Map from message type to subscribers.
161
*/
162
using
messages_table_t
=
std
::
map
<
163
std
::
type_index
,
164
subscriber_container_t
>;
165
166
//! Map of subscribers to messages.
167
messages_table_t
m_subscribers
;
168
};
169
170
//
171
// mbox_template_t
172
//
173
174
//! A template with implementation of round-robin mbox.
175
/*!
176
* \tparam Lock_Type type of lock to be used for thread safety.
177
* \tparam Tracing_Base base class with implementation of message
178
* delivery tracing methods. Expected to be tracing_enabled_base or
179
* tracing_disabled_base from so_5::impl::msg_tracing_helpers namespace.
180
*/
181
template
<
182
typename
Lock_Type,
183
typename
Tracing_Base >
184
class
mbox_template_t
185
:
public
abstract_message_box_t
186
,
private
data_t
< Lock_Type >
187
,
private
Tracing_Base
188
{
189
using
data_type =
data_t
< Lock_Type >;
190
191
public
:
192
//! Initializing constructor.
193
template
<
typename
... Tracing_Args >
194
mbox_template_t
(
195
//! SObjectizer Environment to work in.
196
environment_t & env,
197
//! ID of this mbox.
198
mbox_id_t id,
199
//! Optional parameters for Tracing_Base's constructor.
200
Tracing_Args &&... args )
201
:
data_type
{
env
,
id
}
202
,
Tracing_Base
{
std
::
forward
<
Tracing_Args
>(
args
)... }
203
{}
204
205
mbox_id_t
206
id
()
const
override
207
{
208
return
this
->
m_id
;
209
}
210
211
void
212
subscribe_event_handler
(
213
const
std::type_index & msg_type,
214
so_5::abstract_message_sink_t & subscriber )
override
215
{
216
std
::
lock_guard
<
Lock_Type
>
lock
(
this
->
m_lock
);
217
218
auto
it
=
this
->
m_subscribers
.
find
(
msg_type
);
219
if
(
it
==
this
->
m_subscribers
.
end
() )
220
{
221
// There isn't such message type yet.
222
subscriber_container_t
container
;
223
container
.
emplace_back
(
subscriber
);
224
225
this
->
m_subscribers
.
emplace
(
msg_type
,
std
::
move
(
container
) );
226
}
227
else
228
{
229
auto
&
sinks
=
it
->
second
;
230
231
auto
pos
=
sinks
.
find
(
subscriber
);
232
if
(
pos
==
sinks
.
end
() )
233
// There is no subscriber in the container.
234
// It must be added.
235
sinks
.
emplace_back
(
subscriber
);
236
}
237
}
238
239
void
240
unsubscribe_event_handler
(
241
const
std::type_index & msg_type,
242
so_5::abstract_message_sink_t & subscriber )
noexcept
override
243
{
244
std
::
lock_guard
<
Lock_Type
>
lock
(
this
->
m_lock
);
245
246
auto
it
=
this
->
m_subscribers
.
find
(
msg_type
);
247
if
(
it
!=
this
->
m_subscribers
.
end
() )
248
{
249
auto
&
sinks
=
it
->
second
;
250
251
auto
pos
=
sinks
.
find
(
subscriber
);
252
if
(
pos
!=
sinks
.
end
() )
253
{
254
sinks
.
erase
(
pos
);
255
}
256
257
if
(
sinks
.
empty
() )
258
this
->
m_subscribers
.
erase
(
it
);
259
}
260
}
261
262
std
::
string
263
query_name
()
const
override
264
{
265
std
::
ostringstream
s
;
266
s
<<
"<mbox:type=RRMPSC:id="
<<
this
->
m_id
<<
">"
;
267
268
return
s
.
str
();
269
}
270
271
mbox_type_t
272
type
()
const
override
273
{
274
return
mbox_type_t
::
multi_producer_single_consumer
;
275
}
276
277
void
278
do_deliver_message
(
279
so_5::message_delivery_mode_t delivery_mode,
280
const
std::type_index & msg_type,
281
const
so_5::message_ref_t & message,
282
unsigned
int
redirection_deep )
override
283
{
284
typename
Tracing_Base
::
deliver_op_tracer
tracer
{
285
*
this
,
// as Tracing_Base
286
*
this
,
// as abstract_message_box_t
287
"deliver_message"
,
288
delivery_mode
,
289
msg_type
,
message
,
redirection_deep
};
290
291
do_deliver_message_impl
(
292
tracer
,
293
delivery_mode
,
294
msg_type
,
295
message
,
296
redirection_deep
);
297
}
298
299
void
300
set_delivery_filter
(
301
const
std
::
type_index
&
/*msg_type*/
,
302
const
so_5
::
delivery_filter_t
&
/*filter*/
,
303
so_5
::
abstract_message_sink_t
&
/*subscriber*/
)
override
304
{
305
using
namespace
so_5
::
extra
::
mboxes
::
round_robin
::
errors
;
306
307
SO_5_THROW_EXCEPTION
(
308
rc_delivery_filter_cannot_be_used_on_round_robin_mbox
,
309
"set_delivery_filter is called for round_robin-mbox"
);
310
}
311
312
void
313
drop_delivery_filter
(
314
const
std
::
type_index
&
/*msg_type*/
,
315
so_5
::
abstract_message_sink_t
&
/*subscriber*/
)
noexcept
override
316
{
317
// Nothing to do.
318
}
319
320
so_5
::
environment_t
&
321
environment
()
const
noexcept
override
322
{
323
return
this
->
m_env
;
324
}
325
326
private
:
327
void
328
do_deliver_message_impl
(
329
typename
Tracing_Base::deliver_op_tracer
const
& tracer,
330
so_5::message_delivery_mode_t delivery_mode,
331
const
std::type_index & msg_type,
332
const
so_5::message_ref_t & message,
333
unsigned
int
redirection_deep )
334
{
335
std
::
lock_guard
<
Lock_Type
>
lock
(
this
->
m_lock
);
336
337
auto
it
=
this
->
m_subscribers
.
find
(
msg_type
);
338
if
(
it
!=
this
->
m_subscribers
.
end
() )
339
{
340
const
auto
&
subscriber_info
=
it
->
second
.
current_subscriber
();
341
it
->
second
.
switch_current_subscriber
();
342
343
do_deliver_message_to_subscriber
(
344
subscriber_info
,
345
tracer
,
346
delivery_mode
,
347
msg_type
,
348
message
,
349
redirection_deep
);
350
}
351
else
352
tracer
.
no_subscribers
();
353
}
354
355
void
356
do_deliver_message_to_subscriber
(
357
const
subscriber_info_t
& subscriber_info,
358
typename
Tracing_Base::deliver_op_tracer
const
& tracer,
359
so_5::message_delivery_mode_t delivery_mode,
360
const
std::type_index & msg_type,
361
const
so_5::message_ref_t & message,
362
unsigned
int
redirection_deep )
363
{
364
using
namespace
so_5
::
message_limit
::
impl
;
365
366
subscriber_info
.
m_sink
.
get
().
push_event
(
367
this
->
m_id
,
368
delivery_mode
,
369
msg_type
,
370
message
,
371
redirection_deep
,
372
tracer
.
overlimit_tracer
() );
373
}
374
};
375
376
}
/* namespace details */
377
378
//
379
// make_mbox
380
//
381
/*!
382
* \brief Create an implementation of round-robin mbox.
383
*
384
* Usage example:
385
* \code
386
so_5::environment_t & env = ...;
387
const so_5::mbox_t rrmbox = so_5::extra::mboxes::round_robin::make_mbox<>( env );
388
...
389
so_5::send< some_message >( rrmbox, ... );
390
* \endcode
391
*
392
* \tparam Lock_Type type of lock to be used for thread safety.
393
*/
394
template
<
typename
Lock_Type =
std
::
mutex
>
395
mbox_t
396
make_mbox
(
environment_t
&
env
)
397
{
398
return
env
.
make_custom_mbox
(
399
[](
const
mbox_creation_data_t
&
data
) {
400
mbox_t
result
;
401
402
if
(
data
.
m_tracer
.
get
().
is_msg_tracing_enabled
() )
403
{
404
using
T
=
details
::
mbox_template_t
<
405
Lock_Type
,
406
::
so_5
::
impl
::
msg_tracing_helpers
::
tracing_enabled_base
>;
407
408
result
=
mbox_t
{
std
::
make_unique
<
T
>(
409
data
.
m_env
.
get
(),
410
data
.
m_id
,
411
data
.
m_tracer
)
412
};
413
}
414
else
415
{
416
using
T
=
details
::
mbox_template_t
<
417
Lock_Type
,
418
::
so_5
::
impl
::
msg_tracing_helpers
::
tracing_disabled_base
>;
419
420
result
=
mbox_t
{
std
::
make_unique
<
T
>(
421
data
.
m_env
.
get
(),
422
data
.
m_id
)
423
};
424
}
425
426
return
result
;
427
} );
428
}
429
430
}
/* namespace round_robin */
431
432
}
/* namespace mboxes */
433
434
}
/* namespace extra */
435
436
}
/* namespace so_5 */
so_5::extra::mboxes::round_robin::details::mbox_template_t
A template with implementation of round-robin mbox.
Definition
round_robin.hpp:188
so_5::extra::mboxes::round_robin::details::mbox_template_t::unsubscribe_event_handler
void unsubscribe_event_handler(const std::type_index &msg_type, so_5::abstract_message_sink_t &subscriber) noexcept override
Definition
round_robin.hpp:240
so_5::extra::mboxes::round_robin::details::mbox_template_t::type
mbox_type_t type() const override
Definition
round_robin.hpp:272
so_5::extra::mboxes::round_robin::details::mbox_template_t::set_delivery_filter
void set_delivery_filter(const std::type_index &, const so_5::delivery_filter_t &, so_5::abstract_message_sink_t &) override
Definition
round_robin.hpp:300
so_5::extra::mboxes::round_robin::details::mbox_template_t::drop_delivery_filter
void drop_delivery_filter(const std::type_index &, so_5::abstract_message_sink_t &) noexcept override
Definition
round_robin.hpp:313
so_5::extra::mboxes::round_robin::details::mbox_template_t::do_deliver_message
void do_deliver_message(so_5::message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const so_5::message_ref_t &message, unsigned int redirection_deep) override
Definition
round_robin.hpp:278
so_5::extra::mboxes::round_robin::details::mbox_template_t::id
mbox_id_t id() const override
Definition
round_robin.hpp:206
so_5::extra::mboxes::round_robin::details::mbox_template_t::do_deliver_message_to_subscriber
void do_deliver_message_to_subscriber(const subscriber_info_t &subscriber_info, typename Tracing_Base::deliver_op_tracer const &tracer, so_5::message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const so_5::message_ref_t &message, unsigned int redirection_deep)
Definition
round_robin.hpp:356
so_5::extra::mboxes::round_robin::details::mbox_template_t::do_deliver_message_impl
void do_deliver_message_impl(typename Tracing_Base::deliver_op_tracer const &tracer, so_5::message_delivery_mode_t delivery_mode, const std::type_index &msg_type, const so_5::message_ref_t &message, unsigned int redirection_deep)
Definition
round_robin.hpp:328
so_5::extra::mboxes::round_robin::details::mbox_template_t::query_name
std::string query_name() const override
Definition
round_robin.hpp:263
so_5::extra::mboxes::round_robin::details::mbox_template_t::subscribe_event_handler
void subscribe_event_handler(const std::type_index &msg_type, so_5::abstract_message_sink_t &subscriber) override
Definition
round_robin.hpp:212
so_5::extra::mboxes::round_robin::details::mbox_template_t::environment
so_5::environment_t & environment() const noexcept override
Definition
round_robin.hpp:321
so_5::extra::mboxes::round_robin::details::mbox_template_t::mbox_template_t
mbox_template_t(environment_t &env, mbox_id_t id, Tracing_Args &&... args)
Initializing constructor.
Definition
round_robin.hpp:194
so_5::extra::mboxes::round_robin::details::subscriber_container_t
Type of container for holding subscribers for one message type.
Definition
round_robin.hpp:63
so_5::extra::mboxes::round_robin::details::subscriber_container_t::current_subscriber
const subscriber_info_t & current_subscriber() const noexcept
Definition
round_robin.hpp:104
so_5::extra::mboxes::round_robin::details::subscriber_container_t::m_current_subscriber
storage_t::size_type m_current_subscriber
Definition
round_robin.hpp:118
so_5::extra::mboxes::round_robin::details::subscriber_container_t::erase
void erase(storage_t::iterator it) noexcept
Definition
round_robin.hpp:97
so_5::extra::mboxes::round_robin::details::subscriber_container_t::emplace_back
void emplace_back(so_5::abstract_message_sink_t &sink)
Definition
round_robin.hpp:74
so_5::extra::mboxes::round_robin::details::subscriber_container_t::find
storage_t::iterator find(so_5::abstract_message_sink_t &sink) noexcept
Definition
round_robin.hpp:87
so_5::extra::mboxes::round_robin::details::subscriber_container_t::switch_current_subscriber
void switch_current_subscriber() noexcept
Definition
round_robin.hpp:110
so_5::extra::mboxes::round_robin::details::subscriber_container_t::end
storage_t::iterator end() noexcept
Definition
round_robin.hpp:81
so_5::extra::mboxes::round_robin::details::subscriber_container_t::empty
bool empty() const noexcept
Definition
round_robin.hpp:68
so_5::extra::mboxes::round_robin::details::subscriber_container_t::ensure_valid_current_subscriber_index
void ensure_valid_current_subscriber_index() noexcept
Definition
round_robin.hpp:121
so_5::extra::mboxes::round_robin::details::subscriber_container_t::m_subscribers
storage_t m_subscribers
Definition
round_robin.hpp:117
so_5::extra::errors
Definition
error_ranges.hpp:13
so_5::extra::errors::mboxes_round_robin_errors
const int mboxes_round_robin_errors
Starting point for errors of mboxes::round_robin submodule.
Definition
error_ranges.hpp:92
so_5::extra::mboxes::round_robin::details
Definition
round_robin.hpp:33
so_5::extra::mboxes::round_robin::errors
Definition
round_robin.hpp:20
so_5::extra::mboxes::round_robin::errors::rc_delivery_filter_cannot_be_used_on_round_robin_mbox
const int rc_delivery_filter_cannot_be_used_on_round_robin_mbox
An attempt to set delivery filter to round_robin mbox.
Definition
round_robin.hpp:28
so_5::extra::mboxes::round_robin
Definition
round_robin.hpp:18
so_5::extra::mboxes::round_robin::make_mbox
mbox_t make_mbox(environment_t &env)
Create an implementation of round-robin mbox.
Definition
round_robin.hpp:396
so_5::extra::mboxes
Definition
broadcast.hpp:23
so_5::extra
Definition
details.hpp:15
so_5
Ranges for error codes of each submodules.
Definition
details.hpp:13
so_5::extra::mboxes::round_robin::details::data_t
Common part of round-robin mbox implementation.
Definition
round_robin.hpp:141
so_5::extra::mboxes::round_robin::details::data_t::data_t
data_t(environment_t &env, mbox_id_t id)
Initializing constructor.
Definition
round_robin.hpp:143
so_5::extra::mboxes::round_robin::details::data_t::m_subscribers
messages_table_t m_subscribers
Map of subscribers to messages.
Definition
round_robin.hpp:167
so_5::extra::mboxes::round_robin::details::data_t::m_lock
Lock m_lock
Object lock.
Definition
round_robin.hpp:157
so_5::extra::mboxes::round_robin::details::data_t::m_env
environment_t & m_env
SObjectizer Environment to work in.
Definition
round_robin.hpp:151
so_5::extra::mboxes::round_robin::details::data_t::m_id
const mbox_id_t m_id
ID of this mbox.
Definition
round_robin.hpp:154
so_5::extra::mboxes::round_robin::details::subscriber_info_t
An information block about one subscriber.
Definition
round_robin.hpp:43
so_5::extra::mboxes::round_robin::details::subscriber_info_t::subscriber_info_t
subscriber_info_t(so_5::abstract_message_sink_t &sink)
Constructor for the case when subscriber info is being created during event subscription.
Definition
round_robin.hpp:49
so_5::extra::mboxes::round_robin::details::subscriber_info_t::m_sink
std::reference_wrapper< so_5::abstract_message_sink_t > m_sink
Subscriber.
Definition
round_robin.hpp:45
Generated by
1.12.0