]> git.ipfire.org Git - thirdparty/curl.git/commitdiff
http2: flow control and buffer improvements
authorStefan Eissing <stefan@eissing.org>
Thu, 30 Mar 2023 10:13:49 +0000 (12:13 +0200)
committerDaniel Stenberg <daniel@haxx.se>
Thu, 30 Mar 2023 21:11:26 +0000 (23:11 +0200)
- use bufq for send/receive of network data
- usd bufq for send/receive of stream data
- use HTTP/2 flow control with no-auto updates to control the
  amount of data we are buffering for a stream
  HTTP/2 stream window set to 128K after local tests, defined
  code constant for now
- elminiating PAUSEing nghttp2 processing when receiving data
  since a stream can now take in all DATA nghttp2 forwards

Improved scorecard and adjuste http2 stream window sizes
- scorecard improved output formatting and options default
- scorecard now also benchmarks small requests / second

Closes #10771

lib/bufq.c
lib/bufq.h
lib/http.c
lib/http.h
lib/http2.c
tests/data/test1800
tests/http/scorecard.py
tests/http/test_07_upload.py
tests/http/testenv/caddy.py
tests/http/testenv/env.py

index e0d7726613ea01796a1e38ebf746dc523ec0d21c..f0ab6bb75c84628add70bac83a5e9c9d1f31643e 100644 (file)
@@ -287,12 +287,6 @@ bool Curl_bufq_is_full(const struct bufq *q)
   return chunk_is_full(q->tail);
 }
 
-static size_t data_pass_size(struct bufq *q)
-{
-  (void)q;
-  return 4*1024;
-}
-
 static struct buf_chunk *get_spare(struct bufq *q)
 {
   struct buf_chunk *chunk = NULL;
@@ -426,9 +420,12 @@ ssize_t Curl_bufq_read(struct bufq *q, unsigned char *buf, size_t len,
   return nread;
 }
 
-bool Curl_bufq_peek(const struct bufq *q,
+bool Curl_bufq_peek(struct bufq *q,
                     const unsigned char **pbuf, size_t *plen)
 {
+  if(q->head && chunk_is_empty(q->head)) {
+    prune_head(q);
+  }
   if(q->head && !chunk_is_empty(q->head)) {
     chunk_peek(q->head, pbuf, plen);
     return TRUE;
@@ -438,7 +435,7 @@ bool Curl_bufq_peek(const struct bufq *q,
   return FALSE;
 }
 
-bool Curl_bufq_peek_at(const struct bufq *q, size_t offset,
+bool Curl_bufq_peek_at(struct bufq *q, size_t offset,
                        const unsigned char **pbuf, size_t *plen)
 {
   struct buf_chunk *c = q->head;
@@ -502,13 +499,11 @@ ssize_t Curl_bufq_write_pass(struct bufq *q,
                              CURLcode *err)
 {
   ssize_t nwritten = 0, n;
-  bool prefer_direct = (len >= data_pass_size(q));
 
   *err = CURLE_OK;
   while(len) {
-    if(Curl_bufq_is_full(q) || (!Curl_bufq_is_empty(q) && prefer_direct)) {
-      /* try to make room in case we are full
-       * or empty the buffer when adding "large" data */
+    if(Curl_bufq_is_full(q)) {
+      /* try to make room in case we are full */
       n = Curl_bufq_pass(q, writer, writer_ctx, err);
       if(n < 0) {
         if(*err != CURLE_AGAIN) {
@@ -519,22 +514,6 @@ ssize_t Curl_bufq_write_pass(struct bufq *q,
       }
     }
 
-    if(Curl_bufq_is_empty(q) && prefer_direct) {
-      /* empty and `data` is "large", try passing directly */
-      n = writer(writer_ctx, buf, len, err);
-      if(n < 0) {
-        if(*err != CURLE_AGAIN) {
-          /* real error, fail */
-          return -1;
-        }
-        /* passing would block */
-        n = 0;
-      }
-      buf += (size_t)n;
-      len -= (size_t)n;
-      nwritten += (size_t)n;
-    }
-
     if(len) {
       /* Add whatever is remaining now to bufq */
       n = Curl_bufq_write(q, buf, len, err);
index a4ca21ecead44692f66193f932e0c98181008ea3..09af226a9e45cb3b9eecb652ae82b0b0b4694b9f 100644 (file)
@@ -201,10 +201,10 @@ ssize_t Curl_bufq_read(struct bufq *q, unsigned char *buf, size_t len,
  * Repeated calls return the same information until the buffer queue
  * is modified, see `Curl_bufq_skip()``
  */
-bool Curl_bufq_peek(const struct bufq *q,
+bool Curl_bufq_peek(struct bufq *q,
                     const unsigned char **pbuf, size_t *plen);
 
-bool Curl_bufq_peek_at(const struct bufq *q, size_t offset,
+bool Curl_bufq_peek_at(struct bufq *q, size_t offset,
                        const unsigned char **pbuf, size_t *plen);
 
 /**
index bcaa7948711262d076e4e8f43c19654a7b5b0244..cc2f5f057339fb19c5919878b72587f1ea727bf6 100644 (file)
@@ -4556,7 +4556,8 @@ CURLcode Curl_http_req_make(struct http_req **preq,
     if(!req->path)
       goto out;
   }
-  Curl_dynhds_init(&req->headers, 128, DYN_H2_HEADERS);
+  Curl_dynhds_init(&req->headers, 0, DYN_H2_HEADERS);
+  Curl_dynhds_init(&req->trailers, 0, DYN_H2_TRAILERS);
   result = CURLE_OK;
 
 out:
@@ -4573,6 +4574,7 @@ void Curl_http_req_free(struct http_req *req)
     free(req->authority);
     free(req->path);
     Curl_dynhds_free(&req->headers);
+    Curl_dynhds_free(&req->trailers);
     free(req);
   }
 }
@@ -4594,7 +4596,8 @@ CURLcode Curl_http_resp_make(struct http_resp **presp,
     if(!resp->description)
       goto out;
   }
-  Curl_dynhds_init(&resp->headers, 128, DYN_H2_HEADERS);
+  Curl_dynhds_init(&resp->headers, 0, DYN_H2_HEADERS);
+  Curl_dynhds_init(&resp->trailers, 0, DYN_H2_TRAILERS);
   result = CURLE_OK;
 
 out:
@@ -4609,6 +4612,7 @@ void Curl_http_resp_free(struct http_resp *resp)
   if(resp) {
     free(resp->description);
     Curl_dynhds_free(&resp->headers);
+    Curl_dynhds_free(&resp->trailers);
     if(resp->prev)
       Curl_http_resp_free(resp->prev);
     free(resp);
index 5f4fcb90435fecd7ffd9f7ebdd4990940055780f..b9a2e6149278727f3868c3d667233c7215292197 100644 (file)
@@ -29,6 +29,7 @@
 #include <pthread.h>
 #endif
 
+#include "bufq.h"
 #include "dynhds.h"
 #include "ws.h"
 
@@ -227,14 +228,12 @@ struct HTTP {
 #ifdef USE_NGHTTP2
   /*********** for HTTP/2 we store stream-local data here *************/
   int32_t stream_id; /* stream we are interested in */
-
-  /* We store non-final and final response headers here, per-stream */
-  struct dynbuf header_recvbuf;
-  size_t nread_header_recvbuf; /* number of bytes in header_recvbuf fed into
-                                  upper layer */
-  struct dynbuf trailer_recvbuf;
-  const uint8_t *pausedata; /* pointer to data received in on_data_chunk */
-  size_t pauselen; /* the number of bytes left in data */
+  struct bufq h2_sendbuf; /* request body data buffere for sending */
+  size_t h2_send_hds_len; /* amount of bytes in first cf_send() that
+                             are header bytes. Or 0 if not known. */
+  struct bufq h2_recvbuf;
+  size_t h2_recv_hds_len; /* how many bytes in recvbuf are headers */
+  struct dynhds resp_trailers;
   bool close_handled; /* TRUE if stream closure is handled by libcurl */
 
   char **push_headers;       /* allocated array */
@@ -346,6 +345,7 @@ struct http_req {
   char *authority;
   char *path;
   struct dynhds headers;
+  struct dynhds trailers;
 };
 
 /**
@@ -366,6 +366,7 @@ struct http_resp {
   int status;
   char *description;
   struct dynhds headers;
+  struct dynhds trailers;
   struct http_resp *prev;
 };
 
index b0ce87d98793525f05bdb43082a02cc905a442f9..f43462d7059a4f229b0a48e017816ee858a73398 100644 (file)
@@ -27,6 +27,7 @@
 #ifdef USE_NGHTTP2
 #include <nghttp2/nghttp2.h>
 #include "urldata.h"
+#include "bufq.h"
 #include "http2.h"
 #include "http.h"
 #include "sendf.h"
@@ -48,8 +49,6 @@
 #include "curl_memory.h"
 #include "memdebug.h"
 
-#define H2_BUFSIZE 32768
-
 #if (NGHTTP2_VERSION_NUM < 0x010c00)
 #error too old nghttp2 version, upgrade!
 #endif
 #define NGHTTP2_HAS_SET_LOCAL_WINDOW_SIZE 1
 #endif
 
-#define HTTP2_HUGE_WINDOW_SIZE (32 * 1024 * 1024) /* 32 MB */
 
+/* buffer dimensioning:
+ * use 16K as chunk size, as that fits H2 DATA frames well */
+#define H2_CHUNK_SIZE           (16 * 1024)
+/* this is how much we want "in flight" for a stream */
+#define H2_STREAM_WINDOW_SIZE   (512 * 1024)
+/* on receving from TLS, we prep for holding a full stream window */
+#define H2_NW_RECV_CHUNKS       (H2_STREAM_WINDOW_SIZE / H2_CHUNK_SIZE)
+/* on send into TLS, we just want to accumulate small frames */
+#define H2_NW_SEND_CHUNKS       1
+/* stream recv/send chunks are a result of window / chunk sizes */
+#define H2_STREAM_RECV_CHUNKS   (H2_STREAM_WINDOW_SIZE / H2_CHUNK_SIZE)
+#define H2_STREAM_SEND_CHUNKS   (H2_STREAM_WINDOW_SIZE / H2_CHUNK_SIZE)
+/* spare chunks we keep for a full window */
+#define H2_STREAM_POOL_SPARES   (H2_STREAM_WINDOW_SIZE / H2_CHUNK_SIZE)
+
+#define HTTP2_HUGE_WINDOW_SIZE (16 * H2_STREAM_WINDOW_SIZE)
 
 #define H2_SETTINGS_IV_LEN  3
 #define H2_BINSETTINGS_LEN 80
@@ -75,7 +89,7 @@ static int populate_settings(nghttp2_settings_entry *iv,
   iv[0].value = Curl_multi_max_concurrent_streams(data->multi);
 
   iv[1].settings_id = NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
-  iv[1].value = HTTP2_HUGE_WINDOW_SIZE;
+  iv[1].value = H2_STREAM_WINDOW_SIZE;
 
   iv[2].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH;
   iv[2].value = data->multi->push_cb != NULL;
@@ -101,22 +115,14 @@ struct cf_h2_ctx {
   /* The easy handle used in the current filter call, cleared at return */
   struct cf_call_data call_data;
 
-  char *inbuf; /* buffer to receive data from underlying socket */
-  size_t inbuflen; /* number of bytes filled in inbuf */
-  size_t nread_inbuf; /* number of bytes read from in inbuf */
-
-  struct dynbuf outbuf;
+  struct bufq inbufq;           /* network input */
+  struct bufq outbufq;          /* network output */
+  struct bufc_pool stream_bufcp; /* spares for stream buffers */
 
-  /* We need separate buffer for transmission and reception because we
-     may call nghttp2_session_send() after the
-     nghttp2_session_mem_recv() but mem buffer is still not full. In
-     this case, we wrongly sends the content of mem buffer if we share
-     them for both cases. */
-  int32_t pause_stream_id; /* stream ID which paused
-                              nghttp2_session_mem_recv */
   size_t drain_total; /* sum of all stream's UrlState.drain */
   int32_t goaway_error;
   int32_t last_stream_id;
+  BIT(conn_closed);
   BIT(goaway);
   BIT(enable_push);
 };
@@ -133,8 +139,9 @@ static void cf_h2_ctx_clear(struct cf_h2_ctx *ctx)
   if(ctx->h2) {
     nghttp2_session_del(ctx->h2);
   }
-  free(ctx->inbuf);
-  Curl_dyn_free(&ctx->outbuf);
+  Curl_bufq_free(&ctx->inbufq);
+  Curl_bufq_free(&ctx->outbufq);
+  Curl_bufcp_free(&ctx->stream_bufcp);
   memset(ctx, 0, sizeof(*ctx));
   ctx->call_data = save;
 }
@@ -151,22 +158,42 @@ static int h2_client_new(struct Curl_cfilter *cf,
                          nghttp2_session_callbacks *cbs)
 {
   struct cf_h2_ctx *ctx = cf->ctx;
-
-#if NGHTTP2_VERSION_NUM < 0x013200
-  /* before 1.50.0 */
-  return nghttp2_session_client_new(&ctx->h2, cbs, cf);
-#else
   nghttp2_option *o;
+
   int rc = nghttp2_option_new(&o);
   if(rc)
     return rc;
+  /* We handle window updates ourself to enfore buffer limits */
+  nghttp2_option_set_no_auto_window_update(o, 1);
+#if NGHTTP2_VERSION_NUM >= 0x013200
+  /* with 1.50.0 */
   /* turn off RFC 9113 leading and trailing white spaces validation against
      HTTP field value. */
   nghttp2_option_set_no_rfc9113_leading_and_trailing_ws_validation(o, 1);
+#endif
   rc = nghttp2_session_client_new2(&ctx->h2, cbs, cf, o);
   nghttp2_option_del(o);
   return rc;
-#endif
+}
+
+static ssize_t nw_in_reader(void *reader_ctx,
+                              unsigned char *buf, size_t buflen,
+                              CURLcode *err)
+{
+  struct Curl_cfilter *cf = reader_ctx;
+  struct Curl_easy *data = CF_DATA_CURRENT(cf);
+
+  return Curl_conn_cf_recv(cf->next, data, (char *)buf, buflen, err);
+}
+
+static ssize_t nw_out_writer(void *writer_ctx,
+                             const unsigned char *buf, size_t buflen,
+                             CURLcode *err)
+{
+  struct Curl_cfilter *cf = writer_ctx;
+  struct Curl_easy *data = CF_DATA_CURRENT(cf);
+
+  return Curl_conn_cf_send(cf->next, data, (const char *)buf, buflen, err);
 }
 
 static ssize_t send_callback(nghttp2_session *h2,
@@ -204,6 +231,7 @@ static void multi_connchanged(struct Curl_multi *multi)
 static CURLcode http2_data_setup(struct Curl_cfilter *cf,
                                  struct Curl_easy *data)
 {
+  struct cf_h2_ctx *ctx = cf->ctx;
   struct HTTP *stream = data->req.p.http;
 
   (void)cf;
@@ -212,22 +240,19 @@ static CURLcode http2_data_setup(struct Curl_cfilter *cf,
 
   stream->stream_id = -1;
 
-  Curl_dyn_init(&stream->header_recvbuf, DYN_H2_HEADERS);
-  Curl_dyn_init(&stream->trailer_recvbuf, DYN_H2_TRAILERS);
-
+  Curl_bufq_initp(&stream->h2_sendbuf, &ctx->stream_bufcp,
+                  H2_STREAM_SEND_CHUNKS, BUFQ_OPT_NONE);
+  Curl_bufq_initp(&stream->h2_recvbuf, &ctx->stream_bufcp,
+                  H2_STREAM_RECV_CHUNKS, BUFQ_OPT_SOFT_LIMIT);
+  Curl_dynhds_init(&stream->resp_trailers, 0, DYN_H2_TRAILERS);
+  stream->h2_send_hds_len = 0;
+  stream->h2_recv_hds_len = 0;
   stream->bodystarted = FALSE;
   stream->status_code = -1;
-  stream->pausedata = NULL;
-  stream->pauselen = 0;
   stream->closed = FALSE;
   stream->close_handled = FALSE;
-  stream->memlen = 0;
   stream->error = NGHTTP2_NO_ERROR;
   stream->upload_left = 0;
-  stream->upload_mem = NULL;
-  stream->upload_len = 0;
-  stream->mem = data->state.buffer;
-  stream->len = data->set.buffer_size;
 
   return CURLE_OK;
 }
@@ -246,11 +271,10 @@ static CURLcode cf_h2_ctx_init(struct Curl_cfilter *cf,
   nghttp2_session_callbacks *cbs = NULL;
 
   DEBUGASSERT(!ctx->h2);
-  ctx->inbuf = malloc(H2_BUFSIZE);
-  if(!ctx->inbuf)
-      goto out;
-  /* we want to aggregate small frames, SETTINGS, PRIO, UPDATES */
-  Curl_dyn_init(&ctx->outbuf, 4*1024);
+  Curl_bufcp_init(&ctx->stream_bufcp, H2_CHUNK_SIZE, H2_STREAM_POOL_SPARES);
+  Curl_bufq_initp(&ctx->inbufq, &ctx->stream_bufcp, H2_NW_RECV_CHUNKS, 0);
+  Curl_bufq_initp(&ctx->outbufq, &ctx->stream_bufcp, H2_NW_SEND_CHUNKS, 0);
+  ctx->last_stream_id = 2147483647;
 
   rc = nghttp2_session_callbacks_new(&cbs);
   if(rc) {
@@ -345,19 +369,14 @@ out:
   return result;
 }
 
-static CURLcode  h2_session_send(struct Curl_cfilter *cf,
-                                 struct Curl_easy *data);
-static int h2_process_pending_input(struct Curl_cfilter *cf,
-                                    struct Curl_easy *data,
-                                    CURLcode *err);
-
 /*
  * http2_stream_free() free HTTP2 stream related data
  */
 static void http2_stream_free(struct HTTP *stream)
 {
   if(stream) {
-    Curl_dyn_free(&stream->header_recvbuf);
+    Curl_bufq_free(&stream->h2_sendbuf);
+    Curl_bufq_free(&stream->h2_recvbuf);
     for(; stream->push_headers_used > 0; --stream->push_headers_used) {
       free(stream->push_headers[stream->push_headers_used - 1]);
     }
@@ -375,6 +394,54 @@ static int should_close_session(struct cf_h2_ctx *ctx)
     !nghttp2_session_want_write(ctx->h2);
 }
 
+/*
+ * Processes pending input left in network input buffer.
+ * This function returns 0 if it succeeds, or -1 and error code will
+ * be assigned to *err.
+ */
+static int h2_process_pending_input(struct Curl_cfilter *cf,
+                                    struct Curl_easy *data,
+                                    CURLcode *err)
+{
+  struct cf_h2_ctx *ctx = cf->ctx;
+  const unsigned char *buf;
+  size_t blen;
+  ssize_t rv;
+
+  while(Curl_bufq_peek(&ctx->inbufq, &buf, &blen)) {
+
+    rv = nghttp2_session_mem_recv(ctx->h2, (const uint8_t *)buf, blen);
+    DEBUGF(LOG_CF(data, cf,
+                 "fed %zu bytes from nw to nghttp2 -> %zd", blen, rv));
+    if(rv < 0) {
+      failf(data,
+            "process_pending_input: nghttp2_session_mem_recv() returned "
+            "%zd:%s", rv, nghttp2_strerror((int)rv));
+      *err = CURLE_RECV_ERROR;
+      return -1;
+    }
+    Curl_bufq_skip(&ctx->inbufq, (size_t)rv);
+    if(Curl_bufq_is_empty(&ctx->inbufq)) {
+      DEBUGF(LOG_CF(data, cf, "all data in connection buffer processed"));
+      break;
+    }
+    else {
+      DEBUGF(LOG_CF(data, cf, "process_pending_input: %zu bytes left "
+                    "in connection buffer", Curl_bufq_len(&ctx->inbufq)));
+    }
+  }
+
+  if(nghttp2_session_check_request_allowed(ctx->h2) == 0) {
+    /* No more requests are allowed in the current session, so
+       the connection may not be reused. This is set when a
+       GOAWAY frame has been received or when the limit of stream
+       identifiers has been reached. */
+    connclose(cf->conn, "http/2: No new requests allowed");
+  }
+
+  return 0;
+}
+
 /*
  * The server may send us data at any point (e.g. PING frames). Therefore,
  * we cannot assume that an HTTP/2 socket is dead just because it is readable.
@@ -401,13 +468,10 @@ static bool http2_connisalive(struct Curl_cfilter *cf, struct Curl_easy *data,
 
     *input_pending = FALSE;
     Curl_attach_connection(data, cf->conn);
-    nread = Curl_conn_cf_recv(cf->next, data,
-                              ctx->inbuf, H2_BUFSIZE, &result);
+    nread = Curl_bufq_slurp(&ctx->inbufq, nw_in_reader, cf, &result);
     if(nread != -1) {
-      DEBUGF(LOG_CF(data, cf, "%d bytes stray data read before trying "
-                    "h2 connection", (int)nread));
-      ctx->nread_inbuf = 0;
-      ctx->inbuflen = nread;
+      DEBUGF(LOG_CF(data, cf, "%zd bytes stray data read before trying "
+                    "h2 connection", nread));
       if(h2_process_pending_input(cf, data, &result) < 0)
         /* immediate error, considered dead */
         alive = FALSE;
@@ -456,30 +520,23 @@ void Curl_http2_ver(char *p, size_t len)
   (void)msnprintf(p, len, "nghttp2/%s", h2->version_str);
 }
 
-static CURLcode flush_output(struct Curl_cfilter *cf,
+static CURLcode nw_out_flush(struct Curl_cfilter *cf,
                              struct Curl_easy *data)
 {
   struct cf_h2_ctx *ctx = cf->ctx;
-  size_t buflen = Curl_dyn_len(&ctx->outbuf);
-  ssize_t written;
+  ssize_t nwritten;
   CURLcode result;
 
-  if(!buflen)
+  (void)data;
+  if(Curl_bufq_is_empty(&ctx->outbufq))
     return CURLE_OK;
 
-  DEBUGF(LOG_CF(data, cf, "h2 conn flush %zu bytes", buflen));
-  written = Curl_conn_cf_send(cf->next, data, Curl_dyn_ptr(&ctx->outbuf),
-                              buflen, &result);
-  if(written < 0) {
+  DEBUGF(LOG_CF(data, cf, "h2 conn flush %zu bytes",
+                Curl_bufq_len(&ctx->outbufq)));
+  nwritten = Curl_bufq_pass(&ctx->outbufq, nw_out_writer, cf, &result);
+  if(nwritten < 0 && result != CURLE_AGAIN) {
     return result;
   }
-  if((size_t)written < buflen) {
-    Curl_dyn_tail(&ctx->outbuf, buflen - (size_t)written);
-    return CURLE_AGAIN;
-  }
-  else {
-    Curl_dyn_reset(&ctx->outbuf);
-  }
   return CURLE_OK;
 }
 
@@ -495,49 +552,27 @@ static ssize_t send_callback(nghttp2_session *h2,
   struct Curl_cfilter *cf = userp;
   struct cf_h2_ctx *ctx = cf->ctx;
   struct Curl_easy *data = CF_DATA_CURRENT(cf);
-  ssize_t written;
+  ssize_t nwritten;
   CURLcode result = CURLE_OK;
-  size_t buflen = Curl_dyn_len(&ctx->outbuf);
 
   (void)h2;
   (void)flags;
   DEBUGASSERT(data);
 
-  if(blen < 1024 && (buflen + blen + 1 < ctx->outbuf.toobig)) {
-    result = Curl_dyn_addn(&ctx->outbuf, buf, blen);
-    if(result) {
-      failf(data, "Failed to add data to output buffer");
-      return NGHTTP2_ERR_CALLBACK_FAILURE;
-    }
-    return blen;
-  }
-  if(buflen) {
-    /* not adding, flush buffer */
-    result = flush_output(cf, data);
-    if(result) {
-      if(result == CURLE_AGAIN) {
-        return NGHTTP2_ERR_WOULDBLOCK;
-      }
-      failf(data, "Failed sending HTTP2 data");
-      return NGHTTP2_ERR_CALLBACK_FAILURE;
+  nwritten = Curl_bufq_write_pass(&ctx->outbufq, buf, blen,
+                                  nw_out_writer, cf, &result);
+  if(nwritten < 0) {
+    if(result == CURLE_AGAIN) {
+      return NGHTTP2_ERR_WOULDBLOCK;
     }
-  }
-
-  DEBUGF(LOG_CF(data, cf, "h2 conn send %zu bytes", blen));
-  written = Curl_conn_cf_send(cf->next, data, buf, blen, &result);
-  if(result == CURLE_AGAIN) {
-    return NGHTTP2_ERR_WOULDBLOCK;
-  }
-
-  if(written == -1) {
     failf(data, "Failed sending HTTP2 data");
     return NGHTTP2_ERR_CALLBACK_FAILURE;
   }
 
-  if(!written)
+  if(!nwritten)
     return NGHTTP2_ERR_WOULDBLOCK;
 
-  return written;
+  return nwritten;
 }
 
 
@@ -779,17 +814,21 @@ static int push_promise(struct Curl_cfilter *cf,
     }
 
     rv = nghttp2_session_set_stream_user_data(ctx->h2,
-                                              frame->promised_stream_id,
+                                              newstream->stream_id,
                                               newhandle);
     if(rv) {
       infof(data, "failed to set user_data for stream %u",
-            frame->promised_stream_id);
+            newstream->stream_id);
       DEBUGASSERT(0);
       rv = CURL_PUSH_DENY;
       goto fail;
     }
-    Curl_dyn_init(&newstream->header_recvbuf, DYN_H2_HEADERS);
-    Curl_dyn_init(&newstream->trailer_recvbuf, DYN_H2_TRAILERS);
+    Curl_bufq_initp(&newstream->h2_sendbuf, &ctx->stream_bufcp,
+                    H2_STREAM_SEND_CHUNKS, BUFQ_OPT_NONE);
+    Curl_bufq_initp(&newstream->h2_recvbuf, &ctx->stream_bufcp,
+                    H2_STREAM_RECV_CHUNKS, BUFQ_OPT_SOFT_LIMIT);
+    newstream->h2_send_hds_len = 0;
+    Curl_dynhds_init(&newstream->resp_trailers, 0, DYN_H2_TRAILERS);
   }
   else {
     DEBUGF(LOG_CF(data, cf, "Got PUSH_PROMISE, ignore it"));
@@ -799,6 +838,25 @@ static int push_promise(struct Curl_cfilter *cf,
   return rv;
 }
 
+static CURLcode recvbuf_write_hds(struct Curl_cfilter *cf,
+                                  struct Curl_easy *data,
+                                  const char *buf, size_t blen)
+{
+  struct HTTP *stream = data->req.p.http;
+  ssize_t nwritten;
+  CURLcode result;
+
+  (void)cf;
+  nwritten = Curl_bufq_write(&stream->h2_recvbuf,
+                             (const unsigned char *)buf, blen, &result);
+  if(nwritten < 0)
+    return result;
+  stream->h2_recv_hds_len += (size_t)nwritten;
+  /* TODO: make sure recvbuf is more flexible with overflow */
+  DEBUGASSERT((size_t)nwritten == blen);
+  return CURLE_OK;
+}
+
 static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
                          void *userp)
 {
@@ -808,7 +866,6 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
   struct HTTP *stream = NULL;
   struct Curl_easy *data = CF_DATA_CURRENT(cf);
   int rv;
-  size_t left, ncopy;
   int32_t stream_id = frame->hd.stream_id;
   CURLcode result;
 
@@ -841,6 +898,8 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
       ctx->goaway_error = frame->goaway.error_code;
       ctx->last_stream_id = frame->goaway.last_stream_id;
       if(data) {
+        DEBUGF(LOG_CF(data, cf, "recv GOAWAY, error=%d, last_stream=%u",
+                      ctx->goaway_error, ctx->last_stream_id));
         infof(data, "recveived GOAWAY, error=%d, last_stream=%u",
                     ctx->goaway_error, ctx->last_stream_id);
         multi_connchanged(data->multi);
@@ -882,7 +941,7 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
     if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
       /* Stream has ended. If there is pending data, ensure that read
          will occur to consume it. */
-      if(!data->state.drain && stream->memlen) {
+      if(!data->state.drain && !Curl_bufq_is_empty(&stream->h2_recvbuf)) {
         drain_this(cf, data_s);
         Curl_expire(data, 0, EXPIRE_RUN_NOW);
       }
@@ -908,29 +967,16 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
       stream->status_code = -1;
     }
 
-    result = Curl_dyn_addn(&stream->header_recvbuf, STRCONST("\r\n"));
+    result = recvbuf_write_hds(cf, data_s, STRCONST("\r\n"));
     if(result)
       return NGHTTP2_ERR_CALLBACK_FAILURE;
 
-    left = Curl_dyn_len(&stream->header_recvbuf) -
-      stream->nread_header_recvbuf;
-    ncopy = CURLMIN(stream->len, left);
-
-    memcpy(&stream->mem[stream->memlen],
-           Curl_dyn_ptr(&stream->header_recvbuf) +
-           stream->nread_header_recvbuf,
-           ncopy);
-    stream->nread_header_recvbuf += ncopy;
-
-    DEBUGASSERT(stream->mem);
-    DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] %zu header bytes, at %p",
-                  stream_id, ncopy, (void *)stream->mem));
-
-    stream->len -= ncopy;
-    stream->memlen += ncopy;
-
-    drain_this(cf, data_s);
-    Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
+    DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] %zu header bytes",
+                  stream_id, Curl_bufq_len(&stream->h2_recvbuf)));
+    if(CF_DATA_CURRENT(cf) != data_s) {
+      drain_this(cf, data_s);
+      Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
+    }
     break;
   case NGHTTP2_PUSH_PROMISE:
     DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] recv PUSH_PROMISE", stream_id));
@@ -980,10 +1026,10 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
                               const uint8_t *mem, size_t len, void *userp)
 {
   struct Curl_cfilter *cf = userp;
-  struct cf_h2_ctx *ctx = cf->ctx;
   struct HTTP *stream;
   struct Curl_easy *data_s;
-  size_t nread;
+  ssize_t nwritten;
+  CURLcode result;
   (void)flags;
 
   DEBUGASSERT(stream_id); /* should never be a zero stream ID here */
@@ -997,6 +1043,8 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
        in the pipeline. Silently ignore. */
     DEBUGF(LOG_CF(CF_DATA_CURRENT(cf), cf, "[h2sid=%u] Data for unknown",
                   stream_id));
+    /* consumed explicitly as no one will read it */
+    nghttp2_session_consume(session, stream_id, len);
     return 0;
   }
 
@@ -1004,11 +1052,13 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
   if(!stream)
     return NGHTTP2_ERR_CALLBACK_FAILURE;
 
-  nread = CURLMIN(stream->len, len);
-  memcpy(&stream->mem[stream->memlen], mem, nread);
+  nwritten = Curl_bufq_write(&stream->h2_recvbuf, mem, len, &result);
+  if(nwritten < 0) {
+    if(result != CURLE_AGAIN)
+      return NGHTTP2_ERR_CALLBACK_FAILURE;
 
-  stream->len -= nread;
-  stream->memlen += nread;
+    nwritten = 0;
+  }
 
   /* if we receive data for another handle, wake that up */
   if(CF_DATA_CURRENT(cf) != data_s) {
@@ -1016,20 +1066,10 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
     Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
   }
 
-  DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] %zu DATA recvd, "
-                "(buffer now holds %zu, %zu still free in %p)",
-                stream_id, nread,
-                stream->memlen, stream->len, (void *)stream->mem));
-
-  if(nread < len) {
-    stream->pausedata = mem + nread;
-    stream->pauselen = len - nread;
-    DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] %zu not recvd -> NGHTTP2_ERR_PAUSE",
-                  stream_id, len - nread));
-    ctx->pause_stream_id = stream_id;
-    drain_this(cf, data_s);
-    return NGHTTP2_ERR_PAUSE;
-  }
+  DEBUGASSERT((size_t)nwritten == len);
+  DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] %zd/%zu DATA recvd, "
+                "(buffer now holds %zu)",
+                stream_id, nwritten, len, Curl_bufq_len(&stream->h2_recvbuf)));
 
   return 0;
 }
@@ -1038,7 +1078,6 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id,
                            uint32_t error_code, void *userp)
 {
   struct Curl_cfilter *cf = userp;
-  struct cf_h2_ctx *ctx = cf->ctx;
   struct Curl_easy *data_s;
   struct HTTP *stream;
   int rv;
@@ -1074,11 +1113,6 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id,
           stream_id);
     DEBUGASSERT(0);
   }
-  if(stream_id == ctx->pause_stream_id) {
-    DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] closed the pause stream",
-                  stream_id));
-    ctx->pause_stream_id = 0;
-  }
   DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] closed now", stream_id));
   return 0;
 }
@@ -1110,33 +1144,6 @@ static int on_begin_headers(nghttp2_session *session,
   return 0;
 }
 
-/* Decode HTTP status code.  Returns -1 if no valid status code was
-   decoded. */
-static int decode_status_code(const uint8_t *value, size_t len)
-{
-  int i;
-  int res;
-
-  if(len != 3) {
-    return -1;
-  }
-
-  res = 0;
-
-  for(i = 0; i < 3; ++i) {
-    char c = value[i];
-
-    if(c < '0' || c > '9') {
-      return -1;
-    }
-
-    res *= 10;
-    res += c - '0';
-  }
-
-  return res;
-}
-
 /* frame->hd.type is either NGHTTP2_HEADERS or NGHTTP2_PUSH_PROMISE */
 static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
                      const uint8_t *name, size_t namelen,
@@ -1234,9 +1241,9 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
                   stream->stream_id,
                   (int)namelen, name,
                   (int)valuelen, value));
-    result = Curl_dyn_addf(&stream->trailer_recvbuf,
-                           "%.*s: %.*s\r\n", (int)namelen, name,
-                           (int)valuelen, value);
+    result = Curl_dynhds_add(&stream->resp_trailers,
+                             (const char *)name, namelen,
+                             (const char *)value, valuelen);
     if(result)
       return NGHTTP2_ERR_CALLBACK_FAILURE;
 
@@ -1245,25 +1252,25 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
 
   if(namelen == sizeof(H2H3_PSEUDO_STATUS) - 1 &&
      memcmp(H2H3_PSEUDO_STATUS, name, namelen) == 0) {
-    /* nghttp2 guarantees :status is received first and only once, and
-       value is 3 digits status code, and decode_status_code always
-       succeeds. */
+    /* nghttp2 guarantees :status is received first and only once. */
     char buffer[32];
-    stream->status_code = decode_status_code(value, valuelen);
-    DEBUGASSERT(stream->status_code != -1);
+    result = Curl_http_decode_status(&stream->status_code,
+                                     (const char *)value, valuelen);
+    if(result)
+      return NGHTTP2_ERR_CALLBACK_FAILURE;
     msnprintf(buffer, sizeof(buffer), H2H3_PSEUDO_STATUS ":%u\r",
               stream->status_code);
     result = Curl_headers_push(data_s, buffer, CURLH_PSEUDO);
     if(result)
       return NGHTTP2_ERR_CALLBACK_FAILURE;
-    result = Curl_dyn_addn(&stream->header_recvbuf, STRCONST("HTTP/2 "));
+    result = recvbuf_write_hds(cf, data_s, STRCONST("HTTP/2 "));
     if(result)
       return NGHTTP2_ERR_CALLBACK_FAILURE;
-    result = Curl_dyn_addn(&stream->header_recvbuf, value, valuelen);
+    result = recvbuf_write_hds(cf, data_s, (const char *)value, valuelen);
     if(result)
       return NGHTTP2_ERR_CALLBACK_FAILURE;
     /* the space character after the status code is mandatory */
-    result = Curl_dyn_addn(&stream->header_recvbuf, STRCONST(" \r\n"));
+    result = recvbuf_write_hds(cf, data_s, STRCONST(" \r\n"));
     if(result)
       return NGHTTP2_ERR_CALLBACK_FAILURE;
     /* if we receive data for another handle, wake that up */
@@ -1278,16 +1285,16 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
   /* nghttp2 guarantees that namelen > 0, and :status was already
      received, and this is not pseudo-header field . */
   /* convert to an HTTP1-style header */
-  result = Curl_dyn_addn(&stream->header_recvbuf, name, namelen);
+  result = recvbuf_write_hds(cf, data_s, (const char *)name, namelen);
   if(result)
     return NGHTTP2_ERR_CALLBACK_FAILURE;
-  result = Curl_dyn_addn(&stream->header_recvbuf, STRCONST(": "));
+  result = recvbuf_write_hds(cf, data_s, STRCONST(": "));
   if(result)
     return NGHTTP2_ERR_CALLBACK_FAILURE;
-  result = Curl_dyn_addn(&stream->header_recvbuf, value, valuelen);
+  result = recvbuf_write_hds(cf, data_s, (const char *)value, valuelen);
   if(result)
     return NGHTTP2_ERR_CALLBACK_FAILURE;
-  result = Curl_dyn_addn(&stream->header_recvbuf, STRCONST("\r\n"));
+  result = recvbuf_write_hds(cf, data_s, STRCONST("\r\n"));
   if(result)
     return NGHTTP2_ERR_CALLBACK_FAILURE;
   /* if we receive data for another handle, wake that up */
@@ -1302,17 +1309,18 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
   return 0; /* 0 is successful */
 }
 
-static ssize_t data_source_read_callback(nghttp2_session *session,
-                                         int32_t stream_id,
-                                         uint8_t *buf, size_t length,
-                                         uint32_t *data_flags,
-                                         nghttp2_data_source *source,
-                                         void *userp)
+static ssize_t req_body_read_callback(nghttp2_session *session,
+                                      int32_t stream_id,
+                                      uint8_t *buf, size_t length,
+                                      uint32_t *data_flags,
+                                      nghttp2_data_source *source,
+                                      void *userp)
 {
   struct Curl_cfilter *cf = userp;
   struct Curl_easy *data_s;
   struct HTTP *stream = NULL;
-  size_t nread;
+  CURLcode result;
+  ssize_t nread;
   (void)source;
 
   (void)cf;
@@ -1332,23 +1340,25 @@ static ssize_t data_source_read_callback(nghttp2_session *session,
   else
     return NGHTTP2_ERR_INVALID_ARGUMENT;
 
-  nread = CURLMIN(stream->upload_len, length);
-  if(nread > 0) {
-    memcpy(buf, stream->upload_mem, nread);
-    stream->upload_mem += nread;
-    stream->upload_len -= nread;
-    if(data_s->state.infilesize != -1)
-      stream->upload_left -= nread;
+  nread = Curl_bufq_read(&stream->h2_sendbuf, buf, length, &result);
+  if(nread < 0) {
+    if(result != CURLE_AGAIN)
+      return NGHTTP2_ERR_CALLBACK_FAILURE;
+    nread = 0;
   }
 
+  if(nread > 0 && data_s->state.infilesize != -1)
+    stream->upload_left -= nread;
+
+  DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] req_body_read(len=%zu) left=%zd"
+                " -> %zd, %d",
+                stream_id, length, stream->upload_left, nread, result));
+
   if(stream->upload_left == 0)
     *data_flags = NGHTTP2_DATA_FLAG_EOF;
   else if(nread == 0)
     return NGHTTP2_ERR_DEFERRED;
 
-  DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] data_source_read_callback: "
-                "returns %zu bytes", stream_id, nread));
-
   return nread;
 }
 
@@ -1374,8 +1384,9 @@ static void http2_data_done(struct Curl_cfilter *cf,
 
   /* there might be allocated resources done before this got the 'h2' pointer
      setup */
-  Curl_dyn_free(&stream->header_recvbuf);
-  Curl_dyn_free(&stream->trailer_recvbuf);
+  Curl_bufq_free(&stream->h2_sendbuf);
+  Curl_bufq_free(&stream->h2_recvbuf);
+  Curl_dynhds_free(&stream->resp_trailers);
   if(stream->push_headers) {
     /* if they weren't used and then freed before */
     for(; stream->push_headers_used > 0; --stream->push_headers_used) {
@@ -1388,24 +1399,17 @@ static void http2_data_done(struct Curl_cfilter *cf,
   if(!ctx || !ctx->h2)
     return;
 
-  /* do this before the reset handling, as that might clear ->stream_id */
-  if(stream->stream_id && stream->stream_id == ctx->pause_stream_id) {
-    DEBUGF(LOG_CF(data, cf, "[h2sid=%u] DONE, the pause stream",
-                  stream->stream_id));
-    ctx->pause_stream_id = 0;
-  }
-
   (void)premature;
   if(!stream->closed && stream->stream_id) {
     /* RST_STREAM */
-    DEBUGF(LOG_CF(data, cf, "[h2sid=%u] RST", stream->stream_id));
+    DEBUGF(LOG_CF(data, cf, "[h2sid=%u] premature DATA_DONE, RST stream",
+                  stream->stream_id));
     if(!nghttp2_submit_rst_stream(ctx->h2, NGHTTP2_FLAG_NONE,
                                   stream->stream_id, NGHTTP2_STREAM_CLOSED))
       (void)nghttp2_session_send(ctx->h2);
   }
 
-  if(data->state.drain)
-    drained_transfer(cf, data);
+  drained_transfer(cf, data);
 
   /* -1 means unassigned and 0 means cleared */
   if(nghttp2_session_get_stream_user_data(ctx->h2, stream->stream_id)) {
@@ -1458,78 +1462,6 @@ CURLcode Curl_http2_request_upgrade(struct dynbuf *req,
   return result;
 }
 
-/*
- * h2_process_pending_input() processes pending input left in
- * httpc->inbuf.  Then, call h2_session_send() to send pending data.
- * This function returns 0 if it succeeds, or -1 and error code will
- * be assigned to *err.
- */
-static int h2_process_pending_input(struct Curl_cfilter *cf,
-                                    struct Curl_easy *data,
-                                    CURLcode *err)
-{
-  struct cf_h2_ctx *ctx = cf->ctx;
-  ssize_t nread;
-  ssize_t rv;
-
-  nread = ctx->inbuflen - ctx->nread_inbuf;
-  if(nread) {
-    char *inbuf = ctx->inbuf + ctx->nread_inbuf;
-
-    rv = nghttp2_session_mem_recv(ctx->h2, (const uint8_t *)inbuf, nread);
-    if(rv < 0) {
-      failf(data,
-            "h2_process_pending_input: nghttp2_session_mem_recv() returned "
-            "%zd:%s", rv, nghttp2_strerror((int)rv));
-      *err = CURLE_RECV_ERROR;
-      return -1;
-    }
-
-    if(nread == rv) {
-      DEBUGF(LOG_CF(data, cf, "all data in connection buffer processed"));
-      ctx->inbuflen = 0;
-      ctx->nread_inbuf = 0;
-    }
-    else {
-      ctx->nread_inbuf += rv;
-      DEBUGF(LOG_CF(data, cf, "h2_process_pending_input: %zu bytes left "
-                    "in connection buffer",
-                   ctx->inbuflen - ctx->nread_inbuf));
-    }
-  }
-
-  rv = h2_session_send(cf, data);
-  if(rv) {
-    *err = CURLE_SEND_ERROR;
-    return -1;
-  }
-
-  if(nghttp2_session_check_request_allowed(ctx->h2) == 0) {
-    /* No more requests are allowed in the current session, so
-       the connection may not be reused. This is set when a
-       GOAWAY frame has been received or when the limit of stream
-       identifiers has been reached. */
-    connclose(cf->conn, "http/2: No new requests allowed");
-  }
-
-  if(should_close_session(ctx)) {
-    struct HTTP *stream = data->req.p.http;
-    DEBUGF(LOG_CF(data, cf,
-                 "h2_process_pending_input: nothing to do in this session"));
-    if(stream->reset)
-      *err = CURLE_PARTIAL_FILE;
-    else if(stream->error)
-      *err = CURLE_HTTP2;
-    else {
-      /* not an error per se, but should still close the connection */
-      connclose(cf->conn, "GOAWAY received");
-      *err = CURLE_OK;
-    }
-    return -1;
-  }
-  return 0;
-}
-
 static CURLcode http2_data_done_send(struct Curl_cfilter *cf,
                                      struct Curl_easy *data)
 {
@@ -1540,32 +1472,16 @@ static CURLcode http2_data_done_send(struct Curl_cfilter *cf,
   if(!ctx || !ctx->h2)
     goto out;
 
+  DEBUGF(LOG_CF(data, cf, "[h2sid=%u] data done", stream->stream_id));
   if(stream->upload_left) {
     /* If the stream still thinks there's data left to upload. */
-    stream->upload_left = 0; /* DONE! */
+    if(stream->upload_left == -1)
+      stream->upload_left = 0; /* DONE! */
 
     /* resume sending here to trigger the callback to get called again so
        that it can signal EOF to nghttp2 */
     (void)nghttp2_session_resume_data(ctx->h2, stream->stream_id);
-    (void)h2_process_pending_input(cf, data, &result);
-  }
-
-  /* If nghttp2 still has pending frames unsent */
-  if(nghttp2_session_want_write(ctx->h2)) {
-    struct SingleRequest *k = &data->req;
-    int rv;
-
-    DEBUGF(LOG_CF(data, cf, "HTTP/2 still wants to send data"));
-
-    /* and attempt to send the pending frames */
-    rv = h2_session_send(cf, data);
-    if(rv)
-      result = CURLE_SEND_ERROR;
-
-    if(nghttp2_session_want_write(ctx->h2)) {
-       /* re-set KEEP_SEND to make sure we are called again */
-       k->keepon |= KEEP_SEND;
-    }
+    drain_this(cf, data);
   }
 
 out:
@@ -1576,20 +1492,10 @@ static ssize_t http2_handle_stream_close(struct Curl_cfilter *cf,
                                          struct Curl_easy *data,
                                          struct HTTP *stream, CURLcode *err)
 {
-  struct cf_h2_ctx *ctx = cf->ctx;
-
-  if(ctx->pause_stream_id == stream->stream_id) {
-    ctx->pause_stream_id = 0;
-  }
+  ssize_t rv = 0;
 
   drained_transfer(cf, data);
 
-  if(ctx->pause_stream_id == 0) {
-    if(h2_process_pending_input(cf, data, err) != 0) {
-      return -1;
-    }
-  }
-
   if(stream->error == NGHTTP2_REFUSED_STREAM) {
     DEBUGF(LOG_CF(data, cf, "[h2sid=%u] REFUSED_STREAM, try again on a new "
                   "connection", stream->stream_id));
@@ -1619,34 +1525,42 @@ static ssize_t http2_handle_stream_close(struct Curl_cfilter *cf,
     return -1;
   }
 
-  if(Curl_dyn_len(&stream->trailer_recvbuf)) {
-    char *trailp = Curl_dyn_ptr(&stream->trailer_recvbuf);
-    char *lf;
+  if(Curl_dynhds_count(&stream->resp_trailers)) {
+    struct dynhds_entry *e;
+    struct dynbuf dbuf;
+    size_t i;
 
-    do {
-      size_t len = 0;
-      CURLcode result;
-      /* each trailer line ends with a newline */
-      lf = strchr(trailp, '\n');
-      if(!lf)
+    *err = CURLE_OK;
+    Curl_dyn_init(&dbuf, DYN_TRAILERS);
+    for(i = 0; i < Curl_dynhds_count(&stream->resp_trailers); ++i) {
+      e = Curl_dynhds_getn(&stream->resp_trailers, i);
+      if(!e)
         break;
-      len = lf + 1 - trailp;
-
-      Curl_debug(data, CURLINFO_HEADER_IN, trailp, len);
-      /* pass the trailers one by one to the callback */
-      result = Curl_client_write(data, CLIENTWRITE_HEADER, trailp, len);
-      if(result) {
-        *err = result;
-        return -1;
-      }
-      trailp = ++lf;
-    } while(lf);
+      Curl_dyn_reset(&dbuf);
+      *err = Curl_dyn_addf(&dbuf, "%.*s: %.*s\x0d\x0a",
+                          (int)e->namelen, e->name,
+                          (int)e->valuelen, e->value);
+      if(*err)
+        break;
+      Curl_debug(data, CURLINFO_HEADER_IN, Curl_dyn_ptr(&dbuf),
+                 Curl_dyn_len(&dbuf));
+      *err = Curl_client_write(data, CLIENTWRITE_HEADER|CLIENTWRITE_TRAILER,
+                               Curl_dyn_ptr(&dbuf), Curl_dyn_len(&dbuf));
+      if(*err)
+        break;
+    }
+    Curl_dyn_free(&dbuf);
+    if(*err)
+      goto out;
   }
 
   stream->close_handled = TRUE;
+  *err = CURLE_OK;
+  rv = 0;
 
-  DEBUGF(LOG_CF(data, cf, "[h2sid=%u] closed cleanly", stream->stream_id));
-  return 0;
+out:
+  DEBUGF(LOG_CF(data, cf, "handle_stream_close -> %zd, %d", rv, *err));
+  return rv;
 }
 
 static int sweight_wanted(const struct Curl_easy *data)
@@ -1683,12 +1597,13 @@ static void h2_pri_spec(struct Curl_easy *data,
 }
 
 /*
- * h2_session_send() checks if there's been an update in the priority /
+ * Check if there's been an update in the priority /
  * dependency settings and if so it submits a PRIORITY frame with the updated
  * info.
+ * Flush any out data pending in the network buffer.
  */
-static CURLcode h2_session_send(struct Curl_cfilter *cf,
-                                struct Curl_easy *data)
+static CURLcode h2_progress_egress(struct Curl_cfilter *cf,
+                                  struct Curl_easy *data)
 {
   struct cf_h2_ctx *ctx = cf->ctx;
   struct HTTP *stream = data->req.p.http;
@@ -1717,259 +1632,169 @@ out:
                   nghttp2_strerror(rv), rv));
     return CURLE_SEND_ERROR;
   }
-  return flush_output(cf, data);
+  return nw_out_flush(cf, data);
 }
 
-static ssize_t cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
-                          char *buf, size_t len, CURLcode *err)
+static ssize_t stream_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
+                           char *buf, size_t len, CURLcode *err)
 {
   struct cf_h2_ctx *ctx = cf->ctx;
   struct HTTP *stream = data->req.p.http;
   ssize_t nread = -1;
-  struct cf_call_data save;
-  bool conn_is_closed = FALSE;
-
-  CF_DATA_SAVE(save, cf, data);
 
-  /* If the h2 session has told us to GOAWAY with an error AND
-   * indicated the highest stream id it has processes AND
-   * the stream we are trying to read has a higher id, this
-   * means we will most likely not receive any more for it.
-   * Treat this as if the server explicitly had RST the stream */
-  if((ctx->goaway && ctx->goaway_error &&
-      ctx->last_stream_id > 0 &&
-      ctx->last_stream_id < stream->stream_id)) {
-    stream->reset = TRUE;
+  *err = CURLE_AGAIN;
+  if(!Curl_bufq_is_empty(&stream->h2_recvbuf)) {
+    nread = Curl_bufq_read(&stream->h2_recvbuf,
+                           (unsigned char *)buf, len, err);
+    DEBUGF(LOG_CF(data, cf, "recvbuf read(len=%zu) -> %zd, %d",
+                  len, nread, *err));
+    if(nread < 0)
+      goto out;
+    DEBUGASSERT(nread > 0);
   }
 
-  /* If a stream is RST, it does not matter what state the h2 session
-   * is in, our answer to receiving data is always the same. */
-  if(stream->reset) {
-    *err = stream->bodystarted? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR;
+  if(nread < 0) {
+    if(stream->closed) {
+      nread = http2_handle_stream_close(cf, data, stream, err);
+    }
+    else if(stream->reset ||
+            (ctx->conn_closed && Curl_bufq_is_empty(&ctx->inbufq)) ||
+            (ctx->goaway && ctx->last_stream_id < stream->stream_id)) {
+      *err = stream->bodystarted? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR;
+      nread = -1;
+    }
+  }
+  else if(nread == 0) {
+    *err = CURLE_AGAIN;
     nread = -1;
-    goto out;
   }
 
-  if(should_close_session(ctx)) {
-    DEBUGF(LOG_CF(data, cf, "http2_recv: nothing to do in this session"));
-    if(cf->conn->bits.close) {
-      /* already marked for closure, return OK and we're done */
-      drained_transfer(cf, data);
-      *err = CURLE_OK;
-      nread = 0;
-      goto out;
-    }
-    *err = CURLE_HTTP2;
-    nread = -1;
-    goto out;
+out:
+  DEBUGF(LOG_CF(data, cf, "stream_recv(len=%zu) -> %zd, %d",
+                len, nread, *err));
+  return nread;
+}
+
+static CURLcode h2_progress_ingress(struct Curl_cfilter *cf,
+                                    struct Curl_easy *data)
+{
+  struct cf_h2_ctx *ctx = cf->ctx;
+  struct HTTP *stream = data->req.p.http;
+  CURLcode result = CURLE_OK;
+  ssize_t nread;
+  bool keep_reading = TRUE;
+
+  /* Process network input buffer fist */
+  if(!Curl_bufq_is_empty(&ctx->inbufq)) {
+    DEBUGF(LOG_CF(data, cf, "Process %zd bytes in connection buffer",
+                  Curl_bufq_len(&ctx->inbufq)));
+    if(h2_process_pending_input(cf, data, &result) < 0)
+      return result;
   }
 
-  /* Nullify here because we call nghttp2_session_send() and they
-     might refer to the old buffer. */
-  stream->upload_mem = NULL;
-  stream->upload_len = 0;
+  /* Receive data from the "lower" filters, e.g. network until
+   * it is time to stop or we have enough data for this stream */
+  while(keep_reading &&
+        !ctx->conn_closed &&               /* not closed the connection */
+        !stream->closed &&                 /* nor the stream */
+        Curl_bufq_is_empty(&ctx->inbufq) && /* and we consumed our input */
+        !Curl_bufq_is_full(&stream->h2_recvbuf) && /* enough? */
+        Curl_bufq_len(&stream->h2_recvbuf) < data->set.buffer_size) {
+
+    nread = Curl_bufq_slurp(&ctx->inbufq, nw_in_reader, cf, &result);
+    DEBUGF(LOG_CF(data, cf, "read %zd bytes nw data -> %zd, %d",
+                  Curl_bufq_len(&ctx->inbufq), nread, result));
+    if(nread < 0) {
+      if(result != CURLE_AGAIN) {
+        failf(data, "Failed receiving HTTP2 data");
+        return result;
+      }
+      break;
+    }
+    else if(nread == 0) {
+      ctx->conn_closed = TRUE;
+      break;
+    }
 
-  /*
-   * At this point 'stream' is just in the Curl_easy the connection
-   * identifies as its owner at this time.
-   */
+    keep_reading = Curl_bufq_is_full(&ctx->inbufq);
+    if(h2_process_pending_input(cf, data, &result))
+      return result;
+  }
 
-  if(stream->bodystarted &&
-     stream->nread_header_recvbuf < Curl_dyn_len(&stream->header_recvbuf)) {
-    /* If there is header data pending for this stream to return, do that */
-    size_t left =
-      Curl_dyn_len(&stream->header_recvbuf) - stream->nread_header_recvbuf;
-    size_t ncopy = CURLMIN(len, left);
-    memcpy(buf, Curl_dyn_ptr(&stream->header_recvbuf) +
-           stream->nread_header_recvbuf, ncopy);
-    stream->nread_header_recvbuf += ncopy;
-
-    DEBUGF(LOG_CF(data, cf, "recv: Got %d bytes from header_recvbuf",
-                  (int)ncopy));
-    nread = ncopy;
-    goto out;
+  if(ctx->conn_closed && Curl_bufq_is_empty(&ctx->inbufq)) {
+    connclose(cf->conn, "GOAWAY received");
   }
 
+  return CURLE_OK;
+}
+
+static ssize_t cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
+                          char *buf, size_t len, CURLcode *err)
+{
+  struct cf_h2_ctx *ctx = cf->ctx;
+  struct HTTP *stream = data->req.p.http;
+  ssize_t nread = -1;
+  CURLcode result;
+  struct cf_call_data save;
+
+  CF_DATA_SAVE(save, cf, data);
+
   DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_recv: win %u/%u",
                 stream->stream_id,
                 nghttp2_session_get_local_window_size(ctx->h2),
                 nghttp2_session_get_stream_local_window_size(ctx->h2,
                                                              stream->stream_id)
            ));
+  nread = stream_recv(cf, data, buf, len, err);
+  if(nread < 0 && *err != CURLE_AGAIN)
+    goto out;
 
-  if(stream->memlen) {
-    DEBUGF(LOG_CF(data, cf, "[h2sid=%u] recv: DRAIN %zu bytes (%p => %p)",
-                  stream->stream_id, stream->memlen,
-                  (void *)stream->mem, (void *)buf));
-    if(buf != stream->mem) {
-      /* if we didn't get the same buffer this time, we must move the data to
-         the beginning */
-      memmove(buf, stream->mem, stream->memlen);
-      stream->len = len - stream->memlen;
-      stream->mem = buf;
-    }
-
-    if(ctx->pause_stream_id == stream->stream_id && !stream->pausedata) {
-      /* We have paused nghttp2, but we have no pause data (see
-         on_data_chunk_recv). */
-      ctx->pause_stream_id = 0;
-      if(h2_process_pending_input(cf, data, err) != 0) {
-        nread = -1;
-        goto out;
-      }
-    }
-  }
-  else if(stream->pausedata) {
-    DEBUGASSERT(ctx->pause_stream_id == stream->stream_id);
-    nread = CURLMIN(len, stream->pauselen);
-    memcpy(buf, stream->pausedata, nread);
-
-    stream->pausedata += nread;
-    stream->pauselen -= nread;
-    drain_this(cf, data);
-
-    if(stream->pauselen == 0) {
-      DEBUGF(LOG_CF(data, cf, "[h2sid=%u] Unpaused", stream->stream_id));
-      DEBUGASSERT(ctx->pause_stream_id == stream->stream_id);
-      ctx->pause_stream_id = 0;
+  if(nread < 0) {
+    *err = h2_progress_ingress(cf, data);
+    if(*err)
+      goto out;
 
-      stream->pausedata = NULL;
-      stream->pauselen = 0;
-    }
-    DEBUGF(LOG_CF(data, cf, "[h2sid=%u] recv: returns unpaused %zd bytes",
-                  stream->stream_id, nread));
-    goto out;
-  }
-  else if(ctx->pause_stream_id) {
-    /* If a stream paused nghttp2_session_mem_recv previously, and has
-       not processed all data, it still refers to the buffer in
-       nghttp2_session.  If we call nghttp2_session_mem_recv(), we may
-       overwrite that buffer.  To avoid that situation, just return
-       here with CURLE_AGAIN.  This could be busy loop since data in
-       socket is not read.  But it seems that usually streams are
-       notified with its drain property, and socket is read again
-       quickly. */
-    if(stream->closed) {
-      /* closed overrides paused */
+    nread = stream_recv(cf, data, buf, len, err);
+    if(Curl_bufq_is_empty(&stream->h2_recvbuf)) {
       drained_transfer(cf, data);
-      nread = 0;
-      goto out;
     }
-    DEBUGF(LOG_CF(data, cf, "[h2sid=%u] is paused, pause h2sid: %u",
-                  stream->stream_id, ctx->pause_stream_id));
-    *err = CURLE_AGAIN;
-    nread = -1;
-    goto out;
   }
-  else {
-    /* We have nothing buffered for `data` and no other stream paused
-     * the processing of incoming data, we can therefore read new data
-     * from the network.
-     * If DATA is coming for this stream, we want to store it ad the
-     * `buf` passed in right away - saving us a copy.
-     */
-    stream->mem = buf;
-    stream->len = len;
-    stream->memlen = 0;
-
-    if(ctx->inbuflen > 0) {
-      DEBUGF(LOG_CF(data, cf, "[h2sid=%u] %zd bytes in inbuf",
-                    stream->stream_id, ctx->inbuflen - ctx->nread_inbuf));
-      if(h2_process_pending_input(cf, data, err))
-        return -1;
-    }
 
-    while(stream->memlen == 0 &&       /* have no data for this stream */
-          !stream->closed &&           /* and it is not closed/reset */
-          !ctx->pause_stream_id &&     /* we are not paused either */
-          ctx->inbuflen == 0 &&       /* and out input buffer is empty */
-          !conn_is_closed) {          /* and connection is not closed */
-      /* Receive data from the "lower" filters */
-      nread = Curl_conn_cf_recv(cf->next, data, ctx->inbuf, H2_BUFSIZE, err);
-      if(nread < 0) {
-        DEBUGASSERT(*err);
-        if(*err == CURLE_AGAIN) {
-          break;
-        }
-        failf(data, "Failed receiving HTTP2 data");
-        conn_is_closed = TRUE;
-      }
-      else if(nread == 0) {
-        DEBUGF(LOG_CF(data, cf, "[h2sid=%u] underlying connection is closed",
-                      stream->stream_id));
-        conn_is_closed = TRUE;
+  if(nread > 0) {
+    size_t data_consumed = (size_t)nread;
+    /* Now that we transferred this to the upper layer, we report
+     * the actual amount of DATA consumed to the H2 session, so
+     * that it adjusts stream flow control */
+    if(stream->h2_recv_hds_len >= data_consumed) {
+      stream->h2_recv_hds_len -= data_consumed;  /* no DATA */
+    }
+    else {
+      if(stream->h2_recv_hds_len) {
+        data_consumed -= stream->h2_recv_hds_len;
+        stream->h2_recv_hds_len = 0;
       }
-      else {
-        DEBUGF(LOG_CF(data, cf, "[h2sid=%u] read %zd from connection",
-                      stream->stream_id, nread));
-        ctx->inbuflen = nread;
-        DEBUGASSERT(ctx->nread_inbuf == 0);
-        if(h2_process_pending_input(cf, data, err))
-          return -1;
+      if(data_consumed) {
+        DEBUGF(LOG_CF(data, cf, "[h2sid=%u] increase window by %zu",
+                      stream->stream_id, data_consumed));
+        nghttp2_session_consume(ctx->h2, stream->stream_id, data_consumed);
       }
     }
 
-  }
-
-  if(stream->memlen) {
-    ssize_t retlen = stream->memlen;
-
-    /* TODO: all this buffer handling is very brittle */
-    stream->len += stream->memlen;
-    stream->memlen = 0;
-
-    if(ctx->pause_stream_id == stream->stream_id) {
-      /* data for this stream is returned now, but this stream caused a pause
-         already so we need it called again asap */
-      DEBUGF(LOG_CF(data, cf, "[h2sid=%u] Data returned for PAUSED stream",
-                    stream->stream_id));
-      drain_this(cf, data);
-      Curl_expire(data, 0, EXPIRE_RUN_NOW);
-    }
-    else if(stream->closed) {
-      if(stream->reset || stream->error) {
-        nread = http2_handle_stream_close(cf, data, stream, err);
-        goto out;
-      }
-      /* this stream is closed, trigger a another read ASAP to detect that */
-      DEBUGF(LOG_CF(data, cf, "[h2sid=%u] is closed now, run again",
+    if(stream->closed) {
+      DEBUGF(LOG_CF(data, cf, "[h2sid=%u] closed stream, set drain",
                     stream->stream_id));
       drain_this(cf, data);
-      Curl_expire(data, 0, EXPIRE_RUN_NOW);
     }
-    else {
-      drained_transfer(cf, data);
-    }
-
-    *err = CURLE_OK;
-    nread = retlen;
-    goto out;
   }
 
-  if(conn_is_closed && !stream->closed) {
-    /* underlying connection is closed and we have nothing for the stream.
-     * Treat this as a RST */
-    stream->closed = stream->reset = TRUE;
-      failf(data, "HTTP/2 stream %u was not closed cleanly before"
-            " end of the underlying connection",
-            stream->stream_id);
-  }
-
-  if(stream->closed) {
-    nread = http2_handle_stream_close(cf, data, stream, err);
-    goto out;
-  }
-
-  if(!data->state.drain && Curl_conn_cf_data_pending(cf->next, data)) {
-    DEBUGF(LOG_CF(data, cf, "[h2sid=%u] pending data, set drain",
-                  stream->stream_id));
-    drain_this(cf, data);
-  }
-  *err = CURLE_AGAIN;
-  nread = -1;
 out:
-  DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_recv -> %zd, %d",
-                stream->stream_id, nread, *err));
+  result = h2_progress_egress(cf, data);
+  if(result) {
+    *err = result;
+    nread = -1;
+  }
+  DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_recv(len=%zu) -> %zd %d",
+                stream->stream_id, len, nread, *err));
   CF_DATA_RESTORE(cf, save);
   return nread;
 }
@@ -1996,9 +1821,15 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
   ssize_t nwritten;
 
   CF_DATA_SAVE(save, cf, data);
-  DEBUGF(LOG_CF(data, cf, "cf_send(len=%zu) start", len));
 
   if(stream->stream_id != -1) {
+    DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send: win %u/%u",
+                stream->stream_id,
+                nghttp2_session_get_remote_window_size(ctx->h2),
+                nghttp2_session_get_stream_remote_window_size(
+                  ctx->h2, stream->stream_id)
+           ));
+
     if(stream->close_handled) {
       infof(data, "stream %u closed", stream->stream_id);
       *err = CURLE_HTTP2_STREAM;
@@ -2011,25 +1842,36 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
     }
     /* If stream_id != -1, we have dispatched request HEADERS, and now
        are going to send or sending request body in DATA frame */
-    stream->upload_mem = buf;
-    stream->upload_len = len;
-    rv = nghttp2_session_resume_data(ctx->h2, stream->stream_id);
-    if(nghttp2_is_fatal(rv)) {
-      *err = CURLE_SEND_ERROR;
+    nwritten = Curl_bufq_write(&stream->h2_sendbuf, buf, len, err);
+    if(nwritten < 0) {
+      if(*err != CURLE_AGAIN)
+        goto out;
+      nwritten = 0;
+    }
+
+    if(!Curl_bufq_is_empty(&stream->h2_sendbuf)) {
+      rv = nghttp2_session_resume_data(ctx->h2, stream->stream_id);
+      if(nghttp2_is_fatal(rv)) {
+        *err = CURLE_SEND_ERROR;
+        nwritten = -1;
+        goto out;
+      }
+    }
+
+    result = h2_progress_ingress(cf, data);
+    if(result) {
+      *err = result;
       nwritten = -1;
       goto out;
     }
-    result = h2_session_send(cf, data);
+
+    result = h2_progress_egress(cf, data);
     if(result) {
       *err = result;
       nwritten = -1;
       goto out;
     }
 
-    nwritten = (ssize_t)len - (ssize_t)stream->upload_len;
-    stream->upload_mem = NULL;
-    stream->upload_len = 0;
-
     if(should_close_session(ctx)) {
       DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
       *err = CURLE_HTTP2;
@@ -2037,14 +1879,6 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
       goto out;
     }
 
-    if(stream->upload_left) {
-      /* we are sure that we have more data to send here.  Calling the
-         following API will make nghttp2_session_want_write() return
-         nonzero if remote window allows it, which then libcurl checks
-         socket is writable or not.  See http2_perform_getsock(). */
-      nghttp2_session_resume_data(ctx->h2, stream->stream_id);
-    }
-
     if(!nwritten) {
       size_t rwin = nghttp2_session_get_stream_remote_window_size(ctx->h2,
                                                           stream->stream_id);
@@ -2060,18 +1894,23 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
                  "window is exhausted", stream->stream_id));
         }
     }
-    DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send returns %zd ",
-           stream->stream_id, nwritten));
-
     /* handled writing BODY for open stream. */
     goto out;
   }
+
+  DEBUGF(LOG_CF(data, cf, "cf_send, submit %s", data->state.url));
+  if(!stream->h2_send_hds_len) {
+    /* first invocation carries the HTTP/1.1 formatted request headers.
+     * we remember that in case we EAGAIN this call, because the next
+     * invocation may have added request body data into the buffer. */
+    stream->h2_send_hds_len = len;
+  }
+
   /* Stream has not been opened yet. `buf` is expected to contain
-   * request headers. */
-  /* TODO: this assumes that the `buf` and `len` we are called with
-   * is *all* HEADERs and no body. We have no way to determine here
-   * if that is indeed the case. */
-  result = Curl_pseudo_headers(data, buf, len, NULL, &hreq);
+   * `stream->h2_send_hds_len` bytes of request headers. */
+  DEBUGASSERT(stream->h2_send_hds_len <= len);
+  result = Curl_pseudo_headers(data, buf, stream->h2_send_hds_len,
+                               NULL, &hreq);
   if(result) {
     *err = result;
     nwritten = -1;
@@ -2114,7 +1953,7 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
       /* data sending without specifying the data amount up front */
       stream->upload_left = -1; /* unknown, but not zero */
 
-    data_prd.read_callback = data_source_read_callback;
+    data_prd.read_callback = req_body_read_callback;
     data_prd.source.ptr = NULL;
     stream_id = nghttp2_submit_request(ctx->h2, &pri_spec, nva, nheader,
                                        &data_prd, data);
@@ -2134,14 +1973,21 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
     goto out;
   }
 
+  DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send(len=%zu) submit %s",
+                stream_id, len, data->state.url));
   infof(data, "Using Stream ID: %u (easy handle %p)",
         stream_id, (void *)data);
   stream->stream_id = stream_id;
-  /* See TODO above. We assume that the whole buf was consumed by
-   * generating the request headers. */
-  nwritten = len;
+  nwritten = stream->h2_send_hds_len;
+
+  result = h2_progress_ingress(cf, data);
+  if(result) {
+    *err = result;
+    nwritten = -1;
+    goto out;
+  }
 
-  result = h2_session_send(cf, data);
+  result = h2_progress_egress(cf, data);
   if(result) {
     *err = result;
     nwritten = -1;
@@ -2155,16 +2001,9 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
     goto out;
   }
 
-  /* If whole HEADERS frame was sent off to the underlying socket, the nghttp2
-     library calls data_source_read_callback. But only it found that no data
-     available, so it deferred the DATA transmission. Which means that
-     nghttp2_session_want_write() returns 0 on http2_perform_getsock(), which
-     results that no writable socket check is performed. To workaround this,
-     we issue nghttp2_session_resume_data() here to bring back DATA
-     transmission from deferred state. */
-  nghttp2_session_resume_data(ctx->h2, stream->stream_id);
-
 out:
+  DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send -> %zd, %d",
+         stream->stream_id, nwritten, *err));
   CF_DATA_RESTORE(cf, save);
   return nwritten;
 }
@@ -2182,7 +2021,7 @@ static int cf_h2_get_select_socks(struct Curl_cfilter *cf,
   CF_DATA_SAVE(save, cf, data);
   sock[0] = Curl_conn_cf_get_socket(cf, data);
 
-  if(!(k->keepon & KEEP_RECV_PAUSE))
+  if(!(k->keepon & (KEEP_RECV_PAUSE|KEEP_RECV_HOLD)))
     /* Unless paused - in an HTTP/2 connection we can basically always get a
        frame so we should always be ready for one */
     bitmap |= GETSOCK_READSOCK(0);
@@ -2230,10 +2069,13 @@ static CURLcode cf_h2_connect(struct Curl_cfilter *cf,
       goto out;
   }
 
-  if(-1 == h2_process_pending_input(cf, data, &result)) {
-    result = CURLE_HTTP2;
+  result = h2_progress_ingress(cf, data);
+  if(result)
+    goto out;
+
+  result = h2_progress_egress(cf, data);
+  if(result)
     goto out;
-  }
 
   *done = TRUE;
   cf->connected = TRUE;
@@ -2278,7 +2120,7 @@ static CURLcode http2_data_pause(struct Curl_cfilter *cf,
 #ifdef NGHTTP2_HAS_SET_LOCAL_WINDOW_SIZE
   if(ctx && ctx->h2) {
     struct HTTP *stream = data->req.p.http;
-    uint32_t window = !pause * HTTP2_HUGE_WINDOW_SIZE;
+    uint32_t window = !pause * H2_STREAM_WINDOW_SIZE;
     CURLcode result;
 
     int rv = nghttp2_session_set_local_window_size(ctx->h2,
@@ -2292,7 +2134,7 @@ static CURLcode http2_data_pause(struct Curl_cfilter *cf,
     }
 
     /* make sure the window update gets sent */
-    result = h2_session_send(cf, data);
+    result = h2_progress_egress(cf, data);
     if(result)
       return result;
 
@@ -2329,10 +2171,9 @@ static CURLcode cf_h2_cntrl(struct Curl_cfilter *cf,
     result = http2_data_setup(cf, data);
     break;
   }
-  case CF_CTRL_DATA_PAUSE: {
+  case CF_CTRL_DATA_PAUSE:
     result = http2_data_pause(cf, data, (arg1 != 0));
     break;
-  }
   case CF_CTRL_DATA_DONE_SEND: {
     result = http2_data_done_send(cf, data);
     break;
@@ -2352,7 +2193,10 @@ static bool cf_h2_data_pending(struct Curl_cfilter *cf,
                                const struct Curl_easy *data)
 {
   struct cf_h2_ctx *ctx = cf->ctx;
-  if(ctx && ctx->inbuflen > 0 && ctx->nread_inbuf > ctx->inbuflen)
+  struct HTTP *stream = data->req.p.http;
+
+  if(ctx && (!Curl_bufq_is_empty(&ctx->inbufq)
+            || (stream && !Curl_bufq_is_empty(&stream->h2_recvbuf))))
     return TRUE;
   return cf->next? cf->next->cft->has_data_pending(cf->next, data) : FALSE;
 }
@@ -2606,23 +2450,26 @@ CURLcode Curl_http2_upgrade(struct Curl_easy *data,
   if(result)
     return result;
 
-  if(nread) {
-    /* we are going to copy mem to httpc->inbuf.  This is required since
-       mem is part of buffer pointed by stream->mem, and callbacks
-       called by nghttp2_session_mem_recv() will write stream specific
-       data into stream->mem, overwriting data already there. */
-    if(H2_BUFSIZE < nread) {
-      failf(data, "connection buffer size is too small to store data "
-            "following HTTP Upgrade response header: buflen=%d, datalen=%zu",
-            H2_BUFSIZE, nread);
+  if(nread > 0) {
+    /* Remaining data from the protocol switch reply is already using
+     * the switched protocol, ie. HTTP/2. We add that to the network
+     * inbufq. */
+    ssize_t copied;
+
+    copied = Curl_bufq_write(&ctx->inbufq,
+                             (const unsigned char *)mem, nread, &result);
+    if(copied < 0) {
+      failf(data, "error on copying HTTP Upgrade response: %d", result);
+      return CURLE_RECV_ERROR;
+    }
+    if((size_t)copied < nread) {
+      failf(data, "connection buffer size could not take all data "
+            "from HTTP Upgrade response header: copied=%zd, datalen=%zu",
+            copied, nread);
       return CURLE_HTTP2;
     }
-
-    infof(data, "Copying HTTP/2 data in stream buffer to connection buffer"
+    infof(data, "Copied HTTP/2 data in stream buffer to connection buffer"
           " after upgrade: len=%zu", nread);
-    DEBUGASSERT(ctx->nread_inbuf == 0);
-    memcpy(ctx->inbuf, mem, nread);
-    ctx->inbuflen = nread;
   }
 
   conn->httpversion = 20; /* we know we're on HTTP/2 now */
index d7cc73af4181914a4c41926c95c5d419ab225d5d..6b14bfb37a682469c684d6ef369782ef3dd63f0c 100644 (file)
@@ -49,7 +49,7 @@ User-Agent: curl/%VERSION
 Accept: */*\r
 Connection: Upgrade, HTTP2-Settings\r
 Upgrade: %H2CVER\r
-HTTP2-Settings: AAMAAABkAAQCAAAAAAIAAAAA\r
+HTTP2-Settings: AAMAAABkAAQACAAAAAIAAAAA\r
 \r
 </protocol>
 </verify>
index 271bf31fd07b5b3e0984a50d1ffa29949cd24fdc..294008d01cd6b4ac8f79993c5fccdc69b79ca5f3 100644 (file)
@@ -59,17 +59,16 @@ class ScoreCard:
 
     def handshakes(self, proto: str) -> Dict[str, Any]:
         props = {}
-        sample_size = 10
-        self.info(f'handshaking ')
+        sample_size = 5
+        self.info(f'TLS Handshake\n')
         for authority in [
             f'{self.env.authority_for(self.env.domain1, proto)}'
         ]:
-            self.info('localhost')
+            self.info('  localhost...')
             c_samples = []
             hs_samples = []
             errors = []
             for i in range(sample_size):
-                self.info('.')
                 curl = CurlClient(env=self.env, silent=True)
                 url = f'https://{authority}/'
                 r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True)
@@ -79,20 +78,25 @@ class ScoreCard:
                 else:
                     errors.append(f'exit={r.exit_code}')
             props['localhost'] = {
-                'connect': mean(c_samples),
-                'handshake': mean(hs_samples),
-                'errors': errors
+                'ipv4-connect': mean(c_samples),
+                'ipv4-handshake': mean(hs_samples),
+                'ipv4-errors': errors,
+                'ipv6-connect': 0,
+                'ipv6-handshake': 0,
+                'ipv6-errors': [],
             }
+            self.info('ok.\n')
         for authority in [
-            'curl.se', 'google.com', 'cloudflare.com', 'nghttp2.org',
+            'curl.se', 'nghttp2.org',
         ]:
+            self.info(f'  {authority}...')
+            props[authority] = {}
             for ipv in ['ipv4', 'ipv6']:
-                self.info(f'{authority}-{ipv}')
+                self.info(f'{ipv}...')
                 c_samples = []
                 hs_samples = []
                 errors = []
                 for i in range(sample_size):
-                    self.info('.')
                     curl = CurlClient(env=self.env, silent=True)
                     args = [
                         '--http3-only' if proto == 'h3' else '--http2',
@@ -104,12 +108,10 @@ class ScoreCard:
                         hs_samples.append(r.stats[0]['time_appconnect'])
                     else:
                         errors.append(f'exit={r.exit_code}')
-                props[f'{authority}-{ipv}'] = {
-                    'connect': mean(c_samples) if len(c_samples) else -1,
-                    'handshake': mean(hs_samples) if len(hs_samples) else -1,
-                    'errors': errors
-                }
-        self.info('\n')
+                    props[authority][f'{ipv}-connect'] = mean(c_samples) if len(c_samples) else -1
+                    props[authority][f'{ipv}-handshake'] = mean(hs_samples) if len(hs_samples) else -1
+                    props[authority][f'{ipv}-errors'] = errors
+            self.info('ok.\n')
         return props
 
     def _make_docs_file(self, docs_dir: str, fname: str, fsize: int):
@@ -138,16 +140,17 @@ class ScoreCard:
         count = 1
         samples = []
         errors = []
-        self.info(f'{sample_size}x single')
+        self.info(f'single...')
         for i in range(sample_size):
             curl = CurlClient(env=self.env, silent=True)
-            r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True)
+            r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
+                                   with_headers=False)
             err = self._check_downloads(r, count)
             if err:
                 errors.append(err)
             else:
-                samples.append(r.stats[0]['speed_download'])
-        self.info(f'.')
+                total_size = sum([s['size_download'] for s in r.stats])
+                samples.append(total_size / r.duration.total_seconds())
         return {
             'count': count,
             'samples': sample_size,
@@ -160,17 +163,17 @@ class ScoreCard:
         samples = []
         errors = []
         url = f'{url}?[0-{count - 1}]'
-        self.info(f'{sample_size}x{count} serial')
+        self.info(f'serial...')
         for i in range(sample_size):
             curl = CurlClient(env=self.env, silent=True)
-            r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True)
-            self.info(f'.')
+            r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
+                                   with_headers=False)
             err = self._check_downloads(r, count)
             if err:
                 errors.append(err)
             else:
-                for s in r.stats:
-                    samples.append(s['speed_download'])
+                total_size = sum([s['size_download'] for s in r.stats])
+                samples.append(total_size / r.duration.total_seconds())
         return {
             'count': count,
             'samples': sample_size,
@@ -183,19 +186,18 @@ class ScoreCard:
         samples = []
         errors = []
         url = f'{url}?[0-{count - 1}]'
-        self.info(f'{sample_size}x{count} parallel')
+        self.info(f'parallel...')
         for i in range(sample_size):
             curl = CurlClient(env=self.env, silent=True)
-            start = datetime.now()
             r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
-                                   extra_args=['--parallel'])
+                                   with_headers=False,
+                                   extra_args=['--parallel', '--parallel-max', str(count)])
             err = self._check_downloads(r, count)
             if err:
                 errors.append(err)
             else:
-                duration = datetime.now() - start
                 total_size = sum([s['size_download'] for s in r.stats])
-                samples.append(total_size / duration.total_seconds())
+                samples.append(total_size / r.duration.total_seconds())
         return {
             'count': count,
             'samples': sample_size,
@@ -210,7 +212,7 @@ class ScoreCard:
             'serial': self.transfer_serial(url=url, proto=proto, count=count),
             'parallel': self.transfer_parallel(url=url, proto=proto, count=count),
         }
-        self.info(f'\n')
+        self.info(f'ok.\n')
         return props
 
     def downloads(self, proto: str, test_httpd: bool = True,
@@ -234,9 +236,9 @@ class ScoreCard:
             url100 = f'https://{self.env.domain1}:{port}/score100.data'
             scores[via] = {
                 'description': descr,
-                '1MB-local': self.download_url(url=url1, proto=proto, count=50),
-                '10MB-local': self.download_url(url=url10, proto=proto, count=50),
-                '100MB-local': self.download_url(url=url100, proto=proto, count=50),
+                '1MB': self.download_url(url=url1, proto=proto, count=50),
+                '10MB': self.download_url(url=url10, proto=proto, count=50),
+                '100MB': self.download_url(url=url100, proto=proto, count=50),
             }
         if test_caddy and self.caddy:
             port = self.caddy.port
@@ -251,15 +253,85 @@ class ScoreCard:
             url100 = f'https://{self.env.domain1}:{port}/score100.data'
             scores[via] = {
                 'description': descr,
-                '1MB-local': self.download_url(url=url1, proto=proto, count=50),
-                '10MB-local': self.download_url(url=url10, proto=proto, count=50),
-                '100MB-local': self.download_url(url=url100, proto=proto, count=50),
+                '1MB': self.download_url(url=url1, proto=proto, count=50),
+                '10MB': self.download_url(url=url10, proto=proto, count=50),
+                '100MB': self.download_url(url=url100, proto=proto, count=50),
+            }
+        return scores
+
+    def do_requests(self, url: str, proto: str, count: int, max_parallel: int = 1):
+        sample_size = 1
+        samples = []
+        errors = []
+        url = f'{url}?[0-{count - 1}]'
+        extra_args = ['--parallel', '--parallel-max', str(max_parallel)] if max_parallel > 1 else []
+        self.info(f'{max_parallel}...')
+        for i in range(sample_size):
+            curl = CurlClient(env=self.env)
+            r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
+                                   with_headers=False,
+                                   extra_args=extra_args)
+            err = self._check_downloads(r, count)
+            if err:
+                errors.append(err)
+            else:
+                for s in r.stats:
+                    samples.append(count / r.duration.total_seconds())
+        return {
+            'count': count,
+            'samples': sample_size,
+            'speed': mean(samples) if len(samples) else -1,
+            'errors': errors
+        }
+
+    def requests_url(self, url: str, proto: str, count: int):
+        self.info(f'  {url}: ')
+        props = {
+            'serial': self.do_requests(url=url, proto=proto, count=count),
+            'par-6': self.do_requests(url=url, proto=proto, count=count, max_parallel=6),
+            'par-25': self.do_requests(url=url, proto=proto, count=count, max_parallel=25),
+            'par-50': self.do_requests(url=url, proto=proto, count=count, max_parallel=50),
+            'par-100': self.do_requests(url=url, proto=proto, count=count, max_parallel=100),
+        }
+        self.info(f'ok.\n')
+        return props
+
+    def requests(self, proto: str, test_httpd: bool = True,
+                 test_caddy: bool = True) -> Dict[str, Any]:
+        scores = {}
+        if test_httpd:
+            if proto == 'h3':
+                port = self.env.h3_port
+                via = 'nghttpx'
+                descr = f'port {port}, proxying httpd'
+            else:
+                port = self.env.https_port
+                via = 'httpd'
+                descr = f'port {port}'
+            self.info(f'{via} requests\n')
+            self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='reqs10.data', fsize=10*1024)
+            url1 = f'https://{self.env.domain1}:{port}/reqs10.data'
+            scores[via] = {
+                'description': descr,
+                '10KB': self.requests_url(url=url1, proto=proto, count=10000),
+            }
+        if test_caddy and self.caddy:
+            port = self.caddy.port
+            via = 'caddy'
+            descr = f'port {port}'
+            self.info('caddy requests\n')
+            self._make_docs_file(docs_dir=self.caddy.docs_dir, fname='req10.data', fsize=10 * 1024)
+            url1 = f'https://{self.env.domain1}:{port}/req10.data'
+            scores[via] = {
+                'description': descr,
+                '10KB': self.requests_url(url=url1, proto=proto, count=5000),
             }
         return scores
 
     def score_proto(self, proto: str,
                     handshakes: bool = True,
                     downloads: bool = True,
+                    requests: bool = True,
                     test_httpd: bool = True,
                     test_caddy: bool = True):
         self.info(f"scoring {proto}\n")
@@ -280,6 +352,10 @@ class ScoreCard:
                 if self.env.curl_uses_lib(lib):
                     p['implementation'] = lib
                     break
+        elif proto == 'h1' or proto == 'http/1.1':
+            proto = 'http/1.1'
+            p['name'] = proto
+            p['implementation'] = 'hyper' if self.env.curl_uses_lib('hyper') else 'native'
         else:
             raise ScoreCardException(f"unknown protocol: {proto}")
 
@@ -298,6 +374,10 @@ class ScoreCard:
             score['downloads'] = self.downloads(proto=proto,
                                                 test_httpd=test_httpd,
                                                 test_caddy=test_caddy)
+        if requests:
+            score['requests'] = self.requests(proto=proto,
+                                              test_httpd=test_httpd,
+                                              test_caddy=test_caddy)
         self.info("\n")
         return score
 
@@ -310,44 +390,86 @@ class ScoreCard:
     def fmt_mbs(self, val):
         return f'{val/(1024*1024):0.000f} MB/s' if val >= 0 else '--'
 
+    def fmt_reqs(self, val):
+        return f'{val:0.000f} r/s' if val >= 0 else '--'
+
     def print_score(self, score):
         print(f'{score["protocol"]["name"].upper()} in curl {score["curl"]} ({score["os"]}) via '
               f'{score["protocol"]["implementation"]}/{score["protocol"]["version"]} ')
         if 'handshakes' in score:
-            print('Handshakes')
-            print(f'  {"Host":<25} {"Connect":>12} {"Handshake":>12}     {"Errors":<20}')
+            print(f'{"Handshakes":<24} {"ipv4":25} {"ipv6":28}')
+            print(f'  {"Host":<17} {"Connect":>12} {"Handshake":>12} '
+                  f'{"Connect":>12} {"Handshake":>12}     {"Errors":<20}')
             for key, val in score["handshakes"].items():
-                print(f'  {key:<25} {self.fmt_ms(val["connect"]):>12} '''
-                      f'{self.fmt_ms(val["handshake"]):>12}     {"/".join(val["errors"]):<20}')
+                print(f'  {key:<17} {self.fmt_ms(val["ipv4-connect"]):>12} '
+                      f'{self.fmt_ms(val["ipv4-handshake"]):>12} '
+                      f'{self.fmt_ms(val["ipv6-connect"]):>12} '
+                      f'{self.fmt_ms(val["ipv6-handshake"]):>12}     {"/".join(val["ipv4-errors"] + val["ipv6-errors"]):<20}'
+                      )
         if 'downloads' in score:
             print('Downloads')
+            print(f'  {"Server":<8} {"Size":>8} '
+                  f'{"Single":>12} {"Serial":>12} {"Parallel":>12}    {"Errors":<20}')
+            skeys = {}
             for dkey, dval in score["downloads"].items():
-                print(f'  {dkey}: {dval["description"]}')
+                for k in dval.keys():
+                    skeys[k] = True
+            for skey in skeys:
+                for dkey, dval in score["downloads"].items():
+                    if skey in dval:
+                        sval = dval[skey]
+                        if isinstance(sval, str):
+                            continue
+                        errors = []
+                        for key, val in sval.items():
+                            if 'errors' in val:
+                                errors.extend(val['errors'])
+                        print(f'  {dkey:<8} {skey:>8} '
+                              f'{self.fmt_mbs(sval["single"]["speed"]):>12} '
+                              f'{self.fmt_mbs(sval["serial"]["speed"]):>12} '
+                              f'{self.fmt_mbs(sval["parallel"]["speed"]):>12} '
+                              f'   {"/".join(errors):<20}')
+        if 'requests' in score:
+            print('Requests, max in parallel')
+            print(f'  {"Server":<8} {"Size":>8} '
+                  f'{"1    ":>12} {"6    ":>12} {"25    ":>12} '
+                  f'{"50    ":>12} {"100    ":>12}    {"Errors":<20}')
+            for dkey, dval in score["requests"].items():
                 for skey, sval in dval.items():
                     if isinstance(sval, str):
                         continue
-                    print(f'    {skey:<13} {"Samples":>10} {"Count":>10} {"Speed":>17}   {"Errors":<20}')
+                    errors = []
                     for key, val in sval.items():
-                        print(f'      {key:<11} {val["samples"]:>10} '''
-                              f'{val["count"]:>10} {self.fmt_mbs(val["speed"]):>17}   '
-                              f'{"/".join(val["errors"]):<20}')
+                        if 'errors' in val:
+                            errors.extend(val['errors'])
+                    print(f'  {dkey:<8} {skey:>8} '
+                          f'{self.fmt_reqs(sval["serial"]["speed"]):>12} '
+                          f'{self.fmt_reqs(sval["par-6"]["speed"]):>12} '
+                          f'{self.fmt_reqs(sval["par-25"]["speed"]):>12} '
+                          f'{self.fmt_reqs(sval["par-50"]["speed"]):>12} '
+                          f'{self.fmt_reqs(sval["par-100"]["speed"]):>12} '
+                          f'   {"/".join(errors):<20}')
 
     def main(self):
         parser = argparse.ArgumentParser(prog='scorecard', description="""
             Run a range of tests to give a scorecard for a HTTP protocol
             'h3' or 'h2' implementation in curl.
             """)
-        parser.add_argument("-v", "--verbose", action='count', default=0,
+        parser.add_argument("-v", "--verbose", action='count', default=1,
                             help="log more output on stderr")
-        parser.add_argument("-t", "--text", action='store_true', default=False,
-                            help="print text instead of json")
+        parser.add_argument("-j", "--json", action='store_true', default=False,
+                            help="print json instead of text")
+        parser.add_argument("-H", "--handshakes", action='store_true', default=False,
+                            help="evaluate handshakes only")
         parser.add_argument("-d", "--downloads", action='store_true', default=False,
                             help="evaluate downloads only")
+        parser.add_argument("-r", "--requests", action='store_true', default=False,
+                            help="evaluate requests only")
         parser.add_argument("--httpd", action='store_true', default=False,
                             help="evaluate httpd server only")
         parser.add_argument("--caddy", action='store_true', default=False,
                             help="evaluate caddy server only")
-        parser.add_argument("protocols", nargs='*', help="Name(s) of protocol to score")
+        parser.add_argument("protocol", default='h2', nargs='?', help="Name of protocol to score")
         args = parser.parse_args()
 
         self.verbose = args.verbose
@@ -357,13 +479,21 @@ class ScoreCard:
             console.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
             logging.getLogger('').addHandler(console)
 
-        protocols = args.protocols if len(args.protocols) else ['h2', 'h3']
+        protocol = args.protocol
         handshakes = True
         downloads = True
-        test_httpd = True
+        requests = True
+        test_httpd = protocol != 'h3'
         test_caddy = True
+        if args.handshakes:
+            downloads = False
+            requests = False
         if args.downloads:
             handshakes = False
+            requests = False
+        if args.requests:
+            handshakes = False
+            downloads = False
         if args.caddy:
             test_caddy = True
             test_httpd = False
@@ -383,7 +513,7 @@ class ScoreCard:
             assert self.httpd.exists(), f'httpd not found: {self.env.httpd}'
             self.httpd.clear_logs()
             assert self.httpd.start()
-            if 'h3' in protocols:
+            if 'h3' == protocol:
                 self.nghttpx = Nghttpx(env=self.env)
                 self.nghttpx.clear_logs()
                 assert self.nghttpx.start()
@@ -392,15 +522,15 @@ class ScoreCard:
                 self.caddy.clear_logs()
                 assert self.caddy.start()
 
-            for p in protocols:
-                score = self.score_proto(proto=p, handshakes=handshakes,
-                                         downloads=downloads,
-                                         test_caddy=test_caddy,
-                                         test_httpd=test_httpd)
-                if args.text:
-                    self.print_score(score)
-                else:
-                    print(json.JSONEncoder(indent=2).encode(score))
+            score = self.score_proto(proto=protocol, handshakes=handshakes,
+                                     downloads=downloads,
+                                     requests=requests,
+                                     test_caddy=test_caddy,
+                                     test_httpd=test_httpd)
+            if args.json:
+                print(json.JSONEncoder(indent=2).encode(score))
+            else:
+                self.print_score(score)
 
         except ScoreCardException as ex:
             sys.stderr.write(f"ERROR: {str(ex)}\n")
index c2c7e51971f1c34df1270c6922b176e3066dea55..40f178a7cca721942c88695c42ddbf03fd758223 100644 (file)
@@ -24,6 +24,8 @@
 #
 ###########################################################################
 #
+import difflib
+import filecmp
 import logging
 import os
 import pytest
@@ -178,11 +180,7 @@ class TestUpload:
                              extra_args=['--parallel'])
         r.check_exit_code(0)  
         r.check_stats(count=count, exp_status=200)
-        indata = open(fdata).readlines()
-        r.check_stats(count=count, exp_status=200)
-        for i in range(count):
-            respdata = open(curl.response_file(i)).readlines()
-            assert respdata == indata
+        self.check_download(count, fdata, curl)
 
     # PUT 100k
     @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
@@ -222,3 +220,14 @@ class TestUpload:
             respdata = open(curl.response_file(i)).readlines()
             assert respdata == exp_data
 
+    def check_download(self, count, srcfile, curl):
+        for i in range(count):
+            dfile = curl.download_file(i)
+            assert os.path.exists(dfile)
+            if not filecmp.cmp(srcfile, dfile, shallow=False):
+                diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
+                                                    b=open(dfile).readlines(),
+                                                    fromfile=srcfile,
+                                                    tofile=dfile,
+                                                    n=1))
+                assert False, f'download {dfile} differs:\n{diff}'
index d789446f9646323d01cc1a5ffb033f456d96edfa..ea1343a950fb51395933c5a93e2e21bf9e8e1564 100644 (file)
@@ -126,10 +126,8 @@ class Caddy:
             r = curl.http_get(url=check_url)
             if r.exit_code == 0:
                 return True
-            log.error(f'curl: {r}')
-            log.debug(f'waiting for caddy to become responsive: {r}')
             time.sleep(.1)
-        log.error(f"Server still not responding after {timeout}")
+        log.error(f"Caddy still not responding after {timeout}")
         return False
 
     def _rmf(self, path):
index caf9249b1a052070a6744281bd16534dbbf1644d..9d1a4255f8b267e46c6315b62934cc79f7a040f0 100644 (file)
@@ -63,6 +63,8 @@ class EnvConfig:
         self.config = DEF_CONFIG
         # check cur and its features
         self.curl = CURL
+        if 'CURL' in os.environ:
+            self.curl = os.environ['CURL']
         self.curl_props = {
             'version': None,
             'os': None,