git.ipfire.org Git - thirdparty/curl.git/commitdiff
http2: fix upload busy loop
author Stefan Eissing <stefan@eissing.org>
Thu, 9 Feb 2023 09:16:50 +0000 (10:16 +0100)
committer Jay Satiro <raysatiro@yahoo.com>
Wed, 1 Mar 2023 09:13:49 +0000 (04:13 -0500)
- Set KEEP_SEND_PAUSE when exhausting remote HTTP/2 window size of a
  stream.

- Clear KEEP_SEND_PAUSE when receiving HTTP/2 window updates on a paused
  stream.

- Also fix http2 send compiler warnings reported in #10449.

Prior to this change, starting with commit 71b7e016 (which precedes 7.88.0),
libcurl could busy-loop and eat CPU during HTTP/2 uploads.

Reported-by: Jay Satiro
Fixes https://github.com/curl/curl/issues/10449
Fixes https://github.com/curl/curl/issues/10618
Closes https://github.com/curl/curl/pull/10627

lib/http2.c
tests/tests-httpd/test_07_upload.py
tests/tests-httpd/testenv/curl.py
tests/tests-httpd/testenv/httpd.py
tests/tests-httpd/testenv/mod_curltest/mod_curltest.c

index f45b0ac5b1d0d17ff5a277b92a78c811ee98c17d..0bae69cb3a5a9d3e0e4b8883b4b6881836dfcf8c 100644 (file)
@@ -942,6 +942,17 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
     DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] recv RST", stream_id));
     stream->reset = TRUE;
     break;
+    case NGHTTP2_WINDOW_UPDATE:
+      DEBUGF(LOG_CF(data, cf, "[h2sid=%u] recv WINDOW_UPDATE", stream_id));
+      if((data_s->req.keepon & KEEP_SEND_PAUSE) &&
+         (data_s->req.keepon & KEEP_SEND)) {
+        data_s->req.keepon &= ~KEEP_SEND_PAUSE;
+        drain_this(cf, data_s);
+        Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
+        DEBUGF(LOG_CF(data, cf, "[h2sid=%u] unpausing after win update",
+               stream_id));
+      }
+      break;
   default:
     DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] recv frame %x",
                   stream_id, frame->hd.type));
@@ -1006,18 +1017,6 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
     return NGHTTP2_ERR_PAUSE;
   }
 
-#if 0
-  /* pause execution of nghttp2 if we received data for another handle
-     in order to process them first. */
-  if(CF_DATA_CURRENT(cf) != data_s) {
-    ctx->pause_stream_id = stream_id;
-    DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] not call_data -> NGHTTP2_ERR_PAUSE",
-                  stream_id));
-    drain_this(cf, data_s);
-    return NGHTTP2_ERR_PAUSE;
-  }
-#endif
-
   return 0;
 }
 
@@ -1763,7 +1762,7 @@ static ssize_t cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
     goto out;
   }
 
-  DEBUGF(LOG_CF(data, cf, "[h2sid=%u] recv: win %u/%u",
+  DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_recv: win %u/%u",
                 stream->stream_id,
                 nghttp2_session_get_local_window_size(ctx->h2),
                 nghttp2_session_get_stream_local_window_size(ctx->h2,
@@ -1976,19 +1975,20 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
   CURLcode result;
   struct h2h3req *hreq;
   struct cf_call_data save;
+  ssize_t nwritten;
 
   CF_DATA_SAVE(save, cf, data);
-  DEBUGF(LOG_CF(data, cf, "send len=%zu", len));
+  DEBUGF(LOG_CF(data, cf, "cf_send(len=%zu) start", len));
 
   if(stream->stream_id != -1) {
     if(stream->close_handled) {
       infof(data, "stream %u closed", stream->stream_id);
       *err = CURLE_HTTP2_STREAM;
-      len = -1;
+      nwritten = -1;
       goto out;
     }
     else if(stream->closed) {
-      len = http2_handle_stream_close(cf, data, stream, err);
+      nwritten = http2_handle_stream_close(cf, data, stream, err);
       goto out;
     }
     /* If stream_id != -1, we have dispatched request HEADERS, and now
@@ -1998,26 +1998,24 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
     rv = nghttp2_session_resume_data(ctx->h2, stream->stream_id);
     if(nghttp2_is_fatal(rv)) {
       *err = CURLE_SEND_ERROR;
-      len = -1;
+      nwritten = -1;
       goto out;
     }
     result = h2_session_send(cf, data);
     if(result) {
       *err = result;
-      len = -1;
+      nwritten = -1;
       goto out;
     }
-    len -= stream->upload_len;
 
-    /* Nullify here because we call nghttp2_session_send() and they
-       might refer to the old buffer. */
+    nwritten = (ssize_t)len - (ssize_t)stream->upload_len;
     stream->upload_mem = NULL;
     stream->upload_len = 0;
 
     if(should_close_session(ctx)) {
       DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
       *err = CURLE_HTTP2;
-      len = -1;
+      nwritten = -1;
       goto out;
     }
 
@@ -2029,26 +2027,36 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
       nghttp2_session_resume_data(ctx->h2, stream->stream_id);
     }
 
-#ifdef DEBUG_HTTP2
-    if(!len) {
-      infof(data, "http2_send: easy %p (stream %u) win %u/%u",
-            data, stream->stream_id,
-            nghttp2_session_get_remote_window_size(ctx->h2),
-            nghttp2_session_get_stream_remote_window_size(ctx->h2,
-                                                          stream->stream_id)
-        );
-
+    if(!nwritten) {
+      size_t rwin = nghttp2_session_get_stream_remote_window_size(ctx->h2,
+                                                          stream->stream_id);
+      DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send: win %u/%zu",
+             stream->stream_id,
+             nghttp2_session_get_remote_window_size(ctx->h2), rwin));
+        if(rwin == 0) {
+          /* We cannot upload more as the stream's remote window size
+           * is 0. We need to receive WIN_UPDATEs before we can continue.
+           */
+          data->req.keepon |= KEEP_SEND_PAUSE;
+          DEBUGF(LOG_CF(data, cf, "[h2sid=%u] pausing send as remote flow "
+                 "window is exhausted", stream->stream_id));
+        }
     }
-    infof(data, "http2_send returns %zu for stream %u", len,
-          stream->stream_id);
-#endif
+    DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send returns %zd ",
+           stream->stream_id, nwritten));
+
+    /* handled writing BODY for open stream. */
     goto out;
   }
-
+  /* Stream has not been opened yet. `buf` is expected to contain
+   * request headers. */
+  /* TODO: this assumes that the `buf` and `len` we are called with
+   * is *all* HEADERs and no body. We have no way to determine here
+   * if that is indeed the case. */
   result = Curl_pseudo_headers(data, buf, len, NULL, &hreq);
   if(result) {
     *err = result;
-    len = -1;
+    nwritten = -1;
     goto out;
   }
   nheader = hreq->entries;
@@ -2057,7 +2065,7 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
   if(!nva) {
     Curl_pseudo_free(hreq);
     *err = CURLE_OUT_OF_MEMORY;
-    len = -1;
+    nwritten = -1;
     goto out;
   }
   else {
@@ -2104,25 +2112,28 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
     DEBUGF(LOG_CF(data, cf, "send: nghttp2_submit_request error (%s)%u",
                   nghttp2_strerror(stream_id), stream_id));
     *err = CURLE_SEND_ERROR;
-    len = -1;
+    nwritten = -1;
     goto out;
   }
 
   infof(data, "Using Stream ID: %u (easy handle %p)",
         stream_id, (void *)data);
   stream->stream_id = stream_id;
+  /* See TODO above. We assume that the whole buf was consumed by
+   * generating the request headers. */
+  nwritten = len;
 
   result = h2_session_send(cf, data);
   if(result) {
     *err = result;
-    len = -1;
+    nwritten = -1;
     goto out;
   }
 
   if(should_close_session(ctx)) {
     DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
     *err = CURLE_HTTP2;
-    len = -1;
+    nwritten = -1;
     goto out;
   }
 
@@ -2137,7 +2148,7 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
 
 out:
   CF_DATA_RESTORE(cf, save);
-  return len;
+  return nwritten;
 }
 
 static int cf_h2_get_select_socks(struct Curl_cfilter *cf,
index 45a6b659db24702381cdad9aa9693b089c8a8160..c8288672c921507447fc41e3be7e8b48f056f0ae 100644 (file)
@@ -181,3 +181,41 @@ class TestUpload:
             respdata = open(curl.response_file(i)).readlines()
             assert respdata == indata
 
+    # PUT 100k
+    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+    def test_07_30_put_100k(self, env: Env, httpd, nghttpx, repeat, proto):
+        if proto == 'h3' and not env.have_h3():
+            pytest.skip("h3 not supported")
+        fdata = os.path.join(env.gen_dir, 'data-100k')
+        count = 1
+        curl = CurlClient(env=env)
+        url = f'https://{env.authority_for(env.domain1, proto)}/curltest/put?id=[0-{count-1}]'
+        r = curl.http_put(urls=[url], fdata=fdata, alpn_proto=proto,
+                             extra_args=['--parallel'])
+        assert r.exit_code == 0, f'{r}'
+        r.check_stats(count=count, exp_status=200)
+        exp_data = [f'{os.path.getsize(fdata)}']
+        r.check_stats(count=count, exp_status=200)
+        for i in range(count):
+            respdata = open(curl.response_file(i)).readlines()
+            assert respdata == exp_data
+
+    # PUT 10m
+    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+    def test_07_31_put_10m(self, env: Env, httpd, nghttpx, repeat, proto):
+        if proto == 'h3' and not env.have_h3():
+            pytest.skip("h3 not supported")
+        fdata = os.path.join(env.gen_dir, 'data-10m')
+        count = 1
+        curl = CurlClient(env=env)
+        url = f'https://{env.authority_for(env.domain1, proto)}/curltest/put?id=[0-{count-1}]&chunk_delay=10ms'
+        r = curl.http_put(urls=[url], fdata=fdata, alpn_proto=proto,
+                             extra_args=['--parallel'])
+        assert r.exit_code == 0, f'{r}'
+        r.check_stats(count=count, exp_status=200)
+        exp_data = [f'{os.path.getsize(fdata)}']
+        r.check_stats(count=count, exp_status=200)
+        for i in range(count):
+            respdata = open(curl.response_file(i)).readlines()
+            assert respdata == exp_data
+
index 7d0414d7cdaf974c5b24723fcdc0c745f23af980..8f2f18b090ec32492cb1101d7bf985aa35ddfd0f 100644 (file)
@@ -270,6 +270,29 @@ class CurlClient:
                          with_stats=with_stats,
                          with_headers=with_headers)
 
+    def http_put(self, urls: List[str], data=None, fdata=None,
+                 alpn_proto: Optional[str] = None,
+                 with_stats: bool = True,
+                 with_headers: bool = False,
+                 extra_args: Optional[List[str]] = None):
+        if extra_args is None:
+            extra_args = []
+        if fdata is not None:
+            extra_args.extend(['-T', fdata])
+        elif data is not None:
+            extra_args.extend(['-T', '-'])
+        extra_args.extend([
+            '-o', 'download_#1.data',
+        ])
+        if with_stats:
+            extra_args.extend([
+                '-w', '%{json}\\n'
+            ])
+        return self._raw(urls, intext=data,
+                         alpn_proto=alpn_proto, options=extra_args,
+                         with_stats=with_stats,
+                         with_headers=with_headers)
+
     def response_file(self, idx: int):
         return os.path.join(self._run_dir, f'download_{idx}.data')
 
@@ -303,7 +326,7 @@ class CurlClient:
                           duration=datetime.now() - start,
                           with_stats=with_stats)
 
-    def _raw(self, urls, timeout=10, options=None, insecure=False,
+    def _raw(self, urls, intext='', timeout=10, options=None, insecure=False,
              alpn_proto: Optional[str] = None,
              force_resolve=True,
              with_stats=False,
@@ -312,7 +335,7 @@ class CurlClient:
             urls=urls, timeout=timeout, options=options, insecure=insecure,
             alpn_proto=alpn_proto, force_resolve=force_resolve,
             with_headers=with_headers)
-        r = self._run(args, with_stats=with_stats)
+        r = self._run(args, intext=intext, with_stats=with_stats)
         if r.exit_code == 0 and with_headers:
             self._parse_headerfile(self._headerfile, r=r)
             if r.json:
index 8b8859f9a21e2f921850dbdffd2f80261063bea1..fd7be84f31a6db69f62ba2fd9a3bcb567ebe5eaf 100644 (file)
@@ -325,6 +325,9 @@ class Httpd:
                 f'    <Location /curltest/echo>',
                 f'      SetHandler curltest-echo',
                 f'    </Location>',
+                f'    <Location /curltest/put>',
+                f'      SetHandler curltest-put',
+                f'    </Location>',
                 f'    <Location /curltest/tweak>',
                 f'      SetHandler curltest-tweak',
                 f'    </Location>',
index 10522a8d11d3001181277ed9810ac022422915d1..498f9e536d3f9dbcd074c13922023434529efbc9 100644 (file)
@@ -35,6 +35,7 @@
 
 static void curltest_hooks(apr_pool_t *pool);
 static int curltest_echo_handler(request_rec *r);
+static int curltest_put_handler(request_rec *r);
 static int curltest_tweak_handler(request_rec *r);
 
 AP_DECLARE_MODULE(curltest) = {
@@ -81,6 +82,7 @@ static void curltest_hooks(apr_pool_t *pool)
 
   /* curl test handlers */
   ap_hook_handler(curltest_echo_handler, NULL, NULL, APR_HOOK_MIDDLE);
+  ap_hook_handler(curltest_put_handler, NULL, NULL, APR_HOOK_MIDDLE);
   ap_hook_handler(curltest_tweak_handler, NULL, NULL, APR_HOOK_MIDDLE);
 }
 
@@ -229,9 +231,9 @@ static int curltest_echo_handler(request_rec *r)
   rv = ap_pass_brigade(r->output_filters, bb);
 
 cleanup:
-  if(rv == APR_SUCCESS
-     || r->status != HTTP_OK
-     || c->aborted) {
+  if(rv == APR_SUCCESS ||
+     r->status != HTTP_OK ||
+     c->aborted) {
     ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, r, "echo_handler: done");
     return OK;
   }
@@ -409,3 +411,104 @@ cleanup:
   }
   return AP_FILTER_ERROR;
 }
+
+static int curltest_put_handler(request_rec *r)
+{
+  conn_rec *c = r->connection;
+  apr_bucket_brigade *bb;
+  apr_bucket *b;
+  apr_status_t rv;
+  char buffer[16*1024];
+  const char *ct;
+  apr_off_t rbody_len = 0;
+  const char *request_id = "none";
+  apr_time_t chunk_delay = 0;
+  apr_array_header_t *args = NULL;
+  long l;
+  int i;
+
+  if(strcmp(r->handler, "curltest-put")) {
+    return DECLINED;
+  }
+  if(r->method_number != M_PUT) {
+    return DECLINED;
+  }
+
+  if(r->args) {
+    args = apr_cstr_split(r->args, "&", 1, r->pool);
+    for(i = 0; i < args->nelts; ++i) {
+      char *s, *val, *arg = APR_ARRAY_IDX(args, i, char*);
+      s = strchr(arg, '=');
+      if(s) {
+        *s = '\0';
+        val = s + 1;
+        if(!strcmp("id", arg)) {
+          /* just an id for repeated requests with curl's url globbing */
+          request_id = val;
+          continue;
+        }
+        else if(!strcmp("chunk_delay", arg)) {
+          rv = duration_parse(&chunk_delay, val, "s");
+          if(APR_SUCCESS == rv) {
+            continue;
+          }
+        }
+      }
+      ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "query parameter not "
+                    "understood: '%s' in %s",
+                    arg, r->args);
+      ap_die(HTTP_BAD_REQUEST, r);
+      return OK;
+    }
+  }
+
+  ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "put_handler: processing");
+  r->status = 200;
+  r->clength = -1;
+  r->chunked = 1;
+  apr_table_unset(r->headers_out, "Content-Length");
+  /* Discourage content-encodings */
+  apr_table_unset(r->headers_out, "Content-Encoding");
+  apr_table_setn(r->subprocess_env, "no-brotli", "1");
+  apr_table_setn(r->subprocess_env, "no-gzip", "1");
+
+  ct = apr_table_get(r->headers_in, "content-type");
+  ap_set_content_type(r, ct? ct : "text/plain");
+
+  bb = apr_brigade_create(r->pool, c->bucket_alloc);
+  /* copy any request body into the response */
+  if((rv = ap_setup_client_block(r, REQUEST_CHUNKED_DECHUNK))) goto cleanup;
+  if(ap_should_client_block(r)) {
+    while(0 < (l = ap_get_client_block(r, &buffer[0], sizeof(buffer)))) {
+      ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r,
+                    "put_handler: read %ld bytes from request body", l);
+      if(chunk_delay) {
+        apr_sleep(chunk_delay);
+      }
+      rbody_len += l;
+    }
+  }
+  /* we are done */
+  rv = apr_brigade_printf(bb, NULL, NULL, "%"APR_OFF_T_FMT, rbody_len);
+  if(APR_SUCCESS != rv) goto cleanup;
+  b = apr_bucket_eos_create(c->bucket_alloc);
+  APR_BRIGADE_INSERT_TAIL(bb, b);
+  ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "put_handler: request read");
+
+  rv = ap_pass_brigade(r->output_filters, bb);
+
+cleanup:
+  if(rv == APR_SUCCESS
+     || r->status != HTTP_OK
+     || c->aborted) {
+    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, r, "put_handler: done");
+    return OK;
+  }
+  else {
+    /* no way to know what type of error occurred */
+    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, r, "put_handler failed");
+    return AP_FILTER_ERROR;
+  }
+  return DECLINED;
+}
+