]> git.ipfire.org Git - thirdparty/curl.git/commitdiff
websocket: support CURLOPT_READFUNCTION
authorStefan Eissing <stefan@eissing.org>
Tue, 8 Jul 2025 07:15:43 +0000 (09:15 +0200)
committerDaniel Stenberg <daniel@haxx.se>
Mon, 11 Aug 2025 21:28:54 +0000 (23:28 +0200)
Add support for CURLOPT_READFUNCTION with WebSocket urls when *not* in
connect-only mode, e.g. when using curl_multi_perform.

Install the callback function and set CURLOPT_UPLOAD. Return
CURL_READFUNC_PAUSE when having nothing more to send and unpause the
transfer when more data is ready.

This will send the read bytes in a WebSocket BINARY frame.

Add support for this mode in the pytest "ws_data" client and have all
tests run in 'curl_ws_send/recv' and 'peform' mode as well.

Add `curl_ws_start_frame()`. Document, cover in libcurl-ws.md and
explain the READFUNCTION mode for websockets.

Add example `websocket-updown` for this.

Closes #17683

17 files changed:
docs/examples/.gitignore
docs/examples/Makefile.inc
docs/examples/websocket-updown.c [new file with mode: 0644]
docs/libcurl/Makefile.inc
docs/libcurl/curl_ws_send.md
docs/libcurl/curl_ws_start_frame.md [new file with mode: 0644]
docs/libcurl/libcurl-ws.md
docs/libcurl/opts/CURLOPT_WS_OPTIONS.md
include/curl/websockets.h
lib/http.c
lib/libcurl.def
lib/ws.c
scripts/singleuse.pl
tests/data/test1135
tests/data/test2300
tests/http/test_20_websockets.py
tests/libtest/cli_ws_data.c

index 2a6371faf47f5a3f1028fc1aeb6696986adbf79c..4b78bd78dd05d9dbd59287a7e171a37dedaa8ea9 100644 (file)
@@ -19,6 +19,7 @@ ephiperfifo
 evhiperfifo
 externalsocket
 fileupload
+ftp-delete
 ftp-wildcard
 ftpget
 ftpgetinfo
@@ -129,4 +130,5 @@ urlapi
 usercertinmem
 websocket
 websocket-cb
+websocket-updown
 xmlstream
index 57e320c96ed908bb8e5e28fd0f8373b42ce06aeb..c5be0f881b72b43e1fcabbc06fa3e5707d76dfa6 100644 (file)
@@ -136,7 +136,8 @@ check_PROGRAMS = \
   url2file \
   urlapi \
   websocket \
-  websocket-cb
+  websocket-cb \
+  websocket-updown
 
 # These examples require external dependencies that may not be commonly
 # available on POSIX systems, so do not bother attempting to compile them here.
diff --git a/docs/examples/websocket-updown.c b/docs/examples/websocket-updown.c
new file mode 100644 (file)
index 0000000..a32d488
--- /dev/null
@@ -0,0 +1,125 @@
+/***************************************************************************
+ *                                  _   _ ____  _
+ *  Project                     ___| | | |  _ \| |
+ *                             / __| | | | |_) | |
+ *                            | (__| |_| |  _ <| |___
+ *                             \___|\___/|_| \_\_____|
+ *
+ * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
+ *
+ * This software is licensed as described in the file COPYING, which
+ * you should have received as part of this distribution. The terms
+ * are also available at https://curl.se/docs/copyright.html.
+ *
+ * You may opt to use, copy, modify, merge, publish, distribute and/or sell
+ * copies of the Software, and permit persons to whom the Software is
+ * furnished to do so, under the terms of the COPYING file.
+ *
+ * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
+ * KIND, either express or implied.
+ *
+ * SPDX-License-Identifier: curl
+ *
+ ***************************************************************************/
+/* <DESC>
+ * WebSocket download-only using write callback
+ * </DESC>
+ */
+#include <stdio.h>
+#include <string.h>
+#include <curl/curl.h>
+
+static size_t writecb(char *b, size_t size, size_t nitems, void *p)
+{
+  CURL *easy = p;
+  size_t i;
+  unsigned int blen = (unsigned int)(nitems * size);
+  const struct curl_ws_frame *frame = curl_ws_meta(easy);
+  fprintf(stderr, "Type: %s\n", frame->flags & CURLWS_BINARY ?
+          "binary" : "text");
+  if(frame->flags & CURLWS_BINARY) {
+    fprintf(stderr, "Bytes: %u", blen);
+    for(i = 0; i < nitems; i++)
+      fprintf(stderr, "%02x ", (unsigned char)b[i]);
+    fprintf(stderr, "\n");
+  }
+  else
+    fprintf(stderr, "Text: %.*s\n", (int)blen, b);
+  return nitems;
+}
+
+struct read_ctx {
+  CURL *easy;
+  char buf[1024];
+  size_t blen;
+  size_t nsent;
+};
+
+static size_t readcb(char *buf, size_t nitems, size_t buflen, void *p)
+{
+  struct read_ctx *ctx = p;
+  size_t len = nitems * buflen;
+  size_t left = ctx->blen - ctx->nsent;
+  CURLcode result;
+
+  if(!ctx->nsent) {
+    /* On first call, set the FRAME information to be used (it defaults
+     * to CURLWS_BINARY otherwise). */
+    result = curl_ws_start_frame(ctx->easy, CURLWS_TEXT,
+                                 (curl_off_t)ctx->blen);
+    if(result) {
+      fprintf(stderr, "error staring frame: %d\n", result);
+      return CURL_READFUNC_ABORT;
+    }
+  }
+  fprintf(stderr, "read(len=%d, left=%d)\n", (int)len, (int)left);
+  if(left) {
+    if(left < len)
+      len = left;
+    memcpy(buf, ctx->buf + ctx->nsent, len);
+    ctx->nsent += len;
+    return len;
+  }
+  return 0;
+}
+
+int main(int argc, const char *argv[])
+{
+  CURL *easy;
+  struct read_ctx rctx;
+  CURLcode res;
+  const char *payload = "Hello, friend!";
+
+  memset(&rctx, 0, sizeof(rctx));
+
+  easy = curl_easy_init();
+  if(!easy)
+    return 1;
+
+  if(argc == 2)
+    curl_easy_setopt(easy, CURLOPT_URL, argv[1]);
+  else
+    curl_easy_setopt(easy, CURLOPT_URL, "wss://example.com");
+
+  curl_easy_setopt(easy, CURLOPT_WRITEFUNCTION, writecb);
+  curl_easy_setopt(easy, CURLOPT_WRITEDATA, easy);
+  curl_easy_setopt(easy, CURLOPT_READFUNCTION, readcb);
+  /* tell curl that we want to send the payload */
+  rctx.easy = easy;
+  rctx.blen = strlen(payload);
+  memcpy(rctx.buf, payload, rctx.blen);
+  curl_easy_setopt(easy, CURLOPT_READDATA, &rctx);
+  curl_easy_setopt(easy, CURLOPT_UPLOAD, 1L);
+
+
+  /* Perform the request, res gets the return code */
+  res = curl_easy_perform(easy);
+  /* Check for errors */
+  if(res != CURLE_OK)
+    fprintf(stderr, "curl_easy_perform() failed: %s\n",
+            curl_easy_strerror(res));
+
+  /* always cleanup */
+  curl_easy_cleanup(easy);
+  return 0;
+}
index 9142f65fabd8f16073723b8a51b18d44fc00b06a..9bc665d1c625117ad9ad8d7ff98ae5cc0c24bc73 100644 (file)
@@ -112,6 +112,7 @@ man_MANS = \
  curl_ws_meta.3 \
  curl_ws_recv.3 \
  curl_ws_send.3 \
+ curl_ws_start_frame.3 \
  libcurl-easy.3 \
  libcurl-env-dbg.3 \
  libcurl-env.3 \
index 757059fbd48a1ad93c54086ce026a922e8b5579b..ac8f020321263892634b89caf8726501cbb67bbb 100644 (file)
@@ -9,6 +9,7 @@ See-also:
   - curl_easy_perform (3)
   - curl_easy_setopt (3)
   - curl_ws_recv (3)
+  - curl_ws_start_frame (3)
   - libcurl-ws (3)
 Protocol:
   - WS
diff --git a/docs/libcurl/curl_ws_start_frame.md b/docs/libcurl/curl_ws_start_frame.md
new file mode 100644 (file)
index 0000000..efce758
--- /dev/null
@@ -0,0 +1,143 @@
+---
+c: Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
+SPDX-License-Identifier: curl
+Title: curl_ws_start_frame
+Section: 3
+Source: libcurl
+See-also:
+  - curl_easy_getinfo (3)
+  - curl_easy_perform (3)
+  - curl_easy_setopt (3)
+  - curl_ws_recv (3)
+  - libcurl-ws (3)
+Protocol:
+  - WS
+Added-in: 8.16.0
+---
+
+# NAME
+
+curl_ws_start_frame - start a new WebSocket frame
+
+# SYNOPSIS
+
+~~~c
+#include <curl/curl.h>
+
+CURLcode curl_ws_start_frame(CURL *curl,
+                             unsigned int flags,
+                             curl_off_t frame_len);
+~~~
+
+# DESCRIPTION
+
+Add the WebSocket frame header for the given flags and length to
+the transfers send buffer for WebSocket encoded data. Intended for
+use in a CURLOPT_READFUNCTION(3) callback.
+
+When using a CURLOPT_READFUNCTION(3) in a WebSocket transfer, any
+data returned by that function is sent as a *CURLWS_BINARY* frame
+with the length being the amount of data read.
+
+To send larger frames or frames of a different type, call
+curl_ws_start_frame() from within the read function and then return
+the data belonging to the frame.
+
+The function fails, if a previous frame has not been completely
+read yet. Also it fails in *CURLWS_RAW_MODE*.
+
+# FLAGS
+
+Supports all flags documented in curl_ws_meta(3).
+
+# %PROTOCOLS%
+
+# EXAMPLE
+
+~~~c
+#include <string.h> /* for strlen */
+
+struct read_ctx {
+  CURL *easy;
+  char *message;
+  size_t msg_len;
+  size_t nsent;
+};
+
+static size_t readcb(char *buf, size_t nitems, size_t buflen, void *p)
+{
+  struct read_ctx *ctx = p;
+  size_t len = nitems * buflen;
+  size_t left = ctx->msg_len - ctx->nsent;
+  CURLcode result;
+
+  if(!ctx->nsent) {
+    /* Want to send TEXT frame. */
+    result = curl_ws_start_frame(ctx->easy, CURLWS_TEXT,
+                                 (curl_off_t)ctx->msg_len);
+    if(result) {
+      fprintf(stderr, "error staring frame: %d\n", result);
+      return CURL_READFUNC_ABORT;
+    }
+  }
+  if(left) {
+    if(left < len)
+      len = left;
+    memcpy(buf, ctx->message + ctx->nsent, len);
+    ctx->nsent += len;
+    return len;
+  }
+  return 0;
+}
+
+int main(void)
+{
+  CURL *easy;
+  struct read_ctx rctx;
+  CURLcode res;
+
+  easy = curl_easy_init();
+  if(!easy)
+    return 1;
+
+  curl_easy_setopt(easy, CURLOPT_URL, "wss://example.com");
+  curl_easy_setopt(easy, CURLOPT_READFUNCTION, readcb);
+  /* tell curl that we want to send the payload */
+  memset(&rctx, 0, sizeof(rctx));
+  rctx.easy = easy;
+  rctx.message = "Hello, friend!";
+  rctx.msg_len = strlen(rctx.message);
+  curl_easy_setopt(easy, CURLOPT_READDATA, &rctx);
+  curl_easy_setopt(easy, CURLOPT_UPLOAD, 1L);
+
+  /* Perform the request, res gets the return code */
+  res = curl_easy_perform(easy);
+  /* Check for errors */
+  if(res != CURLE_OK)
+    fprintf(stderr, "curl_easy_perform() failed: %s\n",
+            curl_easy_strerror(res));
+
+  /* always cleanup */
+  curl_easy_cleanup(easy);
+  return 0;
+}
+
+~~~
+
+# %AVAILABILITY%
+
+# RETURN VALUE
+
+This function returns a CURLcode indicating success or error.
+
+CURLE_OK (0) means everything was OK, non-zero means an error occurred, see
+libcurl-errors(3). If CURLOPT_ERRORBUFFER(3) was set with curl_easy_setopt(3)
+there can be an error message stored in the error buffer when non-zero is
+returned.
+
+Instead of blocking, the function returns **CURLE_AGAIN**. The correct
+behavior is then to wait for the socket to signal readability before calling
+this function again.
+
+Any other non-zero return value indicates an error. See the libcurl-errors(3)
+man page for the full list with descriptions.
index 1ef3074d5d848238896ddbbf1fc13ba4c3accafe..147740157feed7b448eab5abf8eeb616ae5902e6 100644 (file)
@@ -97,14 +97,17 @@ Because of the many different ways WebSocket can be used, which is much more
 flexible than limited to plain downloads or uploads, libcurl offers two
 different API models to use it:
 
-1. CURLOPT_WRITEFUNCTION model:
+1. CURLOPT_WRITEFUNCTION/CURLOPT_READFUNCTION model:
 Using a write callback with CURLOPT_WRITEFUNCTION(3) much like other
 downloads for when the traffic is download oriented.
 
+Using a read callback with CURLOPT_READFUNCTION(3) much like other
+uploads for sending WebSocket frames to the server.
+
 2. CURLOPT_CONNECT_ONLY model:
 Using curl_ws_recv(3) and curl_ws_send(3) functions.
 
-## CURLOPT_WRITEFUNCTION MODEL
+## CURLOPT_WRITEFUNCTION/CURLOPT_READFUNCTION MODEL
 
 CURLOPT_CONNECT_ONLY(3) must be unset or **0L** for this model to take effect.
 
@@ -114,6 +117,21 @@ callback configured in CURLOPT_WRITEFUNCTION(3), whenever an incoming chunk
 of WebSocket data is received. The callback is handed a pointer to the payload
 data as an argument and can call curl_ws_meta(3) to get relevant metadata.
 
+With libcurl 8.16.0 or later, sending of WebSocket frames via a
+CURLOPT_READFUNCTION(3) is supported. To use that on such a connection,
+register a callback via CURLOPT_READFUNCTION(3) and set CURLOPT_UPLOAD(3)
+as well. Once, the WebSocket connection is established, your callback is
+invoked to get data to send. That data is sent in a *CURLWS_BINARY* frame with
+length of exactly the data returned.
+
+To send other frame types or longer frames, use curl_ws_start_frame(3)
+in the read callback. See the *websocket-updown* example.
+
+When using curl_multi_perform(3) to drive transfers, more possibilities
+exist. The CURLOPT_READFUNCTION(3) may return *CURL_READFUNC_PAUSE* when
+it has no more data to send. Calling curl_easy_pause(3) afterwards
+resumes the upload and the read callback is invoked again.
+
 ## CURLOPT_CONNECT_ONLY MODEL
 
 CURLOPT_CONNECT_ONLY(3) must be **2L** for this model to take effect.
index 95d56b0c4af29dd91ae378905e8fcef2437c2e97..d1488a762b14d1739ca7af386662d9514e3ad852 100644 (file)
@@ -39,6 +39,7 @@ Available bits in the bitmask
 ## CURLWS_RAW_MODE (1)
 
 Deliver "raw" WebSocket traffic to the CURLOPT_WRITEFUNCTION(3)
+callback. Read "raw" WebSocket traffic from the CURLOPT_READFUNCTION(3)
 callback.
 
 In raw mode, libcurl does not handle pings or any other frame for the
index 1337a3a090e9cb8cd923cddcdbc2c23ec3ae2710..df8590f399d5164f464d11a94e82ca03b04e907c 100644 (file)
@@ -72,6 +72,19 @@ CURL_EXTERN CURLcode curl_ws_send(CURL *curl, const void *buffer,
                                   curl_off_t fragsize,
                                   unsigned int flags);
 
+/*
+ * NAME curl_ws_start_frame()
+ *
+ * DESCRIPTION
+ *
+ * Buffers a websocket frame header with the given flags and length.
+ * Errors when a previous frame is not complete, e.g. not all its
+ * payload has been added.
+ */
+CURL_EXTERN CURLcode curl_ws_start_frame(CURL *curl,
+                                         unsigned int flags,
+                                         curl_off_t frame_len);
+
 /* bits for the CURLOPT_WS_OPTIONS bitmask: */
 #define CURLWS_RAW_MODE   (1L<<0)
 #define CURLWS_NOAUTOPONG (1L<<1)
index 758ee50dea1f28811a8f9ac859d9ca50cb6ad619..06ab3f1645943ae2601b9500c6b1ef286e085e64 100644 (file)
@@ -1822,7 +1822,9 @@ void Curl_http_method(struct Curl_easy *data, struct connectdata *conn,
 {
   Curl_HttpReq httpreq = (Curl_HttpReq)data->state.httpreq;
   const char *request;
-  if((conn->handler->protocol&(PROTO_FAMILY_HTTP|CURLPROTO_FTP)) &&
+  if(conn->handler->protocol&(CURLPROTO_WS|CURLPROTO_WSS))
+    httpreq = HTTPREQ_GET;
+  else if((conn->handler->protocol&(PROTO_FAMILY_HTTP|CURLPROTO_FTP)) &&
      data->state.upload)
     httpreq = HTTPREQ_PUT;
 
@@ -3771,9 +3773,6 @@ static CURLcode http_on_response(struct Curl_easy *data,
         if(result)
           goto out;
         *pconsumed += blen; /* ws accept handled the data */
-        k->header = FALSE; /* we will not get more responses */
-        if(data->set.connect_only)
-          k->keepon &= ~KEEP_RECV; /* read no more content */
       }
 #endif
       else {
index ae64776dc270266fc7580bf3cc5c66f0a1c20bce..d2f5d8318f2bea2ee6e945481cb1a36bae48516b 100644 (file)
@@ -96,3 +96,4 @@ curl_version_info
 curl_ws_meta
 curl_ws_recv
 curl_ws_send
+curl_ws_start_frame
index 7d7bf481577acceee4ab7ba7fdf3ded5e789354c..82b8a2c22099a99f96de885c2f8fbf60dae59a45 100644 (file)
--- a/lib/ws.c
+++ b/lib/ws.c
@@ -116,7 +116,7 @@ struct websocket {
   struct ws_encoder enc;  /* decode of we frames */
   struct bufq recvbuf;    /* raw data from the server */
   struct bufq sendbuf;    /* raw data to be sent to the server */
-  struct curl_ws_frame frame;  /* the current WS FRAME received */
+  struct curl_ws_frame recvframe;  /* the current WS FRAME received */
   size_t sendbuf_payload; /* number of payload bytes in sendbuf */
 };
 
@@ -220,62 +220,66 @@ static int ws_frame_firstbyte2flags(struct Curl_easy *data,
   }
 }
 
-static unsigned char ws_frame_flags2firstbyte(struct Curl_easy *data,
-                                              unsigned int flags,
-                                              bool contfragment,
-                                              CURLcode *err)
+static CURLcode ws_frame_flags2firstbyte(struct Curl_easy *data,
+                                         unsigned int flags,
+                                         bool contfragment,
+                                         unsigned char *pfirstbyte)
 {
+  *pfirstbyte = 0;
   switch(flags & ~CURLWS_OFFSET) {
     case 0:
       if(contfragment) {
         infof(data, "[WS] no flags given; interpreting as continuation "
                     "fragment for compatibility");
-        return (WSBIT_OPCODE_CONT | WSBIT_FIN);
+        *pfirstbyte = (WSBIT_OPCODE_CONT | WSBIT_FIN);
+        return CURLE_OK;
       }
       failf(data, "[WS] no flags given");
-      *err = CURLE_BAD_FUNCTION_ARGUMENT;
-      return 0xff;
+      return CURLE_BAD_FUNCTION_ARGUMENT;
     case CURLWS_CONT:
       if(contfragment) {
         infof(data, "[WS] setting CURLWS_CONT flag without message type is "
                     "supported for compatibility but highly discouraged");
-        return WSBIT_OPCODE_CONT;
+        *pfirstbyte = WSBIT_OPCODE_CONT;
+        return CURLE_OK;
       }
       failf(data, "[WS] No ongoing fragmented message to continue");
-      *err = CURLE_BAD_FUNCTION_ARGUMENT;
-      return 0xff;
+      return CURLE_BAD_FUNCTION_ARGUMENT;
     case CURLWS_TEXT:
-      return contfragment ? (WSBIT_OPCODE_CONT | WSBIT_FIN)
-                          : (WSBIT_OPCODE_TEXT | WSBIT_FIN);
+      *pfirstbyte = contfragment ? (WSBIT_OPCODE_CONT | WSBIT_FIN)
+                                 : (WSBIT_OPCODE_TEXT | WSBIT_FIN);
+      return CURLE_OK;
     case (CURLWS_TEXT | CURLWS_CONT):
-      return contfragment ? WSBIT_OPCODE_CONT : WSBIT_OPCODE_TEXT;
+      *pfirstbyte = contfragment ? WSBIT_OPCODE_CONT : WSBIT_OPCODE_TEXT;
+      return CURLE_OK;
     case CURLWS_BINARY:
-      return contfragment ? (WSBIT_OPCODE_CONT | WSBIT_FIN)
-                          : (WSBIT_OPCODE_BIN | WSBIT_FIN);
+      *pfirstbyte = contfragment ? (WSBIT_OPCODE_CONT | WSBIT_FIN)
+                                 : (WSBIT_OPCODE_BIN | WSBIT_FIN);
+      return CURLE_OK;
     case (CURLWS_BINARY | CURLWS_CONT):
-      return contfragment ? WSBIT_OPCODE_CONT : WSBIT_OPCODE_BIN;
+      *pfirstbyte = contfragment ? WSBIT_OPCODE_CONT : WSBIT_OPCODE_BIN;
+      return CURLE_OK;
     case CURLWS_CLOSE:
-      return WSBIT_OPCODE_CLOSE | WSBIT_FIN;
+      *pfirstbyte = WSBIT_OPCODE_CLOSE | WSBIT_FIN;
+      return CURLE_OK;
     case (CURLWS_CLOSE | CURLWS_CONT):
       failf(data, "[WS] CLOSE frame must not be fragmented");
-      *err = CURLE_BAD_FUNCTION_ARGUMENT;
-      return 0xff;
+      return CURLE_BAD_FUNCTION_ARGUMENT;
     case CURLWS_PING:
-      return WSBIT_OPCODE_PING | WSBIT_FIN;
+      *pfirstbyte = WSBIT_OPCODE_PING | WSBIT_FIN;
+      return CURLE_OK;
     case (CURLWS_PING | CURLWS_CONT):
       failf(data, "[WS] PING frame must not be fragmented");
-      *err = CURLE_BAD_FUNCTION_ARGUMENT;
-      return 0xff;
+      return CURLE_BAD_FUNCTION_ARGUMENT;
     case CURLWS_PONG:
-      return WSBIT_OPCODE_PONG | WSBIT_FIN;
+      *pfirstbyte = WSBIT_OPCODE_PONG | WSBIT_FIN;
+      return CURLE_OK;
     case (CURLWS_PONG | CURLWS_CONT):
       failf(data, "[WS] PONG frame must not be fragmented");
-      *err = CURLE_BAD_FUNCTION_ARGUMENT;
-      return 0xff;
+      return CURLE_BAD_FUNCTION_ARGUMENT;
     default:
       failf(data, "[WS] unknown flags: %x", flags);
-      *err = CURLE_BAD_FUNCTION_ARGUMENT;
-      return 0xff;
+      return CURLE_BAD_FUNCTION_ARGUMENT;
   }
 }
 
@@ -518,7 +522,7 @@ static CURLcode ws_dec_pass(struct ws_decoder *dec,
     result = ws_dec_read_head(dec, data, inraw);
     if(result) {
       if(result != CURLE_AGAIN) {
-        infof(data, "[WS] decode error %d", (int)result);
+        failf(data, "[WS] decode frame error %d", (int)result);
         break;  /* real error */
       }
       /* incomplete ws frame head */
@@ -562,11 +566,11 @@ static void update_meta(struct websocket *ws,
 {
   curl_off_t bytesleft = (payload_len - payload_offset - cur_len);
 
-  ws->frame.age = frame_age;
-  ws->frame.flags = frame_flags;
-  ws->frame.offset = payload_offset;
-  ws->frame.len = cur_len;
-  ws->frame.bytesleft = bytesleft;
+  ws->recvframe.age = frame_age;
+  ws->recvframe.flags = frame_flags;
+  ws->recvframe.offset = payload_offset;
+  ws->recvframe.len = cur_len;
+  ws->recvframe.bytesleft = bytesleft;
 }
 
 /* WebSockets decoding client writer */
@@ -644,6 +648,7 @@ static CURLcode ws_cw_write(struct Curl_easy *data,
   struct websocket *ws;
   CURLcode result;
 
+  CURL_TRC_WRITE(data, "ws_cw_write(len=%zu, type=%d)", nbytes, type);
   if(!(type & CLIENTWRITE_BODY) || data->set.ws_raw_mode)
     return Curl_cwriter_write(data, writer->next, type, buf, nbytes);
 
@@ -674,11 +679,10 @@ static CURLcode ws_cw_write(struct Curl_easy *data,
     if(result == CURLE_AGAIN) {
       /* insufficient amount of data, keep it for later.
        * we pretend to have written all since we have a copy */
-      CURL_TRC_WS(data, "buffered incomplete frame head");
       return CURLE_OK;
     }
     else if(result) {
-      infof(data, "[WS] decode error %d", (int)result);
+      failf(data, "[WS] decode payload error %d", (int)result);
       return result;
     }
   }
@@ -748,36 +752,33 @@ static void ws_enc_init(struct ws_encoder *enc)
      +---------------------------------------------------------------+
 */
 
-static ssize_t ws_enc_write_head(struct Curl_easy *data,
+static CURLcode ws_enc_write_head(struct Curl_easy *data,
                                  struct ws_encoder *enc,
                                  unsigned int flags,
                                  curl_off_t payload_len,
-                                 struct bufq *out,
-                                 CURLcode *err)
+                                 struct bufq *out)
 {
-  unsigned char firstbyte = 0;
+  unsigned char firstb = 0;
   unsigned char head[14];
-  size_t hlen, n;
+  CURLcode result;
+  size_t hlen, nwritten;
 
   if(payload_len < 0) {
     failf(data, "[WS] starting new frame with negative payload length %"
                 FMT_OFF_T, payload_len);
-    *err = CURLE_SEND_ERROR;
-    return -1;
+    return CURLE_SEND_ERROR;
   }
 
   if(enc->payload_remain > 0) {
     /* trying to write a new frame before the previous one is finished */
     failf(data, "[WS] starting new frame with %zd bytes from last one "
                 "remaining to be sent", (ssize_t)enc->payload_remain);
-    *err = CURLE_SEND_ERROR;
-    return -1;
+    return CURLE_SEND_ERROR;
   }
 
-  firstbyte = ws_frame_flags2firstbyte(data, flags, enc->contfragment, err);
-  if(*err) {
-    return -1;
-  }
+  result = ws_frame_flags2firstbyte(data, flags, enc->contfragment, &firstb);
+  if(result)
+    return result;
 
   /* fragmentation only applies to data frames (text/binary);
    * control frames (close/ping/pong) do not affect the CONT status */
@@ -788,23 +789,20 @@ static ssize_t ws_enc_write_head(struct Curl_easy *data,
   if(flags & CURLWS_PING && payload_len > 125) {
     /* The maximum valid size of PING frames is 125 bytes. */
     failf(data, "[WS] given PING frame is too big");
-    *err = CURLE_TOO_LARGE;
-    return -1;
+    return CURLE_TOO_LARGE;
   }
   if(flags & CURLWS_PONG && payload_len > 125) {
     /* The maximum valid size of PONG frames is 125 bytes. */
     failf(data, "[WS] given PONG frame is too big");
-    *err = CURLE_TOO_LARGE;
-    return -1;
+    return CURLE_TOO_LARGE;
   }
   if(flags & CURLWS_CLOSE && payload_len > 125) {
     /* The maximum valid size of CLOSE frames is 125 bytes. */
     failf(data, "[WS] given CLOSE frame is too big");
-    *err = CURLE_TOO_LARGE;
-    return -1;
+    return CURLE_TOO_LARGE;
   }
 
-  head[0] = enc->firstbyte = firstbyte;
+  head[0] = enc->firstbyte = firstb;
   if(payload_len > 65535) {
     head[1] = 127 | WSBIT_MASK;
     head[2] = (unsigned char)((payload_len >> 56) & 0xff);
@@ -837,29 +835,28 @@ static ssize_t ws_enc_write_head(struct Curl_easy *data,
   /* reset for payload to come */
   enc->xori = 0;
 
-  *err = Curl_bufq_write(out, head, hlen, &n);
-  if(*err)
-    return -1;
-  if(n != hlen) {
+  result = Curl_bufq_write(out, head, hlen, &nwritten);
+  if(result)
+    return result;
+  if(nwritten != hlen) {
     /* We use a bufq with SOFT_LIMIT, writing should always succeed */
     DEBUGASSERT(0);
-    *err = CURLE_SEND_ERROR;
-    return -1;
+    return CURLE_SEND_ERROR;
   }
-  return (ssize_t)n;
+  return CURLE_OK;
 }
 
-static ssize_t ws_enc_write_payload(struct ws_encoder *enc,
-                                    struct Curl_easy *data,
-                                    const unsigned char *buf, size_t buflen,
-                                    struct bufq *out, CURLcode *err)
+static CURLcode ws_enc_write_payload(struct ws_encoder *enc,
+                                     struct Curl_easy *data,
+                                     const unsigned char *buf, size_t buflen,
+                                     struct bufq *out, size_t *pnwritten)
 {
+  CURLcode result;
   size_t i, len, n;
 
-  if(Curl_bufq_is_full(out)) {
-    *err = CURLE_AGAIN;
-    return -1;
-  }
+  *pnwritten = 0;
+  if(Curl_bufq_is_full(out))
+    return CURLE_AGAIN;
 
   /* not the most performant way to do this */
   len = buflen;
@@ -868,20 +865,138 @@ static ssize_t ws_enc_write_payload(struct ws_encoder *enc,
 
   for(i = 0; i < len; ++i) {
     unsigned char c = buf[i] ^ enc->mask[enc->xori];
-    *err = Curl_bufq_write(out, &c, 1, &n);
-    if(*err) {
-      if((*err != CURLE_AGAIN) || !i)
-        return -1;
+    result = Curl_bufq_write(out, &c, 1, &n);
+    if(result) {
+      if((result != CURLE_AGAIN) || !i)
+        return result;
       break;
     }
     enc->xori++;
     enc->xori &= 3;
   }
+  *pnwritten = i;
   enc->payload_remain -= (curl_off_t)i;
   ws_enc_info(enc, data, "buffered");
-  return (ssize_t)i;
+  return CURLE_OK;
+}
+
+
+
+struct cr_ws_ctx {
+  struct Curl_creader super;
+  BIT(read_eos);  /* we read an EOS from the next reader */
+  BIT(eos);       /* we have returned an EOS */
+};
+
+static CURLcode cr_ws_init(struct Curl_easy *data, struct Curl_creader *reader)
+{
+  (void)data;
+  (void)reader;
+  return CURLE_OK;
+}
+
+static void cr_ws_close(struct Curl_easy *data, struct Curl_creader *reader)
+{
+  (void)data;
+  (void)reader;
 }
 
+static CURLcode cr_ws_read(struct Curl_easy *data,
+                           struct Curl_creader *reader,
+                           char *buf, size_t blen,
+                           size_t *pnread, bool *peos)
+{
+  struct cr_ws_ctx *ctx = reader->ctx;
+  CURLcode result = CURLE_OK;
+  size_t nread, n;
+  struct websocket *ws;
+  bool eos;
+
+  *pnread = 0;
+  if(ctx->eos) {
+    *peos = TRUE;
+    return CURLE_OK;
+  }
+
+  ws = Curl_conn_meta_get(data->conn, CURL_META_PROTO_WS_CONN);
+  if(!ws) {
+    failf(data, "[WS] not a websocket transfer");
+    return CURLE_FAILED_INIT;
+  }
+
+  if(Curl_bufq_is_empty(&ws->sendbuf)) {
+    if(ctx->read_eos) {
+      ctx->eos = TRUE;
+      *peos = TRUE;
+      return CURLE_OK;
+    }
+
+    if(ws->enc.payload_remain) {
+      CURL_TRC_WS(data, "current frame, %" FMT_OFF_T " remaining",
+                  ws->enc.payload_remain);
+      if(ws->enc.payload_remain < (curl_off_t)blen)
+        blen = (size_t)ws->enc.payload_remain;
+    }
+
+    result = Curl_creader_read(data, reader->next, buf, blen, &nread, &eos);
+    if(result)
+      return result;
+    ctx->read_eos = eos;
+
+    if(!nread) {
+      /* nothing to convert, return this right away */
+      if(ctx->read_eos)
+        ctx->eos = TRUE;
+      *pnread = nread;
+      *peos = ctx->eos;
+      goto out;
+    }
+
+    if(!ws->enc.payload_remain) {
+      /* encode the data as a new BINARY frame */
+      result = ws_enc_write_head(data, &ws->enc, CURLWS_BINARY, nread,
+                                 &ws->sendbuf);
+      if(result)
+        goto out;
+    }
+
+    result = ws_enc_write_payload(&ws->enc, data, (unsigned char *)buf,
+                                  nread, &ws->sendbuf, &n);
+    if(result)
+      goto out;
+    CURL_TRC_READ(data, "cr_ws_read, added %zu payload, len=%zu", nread, n);
+  }
+
+  DEBUGASSERT(!Curl_bufq_is_empty(&ws->sendbuf));
+  *peos = FALSE;
+  result = Curl_bufq_cread(&ws->sendbuf, buf, blen, pnread);
+  if(!result && ctx->read_eos && Curl_bufq_is_empty(&ws->sendbuf)) {
+    /* no more data, read all, done. */
+    ctx->eos = TRUE;
+    *peos = TRUE;
+  }
+
+out:
+  CURL_TRC_READ(data, "cr_ws_read(len=%zu) -> %d, nread=%zu, eos=%d",
+                blen, result, *pnread, *peos);
+  return result;
+}
+
+static const struct Curl_crtype ws_cr_encode = {
+  "ws-encode",
+  cr_ws_init,
+  cr_ws_read,
+  cr_ws_close,
+  Curl_creader_def_needs_rewind,
+  Curl_creader_def_total_length,
+  Curl_creader_def_resume_from,
+  Curl_creader_def_rewind,
+  Curl_creader_def_unpause,
+  Curl_creader_def_is_paused,
+  Curl_creader_def_done,
+  sizeof(struct cr_ws_ctx)
+};
+
 
 struct wsfield {
   const char *name;
@@ -968,7 +1083,8 @@ CURLcode Curl_ws_accept(struct Curl_easy *data,
 {
   struct SingleRequest *k = &data->req;
   struct websocket *ws;
-  struct Curl_cwriter *ws_dec_writer;
+  struct Curl_cwriter *ws_dec_writer = NULL;
+  struct Curl_creader *ws_enc_reader = NULL;
   CURLcode result;
 
   DEBUGASSERT(data->conn);
@@ -1044,13 +1160,11 @@ CURLcode Curl_ws_accept(struct Curl_easy *data,
   result = Curl_cwriter_create(&ws_dec_writer, data, &ws_cw_decode,
                                CURL_CW_CONTENT_DECODE);
   if(result)
-    return result;
-
+    goto out;
   result = Curl_cwriter_add(data, ws_dec_writer);
-  if(result) {
-    Curl_cwriter_free(data, ws_dec_writer);
-    return result;
-  }
+  if(result)
+    goto out;
+  ws_dec_writer = NULL; /* owned by transfer now */
 
   if(data->set.connect_only) {
     size_t nwritten;
@@ -1060,18 +1174,55 @@ CURLcode Curl_ws_accept(struct Curl_easy *data,
     result = Curl_bufq_write(&ws->recvbuf, (const unsigned char *)mem,
                              nread, &nwritten);
     if(result)
-      return result;
+      goto out;
     DEBUGASSERT(nread == nwritten);
-    infof(data, "%zu bytes websocket payload", nread);
+    k->keepon &= ~KEEP_RECV; /* read no more content */
   }
   else { /* !connect_only */
+    if(data->set.method == HTTPREQ_PUT) {
+      CURL_TRC_WS(data, "UPLOAD set, add ws-encode reader");
+      result = Curl_creader_set_fread(data, -1);
+      if(result)
+        goto out;
+
+      if(!data->set.ws_raw_mode) {
+        /* Add our client readerr encoding WS BINARY frames */
+        result = Curl_creader_create(&ws_enc_reader, data, &ws_cr_encode,
+                                     CURL_CR_CONTENT_ENCODE);
+        if(result)
+          goto out;
+        result = Curl_creader_add(data, ws_enc_reader);
+        if(result)
+          goto out;
+        ws_enc_reader = NULL; /* owned by transfer now */
+      }
+
+      /* start over with sending */
+      data->req.eos_read = FALSE;
+      k->keepon |= KEEP_SEND;
+    }
+
     /* And pass any additional data to the writers */
     if(nread) {
       result = Curl_client_write(data, CLIENTWRITE_BODY, mem, nread);
+      if(result)
+        goto out;
     }
   }
+
   k->upgr101 = UPGR101_RECEIVED;
+  k->header = FALSE; /* we will not get more responses */
 
+out:
+  if(ws_dec_writer)
+    Curl_cwriter_free(data, ws_dec_writer);
+  if(ws_enc_reader)
+    Curl_creader_free(data, ws_enc_reader);
+  if(result)
+    CURL_TRC_WS(data, "Curl_ws_accept() failed -> %d", result);
+  else
+    CURL_TRC_WS(data, "websocket established, %s mode",
+                data->set.connect_only ? "connect-only" : "callback");
   return result;
 }
 
@@ -1226,11 +1377,12 @@ CURLcode curl_ws_recv(CURL *d, void *buffer,
   /* update frame information to be passed back */
   update_meta(ws, ctx.frame_age, ctx.frame_flags, ctx.payload_offset,
               ctx.payload_len, ctx.bufidx);
-  *metap = &ws->frame;
-  *nread = ws->frame.len;
+  *metap = &ws->recvframe;
+  *nread = ws->recvframe.len;
   CURL_TRC_WS(data, "curl_ws_recv(len=%zu) -> %zu bytes (frame at %"
                FMT_OFF_T ", %" FMT_OFF_T " left)",
-               buflen, *nread, ws->frame.offset, ws->frame.bytesleft);
+               buflen, *nread, ws->recvframe.offset,
+               ws->recvframe.bytesleft);
   return CURLE_OK;
 }
 
@@ -1378,7 +1530,7 @@ CURLcode curl_ws_send(CURL *d, const void *buffer_arg,
 {
   struct websocket *ws;
   const unsigned char *buffer = buffer_arg;
-  ssize_t n;
+  size_t n;
   CURLcode result = CURLE_OK;
   struct Curl_easy *data = d;
 
@@ -1387,7 +1539,16 @@ CURLcode curl_ws_send(CURL *d, const void *buffer_arg,
   CURL_TRC_WS(data, "curl_ws_send(len=%zu, fragsize=%" FMT_OFF_T
               ", flags=%x), raw=%d",
               buflen, fragsize, flags, data->set.ws_raw_mode);
-  *sent = 0;
+
+  if(sent)
+    *sent = 0;
+
+  if(!buffer && buflen) {
+    failf(data, "[WS] buffer is NULL when buflen is not");
+    result = CURLE_BAD_FUNCTION_ARGUMENT;
+    goto out;
+  }
+
   if(!data->conn && data->set.connect_only) {
     result = Curl_connect_only_attach(data);
     if(result)
@@ -1412,6 +1573,14 @@ CURLcode curl_ws_send(CURL *d, const void *buffer_arg,
     if(result)
       goto out;
 
+    if(!buffer) {
+      failf(data, "[WS] buffer is NULL in raw mode");
+      return CURLE_BAD_FUNCTION_ARGUMENT;
+    }
+    if(!sent) {
+      failf(data, "[WS] sent is NULL in raw mode");
+      return CURLE_BAD_FUNCTION_ARGUMENT;
+    }
     if(fragsize || flags) {
       failf(data, "[WS] fragsize and flags must be zero in raw mode");
       return CURLE_BAD_FUNCTION_ARGUMENT;
@@ -1446,16 +1615,18 @@ CURLcode curl_ws_send(CURL *d, const void *buffer_arg,
   }
   else {
     /* starting a new frame, we want a clean sendbuf */
-    curl_off_t payload_len = (flags & CURLWS_OFFSET) ?
-                             fragsize : (curl_off_t)buflen;
     result = ws_flush(data, ws, Curl_is_in_callback(data));
     if(result)
       goto out;
 
-    n = ws_enc_write_head(data, &ws->enc, flags, payload_len,
-                          &ws->sendbuf, &result);
-    if(n < 0)
+    result = ws_enc_write_head(data, &ws->enc, flags,
+                               (flags & CURLWS_OFFSET) ?
+                               fragsize : (curl_off_t)buflen,
+                               &ws->sendbuf);
+    if(result) {
+      CURL_TRC_WS(data, "curl_ws_send(), error writing frame head %d", result);
       goto out;
+    }
   }
 
   /* While there is either sendbuf to flush OR more payload to encode... */
@@ -1463,11 +1634,11 @@ CURLcode curl_ws_send(CURL *d, const void *buffer_arg,
     /* Try to add more payload to sendbuf */
     if(buflen > ws->sendbuf_payload) {
       size_t prev_len = Curl_bufq_len(&ws->sendbuf);
-      n = ws_enc_write_payload(&ws->enc, data,
-                               buffer + ws->sendbuf_payload,
-                               buflen - ws->sendbuf_payload,
-                               &ws->sendbuf, &result);
-      if(n < 0 && (result != CURLE_AGAIN))
+      result = ws_enc_write_payload(&ws->enc, data,
+                                    buffer + ws->sendbuf_payload,
+                                    buflen - ws->sendbuf_payload,
+                                    &ws->sendbuf, &n);
+      if(result && (result != CURLE_AGAIN))
         goto out;
       ws->sendbuf_payload += Curl_bufq_len(&ws->sendbuf) - prev_len;
       if(!ws->sendbuf_payload) {
@@ -1479,7 +1650,8 @@ CURLcode curl_ws_send(CURL *d, const void *buffer_arg,
     /* flush, blocking when in callback */
     result = ws_flush(data, ws, Curl_is_in_callback(data));
     if(!result && ws->sendbuf_payload > 0) {
-      *sent += ws->sendbuf_payload;
+      if(sent)
+        *sent += ws->sendbuf_payload;
       buffer += ws->sendbuf_payload;
       buflen -= ws->sendbuf_payload;
       ws->sendbuf_payload = 0;
@@ -1489,7 +1661,8 @@ CURLcode curl_ws_send(CURL *d, const void *buffer_arg,
         /* blocked, part of payload bytes remain, report length
          * that we managed to send. */
         size_t flushed = (ws->sendbuf_payload - Curl_bufq_len(&ws->sendbuf));
-        *sent += flushed;
+        if(sent)
+          *sent += flushed;
         ws->sendbuf_payload -= flushed;
         result = CURLE_OK;
         goto out;
@@ -1499,7 +1672,7 @@ CURLcode curl_ws_send(CURL *d, const void *buffer_arg,
          * OK on 0-length send (caller counts only payload) and EAGAIN */
         CURL_TRC_WS(data, "EAGAIN flushing sendbuf, payload_encoded: %zu/%zu",
                     ws->sendbuf_payload, buflen);
-        DEBUGASSERT(*sent == 0);
+        DEBUGASSERT(!sent || *sent == 0);
         result = CURLE_AGAIN;
         goto out;
       }
@@ -1511,7 +1684,8 @@ CURLcode curl_ws_send(CURL *d, const void *buffer_arg,
 out:
   CURL_TRC_WS(data, "curl_ws_send(len=%zu, fragsize=%" FMT_OFF_T
               ", flags=%x, raw=%d) -> %d, %zu",
-              buflen, fragsize, flags, data->set.ws_raw_mode, result, *sent);
+              buflen, fragsize, flags, data->set.ws_raw_mode, result,
+              sent ? *sent : 0);
   return result;
 }
 
@@ -1537,12 +1711,63 @@ const struct curl_ws_frame *curl_ws_meta(CURL *d)
     struct websocket *ws;
     ws = Curl_conn_meta_get(data->conn, CURL_META_PROTO_WS_CONN);
     if(ws)
-      return &ws->frame;
+      return &ws->recvframe;
 
   }
   return NULL;
 }
 
+CURL_EXTERN CURLcode curl_ws_start_frame(CURL *d,
+                                         unsigned int flags,
+                                         curl_off_t frame_len)
+{
+  struct websocket *ws;
+  CURLcode result = CURLE_OK;
+  struct Curl_easy *data = d;
+
+  if(!GOOD_EASY_HANDLE(data))
+    return CURLE_BAD_FUNCTION_ARGUMENT;
+  if(data->set.ws_raw_mode) {
+    failf(data, "cannot curl_ws_start_frame() with CURLWS_RAW_MODE enabled");
+    return CURLE_FAILED_INIT;
+  }
+
+  CURL_TRC_WS(data, "curl_start_frame(flags=%x, frame_len=%" FMT_OFF_T,
+              flags, frame_len);
+
+  if(!data->conn) {
+    failf(data, "[WS] No associated connection");
+    result = CURLE_SEND_ERROR;
+    goto out;
+  }
+  ws = Curl_conn_meta_get(data->conn, CURL_META_PROTO_WS_CONN);
+  if(!ws) {
+    failf(data, "[WS] Not a websocket transfer");
+    result = CURLE_SEND_ERROR;
+    goto out;
+  }
+
+  if(data->set.ws_raw_mode) {
+    failf(data, "[WS] cannot start frame in raw mode");
+    result = CURLE_SEND_ERROR;
+    goto out;
+  }
+
+  if(ws->enc.payload_remain) {
+    failf(data, "[WS] previous frame not finished");
+    result = CURLE_SEND_ERROR;
+    goto out;
+  }
+
+  result = ws_enc_write_head(data, &ws->enc, flags, frame_len, &ws->sendbuf);
+  if(result)
+    CURL_TRC_WS(data, "curl_start_frame(), error  adding frame head %d",
+                result);
+
+out:
+  return result;
+}
+
 const struct Curl_handler Curl_handler_ws = {
   "WS",                                 /* scheme */
   ws_setup_conn,                        /* setup_connection */
@@ -1631,4 +1856,15 @@ const struct curl_ws_frame *curl_ws_meta(CURL *data)
   (void)data;
   return NULL;
 }
+
+CURL_EXTERN CURLcode curl_ws_start_frame(CURL *curl,
+                                         unsigned int flags,
+                                         curl_off_t frame_len)
+{
+  (void)curl;
+  (void)flags;
+  (void)frame_len;
+  return CURLE_NOT_BUILT_IN;
+}
+
 #endif /* !CURL_DISABLE_WEBSOCKETS */
index b47c85c99589e7175467f41fd156ba5a6cbd9767..8240d46e5e1feed88b5b9cc45c9854082a57124a 100755 (executable)
@@ -155,6 +155,7 @@ my %api = (
     'curl_ws_meta' => 'API',
     'curl_ws_recv' => 'API',
     'curl_ws_send' => 'API',
+    'curl_ws_start_frame' => 'API',
 
     # the following functions are provided globally in debug builds
     'curl_easy_perform_ev' => 'debug-build',
index 73668d9994925749a391b3c5e0b0db7d0322796a..d3262a1d3a3de5007d1220e84d088865fb9fc5d5 100644 (file)
@@ -124,6 +124,7 @@ curl_url_set
 curl_url_strerror
 curl_ws_recv
 curl_ws_send
+curl_ws_start_frame
 curl_ws_meta
 </stdout>
 </verify>
index 6cb941dac367081c0c8d27b340e4388d7fe379c9..18a38e04d05e372a0f8dfca46c9878b5f589ad0f 100644 (file)
@@ -40,7 +40,7 @@ http
 WebSockets upgrade only
 </name>
 <command>
-ws://%HOSTIP:%HTTPPORT/%TESTNUMBER
+-T . ws://%HOSTIP:%HTTPPORT/%TESTNUMBER
 </command>
 </client>
 
index 60b97069ce06b8b52043394dfcd1eeabc466bcfc..c28b610f5861bca9305c32afe2ef0b77945bdc91 100644 (file)
@@ -126,43 +126,63 @@ class TestWebsockets:
         r = client.run(args=[url, payload])
         r.check_exit_code(100)  # CURLE_TOO_LARGE
 
-    def test_20_04_data_small(self, env: Env, ws_echo):
+    @pytest.mark.parametrize("model", [
+        pytest.param(1, id='multi_perform'),
+        pytest.param(2, id='curl_ws_send+recv'),
+    ])
+    def test_20_04_data_small(self, env: Env, ws_echo, model):
         client = LocalClient(env=env, name='cli_ws_data')
         if not client.exists():
             pytest.skip(f'example client not built: {client.name}')
         url = f'ws://localhost:{env.ws_port}/'
-        r = client.run(args=['-m', str(0), '-M', str(10), url])
+        r = client.run(args=[f'-{model}', '-m', str(0), '-M', str(10), url])
         r.check_exit_code(0)
 
-    def test_20_05_data_med(self, env: Env, ws_echo):
+    @pytest.mark.parametrize("model", [
+        pytest.param(1, id='multi_perform'),
+        pytest.param(2, id='curl_ws_send+recv'),
+    ])
+    def test_20_05_data_med(self, env: Env, ws_echo, model):
         client = LocalClient(env=env, name='cli_ws_data')
         if not client.exists():
             pytest.skip(f'example client not built: {client.name}')
         url = f'ws://localhost:{env.ws_port}/'
-        r = client.run(args=['-m', str(120), '-M', str(130), url])
+        r = client.run(args=[f'-{model}', '-m', str(120), '-M', str(130), url])
         r.check_exit_code(0)
 
-    def test_20_06_data_large(self, env: Env, ws_echo):
+    @pytest.mark.parametrize("model", [
+        pytest.param(1, id='multi_perform'),
+        pytest.param(2, id='curl_ws_send+recv'),
+    ])
+    def test_20_06_data_large(self, env: Env, ws_echo, model):
         client = LocalClient(env=env, name='cli_ws_data')
         if not client.exists():
             pytest.skip(f'example client not built: {client.name}')
         url = f'ws://localhost:{env.ws_port}/'
-        r = client.run(args=['-m', str(65535 - 5), '-M', str(65535 + 5), url])
+        r = client.run(args=[f'-{model}', '-m', str(65535 - 5), '-M', str(65535 + 5), url])
         r.check_exit_code(0)
 
-    def test_20_07_data_large_small_recv(self, env: Env, ws_echo):
+    @pytest.mark.parametrize("model", [
+        pytest.param(1, id='multi_perform'),
+        pytest.param(2, id='curl_ws_send+recv'),
+    ])
+    def test_20_07_data_large_small_recv(self, env: Env, ws_echo, model):
         run_env = os.environ.copy()
         run_env['CURL_WS_CHUNK_SIZE'] = '1024'
         client = LocalClient(env=env, name='cli_ws_data', run_env=run_env)
         if not client.exists():
             pytest.skip(f'example client not built: {client.name}')
         url = f'ws://localhost:{env.ws_port}/'
-        r = client.run(args=['-m', str(65535 - 5), '-M', str(65535 + 5), url])
+        r = client.run(args=[f'-{model}', '-m', str(65535 - 5), '-M', str(65535 + 5), url])
         r.check_exit_code(0)
 
     # Send large frames and simulate send blocking on 8192 bytes chunks
     # Simlates error reported in #15865
-    def test_20_08_data_very_large(self, env: Env, ws_echo):
+    @pytest.mark.parametrize("model", [
+        pytest.param(1, id='multi_perform'),
+        pytest.param(2, id='curl_ws_send+recv'),
+    ])
+    def test_20_08_data_very_large(self, env: Env, ws_echo, model):
         run_env = os.environ.copy()
         run_env['CURL_WS_CHUNK_EAGAIN'] = '8192'
         client = LocalClient(env=env, name='cli_ws_data', run_env=run_env)
@@ -171,5 +191,5 @@ class TestWebsockets:
         url = f'ws://localhost:{env.ws_port}/'
         count = 10
         large = 20000
-        r = client.run(args=['-c', str(count), '-m', str(large), url])
+        r = client.run(args=[f'-{model}', '-c', str(count), '-m', str(large), url])
         r.check_exit_code(0)
index 82717415b9fc884742016a7ba6b21139e70976aa..501c01eae2e8ea1cb587e287ada3f16f160a0c74 100644 (file)
 
 #ifndef CURL_DISABLE_WEBSOCKETS
 
-static CURLcode check_recv(const struct curl_ws_frame *frame,
-                           size_t r_offset, size_t nread, size_t exp_len)
+static CURLcode
+test_ws_data_m2_check_recv(const struct curl_ws_frame *frame,
+                           size_t r_offset, size_t nread,
+                           size_t exp_len)
 {
   if(!frame)
     return CURLE_OK;
@@ -67,9 +69,13 @@ static CURLcode check_recv(const struct curl_ws_frame *frame,
   return CURLE_OK;
 }
 
-static CURLcode data_echo(CURL *curl, size_t count,
-                          size_t plen_min, size_t plen_max)
+/* WebSocket Mode 2: CONNECT_ONLY 2, curl_ws_send()/curl_ws_recv() */
+static CURLcode test_ws_data_m2_echo(const char *url,
+                                     size_t count,
+                                     size_t plen_min,
+                                     size_t plen_max)
 {
+  CURL *curl = NULL;
   CURLcode r = CURLE_OK;
   const struct curl_ws_frame *frame;
   size_t len;
@@ -83,11 +89,27 @@ static CURLcode data_echo(CURL *curl, size_t count,
     r = CURLE_OUT_OF_MEMORY;
     goto out;
   }
-
   for(i = 0; i < plen_max; ++i) {
     send_buf[i] = (char)('0' + ((int)i % 10));
   }
 
+  curl = curl_easy_init();
+  if(!curl) {
+    r = CURLE_OUT_OF_MEMORY;
+    goto out;
+  }
+
+  curl_easy_setopt(curl, CURLOPT_URL, url);
+
+  /* use the callback style */
+  curl_easy_setopt(curl, CURLOPT_USERAGENT, "ws-data");
+  curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
+  curl_easy_setopt(curl, CURLOPT_CONNECT_ONLY, 2L); /* websocket style */
+  r = curl_easy_perform(curl);
+  curl_mfprintf(stderr, "curl_easy_perform() returned %u\n", (int)r);
+  if(r != CURLE_OK)
+    goto out;
+
   for(len = plen_min; len <= plen_max; ++len) {
     size_t nwritten, nread, slen = len, rlen = len;
     char *sbuf = send_buf, *rbuf = recv_buf;
@@ -123,7 +145,7 @@ static CURLcode data_echo(CURL *curl, size_t count,
           curl_mfprintf(stderr, "curl_ws_recv(len=%zu) -> %d, %zu (%ld/%zu) "
                         "\n", rlen, r, nread, (long)(len - rlen), len);
           if(!r) {
-            r = check_recv(frame, len - rlen, nread, len);
+            r = test_ws_data_m2_check_recv(frame, len - rlen, nread, len);
             if(r)
               goto out;
           }
@@ -141,7 +163,7 @@ static CURLcode data_echo(CURL *curl, size_t count,
 
       if(rblock && sblock) {
         curl_mfprintf(stderr, "EAGAIN, sleep, try again\n");
-        curlx_wait_ms(100);
+        curlx_wait_ms(1);
       }
     }
 
@@ -157,14 +179,158 @@ static CURLcode data_echo(CURL *curl, size_t count,
   }
 
 out:
-  if(!r)
-    ws_close(curl);
+  if(curl) {
+    if(!r)
+      ws_close(curl);
+    curl_easy_cleanup(curl);
+  }
   free(send_buf);
   free(recv_buf);
   return r;
 }
 
-static void usage_ws_data(const char *msg)
+struct test_ws_m1_ctx {
+  CURL *easy;
+  char *send_buf;
+  char *recv_buf;
+  size_t send_len, nsent;
+  size_t recv_len, nrcvd;
+};
+
+static size_t test_ws_data_m1_read(char *buf, size_t nitems, size_t buflen,
+                                   void *userdata)
+{
+  struct test_ws_m1_ctx *ctx = userdata;
+  size_t len = nitems * buflen;
+  size_t left = ctx->send_len - ctx->nsent;
+
+  curl_mfprintf(stderr, "m1_read(len=%zu, left=%zu)\n", len, left);
+  if(left) {
+    if(left > len)
+      left = len;
+    memcpy(buf, ctx->send_buf + ctx->nsent, left);
+    ctx->nsent += left;
+    return left;
+  }
+  return CURL_READFUNC_PAUSE;
+}
+
+static size_t test_ws_data_m1_write(char *buf, size_t nitems, size_t buflen,
+                                    void *userdata)
+{
+  struct test_ws_m1_ctx *ctx = userdata;
+  size_t len = nitems * buflen;
+
+  curl_mfprintf(stderr, "m1_write(len=%zu)\n", len);
+  if(len > (ctx->recv_len - ctx->nrcvd))
+    return CURL_WRITEFUNC_ERROR;
+  memcpy(ctx->recv_buf + ctx->nrcvd, buf, len);
+  ctx->nrcvd += len;
+  return len;
+}
+
+/* WebSocket Mode 1: multi handle, READ/WRITEFUNCTION use */
+static CURLcode test_ws_data_m1_echo(const char *url,
+                                     size_t count,
+                                     size_t plen_min,
+                                     size_t plen_max)
+{
+  CURLM *multi = NULL;
+  CURLcode r = CURLE_OK;
+  struct test_ws_m1_ctx m1_ctx;
+  size_t i, len;
+
+  memset(&m1_ctx, 0, sizeof(m1_ctx));
+  m1_ctx.send_buf = calloc(1, plen_max + 1);
+  m1_ctx.recv_buf = calloc(1, plen_max + 1);
+  if(!m1_ctx.send_buf || !m1_ctx.recv_buf) {
+    r = CURLE_OUT_OF_MEMORY;
+    goto out;
+  }
+  for(i = 0; i < plen_max; ++i) {
+    m1_ctx.send_buf[i] = (char)('0' + ((int)i % 10));
+  }
+
+  multi = curl_multi_init();
+  if(!multi) {
+    r = CURLE_OUT_OF_MEMORY;
+    goto out;
+  }
+
+  m1_ctx.easy = curl_easy_init();
+  if(!m1_ctx.easy) {
+    r = CURLE_OUT_OF_MEMORY;
+    goto out;
+  }
+
+  curl_easy_setopt(m1_ctx.easy, CURLOPT_URL, url);
+  /* use the callback style */
+  curl_easy_setopt(m1_ctx.easy, CURLOPT_USERAGENT, "ws-data");
+  curl_easy_setopt(m1_ctx.easy, CURLOPT_VERBOSE, 1L);
+  /* we want to send */
+  curl_easy_setopt(m1_ctx.easy, CURLOPT_UPLOAD, 1L);
+  curl_easy_setopt(m1_ctx.easy, CURLOPT_READFUNCTION, test_ws_data_m1_read);
+  curl_easy_setopt(m1_ctx.easy, CURLOPT_READDATA, &m1_ctx);
+  curl_easy_setopt(m1_ctx.easy, CURLOPT_WRITEFUNCTION, test_ws_data_m1_write);
+  curl_easy_setopt(m1_ctx.easy, CURLOPT_WRITEDATA, &m1_ctx);
+
+  curl_multi_add_handle(multi, m1_ctx.easy);
+
+  for(len = plen_min; len <= plen_max; ++len) {
+    /* init what we want to send and expect to receive */
+    m1_ctx.send_len = len;
+    m1_ctx.nsent = 0;
+    m1_ctx.recv_len = len;
+    m1_ctx.nrcvd = 0;
+    memset(m1_ctx.recv_buf, 0, plen_max);
+    curl_easy_pause(m1_ctx.easy, CURLPAUSE_CONT);
+
+    for(i = 0; i < count; ++i) {
+      while(1) {
+        int still_running; /* keep number of running handles */
+        CURLMcode mc = curl_multi_perform(multi, &still_running);
+
+        if(!still_running || (m1_ctx.nrcvd == m1_ctx.recv_len)) {
+          /* got the full echo back or failed */
+          break;
+        }
+
+        if(!mc && still_running) {
+          mc = curl_multi_poll(multi, NULL, 0, 1, NULL);
+        }
+        if(mc) {
+          r = CURLE_RECV_ERROR;
+          goto out;
+        }
+
+      }
+
+      if(memcmp(m1_ctx.send_buf, m1_ctx.recv_buf, m1_ctx.send_len)) {
+        curl_mfprintf(stderr, "recv_data: data differs\n");
+        debug_dump("", "expected:", stderr,
+                   (unsigned char *)m1_ctx.send_buf, m1_ctx.send_len, 0);
+        debug_dump("", "received:", stderr,
+                   (unsigned char *)m1_ctx.recv_buf, m1_ctx.nrcvd, 0);
+        r = CURLE_RECV_ERROR;
+        goto out;
+      }
+
+    }
+  }
+
+out:
+  if(multi)
+    curl_multi_cleanup(multi);
+  if(m1_ctx.easy) {
+    curl_easy_cleanup(m1_ctx.easy);
+  }
+  free(m1_ctx.send_buf);
+  free(m1_ctx.recv_buf);
+  return r;
+}
+
+
+static void test_ws_data_usage(const char *msg)
 {
   if(msg)
     curl_mfprintf(stderr, "%s\n", msg);
@@ -180,18 +346,23 @@ static void usage_ws_data(const char *msg)
 static CURLcode test_cli_ws_data(const char *URL)
 {
 #ifndef CURL_DISABLE_WEBSOCKETS
-  CURL *curl;
   CURLcode res = CURLE_OK;
   const char *url;
   size_t plen_min = 0, plen_max = 0, count = 1;
-  int ch;
+  int ch, model = 2;
 
   (void)URL;
 
-  while((ch = cgetopt(test_argc, test_argv, "c:hm:M:")) != -1) {
+  while((ch = cgetopt(test_argc, test_argv, "12c:hm:M:")) != -1) {
     switch(ch) {
+    case '1':
+      model = 1;
+      break;
+    case '2':
+      model = 2;
+      break;
     case 'h':
-      usage_ws_data(NULL);
+      test_ws_data_usage(NULL);
       res = CURLE_BAD_FUNCTION_ARGUMENT;
       goto cleanup;
     case 'c':
@@ -204,7 +375,7 @@ static CURLcode test_cli_ws_data(const char *URL)
       plen_max = (size_t)strtol(coptarg, NULL, 10);
       break;
     default:
-      usage_ws_data("invalid option");
+      test_ws_data_usage("invalid option");
       res = CURLE_BAD_FUNCTION_ARGUMENT;
       goto cleanup;
     }
@@ -223,7 +394,7 @@ static CURLcode test_cli_ws_data(const char *URL)
   }
 
   if(test_argc != 1) {
-    usage_ws_data(NULL);
+    test_ws_data_usage(NULL);
     res = CURLE_BAD_FUNCTION_ARGUMENT;
     goto cleanup;
   }
@@ -231,22 +402,10 @@ static CURLcode test_cli_ws_data(const char *URL)
 
   curl_global_init(CURL_GLOBAL_ALL);
 
-  curl = curl_easy_init();
-  if(curl) {
-    curl_easy_setopt(curl, CURLOPT_URL, url);
-
-    /* use the callback style */
-    curl_easy_setopt(curl, CURLOPT_USERAGENT, "ws-data");
-    curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
-    curl_easy_setopt(curl, CURLOPT_CONNECT_ONLY, 2L); /* websocket style */
-    res = curl_easy_perform(curl);
-    curl_mfprintf(stderr, "curl_easy_perform() returned %u\n", res);
-    if(res == CURLE_OK)
-      res = data_echo(curl, count, plen_min, plen_max);
-
-    /* always cleanup */
-    curl_easy_cleanup(curl);
-  }
+  if(model == 1)
+    res = test_ws_data_m1_echo(url, count, plen_min, plen_max);
+  else
+    res = test_ws_data_m2_echo(url, count, plen_min, plen_max);
 
 cleanup:
   curl_global_cleanup();