]> git.ipfire.org Git - thirdparty/curl.git/commitdiff
vquic: stabilization and improvements
authorStefan Eissing <stefan@eissing.org>
Thu, 9 Feb 2023 09:49:04 +0000 (10:49 +0100)
committerDaniel Stenberg <daniel@haxx.se>
Fri, 10 Feb 2023 09:26:40 +0000 (10:26 +0100)
vquic stabilization
- udp send code shared between ngtcp2 and quiche
- quiche handling of data and events improved

ngtcp2 and pytest improvements
- fixes handling of "drain" situations, discovered in scorecard
  tests with the Caddy server.
- improvements in handling transfers that have already  data or
  are already closed to make an early return on recv

pytest
- adding caddy tests when available

scorecard improvemnts.
- using correct caddy port
- allowing tests for only httpd or caddy

Closes #10451

15 files changed:
lib/http.h
lib/vquic/curl_ngtcp2.c
lib/vquic/curl_quiche.c
lib/vquic/vquic.c
lib/vquic/vquic_int.h
tests/tests-httpd/config.ini.in
tests/tests-httpd/scorecard.py
tests/tests-httpd/test_02_download.py
tests/tests-httpd/test_05_errors.py
tests/tests-httpd/test_07_upload.py
tests/tests-httpd/test_08_caddy.py [new file with mode: 0644]
tests/tests-httpd/testenv/caddy.py
tests/tests-httpd/testenv/curl.py
tests/tests-httpd/testenv/env.py
tests/tests-httpd/testenv/nghttpx.py

index 6c5c79d38ef16c142710f0fd25f5c157ee7e5751..735729c4ada7b92dfbabd67750218d31e38d4d43 100644 (file)
@@ -264,7 +264,7 @@ struct HTTP {
   bool upload_done;
 #endif /* ENABLE_QUIC */
 #ifdef USE_NGHTTP3
-  size_t unacked_window;
+  size_t recv_buf_nonflow; /* buffered bytes, not counting for flow control */
   struct h3out *h3out; /* per-stream buffers for upload */
   struct dynbuf overflow; /* excess data received during a single Curl_read */
 #endif /* USE_NGHTTP3 */
@@ -291,6 +291,8 @@ struct HTTP {
 #ifdef USE_QUICHE
   bool h3_got_header; /* TRUE when h3 stream has recvd some HEADER */
   bool h3_recving_data; /* TRUE when h3 stream is reading DATA */
+  bool h3_body_pending; /* TRUE when h3 stream may have more body DATA */
+  struct h3_event_node *pending;
 #endif /* USE_QUICHE */
 };
 
index 8abd6a654cd26a5c11561a0044b7d85102842c4f..32658c718a4069917a5270fa5d0ac09befacb596 100644 (file)
@@ -58,6 +58,7 @@
 #include "dynbuf.h"
 #include "select.h"
 #include "vquic.h"
+#include "vquic_int.h"
 #include "h2h3.h"
 #include "vtls/keylog.h"
 #include "vtls/vtls.h"
@@ -120,16 +121,9 @@ void Curl_ngtcp2_ver(char *p, size_t len)
                   ng2->version_str, ht3->version_str);
 }
 
-struct blocked_pkt {
-  const uint8_t *pkt;
-  size_t pktlen;
-  size_t gsolen;
-};
-
 struct cf_ngtcp2_ctx {
-  curl_socket_t sockfd;
-  struct sockaddr_storage local_addr;
-  socklen_t local_addrlen;
+  struct cf_quic_ctx q;
+  ngtcp2_path connected_path;
   ngtcp2_conn *qconn;
   ngtcp2_cid dcid;
   ngtcp2_cid scid;
@@ -147,16 +141,6 @@ struct cf_ngtcp2_ctx {
   WOLFSSL_CTX *sslctx;
   WOLFSSL *ssl;
 #endif
-  bool no_gso;
-  uint8_t *pktbuf;
-  size_t pktbuflen;
-  /* the number of entries in blocked_pkt */
-  size_t num_blocked_pkt;
-  /* the number of processed entries in blocked_pkt */
-  size_t num_blocked_pkt_sent;
-  /* the packets blocked by sendmsg (EAGAIN or EWOULDBLOCK) */
-  struct blocked_pkt blocked_pkt[2];
-
   struct cf_call_data call_data;
   nghttp3_conn *h3conn;
   nghttp3_settings h3settings;
@@ -235,6 +219,8 @@ static void quic_settings(struct cf_ngtcp2_ctx *ctx,
 {
   ngtcp2_settings *s = &ctx->settings;
   ngtcp2_transport_params *t = &ctx->transport_params;
+  size_t stream_win_size = CURL_MAX_READ_SIZE;
+
   ngtcp2_settings_default(s);
   ngtcp2_transport_params_default(t);
 #ifdef DEBUG_NGTCP2
@@ -242,13 +228,19 @@ static void quic_settings(struct cf_ngtcp2_ctx *ctx,
 #else
   s->log_printf = NULL;
 #endif
+
+  (void)data;
   s->initial_ts = timestamp();
-  t->initial_max_stream_data_bidi_local = data->set.buffer_size;
-  t->initial_max_stream_data_bidi_remote = QUIC_MAX_STREAMS;
-  t->initial_max_stream_data_uni = QUIC_MAX_STREAMS;
-  t->initial_max_data = QUIC_MAX_DATA;
-  t->initial_max_streams_bidi = 1;
-  t->initial_max_streams_uni = 3;
+  s->handshake_timeout = NGTCP2_DEFAULT_HANDSHAKE_TIMEOUT;
+  s->max_window = 100 * stream_win_size;
+  s->max_stream_window = stream_win_size;
+
+  t->initial_max_data = 10 * stream_win_size;
+  t->initial_max_stream_data_bidi_local = stream_win_size;
+  t->initial_max_stream_data_bidi_remote = stream_win_size;
+  t->initial_max_stream_data_uni = stream_win_size;
+  t->initial_max_streams_bidi = QUIC_MAX_STREAMS;
+  t->initial_max_streams_uni = QUIC_MAX_STREAMS;
   t->max_idle_timeout = QUIC_IDLE_TIMEOUT;
   if(ctx->qlogfd != -1) {
     s->qlog.write = qlog_callback;
@@ -606,15 +598,39 @@ static int cb_handshake_completed(ngtcp2_conn *tconn, void *user_data)
   return 0;
 }
 
-static void extend_stream_window(ngtcp2_conn *tconn,
-                                 struct HTTP *stream)
+static void report_consumed_data(struct Curl_cfilter *cf,
+                                 struct Curl_easy *data,
+                                 size_t consumed)
 {
-  size_t thismuch = stream->unacked_window;
-  ngtcp2_conn_extend_max_stream_offset(tconn, stream->stream3_id, thismuch);
-  ngtcp2_conn_extend_max_offset(tconn, thismuch);
-  stream->unacked_window = 0;
-}
+  struct HTTP *stream = data->req.p.http;
+  struct cf_ngtcp2_ctx *ctx = cf->ctx;
 
+  /* the HTTP/1.1 response headers are written to the buffer, but
+   * consuming those does not count against flow control. */
+  if(stream->recv_buf_nonflow) {
+    if(consumed >= stream->recv_buf_nonflow) {
+      consumed -= stream->recv_buf_nonflow;
+      stream->recv_buf_nonflow = 0;
+    }
+    else {
+      stream->recv_buf_nonflow -= consumed;
+      consumed = 0;
+    }
+  }
+  if(consumed > 0) {
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] consumed %zu DATA bytes",
+                  stream->stream3_id, consumed));
+    ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream->stream3_id,
+                                         consumed);
+    ngtcp2_conn_extend_max_offset(ctx->qconn, consumed);
+  }
+  if(!stream->closed && data->state.drain
+     && !stream->memlen
+     && !Curl_dyn_len(&stream->overflow)) {
+     /* nothing buffered any more */
+     data->state.drain = 0;
+  }
+}
 
 static int cb_recv_stream_data(ngtcp2_conn *tconn, uint32_t flags,
                                int64_t stream_id, uint64_t offset,
@@ -631,7 +647,7 @@ static int cb_recv_stream_data(ngtcp2_conn *tconn, uint32_t flags,
 
   nconsumed =
     nghttp3_conn_read_stream(ctx->h3conn, stream_id, buf, buflen, fin);
-  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] read_stream(len=%zu) -> %zd",
+  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] read_stream(len=%zu) -> %zd",
                 stream_id, buflen, nconsumed));
   if(nconsumed < 0) {
     ngtcp2_connection_close_error_set_application_error(
@@ -672,23 +688,26 @@ cb_acked_stream_data_offset(ngtcp2_conn *tconn, int64_t stream_id,
 }
 
 static int cb_stream_close(ngtcp2_conn *tconn, uint32_t flags,
-                           int64_t stream_id, uint64_t app_error_code,
+                           int64_t stream3_id, uint64_t app_error_code,
                            void *user_data, void *stream_user_data)
 {
   struct Curl_cfilter *cf = user_data;
+  struct Curl_easy *data = stream_user_data;
   struct cf_ngtcp2_ctx *ctx = cf->ctx;
   int rv;
 
   (void)tconn;
-  (void)stream_user_data;
+  (void)data;
   /* stream is closed... */
 
   if(!(flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET)) {
     app_error_code = NGHTTP3_H3_NO_ERROR;
   }
 
-  rv = nghttp3_conn_close_stream(ctx->h3conn, stream_id,
+  rv = nghttp3_conn_close_stream(ctx->h3conn, stream3_id,
                                  app_error_code);
+  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] quic close(err=%"
+                PRIu64 ") -> %d", stream3_id, app_error_code, rv));
   if(rv) {
     ngtcp2_connection_close_error_set_application_error(
         &ctx->last_error, nghttp3_err_infer_quic_app_error_code(rv), NULL, 0);
@@ -712,7 +731,7 @@ static int cb_stream_reset(ngtcp2_conn *tconn, int64_t stream_id,
   (void)data;
 
   rv = nghttp3_conn_shutdown_stream_read(ctx->h3conn, stream_id);
-  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] reset -> %d", stream_id, rv));
+  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] reset -> %d", stream_id, rv));
   if(rv) {
     return NGTCP2_ERR_CALLBACK_FAILURE;
   }
@@ -874,7 +893,7 @@ static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf,
   struct cf_call_data save;
 
   CF_DATA_SAVE(save, cf, data);
-  socks[0] = ctx->sockfd;
+  socks[0] = ctx->q.sockfd;
 
   /* in an HTTP/3 connection we can basically always get a frame so we should
      always be ready for one */
@@ -894,6 +913,17 @@ static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf,
   return rv;
 }
 
+static void notify_drain(struct Curl_cfilter *cf,
+                         struct Curl_easy *data)
+{
+  (void)cf;
+  if(!data->state.drain) {
+    data->state.drain = 1;
+    Curl_expire(data, 0, EXPIRE_RUN_NOW);
+  }
+}
+
+
 static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id,
                               uint64_t app_error_code, void *user_data,
                               void *stream_user_data)
@@ -906,7 +936,7 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id,
   (void)app_error_code;
   (void)cf;
 
-  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] close(err=%" PRIx64 ")",
+  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] h3 close(err=%" PRIx64 ")",
                 stream_id, app_error_code));
   stream->closed = TRUE;
   stream->error3 = app_error_code;
@@ -915,64 +945,68 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id,
      * the response before it was complete. */
     stream->reset = TRUE;
   }
-  Curl_expire(data, 0, EXPIRE_QUIC);
-  /* make sure that ngh3_stream_recv is called again to complete the transfer
-     even if there are no more packets to be received from the server. */
-  data->state.drain = 1;
+  notify_drain(cf, data);
   return 0;
 }
 
 /*
- * write_data() copies data to the stream's receive buffer. If not enough
* space is available in the receive buffer, it copies the rest to the
- * stream's overflow buffer.
+ * write_resp_raw() copies resonse data in raw format to the `data`'s
 * receive buffer. If not enough space is available, it appends to the
+ * `data`'s overflow buffer.
  */
-static CURLcode write_data(struct HTTP *stream, const void *mem, size_t memlen)
+static CURLcode write_resp_raw(struct Curl_cfilter *cf,
+                               struct Curl_easy *data,
+                               const void *mem, size_t memlen,
+                               bool flow)
 {
+  struct HTTP *stream = data->req.p.http;
   CURLcode result = CURLE_OK;
   const char *buf = mem;
   size_t ncopy = memlen;
   /* copy as much as possible to the receive buffer */
   if(stream->len) {
     size_t len = CURLMIN(ncopy, stream->len);
-    memcpy(stream->mem, buf, len);
+    memcpy(stream->mem + stream->memlen, buf, len);
     stream->len -= len;
     stream->memlen += len;
-    stream->mem += len;
     buf += len;
     ncopy -= len;
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] resp_raw: added %zu bytes"
+                  " to data buffer", stream->stream3_id, len));
   }
   /* copy the rest to the overflow buffer */
   if(ncopy) {
     result = Curl_dyn_addn(&stream->overflow, buf, ncopy);
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] resp_raw: added %zu bytes"
+                  " to overflow buffer -> %d",
+                  stream->stream3_id, ncopy, result));
+    notify_drain(cf, data);
+  }
+
+  if(!flow)
+    stream->recv_buf_nonflow += memlen;
+  if(CF_DATA_CURRENT(cf) != data) {
+    notify_drain(cf, data);
   }
   return result;
 }
 
-static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream_id,
+static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id,
                            const uint8_t *buf, size_t buflen,
                            void *user_data, void *stream_user_data)
 {
   struct Curl_cfilter *cf = user_data;
   struct Curl_easy *data = stream_user_data;
-  struct HTTP *stream = data->req.p.http;
-  CURLcode result = CURLE_OK;
+  CURLcode result;
+
   (void)conn;
-  (void)cf;
+  (void)stream3_id;
 
-  result = write_data(stream, buf, buflen);
-  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] recv_data(len=%zu) -> %d",
-                stream_id, buflen, result));
-  if(result) {
-    return -1;
-  }
-  stream->unacked_window += buflen;
-  (void)stream_id;
-  (void)user_data;
-  return 0;
+  result = write_resp_raw(cf, data, buf, buflen, TRUE);
+  return result? -1 : 0;
 }
 
-static int cb_h3_deferred_consume(nghttp3_conn *conn, int64_t stream_id,
+static int cb_h3_deferred_consume(nghttp3_conn *conn, int64_t stream3_id,
                                   size_t consumed, void *user_data,
                                   void *stream_user_data)
 {
@@ -980,9 +1014,10 @@ static int cb_h3_deferred_consume(nghttp3_conn *conn, int64_t stream_id,
   struct cf_ngtcp2_ctx *ctx = cf->ctx;
   (void)conn;
   (void)stream_user_data;
-  (void)stream_id;
 
-  ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream_id, consumed);
+  /* nghttp3 has consumed bytes on the QUIC stream and we need to
+   * tell the QUIC connection to increase its flow control */
+  ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream3_id, consumed);
   ngtcp2_conn_extend_max_offset(ctx->qconn, consumed);
   return 0;
 }
@@ -1028,13 +1063,13 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t stream_id,
 
   /* add a CRLF only if we've received some headers */
   if(stream->firstheader) {
-    result = write_data(stream, "\r\n", 2);
+    result = write_resp_raw(cf, data, "\r\n", 2, FALSE);
     if(result) {
       return -1;
     }
   }
 
-  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] end_headers(status_code=%d",
+  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] end_headers(status_code=%d",
                 stream_id, stream->status_code));
   if(stream->status_code / 100 != 1) {
     stream->bodystarted = TRUE;
@@ -1062,41 +1097,43 @@ static int cb_h3_recv_header(nghttp3_conn *conn, int64_t stream_id,
   if(token == NGHTTP3_QPACK_TOKEN__STATUS) {
     char line[14]; /* status line is always 13 characters long */
     size_t ncopy;
+
+    DEBUGASSERT(!stream->firstheader);
     stream->status_code = decode_status_code(h3val.base, h3val.len);
     DEBUGASSERT(stream->status_code != -1);
     ncopy = msnprintf(line, sizeof(line), "HTTP/3 %03d \r\n",
                       stream->status_code);
-    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] status: %s",
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] status: %s",
                   stream_id, line));
-    result = write_data(stream, line, ncopy);
+    result = write_resp_raw(cf, data, line, ncopy, FALSE);
     if(result) {
       return -1;
     }
+    stream->firstheader = TRUE;
   }
   else {
     /* store as an HTTP1-style header */
-    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] header: %.*s: %.*s",
+    DEBUGASSERT(stream->firstheader);
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] header: %.*s: %.*s",
                   stream_id, (int)h3name.len, h3name.base,
                   (int)h3val.len, h3val.base));
-    result = write_data(stream, h3name.base, h3name.len);
+    result = write_resp_raw(cf, data, h3name.base, h3name.len, FALSE);
     if(result) {
       return -1;
     }
-    result = write_data(stream, ": ", 2);
+    result = write_resp_raw(cf, data, ": ", 2, FALSE);
     if(result) {
       return -1;
     }
-    result = write_data(stream, h3val.base, h3val.len);
+    result = write_resp_raw(cf, data, h3val.base, h3val.len, FALSE);
     if(result) {
       return -1;
     }
-    result = write_data(stream, "\r\n", 2);
+    result = write_resp_raw(cf, data, "\r\n", 2, FALSE);
     if(result) {
       return -1;
     }
   }
-
-  stream->firstheader = TRUE;
   return 0;
 }
 
@@ -1130,7 +1167,7 @@ static int cb_h3_reset_stream(nghttp3_conn *conn, int64_t stream_id,
 
   rv = ngtcp2_conn_shutdown_stream_write(ctx->qconn, stream_id,
                                          app_error_code);
-  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] reset -> %d", stream_id, rv));
+  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] reset -> %d", stream_id, rv));
   if(rv && rv != NGTCP2_ERR_STREAM_NOT_FOUND) {
     return NGTCP2_ERR_CALLBACK_FAILURE;
   }
@@ -1215,22 +1252,74 @@ static int init_ngh3_conn(struct Curl_cfilter *cf)
   return result;
 }
 
-static size_t drain_overflow_buffer(struct HTTP *stream)
+static void drain_overflow_buffer(struct Curl_cfilter *cf,
+                                  struct Curl_easy *data)
 {
+  struct HTTP *stream = data->req.p.http;
   size_t overlen = Curl_dyn_len(&stream->overflow);
   size_t ncopy = CURLMIN(overlen, stream->len);
+
+  (void)cf;
   if(ncopy > 0) {
-    memcpy(stream->mem, Curl_dyn_ptr(&stream->overflow), ncopy);
+    memcpy(stream->mem + stream->memlen,
+           Curl_dyn_ptr(&stream->overflow), ncopy);
     stream->len -= ncopy;
-    stream->mem += ncopy;
     stream->memlen += ncopy;
     if(ncopy != overlen)
       /* make the buffer only keep the tail */
       (void)Curl_dyn_tail(&stream->overflow, overlen - ncopy);
-    else
+    else {
       Curl_dyn_reset(&stream->overflow);
+    }
   }
-  return ncopy;
+}
+
+static ssize_t recv_closed_stream(struct Curl_cfilter *cf,
+                                  struct Curl_easy *data,
+                                  CURLcode *err)
+{
+  struct HTTP *stream = data->req.p.http;
+  ssize_t nread = -1;
+
+  if(stream->reset) {
+    failf(data,
+          "HTTP/3 stream %" PRId64 " reset by server", stream->stream3_id);
+    *err = CURLE_PARTIAL_FILE;
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, was reset -> %d",
+                  stream->stream3_id, *err));
+    goto out;
+  }
+  else if(stream->error3 != NGHTTP3_H3_NO_ERROR) {
+    failf(data,
+          "HTTP/3 stream %" PRId64 " was not closed cleanly: (err 0x%" PRIx64
+          ")",
+          stream->stream3_id, stream->error3);
+    *err = CURLE_HTTP3;
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, closed uncleanly"
+                  " -> %d", stream->stream3_id, *err));
+    goto out;
+  }
+
+  if(!stream->bodystarted) {
+    failf(data,
+          "HTTP/3 stream %" PRId64 " was closed cleanly, but before getting"
+          " all response header fields, treated as error",
+          stream->stream3_id);
+    *err = CURLE_HTTP3;
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, closed incomplete"
+                  " -> %d", stream->stream3_id, *err));
+    goto out;
+  }
+  else {
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, closed ok"
+                  " -> %d", stream->stream3_id, *err));
+  }
+  *err = CURLE_OK;
+  nread = 0;
+
+out:
+  data->state.drain = 0;
+  return nread;
 }
 
 /* incoming data frames on the h3 stream */
@@ -1249,85 +1338,72 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
   DEBUGASSERT(ctx->h3conn);
   *err = CURLE_OK;
 
+  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv(len=%zu) start",
+                stream->stream3_id, len));
+  /* TODO: this implementation of response DATA buffering is fragile.
+   * It makes the following assumptions:
+   * - the `buf` passed here has the same lifetime as the easy handle
+   * - data returned in `buf` from this call is immediately used and `buf`
+   *   can be overwritten during any handling of other transfers at
+   *   this connection.
+   */
   if(!stream->memlen) {
-    /* remember where to store incoming data for this stream and how big the
-       buffer is */
+    /* `buf` was not known before or is currently not used by stream,
+     * assign it (again). */
     stream->mem = buf;
     stream->len = len;
   }
-  /* else, there's data in the buffer already */
 
-  /* if there's data in the overflow buffer from a previous call, copy as much
-     as possible to the receive buffer before receiving more */
-  drain_overflow_buffer(stream);
+  /* if there's data in the overflow buffer, move as much
+     as possible to the receive buffer now */
+  drain_overflow_buffer(cf, data);
 
   if(cf_process_ingress(cf, data)) {
     *err = CURLE_RECV_ERROR;
     nread = -1;
     goto out;
   }
-  if(cf_flush_egress(cf, data)) {
-    *err = CURLE_SEND_ERROR;
-    nread = -1;
-    goto out;
-  }
 
   if(stream->memlen) {
     nread = stream->memlen;
-    /* data arrived */
     /* reset to allow more data to come */
-    stream->memlen = 0;
+    /* TODO: very brittle buffer use design:
+     * - stream->mem has now `nread` bytes of response data
+     * - we assume that the caller will use those immediately and
+     *   we can overwrite that with new data on our next invocation from
+     *   anywhere.
+     */
     stream->mem = buf;
+    stream->memlen = 0;
     stream->len = len;
     /* extend the stream window with the data we're consuming and send out
        any additional packets to tell the server that we can receive more */
-    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] recv, consumed %zd bytes",
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv -> %zd bytes",
                   stream->stream3_id, nread));
-    extend_stream_window(ctx->qconn, stream);
+    report_consumed_data(cf, data, nread);
     if(cf_flush_egress(cf, data)) {
       *err = CURLE_SEND_ERROR;
       nread = -1;
-      goto out;
     }
     goto out;
   }
 
   if(stream->closed) {
-    if(stream->reset) {
-      failf(data,
-            "HTTP/3 stream %" PRId64 " reset by server", stream->stream3_id);
-      *err = CURLE_PARTIAL_FILE;
-      nread = -1;
-      goto out;
-    }
-    else if(stream->error3 != NGHTTP3_H3_NO_ERROR) {
-      failf(data,
-            "HTTP/3 stream %" PRId64 " was not closed cleanly: (err 0x%" PRIx64
-            ")",
-            stream->stream3_id, stream->error3);
-      *err = CURLE_HTTP3;
-      nread = -1;
-      goto out;
-    }
-
-    if(!stream->bodystarted) {
-      failf(data,
-            "HTTP/3 stream %" PRId64 " was closed cleanly, but before getting"
-            " all response header fields, treated as error",
-            stream->stream3_id);
-      *err = CURLE_HTTP3;
-      nread = -1;
-      goto out;
-    }
-
-    nread = 0;
+    nread = recv_closed_stream(cf, data, err);
     goto out;
   }
 
-  DEBUGF(LOG_CF(data, cf, "cf_ngtcp2_recv returns EAGAIN"));
+  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv -> EAGAIN",
+                stream->stream3_id));
   *err = CURLE_AGAIN;
   nread = -1;
 out:
+  if(cf_flush_egress(cf, data)) {
+    *err = CURLE_SEND_ERROR;
+    nread = -1;
+    goto out;
+  }
+
   CF_DATA_RESTORE(cf, save);
   return nread;
 }
@@ -1457,6 +1533,7 @@ static CURLcode h3_stream_open(struct Curl_cfilter *cf,
   stream->stream3_id = stream3_id;
   stream->h3req = TRUE;
   Curl_dyn_init(&stream->overflow, CURL_MAX_READ_SIZE);
+  stream->recv_buf_nonflow = 0;
 
   result = Curl_pseudo_headers(data, mem, len, NULL, &hreq);
   if(result)
@@ -1500,8 +1577,6 @@ static CURLcode h3_stream_open(struct Curl_cfilter *cf,
     }
     stream->h3out = h3out;
 
-    DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] sending request %s, with_body=%d",
-                  stream->stream3_id, data->state.url, !!stream->upload_left));
     rc = nghttp3_conn_submit_request(ctx->h3conn, stream->stream3_id,
                                      nva, nheader, &data_reader, data);
     if(rc)
@@ -1509,8 +1584,6 @@ static CURLcode h3_stream_open(struct Curl_cfilter *cf,
     break;
   }
   default:
-    DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] sending request %s",
-                  stream->stream3_id, data->state.url));
     stream->upload_left = 0; /* nothing left to send */
     rc = nghttp3_conn_submit_request(ctx->h3conn, stream->stream3_id,
                                      nva, nheader, NULL, data);
@@ -1521,9 +1594,9 @@ static CURLcode h3_stream_open(struct Curl_cfilter *cf,
 
   Curl_safefree(nva);
 
-  infof(data, "Using HTTP/3 Stream ID: %" PRIx64 " (easy handle %p)",
+  infof(data, "Using HTTP/3 Stream ID: %" PRId64 " (easy handle %p)",
         stream3_id, (void *)data);
-  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] opened for %s",
+  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] opened for %s",
                 stream3_id, data->state.url));
 
   Curl_pseudo_free(hreq);
@@ -1533,12 +1606,12 @@ fail:
   if(rc) {
     switch(rc) {
     case NGHTTP3_ERR_CONN_CLOSING:
-      DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] failed to send, "
+      DEBUGF(LOG_CF(data, cf, "h3sid[%"PRId64"] failed to send, "
                     "connection is closing", stream->stream3_id));
       result = CURLE_RECV_ERROR;
       break;
     default:
-      DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] failed to send -> %d (%s)",
+      DEBUGF(LOG_CF(data, cf, "h3sid[%"PRId64"] failed to send -> %d (%s)",
                     stream->stream3_id, rc, ngtcp2_strerror(rc)));
       result = CURLE_SEND_ERROR;
       break;
@@ -1685,6 +1758,7 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
   int rv;
   uint8_t buf[65536];
   size_t bufsize = sizeof(buf);
+  size_t pktcount = 0, total_recvd = 0;
   struct sockaddr_storage remote_addr;
   socklen_t remote_addrlen;
   ngtcp2_path path;
@@ -1693,7 +1767,7 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
 
   for(;;) {
     remote_addrlen = sizeof(remote_addr);
-    while((recvd = recvfrom(ctx->sockfd, (char *)buf, bufsize, 0,
+    while((recvd = recvfrom(ctx->q.sockfd, (char *)buf, bufsize, 0,
                             (struct sockaddr *)&remote_addr,
                             &remote_addrlen)) == -1 &&
           SOCKERRNO == EINTR)
@@ -1703,7 +1777,7 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
         DEBUGF(LOG_CF(data, cf, "ingress, recvfrom -> EAGAIN"));
         goto out;
       }
-      if(SOCKERRNO == ECONNREFUSED) {
+      if(!cf->connected && SOCKERRNO == ECONNREFUSED) {
         const char *r_ip;
         int r_port;
         Curl_cf_socket_peek(cf->next, data, NULL, NULL,
@@ -1722,12 +1796,14 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
       ctx->got_first_byte = TRUE;
     }
 
-    ngtcp2_addr_init(&path.local, (struct sockaddr *)&ctx->local_addr,
-                     ctx->local_addrlen);
+    ++pktcount;
+    total_recvd += recvd;
+
+    ngtcp2_addr_init(&path.local, (struct sockaddr *)&ctx->q.local_addr,
+                     ctx->q.local_addrlen);
     ngtcp2_addr_init(&path.remote, (struct sockaddr *)&remote_addr,
                      remote_addrlen);
 
-    DEBUGF(LOG_CF(data, cf, "ingress, recvd pkt of %zd bytes", recvd));
     rv = ngtcp2_conn_read_pkt(ctx->qconn, &path, &pi, buf, recvd, ts);
     if(rv) {
       DEBUGF(LOG_CF(data, cf, "ingress, read_pkt -> %s",
@@ -1753,190 +1829,10 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
   }
 
 out:
-  return CURLE_OK;
-}
-
-static CURLcode do_sendmsg(struct Curl_cfilter *cf,
-                           struct Curl_easy *data,
-                           const uint8_t *pkt, size_t pktlen, size_t gsolen,
-                           size_t *sent);
-
-static CURLcode send_packet_no_gso(struct Curl_cfilter *cf,
-                                   struct Curl_easy *data,
-                                   const uint8_t *pkt, size_t pktlen,
-                                   size_t gsolen, size_t *psent)
-{
-  const uint8_t *p, *end = pkt + pktlen;
-  size_t sent;
-
-  *psent = 0;
-
-  for(p = pkt; p < end; p += gsolen) {
-    size_t len = CURLMIN(gsolen, (size_t)(end - p));
-    CURLcode curlcode = do_sendmsg(cf, data, p, len, len, &sent);
-    if(curlcode != CURLE_OK) {
-      return curlcode;
-    }
-    *psent += sent;
-  }
-
-  return CURLE_OK;
-}
-
-static CURLcode do_sendmsg(struct Curl_cfilter *cf,
-                           struct Curl_easy *data,
-                           const uint8_t *pkt, size_t pktlen, size_t gsolen,
-                           size_t *psent)
-{
-  struct cf_ngtcp2_ctx *ctx = cf->ctx;
-#ifdef HAVE_SENDMSG
-  struct iovec msg_iov;
-  struct msghdr msg = {0};
-  ssize_t sent;
-#if defined(__linux__) && defined(UDP_SEGMENT)
-  uint8_t msg_ctrl[32];
-  struct cmsghdr *cm;
-#endif
-
-  *psent = 0;
-  msg_iov.iov_base = (uint8_t *)pkt;
-  msg_iov.iov_len = pktlen;
-  msg.msg_iov = &msg_iov;
-  msg.msg_iovlen = 1;
-
-#if defined(__linux__) && defined(UDP_SEGMENT)
-  if(pktlen > gsolen) {
-    /* Only set this, when we need it. macOS, for example,
-     * does not seem to like a msg_control of length 0. */
-    msg.msg_control = msg_ctrl;
-    assert(sizeof(msg_ctrl) >= CMSG_SPACE(sizeof(uint16_t)));
-    msg.msg_controllen = CMSG_SPACE(sizeof(uint16_t));
-    cm = CMSG_FIRSTHDR(&msg);
-    cm->cmsg_level = SOL_UDP;
-    cm->cmsg_type = UDP_SEGMENT;
-    cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
-    *(uint16_t *)(void *)CMSG_DATA(cm) = gsolen & 0xffff;
-  }
-#endif
-
-
-  while((sent = sendmsg(ctx->sockfd, &msg, 0)) == -1 && SOCKERRNO == EINTR)
-    ;
-
-  if(sent == -1) {
-    switch(SOCKERRNO) {
-    case EAGAIN:
-#if EAGAIN != EWOULDBLOCK
-    case EWOULDBLOCK:
-#endif
-      return CURLE_AGAIN;
-    case EMSGSIZE:
-      /* UDP datagram is too large; caused by PMTUD. Just let it be lost. */
-      break;
-    case EIO:
-      if(pktlen > gsolen) {
-        /* GSO failure */
-        failf(data, "sendmsg() returned %zd (errno %d); disable GSO", sent,
-              SOCKERRNO);
-        ctx->no_gso = TRUE;
-        return send_packet_no_gso(cf, data, pkt, pktlen, gsolen, psent);
-      }
-      /* FALLTHROUGH */
-    default:
-      failf(data, "sendmsg() returned %zd (errno %d)", sent, SOCKERRNO);
-      return CURLE_SEND_ERROR;
-    }
-  }
-  else {
-    assert(pktlen == (size_t)sent);
-  }
-#else
-  ssize_t sent;
-  (void)gsolen;
-
-  *psent = 0;
-
-  while((sent = send(ctx->sockfd, (const char *)pkt, pktlen, 0)) == -1 &&
-        SOCKERRNO == EINTR)
-    ;
-
-  if(sent == -1) {
-    if(SOCKERRNO == EAGAIN || SOCKERRNO == EWOULDBLOCK) {
-      return CURLE_AGAIN;
-    }
-    else {
-      failf(data, "send() returned %zd (errno %d)", sent, SOCKERRNO);
-      if(SOCKERRNO != EMSGSIZE) {
-        return CURLE_SEND_ERROR;
-      }
-      /* UDP datagram is too large; caused by PMTUD. Just let it be
-         lost. */
-    }
-  }
-#endif
-
-  *psent = pktlen;
-
-  return CURLE_OK;
-}
-
-static CURLcode send_packet(struct Curl_cfilter *cf,
-                            struct Curl_easy *data,
-                            const uint8_t *pkt, size_t pktlen, size_t gsolen,
-                            size_t *psent)
-{
-  struct cf_ngtcp2_ctx *ctx = cf->ctx;
-
-  DEBUGF(LOG_CF(data, cf, "egress, send %zu bytes", pktlen));
-  if(ctx->no_gso && pktlen > gsolen) {
-    return send_packet_no_gso(cf, data, pkt, pktlen, gsolen, psent);
-  }
-
-  return do_sendmsg(cf, data, pkt, pktlen, gsolen, psent);
-}
-
-static void push_blocked_pkt(struct Curl_cfilter *cf, const uint8_t *pkt,
-                             size_t pktlen, size_t gsolen)
-{
-  struct cf_ngtcp2_ctx *ctx = cf->ctx;
-  struct blocked_pkt *blkpkt;
-
-  assert(ctx->num_blocked_pkt <
-         sizeof(ctx->blocked_pkt) / sizeof(ctx->blocked_pkt[0]));
-
-  blkpkt = &ctx->blocked_pkt[ctx->num_blocked_pkt++];
-
-  blkpkt->pkt = pkt;
-  blkpkt->pktlen = pktlen;
-  blkpkt->gsolen = gsolen;
-}
-
-static CURLcode send_blocked_pkt(struct Curl_cfilter *cf,
-                                 struct Curl_easy *data)
-{
-  struct cf_ngtcp2_ctx *ctx = cf->ctx;
-  size_t sent;
-  CURLcode curlcode;
-  struct blocked_pkt *blkpkt;
-
-  for(; ctx->num_blocked_pkt_sent < ctx->num_blocked_pkt;
-      ++ctx->num_blocked_pkt_sent) {
-    blkpkt = &ctx->blocked_pkt[ctx->num_blocked_pkt_sent];
-    curlcode = send_packet(cf, data, blkpkt->pkt,
-                           blkpkt->pktlen, blkpkt->gsolen, &sent);
-
-    if(curlcode) {
-      if(curlcode == CURLE_AGAIN) {
-        blkpkt->pkt += sent;
-        blkpkt->pktlen -= sent;
-      }
-      return curlcode;
-    }
-  }
-
-  ctx->num_blocked_pkt = 0;
-  ctx->num_blocked_pkt_sent = 0;
-
+  (void)pktcount;
+  (void)total_recvd;
+  DEBUGF(LOG_CF(data, cf, "ingress, recvd %zu packets with %zd bytes",
+                pktcount, total_recvd));
   return CURLE_OK;
 }
 
@@ -1947,15 +1843,15 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
   int rv;
   size_t sent;
   ngtcp2_ssize outlen;
-  uint8_t *outpos = ctx->pktbuf;
+  uint8_t *outpos = ctx->q.pktbuf;
   size_t max_udp_payload_size =
       ngtcp2_conn_get_max_tx_udp_payload_size(ctx->qconn);
   size_t path_max_udp_payload_size =
       ngtcp2_conn_get_path_max_tx_udp_payload_size(ctx->qconn);
   size_t max_pktcnt =
-      CURLMIN(MAX_PKT_BURST, ctx->pktbuflen / max_udp_payload_size);
+      CURLMIN(MAX_PKT_BURST, ctx->q.pktbuflen / max_udp_payload_size);
   size_t pktcnt = 0;
-  size_t gsolen;
+  size_t gsolen = 0;  /* this disables gso until we have a clue */
   ngtcp2_path_storage ps;
   ngtcp2_tstamp ts = timestamp();
   ngtcp2_tstamp expiry;
@@ -1977,8 +1873,8 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
     return CURLE_SEND_ERROR;
   }
 
-  if(ctx->num_blocked_pkt) {
-    curlcode = send_blocked_pkt(cf, data);
+  if(ctx->q.num_blocked_pkt) {
+    curlcode = vquic_send_blocked_pkt(cf, data, &ctx->q);
     if(curlcode) {
       if(curlcode == CURLE_AGAIN) {
         Curl_expire(data, 1, EXPIRE_QUIC);
@@ -2015,22 +1911,24 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
                                        &ndatalen, flags, stream_id,
                                        (const ngtcp2_vec *)vec, veccnt, ts);
     if(outlen == 0) {
-      if(outpos != ctx->pktbuf) {
-        curlcode = send_packet(cf, data, ctx->pktbuf,
-                               outpos - ctx->pktbuf, gsolen, &sent);
+      /* ngtcp2 does not want to send more packets, if the buffer is
+       * not empty, send that now */
+      if(outpos != ctx->q.pktbuf) {
+        curlcode = vquic_send_packet(cf, data, &ctx->q, ctx->q.pktbuf,
+                               outpos - ctx->q.pktbuf, gsolen, &sent);
         if(curlcode) {
           if(curlcode == CURLE_AGAIN) {
-            push_blocked_pkt(cf, ctx->pktbuf + sent,
-                             outpos - ctx->pktbuf - sent,
-                             gsolen);
+            vquic_push_blocked_pkt(cf, &ctx->q, ctx->q.pktbuf + sent,
+                                   outpos - ctx->q.pktbuf - sent,
+                                   gsolen);
             Curl_expire(data, 1, EXPIRE_QUIC);
             return CURLE_OK;
           }
           return curlcode;
         }
       }
-
-      break;
+      /* done for now */
+      goto out;
     }
     if(outlen < 0) {
       switch(outlen) {
@@ -2043,6 +1941,8 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
         nghttp3_conn_shutdown_stream_write(ctx->h3conn, stream_id);
         continue;
       case NGTCP2_ERR_WRITE_MORE:
+        /* ngtcp2 wants to send more. update the flow of the stream whose data
+         * is in the buffer and continue */
         assert(ndatalen >= 0);
         rv = nghttp3_conn_add_write_offset(ctx->h3conn, stream_id, ndatalen);
         if(rv) {
@@ -2061,6 +1961,7 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
       }
     }
     else if(ndatalen >= 0) {
+      /* ngtcp2 thinks it has added all it wants. Update the stream  */
       rv = nghttp3_conn_add_write_offset(ctx->h3conn, stream_id, ndatalen);
       if(rv) {
         failf(data, "nghttp3_conn_add_write_offset returned error: %s\n",
@@ -2069,64 +1970,74 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
       }
     }
 
+    /* advance to the end of the buffered packet data */
     outpos += outlen;
 
     if(pktcnt == 0) {
+      /* first packet buffer chunk. use this as gsolen. It's how ngtcp2
+       * indicates the intended segment size. */
       gsolen = outlen;
     }
     else if((size_t)outlen > gsolen ||
-            (gsolen > path_max_udp_payload_size &&
-             (size_t)outlen != gsolen)) {
+            (gsolen > path_max_udp_payload_size && (size_t)outlen != gsolen)) {
       /* Packet larger than path_max_udp_payload_size is PMTUD probe
          packet and it might not be sent because of EMSGSIZE. Send
          them separately to minimize the loss. */
-      curlcode = send_packet(cf, data, ctx->pktbuf,
-                             outpos - outlen - ctx->pktbuf, gsolen, &sent);
+      /* send the pktbuf *before* the last addition */
+      curlcode = vquic_send_packet(cf, data, &ctx->q, ctx->q.pktbuf,
+                             outpos - outlen - ctx->q.pktbuf, gsolen, &sent);
       if(curlcode) {
         if(curlcode == CURLE_AGAIN) {
-          push_blocked_pkt(cf, ctx->pktbuf + sent,
-                           outpos - outlen - ctx->pktbuf - sent, gsolen);
-          push_blocked_pkt(cf, outpos - outlen, outlen, outlen);
+          /* blocked, add the pktbuf *before* and *at* the last addition
+           * separately to the blocked packages */
+          vquic_push_blocked_pkt(cf, &ctx->q, ctx->q.pktbuf + sent,
+                           outpos - outlen - ctx->q.pktbuf - sent, gsolen);
+          vquic_push_blocked_pkt(cf, &ctx->q, outpos - outlen, outlen, outlen);
           Curl_expire(data, 1, EXPIRE_QUIC);
           return CURLE_OK;
         }
         return curlcode;
       }
-      curlcode = send_packet(cf, data, outpos - outlen, outlen,
-                             outlen, &sent);
+      /* send the pktbuf *at* the last addition */
+      curlcode = vquic_send_packet(cf, data, &ctx->q, outpos - outlen, outlen,
+                                   outlen, &sent);
       if(curlcode) {
         if(curlcode == CURLE_AGAIN) {
           assert(0 == sent);
-          push_blocked_pkt(cf, outpos - outlen, outlen, outlen);
+          vquic_push_blocked_pkt(cf, &ctx->q, outpos - outlen, outlen, outlen);
           Curl_expire(data, 1, EXPIRE_QUIC);
           return CURLE_OK;
         }
         return curlcode;
       }
-
+      /* pktbuf has been completely sent */
       pktcnt = 0;
-      outpos = ctx->pktbuf;
+      outpos = ctx->q.pktbuf;
       continue;
     }
 
     if(++pktcnt >= max_pktcnt || (size_t)outlen < gsolen) {
-      curlcode = send_packet(cf, data, ctx->pktbuf,
-                             outpos - ctx->pktbuf, gsolen, &sent);
+      /* enough packets or last one is shorter than the intended
+       * segment size, indicating that it is time to send. */
+      curlcode = vquic_send_packet(cf, data, &ctx->q, ctx->q.pktbuf,
+                                   outpos - ctx->q.pktbuf, gsolen, &sent);
       if(curlcode) {
         if(curlcode == CURLE_AGAIN) {
-          push_blocked_pkt(cf, ctx->pktbuf + sent, outpos - ctx->pktbuf - sent,
-                           gsolen);
+          vquic_push_blocked_pkt(cf, &ctx->q, ctx->q.pktbuf + sent,
+                                 outpos - ctx->q.pktbuf - sent, gsolen);
           Curl_expire(data, 1, EXPIRE_QUIC);
           return CURLE_OK;
         }
         return curlcode;
       }
-
+      /* pktbuf has been completely sent */
       pktcnt = 0;
-      outpos = ctx->pktbuf;
+      outpos = ctx->q.pktbuf;
     }
   }
 
+out:
+  /* non-errored exit. check when we should run again. */
   expiry = ngtcp2_conn_get_expiry(ctx->qconn);
   if(expiry != UINT64_MAX) {
     if(expiry <= ts) {
@@ -2176,16 +2087,23 @@ static CURLcode cf_ngtcp2_data_event(struct Curl_cfilter *cf,
     struct HTTP *stream = data->req.p.http;
     Curl_dyn_free(&stream->overflow);
     free(stream->h3out);
+#ifdef DEBUGBUILD
+  if(ctx->qconn) {
+    ngtcp2_conn_stat stat;
+    ngtcp2_conn_get_conn_stat(ctx->qconn, &stat);
+    DEBUGF(LOG_CF(data, cf, "ngtcp2 conn stat: cwnd=%" PRIu64 ", "
+                  "max_tx_payload=%zu",
+                  stat.cwnd, stat.max_tx_udp_payload_size));
+  }
+#endif
     break;
   }
-
   case CF_CTRL_DATA_DONE_SEND: {
     struct HTTP *stream = data->req.p.http;
     stream->upload_done = TRUE;
     (void)nghttp3_conn_resume_stream(ctx->h3conn, stream->stream3_id);
     break;
   }
-
   case CF_CTRL_DATA_IDLE:
     if(timestamp() >= ngtcp2_conn_get_expiry(ctx->qconn)) {
       if(cf_flush_egress(cf, data)) {
@@ -2234,7 +2152,7 @@ static void cf_ngtcp2_ctx_clear(struct cf_ngtcp2_ctx *ctx)
   if(ctx->sslctx)
     wolfSSL_CTX_free(ctx->sslctx);
 #endif
-  free(ctx->pktbuf);
+  vquic_ctx_free(&ctx->q);
   if(ctx->h3conn)
     nghttp3_conn_del(ctx->h3conn);
   if(ctx->qconn)
@@ -2262,7 +2180,7 @@ static void cf_ngtcp2_close(struct Curl_cfilter *cf, struct Curl_easy *data)
                                             (uint8_t *)buffer, sizeof(buffer),
                                             &ctx->last_error, ts);
     if(rc > 0) {
-      while((send(ctx->sockfd, buffer, rc, 0) == -1) &&
+      while((send(ctx->q.sockfd, buffer, rc, 0) == -1) &&
             SOCKERRNO == EINTR);
     }
 
@@ -2298,7 +2216,6 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf,
   int rc;
   int rv;
   CURLcode result;
-  ngtcp2_path path; /* TODO: this must be initialized properly */
   const struct Curl_sockaddr_ex *sockaddr;
   int qfd;
 
@@ -2335,19 +2252,27 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf,
   ctx->qlogfd = qfd; /* -1 if failure above */
   quic_settings(ctx, data);
 
-  Curl_cf_socket_peek(cf->next, data, &ctx->sockfd,
+  result = vquic_ctx_init(&ctx->q,
+                          NGTCP2_MAX_PMTUD_UDP_PAYLOAD_SIZE * MAX_PKT_BURST);
+  if(result)
+    return result;
+
+  Curl_cf_socket_peek(cf->next, data, &ctx->q.sockfd,
                       &sockaddr, NULL, NULL, NULL, NULL);
-  ctx->local_addrlen = sizeof(ctx->local_addr);
-  rv = getsockname(ctx->sockfd, (struct sockaddr *)&ctx->local_addr,
-                   &ctx->local_addrlen);
+  ctx->q.local_addrlen = sizeof(ctx->q.local_addr);
+  rv = getsockname(ctx->q.sockfd, (struct sockaddr *)&ctx->q.local_addr,
+                   &ctx->q.local_addrlen);
   if(rv == -1)
     return CURLE_QUIC_CONNECT_ERROR;
 
-  ngtcp2_addr_init(&path.local, (struct sockaddr *)&ctx->local_addr,
-                   ctx->local_addrlen);
-  ngtcp2_addr_init(&path.remote, &sockaddr->sa_addr, sockaddr->addrlen);
+  ngtcp2_addr_init(&ctx->connected_path.local,
+                   (struct sockaddr *)&ctx->q.local_addr,
+                   ctx->q.local_addrlen);
+  ngtcp2_addr_init(&ctx->connected_path.remote,
+                   &sockaddr->sa_addr, sockaddr->addrlen);
 
-  rc = ngtcp2_conn_client_new(&ctx->qconn, &ctx->dcid, &ctx->scid, &path,
+  rc = ngtcp2_conn_client_new(&ctx->qconn, &ctx->dcid, &ctx->scid,
+                              &ctx->connected_path,
                               NGTCP2_PROTO_VER_V1, &ng_callbacks,
                               &ctx->settings, &ctx->transport_params,
                               NULL, cf);
@@ -2362,24 +2287,6 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf,
 
   ngtcp2_connection_close_error_default(&ctx->last_error);
 
-#if defined(__linux__) && defined(UDP_SEGMENT) && defined(HAVE_SENDMSG)
-  ctx->no_gso = FALSE;
-#else
-  ctx->no_gso = TRUE;
-#endif
-
-  ctx->num_blocked_pkt = 0;
-  ctx->num_blocked_pkt_sent = 0;
-  memset(&ctx->blocked_pkt, 0, sizeof(ctx->blocked_pkt));
-
-  ctx->pktbuflen = NGTCP2_MAX_PMTUD_UDP_PAYLOAD_SIZE * MAX_PKT_BURST;
-  ctx->pktbuf = malloc(ctx->pktbuflen);
-  if(!ctx->pktbuf) {
-    ngtcp2_conn_del(ctx->qconn);
-    ctx->qconn = NULL;
-    return CURLE_OUT_OF_MEMORY;
-  }
-
   ctx->conn_ref.get_conn = get_conn;
   ctx->conn_ref.user_data = cf;
 
index f1bcd70b70b4bbcf03257fe91ebcc675767c49a3..54408d75a35943400572aa77434bc30f324f99e3 100644 (file)
@@ -40,6 +40,7 @@
 #include "progress.h"
 #include "strerror.h"
 #include "vquic.h"
+#include "vquic_int.h"
 #include "curl_quiche.h"
 #include "transfer.h"
 #include "h2h3.h"
@@ -56,6 +57,9 @@
 #define QUIC_MAX_DATA (1*1024*1024)
 #define QUIC_IDLE_TIMEOUT (60 * 1000) /* milliseconds */
 
+/* how many UDP packets to send max in one call */
+#define MAX_PKT_BURST 10
+#define MAX_UDP_PAYLOAD_SIZE  1452
 
 /*
  * Store quiche version info in this buffer.
@@ -128,14 +132,11 @@ struct quic_handshake {
 
 struct h3_event_node {
   struct h3_event_node *next;
-  int64_t stream3_id;
   quiche_h3_event *ev;
 };
 
 struct cf_quiche_ctx {
-  curl_socket_t sockfd;
-  struct sockaddr_storage local_addr;
-  socklen_t local_addrlen;
+  struct cf_quic_ctx q;
   quiche_conn *qconn;
   quiche_config *cfg;
   quiche_h3_conn *h3c;
@@ -143,12 +144,12 @@ struct cf_quiche_ctx {
   uint8_t scid[QUICHE_MAX_CONN_ID_LEN];
   SSL_CTX *sslctx;
   SSL *ssl;
-  struct h3_event_node *pending;
-  struct curltime connect_started; /* time the current attempt started */
-  struct curltime handshake_done;    /* time connect handshake finished */
-  int first_reply_ms;              /* ms since first data arrived */
-  struct curltime reconnect_at;    /* time the next attempt should start */
-  bool goaway;
+  struct curltime started_at;        /* time the current attempt started */
+  struct curltime handshake_at;      /* time connect handshake finished */
+  struct curltime first_byte_at;     /* when first byte was recvd */
+  struct curltime reconnect_at;      /* time the next attempt should start */
+  BIT(goaway);                       /* got GOAWAY from server */
+  BIT(got_first_byte);               /* if first byte was received */
 };
 
 
@@ -160,24 +161,25 @@ static void quiche_debug_log(const char *line, void *argp)
 }
 #endif
 
-static void h3_clear_pending(struct cf_quiche_ctx *ctx)
+static void h3_clear_pending(struct Curl_easy *data)
 {
-  if(ctx->pending) {
+  struct HTTP *stream = data->req.p.http;
+
+  if(stream->pending) {
     struct h3_event_node *node, *next;
-    for(node = ctx->pending; node; node = next) {
+    for(node = stream->pending; node; node = next) {
       next = node->next;
       quiche_h3_event_free(node->ev);
       free(node);
     }
-    ctx->pending = NULL;
+    stream->pending = NULL;
   }
 }
 
 static void cf_quiche_ctx_clear(struct cf_quiche_ctx *ctx)
 {
   if(ctx) {
-    if(ctx->pending)
-      h3_clear_pending(ctx);
+    vquic_ctx_free(&ctx->q);
     if(ctx->qconn)
       quiche_conn_free(ctx->qconn);
     if(ctx->h3config)
@@ -187,19 +189,23 @@ static void cf_quiche_ctx_clear(struct cf_quiche_ctx *ctx)
     if(ctx->cfg)
       quiche_config_free(ctx->cfg);
     memset(ctx, 0, sizeof(*ctx));
-    ctx->first_reply_ms = -1;
   }
 }
 
+static void notify_drain(struct Curl_cfilter *cf,
+                         struct Curl_easy *data)
+{
+  (void)cf;
+  data->state.drain = 1;
+  Curl_expire(data, 0, EXPIRE_RUN_NOW);
+}
+
 static CURLcode h3_add_event(struct Curl_cfilter *cf,
                              struct Curl_easy *data,
-                             int64_t stream3_id, quiche_h3_event *ev,
-                             size_t *pqlen)
+                             int64_t stream3_id, quiche_h3_event *ev)
 {
-  struct cf_quiche_ctx *ctx = cf->ctx;
   struct Curl_easy *mdata;
-  struct h3_event_node *node, **pnext = &ctx->pending;
-  size_t qlen;
+  struct h3_event_node *node, **pnext;
 
   DEBUGASSERT(data->multi);
   for(mdata = data->multi->easyp; mdata; mdata = mdata->next) {
@@ -209,31 +215,25 @@ static CURLcode h3_add_event(struct Curl_cfilter *cf,
   }
 
   if(!mdata) {
-    DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] event discarded, easy handle "
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] event discarded, easy handle "
                   "not found", stream3_id));
     quiche_h3_event_free(ev);
-    *pqlen = 0;
     return CURLE_OK;
   }
 
   node = calloc(sizeof(*node), 1);
-  if(!node)
+  if(!node) {
+    quiche_h3_event_free(ev);
     return CURLE_OUT_OF_MEMORY;
-  node->stream3_id = stream3_id;
+  }
   node->ev = ev;
   /* append to process them in order of arrival */
-  qlen = 0;
+  pnext = &mdata->req.p.http->pending;
   while(*pnext) {
     pnext = &((*pnext)->next);
-    ++qlen;
   }
   *pnext = node;
-  *pqlen = qlen + 1;
-  if(!mdata->state.drain) {
-    /* tell the multi handle that this data needs processing */
-    mdata->state.drain = 1;
-    Curl_expire(mdata, 0, EXPIRE_RUN_NOW);
-  }
+  notify_drain(cf, mdata);
   return CURLE_OK;
 }
 
@@ -270,6 +270,71 @@ static int cb_each_header(uint8_t *name, size_t name_len,
   return 0;
 }
 
+static ssize_t cf_recv_body(struct Curl_cfilter *cf,
+                                struct Curl_easy *data,
+                                char *buf, size_t len,
+                                CURLcode *err)
+{
+  struct cf_quiche_ctx *ctx = cf->ctx;
+  struct HTTP *stream = data->req.p.http;
+  ssize_t nread;
+  size_t offset = 0;
+
+  if(!stream->firstbody) {
+    /* add a header-body separator CRLF */
+    offset = 2;
+  }
+  nread = quiche_h3_recv_body(ctx->h3c, ctx->qconn, stream->stream3_id,
+                              (unsigned char *)buf + offset, len - offset);
+  if(nread >= 0) {
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"][DATA] len=%zd",
+                  stream->stream3_id, nread));
+    if(!stream->firstbody) {
+      stream->firstbody = TRUE;
+      buf[0] = '\r';
+      buf[1] = '\n';
+      nread += offset;
+    }
+  }
+  else if(nread == -1) {
+    *err = CURLE_AGAIN;
+    stream->h3_recving_data = FALSE;
+  }
+  else {
+    failf(data, "Error %zd in HTTP/3 response body for stream[%"PRId64"]",
+          nread, stream->stream3_id);
+    stream->closed = TRUE;
+    stream->reset = TRUE;
+    streamclose(cf->conn, "Reset of stream");
+    stream->h3_recving_data = FALSE;
+    nread = -1;
+    *err = stream->h3_got_header? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR;
+  }
+  return nread;
+}
+
+#ifdef DEBUGBUILD
+static const char *cf_ev_name(quiche_h3_event *ev)
+{
+  switch(quiche_h3_event_type(ev)) {
+  case QUICHE_H3_EVENT_HEADERS:
+    return "HEADERS";
+  case QUICHE_H3_EVENT_DATA:
+    return "DATA";
+  case QUICHE_H3_EVENT_RESET:
+    return "RESET";
+  case QUICHE_H3_EVENT_FINISHED:
+    return "FINISHED";
+  case QUICHE_H3_EVENT_GOAWAY:
+    return "GOAWAY";
+  default:
+    return "Unknown";
+  }
+}
+#else
+#define cf_ev_name(x)   ""
+#endif
+
 static ssize_t h3_process_event(struct Curl_cfilter *cf,
                                 struct Curl_easy *data,
                                 char *buf, size_t len,
@@ -277,15 +342,14 @@ static ssize_t h3_process_event(struct Curl_cfilter *cf,
                                 quiche_h3_event *ev,
                                 CURLcode *err)
 {
-  struct cf_quiche_ctx *ctx = cf->ctx;
   struct HTTP *stream = data->req.p.http;
-  ssize_t recvd = -1;
-  ssize_t rcode;
+  ssize_t recvd = 0;
   int rc;
   struct h3h1header headers;
 
   DEBUGASSERT(stream3_id == stream->stream3_id);
 
+  *err = CURLE_OK;
   switch(quiche_h3_event_type(ev)) {
   case QUICHE_H3_EVENT_HEADERS:
     stream->h3_got_header = TRUE;
@@ -301,67 +365,42 @@ static ssize_t h3_process_event(struct Curl_cfilter *cf,
       break;
     }
     recvd = headers.nlen;
-    DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, HEADERS len=%zd",
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"][HEADERS] len=%zd",
                   stream3_id, recvd));
     break;
 
   case QUICHE_H3_EVENT_DATA:
-    if(!stream->firstbody) {
-      /* add a header-body separator CRLF */
-      buf[0] = '\r';
-      buf[1] = '\n';
-      buf += 2;
-      len -= 2;
-      stream->firstbody = TRUE;
-      recvd = 2; /* two bytes already */
-    }
-    else
+    DEBUGASSERT(!stream->closed);
+    stream->h3_recving_data = TRUE;
+    recvd = cf_recv_body(cf, data, buf, len, err);
+    if(recvd < 0) {
+      if(*err != CURLE_AGAIN)
+        return -1;
       recvd = 0;
-    rcode = quiche_h3_recv_body(ctx->h3c, ctx->qconn, stream3_id,
-                               (unsigned char *)buf, len);
-    if(rcode <= 0) {
-      failf(data, "Error %zd in HTTP/3 response body for stream[%"PRId64"]",
-            rcode, stream3_id);
-      recvd = -1;
-      *err = CURLE_AGAIN;
-      break;
     }
-    stream->h3_recving_data = TRUE;
-    recvd += rcode;
-    DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, DATA len=%zd",
-                  stream3_id, rcode));
     break;
 
   case QUICHE_H3_EVENT_RESET:
-    if(quiche_conn_is_draining(ctx->qconn) && !stream->h3_got_header) {
-      DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] stream RESET without response, "
-                    "connection is draining", stream3_id));
-    }
-    else {
-      DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, RESET", stream3_id));
-    }
-    streamclose(cf->conn, "Stream reset");
-    *err = stream->h3_got_header? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR;
-    recvd = -1;
+      DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"][RESET]", stream3_id));
+    stream->closed = TRUE;
+    stream->reset = TRUE;
+    /* streamclose(cf->conn, "Reset of stream");*/
+    stream->h3_recving_data = FALSE;
     break;
 
   case QUICHE_H3_EVENT_FINISHED:
-    DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, FINISHED", stream3_id));
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"][FINISHED]", stream3_id));
     stream->closed = TRUE;
-    streamclose(cf->conn, "End of stream");
-    *err = CURLE_OK;
-    recvd = 0; /* end of stream */
+    /* streamclose(cf->conn, "End of stream");*/
+    stream->h3_recving_data = FALSE;
     break;
 
   case QUICHE_H3_EVENT_GOAWAY:
-    DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, GOAWAY", stream3_id));
-    recvd = -1;
-    *err = CURLE_AGAIN;
-    ctx->goaway = TRUE;
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"][GOAWAY]", stream3_id));
     break;
 
   default:
-    DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, unhandled event %d",
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] recv, unhandled event %d",
                   stream3_id, quiche_h3_event_type(ev)));
     break;
   }
@@ -373,34 +412,28 @@ static ssize_t h3_process_pending(struct Curl_cfilter *cf,
                                   char *buf, size_t len,
                                   CURLcode *err)
 {
-  struct cf_quiche_ctx *ctx = cf->ctx;
   struct HTTP *stream = data->req.p.http;
-  struct h3_event_node *node = ctx->pending, **pnext = &ctx->pending;
+  struct h3_event_node *node = stream->pending, **pnext = &stream->pending;
   ssize_t recvd = 0, erecvd;
 
+  *err = CURLE_OK;
   DEBUGASSERT(stream);
-  while(node) {
-    if(node->stream3_id == stream->stream3_id) {
-      erecvd = h3_process_event(cf, data, buf, len,
-                                node->stream3_id, node->ev, err);
-      quiche_h3_event_free(node->ev);
-      *pnext = node->next;
-      free(node);
-      node = *pnext;
-      if(erecvd < 0) {
-        recvd = erecvd;
-        break;
-      }
-      recvd += erecvd;
-      if(erecvd > INT_MAX || (size_t)erecvd >= len)
-        break;
-      buf += erecvd;
-      len -= erecvd;
-    }
-    else {
-      pnext = &node->next;
-      node = node->next;
+  while(node && len) {
+    erecvd = h3_process_event(cf, data, buf, len,
+                              stream->stream3_id, node->ev, err);
+    quiche_h3_event_free(node->ev);
+    *pnext = node->next;
+    free(node);
+    node = *pnext;
+    if(erecvd < 0) {
+      DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] process event -> %d",
+                    stream->stream3_id, *err));
+      return erecvd;
     }
+    recvd += erecvd;
+    *err = CURLE_OK;
+    buf += erecvd;
+    len -= erecvd;
   }
   return recvd;
 }
@@ -409,12 +442,14 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
                                    struct Curl_easy *data)
 {
   struct cf_quiche_ctx *ctx = cf->ctx;
-  ssize_t recvd;
-  uint8_t *buf = (uint8_t *)data->state.buffer;
-  size_t bufsize = data->set.buffer_size;
-  struct sockaddr_storage from;
-  socklen_t from_len;
+  int64_t stream3_id = data->req.p.http? data->req.p.http->stream3_id : -1;
+  uint8_t buf[65536];
+  size_t bufsize = sizeof(buf);
+  struct sockaddr_storage remote_addr;
+  socklen_t remote_addrlen;
   quiche_recv_info recv_info;
+  ssize_t recvd, nread;
+  ssize_t total = 0, pkts = 0;
 
   DEBUGASSERT(ctx->qconn);
 
@@ -422,14 +457,16 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
   quiche_conn_on_timeout(ctx->qconn);
 
   do {
-    from_len = sizeof(from);
-
-    recvd = recvfrom(ctx->sockfd, buf, bufsize, 0,
-                     (struct sockaddr *)&from, &from_len);
-
+    remote_addrlen = sizeof(remote_addr);
+    while((recvd = recvfrom(ctx->q.sockfd, (char *)buf, bufsize, 0,
+                            (struct sockaddr *)&remote_addr,
+                            &remote_addrlen)) == -1 &&
+          SOCKERRNO == EINTR)
+      ;
     if(recvd < 0) {
-      if((SOCKERRNO == EAGAIN) || (SOCKERRNO == EWOULDBLOCK))
-        goto out;
+      if((SOCKERRNO == EAGAIN) || (SOCKERRNO == EWOULDBLOCK)) {
+        break;
+      }
       if(SOCKERRNO == ECONNREFUSED) {
         const char *r_ip;
         int r_port;
@@ -440,42 +477,50 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
         return CURLE_COULDNT_CONNECT;
       }
       failf(data, "quiche: recvfrom() unexpectedly returned %zd "
-            "(errno: %d, socket %d)", recvd, SOCKERRNO, ctx->sockfd);
+            "(errno: %d, socket %d)", recvd, SOCKERRNO, ctx->q.sockfd);
       return CURLE_RECV_ERROR;
     }
 
-    DEBUGF(LOG_CF(data, cf, "ingress, recvd %zd bytes", recvd));
-    recv_info.from = (struct sockaddr *) &from;
-    recv_info.from_len = from_len;
-    recv_info.to = (struct sockaddr *) &ctx->local_addr;
-    recv_info.to_len = ctx->local_addrlen;
-
-    recvd = quiche_conn_recv(ctx->qconn, buf, recvd, &recv_info);
-    if(recvd == QUICHE_ERR_DONE)
-      goto out;
-
-    if(recvd < 0) {
-      if(QUICHE_ERR_TLS_FAIL == recvd) {
+    total += recvd;
+    ++pkts;
+    if(recvd > 0 && !ctx->got_first_byte) {
+      ctx->first_byte_at = Curl_now();
+      ctx->got_first_byte = TRUE;
+    }
+    recv_info.from = (struct sockaddr *) &remote_addr;
+    recv_info.from_len = remote_addrlen;
+    recv_info.to = (struct sockaddr *) &ctx->q.local_addr;
+    recv_info.to_len = ctx->q.local_addrlen;
+
+    nread = quiche_conn_recv(ctx->qconn, buf, recvd, &recv_info);
+    if(nread < 0) {
+      if(QUICHE_ERR_DONE == nread) {
+        DEBUGF(LOG_CF(data, cf, "ingress, quiche is DONE"));
+        return CURLE_OK;
+      }
+      else if(QUICHE_ERR_TLS_FAIL == nread) {
         long verify_ok = SSL_get_verify_result(ctx->ssl);
         if(verify_ok != X509_V_OK) {
           failf(data, "SSL certificate problem: %s",
                 X509_verify_cert_error_string(verify_ok));
-
           return CURLE_PEER_FAILED_VERIFICATION;
         }
       }
-
-      failf(data, "quiche_conn_recv() == %zd", recvd);
-
-      return CURLE_RECV_ERROR;
+      else {
+        failf(data, "quiche_conn_recv() == %zd", nread);
+        return CURLE_RECV_ERROR;
+      }
     }
-    if(ctx->first_reply_ms < 0) {
-      timediff_t ms = Curl_timediff(Curl_now(), ctx->connect_started);
-      ctx->first_reply_ms = (ms < INT_MAX)? (int)ms : INT_MAX;
+    else if(nread < recvd) {
+      DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] ingress, quiche only "
+                    "accepted %zd/%zd bytes",
+                    stream3_id, nread, recvd));
     }
-  } while(1);
 
-out:
+  } while(pkts < 1000); /* arbitrary */
+
+  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] ingress, recvd %zd bytes "
+                "in %zd packets", stream3_id, total, pkts));
   return CURLE_OK;
 }
 
@@ -487,129 +532,236 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
                                 struct Curl_easy *data)
 {
   struct cf_quiche_ctx *ctx = cf->ctx;
-  ssize_t sent;
-  uint8_t out[1200];
-  int64_t timeout_ns;
+  int64_t stream3_id = data->req.p.http? data->req.p.http->stream3_id : -1;
   quiche_send_info send_info;
+  ssize_t outlen, total_len = 0;
+  size_t max_udp_payload_size =
+    quiche_conn_max_send_udp_payload_size(ctx->qconn);
+  size_t gsolen = max_udp_payload_size;
+  size_t sent, pktcnt = 0;
+  CURLcode result;
+  int64_t timeout_ns;
 
-  do {
-    sent = quiche_conn_send(ctx->qconn, out, sizeof(out), &send_info);
-    if(sent == QUICHE_ERR_DONE)
-      break;
+  ctx->q.no_gso = TRUE;
+  if(ctx->q.num_blocked_pkt) {
+    result = vquic_send_blocked_pkt(cf, data, &ctx->q);
+    if(result) {
+      if(result == CURLE_AGAIN) {
+        DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] egress, still not "
+                      "able to send blocked packet", stream3_id));
+        Curl_expire(data, 1, EXPIRE_QUIC);
+        return CURLE_OK;
+      }
+      goto out;
+    }
+  }
 
-    if(sent < 0) {
-      failf(data, "quiche_conn_send returned %zd", sent);
-      return CURLE_SEND_ERROR;
+  for(;;) {
+    outlen = quiche_conn_send(ctx->qconn, ctx->q.pktbuf, max_udp_payload_size,
+                              &send_info);
+    if(outlen == QUICHE_ERR_DONE) {
+      result = CURLE_OK;
+      goto out;
     }
 
-    DEBUGF(LOG_CF(data, cf, "egress, send %zu bytes", sent));
-    sent = send(ctx->sockfd, out, sent, 0);
-    if(sent < 0) {
-      failf(data, "send() returned %zd", sent);
-      return CURLE_SEND_ERROR;
+    if(outlen < 0) {
+      failf(data, "quiche_conn_send returned %zd", outlen);
+      result = CURLE_SEND_ERROR;
+      goto out;
     }
-  } while(1);
 
-  /* time until the next timeout event, as nanoseconds. */
-  timeout_ns = quiche_conn_timeout_as_nanos(ctx->qconn);
-  if(timeout_ns)
-    /* expire uses milliseconds */
-    Curl_expire(data, (timeout_ns + 999999) / 1000000, EXPIRE_QUIC);
+    /* send the pktbuf *before* the last addition */
+    result = vquic_send_packet(cf, data, &ctx->q, ctx->q.pktbuf,
+                               outlen, gsolen, &sent);
+    ++pktcnt;
+    total_len += outlen;
+    if(result) {
+      if(result == CURLE_AGAIN) {
+        /* blocked, add the pktbuf *before* and *at* the last addition
+         * separately to the blocked packages */
+        DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] egress, pushing blocked "
+                      "packet with %zd bytes", stream3_id, outlen));
+        vquic_push_blocked_pkt(cf, &ctx->q, ctx->q.pktbuf, outlen, gsolen);
+        Curl_expire(data, 1, EXPIRE_QUIC);
+        return CURLE_OK;
+      }
+      goto out;
+    }
+  }
 
-  return CURLE_OK;
+out:
+  timeout_ns = quiche_conn_timeout_as_nanos(ctx->qconn);
+  if(timeout_ns % 1000000)
+    timeout_ns += 1000000;
+    /* expire resolution is milliseconds */
+  Curl_expire(data, (timeout_ns / 1000000), EXPIRE_QUIC);
+  if(pktcnt)
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] egress, sent %zd packets "
+                  "with %zd bytes", stream3_id, pktcnt, total_len));
+  return result;
 }
 
-static ssize_t cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
-                              char *buf, size_t len, CURLcode *err)
+static ssize_t recv_closed_stream(struct Curl_cfilter *cf,
+                                  struct Curl_easy *data,
+                                  CURLcode *err)
 {
-  struct cf_quiche_ctx *ctx = cf->ctx;
-  ssize_t recvd = -1;
-  ssize_t rcode;
-  quiche_h3_event *ev;
   struct HTTP *stream = data->req.p.http;
+  ssize_t nread = -1;
 
-  *err = CURLE_AGAIN;
-  /* process any pending events for `data` first. if there are,
-   * return so the transfer can handle those. We do not want to
-   * progress ingress while events are pending here. */
-  recvd = h3_process_pending(cf, data, buf, len, err);
-  if(recvd < 0) {
-    goto out;
-  }
-  else if(recvd > 0) {
-    *err = CURLE_OK;
+  if(stream->reset) {
+    failf(data,
+          "HTTP/3 stream %" PRId64 " reset by server", stream->stream3_id);
+    *err = stream->h3_got_header? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR;
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, was reset -> %d",
+                  stream->stream3_id, *err));
     goto out;
   }
-  recvd = -1;
 
-  if(cf_process_ingress(cf, data)) {
-    DEBUGF(LOG_CF(data, cf, "h3_stream_recv returns on ingress"));
+  if(!stream->h3_got_header) {
+    failf(data,
+          "HTTP/3 stream %" PRId64 " was closed cleanly, but before getting"
+          " all response header fields, treated as error",
+          stream->stream3_id);
+    /* *err = CURLE_PARTIAL_FILE; */
     *err = CURLE_RECV_ERROR;
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, closed incomplete"
+                  " -> %d", stream->stream3_id, *err));
     goto out;
   }
-
-  if(stream->h3_recving_data) {
-    /* body receiving state */
-    rcode = quiche_h3_recv_body(ctx->h3c, ctx->qconn, stream->stream3_id,
-                                (unsigned char *)buf, len);
-    if(rcode <= 0) {
-      stream->h3_recving_data = FALSE;
-      /* fall through into the while loop below */
-    }
-    else {
-      *err = CURLE_OK;
-      recvd = rcode;
-      goto out;
-    }
+  else {
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, closed ok"
+                  " -> %d", stream->stream3_id, *err));
   }
+  *err = CURLE_OK;
+  nread = 0;
 
-  while(recvd < 0) {
+out:
+  return nread;
+}
+
+static CURLcode cf_poll_events(struct Curl_cfilter *cf,
+                               struct Curl_easy *data)
+{
+  struct cf_quiche_ctx *ctx = cf->ctx;
+  struct HTTP *stream = data->req.p.http;
+  quiche_h3_event *ev;
+
+  /* Take in the events and distribute them to the transfers. */
+  while(1) {
     int64_t stream3_id = quiche_h3_conn_poll(ctx->h3c, ctx->qconn, &ev);
-    if(stream3_id < 0)
+    if(stream3_id < 0) {
       /* nothing more to do */
       break;
+    }
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] recv, queue event %s "
+                  "for [h3sid=%"PRId64"]",
+                  stream? stream->stream3_id : -1, cf_ev_name(ev),
+                  stream3_id));
+    if(h3_add_event(cf, data, stream3_id, ev) != CURLE_OK) {
+      return CURLE_OUT_OF_MEMORY;
+    }
+  }
+  return CURLE_OK;
+}
+
+static ssize_t cf_recv_transfer_data(struct Curl_cfilter *cf,
+                                     struct Curl_easy *data,
+                                      char *buf, size_t len,
+                                      CURLcode *err)
+{
+  struct HTTP *stream = data->req.p.http;
+  ssize_t recvd = -1;
+  size_t offset = 0;
 
-    if(stream3_id == stream->stream3_id) {
-      recvd = h3_process_event(cf, data, buf, len, stream3_id, ev, err);
-      quiche_h3_event_free(ev);
+  if(stream->h3_recving_data) {
+    /* try receiving body first */
+    recvd = cf_recv_body(cf, data, buf, len, err);
+    if(recvd < 0) {
+      if(*err != CURLE_AGAIN)
+        return -1;
+      recvd = 0;
     }
-    else {
-      size_t qlen;
-      /* event for another transfer, preserver for later */
-      DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, queue event "
-                    "for h3[%"PRId64"]", stream->stream3_id, stream3_id));
-      if(h3_add_event(cf, data, stream3_id, ev, &qlen) != CURLE_OK) {
-        *err = CURLE_OUT_OF_MEMORY;
-        goto out;
-      }
-      if(qlen > 20) {
-        Curl_expire(data, 0, EXPIRE_QUIC);
-        break;
-      }
+    if(recvd > 0) {
+      offset = recvd;
     }
   }
 
-  if(cf_flush_egress(cf, data)) {
-    DEBUGF(LOG_CF(data, cf, "recv(), flush egress failed"));
-    *err = CURLE_SEND_ERROR;
-    recvd = -1;
-    goto out;
+  if(offset < len && stream->pending) {
+    /* process any pending events for `data` first. if there are,
+     * return so the transfer can handle those. We do not want to
+     * progress ingress while events are pending here. */
+    recvd = h3_process_pending(cf, data, buf + offset, len - offset, err);
+    if(recvd < 0) {
+      if(*err != CURLE_AGAIN)
+        return -1;
+      recvd = 0;
+    }
+    if(recvd > 0) {
+      offset += recvd;
+    }
   }
 
-  if(recvd >= 0) {
-    /* Get this called again to drain the event queue */
-    Curl_expire(data, 0, EXPIRE_QUIC);
+  if(offset) {
     *err = CURLE_OK;
+    return offset;
   }
-  else if(stream->closed) {
-    *err = CURLE_OK;
+  *err = CURLE_AGAIN;
+  return 0;
+}
+
+static ssize_t cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
+                              char *buf, size_t len, CURLcode *err)
+{
+  struct HTTP *stream = data->req.p.http;
+  ssize_t recvd = -1;
+
+  *err = CURLE_AGAIN;
+
+  recvd = cf_recv_transfer_data(cf, data, buf, len, err);
+  if(recvd)
+    goto out;
+  if(stream->closed) {
+    recvd = recv_closed_stream(cf, data, err);
+    goto out;
+  }
+
+  /* we did get nothing from the quiche buffers or pending events.
+   * Take in more data from the connection, any error is fatal */
+  if(cf_process_ingress(cf, data)) {
+    DEBUGF(LOG_CF(data, cf, "h3_stream_recv returns on ingress"));
+    *err = CURLE_RECV_ERROR;
     recvd = -1;
+    goto out;
+  }
+  /* poll quiche and distribute the events to the transfers */
+  *err = cf_poll_events(cf, data);
+  if(*err) {
+    recvd = -1;
+    goto out;
   }
 
+  /* try to receive again for this transfer */
+  recvd = cf_recv_transfer_data(cf, data, buf, len, err);
+  if(recvd)
+    goto out;
+  if(stream->closed) {
+    recvd = recv_closed_stream(cf, data, err);
+    goto out;
+  }
+  recvd = -1;
+  *err = CURLE_AGAIN;
+  data->state.drain = 0;
+
 out:
-  data->state.drain = (recvd >= 0) ? 1 : 0;
-  DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv -> %ld, err=%d",
-                stream->stream3_id, (long)recvd, *err));
+  if(cf_flush_egress(cf, data)) {
+    DEBUGF(LOG_CF(data, cf, "cf_recv, flush egress failed"));
+    *err = CURLE_SEND_ERROR;
+    return -1;
+  }
+  DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] cf_recv -> %zd, err=%d",
+                stream->stream3_id, recvd, *err));
+  if(recvd > 0)
+    notify_drain(cf, data);
   return recvd;
 }
 
@@ -630,8 +782,9 @@ static CURLcode cf_http_request(struct Curl_cfilter *cf,
   CURLcode result = CURLE_OK;
   struct h2h3req *hreq = NULL;
 
-  DEBUGF(LOG_CF(data, cf, "cf_http_request %s", data->state.url));
   stream->h3req = TRUE; /* send off! */
+  stream->closed = FALSE;
+  stream->reset = FALSE;
 
   result = Curl_pseudo_headers(data, mem, len, NULL, &hreq);
   if(result)
@@ -664,30 +817,41 @@ static CURLcode cf_http_request(struct Curl_cfilter *cf,
       /* data sending without specifying the data amount up front */
       stream->upload_left = -1; /* unknown, but not zero */
 
+    stream->upload_done = !stream->upload_left;
     stream3_id = quiche_h3_send_request(ctx->h3c, ctx->qconn, nva, nheader,
-                                        stream->upload_left ? FALSE: TRUE);
-    DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] send request %s, upload=%zu",
-                  stream3_id, data->state.url, stream->upload_left));
+                                        stream->upload_done);
     break;
   default:
+    stream->upload_left = 0;
+    stream->upload_done = TRUE;
     stream3_id = quiche_h3_send_request(ctx->h3c, ctx->qconn, nva, nheader,
                                         TRUE);
-    DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] send request %s",
-                  stream3_id, data->state.url));
     break;
   }
 
   Curl_safefree(nva);
 
   if(stream3_id < 0) {
-    DEBUGF(LOG_CF(data, cf, "quiche_h3_send_request returned %ld",
-                  (long)stream3_id));
+    if(QUICHE_H3_ERR_STREAM_BLOCKED == stream3_id) {
+      DEBUGF(LOG_CF(data, cf, "send_request(%s, body_len=%ld) rejected "
+                    "with H3_ERR_STREAM_BLOCKED",
+                    data->state.url, (long)stream->upload_left));
+      result = CURLE_AGAIN;
+      goto fail;
+    }
+    else {
+      DEBUGF(LOG_CF(data, cf, "send_request(%s, body_len=%ld) -> %" PRId64,
+                    data->state.url, (long)stream->upload_left, stream3_id));
+    }
     result = CURLE_SEND_ERROR;
     goto fail;
   }
 
-  DEBUGF(LOG_CF(data, cf, "Using HTTP/3 Stream ID: %"PRId64, stream3_id));
   stream->stream3_id = stream3_id;
+  infof(data, "Using HTTP/3 Stream ID: %" PRId64 " (easy handle %p)",
+        stream3_id, (void *)data);
+  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] opened for %s",
+                stream3_id, data->state.url));
 
   Curl_pseudo_free(hreq);
   return CURLE_OK;
@@ -703,32 +867,44 @@ static ssize_t cf_quiche_send(struct Curl_cfilter *cf, struct Curl_easy *data,
 {
   struct cf_quiche_ctx *ctx = cf->ctx;
   struct HTTP *stream = data->req.p.http;
-  ssize_t sent;
+  ssize_t nwritten;
+
+  DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_send(len=%zu) start",
+                stream->h3req? stream->stream3_id : -1, len));
+  *err = cf_process_ingress(cf, data);
+  if(*err)
+    return -1;
 
-  DEBUGF(LOG_CF(data, cf, "cf_quiche_send(len=%zu) %s", len, data->state.url));
   if(!stream->h3req) {
     CURLcode result = cf_http_request(cf, data, buf, len);
     if(result) {
-      *err = CURLE_SEND_ERROR;
+      *err = result;
       return -1;
     }
-    sent = len;
+    nwritten = len;
   }
   else {
-    sent = quiche_h3_send_body(ctx->h3c, ctx->qconn, stream->stream3_id,
-                               (uint8_t *)buf, len, FALSE);
-    if(sent == QUICHE_H3_ERR_DONE) {
-      sent = 0;
+    nwritten = quiche_h3_send_body(ctx->h3c, ctx->qconn, stream->stream3_id,
+                                   (uint8_t *)buf, len, FALSE);
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] send body(len=%zu) -> %zd",
+                  stream->stream3_id, len, nwritten));
+    if(nwritten == QUICHE_H3_ERR_DONE) {
+      /* no error, nothing to do (flow control?) */
+      *err = CURLE_AGAIN;
+      nwritten = -1;
     }
-    else if(sent == QUICHE_H3_TRANSPORT_ERR_FINAL_SIZE) {
+    else if(nwritten == QUICHE_H3_TRANSPORT_ERR_FINAL_SIZE) {
       DEBUGF(LOG_CF(data, cf, "send_body(len=%zu) -> exceeds size", len));
       *err = CURLE_SEND_ERROR;
-      return -1;
+      nwritten = -1;
     }
-    else if(sent < 0) {
-      DEBUGF(LOG_CF(data, cf, "send_body(len=%zu) -> %zd", len, sent));
+    else if(nwritten < 0) {
+      DEBUGF(LOG_CF(data, cf, "send_body(len=%zu) -> SEND_ERROR", len));
       *err = CURLE_SEND_ERROR;
-      return -1;
+      nwritten = -1;
+    }
+    else {
+      *err = CURLE_OK;
     }
   }
 
@@ -737,8 +913,26 @@ static ssize_t cf_quiche_send(struct Curl_cfilter *cf, struct Curl_easy *data,
     return -1;
   }
 
-  *err = CURLE_OK;
-  return sent;
+  return nwritten;
+}
+
+static bool stream_is_writeable(struct Curl_cfilter *cf,
+                                struct Curl_easy *data)
+{
+  struct cf_quiche_ctx *ctx = cf->ctx;
+  struct HTTP *stream = data->req.p.http;
+
+  /* surely, there must be a better way */
+  quiche_stream_iter *qiter = quiche_conn_writable(ctx->qconn);
+  if(qiter) {
+    uint64_t stream_id;
+    while(quiche_stream_iter_next(qiter, &stream_id)) {
+      if(stream_id == (uint64_t)stream->stream3_id)
+        return TRUE;
+    }
+    quiche_stream_iter_free(qiter);
+  }
+  return FALSE;
 }
 
 static int cf_quiche_get_select_socks(struct Curl_cfilter *cf,
@@ -749,14 +943,15 @@ static int cf_quiche_get_select_socks(struct Curl_cfilter *cf,
   struct SingleRequest *k = &data->req;
   int rv = GETSOCK_BLANK;
 
-  socks[0] = ctx->sockfd;
+  socks[0] = ctx->q.sockfd;
 
   /* in an HTTP/3 connection we can basically always get a frame so we should
      always be ready for one */
   rv |= GETSOCK_READSOCK(0);
 
   /* we're still uploading or the HTTP/3 layer wants to send data */
-  if((k->keepon & (KEEP_SEND|KEEP_SEND_PAUSE)) == KEEP_SEND)
+  if(((k->keepon & (KEEP_SEND|KEEP_SEND_PAUSE)) == KEEP_SEND)
+     && stream_is_writeable(cf, data))
     rv |= GETSOCK_WRITESOCK(0);
 
   return rv;
@@ -769,16 +964,22 @@ static int cf_quiche_get_select_socks(struct Curl_cfilter *cf,
 static bool cf_quiche_data_pending(struct Curl_cfilter *cf,
                                    const struct Curl_easy *data)
 {
-  struct cf_quiche_ctx *ctx = cf->ctx;
   struct HTTP *stream = data->req.p.http;
-  struct h3_event_node *node;
 
-  for(node = ctx->pending; node; node = node->next) {
-    if(node->stream3_id == stream->stream3_id) {
-      DEBUGF(LOG_CF((struct Curl_easy *)data, cf,
-                     "h3[%"PRId64"] has data pending", stream->stream3_id));
-      return TRUE;
-    }
+  if(stream->pending) {
+    DEBUGF(LOG_CF((struct Curl_easy *)data, cf,
+                   "[h3sid=%"PRId64"] has event pending", stream->stream3_id));
+    return TRUE;
+  }
+  if(stream->h3_recving_data) {
+    DEBUGF(LOG_CF((struct Curl_easy *)data, cf,
+                   "[h3sid=%"PRId64"] is receiving DATA", stream->stream3_id));
+    return TRUE;
+  }
+  if(data->state.drain) {
+    DEBUGF(LOG_CF((struct Curl_easy *)data, cf,
+                   "[h3sid=%"PRId64"] is draining", stream->stream3_id));
+    return TRUE;
   }
   return FALSE;
 }
@@ -793,25 +994,34 @@ static CURLcode cf_quiche_data_event(struct Curl_cfilter *cf,
   (void)arg1;
   (void)arg2;
   switch(event) {
+  case CF_CTRL_DATA_DONE: {
+    struct HTTP *stream = data->req.p.http;
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] easy handle is %s",
+                  stream->stream3_id, arg1? "cancelled" : "done"));
+    h3_clear_pending(data);
+    break;
+  }
   case CF_CTRL_DATA_DONE_SEND: {
     struct HTTP *stream = data->req.p.http;
     ssize_t sent;
     stream->upload_done = TRUE;
     sent = quiche_h3_send_body(ctx->h3c, ctx->qconn, stream->stream3_id,
                                NULL, 0, TRUE);
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] send_body FINISHED",
+                  stream->stream3_id));
     if(sent < 0)
       return CURLE_SEND_ERROR;
     break;
   }
-  case CF_CTRL_DATA_DONE: {
-    struct HTTP *stream = data->req.p.http;
-    DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] easy handle is %s",
-                  stream->stream3_id, arg1? "cancelled" : "done"));
+  case CF_CTRL_DATA_IDLE:
+    /* anything to do? */
     break;
-  }
   case CF_CTRL_CONN_REPORT_STATS:
-    if(cf->sockindex == FIRSTSOCKET)
-      Curl_pgrsTimeWas(data, TIMER_APPCONNECT, ctx->handshake_done);
+    if(cf->sockindex == FIRSTSOCKET) {
+      if(ctx->got_first_byte)
+        Curl_pgrsTimeWas(data, TIMER_CONNECT, ctx->first_byte_at);
+      Curl_pgrsTimeWas(data, TIMER_APPCONNECT, ctx->handshake_at);
+    }
     break;
   default:
     break;
@@ -882,11 +1092,7 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf,
   CURLcode result;
   const struct Curl_sockaddr_ex *sockaddr;
 
-  result = Curl_cf_socket_peek(cf->next, data, &ctx->sockfd,
-                               &sockaddr, NULL, NULL, NULL, NULL);
-  if(result)
-    return result;
-  DEBUGASSERT(ctx->sockfd != CURL_SOCKET_BAD);
+  DEBUGASSERT(ctx->q.sockfd != CURL_SOCKET_BAD);
 
 #ifdef DEBUG_QUICHE
   /* initialize debug log callback only once */
@@ -897,6 +1103,10 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf,
   }
 #endif
 
+  result = vquic_ctx_init(&ctx->q, MAX_UDP_PAYLOAD_SIZE * MAX_PKT_BURST);
+  if(result)
+    return result;
+
   ctx->cfg = quiche_config_new(QUICHE_PROTOCOL_VERSION);
   if(!ctx->cfg) {
     failf(data, "can't create quiche config");
@@ -933,16 +1143,18 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf,
   if(result)
     return result;
 
-  ctx->local_addrlen = sizeof(ctx->local_addr);
-  rv = getsockname(ctx->sockfd, (struct sockaddr *)&ctx->local_addr,
-                   &ctx->local_addrlen);
+  Curl_cf_socket_peek(cf->next, data, &ctx->q.sockfd,
+                      &sockaddr, NULL, NULL, NULL, NULL);
+  ctx->q.local_addrlen = sizeof(ctx->q.local_addr);
+  rv = getsockname(ctx->q.sockfd, (struct sockaddr *)&ctx->q.local_addr,
+                   &ctx->q.local_addrlen);
   if(rv == -1)
     return CURLE_QUIC_CONNECT_ERROR;
 
   ctx->qconn = quiche_conn_new_with_tls((const uint8_t *)ctx->scid,
                                       sizeof(ctx->scid), NULL, 0,
-                                      (struct sockaddr *)&ctx->local_addr,
-                                      ctx->local_addrlen,
+                                      (struct sockaddr *)&ctx->q.local_addr,
+                                      ctx->q.local_addrlen,
                                       &sockaddr->sa_addr, sockaddr->addrlen,
                                       ctx->cfg, ctx->ssl, false);
   if(!ctx->qconn) {
@@ -1016,7 +1228,10 @@ static CURLcode cf_quiche_connect(struct Curl_cfilter *cf,
     result = cf_connect_start(cf, data);
     if(result)
       goto out;
-    ctx->connect_started = now;
+    ctx->started_at = now;
+    result = cf_flush_egress(cf, data);
+    /* we do not expect to be able to recv anything yet */
+    goto out;
   }
 
   result = cf_process_ingress(cf, data);
@@ -1029,8 +1244,8 @@ static CURLcode cf_quiche_connect(struct Curl_cfilter *cf,
 
   if(quiche_conn_is_established(ctx->qconn)) {
     DEBUGF(LOG_CF(data, cf, "handshake complete after %dms",
-           (int)Curl_timediff(now, ctx->connect_started)));
-    ctx->handshake_done = now;
+           (int)Curl_timediff(now, ctx->started_at)));
+    ctx->handshake_at = now;
     result = cf_verify_peer(cf, data);
     if(!result) {
       DEBUGF(LOG_CF(data, cf, "peer verified"));
@@ -1124,10 +1339,13 @@ static CURLcode cf_quiche_query(struct Curl_cfilter *cf,
     return CURLE_OK;
   }
   case CF_QUERY_CONNECT_REPLY_MS:
-    *pres1 = ctx->first_reply_ms;
-    DEBUGF(LOG_CF(data, cf, "query connect reply: %dms", *pres1));
+    if(ctx->got_first_byte) {
+      timediff_t ms = Curl_timediff(ctx->first_byte_at, ctx->started_at);
+      *pres1 = (ms < INT_MAX)? (int)ms : INT_MAX;
+    }
+    else
+      *pres1 = -1;
     return CURLE_OK;
-
   default:
     break;
   }
index 6cd42f70e009349cf0e9ae46299fd79cd291bbec..43872bf0d299447087dfb2693d62f065f8efa58a 100644 (file)
 #endif
 #include "urldata.h"
 #include "dynbuf.h"
+#include "cfilters.h"
 #include "curl_log.h"
 #include "curl_msh3.h"
 #include "curl_ngtcp2.h"
 #include "curl_quiche.h"
 #include "vquic.h"
+#include "vquic_int.h"
 
 /* The last 3 #include files should be in this order */
 #include "curl_printf.h"
@@ -60,6 +62,220 @@ void Curl_quic_ver(char *p, size_t len)
 #endif
 }
 
+CURLcode vquic_ctx_init(struct cf_quic_ctx *qctx, size_t pktbuflen)
+{
+  qctx->num_blocked_pkt = 0;
+  qctx->num_blocked_pkt_sent = 0;
+  memset(&qctx->blocked_pkt, 0, sizeof(qctx->blocked_pkt));
+
+  qctx->pktbuflen = pktbuflen;
+  qctx->pktbuf = malloc(qctx->pktbuflen);
+  if(!qctx->pktbuf)
+    return CURLE_OUT_OF_MEMORY;
+
+#if defined(__linux__) && defined(UDP_SEGMENT) && defined(HAVE_SENDMSG)
+  qctx->no_gso = FALSE;
+#else
+  qctx->no_gso = TRUE;
+#endif
+
+  return CURLE_OK;
+}
+
+void vquic_ctx_free(struct cf_quic_ctx *qctx)
+{
+  free(qctx->pktbuf);
+  qctx->pktbuf = NULL;
+}
+
+static CURLcode send_packet_no_gso(struct Curl_cfilter *cf,
+                                   struct Curl_easy *data,
+                                   struct cf_quic_ctx *qctx,
+                                   const uint8_t *pkt, size_t pktlen,
+                                   size_t gsolen, size_t *psent);
+
+static CURLcode do_sendmsg(struct Curl_cfilter *cf,
+                           struct Curl_easy *data,
+                           struct cf_quic_ctx *qctx,
+                           const uint8_t *pkt, size_t pktlen, size_t gsolen,
+                           size_t *psent)
+{
+#ifdef HAVE_SENDMSG
+  struct iovec msg_iov;
+  struct msghdr msg = {0};
+  ssize_t sent;
+#if defined(__linux__) && defined(UDP_SEGMENT)
+  uint8_t msg_ctrl[32];
+  struct cmsghdr *cm;
+#endif
+
+  *psent = 0;
+  msg_iov.iov_base = (uint8_t *)pkt;
+  msg_iov.iov_len = pktlen;
+  msg.msg_iov = &msg_iov;
+  msg.msg_iovlen = 1;
+
+#if defined(__linux__) && defined(UDP_SEGMENT)
+  if(pktlen > gsolen) {
+    /* Only set this, when we need it. macOS, for example,
+     * does not seem to like a msg_control of length 0. */
+    msg.msg_control = msg_ctrl;
+    assert(sizeof(msg_ctrl) >= CMSG_SPACE(sizeof(uint16_t)));
+    msg.msg_controllen = CMSG_SPACE(sizeof(uint16_t));
+    cm = CMSG_FIRSTHDR(&msg);
+    cm->cmsg_level = SOL_UDP;
+    cm->cmsg_type = UDP_SEGMENT;
+    cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
+    *(uint16_t *)(void *)CMSG_DATA(cm) = gsolen & 0xffff;
+  }
+#endif
+
+
+  while((sent = sendmsg(qctx->sockfd, &msg, 0)) == -1 && SOCKERRNO == EINTR)
+    ;
+
+  if(sent == -1) {
+    switch(SOCKERRNO) {
+    case EAGAIN:
+#if EAGAIN != EWOULDBLOCK
+    case EWOULDBLOCK:
+#endif
+      return CURLE_AGAIN;
+    case EMSGSIZE:
+      /* UDP datagram is too large; caused by PMTUD. Just let it be lost. */
+      break;
+    case EIO:
+      if(pktlen > gsolen) {
+        /* GSO failure */
+        failf(data, "sendmsg() returned %zd (errno %d); disable GSO", sent,
+              SOCKERRNO);
+        qctx->no_gso = TRUE;
+        return send_packet_no_gso(cf, data, qctx, pkt, pktlen, gsolen, psent);
+      }
+      /* FALLTHROUGH */
+    default:
+      failf(data, "sendmsg() returned %zd (errno %d)", sent, SOCKERRNO);
+      return CURLE_SEND_ERROR;
+    }
+  }
+  else {
+    assert(pktlen == (size_t)sent);
+  }
+#else
+  ssize_t sent;
+  (void)gsolen;
+
+  *psent = 0;
+
+  while((sent = send(qctx->sockfd, (const char *)pkt, pktlen, 0)) == -1 &&
+        SOCKERRNO == EINTR)
+    ;
+
+  if(sent == -1) {
+    if(SOCKERRNO == EAGAIN || SOCKERRNO == EWOULDBLOCK) {
+      return CURLE_AGAIN;
+    }
+    else {
+      failf(data, "send() returned %zd (errno %d)", sent, SOCKERRNO);
+      if(SOCKERRNO != EMSGSIZE) {
+        return CURLE_SEND_ERROR;
+      }
+      /* UDP datagram is too large; caused by PMTUD. Just let it be
+         lost. */
+    }
+  }
+#endif
+  (void)cf;
+  *psent = pktlen;
+
+  return CURLE_OK;
+}
+
+static CURLcode send_packet_no_gso(struct Curl_cfilter *cf,
+                                   struct Curl_easy *data,
+                                   struct cf_quic_ctx *qctx,
+                                   const uint8_t *pkt, size_t pktlen,
+                                   size_t gsolen, size_t *psent)
+{
+  const uint8_t *p, *end = pkt + pktlen;
+  size_t sent;
+
+  *psent = 0;
+
+  for(p = pkt; p < end; p += gsolen) {
+    size_t len = CURLMIN(gsolen, (size_t)(end - p));
+    CURLcode curlcode = do_sendmsg(cf, data, qctx, p, len, len, &sent);
+    if(curlcode != CURLE_OK) {
+      return curlcode;
+    }
+    *psent += sent;
+  }
+
+  return CURLE_OK;
+}
+
+CURLcode vquic_send_packet(struct Curl_cfilter *cf,
+                           struct Curl_easy *data,
+                           struct cf_quic_ctx *qctx,
+                           const uint8_t *pkt, size_t pktlen, size_t gsolen,
+                           size_t *psent)
+{
+  if(qctx->no_gso && pktlen > gsolen) {
+    return send_packet_no_gso(cf, data, qctx, pkt, pktlen, gsolen, psent);
+  }
+
+  return do_sendmsg(cf, data, qctx, pkt, pktlen, gsolen, psent);
+}
+
+
+
+void vquic_push_blocked_pkt(struct Curl_cfilter *cf,
+                            struct cf_quic_ctx *qctx,
+                            const uint8_t *pkt, size_t pktlen, size_t gsolen)
+{
+  struct vquic_blocked_pkt *blkpkt;
+
+  (void)cf;
+  assert(qctx->num_blocked_pkt <
+         sizeof(qctx->blocked_pkt) / sizeof(qctx->blocked_pkt[0]));
+
+  blkpkt = &qctx->blocked_pkt[qctx->num_blocked_pkt++];
+
+  blkpkt->pkt = pkt;
+  blkpkt->pktlen = pktlen;
+  blkpkt->gsolen = gsolen;
+}
+
+CURLcode vquic_send_blocked_pkt(struct Curl_cfilter *cf,
+                                struct Curl_easy *data,
+                                struct cf_quic_ctx *qctx)
+{
+  size_t sent;
+  CURLcode curlcode;
+  struct vquic_blocked_pkt *blkpkt;
+
+  (void)cf;
+  for(; qctx->num_blocked_pkt_sent < qctx->num_blocked_pkt;
+      ++qctx->num_blocked_pkt_sent) {
+    blkpkt = &qctx->blocked_pkt[qctx->num_blocked_pkt_sent];
+    curlcode = vquic_send_packet(cf, data, qctx, blkpkt->pkt,
+                                 blkpkt->pktlen, blkpkt->gsolen, &sent);
+
+    if(curlcode) {
+      if(curlcode == CURLE_AGAIN) {
+        blkpkt->pkt += sent;
+        blkpkt->pktlen -= sent;
+      }
+      return curlcode;
+    }
+  }
+
+  qctx->num_blocked_pkt = 0;
+  qctx->num_blocked_pkt_sent = 0;
+
+  return CURLE_OK;
+}
+
 /*
  * If the QLOGDIR environment variable is set, open and return a file
  * descriptor to write the log to.
index 9db4808d26cc467367a380924afb50623241d271..42aba39b061058bfccf7b44e92e58a4afe23026a 100644 (file)
 
 #ifdef ENABLE_QUIC
 
+struct vquic_blocked_pkt {
+  const uint8_t *pkt;
+  size_t pktlen;
+  size_t gsolen;
+};
+
+struct cf_quic_ctx {
+  curl_socket_t sockfd;
+  struct sockaddr_storage local_addr;
+  socklen_t local_addrlen;
+  struct vquic_blocked_pkt blocked_pkt[2];
+  uint8_t *pktbuf;
+  /* the number of entries in blocked_pkt */
+  size_t num_blocked_pkt;
+  size_t num_blocked_pkt_sent;
+  /* the packets blocked by sendmsg (EAGAIN or EWOULDBLOCK) */
+  size_t pktbuflen;
+  /* the number of processed entries in blocked_pkt */
+  bool no_gso;
+};
+
+CURLcode vquic_ctx_init(struct cf_quic_ctx *qctx, size_t pktbuflen);
+void vquic_ctx_free(struct cf_quic_ctx *qctx);
+
+CURLcode vquic_send_packet(struct Curl_cfilter *cf,
+                           struct Curl_easy *data,
+                           struct cf_quic_ctx *qctx,
+                           const uint8_t *pkt, size_t pktlen, size_t gsolen,
+                           size_t *psent);
+
+void vquic_push_blocked_pkt(struct Curl_cfilter *cf,
+                            struct cf_quic_ctx *qctx,
+                            const uint8_t *pkt, size_t pktlen, size_t gsolen);
+
+CURLcode vquic_send_blocked_pkt(struct Curl_cfilter *cf,
+                                struct Curl_easy *data,
+                                struct cf_quic_ctx *qctx);
+
+
 #endif /* !ENABLE_QUIC */
 
 #endif /* HEADER_CURL_VQUIC_QUIC_INT_H */
index b4069376e767f98fd511262a49ca198a62c9d99c..3b1d28de846658ddaa88116c2c72f828e670a1ef 100644 (file)
@@ -40,4 +40,5 @@ nghttpx = @HTTPD_NGHTTPX@
 
 [caddy]
 caddy = @CADDY@
-port = 5004
+http_port = 5003
+https_port = 5004
index 023336f7b1ec9b53d671c59ff8b8fc5ba836ee97..7d64528c246b415a691b88419f51640a72ebaba2 100644 (file)
@@ -213,31 +213,33 @@ class ScoreCard:
         self.info(f'\n')
         return props
 
-    def downloads(self, proto: str) -> Dict[str, Any]:
+    def downloads(self, proto: str, test_httpd: bool = True,
+                  test_caddy: bool = True) -> Dict[str, Any]:
         scores = {}
-        if proto == 'h3':
-            port = self.env.h3_port
-            via = 'nghttpx'
-            descr = f'port {port}, proxying httpd'
-        else:
-            port = self.env.https_port
-            via = 'httpd'
-            descr = f'port {port}'
-        self.info('httpd downloads\n')
-        self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='score1.data', fsize=1024*1024)
-        url1 = f'https://{self.env.domain1}:{port}/score1.data'
-        self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='score10.data', fsize=10*1024*1024)
-        url10 = f'https://{self.env.domain1}:{port}/score10.data'
-        self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='score100.data', fsize=100*1024*1024)
-        url100 = f'https://{self.env.domain1}:{port}/score100.data'
-        scores[via] = {
-            'description': descr,
-            '1MB-local': self.download_url(url=url1, proto=proto, count=50),
-            '10MB-local': self.download_url(url=url10, proto=proto, count=50),
-            '100MB-local': self.download_url(url=url100, proto=proto, count=50),
-        }
-        if self.caddy:
-            port = self.env.caddy_port
+        if test_httpd:
+            if proto == 'h3':
+                port = self.env.h3_port
+                via = 'nghttpx'
+                descr = f'port {port}, proxying httpd'
+            else:
+                port = self.env.https_port
+                via = 'httpd'
+                descr = f'port {port}'
+            self.info(f'{via} downloads\n')
+            self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='score1.data', fsize=1024*1024)
+            url1 = f'https://{self.env.domain1}:{port}/score1.data'
+            self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='score10.data', fsize=10*1024*1024)
+            url10 = f'https://{self.env.domain1}:{port}/score10.data'
+            self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='score100.data', fsize=100*1024*1024)
+            url100 = f'https://{self.env.domain1}:{port}/score100.data'
+            scores[via] = {
+                'description': descr,
+                '1MB-local': self.download_url(url=url1, proto=proto, count=50),
+                '10MB-local': self.download_url(url=url10, proto=proto, count=50),
+                '100MB-local': self.download_url(url=url100, proto=proto, count=50),
+            }
+        if test_caddy and self.caddy:
+            port = self.caddy.port
             via = 'caddy'
             descr = f'port {port}'
             self.info('caddy downloads\n')
@@ -255,7 +257,11 @@ class ScoreCard:
             }
         return scores
 
-    def score_proto(self, proto: str, handshakes: bool = True, downloads: bool = True):
+    def score_proto(self, proto: str,
+                    handshakes: bool = True,
+                    downloads: bool = True,
+                    test_httpd: bool = True,
+                    test_caddy: bool = True):
         self.info(f"scoring {proto}\n")
         p = {}
         if proto == 'h3':
@@ -289,7 +295,9 @@ class ScoreCard:
         if handshakes:
             score['handshakes'] = self.handshakes(proto=proto)
         if downloads:
-            score['downloads'] = self.downloads(proto=proto)
+            score['downloads'] = self.downloads(proto=proto,
+                                                test_httpd=test_httpd,
+                                                test_caddy=test_caddy)
         self.info("\n")
         return score
 
@@ -335,6 +343,10 @@ class ScoreCard:
                             help="print text instead of json")
         parser.add_argument("-d", "--downloads", action='store_true', default=False,
                             help="evaluate downloads only")
+        parser.add_argument("--httpd", action='store_true', default=False,
+                            help="evaluate httpd server only")
+        parser.add_argument("--caddy", action='store_true', default=False,
+                            help="evaluate caddy server only")
         parser.add_argument("protocols", nargs='*', help="Name(s) of protocol to score")
         args = parser.parse_args()
 
@@ -348,8 +360,16 @@ class ScoreCard:
         protocols = args.protocols if len(args.protocols) else ['h2', 'h3']
         handshakes = True
         downloads = True
+        test_httpd = True
+        test_caddy = True
         if args.downloads:
             handshakes = False
+        if args.caddy:
+            test_caddy = True
+            test_httpd = False
+        if args.httpd:
+            test_caddy = False
+            test_httpd = True
 
         rv = 0
         self.env = Env()
@@ -372,7 +392,10 @@ class ScoreCard:
                 assert self.caddy.start()
 
             for p in protocols:
-                score = self.score_proto(proto=p, handshakes=handshakes, downloads=downloads)
+                score = self.score_proto(proto=p, handshakes=handshakes,
+                                         downloads=downloads,
+                                         test_caddy=test_caddy,
+                                         test_httpd=test_httpd)
                 if args.text:
                     self.print_score(score)
                 else:
index 615de719ea750501d432b932a6bcfd4cdc70ecf5..5d9f2d9496d4891a2b9fe87b32884b55244be11e 100644 (file)
@@ -42,13 +42,22 @@ class TestDownload:
     def _class_scope(self, env, httpd, nghttpx):
         if env.have_h3():
             nghttpx.start_if_needed()
-        fpath = os.path.join(httpd.docs_dir, 'data-1mb.data')
+
+    def _make_docs_file(self, docs_dir: str, fname: str, fsize: int):
+        fpath = os.path.join(docs_dir, fname)
         data1k = 1024*'x'
+        flen = 0
         with open(fpath, 'w') as fd:
-            fsize = 0
-            while fsize < 1024*1024:
+            while flen < fsize:
                 fd.write(data1k)
-                fsize += len(data1k)
+                flen += len(data1k)
+        return flen
+
+    @pytest.fixture(autouse=True, scope='class')
+    def _class_scope(self, env, httpd):
+        self._make_docs_file(docs_dir=httpd.docs_dir, fname='data1.data', fsize=1024*1024)
+        self._make_docs_file(docs_dir=httpd.docs_dir, fname='data10.data', fsize=10*1024*1024)
+        self._make_docs_file(docs_dir=httpd.docs_dir, fname='data100.data', fsize=100*1024*1024)
 
     # download 1 file
     @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
@@ -163,8 +172,8 @@ class TestDownload:
     @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
     def test_02_08_1MB_serial(self, env: Env,
                               httpd, nghttpx, repeat, proto):
-        count = 2
-        urln = f'https://{env.authority_for(env.domain1, proto)}/data-1mb.data?[0-{count-1}]'
+        count = 20
+        urln = f'https://{env.authority_for(env.domain1, proto)}/data1.data?[0-{count-1}]'
         curl = CurlClient(env=env)
         r = curl.http_download(urls=[urln], alpn_proto=proto)
         assert r.exit_code == 0
@@ -173,8 +182,30 @@ class TestDownload:
     @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
     def test_02_09_1MB_parallel(self, env: Env,
                               httpd, nghttpx, repeat, proto):
-        count = 2
-        urln = f'https://{env.authority_for(env.domain1, proto)}/data-1mb.data?[0-{count-1}]'
+        count = 20
+        urln = f'https://{env.authority_for(env.domain1, proto)}/data1.data?[0-{count-1}]'
+        curl = CurlClient(env=env)
+        r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[
+            '--parallel'
+        ])
+        assert r.exit_code == 0
+        r.check_stats(count=count, exp_status=200)
+
+    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+    def test_02_10_10MB_serial(self, env: Env,
+                              httpd, nghttpx, repeat, proto):
+        count = 20
+        urln = f'https://{env.authority_for(env.domain1, proto)}/data10.data?[0-{count-1}]'
+        curl = CurlClient(env=env)
+        r = curl.http_download(urls=[urln], alpn_proto=proto)
+        assert r.exit_code == 0
+        r.check_stats(count=count, exp_status=200)
+
+    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+    def test_02_11_10MB_parallel(self, env: Env,
+                              httpd, nghttpx, repeat, proto):
+        count = 20
+        urln = f'https://{env.authority_for(env.domain1, proto)}/data10.data?[0-{count-1}]'
         curl = CurlClient(env=env)
         r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[
             '--parallel'
index bb6e9217f12b163833e999a4a7fda91793ed8930..a95e25536e7d47de818877eb908eed631e874a62 100644 (file)
@@ -73,6 +73,8 @@ class TestErrors:
                               proto):
         if proto == 'h3' and not env.have_h3():
             pytest.skip("h3 not supported")
+        if proto == 'h3' and env.curl_uses_lib('quiche'):
+            pytest.skip("quiche not reliable, sometimes reports success")
         count = 5
         curl = CurlClient(env=env)
         urln = f'https://{env.authority_for(env.domain1, proto)}' \
index aec403cc21f1f494d4d48c69d5b5dec45e5b362a..45a6b659db24702381cdad9aa9693b089c8a8160 100644 (file)
@@ -90,9 +90,26 @@ class TestUpload:
             respdata = open(curl.response_file(i)).readlines()
             assert respdata == [data]
 
+    # upload data parallel, check that they were echoed
+    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+    def test_07_11_upload_parallel(self, env: Env, httpd, nghttpx, repeat, proto):
+        if proto == 'h3' and not env.have_h3():
+            pytest.skip("h3 not supported")
+        count = 50
+        data = '0123456789'
+        curl = CurlClient(env=env)
+        url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-{count-1}]'
+        r = curl.http_upload(urls=[url], data=data, alpn_proto=proto,
+                             extra_args=['--parallel'])
+        assert r.exit_code == 0, f'{r}'
+        r.check_stats(count=count, exp_status=200)
+        for i in range(count):
+            respdata = open(curl.response_file(i)).readlines()
+            assert respdata == [data]
+
     # upload large data sequentially, check that this is what was echoed
     @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
-    def test_07_11_upload_seq_large(self, env: Env, httpd, nghttpx, repeat, proto):
+    def test_07_20_upload_seq_large(self, env: Env, httpd, nghttpx, repeat, proto):
         if proto == 'h3' and not env.have_h3():
             pytest.skip("h3 not supported")
         fdata = os.path.join(env.gen_dir, 'data-100k')
@@ -149,9 +166,9 @@ class TestUpload:
         if proto == 'h3' and not env.have_h3():
             pytest.skip("h3 not supported")
         if proto == 'h3' and env.curl_uses_lib('quiche'):
-            pytest.skip("quiche stalls on parallel, large uploads")
+            pytest.skip("quiche stalls on parallel, large uploads, unless --trace is used???")
         fdata = os.path.join(env.gen_dir, 'data-100k')
-        count = 3
+        count = 50
         curl = CurlClient(env=env)
         url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-{count-1}]'
         r = curl.http_upload(urls=[url], data=f'@{fdata}', alpn_proto=proto,
diff --git a/tests/tests-httpd/test_08_caddy.py b/tests/tests-httpd/test_08_caddy.py
new file mode 100644 (file)
index 0000000..67a5f77
--- /dev/null
@@ -0,0 +1,147 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#***************************************************************************
+#                                  _   _ ____  _
+#  Project                     ___| | | |  _ \| |
+#                             / __| | | | |_) | |
+#                            | (__| |_| |  _ <| |___
+#                             \___|\___/|_| \_\_____|
+#
+# Copyright (C) 2008 - 2022, Daniel Stenberg, <daniel@haxx.se>, et al.
+#
+# This software is licensed as described in the file COPYING, which
+# you should have received as part of this distribution. The terms
+# are also available at https://curl.se/docs/copyright.html.
+#
+# You may opt to use, copy, modify, merge, publish, distribute and/or sell
+# copies of the Software, and permit persons to whom the Software is
+# furnished to do so, under the terms of the COPYING file.
+#
+# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
+# KIND, either express or implied.
+#
+# SPDX-License-Identifier: curl
+#
+###########################################################################
+#
+import logging
+import os
+import pytest
+
+from testenv import Env, CurlClient, Caddy
+
+
+log = logging.getLogger(__name__)
+
+
+@pytest.mark.skipif(condition=not Env.has_caddy(), reason=f"missing caddy")
+class TestCaddy:
+
+    @pytest.fixture(autouse=True, scope='class')
+    def caddy(self, env):
+        caddy = Caddy(env=env)
+        assert caddy.start()
+        yield caddy
+        caddy.stop()
+
+    def _make_docs_file(self, docs_dir: str, fname: str, fsize: int):
+        fpath = os.path.join(docs_dir, fname)
+        data1k = 1024*'x'
+        flen = 0
+        with open(fpath, 'w') as fd:
+            while flen < fsize:
+                fd.write(data1k)
+                flen += len(data1k)
+        return flen
+
+    @pytest.fixture(autouse=True, scope='class')
+    def _class_scope(self, env, caddy):
+        self._make_docs_file(docs_dir=caddy.docs_dir, fname='data1.data', fsize=1024*1024)
+        self._make_docs_file(docs_dir=caddy.docs_dir, fname='data10.data', fsize=10*1024*1024)
+        self._make_docs_file(docs_dir=caddy.docs_dir, fname='data100.data', fsize=100*1024*1024)
+
+    # download 1 file
+    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+    def test_08_01_download_1(self, env: Env, caddy: Caddy, repeat, proto):
+        if proto == 'h3' and not env.have_h3_curl():
+            pytest.skip("h3 not supported in curl")
+        curl = CurlClient(env=env)
+        url = f'https://{env.domain1}:{caddy.port}/data.json'
+        r = curl.http_download(urls=[url], alpn_proto=proto)
+        assert r.exit_code == 0, f'{r}'
+        r.check_stats(count=1, exp_status=200)
+
+    # download 1MB files sequentially
+    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+    def test_08_02_download_1mb_sequential(self, env: Env, caddy: Caddy,
+                                           repeat, proto):
+        if proto == 'h3' and not env.have_h3_curl():
+            pytest.skip("h3 not supported in curl")
+        count = 50
+        curl = CurlClient(env=env)
+        urln = f'https://{env.domain1}:{caddy.port}/data1.data?[0-{count-1}]'
+        r = curl.http_download(urls=[urln], alpn_proto=proto)
+        assert r.exit_code == 0
+        r.check_stats(count=count, exp_status=200)
+        # sequential transfers will open 1 connection
+        assert r.total_connects == 1
+
+    # download 1MB files parallel
+    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+    def test_08_03_download_1mb_parallel(self, env: Env, caddy: Caddy,
+                                         repeat, proto):
+        if proto == 'h3' and not env.have_h3_curl():
+            pytest.skip("h3 not supported in curl")
+        count = 50
+        curl = CurlClient(env=env)
+        urln = f'https://{env.domain1}:{caddy.port}/data1.data?[0-{count-1}]'
+        r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[
+            '--parallel'
+        ])
+        assert r.exit_code == 0
+        r.check_stats(count=count, exp_status=200)
+        if proto == 'http/1.1':
+            # http/1.1 parallel transfers will open multiple connections
+            assert r.total_connects > 1
+        else:
+            assert r.total_connects == 1
+
+    # download 10MB files sequentially
+    @pytest.mark.parametrize("proto", ['h2', 'h3'])
+    def test_08_04_download_10mb_sequential(self, env: Env, caddy: Caddy,
+                                           repeat, proto):
+        if proto == 'h3' and not env.have_h3_curl():
+            pytest.skip("h3 not supported in curl")
+        if proto == 'h3' and env.curl_uses_lib('quiche'):
+            pytest.skip("quiche stalls after a certain amount of data")
+        count = 20
+        curl = CurlClient(env=env)
+        urln = f'https://{env.domain1}:{caddy.port}/data10.data?[0-{count-1}]'
+        r = curl.http_download(urls=[urln], alpn_proto=proto)
+        assert r.exit_code == 0
+        r.check_stats(count=count, exp_status=200)
+        # sequential transfers will open 1 connection
+        assert r.total_connects == 1
+
+    # download 10MB files parallel
+    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+    def test_08_05_download_1mb_parallel(self, env: Env, caddy: Caddy,
+                                         repeat, proto):
+        if proto == 'h3' and not env.have_h3_curl():
+            pytest.skip("h3 not supported in curl")
+        if proto == 'h3' and env.curl_uses_lib('quiche'):
+            pytest.skip("quiche stalls after a certain amount of data")
+        count = 50
+        curl = CurlClient(env=env)
+        urln = f'https://{env.domain1}:{caddy.port}/data10.data?[0-{count-1}]'
+        r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[
+            '--parallel'
+        ])
+        assert r.exit_code == 0
+        r.check_stats(count=count, exp_status=200)
+        if proto == 'http/1.1':
+            # http/1.1 parallel transfers will open multiple connections
+            assert r.total_connects > 1
+        else:
+            assert r.total_connects == 1
+
index 23fb4ec7a1fe575ac42050756de726f61b114af2..c97cf661d32c422b3fbef8ca0ac5e396bfaf49fb 100644 (file)
@@ -55,6 +55,10 @@ class Caddy:
     def docs_dir(self):
         return self._docs_dir
 
+    @property
+    def port(self) -> str:
+        return self.env.caddy_https_port
+
     def clear_logs(self):
         self._rmf(self._error_log)
 
@@ -105,7 +109,7 @@ class Caddy:
         curl = CurlClient(env=self.env, run_dir=self._tmp_dir)
         try_until = datetime.now() + timeout
         while datetime.now() < try_until:
-            check_url = f'https://{self.env.domain1}:{self.env.caddy_port}/'
+            check_url = f'https://{self.env.domain1}:{self.port}/'
             r = curl.http_get(url=check_url)
             if r.exit_code != 0:
                 return True
@@ -118,7 +122,7 @@ class Caddy:
         curl = CurlClient(env=self.env, run_dir=self._tmp_dir)
         try_until = datetime.now() + timeout
         while datetime.now() < try_until:
-            check_url = f'https://{self.env.domain1}:{self.env.caddy_port}/'
+            check_url = f'https://{self.env.domain1}:{self.port}/'
             r = curl.http_get(url=check_url)
             if r.exit_code == 0:
                 return True
@@ -149,12 +153,13 @@ class Caddy:
         with open(self._conf_file, 'w') as fd:
             conf = [   # base server config
                 f'{{',
-                f'  https_port {self.env.caddy_port}',
-                f'  servers :{self.env.caddy_port} {{',
+                f'  http_port {self.env.caddy_http_port}',
+                f'  https_port {self.env.caddy_https_port}',
+                f'  servers :{self.env.caddy_https_port} {{',
                 f'    protocols h3 h2 h1',
                 f'  }}',
                 f'}}',
-                f'{domain1}:{self.env.caddy_port} {{',
+                f'{domain1}:{self.env.caddy_https_port} {{',
                 f'  file_server * {{',
                 f'    root {self._docs_dir}',
                 f'  }}',
index 40dadb0a34b68e9802f791b05204f88d9e30e64b..a2b538e80f31f885a7fbc18155faf5f3944f1609 100644 (file)
@@ -319,6 +319,8 @@ class CurlClient:
         args = [self._curl, "-s", "--path-as-is"]
         if with_headers:
             args.extend(["-D", self._headerfile])
+        if self.env.verbose > 1:
+            args.extend(['--trace', self._tracefile])
         if self.env.verbose > 2:
             args.extend(['--trace', self._tracefile, '--trace-time'])
 
index 0acebe22300d2495bb971f5337a7f5d9935c43bf..83d3cce4c7a669c42480c2fd9368cd1730180b4b 100644 (file)
@@ -136,8 +136,8 @@ class EnvConfig:
                 log.debug(f'nghttpx -v: {p.stdout}')
 
         self.caddy = self.config['caddy']['caddy']
-        if len(self.caddy) == 0:
-            self.caddy = 'caddy'
+        if len(self.caddy.strip()) == 0:
+            self.caddy = None
         if self.caddy is not None:
             try:
                 p = subprocess.run(args=[self.caddy, 'version'],
@@ -147,7 +147,8 @@ class EnvConfig:
                     self.caddy = None
             except:
                 self.caddy = None
-        self.caddy_port = self.config['caddy']['port']
+        self.caddy_http_port = self.config['caddy']['http_port']
+        self.caddy_https_port = self.config['caddy']['https_port']
 
     @property
     def httpd_version(self):
@@ -241,6 +242,10 @@ class Env:
     def httpd_is_at_least(minv) -> bool:
         return Env.CONFIG.httpd_is_at_least(minv)
 
+    @staticmethod
+    def has_caddy() -> bool:
+        return Env.CONFIG.caddy is not None
+
     def __init__(self, pytestconfig=None):
         self._verbose = pytestconfig.option.verbose \
             if pytestconfig is not None else 0
@@ -306,8 +311,12 @@ class Env:
         return self.CONFIG.caddy
 
     @property
-    def caddy_port(self) -> str:
-        return self.CONFIG.caddy_port
+    def caddy_https_port(self) -> str:
+        return self.CONFIG.caddy_https_port
+
+    @property
+    def caddy_http_port(self) -> str:
+        return self.CONFIG.caddy_http_port
 
     @property
     def curl(self) -> str:
index 100cf7372e645aa7696f5f5b09f33d496f57551c..cc163dcb337de612015949a52e7d8f3e1d4e210f 100644 (file)
@@ -160,7 +160,9 @@ class Nghttpx:
         try_until = datetime.now() + timeout
         while datetime.now() < try_until:
             check_url = f'https://{self.env.domain1}:{self.env.h3_port}/'
-            r = curl.http_get(url=check_url, extra_args=['--http3-only'])
+            r = curl.http_get(url=check_url, extra_args=[
+                '--http3-only', '--trace', 'curl.trace', '--trace-time'
+            ])
             if r.exit_code == 0:
                 return True
             log.debug(f'waiting for nghttpx to become responsive: {r}')