From: Stefan Eissing
Date: Thu, 9 Feb 2023 09:49:04 +0000 (+0100)
Subject: vquic: stabilization and improvements
X-Git-Tag: curl-7_88_0~28
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c96f982166fbc671b77f84a1730e9b6d11357e8f;p=thirdparty%2Fcurl.git

vquic: stabilization and improvements

vquic stabilization
- udp send code shared between ngtcp2 and quiche
- quiche handling of data and events improved

ngtcp2 and pytest improvements
- fixes handling of "drain" situations, discovered in scorecard tests with the Caddy server
- improvements in handling transfers that already have data or are already closed, so that recv can return early

pytest
- adding caddy tests when available

scorecard improvements
- using correct caddy port
- allowing tests for only httpd or caddy

Closes #10451
---
diff --git a/lib/http.h b/lib/http.h index 6c5c79d38e..735729c4ad 100644 --- a/lib/http.h +++ b/lib/http.h @@ -264,7 +264,7 @@ struct HTTP { bool upload_done; #endif /* ENABLE_QUIC */ #ifdef USE_NGHTTP3 - size_t unacked_window; + size_t recv_buf_nonflow; /* buffered bytes, not counting for flow control */ struct h3out *h3out; /* per-stream buffers for upload */ struct dynbuf overflow; /* excess data received during a single Curl_read */ #endif /* USE_NGHTTP3 */ @@ -291,6 +291,8 @@ struct HTTP { #ifdef USE_QUICHE bool h3_got_header; /* TRUE when h3 stream has recvd some HEADER */ bool h3_recving_data; /* TRUE when h3 stream is reading DATA */ + bool h3_body_pending; /* TRUE when h3 stream may have more body DATA */ + struct h3_event_node *pending; #endif /* USE_QUICHE */ }; diff --git a/lib/vquic/curl_ngtcp2.c b/lib/vquic/curl_ngtcp2.c index 8abd6a654c..32658c718a 100644 --- a/lib/vquic/curl_ngtcp2.c +++ b/lib/vquic/curl_ngtcp2.c @@ -58,6 +58,7 @@ #include "dynbuf.h" #include "select.h" #include "vquic.h" +#include "vquic_int.h" #include "h2h3.h" #include "vtls/keylog.h" #include "vtls/vtls.h" @@ -120,16 +121,9 @@ void Curl_ngtcp2_ver(char *p, size_t len) ng2->version_str, ht3->version_str); } -struct blocked_pkt { - const uint8_t *pkt; - size_t pktlen; - size_t gsolen; -}; - struct cf_ngtcp2_ctx { - curl_socket_t sockfd; - struct sockaddr_storage local_addr; - socklen_t local_addrlen; + struct cf_quic_ctx q; + ngtcp2_path connected_path; ngtcp2_conn *qconn; ngtcp2_cid dcid; ngtcp2_cid scid; @@ -147,16 +141,6 @@ struct cf_ngtcp2_ctx { WOLFSSL_CTX *sslctx; WOLFSSL *ssl; #endif - bool no_gso; - uint8_t *pktbuf; - size_t pktbuflen; - /* the number of entries in blocked_pkt */ - size_t num_blocked_pkt; - /* the number of processed entries in blocked_pkt */ - size_t num_blocked_pkt_sent; - /* the packets blocked by sendmsg (EAGAIN or EWOULDBLOCK) */ - struct blocked_pkt blocked_pkt[2]; - struct cf_call_data call_data; nghttp3_conn *h3conn; nghttp3_settings h3settings; @@ -235,6 +219,8 @@ static void quic_settings(struct cf_ngtcp2_ctx *ctx, { ngtcp2_settings *s = &ctx->settings; ngtcp2_transport_params *t = &ctx->transport_params; + size_t stream_win_size = CURL_MAX_READ_SIZE; + ngtcp2_settings_default(s); ngtcp2_transport_params_default(t); #ifdef DEBUG_NGTCP2 #else s->log_printf = NULL; #endif + + (void)data; s->initial_ts = timestamp(); - t->initial_max_stream_data_bidi_local = data->set.buffer_size; - t->initial_max_stream_data_bidi_remote = QUIC_MAX_STREAMS; - t->initial_max_stream_data_uni = QUIC_MAX_STREAMS; - t->initial_max_data = QUIC_MAX_DATA; - t->initial_max_streams_bidi = 1; - t->initial_max_streams_uni = 
3; + s->handshake_timeout = NGTCP2_DEFAULT_HANDSHAKE_TIMEOUT; + s->max_window = 100 * stream_win_size; + s->max_stream_window = stream_win_size; + + t->initial_max_data = 10 * stream_win_size; + t->initial_max_stream_data_bidi_local = stream_win_size; + t->initial_max_stream_data_bidi_remote = stream_win_size; + t->initial_max_stream_data_uni = stream_win_size; + t->initial_max_streams_bidi = QUIC_MAX_STREAMS; + t->initial_max_streams_uni = QUIC_MAX_STREAMS; t->max_idle_timeout = QUIC_IDLE_TIMEOUT; if(ctx->qlogfd != -1) { s->qlog.write = qlog_callback; @@ -606,15 +598,39 @@ static int cb_handshake_completed(ngtcp2_conn *tconn, void *user_data) return 0; } -static void extend_stream_window(ngtcp2_conn *tconn, - struct HTTP *stream) +static void report_consumed_data(struct Curl_cfilter *cf, + struct Curl_easy *data, + size_t consumed) { - size_t thismuch = stream->unacked_window; - ngtcp2_conn_extend_max_stream_offset(tconn, stream->stream3_id, thismuch); - ngtcp2_conn_extend_max_offset(tconn, thismuch); - stream->unacked_window = 0; -} + struct HTTP *stream = data->req.p.http; + struct cf_ngtcp2_ctx *ctx = cf->ctx; + /* the HTTP/1.1 response headers are written to the buffer, but + * consuming those does not count against flow control. */ + if(stream->recv_buf_nonflow) { + if(consumed >= stream->recv_buf_nonflow) { + consumed -= stream->recv_buf_nonflow; + stream->recv_buf_nonflow = 0; + } + else { + stream->recv_buf_nonflow -= consumed; + consumed = 0; + } + } + if(consumed > 0) { + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] consumed %zu DATA bytes", + stream->stream3_id, consumed)); + ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream->stream3_id, + consumed); + ngtcp2_conn_extend_max_offset(ctx->qconn, consumed); + } + if(!stream->closed && data->state.drain + && !stream->memlen + && !Curl_dyn_len(&stream->overflow)) { + /* nothing buffered any more */ + data->state.drain = 0; + } +} static int cb_recv_stream_data(ngtcp2_conn *tconn, uint32_t flags, int64_t stream_id, uint64_t offset, @@ -631,7 +647,7 @@ static int cb_recv_stream_data(ngtcp2_conn *tconn, uint32_t flags, nconsumed = nghttp3_conn_read_stream(ctx->h3conn, stream_id, buf, buflen, fin); - DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] read_stream(len=%zu) -> %zd", + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] read_stream(len=%zu) -> %zd", stream_id, buflen, nconsumed)); if(nconsumed < 0) { ngtcp2_connection_close_error_set_application_error( @@ -672,23 +688,26 @@ cb_acked_stream_data_offset(ngtcp2_conn *tconn, int64_t stream_id, } static int cb_stream_close(ngtcp2_conn *tconn, uint32_t flags, - int64_t stream_id, uint64_t app_error_code, + int64_t stream3_id, uint64_t app_error_code, void *user_data, void *stream_user_data) { struct Curl_cfilter *cf = user_data; + struct Curl_easy *data = stream_user_data; struct cf_ngtcp2_ctx *ctx = cf->ctx; int rv; (void)tconn; - (void)stream_user_data; + (void)data; /* stream is closed... 
*/ if(!(flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET)) { app_error_code = NGHTTP3_H3_NO_ERROR; } - rv = nghttp3_conn_close_stream(ctx->h3conn, stream_id, + rv = nghttp3_conn_close_stream(ctx->h3conn, stream3_id, app_error_code); + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] quic close(err=%" + PRIu64 ") -> %d", stream3_id, app_error_code, rv)); if(rv) { ngtcp2_connection_close_error_set_application_error( &ctx->last_error, nghttp3_err_infer_quic_app_error_code(rv), NULL, 0); @@ -712,7 +731,7 @@ static int cb_stream_reset(ngtcp2_conn *tconn, int64_t stream_id, (void)data; rv = nghttp3_conn_shutdown_stream_read(ctx->h3conn, stream_id); - DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] reset -> %d", stream_id, rv)); + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] reset -> %d", stream_id, rv)); if(rv) { return NGTCP2_ERR_CALLBACK_FAILURE; } @@ -874,7 +893,7 @@ static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf, struct cf_call_data save; CF_DATA_SAVE(save, cf, data); - socks[0] = ctx->sockfd; + socks[0] = ctx->q.sockfd; /* in an HTTP/3 connection we can basically always get a frame so we should always be ready for one */ @@ -894,6 +913,17 @@ static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf, return rv; } +static void notify_drain(struct Curl_cfilter *cf, + struct Curl_easy *data) +{ + (void)cf; + if(!data->state.drain) { + data->state.drain = 1; + Curl_expire(data, 0, EXPIRE_RUN_NOW); + } +} + + static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id, uint64_t app_error_code, void *user_data, void *stream_user_data) @@ -906,7 +936,7 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id, (void)app_error_code; (void)cf; - DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] close(err=%" PRIx64 ")", + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] h3 close(err=%" PRIx64 ")", stream_id, app_error_code)); stream->closed = TRUE; stream->error3 = app_error_code; @@ -915,64 +945,68 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id, * the response before it was complete. */ stream->reset = TRUE; } - Curl_expire(data, 0, EXPIRE_QUIC); - /* make sure that ngh3_stream_recv is called again to complete the transfer - even if there are no more packets to be received from the server. */ - data->state.drain = 1; + notify_drain(cf, data); return 0; } /* - * write_data() copies data to the stream's receive buffer. If not enough - * space is available in the receive buffer, it copies the rest to the - * stream's overflow buffer. + * write_resp_raw() copies resonse data in raw format to the `data`'s + * receive buffer. If not enough space is available, it appends to the + * `data`'s overflow buffer. 
*/ -static CURLcode write_data(struct HTTP *stream, const void *mem, size_t memlen) +static CURLcode write_resp_raw(struct Curl_cfilter *cf, + struct Curl_easy *data, + const void *mem, size_t memlen, + bool flow) { + struct HTTP *stream = data->req.p.http; CURLcode result = CURLE_OK; const char *buf = mem; size_t ncopy = memlen; /* copy as much as possible to the receive buffer */ if(stream->len) { size_t len = CURLMIN(ncopy, stream->len); - memcpy(stream->mem, buf, len); + memcpy(stream->mem + stream->memlen, buf, len); stream->len -= len; stream->memlen += len; - stream->mem += len; buf += len; ncopy -= len; + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] resp_raw: added %zu bytes" + " to data buffer", stream->stream3_id, len)); } /* copy the rest to the overflow buffer */ if(ncopy) { result = Curl_dyn_addn(&stream->overflow, buf, ncopy); + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] resp_raw: added %zu bytes" + " to overflow buffer -> %d", + stream->stream3_id, ncopy, result)); + notify_drain(cf, data); + } + + if(!flow) + stream->recv_buf_nonflow += memlen; + if(CF_DATA_CURRENT(cf) != data) { + notify_drain(cf, data); } return result; } -static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream_id, +static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id, const uint8_t *buf, size_t buflen, void *user_data, void *stream_user_data) { struct Curl_cfilter *cf = user_data; struct Curl_easy *data = stream_user_data; - struct HTTP *stream = data->req.p.http; - CURLcode result = CURLE_OK; + CURLcode result; + (void)conn; - (void)cf; + (void)stream3_id; - result = write_data(stream, buf, buflen); - DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] recv_data(len=%zu) -> %d", - stream_id, buflen, result)); - if(result) { - return -1; - } - stream->unacked_window += buflen; - (void)stream_id; - (void)user_data; - return 0; + result = write_resp_raw(cf, data, buf, buflen, TRUE); + return result? 
-1 : 0; } -static int cb_h3_deferred_consume(nghttp3_conn *conn, int64_t stream_id, +static int cb_h3_deferred_consume(nghttp3_conn *conn, int64_t stream3_id, size_t consumed, void *user_data, void *stream_user_data) { @@ -980,9 +1014,10 @@ static int cb_h3_deferred_consume(nghttp3_conn *conn, int64_t stream_id, struct cf_ngtcp2_ctx *ctx = cf->ctx; (void)conn; (void)stream_user_data; - (void)stream_id; - ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream_id, consumed); + /* nghttp3 has consumed bytes on the QUIC stream and we need to + * tell the QUIC connection to increase its flow control */ + ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream3_id, consumed); ngtcp2_conn_extend_max_offset(ctx->qconn, consumed); return 0; } @@ -1028,13 +1063,13 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t stream_id, /* add a CRLF only if we've received some headers */ if(stream->firstheader) { - result = write_data(stream, "\r\n", 2); + result = write_resp_raw(cf, data, "\r\n", 2, FALSE); if(result) { return -1; } } - DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] end_headers(status_code=%d", + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] end_headers(status_code=%d", stream_id, stream->status_code)); if(stream->status_code / 100 != 1) { stream->bodystarted = TRUE; @@ -1062,41 +1097,43 @@ static int cb_h3_recv_header(nghttp3_conn *conn, int64_t stream_id, if(token == NGHTTP3_QPACK_TOKEN__STATUS) { char line[14]; /* status line is always 13 characters long */ size_t ncopy; + + DEBUGASSERT(!stream->firstheader); stream->status_code = decode_status_code(h3val.base, h3val.len); DEBUGASSERT(stream->status_code != -1); ncopy = msnprintf(line, sizeof(line), "HTTP/3 %03d \r\n", stream->status_code); - DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] status: %s", + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] status: %s", stream_id, line)); - result = write_data(stream, line, ncopy); + result = write_resp_raw(cf, data, line, ncopy, FALSE); if(result) { return -1; } + stream->firstheader = TRUE; } else { /* store as an HTTP1-style header */ - DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] header: %.*s: %.*s", + DEBUGASSERT(stream->firstheader); + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] header: %.*s: %.*s", stream_id, (int)h3name.len, h3name.base, (int)h3val.len, h3val.base)); - result = write_data(stream, h3name.base, h3name.len); + result = write_resp_raw(cf, data, h3name.base, h3name.len, FALSE); if(result) { return -1; } - result = write_data(stream, ": ", 2); + result = write_resp_raw(cf, data, ": ", 2, FALSE); if(result) { return -1; } - result = write_data(stream, h3val.base, h3val.len); + result = write_resp_raw(cf, data, h3val.base, h3val.len, FALSE); if(result) { return -1; } - result = write_data(stream, "\r\n", 2); + result = write_resp_raw(cf, data, "\r\n", 2, FALSE); if(result) { return -1; } } - - stream->firstheader = TRUE; return 0; } @@ -1130,7 +1167,7 @@ static int cb_h3_reset_stream(nghttp3_conn *conn, int64_t stream_id, rv = ngtcp2_conn_shutdown_stream_write(ctx->qconn, stream_id, app_error_code); - DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] reset -> %d", stream_id, rv)); + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] reset -> %d", stream_id, rv)); if(rv && rv != NGTCP2_ERR_STREAM_NOT_FOUND) { return NGTCP2_ERR_CALLBACK_FAILURE; } @@ -1215,22 +1252,74 @@ static int init_ngh3_conn(struct Curl_cfilter *cf) return result; } -static size_t drain_overflow_buffer(struct HTTP *stream) +static void drain_overflow_buffer(struct Curl_cfilter *cf, + struct Curl_easy *data) { + struct 
HTTP *stream = data->req.p.http; size_t overlen = Curl_dyn_len(&stream->overflow); size_t ncopy = CURLMIN(overlen, stream->len); + + (void)cf; if(ncopy > 0) { - memcpy(stream->mem, Curl_dyn_ptr(&stream->overflow), ncopy); + memcpy(stream->mem + stream->memlen, + Curl_dyn_ptr(&stream->overflow), ncopy); stream->len -= ncopy; - stream->mem += ncopy; stream->memlen += ncopy; if(ncopy != overlen) /* make the buffer only keep the tail */ (void)Curl_dyn_tail(&stream->overflow, overlen - ncopy); - else + else { Curl_dyn_reset(&stream->overflow); + } } - return ncopy; +} + +static ssize_t recv_closed_stream(struct Curl_cfilter *cf, + struct Curl_easy *data, + CURLcode *err) +{ + struct HTTP *stream = data->req.p.http; + ssize_t nread = -1; + + if(stream->reset) { + failf(data, + "HTTP/3 stream %" PRId64 " reset by server", stream->stream3_id); + *err = CURLE_PARTIAL_FILE; + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, was reset -> %d", + stream->stream3_id, *err)); + goto out; + } + else if(stream->error3 != NGHTTP3_H3_NO_ERROR) { + failf(data, + "HTTP/3 stream %" PRId64 " was not closed cleanly: (err 0x%" PRIx64 + ")", + stream->stream3_id, stream->error3); + *err = CURLE_HTTP3; + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, closed uncleanly" + " -> %d", stream->stream3_id, *err)); + goto out; + } + + if(!stream->bodystarted) { + failf(data, + "HTTP/3 stream %" PRId64 " was closed cleanly, but before getting" + " all response header fields, treated as error", + stream->stream3_id); + *err = CURLE_HTTP3; + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, closed incomplete" + " -> %d", stream->stream3_id, *err)); + goto out; + } + else { + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, closed ok" + " -> %d", stream->stream3_id, *err)); + } + *err = CURLE_OK; + nread = 0; + +out: + data->state.drain = 0; + return nread; } /* incoming data frames on the h3 stream */ @@ -1249,85 +1338,72 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data, DEBUGASSERT(ctx->h3conn); *err = CURLE_OK; + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv(len=%zu) start", + stream->stream3_id, len)); + /* TODO: this implementation of response DATA buffering is fragile. + * It makes the following assumptions: + * - the `buf` passed here has the same lifetime as the easy handle + * - data returned in `buf` from this call is immediately used and `buf` + * can be overwritten during any handling of other transfers at + * this connection. + */ if(!stream->memlen) { - /* remember where to store incoming data for this stream and how big the - buffer is */ + /* `buf` was not known before or is currently not used by stream, + * assign it (again). 
*/ stream->mem = buf; stream->len = len; } - /* else, there's data in the buffer already */ - /* if there's data in the overflow buffer from a previous call, copy as much - as possible to the receive buffer before receiving more */ - drain_overflow_buffer(stream); + /* if there's data in the overflow buffer, move as much + as possible to the receive buffer now */ + drain_overflow_buffer(cf, data); if(cf_process_ingress(cf, data)) { *err = CURLE_RECV_ERROR; nread = -1; goto out; } - if(cf_flush_egress(cf, data)) { - *err = CURLE_SEND_ERROR; - nread = -1; - goto out; - } if(stream->memlen) { nread = stream->memlen; - /* data arrived */ /* reset to allow more data to come */ - stream->memlen = 0; + /* TODO: very brittle buffer use design: + * - stream->mem has now `nread` bytes of response data + * - we assume that the caller will use those immediately and + * we can overwrite that with new data on our next invocation from + * anywhere. + */ stream->mem = buf; + stream->memlen = 0; stream->len = len; /* extend the stream window with the data we're consuming and send out any additional packets to tell the server that we can receive more */ - DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] recv, consumed %zd bytes", + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv -> %zd bytes", stream->stream3_id, nread)); - extend_stream_window(ctx->qconn, stream); + report_consumed_data(cf, data, nread); if(cf_flush_egress(cf, data)) { *err = CURLE_SEND_ERROR; nread = -1; - goto out; } goto out; } if(stream->closed) { - if(stream->reset) { - failf(data, - "HTTP/3 stream %" PRId64 " reset by server", stream->stream3_id); - *err = CURLE_PARTIAL_FILE; - nread = -1; - goto out; - } - else if(stream->error3 != NGHTTP3_H3_NO_ERROR) { - failf(data, - "HTTP/3 stream %" PRId64 " was not closed cleanly: (err 0x%" PRIx64 - ")", - stream->stream3_id, stream->error3); - *err = CURLE_HTTP3; - nread = -1; - goto out; - } - - if(!stream->bodystarted) { - failf(data, - "HTTP/3 stream %" PRId64 " was closed cleanly, but before getting" - " all response header fields, treated as error", - stream->stream3_id); - *err = CURLE_HTTP3; - nread = -1; - goto out; - } - - nread = 0; + nread = recv_closed_stream(cf, data, err); goto out; } - DEBUGF(LOG_CF(data, cf, "cf_ngtcp2_recv returns EAGAIN")); + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv -> EAGAIN", + stream->stream3_id)); *err = CURLE_AGAIN; nread = -1; out: + if(cf_flush_egress(cf, data)) { + *err = CURLE_SEND_ERROR; + nread = -1; + goto out; + } + CF_DATA_RESTORE(cf, save); return nread; } @@ -1457,6 +1533,7 @@ static CURLcode h3_stream_open(struct Curl_cfilter *cf, stream->stream3_id = stream3_id; stream->h3req = TRUE; Curl_dyn_init(&stream->overflow, CURL_MAX_READ_SIZE); + stream->recv_buf_nonflow = 0; result = Curl_pseudo_headers(data, mem, len, NULL, &hreq); if(result) @@ -1500,8 +1577,6 @@ static CURLcode h3_stream_open(struct Curl_cfilter *cf, } stream->h3out = h3out; - DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] sending request %s, with_body=%d", - stream->stream3_id, data->state.url, !!stream->upload_left)); rc = nghttp3_conn_submit_request(ctx->h3conn, stream->stream3_id, nva, nheader, &data_reader, data); if(rc) @@ -1509,8 +1584,6 @@ static CURLcode h3_stream_open(struct Curl_cfilter *cf, break; } default: - DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] sending request %s", - stream->stream3_id, data->state.url)); stream->upload_left = 0; /* nothing left to send */ rc = nghttp3_conn_submit_request(ctx->h3conn, stream->stream3_id, nva, nheader, NULL, data); @@ 
-1521,9 +1594,9 @@ static CURLcode h3_stream_open(struct Curl_cfilter *cf, Curl_safefree(nva); - infof(data, "Using HTTP/3 Stream ID: %" PRIx64 " (easy handle %p)", + infof(data, "Using HTTP/3 Stream ID: %" PRId64 " (easy handle %p)", stream3_id, (void *)data); - DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] opened for %s", + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] opened for %s", stream3_id, data->state.url)); Curl_pseudo_free(hreq); @@ -1533,12 +1606,12 @@ fail: if(rc) { switch(rc) { case NGHTTP3_ERR_CONN_CLOSING: - DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] failed to send, " + DEBUGF(LOG_CF(data, cf, "h3sid[%"PRId64"] failed to send, " "connection is closing", stream->stream3_id)); result = CURLE_RECV_ERROR; break; default: - DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] failed to send -> %d (%s)", + DEBUGF(LOG_CF(data, cf, "h3sid[%"PRId64"] failed to send -> %d (%s)", stream->stream3_id, rc, ngtcp2_strerror(rc))); result = CURLE_SEND_ERROR; break; @@ -1685,6 +1758,7 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf, int rv; uint8_t buf[65536]; size_t bufsize = sizeof(buf); + size_t pktcount = 0, total_recvd = 0; struct sockaddr_storage remote_addr; socklen_t remote_addrlen; ngtcp2_path path; @@ -1693,7 +1767,7 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf, for(;;) { remote_addrlen = sizeof(remote_addr); - while((recvd = recvfrom(ctx->sockfd, (char *)buf, bufsize, 0, + while((recvd = recvfrom(ctx->q.sockfd, (char *)buf, bufsize, 0, (struct sockaddr *)&remote_addr, &remote_addrlen)) == -1 && SOCKERRNO == EINTR) @@ -1703,7 +1777,7 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf, DEBUGF(LOG_CF(data, cf, "ingress, recvfrom -> EAGAIN")); goto out; } - if(SOCKERRNO == ECONNREFUSED) { + if(!cf->connected && SOCKERRNO == ECONNREFUSED) { const char *r_ip; int r_port; Curl_cf_socket_peek(cf->next, data, NULL, NULL, @@ -1722,12 +1796,14 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf, ctx->got_first_byte = TRUE; } - ngtcp2_addr_init(&path.local, (struct sockaddr *)&ctx->local_addr, - ctx->local_addrlen); + ++pktcount; + total_recvd += recvd; + + ngtcp2_addr_init(&path.local, (struct sockaddr *)&ctx->q.local_addr, + ctx->q.local_addrlen); ngtcp2_addr_init(&path.remote, (struct sockaddr *)&remote_addr, remote_addrlen); - DEBUGF(LOG_CF(data, cf, "ingress, recvd pkt of %zd bytes", recvd)); rv = ngtcp2_conn_read_pkt(ctx->qconn, &path, &pi, buf, recvd, ts); if(rv) { DEBUGF(LOG_CF(data, cf, "ingress, read_pkt -> %s", @@ -1753,190 +1829,10 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf, } out: - return CURLE_OK; -} - -static CURLcode do_sendmsg(struct Curl_cfilter *cf, - struct Curl_easy *data, - const uint8_t *pkt, size_t pktlen, size_t gsolen, - size_t *sent); - -static CURLcode send_packet_no_gso(struct Curl_cfilter *cf, - struct Curl_easy *data, - const uint8_t *pkt, size_t pktlen, - size_t gsolen, size_t *psent) -{ - const uint8_t *p, *end = pkt + pktlen; - size_t sent; - - *psent = 0; - - for(p = pkt; p < end; p += gsolen) { - size_t len = CURLMIN(gsolen, (size_t)(end - p)); - CURLcode curlcode = do_sendmsg(cf, data, p, len, len, &sent); - if(curlcode != CURLE_OK) { - return curlcode; - } - *psent += sent; - } - - return CURLE_OK; -} - -static CURLcode do_sendmsg(struct Curl_cfilter *cf, - struct Curl_easy *data, - const uint8_t *pkt, size_t pktlen, size_t gsolen, - size_t *psent) -{ - struct cf_ngtcp2_ctx *ctx = cf->ctx; -#ifdef HAVE_SENDMSG - struct iovec msg_iov; - struct msghdr msg = {0}; - ssize_t sent; -#if defined(__linux__) && 
defined(UDP_SEGMENT) - uint8_t msg_ctrl[32]; - struct cmsghdr *cm; -#endif - - *psent = 0; - msg_iov.iov_base = (uint8_t *)pkt; - msg_iov.iov_len = pktlen; - msg.msg_iov = &msg_iov; - msg.msg_iovlen = 1; - -#if defined(__linux__) && defined(UDP_SEGMENT) - if(pktlen > gsolen) { - /* Only set this, when we need it. macOS, for example, - * does not seem to like a msg_control of length 0. */ - msg.msg_control = msg_ctrl; - assert(sizeof(msg_ctrl) >= CMSG_SPACE(sizeof(uint16_t))); - msg.msg_controllen = CMSG_SPACE(sizeof(uint16_t)); - cm = CMSG_FIRSTHDR(&msg); - cm->cmsg_level = SOL_UDP; - cm->cmsg_type = UDP_SEGMENT; - cm->cmsg_len = CMSG_LEN(sizeof(uint16_t)); - *(uint16_t *)(void *)CMSG_DATA(cm) = gsolen & 0xffff; - } -#endif - - - while((sent = sendmsg(ctx->sockfd, &msg, 0)) == -1 && SOCKERRNO == EINTR) - ; - - if(sent == -1) { - switch(SOCKERRNO) { - case EAGAIN: -#if EAGAIN != EWOULDBLOCK - case EWOULDBLOCK: -#endif - return CURLE_AGAIN; - case EMSGSIZE: - /* UDP datagram is too large; caused by PMTUD. Just let it be lost. */ - break; - case EIO: - if(pktlen > gsolen) { - /* GSO failure */ - failf(data, "sendmsg() returned %zd (errno %d); disable GSO", sent, - SOCKERRNO); - ctx->no_gso = TRUE; - return send_packet_no_gso(cf, data, pkt, pktlen, gsolen, psent); - } - /* FALLTHROUGH */ - default: - failf(data, "sendmsg() returned %zd (errno %d)", sent, SOCKERRNO); - return CURLE_SEND_ERROR; - } - } - else { - assert(pktlen == (size_t)sent); - } -#else - ssize_t sent; - (void)gsolen; - - *psent = 0; - - while((sent = send(ctx->sockfd, (const char *)pkt, pktlen, 0)) == -1 && - SOCKERRNO == EINTR) - ; - - if(sent == -1) { - if(SOCKERRNO == EAGAIN || SOCKERRNO == EWOULDBLOCK) { - return CURLE_AGAIN; - } - else { - failf(data, "send() returned %zd (errno %d)", sent, SOCKERRNO); - if(SOCKERRNO != EMSGSIZE) { - return CURLE_SEND_ERROR; - } - /* UDP datagram is too large; caused by PMTUD. Just let it be - lost. 
*/ - } - } -#endif - - *psent = pktlen; - - return CURLE_OK; -} - -static CURLcode send_packet(struct Curl_cfilter *cf, - struct Curl_easy *data, - const uint8_t *pkt, size_t pktlen, size_t gsolen, - size_t *psent) -{ - struct cf_ngtcp2_ctx *ctx = cf->ctx; - - DEBUGF(LOG_CF(data, cf, "egress, send %zu bytes", pktlen)); - if(ctx->no_gso && pktlen > gsolen) { - return send_packet_no_gso(cf, data, pkt, pktlen, gsolen, psent); - } - - return do_sendmsg(cf, data, pkt, pktlen, gsolen, psent); -} - -static void push_blocked_pkt(struct Curl_cfilter *cf, const uint8_t *pkt, - size_t pktlen, size_t gsolen) -{ - struct cf_ngtcp2_ctx *ctx = cf->ctx; - struct blocked_pkt *blkpkt; - - assert(ctx->num_blocked_pkt < - sizeof(ctx->blocked_pkt) / sizeof(ctx->blocked_pkt[0])); - - blkpkt = &ctx->blocked_pkt[ctx->num_blocked_pkt++]; - - blkpkt->pkt = pkt; - blkpkt->pktlen = pktlen; - blkpkt->gsolen = gsolen; -} - -static CURLcode send_blocked_pkt(struct Curl_cfilter *cf, - struct Curl_easy *data) -{ - struct cf_ngtcp2_ctx *ctx = cf->ctx; - size_t sent; - CURLcode curlcode; - struct blocked_pkt *blkpkt; - - for(; ctx->num_blocked_pkt_sent < ctx->num_blocked_pkt; - ++ctx->num_blocked_pkt_sent) { - blkpkt = &ctx->blocked_pkt[ctx->num_blocked_pkt_sent]; - curlcode = send_packet(cf, data, blkpkt->pkt, - blkpkt->pktlen, blkpkt->gsolen, &sent); - - if(curlcode) { - if(curlcode == CURLE_AGAIN) { - blkpkt->pkt += sent; - blkpkt->pktlen -= sent; - } - return curlcode; - } - } - - ctx->num_blocked_pkt = 0; - ctx->num_blocked_pkt_sent = 0; - + (void)pktcount; + (void)total_recvd; + DEBUGF(LOG_CF(data, cf, "ingress, recvd %zu packets with %zd bytes", + pktcount, total_recvd)); return CURLE_OK; } @@ -1947,15 +1843,15 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf, int rv; size_t sent; ngtcp2_ssize outlen; - uint8_t *outpos = ctx->pktbuf; + uint8_t *outpos = ctx->q.pktbuf; size_t max_udp_payload_size = ngtcp2_conn_get_max_tx_udp_payload_size(ctx->qconn); size_t path_max_udp_payload_size = ngtcp2_conn_get_path_max_tx_udp_payload_size(ctx->qconn); size_t max_pktcnt = - CURLMIN(MAX_PKT_BURST, ctx->pktbuflen / max_udp_payload_size); + CURLMIN(MAX_PKT_BURST, ctx->q.pktbuflen / max_udp_payload_size); size_t pktcnt = 0; - size_t gsolen; + size_t gsolen = 0; /* this disables gso until we have a clue */ ngtcp2_path_storage ps; ngtcp2_tstamp ts = timestamp(); ngtcp2_tstamp expiry; @@ -1977,8 +1873,8 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf, return CURLE_SEND_ERROR; } - if(ctx->num_blocked_pkt) { - curlcode = send_blocked_pkt(cf, data); + if(ctx->q.num_blocked_pkt) { + curlcode = vquic_send_blocked_pkt(cf, data, &ctx->q); if(curlcode) { if(curlcode == CURLE_AGAIN) { Curl_expire(data, 1, EXPIRE_QUIC); @@ -2015,22 +1911,24 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf, &ndatalen, flags, stream_id, (const ngtcp2_vec *)vec, veccnt, ts); if(outlen == 0) { - if(outpos != ctx->pktbuf) { - curlcode = send_packet(cf, data, ctx->pktbuf, - outpos - ctx->pktbuf, gsolen, &sent); + /* ngtcp2 does not want to send more packets, if the buffer is + * not empty, send that now */ + if(outpos != ctx->q.pktbuf) { + curlcode = vquic_send_packet(cf, data, &ctx->q, ctx->q.pktbuf, + outpos - ctx->q.pktbuf, gsolen, &sent); if(curlcode) { if(curlcode == CURLE_AGAIN) { - push_blocked_pkt(cf, ctx->pktbuf + sent, - outpos - ctx->pktbuf - sent, - gsolen); + vquic_push_blocked_pkt(cf, &ctx->q, ctx->q.pktbuf + sent, + outpos - ctx->q.pktbuf - sent, + gsolen); Curl_expire(data, 1, EXPIRE_QUIC); return CURLE_OK; } return 
curlcode; } } - - break; + /* done for now */ + goto out; } if(outlen < 0) { switch(outlen) { @@ -2043,6 +1941,8 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf, nghttp3_conn_shutdown_stream_write(ctx->h3conn, stream_id); continue; case NGTCP2_ERR_WRITE_MORE: + /* ngtcp2 wants to send more. update the flow of the stream whose data + * is in the buffer and continue */ assert(ndatalen >= 0); rv = nghttp3_conn_add_write_offset(ctx->h3conn, stream_id, ndatalen); if(rv) { @@ -2061,6 +1961,7 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf, } } else if(ndatalen >= 0) { + /* ngtcp2 thinks it has added all it wants. Update the stream */ rv = nghttp3_conn_add_write_offset(ctx->h3conn, stream_id, ndatalen); if(rv) { failf(data, "nghttp3_conn_add_write_offset returned error: %s\n", @@ -2069,64 +1970,74 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf, } } + /* advance to the end of the buffered packet data */ outpos += outlen; if(pktcnt == 0) { + /* first packet buffer chunk. use this as gsolen. It's how ngtcp2 + * indicates the intended segment size. */ gsolen = outlen; } else if((size_t)outlen > gsolen || - (gsolen > path_max_udp_payload_size && - (size_t)outlen != gsolen)) { + (gsolen > path_max_udp_payload_size && (size_t)outlen != gsolen)) { /* Packet larger than path_max_udp_payload_size is PMTUD probe packet and it might not be sent because of EMSGSIZE. Send them separately to minimize the loss. */ - curlcode = send_packet(cf, data, ctx->pktbuf, - outpos - outlen - ctx->pktbuf, gsolen, &sent); + /* send the pktbuf *before* the last addition */ + curlcode = vquic_send_packet(cf, data, &ctx->q, ctx->q.pktbuf, + outpos - outlen - ctx->q.pktbuf, gsolen, &sent); if(curlcode) { if(curlcode == CURLE_AGAIN) { - push_blocked_pkt(cf, ctx->pktbuf + sent, - outpos - outlen - ctx->pktbuf - sent, gsolen); - push_blocked_pkt(cf, outpos - outlen, outlen, outlen); + /* blocked, add the pktbuf *before* and *at* the last addition + * separately to the blocked packages */ + vquic_push_blocked_pkt(cf, &ctx->q, ctx->q.pktbuf + sent, + outpos - outlen - ctx->q.pktbuf - sent, gsolen); + vquic_push_blocked_pkt(cf, &ctx->q, outpos - outlen, outlen, outlen); Curl_expire(data, 1, EXPIRE_QUIC); return CURLE_OK; } return curlcode; } - curlcode = send_packet(cf, data, outpos - outlen, outlen, - outlen, &sent); + /* send the pktbuf *at* the last addition */ + curlcode = vquic_send_packet(cf, data, &ctx->q, outpos - outlen, outlen, + outlen, &sent); if(curlcode) { if(curlcode == CURLE_AGAIN) { assert(0 == sent); - push_blocked_pkt(cf, outpos - outlen, outlen, outlen); + vquic_push_blocked_pkt(cf, &ctx->q, outpos - outlen, outlen, outlen); Curl_expire(data, 1, EXPIRE_QUIC); return CURLE_OK; } return curlcode; } - + /* pktbuf has been completely sent */ pktcnt = 0; - outpos = ctx->pktbuf; + outpos = ctx->q.pktbuf; continue; } if(++pktcnt >= max_pktcnt || (size_t)outlen < gsolen) { - curlcode = send_packet(cf, data, ctx->pktbuf, - outpos - ctx->pktbuf, gsolen, &sent); + /* enough packets or last one is shorter than the intended + * segment size, indicating that it is time to send. 
*/ + curlcode = vquic_send_packet(cf, data, &ctx->q, ctx->q.pktbuf, + outpos - ctx->q.pktbuf, gsolen, &sent); if(curlcode) { if(curlcode == CURLE_AGAIN) { - push_blocked_pkt(cf, ctx->pktbuf + sent, outpos - ctx->pktbuf - sent, - gsolen); + vquic_push_blocked_pkt(cf, &ctx->q, ctx->q.pktbuf + sent, + outpos - ctx->q.pktbuf - sent, gsolen); Curl_expire(data, 1, EXPIRE_QUIC); return CURLE_OK; } return curlcode; } - + /* pktbuf has been completely sent */ pktcnt = 0; - outpos = ctx->pktbuf; + outpos = ctx->q.pktbuf; } } +out: + /* non-errored exit. check when we should run again. */ expiry = ngtcp2_conn_get_expiry(ctx->qconn); if(expiry != UINT64_MAX) { if(expiry <= ts) { @@ -2176,16 +2087,23 @@ static CURLcode cf_ngtcp2_data_event(struct Curl_cfilter *cf, struct HTTP *stream = data->req.p.http; Curl_dyn_free(&stream->overflow); free(stream->h3out); +#ifdef DEBUGBUILD + if(ctx->qconn) { + ngtcp2_conn_stat stat; + ngtcp2_conn_get_conn_stat(ctx->qconn, &stat); + DEBUGF(LOG_CF(data, cf, "ngtcp2 conn stat: cwnd=%" PRIu64 ", " + "max_tx_payload=%zu", + stat.cwnd, stat.max_tx_udp_payload_size)); + } +#endif break; } - case CF_CTRL_DATA_DONE_SEND: { struct HTTP *stream = data->req.p.http; stream->upload_done = TRUE; (void)nghttp3_conn_resume_stream(ctx->h3conn, stream->stream3_id); break; } - case CF_CTRL_DATA_IDLE: if(timestamp() >= ngtcp2_conn_get_expiry(ctx->qconn)) { if(cf_flush_egress(cf, data)) { @@ -2234,7 +2152,7 @@ static void cf_ngtcp2_ctx_clear(struct cf_ngtcp2_ctx *ctx) if(ctx->sslctx) wolfSSL_CTX_free(ctx->sslctx); #endif - free(ctx->pktbuf); + vquic_ctx_free(&ctx->q); if(ctx->h3conn) nghttp3_conn_del(ctx->h3conn); if(ctx->qconn) @@ -2262,7 +2180,7 @@ static void cf_ngtcp2_close(struct Curl_cfilter *cf, struct Curl_easy *data) (uint8_t *)buffer, sizeof(buffer), &ctx->last_error, ts); if(rc > 0) { - while((send(ctx->sockfd, buffer, rc, 0) == -1) && + while((send(ctx->q.sockfd, buffer, rc, 0) == -1) && SOCKERRNO == EINTR); } @@ -2298,7 +2216,6 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf, int rc; int rv; CURLcode result; - ngtcp2_path path; /* TODO: this must be initialized properly */ const struct Curl_sockaddr_ex *sockaddr; int qfd; @@ -2335,19 +2252,27 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf, ctx->qlogfd = qfd; /* -1 if failure above */ quic_settings(ctx, data); - Curl_cf_socket_peek(cf->next, data, &ctx->sockfd, + result = vquic_ctx_init(&ctx->q, + NGTCP2_MAX_PMTUD_UDP_PAYLOAD_SIZE * MAX_PKT_BURST); + if(result) + return result; + + Curl_cf_socket_peek(cf->next, data, &ctx->q.sockfd, &sockaddr, NULL, NULL, NULL, NULL); - ctx->local_addrlen = sizeof(ctx->local_addr); - rv = getsockname(ctx->sockfd, (struct sockaddr *)&ctx->local_addr, - &ctx->local_addrlen); + ctx->q.local_addrlen = sizeof(ctx->q.local_addr); + rv = getsockname(ctx->q.sockfd, (struct sockaddr *)&ctx->q.local_addr, + &ctx->q.local_addrlen); if(rv == -1) return CURLE_QUIC_CONNECT_ERROR; - ngtcp2_addr_init(&path.local, (struct sockaddr *)&ctx->local_addr, - ctx->local_addrlen); - ngtcp2_addr_init(&path.remote, &sockaddr->sa_addr, sockaddr->addrlen); + ngtcp2_addr_init(&ctx->connected_path.local, + (struct sockaddr *)&ctx->q.local_addr, + ctx->q.local_addrlen); + ngtcp2_addr_init(&ctx->connected_path.remote, + &sockaddr->sa_addr, sockaddr->addrlen); - rc = ngtcp2_conn_client_new(&ctx->qconn, &ctx->dcid, &ctx->scid, &path, + rc = ngtcp2_conn_client_new(&ctx->qconn, &ctx->dcid, &ctx->scid, + &ctx->connected_path, NGTCP2_PROTO_VER_V1, &ng_callbacks, &ctx->settings, &ctx->transport_params, 
NULL, cf); @@ -2362,24 +2287,6 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf, ngtcp2_connection_close_error_default(&ctx->last_error); -#if defined(__linux__) && defined(UDP_SEGMENT) && defined(HAVE_SENDMSG) - ctx->no_gso = FALSE; -#else - ctx->no_gso = TRUE; -#endif - - ctx->num_blocked_pkt = 0; - ctx->num_blocked_pkt_sent = 0; - memset(&ctx->blocked_pkt, 0, sizeof(ctx->blocked_pkt)); - - ctx->pktbuflen = NGTCP2_MAX_PMTUD_UDP_PAYLOAD_SIZE * MAX_PKT_BURST; - ctx->pktbuf = malloc(ctx->pktbuflen); - if(!ctx->pktbuf) { - ngtcp2_conn_del(ctx->qconn); - ctx->qconn = NULL; - return CURLE_OUT_OF_MEMORY; - } - ctx->conn_ref.get_conn = get_conn; ctx->conn_ref.user_data = cf; diff --git a/lib/vquic/curl_quiche.c b/lib/vquic/curl_quiche.c index f1bcd70b70..54408d75a3 100644 --- a/lib/vquic/curl_quiche.c +++ b/lib/vquic/curl_quiche.c @@ -40,6 +40,7 @@ #include "progress.h" #include "strerror.h" #include "vquic.h" +#include "vquic_int.h" #include "curl_quiche.h" #include "transfer.h" #include "h2h3.h" @@ -56,6 +57,9 @@ #define QUIC_MAX_DATA (1*1024*1024) #define QUIC_IDLE_TIMEOUT (60 * 1000) /* milliseconds */ +/* how many UDP packets to send max in one call */ +#define MAX_PKT_BURST 10 +#define MAX_UDP_PAYLOAD_SIZE 1452 /* * Store quiche version info in this buffer. @@ -128,14 +132,11 @@ struct quic_handshake { struct h3_event_node { struct h3_event_node *next; - int64_t stream3_id; quiche_h3_event *ev; }; struct cf_quiche_ctx { - curl_socket_t sockfd; - struct sockaddr_storage local_addr; - socklen_t local_addrlen; + struct cf_quic_ctx q; quiche_conn *qconn; quiche_config *cfg; quiche_h3_conn *h3c; @@ -143,12 +144,12 @@ struct cf_quiche_ctx { uint8_t scid[QUICHE_MAX_CONN_ID_LEN]; SSL_CTX *sslctx; SSL *ssl; - struct h3_event_node *pending; - struct curltime connect_started; /* time the current attempt started */ - struct curltime handshake_done; /* time connect handshake finished */ - int first_reply_ms; /* ms since first data arrived */ - struct curltime reconnect_at; /* time the next attempt should start */ - bool goaway; + struct curltime started_at; /* time the current attempt started */ + struct curltime handshake_at; /* time connect handshake finished */ + struct curltime first_byte_at; /* when first byte was recvd */ + struct curltime reconnect_at; /* time the next attempt should start */ + BIT(goaway); /* got GOAWAY from server */ + BIT(got_first_byte); /* if first byte was received */ }; @@ -160,24 +161,25 @@ static void quiche_debug_log(const char *line, void *argp) } #endif -static void h3_clear_pending(struct cf_quiche_ctx *ctx) +static void h3_clear_pending(struct Curl_easy *data) { - if(ctx->pending) { + struct HTTP *stream = data->req.p.http; + + if(stream->pending) { struct h3_event_node *node, *next; - for(node = ctx->pending; node; node = next) { + for(node = stream->pending; node; node = next) { next = node->next; quiche_h3_event_free(node->ev); free(node); } - ctx->pending = NULL; + stream->pending = NULL; } } static void cf_quiche_ctx_clear(struct cf_quiche_ctx *ctx) { if(ctx) { - if(ctx->pending) - h3_clear_pending(ctx); + vquic_ctx_free(&ctx->q); if(ctx->qconn) quiche_conn_free(ctx->qconn); if(ctx->h3config) @@ -187,19 +189,23 @@ static void cf_quiche_ctx_clear(struct cf_quiche_ctx *ctx) if(ctx->cfg) quiche_config_free(ctx->cfg); memset(ctx, 0, sizeof(*ctx)); - ctx->first_reply_ms = -1; } } +static void notify_drain(struct Curl_cfilter *cf, + struct Curl_easy *data) +{ + (void)cf; + data->state.drain = 1; + Curl_expire(data, 0, EXPIRE_RUN_NOW); +} + static CURLcode 
h3_add_event(struct Curl_cfilter *cf, struct Curl_easy *data, - int64_t stream3_id, quiche_h3_event *ev, - size_t *pqlen) + int64_t stream3_id, quiche_h3_event *ev) { - struct cf_quiche_ctx *ctx = cf->ctx; struct Curl_easy *mdata; - struct h3_event_node *node, **pnext = &ctx->pending; - size_t qlen; + struct h3_event_node *node, **pnext; DEBUGASSERT(data->multi); for(mdata = data->multi->easyp; mdata; mdata = mdata->next) { @@ -209,31 +215,25 @@ static CURLcode h3_add_event(struct Curl_cfilter *cf, } if(!mdata) { - DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] event discarded, easy handle " + DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] event discarded, easy handle " "not found", stream3_id)); quiche_h3_event_free(ev); - *pqlen = 0; return CURLE_OK; } node = calloc(sizeof(*node), 1); - if(!node) + if(!node) { + quiche_h3_event_free(ev); return CURLE_OUT_OF_MEMORY; - node->stream3_id = stream3_id; + } node->ev = ev; /* append to process them in order of arrival */ - qlen = 0; + pnext = &mdata->req.p.http->pending; while(*pnext) { pnext = &((*pnext)->next); - ++qlen; } *pnext = node; - *pqlen = qlen + 1; - if(!mdata->state.drain) { - /* tell the multi handle that this data needs processing */ - mdata->state.drain = 1; - Curl_expire(mdata, 0, EXPIRE_RUN_NOW); - } + notify_drain(cf, mdata); return CURLE_OK; } @@ -270,6 +270,71 @@ static int cb_each_header(uint8_t *name, size_t name_len, return 0; } +static ssize_t cf_recv_body(struct Curl_cfilter *cf, + struct Curl_easy *data, + char *buf, size_t len, + CURLcode *err) +{ + struct cf_quiche_ctx *ctx = cf->ctx; + struct HTTP *stream = data->req.p.http; + ssize_t nread; + size_t offset = 0; + + if(!stream->firstbody) { + /* add a header-body separator CRLF */ + offset = 2; + } + nread = quiche_h3_recv_body(ctx->h3c, ctx->qconn, stream->stream3_id, + (unsigned char *)buf + offset, len - offset); + if(nread >= 0) { + DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"][DATA] len=%zd", + stream->stream3_id, nread)); + if(!stream->firstbody) { + stream->firstbody = TRUE; + buf[0] = '\r'; + buf[1] = '\n'; + nread += offset; + } + } + else if(nread == -1) { + *err = CURLE_AGAIN; + stream->h3_recving_data = FALSE; + } + else { + failf(data, "Error %zd in HTTP/3 response body for stream[%"PRId64"]", + nread, stream->stream3_id); + stream->closed = TRUE; + stream->reset = TRUE; + streamclose(cf->conn, "Reset of stream"); + stream->h3_recving_data = FALSE; + nread = -1; + *err = stream->h3_got_header? 
CURLE_PARTIAL_FILE : CURLE_RECV_ERROR; + } + return nread; +} + +#ifdef DEBUGBUILD +static const char *cf_ev_name(quiche_h3_event *ev) +{ + switch(quiche_h3_event_type(ev)) { + case QUICHE_H3_EVENT_HEADERS: + return "HEADERS"; + case QUICHE_H3_EVENT_DATA: + return "DATA"; + case QUICHE_H3_EVENT_RESET: + return "RESET"; + case QUICHE_H3_EVENT_FINISHED: + return "FINISHED"; + case QUICHE_H3_EVENT_GOAWAY: + return "GOAWAY"; + default: + return "Unknown"; + } +} +#else +#define cf_ev_name(x) "" +#endif + static ssize_t h3_process_event(struct Curl_cfilter *cf, struct Curl_easy *data, char *buf, size_t len, @@ -277,15 +342,14 @@ static ssize_t h3_process_event(struct Curl_cfilter *cf, quiche_h3_event *ev, CURLcode *err) { - struct cf_quiche_ctx *ctx = cf->ctx; struct HTTP *stream = data->req.p.http; - ssize_t recvd = -1; - ssize_t rcode; + ssize_t recvd = 0; int rc; struct h3h1header headers; DEBUGASSERT(stream3_id == stream->stream3_id); + *err = CURLE_OK; switch(quiche_h3_event_type(ev)) { case QUICHE_H3_EVENT_HEADERS: stream->h3_got_header = TRUE; @@ -301,67 +365,42 @@ static ssize_t h3_process_event(struct Curl_cfilter *cf, break; } recvd = headers.nlen; - DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, HEADERS len=%zd", + DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"][HEADERS] len=%zd", stream3_id, recvd)); break; case QUICHE_H3_EVENT_DATA: - if(!stream->firstbody) { - /* add a header-body separator CRLF */ - buf[0] = '\r'; - buf[1] = '\n'; - buf += 2; - len -= 2; - stream->firstbody = TRUE; - recvd = 2; /* two bytes already */ - } - else + DEBUGASSERT(!stream->closed); + stream->h3_recving_data = TRUE; + recvd = cf_recv_body(cf, data, buf, len, err); + if(recvd < 0) { + if(*err != CURLE_AGAIN) + return -1; recvd = 0; - rcode = quiche_h3_recv_body(ctx->h3c, ctx->qconn, stream3_id, - (unsigned char *)buf, len); - if(rcode <= 0) { - failf(data, "Error %zd in HTTP/3 response body for stream[%"PRId64"]", - rcode, stream3_id); - recvd = -1; - *err = CURLE_AGAIN; - break; } - stream->h3_recving_data = TRUE; - recvd += rcode; - DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, DATA len=%zd", - stream3_id, rcode)); break; case QUICHE_H3_EVENT_RESET: - if(quiche_conn_is_draining(ctx->qconn) && !stream->h3_got_header) { - DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] stream RESET without response, " - "connection is draining", stream3_id)); - } - else { - DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, RESET", stream3_id)); - } - streamclose(cf->conn, "Stream reset"); - *err = stream->h3_got_header? 
CURLE_PARTIAL_FILE : CURLE_RECV_ERROR; - recvd = -1; + DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"][RESET]", stream3_id)); + stream->closed = TRUE; + stream->reset = TRUE; + /* streamclose(cf->conn, "Reset of stream");*/ + stream->h3_recving_data = FALSE; break; case QUICHE_H3_EVENT_FINISHED: - DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, FINISHED", stream3_id)); + DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"][FINISHED]", stream3_id)); stream->closed = TRUE; - streamclose(cf->conn, "End of stream"); - *err = CURLE_OK; - recvd = 0; /* end of stream */ + /* streamclose(cf->conn, "End of stream");*/ + stream->h3_recving_data = FALSE; break; case QUICHE_H3_EVENT_GOAWAY: - DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, GOAWAY", stream3_id)); - recvd = -1; - *err = CURLE_AGAIN; - ctx->goaway = TRUE; + DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"][GOAWAY]", stream3_id)); break; default: - DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, unhandled event %d", + DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] recv, unhandled event %d", stream3_id, quiche_h3_event_type(ev))); break; } @@ -373,34 +412,28 @@ static ssize_t h3_process_pending(struct Curl_cfilter *cf, char *buf, size_t len, CURLcode *err) { - struct cf_quiche_ctx *ctx = cf->ctx; struct HTTP *stream = data->req.p.http; - struct h3_event_node *node = ctx->pending, **pnext = &ctx->pending; + struct h3_event_node *node = stream->pending, **pnext = &stream->pending; ssize_t recvd = 0, erecvd; + *err = CURLE_OK; DEBUGASSERT(stream); - while(node) { - if(node->stream3_id == stream->stream3_id) { - erecvd = h3_process_event(cf, data, buf, len, - node->stream3_id, node->ev, err); - quiche_h3_event_free(node->ev); - *pnext = node->next; - free(node); - node = *pnext; - if(erecvd < 0) { - recvd = erecvd; - break; - } - recvd += erecvd; - if(erecvd > INT_MAX || (size_t)erecvd >= len) - break; - buf += erecvd; - len -= erecvd; - } - else { - pnext = &node->next; - node = node->next; + while(node && len) { + erecvd = h3_process_event(cf, data, buf, len, + stream->stream3_id, node->ev, err); + quiche_h3_event_free(node->ev); + *pnext = node->next; + free(node); + node = *pnext; + if(erecvd < 0) { + DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] process event -> %d", + stream->stream3_id, *err)); + return erecvd; } + recvd += erecvd; + *err = CURLE_OK; + buf += erecvd; + len -= erecvd; } return recvd; } @@ -409,12 +442,14 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf, struct Curl_easy *data) { struct cf_quiche_ctx *ctx = cf->ctx; - ssize_t recvd; - uint8_t *buf = (uint8_t *)data->state.buffer; - size_t bufsize = data->set.buffer_size; - struct sockaddr_storage from; - socklen_t from_len; + int64_t stream3_id = data->req.p.http? 
data->req.p.http->stream3_id : -1; + uint8_t buf[65536]; + size_t bufsize = sizeof(buf); + struct sockaddr_storage remote_addr; + socklen_t remote_addrlen; quiche_recv_info recv_info; + ssize_t recvd, nread; + ssize_t total = 0, pkts = 0; DEBUGASSERT(ctx->qconn); @@ -422,14 +457,16 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf, quiche_conn_on_timeout(ctx->qconn); do { - from_len = sizeof(from); - - recvd = recvfrom(ctx->sockfd, buf, bufsize, 0, - (struct sockaddr *)&from, &from_len); - + remote_addrlen = sizeof(remote_addr); + while((recvd = recvfrom(ctx->q.sockfd, (char *)buf, bufsize, 0, + (struct sockaddr *)&remote_addr, + &remote_addrlen)) == -1 && + SOCKERRNO == EINTR) + ; if(recvd < 0) { - if((SOCKERRNO == EAGAIN) || (SOCKERRNO == EWOULDBLOCK)) - goto out; + if((SOCKERRNO == EAGAIN) || (SOCKERRNO == EWOULDBLOCK)) { + break; + } if(SOCKERRNO == ECONNREFUSED) { const char *r_ip; int r_port; @@ -440,42 +477,50 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf, return CURLE_COULDNT_CONNECT; } failf(data, "quiche: recvfrom() unexpectedly returned %zd " - "(errno: %d, socket %d)", recvd, SOCKERRNO, ctx->sockfd); + "(errno: %d, socket %d)", recvd, SOCKERRNO, ctx->q.sockfd); return CURLE_RECV_ERROR; } - DEBUGF(LOG_CF(data, cf, "ingress, recvd %zd bytes", recvd)); - recv_info.from = (struct sockaddr *) &from; - recv_info.from_len = from_len; - recv_info.to = (struct sockaddr *) &ctx->local_addr; - recv_info.to_len = ctx->local_addrlen; - - recvd = quiche_conn_recv(ctx->qconn, buf, recvd, &recv_info); - if(recvd == QUICHE_ERR_DONE) - goto out; - - if(recvd < 0) { - if(QUICHE_ERR_TLS_FAIL == recvd) { + total += recvd; + ++pkts; + if(recvd > 0 && !ctx->got_first_byte) { + ctx->first_byte_at = Curl_now(); + ctx->got_first_byte = TRUE; + } + recv_info.from = (struct sockaddr *) &remote_addr; + recv_info.from_len = remote_addrlen; + recv_info.to = (struct sockaddr *) &ctx->q.local_addr; + recv_info.to_len = ctx->q.local_addrlen; + + nread = quiche_conn_recv(ctx->qconn, buf, recvd, &recv_info); + if(nread < 0) { + if(QUICHE_ERR_DONE == nread) { + DEBUGF(LOG_CF(data, cf, "ingress, quiche is DONE")); + return CURLE_OK; + } + else if(QUICHE_ERR_TLS_FAIL == nread) { long verify_ok = SSL_get_verify_result(ctx->ssl); if(verify_ok != X509_V_OK) { failf(data, "SSL certificate problem: %s", X509_verify_cert_error_string(verify_ok)); - return CURLE_PEER_FAILED_VERIFICATION; } } - - failf(data, "quiche_conn_recv() == %zd", recvd); - - return CURLE_RECV_ERROR; + else { + failf(data, "quiche_conn_recv() == %zd", nread); + return CURLE_RECV_ERROR; + } } - if(ctx->first_reply_ms < 0) { - timediff_t ms = Curl_timediff(Curl_now(), ctx->connect_started); - ctx->first_reply_ms = (ms < INT_MAX)? (int)ms : INT_MAX; + else if(nread < recvd) { + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] ingress, quiche only " + "accepted %zd/%zd bytes", + stream3_id, nread, recvd)); } - } while(1); -out: + } while(pkts < 1000); /* arbitrary */ + + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] ingress, recvd %zd bytes " + "in %zd packets", stream3_id, total, pkts)); return CURLE_OK; } @@ -487,129 +532,236 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf, struct Curl_easy *data) { struct cf_quiche_ctx *ctx = cf->ctx; - ssize_t sent; - uint8_t out[1200]; - int64_t timeout_ns; + int64_t stream3_id = data->req.p.http? 
data->req.p.http->stream3_id : -1; quiche_send_info send_info; + ssize_t outlen, total_len = 0; + size_t max_udp_payload_size = + quiche_conn_max_send_udp_payload_size(ctx->qconn); + size_t gsolen = max_udp_payload_size; + size_t sent, pktcnt = 0; + CURLcode result; + int64_t timeout_ns; - do { - sent = quiche_conn_send(ctx->qconn, out, sizeof(out), &send_info); - if(sent == QUICHE_ERR_DONE) - break; + ctx->q.no_gso = TRUE; + if(ctx->q.num_blocked_pkt) { + result = vquic_send_blocked_pkt(cf, data, &ctx->q); + if(result) { + if(result == CURLE_AGAIN) { + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] egress, still not " + "able to send blocked packet", stream3_id)); + Curl_expire(data, 1, EXPIRE_QUIC); + return CURLE_OK; + } + goto out; + } + } - if(sent < 0) { - failf(data, "quiche_conn_send returned %zd", sent); - return CURLE_SEND_ERROR; + for(;;) { + outlen = quiche_conn_send(ctx->qconn, ctx->q.pktbuf, max_udp_payload_size, + &send_info); + if(outlen == QUICHE_ERR_DONE) { + result = CURLE_OK; + goto out; } - DEBUGF(LOG_CF(data, cf, "egress, send %zu bytes", sent)); - sent = send(ctx->sockfd, out, sent, 0); - if(sent < 0) { - failf(data, "send() returned %zd", sent); - return CURLE_SEND_ERROR; + if(outlen < 0) { + failf(data, "quiche_conn_send returned %zd", outlen); + result = CURLE_SEND_ERROR; + goto out; } - } while(1); - /* time until the next timeout event, as nanoseconds. */ - timeout_ns = quiche_conn_timeout_as_nanos(ctx->qconn); - if(timeout_ns) - /* expire uses milliseconds */ - Curl_expire(data, (timeout_ns + 999999) / 1000000, EXPIRE_QUIC); + /* send the pktbuf *before* the last addition */ + result = vquic_send_packet(cf, data, &ctx->q, ctx->q.pktbuf, + outlen, gsolen, &sent); + ++pktcnt; + total_len += outlen; + if(result) { + if(result == CURLE_AGAIN) { + /* blocked, add the pktbuf *before* and *at* the last addition + * separately to the blocked packages */ + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] egress, pushing blocked " + "packet with %zd bytes", stream3_id, outlen)); + vquic_push_blocked_pkt(cf, &ctx->q, ctx->q.pktbuf, outlen, gsolen); + Curl_expire(data, 1, EXPIRE_QUIC); + return CURLE_OK; + } + goto out; + } + } - return CURLE_OK; +out: + timeout_ns = quiche_conn_timeout_as_nanos(ctx->qconn); + if(timeout_ns % 1000000) + timeout_ns += 1000000; + /* expire resolution is milliseconds */ + Curl_expire(data, (timeout_ns / 1000000), EXPIRE_QUIC); + if(pktcnt) + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] egress, sent %zd packets " + "with %zd bytes", stream3_id, pktcnt, total_len)); + return result; } -static ssize_t cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data, - char *buf, size_t len, CURLcode *err) +static ssize_t recv_closed_stream(struct Curl_cfilter *cf, + struct Curl_easy *data, + CURLcode *err) { - struct cf_quiche_ctx *ctx = cf->ctx; - ssize_t recvd = -1; - ssize_t rcode; - quiche_h3_event *ev; struct HTTP *stream = data->req.p.http; + ssize_t nread = -1; - *err = CURLE_AGAIN; - /* process any pending events for `data` first. if there are, - * return so the transfer can handle those. We do not want to - * progress ingress while events are pending here. */ - recvd = h3_process_pending(cf, data, buf, len, err); - if(recvd < 0) { - goto out; - } - else if(recvd > 0) { - *err = CURLE_OK; + if(stream->reset) { + failf(data, + "HTTP/3 stream %" PRId64 " reset by server", stream->stream3_id); + *err = stream->h3_got_header? 
CURLE_PARTIAL_FILE : CURLE_RECV_ERROR; + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, was reset -> %d", + stream->stream3_id, *err)); goto out; } - recvd = -1; - if(cf_process_ingress(cf, data)) { - DEBUGF(LOG_CF(data, cf, "h3_stream_recv returns on ingress")); + if(!stream->h3_got_header) { + failf(data, + "HTTP/3 stream %" PRId64 " was closed cleanly, but before getting" + " all response header fields, treated as error", + stream->stream3_id); + /* *err = CURLE_PARTIAL_FILE; */ *err = CURLE_RECV_ERROR; + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, closed incomplete" + " -> %d", stream->stream3_id, *err)); goto out; } - - if(stream->h3_recving_data) { - /* body receiving state */ - rcode = quiche_h3_recv_body(ctx->h3c, ctx->qconn, stream->stream3_id, - (unsigned char *)buf, len); - if(rcode <= 0) { - stream->h3_recving_data = FALSE; - /* fall through into the while loop below */ - } - else { - *err = CURLE_OK; - recvd = rcode; - goto out; - } + else { + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, closed ok" + " -> %d", stream->stream3_id, *err)); } + *err = CURLE_OK; + nread = 0; - while(recvd < 0) { +out: + return nread; +} + +static CURLcode cf_poll_events(struct Curl_cfilter *cf, + struct Curl_easy *data) +{ + struct cf_quiche_ctx *ctx = cf->ctx; + struct HTTP *stream = data->req.p.http; + quiche_h3_event *ev; + + /* Take in the events and distribute them to the transfers. */ + while(1) { int64_t stream3_id = quiche_h3_conn_poll(ctx->h3c, ctx->qconn, &ev); - if(stream3_id < 0) + if(stream3_id < 0) { /* nothing more to do */ break; + } + DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] recv, queue event %s " + "for [h3sid=%"PRId64"]", + stream? stream->stream3_id : -1, cf_ev_name(ev), + stream3_id)); + if(h3_add_event(cf, data, stream3_id, ev) != CURLE_OK) { + return CURLE_OUT_OF_MEMORY; + } + } + return CURLE_OK; +} + +static ssize_t cf_recv_transfer_data(struct Curl_cfilter *cf, + struct Curl_easy *data, + char *buf, size_t len, + CURLcode *err) +{ + struct HTTP *stream = data->req.p.http; + ssize_t recvd = -1; + size_t offset = 0; - if(stream3_id == stream->stream3_id) { - recvd = h3_process_event(cf, data, buf, len, stream3_id, ev, err); - quiche_h3_event_free(ev); + if(stream->h3_recving_data) { + /* try receiving body first */ + recvd = cf_recv_body(cf, data, buf, len, err); + if(recvd < 0) { + if(*err != CURLE_AGAIN) + return -1; + recvd = 0; } - else { - size_t qlen; - /* event for another transfer, preserver for later */ - DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, queue event " - "for h3[%"PRId64"]", stream->stream3_id, stream3_id)); - if(h3_add_event(cf, data, stream3_id, ev, &qlen) != CURLE_OK) { - *err = CURLE_OUT_OF_MEMORY; - goto out; - } - if(qlen > 20) { - Curl_expire(data, 0, EXPIRE_QUIC); - break; - } + if(recvd > 0) { + offset = recvd; } } - if(cf_flush_egress(cf, data)) { - DEBUGF(LOG_CF(data, cf, "recv(), flush egress failed")); - *err = CURLE_SEND_ERROR; - recvd = -1; - goto out; + if(offset < len && stream->pending) { + /* process any pending events for `data` first. if there are, + * return so the transfer can handle those. We do not want to + * progress ingress while events are pending here. 
*/ + recvd = h3_process_pending(cf, data, buf + offset, len - offset, err); + if(recvd < 0) { + if(*err != CURLE_AGAIN) + return -1; + recvd = 0; + } + if(recvd > 0) { + offset += recvd; + } } - if(recvd >= 0) { - /* Get this called again to drain the event queue */ - Curl_expire(data, 0, EXPIRE_QUIC); + if(offset) { *err = CURLE_OK; + return offset; } - else if(stream->closed) { - *err = CURLE_OK; + *err = CURLE_AGAIN; + return 0; +} + +static ssize_t cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data, + char *buf, size_t len, CURLcode *err) +{ + struct HTTP *stream = data->req.p.http; + ssize_t recvd = -1; + + *err = CURLE_AGAIN; + + recvd = cf_recv_transfer_data(cf, data, buf, len, err); + if(recvd) + goto out; + if(stream->closed) { + recvd = recv_closed_stream(cf, data, err); + goto out; + } + + /* we did get nothing from the quiche buffers or pending events. + * Take in more data from the connection, any error is fatal */ + if(cf_process_ingress(cf, data)) { + DEBUGF(LOG_CF(data, cf, "h3_stream_recv returns on ingress")); + *err = CURLE_RECV_ERROR; recvd = -1; + goto out; + } + /* poll quiche and distribute the events to the transfers */ + *err = cf_poll_events(cf, data); + if(*err) { + recvd = -1; + goto out; } + /* try to receive again for this transfer */ + recvd = cf_recv_transfer_data(cf, data, buf, len, err); + if(recvd) + goto out; + if(stream->closed) { + recvd = recv_closed_stream(cf, data, err); + goto out; + } + recvd = -1; + *err = CURLE_AGAIN; + data->state.drain = 0; + out: - data->state.drain = (recvd >= 0) ? 1 : 0; - DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv -> %ld, err=%d", - stream->stream3_id, (long)recvd, *err)); + if(cf_flush_egress(cf, data)) { + DEBUGF(LOG_CF(data, cf, "cf_recv, flush egress failed")); + *err = CURLE_SEND_ERROR; + return -1; + } + DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] cf_recv -> %zd, err=%d", + stream->stream3_id, recvd, *err)); + if(recvd > 0) + notify_drain(cf, data); return recvd; } @@ -630,8 +782,9 @@ static CURLcode cf_http_request(struct Curl_cfilter *cf, CURLcode result = CURLE_OK; struct h2h3req *hreq = NULL; - DEBUGF(LOG_CF(data, cf, "cf_http_request %s", data->state.url)); stream->h3req = TRUE; /* send off! */ + stream->closed = FALSE; + stream->reset = FALSE; result = Curl_pseudo_headers(data, mem, len, NULL, &hreq); if(result) @@ -664,30 +817,41 @@ static CURLcode cf_http_request(struct Curl_cfilter *cf, /* data sending without specifying the data amount up front */ stream->upload_left = -1; /* unknown, but not zero */ + stream->upload_done = !stream->upload_left; stream3_id = quiche_h3_send_request(ctx->h3c, ctx->qconn, nva, nheader, - stream->upload_left ? 
FALSE: TRUE); - DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] send request %s, upload=%zu", - stream3_id, data->state.url, stream->upload_left)); + stream->upload_done); break; default: + stream->upload_left = 0; + stream->upload_done = TRUE; stream3_id = quiche_h3_send_request(ctx->h3c, ctx->qconn, nva, nheader, TRUE); - DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] send request %s", - stream3_id, data->state.url)); break; } Curl_safefree(nva); if(stream3_id < 0) { - DEBUGF(LOG_CF(data, cf, "quiche_h3_send_request returned %ld", - (long)stream3_id)); + if(QUICHE_H3_ERR_STREAM_BLOCKED == stream3_id) { + DEBUGF(LOG_CF(data, cf, "send_request(%s, body_len=%ld) rejected " + "with H3_ERR_STREAM_BLOCKED", + data->state.url, (long)stream->upload_left)); + result = CURLE_AGAIN; + goto fail; + } + else { + DEBUGF(LOG_CF(data, cf, "send_request(%s, body_len=%ld) -> %" PRId64, + data->state.url, (long)stream->upload_left, stream3_id)); + } result = CURLE_SEND_ERROR; goto fail; } - DEBUGF(LOG_CF(data, cf, "Using HTTP/3 Stream ID: %"PRId64, stream3_id)); stream->stream3_id = stream3_id; + infof(data, "Using HTTP/3 Stream ID: %" PRId64 " (easy handle %p)", + stream3_id, (void *)data); + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] opened for %s", + stream3_id, data->state.url)); Curl_pseudo_free(hreq); return CURLE_OK; @@ -703,32 +867,44 @@ static ssize_t cf_quiche_send(struct Curl_cfilter *cf, struct Curl_easy *data, { struct cf_quiche_ctx *ctx = cf->ctx; struct HTTP *stream = data->req.p.http; - ssize_t sent; + ssize_t nwritten; + + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_send(len=%zu) start", + stream->h3req? stream->stream3_id : -1, len)); + *err = cf_process_ingress(cf, data); + if(*err) + return -1; - DEBUGF(LOG_CF(data, cf, "cf_quiche_send(len=%zu) %s", len, data->state.url)); if(!stream->h3req) { CURLcode result = cf_http_request(cf, data, buf, len); if(result) { - *err = CURLE_SEND_ERROR; + *err = result; return -1; } - sent = len; + nwritten = len; } else { - sent = quiche_h3_send_body(ctx->h3c, ctx->qconn, stream->stream3_id, - (uint8_t *)buf, len, FALSE); - if(sent == QUICHE_H3_ERR_DONE) { - sent = 0; + nwritten = quiche_h3_send_body(ctx->h3c, ctx->qconn, stream->stream3_id, + (uint8_t *)buf, len, FALSE); + DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] send body(len=%zu) -> %zd", + stream->stream3_id, len, nwritten)); + if(nwritten == QUICHE_H3_ERR_DONE) { + /* no error, nothing to do (flow control?) 
*/ + *err = CURLE_AGAIN; + nwritten = -1; } - else if(sent == QUICHE_H3_TRANSPORT_ERR_FINAL_SIZE) { + else if(nwritten == QUICHE_H3_TRANSPORT_ERR_FINAL_SIZE) { DEBUGF(LOG_CF(data, cf, "send_body(len=%zu) -> exceeds size", len)); *err = CURLE_SEND_ERROR; - return -1; + nwritten = -1; } - else if(sent < 0) { - DEBUGF(LOG_CF(data, cf, "send_body(len=%zu) -> %zd", len, sent)); + else if(nwritten < 0) { + DEBUGF(LOG_CF(data, cf, "send_body(len=%zu) -> SEND_ERROR", len)); *err = CURLE_SEND_ERROR; - return -1; + nwritten = -1; + } + else { + *err = CURLE_OK; } } @@ -737,8 +913,26 @@ static ssize_t cf_quiche_send(struct Curl_cfilter *cf, struct Curl_easy *data, return -1; } - *err = CURLE_OK; - return sent; + return nwritten; +} + +static bool stream_is_writeable(struct Curl_cfilter *cf, + struct Curl_easy *data) +{ + struct cf_quiche_ctx *ctx = cf->ctx; + struct HTTP *stream = data->req.p.http; + + /* surely, there must be a better way */ + quiche_stream_iter *qiter = quiche_conn_writable(ctx->qconn); + if(qiter) { + uint64_t stream_id; + while(quiche_stream_iter_next(qiter, &stream_id)) { + if(stream_id == (uint64_t)stream->stream3_id) + return TRUE; + } + quiche_stream_iter_free(qiter); + } + return FALSE; } static int cf_quiche_get_select_socks(struct Curl_cfilter *cf, @@ -749,14 +943,15 @@ static int cf_quiche_get_select_socks(struct Curl_cfilter *cf, struct SingleRequest *k = &data->req; int rv = GETSOCK_BLANK; - socks[0] = ctx->sockfd; + socks[0] = ctx->q.sockfd; /* in an HTTP/3 connection we can basically always get a frame so we should always be ready for one */ rv |= GETSOCK_READSOCK(0); /* we're still uploading or the HTTP/3 layer wants to send data */ - if((k->keepon & (KEEP_SEND|KEEP_SEND_PAUSE)) == KEEP_SEND) + if(((k->keepon & (KEEP_SEND|KEEP_SEND_PAUSE)) == KEEP_SEND) + && stream_is_writeable(cf, data)) rv |= GETSOCK_WRITESOCK(0); return rv; @@ -769,16 +964,22 @@ static int cf_quiche_get_select_socks(struct Curl_cfilter *cf, static bool cf_quiche_data_pending(struct Curl_cfilter *cf, const struct Curl_easy *data) { - struct cf_quiche_ctx *ctx = cf->ctx; struct HTTP *stream = data->req.p.http; - struct h3_event_node *node; - for(node = ctx->pending; node; node = node->next) { - if(node->stream3_id == stream->stream3_id) { - DEBUGF(LOG_CF((struct Curl_easy *)data, cf, - "h3[%"PRId64"] has data pending", stream->stream3_id)); - return TRUE; - } + if(stream->pending) { + DEBUGF(LOG_CF((struct Curl_easy *)data, cf, + "[h3sid=%"PRId64"] has event pending", stream->stream3_id)); + return TRUE; + } + if(stream->h3_recving_data) { + DEBUGF(LOG_CF((struct Curl_easy *)data, cf, + "[h3sid=%"PRId64"] is receiving DATA", stream->stream3_id)); + return TRUE; + } + if(data->state.drain) { + DEBUGF(LOG_CF((struct Curl_easy *)data, cf, + "[h3sid=%"PRId64"] is draining", stream->stream3_id)); + return TRUE; } return FALSE; } @@ -793,25 +994,34 @@ static CURLcode cf_quiche_data_event(struct Curl_cfilter *cf, (void)arg1; (void)arg2; switch(event) { + case CF_CTRL_DATA_DONE: { + struct HTTP *stream = data->req.p.http; + DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] easy handle is %s", + stream->stream3_id, arg1? 
"cancelled" : "done")); + h3_clear_pending(data); + break; + } case CF_CTRL_DATA_DONE_SEND: { struct HTTP *stream = data->req.p.http; ssize_t sent; stream->upload_done = TRUE; sent = quiche_h3_send_body(ctx->h3c, ctx->qconn, stream->stream3_id, NULL, 0, TRUE); + DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] send_body FINISHED", + stream->stream3_id)); if(sent < 0) return CURLE_SEND_ERROR; break; } - case CF_CTRL_DATA_DONE: { - struct HTTP *stream = data->req.p.http; - DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] easy handle is %s", - stream->stream3_id, arg1? "cancelled" : "done")); + case CF_CTRL_DATA_IDLE: + /* anything to do? */ break; - } case CF_CTRL_CONN_REPORT_STATS: - if(cf->sockindex == FIRSTSOCKET) - Curl_pgrsTimeWas(data, TIMER_APPCONNECT, ctx->handshake_done); + if(cf->sockindex == FIRSTSOCKET) { + if(ctx->got_first_byte) + Curl_pgrsTimeWas(data, TIMER_CONNECT, ctx->first_byte_at); + Curl_pgrsTimeWas(data, TIMER_APPCONNECT, ctx->handshake_at); + } break; default: break; @@ -882,11 +1092,7 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf, CURLcode result; const struct Curl_sockaddr_ex *sockaddr; - result = Curl_cf_socket_peek(cf->next, data, &ctx->sockfd, - &sockaddr, NULL, NULL, NULL, NULL); - if(result) - return result; - DEBUGASSERT(ctx->sockfd != CURL_SOCKET_BAD); + DEBUGASSERT(ctx->q.sockfd != CURL_SOCKET_BAD); #ifdef DEBUG_QUICHE /* initialize debug log callback only once */ @@ -897,6 +1103,10 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf, } #endif + result = vquic_ctx_init(&ctx->q, MAX_UDP_PAYLOAD_SIZE * MAX_PKT_BURST); + if(result) + return result; + ctx->cfg = quiche_config_new(QUICHE_PROTOCOL_VERSION); if(!ctx->cfg) { failf(data, "can't create quiche config"); @@ -933,16 +1143,18 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf, if(result) return result; - ctx->local_addrlen = sizeof(ctx->local_addr); - rv = getsockname(ctx->sockfd, (struct sockaddr *)&ctx->local_addr, - &ctx->local_addrlen); + Curl_cf_socket_peek(cf->next, data, &ctx->q.sockfd, + &sockaddr, NULL, NULL, NULL, NULL); + ctx->q.local_addrlen = sizeof(ctx->q.local_addr); + rv = getsockname(ctx->q.sockfd, (struct sockaddr *)&ctx->q.local_addr, + &ctx->q.local_addrlen); if(rv == -1) return CURLE_QUIC_CONNECT_ERROR; ctx->qconn = quiche_conn_new_with_tls((const uint8_t *)ctx->scid, sizeof(ctx->scid), NULL, 0, - (struct sockaddr *)&ctx->local_addr, - ctx->local_addrlen, + (struct sockaddr *)&ctx->q.local_addr, + ctx->q.local_addrlen, &sockaddr->sa_addr, sockaddr->addrlen, ctx->cfg, ctx->ssl, false); if(!ctx->qconn) { @@ -1016,7 +1228,10 @@ static CURLcode cf_quiche_connect(struct Curl_cfilter *cf, result = cf_connect_start(cf, data); if(result) goto out; - ctx->connect_started = now; + ctx->started_at = now; + result = cf_flush_egress(cf, data); + /* we do not expect to be able to recv anything yet */ + goto out; } result = cf_process_ingress(cf, data); @@ -1029,8 +1244,8 @@ static CURLcode cf_quiche_connect(struct Curl_cfilter *cf, if(quiche_conn_is_established(ctx->qconn)) { DEBUGF(LOG_CF(data, cf, "handshake complete after %dms", - (int)Curl_timediff(now, ctx->connect_started))); - ctx->handshake_done = now; + (int)Curl_timediff(now, ctx->started_at))); + ctx->handshake_at = now; result = cf_verify_peer(cf, data); if(!result) { DEBUGF(LOG_CF(data, cf, "peer verified")); @@ -1124,10 +1339,13 @@ static CURLcode cf_quiche_query(struct Curl_cfilter *cf, return CURLE_OK; } case CF_QUERY_CONNECT_REPLY_MS: - *pres1 = ctx->first_reply_ms; - DEBUGF(LOG_CF(data, cf, "query connect reply: 
%dms", *pres1)); + if(ctx->got_first_byte) { + timediff_t ms = Curl_timediff(ctx->first_byte_at, ctx->started_at); + *pres1 = (ms < INT_MAX)? (int)ms : INT_MAX; + } + else + *pres1 = -1; return CURLE_OK; - default: break; } diff --git a/lib/vquic/vquic.c b/lib/vquic/vquic.c index 6cd42f70e0..43872bf0d2 100644 --- a/lib/vquic/vquic.c +++ b/lib/vquic/vquic.c @@ -29,11 +29,13 @@ #endif #include "urldata.h" #include "dynbuf.h" +#include "cfilters.h" #include "curl_log.h" #include "curl_msh3.h" #include "curl_ngtcp2.h" #include "curl_quiche.h" #include "vquic.h" +#include "vquic_int.h" /* The last 3 #include files should be in this order */ #include "curl_printf.h" @@ -60,6 +62,220 @@ void Curl_quic_ver(char *p, size_t len) #endif } +CURLcode vquic_ctx_init(struct cf_quic_ctx *qctx, size_t pktbuflen) +{ + qctx->num_blocked_pkt = 0; + qctx->num_blocked_pkt_sent = 0; + memset(&qctx->blocked_pkt, 0, sizeof(qctx->blocked_pkt)); + + qctx->pktbuflen = pktbuflen; + qctx->pktbuf = malloc(qctx->pktbuflen); + if(!qctx->pktbuf) + return CURLE_OUT_OF_MEMORY; + +#if defined(__linux__) && defined(UDP_SEGMENT) && defined(HAVE_SENDMSG) + qctx->no_gso = FALSE; +#else + qctx->no_gso = TRUE; +#endif + + return CURLE_OK; +} + +void vquic_ctx_free(struct cf_quic_ctx *qctx) +{ + free(qctx->pktbuf); + qctx->pktbuf = NULL; +} + +static CURLcode send_packet_no_gso(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct cf_quic_ctx *qctx, + const uint8_t *pkt, size_t pktlen, + size_t gsolen, size_t *psent); + +static CURLcode do_sendmsg(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct cf_quic_ctx *qctx, + const uint8_t *pkt, size_t pktlen, size_t gsolen, + size_t *psent) +{ +#ifdef HAVE_SENDMSG + struct iovec msg_iov; + struct msghdr msg = {0}; + ssize_t sent; +#if defined(__linux__) && defined(UDP_SEGMENT) + uint8_t msg_ctrl[32]; + struct cmsghdr *cm; +#endif + + *psent = 0; + msg_iov.iov_base = (uint8_t *)pkt; + msg_iov.iov_len = pktlen; + msg.msg_iov = &msg_iov; + msg.msg_iovlen = 1; + +#if defined(__linux__) && defined(UDP_SEGMENT) + if(pktlen > gsolen) { + /* Only set this, when we need it. macOS, for example, + * does not seem to like a msg_control of length 0. */ + msg.msg_control = msg_ctrl; + assert(sizeof(msg_ctrl) >= CMSG_SPACE(sizeof(uint16_t))); + msg.msg_controllen = CMSG_SPACE(sizeof(uint16_t)); + cm = CMSG_FIRSTHDR(&msg); + cm->cmsg_level = SOL_UDP; + cm->cmsg_type = UDP_SEGMENT; + cm->cmsg_len = CMSG_LEN(sizeof(uint16_t)); + *(uint16_t *)(void *)CMSG_DATA(cm) = gsolen & 0xffff; + } +#endif + + + while((sent = sendmsg(qctx->sockfd, &msg, 0)) == -1 && SOCKERRNO == EINTR) + ; + + if(sent == -1) { + switch(SOCKERRNO) { + case EAGAIN: +#if EAGAIN != EWOULDBLOCK + case EWOULDBLOCK: +#endif + return CURLE_AGAIN; + case EMSGSIZE: + /* UDP datagram is too large; caused by PMTUD. Just let it be lost. 
*/ + break; + case EIO: + if(pktlen > gsolen) { + /* GSO failure */ + failf(data, "sendmsg() returned %zd (errno %d); disable GSO", sent, + SOCKERRNO); + qctx->no_gso = TRUE; + return send_packet_no_gso(cf, data, qctx, pkt, pktlen, gsolen, psent); + } + /* FALLTHROUGH */ + default: + failf(data, "sendmsg() returned %zd (errno %d)", sent, SOCKERRNO); + return CURLE_SEND_ERROR; + } + } + else { + assert(pktlen == (size_t)sent); + } +#else + ssize_t sent; + (void)gsolen; + + *psent = 0; + + while((sent = send(qctx->sockfd, (const char *)pkt, pktlen, 0)) == -1 && + SOCKERRNO == EINTR) + ; + + if(sent == -1) { + if(SOCKERRNO == EAGAIN || SOCKERRNO == EWOULDBLOCK) { + return CURLE_AGAIN; + } + else { + failf(data, "send() returned %zd (errno %d)", sent, SOCKERRNO); + if(SOCKERRNO != EMSGSIZE) { + return CURLE_SEND_ERROR; + } + /* UDP datagram is too large; caused by PMTUD. Just let it be + lost. */ + } + } +#endif + (void)cf; + *psent = pktlen; + + return CURLE_OK; +} + +static CURLcode send_packet_no_gso(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct cf_quic_ctx *qctx, + const uint8_t *pkt, size_t pktlen, + size_t gsolen, size_t *psent) +{ + const uint8_t *p, *end = pkt + pktlen; + size_t sent; + + *psent = 0; + + for(p = pkt; p < end; p += gsolen) { + size_t len = CURLMIN(gsolen, (size_t)(end - p)); + CURLcode curlcode = do_sendmsg(cf, data, qctx, p, len, len, &sent); + if(curlcode != CURLE_OK) { + return curlcode; + } + *psent += sent; + } + + return CURLE_OK; +} + +CURLcode vquic_send_packet(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct cf_quic_ctx *qctx, + const uint8_t *pkt, size_t pktlen, size_t gsolen, + size_t *psent) +{ + if(qctx->no_gso && pktlen > gsolen) { + return send_packet_no_gso(cf, data, qctx, pkt, pktlen, gsolen, psent); + } + + return do_sendmsg(cf, data, qctx, pkt, pktlen, gsolen, psent); +} + + + +void vquic_push_blocked_pkt(struct Curl_cfilter *cf, + struct cf_quic_ctx *qctx, + const uint8_t *pkt, size_t pktlen, size_t gsolen) +{ + struct vquic_blocked_pkt *blkpkt; + + (void)cf; + assert(qctx->num_blocked_pkt < + sizeof(qctx->blocked_pkt) / sizeof(qctx->blocked_pkt[0])); + + blkpkt = &qctx->blocked_pkt[qctx->num_blocked_pkt++]; + + blkpkt->pkt = pkt; + blkpkt->pktlen = pktlen; + blkpkt->gsolen = gsolen; +} + +CURLcode vquic_send_blocked_pkt(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct cf_quic_ctx *qctx) +{ + size_t sent; + CURLcode curlcode; + struct vquic_blocked_pkt *blkpkt; + + (void)cf; + for(; qctx->num_blocked_pkt_sent < qctx->num_blocked_pkt; + ++qctx->num_blocked_pkt_sent) { + blkpkt = &qctx->blocked_pkt[qctx->num_blocked_pkt_sent]; + curlcode = vquic_send_packet(cf, data, qctx, blkpkt->pkt, + blkpkt->pktlen, blkpkt->gsolen, &sent); + + if(curlcode) { + if(curlcode == CURLE_AGAIN) { + blkpkt->pkt += sent; + blkpkt->pktlen -= sent; + } + return curlcode; + } + } + + qctx->num_blocked_pkt = 0; + qctx->num_blocked_pkt_sent = 0; + + return CURLE_OK; +} + /* * If the QLOGDIR environment variable is set, open and return a file * descriptor to write the log to. 
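Usage note (not part of the patch hunks): the new vquic_send_packet(), vquic_push_blocked_pkt() and vquic_send_blocked_pkt() helpers above are meant to be driven from a protocol filter's egress path, the way the quiche cf_flush_egress() change earlier in this diff does. Below is a minimal sketch of that calling pattern; produce_packet() is a hypothetical stand-in for quiche_conn_send()/ngtcp2 packet generation, the gsolen value is an assumed maximum UDP payload size, and the function is illustrative only, not a drop-in implementation.

static CURLcode egress_sketch(struct Curl_cfilter *cf,
                              struct Curl_easy *data,
                              struct cf_quic_ctx *q)
{
  size_t gsolen = 1200;   /* assumption: the connection's max UDP payload size */
  size_t sent;
  ssize_t outlen;
  CURLcode result;

  /* first retry datagrams that sendmsg() earlier refused with EAGAIN */
  if(q->num_blocked_pkt) {
    result = vquic_send_blocked_pkt(cf, data, q);
    if(result)
      return (result == CURLE_AGAIN)? CURLE_OK : result;
  }
  /* let the QUIC stack fill q->pktbuf, then hand it to the socket; a buffer
     larger than gsolen is segmented by vquic_send_packet(), via UDP GSO
     where the platform supports it */
  while((outlen = produce_packet(q->pktbuf, q->pktbuflen)) > 0) {
    result = vquic_send_packet(cf, data, q, q->pktbuf, (size_t)outlen,
                               gsolen, &sent);
    if(result == CURLE_AGAIN) {
      /* socket is blocked: keep the unsent datagram for the next attempt */
      vquic_push_blocked_pkt(cf, q, q->pktbuf, (size_t)outlen, gsolen);
      return CURLE_OK;
    }
    if(result)
      return result;
  }
  return CURLE_OK;
}

The declarations for these helpers follow in lib/vquic/vquic_int.h below.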
diff --git a/lib/vquic/vquic_int.h b/lib/vquic/vquic_int.h index 9db4808d26..42aba39b06 100644 --- a/lib/vquic/vquic_int.h +++ b/lib/vquic/vquic_int.h @@ -28,6 +28,45 @@ #ifdef ENABLE_QUIC +struct vquic_blocked_pkt { + const uint8_t *pkt; + size_t pktlen; + size_t gsolen; +}; + +struct cf_quic_ctx { + curl_socket_t sockfd; + struct sockaddr_storage local_addr; + socklen_t local_addrlen; + struct vquic_blocked_pkt blocked_pkt[2]; + uint8_t *pktbuf; + /* the number of entries in blocked_pkt */ + size_t num_blocked_pkt; + size_t num_blocked_pkt_sent; + /* the packets blocked by sendmsg (EAGAIN or EWOULDBLOCK) */ + size_t pktbuflen; + /* the number of processed entries in blocked_pkt */ + bool no_gso; +}; + +CURLcode vquic_ctx_init(struct cf_quic_ctx *qctx, size_t pktbuflen); +void vquic_ctx_free(struct cf_quic_ctx *qctx); + +CURLcode vquic_send_packet(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct cf_quic_ctx *qctx, + const uint8_t *pkt, size_t pktlen, size_t gsolen, + size_t *psent); + +void vquic_push_blocked_pkt(struct Curl_cfilter *cf, + struct cf_quic_ctx *qctx, + const uint8_t *pkt, size_t pktlen, size_t gsolen); + +CURLcode vquic_send_blocked_pkt(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct cf_quic_ctx *qctx); + + #endif /* !ENABLE_QUIC */ #endif /* HEADER_CURL_VQUIC_QUIC_INT_H */ diff --git a/tests/tests-httpd/config.ini.in b/tests/tests-httpd/config.ini.in index b4069376e7..3b1d28de84 100644 --- a/tests/tests-httpd/config.ini.in +++ b/tests/tests-httpd/config.ini.in @@ -40,4 +40,5 @@ nghttpx = @HTTPD_NGHTTPX@ [caddy] caddy = @CADDY@ -port = 5004 +http_port = 5003 +https_port = 5004 diff --git a/tests/tests-httpd/scorecard.py b/tests/tests-httpd/scorecard.py index 023336f7b1..7d64528c24 100644 --- a/tests/tests-httpd/scorecard.py +++ b/tests/tests-httpd/scorecard.py @@ -213,31 +213,33 @@ class ScoreCard: self.info(f'\n') return props - def downloads(self, proto: str) -> Dict[str, Any]: + def downloads(self, proto: str, test_httpd: bool = True, + test_caddy: bool = True) -> Dict[str, Any]: scores = {} - if proto == 'h3': - port = self.env.h3_port - via = 'nghttpx' - descr = f'port {port}, proxying httpd' - else: - port = self.env.https_port - via = 'httpd' - descr = f'port {port}' - self.info('httpd downloads\n') - self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='score1.data', fsize=1024*1024) - url1 = f'https://{self.env.domain1}:{port}/score1.data' - self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='score10.data', fsize=10*1024*1024) - url10 = f'https://{self.env.domain1}:{port}/score10.data' - self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='score100.data', fsize=100*1024*1024) - url100 = f'https://{self.env.domain1}:{port}/score100.data' - scores[via] = { - 'description': descr, - '1MB-local': self.download_url(url=url1, proto=proto, count=50), - '10MB-local': self.download_url(url=url10, proto=proto, count=50), - '100MB-local': self.download_url(url=url100, proto=proto, count=50), - } - if self.caddy: - port = self.env.caddy_port + if test_httpd: + if proto == 'h3': + port = self.env.h3_port + via = 'nghttpx' + descr = f'port {port}, proxying httpd' + else: + port = self.env.https_port + via = 'httpd' + descr = f'port {port}' + self.info(f'{via} downloads\n') + self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='score1.data', fsize=1024*1024) + url1 = f'https://{self.env.domain1}:{port}/score1.data' + self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='score10.data', fsize=10*1024*1024) + url10 = 
f'https://{self.env.domain1}:{port}/score10.data' + self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='score100.data', fsize=100*1024*1024) + url100 = f'https://{self.env.domain1}:{port}/score100.data' + scores[via] = { + 'description': descr, + '1MB-local': self.download_url(url=url1, proto=proto, count=50), + '10MB-local': self.download_url(url=url10, proto=proto, count=50), + '100MB-local': self.download_url(url=url100, proto=proto, count=50), + } + if test_caddy and self.caddy: + port = self.caddy.port via = 'caddy' descr = f'port {port}' self.info('caddy downloads\n') @@ -255,7 +257,11 @@ class ScoreCard: } return scores - def score_proto(self, proto: str, handshakes: bool = True, downloads: bool = True): + def score_proto(self, proto: str, + handshakes: bool = True, + downloads: bool = True, + test_httpd: bool = True, + test_caddy: bool = True): self.info(f"scoring {proto}\n") p = {} if proto == 'h3': @@ -289,7 +295,9 @@ class ScoreCard: if handshakes: score['handshakes'] = self.handshakes(proto=proto) if downloads: - score['downloads'] = self.downloads(proto=proto) + score['downloads'] = self.downloads(proto=proto, + test_httpd=test_httpd, + test_caddy=test_caddy) self.info("\n") return score @@ -335,6 +343,10 @@ class ScoreCard: help="print text instead of json") parser.add_argument("-d", "--downloads", action='store_true', default=False, help="evaluate downloads only") + parser.add_argument("--httpd", action='store_true', default=False, + help="evaluate httpd server only") + parser.add_argument("--caddy", action='store_true', default=False, + help="evaluate caddy server only") parser.add_argument("protocols", nargs='*', help="Name(s) of protocol to score") args = parser.parse_args() @@ -348,8 +360,16 @@ class ScoreCard: protocols = args.protocols if len(args.protocols) else ['h2', 'h3'] handshakes = True downloads = True + test_httpd = True + test_caddy = True if args.downloads: handshakes = False + if args.caddy: + test_caddy = True + test_httpd = False + if args.httpd: + test_caddy = False + test_httpd = True rv = 0 self.env = Env() @@ -372,7 +392,10 @@ class ScoreCard: assert self.caddy.start() for p in protocols: - score = self.score_proto(proto=p, handshakes=handshakes, downloads=downloads) + score = self.score_proto(proto=p, handshakes=handshakes, + downloads=downloads, + test_caddy=test_caddy, + test_httpd=test_httpd) if args.text: self.print_score(score) else: diff --git a/tests/tests-httpd/test_02_download.py b/tests/tests-httpd/test_02_download.py index 615de719ea..5d9f2d9496 100644 --- a/tests/tests-httpd/test_02_download.py +++ b/tests/tests-httpd/test_02_download.py @@ -42,13 +42,22 @@ class TestDownload: def _class_scope(self, env, httpd, nghttpx): if env.have_h3(): nghttpx.start_if_needed() - fpath = os.path.join(httpd.docs_dir, 'data-1mb.data') + + def _make_docs_file(self, docs_dir: str, fname: str, fsize: int): + fpath = os.path.join(docs_dir, fname) data1k = 1024*'x' + flen = 0 with open(fpath, 'w') as fd: - fsize = 0 - while fsize < 1024*1024: + while flen < fsize: fd.write(data1k) - fsize += len(data1k) + flen += len(data1k) + return flen + + @pytest.fixture(autouse=True, scope='class') + def _class_scope(self, env, httpd): + self._make_docs_file(docs_dir=httpd.docs_dir, fname='data1.data', fsize=1024*1024) + self._make_docs_file(docs_dir=httpd.docs_dir, fname='data10.data', fsize=10*1024*1024) + self._make_docs_file(docs_dir=httpd.docs_dir, fname='data100.data', fsize=100*1024*1024) # download 1 file @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 
'h3']) @@ -163,8 +172,8 @@ class TestDownload: @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) def test_02_08_1MB_serial(self, env: Env, httpd, nghttpx, repeat, proto): - count = 2 - urln = f'https://{env.authority_for(env.domain1, proto)}/data-1mb.data?[0-{count-1}]' + count = 20 + urln = f'https://{env.authority_for(env.domain1, proto)}/data1.data?[0-{count-1}]' curl = CurlClient(env=env) r = curl.http_download(urls=[urln], alpn_proto=proto) assert r.exit_code == 0 @@ -173,8 +182,30 @@ class TestDownload: @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) def test_02_09_1MB_parallel(self, env: Env, httpd, nghttpx, repeat, proto): - count = 2 - urln = f'https://{env.authority_for(env.domain1, proto)}/data-1mb.data?[0-{count-1}]' + count = 20 + urln = f'https://{env.authority_for(env.domain1, proto)}/data1.data?[0-{count-1}]' + curl = CurlClient(env=env) + r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[ + '--parallel' + ]) + assert r.exit_code == 0 + r.check_stats(count=count, exp_status=200) + + @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) + def test_02_10_10MB_serial(self, env: Env, + httpd, nghttpx, repeat, proto): + count = 20 + urln = f'https://{env.authority_for(env.domain1, proto)}/data10.data?[0-{count-1}]' + curl = CurlClient(env=env) + r = curl.http_download(urls=[urln], alpn_proto=proto) + assert r.exit_code == 0 + r.check_stats(count=count, exp_status=200) + + @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) + def test_02_11_10MB_parallel(self, env: Env, + httpd, nghttpx, repeat, proto): + count = 20 + urln = f'https://{env.authority_for(env.domain1, proto)}/data10.data?[0-{count-1}]' curl = CurlClient(env=env) r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[ '--parallel' diff --git a/tests/tests-httpd/test_05_errors.py b/tests/tests-httpd/test_05_errors.py index bb6e9217f1..a95e25536e 100644 --- a/tests/tests-httpd/test_05_errors.py +++ b/tests/tests-httpd/test_05_errors.py @@ -73,6 +73,8 @@ class TestErrors: proto): if proto == 'h3' and not env.have_h3(): pytest.skip("h3 not supported") + if proto == 'h3' and env.curl_uses_lib('quiche'): + pytest.skip("quiche not reliable, sometimes reports success") count = 5 curl = CurlClient(env=env) urln = f'https://{env.authority_for(env.domain1, proto)}' \ diff --git a/tests/tests-httpd/test_07_upload.py b/tests/tests-httpd/test_07_upload.py index aec403cc21..45a6b659db 100644 --- a/tests/tests-httpd/test_07_upload.py +++ b/tests/tests-httpd/test_07_upload.py @@ -90,9 +90,26 @@ class TestUpload: respdata = open(curl.response_file(i)).readlines() assert respdata == [data] + # upload data parallel, check that they were echoed + @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) + def test_07_11_upload_parallel(self, env: Env, httpd, nghttpx, repeat, proto): + if proto == 'h3' and not env.have_h3(): + pytest.skip("h3 not supported") + count = 50 + data = '0123456789' + curl = CurlClient(env=env) + url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-{count-1}]' + r = curl.http_upload(urls=[url], data=data, alpn_proto=proto, + extra_args=['--parallel']) + assert r.exit_code == 0, f'{r}' + r.check_stats(count=count, exp_status=200) + for i in range(count): + respdata = open(curl.response_file(i)).readlines() + assert respdata == [data] + # upload large data sequentially, check that this is what was echoed @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) - def test_07_11_upload_seq_large(self, env: Env, httpd, 
nghttpx, repeat, proto): + def test_07_20_upload_seq_large(self, env: Env, httpd, nghttpx, repeat, proto): if proto == 'h3' and not env.have_h3(): pytest.skip("h3 not supported") fdata = os.path.join(env.gen_dir, 'data-100k') @@ -149,9 +166,9 @@ class TestUpload: if proto == 'h3' and not env.have_h3(): pytest.skip("h3 not supported") if proto == 'h3' and env.curl_uses_lib('quiche'): - pytest.skip("quiche stalls on parallel, large uploads") + pytest.skip("quiche stalls on parallel, large uploads, unless --trace is used???") fdata = os.path.join(env.gen_dir, 'data-100k') - count = 3 + count = 50 curl = CurlClient(env=env) url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-{count-1}]' r = curl.http_upload(urls=[url], data=f'@{fdata}', alpn_proto=proto, diff --git a/tests/tests-httpd/test_08_caddy.py b/tests/tests-httpd/test_08_caddy.py new file mode 100644 index 0000000000..67a5f77be7 --- /dev/null +++ b/tests/tests-httpd/test_08_caddy.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +#*************************************************************************** +# _ _ ____ _ +# Project ___| | | | _ \| | +# / __| | | | |_) | | +# | (__| |_| | _ <| |___ +# \___|\___/|_| \_\_____| +# +# Copyright (C) 2008 - 2022, Daniel Stenberg, , et al. +# +# This software is licensed as described in the file COPYING, which +# you should have received as part of this distribution. The terms +# are also available at https://curl.se/docs/copyright.html. +# +# You may opt to use, copy, modify, merge, publish, distribute and/or sell +# copies of the Software, and permit persons to whom the Software is +# furnished to do so, under the terms of the COPYING file. +# +# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY +# KIND, either express or implied. 
+# +# SPDX-License-Identifier: curl +# +########################################################################### +# +import logging +import os +import pytest + +from testenv import Env, CurlClient, Caddy + + +log = logging.getLogger(__name__) + + +@pytest.mark.skipif(condition=not Env.has_caddy(), reason=f"missing caddy") +class TestCaddy: + + @pytest.fixture(autouse=True, scope='class') + def caddy(self, env): + caddy = Caddy(env=env) + assert caddy.start() + yield caddy + caddy.stop() + + def _make_docs_file(self, docs_dir: str, fname: str, fsize: int): + fpath = os.path.join(docs_dir, fname) + data1k = 1024*'x' + flen = 0 + with open(fpath, 'w') as fd: + while flen < fsize: + fd.write(data1k) + flen += len(data1k) + return flen + + @pytest.fixture(autouse=True, scope='class') + def _class_scope(self, env, caddy): + self._make_docs_file(docs_dir=caddy.docs_dir, fname='data1.data', fsize=1024*1024) + self._make_docs_file(docs_dir=caddy.docs_dir, fname='data10.data', fsize=10*1024*1024) + self._make_docs_file(docs_dir=caddy.docs_dir, fname='data100.data', fsize=100*1024*1024) + + # download 1 file + @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) + def test_08_01_download_1(self, env: Env, caddy: Caddy, repeat, proto): + if proto == 'h3' and not env.have_h3_curl(): + pytest.skip("h3 not supported in curl") + curl = CurlClient(env=env) + url = f'https://{env.domain1}:{caddy.port}/data.json' + r = curl.http_download(urls=[url], alpn_proto=proto) + assert r.exit_code == 0, f'{r}' + r.check_stats(count=1, exp_status=200) + + # download 1MB files sequentially + @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) + def test_08_02_download_1mb_sequential(self, env: Env, caddy: Caddy, + repeat, proto): + if proto == 'h3' and not env.have_h3_curl(): + pytest.skip("h3 not supported in curl") + count = 50 + curl = CurlClient(env=env) + urln = f'https://{env.domain1}:{caddy.port}/data1.data?[0-{count-1}]' + r = curl.http_download(urls=[urln], alpn_proto=proto) + assert r.exit_code == 0 + r.check_stats(count=count, exp_status=200) + # sequential transfers will open 1 connection + assert r.total_connects == 1 + + # download 1MB files parallel + @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) + def test_08_03_download_1mb_parallel(self, env: Env, caddy: Caddy, + repeat, proto): + if proto == 'h3' and not env.have_h3_curl(): + pytest.skip("h3 not supported in curl") + count = 50 + curl = CurlClient(env=env) + urln = f'https://{env.domain1}:{caddy.port}/data1.data?[0-{count-1}]' + r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[ + '--parallel' + ]) + assert r.exit_code == 0 + r.check_stats(count=count, exp_status=200) + if proto == 'http/1.1': + # http/1.1 parallel transfers will open multiple connections + assert r.total_connects > 1 + else: + assert r.total_connects == 1 + + # download 10MB files sequentially + @pytest.mark.parametrize("proto", ['h2', 'h3']) + def test_08_04_download_10mb_sequential(self, env: Env, caddy: Caddy, + repeat, proto): + if proto == 'h3' and not env.have_h3_curl(): + pytest.skip("h3 not supported in curl") + if proto == 'h3' and env.curl_uses_lib('quiche'): + pytest.skip("quiche stalls after a certain amount of data") + count = 20 + curl = CurlClient(env=env) + urln = f'https://{env.domain1}:{caddy.port}/data10.data?[0-{count-1}]' + r = curl.http_download(urls=[urln], alpn_proto=proto) + assert r.exit_code == 0 + r.check_stats(count=count, exp_status=200) + # sequential transfers will open 1 connection + assert 
r.total_connects == 1 + + # download 10MB files parallel + @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) + def test_08_05_download_1mb_parallel(self, env: Env, caddy: Caddy, + repeat, proto): + if proto == 'h3' and not env.have_h3_curl(): + pytest.skip("h3 not supported in curl") + if proto == 'h3' and env.curl_uses_lib('quiche'): + pytest.skip("quiche stalls after a certain amount of data") + count = 50 + curl = CurlClient(env=env) + urln = f'https://{env.domain1}:{caddy.port}/data10.data?[0-{count-1}]' + r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[ + '--parallel' + ]) + assert r.exit_code == 0 + r.check_stats(count=count, exp_status=200) + if proto == 'http/1.1': + # http/1.1 parallel transfers will open multiple connections + assert r.total_connects > 1 + else: + assert r.total_connects == 1 + diff --git a/tests/tests-httpd/testenv/caddy.py b/tests/tests-httpd/testenv/caddy.py index 23fb4ec7a1..c97cf661d3 100644 --- a/tests/tests-httpd/testenv/caddy.py +++ b/tests/tests-httpd/testenv/caddy.py @@ -55,6 +55,10 @@ class Caddy: def docs_dir(self): return self._docs_dir + @property + def port(self) -> str: + return self.env.caddy_https_port + def clear_logs(self): self._rmf(self._error_log) @@ -105,7 +109,7 @@ class Caddy: curl = CurlClient(env=self.env, run_dir=self._tmp_dir) try_until = datetime.now() + timeout while datetime.now() < try_until: - check_url = f'https://{self.env.domain1}:{self.env.caddy_port}/' + check_url = f'https://{self.env.domain1}:{self.port}/' r = curl.http_get(url=check_url) if r.exit_code != 0: return True @@ -118,7 +122,7 @@ class Caddy: curl = CurlClient(env=self.env, run_dir=self._tmp_dir) try_until = datetime.now() + timeout while datetime.now() < try_until: - check_url = f'https://{self.env.domain1}:{self.env.caddy_port}/' + check_url = f'https://{self.env.domain1}:{self.port}/' r = curl.http_get(url=check_url) if r.exit_code == 0: return True @@ -149,12 +153,13 @@ class Caddy: with open(self._conf_file, 'w') as fd: conf = [ # base server config f'{{', - f' https_port {self.env.caddy_port}', - f' servers :{self.env.caddy_port} {{', + f' http_port {self.env.caddy_http_port}', + f' https_port {self.env.caddy_https_port}', + f' servers :{self.env.caddy_https_port} {{', f' protocols h3 h2 h1', f' }}', f'}}', - f'{domain1}:{self.env.caddy_port} {{', + f'{domain1}:{self.env.caddy_https_port} {{', f' file_server * {{', f' root {self._docs_dir}', f' }}', diff --git a/tests/tests-httpd/testenv/curl.py b/tests/tests-httpd/testenv/curl.py index 40dadb0a34..a2b538e80f 100644 --- a/tests/tests-httpd/testenv/curl.py +++ b/tests/tests-httpd/testenv/curl.py @@ -319,6 +319,8 @@ class CurlClient: args = [self._curl, "-s", "--path-as-is"] if with_headers: args.extend(["-D", self._headerfile]) + if self.env.verbose > 1: + args.extend(['--trace', self._tracefile]) if self.env.verbose > 2: args.extend(['--trace', self._tracefile, '--trace-time']) diff --git a/tests/tests-httpd/testenv/env.py b/tests/tests-httpd/testenv/env.py index 0acebe2230..83d3cce4c7 100644 --- a/tests/tests-httpd/testenv/env.py +++ b/tests/tests-httpd/testenv/env.py @@ -136,8 +136,8 @@ class EnvConfig: log.debug(f'nghttpx -v: {p.stdout}') self.caddy = self.config['caddy']['caddy'] - if len(self.caddy) == 0: - self.caddy = 'caddy' + if len(self.caddy.strip()) == 0: + self.caddy = None if self.caddy is not None: try: p = subprocess.run(args=[self.caddy, 'version'], @@ -147,7 +147,8 @@ class EnvConfig: self.caddy = None except: self.caddy = None - self.caddy_port = 
self.config['caddy']['port'] + self.caddy_http_port = self.config['caddy']['http_port'] + self.caddy_https_port = self.config['caddy']['https_port'] @property def httpd_version(self): @@ -241,6 +242,10 @@ class Env: def httpd_is_at_least(minv) -> bool: return Env.CONFIG.httpd_is_at_least(minv) + @staticmethod + def has_caddy() -> bool: + return Env.CONFIG.caddy is not None + def __init__(self, pytestconfig=None): self._verbose = pytestconfig.option.verbose \ if pytestconfig is not None else 0 @@ -306,8 +311,12 @@ class Env: return self.CONFIG.caddy @property - def caddy_port(self) -> str: - return self.CONFIG.caddy_port + def caddy_https_port(self) -> str: + return self.CONFIG.caddy_https_port + + @property + def caddy_http_port(self) -> str: + return self.CONFIG.caddy_http_port @property def curl(self) -> str: diff --git a/tests/tests-httpd/testenv/nghttpx.py b/tests/tests-httpd/testenv/nghttpx.py index 100cf7372e..cc163dcb33 100644 --- a/tests/tests-httpd/testenv/nghttpx.py +++ b/tests/tests-httpd/testenv/nghttpx.py @@ -160,7 +160,9 @@ class Nghttpx: try_until = datetime.now() + timeout while datetime.now() < try_until: check_url = f'https://{self.env.domain1}:{self.env.h3_port}/' - r = curl.http_get(url=check_url, extra_args=['--http3-only']) + r = curl.http_get(url=check_url, extra_args=[ + '--http3-only', '--trace', 'curl.trace', '--trace-time' + ]) if r.exit_code == 0: return True log.debug(f'waiting for nghttpx to become responsive: {r}')
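Side note on the tests/tests-httpd/test_02_download.py hunk earlier in this diff: after the patch the class contains two methods named _class_scope, and Python keeps only the last definition, so the first fixture's nghttpx.start_if_needed() call is silently dropped. A merged fixture along these lines would keep both behaviours; this is a sketch only, reusing the names from the patch and assuming the same TestDownload class context with the pytest fixtures (env, httpd, nghttpx) provided by testenv.

    @pytest.fixture(autouse=True, scope='class')
    def _class_scope(self, env, httpd, nghttpx):
        # start the h3 proxy when HTTP/3 is available (from the old fixture)
        if env.have_h3():
            nghttpx.start_if_needed()
        # generate the download files used by the 02_xx tests (from the new fixture)
        self._make_docs_file(docs_dir=httpd.docs_dir, fname='data1.data',
                             fsize=1024*1024)
        self._make_docs_file(docs_dir=httpd.docs_dir, fname='data10.data',
                             fsize=10*1024*1024)
        self._make_docs_file(docs_dir=httpd.docs_dir, fname='data100.data',
                             fsize=100*1024*1024)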