]> git.ipfire.org Git - thirdparty/curl.git/commitdiff
Websocket en-/decoding
authorStefan Eissing <stefan@eissing.org>
Tue, 18 Apr 2023 13:02:34 +0000 (15:02 +0200)
committerDaniel Stenberg <daniel@haxx.se>
Tue, 25 Apr 2023 21:16:51 +0000 (23:16 +0200)
- state is fully kept at connection, since curl_ws_send() and
  curl_ws_rec() have lifetime beyond usual transfers
- no more limit on frame sizes

Reported-by: simplerobot on github
Fixes #10962
Closes #10999

lib/easy.c
lib/easyif.h
lib/http.h
lib/sendf.c
lib/urldata.h
lib/ws.c
lib/ws.h
tests/libtest/lib2304.c

index f3581410d5cef8d5d1b98b9301e3206b89ba5329..a6c32f51e6d1fbd032bfa465b18510dab63c30e0 100644 (file)
@@ -1223,6 +1223,26 @@ CURLcode curl_easy_recv(struct Curl_easy *data, void *buffer, size_t buflen,
   return CURLE_OK;
 }
 
+#ifdef USE_WEBSOCKETS
+CURLcode Curl_connect_only_attach(struct Curl_easy *data)
+{
+  curl_socket_t sfd;
+  CURLcode result;
+  struct connectdata *c = NULL;
+
+  result = easy_connection(data, &sfd, &c);
+  if(result)
+    return result;
+
+  if(!data->conn)
+    /* on first invoke, the transfer has been detached from the connection and
+       needs to be reattached */
+    Curl_attach_connection(data, c);
+
+  return CURLE_OK;
+}
+#endif /* USE_WEBSOCKETS */
+
 /*
  * Sends data over the connected socket.
  *
index 570ebef326ffcbee5033f62039f95247da5dc402..64489529660cbe2c57915996360da044851a1412 100644 (file)
 CURLcode Curl_senddata(struct Curl_easy *data, const void *buffer,
                        size_t buflen, ssize_t *n);
 
+#ifdef USE_WEBSOCKETS
+CURLcode Curl_connect_only_attach(struct Curl_easy *data);
+#endif
+
 #ifdef CURLDEBUG
 CURL_EXTERN CURLcode curl_easy_perform_ev(struct Curl_easy *easy);
 #endif
index 5fde9ce79b80344ae34c2e55d920cd90d210e983..4d9bbce28c85555c207045653881a838a83777a0 100644 (file)
@@ -212,10 +212,6 @@ struct HTTP {
     HTTPSEND_BODY     /* sending body */
   } sending;
 
-#ifdef USE_WEBSOCKETS
-  struct websocket ws;
-#endif
-
 #ifndef CURL_DISABLE_HTTP
   void *h2_ctx;              /* HTTP/2 implementation context */
   void *h3_ctx;              /* HTTP/3 implementation context */
index 2b082716871ef3ce1dd8a7870bf183ea8cde3cd2..81ee86482630e22b35593746e94ff050c1872ac9 100644 (file)
@@ -271,10 +271,8 @@ static CURLcode chop_write(struct Curl_easy *data,
   if(type & CLIENTWRITE_BODY) {
 #ifdef USE_WEBSOCKETS
     if(conn->handler->protocol & (CURLPROTO_WS|CURLPROTO_WSS)) {
-      struct HTTP *ws = data->req.p.http;
       writebody = Curl_ws_writecb;
-      ws->ws.data = data;
-      writebody_ptr = ws;
+      writebody_ptr = data;
     }
     else
 #endif
index 9aea4427806ad0e43ffa5edf987ab4cf42c95f23..d93ee970693969e3567d9906d009f8178e078482 100644 (file)
@@ -1030,7 +1030,7 @@ struct connectdata {
     struct mqtt_conn mqtt;
 #endif
 #ifdef USE_WEBSOCKETS
-    struct ws_conn ws;
+    struct websocket *ws;
 #endif
   } proto;
 
index dc1fa57517256f3a1395746ab50f0796d138fe23..7b28f5794f3e48db3f28c7dd7fb80eb6616424d7 100644 (file)
--- a/lib/ws.c
+++ b/lib/ws.c
 #ifdef USE_WEBSOCKETS
 
 #include "urldata.h"
+#include "bufq.h"
 #include "dynbuf.h"
 #include "rand.h"
 #include "curl_base64.h"
+#include "connect.h"
 #include "sendf.h"
 #include "multiif.h"
 #include "ws.h"
 #include "curl_memory.h"
 #include "memdebug.h"
 
+
+#define WSBIT_FIN 0x80
+#define WSBIT_OPCODE_CONT  0
+#define WSBIT_OPCODE_TEXT  (1)
+#define WSBIT_OPCODE_BIN   (2)
+#define WSBIT_OPCODE_CLOSE (8)
+#define WSBIT_OPCODE_PING  (9)
+#define WSBIT_OPCODE_PONG  (0xa)
+#define WSBIT_OPCODE_MASK  (0xf)
+
+#define WSBIT_MASK 0x80
+
+/* buffer dimensioning */
+#define WS_CHUNK_SIZE 65535
+#define WS_CHUNK_COUNT 2
+
+struct ws_frame_meta {
+  char proto_opcode;
+  int flags;
+  const char *name;
+};
+
+static struct ws_frame_meta WS_FRAMES[] = {
+  { WSBIT_OPCODE_CONT,  CURLWS_CONT,   "CONT" },
+  { WSBIT_OPCODE_TEXT,  CURLWS_TEXT,   "TEXT" },
+  { WSBIT_OPCODE_BIN,   CURLWS_BINARY, "BIN" },
+  { WSBIT_OPCODE_CLOSE, CURLWS_CLOSE,  "CLOSE" },
+  { WSBIT_OPCODE_PING,  CURLWS_PING,   "PING" },
+  { WSBIT_OPCODE_PONG,  CURLWS_PONG,   "PONG" },
+};
+
+static const char *ws_frame_name_of_op(unsigned char proto_opcode)
+{
+  unsigned char opcode = proto_opcode & WSBIT_OPCODE_MASK;
+  size_t i;
+  for(i = 0; i < sizeof(WS_FRAMES)/sizeof(WS_FRAMES[0]); ++i) {
+    if(WS_FRAMES[i].proto_opcode == opcode)
+      return WS_FRAMES[i].name;
+  }
+  return "???";
+}
+
+static int ws_frame_op2flags(unsigned char proto_opcode)
+{
+  unsigned char opcode = proto_opcode & WSBIT_OPCODE_MASK;
+  size_t i;
+  for(i = 0; i < sizeof(WS_FRAMES)/sizeof(WS_FRAMES[0]); ++i) {
+    if(WS_FRAMES[i].proto_opcode == opcode)
+      return WS_FRAMES[i].flags;
+  }
+  return 0;
+}
+
+static unsigned char ws_frame_flags2op(int flags)
+{
+  size_t i;
+  for(i = 0; i < sizeof(WS_FRAMES)/sizeof(WS_FRAMES[0]); ++i) {
+    if(WS_FRAMES[i].flags & flags)
+      return WS_FRAMES[i].proto_opcode;
+  }
+  return 0;
+}
+
+static void ws_dec_info(struct ws_decoder *dec, struct Curl_easy *data,
+                        const char *msg)
+{
+  switch(dec->head_len) {
+  case 0:
+    break;
+  case 1:
+    infof(data, "WS-DEC: %s [%s%s]", msg,
+          ws_frame_name_of_op(dec->head[0]),
+          (dec->head[0] & WSBIT_FIN)? "" : " NON-FINAL");
+    break;
+  default:
+    if(dec->head_len < dec->head_total) {
+      infof(data, "WS-DEC: %s [%s%s](%d/%d)", msg,
+            ws_frame_name_of_op(dec->head[0]),
+            (dec->head[0] & WSBIT_FIN)? "" : " NON-FINAL",
+            dec->head_len, dec->head_total);
+    }
+    else {
+      infof(data, "WS-DEC: %s [%s%s payload=%zd/%zd]", msg,
+            ws_frame_name_of_op(dec->head[0]),
+            (dec->head[0] & WSBIT_FIN)? "" : " NON-FINAL",
+            dec->payload_offset, dec->payload_len);
+    }
+    break;
+  }
+}
+
+typedef ssize_t ws_write_payload(const unsigned char *buf, size_t buflen,
+                                 int frame_age, int frame_flags,
+                                 curl_off_t payload_offset,
+                                 curl_off_t payload_len,
+                                 void *userp,
+                                 CURLcode *err);
+
+
+static void ws_dec_reset(struct ws_decoder *dec)
+{
+  dec->frame_age = 0;
+  dec->frame_flags = 0;
+  dec->payload_offset = 0;
+  dec->payload_len = 0;
+  dec->head_len = dec->head_total = 0;
+  dec->state = WS_DEC_INIT;
+}
+
+static void ws_dec_init(struct ws_decoder *dec)
+{
+  ws_dec_reset(dec);
+}
+
+static CURLcode ws_dec_read_head(struct ws_decoder *dec,
+                                 struct Curl_easy *data,
+                                 struct bufq *inraw)
+{
+  const unsigned char *inbuf;
+  size_t inlen;
+
+  while(Curl_bufq_peek(inraw, &inbuf, &inlen)) {
+    if(dec->head_len == 0) {
+      dec->head[0] = *inbuf;
+      Curl_bufq_skip(inraw, 1);
+
+      dec->frame_flags  = ws_frame_op2flags(dec->head[0]);
+      if(!dec->frame_flags) {
+        failf(data, "WS: unknown opcode: %x", dec->head[0]);
+        ws_dec_reset(dec);
+        return CURLE_RECV_ERROR;
+      }
+      dec->head_len = 1;
+      /* ws_dec_info(dec, data, "seeing opcode"); */
+      continue;
+    }
+    else if(dec->head_len == 1) {
+      dec->head[1] = *inbuf;
+      Curl_bufq_skip(inraw, 1);
+      dec->head_len = 2;
+
+      if(dec->head[1] & WSBIT_MASK) {
+        /* A client MUST close a connection if it detects a masked frame. */
+        failf(data, "WS: masked input frame");
+        ws_dec_reset(dec);
+        return CURLE_RECV_ERROR;
+      }
+      /* How long is the frame head? */
+      if(dec->head[1] == 126) {
+        dec->head_total = 4;
+        continue;
+      }
+      else if(dec->head[1] == 127) {
+        dec->head_total = 10;
+        continue;
+      }
+      else {
+        dec->head_total = 2;
+      }
+    }
+
+    if(dec->head_len < dec->head_total) {
+      dec->head[dec->head_len] = *inbuf;
+      Curl_bufq_skip(inraw, 1);
+      ++dec->head_len;
+      if(dec->head_len < dec->head_total) {
+        /* ws_dec_info(dec, data, "decoding head"); */
+        continue;
+      }
+    }
+    /* got the complete frame head */
+    DEBUGASSERT(dec->head_len == dec->head_total);
+    switch(dec->head_total) {
+    case 2:
+      dec->payload_len = dec->head[1];
+      break;
+    case 4:
+      dec->payload_len = (dec->head[2] << 8) | dec->head[3];
+      break;
+    case 10:
+      dec->payload_len = ((curl_off_t)dec->head[2] << 56) |
+        (curl_off_t)dec->head[3] << 48 |
+        (curl_off_t)dec->head[4] << 40 |
+        (curl_off_t)dec->head[5] << 32 |
+        (curl_off_t)dec->head[6] << 24 |
+        (curl_off_t)dec->head[7] << 16 |
+        (curl_off_t)dec->head[8] << 8 |
+        dec->head[9];
+      break;
+    default:
+      /* this should never happen */
+      DEBUGASSERT(0);
+      failf(data, "WS: unexpected frame header length");
+      return CURLE_RECV_ERROR;
+    }
+
+    dec->frame_age = 0;
+    dec->payload_offset = 0;
+    ws_dec_info(dec, data, "decoded");
+    return CURLE_OK;
+  }
+  return CURLE_AGAIN;
+}
+
+static CURLcode ws_dec_pass_payload(struct ws_decoder *dec,
+                                    struct Curl_easy *data,
+                                    struct bufq *inraw,
+                                    ws_write_payload *write_payload,
+                                    void *write_ctx)
+{
+  const unsigned char *inbuf;
+  size_t inlen;
+  ssize_t nwritten;
+  CURLcode result;
+  curl_off_t remain = dec->payload_len - dec->payload_offset;
+
+  (void)data;
+  while(remain && Curl_bufq_peek(inraw, &inbuf, &inlen)) {
+    if((curl_off_t)inlen > remain)
+      inlen = (size_t)remain;
+    nwritten = write_payload(inbuf, inlen, dec->frame_age, dec->frame_flags,
+                             dec->payload_offset, dec->payload_len,
+                             write_ctx, &result);
+    if(nwritten < 0)
+      return result;
+    Curl_bufq_skip(inraw, (size_t)nwritten);
+    dec->payload_offset += (curl_off_t)nwritten;
+    remain = dec->payload_len - dec->payload_offset;
+    /* infof(data, "WS-DEC: passed  %zd bytes payload, %zd remain",
+          nwritten, remain); */
+  }
+
+  return remain? CURLE_AGAIN : CURLE_OK;
+}
+
+static CURLcode ws_dec_pass(struct ws_decoder *dec,
+                            struct Curl_easy *data,
+                            struct bufq *inraw,
+                            ws_write_payload *write_payload,
+                            void *write_ctx)
+{
+  CURLcode result;
+
+  if(Curl_bufq_is_empty(inraw))
+    return CURLE_AGAIN;
+
+  switch(dec->state) {
+  case WS_DEC_INIT:
+    ws_dec_reset(dec);
+    dec->state = WS_DEC_HEAD;
+    /* FALLTHROUGH */
+  case WS_DEC_HEAD:
+    result = ws_dec_read_head(dec, data, inraw);
+    if(result) {
+      if(result != CURLE_AGAIN) {
+        infof(data, "WS: decode error %d", (int)result);
+        break;  /* real error */
+      }
+      /* incomplete ws frame head */
+      DEBUGASSERT(Curl_bufq_is_empty(inraw));
+      break;
+    }
+    /* head parsing done */
+    dec->state = WS_DEC_PAYLOAD;
+    if(dec->payload_len == 0) {
+      ssize_t nwritten;
+      const unsigned char tmp = '\0';
+      /* special case of a 0 length frame, need to write once */
+      nwritten = write_payload(&tmp, 0, dec->frame_age, dec->frame_flags,
+                               0, 0, write_ctx, &result);
+      if(nwritten < 0)
+        return result;
+      dec->state = WS_DEC_INIT;
+      break;
+    }
+    /* FALLTHROUGH */
+  case WS_DEC_PAYLOAD:
+    result = ws_dec_pass_payload(dec, data, inraw, write_payload, write_ctx);
+    ws_dec_info(dec, data, "passing");
+    if(result)
+      return result;
+    /* paylod parsing done */
+    dec->state = WS_DEC_INIT;
+    break;
+  default:
+    /* we covered all enums above, but some code analyzers are whimps */
+    result = CURLE_FAILED_INIT;
+  }
+  return result;
+}
+
+static void update_meta(struct websocket *ws,
+                        int frame_age, int frame_flags,
+                        curl_off_t payload_offset,
+                        curl_off_t payload_len,
+                        size_t cur_len)
+{
+  ws->frame.age = frame_age;
+  ws->frame.flags = frame_flags;
+  ws->frame.offset = payload_offset;
+  ws->frame.len = cur_len;
+  ws->frame.bytesleft = (payload_len - payload_offset - cur_len);
+}
+
+static void ws_enc_info(struct ws_encoder *enc, struct Curl_easy *data,
+                        const char *msg)
+{
+  infof(data, "WS-ENC: %s [%s%s%s payload=%zd/%zd]", msg,
+        ws_frame_name_of_op(enc->firstbyte),
+        (enc->firstbyte & WSBIT_OPCODE_CONT)? " CONT" : "",
+        (enc->firstbyte & WSBIT_FIN)? "" : " NON-FIN",
+        enc->payload_len - enc->payload_remain, enc->payload_len);
+}
+
+static void ws_enc_reset(struct ws_encoder *enc)
+{
+  enc->payload_remain = 0;
+  enc->xori = 0;
+  enc->contfragment = FALSE;
+}
+
+static void ws_enc_init(struct ws_encoder *enc)
+{
+  ws_enc_reset(enc);
+}
+
+/***
+    RFC 6455 Section 5.2
+
+      0                   1                   2                   3
+      0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+     +-+-+-+-+-------+-+-------------+-------------------------------+
+     |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
+     |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
+     |N|V|V|V|       |S|             |   (if payload len==126/127)   |
+     | |1|2|3|       |K|             |                               |
+     +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
+     |     Extended payload length continued, if payload len == 127  |
+     + - - - - - - - - - - - - - - - +-------------------------------+
+     |                               |Masking-key, if MASK set to 1  |
+     +-------------------------------+-------------------------------+
+     | Masking-key (continued)       |          Payload Data         |
+     +-------------------------------- - - - - - - - - - - - - - - - +
+     :                     Payload Data continued ...                :
+     + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
+     |                     Payload Data continued ...                |
+     +---------------------------------------------------------------+
+*/
+
+static ssize_t ws_enc_write_head(struct Curl_easy *data,
+                                 struct ws_encoder *enc,
+                                 unsigned int flags,
+                                 curl_off_t payload_len,
+                                 struct bufq *out,
+                                 CURLcode *err)
+{
+  unsigned char firstbyte = 0;
+  unsigned char opcode;
+  unsigned char head[14];
+  size_t hlen;
+  ssize_t n;
+
+  if(enc->payload_remain > 0) {
+    /* trying to write a new frame before the previous one is finished */
+    failf(data, "WS: starting new frame with %zd bytes from last one"
+                "remaining to be sent", (ssize_t)enc->payload_remain);
+    *err = CURLE_SEND_ERROR;
+    return -1;
+  }
+
+  opcode = ws_frame_flags2op(flags);
+  if(!opcode) {
+    failf(data, "WS: provided flags not recognized '%x'", flags);
+    *err = CURLE_SEND_ERROR;
+    return -1;
+  }
+
+  if(!(flags & CURLWS_CONT)) {
+    if(!enc->contfragment)
+      /* not marked as continuing, this is the final fragment */
+      firstbyte |= WSBIT_FIN | opcode;
+    else
+      /* marked as continuing, this is the final fragment; set CONT
+         opcode and FIN bit */
+      firstbyte |= WSBIT_FIN | WSBIT_OPCODE_CONT;
+
+    enc->contfragment = FALSE;
+  }
+  else if(enc->contfragment) {
+    /* the previous fragment was not a final one and this isn't either, keep a
+       CONT opcode and no FIN bit */
+    firstbyte |= WSBIT_OPCODE_CONT;
+  }
+  else {
+    firstbyte = opcode;
+    enc->contfragment = TRUE;
+  }
+
+  head[0] = enc->firstbyte = firstbyte;
+  if(payload_len > 65535) {
+    head[1] = 127 | WSBIT_MASK;
+    head[2] = (unsigned char)((payload_len >> 56) & 0xff);
+    head[3] = (unsigned char)((payload_len >> 48) & 0xff);
+    head[4] = (unsigned char)((payload_len >> 40) & 0xff);
+    head[5] = (unsigned char)((payload_len >> 32) & 0xff);
+    head[6] = (unsigned char)((payload_len >> 24) & 0xff);
+    head[7] = (unsigned char)((payload_len >> 16) & 0xff);
+    head[8] = (unsigned char)((payload_len >> 8) & 0xff);
+    head[9] = (unsigned char)(payload_len & 0xff);
+    hlen = 10;
+  }
+  else if(payload_len > 126) {
+    head[1] = 126 | WSBIT_MASK;
+    head[2] = (unsigned char)((payload_len >> 8) & 0xff);
+    head[3] = (unsigned char)(payload_len & 0xff);
+    hlen = 4;
+  }
+  else {
+    head[1] = (unsigned char)payload_len | WSBIT_MASK;
+    hlen = 2;
+  }
+
+  enc->payload_remain = enc->payload_len = payload_len;
+  ws_enc_info(enc, data, "sending");
+
+  /* add 4 bytes mask */
+  memcpy(&head[hlen], &enc->mask, 4);
+  hlen += 4;
+  /* reset for payload to come */
+  enc->xori = 0;
+
+  n = Curl_bufq_write(out, head, hlen, err);
+  if(n < 0)
+    return -1;
+  if((size_t)n != hlen) {
+    /* We use a bufq with SOFT_LIMIT, writing should always succeed */
+    DEBUGASSERT(0);
+    *err = CURLE_SEND_ERROR;
+    return -1;
+  }
+  return n;
+}
+
+static ssize_t ws_enc_write_payload(struct ws_encoder *enc,
+                                    struct Curl_easy *data,
+                                    const unsigned char *buf, size_t buflen,
+                                    struct bufq *out, CURLcode *err)
+{
+  ssize_t n;
+  size_t i, len;
+
+  if(Curl_bufq_is_full(out)) {
+    *err = CURLE_AGAIN;
+    return -1;
+  }
+
+  /* not the most performant way to do this */
+  len = buflen;
+  if((curl_off_t)len > enc->payload_remain)
+    len = (size_t)enc->payload_remain;
+
+  for(i = 0; i < len; ++i) {
+    unsigned char c = buf[i] ^ enc->mask[enc->xori];
+    n = Curl_bufq_write(out, &c, 1, err);
+    if(n < 0) {
+      if((*err != CURLE_AGAIN) || !i)
+        return -1;
+      break;
+    }
+    enc->xori++;
+    enc->xori &= 3;
+  }
+  enc->payload_remain -= (curl_off_t)i;
+  ws_enc_info(enc, data, "buffered");
+  return (ssize_t)i;
+}
+
+
 struct wsfield {
   const char *name;
   const char *val;
@@ -111,7 +591,6 @@ CURLcode Curl_ws_request(struct Curl_easy *data, REQTYPE *req)
     }
   }
   k->upgr101 = UPGR101_WS;
-  Curl_dyn_init(&data->req.p.http->ws.buf, MAX_WS_SIZE * 2);
   return result;
 }
 
@@ -123,12 +602,27 @@ CURLcode Curl_ws_accept(struct Curl_easy *data,
                         const char *mem, size_t nread)
 {
   struct SingleRequest *k = &data->req;
-  struct HTTP *ws = data->req.p.http;
-  struct connectdata *conn = data->conn;
-  struct websocket *wsp = &data->req.p.http->ws;
-  struct ws_conn *wsc = &conn->proto.ws;
+  struct websocket *ws;
   CURLcode result;
 
+  DEBUGASSERT(data->conn);
+  ws = data->conn->proto.ws;
+  if(!ws) {
+    ws = calloc(1, sizeof(*ws));
+    if(!ws)
+      return CURLE_OUT_OF_MEMORY;
+    data->conn->proto.ws = ws;
+    Curl_bufq_init(&ws->recvbuf, WS_CHUNK_SIZE, WS_CHUNK_COUNT);
+    Curl_bufq_init2(&ws->sendbuf, WS_CHUNK_SIZE, WS_CHUNK_COUNT,
+                    BUFQ_OPT_SOFT_LIMIT);
+    ws_dec_init(&ws->dec);
+    ws_enc_init(&ws->enc);
+  }
+  else {
+    Curl_bufq_reset(&ws->recvbuf);
+    ws_dec_reset(&ws->dec);
+    ws_enc_reset(&ws->enc);
+  }
   /* Verify the Sec-WebSocket-Accept response.
 
      The sent value is the base64 encoded version of a SHA-1 hash done on the
@@ -149,169 +643,74 @@ CURLcode Curl_ws_accept(struct Curl_easy *data,
      the WebSocket Connection. */
 
   /* 4 bytes random */
-  result = Curl_rand(data, (unsigned char *)&ws->ws.mask, sizeof(ws->ws.mask));
+
+  result = Curl_rand(data, (unsigned char *)&ws->enc.mask,
+                     sizeof(ws->enc.mask));
   if(result)
     return result;
-
   infof(data, "Received 101, switch to WebSocket; mask %02x%02x%02x%02x",
-        ws->ws.mask[0], ws->ws.mask[1], ws->ws.mask[2], ws->ws.mask[3]);
-  Curl_dyn_init(&wsc->early, data->set.buffer_size);
-  if(nread) {
-    result = Curl_dyn_addn(&wsc->early, mem, nread);
-    if(result)
+        ws->enc.mask[0], ws->enc.mask[1], ws->enc.mask[2], ws->enc.mask[3]);
+
+  if(data->set.connect_only) {
+    ssize_t nwritten;
+    /* In CONNECT_ONLY setup, the payloads from `mem` need to be received
+     * when using `curl_ws_recv` later on after this transfer is already
+     * marked as DONE. */
+    nwritten = Curl_bufq_write(&ws->recvbuf, (const unsigned char *)mem,
+                               nread, &result);
+    if(nwritten < 0)
       return result;
     infof(data, "%zu bytes websocket payload", nread);
-    wsp->stillb = Curl_dyn_ptr(&wsc->early);
-    wsp->stillblen = Curl_dyn_len(&wsc->early);
   }
   k->upgr101 = UPGR101_RECEIVED;
 
   return result;
 }
 
-#define WSBIT_FIN 0x80
-#define WSBIT_OPCODE_CONT  0
-#define WSBIT_OPCODE_TEXT  (1)
-#define WSBIT_OPCODE_BIN   (2)
-#define WSBIT_OPCODE_CLOSE (8)
-#define WSBIT_OPCODE_PING  (9)
-#define WSBIT_OPCODE_PONG  (0xa)
-#define WSBIT_OPCODE_MASK  (0xf)
-
-#define WSBIT_MASK 0x80
-
-/* remove the spent bytes from the beginning of the buffer as that part has
-   now been delivered to the application */
-static void ws_decode_shift(struct Curl_easy *data, size_t spent)
-{
-  struct websocket *wsp = &data->req.p.http->ws;
-  size_t len = Curl_dyn_len(&wsp->buf);
-  size_t keep = len - spent;
-  DEBUGASSERT(len >= spent);
-  Curl_dyn_tail(&wsp->buf, keep);
-}
-
-/* ws_decode() decodes a binary frame into structured WebSocket data,
-
-   data - the transfer
-   inbuf - incoming raw data. If NULL, work on the already buffered data.
-   inlen - size of the provided data, perhaps too little, perhaps too much
-   headlen - stored length of the frame header
-   olen - stored length of the extracted data
-   oleft - number of unread bytes pending to that belongs to this frame
-   flags - stored bitmask about the frame
-
-   Returns CURLE_AGAIN if there is only a partial frame in the buffer. Then it
-   stores the first part in the ->extra buffer to be used in the next call
-   when more data is provided.
-*/
-
-static CURLcode ws_decode(struct Curl_easy *data,
-                          unsigned char *inbuf, size_t inlen,
-                          size_t *headlen, size_t *olen,
-                          curl_off_t *oleft,
-                          unsigned int *flags)
+static ssize_t ws_client_write(const unsigned char *buf, size_t buflen,
+                               int frame_age, int frame_flags,
+                               curl_off_t payload_offset,
+                               curl_off_t payload_len,
+                               void *userp,
+                               CURLcode *err)
 {
-  bool fin;
-  unsigned char opcode;
-  curl_off_t total;
-  size_t dataindex = 2;
-  curl_off_t payloadsize;
-
-  *olen = *headlen = 0;
-
-  if(inlen < 2) {
-    /* the smallest possible frame is two bytes */
-    infof(data, "WS: plen == %u, EAGAIN", (int)inlen);
-    return CURLE_AGAIN;
-  }
-
-  fin = inbuf[0] & WSBIT_FIN;
-  opcode = inbuf[0] & WSBIT_OPCODE_MASK;
-  infof(data, "WS:%d received FIN bit %u", __LINE__, (int)fin);
-  *flags = 0;
-  switch(opcode) {
-  case WSBIT_OPCODE_CONT:
-    if(!fin)
-      *flags |= CURLWS_CONT;
-    infof(data, "WS: received OPCODE CONT");
-    break;
-  case WSBIT_OPCODE_TEXT:
-    infof(data, "WS: received OPCODE TEXT");
-    *flags |= CURLWS_TEXT;
-    break;
-  case WSBIT_OPCODE_BIN:
-    infof(data, "WS: received OPCODE BINARY");
-    *flags |= CURLWS_BINARY;
-    break;
-  case WSBIT_OPCODE_CLOSE:
-    infof(data, "WS: received OPCODE CLOSE");
-    *flags |= CURLWS_CLOSE;
-    break;
-  case WSBIT_OPCODE_PING:
-    infof(data, "WS: received OPCODE PING");
-    *flags |= CURLWS_PING;
-    break;
-  case WSBIT_OPCODE_PONG:
-    infof(data, "WS: received OPCODE PONG");
-    *flags |= CURLWS_PONG;
-    break;
-  default:
-    failf(data, "WS: unknown opcode: %x", opcode);
-    return CURLE_RECV_ERROR;
+  struct Curl_easy *data = userp;
+  struct websocket *ws;
+  size_t wrote;
+  curl_off_t remain = (payload_len - (payload_offset + buflen));
+
+  (void)frame_age;
+  if(!data->conn || !data->conn->proto.ws) {
+    *err = CURLE_FAILED_INIT;
+    return -1;
   }
-
-  if(inbuf[1] & WSBIT_MASK) {
-    /* A client MUST close a connection if it detects a masked frame. */
-    failf(data, "WS: masked input frame");
-    return CURLE_RECV_ERROR;
+  ws = data->conn->proto.ws;
+
+  if((frame_flags & CURLWS_PING) && !remain) {
+    /* auto-respond to PINGs, only works for single-frame payloads atm */
+    size_t bytes;
+    infof(data, "WS: auto-respond to PING with a PONG");
+    /* send back the exact same content as a PONG */
+    *err = curl_ws_send(data, buf, buflen, &bytes, 0, CURLWS_PONG);
+    if(*err)
+      return -1;
   }
-  payloadsize = inbuf[1];
-  if(payloadsize == 126) {
-    if(inlen < 4) {
-      infof(data, "WS:%d plen == %u, EAGAIN", __LINE__, (int)inlen);
-      return CURLE_AGAIN; /* not enough data available */
-    }
-    payloadsize = (inbuf[2] << 8) | inbuf[3];
-    dataindex += 2;
-  }
-  else if(payloadsize == 127) {
-    /* 64 bit payload size */
-    if(inlen < 10)
-      return CURLE_AGAIN;
-    if(inbuf[2] & 80) {
-      failf(data, "WS: too large frame");
-      return CURLE_RECV_ERROR;
+  else if(buflen || !remain) {
+    /* deliver the decoded frame to the user callback. The application
+     * may invoke curl_ws_meta() to access frame information. */
+    update_meta(ws, frame_age, frame_flags, payload_offset,
+                payload_len, buflen);
+    Curl_set_in_callback(data, true);
+    wrote = data->set.fwrite_func((char *)buf, 1,
+                                  buflen, data->set.out);
+    Curl_set_in_callback(data, false);
+    if(wrote != buflen) {
+      *err = CURLE_RECV_ERROR;
+      return -1;
     }
-    dataindex += 8;
-    payloadsize = ((curl_off_t)inbuf[2] << 56) |
-      (curl_off_t)inbuf[3] << 48 |
-      (curl_off_t)inbuf[4] << 40 |
-      (curl_off_t)inbuf[5] << 32 |
-      (curl_off_t)inbuf[6] << 24 |
-      (curl_off_t)inbuf[7] << 16 |
-      (curl_off_t)inbuf[8] << 8 |
-      inbuf[9];
-  }
-
-  /* point to the payload */
-  *headlen = dataindex;
-  total = dataindex + payloadsize;
-  if(total > (curl_off_t)inlen) {
-    /* buffer contains partial frame */
-    *olen = inlen - dataindex; /* bytes to write out */
-    *oleft = total - inlen;    /* bytes yet to come (for this frame) */
-    payloadsize = total - dataindex;
-  }
-  else {
-    /* we have the complete frame (`total` bytes) in buffer */
-    *olen = payloadsize;    /* bytes to write out */
-    *oleft = 0;             /* bytes yet to come (for this frame) */
   }
-
-  infof(data, "WS: received %Ou bytes payload (%Ou left, buflen was %zu)",
-        payloadsize, *oleft, inlen);
-  return CURLE_OK;
+  *err = CURLE_OK;
+  return (ssize_t)buflen;
 }
 
 /* Curl_ws_writecb() is the write callback for websocket traffic. The
@@ -321,98 +720,149 @@ static CURLcode ws_decode(struct Curl_easy *data,
 size_t Curl_ws_writecb(char *buffer, size_t size /* 1 */,
                        size_t nitems, void *userp)
 {
-  struct HTTP *ws = (struct HTTP *)userp;
-  struct Curl_easy *data = ws->ws.data;
-  struct websocket *wsp = &data->req.p.http->ws;
-  void *writebody_ptr = data->set.out;
+  struct Curl_easy *data = userp;
+
   if(data->set.ws_raw_mode)
-    return data->set.fwrite_func(buffer, size, nitems, writebody_ptr);
+    return data->set.fwrite_func(buffer, size, nitems, data->set.out);
   else if(nitems) {
-    size_t wrote = 0, headlen;
+    struct websocket *ws;
     CURLcode result;
 
+    if(!data->conn || !data->conn->proto.ws) {
+      failf(data, "WS: not a websocket transfer");
+      return nitems - 1;
+    }
+    ws = data->conn->proto.ws;
+
     if(buffer) {
-      result = Curl_dyn_addn(&wsp->buf, buffer, nitems);
-      if(result) {
+      ssize_t nwritten;
+
+      nwritten = Curl_bufq_write(&ws->recvbuf, (const unsigned char *)buffer,
+                                 nitems, &result);
+      if(nwritten < 0) {
         infof(data, "WS: error adding data to buffer %d", (int)result);
         return nitems - 1;
       }
       buffer = NULL;
     }
 
-    while(Curl_dyn_len(&wsp->buf)) {
-      unsigned char *wsbuf = Curl_dyn_uptr(&wsp->buf);
-      size_t buflen = Curl_dyn_len(&wsp->buf);
-      size_t write_len = 0;
-      size_t consumed = 0;
-
-      if(!ws->ws.frame.bytesleft) {
-        unsigned int recvflags;
-        curl_off_t fb_left;
-
-        result = ws_decode(data, wsbuf, buflen,
-                           &headlen, &write_len, &fb_left, &recvflags);
-        if(result == CURLE_AGAIN)
-          /* insufficient amount of data, keep it for later.
-           * we pretend to have written all since we have a copy */
-          return nitems;
-        else if(result) {
-          infof(data, "WS: decode error %d", (int)result);
-          return nitems - 1;
-        }
-        consumed += headlen;
-        wsbuf += headlen;
-        buflen -= headlen;
-
-        /* New frame. store details about the frame to be reachable with
-           curl_ws_meta() from within the write callback */
-        ws->ws.frame.age = 0;
-        ws->ws.frame.offset = 0;
-        ws->ws.frame.flags = recvflags;
-        ws->ws.frame.bytesleft = fb_left;
-      }
-      else {
-        /* continuing frame */
-        write_len = (size_t)ws->ws.frame.bytesleft;
-        if(write_len > buflen)
-          write_len = buflen;
-        ws->ws.frame.offset += write_len;
-        ws->ws.frame.bytesleft -= write_len;
-      }
-      if((ws->ws.frame.flags & CURLWS_PING) && !ws->ws.frame.bytesleft) {
-        /* auto-respond to PINGs, only works for single-frame payloads atm */
-        size_t bytes;
-        infof(data, "WS: auto-respond to PING with a PONG");
-        /* send back the exact same content as a PONG */
-        result = curl_ws_send(data, wsbuf, write_len,
-                              &bytes, 0, CURLWS_PONG);
-        if(result)
-          return result;
-      }
-      else if(write_len || !wsp->frame.bytesleft) {
-        /* deliver the decoded frame to the user callback */
-        Curl_set_in_callback(data, true);
-        wrote = data->set.fwrite_func((char *)wsbuf, 1,
-                                      write_len, writebody_ptr);
-        Curl_set_in_callback(data, false);
-        if(wrote != write_len)
-          return 0;
+    while(!Curl_bufq_is_empty(&ws->recvbuf)) {
+
+      result = ws_dec_pass(&ws->dec, data, &ws->recvbuf,
+                           ws_client_write, data);
+      if(result == CURLE_AGAIN)
+        /* insufficient amount of data, keep it for later.
+         * we pretend to have written all since we have a copy */
+        return nitems;
+      else if(result) {
+        infof(data, "WS: decode error %d", (int)result);
+        return nitems - 1;
       }
-      /* get rid of the buffered data consumed */
-      consumed += write_len;
-      ws_decode_shift(data, consumed);
     }
   }
   return nitems;
 }
 
+struct ws_collect {
+  struct Curl_easy *data;
+  void *buffer;
+  size_t buflen;
+  size_t bufidx;
+  int frame_age;
+  int frame_flags;
+  curl_off_t payload_offset;
+  curl_off_t payload_len;
+  bool written;
+};
+
+static ssize_t ws_client_collect(const unsigned char *buf, size_t buflen,
+                                 int frame_age, int frame_flags,
+                                 curl_off_t payload_offset,
+                                 curl_off_t payload_len,
+                                 void *userp,
+                                 CURLcode *err)
+{
+  struct ws_collect *ctx = userp;
+  size_t nwritten;
+  curl_off_t remain = (payload_len - (payload_offset + buflen));
+
+  if((frame_flags & CURLWS_PING) && !remain) {
+    /* auto-respond to PINGs, only works for single-frame payloads atm */
+    size_t bytes;
+    infof(ctx->data, "WS: auto-respond to PING with a PONG");
+    /* send back the exact same content as a PONG */
+    *err = curl_ws_send(ctx->data, buf, buflen, &bytes, 0, CURLWS_PONG);
+    if(*err)
+      return -1;
+    nwritten = bytes;
+  }
+  else {
+    ctx->written = TRUE;
+    DEBUGASSERT(ctx->buflen >= ctx->bufidx);
+    nwritten = CURLMIN(buflen, ctx->buflen - ctx->bufidx);
+    if(!nwritten) {
+      if(!buflen) {  /* 0 length write, we accept that */
+        *err = CURLE_OK;
+        return 0;
+      }
+      *err = CURLE_AGAIN;  /* no more space */
+      return -1;
+    }
+    *err = CURLE_OK;
+    memcpy(ctx->buffer, buf, nwritten);
+    if(!ctx->bufidx) {
+      /* first write */
+      ctx->frame_age = frame_age;
+      ctx->frame_flags = frame_flags;
+      ctx->payload_offset = payload_offset;
+      ctx->payload_len = payload_len;
+    }
+    ctx->bufidx += nwritten;
+  }
+  return nwritten;
+}
+
+static ssize_t nw_in_recv(void *reader_ctx,
+                          unsigned char *buf, size_t buflen,
+                          CURLcode *err)
+{
+  struct Curl_easy *data = reader_ctx;
+  size_t nread;
+
+  *err = curl_easy_recv(data, buf, buflen, &nread);
+  if(*err)
+    return *err;
+  return (ssize_t)nread;
+}
+
 CURL_EXTERN CURLcode curl_ws_recv(struct Curl_easy *data, void *buffer,
                                   size_t buflen, size_t *nread,
                                   struct curl_ws_frame **metap)
 {
-  CURLcode result;
-  struct websocket *wsp = &data->req.p.http->ws;
+  struct connectdata *conn = data->conn;
+  struct websocket *ws;
   bool done = FALSE; /* not filled passed buffer yet */
+  struct ws_collect ctx;
+  CURLcode result;
+
+  if(!conn) {
+    /* Unhappy hack with lifetimes of transfers and connection */
+    if(!data->set.connect_only) {
+      failf(data, "CONNECT_ONLY is required");
+      return CURLE_UNSUPPORTED_PROTOCOL;
+    }
+
+    Curl_getconnectinfo(data, &conn);
+    if(!conn) {
+      failf(data, "connection not found");
+      return CURLE_BAD_FUNCTION_ARGUMENT;
+    }
+  }
+  ws = conn->proto.ws;
+  if(!ws) {
+    failf(data, "connection is not setup for websocket");
+    return CURLE_BAD_FUNCTION_ARGUMENT;
+  }
 
   *nread = 0;
   *metap = NULL;
@@ -421,221 +871,95 @@ CURL_EXTERN CURLcode curl_ws_recv(struct Curl_easy *data, void *buffer,
   if(result)
     return result;
 
+  memset(&ctx, 0, sizeof(ctx));
+  ctx.data = data;
+  ctx.buffer = buffer;
+  ctx.buflen = buflen;
+
   while(!done) {
-    size_t datalen;
-    unsigned int recvflags;
-
-    if(!wsp->stillblen || (result == CURLE_AGAIN)) {
-      /* try to get more data */
-      size_t n;
-      result = curl_easy_recv(data, &data->state.buffer[wsp->stillblen],
-                              data->set.buffer_size - wsp->stillblen, &n);
-      if(result)
+    /* receive more when our buffer is empty */
+    if(Curl_bufq_is_empty(&ws->recvbuf)) {
+      ssize_t n = Curl_bufq_slurp(&ws->recvbuf, nw_in_recv, data, &result);
+      if(n < 0) {
         return result;
-      if(!n) {
+      }
+      else if(n == 0) {
         /* connection closed */
         infof(data, "connection expectedly closed?");
         return CURLE_GOT_NOTHING;
       }
-      wsp->stillb = data->state.buffer;
-      wsp->stillblen += n;
     }
 
-    infof(data, "WS: %zu bytes left to decode", wsp->stillblen);
-    if(!wsp->frame.bytesleft) {
-      size_t headlen;
-      curl_off_t oleft;
-      /* detect new frame */
-      result = ws_decode(data, (unsigned char *)wsp->stillb, wsp->stillblen,
-                         &headlen, &datalen, &oleft, &recvflags);
-      if(result == CURLE_AGAIN)
-        /* a packet fragment only, loop and try reading more */
-        continue;
-      else if(result)
-        return result;
-      if(datalen > buflen) {
-        size_t diff = datalen - buflen;
-        datalen = buflen;
-        oleft += diff;
+    result = ws_dec_pass(&ws->dec, data, &ws->recvbuf,
+                         ws_client_collect, &ctx);
+    if(result == CURLE_AGAIN) {
+      if(!ctx.written) {
+        ws_dec_info(&ws->dec, data, "need more input");
+        continue;  /* nothing written, try more input */
       }
-      wsp->stillb += headlen;
-      wsp->stillblen -= headlen;
-      wsp->frame.offset = 0;
-      wsp->frame.bytesleft = oleft;
-      wsp->frame.flags = recvflags;
-    }
-    else {
-      /* existing frame, remaining payload handling */
-      datalen = wsp->frame.bytesleft;
-      if(datalen > wsp->stillblen)
-        datalen = wsp->stillblen;
-      if(datalen > buflen)
-        datalen = buflen;
-
-      wsp->frame.offset += wsp->frame.len;
-      wsp->frame.bytesleft -= datalen;
+      done = TRUE;
+      break;
     }
-    wsp->frame.len = datalen;
-
-    /* auto-respond to PINGs */
-    if((wsp->frame.flags & CURLWS_PING) && !wsp->frame.bytesleft) {
-      size_t nsent = 0;
-      infof(data, "WS: auto-respond to PING with a PONG, %zu bytes payload",
-            datalen);
-      /* send back the exact same content as a PONG */
-      result = curl_ws_send(data, wsp->stillb, datalen, &nsent, 0,
-                            CURLWS_PONG);
-      if(result)
-        return result;
-      infof(data, "WS: bytesleft %zu datalen %zu",
-            wsp->frame.bytesleft, datalen);
-      /* we handled the data part of the PING, advance over that */
-      wsp->stillb += nsent;
-      wsp->stillblen -= nsent;
+    else if(result) {
+      return result;
     }
-    else if(datalen) {
-      /* copy the payload to the user buffer */
-      memcpy(buffer, wsp->stillb, datalen);
-      *nread = datalen;
+    else if(ctx.written) {
+      /* The decoded frame is passed back to our caller.
+       * There are frames like PING were we auto-respond to and
+       * that we do not return. For these `ctx.written` is not set. */
       done = TRUE;
-
-      wsp->stillblen -= datalen;
-      if(wsp->stillblen)
-        wsp->stillb += datalen;
-      else {
-        wsp->stillb = NULL;
-      }
+      break;
     }
   }
-  *metap = &wsp->frame;
-  return CURLE_OK;
-}
 
-static void ws_xor(struct Curl_easy *data,
-                   const unsigned char *source,
-                   unsigned char *dest,
-                   size_t len)
-{
-  struct websocket *wsp = &data->req.p.http->ws;
-  size_t i;
-  /* append payload after the mask, XOR appropriately */
-  for(i = 0; i < len; i++) {
-    dest[i] = source[i] ^ wsp->mask[wsp->xori];
-    wsp->xori++;
-    wsp->xori &= 3;
-  }
+  /* update frame information to be passed back */
+  update_meta(ws, ctx.frame_age, ctx.frame_flags, ctx.payload_offset,
+              ctx.payload_len, ctx.bufidx);
+  *metap = &ws->frame;
+  *nread = ws->frame.len;
+  /* infof(data, "curl_ws_recv(len=%zu) -> %zu bytes (frame at %zd, %zd left)",
+        buflen, *nread, ws->frame.offset, ws->frame.bytesleft); */
+  return CURLE_OK;
 }
 
-/***
-    RFC 6455 Section 5.2
-
-      0                   1                   2                   3
-      0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
-     +-+-+-+-+-------+-+-------------+-------------------------------+
-     |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
-     |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
-     |N|V|V|V|       |S|             |   (if payload len==126/127)   |
-     | |1|2|3|       |K|             |                               |
-     +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
-     |     Extended payload length continued, if payload len == 127  |
-     + - - - - - - - - - - - - - - - +-------------------------------+
-     |                               |Masking-key, if MASK set to 1  |
-     +-------------------------------+-------------------------------+
-     | Masking-key (continued)       |          Payload Data         |
-     +-------------------------------- - - - - - - - - - - - - - - - +
-     :                     Payload Data continued ...                :
-     + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
-     |                     Payload Data continued ...                |
-     +---------------------------------------------------------------+
-*/
-
-static size_t ws_packethead(struct Curl_easy *data,
-                            size_t len, unsigned int flags)
+static CURLcode ws_flush(struct Curl_easy *data, struct websocket *ws,
+                         bool complete)
 {
-  struct HTTP *ws = data->req.p.http;
-  unsigned char *out = (unsigned char *)data->state.ulbuf;
-  unsigned char firstbyte = 0;
-  int outi;
-  unsigned char opcode;
-  if(flags & CURLWS_TEXT) {
-    opcode = WSBIT_OPCODE_TEXT;
-    infof(data, "WS: send OPCODE TEXT");
-  }
-  else if(flags & CURLWS_CLOSE) {
-    opcode = WSBIT_OPCODE_CLOSE;
-    infof(data, "WS: send OPCODE CLOSE");
-  }
-  else if(flags & CURLWS_PING) {
-    opcode = WSBIT_OPCODE_PING;
-    infof(data, "WS: send OPCODE PING");
-  }
-  else if(flags & CURLWS_PONG) {
-    opcode = WSBIT_OPCODE_PONG;
-    infof(data, "WS: send OPCODE PONG");
-  }
-  else {
-    opcode = WSBIT_OPCODE_BIN;
-    infof(data, "WS: send OPCODE BINARY");
-  }
-
-  if(!(flags & CURLWS_CONT)) {
-    if(!ws->ws.contfragment)
-      /* not marked as continuing, this is the final fragment */
-      firstbyte |= WSBIT_FIN | opcode;
-    else
-      /* marked as continuing, this is the final fragment; set CONT
-         opcode and FIN bit */
-      firstbyte |= WSBIT_FIN | WSBIT_OPCODE_CONT;
+  if(!Curl_bufq_is_empty(&ws->sendbuf)) {
+    CURLcode result;
+    const unsigned char *out;
+    size_t outlen;
+    ssize_t n;
 
-    ws->ws.contfragment = FALSE;
-    infof(data, "WS: set FIN");
-  }
-  else if(ws->ws.contfragment) {
-    /* the previous fragment was not a final one and this isn't either, keep a
-       CONT opcode and no FIN bit */
-    firstbyte |= WSBIT_OPCODE_CONT;
-    infof(data, "WS: keep CONT, no FIN");
-  }
-  else {
-    firstbyte = opcode;
-    ws->ws.contfragment = TRUE;
-    infof(data, "WS: set CONT, no FIN");
-  }
-  out[0] = firstbyte;
-  if(len > 65535) {
-    out[1] = 127 | WSBIT_MASK;
-    out[2] = (len >> 8) & 0xff;
-    out[3] = len & 0xff;
-    outi = 10;
-  }
-  else if(len > 126) {
-    out[1] = 126 | WSBIT_MASK;
-    out[2] = (len >> 8) & 0xff;
-    out[3] = len & 0xff;
-    outi = 4;
-  }
-  else {
-    out[1] = (unsigned char)len | WSBIT_MASK;
-    outi = 2;
+    while(Curl_bufq_peek(&ws->sendbuf, &out, &outlen)) {
+      if(data->set.connect_only)
+        result = Curl_senddata(data, out, outlen, &n);
+      else
+        result = Curl_write(data, data->conn->writesockfd, out, outlen, &n);
+      if(result) {
+        if(result == CURLE_AGAIN) {
+          if(!complete) {
+            infof(data, "WS: flush EAGAIN, %zu bytes remain in buffer",
+                  Curl_bufq_len(&ws->sendbuf));
+            return result;
+          }
+          /* TODO: the current design does not allow for buffered writes.
+           * We need to flush the buffer now. There is no ws_flush() later */
+          n = 0;
+          continue;
+        }
+        else if(result) {
+          failf(data, "WS: flush, write error %d", result);
+          return result;
+        }
+      }
+      else {
+        infof(data, "WS: flushed %zu bytes", (size_t)n);
+        Curl_bufq_skip(&ws->sendbuf, (size_t)n);
+      }
+    }
   }
-
-  infof(data, "WS: send FIN bit %u (byte %02x)",
-        firstbyte & WSBIT_FIN ? 1 : 0,
-        firstbyte);
-  infof(data, "WS: send payload len %u", (int)len);
-
-  /* 4 bytes mask */
-  memcpy(&out[outi], &ws->ws.mask, 4);
-
-  if(data->set.upload_buffer_size < (len + 10))
-    return 0;
-
-  /* pass over the mask */
-  outi += 4;
-
-  ws->ws.xori = 0;
-  /* return packet size */
-  return outi;
+  return CURLE_OK;
 }
 
 CURL_EXTERN CURLcode curl_ws_send(struct Curl_easy *data, const void *buffer,
@@ -643,109 +967,113 @@ CURL_EXTERN CURLcode curl_ws_send(struct Curl_easy *data, const void *buffer,
                                   curl_off_t totalsize,
                                   unsigned int sendflags)
 {
+  struct websocket *ws;
+  ssize_t nwritten, n;
+  size_t space;
   CURLcode result;
-  size_t headlen;
-  char *out;
-  ssize_t written;
-  struct websocket *wsp = &data->req.p.http->ws;
 
-  if(!data->set.ws_raw_mode) {
-    result = Curl_get_upload_buffer(data);
+  *sent = 0;
+  if(!data->conn && data->set.connect_only) {
+    result = Curl_connect_only_attach(data);
     if(result)
       return result;
   }
-  else {
-    if(totalsize || sendflags)
-      return CURLE_BAD_FUNCTION_ARGUMENT;
+  if(!data->conn) {
+    failf(data, "No associated connection");
+    return CURLE_SEND_ERROR;
   }
+  if(!data->conn->proto.ws) {
+    failf(data, "Not a websocket transfer on connection #%ld",
+          data->conn->connection_id);
+    return CURLE_SEND_ERROR;
+  }
+  ws = data->conn->proto.ws;
 
   if(data->set.ws_raw_mode) {
+    if(totalsize || sendflags)
+      return CURLE_BAD_FUNCTION_ARGUMENT;
     if(!buflen)
       /* nothing to do */
       return CURLE_OK;
     /* raw mode sends exactly what was requested, and this is from within
        the write callback */
     if(Curl_is_in_callback(data)) {
-      if(!data->conn) {
-        failf(data, "No associated connection");
-        return CURLE_SEND_ERROR;
-      }
       result = Curl_write(data, data->conn->writesockfd, buffer, buflen,
-                          &written);
+                          &nwritten);
     }
     else
-      result = Curl_senddata(data, buffer, buflen, &written);
+      result = Curl_senddata(data, buffer, buflen, &nwritten);
 
     infof(data, "WS: wanted to send %zu bytes, sent %zu bytes",
-          buflen, written);
-    *sent = written;
+          buflen, nwritten);
+    *sent = (nwritten >= 0)? (size_t)nwritten : 0;
     return result;
   }
 
-  if(buflen > (data->set.upload_buffer_size - 10))
-    /* don't do more than this in one go */
-    buflen = data->set.upload_buffer_size - 10;
+  /* Not RAW mode, buf we do the frame encoding */
+  result = ws_flush(data, ws, FALSE);
+  if(result)
+    return result;
+
+  /* Limit what we are willing to buffer */
+  space = Curl_bufq_space(&ws->sendbuf);
+  if(space < 14)
+    return CURLE_AGAIN;
+  if(buflen > space)
+    buflen = space;
 
   if(sendflags & CURLWS_OFFSET) {
     if(totalsize) {
       /* a frame series 'totalsize' bytes big, this is the first */
-      headlen = ws_packethead(data, totalsize, sendflags);
-      wsp->sleft = totalsize - buflen;
+      n = ws_enc_write_head(data, &ws->enc, sendflags, totalsize,
+                            &ws->sendbuf, &result);
+      if(n < 0)
+        return result;
     }
     else {
-      headlen = 0;
-      if((curl_off_t)buflen > wsp->sleft) {
-        infof(data, "WS: unaligned frame size (sending %zu instead of %zu)",
-              buflen, wsp->sleft);
-        wsp->sleft = 0;
+      if((curl_off_t)buflen > ws->enc.payload_remain) {
+        infof(data, "WS: unaligned frame size (sending %zu instead of %zd)",
+              buflen, ws->enc.payload_remain);
       }
-      else
-        wsp->sleft -= buflen;
     }
   }
-  else
-    headlen = ws_packethead(data, buflen, sendflags);
-
-  /* headlen is the size of the frame header */
-  out = data->state.ulbuf;
-  if(buflen)
-    /* for PING and PONG etc there might not be a payload */
-    ws_xor(data, buffer, (unsigned char *)out + headlen, buflen);
+  else if(!ws->enc.payload_remain) {
+    n = ws_enc_write_head(data, &ws->enc, sendflags, (curl_off_t)buflen,
+                          &ws->sendbuf, &result);
+    if(n < 0)
+      return result;
+  }
 
-  if(data->set.connect_only)
-    result = Curl_senddata(data, out, buflen + headlen, &written);
-  else
-    result = Curl_write(data, data->conn->writesockfd, out,
-                        buflen + headlen, &written);
+  n = ws_enc_write_payload(&ws->enc, data,
+                           buffer, buflen, &ws->sendbuf, &result);
+  if(n < 0)
+    return result;
 
-  infof(data, "WS: wanted to send %zu bytes, sent %zu bytes",
-        headlen + buflen, written);
+  *sent = (size_t)n;
+  return ws_flush(data, ws, TRUE);
+}
 
-  if(!result) {
-    /* the *sent number only counts "payload", excluding the header */
-    if((size_t)written > headlen)
-      *sent = written - headlen;
-    else
-      *sent = 0;
+static void ws_free(struct connectdata *conn)
+{
+  if(conn && conn->proto.ws) {
+    Curl_bufq_free(&conn->proto.ws->recvbuf);
+    Curl_bufq_free(&conn->proto.ws->sendbuf);
+    Curl_safefree(conn->proto.ws);
   }
-  return result;
 }
 
 void Curl_ws_done(struct Curl_easy *data)
 {
-  struct websocket *wsp = &data->req.p.http->ws;
-  DEBUGASSERT(wsp);
-  Curl_dyn_free(&wsp->buf);
+  (void)data;
 }
 
 CURLcode Curl_ws_disconnect(struct Curl_easy *data,
                             struct connectdata *conn,
                             bool dead_connection)
 {
-  struct ws_conn *wsc = &conn->proto.ws;
   (void)data;
   (void)dead_connection;
-  Curl_dyn_free(&wsc->early);
+  ws_free(conn);
   return CURLE_OK;
 }
 
@@ -753,9 +1081,9 @@ CURL_EXTERN struct curl_ws_frame *curl_ws_meta(struct Curl_easy *data)
 {
   /* we only return something for websocket, called from within the callback
      when not using raw mode */
-  if(GOOD_EASY_HANDLE(data) && Curl_is_in_callback(data) && data->req.p.http &&
-     !data->set.ws_raw_mode)
-    return &data->req.p.http->ws.frame;
+  if(GOOD_EASY_HANDLE(data) && Curl_is_in_callback(data) && data->conn &&
+     data->conn->proto.ws && !data->set.ws_raw_mode)
+    return &data->conn->proto.ws->frame;
   return NULL;
 }
 
index 176dda470b4e2cd7840f061126ac6ce5819ccaf6..0308a42545b60f46b64d8770936908329f6350bf 100644 (file)
--- a/lib/ws.h
+++ b/lib/ws.h
 #define REQTYPE struct dynbuf
 #endif
 
-/* this is the largest single fragment size we support */
-#define MAX_WS_SIZE 65535
+/* a client-side WS frame decoder, parsing frame headers and
+ * payload, keeping track of current position and stats */
+enum ws_dec_state {
+  WS_DEC_INIT,
+  WS_DEC_HEAD,
+  WS_DEC_PAYLOAD
+};
 
-/* part of 'struct HTTP', when used in the 'struct SingleRequest' in the
-   Curl_easy struct */
-struct websocket {
-  bool contfragment; /* set TRUE if the previous fragment sent was not final */
-  unsigned char mask[4]; /* 32 bit mask for this connection */
-  struct Curl_easy *data; /* used for write callback handling */
-  struct dynbuf buf;
-  size_t usedbuf; /* number of leading bytes in 'buf' the most recent complete
-                     websocket frame uses */
-  struct curl_ws_frame frame; /* the struct used for frame state */
-  size_t stillblen; /* number of bytes left in the buffer to deliver in
-                         the next curl_ws_recv() call */
-  const char *stillb; /* the stillblen pending bytes are here */
-  curl_off_t sleft; /* outstanding number of payload bytes left to send */
+struct ws_decoder {
+  int frame_age;        /* zero */
+  int frame_flags;      /* See the CURLWS_* defines */
+  curl_off_t payload_offset;   /* the offset parsing is at */
+  curl_off_t payload_len;
+  unsigned char head[10];
+  int head_len, head_total;
+  enum ws_dec_state state;
+};
+
+/* a client-side WS frame encoder, generating frame headers and
+ * converting payloads, tracking remaining data in current frame */
+struct ws_encoder {
+  curl_off_t payload_len;  /* payload length of current frame */
+  curl_off_t payload_remain;  /* remaining payload of current */
   unsigned int xori; /* xor index */
+  unsigned char mask[4]; /* 32 bit mask for this connection */
+  unsigned char firstbyte; /* first byte of frame we encode */
+  bool contfragment; /* set TRUE if the previous fragment sent was not final */
 };
 
-struct ws_conn {
-  struct dynbuf early; /* data already read when switching to ws */
+/* A websocket connection with en- and decoder that treat frames
+ * and keep track of boundaries. */
+struct websocket {
+  struct Curl_easy *data; /* used for write callback handling */
+  struct ws_decoder dec;  /* decode of we frames */
+  struct ws_encoder enc;  /* decode of we frames */
+  struct bufq recvbuf;    /* raw data from the server */
+  struct bufq sendbuf;    /* raw data to be sent to the server */
+  struct curl_ws_frame frame;  /* the current WS FRAME received */
 };
 
 CURLcode Curl_ws_request(struct Curl_easy *data, REQTYPE *req);
index 83027029b8956cd16a6341095843d8522d713d6a..58cafe0bc151e200d43603f98f42093c43978339 100644 (file)
@@ -67,6 +67,20 @@ static int recv_pong(CURL *curl, const char *exected_payload)
   return (int)result;
 }
 
+static int recv_any(CURL *curl)
+{
+  size_t rlen;
+  struct curl_ws_frame *meta;
+  char buffer[256];
+  CURLcode result = curl_ws_recv(curl, buffer, sizeof(buffer), &rlen, &meta);
+  if(result)
+    return result;
+
+  fprintf(stderr, "recv_any: got %u bytes rflags %x\n", (int)rlen,
+          meta->flags);
+  return 0;
+}
+
 /* just close the connection */
 static void websocket_close(CURL *curl)
 {
@@ -82,6 +96,7 @@ static void websocket(CURL *curl)
   int i = 0;
   fprintf(stderr, "ws: websocket() starts\n");
   do {
+    recv_any(curl);
     fprintf(stderr, "Send ping\n");
     if(ping(curl, "foobar"))
       return;