RESTinio
connection.hpp
Go to the documentation of this file.
1 /*
2  restinio
3 */
4 
5 /*!
6  HTTP-connection routine.
7 */
8 
9 #pragma once
10 
11 #include <restinio/asio_include.hpp>
12 
13 #include <http_parser.h>
14 
15 #include <fmt/format.h>
16 
17 #include <restinio/exception.hpp>
18 #include <restinio/http_headers.hpp>
19 #include <restinio/request_handler.hpp>
20 #include <restinio/impl/connection_base.hpp>
21 #include <restinio/impl/header_helpers.hpp>
22 #include <restinio/impl/response_coordinator.hpp>
23 #include <restinio/impl/connection_settings.hpp>
24 #include <restinio/impl/fixed_buffer.hpp>
25 #include <restinio/impl/write_group_output_ctx.hpp>
26 #include <restinio/impl/executor_wrapper.hpp>
27 #include <restinio/impl/sendfile_operation.hpp>
28 
29 #include <restinio/utils/impl/safe_uint_truncate.hpp>
30 
31 namespace restinio
32 {
33 
34 namespace impl
35 {
36 
37 //
38 // http_parser_ctx_t
39 //
40 
41 //! Parsing result context for using in parser callbacks.
42 /*!
43  All data is used as temporaries: it becomes usable only
44  after parsing of a new request completes, and then it is moved out.
45 */
47 {
48  //! Request data.
49  //! \{
52  //! \}
53 
54  //! Parser context temp values and flags.
55  //! \{
57  bool m_last_was_value{ true };
58  //! \}
59 
60  //! Flag: is http message parsed completely.
61  bool m_message_complete{ false };
62 
63  //! Prepare context to handle new request.
64  void
66  {
67  m_header = http_request_header_t{};
68  m_body.clear();
69  m_current_field_name.clear();
70  m_last_was_value = true;
71  m_message_complete = false;
72  }
73 };
74 
75 //! Include parser callbacks.
77 
78 //
79 // create_parser_settings()
80 //
81 
82 //! Helper for setting parser settings.
83 /*!
84  Is used to initialize const value in connection_settings_t ctor.
85 */
86 template< typename Http_Methods >
89 {
92 
94  []( http_parser * parser, const char * at, size_t length ) -> int {
95  return restinio_url_cb( parser, at, length );
96  };
97 
99  []( http_parser * parser, const char * at, size_t length ) -> int {
101  };
102 
104  []( http_parser * parser, const char * at, size_t length ) -> int {
106  };
107 
109  []( http_parser * parser ) -> int {
111  };
112 
114  []( http_parser * parser, const char * at, size_t length ) -> int {
115  return restinio_body_cb( parser, at, length );
116  };
117 
119  []( http_parser * parser ) -> int {
121  };
122 
123  return parser_settings;
124 }
125 
126 //
127 // connection_upgrade_stage_t
128 //
129 
130 //! Enum for a flag specifying that connection is going to upgrade or not.
132 {
133  //! No connection request in progress
134  none,
135  //! Request with connection-upgrade header came and waits for
136  //! request handler to be called in non pipelined fashion
137  //! (it must be the only request that is handled at the moment).
139  //! Handler for request with connection-upgrade header was called
140  //! so any response data comming is for that request.
141  //! If connection transforms to websocket connection
142  //! then no further operations are expected.
144 };
145 
146 //
147 // connection_input_t
148 //
149 
150 //! Data associated with connection read routine.
152 {
//! Create the read context; buffer_size is forwarded to the m_buf constructor.
153  connection_input_t( std::size_t buffer_size )
154  : m_buf{ buffer_size }
155  {}
156 
157  //! HTTP-parser.
158  //! \{
161  //! \}
162 
163  //! Input buffer.
165 
166  //! Connection upgrade request stage.
169 
170  //! Flag to track whether read operation is performed now.
172 
173  //! Prepare parser for reading new http-message.
174  void
176  {
177  // Reinit parser.
178  http_parser_init( &m_parser, HTTP_REQUEST);
179 
180  // Reset context and attach it to parser.
181  m_parser_ctx.reset();
182  m_parser.data = &m_parser_ctx;
183  }
184 };
185 
186 template < typename Connection, typename Start_Read_CB, typename Failed_CB >
187 void
189  asio_ns::ip::tcp::socket & ,
190  Connection & ,
191  Start_Read_CB start_read_cb,
192  Failed_CB )
193 {
194  // No preparation is needed, start
195  start_read_cb();
196 }
197 
198 //
199 // connection_t
200 //
201 
202 //! Context for handling http connections.
203 /*
204  Working cycle consists of the following steps:
205  * wait for request -- reading from socket and parsing header and body;
206  * handling request -- once the request is completely obtained its handling
207  is delegated to a handler chosen by handler factory;
208  * writing response -- writing response to socket;
209  * back to first step or close connection -- depending on keep-alive property
210  of the last response the connection goes back to first step or
211  shuts down.
212 
213  Each step is controlled by timer (\see schedule_operation_timeout_callback())
214 
215  In case of errors connection closes itself.
216 */
217 template < typename Traits >
219  : public connection_base_t
220  , public executor_wrapper_t< typename Traits::strand_t >
221 {
223 
224  public:
225  using timer_manager_t = typename Traits::timer_manager_t;
226  using timer_guard_t = typename timer_manager_t::timer_guard_t;
227  using request_handler_t = typename Traits::request_handler_t;
228  using logger_t = typename Traits::logger_t;
229  using strand_t = typename Traits::strand_t;
230  using stream_socket_t = typename Traits::stream_socket_t;
231 
233  //! Connection id.
234  connection_id_t conn_id,
235  //! Connection socket.
236  stream_socket_t && socket,
237  //! Settings that are common for connections.
238  connection_settings_handle_t< Traits > settings,
239  //! Remote endpoint for that connection.
240  endpoint_t remote_endpoint )
243  , m_socket{ std::move( socket ) }
244  , m_settings{ std::move( settings ) }
250  , m_logger{ *( m_settings->m_logger ) }
251  {
252  // Notify of a new connection instance.
253  m_logger.trace( [&]{
254  return fmt::format(
255  "[connection:{}] start connection with {}",
256  connection_id(),
258  } );
259  }
260 
261  // Disable copy/move.
262  connection_t( const connection_t & ) = delete;
263  connection_t( connection_t && ) = delete;
264  connection_t & operator = ( const connection_t & ) = delete;
265  connection_t & operator = ( connection_t && ) = delete;
266 
268  {
269  try
270  {
271  m_logger.trace( [&]{
272  return fmt::format(
273  "[connection:{}] destructor called",
274  connection_id() );
275  } );
276  }
277  catch( ... )
278  {}
279  }
280 
281  void
283  {
285  m_socket,
286  *this,
287  [ & ]{
288  // Inform state listener if it used.
289  m_settings->call_state_listener( [this]() noexcept {
290  return connection_state::notice_t{
291  this->connection_id(),
292  this->m_remote_endpoint,
294  } );
295 
296  // Start timeout checking.
299 
300  // Start reading request.
302  },
303  [ & ]( const asio_ns::error_code & ec ){
305  return fmt::format(
306  "[connection:{}] prepare connection error: {}",
307  connection_id(),
308  ec.message() );
309  } );
310  } );
311  }
312 
313  //! Start reading next HTTP message.
314  void
316  {
317  m_logger.trace( [&]{
318  return fmt::format(
319  "[connection:{}] start waiting for request",
320  connection_id() );
321  } );
322 
323  // Prepare parser for consuming new request message.
325 
326  // Guard total time for a request to be read.
327  // guarding here makes the total read process
328  // to run in read_next_http_message_timelimit.
330 
331  if( 0 != m_input.m_buf.length() )
332  {
333  // If a pipeline requests were sent by client
334  // then the biginning (or even entire request) of it
335  // is in the buffer obtained from socket in previous
336  // read operation.
338  }
339  else
340  {
341  // Next request (if any) must be obtained from socket.
342  consume_message();
343  }
344  }
345 
346  //! Internals that are necessary for upgrade.
348  {
350  upgrade_internals_t && ) = default;
351 
353  connection_settings_handle_t< Traits > settings,
354  stream_socket_t socket )
355  : m_settings{ std::move( settings ) }
356  , m_socket{ std::move( socket ) }
357  {}
358 
360  stream_socket_t m_socket;
361  };
362 
363  //! Move socket out of connection.
366  {
367  return upgrade_internals_t{
368  m_settings,
369  std::move( m_socket ) };
370  }
371 
372  private:
373  //! Start (continue) a chain of read-parse-read-... operations.
374  inline void
376  {
378  {
379  m_logger.trace( [&]{
380  return fmt::format(
381  "[connection:{}] continue reading request",
382  connection_id() );
383  } );
384 
385 
390  this->get_executor(),
391  [ this, ctx = shared_from_this() ](
392  const asio_ns::error_code & ec,
393  std::size_t length ){
395  after_read( ec, length );
396  } ) );
397  }
398  else
399  {
400  m_logger.trace( [&]{
401  return fmt::format(
402  "[connection:{}] skip read operation: already running",
403  connection_id() );
404  } );
405  }
406  }
407 
408  //! Handle read operation result.
409  inline void
410  after_read( const asio_ns::error_code & ec, std::size_t length )
411  {
412  if( !ec )
413  {
414  m_logger.trace( [&]{
415  return fmt::format(
416  "[connection:{}] received {} bytes",
417  this->connection_id(),
418  length );
419  } );
420 
422 
424  }
425  else
426  {
427  // Well, if it is actually an error
428  // then close connection.
430  {
431  if ( !error_is_eof( ec ) || 0 != m_input.m_parser.nread )
433  return fmt::format(
434  "[connection:{}] read socket error: {}; "
435  "parsed bytes: {}",
436  connection_id(),
437  ec.message(),
439  } );
440  else
441  {
442  // A case that is not such an error:
443  // on a connection (most probably keeped alive
444  // after previous request, but a new also applied)
445  // no bytes were consumed and remote peer closes connection.
446  m_logger.trace( [&]{
447  return fmt::format(
448  "[connection:{}] EOF and no request, "
449  "close connection",
450  connection_id() );
451  } );
452 
453  close();
454  }
455  }
456  // else: read operation was cancelled.
457  }
458  }
459 
460  //! Parse some data.
461  void
462  consume_data( const char * data, std::size_t length )
463  {
464  auto & parser = m_input.m_parser;
465 
466  const auto nparsed =
468  &parser,
470  data,
471  length );
472 
473  // If entire http-message was obtained,
474  // parser is stopped and the might be a part of consecutive request
475  // left in buffer, so we mark how many bytes were obtained.
476  // and next message read (if any) will be started from already existing
477  // data left in buffer.
479 
480  if( HPE_OK != parser.http_errno &&
482  {
483  // PARSE ERROR:
484  auto err = HTTP_PARSER_ERRNO( &parser );
485 
486  // TODO: handle case when there are some request in process.
488  return fmt::format(
489  "[connection:{}] parser error {}: {}",
490  connection_id(),
491  http_errno_name( err ),
493  } );
494 
495  // nothing to do.
496  return;
497  }
498 
500  {
502  }
503  else
504  consume_message();
505  }
506 
507  //! Handle a given request message.
508  void
510  {
511  try
512  {
513  auto & parser = m_input.m_parser;
515 
516  if( m_input.m_parser.upgrade )
517  {
518  // Start upgrade connection operation.
519 
520  // The first thing is to make sure
521  // that upgrade request will be handled in
522  // a non pipelined fashion.
525  }
526 
529  {
530  // Run ordinary HTTP logic.
532 
533  m_logger.trace( [&]{
534  return fmt::format(
535  "[connection:{}] request received (#{}): {} {}",
536  connection_id(),
537  request_id,
539  static_cast<http_method>( parser.method ) ),
541  } );
542 
543  // TODO: mb there is a way to
544  // track if response was emmited immediately in handler
545  // or it was delegated
546  // so it is possible to omit this timer scheduling.
548 
549  if( request_rejected() ==
552  request_id,
556  m_remote_endpoint ) ) )
557  {
558  // If handler refused request, say not implemented.
560  request_id,
565  }
567  {
568  // Request was accepted,
569  // didn't create immediate response that closes connection after,
570  // and it is possible to receive more requests
571  // then start consuming yet another request.
573  }
574  }
575  else
576  {
577  m_logger.trace( [&]{
578  const std::string default_value{};
579 
580  return fmt::format(
581  "[connection:{}] upgrade request received: {} {}; "
582  "Upgrade: '{}';",
583  connection_id(),
585  static_cast<http_method>( parser.method ) ),
589  } );
590 
592  {
593  // There are no requests in handling
594  // So the current request with upgrade
595  // is the only one and can be handled directly.
596  // It is safe to call a handler for it.
598  }
599  else
600  {
601  // There are pipelined request
602  m_logger.trace( [&]{
603  return fmt::format(
604  "[connection:{}] upgrade request happened to be a pipelined one, "
605  "and will be handled after previous requests are handled",
606  connection_id() );
607  } );
608  }
609 
610  // No further actions (like continue reading) in both cases are needed.
611  }
612 
613  }
614  catch( const std::exception & ex )
615  {
617  return fmt::format(
618  "[connection:{}] error while handling request: {}",
619  this->connection_id(),
620  ex.what() );
621  } );
622  }
623  }
624 
625  //! Calls handler for upgrade request.
626  /*!
627  Request data must be in input context (m_input).
628  */
629  void
631  {
632  auto & parser = m_input.m_parser;
634 
635  // If user responses with error
636  // then connection must be able to send
637  // (hence to receive) response.
638 
640 
641  m_logger.info( [&]{
642  return fmt::format(
643  "[connection:{}] handle upgrade request (#{}): {} {}",
644  connection_id(),
645  request_id,
647  static_cast<http_method>( parser.method ) ),
649  } );
650 
651  // Do not guard upgrade request.
653 
654  // After calling handler we expect the results or
655  // no further operations with connection
658 
659  if( request_rejected() ==
662  request_id,
666  m_remote_endpoint) ) )
667  {
668  if( m_socket.is_open() )
669  {
670  // Request is rejected, so our socket
671  // must not be moved out to websocket connection.
672 
673  // If handler refused request, say not implemented.
675  request_id,
680  }
681  else
682  {
683  // Request is rejected, but the socket
684  // was moved out to somewhere else???
685 
686  m_logger.error( [&]{
687  return fmt::format(
688  "[connection:{}] upgrade request handler rejects "
689  "request, but socket was moved out from connection",
690  connection_id() );
691  } );
692  }
693  }
694 
695  // Else 2 cases:
696  // 1. request is handled asynchronously, so
697  // what happens next depends on handling.
698  // 2. handling was immediate, so further destiny
699  // of a connection was already determined.
700  //
701  // In both cases: here do nothing.
702  // We can't even do read-only access because
703  // upgrade handling might take place
704  // in distinct execution context.
705  // So no even a log messages here.
706  }
707 
708  //! Write parts for specified request.
709  virtual void
711  //! Request id.
712  request_id_t request_id,
713  //! Resp output flag.
714  response_output_flags_t response_output_flags,
715  //! Part of the response data.
716  write_group_t wg ) override
717  {
718  //! Run write message on io_context loop if possible.
719  asio_ns::dispatch(
720  this->get_executor(),
721  [ this,
722  request_id,
724  actual_wg = std::move( wg ),
725  ctx = shared_from_this() ]() mutable {
726  try
727  {
729  request_id,
731  std::move( actual_wg ) );
732  }
733  catch( const std::exception & ex )
734  {
736  return fmt::format(
737  "[connection:{}] unable to handle response: {}",
738  connection_id(),
739  ex.what() );
740  } );
741  }
742  } );
743  }
744 
745  //! Write parts for specified request.
746  void
748  //! Request id.
749  request_id_t request_id,
750  //! Resp output flag.
751  response_output_flags_t response_output_flags,
752  //! Part of the response data.
753  write_group_t wg )
754  {
756  try
757  {
761  }
762  catch( const std::exception & ex )
763  {
764  m_logger.error( [&]{
765  return fmt::format(
766  "[connection:{}] notificator error: {}",
767  connection_id(),
768  ex.what() );
769  } );
770  }
771  };
772 
773  if( m_socket.is_open() )
774  {
778  {
779  // It is response for a connection-upgrade request.
780  // If we receive it here then it is constructed via
781  // message builder and so connection was not transformed
782  // to websocket connection.
783  // So it is necessary to resume pipeline logic that was stopped
784  // for upgrade-request to be handled as the only request
785  // on the connection for that moment.
787  {
789  }
790  }
791 
793  {
794  m_logger.trace( [&]{
795  return fmt::format(
796  "[connection:{}] append response (#{}), "
797  "flags: {}, write group size: {}",
798  connection_id(),
799  request_id,
801  wg.items_count() );
802  } );
803 
805  request_id,
807  std::move( wg ) );
808 
810  }
811  else
812  {
813  m_logger.warn( [&]{
814  return fmt::format(
815  "[connection:{}] receive response parts for "
816  "request (#{}), but response with connection-close "
817  "attribute happened before",
818  connection_id(),
819  request_id );
820  } );
822  }
823  }
824  else
825  {
826  m_logger.warn( [&]{
827  return fmt::format(
828  "[connection:{}] try to write response, "
829  "while socket is closed",
830  connection_id() );
831  } );
833  }
834  }
835 
836  // Check if there is something to write,
837  // and if so starts write operation.
838  void
840  {
842 
844  {
845  init_write();
846  }
847  }
848 
849  //! Initiate write operation.
850  void
852  {
853  // Here: not writing anything to socket, so
854  // write operation can be initiated.
855 
856  // Remember if all response cells were busy.
859 
861 
862  if( next_write_group )
863  {
864  m_logger.trace( [&]{
865  return fmt::format(
866  "[connection:{}] start next write group for response (#{}), "
867  "size: {}",
868  this->connection_id(),
871  } );
872 
873  // Check if all response cells busy:
876 
877  // Whether we need to resume read after this group is written?
881 
883  {
884  // We need to extract status line out of the first buffer
885  assert(
888 
889  m_logger.trace( [&]{
890  // Get status line:
891  const string_view_t
892  status_line{
893  asio_ns::buffer_cast< const char * >(
896 
897  return
898  fmt::format(
899  "[connection:{}] start response (#{}): {}",
900  this->connection_id(),
902  status_line );
903  } );
904  }
905 
906  // Initialize write context with a new write group.
909 
910  // Start the loop of sending data from current write group.
912  }
913  else
914  {
916  }
917  }
918 
919  // Use aliases for shorter names.
923 
924  //! Start/continue handling output data of current write group.
925  /*!
926  This function is a starting point of a loop process of sending data
927  from a given write group.
928  It extracts the next bunch of trivial buffers or a
929  sendfile-runner and starts an appropriate write operation.
930  If the data of a given write group is exhausted,
931  finish_handling_current_write_ctx() is invoked thus breaking the loop.
932  */
933  void
935  {
936  try
937  {
939 
941  {
943  }
945  {
947  }
948  else
949  {
952  }
953  }
954  catch( const std::exception & ex )
955  {
957  return fmt::format(
958  "[connection:{}] handle_current_write_ctx failed: {}",
959  connection_id(),
960  ex.what() );
961  } );
962  }
963  }
964 
965  //! Run trivial buffers write operation.
966  void
967  handle_trivial_write_operation( const trivial_write_operation_t & op )
968  {
969  // Asio buffers (param for async write):
970  auto & bufs = op.get_trivial_bufs();
971 
973  {
974  m_logger.trace( [&]{
975  return fmt::format(
976  "[connection:{}] sending resp data with "
977  "connection-close attribute "
978  "buf count: {}, "
979  "total size: {}",
980  connection_id(),
981  bufs.size(),
982  op.size() );
983  } );
984 
985  // Reading new requests is useless.
988  }
989  else
990  {
991  m_logger.trace( [&]{
992  return fmt::format(
993  "[connection:{}] sending resp data, "
994  "buf count: {}, "
995  "total size: {}",
996  connection_id(),
997  bufs.size(),
998  op.size() ); } );
999  }
1000 
1001  // There is somethig to write.
1003  m_socket,
1004  bufs,
1006  this->get_executor(),
1007  [ this,
1008  ctx = shared_from_this() ]
1009  ( const asio_ns::error_code & ec, std::size_t written ){
1010 
1011  if( !ec )
1012  {
1013  m_logger.trace( [&]{
1014  return fmt::format(
1015  "[connection:{}] outgoing data was sent: {} bytes",
1016  connection_id(),
1017  written );
1018  } );
1019  }
1020 
1021  after_write( ec );
1022  } ) );
1023 
1025  }
1026 
1027  //! Run sendfile write operation.
1028  void
1029  handle_file_write_operation( file_write_operation_t & op )
1030  {
1032  {
1033  m_logger.trace( [&]{
1034  return fmt::format(
1035  "[connection:{}] sending resp file data with "
1036  "connection-close attribute, "
1037  "total size: {}",
1038  connection_id(),
1039  op.size() );
1040  } );
1041 
1042  // Reading new requests is useless.
1045  }
1046  else
1047  {
1048  m_logger.trace( [&]{
1049  return fmt::format(
1050  "[connection:{}] sending resp file data, total size: {}",
1051  connection_id(),
1052  op.size() );
1053  } );
1054  }
1055 
1057 
1058  auto op_ctx = op;
1059 
1061  this->get_executor(),
1062  m_socket,
1064  this->get_executor(),
1065  [ this,
1066  ctx = shared_from_this(),
1067  // Store operation context till the end
1068  op_ctx ](
1069  const asio_ns::error_code & ec,
1070  file_size_t written ) mutable{
1071 
1072  // Reset sendfile operation context.
1073  op_ctx.reset();
1074 
1075  if( !ec )
1076  {
1077  m_logger.trace( [&]{
1078  return fmt::format(
1079  "[connection:{}] file data was sent: {} bytes",
1080  connection_id(),
1081  written );
1082  } );
1083  }
1084  else
1085  {
1086  m_logger.error( [&]{
1087  return fmt::format(
1088  "[connection:{}] send file data error: {} ({}) bytes",
1089  connection_id(),
1090  ec.value(),
1091  ec.message() );
1092  } );
1093  }
1094 
1095  after_write( ec );
1096  } ) );
1097 
1098  }
1099 
1100  //! Do post write actions for current write group.
1101  void
1103  {
1104  // Finishing writing this group.
1105  m_logger.trace( [&]{
1106  return fmt::format(
1107  "[connection:{}] finishing current write group",
1108  this->connection_id() );
1109  } );
1110 
1111  // Group notificators are called from here (if exist):
1113 
1115  {
1116  m_logger.trace( [&]{
1117  return fmt::format(
1118  "[connection:{}] should keep alive",
1119  this->connection_id() );
1120  } );
1121 
1124  {
1125  // Run ordinary HTTP logic.
1127  {
1129  }
1130 
1131  // Start another write opertion
1132  // if there is something to send.
1134  }
1135  else
1136  {
1138  {
1139  // Here upgrade req is the only request
1140  // to by handled by this connection.
1141  // So it is safe to call a handler for it.
1143  }
1144  else
1145  {
1146  // Do not start reading in any case,
1147  // but if there is at least one request preceding
1148  // upgrade-req, logic must continue http interaction.
1150  }
1151  }
1152  }
1153  else
1154  {
1155  // No keep-alive, close connection.
1156  close();
1157  }
1158  }
1159 
1160  void
1162  {
1164  {
1165  // Bufs empty but there happened to
1166  // be a response context marked as complete
1167  // (final_parts) and having connection-close attr.
1168  // It is because `init_write_if_necessary()`
1169  // is called only under `!m_response_coordinator.closed()`
1170  // condition, so if no bufs were obtained
1171  // and response coordinator is closed means
1172  // that a first response stored by
1173  // response coordinator was marked as complete
1174  // without data.
1175 
1176  m_logger.trace( [&]{
1177  return fmt::format(
1178  "[connection:{}] last sent response was marked "
1179  "as complete",
1180  connection_id() ); } );
1181  close();
1182  }
1183  else
1184  {
1185  // Not writing anything, so need to deal with timouts.
1187  {
1188  // No requests in processing.
1189  // So set read next request timeout.
1191  }
1192  else
1193  {
1194  // Have requests in process.
1195  // So take control over request handling.
1197  }
1198  }
1199  }
1200 
1201  //! Handle write response finished.
1202  void
1203  after_write( const asio_ns::error_code & ec )
1204  {
1205  if( !ec )
1206  {
1208  }
1209  else
1210  {
1211  if( !error_is_operation_aborted( ec ) )
1212  {
1214  return fmt::format(
1215  "[connection:{}] unable to write: {}",
1216  connection_id(),
1217  ec.message() );
1218  } );
1219 
1220  }
1221  // else: Operation aborted only in case of close was called.
1222 
1223  try
1224  {
1226  }
1227  catch( const std::exception & ex )
1228  {
1229  m_logger.error( [&]{
1230  return fmt::format(
1231  "[connection:{}] notificator error: {}",
1232  connection_id(),
1233  ex.what() );
1234  } );
1235  }
1236  }
1237  }
1238 
1239  //! Close connection functions.
1240  //! \{
1241 
1242  //! Standard close routine.
1243  void
1245  {
1246  m_logger.trace( [&]{
1247  return fmt::format(
1248  "[connection:{}] close",
1249  connection_id() );
1250  } );
1251 
1255  ignored_ec );
1256  m_socket.close();
1257 
1258  m_logger.trace( [&]{
1259  return fmt::format(
1260  "[connection:{}] close: close socket",
1261  connection_id() );
1262  } );
1263 
1264  // Clear stuff.
1265 
1267  m_logger.trace( [&]{
1268  return fmt::format(
1269  "[connection:{}] close: timer canceled",
1270  connection_id() );
1271  } );
1272 
1274 
1275  m_logger.trace( [&]{
1276  return fmt::format(
1277  "[connection:{}] close: reset responses data",
1278  connection_id() );
1279  } );
1280 
1281  // Inform state listener if it used.
1282  m_settings->call_state_listener( [this]() noexcept {
1283  return connection_state::notice_t{
1284  this->connection_id(),
1285  this->m_remote_endpoint,
1287  } );
1288  }
1289 
1290  //! Trigger an error.
1291  /*!
1292  Closes the connection and write to log
1293  an error message.
1294  */
1295  template< typename Message_Builder >
1296  void
1297  trigger_error_and_close( Message_Builder msg_builder )
1298  {
1300 
1301  close();
1302  }
1303  //! \}
1304 
1305  //! Connection.
1306  stream_socket_t m_socket;
1307 
1308  //! Common parameters of a connection.
1310 
1311  //! Remote endpoint for this connection.
1313 
1314  //! Input routine.
1316 
1317  //! Write to socket operation context.
1319 
1320  // Memo flag: whether we need to resume read after this group is written
1322 
1323  //! Response coordinator.
1325 
1326  //! Timer to control operations.
1327  //! \{
1328 
1329  //! Downcast a base connection context reference to connection_t.
//! The conversion is a static_cast, so no runtime type check is performed.
1330  static connection_t &
1331  cast_to_self( tcp_connection_ctx_base_t & base )
1332  {
1333  return static_cast< connection_t & >( base );
1334  }
1335 
1336  //! Schedules the actual check of timed-out operations on
1337  //! the executor of a connection.
1338  virtual void
1339  check_timeout( tcp_connection_ctx_handle_t & self ) override
1340  {
1341  asio_ns::dispatch(
1342  this->get_executor(),
1343  [ ctx = std::move( self ) ]{
1345  } );
1346  }
1347 
1348  //! Callback type for timed-out operations.
1349  using timout_cb_t = void (connection_t::* )( void );
1350 
1351  //! Callback to call if timeout happened.
1352  timout_cb_t m_current_timeout_cb{ nullptr };
1353 
1354  //! Timeout point of a current guarded operation.
1356  //! Timer guard.
1357  timer_guard_t m_timer_guard;
1358  //! A prepared weak handle for passing it to timer guard.
1360 
1361  //! Check timed out operation.
1362  void
1364  {
1366  {
1367  if( m_current_timeout_cb )
1368  (this->*m_current_timeout_cb)();
1369  }
1370  else
1371  {
1373  }
1374  }
1375 
1376  //! Schedule next timeout checking.
1377  void
1379  {
1381  }
1382 
1383  //! Stop timeout guarding.
1384  void
1386  {
1387  m_current_timeout_cb = nullptr;
1389  }
1390 
1391  //! Helper function to work with timer guard.
1392  void
1394  std::chrono::steady_clock::time_point timeout_after,
1395  timout_cb_t timout_cb )
1396  {
1399  }
1400 
1401  void
1403  std::chrono::steady_clock::duration timeout,
1404  timout_cb_t timout_cb )
1405  {
1408  timout_cb );
1409  }
1410 
//! Common reaction to an expired timeout: writes a trace-level log message
//! "<operation_name> timed out" for this connection and then calls close().
1411  void
1412  handle_xxx_timeout( const char * operation_name )
1413  {
1414  m_logger.trace( [&]{
1415  return fmt::format(
1416  "[connection:{}] {} timed out",
1417  connection_id(),
1418  operation_name );
1419  } );
1420 
1421  close();
1422  }
1423 
1424  void
1426  {
1427  handle_xxx_timeout( "wait for request" );
1428  }
1429 
1430  //! Start guarding read operation if necessary.
1431  void
1433  {
1435  {
1439  }
1440  }
1441 
1442  void
1444  {
1445  handle_xxx_timeout( "handle request" );
1446  }
1447 
1448  //! Start guard request handling operation if necessary.
1449  void
1451  {
1453  {
1457  }
1458  }
1459 
1460  void
1462  {
1463  handle_xxx_timeout( "writing response" );
1464  }
1465 
1466  //! Start guard write operation if necessary.
1467  void
1469  {
1473  }
1474 
1475  void
1477  {
1478  handle_xxx_timeout( "writing response (sendfile)" );
1479  }
1480 
1481  void
1482  guard_sendfile_operation( std::chrono::steady_clock::duration timelimit )
1483  {
1484  if( std::chrono::steady_clock::duration::zero() == timelimit )
1486 
1488  timelimit,
1490  }
1491  //! \}
1492 
1493  //! Request handler.
1494  request_handler_t & m_request_handler;
1495 
1496  //! Logger for operation
1497  logger_t & m_logger;
1498 };
1499 
1500 //
1501 // connection_factory_t
1502 //
1503 
1504 //! Factory for connections.
1505 template < typename Traits >
1507 {
1508  public:
1509  using logger_t = typename Traits::logger_t;
1510  using stream_socket_t = typename Traits::stream_socket_t;
1511 
1513  connection_settings_handle_t< Traits > connection_settings,
1514  std::unique_ptr< socket_options_setter_t > socket_options_setter )
1518  {}
1519 
1520  auto
1522  stream_socket_t socket,
1523  endpoint_t remote_endpoint )
1524  {
1525  using connection_type_t = connection_t< Traits >;
1526  std::shared_ptr< connection_type_t > result;
1527  try
1528  {
1529  {
1530  socket_options_t options{ socket.lowest_layer() };
1531  (*m_socket_options_setter)( options );
1532  }
1533 
1534  result = std::make_shared< connection_type_t >(
1535  m_connection_id_counter++,
1536  std::move( socket ),
1537  m_connection_settings,
1538  std::move( remote_endpoint ) );
1539  }
1540  catch( const std::exception & ex )
1541  {
1542  m_logger.error( [&]{
1543  return fmt::format(
1544  "failed to create connection: {}",
1545  ex.what() );
1546  } );
1547  }
1548 
1549  return result;
1550  }
1551 
1552  private:
1554 
1556 
1558 
1559  logger_t & m_logger;
1560 };
1561 
1562 } /* namespace impl */
1563 
1564 } /* namespace restinio */
http_request_header_t m_header
Request data.
Definition: connection.hpp:50
void check_timeout_impl()
Check timed out operation.
upgrade_internals_t move_upgrade_internals()
Move socket out of connection.
Definition: connection.hpp:365
static connection_t & cast_to_self(tcp_connection_ctx_base_t &base)
Timer to control operations.
void consume_message()
Start (continue) a chain of read-parse-read-... operations.
Definition: connection.hpp:375
connection_settings_handle_t< Traits > m_settings
Common parameters of a connection.
std::string m_current_field_name
Parser context temp values and flags.
Definition: connection.hpp:56
response_coordinator_t m_response_coordinator
Response coordinator.
bool m_message_complete
Flag: is http message parsed completely.
Definition: connection.hpp:61
Request with connection-upgrade header came and waits for request handler to be called in non pipelin...
write_group_output_ctx_t m_write_output_ctx
Write to socket operation context.
void after_write(const asio_ns::error_code &ec)
Handle write response finished.
void guard_sendfile_operation(std::chrono::steady_clock::duration timelimit)
timout_cb_t m_current_timeout_cb
Callback to call if timeout happened.
void handle_current_write_ctx()
Start/continue handling output data of current write group.
Definition: connection.hpp:934
upgrade_internals_t(upgrade_internals_t &&)=default
connection_t & operator=(connection_t &&)=delete
http_parser m_parser
HTTP-parser.
Definition: connection.hpp:159
connection_t & operator=(const connection_t &)=delete
std::unique_ptr< socket_options_setter_t > m_socket_options_setter
logger_t & m_logger
Logger for operation.
connection_settings_handle_t< Traits > m_connection_settings
stream_socket_t m_socket
Connection.
void after_read(const asio_ns::error_code &ec, std::size_t length)
Handle read operation result.
Definition: connection.hpp:410
void finish_handling_current_write_ctx()
Do post write actions for current write group.
timer_guard_t m_timer_guard
Timer guard.
void consume_data(const char *data, std::size_t length)
Parse some data.
Definition: connection.hpp:462
upgrade_internals_t(connection_settings_handle_t< Traits > settings, stream_socket_t socket)
Definition: connection.hpp:352
connection_input_t(std::size_t buffer_size)
Definition: connection.hpp:153
virtual void write_response_parts(request_id_t request_id, response_output_flags_t response_output_flags, write_group_t wg) override
Write parts for specified request.
Definition: connection.hpp:710
void prepare_connection_and_start_read(asio_ns::ip::tcp::socket &, Connection &, Start_Read_CB start_read_cb, Failed_CB)
Definition: connection.hpp:188
void guard_write_operation()
Start guard write operation if necessary.
http_parser_settings create_parser_settings() noexcept
Helper for setting parser settings.
Definition: connection.hpp:88
connection_upgrade_stage_t m_connection_upgrade_stage
Connection upgrade request stage.
Definition: connection.hpp:167
connection_input_t m_input
Input routine.
void reset()
Prepare context to handle new request.
Definition: connection.hpp:65
void handle_xxx_timeout(const char *operation_name)
connection_settings_handle_t< Traits > m_settings
Definition: connection.hpp:359
connection_t(connection_id_t conn_id, stream_socket_t &&socket, connection_settings_handle_t< Traits > settings, endpoint_t remote_endpoint)
Definition: connection.hpp:232
auto create_new_connection(stream_socket_t socket, endpoint_t remote_endpoint)
Handler for request with connection-upgrade header was called so any response data coming is for tha...
connection_factory_t(connection_settings_handle_t< Traits > connection_settings, std::unique_ptr< socket_options_setter_t > socket_options_setter)
connection_t(const connection_t &)=delete
void cancel_timeout_checking()
Stop timeout guarding.
void reset_parser()
Prepare parser for reading new http-message.
Definition: connection.hpp:175
void on_request_message_complete()
Handle a given request message.
Definition: connection.hpp:509
Internals that are necessary for upgrade.
Definition: connection.hpp:347
void trigger_error_and_close(Message_Builder msg_builder)
Trigger an error.
std::chrono::steady_clock::time_point m_current_timeout_after
Timeout point of a current guarded operation.
void schedule_operation_timeout_callback(std::chrono::steady_clock::duration timeout, timout_cb_t timout_cb)
connection_t(connection_t &&)=delete
void write_response_parts_impl(request_id_t request_id, response_output_flags_t response_output_flags, write_group_t wg)
Write parts for specified request.
Definition: connection.hpp:747
No connection request in progress.
const endpoint_t m_remote_endpoint
Remote endpoint for this connection.
tcp_connection_ctx_weak_handle_t m_prepared_weak_ctx
A prepared weak handle for passing it to timer guard.
fixed_buffer_t m_buf
Input buffer.
Definition: connection.hpp:164
bool m_read_operation_is_running
Flag to track whether read operation is performed now.
Definition: connection.hpp:171
void init_next_timeout_checking()
Schedule next timeout checking.
void handle_file_write_operation(file_write_operation_t &op)
Run sendfile write operation.
void close()
Close connection functions.
request_handler_t & m_request_handler
Request handler.
void guard_request_handling_operation()
Start guard request handling operation if necessary.
Parsing result context for using in parser callbacks.
Definition: connection.hpp:46
void handle_upgrade_request()
Calls handler for upgrade request.
Definition: connection.hpp:630
void init_write()
Initiate write operation.
Definition: connection.hpp:851
Data associated with connection read routine.
Definition: connection.hpp:151
void wait_for_http_message()
Start reading next http-message.
Definition: connection.hpp:315
virtual void check_timeout(tcp_connection_ctx_handle_t &self) override
Schedules the real check for timed-out operations on the executor of a connection.
Context for handling http connections.
Definition: connection.hpp:218
Factory for connections.
std::enable_if< std::is_same< Parameter_Container, query_string_params_t >::value||std::is_same< Parameter_Container, router::route_params_t >::value, optional_t< Value_Type > >::type opt_value(const Parameter_Container &params, string_view_t key)
Gets the value of a parameter specified by key wrapped in optional_t<Value_Type> if parameter exists ...
Definition: value_or.hpp:64
void guard_read_operation()
Start guard read operation if necessary.
void handle_trivial_write_operation(const trivial_write_operation_t &op)
Run trivial buffers write operation.
Definition: connection.hpp:967
connection_upgrade_stage_t
Enum for a flag specifying that connection is going to upgrade or not.
Definition: connection.hpp:131