RESTinio
response_coordinator.hpp
Go to the documentation of this file.
1 /*
2  restinio
3 */
4 
5 /*!
6  Coordinator for process of sending responses with
7  respect to http pipeline technique and chunk transfer.
8 */
9 
10 #pragma once
11 
12 #include <string>
13 #include <deque>
14 
15 #include <fmt/format.h>
16 
17 #include <restinio/exception.hpp>
18 #include <restinio/request_handler.hpp>
19 #include <restinio/buffers.hpp>
20 #include <restinio/optional.hpp>
21 
22 namespace restinio
23 {
24 
25 namespace impl
26 {
27 
29 
30 //
31 // response_context_t
32 //
33 
34 //! A context for a single response.
36 {
37  public:
38  //! Access write-groups container (used in unit tests)
41  {
42  return ctx.m_write_groups;
43  }
44 
45  //! Reinitialize context.
46  void
48  //! New request id.
49  request_id_t request_id ) noexcept
50  {
51  m_request_id = request_id;
52  m_response_output_flags =
53  response_output_flags_t{
54  response_parts_attr_t::not_final_parts,
55  response_connection_attr_t::connection_keepalive };
56  }
57 
58  //! Put write group to data queue.
59  void
60  enqueue_group( write_group_t wg )
61  {
62  // There is at least one group.
63  // So we check if this group can be merged with existing (the last one).
64  if( !m_write_groups.empty() &&
65  !m_write_groups.back().has_after_write_notificator() &&
66  std::size_t{ 0 } == wg.status_line_size() )
67  {
68  m_write_groups.back().merge( std::move( wg ) );
69  }
70  else
71  {
72  m_write_groups.emplace_back( std::move( wg ) );
73  }
74 
75  }
76 
77  //! Is context empty.
78  bool empty() const noexcept { return m_write_groups.empty(); }
79 
80  //! Extract write group from data queue.
83  {
84  assert( !m_write_groups.empty() );
85 
86  write_group_t result{ std::move( m_write_groups.front() ) };
87 
88  m_write_groups.erase( begin( m_write_groups ) );
89 
90  return result;
91  }
92 
93  //! Get id of associated request.
94  auto request_id() const noexcept { return m_request_id; }
95 
96  //! Get flags of corrent response data flow.
97  void
98  response_output_flags( response_output_flags_t flags ) noexcept
99  {
100  m_response_output_flags = flags;
101  }
102 
103  //! Get flags of corrent response data flow.
104  auto
105  response_output_flags() const noexcept
106  {
107  return m_response_output_flags;
108  }
109 
110  //! Is response data of a given request is complete.
111  bool
112  is_complete() const noexcept
113  {
114  return m_write_groups.empty() &&
115  response_parts_attr_t::final_parts ==
116  m_response_output_flags.m_response_parts;
117  }
118 
119  private:
121 
122  //! Unsent responses parts.
124 
125  //! Response flags
130 };
131 
132 //
133 // response_context_table_t
134 //
135 
136 //! Helper storage for responses' contexts.
138 {
139  public:
140  response_context_table_t( std::size_t max_elements_count )
141  {
142  m_contexts.resize( max_elements_count );
143  }
144 
145  //! If table is empty.
146  bool
147  empty() const noexcept
148  {
149  return !m_elements_exists;
150  }
151 
152  //! If table is full.
153  bool
154  is_full() const noexcept
155  {
156  return m_contexts.size() == m_elements_exists;
157  }
158 
159  //! Get first context.
161  front() noexcept
162  {
163  return m_contexts[ m_first_element_index ];
164  }
165 
166  //! Get last context.
168  back() noexcept
169  {
170  return m_contexts[
171  (m_first_element_index + (m_elements_exists - 1) ) %
172  m_contexts.size() ];
173  }
174 
175  //! Get context of specified request.
177  get_by_req_id( request_id_t req_id ) noexcept
178  {
179  if( empty() ||
180  req_id < front().request_id() ||
181  req_id > back().request_id() )
182  {
183  return nullptr;
184  }
185 
186  return &m_contexts[ get_real_index( req_id ) ];
187  }
188 
189  //! Insert new context into queue.
190  void
191  push_response_context( request_id_t req_id )
192  {
193  if( is_full() )
194  throw exception_t{
195  "unable to insert context because "
196  "response_context_table is full" };
197 
198  auto & ctx =
199  m_contexts[
200  // Current next.
201  ( m_first_element_index + m_elements_exists ) % m_contexts.size()
202  ];
203 
204  ctx.reinit( req_id );
205 
206  // 1 more element added.
207  ++m_elements_exists;
208  }
209 
210  //! Remove the first context from queue.
211  void
213  {
214  if( empty() )
215  throw exception_t{
216  "unable to pop context because "
217  "response_context_table is empty" };
218 
219  --m_elements_exists;
220  ++m_first_element_index;
221  if( m_contexts.size() == m_first_element_index )
222  {
223  m_first_element_index = std::size_t{0};
224  }
225  }
226 
227  private:
228  std::size_t
230  {
231  const auto distance_from_first =
232  req_id - front().request_id();
233 
234  return ( m_first_element_index + distance_from_first ) % m_contexts.size();
235  }
236 
240 };
241 
242 //
243 // response_coordinator_t
244 //
245 
246 //! Coordinator for process of sending responses with
247 //! respect to http pipeline technique and chunk transfer.
248 /*
249  Keeps track of maximum N (max_req_count) pipelined requests,
250  gathers pieces (write groups) of responses and provides access to
251  ready-to-send buffers on demand.
252 */
254 {
255  public:
257  //! Maximum count of requests to keep track of.
258  std::size_t max_req_count )
260  {}
261 
262  /** @name Response coordinator state.
263  * @brief Various state flags.
264  */
265  ///@{
266  bool closed() const noexcept { return m_connection_closed_response_occured; }
267  bool empty() const noexcept { return m_context_table.empty(); }
268  bool is_full() const noexcept { return m_context_table.is_full(); }
269  ///@}
270 
271  //! Check if it is possible to accept more requests.
272  bool
274  {
275  return !closed() && !is_full();
276  }
277 
278  //! Create a new request and reserve context for its response.
281  {
282  m_context_table.push_response_context( m_request_id_counter );
283 
284  return m_request_id_counter++;
285  }
286 
287  //! Add outgoing data for specified request.
288  void
290  //! Request id the responses parts are for.
291  request_id_t req_id,
292  //! Resp output flag.
293  response_output_flags_t response_output_flags,
294  //! The parts of response.
295  write_group_t wg )
296  {
297  // Nothing to do if already closed response emitted.
298  if( closed() )
299  throw exception_t{
300  "unable to append response parts, "
301  "response coordinator is closed" };
302 
303  auto * ctx = m_context_table.get_by_req_id( req_id );
304 
305  if( nullptr == ctx )
306  {
307  // Request is unknown...
308  throw exception_t{
309  fmt::format(
310  "no context associated with request {}",
311  req_id ) };
312  }
313 
314  if( response_parts_attr_t::final_parts ==
315  ctx->response_output_flags().m_response_parts )
316  {
317  // Request is already completed...
318  throw exception_t{
319  "unable to append response, "
320  "it marked as complete" };
321  }
322 
323  ctx->response_output_flags( response_output_flags );
324 
325  ctx->enqueue_group( std::move( wg ) );
326  }
327 
328  //! Extract a portion of data available for write.
329  /*!
330  Data (if available) is wrapped in an instance of write_group_t.
331  It can have a status line mark (that is necessary for logging)
332  and a notificator that must be invoked after the write operation
333  of a given group completes.
334  */
337  {
338  if( closed() )
339  throw exception_t{
340  "unable to prepare output buffers, "
341  "response coordinator is closed" };
342 
344 
345  // Check for custom write operation.
346  if( !m_context_table.empty() )
347  {
348  auto & current_ctx = m_context_table.front();
349 
350  if( !current_ctx.empty() )
351  {
352  result =
353  std::make_pair(
356 
357  if( current_ctx.is_complete() )
358  {
364 
366  }
367  }
368  }
369 
370  return result;
371  }
372 
373  //! Remove all contexts.
374  /*!
375  Invoke write groups after-write callbacks with error status.
376  */
377  void
379  {
381  {
382  const auto ec =
385 
386  auto & current_ctx = m_context_table.front();
387  while( !current_ctx.empty() )
388  {
389  auto wg = current_ctx.dequeue_group();
390 
391  try
392  {
394  }
395  catch( ... )
396  {}
397  }
398  }
399  }
400 
401  private:
402  //! Counter for assigning ids to new requests.
404 
405  //! Indicate whether a response with connection close flag was emitted.
407 
408  //! A storage for resp-context items.
410 };
411 
412 } /* namespace impl */
413 
414 } /* namespace restinio */
response_context_t & back() noexcept
Get last context.
std::size_t get_real_index(request_id_t req_id) noexcept
A context for a single response.
response_output_flags_t m_response_output_flags
Response flags.
bool m_connection_closed_response_occured
Indicate whether a response with connection close flag was emitted.
bool is_complete() const noexcept
Is response data of a given request is complete.
response_context_t * get_by_req_id(request_id_t req_id) noexcept
Get context of specified request.
bool empty() const noexcept
If table is empty.
void append_response(request_id_t req_id, response_output_flags_t response_output_flags, write_group_t wg)
Add outgoing data for specified request.
bool empty() const noexcept
Is context empty.
auto request_id() const noexcept
Get id of associated request.
auto response_output_flags() const noexcept
Get flags of current response data flow.
response_coordinator_t(std::size_t max_req_count)
std::vector< response_context_t > m_contexts
void reinit(request_id_t request_id) noexcept
Reinitialize context.
void response_output_flags(response_output_flags_t flags) noexcept
Set flags of current response data flow.
friend write_groups_container_t & utest_access(response_context_t &ctx)
Access write-groups container (used in unit tests)
void push_response_context(request_id_t req_id)
Insert new context into queue.
response_context_t & front() noexcept
Get first context.
Helper storage for responses' contexts.
write_group_t dequeue_group()
Extract write group from data queue.
response_context_table_t(std::size_t max_elements_count)
request_id_t register_new_request()
Create a new request and reserve context for its response.
write_groups_container_t m_write_groups
Unsent responses parts.
void enqueue_group(write_group_t wg)
Put write group to data queue.
response_context_table_t m_context_table
A storage for resp-context items.
void pop_response_context()
Remove the first context from queue.
bool is_able_to_get_more_messages() const noexcept
Check if it is possible to accept more requests.
bool is_full() const noexcept
If table is full.
std::enable_if< std::is_same< Parameter_Container, query_string_params_t >::value||std::is_same< Parameter_Container, router::route_params_t >::value, optional_t< Value_Type > >::type opt_value(const Parameter_Container &params, string_view_t key)
Gets the value of a parameter specified by key wrapped in optional_t<Value_Type> if parameter exists ...
Definition: value_or.hpp:64