uint64_t used_bidi_streams; /* bidi streams we have opened */
uint64_t max_bidi_streams; /* max bidi streams we can open */
int qlogfd;
+ BIT(conn_closed); /* connection is closed */
};
/* How to access `call_data` from a cf_ngtcp2 filter */
#define H3_STREAM_CTX(ctx,data) ((struct h3_stream_ctx *)(\
data? Curl_hash_offt_get(&(ctx)->streams, (data)->id) : NULL))
+#define H3_STREAM_CTX_ID(ctx,id) ((struct h3_stream_ctx *)(\
+ Curl_hash_offt_get(&(ctx)->streams, (id))))
static void h3_stream_ctx_free(struct h3_stream_ctx *stream)
{
return CURLE_OK;
}
+static void cf_ngtcp2_stream_close(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct h3_stream_ctx *stream)
+{
+ struct cf_ngtcp2_ctx *ctx = cf->ctx;
+ DEBUGASSERT(data);
+ DEBUGASSERT(stream);
+ if(!stream->closed && ctx->qconn && ctx->h3conn) {
+ CURLcode result;
+
+ nghttp3_conn_set_stream_user_data(ctx->h3conn, stream->id, NULL);
+ ngtcp2_conn_set_stream_user_data(ctx->qconn, stream->id, NULL);
+ stream->closed = TRUE;
+ (void)ngtcp2_conn_shutdown_stream(ctx->qconn, 0, stream->id,
+ NGHTTP3_H3_REQUEST_CANCELLED);
+ result = cf_progress_egress(cf, data, NULL);
+ if(result)
+ CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] cancel stream -> %d",
+ stream->id, result);
+ }
+}
+
static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
- CURLcode result;
-
(void)cf;
if(stream) {
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] easy handle is done",
stream->id);
- if(ctx->h3conn && !stream->closed) {
- nghttp3_conn_shutdown_stream_read(ctx->h3conn, stream->id);
- nghttp3_conn_close_stream(ctx->h3conn, stream->id,
- NGHTTP3_H3_REQUEST_CANCELLED);
- nghttp3_conn_set_stream_user_data(ctx->h3conn, stream->id, NULL);
- ngtcp2_conn_set_stream_user_data(ctx->qconn, stream->id, NULL);
- stream->closed = TRUE;
- result = cf_progress_egress(cf, data, NULL);
- if(result)
- CURL_TRC_CF(data, cf, "data_done, flush egress -> %d", result);
- }
-
+ cf_ngtcp2_stream_close(cf, data, stream);
Curl_hash_offt_remove(&ctx->streams, data->id);
}
}
return 0;
}
+static void cf_ngtcp2_conn_close(struct Curl_cfilter *cf,
+ struct Curl_easy *data);
+
+static bool cf_ngtcp2_err_is_fatal(int code)
+{
+ return (NGTCP2_ERR_FATAL >= code) ||
+ (NGTCP2_ERR_DROP_CONN == code) ||
+ (NGTCP2_ERR_IDLE_CLOSE == code);
+}
+
+static void cf_ngtcp2_err_set(struct Curl_cfilter *cf,
+ struct Curl_easy *data, int code)
+{
+ struct cf_ngtcp2_ctx *ctx = cf->ctx;
+ if(!ctx->last_error.error_code) {
+ if(NGTCP2_ERR_CRYPTO == code) {
+ ngtcp2_ccerr_set_tls_alert(&ctx->last_error,
+ ngtcp2_conn_get_tls_alert(ctx->qconn),
+ NULL, 0);
+ }
+ else {
+ ngtcp2_ccerr_set_liberr(&ctx->last_error, code, NULL, 0);
+ }
+ }
+ if(cf_ngtcp2_err_is_fatal(code))
+ cf_ngtcp2_conn_close(cf, data);
+}
+
+static bool cf_ngtcp2_h3_err_is_fatal(int code)
+{
+ return (NGHTTP3_ERR_FATAL >= code) ||
+ (NGHTTP3_ERR_H3_CLOSED_CRITICAL_STREAM == code);
+}
+
+static void cf_ngtcp2_h3_err_set(struct Curl_cfilter *cf,
+ struct Curl_easy *data, int code)
+{
+ struct cf_ngtcp2_ctx *ctx = cf->ctx;
+ if(!ctx->last_error.error_code) {
+ ngtcp2_ccerr_set_application_error(&ctx->last_error,
+ nghttp3_err_infer_quic_app_error_code(code), NULL, 0);
+ }
+ if(cf_ngtcp2_h3_err_is_fatal(code))
+ cf_ngtcp2_conn_close(cf, data);
+}
+
static int cb_recv_stream_data(ngtcp2_conn *tconn, uint32_t flags,
int64_t sid, uint64_t offset,
const uint8_t *buf, size_t buflen,
nghttp3_ssize nconsumed;
int fin = (flags & NGTCP2_STREAM_DATA_FLAG_FIN) ? 1 : 0;
struct Curl_easy *data = stream_user_data;
- struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
(void)offset;
(void)data;
nconsumed =
nghttp3_conn_read_stream(ctx->h3conn, stream_id, buf, buflen, fin);
- CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] read_stream(len=%zu) -> %zd",
- stream_id, buflen, nconsumed);
+ if(!data)
+ data = CF_DATA_CURRENT(cf);
+ if(data)
+ CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] read_stream(len=%zu) -> %zd",
+ stream_id, buflen, nconsumed);
if(nconsumed < 0) {
- /* consume all bytes */
- ngtcp2_conn_extend_max_stream_offset(tconn, stream_id, buflen);
- ngtcp2_conn_extend_max_offset(tconn, buflen);
- if(!data || (stream && stream->reset) ||
- NGHTTP3_ERR_H3_STREAM_CREATION_ERROR == (int)nconsumed) {
- struct Curl_easy *cdata = CF_DATA_CURRENT(cf);
- CURL_TRC_CF(cdata, cf, "[%" CURL_PRId64 "] discard data for stream %s",
- stream_id, (data && stream)? "reset" : "unknown");
- return 0;
+ struct h3_stream_ctx *stream = H3_STREAM_CTX_ID(ctx, stream_id);
+ if(data && stream) {
+ CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] error on known stream, "
+ "reset=%d, closed=%d",
+ stream_id, stream->reset, stream->closed);
}
- ngtcp2_ccerr_set_application_error(
- &ctx->last_error,
- nghttp3_err_infer_quic_app_error_code((int)nconsumed), NULL, 0);
return NGTCP2_ERR_CALLBACK_FAILURE;
}
void *user_data, void *stream_user_data)
{
struct Curl_cfilter *cf = user_data;
+ struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct Curl_easy *data = stream_user_data;
curl_int64_t stream_id = (curl_int64_t)sid;
- struct cf_ngtcp2_ctx *ctx = cf->ctx;
int rv;
(void)tconn;
- (void)data;
/* stream is closed... */
+ if(!data)
+ data = CF_DATA_CURRENT(cf);
+ if(!data)
+ return NGTCP2_ERR_CALLBACK_FAILURE;
if(!(flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET)) {
app_error_code = NGHTTP3_H3_NO_ERROR;
}
rv = nghttp3_conn_close_stream(ctx->h3conn, stream_id, app_error_code);
- CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] quic close(err=%"
+ CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] quic close(app_error=%"
CURL_PRIu64 ") -> %d", stream_id, (curl_uint64_t)app_error_code,
rv);
if(rv && rv != NGHTTP3_ERR_STREAM_NOT_FOUND) {
- ngtcp2_ccerr_set_application_error(
- &ctx->last_error, nghttp3_err_infer_quic_app_error_code(rv), NULL, 0);
+ cf_ngtcp2_h3_err_set(cf, data, rv);
return NGTCP2_ERR_CALLBACK_FAILURE;
}
if(rv) {
failf(data, "ngtcp2_conn_handle_expiry returned error: %s",
ngtcp2_strerror(rv));
- ngtcp2_ccerr_set_liberr(&ctx->last_error, rv, NULL, 0);
+ cf_ngtcp2_err_set(cf, data, rv);
return CURLE_SEND_ERROR;
}
result = cf_progress_ingress(cf, data, pktx);
{
/* If we already encountered an error, skip further writes */
- if(!stream->xfer_result)
+ if(!stream->xfer_result) {
stream->xfer_result = Curl_xfer_write_resp(data, buf, blen, eos);
- /* If the transfer write is errored, we do not want any more data */
- if(stream->xfer_result) {
- struct cf_ngtcp2_ctx *ctx = cf->ctx;
- CURL_TRC_CF(data, cf, "[%"CURL_PRId64"] error %d writing %zu bytes "
- "of data, cancelling stream",
- stream->id, stream->xfer_result, blen);
- nghttp3_conn_close_stream(ctx->h3conn, stream->id,
- NGHTTP3_H3_REQUEST_CANCELLED);
+ /* If the transfer write is errored, we do not want any more data */
+ if(stream->xfer_result) {
+ CURL_TRC_CF(data, cf, "[%"CURL_PRId64"] error %d writing %zu bytes "
+ "of data", stream->id, stream->xfer_result, blen);
+ }
}
}
pktx_init(&pktx, cf, data);
- if(!stream) {
+ if(!stream || ctx->conn_closed) {
*err = CURLE_RECV_ERROR;
goto out;
}
if(stream->xfer_result) {
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] xfer write failed", stream->id);
+ cf_ngtcp2_stream_close(cf, data, stream);
*err = stream->xfer_result;
nread = -1;
goto out;
}
if(!stream || stream->id < 0) {
+ if(ctx->conn_closed) {
+ CURL_TRC_CF(data, cf, "cannot open stream on closed connection");
+ *err = CURLE_SEND_ERROR;
+ sent = -1;
+ goto out;
+ }
sent = h3_stream_open(cf, data, buf, len, err);
if(sent < 0) {
CURL_TRC_CF(data, cf, "failed to open stream -> %d", *err);
}
stream = H3_STREAM_CTX(ctx, data);
}
+ else if(stream->xfer_result) {
+ CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] xfer write failed", stream->id);
+ cf_ngtcp2_stream_close(cf, data, stream);
+ *err = stream->xfer_result;
+ sent = -1;
+ goto out;
+ }
else if(stream->upload_blocked_len) {
/* the data in `buf` has already been submitted or added to the
* buffers, but have been EAGAINed on the last invocation. */
sent = -1;
goto out;
}
+ else if(ctx->conn_closed) {
+ CURL_TRC_CF(data, cf, "cannot send on closed connection");
+ *err = CURLE_SEND_ERROR;
+ sent = -1;
+ goto out;
+ }
else {
sent = Curl_bufq_write(&stream->sendbuf, buf, len, err);
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] cf_send, add to "
if(rv) {
CURL_TRC_CF(pktx->data, pktx->cf, "ingress, read_pkt -> %s (%d)",
ngtcp2_strerror(rv), rv);
- if(!ctx->last_error.error_code) {
- if(rv == NGTCP2_ERR_CRYPTO) {
- ngtcp2_ccerr_set_tls_alert(&ctx->last_error,
- ngtcp2_conn_get_tls_alert(ctx->qconn),
- NULL, 0);
- }
- else {
- ngtcp2_ccerr_set_liberr(&ctx->last_error, rv, NULL, 0);
- }
- }
+ cf_ngtcp2_err_set(pktx->cf, pktx->data, rv);
if(rv == NGTCP2_ERR_CRYPTO)
/* this is a "TLS problem", but a failed certificate verification
if(veccnt < 0) {
failf(x->data, "nghttp3_conn_writev_stream returned error: %s",
nghttp3_strerror((int)veccnt));
- ngtcp2_ccerr_set_application_error(
- &ctx->last_error,
- nghttp3_err_infer_quic_app_error_code((int)veccnt), NULL, 0);
+ cf_ngtcp2_h3_err_set(x->cf, x->data, (int)veccnt);
*err = CURLE_SEND_ERROR;
return -1;
}
DEBUGASSERT(ndatalen == -1);
failf(x->data, "ngtcp2_conn_writev_stream returned error: %s",
ngtcp2_strerror((int)n));
- ngtcp2_ccerr_set_liberr(&ctx->last_error, (int)n, NULL, 0);
+ cf_ngtcp2_err_set(x->cf, x->data, (int)n);
*err = CURLE_SEND_ERROR;
nwritten = -1;
goto out;
ctx->call_data = save;
}
-static void cf_ngtcp2_close(struct Curl_cfilter *cf, struct Curl_easy *data)
+static void cf_ngtcp2_conn_close(struct Curl_cfilter *cf,
+ struct Curl_easy *data)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
- struct cf_call_data save;
-
- CF_DATA_SAVE(save, cf, data);
- if(ctx && ctx->qconn) {
+ if(ctx && ctx->qconn && !ctx->conn_closed) {
char buffer[NGTCP2_MAX_UDP_PAYLOAD_SIZE];
struct pkt_io_ctx pktx;
ngtcp2_ssize rc;
- CURL_TRC_CF(data, cf, "close");
+ ctx->conn_closed = TRUE;
pktx_init(&pktx, cf, data);
rc = ngtcp2_conn_write_connection_close(ctx->qconn, NULL, /* path */
NULL, /* pkt_info */
(uint8_t *)buffer, sizeof(buffer),
&ctx->last_error, pktx.ts);
+ CURL_TRC_CF(data, cf, "closing connection(err_type=%d, err_code=%"
+ CURL_PRIu64 ") -> %d", ctx->last_error.type,
+ (curl_uint64_t)ctx->last_error.error_code, (int)rc);
if(rc > 0) {
while((send(ctx->q.sockfd, buffer, (SEND_TYPE_ARG3)rc, 0) == -1) &&
SOCKERRNO == EINTR);
}
+ }
+}
+
+static void cf_ngtcp2_close(struct Curl_cfilter *cf, struct Curl_easy *data)
+{
+ struct cf_ngtcp2_ctx *ctx = cf->ctx;
+ struct cf_call_data save;
+ CF_DATA_SAVE(save, cf, data);
+ if(ctx && ctx->qconn) {
+ cf_ngtcp2_conn_close(cf, data);
cf_ngtcp2_ctx_clear(ctx);
+ CURL_TRC_CF(data, cf, "close");
}
-
cf->connected = FALSE;
CF_DATA_RESTORE(cf, save);
}
* by callback. QUIC counts the number over the lifetime of the
* connection, ever increasing.
* We count the *open* transfers plus the budget for new ones. */
- if(ctx->max_bidi_streams) {
+ if(!ctx->qconn || ctx->conn_closed) {
+ *pres1 = 0;
+ }
+ else if(ctx->max_bidi_streams) {
uint64_t avail_bidi_streams = 0;
uint64_t max_streams = CONN_INUSE(cf->conn);
if(ctx->max_bidi_streams > ctx->used_bidi_streams)
const ngtcp2_transport_params *rp;
struct cf_call_data save;
- CF_DATA_SAVE(save, cf, data);
+ CF_DATA_SAVE(save, cf, data);
*input_pending = FALSE;
- if(!ctx->qconn)
+ if(!ctx->qconn || ctx->conn_closed)
goto out;
/* Both sides of the QUIC connection announce they max idle times in
curl = CurlClient(env=env)
url = f'https://{env.authority_for(env.domain1, proto)}/data.json?[0-{count-1}]'
r = curl.http_download(urls=[url], alpn_proto=proto, with_stats=True)
- r.check_stats(count=count, http_status=200)
- for s in r.stats:
- self.check_stat(s, dl_size=30, ul_size=0)
+ r.check_stats(count=count, http_status=200, exitcode=0)
+ for idx, s in enumerate(r.stats):
+ self.check_stat(idx, s, r, dl_size=30, ul_size=0)
# download plain file with a 302 redirect
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
r = curl.http_download(urls=[url], alpn_proto=proto, with_stats=True, extra_args=[
'--location'
])
- r.check_stats(count=count, http_status=200)
- for s in r.stats:
- self.check_stat(s, dl_size=30, ul_size=0)
+ r.check_stats(count=count, http_status=200, exitcode=0)
+ for idx, s in enumerate(r.stats):
+ self.check_stat(idx, s, r, dl_size=30, ul_size=0)
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_16_03_info_upload(self, env: Env, httpd, nghttpx, proto, repeat):
curl = CurlClient(env=env)
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-{count-1}]'
r = curl.http_upload(urls=[url], data=f'@{fdata}', alpn_proto=proto,
- with_headers=True)
+ with_headers=True, extra_args=[
+ '--trace-config', 'http/2,http/3'
+ ])
r.check_response(count=count, http_status=200)
- r.check_stats(count=count, http_status=200)
- for s in r.stats:
- self.check_stat(s, dl_size=fsize, ul_size=fsize)
+ r.check_stats(count=count, http_status=200, exitcode=0)
+ for idx, s in enumerate(r.stats):
+ self.check_stat(idx, s, r, dl_size=fsize, ul_size=fsize)
# download plain file via http: ('time_appconnect' is 0)
@pytest.mark.parametrize("proto", ['http/1.1'])
curl = CurlClient(env=env)
url = f'http://{env.domain1}:{env.http_port}/data.json?[0-{count-1}]'
r = curl.http_download(urls=[url], alpn_proto=proto, with_stats=True)
- r.check_stats(count=count, http_status=200)
- for s in r.stats:
- self.check_stat(s, dl_size=30, ul_size=0)
+ r.check_stats(count=count, http_status=200, exitcode=0)
+ for idx, s in enumerate(r.stats):
+ self.check_stat(idx, s, r, dl_size=30, ul_size=0)
- def check_stat(self, s, dl_size=None, ul_size=None):
+ def check_stat(self, idx, s, r, dl_size=None, ul_size=None):
self.check_stat_times(s)
# we always send something
self.check_stat_positive(s, 'size_request')
# we always receive response headers
self.check_stat_positive(s, 'size_header')
if ul_size is not None:
- assert s['size_upload'] == ul_size # the file we sent
- assert s['size_request'] >= s['size_upload'], f'"size_request" smaller than "size_upload", {s}'
+ assert s['size_upload'] == ul_size, f'stat #{idx}\n{r.dump_logs()}' # the file we sent
+ assert s['size_request'] >= s['size_upload'], \
+ f'stat #{idx}, "size_request" smaller than "size_upload", {s}\n{r.dump_logs()}'
if dl_size is not None:
- assert s['size_download'] == dl_size # the file we received
+ assert s['size_download'] == dl_size, f'stat #{idx}\n{r.dump_logs()}' # the file we received
def check_stat_positive(self, s, key):
assert key in s, f'stat "{key}" missing: {s}'