struct Curl_hash streams; /* hash `data->id` to `h3_stream_ctx` */
size_t max_stream_window; /* max flow window for one stream */
uint64_t max_idle_ms; /* max idle time for QUIC connection */
+ uint64_t used_bidi_streams; /* bidi streams we have opened */
+ uint64_t max_bidi_streams; /* max bidi streams we can open */
int qlogfd;
};
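A note on how the two added counters relate, as the rest of this change uses them; the assertion is illustrative only and not part of the patch:

/* used_bidi_streams grows each time this side opens a request stream;
 * max_bidi_streams grows whenever the peer grants more credit via
 * MAX_STREAMS. Both only ever increase, and once the peer's limit is
 * known the following should hold:
 *   assert(ctx->used_bidi_streams <= ctx->max_bidi_streams);
 * Their difference is the budget of new requests we may still open. */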
#define CF_CTX_CALL_DATA(cf) \
((struct cf_ngtcp2_ctx *)(cf)->ctx)->call_data
+struct pkt_io_ctx;
+static CURLcode cf_progress_ingress(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct pkt_io_ctx *pktx);
+static CURLcode cf_progress_egress(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct pkt_io_ctx *pktx);
+
/**
* All about the H3 internals of a stream
*/
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
+ CURLcode result;
(void)cf;
if(stream) {
nghttp3_conn_set_stream_user_data(ctx->h3conn, stream->id, NULL);
ngtcp2_conn_set_stream_user_data(ctx->qconn, stream->id, NULL);
stream->closed = TRUE;
+ result = cf_progress_egress(cf, data, NULL);
+ if(result)
+ CURL_TRC_CF(data, cf, "data_done, flush egress -> %d", result);
}
Curl_hash_offt_remove(&ctx->streams, data->id);
pktx_update_time(pktx, cf);
}
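On the NULL third argument to cf_progress_egress() above: the expectation is that the function builds a local packet I/O context when the caller has none at hand. A hedged sketch of that convention follows; the body and the pktx_init() helper name are assumptions, not the actual implementation:

/* Hypothetical illustration (not the real cf_progress_egress body): */
static CURLcode flush_egress_sketch(struct Curl_cfilter *cf,
                                    struct Curl_easy *data,
                                    struct pkt_io_ctx *pktx)
{
  struct pkt_io_ctx local;        /* stack context for one-off flushes */
  if(!pktx) {
    /* caller had no context, e.g. h3_data_done() above;
     * pktx_init() is an assumed helper name */
    pktx_init(&local, cf, data);
    pktx = &local;
  }
  else
    pktx_update_time(pktx, cf);
  /* ... drain pending frames from ngtcp2 and send the packets ... */
  return CURLE_OK;
}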
-static CURLcode cf_progress_ingress(struct Curl_cfilter *cf,
- struct Curl_easy *data,
- struct pkt_io_ctx *pktx);
-static CURLcode cf_progress_egress(struct Curl_cfilter *cf,
- struct Curl_easy *data,
- struct pkt_io_ctx *pktx);
static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id,
uint64_t datalen, void *user_data,
void *stream_user_data);
uint64_t max_streams,
void *user_data)
{
- (void)tconn;
- (void)max_streams;
- (void)user_data;
+ struct Curl_cfilter *cf = user_data;
+ struct cf_ngtcp2_ctx *ctx = cf->ctx;
+ struct Curl_easy *data = CF_DATA_CURRENT(cf);
+ (void)tconn;
+ ctx->max_bidi_streams = max_streams;
+ if(data)
+ CURL_TRC_CF(data, cf, "max bidi streams now %" CURL_PRIu64
+ ", used %" CURL_PRIu64, (curl_uint64_t)ctx->max_bidi_streams,
+ (curl_uint64_t)ctx->used_bidi_streams);
return 0;
}
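For orientation, a hedged sketch of how such a callback gets wired up during QUIC setup, assuming the usual file includes; the real file presumably uses a designated-initializer callback table, and the detail to note is that the filter pointer is what arrives as user_data:

/* Sketch only: registering the stream-credit callback with ngtcp2 */
static void register_streams_callback_sketch(ngtcp2_callbacks *cbs)
{
  memset(cbs, 0, sizeof(*cbs));
  /* ngtcp2 invokes this when the peer's transport params arrive and on
   * every later MAX_STREAMS frame that raises the bidi stream limit */
  cbs->extend_max_local_streams_bidi = cb_extend_max_local_streams_bidi;
  /* ... all other required callbacks elided; the owning Curl_cfilter is
   * passed as user_data when the ngtcp2 connection is created, which is
   * what the handler above casts back ... */
}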
goto out;
}
stream->id = (curl_int64_t)sid;
+ ++ctx->used_bidi_streams;
switch(data->state.httpreq) {
case HTTPREQ_POST:
switch(query) {
case CF_QUERY_MAX_CONCURRENT: {
- const ngtcp2_transport_params *rp;
DEBUGASSERT(pres1);
-
CF_DATA_SAVE(save, cf, data);
- rp = ngtcp2_conn_get_remote_transport_params(ctx->qconn);
- if(rp)
- *pres1 = (rp->initial_max_streams_bidi > INT_MAX)?
- INT_MAX : (int)rp->initial_max_streams_bidi;
- else /* not arrived yet? */
+ /* Set once the transport params have arrived, then continually updated
+ * by the callback. QUIC counts streams over the lifetime of the
+ * connection, so both numbers only ever increase.
+ * We report the *open* transfers plus the budget for new ones. */
+ if(ctx->max_bidi_streams) {
+ uint64_t avail_bidi_streams = 0;
+ uint64_t max_streams = CONN_INUSE(cf->conn);
+ if(ctx->max_bidi_streams > ctx->used_bidi_streams)
+ avail_bidi_streams = ctx->max_bidi_streams - ctx->used_bidi_streams;
+ max_streams += avail_bidi_streams;
+ *pres1 = (max_streams > INT_MAX)? INT_MAX : (int)max_streams;
+ }
+ else /* transport params not arrived yet? take our default. */
*pres1 = Curl_multi_max_concurrent_streams(data->multi);
- CURL_TRC_CF(data, cf, "query max_conncurrent -> %d", *pres1);
+ CURL_TRC_CF(data, cf, "query conn[%" CURL_FORMAT_CURL_OFF_T "]: "
+ "MAX_CONCURRENT -> %d (%zu in use)",
+ cf->conn->connection_id, *pres1, CONN_INUSE(cf->conn));
CF_DATA_RESTORE(cf, save);
return CURLE_OK;
}
}
}
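To make the budget arithmetic concrete, a standalone walk-through with made-up numbers (purely illustrative, not part of the patch):

#include <stdint.h>
#include <stdio.h>

int main(void)
{
  uint64_t max_bidi_streams = 100;  /* lifetime limit granted by the peer */
  uint64_t used_bidi_streams = 40;  /* streams this side has opened so far */
  uint64_t in_use = 3;              /* transfers attached to the connection */
  uint64_t budget = (max_bidi_streams > used_bidi_streams) ?
                    (max_bidi_streams - used_bidi_streams) : 0;
  /* the filter would report 3 + 60 = 63 as MAX_CONCURRENT */
  printf("MAX_CONCURRENT -> %llu\n", (unsigned long long)(in_use + budget));
  return 0;
}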
+static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
+ struct Curl_easy *data);
+
/**
* All about the H3 internals of a stream
*/
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
+ CURLcode result;
(void)cf;
if(stream) {
stream->send_closed = TRUE;
}
stream->closed = TRUE;
+ result = cf_flush_egress(cf, data);
+ if(result)
+ CURL_TRC_CF(data, cf, "data_done, flush egress -> %d", result);
}
Curl_hash_offt_remove(&ctx->streams, data->id);
}
return NULL;
}
-static void cf_quiche_expire_conn_transfers(struct Curl_cfilter *cf,
- struct Curl_easy *data)
+static void cf_quiche_expire_conn_closed(struct Curl_cfilter *cf,
+ struct Curl_easy *data)
{
struct Curl_easy *sdata;
DEBUGASSERT(data->multi);
- CURL_TRC_CF(data, cf, "expiring all transfers on this connection");
+ CURL_TRC_CF(data, cf, "conn closed, expire all transfers");
for(sdata = data->multi->easyp; sdata; sdata = sdata->next) {
if(sdata == data || sdata->conn != data->conn)
continue;
+ CURL_TRC_CF(sdata, cf, "conn closed, expire transfer");
Curl_expire(sdata, 0, EXPIRE_RUN_NOW);
}
}
&recv_info);
if(nread < 0) {
if(QUICHE_ERR_DONE == nread) {
+ if(quiche_conn_is_draining(ctx->qconn)) {
+ CURL_TRC_CF(r->data, r->cf, "ingress, connection is draining");
+ return CURLE_RECV_ERROR;
+ }
+ if(quiche_conn_is_closed(ctx->qconn)) {
+ CURL_TRC_CF(r->data, r->cf, "ingress, connection is closed");
+ return CURLE_RECV_ERROR;
+ }
CURL_TRC_CF(r->data, r->cf, "ingress, quiche is DONE");
return CURLE_OK;
}
failf(data, "connection closed by server");
/* Connection timed out, expire all transfers belonging to it
 * as they will not get any more POLL events here. */
- cf_quiche_expire_conn_transfers(cf, data);
+ cf_quiche_expire_conn_closed(cf, data);
return CURLE_SEND_ERROR;
}
}
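The two quiche state checks added above tell a plain QUICHE_ERR_DONE apart from a connection that is going away; a hedged standalone sketch of that distinction:

#include <stdbool.h>
#include <quiche.h>

/* Sketch: after quiche_conn_recv() reports QUICHE_ERR_DONE, the
 * connection may merely have no data, or it may be shutting down. */
static bool conn_is_going_away(quiche_conn *qconn)
{
  if(quiche_conn_is_draining(qconn))
    return true;  /* close underway, e.g. the peer sent CONNECTION_CLOSE */
  if(quiche_conn_is_closed(qconn))
    return true;  /* fully closed, e.g. after the drain or an idle timeout */
  return false;   /* nothing to read right now; try again later */
}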
max_streams += quiche_conn_peer_streams_left_bidi(ctx->qconn);
}
*pres1 = (max_streams > INT_MAX)? INT_MAX : (int)max_streams;
- CURL_TRC_CF(data, cf, "query: MAX_CONCURRENT -> %d", *pres1);
+ CURL_TRC_CF(data, cf, "query conn[%" CURL_FORMAT_CURL_OFF_T "]: "
+ "MAX_CONCURRENT -> %d (%zu in use)",
+ cf->conn->connection_id, *pres1, CONN_INUSE(cf->conn));
return CURLE_OK;
}
case CF_QUERY_CONNECT_REPLY_MS: