From: Stefan Eissing Date: Thu, 9 Feb 2023 09:16:50 +0000 (+0100) Subject: http2: fix upload busy loop X-Git-Tag: curl-8_0_0~119 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=d9ccc75b007aefb2b706ce1976ddadc39531fa23;p=thirdparty%2Fcurl.git http2: fix upload busy loop - Set KEEP_SEND_PAUSE when exhausting remote HTTP/2 window size of a stream. - Clear KEEP_SEND_PAUSE when receiving HTTP/2 window updates on a paused stream. - Also fix http2 send compiler warnings reported in #10449. Prior to this change, starting in 71b7e016 which precedes 7.88.0, libcurl may eat CPU during HTTP/2 upload. Reported-by: Jay Satiro Fixes https://github.com/curl/curl/issues/10449 Fixes https://github.com/curl/curl/issues/10618 Closes https://github.com/curl/curl/pull/10627 --- diff --git a/lib/http2.c b/lib/http2.c index f45b0ac5b1..0bae69cb3a 100644 --- a/lib/http2.c +++ b/lib/http2.c @@ -942,6 +942,17 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame, DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] recv RST", stream_id)); stream->reset = TRUE; break; + case NGHTTP2_WINDOW_UPDATE: + DEBUGF(LOG_CF(data, cf, "[h2sid=%u] recv WINDOW_UPDATE", stream_id)); + if((data_s->req.keepon & KEEP_SEND_PAUSE) && + (data_s->req.keepon & KEEP_SEND)) { + data_s->req.keepon &= ~KEEP_SEND_PAUSE; + drain_this(cf, data_s); + Curl_expire(data_s, 0, EXPIRE_RUN_NOW); + DEBUGF(LOG_CF(data, cf, "[h2sid=%u] unpausing after win update", + stream_id)); + } + break; default: DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] recv frame %x", stream_id, frame->hd.type)); @@ -1006,18 +1017,6 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags, return NGHTTP2_ERR_PAUSE; } -#if 0 - /* pause execution of nghttp2 if we received data for another handle - in order to process them first. */ - if(CF_DATA_CURRENT(cf) != data_s) { - ctx->pause_stream_id = stream_id; - DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] not call_data -> NGHTTP2_ERR_PAUSE", - stream_id)); - drain_this(cf, data_s); - return NGHTTP2_ERR_PAUSE; - } -#endif - return 0; } @@ -1763,7 +1762,7 @@ static ssize_t cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data, goto out; } - DEBUGF(LOG_CF(data, cf, "[h2sid=%u] recv: win %u/%u", + DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_recv: win %u/%u", stream->stream_id, nghttp2_session_get_local_window_size(ctx->h2), nghttp2_session_get_stream_local_window_size(ctx->h2, @@ -1976,19 +1975,20 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data, CURLcode result; struct h2h3req *hreq; struct cf_call_data save; + ssize_t nwritten; CF_DATA_SAVE(save, cf, data); - DEBUGF(LOG_CF(data, cf, "send len=%zu", len)); + DEBUGF(LOG_CF(data, cf, "cf_send(len=%zu) start", len)); if(stream->stream_id != -1) { if(stream->close_handled) { infof(data, "stream %u closed", stream->stream_id); *err = CURLE_HTTP2_STREAM; - len = -1; + nwritten = -1; goto out; } else if(stream->closed) { - len = http2_handle_stream_close(cf, data, stream, err); + nwritten = http2_handle_stream_close(cf, data, stream, err); goto out; } /* If stream_id != -1, we have dispatched request HEADERS, and now @@ -1998,26 +1998,24 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data, rv = nghttp2_session_resume_data(ctx->h2, stream->stream_id); if(nghttp2_is_fatal(rv)) { *err = CURLE_SEND_ERROR; - len = -1; + nwritten = -1; goto out; } result = h2_session_send(cf, data); if(result) { *err = result; - len = -1; + nwritten = -1; goto out; } - len -= stream->upload_len; - /* Nullify here because we call nghttp2_session_send() and they - might refer to the old buffer. */ + nwritten = (ssize_t)len - (ssize_t)stream->upload_len; stream->upload_mem = NULL; stream->upload_len = 0; if(should_close_session(ctx)) { DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session")); *err = CURLE_HTTP2; - len = -1; + nwritten = -1; goto out; } @@ -2029,26 +2027,36 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data, nghttp2_session_resume_data(ctx->h2, stream->stream_id); } -#ifdef DEBUG_HTTP2 - if(!len) { - infof(data, "http2_send: easy %p (stream %u) win %u/%u", - data, stream->stream_id, - nghttp2_session_get_remote_window_size(ctx->h2), - nghttp2_session_get_stream_remote_window_size(ctx->h2, - stream->stream_id) - ); - + if(!nwritten) { + size_t rwin = nghttp2_session_get_stream_remote_window_size(ctx->h2, + stream->stream_id); + DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send: win %u/%zu", + stream->stream_id, + nghttp2_session_get_remote_window_size(ctx->h2), rwin)); + if(rwin == 0) { + /* We cannot upload more as the stream's remote window size + * is 0. We need to receive WIN_UPDATEs before we can continue. + */ + data->req.keepon |= KEEP_SEND_PAUSE; + DEBUGF(LOG_CF(data, cf, "[h2sid=%u] pausing send as remote flow " + "window is exhausted", stream->stream_id)); + } } - infof(data, "http2_send returns %zu for stream %u", len, - stream->stream_id); -#endif + DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send returns %zd ", + stream->stream_id, nwritten)); + + /* handled writing BODY for open stream. */ goto out; } - + /* Stream has not been opened yet. `buf` is expected to contain + * request headers. */ + /* TODO: this assumes that the `buf` and `len` we are called with + * is *all* HEADERs and no body. We have no way to determine here + * if that is indeed the case. */ result = Curl_pseudo_headers(data, buf, len, NULL, &hreq); if(result) { *err = result; - len = -1; + nwritten = -1; goto out; } nheader = hreq->entries; @@ -2057,7 +2065,7 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data, if(!nva) { Curl_pseudo_free(hreq); *err = CURLE_OUT_OF_MEMORY; - len = -1; + nwritten = -1; goto out; } else { @@ -2104,25 +2112,28 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data, DEBUGF(LOG_CF(data, cf, "send: nghttp2_submit_request error (%s)%u", nghttp2_strerror(stream_id), stream_id)); *err = CURLE_SEND_ERROR; - len = -1; + nwritten = -1; goto out; } infof(data, "Using Stream ID: %u (easy handle %p)", stream_id, (void *)data); stream->stream_id = stream_id; + /* See TODO above. We assume that the whole buf was consumed by + * generating the request headers. */ + nwritten = len; result = h2_session_send(cf, data); if(result) { *err = result; - len = -1; + nwritten = -1; goto out; } if(should_close_session(ctx)) { DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session")); *err = CURLE_HTTP2; - len = -1; + nwritten = -1; goto out; } @@ -2137,7 +2148,7 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data, out: CF_DATA_RESTORE(cf, save); - return len; + return nwritten; } static int cf_h2_get_select_socks(struct Curl_cfilter *cf, diff --git a/tests/tests-httpd/test_07_upload.py b/tests/tests-httpd/test_07_upload.py index 45a6b659db..c8288672c9 100644 --- a/tests/tests-httpd/test_07_upload.py +++ b/tests/tests-httpd/test_07_upload.py @@ -181,3 +181,41 @@ class TestUpload: respdata = open(curl.response_file(i)).readlines() assert respdata == indata + # PUT 100k + @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) + def test_07_30_put_100k(self, env: Env, httpd, nghttpx, repeat, proto): + if proto == 'h3' and not env.have_h3(): + pytest.skip("h3 not supported") + fdata = os.path.join(env.gen_dir, 'data-100k') + count = 1 + curl = CurlClient(env=env) + url = f'https://{env.authority_for(env.domain1, proto)}/curltest/put?id=[0-{count-1}]' + r = curl.http_put(urls=[url], fdata=fdata, alpn_proto=proto, + extra_args=['--parallel']) + assert r.exit_code == 0, f'{r}' + r.check_stats(count=count, exp_status=200) + exp_data = [f'{os.path.getsize(fdata)}'] + r.check_stats(count=count, exp_status=200) + for i in range(count): + respdata = open(curl.response_file(i)).readlines() + assert respdata == exp_data + + # PUT 10m + @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) + def test_07_31_put_10m(self, env: Env, httpd, nghttpx, repeat, proto): + if proto == 'h3' and not env.have_h3(): + pytest.skip("h3 not supported") + fdata = os.path.join(env.gen_dir, 'data-10m') + count = 1 + curl = CurlClient(env=env) + url = f'https://{env.authority_for(env.domain1, proto)}/curltest/put?id=[0-{count-1}]&chunk_delay=10ms' + r = curl.http_put(urls=[url], fdata=fdata, alpn_proto=proto, + extra_args=['--parallel']) + assert r.exit_code == 0, f'{r}' + r.check_stats(count=count, exp_status=200) + exp_data = [f'{os.path.getsize(fdata)}'] + r.check_stats(count=count, exp_status=200) + for i in range(count): + respdata = open(curl.response_file(i)).readlines() + assert respdata == exp_data + diff --git a/tests/tests-httpd/testenv/curl.py b/tests/tests-httpd/testenv/curl.py index 7d0414d7cd..8f2f18b090 100644 --- a/tests/tests-httpd/testenv/curl.py +++ b/tests/tests-httpd/testenv/curl.py @@ -270,6 +270,29 @@ class CurlClient: with_stats=with_stats, with_headers=with_headers) + def http_put(self, urls: List[str], data=None, fdata=None, + alpn_proto: Optional[str] = None, + with_stats: bool = True, + with_headers: bool = False, + extra_args: Optional[List[str]] = None): + if extra_args is None: + extra_args = [] + if fdata is not None: + extra_args.extend(['-T', fdata]) + elif data is not None: + extra_args.extend(['-T', '-']) + extra_args.extend([ + '-o', 'download_#1.data', + ]) + if with_stats: + extra_args.extend([ + '-w', '%{json}\\n' + ]) + return self._raw(urls, intext=data, + alpn_proto=alpn_proto, options=extra_args, + with_stats=with_stats, + with_headers=with_headers) + def response_file(self, idx: int): return os.path.join(self._run_dir, f'download_{idx}.data') @@ -303,7 +326,7 @@ class CurlClient: duration=datetime.now() - start, with_stats=with_stats) - def _raw(self, urls, timeout=10, options=None, insecure=False, + def _raw(self, urls, intext='', timeout=10, options=None, insecure=False, alpn_proto: Optional[str] = None, force_resolve=True, with_stats=False, @@ -312,7 +335,7 @@ class CurlClient: urls=urls, timeout=timeout, options=options, insecure=insecure, alpn_proto=alpn_proto, force_resolve=force_resolve, with_headers=with_headers) - r = self._run(args, with_stats=with_stats) + r = self._run(args, intext=intext, with_stats=with_stats) if r.exit_code == 0 and with_headers: self._parse_headerfile(self._headerfile, r=r) if r.json: diff --git a/tests/tests-httpd/testenv/httpd.py b/tests/tests-httpd/testenv/httpd.py index 8b8859f9a2..fd7be84f31 100644 --- a/tests/tests-httpd/testenv/httpd.py +++ b/tests/tests-httpd/testenv/httpd.py @@ -325,6 +325,9 @@ class Httpd: f' ', f' SetHandler curltest-echo', f' ', + f' ', + f' SetHandler curltest-put', + f' ', f' ', f' SetHandler curltest-tweak', f' ', diff --git a/tests/tests-httpd/testenv/mod_curltest/mod_curltest.c b/tests/tests-httpd/testenv/mod_curltest/mod_curltest.c index 10522a8d11..498f9e536d 100644 --- a/tests/tests-httpd/testenv/mod_curltest/mod_curltest.c +++ b/tests/tests-httpd/testenv/mod_curltest/mod_curltest.c @@ -35,6 +35,7 @@ static void curltest_hooks(apr_pool_t *pool); static int curltest_echo_handler(request_rec *r); +static int curltest_put_handler(request_rec *r); static int curltest_tweak_handler(request_rec *r); AP_DECLARE_MODULE(curltest) = { @@ -81,6 +82,7 @@ static void curltest_hooks(apr_pool_t *pool) /* curl test handlers */ ap_hook_handler(curltest_echo_handler, NULL, NULL, APR_HOOK_MIDDLE); + ap_hook_handler(curltest_put_handler, NULL, NULL, APR_HOOK_MIDDLE); ap_hook_handler(curltest_tweak_handler, NULL, NULL, APR_HOOK_MIDDLE); } @@ -229,9 +231,9 @@ static int curltest_echo_handler(request_rec *r) rv = ap_pass_brigade(r->output_filters, bb); cleanup: - if(rv == APR_SUCCESS - || r->status != HTTP_OK - || c->aborted) { + if(rv == APR_SUCCESS || + r->status != HTTP_OK || + c->aborted) { ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, r, "echo_handler: done"); return OK; } @@ -409,3 +411,104 @@ cleanup: } return AP_FILTER_ERROR; } + +static int curltest_put_handler(request_rec *r) +{ + conn_rec *c = r->connection; + apr_bucket_brigade *bb; + apr_bucket *b; + apr_status_t rv; + char buffer[16*1024]; + const char *ct; + apr_off_t rbody_len = 0; + const char *request_id = "none"; + apr_time_t chunk_delay = 0; + apr_array_header_t *args = NULL; + long l; + int i; + + if(strcmp(r->handler, "curltest-put")) { + return DECLINED; + } + if(r->method_number != M_PUT) { + return DECLINED; + } + + if(r->args) { + args = apr_cstr_split(r->args, "&", 1, r->pool); + for(i = 0; i < args->nelts; ++i) { + char *s, *val, *arg = APR_ARRAY_IDX(args, i, char*); + s = strchr(arg, '='); + if(s) { + *s = '\0'; + val = s + 1; + if(!strcmp("id", arg)) { + /* just an id for repeated requests with curl's url globbing */ + request_id = val; + continue; + } + else if(!strcmp("chunk_delay", arg)) { + rv = duration_parse(&chunk_delay, val, "s"); + if(APR_SUCCESS == rv) { + continue; + } + } + } + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "query parameter not " + "understood: '%s' in %s", + arg, r->args); + ap_die(HTTP_BAD_REQUEST, r); + return OK; + } + } + + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "put_handler: processing"); + r->status = 200; + r->clength = -1; + r->chunked = 1; + apr_table_unset(r->headers_out, "Content-Length"); + /* Discourage content-encodings */ + apr_table_unset(r->headers_out, "Content-Encoding"); + apr_table_setn(r->subprocess_env, "no-brotli", "1"); + apr_table_setn(r->subprocess_env, "no-gzip", "1"); + + ct = apr_table_get(r->headers_in, "content-type"); + ap_set_content_type(r, ct? ct : "text/plain"); + + bb = apr_brigade_create(r->pool, c->bucket_alloc); + /* copy any request body into the response */ + if((rv = ap_setup_client_block(r, REQUEST_CHUNKED_DECHUNK))) goto cleanup; + if(ap_should_client_block(r)) { + while(0 < (l = ap_get_client_block(r, &buffer[0], sizeof(buffer)))) { + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, + "put_handler: read %ld bytes from request body", l); + if(chunk_delay) { + apr_sleep(chunk_delay); + } + rbody_len += l; + } + } + /* we are done */ + rv = apr_brigade_printf(bb, NULL, NULL, "%"APR_OFF_T_FMT, rbody_len); + if(APR_SUCCESS != rv) goto cleanup; + b = apr_bucket_eos_create(c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(bb, b); + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "put_handler: request read"); + + rv = ap_pass_brigade(r->output_filters, bb); + +cleanup: + if(rv == APR_SUCCESS + || r->status != HTTP_OK + || c->aborted) { + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, r, "put_handler: done"); + return OK; + } + else { + /* no way to know what type of error occurred */ + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, r, "put_handler failed"); + return AP_FILTER_ERROR; + } + return DECLINED; +} +