RESTinio
ws_connection.hpp
Go to the documentation of this file.
1 /*
2  restinio
3 */
4 
5 /*!
6  WebSocket connection routine.
7 */
8 
9 #pragma once
10 
11 #include <queue>
12 
13 #include <restinio/asio_include.hpp>
14 
15 #include <http_parser.h>
16 
17 #include <fmt/format.h>
18 
19 #include <restinio/all.hpp>
20 #include <restinio/impl/executor_wrapper.hpp>
21 #include <restinio/impl/write_group_output_ctx.hpp>
22 #include <restinio/websocket/message.hpp>
23 #include <restinio/websocket/impl/ws_parser.hpp>
24 #include <restinio/websocket/impl/ws_protocol_validator.hpp>
25 
26 #include <restinio/utils/impl/safe_uint_truncate.hpp>
27 
28 namespace restinio
29 {
30 
31 namespace websocket
32 {
33 
34 namespace basic
35 {
36 
37 namespace impl
38 {
39 
41 
42 //! Max possible size of websocket frame header (a part before payload).
43 constexpr size_t
45 {
46  return 14;
47 }
48 
49 //
50 // ws_outgoing_data_t
51 //
52 
53 //! A queue for outgoing buffers.
55 {
56  public:
57  //! Enqueue a write group for later output; the group is taken by value and moved into the queue.
58  void
59  append( write_group_t wg )
60  {
61  m_awaiting_write_groups.emplace( std::move( wg ) ); // Sink argument: moved into the queue, not copied.
62  }
63 
66  {
67  optional_t< write_group_t > result;
68 
69  if( !m_awaiting_write_groups.empty() )
70  {
71  result = std::move( m_awaiting_write_groups.front() );
72  m_awaiting_write_groups.pop();
73  }
74 
75  return result;
76  }
77 
78  private:
79  //! A queue of buffers.
81 };
82 
83 //
84 // connection_input_t
85 //
86 
87 //! Websocket input stuff.
89 {
90  connection_input_t( std::size_t buffer_size ) // NOTE(review): single-argument constructor is not `explicit` - confirm implicit conversion from size_t is intended.
91  : m_buf{ buffer_size } // Only m_buf appears in the initializer list; it is constructed from buffer_size.
92  {}
93 
94  //! websocket parser.
96 
97  //! Input buffer.
99 
100  //! Current payload.
102 
103  //! Reset the websocket parser and clear the payload before reading a new message.
104  void
106  {
107  m_parser.reset();
108  m_payload.clear();
109  }
110 };
111 
112 //
113 // ws_connection_t
114 //
115 
116 //! Context for handling websocket connections.
117 template <
118  typename Traits,
119  typename WS_Message_Handler >
121  : public ws_connection_base_t
122  , public restinio::impl::executor_wrapper_t< typename Traits::strand_t >
123 {
125 
126  public:
127  using message_handler_t = WS_Message_Handler;
128 
129  using timer_manager_t = typename Traits::timer_manager_t;
131  using timer_guard_t = typename timer_manager_t::timer_guard_t;
132  using logger_t = typename Traits::logger_t;
133  using strand_t = typename Traits::strand_t;
134  using stream_socket_t = typename Traits::stream_socket_t;
135 
137 
138  ws_connection_t(
139  //! Connection id.
140  connection_id_t conn_id,
141  //! Data inherited from http-connection.
142  //! \{
143  restinio::impl::connection_settings_handle_t< Traits > settings,
144  stream_socket_t socket,
145  //! \}
146  message_handler_t msg_handler )
149  , m_settings{ std::move( settings ) }
150  , m_socket{ std::move( socket ) }
154  , m_logger{ *( m_settings->m_logger ) }
155  {
156  // Notify of a new connection instance.
157  m_logger.trace( [&]{
158  return fmt::format(
159  "[connection:{}] move socket to [ws_connection:{}]",
160  connection_id(),
161  connection_id() );
162  } );
163 
164  m_logger.trace( [&]{
165  return fmt::format(
166  "[ws_connection:{}] start connection with {}",
167  connection_id(),
169  } );
170 
171  // Inform state listener if it used.
172  m_settings->call_state_listener( [this]() noexcept {
173  return connection_state::notice_t{
174  connection_id(),
177  } );
178  }
179 
180  ws_connection_t( const ws_connection_t & ) = delete; // Neither copyable nor movable: all four copy/move operations are deleted.
181  ws_connection_t( ws_connection_t && ) = delete;
182  ws_connection_t & operator = ( const ws_connection_t & ) = delete;
183  ws_connection_t & operator = ( ws_connection_t && ) = delete;
184 
186  {
187  try
188  {
189  // Notify that the connection instance is being destroyed.
190  m_logger.trace( [&]{
191  return fmt::format(
192  "[ws_connection:{}] destructor called",
193  connection_id() );
194  } );
195  }
196  catch( ... )
197  {}
198  }
199 
200  //! Shutdown websocket.
201  virtual void
202  shutdown() override
203  {
204  asio_ns::dispatch(
205  this->get_executor(),
206  [ this, ctx = shared_from_this() ](){
207  try
208  {
209  m_logger.trace( [&]{
210  return fmt::format(
211  "[ws_connection:{}] shutdown",
212  connection_id() );
213  } );
214 
216  graceful_close();
217  }
218  catch( const std::exception & ex )
219  {
220  m_logger.error( [&]{
221  return fmt::format(
222  "[ws_connection:{}] shutdown operation error: {}",
223  connection_id(),
224  ex.what() );
225  } );
226  }
227  } );
228  }
229 
230  //! Kill websocket.
231  virtual void
232  kill() override
233  {
234  asio_ns::dispatch(
235  this->get_executor(),
236  [ this, ctx = shared_from_this() ](){
237  try
238  {
239  m_logger.trace( [&]{
240  return fmt::format(
241  "[ws_connection:{}] kill",
242  connection_id() );
243  } );
244 
247 
248  close_impl();
249  }
250  catch( const std::exception & ex )
251  {
252  m_logger.error( [&]{
253  return fmt::format(
254  "[ws_connection:{}] kill operation error: {}",
255  connection_id(),
256  ex.what() );
257  } );
258  }
259  } );
260  }
261 
262  //! Start reading ws-messages.
263  void
264  init_read( ws_handle_t wsh ) override
265  {
267 
268  // Run read initialization on the connection's executor (direct invocation if possible).
269  asio_ns::dispatch(
270  this->get_executor(),
271  [ this, ctx = shared_from_this(), wswh = std::move( wswh ) ](){
272  try
273  {
274  // Start timeout checking.
277 
281  }
282  catch( const std::exception & ex )
283  {
286  [&]{
287  return fmt::format(
288  "[ws_connection:{}] unable to init read: {}",
289  connection_id(),
290  ex.what() );
291  } );
292  }
293  } );
294  }
295 
296  //! Write pieces of outgoing data.
297  virtual void
299  write_group_t wg,
300  bool is_close_frame ) override
301  {
302  //! Run write message on io_context loop if possible.
303  asio_ns::dispatch(
304  this->get_executor(),
305  [ this,
306  actual_wg = std::move( wg ),
307  ctx = shared_from_this(),
308  is_close_frame ]() mutable {
309  try
310  {
313  std::move( actual_wg ),
314  is_close_frame );
315  else
316  {
317  m_logger.warn( [&]{
318  return fmt::format(
319  "[ws_connection:{}] cannot write to websocket: "
320  "write operations disabled",
321  connection_id() );
322  } );
323  }
324  }
325  catch( const std::exception & ex )
326  {
329  [&]{
330  return fmt::format(
331  "[ws_connection:{}] unable to write data: {}",
332  connection_id(),
333  ex.what() );
334  } );
335  }
336  } );
337  }
338  private:
339  //! Standard close routine.
340  void
342  {
344  [&]{
345  m_logger.trace( [&]{
346  return fmt::format(
347  "[ws_connection:{}] close socket",
348  connection_id() );
349  } );
350 
354  ignored_ec );
355  m_socket.close();
356  } );
357  }
358 
359  //! Start waiting for close-frame.
360  void
362  {
365  }
366 
367  //! Close WebSocket connection in a graceful manner.
368  void
370  {
372  [&]{
375  } );
376  }
377 
378  //! Send close frame to peer.
379  void
380  send_close_frame_to_peer( std::string payload )
381  {
383  bufs.reserve( 2 );
384 
387  final_frame,
389  payload.size() ) );
390 
393 
395 
396  // No more data must be written.
398  }
399 
400  //! Send close frame to peer.
401  void
403  status_code_t code,
404  std::string desc = std::string{} )
405  {
407  }
408 
409  //! Trigger an error.
410  /*!
411  Writes error message to log,
412  closes socket,
413  and sends close frame to user if necessary.
414  */
415  template< typename MSG_BUILDER >
416  void
418  status_code_t status,
419  MSG_BUILDER msg_builder )
420  {
423  close_impl();
424  }
425 
426 
427  //! Start the process of reading ws messages from socket.
428  void
430  {
431  m_logger.trace( [&]{
432  return fmt::format(
433  "[ws_connection:{}] start reading header",
434  connection_id() );
435  } );
436 
437  // Prepare parser for consuming new message.
439 
440  if( 0 == m_input.m_buf.length() )
441  {
443  }
444  else
445  {
446  // Has something to read from m_input.m_buf.
449  }
450  }
451 
452  //! Initiate read operation on socket to receive bytes for header.
453  void
455  {
456  m_logger.trace( [&]{
457  return fmt::format(
458  "[ws_connection:{}] continue reading message",
459  connection_id() );
460  } );
461 
465  this->get_executor(),
466  [ this, ctx = shared_from_this() ](
467  const asio_ns::error_code & ec,
468  std::size_t length ){
469  try
470  {
472  }
473  catch( const std::exception & ex )
474  {
477  [&]{
478  return fmt::format(
479  "[ws_connection:{}] after read header callback error: {}",
480  connection_id(),
481  ex.what() );
482  } );
483  }
484  } ) );
485  }
486 
487  //! Handle read error (reading header or payload)
488  void
489  handle_read_error( const char * desc, const asio_ns::error_code & ec )
490  {
491  // Assume that connection is lost.
494  [&]{
495  return fmt::format(
496  "[ws_connection:{}] {}: {}",
497  connection_id(),
498  desc,
499  ec.message() );
500  } );
501  }
502 
503  //! Handle read operation result, when reading header.
504  void
506  const asio_ns::error_code & ec,
507  std::size_t length )
508  {
509  if( !ec )
510  {
511  m_logger.trace( [&]{
512  return fmt::format(
513  "[ws_connection:{}] received {} bytes",
514  this->connection_id(),
515  length );
516  } );
517 
520  }
521  else
522  {
523  handle_read_error( "reading message header error", ec );
524  }
525  }
526 
527  //! Parse header from internal buffer.
528  void
529  consume_header_from_buffer( const char * data, std::size_t length )
530  {
532 
534 
536  {
538  }
539  else
540  {
541  assert( nparsed == length );
543  }
544  }
545 
546  //! Handle parsed header.
547  void
548  handle_parsed_header( const message_details_t & md )
549  {
550  m_logger.trace( [&]{
551  return fmt::format(
552  "[ws_connection:{}] start handling {} ({:#x})",
553  connection_id(),
555  static_cast<std::uint16_t>(md.m_opcode) );
556  } );
557 
558  const auto validation_result =
560 
562  {
563  m_logger.error( [&]{
564  return fmt::format(
565  "[ws_connection:{}] invalid header",
566  connection_id() );
567  } );
568 
570  {
572  [&]{
574  // Do not wait anything in return, because
575  // protocol is violated.
576  } );
577 
579  }
581  {
582  // Wait for close frame cannot be done.
583  close_impl();
584  }
585 
586  return;
587  }
588 
590  }
591 
592  //! Handle parsed and valid header.
593  void
594  handle_parsed_and_valid_header( const message_details_t & md )
595  {
596  const auto payload_length =
598 
600 
601  if( payload_length == 0 )
602  {
603  // Callback for message with 0-size payload.
605  }
606  else
607  {
608  const auto payload_part_size =
610 
611  std::memcpy(
613  m_input.m_buf.bytes(),
615 
617 
618  const std::size_t length_remaining =
620 
624  length_remaining ) )
625  {
626  if( 0 == length_remaining )
627  {
628  // All message is obtained.
630  }
631  else
632  {
633  // Read the rest of payload:
637  }
638  }
639  // Else payload is invalid and validate_payload_part()
640  // has handled the case so do nothing.
641  }
642  }
643 
644  //! Start reading message payload.
645  void
647  //! A pointer to the remainder of unfetched payload.
648  char * payload_data,
649  //! The size of the remainder of unfetched payload.
650  std::size_t length_remaining,
651  //! Validate payload and call handler.
652  bool do_validate_payload_and_call_msg_handler = true )
653  {
657  this->get_executor(),
658  [ this,
659  ctx = shared_from_this(),
660  payload_data,
663  const asio_ns::error_code & ec,
664  std::size_t length ){
665 
666  try
667  {
669  payload_data,
671  ec,
672  length,
674  }
675  catch( const std::exception & ex )
676  {
679  [&]{
680  return fmt::format(
681  "[ws_connection:{}] after read payload callback error: {}",
682  connection_id(),
683  ex.what() );
684  } );
685  }
686  } ) );
687  }
688 
689  //! Handle read operation result, when reading payload.
690  void
692  char * payload_data,
693  std::size_t length_remaining,
694  const asio_ns::error_code & ec,
695  std::size_t length,
696  bool do_validate_payload_and_call_msg_handler = true )
697  {
698  if( !ec )
699  {
700  m_logger.trace( [&]{
701  return fmt::format(
702  "[ws_connection:{}] received {} bytes",
703  this->connection_id(),
704  length );
705  } );
706 
708 
711 
713  {
715  {
716  if( 0 == next_length_remaining )
717  {
718  // Here: all the payload is ready.
719 
720  // All message is obtained.
722  }
723  else
724  {
725  // Here: not all of the payload has been obtained,
726  // so initiate the read once again:
731  }
732  }
733  // Else payload is invalid and validate_payload_part()
734  // has handled the case so do nothing.
735  }
736  else
737  {
738  if( 0 == next_length_remaining )
739  {
741  }
742  else
743  {
748  }
749  }
750  }
751  else
752  {
753  handle_read_error( "reading message payload error", ec );
754  }
755  }
756 
757  //! Call user message handler with current message.
758  void
759  call_message_handler( message_handle_t close_frame )
760  {
761  if( auto wsh = m_websocket_weak_handle.lock() )
762  {
763  try
764  {
766  std::move( wsh ),
767  std::move( close_frame ) );
768  }
769  catch( const std::exception & ex )
770  {
771  m_logger.error( [&]{
772  return fmt::format(
773  "[ws_connection:{}] execute handler error: {}",
774  connection_id(),
775  ex.what() );
776  } );
777  }
778  }
779  }
780 
781  //! Validates a part of received payload.
782  bool
784  char * payload_data,
785  std::size_t length,
786  std::size_t next_length_remaining )
787  {
788  const auto validation_result =
790 
792  {
794 
796  {
797  // Can skip this payload because it was not a bad close frame.
798 
799  // It is the case we are expecting close frame
800  // so validator must be ready to receive more headers
801  // and payloads after this frame.
803 
804  if( 0 == next_length_remaining )
805  {
807  }
808  else
809  {
810  // Skip checking payload for this frame:
811  const bool do_validate_payload_and_call_msg_handler = false;
816  }
817  }
818  return false;
819  }
820 
821  return true;
822  }
823 
824  //! Handle payload errors.
825  void
826  handle_invalid_payload( validation_state_t validation_result )
827  {
828  m_logger.error( [&]{
829  return fmt::format(
830  "[ws_connection:{}] invalid paload",
831  connection_id() );
832  } );
833 
835  {
836  // A corner case: invalid payload in close frame.
837 
839  {
840  // Case: close frame was not expected.
841 
842  // This actually must be executed:
844  [&]{
846  // Do not wait anything in return, because
847  // protocol is violated.
848  } );
849 
850  // Notify user of a close but use a correct close code.
852  }
854  {
855  // Case: close frame was expected.
856 
857  // We got a close frame but it is incorrect,
858  // so just close (there is not too much we can do).
859  close_impl();
860  }
861  }
862  else
863  {
865  {
867  [&]{
870  } );
871 
873  }
874  }
875  }
876 
877  void
879  {
880  auto & md = m_input.m_parser.current_message();
881 
884  {
886  {
888  {
889  m_logger.trace( [&]{
890  return fmt::format(
891  "[ws_connection:{}] got close frame from peer, status: {}",
892  connection_id(),
893  static_cast<std::uint16_t>(
895  } );
896 
899  [&]{
901  } );
902 
904  }
905 
909  md.m_opcode,
910  std::move( m_input.m_payload ) ) );
911 
914  }
915  else
916  {
918 
920  {
921  // Got it!
923 
924  close_impl();
925 
926  m_logger.trace( [&]{
927  return fmt::format(
928  "[ws_connection:{}] expected close frame came",
929  connection_id() );
930  } );
931  }
932  else
933  {
934  // Wait for next frame.
936  }
937  }
938  }
939  else
940  {
942  }
943  }
944 
945  void
946  call_close_handler_if_necessary( status_code_t status )
947  {
949  [&]{
952  final_frame,
954  status_code_to_bin( status ) ) );
955  } );
956  }
957 
958  //! Implementation of writing data performed on the asio_ns::io_context.
959  void
960  write_data_impl( write_group_t wg, bool is_close_frame )
961  {
962  if( m_socket.is_open() )
963  {
964  if( is_close_frame )
965  {
966  m_logger.trace( [&]{
967  return fmt::format(
968  "[ws_connection:{}] user sends close frame",
969  connection_id() );
970  } );
971 
972  m_close_frame_to_peer.disable(); // It is formed and sent by user
973  m_close_frame_to_user.disable(); // And user knows that websocket is closed.
974  // No more writes.
976 
977  // Start waiting only close-frame.
979  }
980 
981  // Push write_group to queue.
983 
985  }
986  else
987  {
988  m_logger.warn( [&]{
989  return fmt::format(
990  "[ws_connection:{}] try to write while socket is closed",
991  connection_id() );
992  } );
993 
994  try
995  {
999  }
1000  catch( ... )
1001  {}
1002  }
1003  }
1004 
1005  //! Checks if there is something to write,
1006  //! and if so starts write operation.
1007  void
1009  {
1011  {
1012  init_write();
1013  }
1014  }
1015 
1016  //! Initiate write operation.
1017  void
1019  {
1020  // Here: not writing anything to socket, so
1021  // write operation can be initiated.
1023 
1024  if( next_write_group )
1025  {
1026  m_logger.trace( [&]{
1027  return fmt::format(
1028  "[ws_connection:{}] start next write group, "
1029  "size: {}",
1030  this->connection_id(),
1032  } );
1033 
1034  // Initialize write context with a new write group.
1036  std::move( next_write_group ) );
1037 
1038  // Start the loop of sending data from current write group.
1040  }
1041  }
1042 
1043  // Use aliases for shorter names.
1047 
1048  void
1050  {
1051  try
1052  {
1054 
1056  {
1058  }
1060  {
1062  }
1063  else
1064  {
1066  throw exception_t{ "sendfile write operation not implemented" };
1067  }
1068  }
1069  catch( const std::exception & ex )
1070  {
1073  [&]{
1074  return fmt::format(
1075  "[ws_connection:{}] handle_current_write_ctx failed: {}",
1076  connection_id(),
1077  ex.what() );
1078  } );
1079  }
1080  }
1081 
1082  void
1083  handle_trivial_write_operation( const trivial_write_operation_t & op )
1084  {
1085  // Asio buffers (param for async write):
1086  auto & bufs = op.get_trivial_bufs();
1087 
1088  m_logger.trace( [&]{
1089  return fmt::format(
1090  "[ws_connection:{}] sending data with "
1091  "buf count: {}, "
1092  "total size: {}",
1093  connection_id(),
1094  bufs.size(),
1095  op.size() ); } );
1096 
1098 
1099  // There is something to write.
1101  m_socket,
1102  bufs,
1104  this->get_executor(),
1105  [ this,
1106  ctx = shared_from_this() ]
1107  ( const asio_ns::error_code & ec, std::size_t written ){
1108  try
1109  {
1110  if( !ec )
1111  {
1112  m_logger.trace( [&]{
1113  return fmt::format(
1114  "[ws_connection:{}] outgoing data was sent: {} bytes",
1115  connection_id(),
1116  written );
1117  } );
1118  }
1119 
1120  after_write( ec );
1121  }
1122  catch( const std::exception & ex )
1123  {
1126  [&]{
1127  return fmt::format(
1128  "[ws_connection:{}] after write callback error: {}",
1129  connection_id(),
1130  ex.what() );
1131  } );
1132  }
1133  } ) );
1134  }
1135 
1136  //! Do post write actions for current write group.
1137  void
1139  {
1140  // Finishing writing this group.
1141  m_logger.trace( [&]{
1142  return fmt::format(
1143  "[ws_connection:{}] finishing current write group",
1144  this->connection_id() );
1145  } );
1146 
1147  // Group notificators are called from here (if exist):
1149 
1150  // Start another write operation
1151  // if there is something to send.
1153  }
1154 
1155  //! Handle write response finished.
1156  void
1157  after_write( const asio_ns::error_code & ec )
1158  {
1159  if( !ec )
1160  {
1162  }
1163  else
1164  {
1167  [&]{
1168  return fmt::format(
1169  "[ws_connection:{}] unable to write: {}",
1170  connection_id(),
1171  ec.message() );
1172  } );
1173 
1174  try
1175  {
1177  }
1178  catch( const std::exception & ex )
1179  {
1180  m_logger.error( [&]{
1181  return fmt::format(
1182  "[ws_connection:{}] notificator error: {}",
1183  connection_id(),
1184  ex.what() );
1185  } );
1186  }
1187  }
1188  }
1189 
1190  //! Common parameters of a connection.
1192 
1193  //! Connection.
1194  stream_socket_t m_socket;
1195 
1196  //! Timers.
1197  //! \{
1198  static ws_connection_t & // Helper: view a base connection context reference as this ws_connection_t type.
1199  cast_to_self( tcp_connection_ctx_base_t & base )
1200  {
1201  return static_cast< ws_connection_t & >( base ); // Unchecked downcast. NOTE(review): presumably callers only pass objects that really are this ws_connection_t - confirm.
1202  }
1203 
1204  virtual void
1205  check_timeout( tcp_connection_ctx_handle_t & self ) override
1206  {
1207  asio_ns::dispatch(
1208  this->get_executor(),
1209  [ ctx = std::move( self ) ]{
1211  } );
1212  }
1213 
1218  timer_guard_t m_timer_guard;
1219 
1220  void
1222  {
1223  const auto now = std::chrono::steady_clock::now();
1225  {
1226  m_logger.trace( [&]{
1227  return fmt::format(
1228  "[wd_connection:{}] write operation timed out",
1229  connection_id() );
1230  } );
1233  close_impl();
1234  }
1236  {
1237  m_logger.trace( [&]{
1238  return fmt::format(
1239  "[wd_connection:{}] waiting for close-frame from peer timed out",
1240  connection_id() );
1241  } );
1242  close_impl();
1243  }
1244  else
1245  {
1247  }
1248  }
1249 
1250  //! schedule next timeout checking.
1251  void
1253  {
1255  }
1256 
1257  //! Start guard write operation if necessary.
1258  void
1260  {
1263  }
1264 
1265  void
1267  {
1270  }
1271  //! \}
1272 
1273  //! Input routine.
1275 
1276  //! Helper for validating protocol.
1278 
1279  //! Websocket message handler provided by user.
1280  message_handler_t m_msg_handler;
1281 
1282  //! Logger for operation
1283  logger_t & m_logger;
1284 
1285  //! Write to socket operation context.
1287 
1288  //! Output buffers queue.
1290 
1291  //! A weak handle to the owning ws_t, used when calling the message handler.
1293 
1294  //! Websocket output states.
1295  enum class write_state_t
1296  {
1297  //! Able to append outgoing data.
1298  write_enabled,
1299  //! No more outgoing data can be added (e.g. close-frame was sent).
1301  };
1302 
1303  //! A state of a websocket output.
1305 
1306  //! Websocket input states.
1307  enum class read_state_t
1308  {
1309  //! Reads any type of frame and serve it to user.
1311  //! Reads only close frame: skip all frames until close-frame.
1313  //! Do not read anything (before activation).
1314  read_nothing
1315  };
1316 
1317  //! A state of a websocket input.
1319 
1320  //! A helper class for running exclusive action.
1321  //! Only a first action will run.
1323  {
1324  public:
1325  template < typename Action > // Action: any callable invocable with no arguments.
1326  void
1327  run_if_first( Action && action )
1328  {
1329  if( m_not_executed_yet ) // Run the action only while the flag is still set.
1330  {
1331  m_not_executed_yet = false; // Cleared before the call, so the flag stays cleared even if action() throws.
1332  action();
1333  }
1334  }
1335 
1336  //! Disable action: the action will not be executed even on the first attempt.
1337  void
1339  {
1340  m_not_executed_yet = false;
1341  }
1342 
1343  private:
1344  bool m_not_executed_yet{ true };
1345  };
1346 
1350 };
1351 
1352 } /* namespace impl */
1353 
1354 } /* namespace basic */
1355 
1356 } /* namespace websocket */
1357 
1358 } /* namespace restinio */
read_state_t m_read_state
A state of a websocket input.
void consume_header_from_buffer(const char *data, std::size_t length)
Parse header from internal buffer.
Context for handling websocket connections.
std::chrono::steady_clock::time_point m_close_frame_from_peer_timeout_after
restinio::impl::fixed_buffer_t m_buf
Input buffer.
ws_weak_handle_t m_websocket_weak_handle
A weak handle to the owning ws_t, used when calling the message handler.
void after_read_payload(char *payload_data, std::size_t length_remaining, const asio_ns::error_code &ec, std::size_t length, bool do_validate_payload_and_call_msg_handler=true)
Handle read operation result, when reading payload.
virtual void kill() override
Kill websocket.
void handle_trivial_write_operation(const trivial_write_operation_t &op)
void consume_header_from_socket()
Initiate read operation on socket to receive bytes for header.
void handle_invalid_payload(validation_state_t validation_result)
Handle payload errors.
write_groups_queue_t m_awaiting_write_groups
A queue of buffers.
ws_connection_t & operator=(const ws_connection_t &)=delete
constexpr size_t websocket_header_max_size()
Max possible size of websocket frame header (a part before payload).
void write_data_impl(write_group_t wg, bool is_close_frame)
Implementation of writing data performed on the asio_ns::io_context.
void send_close_frame_to_peer(std::string payload)
Send close frame to peer.
restinio::impl::write_group_output_ctx_t m_write_output_ctx
Write to socket operation context.
message_handler_t m_msg_handler
Websocket message handler provided by user.
ws_outgoing_data_t m_outgoing_data
Output buffers queue.
ws_connection_t(const ws_connection_t &)=delete
void handle_read_error(const char *desc, const asio_ns::error_code &ec)
Handle read error (reading header or payload)
void append(write_group_t wg)
Add buffers to queue.
bool validate_payload_part(char *payload_data, std::size_t length, std::size_t next_length_remaining)
Validates a part of received payload.
void disable()
Disable action: the action will not be executed even on the first attempt.
No more outgoing data can be added (e.g. close-frame was sent).
void init_read(ws_handle_t wsh) override
Start reading ws-messages.
static ws_connection_t & cast_to_self(tcp_connection_ctx_base_t &base)
Timers.
Reads only close frame: skip all frames until close-frame.
ws_protocol_validator_t m_protocol_validator
Helper for validating protocol.
void call_close_handler_if_necessary(status_code_t status)
virtual void check_timeout(tcp_connection_ctx_handle_t &self) override
void start_waiting_close_frame_only()
Start waiting for close-frame.
void start_read_header()
Start the process of reading ws messages from socket.
restinio::impl::connection_settings_handle_t< Traits > m_settings
Common parameters of a connection.
void finish_handling_current_write_ctx()
Do post write actions for current write group.
void handle_parsed_header(const message_details_t &md)
Handle parsed header.
void start_read_payload(char *payload_data, std::size_t length_remaining, bool do_validate_payload_and_call_msg_handler=true)
Start reading message payload.
tcp_connection_ctx_weak_handle_t m_prepared_weak_ctx
void init_write_if_necessary()
Checks if there is something to write, and if so starts write operation.
ws_connection_t & operator=(ws_connection_t &&)=delete
void trigger_error_and_close(status_code_t status, MSG_BUILDER msg_builder)
Trigger an error.
void init_next_timeout_checking()
schedule next timeout checking.
virtual void shutdown() override
Shutdown websocket.
void send_close_frame_to_peer(status_code_t code, std::string desc=std::string{})
Send close frame to peer.
void after_read_header(const asio_ns::error_code &ec, std::size_t length)
Handle read operation result, when reading header.
write_state_t m_write_state
A state of a websocket output.
virtual void write_data(write_group_t wg, bool is_close_frame) override
Write pieces of outgoing data.
void after_write(const asio_ns::error_code &ec)
Handle write response finished.
void guard_write_operation()
Start guard write operation if necessary.
void graceful_close()
Close WebSocket connection in a graceful manner.
void reset_parser_and_payload()
Prepare parser for reading new http-message.
void call_message_handler(message_handle_t close_frame)
Call user message handler with current message.
std::chrono::steady_clock::time_point m_write_operation_timeout_after
void handle_parsed_and_valid_header(const message_details_t &md)
Handle parsed and valid header.
std::enable_if< std::is_same< Parameter_Container, query_string_params_t >::value||std::is_same< Parameter_Container, router::route_params_t >::value, optional_t< Value_Type > >::type opt_value(const Parameter_Container &params, string_view_t key)
Gets the value of a parameter specified by key wrapped in optional_t<Value_Type> if parameter exists ...
Definition: value_or.hpp:64
A helper class for running exclusive action. Only a first action will run.