DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] recv RST", stream_id));
stream->reset = TRUE;
break;
+ case NGHTTP2_WINDOW_UPDATE:
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%u] recv WINDOW_UPDATE", stream_id));
+ if((data_s->req.keepon & KEEP_SEND_PAUSE) &&
+ (data_s->req.keepon & KEEP_SEND)) {
+ data_s->req.keepon &= ~KEEP_SEND_PAUSE;
+ drain_this(cf, data_s);
+ Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%u] unpausing after win update",
+ stream_id));
+ }
+ break;
default:
DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] recv frame %x",
stream_id, frame->hd.type));
return NGHTTP2_ERR_PAUSE;
}
-#if 0
- /* pause execution of nghttp2 if we received data for another handle
- in order to process them first. */
- if(CF_DATA_CURRENT(cf) != data_s) {
- ctx->pause_stream_id = stream_id;
- DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] not call_data -> NGHTTP2_ERR_PAUSE",
- stream_id));
- drain_this(cf, data_s);
- return NGHTTP2_ERR_PAUSE;
- }
-#endif
-
return 0;
}
goto out;
}
- DEBUGF(LOG_CF(data, cf, "[h2sid=%u] recv: win %u/%u",
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_recv: win %u/%u",
stream->stream_id,
nghttp2_session_get_local_window_size(ctx->h2),
nghttp2_session_get_stream_local_window_size(ctx->h2,
CURLcode result;
struct h2h3req *hreq;
struct cf_call_data save;
+ ssize_t nwritten;
CF_DATA_SAVE(save, cf, data);
- DEBUGF(LOG_CF(data, cf, "send len=%zu", len));
+ DEBUGF(LOG_CF(data, cf, "cf_send(len=%zu) start", len));
if(stream->stream_id != -1) {
if(stream->close_handled) {
infof(data, "stream %u closed", stream->stream_id);
*err = CURLE_HTTP2_STREAM;
- len = -1;
+ nwritten = -1;
goto out;
}
else if(stream->closed) {
- len = http2_handle_stream_close(cf, data, stream, err);
+ nwritten = http2_handle_stream_close(cf, data, stream, err);
goto out;
}
/* If stream_id != -1, we have dispatched request HEADERS, and now
rv = nghttp2_session_resume_data(ctx->h2, stream->stream_id);
if(nghttp2_is_fatal(rv)) {
*err = CURLE_SEND_ERROR;
- len = -1;
+ nwritten = -1;
goto out;
}
result = h2_session_send(cf, data);
if(result) {
*err = result;
- len = -1;
+ nwritten = -1;
goto out;
}
- len -= stream->upload_len;
- /* Nullify here because we call nghttp2_session_send() and they
- might refer to the old buffer. */
+ nwritten = (ssize_t)len - (ssize_t)stream->upload_len;
stream->upload_mem = NULL;
stream->upload_len = 0;
if(should_close_session(ctx)) {
DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
*err = CURLE_HTTP2;
- len = -1;
+ nwritten = -1;
goto out;
}
nghttp2_session_resume_data(ctx->h2, stream->stream_id);
}
-#ifdef DEBUG_HTTP2
- if(!len) {
- infof(data, "http2_send: easy %p (stream %u) win %u/%u",
- data, stream->stream_id,
- nghttp2_session_get_remote_window_size(ctx->h2),
- nghttp2_session_get_stream_remote_window_size(ctx->h2,
- stream->stream_id)
- );
-
+ if(!nwritten) {
+ size_t rwin = nghttp2_session_get_stream_remote_window_size(ctx->h2,
+ stream->stream_id);
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send: win %u/%zu",
+ stream->stream_id,
+ nghttp2_session_get_remote_window_size(ctx->h2), rwin));
+ if(rwin == 0) {
+ /* We cannot upload more as the stream's remote window size
+ * is 0. We need to receive WIN_UPDATEs before we can continue.
+ */
+ data->req.keepon |= KEEP_SEND_PAUSE;
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%u] pausing send as remote flow "
+ "window is exhausted", stream->stream_id));
+ }
}
- infof(data, "http2_send returns %zu for stream %u", len,
- stream->stream_id);
-#endif
+ DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send returns %zd ",
+ stream->stream_id, nwritten));
+
+ /* handled writing BODY for open stream. */
goto out;
}
-
+ /* Stream has not been opened yet. `buf` is expected to contain
+ * request headers. */
+ /* TODO: this assumes that the `buf` and `len` we are called with
+ * is *all* HEADERs and no body. We have no way to determine here
+ * if that is indeed the case. */
result = Curl_pseudo_headers(data, buf, len, NULL, &hreq);
if(result) {
*err = result;
- len = -1;
+ nwritten = -1;
goto out;
}
nheader = hreq->entries;
if(!nva) {
Curl_pseudo_free(hreq);
*err = CURLE_OUT_OF_MEMORY;
- len = -1;
+ nwritten = -1;
goto out;
}
else {
DEBUGF(LOG_CF(data, cf, "send: nghttp2_submit_request error (%s)%u",
nghttp2_strerror(stream_id), stream_id));
*err = CURLE_SEND_ERROR;
- len = -1;
+ nwritten = -1;
goto out;
}
infof(data, "Using Stream ID: %u (easy handle %p)",
stream_id, (void *)data);
stream->stream_id = stream_id;
+ /* See TODO above. We assume that the whole buf was consumed by
+ * generating the request headers. */
+ nwritten = len;
result = h2_session_send(cf, data);
if(result) {
*err = result;
- len = -1;
+ nwritten = -1;
goto out;
}
if(should_close_session(ctx)) {
DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
*err = CURLE_HTTP2;
- len = -1;
+ nwritten = -1;
goto out;
}
out:
CF_DATA_RESTORE(cf, save);
- return len;
+ return nwritten;
}
static int cf_h2_get_select_socks(struct Curl_cfilter *cf,
respdata = open(curl.response_file(i)).readlines()
assert respdata == indata
+ # PUT 100k
+ @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+ def test_07_30_put_100k(self, env: Env, httpd, nghttpx, repeat, proto):
+ if proto == 'h3' and not env.have_h3():
+ pytest.skip("h3 not supported")
+ fdata = os.path.join(env.gen_dir, 'data-100k')
+ count = 1
+ curl = CurlClient(env=env)
+ url = f'https://{env.authority_for(env.domain1, proto)}/curltest/put?id=[0-{count-1}]'
+ r = curl.http_put(urls=[url], fdata=fdata, alpn_proto=proto,
+ extra_args=['--parallel'])
+ assert r.exit_code == 0, f'{r}'
+ r.check_stats(count=count, exp_status=200)
+ exp_data = [f'{os.path.getsize(fdata)}']
+ r.check_stats(count=count, exp_status=200)
+ for i in range(count):
+ respdata = open(curl.response_file(i)).readlines()
+ assert respdata == exp_data
+
+ # PUT 10m
+ @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+ def test_07_31_put_10m(self, env: Env, httpd, nghttpx, repeat, proto):
+ if proto == 'h3' and not env.have_h3():
+ pytest.skip("h3 not supported")
+ fdata = os.path.join(env.gen_dir, 'data-10m')
+ count = 1
+ curl = CurlClient(env=env)
+ url = f'https://{env.authority_for(env.domain1, proto)}/curltest/put?id=[0-{count-1}]&chunk_delay=10ms'
+ r = curl.http_put(urls=[url], fdata=fdata, alpn_proto=proto,
+ extra_args=['--parallel'])
+ assert r.exit_code == 0, f'{r}'
+ r.check_stats(count=count, exp_status=200)
+ exp_data = [f'{os.path.getsize(fdata)}']
+ r.check_stats(count=count, exp_status=200)
+ for i in range(count):
+ respdata = open(curl.response_file(i)).readlines()
+ assert respdata == exp_data
+
with_stats=with_stats,
with_headers=with_headers)
+ def http_put(self, urls: List[str], data=None, fdata=None,
+ alpn_proto: Optional[str] = None,
+ with_stats: bool = True,
+ with_headers: bool = False,
+ extra_args: Optional[List[str]] = None):
+ if extra_args is None:
+ extra_args = []
+ if fdata is not None:
+ extra_args.extend(['-T', fdata])
+ elif data is not None:
+ extra_args.extend(['-T', '-'])
+ extra_args.extend([
+ '-o', 'download_#1.data',
+ ])
+ if with_stats:
+ extra_args.extend([
+ '-w', '%{json}\\n'
+ ])
+ return self._raw(urls, intext=data,
+ alpn_proto=alpn_proto, options=extra_args,
+ with_stats=with_stats,
+ with_headers=with_headers)
+
def response_file(self, idx: int):
return os.path.join(self._run_dir, f'download_{idx}.data')
duration=datetime.now() - start,
with_stats=with_stats)
- def _raw(self, urls, timeout=10, options=None, insecure=False,
+ def _raw(self, urls, intext='', timeout=10, options=None, insecure=False,
alpn_proto: Optional[str] = None,
force_resolve=True,
with_stats=False,
urls=urls, timeout=timeout, options=options, insecure=insecure,
alpn_proto=alpn_proto, force_resolve=force_resolve,
with_headers=with_headers)
- r = self._run(args, with_stats=with_stats)
+ r = self._run(args, intext=intext, with_stats=with_stats)
if r.exit_code == 0 and with_headers:
self._parse_headerfile(self._headerfile, r=r)
if r.json:
f' <Location /curltest/echo>',
f' SetHandler curltest-echo',
f' </Location>',
+ f' <Location /curltest/put>',
+ f' SetHandler curltest-put',
+ f' </Location>',
f' <Location /curltest/tweak>',
f' SetHandler curltest-tweak',
f' </Location>',
static void curltest_hooks(apr_pool_t *pool);
static int curltest_echo_handler(request_rec *r);
+static int curltest_put_handler(request_rec *r);
static int curltest_tweak_handler(request_rec *r);
AP_DECLARE_MODULE(curltest) = {
/* curl test handlers */
ap_hook_handler(curltest_echo_handler, NULL, NULL, APR_HOOK_MIDDLE);
+ ap_hook_handler(curltest_put_handler, NULL, NULL, APR_HOOK_MIDDLE);
ap_hook_handler(curltest_tweak_handler, NULL, NULL, APR_HOOK_MIDDLE);
}
rv = ap_pass_brigade(r->output_filters, bb);
cleanup:
- if(rv == APR_SUCCESS
- || r->status != HTTP_OK
- || c->aborted) {
+ if(rv == APR_SUCCESS ||
+ r->status != HTTP_OK ||
+ c->aborted) {
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, r, "echo_handler: done");
return OK;
}
}
return AP_FILTER_ERROR;
}
+
+static int curltest_put_handler(request_rec *r)
+{
+ conn_rec *c = r->connection;
+ apr_bucket_brigade *bb;
+ apr_bucket *b;
+ apr_status_t rv;
+ char buffer[16*1024];
+ const char *ct;
+ apr_off_t rbody_len = 0;
+ const char *request_id = "none";
+ apr_time_t chunk_delay = 0;
+ apr_array_header_t *args = NULL;
+ long l;
+ int i;
+
+ if(strcmp(r->handler, "curltest-put")) {
+ return DECLINED;
+ }
+ if(r->method_number != M_PUT) {
+ return DECLINED;
+ }
+
+ if(r->args) {
+ args = apr_cstr_split(r->args, "&", 1, r->pool);
+ for(i = 0; i < args->nelts; ++i) {
+ char *s, *val, *arg = APR_ARRAY_IDX(args, i, char*);
+ s = strchr(arg, '=');
+ if(s) {
+ *s = '\0';
+ val = s + 1;
+ if(!strcmp("id", arg)) {
+ /* just an id for repeated requests with curl's url globbing */
+ request_id = val;
+ continue;
+ }
+ else if(!strcmp("chunk_delay", arg)) {
+ rv = duration_parse(&chunk_delay, val, "s");
+ if(APR_SUCCESS == rv) {
+ continue;
+ }
+ }
+ }
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "query parameter not "
+ "understood: '%s' in %s",
+ arg, r->args);
+ ap_die(HTTP_BAD_REQUEST, r);
+ return OK;
+ }
+ }
+
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "put_handler: processing");
+ r->status = 200;
+ r->clength = -1;
+ r->chunked = 1;
+ apr_table_unset(r->headers_out, "Content-Length");
+ /* Discourage content-encodings */
+ apr_table_unset(r->headers_out, "Content-Encoding");
+ apr_table_setn(r->subprocess_env, "no-brotli", "1");
+ apr_table_setn(r->subprocess_env, "no-gzip", "1");
+
+ ct = apr_table_get(r->headers_in, "content-type");
+ ap_set_content_type(r, ct? ct : "text/plain");
+
+ bb = apr_brigade_create(r->pool, c->bucket_alloc);
+ /* copy any request body into the response */
+ if((rv = ap_setup_client_block(r, REQUEST_CHUNKED_DECHUNK))) goto cleanup;
+ if(ap_should_client_block(r)) {
+ while(0 < (l = ap_get_client_block(r, &buffer[0], sizeof(buffer)))) {
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r,
+ "put_handler: read %ld bytes from request body", l);
+ if(chunk_delay) {
+ apr_sleep(chunk_delay);
+ }
+ rbody_len += l;
+ }
+ }
+ /* we are done */
+ rv = apr_brigade_printf(bb, NULL, NULL, "%"APR_OFF_T_FMT, rbody_len);
+ if(APR_SUCCESS != rv) goto cleanup;
+ b = apr_bucket_eos_create(c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(bb, b);
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "put_handler: request read");
+
+ rv = ap_pass_brigade(r->output_filters, bb);
+
+cleanup:
+ if(rv == APR_SUCCESS
+ || r->status != HTTP_OK
+ || c->aborted) {
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, r, "put_handler: done");
+ return OK;
+ }
+ else {
+ /* no way to know what type of error occurred */
+ ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, r, "put_handler failed");
+ return AP_FILTER_ERROR;
+ }
+ return DECLINED;
+}
+