RESTinio
ws_connection.hpp
Go to the documentation of this file.
1 /*
2  restinio
3 */
4 
5 /*!
6  WebSocket connection routine.
7 */
8 
9 #pragma once
10 
11 #include <queue>
12 
13 #include <restinio/asio_include.hpp>
14 
15 #include <http_parser.h>
16 
17 #include <restinio/impl/include_fmtlib.hpp>
18 
19 #include <restinio/all.hpp>
20 #include <restinio/impl/executor_wrapper.hpp>
21 #include <restinio/impl/write_group_output_ctx.hpp>
22 #include <restinio/websocket/message.hpp>
23 #include <restinio/websocket/impl/ws_parser.hpp>
24 #include <restinio/websocket/impl/ws_protocol_validator.hpp>
25 
26 #include <restinio/utils/impl/safe_uint_truncate.hpp>
27 
28 #include <restinio/compiler_features.hpp>
29 
30 namespace restinio
31 {
32 
33 namespace websocket
34 {
35 
36 namespace basic
37 {
38 
39 namespace impl
40 {
41 
43 
44 //! Max possible size of websocket frame header (a part before payload).
   //!
   //! \return The upper bound of the header size; a compile-time constant.
   //!
   //! NOTE(review): 14 presumably stands for 2 fixed bytes + 8 bytes of
   //! extended payload length + 4 bytes of masking key (RFC 6455,
   //! section 5.2) -- confirm against the parser.
45 constexpr size_t
47 {
48  return 14;
49 }
50 
51 //
52 // ws_outgoing_data_t
53 //
54 
55 //! A queue for outgoing buffers.
   //!
   //! Write groups are stored with emplace() and handed out one at a time
   //! through front()/pop().
57 {
58  public:
59  //! Add buffers to queue.
    //!
    //! \param wg Write group to store; it is taken by value and moved
    //! into the queue.
60  void
61  append( write_group_t wg )
62  {
63  m_awaiting_write_groups.emplace( std::move( wg ) );
64  }
65 
    //! Take the write group at the front of the queue, if there is one.
    //!
    //! \return An empty optional when the queue is empty; otherwise the
    //! front write group, which is moved out and removed from the queue.
68  {
69  optional_t< write_group_t > result;
70 
71  if( !m_awaiting_write_groups.empty() )
72  {
    // Move the group out first, then drop the moved-from element.
73  result = std::move( m_awaiting_write_groups.front() );
74  m_awaiting_write_groups.pop();
75  }
76 
77  return result;
78  }
79 
80  private:
81  //! A queue of buffers.
83 };
84 
85 //
86 // connection_input_t
87 //
88 
89 //! Websocket input state: the frame parser, the input buffer and
   //! the payload of the message being received.
91 {
    //! \param buffer_size Value used to construct the input buffer (m_buf).
92  connection_input_t( std::size_t buffer_size )
93  : m_buf{ buffer_size }
94  {}
95 
96  //! Websocket parser.
98 
99  //! Input buffer.
101 
102  //! Current payload.
104 
105  //! Prepare for reading a new websocket message.
    //!
    //! Resets the parser and clears the current payload;
    //! the input buffer is not touched.
106  void
108  {
109  m_parser.reset();
110  m_payload.clear();
111  }
112 };
113 
114 //
115 // ws_connection_t
116 //
117 
118 //! Context for handling websocket connections.
119 template <
120  typename Traits,
121  typename WS_Message_Handler >
123  : public ws_connection_base_t
124  , public restinio::impl::executor_wrapper_t< typename Traits::strand_t >
125 {
127 
128  public:
129  using message_handler_t = WS_Message_Handler;
130 
131  using timer_manager_t = typename Traits::timer_manager_t;
133  using timer_guard_t = typename timer_manager_t::timer_guard_t;
134  using logger_t = typename Traits::logger_t;
135  using strand_t = typename Traits::strand_t;
136  using stream_socket_t = typename Traits::stream_socket_t;
137  using lifetime_monitor_t =
139 
141 
142  ws_connection_t(
143  //! Connection id.
144  connection_id_t conn_id,
145  //! Data inherited from http-connection.
146  //! \{
147  restinio::impl::connection_settings_handle_t< Traits > settings,
148  stream_socket_t socket,
149  lifetime_monitor_t lifetime_monitor,
150  //! \}
151  message_handler_t msg_handler )
154  , m_settings{ std::move( settings ) }
155  , m_socket{ std::move( socket ) }
160  , m_logger{ *( m_settings->m_logger ) }
161  {
162  // Notify of a new connection instance.
163  m_logger.trace( [&]{
164  return fmt::format(
165  "[connection:{}] move socket to [ws_connection:{}]",
166  connection_id(),
167  connection_id() );
168  } );
169 
170  m_logger.trace( [&]{
171  return fmt::format(
172  "[ws_connection:{}] start connection with {}",
173  connection_id(),
175  } );
176 
177  // Inform the state listener if one is used.
178  m_settings->call_state_listener( [this]() noexcept {
179  return connection_state::notice_t {
180  connection_id(),
183  };
184  } );
185  }
186 
187  ws_connection_t( const ws_connection_t & ) = delete;
188  ws_connection_t( ws_connection_t && ) = delete;
189  ws_connection_t & operator = ( const ws_connection_t & ) = delete;
190  ws_connection_t & operator = ( ws_connection_t && ) = delete;
191 
193  {
194  try
195  {
196  // Log the destruction of the connection instance.
197  m_logger.trace( [&]{
198  return fmt::format(
199  "[ws_connection:{}] destructor called",
200  connection_id() );
201  } );
202  }
203  catch( ... )
204  {}
205  }
206 
207  //! Shutdown websocket.
208  virtual void
209  shutdown() override
210  {
211  asio_ns::dispatch(
212  this->get_executor(),
213  [ this, ctx = shared_from_this() ]
214  // NOTE: this lambda is noexcept since v.0.6.0.
215  () noexcept {
216  try
217  {
218  // An exception from logger shouldn't prevent
219  // main shutdown actions.
221  [&]{
222  return fmt::format(
223  "[ws_connection:{}] shutdown",
224  connection_id() );
225  } );
226 
228  graceful_close();
229  }
230  catch( const std::exception & ex )
231  {
233  [&]{
234  return fmt::format(
235  "[ws_connection:{}] shutdown operation error: {}",
236  connection_id(),
237  ex.what() );
238  } );
239  }
240  } );
241  }
242 
243  //! Kill websocket.
244  virtual void
245  kill() override
246  {
247  asio_ns::dispatch(
248  this->get_executor(),
249  [ this, ctx = shared_from_this() ]
250  // NOTE: this lambda is noexcept since v.0.6.0.
251  () noexcept
252  {
253  try
254  {
255  // An exception from logger shouldn't prevent
256  // main kill actions.
258  [&]{
259  return fmt::format(
260  "[ws_connection:{}] kill",
261  connection_id() );
262  } );
263 
266 
267  close_impl();
268  }
269  catch( const std::exception & ex )
270  {
272  [&]{
273  return fmt::format(
274  "[ws_connection:{}] kill operation error: {}",
275  connection_id(),
276  ex.what() );
277  } );
278  }
279  } );
280  }
281 
282  //! Start reading ws-messages.
283  void
284  init_read( ws_handle_t wsh ) override
285  {
287 
288  // Start reading on the connection's executor (direct invocation if possible).
289  asio_ns::dispatch(
290  this->get_executor(),
291  [ this, ctx = shared_from_this(), wswh = std::move( wswh ) ]
292  // NOTE: this lambda is noexcept since v.0.6.0.
293  () noexcept
294  {
295  try
296  {
297  // Start timeout checking.
300 
304  }
305  catch( const std::exception & ex )
306  {
309  [&]{
310  return fmt::format(
311  "[ws_connection:{}] unable to init read: {}",
312  connection_id(),
313  ex.what() );
314  } );
315  }
316  } );
317  }
318 
319  //! Write pieces of outgoing data.
320  virtual void
322  write_group_t wg,
323  bool is_close_frame ) override
324  {
325  //! Run write message on io_context loop if possible.
326  asio_ns::dispatch(
327  this->get_executor(),
328  [ this,
329  actual_wg = std::move( wg ),
330  ctx = shared_from_this(),
332  // NOTE: this lambda is noexcept since v.0.6.0.
333  () mutable noexcept
334  {
335  try
336  {
339  std::move( actual_wg ),
340  is_close_frame );
341  else
342  {
343  m_logger.warn( [&]{
344  return fmt::format(
345  "[ws_connection:{}] cannot write to websocket: "
346  "write operations disabled",
347  connection_id() );
348  } );
349  }
350  }
351  catch( const std::exception & ex )
352  {
355  [&]{
356  return fmt::format(
357  "[ws_connection:{}] unable to write data: {}",
358  connection_id(),
359  ex.what() );
360  } );
361  }
362  } );
363  }
364  private:
365  //! Standard close routine.
366  /*!
367  * @note
368  * This method is noexcept since v.0.6.0.
369  */
370  void
371  close_impl() noexcept
372  {
374  [&]() noexcept {
376  [&]{
377  return fmt::format(
378  "[ws_connection:{}] close socket",
379  connection_id() );
380  } );
381 
382  // These actions can throw, so we have
383  // to wrap them...
385  m_logger,
386  "ws_connection.close_impl.socket.shutdown",
387  [&] {
391  ignored_ec );
392  } );
393 
395  m_logger,
396  "ws_connection.close_impl.socket.close",
397  [&] {
398  m_socket.close();
399  } );
400  } );
401  }
402 
403  //! Start waiting for close-frame.
404  void
406  {
409  }
410 
411  //! Close WebSocket connection in a graceful manner.
412  void
414  {
416  [&]{
419  } );
420  }
421 
422  //! Send close frame to peer.
423  void
424  send_close_frame_to_peer( std::string payload )
425  {
427  bufs.reserve( 2 );
428 
431  final_frame,
433  payload.size() ) );
434 
437 
439 
440  // No more data must be written.
442  }
443 
444  //! Send close frame to peer.
445  void
447  status_code_t code,
448  std::string desc = std::string{} )
449  {
451  }
452 
453  //! Trigger an error.
454  /*!
455  Writes error message to log,
456  closes socket,
457  and sends close frame to user if necessary.
458 
459  @note
460  This method is noexcept since v.0.6.0
461  */
462  template< typename MSG_BUILDER >
463  void
465  status_code_t status,
466  MSG_BUILDER msg_builder ) noexcept
467  {
468  // An exception in logger shouldn't prevent the main actions.
470  m_logger, std::move( msg_builder ) );
471 
472  // This can throw but we have to suppress any exceptions.
474  m_logger, "ws_connection.call_close_handler_if_necessary",
475  [this, status] {
477  } );
478 
480  }
481 
482 
483  //! Start the process of reading ws messages from socket.
484  void
486  {
487  m_logger.trace( [&]{
488  return fmt::format(
489  "[ws_connection:{}] start reading header",
490  connection_id() );
491  } );
492 
493  // Prepare parser for consuming new message.
495 
496  if( 0 == m_input.m_buf.length() )
497  {
499  }
500  else
501  {
502  // Has something to read from m_input.m_buf.
505  }
506  }
507 
508  //! Initiate read operation on socket to receive bytes for header.
509  void
511  {
512  m_logger.trace( [&]{
513  return fmt::format(
514  "[ws_connection:{}] continue reading message",
515  connection_id() );
516  } );
517 
521  this->get_executor(),
522  [ this, ctx = shared_from_this() ]
523  // NOTE: this lambda is noexcept since v.0.6.0.
524  ( const asio_ns::error_code & ec, std::size_t length ) noexcept
525  {
526  try
527  {
529  }
530  catch( const std::exception & ex )
531  {
534  [&]{
535  return fmt::format(
536  "[ws_connection:{}] after read header callback error: {}",
537  connection_id(),
538  ex.what() );
539  } );
540  }
541  } ) );
542  }
543 
544  //! Handle read error (reading header or payload)
545  void
546  handle_read_error( const char * desc, const asio_ns::error_code & ec )
547  {
548  // Assume that connection is lost.
551  [&]{
552  return fmt::format(
553  "[ws_connection:{}] {}: {}",
554  connection_id(),
555  desc,
556  ec.message() );
557  } );
558  }
559 
560  //! Handle read operation result, when reading header.
561  void
563  const asio_ns::error_code & ec,
564  std::size_t length )
565  {
566  if( !ec )
567  {
568  m_logger.trace( [&]{
569  return fmt::format(
570  "[ws_connection:{}] received {} bytes",
571  this->connection_id(),
572  length );
573  } );
574 
577  }
578  else
579  {
580  handle_read_error( "reading message header error", ec );
581  }
582  }
583 
584  //! Parse header from internal buffer.
585  void
586  consume_header_from_buffer( const char * data, std::size_t length )
587  {
589 
591 
593  {
595  }
596  else
597  {
598  assert( nparsed == length );
600  }
601  }
602 
603  //! Handle parsed header.
604  void
605  handle_parsed_header( const message_details_t & md )
606  {
607  m_logger.trace( [&]{
608  return fmt::format(
609  "[ws_connection:{}] start handling {} ({:#x})",
610  connection_id(),
612  static_cast<std::uint16_t>(md.m_opcode) );
613  } );
614 
615  const auto validation_result =
617 
619  {
620  m_logger.error( [&]{
621  return fmt::format(
622  "[ws_connection:{}] invalid header",
623  connection_id() );
624  } );
625 
627  {
629  [&]{
631  // Do not wait anything in return, because
632  // protocol is violated.
633  } );
634 
636  }
638  {
639  // Wait for close frame cannot be done.
640  close_impl();
641  }
642 
643  return;
644  }
645 
647  }
648 
649  //! Handle parsed and valid header.
650  void
651  handle_parsed_and_valid_header( const message_details_t & md )
652  {
653  const auto payload_length =
655 
657 
658  if( payload_length == 0 )
659  {
660  // Callback for message with 0-size payload.
662  }
663  else
664  {
665  const auto payload_part_size =
667 
668  std::memcpy(
670  m_input.m_buf.bytes(),
672 
674 
675  const std::size_t length_remaining =
677 
681  length_remaining ) )
682  {
683  if( 0 == length_remaining )
684  {
685  // All message is obtained.
687  }
688  else
689  {
690  // Read the rest of payload:
694  }
695  }
696  // Else payload is invalid and validate_payload_part()
697  // has handled the case so do nothing.
698  }
699  }
700 
701  //! Start reading message payload.
702  void
704  //! A pointer to the remainder of unfetched payload.
705  char * payload_data,
706  //! The size of the remainder of unfetched payload.
707  std::size_t length_remaining,
708  //! Validate payload and call handler.
709  bool do_validate_payload_and_call_msg_handler = true )
710  {
714  this->get_executor(),
715  [ this,
716  ctx = shared_from_this(),
717  payload_data,
720  // NOTE: this lambda is noexcept since v.0.6.0.
721  ( const asio_ns::error_code & ec, std::size_t length ) noexcept
722  {
723  try
724  {
726  payload_data,
728  ec,
729  length,
731  }
732  catch( const std::exception & ex )
733  {
736  [&]{
737  return fmt::format(
738  "[ws_connection:{}] after read payload callback error: {}",
739  connection_id(),
740  ex.what() );
741  } );
742  }
743  } ) );
744  }
745 
746  //! Handle read operation result, when reading payload.
747  void
749  char * payload_data,
750  std::size_t length_remaining,
751  const asio_ns::error_code & ec,
752  std::size_t length,
753  bool do_validate_payload_and_call_msg_handler = true )
754  {
755  if( !ec )
756  {
757  m_logger.trace( [&]{
758  return fmt::format(
759  "[ws_connection:{}] received {} bytes",
760  this->connection_id(),
761  length );
762  } );
763 
765 
768 
770  {
772  {
773  if( 0 == next_length_remaining )
774  {
775  // Here: all the payload is ready.
776 
777  // All message is obtained.
779  }
780  else
781  {
782  // Here: not all of the payload has been received,
783  // so initiate a read once again:
788  }
789  }
790  // Else payload is invalid and validate_payload_part()
791  // has handled the case so do nothing.
792  }
793  else
794  {
795  if( 0 == next_length_remaining )
796  {
798  }
799  else
800  {
805  }
806  }
807  }
808  else
809  {
810  handle_read_error( "reading message payload error", ec );
811  }
812  }
813 
814  //! Call user message handler with current message.
     //!
     //! The handler is invoked only if the weak websocket handle can still
     //! be locked; otherwise this call does nothing.
     //!
     //! \param close_frame Message handle that is moved into the handler call.
     //! NOTE(review): the name suggests a close frame while the summary
     //! speaks of the current message -- confirm which one is intended.
815  void
816  call_message_handler( message_handle_t close_frame )
817  {
818  if( auto wsh = m_websocket_weak_handle.lock() )
819  {
820  try
821  {
823  std::move( wsh ),
824  std::move( close_frame ) );
825  }
826  catch( const std::exception & ex )
827  {
     // Exceptions derived from std::exception are logged and not
     // propagated; other exception types are not caught here.
828  m_logger.error( [&]{
829  return fmt::format(
830  "[ws_connection:{}] execute handler error: {}",
831  connection_id(),
832  ex.what() );
833  } );
834  }
835  }
836  }
837 
838  //! Validates a part of received payload.
839  bool
841  char * payload_data,
842  std::size_t length,
843  std::size_t next_length_remaining )
844  {
845  const auto validation_result =
847 
849  {
851 
853  {
854  // Can skip this payload because it was not a bad close frame.
855 
856  // It is the case we are expecting close frame
857  // so validator must be ready to receive more headers
858  // and payloads after this frame.
860 
861  if( 0 == next_length_remaining )
862  {
864  }
865  else
866  {
867  // Skip checking payload for this frame:
868  const bool do_validate_payload_and_call_msg_handler = false;
873  }
874  }
875  return false;
876  }
877 
878  return true;
879  }
880 
881  //! Handle payload errors.
882  void
883  handle_invalid_payload( validation_state_t validation_result )
884  {
885  m_logger.error( [&]{
886  return fmt::format(
887  "[ws_connection:{}] invalid paload",
888  connection_id() );
889  } );
890 
892  {
893  // A corner case: invalid payload in close frame.
894 
896  {
897  // Case: close frame was not expected.
898 
899  // This actually must be executed:
901  [&]{
903  // Do not wait anything in return, because
904  // protocol is violated.
905  } );
906 
907  // Notify user of a close but use a correct close code.
909  }
911  {
912  // Case: close frame was expected.
913 
914  // We got a close frame but it is incorrect,
915  // so just close (there is not too much we can do).
916  close_impl();
917  }
918  }
919  else
920  {
922  {
924  [&]{
927  } );
928 
930  }
931  }
932  }
933 
934  void
936  {
937  auto & md = m_input.m_parser.current_message();
938 
941  {
943  {
945  {
946  m_logger.trace( [&]{
947  return fmt::format(
948  "[ws_connection:{}] got close frame from peer, status: {}",
949  connection_id(),
950  static_cast<std::uint16_t>(
952  } );
953 
956  [&]{
958  } );
959 
961  }
962 
966  md.m_opcode,
967  std::move( m_input.m_payload ) ) );
968 
971  }
972  else
973  {
975 
977  {
978  // Got it!
980 
981  close_impl();
982 
983  m_logger.trace( [&]{
984  return fmt::format(
985  "[ws_connection:{}] expected close frame came",
986  connection_id() );
987  } );
988  }
989  else
990  {
991  // Wait for next frame.
993  }
994  }
995  }
996  else
997  {
999  }
1000  }
1001 
1002  void
1003  call_close_handler_if_necessary( status_code_t status )
1004  {
1006  [&]{
1009  final_frame,
1011  status_code_to_bin( status ) ) );
1012  } );
1013  }
1014 
1015  //! Implementation of writing data performed on the asio_ns::io_context.
1016  void
1017  write_data_impl( write_group_t wg, bool is_close_frame )
1018  {
1019  if( m_socket.is_open() )
1020  {
1021  if( is_close_frame )
1022  {
1023  m_logger.trace( [&]{
1024  return fmt::format(
1025  "[ws_connection:{}] user sends close frame",
1026  connection_id() );
1027  } );
1028 
1029  m_close_frame_to_peer.disable(); // It is formed and sent by user
1030  m_close_frame_to_user.disable(); // And user knows that websocket is closed.
1031  // No more writes.
1033 
1034  // Start waiting only close-frame.
1036  }
1037 
1038  // Push write_group to queue.
1039  m_outgoing_data.append( std::move( wg ) );
1040 
1042  }
1043  else
1044  {
1045  m_logger.warn( [&]{
1046  return fmt::format(
1047  "[ws_connection:{}] try to write while socket is closed",
1048  connection_id() );
1049  } );
1050 
1051  try
1052  {
1056  }
1057  catch( ... )
1058  {}
1059  }
1060  }
1061 
1062  //! Checks if there is something to write,
1063  //! and if so starts write operation.
1064  void
1066  {
1068  {
1069  init_write();
1070  }
1071  }
1072 
1073  //! Initiate write operation.
1074  void
1076  {
1077  // Here: not writing anything to socket, so
1078  // write operation can be initiated.
1080 
1081  if( next_write_group )
1082  {
1083  m_logger.trace( [&]{
1084  return fmt::format(
1085  "[ws_connection:{}] start next write group, "
1086  "size: {}",
1087  this->connection_id(),
1089  } );
1090 
1091  // Initialize write context with a new write group.
1093  std::move( next_write_group ) );
1094 
1095  // Start the loop of sending data from current write group.
1097  }
1098  }
1099 
1100  // Use aliases for shorter names.
1104 
1105  void
1107  {
1108  try
1109  {
1111 
1113  {
1115  }
1117  {
1119  }
1120  else
1121  {
1123  throw exception_t{ "sendfile write operation not implemented" };
1124  }
1125  }
1126  catch( const std::exception & ex )
1127  {
1130  [&]{
1131  return fmt::format(
1132  "[ws_connection:{}] handle_current_write_ctx failed: {}",
1133  connection_id(),
1134  ex.what() );
1135  } );
1136  }
1137  }
1138 
1139  void
1140  handle_trivial_write_operation( const trivial_write_operation_t & op )
1141  {
1142  // Asio buffers (param for async write):
1143  auto & bufs = op.get_trivial_bufs();
1144 
1145  m_logger.trace( [&]{
1146  return fmt::format(
1147  "[ws_connection:{}] sending data with "
1148  "buf count: {}, "
1149  "total size: {}",
1150  connection_id(),
1151  bufs.size(),
1152  op.size() ); } );
1153 
1155 
1156  // There is something to write.
1158  m_socket,
1159  bufs,
1161  this->get_executor(),
1162  [ this,
1163  ctx = shared_from_this() ]
1164  // NOTE: this lambda is noexcept since v.0.6.0.
1165  ( const asio_ns::error_code & ec, std::size_t written ) noexcept
1166  {
1167  try
1168  {
1169  if( !ec )
1170  {
1171  m_logger.trace( [&]{
1172  return fmt::format(
1173  "[ws_connection:{}] outgoing data was sent: {} bytes",
1174  connection_id(),
1175  written );
1176  } );
1177  }
1178 
1179  after_write( ec );
1180  }
1181  catch( const std::exception & ex )
1182  {
1185  [&]{
1186  return fmt::format(
1187  "[ws_connection:{}] after write callback error: {}",
1188  connection_id(),
1189  ex.what() );
1190  } );
1191  }
1192  } ) );
1193  }
1194 
1195  //! Do post write actions for current write group.
1196  void
1198  {
1199  // Finishing writing this group.
1200  m_logger.trace( [&]{
1201  return fmt::format(
1202  "[ws_connection:{}] finishing current write group",
1203  this->connection_id() );
1204  } );
1205 
1206  // Group notificators are called from here (if exist):
1208 
1209  // Start another write operation
1210  // if there is something to send.
1212  }
1213 
1214  //! Handle write response finished.
1215  void
1216  after_write( const asio_ns::error_code & ec )
1217  {
1218  if( !ec )
1219  {
1221  }
1222  else
1223  {
1226  [&]{
1227  return fmt::format(
1228  "[ws_connection:{}] unable to write: {}",
1229  connection_id(),
1230  ec.message() );
1231  } );
1232 
1233  try
1234  {
1236  }
1237  catch( const std::exception & ex )
1238  {
1239  m_logger.error( [&]{
1240  return fmt::format(
1241  "[ws_connection:{}] notificator error: {}",
1242  connection_id(),
1243  ex.what() );
1244  } );
1245  }
1246  }
1247  }
1248 
1249  //! Common parameters of a connection.
1251 
1252  //! Connection.
1253  stream_socket_t m_socket;
1254 
1255  /*!
1256  * @brief Monitor of the connection lifetime.
1257  *
1258  * @since v.0.6.12
1259  */
1261 
1262  //! Timers.
1263  //! \{
1264  static ws_connection_t &
1265  cast_to_self( tcp_connection_ctx_base_t & base )
1266  {
      // Unchecked downcast (static_cast): the caller must pass a context
      // object that really is a ws_connection_t.
1267  return static_cast< ws_connection_t & >( base );
1268  }
1269 
1270  virtual void
1271  check_timeout( tcp_connection_ctx_handle_t & self ) override
1272  {
      // Runs the timeout check through the connection's executor.
      //
      // The handle is moved out of `self` into the handler, so the caller's
      // handle is left in a moved-from state. `this` is not captured: the
      // handler reaches the connection object through the handle it owns.
1273  asio_ns::dispatch(
1274  this->get_executor(),
1275  [ ctx = std::move( self ) ]
1276  // NOTE: this lambda is noexcept since v.0.6.0.
1277  () noexcept
1278  {
1279  auto & conn_object = cast_to_self( *ctx );
1280  // If an exception is thrown we can only
1281  // close the connection.
1282  try
1283  {
1285  }
1286  catch( const std::exception & x )
1287  {
1290  [&] {
1291  return fmt::format( "[connection: {}] unexpected "
1292  "error during timeout handling: {}",
1294  x.what() );
1295  } );
1296  }
1297  } );
1298  }
1299 
1304  timer_guard_t m_timer_guard;
1305 
1306  void
1308  {
1309  const auto now = std::chrono::steady_clock::now();
1311  {
1312  m_logger.trace( [&]{
1313  return fmt::format(
1314  "[wd_connection:{}] write operation timed out",
1315  connection_id() );
1316  } );
1319  close_impl();
1320  }
1322  {
1323  m_logger.trace( [&]{
1324  return fmt::format(
1325  "[wd_connection:{}] waiting for close-frame from peer timed out",
1326  connection_id() );
1327  } );
1328  close_impl();
1329  }
1330  else
1331  {
1333  }
1334  }
1335 
1336  //! schedule next timeout checking.
1337  void
1339  {
1341  }
1342 
1343  //! Start guard write operation if necessary.
1344  void
1346  {
1349  }
1350 
1351  void
1353  {
1356  }
1357  //! \}
1358 
1359  //! Input routine.
1361 
1362  //! Helper for validating protocol.
1364 
1365  //! Websocket message handler provided by user.
1366  message_handler_t m_msg_handler;
1367 
1368  //! Logger for operation
1369  logger_t & m_logger;
1370 
1371  //! Write to socket operation context.
1373 
1374  //! Output buffers queue.
1376 
1377  //! A weak handle to the owning ws_t, used when calling the message handler.
1379 
1380  //! Websocket output states.
1381  enum class write_state_t
1382  {
1383  //! Able to append outgoing data.
1384  write_enabled,
1385  //! No more outgoing data can be added (e.g. close-frame was sent).
1387  };
1388 
1389  //! A state of a websocket output.
1391 
1392  //! Websocket input states.
1393  enum class read_state_t
1394  {
1395  //! Reads any type of frame and serve it to user.
1397  //! Reads only close frame: skip all frames until close-frame.
1399  //! Do not read anything (before activation).
1400  read_nothing
1401  };
1402 
1403  //! A state of a websocket input.
1405 
1406  //! A helper class for running exclusive action.
1407  //! Only a first action will run.
1409  {
1410  public:
1411  template < typename Action >
1412  void
1413  run_if_first( Action && action ) noexcept(noexcept(action()))
1414  {
      // Runs `action` only while the flag is still set.
      // The function is noexcept exactly when calling `action()` is noexcept.
1415  if( m_not_executed_yet )
1416  {
      // The flag is cleared before the call: the action is not retried
      // if it throws, and a nested run_if_first() made from inside
      // the action does nothing.
1417  m_not_executed_yet = false;
1418  action();
1419  }
1420  }
1421 
1422  //! Disable action: it will not be executed even on the first attempt.
1423  void
1425  {
1426  m_not_executed_yet = false;
1427  }
1428 
1429  private:
1430  bool m_not_executed_yet{ true };
1431  };
1432 
1436 };
1437 
1438 } /* namespace impl */
1439 
1440 } /* namespace basic */
1441 
1442 } /* namespace websocket */
1443 
1444 } /* namespace restinio */
read_state_t m_read_state
A state of a websocket input.
void consume_header_from_buffer(const char *data, std::size_t length)
Parse header from internal buffer.
Context for handling websocket connections.
std::chrono::steady_clock::time_point m_close_frame_from_peer_timeout_after
restinio::impl::fixed_buffer_t m_buf
Input buffer.
ws_weak_handle_t m_websocket_weak_handle
A weak handle to the owning ws_t, used when calling the message handler.
void close_impl() noexcept
Standard close routine.
void after_read_payload(char *payload_data, std::size_t length_remaining, const asio_ns::error_code &ec, std::size_t length, bool do_validate_payload_and_call_msg_handler=true)
Handle read operation result, when reading payload.
void run_if_first(Action &&action) noexcept(noexcept(action()))
virtual void kill() override
Kill websocket.
void handle_trivial_write_operation(const trivial_write_operation_t &op)
void consume_header_from_socket()
Initiate read operation on socket to receive bytes for header.
void handle_invalid_payload(validation_state_t validation_result)
Handle payload errors.
write_groups_queue_t m_awaiting_write_groups
A queue of buffers.
ws_connection_t & operator=(const ws_connection_t &)=delete
constexpr size_t websocket_header_max_size()
Max possible size of websocket frame header (a part before payload).
void write_data_impl(write_group_t wg, bool is_close_frame)
Implementation of writing data performed on the asio_ns::io_context.
void send_close_frame_to_peer(std::string payload)
Send close frame to peer.
restinio::impl::write_group_output_ctx_t m_write_output_ctx
Write to socket operation context.
message_handler_t m_msg_handler
Websocket message handler provided by user.
ws_outgoing_data_t m_outgoing_data
Output buffers queue.
ws_connection_t(const ws_connection_t &)=delete
void handle_read_error(const char *desc, const asio_ns::error_code &ec)
Handle read error (reading header or payload)
void append(write_group_t wg)
Add buffers to queue.
bool validate_payload_part(char *payload_data, std::size_t length, std::size_t next_length_remaining)
Validates a part of received payload.
void disable()
Disable action: it will not be executed even on the first attempt.
No more outgoing data can be added (e.g. close-frame was sent).
lifetime_monitor_t m_lifetime_monitor
Monitor of the connection lifetime.
void init_read(ws_handle_t wsh) override
Start reading ws-messages.
static ws_connection_t & cast_to_self(tcp_connection_ctx_base_t &base)
Timers.
Reads only close frame: skip all frames until close-frame.
void trigger_error_and_close(status_code_t status, MSG_BUILDER msg_builder) noexcept
Trigger an error.
ws_protocol_validator_t m_protocol_validator
Helper for validating protocol.
void call_close_handler_if_necessary(status_code_t status)
virtual void check_timeout(tcp_connection_ctx_handle_t &self) override
void start_waiting_close_frame_only()
Start waiting for close-frame.
void start_read_header()
Start the process of reading ws messages from socket.
restinio::impl::connection_settings_handle_t< Traits > m_settings
Common parameters of a connection.
void finish_handling_current_write_ctx()
Do post write actions for current write group.
void handle_parsed_header(const message_details_t &md)
Handle parsed header.
void start_read_payload(char *payload_data, std::size_t length_remaining, bool do_validate_payload_and_call_msg_handler=true)
Start reading message payload.
tcp_connection_ctx_weak_handle_t m_prepared_weak_ctx
void init_write_if_necessary()
Checks if there is something to write, and if so starts write operation.
ws_connection_t & operator=(ws_connection_t &&)=delete
void init_next_timeout_checking()
schedule next timeout checking.
virtual void shutdown() override
Shutdown websocket.
void send_close_frame_to_peer(status_code_t code, std::string desc=std::string{})
Send close frame to peer.
void after_read_header(const asio_ns::error_code &ec, std::size_t length)
Handle read operation result, when reading header.
write_state_t m_write_state
A state of a websocket output.
virtual void write_data(write_group_t wg, bool is_close_frame) override
Write pieces of outgoing data.
void after_write(const asio_ns::error_code &ec)
Handle write response finished.
void guard_write_operation()
Start guard write operation if necessary.
void graceful_close()
Close WebSocket connection in a graceful manner.
void reset_parser_and_payload()
Prepare parser for reading a new websocket message.
void call_message_handler(message_handle_t close_frame)
Call user message handler with current message.
std::chrono::steady_clock::time_point m_write_operation_timeout_after
void handle_parsed_and_valid_header(const message_details_t &md)
Handle parsed and valid header.
std::enable_if< std::is_same< Parameter_Container, query_string_params_t >::value||std::is_same< Parameter_Container, router::route_params_t >::value, optional_t< Value_Type > >::type opt_value(const Parameter_Container &params, string_view_t key)
Gets the value of a parameter specified by key wrapped in optional_t<Value_Type> if parameter exists ...
Definition: value_or.hpp:64
A helper class for running exclusive action. Only a first action will run.