RESTinio
connection.hpp
Go to the documentation of this file.
1 /*
2  restinio
3 */
4 
5 /*!
6  HTTP-connection routine.
7 */
8 
9 #pragma once
10 
11 #include <restinio/asio_include.hpp>
12 
13 #include <http_parser.h>
14 
15 #include <restinio/impl/include_fmtlib.hpp>
16 
17 #include <restinio/exception.hpp>
18 #include <restinio/http_headers.hpp>
19 #include <restinio/request_handler.hpp>
20 #include <restinio/connection_count_limiter.hpp>
21 #include <restinio/impl/connection_base.hpp>
22 #include <restinio/impl/header_helpers.hpp>
23 #include <restinio/impl/response_coordinator.hpp>
24 #include <restinio/impl/connection_settings.hpp>
25 #include <restinio/impl/fixed_buffer.hpp>
26 #include <restinio/impl/write_group_output_ctx.hpp>
27 #include <restinio/impl/executor_wrapper.hpp>
28 #include <restinio/impl/sendfile_operation.hpp>
29 
30 #include <restinio/utils/impl/safe_uint_truncate.hpp>
31 #include <restinio/utils/at_scope_exit.hpp>
32 
33 namespace restinio
34 {
35 
36 namespace impl
37 {
38 
39 //
40 // http_parser_ctx_t
41 //
42 
43 //! Parsing result context for using in parser callbacks.
44 /*!
45  All data is used as temps, and is usable only
46  after parsing completes new requests then it is moved out.
47 */
49 {
50  //! Request data.
51  //! \{
54  //! \}
55 
56  //! Parser context temp values and flags.
57  //! \{
60  bool m_last_was_value{ true };
61 
62  /*!
63  * @since v.0.6.9
64  */
66 
67  /*!
68  * @since v.0.6.9
69  */
71  //! \}
72 
73  //! Flag: is http message parsed completely.
74  bool m_message_complete{ false };
75 
76  /*!
77  * @brief Total number of parsed HTTP-fields.
78  *
79  * This number includes the number of leading HTTP-fields and the number
80  * of trailing HTTP-fields (in the case of chunked encoding).
81  *
82  * @since v.0.6.12
83  */
85 
86  /*!
87  * @brief Limits for the incoming message.
88  *
89  * @since v.0.6.12
90  */
92 
93  /*!
94  * @brief The main constructor.
95  *
96  * @since v.0.6.12
97  */
99  incoming_http_msg_limits_t limits )
100  : m_limits{ limits }
101  {}
102 
103  //! Prepare context to handle new request.
104  void
106  {
107  m_header = http_request_header_t{};
108  m_body.clear();
109  m_current_field_name.clear();
110  m_last_value_total_size = 0u;
111  m_last_was_value = true;
113  m_message_complete = false;
114  m_total_field_count = 0u;
115  }
116 
117  //! Creates an instance of chunked_input_info if there is an info
118  //! about chunks in the body.
119  /*!
120  * @since v.0.6.9
121  */
122  RESTINIO_NODISCARD
123  chunked_input_info_unique_ptr_t
125  {
127 
130  {
133  }
134 
135  return result;
136  }
137 };
138 
139 //! Include parser callbacks.
140 #include "parser_callbacks.ipp"
141 
142 //
143 // create_parser_settings()
144 //
145 
146 //! Helper for setting parser settings.
147 /*!
148  Is used to initialize const value in connection_settings_t ctor.
149 */
150 template< typename Http_Methods >
153 {
156 
158  []( http_parser * parser, const char * at, size_t length ) -> int {
159  return restinio_url_cb( parser, at, length );
160  };
161 
163  []( http_parser * parser, const char * at, size_t length ) -> int {
165  };
166 
168  []( http_parser * parser, const char * at, size_t length ) -> int {
170  };
171 
173  []( http_parser * parser ) -> int {
175  };
176 
178  []( http_parser * parser, const char * at, size_t length ) -> int {
179  return restinio_body_cb( parser, at, length );
180  };
181 
183  []( http_parser * parser ) -> int {
185  };
186 
188  []( http_parser * parser ) -> int {
190  };
191 
193  []( http_parser * parser ) -> int {
195  };
196 
197  return parser_settings;
198 }
199 
200 //
201 // connection_upgrade_stage_t
202 //
203 
204 //! Enum for a flag specifying that connection is going to upgrade or not.
206 {
207  //! No connection request in progress
208  none,
209  //! Request with connection-upgrade header came and waits for
210  //! request handler to be called in non pipelined fashion
211  //! (it must be the only request that is handled at the moment).
213  //! Handler for request with connection-upgrade header was called
214  //! so any response data comming is for that request.
215  //! If connection transforms to websocket connection
216  //! then no further operations are expected.
218 };
220 //
221 // connection_input_t
222 //
223 
224 //! Data associated with connection read routine.
226 {
228  std::size_t buffer_size,
229  incoming_http_msg_limits_t limits )
230  : m_parser_ctx{ limits }
231  , m_buf{ buffer_size }
232  {}
233 
234  //! HTTP-parser.
235  //! \{
238  //! \}
239 
240  //! Input buffer.
242 
243  //! Connection upgrade request stage.
246 
247  //! Flag to track whether read operation is performed now.
249 
250  //! Prepare parser for reading new http-message.
251  void
253  {
254  // Reinit parser.
255  http_parser_init( &m_parser, HTTP_REQUEST);
256 
257  // Reset context and attach it to parser.
258  m_parser_ctx.reset();
259  m_parser.data = &m_parser_ctx;
260  }
261 };
262 
263 template < typename Connection, typename Start_Read_CB, typename Failed_CB >
264 void
266  asio_ns::ip::tcp::socket & ,
267  Connection & ,
268  Start_Read_CB start_read_cb,
269  Failed_CB )
270 {
271  // No preparation is needed, start
272  start_read_cb();
273 }
274 
275 // An overload for the case of non-TLS-connection.
276 inline tls_socket_t *
278  asio_ns::ip::tcp::socket & ) noexcept
279 {
280  return nullptr;
281 }
282 
283 //
284 // connection_t
285 //
286 
287 //! Context for handling http connections.
288 /*
289  Working circle consists of the following steps:
290  * wait for request -- reading from socket and parsing header and body;
291  * handling request -- once the request is completely obtained it's handling
292  is deligated to a handler chosen by handler factory;
293  * writing response -- writing response to socket;
294  * back to first step o close connection -- depending on keep-alive property
295  of the last response the connection goes back to first step or
296  shutdowns.
297 
298  Each step is controlled by timer (\see schedule_operation_timeout_callback())
299 
300  In case of errors connection closes itself.
301 */
302 template < typename Traits >
304  : public connection_base_t
305  , public executor_wrapper_t< typename Traits::strand_t >
306 {
308 
309  public:
310  using timer_manager_t = typename Traits::timer_manager_t;
311  using timer_guard_t = typename timer_manager_t::timer_guard_t;
314  using logger_t = typename Traits::logger_t;
315  using strand_t = typename Traits::strand_t;
316  using stream_socket_t = typename Traits::stream_socket_t;
317  using lifetime_monitor_t =
319 
321  //! Connection id.
322  connection_id_t conn_id,
323  //! Connection socket.
324  stream_socket_t && socket,
325  //! Settings that are common for connections.
326  connection_settings_handle_t< Traits > settings,
327  //! Remote endpoint for that connection.
328  endpoint_t remote_endpoint,
329  //! Lifetime monitor to be used for handling connection count.
330  lifetime_monitor_t lifetime_monitor )
333  , m_socket{ std::move( socket ) }
334  , m_settings{ std::move( settings ) }
336  , m_input{
339  }
343  , m_logger{ *( m_settings->m_logger ) }
345  {
346  // Notify of a new connection instance.
347  m_logger.trace( [&]{
348  return fmt::format(
349  "[connection:{}] start connection with {}",
350  connection_id(),
352  } );
353  }
354 
355  // Disable copy/move.
356  connection_t( const connection_t & ) = delete;
357  connection_t( connection_t && ) = delete;
358  connection_t & operator = ( const connection_t & ) = delete;
359  connection_t & operator = ( connection_t && ) = delete;
360 
362  {
364  [&]{
365  return fmt::format(
366  "[connection:{}] destructor called",
367  connection_id() );
368  } );
369  }
370 
371  void
373  {
375  m_socket,
376  *this,
377  [ & ]{
378  // Inform state listener if it used.
379  m_settings->call_state_listener( [this]() noexcept {
380  return connection_state::notice_t{
381  this->connection_id(),
382  this->m_remote_endpoint,
385  m_socket )
386  }
387  };
388  } );
389 
390  // Start timeout checking.
393 
394  // Start reading request.
396  },
397  [ & ]( const asio_ns::error_code & ec ){
399  return fmt::format(
400  "[connection:{}] prepare connection error: {}",
401  connection_id(),
402  ec.message() );
403  } );
404  } );
405  }
406 
407  //! Start reading next htttp-message.
408  void
410  {
411  m_logger.trace( [&]{
412  return fmt::format(
413  "[connection:{}] start waiting for request",
414  connection_id() );
415  } );
416 
417  // Prepare parser for consuming new request message.
419 
420  // Guard total time for a request to be read.
421  // guarding here makes the total read process
422  // to run in read_next_http_message_timelimit.
424 
425  if( 0 != m_input.m_buf.length() )
426  {
427  // If a pipeline requests were sent by client
428  // then the biginning (or even entire request) of it
429  // is in the buffer obtained from socket in previous
430  // read operation.
432  }
433  else
434  {
435  // Next request (if any) must be obtained from socket.
436  consume_message();
437  }
438  }
439 
440  //! Internals that are necessary for upgrade.
442  {
444  upgrade_internals_t && ) = default;
445 
447  connection_settings_handle_t< Traits > settings,
448  stream_socket_t socket,
449  lifetime_monitor_t lifetime_monitor )
450  : m_settings{ std::move(settings) }
451  , m_socket{ std::move( socket ) }
453  {}
454 
456  stream_socket_t m_socket;
458  };
459 
460  //! Move socket out of connection.
463  {
464  return upgrade_internals_t{
465  m_settings,
466  std::move(m_socket),
468  };
469  }
470 
471  private:
472  //! Start (continue) a chain of read-parse-read-... operations.
473  inline void
475  {
477  {
478  m_logger.trace( [&]{
479  return fmt::format(
480  "[connection:{}] continue reading request",
481  connection_id() );
482  } );
483 
484 
489  this->get_executor(),
490  [this, ctx = shared_from_this()]
491  // NOTE: this lambda is noexcept since v.0.6.0.
492  ( const asio_ns::error_code & ec,
493  std::size_t length ) noexcept {
496  } ) );
497  }
498  else
499  {
500  m_logger.trace( [&]{
501  return fmt::format(
502  "[connection:{}] skip read operation: already running",
503  connection_id() );
504  } );
505  }
506  }
507 
508  //! Handle read operation result.
509  inline void
510  after_read( const asio_ns::error_code & ec, std::size_t length ) noexcept
511  {
512  if( !ec )
513  {
514  // Exceptions shouldn't go out of `after_read`.
515  // So intercept them and close the connection in the case
516  // of an exception.
517  try
518  {
519  m_logger.trace( [&]{
520  return fmt::format(
521  "[connection:{}] received {} bytes",
522  this->connection_id(),
523  length );
524  } );
525 
527 
529  }
530  catch( const std::exception & x )
531  {
533  return fmt::format(
534  "[connection:{}] unexpected exception during the "
535  "handling of incoming data: {}",
536  connection_id(),
537  x.what() );
538  } );
539  }
540  }
541  else
542  {
543  // Well, if it is actually an error
544  // then close connection.
546  {
547  if ( !error_is_eof( ec ) || 0 != m_input.m_parser.nread )
549  return fmt::format(
550  "[connection:{}] read socket error: {}; "
551  "parsed bytes: {}",
552  connection_id(),
553  ec.message(),
555  } );
556  else
557  {
558  // A case that is not such an error:
559  // on a connection (most probably keeped alive
560  // after previous request, but a new also applied)
561  // no bytes were consumed and remote peer closes connection.
563  [&]{
564  return fmt::format(
565  "[connection:{}] EOF and no request, "
566  "close connection",
567  connection_id() );
568  } );
569 
571  }
572  }
573  // else: read operation was cancelled.
574  }
575  }
576 
577  //! Parse some data.
578  void
579  consume_data( const char * data, std::size_t length )
580  {
581  auto & parser = m_input.m_parser;
582 
583  const auto nparsed =
585  &parser,
587  data,
588  length );
589 
590  // If entire http-message was obtained,
591  // parser is stopped and the might be a part of consecutive request
592  // left in buffer, so we mark how many bytes were obtained.
593  // and next message read (if any) will be started from already existing
594  // data left in buffer.
596 
597  if( HPE_OK != parser.http_errno &&
599  {
600  // PARSE ERROR:
601  auto err = HTTP_PARSER_ERRNO( &parser );
602 
603  // TODO: handle case when there are some request in process.
605  return fmt::format(
606  "[connection:{}] parser error {}: {}",
607  connection_id(),
608  http_errno_name( err ),
610  } );
611 
612  // nothing to do.
613  return;
614  }
615 
617  {
619  }
620  else
621  consume_message();
622  }
623 
624  //! Handle a given request message.
625  void
627  {
628  try
629  {
630  auto & parser = m_input.m_parser;
632 
633  if( m_input.m_parser.upgrade )
634  {
635  // Start upgrade connection operation.
636 
637  // The first thing is to make sure
638  // that upgrade request will be handled in
639  // a non pipelined fashion.
642  }
643 
646  {
647  // Run ordinary HTTP logic.
649 
650  m_logger.trace( [&]{
651  return fmt::format(
652  "[connection:{}] request received (#{}): {} {}",
653  connection_id(),
654  request_id,
656  static_cast<http_method>( parser.method ) ),
658  } );
659 
660  // TODO: mb there is a way to
661  // track if response was emmited immediately in handler
662  // or it was delegated
663  // so it is possible to omit this timer scheduling.
665 
666  const auto handling_result =
669  request_id,
676 
677  switch( handling_result )
678  {
681  // If handler refused request, say not implemented.
683  request_id,
688  break;
689 
692  {
693  // Request was accepted,
694  // didn't create immediate response that closes connection after,
695  // and it is possible to receive more requests
696  // then start consuming yet another request.
698  }
699  break;
700  }
701  }
702  else
703  {
704  m_logger.trace( [&]{
705  const std::string default_value{};
706 
707  return fmt::format(
708  "[connection:{}] upgrade request received: {} {}; "
709  "Upgrade: '{}';",
710  connection_id(),
712  static_cast<http_method>( parser.method ) ),
716  } );
717 
719  {
720  // There are no requests in handling
721  // So the current request with upgrade
722  // is the only one and can be handled directly.
723  // It is safe to call a handler for it.
725  }
726  else
727  {
728  // There are pipelined request
729  m_logger.trace( [&]{
730  return fmt::format(
731  "[connection:{}] upgrade request happened to be a pipelined one, "
732  "and will be handled after previous requests are handled",
733  connection_id() );
734  } );
735  }
736 
737  // No further actions (like continue reading) in both cases are needed.
738  }
739 
740  }
741  catch( const std::exception & ex )
742  {
744  return fmt::format(
745  "[connection:{}] error while handling request: {}",
746  this->connection_id(),
747  ex.what() );
748  } );
749  }
750  }
751 
752  //! Calls handler for upgrade request.
753  /*!
754  Request data must be in input context (m_input).
755  */
756  void
758  {
759  auto & parser = m_input.m_parser;
761 
762  // If user responses with error
763  // then connection must be able to send
764  // (hence to receive) response.
765 
767 
768  m_logger.info( [&]{
769  return fmt::format(
770  "[connection:{}] handle upgrade request (#{}): {} {}",
771  connection_id(),
772  request_id,
774  static_cast<http_method>( parser.method ) ),
776  } );
777 
778  // Do not guard upgrade request.
780 
781  // After calling handler we expect the results or
782  // no further operations with connection
785 
786  const auto handling_result = m_request_handler(
788  request_id,
795  switch( handling_result )
796  {
799  if( m_socket.is_open() )
800  {
801  // Request is rejected, so our socket
802  // must not be moved out to websocket connection.
803 
804  // If handler refused request, say not implemented.
806  request_id,
811  }
812  else
813  {
814  // Request is rejected, but the socket
815  // was moved out to somewhere else???
816 
817  m_logger.error( [&]{
818  return fmt::format(
819  "[connection:{}] upgrade request handler rejects "
820  "request, but socket was moved out from connection",
821  connection_id() );
822  } );
823  }
824  break;
825 
827  /* nothing to do */
828  break;
829  }
830 
831  // Else 2 cases:
832  // 1. request is handled asynchronously, so
833  // what happens next depends on handling.
834  // 2. handling was immediate, so further destiny
835  // of a connection was already determined.
836  //
837  // In both cases: here do nothing.
838  // We can't even do read-only access because
839  // upgrade handling might take place
840  // in distinct execution context.
841  // So no even a log messages here.
842  }
843 
844  //! Write parts for specified request.
845  virtual void
847  //! Request id.
848  request_id_t request_id,
849  //! Resp output flag.
850  response_output_flags_t response_output_flags,
851  //! Part of the response data.
852  write_group_t wg ) override
853  {
854  //! Run write message on io_context loop if possible.
855  asio_ns::dispatch(
856  this->get_executor(),
857  [ this,
858  request_id,
860  actual_wg = std::move( wg ),
861  ctx = shared_from_this() ]
862  // NOTE that this lambda is noexcept since v.0.6.0.
863  () mutable noexcept
864  {
865  try
866  {
868  request_id,
870  std::move( actual_wg ) );
871  }
872  catch( const std::exception & ex )
873  {
875  return fmt::format(
876  "[connection:{}] unable to handle response: {}",
877  connection_id(),
878  ex.what() );
879  } );
880  }
881  } );
882  }
883 
884  //! Write parts for specified request.
885  void
887  //! Request id.
888  request_id_t request_id,
889  //! Resp output flag.
890  response_output_flags_t response_output_flags,
891  //! Part of the response data.
892  write_group_t wg )
893  {
895  try
896  {
900  }
901  catch( const std::exception & ex )
902  {
903  m_logger.error( [&]{
904  return fmt::format(
905  "[connection:{}] notificator error: {}",
906  connection_id(),
907  ex.what() );
908  } );
909  }
910  };
911 
912  if( m_socket.is_open() )
913  {
917  {
918  // It is response for a connection-upgrade request.
919  // If we receive it here then it is constructed via
920  // message builder and so connection was not transformed
921  // to websocket connection.
922  // So it is necessary to resume pipeline logic that was stopped
923  // for upgrade-request to be handled as the only request
924  // on the connection for that moment.
926  {
928  }
929  }
930 
932  {
933  m_logger.trace( [&]{
934  return fmt::format(
935  "[connection:{}] append response (#{}), "
936  "flags: {}, write group size: {}",
937  connection_id(),
938  request_id,
940  wg.items_count() );
941  } );
942 
944  request_id,
946  std::move( wg ) );
947 
949  }
950  else
951  {
952  m_logger.warn( [&]{
953  return fmt::format(
954  "[connection:{}] receive response parts for "
955  "request (#{}), but response with connection-close "
956  "attribute happened before",
957  connection_id(),
958  request_id );
959  } );
961  }
962  }
963  else
964  {
965  m_logger.warn( [&]{
966  return fmt::format(
967  "[connection:{}] try to write response, "
968  "while socket is closed",
969  connection_id() );
970  } );
972  }
973  }
974 
975  // Check if there is something to write,
976  // and if so starts write operation.
977  void
979  {
981 
983  {
984  init_write();
985  }
986  }
987 
988  //! Initiate write operation.
989  void
991  {
992  // Here: not writing anything to socket, so
993  // write operation can be initiated.
994 
995  // Remember if all response cells were busy.
998 
1000 
1001  if( next_write_group )
1002  {
1003  m_logger.trace( [&]{
1004  return fmt::format(
1005  "[connection:{}] start next write group for response (#{}), "
1006  "size: {}",
1007  this->connection_id(),
1010  } );
1011 
1012  // Check if all response cells busy:
1013  const bool response_coordinator_full_after =
1015 
1016  // Whether we need to resume read after this group is written?
1020 
1022  {
1023  // We need to extract status line out of the first buffer
1024  assert(
1027 
1028  m_logger.trace( [&]{
1029  // Get status line:
1030  const string_view_t
1031  status_line{
1032  asio_ns::buffer_cast< const char * >(
1035 
1036  return
1037  fmt::format(
1038  "[connection:{}] start response (#{}): {}",
1039  this->connection_id(),
1041  status_line );
1042  } );
1043  }
1044 
1045  // Initialize write context with a new write group.
1047  std::move( next_write_group->first ) );
1048 
1049  // Start the loop of sending data from current write group.
1051  }
1052  else
1053  {
1055  }
1056  }
1057 
1058  // Use aliases for shorter names.
1062 
1063  //! Start/continue/continue handling output data of current write group.
1064  /*!
1065  This function is a starting point of a loop process of sending data
1066  from a given write group.
1067  It extracts the next bunch of trivial buffers or a
1068  sendfile-runner and starts an appropriate write operation.
1069  In data of a given write group finishes,
1070  finish_handling_current_write_ctx() is invoked thus breaking the loop.
1071 
1072  @note
1073  Since v.0.6.0 this method is noexcept.
1074  */
1075  void
1077  {
1078  try
1079  {
1081 
1083  {
1085  }
1087  {
1089  }
1090  else
1091  {
1094  }
1095  }
1096  catch( const std::exception & ex )
1097  {
1099  return fmt::format(
1100  "[connection:{}] handle_current_write_ctx failed: {}",
1101  connection_id(),
1102  ex.what() );
1103  } );
1104  }
1105  }
1106 
1107  //! Run trivial buffers write operation.
1108  void
1109  handle_trivial_write_operation( const trivial_write_operation_t & op )
1110  {
1111  // Asio buffers (param for async write):
1112  auto & bufs = op.get_trivial_bufs();
1113 
1115  {
1116  m_logger.trace( [&]{
1117  return fmt::format(
1118  "[connection:{}] sending resp data with "
1119  "connection-close attribute "
1120  "buf count: {}, "
1121  "total size: {}",
1122  connection_id(),
1123  bufs.size(),
1124  op.size() );
1125  } );
1126 
1127  // Reading new requests is useless.
1130  }
1131  else
1132  {
1133  m_logger.trace( [&]{
1134  return fmt::format(
1135  "[connection:{}] sending resp data, "
1136  "buf count: {}, "
1137  "total size: {}",
1138  connection_id(),
1139  bufs.size(),
1140  op.size() ); } );
1141  }
1142 
1143  // There is somethig to write.
1145  m_socket,
1146  bufs,
1148  this->get_executor(),
1149  [this, ctx = shared_from_this()]
1150  // NOTE: since v.0.6.0 this lambda is noexcept.
1151  ( const asio_ns::error_code & ec, std::size_t written ) noexcept
1152  {
1153  if( !ec )
1154  {
1156  [&]{
1157  return fmt::format(
1158  "[connection:{}] outgoing data was sent: {} bytes",
1159  connection_id(),
1160  written );
1161  } );
1162  }
1163 
1165  } ) );
1166 
1168  }
1169 
1170  //! Run sendfile write operation.
1171  void
1172  handle_file_write_operation( file_write_operation_t & op )
1173  {
1175  {
1176  m_logger.trace( [&]{
1177  return fmt::format(
1178  "[connection:{}] sending resp file data with "
1179  "connection-close attribute, "
1180  "total size: {}",
1181  connection_id(),
1182  op.size() );
1183  } );
1184 
1185  // Reading new requests is useless.
1188  }
1189  else
1190  {
1191  m_logger.trace( [&]{
1192  return fmt::format(
1193  "[connection:{}] sending resp file data, total size: {}",
1194  connection_id(),
1195  op.size() );
1196  } );
1197  }
1198 
1200 
1201  auto op_ctx = op;
1202 
1204  this->get_executor(),
1205  m_socket,
1207  this->get_executor(),
1208  [this, ctx = shared_from_this(),
1209  // Store operation context till the end
1210  op_ctx ]
1211  // NOTE: since v.0.6.0 this lambda is noexcept
1212  (const asio_ns::error_code & ec, file_size_t written ) mutable noexcept
1213  {
1214  // NOTE: op_ctx should be reset just before return from
1215  // that lambda. We can't call reset() until the end of
1216  // the lambda because lambda object itself will be
1217  // destroyed.
1219  [&op_ctx] {
1220  // Reset sendfile operation context.
1222  } );
1223 
1224  if( !ec )
1225  {
1227  [&]{
1228  return fmt::format(
1229  "[connection:{}] file data was sent: {} bytes",
1230  connection_id(),
1231  written );
1232  } );
1233  }
1234  else
1235  {
1237  [&]{
1238  return fmt::format(
1239  "[connection:{}] send file data error: {} ({}) bytes",
1240  connection_id(),
1241  ec.value(),
1242  ec.message() );
1243  } );
1244  }
1245 
1247  } ) );
1248  }
1249 
1250  //! Do post write actions for current write group.
1251  void
1253  {
1254  // Finishing writing this group.
1255  m_logger.trace( [&]{
1256  return fmt::format(
1257  "[connection:{}] finishing current write group",
1258  this->connection_id() );
1259  } );
1260 
1261  // Group notificators are called from here (if exist):
1263 
1265  {
1266  m_logger.trace( [&]{
1267  return fmt::format(
1268  "[connection:{}] should keep alive",
1269  this->connection_id() );
1270  } );
1271 
1274  {
1275  // Run ordinary HTTP logic.
1277  {
1279  }
1280 
1281  // Start another write opertion
1282  // if there is something to send.
1284  }
1285  else
1286  {
1288  {
1289  // Here upgrade req is the only request
1290  // to by handled by this connection.
1291  // So it is safe to call a handler for it.
1293  }
1294  else
1295  {
1296  // Do not start reading in any case,
1297  // but if there is at least one request preceding
1298  // upgrade-req, logic must continue http interaction.
1300  }
1301  }
1302  }
1303  else
1304  {
1305  // No keep-alive, close connection.
1306  close();
1307  }
1308  }
1309 
1310  void
1312  {
1314  {
1315  // Bufs empty but there happened to
1316  // be a response context marked as complete
1317  // (final_parts) and having connection-close attr.
1318  // It is because `init_write_if_necessary()`
1319  // is called only under `!m_response_coordinator.closed()`
1320  // condition, so if no bufs were obtained
1321  // and response coordinator is closed means
1322  // that a first response stored by
1323  // response coordinator was marked as complete
1324  // without data.
1325 
1326  m_logger.trace( [&]{
1327  return fmt::format(
1328  "[connection:{}] last sent response was marked "
1329  "as complete",
1330  connection_id() ); } );
1331  close();
1332  }
1333  else
1334  {
1335  // Not writing anything, so need to deal with timouts.
1337  {
1338  // No requests in processing.
1339  // So set read next request timeout.
1341  }
1342  else
1343  {
1344  // Have requests in process.
1345  // So take control over request handling.
1347  }
1348  }
1349  }
1350 
1351  //! Handle write response finished.
1352  /*!
1353  * @note
1354  * Since v.0.6.0 this method is noexcept.
1355  */
1356  void
1357  after_write( const asio_ns::error_code & ec ) noexcept
1358  {
1359  if( !ec )
1360  {
1362  }
1363  else
1364  {
1365  if( !error_is_operation_aborted( ec ) )
1366  {
1368  return fmt::format(
1369  "[connection:{}] unable to write: {}",
1370  connection_id(),
1371  ec.message() );
1372  } );
1373  }
1374  // else: Operation aborted only in case of close was called.
1375 
1376  try
1377  {
1379  }
1380  catch( const std::exception & ex )
1381  {
1383  [&]{
1384  return fmt::format(
1385  "[connection:{}] notificator error: {}",
1386  connection_id(),
1387  ex.what() );
1388  } );
1389  }
1390  }
1391  }
1392 
1393  //! Close connection functions.
1394  //! \{
1395 
1396  //! Standard close routine.
1397  void
1398  close() noexcept
1399  {
1401  [&]{
1402  return fmt::format(
1403  "[connection:{}] close",
1404  connection_id() );
1405  } );
1406 
1407  // shutdown() and close() should be called regardless of
1408  // possible exceptions.
1410  m_logger,
1411  "connection.socket.shutdown",
1412  [this] {
1416  ignored_ec );
1417  } );
1419  m_logger,
1420  "connection.socket.close",
1421  [this] {
1422  m_socket.close();
1423  } );
1424 
1426  [&]{
1427  return fmt::format(
1428  "[connection:{}] close: close socket",
1429  connection_id() );
1430  } );
1431 
1432  // Clear stuff.
1434 
1436  [&]{
1437  return fmt::format(
1438  "[connection:{}] close: timer canceled",
1439  connection_id() );
1440  } );
1441 
1443 
1445  [&]{
1446  return fmt::format(
1447  "[connection:{}] close: reset responses data",
1448  connection_id() );
1449  } );
1450 
1451  // Inform state listener if it used.
1453  [this]() noexcept {
1454  return connection_state::notice_t{
1455  this->connection_id(),
1456  this->m_remote_endpoint,
1458  };
1459  } );
1460  }
1461 
1462  //! Trigger an error.
1463  /*!
1464  Closes the connection and write to log
1465  an error message.
1466  */
1467  template< typename Message_Builder >
1468  void
1469  trigger_error_and_close( Message_Builder msg_builder ) noexcept
1470  {
1471  // An exception from logger/msg_builder shouldn't prevent
1472  // a call to close().
1475 
1477  }
1478  //! \}
1479 
1480  //! Connection.
1481  stream_socket_t m_socket;
1482 
1483  //! Common paramaters of a connection.
1485 
1486  //! Remote endpoint for this connection.
1488 
1489  //! Input routine.
1491 
1492  //! Write to socket operation context.
1494 
1495  // Memo flag: whether we need to resume read after this group is written
1497 
1498  //! Response coordinator.
1500 
1501  //! Timer to controll operations.
1502  //! \{
1503 
1504  //! Check timeouts for all activities.
1505  static connection_t &
1506  cast_to_self( tcp_connection_ctx_base_t & base )
1507  {
1508  return static_cast< connection_t & >( base );
1509  }
1510 
1511  //! Schedules real timedout operations check on
1512  //! the executer of a connection.
1513  virtual void
1514  check_timeout( tcp_connection_ctx_handle_t & self ) override
1515  {
1516  asio_ns::dispatch(
1517  this->get_executor(),
1518  [ ctx = std::move( self ) ]
1519  // NOTE: this lambda is noexcept since v.0.6.0.
1520  () noexcept {
1521  auto & conn_object = cast_to_self( *ctx );
1522  // If an exception will be thrown we can only
1523  // close the connection.
1524  try
1525  {
1527  }
1528  catch( const std::exception & x )
1529  {
1531  return fmt::format( "[connection: {}] unexpected "
1532  "error during timeout handling: {}",
1534  x.what() );
1535  } );
1536  }
1537  } );
1538  }
1539 
1540  //! Callback type for timedout operations.
1541  using timout_cb_t = void (connection_t::* )( void );
1542 
1543  //! Callback to all if timeout happened.
1544  timout_cb_t m_current_timeout_cb{ nullptr };
1545 
1546  //! Timeout point of a current guarded operation.
1548  //! Timer guard.
1549  timer_guard_t m_timer_guard;
1550  //! A prepared weak handle for passing it to timer guard.
1552 
1553  //! Check timed out operation.
1554  void
1556  {
1558  {
1559  if( m_current_timeout_cb )
1560  (this->*m_current_timeout_cb)();
1561  }
1562  else
1563  {
1565  }
1566  }
1567 
1568  //! Schedule next timeout checking.
1569  void
1571  {
1573  }
1574 
1575  //! Stop timout guarding.
1576  void
1578  {
1579  m_current_timeout_cb = nullptr;
1581  }
1582 
1583  //! Helper function to work with timer guard.
1584  void
1586  std::chrono::steady_clock::time_point timeout_after,
1587  timout_cb_t timout_cb )
1588  {
1591  }
1592 
1593  void
1595  std::chrono::steady_clock::duration timeout,
1596  timout_cb_t timout_cb )
1597  {
1600  timout_cb );
1601  }
1602 
1603  void
1604  handle_xxx_timeout( const char * operation_name )
1605  {
1606  m_logger.trace( [&]{
1607  return fmt::format(
1608  "[connection:{}] {} timed out",
1609  connection_id(),
1610  operation_name );
1611  } );
1612 
1613  close();
1614  }
1615 
1616  void
1618  {
1619  handle_xxx_timeout( "wait for request" );
1620  }
1621 
1622  //! Statr guard read operation if necessary.
1623  void
1625  {
1627  {
1631  }
1632  }
1633 
1634  void
1636  {
1637  handle_xxx_timeout( "handle request" );
1638  }
1639 
1640  //! Start guard request handling operation if necessary.
1641  void
1643  {
1645  {
1649  }
1650  }
1651 
1652  void
1654  {
1655  handle_xxx_timeout( "writing response" );
1656  }
1657 
1658  //! Start guard write operation if necessary.
1659  void
1661  {
1665  }
1666 
1667  void
1669  {
1670  handle_xxx_timeout( "writing response (sendfile)" );
1671  }
1672 
1673  void
1674  guard_sendfile_operation( std::chrono::steady_clock::duration timelimit )
1675  {
1676  if( std::chrono::steady_clock::duration::zero() == timelimit )
1678 
1680  timelimit,
1682  }
1683  //! \}
1684 
1685  //! Request handler.
1687 
1688  //! Logger for operation
1689  logger_t & m_logger;
1690 
1691  /*!
1692  * @brief Monitor of the connection lifetime.
1693  *
1694  * It's required for controlling the count of active parallel
1695  * connections.
1696  *
1697  * @since v.0.6.12
1698  */
1700 };
1701 
1702 //
1703 // connection_factory_t
1704 //
1705 
1706 //! Factory for connections.
1707 template < typename Traits >
1709 {
1710  public:
1711  using logger_t = typename Traits::logger_t;
1712  using stream_socket_t = typename Traits::stream_socket_t;
1713  using lifetime_monitor_t =
1715 
1717  connection_settings_handle_t< Traits > connection_settings,
1718  std::unique_ptr< socket_options_setter_t > socket_options_setter )
1722  {}
1723 
1724  // NOTE: since v.0.6.3 it returns non-empty
1725  // shared_ptr<connection_t<Traits>> or an exception is thrown in
1726  // the case of an error.
1727  // NOTE: since v.0.6.12 it accepts yet another parameter: lifetime_monitor.
1728  auto
1730  stream_socket_t socket,
1731  endpoint_t remote_endpoint,
1732  lifetime_monitor_t lifetime_monitor )
1733  {
1734  using connection_type_t = connection_t< Traits >;
1735 
1736  {
1737  socket_options_t options{ socket.lowest_layer() };
1738  (*m_socket_options_setter)( options );
1739  }
1740 
1741  return std::make_shared< connection_type_t >(
1742  m_connection_id_counter++,
1743  std::move( socket ),
1744  m_connection_settings,
1745  std::move( remote_endpoint ),
1746  std::move( lifetime_monitor ) );
1747  }
1748 
1749  private:
1751 
1753 
1755 
1756  logger_t & m_logger;
1757 };
1758 
1759 } /* namespace impl */
1760 
1761 } /* namespace restinio */
http_request_header_t m_header
Request data.
Definition: connection.hpp:52
void check_timeout_impl()
Check timed out operation.
upgrade_internals_t move_upgrade_internals()
Move socket out of connection.
Definition: connection.hpp:462
static connection_t & cast_to_self(tcp_connection_ctx_base_t &base)
Timer to controll operations.
std::size_t m_total_field_count
Total number of parsed HTTP-fields.
Definition: connection.hpp:84
void consume_message()
Start (continue) a chain of read-parse-read-... operations.
Definition: connection.hpp:474
connection_settings_handle_t< Traits > m_settings
Common paramaters of a connection.
connection_input_t(std::size_t buffer_size, incoming_http_msg_limits_t limits)
Definition: connection.hpp:227
std::string m_current_field_name
Parser context temp values and flags.
Definition: connection.hpp:58
response_coordinator_t m_response_coordinator
Response coordinator.
bool m_message_complete
Flag: is http message parsed completely.
Definition: connection.hpp:74
Request with connection-upgrade header came and waits for request handler to be called in non pipelin...
void after_write(const asio_ns::error_code &ec) noexcept
Handle write response finished.
write_group_output_ctx_t m_write_output_ctx
Write to socket operation context.
void guard_sendfile_operation(std::chrono::steady_clock::duration timelimit)
timout_cb_t m_current_timeout_cb
Callback to all if timeout happened.
tls_socket_t * make_tls_socket_pointer_for_state_listener(tls_socket_t &socket) noexcept
Definition: tls.hpp:341
upgrade_internals_t(upgrade_internals_t &&)=default
connection_t & operator=(connection_t &&)=delete
http_parser m_parser
HTTP-parser.
Definition: connection.hpp:236
void handle_current_write_ctx() noexcept
Start/continue/continue handling output data of current write group.
connection_t & operator=(const connection_t &)=delete
std::unique_ptr< socket_options_setter_t > m_socket_options_setter
RESTINIO_NODISCARD chunked_input_info_unique_ptr_t make_chunked_input_info_if_necessary()
Creates an instance of chunked_input_info if there is an info about chunks in the body...
Definition: connection.hpp:124
logger_t & m_logger
Logger for operation.
connection_settings_handle_t< Traits > m_connection_settings
stream_socket_t m_socket
Connection.
void cancel_timeout_checking() noexcept
Stop timout guarding.
connection_t(connection_id_t conn_id, stream_socket_t &&socket, connection_settings_handle_t< Traits > settings, endpoint_t remote_endpoint, lifetime_monitor_t lifetime_monitor)
Definition: connection.hpp:320
void finish_handling_current_write_ctx()
Do post write actions for current write group.
timer_guard_t m_timer_guard
Timer guard.
void consume_data(const char *data, std::size_t length)
Parse some data.
Definition: connection.hpp:579
chunked_input_info_block_t m_chunked_info_block
Definition: connection.hpp:70
virtual void write_response_parts(request_id_t request_id, response_output_flags_t response_output_flags, write_group_t wg) override
Write parts for specified request.
Definition: connection.hpp:846
void prepare_connection_and_start_read(asio_ns::ip::tcp::socket &, Connection &, Start_Read_CB start_read_cb, Failed_CB)
Definition: connection.hpp:265
void guard_write_operation()
Start guard write operation if necessary.
http_parser_settings create_parser_settings() noexcept
Helper for setting parser settings.
Definition: connection.hpp:152
void close() noexcept
Close connection functions.
connection_upgrade_stage_t m_connection_upgrade_stage
Connection upgrade request stage.
Definition: connection.hpp:244
connection_input_t m_input
Input routine.
void reset()
Prepare context to handle new request.
Definition: connection.hpp:105
void handle_xxx_timeout(const char *operation_name)
void after_read(const asio_ns::error_code &ec, std::size_t length) noexcept
Handle read operation result.
Definition: connection.hpp:510
lifetime_monitor_t m_lifetime_monitor
Monitor of the connection lifetime.
connection_settings_handle_t< Traits > m_settings
Definition: connection.hpp:455
Handler for request with connection-upgrade header was called so any response data comming is for tha...
connection_factory_t(connection_settings_handle_t< Traits > connection_settings, std::unique_ptr< socket_options_setter_t > socket_options_setter)
connection_t(const connection_t &)=delete
void reset_parser()
Prepare parser for reading new http-message.
Definition: connection.hpp:252
http_parser_ctx_t(incoming_http_msg_limits_t limits)
The main constructor.
Definition: connection.hpp:98
void on_request_message_complete()
Handle a given request message.
Definition: connection.hpp:626
Internals that are necessary for upgrade.
Definition: connection.hpp:441
std::chrono::steady_clock::time_point m_current_timeout_after
Timeout point of a current guarded operation.
void schedule_operation_timeout_callback(std::chrono::steady_clock::duration timeout, timout_cb_t timout_cb)
connection_t(connection_t &&)=delete
void write_response_parts_impl(request_id_t request_id, response_output_flags_t response_output_flags, write_group_t wg)
Write parts for specified request.
Definition: connection.hpp:886
No connection request in progress.
const endpoint_t m_remote_endpoint
Remote endpoint for this connection.
tcp_connection_ctx_weak_handle_t m_prepared_weak_ctx
A prepared weak handle for passing it to timer guard.
fixed_buffer_t m_buf
Input buffer.
Definition: connection.hpp:241
RESTINIO_NODISCARD char to_lower_case(unsigned char ch)
bool m_read_operation_is_running
Flag to track whether read operation is performed now.
Definition: connection.hpp:248
void init_next_timeout_checking()
Schedule next timeout checking.
upgrade_internals_t(connection_settings_handle_t< Traits > settings, stream_socket_t socket, lifetime_monitor_t lifetime_monitor)
Definition: connection.hpp:446
void handle_file_write_operation(file_write_operation_t &op)
Run sendfile write operation.
void trigger_error_and_close(Message_Builder msg_builder) noexcept
Trigger an error.
request_handler_t & m_request_handler
Request handler.
void guard_request_handling_operation()
Start guard request handling operation if necessary.
void handle_upgrade_request()
Calls handler for upgrade request.
Definition: connection.hpp:757
const incoming_http_msg_limits_t m_limits
Limits for the incoming message.
Definition: connection.hpp:91
void init_write()
Initiate write operation.
Definition: connection.hpp:990
Data associated with connection read routine.
Definition: connection.hpp:225
void wait_for_http_message()
Start reading next htttp-message.
Definition: connection.hpp:409
virtual void check_timeout(tcp_connection_ctx_handle_t &self) override
Schedules real timedout operations check on the executer of a connection.
auto create_new_connection(stream_socket_t socket, endpoint_t remote_endpoint, lifetime_monitor_t lifetime_monitor)
Context for handling http connections.
Definition: connection.hpp:303
Factory for connections.
std::enable_if< std::is_same< Parameter_Container, query_string_params_t >::value||std::is_same< Parameter_Container, router::route_params_t >::value, optional_t< Value_Type > >::type opt_value(const Parameter_Container &params, string_view_t key)
Gets the value of a parameter specified by key wrapped in optional_t<Value_Type> if parameter exists ...
Definition: value_or.hpp:64
void guard_read_operation()
Statr guard read operation if necessary.
void handle_trivial_write_operation(const trivial_write_operation_t &op)
Run trivial buffers write operation.
connection_upgrade_stage_t
Enum for a flag specifying that connection is going to upgrade or not.
Definition: connection.hpp:205