struct bufq sendbuf; /* h3 request body */
struct bufq recvbuf; /* h3 response body */
size_t sendbuf_len_in_flight; /* sendbuf amount "in flight" */
+ size_t upload_blocked_len; /* the amount written last and EGAINed */
size_t recv_buf_nonflow; /* buffered bytes, not counting for flow control */
uint64_t error3; /* HTTP/3 stream error code */
curl_off_t upload_left; /* number of request bytes left to upload */
ngtcp2_path_storage_zero(&pktx->ps);
}
-static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
+static CURLcode cf_progress_ingress(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct pkt_io_ctx *pktx);
+static CURLcode cf_progress_egress(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct pkt_io_ctx *pktx);
-static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
- struct Curl_easy *data,
- struct pkt_io_ctx *pktx);
static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id,
uint64_t datalen, void *user_data,
void *stream_user_data);
NULL, /* early_data_rejected */
};
+/**
+ * Connection maintenance like timeouts on packet ACKs etc. are done by us, not
+ * the OS like for TCP. POLL events on the socket therefore are not
+ * sufficient.
+ * ngtcp2 tells us when it wants to be invoked again. We handle that via
+ * the `Curl_expire()` mechanisms.
+ */
+static CURLcode check_and_set_expiry(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct pkt_io_ctx *pktx)
+{
+ struct cf_ngtcp2_ctx *ctx = cf->ctx;
+ struct pkt_io_ctx local_pktx;
+ ngtcp2_tstamp expiry;
+ ngtcp2_duration timeout;
+
+ if(!pktx) {
+ pktx_init(&local_pktx, cf, data);
+ pktx = &local_pktx;
+ }
+ else {
+ pktx->ts = timestamp();
+ }
+
+ expiry = ngtcp2_conn_get_expiry(ctx->qconn);
+ if(expiry != UINT64_MAX) {
+ if(expiry <= pktx->ts) {
+ CURLcode result;
+ int rv = ngtcp2_conn_handle_expiry(ctx->qconn, pktx->ts);
+ if(rv) {
+ failf(data, "ngtcp2_conn_handle_expiry returned error: %s",
+ ngtcp2_strerror(rv));
+ ngtcp2_ccerr_set_liberr(&ctx->last_error, rv, NULL, 0);
+ return CURLE_SEND_ERROR;
+ }
+ timeout = 0;
+ result = cf_progress_ingress(cf, data, pktx);
+ if(result)
+ return result;
+ result = cf_progress_egress(cf, data, pktx);
+ if(result)
+ return result;
+ /* ask again, things might have changed */
+ expiry = ngtcp2_conn_get_expiry(ctx->qconn);
+ }
+
+ if(expiry > pktx->ts) {
+ timeout = expiry - pktx->ts;
+ if(timeout % NGTCP2_MILLISECONDS) {
+ timeout += NGTCP2_MILLISECONDS;
+ }
+ Curl_expire(data, timeout / NGTCP2_MILLISECONDS, EXPIRE_QUIC);
+ }
+ }
+ return CURLE_OK;
+}
+
static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf,
struct Curl_easy *data,
curl_socket_t *socks)
(void)cf;
bits = CURL_CSELECT_IN;
- if(stream && !stream->send_closed && stream->upload_left)
+ if(stream && stream->upload_left && !stream->send_closed)
bits |= CURL_CSELECT_OUT;
if(data->state.dselect_bits != bits) {
data->state.dselect_bits = bits;
report_consumed_data(cf, data, nread);
}
- if(cf_process_ingress(cf, data, &pktx)) {
+ if(cf_progress_ingress(cf, data, &pktx)) {
*err = CURLE_RECV_ERROR;
nread = -1;
goto out;
}
out:
- if(cf_flush_egress(cf, data, &pktx)) {
+ if(cf_progress_egress(cf, data, &pktx)) {
*err = CURLE_SEND_ERROR;
nread = -1;
}
+ else {
+ CURLcode result2 = check_and_set_expiry(cf, data, &pktx);
+ if(result2) {
+ *err = result2;
+ nread = -1;
+ }
+ }
DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv(len=%zu) -> %zd, %d",
stream? stream->id : -1, len, nread, *err));
CF_DATA_RESTORE(cf, save);
Curl_bufq_skip(&stream->sendbuf, skiplen);
stream->sendbuf_len_in_flight -= skiplen;
- /* `sendbuf` *might* now have more room. If so, resume this
- * possibly paused stream. And also tell our transfer engine that
- * it may continue KEEP_SEND if told to PAUSE. */
- if(!Curl_bufq_is_full(&stream->sendbuf)) {
+ /* Everything ACKed, we resume upload processing */
+ if(!stream->sendbuf_len_in_flight) {
int rv = nghttp3_conn_resume_stream(conn, stream_id);
if(rv) {
return NGTCP2_ERR_CALLBACK_FAILURE;
else
/* data sending without specifying the data amount up front */
stream->upload_left = -1; /* unknown */
- reader.read_data = cb_h3_read_req_body;
- preader = &reader;
break;
default:
/* there is not request body */
stream->upload_left = 0; /* no request body */
- preader = NULL;
break;
}
+ stream->send_closed = (stream->upload_left == 0);
+ if(!stream->send_closed) {
+ reader.read_data = cb_h3_read_req_body;
+ preader = &reader;
+ }
+
rc = nghttp3_conn_submit_request(ctx->h3conn, stream->id,
nva, nheader, preader, data);
if(rc) {
ssize_t sent = 0;
struct cf_call_data save;
struct pkt_io_ctx pktx;
+ CURLcode result;
CF_DATA_SAVE(save, cf, data);
DEBUGASSERT(cf->connected);
pktx_init(&pktx, cf, data);
*err = CURLE_OK;
- if(stream && stream->closed) {
- *err = CURLE_HTTP3;
+ result = cf_progress_ingress(cf, data, &pktx);
+ if(result) {
+ *err = result;
sent = -1;
- goto out;
}
if(!stream || stream->id < 0) {
goto out;
}
}
+ else if(stream->upload_blocked_len) {
+ /* the data in `buf` has alread been submitted or added to the
+ * buffers, but have been EAGAINed on the last invocation. */
+ DEBUGASSERT(len >= stream->upload_blocked_len);
+ if(len < stream->upload_blocked_len) {
+ /* Did we get called again with a smaller `len`? This should not
+ * happen. We are not prepared to handle that. */
+ failf(data, "HTTP/3 send again with decreased length");
+ *err = CURLE_HTTP3;
+ sent = -1;
+ goto out;
+ }
+ sent = (ssize_t)stream->upload_blocked_len;
+ stream->upload_blocked_len = 0;
+ }
+ else if(stream->closed) {
+ *err = CURLE_HTTP3;
+ sent = -1;
+ goto out;
+ }
else {
sent = Curl_bufq_write(&stream->sendbuf, buf, len, err);
DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_send, add to "
"sendbuf(len=%zu) -> %zd, %d",
stream->id, len, sent, *err));
if(sent < 0) {
- if(*err == CURLE_AGAIN) {
- /* Can't add more to the send buf, needs to drain first.
- * Pause the sending to avoid a busy loop. */
- data->req.keepon |= KEEP_SEND_HOLD;
- DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] pause send",
- stream->id));
- }
goto out;
}
(void)nghttp3_conn_resume_stream(ctx->h3conn, stream->id);
}
- if(cf_flush_egress(cf, data, &pktx)) {
- *err = CURLE_SEND_ERROR;
+ result = cf_progress_egress(cf, data, &pktx);
+ if(result) {
+ *err = result;
sent = -1;
- goto out;
+ }
+
+ if(stream && sent > 0 && stream->sendbuf_len_in_flight) {
+ /* We have unacknowledged DATA and cannot report success to our
+ * caller. Instead we EAGAIN and remember how much we have already
+ * "written" into our various internal connection buffers.
+ * We put the stream upload on HOLD, until this gets ACKed. */
+ stream->upload_blocked_len = sent;
+ DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_send(len=%zu), "
+ "%zu bytes in flight -> EGAIN", stream->id, len,
+ stream->sendbuf_len_in_flight));
+ *err = CURLE_AGAIN;
+ sent = -1;
+ data->req.keepon |= KEEP_SEND_HOLD;
}
out:
+ result = check_and_set_expiry(cf, data, &pktx);
+ if(result) {
+ *err = result;
+ sent = -1;
+ }
CF_DATA_RESTORE(cf, save);
return sent;
}
return CURLE_OK;
}
-static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
- struct Curl_easy *data,
- struct pkt_io_ctx *pktx)
+static CURLcode cf_progress_ingress(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct pkt_io_ctx *pktx)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct pkt_io_ctx local_pktx;
size_t pkts_chunk = 128, i;
size_t pkts_max = 10 * pkts_chunk;
- CURLcode result;
+ CURLcode result = CURLE_OK;
if(!pktx) {
pktx_init(&local_pktx, cf, data);
if(pktx->pkt_count < pkts_chunk) /* got less than we could */
break;
/* give egress a chance before we receive more */
- result = cf_flush_egress(cf, data, pktx);
+ result = cf_progress_egress(cf, data, pktx);
+ if(result) /* error */
+ break;
}
return result;
}
return nwritten;
}
-static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
- struct Curl_easy *data,
- struct pkt_io_ctx *pktx)
+static CURLcode cf_progress_egress(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct pkt_io_ctx *pktx)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
- int rv;
ssize_t nread;
size_t max_payload_size, path_max_payload_size, max_pktcnt;
size_t pktcnt = 0;
size_t gsolen = 0; /* this disables gso until we have a clue */
- ngtcp2_tstamp expiry;
- ngtcp2_duration timeout;
CURLcode curlcode;
struct pkt_io_ctx local_pktx;
ngtcp2_path_storage_zero(&pktx->ps);
}
- rv = ngtcp2_conn_handle_expiry(ctx->qconn, pktx->ts);
- if(rv) {
- failf(data, "ngtcp2_conn_handle_expiry returned error: %s",
- ngtcp2_strerror(rv));
- ngtcp2_ccerr_set_liberr(&ctx->last_error, rv, NULL, 0);
- return CURLE_SEND_ERROR;
- }
-
curlcode = vquic_flush(cf, data, &ctx->q);
if(curlcode) {
if(curlcode == CURLE_AGAIN) {
}
out:
- /* non-errored exit. check when we should run again. */
- expiry = ngtcp2_conn_get_expiry(ctx->qconn);
- if(expiry != UINT64_MAX) {
- if(expiry <= pktx->ts) {
- timeout = 0;
- }
- else {
- timeout = expiry - pktx->ts;
- if(timeout % NGTCP2_MILLISECONDS) {
- timeout += NGTCP2_MILLISECONDS;
- }
- }
- Curl_expire(data, timeout / NGTCP2_MILLISECONDS, EXPIRE_QUIC);
- }
-
return CURLE_OK;
}
break;
}
case CF_CTRL_DATA_IDLE:
- if(timestamp() >= ngtcp2_conn_get_expiry(ctx->qconn)) {
- if(cf_flush_egress(cf, data, NULL)) {
- result = CURLE_SEND_ERROR;
- }
- }
+ result = check_and_set_expiry(cf, data, NULL);
break;
default:
break;
result = cf_connect_start(cf, data, &pktx);
if(result)
goto out;
- result = cf_flush_egress(cf, data, &pktx);
+ result = cf_progress_egress(cf, data, &pktx);
/* we do not expect to be able to recv anything yet */
goto out;
}
- result = cf_process_ingress(cf, data, &pktx);
+ result = cf_progress_ingress(cf, data, &pktx);
if(result)
goto out;
- result = cf_flush_egress(cf, data, &pktx);
+ result = cf_progress_egress(cf, data, &pktx);
if(result)
goto out;
r_ip, r_port, curl_easy_strerror(result));
}
#endif
+ if(!result && ctx->qconn) {
+ result = check_and_set_expiry(cf, data, &pktx);
+ }
DEBUGF(LOG_CF(data, cf, "connect -> %d, done=%d", result, *done));
CF_DATA_RESTORE(cf, save);
return result;
not in use by any other transfer, there shouldn't be any data here,
only "protocol frames" */
*input_pending = FALSE;
- if(cf_process_ingress(cf, data, NULL))
+ if(cf_progress_ingress(cf, data, NULL))
alive = FALSE;
else {
alive = TRUE;