]> git.ipfire.org Git - thirdparty/curl.git/commitdiff
websocket: introduce blocking sends
authorStefan Eissing <stefan@eissing.org>
Thu, 8 Aug 2024 14:00:24 +0000 (16:00 +0200)
committerDaniel Stenberg <daniel@haxx.se>
Mon, 12 Aug 2024 17:19:28 +0000 (19:19 +0200)
When using `curl_ws_send()`, perform a blocking send of the data under
the following conditions:

- the websocket is in raw mode and the call is done from within a curl
  callback. A partial write of the data could subsequently mess up the
  ws framing, as a callback has a hard time handling this.

- the websocket is encoding the data itself, has added it to its
  internal sendbuf. A partial flush of the buffer has unclear semantics
  for the caller, as they will have no idea what to send again.

Fixes WebSockets tests with CURL_DBG_SOCK_WBLOCK=90 set.
Closes #14458

docs/TODO
docs/libcurl/curl_global_trace.md
docs/libcurl/curl_ws_send.md
lib/bufq.c
lib/bufq.h
lib/curl_trc.c
lib/curl_trc.h
lib/ws.c

index 40c808eb0864501a80e6c38a8602e4cc7e71fe1b..366b1084bd54cc2577cdfd67324b832b9d1548bb 100644 (file)
--- a/docs/TODO
+++ b/docs/TODO
 
  - curl_multi_remove_handle for any of the above. See section 2.3.
 
+ - Calling curl_ws_send() from a callback
+
 2.2 Better support for same name resolves
 
  If a name resolve has been initiated for name NN and a second easy handle
index 0b0799db3964010313e8a8327981d5a7c6888820..51b10fb41361f6f6950803e6fe3aa55ea060e446 100644 (file)
@@ -109,6 +109,10 @@ Traces reading of upload data from the application in order to send it to the se
 
 Traces writing of download data, received from the server, to the application.
 
+## `ws`
+
+Tracing of WebSocket operations when this protocol is enabled in your build.
+
 # TRACE GROUPS
 
 Besides the specific component names there are the following group names
index 82fce1cff3e0f0d47bd4297d0e82082705c22361..262250b546e3c2449edaaf5bb970699dff3c13c5 100644 (file)
@@ -53,6 +53,10 @@ If **CURLWS_RAW_MODE** is enabled in CURLOPT_WS_OPTIONS(3), the
 To send a message consisting of multiple frames, set the *CURLWS_CONT* bit
 in all frames except the final one.
 
+Warning: while it is possible to invoke this function from a callback,
+such a call is blocking in this situation, e.g. only returns after all data
+has been sent or an error is encountered.
+
 # FLAGS
 
 ## CURLWS_TEXT
index c3245516c9fce016866d3f3bd96ee495de0d415b..46e6eaa386daf1c8a3412f781c523f1c01f2c4c5 100644 (file)
@@ -91,6 +91,23 @@ static size_t chunk_read(struct buf_chunk *chunk,
   }
 }
 
+static size_t chunk_unwrite(struct buf_chunk *chunk, size_t len)
+{
+  size_t n = chunk->w_offset - chunk->r_offset;
+  DEBUGASSERT(chunk->w_offset >= chunk->r_offset);
+  if(!n) {
+    return 0;
+  }
+  else if(n <= len) {
+    chunk->r_offset = chunk->w_offset = 0;
+    return n;
+  }
+  else {
+    chunk->w_offset -= len;
+    return len;
+  }
+}
+
 static ssize_t chunk_slurpn(struct buf_chunk *chunk, size_t max_len,
                             Curl_bufq_reader *reader,
                             void *reader_ctx, CURLcode *err)
@@ -363,6 +380,49 @@ static void prune_head(struct bufq *q)
   }
 }
 
+static struct buf_chunk *chunk_prev(struct buf_chunk *head,
+                                    struct buf_chunk *chunk)
+{
+  while(head) {
+    if(head == chunk)
+      return NULL;
+    if(head->next == chunk)
+      return head;
+    head = head->next;
+  }
+  return NULL;
+}
+
+static void prune_tail(struct bufq *q)
+{
+  struct buf_chunk *chunk;
+
+  while(q->tail && chunk_is_empty(q->tail)) {
+    chunk = q->tail;
+    q->tail = chunk_prev(q->head, chunk);
+    if(q->tail)
+      q->tail->next = NULL;
+    if(q->head == chunk)
+      q->head = q->tail;
+    if(q->pool) {
+      bufcp_put(q->pool, chunk);
+      --q->chunk_count;
+    }
+    else if((q->chunk_count > q->max_chunks) ||
+       (q->opts & BUFQ_OPT_NO_SPARES)) {
+      /* SOFT_LIMIT allowed us more than max. free spares until
+       * we are at max again. Or free them if we are configured
+       * to not use spares. */
+      free(chunk);
+      --q->chunk_count;
+    }
+    else {
+      chunk->next = q->spare;
+      q->spare = chunk;
+    }
+  }
+}
+
 static struct buf_chunk *get_non_full_tail(struct bufq *q)
 {
   struct buf_chunk *chunk;
@@ -428,6 +488,15 @@ CURLcode Curl_bufq_cwrite(struct bufq *q,
   return result;
 }
 
+CURLcode Curl_bufq_unwrite(struct bufq *q, size_t len)
+{
+  while(len && q->tail) {
+    len -= chunk_unwrite(q->head, len);
+    prune_tail(q);
+  }
+  return len? CURLE_AGAIN : CURLE_OK;
+}
+
 ssize_t Curl_bufq_read(struct bufq *q, unsigned char *buf, size_t len,
                        CURLcode *err)
 {
index 87ffa45da2097c1e2668d7e446b8d62ba50b94d6..ec415648fd4d2a5c53263ff1477ae1ee4a5a4707 100644 (file)
@@ -182,6 +182,12 @@ CURLcode Curl_bufq_cwrite(struct bufq *q,
                          const char *buf, size_t len,
                          size_t *pnwritten);
 
+/**
+ * Remove `len` bytes from the end of the buffer queue again.
+ * Returns CURLE_AGAIN if less than `len` bytes were in the queue.
+ */
+CURLcode Curl_bufq_unwrite(struct bufq *q, size_t len);
+
 /**
  * Read buf from the start of the buffer queue. The buf is copied
  * and the amount of copied bytes is returned.
index 3618275e5d31666adf0fdb14dc5bdc18ef38a1ae..73748729ef1824a9f5695b42e2ff6483d1731bcb 100644 (file)
@@ -221,6 +221,24 @@ void Curl_trc_ftp(struct Curl_easy *data, const char *fmt, ...)
 }
 #endif /* !CURL_DISABLE_FTP */
 
+#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP)
+struct curl_trc_feat Curl_trc_feat_ws = {
+  "WS",
+  CURL_LOG_LVL_NONE,
+};
+
+void Curl_trc_ws(struct Curl_easy *data, const char *fmt, ...)
+{
+  DEBUGASSERT(!strchr(fmt, '\n'));
+  if(Curl_trc_ft_is_verbose(data, &Curl_trc_feat_ws)) {
+    va_list ap;
+    va_start(ap, fmt);
+    trc_infof(data, &Curl_trc_feat_ws, fmt, ap);
+    va_end(ap);
+  }
+}
+#endif /* USE_WEBSOCKETS && !CURL_DISABLE_HTTP */
+
 #define TRC_CT_NONE        (0)
 #define TRC_CT_PROTOCOL    (1<<(0))
 #define TRC_CT_NETWORK     (1<<(1))
@@ -240,6 +258,9 @@ static struct trc_feat_def trc_feats[] = {
 #ifndef CURL_DISABLE_DOH
   { &Curl_doh_trc,            TRC_CT_NETWORK },
 #endif
+#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP)
+  { &Curl_trc_feat_ws,        TRC_CT_PROTOCOL },
+#endif
 };
 
 struct trc_cft_def {
index 3d3801834285c517a74b69ee503b52c77b22beb7..c98234c6ac2e7d7748e2d38f50e6cc2ad097ccf5 100644 (file)
@@ -89,6 +89,11 @@ void Curl_failf(struct Curl_easy *data,
   do { if(Curl_trc_ft_is_verbose(data, &Curl_trc_feat_ftp)) \
          Curl_trc_ftp(data, __VA_ARGS__); } while(0)
 #endif /* !CURL_DISABLE_FTP */
+#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP)
+#define CURL_TRC_WS(data, ...) \
+  do { if(Curl_trc_ft_is_verbose(data, &Curl_trc_feat_ws)) \
+         Curl_trc_ws(data, __VA_ARGS__); } while(0)
+#endif /* USE_WEBSOCKETS && !CURL_DISABLE_HTTP */
 
 #else /* CURL_HAVE_C99 */
 
@@ -100,6 +105,9 @@ void Curl_failf(struct Curl_easy *data,
 #ifndef CURL_DISABLE_FTP
 #define CURL_TRC_FTP   Curl_trc_ftp
 #endif
+#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP)
+#define CURL_TRC_WS    Curl_trc_ws
+#endif
 
 #endif /* !CURL_HAVE_C99 */
 
@@ -148,6 +156,11 @@ extern struct curl_trc_feat Curl_trc_feat_ftp;
 void Curl_trc_ftp(struct Curl_easy *data,
                   const char *fmt, ...) CURL_PRINTF(2, 3);
 #endif
+#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP)
+extern struct curl_trc_feat Curl_trc_feat_ws;
+void Curl_trc_ws(struct Curl_easy *data,
+                 const char *fmt, ...) CURL_PRINTF(2, 3);
+#endif
 
 
 #else /* defined(CURL_DISABLE_VERBOSE_STRINGS) */
index 5854e5a5d3fb8c06aafb3ae1e309716de6da83e9..9f80fec9f057b778bfa3e8ce87a288bed964ade3 100644 (file)
--- a/lib/ws.c
+++ b/lib/ws.c
@@ -37,6 +37,7 @@
 #include "ws.h"
 #include "easyif.h"
 #include "transfer.h"
+#include "select.h"
 #include "nonblock.h"
 
 /* The last 3 #include files should be in this order */
@@ -136,6 +137,9 @@ static void ws_dec_info(struct ws_decoder *dec, struct Curl_easy *data,
   }
 }
 
+static CURLcode ws_send_raw_blocking(CURL *data, struct websocket *ws,
+                                     const char *buffer, size_t buflen);
+
 typedef ssize_t ws_write_payload(const unsigned char *buf, size_t buflen,
                                  int frame_age, int frame_flags,
                                  curl_off_t payload_offset,
@@ -773,7 +777,7 @@ CURLcode Curl_ws_accept(struct Curl_easy *data,
       }
     }
 #endif
-    DEBUGF(infof(data, "WS, using chunk size %zu", chunk_size));
+    CURL_TRC_WS(data, "WS, using chunk size %zu", chunk_size);
     Curl_bufq_init2(&ws->recvbuf, chunk_size, WS_CHUNK_COUNT,
                     BUFQ_OPT_SOFT_LIMIT);
     Curl_bufq_init2(&ws->sendbuf, chunk_size, WS_CHUNK_COUNT,
@@ -970,8 +974,8 @@ CURL_EXTERN CURLcode curl_ws_recv(struct Curl_easy *data, void *buffer,
         infof(data, "connection expectedly closed?");
         return CURLE_GOT_NOTHING;
       }
-      DEBUGF(infof(data, "curl_ws_recv, added %zu bytes from network",
-                   Curl_bufq_len(&ws->recvbuf)));
+      CURL_TRC_WS(data, "curl_ws_recv, added %zu bytes from network",
+                  Curl_bufq_len(&ws->recvbuf));
     }
 
     result = ws_dec_pass(&ws->dec, data, &ws->recvbuf,
@@ -1001,14 +1005,14 @@ CURL_EXTERN CURLcode curl_ws_recv(struct Curl_easy *data, void *buffer,
               ctx.payload_len, ctx.bufidx);
   *metap = &ws->frame;
   *nread = ws->frame.len;
-  /* infof(data, "curl_ws_recv(len=%zu) -> %zu bytes (frame at %"
-           CURL_FORMAT_CURL_OFF_T ", %" CURL_FORMAT_CURL_OFF_T " left)",
-           buflen, *nread, ws->frame.offset, ws->frame.bytesleft); */
+  CURL_TRC_WS(data, "curl_ws_recv(len=%zu) -> %zu bytes (frame at %"
+               CURL_FORMAT_CURL_OFF_T ", %" CURL_FORMAT_CURL_OFF_T " left)",
+               buflen, *nread, ws->frame.offset, ws->frame.bytesleft);
   return CURLE_OK;
 }
 
 static CURLcode ws_flush(struct Curl_easy *data, struct websocket *ws,
-                         bool complete)
+                         bool blocking)
 {
   if(!Curl_bufq_is_empty(&ws->sendbuf)) {
     CURLcode result;
@@ -1016,7 +1020,11 @@ static CURLcode ws_flush(struct Curl_easy *data, struct websocket *ws,
     size_t outlen, n;
 
     while(Curl_bufq_peek(&ws->sendbuf, &out, &outlen)) {
-      if(data->set.connect_only)
+      if(blocking) {
+        result = ws_send_raw_blocking(data, ws, (char *)out, outlen);
+        n = result? 0 : outlen;
+      }
+      else if(data->set.connect_only || Curl_is_in_callback(data))
         result = Curl_senddata(data, out, outlen, &n);
       else {
         result = Curl_xfer_send(data, out, outlen, FALSE, &n);
@@ -1024,22 +1032,14 @@ static CURLcode ws_flush(struct Curl_easy *data, struct websocket *ws,
           result = CURLE_AGAIN;
       }
 
-      if(result) {
-        if(result == CURLE_AGAIN) {
-          if(!complete) {
-            infof(data, "WS: flush EAGAIN, %zu bytes remain in buffer",
-                  Curl_bufq_len(&ws->sendbuf));
-            return result;
-          }
-          /* TODO: the current design does not allow for buffered writes.
-           * We need to flush the buffer now. There is no ws_flush() later */
-          n = 0;
-          continue;
-        }
-        else if(result) {
-          failf(data, "WS: flush, write error %d", result);
-          return result;
-        }
+      if(result == CURLE_AGAIN) {
+        CURL_TRC_WS(data, "flush EAGAIN, %zu bytes remain in buffer",
+                    Curl_bufq_len(&ws->sendbuf));
+        return result;
+      }
+      else if(result) {
+        failf(data, "WS: flush, write error %d", result);
+        return result;
       }
       else {
         infof(data, "WS: flushed %zu bytes", n);
@@ -1050,6 +1050,83 @@ static CURLcode ws_flush(struct Curl_easy *data, struct websocket *ws,
   return CURLE_OK;
 }
 
+static CURLcode ws_send_raw_blocking(CURL *data, struct websocket *ws,
+                                     const char *buffer, size_t buflen)
+{
+  CURLcode result = CURLE_OK;
+  size_t nwritten;
+
+  (void)ws;
+  while(buflen) {
+    result = Curl_xfer_send(data, buffer, buflen, FALSE, &nwritten);
+    if(result)
+      return result;
+    DEBUGASSERT(nwritten <= buflen);
+    buffer += nwritten;
+    buflen -= nwritten;
+    if(buflen) {
+      curl_socket_t sock = data->conn->sock[FIRSTSOCKET];
+      timediff_t left_ms;
+      int ev;
+
+      CURL_TRC_WS(data, "ws_send_raw_blocking() partial, %zu left to send",
+                  buflen);
+      left_ms = Curl_timeleft(data, NULL, FALSE);
+      if(left_ms < 0) {
+        failf(data, "Timeout waiting for socket becoming writable");
+        return CURLE_SEND_ERROR;
+      }
+
+      /* POLLOUT socket */
+      if(sock == CURL_SOCKET_BAD)
+        return CURLE_SEND_ERROR;
+      ev = Curl_socket_check(CURL_SOCKET_BAD, CURL_SOCKET_BAD, sock,
+                             left_ms? left_ms : 500);
+      if(ev < 0) {
+        failf(data, "Error while waiting for socket becoming writable");
+        return CURLE_SEND_ERROR;
+      }
+    }
+  }
+  return result;
+}
+
+static CURLcode ws_send_raw(CURL *data, const void *buffer,
+                            size_t buflen, size_t *pnwritten)
+{
+  struct websocket *ws = data->conn->proto.ws;
+  CURLcode result;
+
+  if(!ws) {
+    failf(data, "Not a websocket transfer");
+    return CURLE_SEND_ERROR;
+  }
+  if(!buflen)
+    return CURLE_OK;
+
+  if(Curl_is_in_callback(data)) {
+    /* When invoked from inside callbacks, we do a blocking send as the
+     * callback will probably not implement partial writes that may then
+     * mess up the ws framing subsequently.
+     * We need any pending data to be flushed before sending. */
+    result = ws_flush(data, ws, TRUE);
+    if(result)
+      return result;
+    result = ws_send_raw_blocking(data, ws, buffer, buflen);
+  }
+  else {
+    /* We need any pending data to be sent or EAGAIN this call. */
+    result = ws_flush(data, ws, FALSE);
+    if(result)
+      return result;
+    result = Curl_senddata(data, buffer, buflen, pnwritten);
+  }
+
+  CURL_TRC_WS(data, "ws_send_raw(len=%zu) -> %d, %zu",
+              buflen, result, *pnwritten);
+  return result;
+}
+
 CURL_EXTERN CURLcode curl_ws_send(CURL *data, const void *buffer,
                                   size_t buflen, size_t *sent,
                                   curl_off_t fragsize,
@@ -1057,60 +1134,53 @@ CURL_EXTERN CURLcode curl_ws_send(CURL *data, const void *buffer,
 {
   struct websocket *ws;
   ssize_t n;
-  size_t nwritten, space;
+  size_t space, payload_added;
   CURLcode result;
 
+  CURL_TRC_WS(data, "curl_ws_send(len=%zu, fragsize=%" CURL_FORMAT_CURL_OFF_T
+              ", flags=%x), raw=%d",
+              buflen, fragsize, flags, data->set.ws_raw_mode);
   *sent = 0;
   if(!data->conn && data->set.connect_only) {
     result = Curl_connect_only_attach(data);
     if(result)
-      return result;
+      goto out;
   }
   if(!data->conn) {
     failf(data, "No associated connection");
-    return CURLE_SEND_ERROR;
+    result = CURLE_SEND_ERROR;
+    goto out;
   }
   if(!data->conn->proto.ws) {
     failf(data, "Not a websocket transfer");
-    return CURLE_SEND_ERROR;
+    result = CURLE_SEND_ERROR;
+    goto out;
   }
   ws = data->conn->proto.ws;
 
+  /* try flushing any content still waiting to be sent. */
+  result = ws_flush(data, ws, FALSE);
+  if(result)
+    goto out;
+
   if(data->set.ws_raw_mode) {
+    /* In raw mode, we write directly to the connection */
     if(fragsize || flags) {
-      DEBUGF(infof(data, "ws_send: "
-                   "fragsize and flags cannot be non-zero in raw mode"));
+      failf(data, "ws_send, raw mode: fragsize and flags cannot be non-zero");
       return CURLE_BAD_FUNCTION_ARGUMENT;
     }
-    if(!buflen)
-      /* nothing to do */
-      return CURLE_OK;
-    /* raw mode sends exactly what was requested, and this is from within
-       the write callback */
-    if(Curl_is_in_callback(data)) {
-      result = Curl_xfer_send(data, buffer, buflen, FALSE, &nwritten);
-    }
-    else
-      result = Curl_senddata(data, buffer, buflen, &nwritten);
-
-    infof(data, "WS: wanted to send %zu bytes, sent %zu bytes",
-          buflen, nwritten);
-    *sent = nwritten;
-    return result;
+    result = ws_send_raw(data, buffer, buflen, sent);
+    goto out;
   }
 
   /* Not RAW mode, buf we do the frame encoding */
-  result = ws_flush(data, ws, FALSE);
-  if(result)
-    return result;
-
-  /* TODO: the current design does not allow partial writes, afaict.
-   * It is not clear how the application is supposed to react. */
   space = Curl_bufq_space(&ws->sendbuf);
-  DEBUGF(infof(data, "curl_ws_send(len=%zu), sendbuf len=%zu space %zu",
-               buflen, Curl_bufq_len(&ws->sendbuf), space));
-  if(space < 14)
-    return CURLE_AGAIN;
+  CURL_TRC_WS(data, "curl_ws_send(len=%zu), sendbuf=%zu space_left=%zu",
+              buflen, Curl_bufq_len(&ws->sendbuf), space);
+  if(space < 14) {
+    result = CURLE_AGAIN;
+    goto out;
+  }
 
   if(flags & CURLWS_OFFSET) {
     if(fragsize) {
@@ -1118,7 +1188,7 @@ CURL_EXTERN CURLcode curl_ws_send(CURL *data, const void *buffer,
       n = ws_enc_write_head(data, &ws->enc, flags, fragsize,
                             &ws->sendbuf, &result);
       if(n < 0)
-        return result;
+        goto out;
     }
     else {
       if((curl_off_t)buflen > ws->enc.payload_remain) {
@@ -1132,16 +1202,66 @@ CURL_EXTERN CURLcode curl_ws_send(CURL *data, const void *buffer,
     n = ws_enc_write_head(data, &ws->enc, flags, (curl_off_t)buflen,
                           &ws->sendbuf, &result);
     if(n < 0)
-      return result;
+      goto out;
   }
 
   n = ws_enc_write_payload(&ws->enc, data,
                            buffer, buflen, &ws->sendbuf, &result);
   if(n < 0)
-    return result;
+    goto out;
+  payload_added = (size_t)n;
+
+  while(!result && (buflen || !Curl_bufq_is_empty(&ws->sendbuf))) {
+    /* flush, blocking when in callback */
+    result = ws_flush(data, ws, Curl_is_in_callback(data));
+    if(!result) {
+      DEBUGASSERT(payload_added <= buflen);
+      /* all buffered data sent. Try sending the rest if there is any. */
+      *sent += payload_added;
+      buffer = (const char *)buffer + payload_added;
+      buflen -= payload_added;
+      payload_added = 0;
+      if(buflen) {
+        n = ws_enc_write_payload(&ws->enc, data,
+                                 buffer, buflen, &ws->sendbuf, &result);
+        if(n < 0)
+          goto out;
+        payload_added = Curl_bufq_len(&ws->sendbuf);
+      }
+    }
+    else if(result == CURLE_AGAIN) {
+      /* partially sent. how much of the call data has been part of it? what
+      * should we report to out caller so it can retry/send the rest? */
+      if(payload_added < buflen) {
+        /* We did not add everything the caller wanted. Return just
+         * the partial write to our buffer. */
+        *sent = payload_added;
+        result = CURLE_OK;
+        goto out;
+      }
+      else if(!buflen) {
+        /* We have no payload to report a partial write. EAGAIN would make
+         * the caller repeat this and add the frame again.
+         * Flush blocking seems the only way out of this. */
+        *sent = (size_t)n;
+        result = ws_flush(data, ws, TRUE);
+        goto out;
+      }
+      /* We added the complete data to our sendbuf. Report one byte less as
+       * sent. This parital success should make the caller invoke us again
+       * with the last byte. */
+      *sent = payload_added - 1;
+      result = Curl_bufq_unwrite(&ws->sendbuf, 1);
+      if(!result)
+        result = CURLE_AGAIN;
+    }
+  }
 
-  *sent = (size_t)n;
-  return ws_flush(data, ws, TRUE);
+out:
+  CURL_TRC_WS(data, "curl_ws_send(len=%zu, fragsize=%" CURL_FORMAT_CURL_OFF_T
+              ", flags=%x, raw=%d) -> %d, %zu",
+              buflen, fragsize, flags, data->set.ws_raw_mode, result, *sent);
+  return result;
 }
 
 static void ws_free(struct connectdata *conn)