}
}
-/*
- * This specific transfer on this connection has been "drained".
- */
-static void drained_transfer(struct Curl_cfilter *cf,
- struct Curl_easy *data)
-{
- if(data->state.drain) {
- struct cf_h2_ctx *ctx = cf->ctx;
- DEBUGASSERT(ctx->drain_total > 0);
- ctx->drain_total--;
- data->state.drain = 0;
- }
-}
-
-/*
- * Mark this transfer to get "drained".
- */
-static void drain_this(struct Curl_cfilter *cf,
- struct Curl_easy *data)
-{
- if(!data->state.drain) {
- struct cf_h2_ctx *ctx = cf->ctx;
- data->state.drain = 1;
- ctx->drain_total++;
- DEBUGASSERT(ctx->drain_total > 0);
- }
-}
-
/**
* All about the H3 internals of a stream
*/
#define H2_STREAM_ID(d) (H2_STREAM_CTX(d)? \
H2_STREAM_CTX(d)->id : -2)
+/*
+ * Mark this transfer to get "drained".
+ */
+static void drain_stream(struct Curl_cfilter *cf,
+ struct Curl_easy *data,
+ struct stream_ctx *stream)
+{
+ int bits;
+
+ (void)cf;
+ bits = CURL_CSELECT_IN;
+ if(stream->upload_left)
+ bits |= CURL_CSELECT_OUT;
+ if(data->state.dselect_bits != bits) {
+ data->state.dselect_bits = bits;
+ Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ }
+}
+
static CURLcode http2_data_setup(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct stream_ctx **pstream)
(void)nghttp2_session_send(ctx->h2);
}
- drained_transfer(cf, data);
-
/* -1 means unassigned and 0 means cleared */
if(nghttp2_session_get_stream_user_data(ctx->h2, stream->id)) {
int rv = nghttp2_session_set_stream_user_data(ctx->h2,
while(Curl_bufq_peek(&ctx->inbufq, &buf, &blen)) {
rv = nghttp2_session_mem_recv(ctx->h2, (const uint8_t *)buf, blen);
- DEBUGF(LOG_CF(data, cf,
- "fed %zu bytes from nw to nghttp2 -> %zd", blen, rv));
if(rv < 0) {
failf(data,
"process_pending_input: nghttp2_session_mem_recv() returned "
}
Curl_bufq_skip(&ctx->inbufq, (size_t)rv);
if(Curl_bufq_is_empty(&ctx->inbufq)) {
- DEBUGF(LOG_CF(data, cf, "all data in connection buffer processed"));
break;
}
else {
}
}
if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
- drain_this(cf, data);
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ drain_stream(cf, data, stream);
}
break;
case NGHTTP2_HEADERS:
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] %zu header bytes",
stream_id, Curl_bufq_len(&stream->recvbuf)));
- if(CF_DATA_CURRENT(cf) != data) {
- drain_this(cf, data);
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
- }
+ drain_stream(cf, data, stream);
break;
case NGHTTP2_PUSH_PROMISE:
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] recv PUSH_PROMISE", stream_id));
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] recv RST", stream_id));
stream->closed = TRUE;
stream->reset = TRUE;
- drain_this(cf, data);
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ drain_stream(cf, data, stream);
break;
case NGHTTP2_WINDOW_UPDATE:
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] recv WINDOW_UPDATE", stream_id));
if((data->req.keepon & KEEP_SEND_HOLD) &&
(data->req.keepon & KEEP_SEND)) {
data->req.keepon &= ~KEEP_SEND_HOLD;
- drain_this(cf, data);
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ drain_stream(cf, data, stream);
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] un-holding after win update",
stream_id));
}
}
/* if we receive data for another handle, wake that up */
- if(CF_DATA_CURRENT(cf) != data_s) {
- drain_this(cf, data_s);
- Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
- }
+ drain_stream(cf, data_s, stream);
DEBUGASSERT((size_t)nwritten == len);
DEBUGF(LOG_CF(data_s, cf, "[h2sid=%d] %zd/%zu DATA recvd, "
if(stream->error)
stream->reset = TRUE;
- if(CF_DATA_CURRENT(cf) != data_s) {
- drain_this(cf, data_s);
- Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
- }
+ drain_stream(cf, data_s, stream);
/* remove `data_s` from the nghttp2 stream */
rv = nghttp2_session_set_stream_user_data(session, stream_id, 0);
/* resume sending here to trigger the callback to get called again so
that it can signal EOF to nghttp2 */
(void)nghttp2_session_resume_data(ctx->h2, stream->id);
- drain_this(cf, data);
+ drain_stream(cf, data, stream);
}
out:
struct stream_ctx *stream = H2_STREAM_CTX(data);
ssize_t rv = 0;
- drained_transfer(cf, data);
-
if(stream->error == NGHTTP2_REFUSED_STREAM) {
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] REFUSED_STREAM, try again on a new "
"connection", stream->id));
connclose(cf->conn, "REFUSED_STREAM"); /* don't use this anymore */
data->state.refused_stream = TRUE;
- *err = CURLE_RECV_ERROR; /* trigger Curl_retry_request() later */
+ *err = CURLE_SEND_ERROR; /* trigger Curl_retry_request() later */
+ return -1;
+ }
+ else if(stream->reset) {
+ failf(data, "HTTP/2 stream %u was reset", stream->id);
+ *err = stream->bodystarted? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR;
return -1;
}
else if(stream->error != NGHTTP2_NO_ERROR) {
*err = CURLE_HTTP2_STREAM;
return -1;
}
- else if(stream->reset) {
- failf(data, "HTTP/2 stream %u was reset", stream->id);
- *err = stream->bodystarted? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR;
- return -1;
- }
if(!stream->bodystarted) {
failf(data, "HTTP/2 stream %u was closed cleanly, but before getting "
ssize_t nread = -1;
*err = CURLE_AGAIN;
- drained_transfer(cf, data);
if(!Curl_bufq_is_empty(&stream->recvbuf)) {
nread = Curl_bufq_read(&stream->recvbuf,
(unsigned char *)buf, len, err);
}
nread = Curl_bufq_slurp(&ctx->inbufq, nw_in_reader, cf, &result);
- DEBUGF(LOG_CF(data, cf, "read %zd bytes nw data -> %zd, %d",
- Curl_bufq_len(&ctx->inbufq), nread, result));
+ /* DEBUGF(LOG_CF(data, cf, "read %zd bytes nw data -> %zd, %d",
+ Curl_bufq_len(&ctx->inbufq), nread, result)); */
if(nread < 0) {
if(result != CURLE_AGAIN) {
failf(data, "Failed receiving HTTP2 data");
if(stream->closed) {
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] closed stream, set drain",
stream->id));
- drain_this(cf, data);
+ drain_stream(cf, data, stream);
}
}
}
if(should_close_session(ctx)) {
- DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
- *err = CURLE_HTTP2;
- nwritten = -1;
+ if(stream->closed) {
+ nwritten = http2_handle_stream_close(cf, data, err);
+ }
+ else {
+ DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
+ *err = CURLE_HTTP2;
+ nwritten = -1;
+ }
goto out;
}
}
if(should_close_session(ctx)) {
- DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
- *err = CURLE_HTTP2;
- nwritten = -1;
+ if(stream->closed) {
+ nwritten = http2_handle_stream_close(cf, data, err);
+ }
+ else {
+ DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
+ *err = CURLE_HTTP2;
+ nwritten = -1;
+ }
goto out;
}
}
if(maxloops <= 0) {
/* we mark it as read-again-please */
- conn->cselect_bits = CURL_CSELECT_IN;
+ data->state.dselect_bits = CURL_CSELECT_IN;
*comeback = TRUE;
}
CURLcode result;
struct curltime now;
int didwhat = 0;
+ int select_bits;
- curl_socket_t fd_read;
- curl_socket_t fd_write;
- int select_res = conn->cselect_bits;
- conn->cselect_bits = 0;
-
- /* only use the proper socket if the *_HOLD bit is not set simultaneously as
- then we are in rate limiting state in that transfer direction */
-
- if((k->keepon & KEEP_RECVBITS) == KEEP_RECV)
- fd_read = conn->sockfd;
- else
- fd_read = CURL_SOCKET_BAD;
-
- if((k->keepon & KEEP_SENDBITS) == KEEP_SEND)
- fd_write = conn->writesockfd;
- else
- fd_write = CURL_SOCKET_BAD;
+ if(data->state.dselect_bits) {
+ select_bits = data->state.dselect_bits;
+ data->state.dselect_bits = 0;
+ }
+ else if(conn->cselect_bits) {
+ select_bits = conn->cselect_bits;
+ conn->cselect_bits = 0;
+ }
+ else {
+ curl_socket_t fd_read;
+ curl_socket_t fd_write;
+ /* only use the proper socket if the *_HOLD bit is not set simultaneously
+ as then we are in rate limiting state in that transfer direction */
+ if((k->keepon & KEEP_RECVBITS) == KEEP_RECV)
+ fd_read = conn->sockfd;
+ else
+ fd_read = CURL_SOCKET_BAD;
-#if defined(USE_HTTP2) || defined(USE_HTTP3)
- if(data->state.drain) {
- select_res |= CURL_CSELECT_IN;
- DEBUGF(infof(data, "Curl_readwrite: forcibly told to drain data"));
if((k->keepon & KEEP_SENDBITS) == KEEP_SEND)
- select_res |= CURL_CSELECT_OUT;
- }
-#endif
+ fd_write = conn->writesockfd;
+ else
+ fd_write = CURL_SOCKET_BAD;
- if(!select_res) /* Call for select()/poll() only, if read/write/error
- status is not known. */
- select_res = Curl_socket_check(fd_read, CURL_SOCKET_BAD, fd_write, 0);
+ select_bits = Curl_socket_check(fd_read, CURL_SOCKET_BAD, fd_write, 0);
+ }
- if(select_res == CURL_CSELECT_ERR) {
+ if(select_bits == CURL_CSELECT_ERR) {
failf(data, "select/poll returned error");
result = CURLE_SEND_ERROR;
goto out;
#ifdef USE_HYPER
if(conn->datastream) {
- result = conn->datastream(data, conn, &didwhat, done, select_res);
+ result = conn->datastream(data, conn, &didwhat, done, select_bits);
if(result || *done)
goto out;
}
/* We go ahead and do a read if we have a readable socket or if
the stream was rewound (in which case we have data in a
buffer) */
- if((k->keepon & KEEP_RECV) && (select_res & CURL_CSELECT_IN)) {
+ if((k->keepon & KEEP_RECV) && (select_bits & CURL_CSELECT_IN)) {
result = readwrite_data(data, conn, k, &didwhat, done, comeback);
if(result || *done)
goto out;
}
/* If we still have writing to do, we check if we have a writable socket. */
- if((k->keepon & KEEP_SEND) && (select_res & CURL_CSELECT_OUT)) {
+ if((k->keepon & KEEP_SEND) && (select_bits & CURL_CSELECT_OUT)) {
/* write */
result = readwrite_upload(data, conn, &didwhat);
char *scratch; /* huge buffer[set.buffer_size*2] for upload CRLF replacing */
long followlocation; /* redirect counter */
int requests; /* request counter: redirects + authentication retakes */
+ int dselect_bits; /* != 0 -> bitmask of socket events for this transfer
+ * overriding anything the socket may report */
#ifdef HAVE_SIGNAL
/* storage for the previous bag^H^H^HSIGPIPE signal handler :-) */
void (*prev_signal)(int sig);
curl_off_t infilesize; /* size of file to upload, -1 means unknown.
Copied from set.filesize at start of operation */
#if defined(USE_HTTP2) || defined(USE_HTTP3)
- size_t drain; /* Increased when this stream has data to read, even if its
- socket is not necessarily is readable. Decreased when
- checked. */
struct Curl_data_priority priority; /* shallow copy of data->set */
#endif
}
}
-static void notify_drain(struct Curl_cfilter *cf,
+static void drain_stream_from_other_thread(struct Curl_easy *data,
+ struct stream_ctx *stream)
+{
+ int bits;
+
+ /* risky */
+ bits = CURL_CSELECT_IN;
+ if(stream && !stream->upload_done)
+ bits |= CURL_CSELECT_OUT;
+ if(data->state.dselect_bits != bits) {
+ data->state.dselect_bits = bits;
+ /* cannot expire from other thread */
+ }
+}
+
+static void drain_stream(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
+ struct stream_ctx *stream = H3_STREAM_CTX(data);
+ int bits;
+
(void)cf;
- if(!data->state.drain) {
- data->state.drain = 1;
+ bits = CURL_CSELECT_IN;
+ if(stream && !stream->upload_done)
+ bits |= CURL_CSELECT_OUT;
+ if(data->state.dselect_bits != bits) {
+ data->state.dselect_bits = bits;
Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
}
}
}
- data->state.drain = 1;
+ drain_stream_from_other_thread(data, stream);
msh3_lock_release(&stream->recv_lock);
}
nread = 0;
out:
- data->state.drain = 0;
return nread;
}
if(stream->recv_error) {
failf(data, "request aborted");
- data->state.drain = 0;
*err = stream->recv_error;
goto out;
}
len, nread, *err));
if(nread < 0)
goto out;
- if(!Curl_bufq_is_empty(&stream->recvbuf) ||
- stream->closed) {
- notify_drain(cf, data);
- }
+ if(stream->closed)
+ drain_stream(cf, data);
}
else if(stream->closed) {
nread = recv_closed_stream(cf, data, err);
if(stream->recv_error) {
bitmap |= GETSOCK_READSOCK(0);
- notify_drain(cf, data);
+ drain_stream(cf, data);
}
else if(stream->req) {
bitmap |= GETSOCK_READSOCK(0);
- notify_drain(cf, data);
+ drain_stream(cf, data);
}
}
- DEBUGF(LOG_CF(data, cf, "select_sock %u -> %d",
- (uint32_t)data->state.drain, bitmap));
+ DEBUGF(LOG_CF(data, cf, "select_sock -> %d", bitmap));
CF_DATA_RESTORE(cf, save);
return bitmap;
}
Curl_bufq_len(&stream->recvbuf)));
pending = !Curl_bufq_is_empty(&stream->recvbuf);
msh3_lock_release(&stream->recv_lock);
+ if(pending)
+ drain_stream(cf, (struct Curl_easy *)data);
}
CF_DATA_RESTORE(cf, save);
consumed);
ngtcp2_conn_extend_max_offset(ctx->qconn, consumed);
}
- if(!stream->closed && data->state.drain &&
- Curl_bufq_is_empty(&stream->recvbuf)) {
- /* nothing buffered any more */
- data->state.drain = 0;
- }
}
static int cb_recv_stream_data(ngtcp2_conn *tconn, uint32_t flags,
return rv;
}
-static void notify_drain(struct Curl_cfilter *cf,
+static void drain_stream(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
+ struct stream_ctx *stream = H3_STREAM_CTX(data);
+ int bits;
+
(void)cf;
- if(!data->state.drain) {
- data->state.drain = 1;
+ bits = CURL_CSELECT_IN;
+ if(stream && !stream->upload_done)
+ bits |= CURL_CSELECT_OUT;
+ if(data->state.dselect_bits != bits) {
+ data->state.dselect_bits = bits;
Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
}
if(app_error_code == NGHTTP3_H3_INTERNAL_ERROR) {
stream->reset = TRUE;
}
- notify_drain(cf, data);
+ drain_stream(cf, data);
return 0;
}
(void)stream3_id;
result = write_resp_raw(cf, data, buf, buflen, TRUE);
- if(CF_DATA_CURRENT(cf) != data) {
- notify_drain(cf, data);
- }
+ drain_stream(cf, data);
return result? -1 : 0;
}
if(stream->status_code / 100 != 1) {
stream->resp_hds_complete = TRUE;
}
- if(CF_DATA_CURRENT(cf) != data) {
- notify_drain(cf, data);
- }
+ drain_stream(cf, data);
return 0;
}
nread = 0;
out:
- data->state.drain = 0;
return nread;
}
}
if(nread > 0) {
- if(1 || !Curl_bufq_is_empty(&stream->recvbuf)) {
- notify_drain(cf, data);
- }
+ drain_stream(cf, data);
}
else {
if(stream->closed) {
nread = recv_closed_stream(cf, data, err);
goto out;
}
- data->state.drain = FALSE;
*err = CURLE_AGAIN;
nread = -1;
}
if((data->req.keepon & KEEP_SEND_HOLD) &&
(data->req.keepon & KEEP_SEND)) {
data->req.keepon &= ~KEEP_SEND_HOLD;
- notify_drain(cf, data);
+ drain_stream(cf, data);
DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] unpausing acks",
stream_id));
}
}
}
-static void notify_drain(struct Curl_cfilter *cf, struct Curl_easy *data)
+static void drain_stream(struct Curl_cfilter *cf,
+ struct Curl_easy *data)
{
+ struct stream_ctx *stream = H3_STREAM_CTX(data);
+ int bits;
+
(void)cf;
- if(!data->state.drain) {
- data->state.drain = 1;
+ bits = CURL_CSELECT_IN;
+ if(stream && !stream->upload_done)
+ bits |= CURL_CSELECT_OUT;
+ if(data->state.dselect_bits != bits) {
+ data->state.dselect_bits = bits;
Curl_expire(data, 0, EXPIRE_RUN_NOW);
}
}
}
else {
result = h3_process_event(cf, sdata, stream3_id, ev);
- if(sdata != data) {
- notify_drain(cf, sdata);
- }
+ drain_stream(cf, sdata);
if(result) {
DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] error processing event %s "
"for [h3sid=%"PRId64"] -> %d",
}
if(nread > 0) {
- data->state.drain = (!Curl_bufq_is_empty(&stream->recvbuf) ||
- stream->closed);
+ if(stream->closed)
+ drain_stream(cf, data);
}
else {
- data->state.drain = FALSE;
if(stream->closed) {
nread = recv_closed_stream(cf, data, err);
goto out;
}
+ else if(quiche_conn_is_draining(ctx->qconn)) {
+ failf(data, "QUIC connection is draining");
+ *err = CURLE_HTTP3;
+ nread = -1;
+ goto out;
+ }
*err = CURLE_AGAIN;
nread = -1;
}
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(data);
- quiche_stream_iter *qiter;
- bool is_writable = FALSE;
- if(!stream)
- return FALSE;
- /* surely, there must be a better way */
- qiter = quiche_conn_writable(ctx->qconn);
- if(qiter) {
- uint64_t stream_id;
- while(quiche_stream_iter_next(qiter, &stream_id)) {
- if(stream_id == (uint64_t)stream->id) {
- is_writable = TRUE;
- break;
- }
- }
- quiche_stream_iter_free(qiter);
- }
- return is_writable;
+ return stream &&
+ quiche_conn_stream_writable(ctx->qconn, (uint64_t)stream->id, 1);
}
static int cf_quiche_get_select_socks(struct Curl_cfilter *cf,
}
case CF_CTRL_DATA_IDLE:
result = cf_flush_egress(cf, data);
- DEBUGF(LOG_CF(data, cf, "data idle, flush egress -> %d", result));
+ if(result)
+ DEBUGF(LOG_CF(data, cf, "data idle, flush egress -> %d", result));
break;
default:
break;
;
if(nread == -1) {
if(SOCKERRNO == EAGAIN || SOCKERRNO == EWOULDBLOCK) {
- DEBUGF(LOG_CF(data, cf, "ingress, recvmsg -> EAGAIN"));
goto out;
}
if(!cf->connected && SOCKERRNO == ECONNREFUSED) {
assert len(r.stats) == count, f'did not get all stats: {r}'
invalid_stats = []
for idx, s in enumerate(r.stats):
- if 'exitcode' not in s or s['exitcode'] not in [18, 56, 92, 95]:
+ if 'exitcode' not in s or s['exitcode'] not in [18, 55, 56, 92, 95]:
invalid_stats.append(f'request {idx} exit with {s["exitcode"]}\n{s}')
assert len(invalid_stats) == 0, f'failed: {invalid_stats}'