bool upload_done;
#endif /* ENABLE_QUIC */
#ifdef USE_NGHTTP3
- size_t unacked_window;
+ size_t recv_buf_nonflow; /* buffered bytes, not counting for flow control */
struct h3out *h3out; /* per-stream buffers for upload */
struct dynbuf overflow; /* excess data received during a single Curl_read */
#endif /* USE_NGHTTP3 */
#ifdef USE_QUICHE
bool h3_got_header; /* TRUE when h3 stream has recvd some HEADER */
bool h3_recving_data; /* TRUE when h3 stream is reading DATA */
+ bool h3_body_pending; /* TRUE when h3 stream may have more body DATA */
+ struct h3_event_node *pending;
#endif /* USE_QUICHE */
};
#include "dynbuf.h"
#include "select.h"
#include "vquic.h"
+#include "vquic_int.h"
#include "h2h3.h"
#include "vtls/keylog.h"
#include "vtls/vtls.h"
ng2->version_str, ht3->version_str);
}
-struct blocked_pkt {
- const uint8_t *pkt;
- size_t pktlen;
- size_t gsolen;
-};
-
struct cf_ngtcp2_ctx {
- curl_socket_t sockfd;
- struct sockaddr_storage local_addr;
- socklen_t local_addrlen;
+ struct cf_quic_ctx q;
+ ngtcp2_path connected_path;
ngtcp2_conn *qconn;
ngtcp2_cid dcid;
ngtcp2_cid scid;
WOLFSSL_CTX *sslctx;
WOLFSSL *ssl;
#endif
- bool no_gso;
- uint8_t *pktbuf;
- size_t pktbuflen;
- /* the number of entries in blocked_pkt */
- size_t num_blocked_pkt;
- /* the number of processed entries in blocked_pkt */
- size_t num_blocked_pkt_sent;
- /* the packets blocked by sendmsg (EAGAIN or EWOULDBLOCK) */
- struct blocked_pkt blocked_pkt[2];
-
struct cf_call_data call_data;
nghttp3_conn *h3conn;
nghttp3_settings h3settings;
{
ngtcp2_settings *s = &ctx->settings;
ngtcp2_transport_params *t = &ctx->transport_params;
+ size_t stream_win_size = CURL_MAX_READ_SIZE;
+
ngtcp2_settings_default(s);
ngtcp2_transport_params_default(t);
#ifdef DEBUG_NGTCP2
#else
s->log_printf = NULL;
#endif
+
+ (void)data;
s->initial_ts = timestamp();
- t->initial_max_stream_data_bidi_local = data->set.buffer_size;
- t->initial_max_stream_data_bidi_remote = QUIC_MAX_STREAMS;
- t->initial_max_stream_data_uni = QUIC_MAX_STREAMS;
- t->initial_max_data = QUIC_MAX_DATA;
- t->initial_max_streams_bidi = 1;
- t->initial_max_streams_uni = 3;
+ s->handshake_timeout = NGTCP2_DEFAULT_HANDSHAKE_TIMEOUT;
+ s->max_window = 100 * stream_win_size;
+ s->max_stream_window = stream_win_size;
+
+ t->initial_max_data = 10 * stream_win_size;
+ t->initial_max_stream_data_bidi_local = stream_win_size;
+ t->initial_max_stream_data_bidi_remote = stream_win_size;
+ t->initial_max_stream_data_uni = stream_win_size;
+ t->initial_max_streams_bidi = QUIC_MAX_STREAMS;
+ t->initial_max_streams_uni = QUIC_MAX_STREAMS;
t->max_idle_timeout = QUIC_IDLE_TIMEOUT;
if(ctx->qlogfd != -1) {
s->qlog.write = qlog_callback;
return 0;
}
-static void extend_stream_window(ngtcp2_conn *tconn,
- struct HTTP *stream)
+static void report_consumed_data(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ size_t consumed)
{
- size_t thismuch = stream->unacked_window;
- ngtcp2_conn_extend_max_stream_offset(tconn, stream->stream3_id, thismuch);
- ngtcp2_conn_extend_max_offset(tconn, thismuch);
- stream->unacked_window = 0;
-}
+ struct HTTP *stream = data->req.p.http;
+ struct cf_ngtcp2_ctx *ctx = cf->ctx;
+ /* the HTTP/1.1 response headers are written to the buffer, but
+ * consuming those does not count against flow control. */
+ if(stream->recv_buf_nonflow) {
+ if(consumed >= stream->recv_buf_nonflow) {
+ consumed -= stream->recv_buf_nonflow;
+ stream->recv_buf_nonflow = 0;
+ }
+ else {
+ stream->recv_buf_nonflow -= consumed;
+ consumed = 0;
+ }
+ }
+ if(consumed > 0) {
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] consumed %zu DATA bytes",
+ stream->stream3_id, consumed));
+ ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream->stream3_id,
+ consumed);
+ ngtcp2_conn_extend_max_offset(ctx->qconn, consumed);
+ }
+ if(!stream->closed && data->state.drain
+ && !stream->memlen
+ && !Curl_dyn_len(&stream->overflow)) {
+ /* nothing buffered any more */
+ data->state.drain = 0;
+ }
+}
static int cb_recv_stream_data(ngtcp2_conn *tconn, uint32_t flags,
int64_t stream_id, uint64_t offset,
nconsumed =
nghttp3_conn_read_stream(ctx->h3conn, stream_id, buf, buflen, fin);
- DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] read_stream(len=%zu) -> %zd",
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] read_stream(len=%zu) -> %zd",
stream_id, buflen, nconsumed));
if(nconsumed < 0) {
ngtcp2_connection_close_error_set_application_error(
}
static int cb_stream_close(ngtcp2_conn *tconn, uint32_t flags,
- int64_t stream_id, uint64_t app_error_code,
+ int64_t stream3_id, uint64_t app_error_code,
void *user_data, void *stream_user_data)
{
struct Curl_cfilter *cf = user_data;
+ struct Curl_easy *data = stream_user_data;
struct cf_ngtcp2_ctx *ctx = cf->ctx;
int rv;
(void)tconn;
- (void)stream_user_data;
+ (void)data;
/* stream is closed... */
if(!(flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET)) {
app_error_code = NGHTTP3_H3_NO_ERROR;
}
- rv = nghttp3_conn_close_stream(ctx->h3conn, stream_id,
+ rv = nghttp3_conn_close_stream(ctx->h3conn, stream3_id,
app_error_code);
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] quic close(err=%"
+ PRIu64 ") -> %d", stream3_id, app_error_code, rv));
if(rv) {
ngtcp2_connection_close_error_set_application_error(
&ctx->last_error, nghttp3_err_infer_quic_app_error_code(rv), NULL, 0);
(void)data;
rv = nghttp3_conn_shutdown_stream_read(ctx->h3conn, stream_id);
- DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] reset -> %d", stream_id, rv));
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] reset -> %d", stream_id, rv));
if(rv) {
return NGTCP2_ERR_CALLBACK_FAILURE;
}
struct cf_call_data save;
CF_DATA_SAVE(save, cf, data);
- socks[0] = ctx->sockfd;
+ socks[0] = ctx->q.sockfd;
/* in an HTTP/3 connection we can basically always get a frame so we should
always be ready for one */
return rv;
}
+static void notify_drain(struct Curl_cfilter *cf,
+ struct Curl_easy *data)
+{
+ (void)cf;
+ if(!data->state.drain) {
+ data->state.drain = 1;
+ Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ }
+}
+
+
static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id,
uint64_t app_error_code, void *user_data,
void *stream_user_data)
(void)app_error_code;
(void)cf;
- DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] close(err=%" PRIx64 ")",
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] h3 close(err=%" PRIx64 ")",
stream_id, app_error_code));
stream->closed = TRUE;
stream->error3 = app_error_code;
* the response before it was complete. */
stream->reset = TRUE;
}
- Curl_expire(data, 0, EXPIRE_QUIC);
- /* make sure that ngh3_stream_recv is called again to complete the transfer
- even if there are no more packets to be received from the server. */
- data->state.drain = 1;
+ notify_drain(cf, data);
return 0;
}
/*
- * write_data() copies data to the stream's receive buffer. If not enough
- * space is available in the receive buffer, it copies the rest to the
- * stream's overflow buffer.
+ * write_resp_raw() copies resonse data in raw format to the `data`'s
+ * receive buffer. If not enough space is available, it appends to the
+ * `data`'s overflow buffer.
*/
-static CURLcode write_data(struct HTTP *stream, const void *mem, size_t memlen)
+static CURLcode write_resp_raw(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ const void *mem, size_t memlen,
+ bool flow)
{
+ struct HTTP *stream = data->req.p.http;
CURLcode result = CURLE_OK;
const char *buf = mem;
size_t ncopy = memlen;
/* copy as much as possible to the receive buffer */
if(stream->len) {
size_t len = CURLMIN(ncopy, stream->len);
- memcpy(stream->mem, buf, len);
+ memcpy(stream->mem + stream->memlen, buf, len);
stream->len -= len;
stream->memlen += len;
- stream->mem += len;
buf += len;
ncopy -= len;
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] resp_raw: added %zu bytes"
+ " to data buffer", stream->stream3_id, len));
}
/* copy the rest to the overflow buffer */
if(ncopy) {
result = Curl_dyn_addn(&stream->overflow, buf, ncopy);
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] resp_raw: added %zu bytes"
+ " to overflow buffer -> %d",
+ stream->stream3_id, ncopy, result));
+ notify_drain(cf, data);
+ }
+
+ if(!flow)
+ stream->recv_buf_nonflow += memlen;
+ if(CF_DATA_CURRENT(cf) != data) {
+ notify_drain(cf, data);
}
return result;
}
-static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream_id,
+static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id,
const uint8_t *buf, size_t buflen,
void *user_data, void *stream_user_data)
{
struct Curl_cfilter *cf = user_data;
struct Curl_easy *data = stream_user_data;
- struct HTTP *stream = data->req.p.http;
- CURLcode result = CURLE_OK;
+ CURLcode result;
+
(void)conn;
- (void)cf;
+ (void)stream3_id;
- result = write_data(stream, buf, buflen);
- DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] recv_data(len=%zu) -> %d",
- stream_id, buflen, result));
- if(result) {
- return -1;
- }
- stream->unacked_window += buflen;
- (void)stream_id;
- (void)user_data;
- return 0;
+ result = write_resp_raw(cf, data, buf, buflen, TRUE);
+ return result? -1 : 0;
}
-static int cb_h3_deferred_consume(nghttp3_conn *conn, int64_t stream_id,
+static int cb_h3_deferred_consume(nghttp3_conn *conn, int64_t stream3_id,
size_t consumed, void *user_data,
void *stream_user_data)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
(void)conn;
(void)stream_user_data;
- (void)stream_id;
- ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream_id, consumed);
+ /* nghttp3 has consumed bytes on the QUIC stream and we need to
+ * tell the QUIC connection to increase its flow control */
+ ngtcp2_conn_extend_max_stream_offset(ctx->qconn, stream3_id, consumed);
ngtcp2_conn_extend_max_offset(ctx->qconn, consumed);
return 0;
}
/* add a CRLF only if we've received some headers */
if(stream->firstheader) {
- result = write_data(stream, "\r\n", 2);
+ result = write_resp_raw(cf, data, "\r\n", 2, FALSE);
if(result) {
return -1;
}
}
- DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] end_headers(status_code=%d",
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] end_headers(status_code=%d",
stream_id, stream->status_code));
if(stream->status_code / 100 != 1) {
stream->bodystarted = TRUE;
if(token == NGHTTP3_QPACK_TOKEN__STATUS) {
char line[14]; /* status line is always 13 characters long */
size_t ncopy;
+
+ DEBUGASSERT(!stream->firstheader);
stream->status_code = decode_status_code(h3val.base, h3val.len);
DEBUGASSERT(stream->status_code != -1);
ncopy = msnprintf(line, sizeof(line), "HTTP/3 %03d \r\n",
stream->status_code);
- DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] status: %s",
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] status: %s",
stream_id, line));
- result = write_data(stream, line, ncopy);
+ result = write_resp_raw(cf, data, line, ncopy, FALSE);
if(result) {
return -1;
}
+ stream->firstheader = TRUE;
}
else {
/* store as an HTTP1-style header */
- DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] header: %.*s: %.*s",
+ DEBUGASSERT(stream->firstheader);
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] header: %.*s: %.*s",
stream_id, (int)h3name.len, h3name.base,
(int)h3val.len, h3val.base));
- result = write_data(stream, h3name.base, h3name.len);
+ result = write_resp_raw(cf, data, h3name.base, h3name.len, FALSE);
if(result) {
return -1;
}
- result = write_data(stream, ": ", 2);
+ result = write_resp_raw(cf, data, ": ", 2, FALSE);
if(result) {
return -1;
}
- result = write_data(stream, h3val.base, h3val.len);
+ result = write_resp_raw(cf, data, h3val.base, h3val.len, FALSE);
if(result) {
return -1;
}
- result = write_data(stream, "\r\n", 2);
+ result = write_resp_raw(cf, data, "\r\n", 2, FALSE);
if(result) {
return -1;
}
}
-
- stream->firstheader = TRUE;
return 0;
}
rv = ngtcp2_conn_shutdown_stream_write(ctx->qconn, stream_id,
app_error_code);
- DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] reset -> %d", stream_id, rv));
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] reset -> %d", stream_id, rv));
if(rv && rv != NGTCP2_ERR_STREAM_NOT_FOUND) {
return NGTCP2_ERR_CALLBACK_FAILURE;
}
return result;
}
-static size_t drain_overflow_buffer(struct HTTP *stream)
+static void drain_overflow_buffer(struct Curl_cfilter *cf,
+ struct Curl_easy *data)
{
+ struct HTTP *stream = data->req.p.http;
size_t overlen = Curl_dyn_len(&stream->overflow);
size_t ncopy = CURLMIN(overlen, stream->len);
+
+ (void)cf;
if(ncopy > 0) {
- memcpy(stream->mem, Curl_dyn_ptr(&stream->overflow), ncopy);
+ memcpy(stream->mem + stream->memlen,
+ Curl_dyn_ptr(&stream->overflow), ncopy);
stream->len -= ncopy;
- stream->mem += ncopy;
stream->memlen += ncopy;
if(ncopy != overlen)
/* make the buffer only keep the tail */
(void)Curl_dyn_tail(&stream->overflow, overlen - ncopy);
- else
+ else {
Curl_dyn_reset(&stream->overflow);
+ }
}
- return ncopy;
+}
+
+static ssize_t recv_closed_stream(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ CURLcode *err)
+{
+ struct HTTP *stream = data->req.p.http;
+ ssize_t nread = -1;
+
+ if(stream->reset) {
+ failf(data,
+ "HTTP/3 stream %" PRId64 " reset by server", stream->stream3_id);
+ *err = CURLE_PARTIAL_FILE;
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, was reset -> %d",
+ stream->stream3_id, *err));
+ goto out;
+ }
+ else if(stream->error3 != NGHTTP3_H3_NO_ERROR) {
+ failf(data,
+ "HTTP/3 stream %" PRId64 " was not closed cleanly: (err 0x%" PRIx64
+ ")",
+ stream->stream3_id, stream->error3);
+ *err = CURLE_HTTP3;
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, closed uncleanly"
+ " -> %d", stream->stream3_id, *err));
+ goto out;
+ }
+
+ if(!stream->bodystarted) {
+ failf(data,
+ "HTTP/3 stream %" PRId64 " was closed cleanly, but before getting"
+ " all response header fields, treated as error",
+ stream->stream3_id);
+ *err = CURLE_HTTP3;
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, closed incomplete"
+ " -> %d", stream->stream3_id, *err));
+ goto out;
+ }
+ else {
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, closed ok"
+ " -> %d", stream->stream3_id, *err));
+ }
+ *err = CURLE_OK;
+ nread = 0;
+
+out:
+ data->state.drain = 0;
+ return nread;
}
/* incoming data frames on the h3 stream */
DEBUGASSERT(ctx->h3conn);
*err = CURLE_OK;
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv(len=%zu) start",
+ stream->stream3_id, len));
+ /* TODO: this implementation of response DATA buffering is fragile.
+ * It makes the following assumptions:
+ * - the `buf` passed here has the same lifetime as the easy handle
+ * - data returned in `buf` from this call is immediately used and `buf`
+ * can be overwritten during any handling of other transfers at
+ * this connection.
+ */
if(!stream->memlen) {
- /* remember where to store incoming data for this stream and how big the
- buffer is */
+ /* `buf` was not known before or is currently not used by stream,
+ * assign it (again). */
stream->mem = buf;
stream->len = len;
}
- /* else, there's data in the buffer already */
- /* if there's data in the overflow buffer from a previous call, copy as much
- as possible to the receive buffer before receiving more */
- drain_overflow_buffer(stream);
+ /* if there's data in the overflow buffer, move as much
+ as possible to the receive buffer now */
+ drain_overflow_buffer(cf, data);
if(cf_process_ingress(cf, data)) {
*err = CURLE_RECV_ERROR;
nread = -1;
goto out;
}
- if(cf_flush_egress(cf, data)) {
- *err = CURLE_SEND_ERROR;
- nread = -1;
- goto out;
- }
if(stream->memlen) {
nread = stream->memlen;
- /* data arrived */
/* reset to allow more data to come */
- stream->memlen = 0;
+ /* TODO: very brittle buffer use design:
+ * - stream->mem has now `nread` bytes of response data
+ * - we assume that the caller will use those immediately and
+ * we can overwrite that with new data on our next invocation from
+ * anywhere.
+ */
stream->mem = buf;
+ stream->memlen = 0;
stream->len = len;
/* extend the stream window with the data we're consuming and send out
any additional packets to tell the server that we can receive more */
- DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] recv, consumed %zd bytes",
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv -> %zd bytes",
stream->stream3_id, nread));
- extend_stream_window(ctx->qconn, stream);
+ report_consumed_data(cf, data, nread);
if(cf_flush_egress(cf, data)) {
*err = CURLE_SEND_ERROR;
nread = -1;
- goto out;
}
goto out;
}
if(stream->closed) {
- if(stream->reset) {
- failf(data,
- "HTTP/3 stream %" PRId64 " reset by server", stream->stream3_id);
- *err = CURLE_PARTIAL_FILE;
- nread = -1;
- goto out;
- }
- else if(stream->error3 != NGHTTP3_H3_NO_ERROR) {
- failf(data,
- "HTTP/3 stream %" PRId64 " was not closed cleanly: (err 0x%" PRIx64
- ")",
- stream->stream3_id, stream->error3);
- *err = CURLE_HTTP3;
- nread = -1;
- goto out;
- }
-
- if(!stream->bodystarted) {
- failf(data,
- "HTTP/3 stream %" PRId64 " was closed cleanly, but before getting"
- " all response header fields, treated as error",
- stream->stream3_id);
- *err = CURLE_HTTP3;
- nread = -1;
- goto out;
- }
-
- nread = 0;
+ nread = recv_closed_stream(cf, data, err);
goto out;
}
- DEBUGF(LOG_CF(data, cf, "cf_ngtcp2_recv returns EAGAIN"));
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv -> EAGAIN",
+ stream->stream3_id));
*err = CURLE_AGAIN;
nread = -1;
out:
+ if(cf_flush_egress(cf, data)) {
+ *err = CURLE_SEND_ERROR;
+ nread = -1;
+ goto out;
+ }
+
CF_DATA_RESTORE(cf, save);
return nread;
}
stream->stream3_id = stream3_id;
stream->h3req = TRUE;
Curl_dyn_init(&stream->overflow, CURL_MAX_READ_SIZE);
+ stream->recv_buf_nonflow = 0;
result = Curl_pseudo_headers(data, mem, len, NULL, &hreq);
if(result)
}
stream->h3out = h3out;
- DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] sending request %s, with_body=%d",
- stream->stream3_id, data->state.url, !!stream->upload_left));
rc = nghttp3_conn_submit_request(ctx->h3conn, stream->stream3_id,
nva, nheader, &data_reader, data);
if(rc)
break;
}
default:
- DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] sending request %s",
- stream->stream3_id, data->state.url));
stream->upload_left = 0; /* nothing left to send */
rc = nghttp3_conn_submit_request(ctx->h3conn, stream->stream3_id,
nva, nheader, NULL, data);
Curl_safefree(nva);
- infof(data, "Using HTTP/3 Stream ID: %" PRIx64 " (easy handle %p)",
+ infof(data, "Using HTTP/3 Stream ID: %" PRId64 " (easy handle %p)",
stream3_id, (void *)data);
- DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRIx64 "] opened for %s",
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] opened for %s",
stream3_id, data->state.url));
Curl_pseudo_free(hreq);
if(rc) {
switch(rc) {
case NGHTTP3_ERR_CONN_CLOSING:
- DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] failed to send, "
+ DEBUGF(LOG_CF(data, cf, "h3sid[%"PRId64"] failed to send, "
"connection is closing", stream->stream3_id));
result = CURLE_RECV_ERROR;
break;
default:
- DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] failed to send -> %d (%s)",
+ DEBUGF(LOG_CF(data, cf, "h3sid[%"PRId64"] failed to send -> %d (%s)",
stream->stream3_id, rc, ngtcp2_strerror(rc)));
result = CURLE_SEND_ERROR;
break;
int rv;
uint8_t buf[65536];
size_t bufsize = sizeof(buf);
+ size_t pktcount = 0, total_recvd = 0;
struct sockaddr_storage remote_addr;
socklen_t remote_addrlen;
ngtcp2_path path;
for(;;) {
remote_addrlen = sizeof(remote_addr);
- while((recvd = recvfrom(ctx->sockfd, (char *)buf, bufsize, 0,
+ while((recvd = recvfrom(ctx->q.sockfd, (char *)buf, bufsize, 0,
(struct sockaddr *)&remote_addr,
&remote_addrlen)) == -1 &&
SOCKERRNO == EINTR)
DEBUGF(LOG_CF(data, cf, "ingress, recvfrom -> EAGAIN"));
goto out;
}
- if(SOCKERRNO == ECONNREFUSED) {
+ if(!cf->connected && SOCKERRNO == ECONNREFUSED) {
const char *r_ip;
int r_port;
Curl_cf_socket_peek(cf->next, data, NULL, NULL,
ctx->got_first_byte = TRUE;
}
- ngtcp2_addr_init(&path.local, (struct sockaddr *)&ctx->local_addr,
- ctx->local_addrlen);
+ ++pktcount;
+ total_recvd += recvd;
+
+ ngtcp2_addr_init(&path.local, (struct sockaddr *)&ctx->q.local_addr,
+ ctx->q.local_addrlen);
ngtcp2_addr_init(&path.remote, (struct sockaddr *)&remote_addr,
remote_addrlen);
- DEBUGF(LOG_CF(data, cf, "ingress, recvd pkt of %zd bytes", recvd));
rv = ngtcp2_conn_read_pkt(ctx->qconn, &path, &pi, buf, recvd, ts);
if(rv) {
DEBUGF(LOG_CF(data, cf, "ingress, read_pkt -> %s",
}
out:
- return CURLE_OK;
-}
-
-static CURLcode do_sendmsg(struct Curl_cfilter *cf,
- struct Curl_easy *data,
- const uint8_t *pkt, size_t pktlen, size_t gsolen,
- size_t *sent);
-
-static CURLcode send_packet_no_gso(struct Curl_cfilter *cf,
- struct Curl_easy *data,
- const uint8_t *pkt, size_t pktlen,
- size_t gsolen, size_t *psent)
-{
- const uint8_t *p, *end = pkt + pktlen;
- size_t sent;
-
- *psent = 0;
-
- for(p = pkt; p < end; p += gsolen) {
- size_t len = CURLMIN(gsolen, (size_t)(end - p));
- CURLcode curlcode = do_sendmsg(cf, data, p, len, len, &sent);
- if(curlcode != CURLE_OK) {
- return curlcode;
- }
- *psent += sent;
- }
-
- return CURLE_OK;
-}
-
-static CURLcode do_sendmsg(struct Curl_cfilter *cf,
- struct Curl_easy *data,
- const uint8_t *pkt, size_t pktlen, size_t gsolen,
- size_t *psent)
-{
- struct cf_ngtcp2_ctx *ctx = cf->ctx;
-#ifdef HAVE_SENDMSG
- struct iovec msg_iov;
- struct msghdr msg = {0};
- ssize_t sent;
-#if defined(__linux__) && defined(UDP_SEGMENT)
- uint8_t msg_ctrl[32];
- struct cmsghdr *cm;
-#endif
-
- *psent = 0;
- msg_iov.iov_base = (uint8_t *)pkt;
- msg_iov.iov_len = pktlen;
- msg.msg_iov = &msg_iov;
- msg.msg_iovlen = 1;
-
-#if defined(__linux__) && defined(UDP_SEGMENT)
- if(pktlen > gsolen) {
- /* Only set this, when we need it. macOS, for example,
- * does not seem to like a msg_control of length 0. */
- msg.msg_control = msg_ctrl;
- assert(sizeof(msg_ctrl) >= CMSG_SPACE(sizeof(uint16_t)));
- msg.msg_controllen = CMSG_SPACE(sizeof(uint16_t));
- cm = CMSG_FIRSTHDR(&msg);
- cm->cmsg_level = SOL_UDP;
- cm->cmsg_type = UDP_SEGMENT;
- cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
- *(uint16_t *)(void *)CMSG_DATA(cm) = gsolen & 0xffff;
- }
-#endif
-
-
- while((sent = sendmsg(ctx->sockfd, &msg, 0)) == -1 && SOCKERRNO == EINTR)
- ;
-
- if(sent == -1) {
- switch(SOCKERRNO) {
- case EAGAIN:
-#if EAGAIN != EWOULDBLOCK
- case EWOULDBLOCK:
-#endif
- return CURLE_AGAIN;
- case EMSGSIZE:
- /* UDP datagram is too large; caused by PMTUD. Just let it be lost. */
- break;
- case EIO:
- if(pktlen > gsolen) {
- /* GSO failure */
- failf(data, "sendmsg() returned %zd (errno %d); disable GSO", sent,
- SOCKERRNO);
- ctx->no_gso = TRUE;
- return send_packet_no_gso(cf, data, pkt, pktlen, gsolen, psent);
- }
- /* FALLTHROUGH */
- default:
- failf(data, "sendmsg() returned %zd (errno %d)", sent, SOCKERRNO);
- return CURLE_SEND_ERROR;
- }
- }
- else {
- assert(pktlen == (size_t)sent);
- }
-#else
- ssize_t sent;
- (void)gsolen;
-
- *psent = 0;
-
- while((sent = send(ctx->sockfd, (const char *)pkt, pktlen, 0)) == -1 &&
- SOCKERRNO == EINTR)
- ;
-
- if(sent == -1) {
- if(SOCKERRNO == EAGAIN || SOCKERRNO == EWOULDBLOCK) {
- return CURLE_AGAIN;
- }
- else {
- failf(data, "send() returned %zd (errno %d)", sent, SOCKERRNO);
- if(SOCKERRNO != EMSGSIZE) {
- return CURLE_SEND_ERROR;
- }
- /* UDP datagram is too large; caused by PMTUD. Just let it be
- lost. */
- }
- }
-#endif
-
- *psent = pktlen;
-
- return CURLE_OK;
-}
-
-static CURLcode send_packet(struct Curl_cfilter *cf,
- struct Curl_easy *data,
- const uint8_t *pkt, size_t pktlen, size_t gsolen,
- size_t *psent)
-{
- struct cf_ngtcp2_ctx *ctx = cf->ctx;
-
- DEBUGF(LOG_CF(data, cf, "egress, send %zu bytes", pktlen));
- if(ctx->no_gso && pktlen > gsolen) {
- return send_packet_no_gso(cf, data, pkt, pktlen, gsolen, psent);
- }
-
- return do_sendmsg(cf, data, pkt, pktlen, gsolen, psent);
-}
-
-static void push_blocked_pkt(struct Curl_cfilter *cf, const uint8_t *pkt,
- size_t pktlen, size_t gsolen)
-{
- struct cf_ngtcp2_ctx *ctx = cf->ctx;
- struct blocked_pkt *blkpkt;
-
- assert(ctx->num_blocked_pkt <
- sizeof(ctx->blocked_pkt) / sizeof(ctx->blocked_pkt[0]));
-
- blkpkt = &ctx->blocked_pkt[ctx->num_blocked_pkt++];
-
- blkpkt->pkt = pkt;
- blkpkt->pktlen = pktlen;
- blkpkt->gsolen = gsolen;
-}
-
-static CURLcode send_blocked_pkt(struct Curl_cfilter *cf,
- struct Curl_easy *data)
-{
- struct cf_ngtcp2_ctx *ctx = cf->ctx;
- size_t sent;
- CURLcode curlcode;
- struct blocked_pkt *blkpkt;
-
- for(; ctx->num_blocked_pkt_sent < ctx->num_blocked_pkt;
- ++ctx->num_blocked_pkt_sent) {
- blkpkt = &ctx->blocked_pkt[ctx->num_blocked_pkt_sent];
- curlcode = send_packet(cf, data, blkpkt->pkt,
- blkpkt->pktlen, blkpkt->gsolen, &sent);
-
- if(curlcode) {
- if(curlcode == CURLE_AGAIN) {
- blkpkt->pkt += sent;
- blkpkt->pktlen -= sent;
- }
- return curlcode;
- }
- }
-
- ctx->num_blocked_pkt = 0;
- ctx->num_blocked_pkt_sent = 0;
-
+ (void)pktcount;
+ (void)total_recvd;
+ DEBUGF(LOG_CF(data, cf, "ingress, recvd %zu packets with %zd bytes",
+ pktcount, total_recvd));
return CURLE_OK;
}
int rv;
size_t sent;
ngtcp2_ssize outlen;
- uint8_t *outpos = ctx->pktbuf;
+ uint8_t *outpos = ctx->q.pktbuf;
size_t max_udp_payload_size =
ngtcp2_conn_get_max_tx_udp_payload_size(ctx->qconn);
size_t path_max_udp_payload_size =
ngtcp2_conn_get_path_max_tx_udp_payload_size(ctx->qconn);
size_t max_pktcnt =
- CURLMIN(MAX_PKT_BURST, ctx->pktbuflen / max_udp_payload_size);
+ CURLMIN(MAX_PKT_BURST, ctx->q.pktbuflen / max_udp_payload_size);
size_t pktcnt = 0;
- size_t gsolen;
+ size_t gsolen = 0; /* this disables gso until we have a clue */
ngtcp2_path_storage ps;
ngtcp2_tstamp ts = timestamp();
ngtcp2_tstamp expiry;
return CURLE_SEND_ERROR;
}
- if(ctx->num_blocked_pkt) {
- curlcode = send_blocked_pkt(cf, data);
+ if(ctx->q.num_blocked_pkt) {
+ curlcode = vquic_send_blocked_pkt(cf, data, &ctx->q);
if(curlcode) {
if(curlcode == CURLE_AGAIN) {
Curl_expire(data, 1, EXPIRE_QUIC);
&ndatalen, flags, stream_id,
(const ngtcp2_vec *)vec, veccnt, ts);
if(outlen == 0) {
- if(outpos != ctx->pktbuf) {
- curlcode = send_packet(cf, data, ctx->pktbuf,
- outpos - ctx->pktbuf, gsolen, &sent);
+ /* ngtcp2 does not want to send more packets, if the buffer is
+ * not empty, send that now */
+ if(outpos != ctx->q.pktbuf) {
+ curlcode = vquic_send_packet(cf, data, &ctx->q, ctx->q.pktbuf,
+ outpos - ctx->q.pktbuf, gsolen, &sent);
if(curlcode) {
if(curlcode == CURLE_AGAIN) {
- push_blocked_pkt(cf, ctx->pktbuf + sent,
- outpos - ctx->pktbuf - sent,
- gsolen);
+ vquic_push_blocked_pkt(cf, &ctx->q, ctx->q.pktbuf + sent,
+ outpos - ctx->q.pktbuf - sent,
+ gsolen);
Curl_expire(data, 1, EXPIRE_QUIC);
return CURLE_OK;
}
return curlcode;
}
}
-
- break;
+ /* done for now */
+ goto out;
}
if(outlen < 0) {
switch(outlen) {
nghttp3_conn_shutdown_stream_write(ctx->h3conn, stream_id);
continue;
case NGTCP2_ERR_WRITE_MORE:
+ /* ngtcp2 wants to send more. update the flow of the stream whose data
+ * is in the buffer and continue */
assert(ndatalen >= 0);
rv = nghttp3_conn_add_write_offset(ctx->h3conn, stream_id, ndatalen);
if(rv) {
}
}
else if(ndatalen >= 0) {
+ /* ngtcp2 thinks it has added all it wants. Update the stream */
rv = nghttp3_conn_add_write_offset(ctx->h3conn, stream_id, ndatalen);
if(rv) {
failf(data, "nghttp3_conn_add_write_offset returned error: %s\n",
}
}
+ /* advance to the end of the buffered packet data */
outpos += outlen;
if(pktcnt == 0) {
+ /* first packet buffer chunk. use this as gsolen. It's how ngtcp2
+ * indicates the intended segment size. */
gsolen = outlen;
}
else if((size_t)outlen > gsolen ||
- (gsolen > path_max_udp_payload_size &&
- (size_t)outlen != gsolen)) {
+ (gsolen > path_max_udp_payload_size && (size_t)outlen != gsolen)) {
/* Packet larger than path_max_udp_payload_size is PMTUD probe
packet and it might not be sent because of EMSGSIZE. Send
them separately to minimize the loss. */
- curlcode = send_packet(cf, data, ctx->pktbuf,
- outpos - outlen - ctx->pktbuf, gsolen, &sent);
+ /* send the pktbuf *before* the last addition */
+ curlcode = vquic_send_packet(cf, data, &ctx->q, ctx->q.pktbuf,
+ outpos - outlen - ctx->q.pktbuf, gsolen, &sent);
if(curlcode) {
if(curlcode == CURLE_AGAIN) {
- push_blocked_pkt(cf, ctx->pktbuf + sent,
- outpos - outlen - ctx->pktbuf - sent, gsolen);
- push_blocked_pkt(cf, outpos - outlen, outlen, outlen);
+ /* blocked, add the pktbuf *before* and *at* the last addition
+ * separately to the blocked packages */
+ vquic_push_blocked_pkt(cf, &ctx->q, ctx->q.pktbuf + sent,
+ outpos - outlen - ctx->q.pktbuf - sent, gsolen);
+ vquic_push_blocked_pkt(cf, &ctx->q, outpos - outlen, outlen, outlen);
Curl_expire(data, 1, EXPIRE_QUIC);
return CURLE_OK;
}
return curlcode;
}
- curlcode = send_packet(cf, data, outpos - outlen, outlen,
- outlen, &sent);
+ /* send the pktbuf *at* the last addition */
+ curlcode = vquic_send_packet(cf, data, &ctx->q, outpos - outlen, outlen,
+ outlen, &sent);
if(curlcode) {
if(curlcode == CURLE_AGAIN) {
assert(0 == sent);
- push_blocked_pkt(cf, outpos - outlen, outlen, outlen);
+ vquic_push_blocked_pkt(cf, &ctx->q, outpos - outlen, outlen, outlen);
Curl_expire(data, 1, EXPIRE_QUIC);
return CURLE_OK;
}
return curlcode;
}
-
+ /* pktbuf has been completely sent */
pktcnt = 0;
- outpos = ctx->pktbuf;
+ outpos = ctx->q.pktbuf;
continue;
}
if(++pktcnt >= max_pktcnt || (size_t)outlen < gsolen) {
- curlcode = send_packet(cf, data, ctx->pktbuf,
- outpos - ctx->pktbuf, gsolen, &sent);
+ /* enough packets or last one is shorter than the intended
+ * segment size, indicating that it is time to send. */
+ curlcode = vquic_send_packet(cf, data, &ctx->q, ctx->q.pktbuf,
+ outpos - ctx->q.pktbuf, gsolen, &sent);
if(curlcode) {
if(curlcode == CURLE_AGAIN) {
- push_blocked_pkt(cf, ctx->pktbuf + sent, outpos - ctx->pktbuf - sent,
- gsolen);
+ vquic_push_blocked_pkt(cf, &ctx->q, ctx->q.pktbuf + sent,
+ outpos - ctx->q.pktbuf - sent, gsolen);
Curl_expire(data, 1, EXPIRE_QUIC);
return CURLE_OK;
}
return curlcode;
}
-
+ /* pktbuf has been completely sent */
pktcnt = 0;
- outpos = ctx->pktbuf;
+ outpos = ctx->q.pktbuf;
}
}
+out:
+ /* non-errored exit. check when we should run again. */
expiry = ngtcp2_conn_get_expiry(ctx->qconn);
if(expiry != UINT64_MAX) {
if(expiry <= ts) {
struct HTTP *stream = data->req.p.http;
Curl_dyn_free(&stream->overflow);
free(stream->h3out);
+#ifdef DEBUGBUILD
+ if(ctx->qconn) {
+ ngtcp2_conn_stat stat;
+ ngtcp2_conn_get_conn_stat(ctx->qconn, &stat);
+ DEBUGF(LOG_CF(data, cf, "ngtcp2 conn stat: cwnd=%" PRIu64 ", "
+ "max_tx_payload=%zu",
+ stat.cwnd, stat.max_tx_udp_payload_size));
+ }
+#endif
break;
}
-
case CF_CTRL_DATA_DONE_SEND: {
struct HTTP *stream = data->req.p.http;
stream->upload_done = TRUE;
(void)nghttp3_conn_resume_stream(ctx->h3conn, stream->stream3_id);
break;
}
-
case CF_CTRL_DATA_IDLE:
if(timestamp() >= ngtcp2_conn_get_expiry(ctx->qconn)) {
if(cf_flush_egress(cf, data)) {
if(ctx->sslctx)
wolfSSL_CTX_free(ctx->sslctx);
#endif
- free(ctx->pktbuf);
+ vquic_ctx_free(&ctx->q);
if(ctx->h3conn)
nghttp3_conn_del(ctx->h3conn);
if(ctx->qconn)
(uint8_t *)buffer, sizeof(buffer),
&ctx->last_error, ts);
if(rc > 0) {
- while((send(ctx->sockfd, buffer, rc, 0) == -1) &&
+ while((send(ctx->q.sockfd, buffer, rc, 0) == -1) &&
SOCKERRNO == EINTR);
}
int rc;
int rv;
CURLcode result;
- ngtcp2_path path; /* TODO: this must be initialized properly */
const struct Curl_sockaddr_ex *sockaddr;
int qfd;
ctx->qlogfd = qfd; /* -1 if failure above */
quic_settings(ctx, data);
- Curl_cf_socket_peek(cf->next, data, &ctx->sockfd,
+ result = vquic_ctx_init(&ctx->q,
+ NGTCP2_MAX_PMTUD_UDP_PAYLOAD_SIZE * MAX_PKT_BURST);
+ if(result)
+ return result;
+
+ Curl_cf_socket_peek(cf->next, data, &ctx->q.sockfd,
&sockaddr, NULL, NULL, NULL, NULL);
- ctx->local_addrlen = sizeof(ctx->local_addr);
- rv = getsockname(ctx->sockfd, (struct sockaddr *)&ctx->local_addr,
- &ctx->local_addrlen);
+ ctx->q.local_addrlen = sizeof(ctx->q.local_addr);
+ rv = getsockname(ctx->q.sockfd, (struct sockaddr *)&ctx->q.local_addr,
+ &ctx->q.local_addrlen);
if(rv == -1)
return CURLE_QUIC_CONNECT_ERROR;
- ngtcp2_addr_init(&path.local, (struct sockaddr *)&ctx->local_addr,
- ctx->local_addrlen);
- ngtcp2_addr_init(&path.remote, &sockaddr->sa_addr, sockaddr->addrlen);
+ ngtcp2_addr_init(&ctx->connected_path.local,
+ (struct sockaddr *)&ctx->q.local_addr,
+ ctx->q.local_addrlen);
+ ngtcp2_addr_init(&ctx->connected_path.remote,
+ &sockaddr->sa_addr, sockaddr->addrlen);
- rc = ngtcp2_conn_client_new(&ctx->qconn, &ctx->dcid, &ctx->scid, &path,
+ rc = ngtcp2_conn_client_new(&ctx->qconn, &ctx->dcid, &ctx->scid,
+ &ctx->connected_path,
NGTCP2_PROTO_VER_V1, &ng_callbacks,
&ctx->settings, &ctx->transport_params,
NULL, cf);
ngtcp2_connection_close_error_default(&ctx->last_error);
-#if defined(__linux__) && defined(UDP_SEGMENT) && defined(HAVE_SENDMSG)
- ctx->no_gso = FALSE;
-#else
- ctx->no_gso = TRUE;
-#endif
-
- ctx->num_blocked_pkt = 0;
- ctx->num_blocked_pkt_sent = 0;
- memset(&ctx->blocked_pkt, 0, sizeof(ctx->blocked_pkt));
-
- ctx->pktbuflen = NGTCP2_MAX_PMTUD_UDP_PAYLOAD_SIZE * MAX_PKT_BURST;
- ctx->pktbuf = malloc(ctx->pktbuflen);
- if(!ctx->pktbuf) {
- ngtcp2_conn_del(ctx->qconn);
- ctx->qconn = NULL;
- return CURLE_OUT_OF_MEMORY;
- }
-
ctx->conn_ref.get_conn = get_conn;
ctx->conn_ref.user_data = cf;
#include "progress.h"
#include "strerror.h"
#include "vquic.h"
+#include "vquic_int.h"
#include "curl_quiche.h"
#include "transfer.h"
#include "h2h3.h"
#define QUIC_MAX_DATA (1*1024*1024)
#define QUIC_IDLE_TIMEOUT (60 * 1000) /* milliseconds */
+/* how many UDP packets to send max in one call */
+#define MAX_PKT_BURST 10
+#define MAX_UDP_PAYLOAD_SIZE 1452
/*
* Store quiche version info in this buffer.
struct h3_event_node {
struct h3_event_node *next;
- int64_t stream3_id;
quiche_h3_event *ev;
};
struct cf_quiche_ctx {
- curl_socket_t sockfd;
- struct sockaddr_storage local_addr;
- socklen_t local_addrlen;
+ struct cf_quic_ctx q;
quiche_conn *qconn;
quiche_config *cfg;
quiche_h3_conn *h3c;
uint8_t scid[QUICHE_MAX_CONN_ID_LEN];
SSL_CTX *sslctx;
SSL *ssl;
- struct h3_event_node *pending;
- struct curltime connect_started; /* time the current attempt started */
- struct curltime handshake_done; /* time connect handshake finished */
- int first_reply_ms; /* ms since first data arrived */
- struct curltime reconnect_at; /* time the next attempt should start */
- bool goaway;
+ struct curltime started_at; /* time the current attempt started */
+ struct curltime handshake_at; /* time connect handshake finished */
+ struct curltime first_byte_at; /* when first byte was recvd */
+ struct curltime reconnect_at; /* time the next attempt should start */
+ BIT(goaway); /* got GOAWAY from server */
+ BIT(got_first_byte); /* if first byte was received */
};
}
#endif
-static void h3_clear_pending(struct cf_quiche_ctx *ctx)
+static void h3_clear_pending(struct Curl_easy *data)
{
- if(ctx->pending) {
+ struct HTTP *stream = data->req.p.http;
+
+ if(stream->pending) {
struct h3_event_node *node, *next;
- for(node = ctx->pending; node; node = next) {
+ for(node = stream->pending; node; node = next) {
next = node->next;
quiche_h3_event_free(node->ev);
free(node);
}
- ctx->pending = NULL;
+ stream->pending = NULL;
}
}
static void cf_quiche_ctx_clear(struct cf_quiche_ctx *ctx)
{
if(ctx) {
- if(ctx->pending)
- h3_clear_pending(ctx);
+ vquic_ctx_free(&ctx->q);
if(ctx->qconn)
quiche_conn_free(ctx->qconn);
if(ctx->h3config)
if(ctx->cfg)
quiche_config_free(ctx->cfg);
memset(ctx, 0, sizeof(*ctx));
- ctx->first_reply_ms = -1;
}
}
+static void notify_drain(struct Curl_cfilter *cf,
+ struct Curl_easy *data)
+{
+ (void)cf;
+ data->state.drain = 1;
+ Curl_expire(data, 0, EXPIRE_RUN_NOW);
+}
+
static CURLcode h3_add_event(struct Curl_cfilter *cf,
struct Curl_easy *data,
- int64_t stream3_id, quiche_h3_event *ev,
- size_t *pqlen)
+ int64_t stream3_id, quiche_h3_event *ev)
{
- struct cf_quiche_ctx *ctx = cf->ctx;
struct Curl_easy *mdata;
- struct h3_event_node *node, **pnext = &ctx->pending;
- size_t qlen;
+ struct h3_event_node *node, **pnext;
DEBUGASSERT(data->multi);
for(mdata = data->multi->easyp; mdata; mdata = mdata->next) {
}
if(!mdata) {
- DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] event discarded, easy handle "
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] event discarded, easy handle "
"not found", stream3_id));
quiche_h3_event_free(ev);
- *pqlen = 0;
return CURLE_OK;
}
node = calloc(sizeof(*node), 1);
- if(!node)
+ if(!node) {
+ quiche_h3_event_free(ev);
return CURLE_OUT_OF_MEMORY;
- node->stream3_id = stream3_id;
+ }
node->ev = ev;
/* append to process them in order of arrival */
- qlen = 0;
+ pnext = &mdata->req.p.http->pending;
while(*pnext) {
pnext = &((*pnext)->next);
- ++qlen;
}
*pnext = node;
- *pqlen = qlen + 1;
- if(!mdata->state.drain) {
- /* tell the multi handle that this data needs processing */
- mdata->state.drain = 1;
- Curl_expire(mdata, 0, EXPIRE_RUN_NOW);
- }
+ notify_drain(cf, mdata);
return CURLE_OK;
}
return 0;
}
+static ssize_t cf_recv_body(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ char *buf, size_t len,
+ CURLcode *err)
+{
+ struct cf_quiche_ctx *ctx = cf->ctx;
+ struct HTTP *stream = data->req.p.http;
+ ssize_t nread;
+ size_t offset = 0;
+
+ if(!stream->firstbody) {
+ /* add a header-body separator CRLF */
+ offset = 2;
+ }
+ nread = quiche_h3_recv_body(ctx->h3c, ctx->qconn, stream->stream3_id,
+ (unsigned char *)buf + offset, len - offset);
+ if(nread >= 0) {
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"][DATA] len=%zd",
+ stream->stream3_id, nread));
+ if(!stream->firstbody) {
+ stream->firstbody = TRUE;
+ buf[0] = '\r';
+ buf[1] = '\n';
+ nread += offset;
+ }
+ }
+ else if(nread == -1) {
+ *err = CURLE_AGAIN;
+ stream->h3_recving_data = FALSE;
+ }
+ else {
+ failf(data, "Error %zd in HTTP/3 response body for stream[%"PRId64"]",
+ nread, stream->stream3_id);
+ stream->closed = TRUE;
+ stream->reset = TRUE;
+ streamclose(cf->conn, "Reset of stream");
+ stream->h3_recving_data = FALSE;
+ nread = -1;
+ *err = stream->h3_got_header? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR;
+ }
+ return nread;
+}
+
+#ifdef DEBUGBUILD
+static const char *cf_ev_name(quiche_h3_event *ev)
+{
+ switch(quiche_h3_event_type(ev)) {
+ case QUICHE_H3_EVENT_HEADERS:
+ return "HEADERS";
+ case QUICHE_H3_EVENT_DATA:
+ return "DATA";
+ case QUICHE_H3_EVENT_RESET:
+ return "RESET";
+ case QUICHE_H3_EVENT_FINISHED:
+ return "FINISHED";
+ case QUICHE_H3_EVENT_GOAWAY:
+ return "GOAWAY";
+ default:
+ return "Unknown";
+ }
+}
+#else
+#define cf_ev_name(x) ""
+#endif
+
static ssize_t h3_process_event(struct Curl_cfilter *cf,
struct Curl_easy *data,
char *buf, size_t len,
quiche_h3_event *ev,
CURLcode *err)
{
- struct cf_quiche_ctx *ctx = cf->ctx;
struct HTTP *stream = data->req.p.http;
- ssize_t recvd = -1;
- ssize_t rcode;
+ ssize_t recvd = 0;
int rc;
struct h3h1header headers;
DEBUGASSERT(stream3_id == stream->stream3_id);
+ *err = CURLE_OK;
switch(quiche_h3_event_type(ev)) {
case QUICHE_H3_EVENT_HEADERS:
stream->h3_got_header = TRUE;
break;
}
recvd = headers.nlen;
- DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, HEADERS len=%zd",
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"][HEADERS] len=%zd",
stream3_id, recvd));
break;
case QUICHE_H3_EVENT_DATA:
- if(!stream->firstbody) {
- /* add a header-body separator CRLF */
- buf[0] = '\r';
- buf[1] = '\n';
- buf += 2;
- len -= 2;
- stream->firstbody = TRUE;
- recvd = 2; /* two bytes already */
- }
- else
+ DEBUGASSERT(!stream->closed);
+ stream->h3_recving_data = TRUE;
+ recvd = cf_recv_body(cf, data, buf, len, err);
+ if(recvd < 0) {
+ if(*err != CURLE_AGAIN)
+ return -1;
recvd = 0;
- rcode = quiche_h3_recv_body(ctx->h3c, ctx->qconn, stream3_id,
- (unsigned char *)buf, len);
- if(rcode <= 0) {
- failf(data, "Error %zd in HTTP/3 response body for stream[%"PRId64"]",
- rcode, stream3_id);
- recvd = -1;
- *err = CURLE_AGAIN;
- break;
}
- stream->h3_recving_data = TRUE;
- recvd += rcode;
- DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, DATA len=%zd",
- stream3_id, rcode));
break;
case QUICHE_H3_EVENT_RESET:
- if(quiche_conn_is_draining(ctx->qconn) && !stream->h3_got_header) {
- DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] stream RESET without response, "
- "connection is draining", stream3_id));
- }
- else {
- DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, RESET", stream3_id));
- }
- streamclose(cf->conn, "Stream reset");
- *err = stream->h3_got_header? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR;
- recvd = -1;
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"][RESET]", stream3_id));
+ stream->closed = TRUE;
+ stream->reset = TRUE;
+ /* streamclose(cf->conn, "Reset of stream");*/
+ stream->h3_recving_data = FALSE;
break;
case QUICHE_H3_EVENT_FINISHED:
- DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, FINISHED", stream3_id));
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"][FINISHED]", stream3_id));
stream->closed = TRUE;
- streamclose(cf->conn, "End of stream");
- *err = CURLE_OK;
- recvd = 0; /* end of stream */
+ /* streamclose(cf->conn, "End of stream");*/
+ stream->h3_recving_data = FALSE;
break;
case QUICHE_H3_EVENT_GOAWAY:
- DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, GOAWAY", stream3_id));
- recvd = -1;
- *err = CURLE_AGAIN;
- ctx->goaway = TRUE;
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"][GOAWAY]", stream3_id));
break;
default:
- DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, unhandled event %d",
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] recv, unhandled event %d",
stream3_id, quiche_h3_event_type(ev)));
break;
}
char *buf, size_t len,
CURLcode *err)
{
- struct cf_quiche_ctx *ctx = cf->ctx;
struct HTTP *stream = data->req.p.http;
- struct h3_event_node *node = ctx->pending, **pnext = &ctx->pending;
+ struct h3_event_node *node = stream->pending, **pnext = &stream->pending;
ssize_t recvd = 0, erecvd;
+ *err = CURLE_OK;
DEBUGASSERT(stream);
- while(node) {
- if(node->stream3_id == stream->stream3_id) {
- erecvd = h3_process_event(cf, data, buf, len,
- node->stream3_id, node->ev, err);
- quiche_h3_event_free(node->ev);
- *pnext = node->next;
- free(node);
- node = *pnext;
- if(erecvd < 0) {
- recvd = erecvd;
- break;
- }
- recvd += erecvd;
- if(erecvd > INT_MAX || (size_t)erecvd >= len)
- break;
- buf += erecvd;
- len -= erecvd;
- }
- else {
- pnext = &node->next;
- node = node->next;
+ while(node && len) {
+ erecvd = h3_process_event(cf, data, buf, len,
+ stream->stream3_id, node->ev, err);
+ quiche_h3_event_free(node->ev);
+ *pnext = node->next;
+ free(node);
+ node = *pnext;
+ if(erecvd < 0) {
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] process event -> %d",
+ stream->stream3_id, *err));
+ return erecvd;
}
+ recvd += erecvd;
+ *err = CURLE_OK;
+ buf += erecvd;
+ len -= erecvd;
}
return recvd;
}
struct Curl_easy *data)
{
struct cf_quiche_ctx *ctx = cf->ctx;
- ssize_t recvd;
- uint8_t *buf = (uint8_t *)data->state.buffer;
- size_t bufsize = data->set.buffer_size;
- struct sockaddr_storage from;
- socklen_t from_len;
+ int64_t stream3_id = data->req.p.http? data->req.p.http->stream3_id : -1;
+ uint8_t buf[65536];
+ size_t bufsize = sizeof(buf);
+ struct sockaddr_storage remote_addr;
+ socklen_t remote_addrlen;
quiche_recv_info recv_info;
+ ssize_t recvd, nread;
+ ssize_t total = 0, pkts = 0;
DEBUGASSERT(ctx->qconn);
quiche_conn_on_timeout(ctx->qconn);
do {
- from_len = sizeof(from);
-
- recvd = recvfrom(ctx->sockfd, buf, bufsize, 0,
- (struct sockaddr *)&from, &from_len);
-
+ remote_addrlen = sizeof(remote_addr);
+ while((recvd = recvfrom(ctx->q.sockfd, (char *)buf, bufsize, 0,
+ (struct sockaddr *)&remote_addr,
+ &remote_addrlen)) == -1 &&
+ SOCKERRNO == EINTR)
+ ;
if(recvd < 0) {
- if((SOCKERRNO == EAGAIN) || (SOCKERRNO == EWOULDBLOCK))
- goto out;
+ if((SOCKERRNO == EAGAIN) || (SOCKERRNO == EWOULDBLOCK)) {
+ break;
+ }
if(SOCKERRNO == ECONNREFUSED) {
const char *r_ip;
int r_port;
return CURLE_COULDNT_CONNECT;
}
failf(data, "quiche: recvfrom() unexpectedly returned %zd "
- "(errno: %d, socket %d)", recvd, SOCKERRNO, ctx->sockfd);
+ "(errno: %d, socket %d)", recvd, SOCKERRNO, ctx->q.sockfd);
return CURLE_RECV_ERROR;
}
- DEBUGF(LOG_CF(data, cf, "ingress, recvd %zd bytes", recvd));
- recv_info.from = (struct sockaddr *) &from;
- recv_info.from_len = from_len;
- recv_info.to = (struct sockaddr *) &ctx->local_addr;
- recv_info.to_len = ctx->local_addrlen;
-
- recvd = quiche_conn_recv(ctx->qconn, buf, recvd, &recv_info);
- if(recvd == QUICHE_ERR_DONE)
- goto out;
-
- if(recvd < 0) {
- if(QUICHE_ERR_TLS_FAIL == recvd) {
+ total += recvd;
+ ++pkts;
+ if(recvd > 0 && !ctx->got_first_byte) {
+ ctx->first_byte_at = Curl_now();
+ ctx->got_first_byte = TRUE;
+ }
+ recv_info.from = (struct sockaddr *) &remote_addr;
+ recv_info.from_len = remote_addrlen;
+ recv_info.to = (struct sockaddr *) &ctx->q.local_addr;
+ recv_info.to_len = ctx->q.local_addrlen;
+
+ nread = quiche_conn_recv(ctx->qconn, buf, recvd, &recv_info);
+ if(nread < 0) {
+ if(QUICHE_ERR_DONE == nread) {
+ DEBUGF(LOG_CF(data, cf, "ingress, quiche is DONE"));
+ return CURLE_OK;
+ }
+ else if(QUICHE_ERR_TLS_FAIL == nread) {
long verify_ok = SSL_get_verify_result(ctx->ssl);
if(verify_ok != X509_V_OK) {
failf(data, "SSL certificate problem: %s",
X509_verify_cert_error_string(verify_ok));
-
return CURLE_PEER_FAILED_VERIFICATION;
}
}
-
- failf(data, "quiche_conn_recv() == %zd", recvd);
-
- return CURLE_RECV_ERROR;
+ else {
+ failf(data, "quiche_conn_recv() == %zd", nread);
+ return CURLE_RECV_ERROR;
+ }
}
- if(ctx->first_reply_ms < 0) {
- timediff_t ms = Curl_timediff(Curl_now(), ctx->connect_started);
- ctx->first_reply_ms = (ms < INT_MAX)? (int)ms : INT_MAX;
+ else if(nread < recvd) {
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] ingress, quiche only "
+ "accepted %zd/%zd bytes",
+ stream3_id, nread, recvd));
}
- } while(1);
-out:
+ } while(pkts < 1000); /* arbitrary */
+
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] ingress, recvd %zd bytes "
+ "in %zd packets", stream3_id, total, pkts));
return CURLE_OK;
}
struct Curl_easy *data)
{
struct cf_quiche_ctx *ctx = cf->ctx;
- ssize_t sent;
- uint8_t out[1200];
- int64_t timeout_ns;
+ int64_t stream3_id = data->req.p.http? data->req.p.http->stream3_id : -1;
quiche_send_info send_info;
+ ssize_t outlen, total_len = 0;
+ size_t max_udp_payload_size =
+ quiche_conn_max_send_udp_payload_size(ctx->qconn);
+ size_t gsolen = max_udp_payload_size;
+ size_t sent, pktcnt = 0;
+ CURLcode result;
+ int64_t timeout_ns;
- do {
- sent = quiche_conn_send(ctx->qconn, out, sizeof(out), &send_info);
- if(sent == QUICHE_ERR_DONE)
- break;
+ ctx->q.no_gso = TRUE;
+ if(ctx->q.num_blocked_pkt) {
+ result = vquic_send_blocked_pkt(cf, data, &ctx->q);
+ if(result) {
+ if(result == CURLE_AGAIN) {
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] egress, still not "
+ "able to send blocked packet", stream3_id));
+ Curl_expire(data, 1, EXPIRE_QUIC);
+ return CURLE_OK;
+ }
+ goto out;
+ }
+ }
- if(sent < 0) {
- failf(data, "quiche_conn_send returned %zd", sent);
- return CURLE_SEND_ERROR;
+ for(;;) {
+ outlen = quiche_conn_send(ctx->qconn, ctx->q.pktbuf, max_udp_payload_size,
+ &send_info);
+ if(outlen == QUICHE_ERR_DONE) {
+ result = CURLE_OK;
+ goto out;
}
- DEBUGF(LOG_CF(data, cf, "egress, send %zu bytes", sent));
- sent = send(ctx->sockfd, out, sent, 0);
- if(sent < 0) {
- failf(data, "send() returned %zd", sent);
- return CURLE_SEND_ERROR;
+ if(outlen < 0) {
+ failf(data, "quiche_conn_send returned %zd", outlen);
+ result = CURLE_SEND_ERROR;
+ goto out;
}
- } while(1);
- /* time until the next timeout event, as nanoseconds. */
- timeout_ns = quiche_conn_timeout_as_nanos(ctx->qconn);
- if(timeout_ns)
- /* expire uses milliseconds */
- Curl_expire(data, (timeout_ns + 999999) / 1000000, EXPIRE_QUIC);
+ /* send the pktbuf *before* the last addition */
+ result = vquic_send_packet(cf, data, &ctx->q, ctx->q.pktbuf,
+ outlen, gsolen, &sent);
+ ++pktcnt;
+ total_len += outlen;
+ if(result) {
+ if(result == CURLE_AGAIN) {
+ /* blocked, add the pktbuf *before* and *at* the last addition
+ * separately to the blocked packages */
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] egress, pushing blocked "
+ "packet with %zd bytes", stream3_id, outlen));
+ vquic_push_blocked_pkt(cf, &ctx->q, ctx->q.pktbuf, outlen, gsolen);
+ Curl_expire(data, 1, EXPIRE_QUIC);
+ return CURLE_OK;
+ }
+ goto out;
+ }
+ }
- return CURLE_OK;
+out:
+ timeout_ns = quiche_conn_timeout_as_nanos(ctx->qconn);
+ if(timeout_ns % 1000000)
+ timeout_ns += 1000000;
+ /* expire resolution is milliseconds */
+ Curl_expire(data, (timeout_ns / 1000000), EXPIRE_QUIC);
+ if(pktcnt)
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] egress, sent %zd packets "
+ "with %zd bytes", stream3_id, pktcnt, total_len));
+ return result;
}
-static ssize_t cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
- char *buf, size_t len, CURLcode *err)
+static ssize_t recv_closed_stream(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ CURLcode *err)
{
- struct cf_quiche_ctx *ctx = cf->ctx;
- ssize_t recvd = -1;
- ssize_t rcode;
- quiche_h3_event *ev;
struct HTTP *stream = data->req.p.http;
+ ssize_t nread = -1;
- *err = CURLE_AGAIN;
- /* process any pending events for `data` first. if there are,
- * return so the transfer can handle those. We do not want to
- * progress ingress while events are pending here. */
- recvd = h3_process_pending(cf, data, buf, len, err);
- if(recvd < 0) {
- goto out;
- }
- else if(recvd > 0) {
- *err = CURLE_OK;
+ if(stream->reset) {
+ failf(data,
+ "HTTP/3 stream %" PRId64 " reset by server", stream->stream3_id);
+ *err = stream->h3_got_header? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR;
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, was reset -> %d",
+ stream->stream3_id, *err));
goto out;
}
- recvd = -1;
- if(cf_process_ingress(cf, data)) {
- DEBUGF(LOG_CF(data, cf, "h3_stream_recv returns on ingress"));
+ if(!stream->h3_got_header) {
+ failf(data,
+ "HTTP/3 stream %" PRId64 " was closed cleanly, but before getting"
+ " all response header fields, treated as error",
+ stream->stream3_id);
+ /* *err = CURLE_PARTIAL_FILE; */
*err = CURLE_RECV_ERROR;
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, closed incomplete"
+ " -> %d", stream->stream3_id, *err));
goto out;
}
-
- if(stream->h3_recving_data) {
- /* body receiving state */
- rcode = quiche_h3_recv_body(ctx->h3c, ctx->qconn, stream->stream3_id,
- (unsigned char *)buf, len);
- if(rcode <= 0) {
- stream->h3_recving_data = FALSE;
- /* fall through into the while loop below */
- }
- else {
- *err = CURLE_OK;
- recvd = rcode;
- goto out;
- }
+ else {
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv, closed ok"
+ " -> %d", stream->stream3_id, *err));
}
+ *err = CURLE_OK;
+ nread = 0;
- while(recvd < 0) {
+out:
+ return nread;
+}
+
+static CURLcode cf_poll_events(struct Curl_cfilter *cf,
+ struct Curl_easy *data)
+{
+ struct cf_quiche_ctx *ctx = cf->ctx;
+ struct HTTP *stream = data->req.p.http;
+ quiche_h3_event *ev;
+
+ /* Take in the events and distribute them to the transfers. */
+ while(1) {
int64_t stream3_id = quiche_h3_conn_poll(ctx->h3c, ctx->qconn, &ev);
- if(stream3_id < 0)
+ if(stream3_id < 0) {
/* nothing more to do */
break;
+ }
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] recv, queue event %s "
+ "for [h3sid=%"PRId64"]",
+ stream? stream->stream3_id : -1, cf_ev_name(ev),
+ stream3_id));
+ if(h3_add_event(cf, data, stream3_id, ev) != CURLE_OK) {
+ return CURLE_OUT_OF_MEMORY;
+ }
+ }
+ return CURLE_OK;
+}
+
+static ssize_t cf_recv_transfer_data(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ char *buf, size_t len,
+ CURLcode *err)
+{
+ struct HTTP *stream = data->req.p.http;
+ ssize_t recvd = -1;
+ size_t offset = 0;
- if(stream3_id == stream->stream3_id) {
- recvd = h3_process_event(cf, data, buf, len, stream3_id, ev, err);
- quiche_h3_event_free(ev);
+ if(stream->h3_recving_data) {
+ /* try receiving body first */
+ recvd = cf_recv_body(cf, data, buf, len, err);
+ if(recvd < 0) {
+ if(*err != CURLE_AGAIN)
+ return -1;
+ recvd = 0;
}
- else {
- size_t qlen;
- /* event for another transfer, preserver for later */
- DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv, queue event "
- "for h3[%"PRId64"]", stream->stream3_id, stream3_id));
- if(h3_add_event(cf, data, stream3_id, ev, &qlen) != CURLE_OK) {
- *err = CURLE_OUT_OF_MEMORY;
- goto out;
- }
- if(qlen > 20) {
- Curl_expire(data, 0, EXPIRE_QUIC);
- break;
- }
+ if(recvd > 0) {
+ offset = recvd;
}
}
- if(cf_flush_egress(cf, data)) {
- DEBUGF(LOG_CF(data, cf, "recv(), flush egress failed"));
- *err = CURLE_SEND_ERROR;
- recvd = -1;
- goto out;
+ if(offset < len && stream->pending) {
+ /* process any pending events for `data` first. if there are,
+ * return so the transfer can handle those. We do not want to
+ * progress ingress while events are pending here. */
+ recvd = h3_process_pending(cf, data, buf + offset, len - offset, err);
+ if(recvd < 0) {
+ if(*err != CURLE_AGAIN)
+ return -1;
+ recvd = 0;
+ }
+ if(recvd > 0) {
+ offset += recvd;
+ }
}
- if(recvd >= 0) {
- /* Get this called again to drain the event queue */
- Curl_expire(data, 0, EXPIRE_QUIC);
+ if(offset) {
*err = CURLE_OK;
+ return offset;
}
- else if(stream->closed) {
- *err = CURLE_OK;
+ *err = CURLE_AGAIN;
+ return 0;
+}
+
+static ssize_t cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
+ char *buf, size_t len, CURLcode *err)
+{
+ struct HTTP *stream = data->req.p.http;
+ ssize_t recvd = -1;
+
+ *err = CURLE_AGAIN;
+
+ recvd = cf_recv_transfer_data(cf, data, buf, len, err);
+ if(recvd)
+ goto out;
+ if(stream->closed) {
+ recvd = recv_closed_stream(cf, data, err);
+ goto out;
+ }
+
+ /* we did get nothing from the quiche buffers or pending events.
+ * Take in more data from the connection, any error is fatal */
+ if(cf_process_ingress(cf, data)) {
+ DEBUGF(LOG_CF(data, cf, "h3_stream_recv returns on ingress"));
+ *err = CURLE_RECV_ERROR;
recvd = -1;
+ goto out;
+ }
+ /* poll quiche and distribute the events to the transfers */
+ *err = cf_poll_events(cf, data);
+ if(*err) {
+ recvd = -1;
+ goto out;
}
+ /* try to receive again for this transfer */
+ recvd = cf_recv_transfer_data(cf, data, buf, len, err);
+ if(recvd)
+ goto out;
+ if(stream->closed) {
+ recvd = recv_closed_stream(cf, data, err);
+ goto out;
+ }
+ recvd = -1;
+ *err = CURLE_AGAIN;
+ data->state.drain = 0;
+
out:
- data->state.drain = (recvd >= 0) ? 1 : 0;
- DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] recv -> %ld, err=%d",
- stream->stream3_id, (long)recvd, *err));
+ if(cf_flush_egress(cf, data)) {
+ DEBUGF(LOG_CF(data, cf, "cf_recv, flush egress failed"));
+ *err = CURLE_SEND_ERROR;
+ return -1;
+ }
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] cf_recv -> %zd, err=%d",
+ stream->stream3_id, recvd, *err));
+ if(recvd > 0)
+ notify_drain(cf, data);
return recvd;
}
CURLcode result = CURLE_OK;
struct h2h3req *hreq = NULL;
- DEBUGF(LOG_CF(data, cf, "cf_http_request %s", data->state.url));
stream->h3req = TRUE; /* send off! */
+ stream->closed = FALSE;
+ stream->reset = FALSE;
result = Curl_pseudo_headers(data, mem, len, NULL, &hreq);
if(result)
/* data sending without specifying the data amount up front */
stream->upload_left = -1; /* unknown, but not zero */
+ stream->upload_done = !stream->upload_left;
stream3_id = quiche_h3_send_request(ctx->h3c, ctx->qconn, nva, nheader,
- stream->upload_left ? FALSE: TRUE);
- DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] send request %s, upload=%zu",
- stream3_id, data->state.url, stream->upload_left));
+ stream->upload_done);
break;
default:
+ stream->upload_left = 0;
+ stream->upload_done = TRUE;
stream3_id = quiche_h3_send_request(ctx->h3c, ctx->qconn, nva, nheader,
TRUE);
- DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] send request %s",
- stream3_id, data->state.url));
break;
}
Curl_safefree(nva);
if(stream3_id < 0) {
- DEBUGF(LOG_CF(data, cf, "quiche_h3_send_request returned %ld",
- (long)stream3_id));
+ if(QUICHE_H3_ERR_STREAM_BLOCKED == stream3_id) {
+ DEBUGF(LOG_CF(data, cf, "send_request(%s, body_len=%ld) rejected "
+ "with H3_ERR_STREAM_BLOCKED",
+ data->state.url, (long)stream->upload_left));
+ result = CURLE_AGAIN;
+ goto fail;
+ }
+ else {
+ DEBUGF(LOG_CF(data, cf, "send_request(%s, body_len=%ld) -> %" PRId64,
+ data->state.url, (long)stream->upload_left, stream3_id));
+ }
result = CURLE_SEND_ERROR;
goto fail;
}
- DEBUGF(LOG_CF(data, cf, "Using HTTP/3 Stream ID: %"PRId64, stream3_id));
stream->stream3_id = stream3_id;
+ infof(data, "Using HTTP/3 Stream ID: %" PRId64 " (easy handle %p)",
+ stream3_id, (void *)data);
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] opened for %s",
+ stream3_id, data->state.url));
Curl_pseudo_free(hreq);
return CURLE_OK;
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct HTTP *stream = data->req.p.http;
- ssize_t sent;
+ ssize_t nwritten;
+
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_send(len=%zu) start",
+ stream->h3req? stream->stream3_id : -1, len));
+ *err = cf_process_ingress(cf, data);
+ if(*err)
+ return -1;
- DEBUGF(LOG_CF(data, cf, "cf_quiche_send(len=%zu) %s", len, data->state.url));
if(!stream->h3req) {
CURLcode result = cf_http_request(cf, data, buf, len);
if(result) {
- *err = CURLE_SEND_ERROR;
+ *err = result;
return -1;
}
- sent = len;
+ nwritten = len;
}
else {
- sent = quiche_h3_send_body(ctx->h3c, ctx->qconn, stream->stream3_id,
- (uint8_t *)buf, len, FALSE);
- if(sent == QUICHE_H3_ERR_DONE) {
- sent = 0;
+ nwritten = quiche_h3_send_body(ctx->h3c, ctx->qconn, stream->stream3_id,
+ (uint8_t *)buf, len, FALSE);
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] send body(len=%zu) -> %zd",
+ stream->stream3_id, len, nwritten));
+ if(nwritten == QUICHE_H3_ERR_DONE) {
+ /* no error, nothing to do (flow control?) */
+ *err = CURLE_AGAIN;
+ nwritten = -1;
}
- else if(sent == QUICHE_H3_TRANSPORT_ERR_FINAL_SIZE) {
+ else if(nwritten == QUICHE_H3_TRANSPORT_ERR_FINAL_SIZE) {
DEBUGF(LOG_CF(data, cf, "send_body(len=%zu) -> exceeds size", len));
*err = CURLE_SEND_ERROR;
- return -1;
+ nwritten = -1;
}
- else if(sent < 0) {
- DEBUGF(LOG_CF(data, cf, "send_body(len=%zu) -> %zd", len, sent));
+ else if(nwritten < 0) {
+ DEBUGF(LOG_CF(data, cf, "send_body(len=%zu) -> SEND_ERROR", len));
*err = CURLE_SEND_ERROR;
- return -1;
+ nwritten = -1;
+ }
+ else {
+ *err = CURLE_OK;
}
}
return -1;
}
- *err = CURLE_OK;
- return sent;
+ return nwritten;
+}
+
+static bool stream_is_writeable(struct Curl_cfilter *cf,
+ struct Curl_easy *data)
+{
+ struct cf_quiche_ctx *ctx = cf->ctx;
+ struct HTTP *stream = data->req.p.http;
+
+ /* surely, there must be a better way */
+ quiche_stream_iter *qiter = quiche_conn_writable(ctx->qconn);
+ if(qiter) {
+ uint64_t stream_id;
+ while(quiche_stream_iter_next(qiter, &stream_id)) {
+ if(stream_id == (uint64_t)stream->stream3_id)
+ return TRUE;
+ }
+ quiche_stream_iter_free(qiter);
+ }
+ return FALSE;
}
static int cf_quiche_get_select_socks(struct Curl_cfilter *cf,
struct SingleRequest *k = &data->req;
int rv = GETSOCK_BLANK;
- socks[0] = ctx->sockfd;
+ socks[0] = ctx->q.sockfd;
/* in an HTTP/3 connection we can basically always get a frame so we should
always be ready for one */
rv |= GETSOCK_READSOCK(0);
/* we're still uploading or the HTTP/3 layer wants to send data */
- if((k->keepon & (KEEP_SEND|KEEP_SEND_PAUSE)) == KEEP_SEND)
+ if(((k->keepon & (KEEP_SEND|KEEP_SEND_PAUSE)) == KEEP_SEND)
+ && stream_is_writeable(cf, data))
rv |= GETSOCK_WRITESOCK(0);
return rv;
static bool cf_quiche_data_pending(struct Curl_cfilter *cf,
const struct Curl_easy *data)
{
- struct cf_quiche_ctx *ctx = cf->ctx;
struct HTTP *stream = data->req.p.http;
- struct h3_event_node *node;
- for(node = ctx->pending; node; node = node->next) {
- if(node->stream3_id == stream->stream3_id) {
- DEBUGF(LOG_CF((struct Curl_easy *)data, cf,
- "h3[%"PRId64"] has data pending", stream->stream3_id));
- return TRUE;
- }
+ if(stream->pending) {
+ DEBUGF(LOG_CF((struct Curl_easy *)data, cf,
+ "[h3sid=%"PRId64"] has event pending", stream->stream3_id));
+ return TRUE;
+ }
+ if(stream->h3_recving_data) {
+ DEBUGF(LOG_CF((struct Curl_easy *)data, cf,
+ "[h3sid=%"PRId64"] is receiving DATA", stream->stream3_id));
+ return TRUE;
+ }
+ if(data->state.drain) {
+ DEBUGF(LOG_CF((struct Curl_easy *)data, cf,
+ "[h3sid=%"PRId64"] is draining", stream->stream3_id));
+ return TRUE;
}
return FALSE;
}
(void)arg1;
(void)arg2;
switch(event) {
+ case CF_CTRL_DATA_DONE: {
+ struct HTTP *stream = data->req.p.http;
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] easy handle is %s",
+ stream->stream3_id, arg1? "cancelled" : "done"));
+ h3_clear_pending(data);
+ break;
+ }
case CF_CTRL_DATA_DONE_SEND: {
struct HTTP *stream = data->req.p.http;
ssize_t sent;
stream->upload_done = TRUE;
sent = quiche_h3_send_body(ctx->h3c, ctx->qconn, stream->stream3_id,
NULL, 0, TRUE);
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] send_body FINISHED",
+ stream->stream3_id));
if(sent < 0)
return CURLE_SEND_ERROR;
break;
}
- case CF_CTRL_DATA_DONE: {
- struct HTTP *stream = data->req.p.http;
- DEBUGF(LOG_CF(data, cf, "h3[%"PRId64"] easy handle is %s",
- stream->stream3_id, arg1? "cancelled" : "done"));
+ case CF_CTRL_DATA_IDLE:
+ /* anything to do? */
break;
- }
case CF_CTRL_CONN_REPORT_STATS:
- if(cf->sockindex == FIRSTSOCKET)
- Curl_pgrsTimeWas(data, TIMER_APPCONNECT, ctx->handshake_done);
+ if(cf->sockindex == FIRSTSOCKET) {
+ if(ctx->got_first_byte)
+ Curl_pgrsTimeWas(data, TIMER_CONNECT, ctx->first_byte_at);
+ Curl_pgrsTimeWas(data, TIMER_APPCONNECT, ctx->handshake_at);
+ }
break;
default:
break;
CURLcode result;
const struct Curl_sockaddr_ex *sockaddr;
- result = Curl_cf_socket_peek(cf->next, data, &ctx->sockfd,
- &sockaddr, NULL, NULL, NULL, NULL);
- if(result)
- return result;
- DEBUGASSERT(ctx->sockfd != CURL_SOCKET_BAD);
+ DEBUGASSERT(ctx->q.sockfd != CURL_SOCKET_BAD);
#ifdef DEBUG_QUICHE
/* initialize debug log callback only once */
}
#endif
+ result = vquic_ctx_init(&ctx->q, MAX_UDP_PAYLOAD_SIZE * MAX_PKT_BURST);
+ if(result)
+ return result;
+
ctx->cfg = quiche_config_new(QUICHE_PROTOCOL_VERSION);
if(!ctx->cfg) {
failf(data, "can't create quiche config");
if(result)
return result;
- ctx->local_addrlen = sizeof(ctx->local_addr);
- rv = getsockname(ctx->sockfd, (struct sockaddr *)&ctx->local_addr,
- &ctx->local_addrlen);
+ Curl_cf_socket_peek(cf->next, data, &ctx->q.sockfd,
+ &sockaddr, NULL, NULL, NULL, NULL);
+ ctx->q.local_addrlen = sizeof(ctx->q.local_addr);
+ rv = getsockname(ctx->q.sockfd, (struct sockaddr *)&ctx->q.local_addr,
+ &ctx->q.local_addrlen);
if(rv == -1)
return CURLE_QUIC_CONNECT_ERROR;
ctx->qconn = quiche_conn_new_with_tls((const uint8_t *)ctx->scid,
sizeof(ctx->scid), NULL, 0,
- (struct sockaddr *)&ctx->local_addr,
- ctx->local_addrlen,
+ (struct sockaddr *)&ctx->q.local_addr,
+ ctx->q.local_addrlen,
&sockaddr->sa_addr, sockaddr->addrlen,
ctx->cfg, ctx->ssl, false);
if(!ctx->qconn) {
result = cf_connect_start(cf, data);
if(result)
goto out;
- ctx->connect_started = now;
+ ctx->started_at = now;
+ result = cf_flush_egress(cf, data);
+ /* we do not expect to be able to recv anything yet */
+ goto out;
}
result = cf_process_ingress(cf, data);
if(quiche_conn_is_established(ctx->qconn)) {
DEBUGF(LOG_CF(data, cf, "handshake complete after %dms",
- (int)Curl_timediff(now, ctx->connect_started)));
- ctx->handshake_done = now;
+ (int)Curl_timediff(now, ctx->started_at)));
+ ctx->handshake_at = now;
result = cf_verify_peer(cf, data);
if(!result) {
DEBUGF(LOG_CF(data, cf, "peer verified"));
return CURLE_OK;
}
case CF_QUERY_CONNECT_REPLY_MS:
- *pres1 = ctx->first_reply_ms;
- DEBUGF(LOG_CF(data, cf, "query connect reply: %dms", *pres1));
+ if(ctx->got_first_byte) {
+ timediff_t ms = Curl_timediff(ctx->first_byte_at, ctx->started_at);
+ *pres1 = (ms < INT_MAX)? (int)ms : INT_MAX;
+ }
+ else
+ *pres1 = -1;
return CURLE_OK;
-
default:
break;
}
#endif
#include "urldata.h"
#include "dynbuf.h"
+#include "cfilters.h"
#include "curl_log.h"
#include "curl_msh3.h"
#include "curl_ngtcp2.h"
#include "curl_quiche.h"
#include "vquic.h"
+#include "vquic_int.h"
/* The last 3 #include files should be in this order */
#include "curl_printf.h"
#endif
}
+CURLcode vquic_ctx_init(struct cf_quic_ctx *qctx, size_t pktbuflen)
+{
+ qctx->num_blocked_pkt = 0;
+ qctx->num_blocked_pkt_sent = 0;
+ memset(&qctx->blocked_pkt, 0, sizeof(qctx->blocked_pkt));
+
+ qctx->pktbuflen = pktbuflen;
+ qctx->pktbuf = malloc(qctx->pktbuflen);
+ if(!qctx->pktbuf)
+ return CURLE_OUT_OF_MEMORY;
+
+#if defined(__linux__) && defined(UDP_SEGMENT) && defined(HAVE_SENDMSG)
+ qctx->no_gso = FALSE;
+#else
+ qctx->no_gso = TRUE;
+#endif
+
+ return CURLE_OK;
+}
+
+void vquic_ctx_free(struct cf_quic_ctx *qctx)
+{
+ free(qctx->pktbuf);
+ qctx->pktbuf = NULL;
+}
+
+static CURLcode send_packet_no_gso(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct cf_quic_ctx *qctx,
+ const uint8_t *pkt, size_t pktlen,
+ size_t gsolen, size_t *psent);
+
+static CURLcode do_sendmsg(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct cf_quic_ctx *qctx,
+ const uint8_t *pkt, size_t pktlen, size_t gsolen,
+ size_t *psent)
+{
+#ifdef HAVE_SENDMSG
+ struct iovec msg_iov;
+ struct msghdr msg = {0};
+ ssize_t sent;
+#if defined(__linux__) && defined(UDP_SEGMENT)
+ uint8_t msg_ctrl[32];
+ struct cmsghdr *cm;
+#endif
+
+ *psent = 0;
+ msg_iov.iov_base = (uint8_t *)pkt;
+ msg_iov.iov_len = pktlen;
+ msg.msg_iov = &msg_iov;
+ msg.msg_iovlen = 1;
+
+#if defined(__linux__) && defined(UDP_SEGMENT)
+ if(pktlen > gsolen) {
+ /* Only set this, when we need it. macOS, for example,
+ * does not seem to like a msg_control of length 0. */
+ msg.msg_control = msg_ctrl;
+ assert(sizeof(msg_ctrl) >= CMSG_SPACE(sizeof(uint16_t)));
+ msg.msg_controllen = CMSG_SPACE(sizeof(uint16_t));
+ cm = CMSG_FIRSTHDR(&msg);
+ cm->cmsg_level = SOL_UDP;
+ cm->cmsg_type = UDP_SEGMENT;
+ cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
+ *(uint16_t *)(void *)CMSG_DATA(cm) = gsolen & 0xffff;
+ }
+#endif
+
+
+ while((sent = sendmsg(qctx->sockfd, &msg, 0)) == -1 && SOCKERRNO == EINTR)
+ ;
+
+ if(sent == -1) {
+ switch(SOCKERRNO) {
+ case EAGAIN:
+#if EAGAIN != EWOULDBLOCK
+ case EWOULDBLOCK:
+#endif
+ return CURLE_AGAIN;
+ case EMSGSIZE:
+ /* UDP datagram is too large; caused by PMTUD. Just let it be lost. */
+ break;
+ case EIO:
+ if(pktlen > gsolen) {
+ /* GSO failure */
+ failf(data, "sendmsg() returned %zd (errno %d); disable GSO", sent,
+ SOCKERRNO);
+ qctx->no_gso = TRUE;
+ return send_packet_no_gso(cf, data, qctx, pkt, pktlen, gsolen, psent);
+ }
+ /* FALLTHROUGH */
+ default:
+ failf(data, "sendmsg() returned %zd (errno %d)", sent, SOCKERRNO);
+ return CURLE_SEND_ERROR;
+ }
+ }
+ else {
+ assert(pktlen == (size_t)sent);
+ }
+#else
+ ssize_t sent;
+ (void)gsolen;
+
+ *psent = 0;
+
+ while((sent = send(qctx->sockfd, (const char *)pkt, pktlen, 0)) == -1 &&
+ SOCKERRNO == EINTR)
+ ;
+
+ if(sent == -1) {
+ if(SOCKERRNO == EAGAIN || SOCKERRNO == EWOULDBLOCK) {
+ return CURLE_AGAIN;
+ }
+ else {
+ failf(data, "send() returned %zd (errno %d)", sent, SOCKERRNO);
+ if(SOCKERRNO != EMSGSIZE) {
+ return CURLE_SEND_ERROR;
+ }
+ /* UDP datagram is too large; caused by PMTUD. Just let it be
+ lost. */
+ }
+ }
+#endif
+ (void)cf;
+ *psent = pktlen;
+
+ return CURLE_OK;
+}
+
+static CURLcode send_packet_no_gso(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct cf_quic_ctx *qctx,
+ const uint8_t *pkt, size_t pktlen,
+ size_t gsolen, size_t *psent)
+{
+ const uint8_t *p, *end = pkt + pktlen;
+ size_t sent;
+
+ *psent = 0;
+
+ for(p = pkt; p < end; p += gsolen) {
+ size_t len = CURLMIN(gsolen, (size_t)(end - p));
+ CURLcode curlcode = do_sendmsg(cf, data, qctx, p, len, len, &sent);
+ if(curlcode != CURLE_OK) {
+ return curlcode;
+ }
+ *psent += sent;
+ }
+
+ return CURLE_OK;
+}
+
+CURLcode vquic_send_packet(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct cf_quic_ctx *qctx,
+ const uint8_t *pkt, size_t pktlen, size_t gsolen,
+ size_t *psent)
+{
+ if(qctx->no_gso && pktlen > gsolen) {
+ return send_packet_no_gso(cf, data, qctx, pkt, pktlen, gsolen, psent);
+ }
+
+ return do_sendmsg(cf, data, qctx, pkt, pktlen, gsolen, psent);
+}
+
+
+
+void vquic_push_blocked_pkt(struct Curl_cfilter *cf,
+ struct cf_quic_ctx *qctx,
+ const uint8_t *pkt, size_t pktlen, size_t gsolen)
+{
+ struct vquic_blocked_pkt *blkpkt;
+
+ (void)cf;
+ assert(qctx->num_blocked_pkt <
+ sizeof(qctx->blocked_pkt) / sizeof(qctx->blocked_pkt[0]));
+
+ blkpkt = &qctx->blocked_pkt[qctx->num_blocked_pkt++];
+
+ blkpkt->pkt = pkt;
+ blkpkt->pktlen = pktlen;
+ blkpkt->gsolen = gsolen;
+}
+
+CURLcode vquic_send_blocked_pkt(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct cf_quic_ctx *qctx)
+{
+ size_t sent;
+ CURLcode curlcode;
+ struct vquic_blocked_pkt *blkpkt;
+
+ (void)cf;
+ for(; qctx->num_blocked_pkt_sent < qctx->num_blocked_pkt;
+ ++qctx->num_blocked_pkt_sent) {
+ blkpkt = &qctx->blocked_pkt[qctx->num_blocked_pkt_sent];
+ curlcode = vquic_send_packet(cf, data, qctx, blkpkt->pkt,
+ blkpkt->pktlen, blkpkt->gsolen, &sent);
+
+ if(curlcode) {
+ if(curlcode == CURLE_AGAIN) {
+ blkpkt->pkt += sent;
+ blkpkt->pktlen -= sent;
+ }
+ return curlcode;
+ }
+ }
+
+ qctx->num_blocked_pkt = 0;
+ qctx->num_blocked_pkt_sent = 0;
+
+ return CURLE_OK;
+}
+
/*
* If the QLOGDIR environment variable is set, open and return a file
* descriptor to write the log to.
#ifdef ENABLE_QUIC
+struct vquic_blocked_pkt {
+ const uint8_t *pkt;
+ size_t pktlen;
+ size_t gsolen;
+};
+
+struct cf_quic_ctx {
+ curl_socket_t sockfd;
+ struct sockaddr_storage local_addr;
+ socklen_t local_addrlen;
+ struct vquic_blocked_pkt blocked_pkt[2];
+ uint8_t *pktbuf;
+ /* the number of entries in blocked_pkt */
+ size_t num_blocked_pkt;
+ size_t num_blocked_pkt_sent;
+ /* the packets blocked by sendmsg (EAGAIN or EWOULDBLOCK) */
+ size_t pktbuflen;
+ /* the number of processed entries in blocked_pkt */
+ bool no_gso;
+};
+
+CURLcode vquic_ctx_init(struct cf_quic_ctx *qctx, size_t pktbuflen);
+void vquic_ctx_free(struct cf_quic_ctx *qctx);
+
+CURLcode vquic_send_packet(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct cf_quic_ctx *qctx,
+ const uint8_t *pkt, size_t pktlen, size_t gsolen,
+ size_t *psent);
+
+void vquic_push_blocked_pkt(struct Curl_cfilter *cf,
+ struct cf_quic_ctx *qctx,
+ const uint8_t *pkt, size_t pktlen, size_t gsolen);
+
+CURLcode vquic_send_blocked_pkt(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct cf_quic_ctx *qctx);
+
+
#endif /* !ENABLE_QUIC */
#endif /* HEADER_CURL_VQUIC_QUIC_INT_H */
[caddy]
caddy = @CADDY@
-port = 5004
+http_port = 5003
+https_port = 5004
self.info(f'\n')
return props
- def downloads(self, proto: str) -> Dict[str, Any]:
+ def downloads(self, proto: str, test_httpd: bool = True,
+ test_caddy: bool = True) -> Dict[str, Any]:
scores = {}
- if proto == 'h3':
- port = self.env.h3_port
- via = 'nghttpx'
- descr = f'port {port}, proxying httpd'
- else:
- port = self.env.https_port
- via = 'httpd'
- descr = f'port {port}'
- self.info('httpd downloads\n')
- self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='score1.data', fsize=1024*1024)
- url1 = f'https://{self.env.domain1}:{port}/score1.data'
- self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='score10.data', fsize=10*1024*1024)
- url10 = f'https://{self.env.domain1}:{port}/score10.data'
- self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='score100.data', fsize=100*1024*1024)
- url100 = f'https://{self.env.domain1}:{port}/score100.data'
- scores[via] = {
- 'description': descr,
- '1MB-local': self.download_url(url=url1, proto=proto, count=50),
- '10MB-local': self.download_url(url=url10, proto=proto, count=50),
- '100MB-local': self.download_url(url=url100, proto=proto, count=50),
- }
- if self.caddy:
- port = self.env.caddy_port
+ if test_httpd:
+ if proto == 'h3':
+ port = self.env.h3_port
+ via = 'nghttpx'
+ descr = f'port {port}, proxying httpd'
+ else:
+ port = self.env.https_port
+ via = 'httpd'
+ descr = f'port {port}'
+ self.info(f'{via} downloads\n')
+ self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='score1.data', fsize=1024*1024)
+ url1 = f'https://{self.env.domain1}:{port}/score1.data'
+ self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='score10.data', fsize=10*1024*1024)
+ url10 = f'https://{self.env.domain1}:{port}/score10.data'
+ self._make_docs_file(docs_dir=self.httpd.docs_dir, fname='score100.data', fsize=100*1024*1024)
+ url100 = f'https://{self.env.domain1}:{port}/score100.data'
+ scores[via] = {
+ 'description': descr,
+ '1MB-local': self.download_url(url=url1, proto=proto, count=50),
+ '10MB-local': self.download_url(url=url10, proto=proto, count=50),
+ '100MB-local': self.download_url(url=url100, proto=proto, count=50),
+ }
+ if test_caddy and self.caddy:
+ port = self.caddy.port
via = 'caddy'
descr = f'port {port}'
self.info('caddy downloads\n')
}
return scores
- def score_proto(self, proto: str, handshakes: bool = True, downloads: bool = True):
+ def score_proto(self, proto: str,
+ handshakes: bool = True,
+ downloads: bool = True,
+ test_httpd: bool = True,
+ test_caddy: bool = True):
self.info(f"scoring {proto}\n")
p = {}
if proto == 'h3':
if handshakes:
score['handshakes'] = self.handshakes(proto=proto)
if downloads:
- score['downloads'] = self.downloads(proto=proto)
+ score['downloads'] = self.downloads(proto=proto,
+ test_httpd=test_httpd,
+ test_caddy=test_caddy)
self.info("\n")
return score
help="print text instead of json")
parser.add_argument("-d", "--downloads", action='store_true', default=False,
help="evaluate downloads only")
+ parser.add_argument("--httpd", action='store_true', default=False,
+ help="evaluate httpd server only")
+ parser.add_argument("--caddy", action='store_true', default=False,
+ help="evaluate caddy server only")
parser.add_argument("protocols", nargs='*', help="Name(s) of protocol to score")
args = parser.parse_args()
protocols = args.protocols if len(args.protocols) else ['h2', 'h3']
handshakes = True
downloads = True
+ test_httpd = True
+ test_caddy = True
if args.downloads:
handshakes = False
+ if args.caddy:
+ test_caddy = True
+ test_httpd = False
+ if args.httpd:
+ test_caddy = False
+ test_httpd = True
rv = 0
self.env = Env()
assert self.caddy.start()
for p in protocols:
- score = self.score_proto(proto=p, handshakes=handshakes, downloads=downloads)
+ score = self.score_proto(proto=p, handshakes=handshakes,
+ downloads=downloads,
+ test_caddy=test_caddy,
+ test_httpd=test_httpd)
if args.text:
self.print_score(score)
else:
def _class_scope(self, env, httpd, nghttpx):
if env.have_h3():
nghttpx.start_if_needed()
- fpath = os.path.join(httpd.docs_dir, 'data-1mb.data')
+
+ def _make_docs_file(self, docs_dir: str, fname: str, fsize: int):
+ fpath = os.path.join(docs_dir, fname)
data1k = 1024*'x'
+ flen = 0
with open(fpath, 'w') as fd:
- fsize = 0
- while fsize < 1024*1024:
+ while flen < fsize:
fd.write(data1k)
- fsize += len(data1k)
+ flen += len(data1k)
+ return flen
+
+ @pytest.fixture(autouse=True, scope='class')
+ def _class_scope(self, env, httpd):
+ self._make_docs_file(docs_dir=httpd.docs_dir, fname='data1.data', fsize=1024*1024)
+ self._make_docs_file(docs_dir=httpd.docs_dir, fname='data10.data', fsize=10*1024*1024)
+ self._make_docs_file(docs_dir=httpd.docs_dir, fname='data100.data', fsize=100*1024*1024)
# download 1 file
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_02_08_1MB_serial(self, env: Env,
httpd, nghttpx, repeat, proto):
- count = 2
- urln = f'https://{env.authority_for(env.domain1, proto)}/data-1mb.data?[0-{count-1}]'
+ count = 20
+ urln = f'https://{env.authority_for(env.domain1, proto)}/data1.data?[0-{count-1}]'
curl = CurlClient(env=env)
r = curl.http_download(urls=[urln], alpn_proto=proto)
assert r.exit_code == 0
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_02_09_1MB_parallel(self, env: Env,
httpd, nghttpx, repeat, proto):
- count = 2
- urln = f'https://{env.authority_for(env.domain1, proto)}/data-1mb.data?[0-{count-1}]'
+ count = 20
+ urln = f'https://{env.authority_for(env.domain1, proto)}/data1.data?[0-{count-1}]'
+ curl = CurlClient(env=env)
+ r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[
+ '--parallel'
+ ])
+ assert r.exit_code == 0
+ r.check_stats(count=count, exp_status=200)
+
+ @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+ def test_02_10_10MB_serial(self, env: Env,
+ httpd, nghttpx, repeat, proto):
+ count = 20
+ urln = f'https://{env.authority_for(env.domain1, proto)}/data10.data?[0-{count-1}]'
+ curl = CurlClient(env=env)
+ r = curl.http_download(urls=[urln], alpn_proto=proto)
+ assert r.exit_code == 0
+ r.check_stats(count=count, exp_status=200)
+
+ @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+ def test_02_11_10MB_parallel(self, env: Env,
+ httpd, nghttpx, repeat, proto):
+ count = 20
+ urln = f'https://{env.authority_for(env.domain1, proto)}/data10.data?[0-{count-1}]'
curl = CurlClient(env=env)
r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[
'--parallel'
proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
+ if proto == 'h3' and env.curl_uses_lib('quiche'):
+ pytest.skip("quiche not reliable, sometimes reports success")
count = 5
curl = CurlClient(env=env)
urln = f'https://{env.authority_for(env.domain1, proto)}' \
respdata = open(curl.response_file(i)).readlines()
assert respdata == [data]
+ # upload data parallel, check that they were echoed
+ @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+ def test_07_11_upload_parallel(self, env: Env, httpd, nghttpx, repeat, proto):
+ if proto == 'h3' and not env.have_h3():
+ pytest.skip("h3 not supported")
+ count = 50
+ data = '0123456789'
+ curl = CurlClient(env=env)
+ url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-{count-1}]'
+ r = curl.http_upload(urls=[url], data=data, alpn_proto=proto,
+ extra_args=['--parallel'])
+ assert r.exit_code == 0, f'{r}'
+ r.check_stats(count=count, exp_status=200)
+ for i in range(count):
+ respdata = open(curl.response_file(i)).readlines()
+ assert respdata == [data]
+
# upload large data sequentially, check that this is what was echoed
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
- def test_07_11_upload_seq_large(self, env: Env, httpd, nghttpx, repeat, proto):
+ def test_07_20_upload_seq_large(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
fdata = os.path.join(env.gen_dir, 'data-100k')
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
if proto == 'h3' and env.curl_uses_lib('quiche'):
- pytest.skip("quiche stalls on parallel, large uploads")
+ pytest.skip("quiche stalls on parallel, large uploads, unless --trace is used???")
fdata = os.path.join(env.gen_dir, 'data-100k')
- count = 3
+ count = 50
curl = CurlClient(env=env)
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-{count-1}]'
r = curl.http_upload(urls=[url], data=f'@{fdata}', alpn_proto=proto,
--- /dev/null
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#***************************************************************************
+# _ _ ____ _
+# Project ___| | | | _ \| |
+# / __| | | | |_) | |
+# | (__| |_| | _ <| |___
+# \___|\___/|_| \_\_____|
+#
+# Copyright (C) 2008 - 2022, Daniel Stenberg, <daniel@haxx.se>, et al.
+#
+# This software is licensed as described in the file COPYING, which
+# you should have received as part of this distribution. The terms
+# are also available at https://curl.se/docs/copyright.html.
+#
+# You may opt to use, copy, modify, merge, publish, distribute and/or sell
+# copies of the Software, and permit persons to whom the Software is
+# furnished to do so, under the terms of the COPYING file.
+#
+# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
+# KIND, either express or implied.
+#
+# SPDX-License-Identifier: curl
+#
+###########################################################################
+#
+import logging
+import os
+import pytest
+
+from testenv import Env, CurlClient, Caddy
+
+
+log = logging.getLogger(__name__)
+
+
+@pytest.mark.skipif(condition=not Env.has_caddy(), reason=f"missing caddy")
+class TestCaddy:
+
+ @pytest.fixture(autouse=True, scope='class')
+ def caddy(self, env):
+ caddy = Caddy(env=env)
+ assert caddy.start()
+ yield caddy
+ caddy.stop()
+
+ def _make_docs_file(self, docs_dir: str, fname: str, fsize: int):
+ fpath = os.path.join(docs_dir, fname)
+ data1k = 1024*'x'
+ flen = 0
+ with open(fpath, 'w') as fd:
+ while flen < fsize:
+ fd.write(data1k)
+ flen += len(data1k)
+ return flen
+
+ @pytest.fixture(autouse=True, scope='class')
+ def _class_scope(self, env, caddy):
+ self._make_docs_file(docs_dir=caddy.docs_dir, fname='data1.data', fsize=1024*1024)
+ self._make_docs_file(docs_dir=caddy.docs_dir, fname='data10.data', fsize=10*1024*1024)
+ self._make_docs_file(docs_dir=caddy.docs_dir, fname='data100.data', fsize=100*1024*1024)
+
+ # download 1 file
+ @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+ def test_08_01_download_1(self, env: Env, caddy: Caddy, repeat, proto):
+ if proto == 'h3' and not env.have_h3_curl():
+ pytest.skip("h3 not supported in curl")
+ curl = CurlClient(env=env)
+ url = f'https://{env.domain1}:{caddy.port}/data.json'
+ r = curl.http_download(urls=[url], alpn_proto=proto)
+ assert r.exit_code == 0, f'{r}'
+ r.check_stats(count=1, exp_status=200)
+
+ # download 1MB files sequentially
+ @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+ def test_08_02_download_1mb_sequential(self, env: Env, caddy: Caddy,
+ repeat, proto):
+ if proto == 'h3' and not env.have_h3_curl():
+ pytest.skip("h3 not supported in curl")
+ count = 50
+ curl = CurlClient(env=env)
+ urln = f'https://{env.domain1}:{caddy.port}/data1.data?[0-{count-1}]'
+ r = curl.http_download(urls=[urln], alpn_proto=proto)
+ assert r.exit_code == 0
+ r.check_stats(count=count, exp_status=200)
+ # sequential transfers will open 1 connection
+ assert r.total_connects == 1
+
+ # download 1MB files parallel
+ @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+ def test_08_03_download_1mb_parallel(self, env: Env, caddy: Caddy,
+ repeat, proto):
+ if proto == 'h3' and not env.have_h3_curl():
+ pytest.skip("h3 not supported in curl")
+ count = 50
+ curl = CurlClient(env=env)
+ urln = f'https://{env.domain1}:{caddy.port}/data1.data?[0-{count-1}]'
+ r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[
+ '--parallel'
+ ])
+ assert r.exit_code == 0
+ r.check_stats(count=count, exp_status=200)
+ if proto == 'http/1.1':
+ # http/1.1 parallel transfers will open multiple connections
+ assert r.total_connects > 1
+ else:
+ assert r.total_connects == 1
+
+ # download 10MB files sequentially
+ @pytest.mark.parametrize("proto", ['h2', 'h3'])
+ def test_08_04_download_10mb_sequential(self, env: Env, caddy: Caddy,
+ repeat, proto):
+ if proto == 'h3' and not env.have_h3_curl():
+ pytest.skip("h3 not supported in curl")
+ if proto == 'h3' and env.curl_uses_lib('quiche'):
+ pytest.skip("quiche stalls after a certain amount of data")
+ count = 20
+ curl = CurlClient(env=env)
+ urln = f'https://{env.domain1}:{caddy.port}/data10.data?[0-{count-1}]'
+ r = curl.http_download(urls=[urln], alpn_proto=proto)
+ assert r.exit_code == 0
+ r.check_stats(count=count, exp_status=200)
+ # sequential transfers will open 1 connection
+ assert r.total_connects == 1
+
+ # download 10MB files parallel
+ @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+ def test_08_05_download_1mb_parallel(self, env: Env, caddy: Caddy,
+ repeat, proto):
+ if proto == 'h3' and not env.have_h3_curl():
+ pytest.skip("h3 not supported in curl")
+ if proto == 'h3' and env.curl_uses_lib('quiche'):
+ pytest.skip("quiche stalls after a certain amount of data")
+ count = 50
+ curl = CurlClient(env=env)
+ urln = f'https://{env.domain1}:{caddy.port}/data10.data?[0-{count-1}]'
+ r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[
+ '--parallel'
+ ])
+ assert r.exit_code == 0
+ r.check_stats(count=count, exp_status=200)
+ if proto == 'http/1.1':
+ # http/1.1 parallel transfers will open multiple connections
+ assert r.total_connects > 1
+ else:
+ assert r.total_connects == 1
+
def docs_dir(self):
return self._docs_dir
+ @property
+ def port(self) -> str:
+ return self.env.caddy_https_port
+
def clear_logs(self):
self._rmf(self._error_log)
curl = CurlClient(env=self.env, run_dir=self._tmp_dir)
try_until = datetime.now() + timeout
while datetime.now() < try_until:
- check_url = f'https://{self.env.domain1}:{self.env.caddy_port}/'
+ check_url = f'https://{self.env.domain1}:{self.port}/'
r = curl.http_get(url=check_url)
if r.exit_code != 0:
return True
curl = CurlClient(env=self.env, run_dir=self._tmp_dir)
try_until = datetime.now() + timeout
while datetime.now() < try_until:
- check_url = f'https://{self.env.domain1}:{self.env.caddy_port}/'
+ check_url = f'https://{self.env.domain1}:{self.port}/'
r = curl.http_get(url=check_url)
if r.exit_code == 0:
return True
with open(self._conf_file, 'w') as fd:
conf = [ # base server config
f'{{',
- f' https_port {self.env.caddy_port}',
- f' servers :{self.env.caddy_port} {{',
+ f' http_port {self.env.caddy_http_port}',
+ f' https_port {self.env.caddy_https_port}',
+ f' servers :{self.env.caddy_https_port} {{',
f' protocols h3 h2 h1',
f' }}',
f'}}',
- f'{domain1}:{self.env.caddy_port} {{',
+ f'{domain1}:{self.env.caddy_https_port} {{',
f' file_server * {{',
f' root {self._docs_dir}',
f' }}',
args = [self._curl, "-s", "--path-as-is"]
if with_headers:
args.extend(["-D", self._headerfile])
+ if self.env.verbose > 1:
+ args.extend(['--trace', self._tracefile])
if self.env.verbose > 2:
args.extend(['--trace', self._tracefile, '--trace-time'])
log.debug(f'nghttpx -v: {p.stdout}')
self.caddy = self.config['caddy']['caddy']
- if len(self.caddy) == 0:
- self.caddy = 'caddy'
+ if len(self.caddy.strip()) == 0:
+ self.caddy = None
if self.caddy is not None:
try:
p = subprocess.run(args=[self.caddy, 'version'],
self.caddy = None
except:
self.caddy = None
- self.caddy_port = self.config['caddy']['port']
+ self.caddy_http_port = self.config['caddy']['http_port']
+ self.caddy_https_port = self.config['caddy']['https_port']
@property
def httpd_version(self):
def httpd_is_at_least(minv) -> bool:
return Env.CONFIG.httpd_is_at_least(minv)
+ @staticmethod
+ def has_caddy() -> bool:
+ return Env.CONFIG.caddy is not None
+
def __init__(self, pytestconfig=None):
self._verbose = pytestconfig.option.verbose \
if pytestconfig is not None else 0
return self.CONFIG.caddy
@property
- def caddy_port(self) -> str:
- return self.CONFIG.caddy_port
+ def caddy_https_port(self) -> str:
+ return self.CONFIG.caddy_https_port
+
+ @property
+ def caddy_http_port(self) -> str:
+ return self.CONFIG.caddy_http_port
@property
def curl(self) -> str:
try_until = datetime.now() + timeout
while datetime.now() < try_until:
check_url = f'https://{self.env.domain1}:{self.env.h3_port}/'
- r = curl.http_get(url=check_url, extra_args=['--http3-only'])
+ r = curl.http_get(url=check_url, extra_args=[
+ '--http3-only', '--trace', 'curl.trace', '--trace-time'
+ ])
if r.exit_code == 0:
return True
log.debug(f'waiting for nghttpx to become responsive: {r}')