struct tunnel_stream *tunnel)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
- unsigned char bits;
-
(void)cf;
- bits = CURL_CSELECT_IN;
if(!tunnel->closed && !tunnel->reset &&
!Curl_bufq_is_empty(&ctx->tunnel.sendbuf))
- bits |= CURL_CSELECT_OUT;
- if(data->state.select_bits != bits) {
- CURL_TRC_CF(data, cf, "[%d] DRAIN select_bits=%x",
- tunnel->stream_id, bits);
- data->state.select_bits = bits;
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
- }
+ Curl_multi_mark_dirty(data);
}
static ssize_t proxy_nw_in_reader(void *reader_ctx,
/* next attempt was started */
CURL_TRC_CF(data, cf, "%s trying next", baller->name);
++ongoing;
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ Curl_multi_mark_dirty(data);
}
}
}
if(!dohp->pending) {
/* DoH completed, run the transfer picking up the results */
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ Curl_multi_mark_dirty(data);
}
}
}
Curl_expire(data, 0, EXPIRE_RUN_NOW);
/* reset the too-slow time keeper */
data->state.keeps_speed.tv_sec = 0;
- /* Simulate socket events on next run for unpaused directions */
- if(!send_paused_new)
- data->state.select_bits |= CURL_CSELECT_OUT;
- if(!recv_paused_new)
- data->state.select_bits |= CURL_CSELECT_IN;
/* On changes, tell application to update its timers. */
if(changed && data->multi) {
if(Curl_update_timer(data->multi) && !result)
}
#endif /* !NGHTTP2_HAS_SET_LOCAL_WINDOW_SIZE */
-/*
- * Mark this transfer to get "drained".
- */
-static void drain_stream(struct Curl_cfilter *cf,
- struct Curl_easy *data,
- struct h2_stream_ctx *stream)
-{
- unsigned char bits;
-
- (void)cf;
- bits = CURL_CSELECT_IN;
- if(!stream->closed &&
- (!stream->body_eos || !Curl_bufq_is_empty(&stream->sendbuf)))
- bits |= CURL_CSELECT_OUT;
- if(stream->closed || (data->state.select_bits != bits)) {
- CURL_TRC_CF(data, cf, "[%d] DRAIN select_bits=%x",
- stream->id, bits);
- data->state.select_bits = bits;
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
- }
-}
static CURLcode http2_data_setup(struct Curl_cfilter *cf,
struct Curl_easy *data,
if(stream->status_code / 100 != 1) {
stream->resp_hds_complete = TRUE;
}
- drain_stream(cf, data, stream);
+ Curl_multi_mark_dirty(data);
break;
case NGHTTP2_PUSH_PROMISE:
rv = push_promise(cf, data, &frame->push_promise);
if(frame->rst_stream.error_code) {
stream->reset = TRUE;
}
- drain_stream(cf, data, stream);
+ Curl_multi_mark_dirty(data);
break;
case NGHTTP2_WINDOW_UPDATE:
if(CURL_WANT_SEND(data) && Curl_bufq_is_empty(&stream->sendbuf)) {
/* need more data, force processing of transfer */
- drain_stream(cf, data, stream);
+ Curl_multi_mark_dirty(data);
}
else if(!Curl_bufq_is_empty(&stream->sendbuf)) {
/* resume the potentially suspended stream */
stream->id, NGHTTP2_STREAM_CLOSED);
stream->closed = TRUE;
}
- drain_stream(cf, data, stream);
+ Curl_multi_mark_dirty(data);
}
return CURLE_OK;
}
* window and *assume* that we treat this like a WINDOW_UPDATE. Some
* servers send an explicit WINDOW_UPDATE, but not all seem to do that.
* To be safe, we UNHOLD a stream in order not to stall. */
- if(CURL_WANT_SEND(data)) {
- struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
- if(stream)
- drain_stream(cf, data, stream);
- }
+ if(CURL_WANT_SEND(data))
+ Curl_multi_mark_dirty(data);
}
break;
}
stream_id, nghttp2_http2_strerror(error_code), error_code);
else
CURL_TRC_CF(data_s, cf, "[%d] CLOSED", stream_id);
- drain_stream(cf, data_s, stream);
+ Curl_multi_mark_dirty(data_s);
/* remove `data_s` from the nghttp2 stream */
rv = nghttp2_session_set_stream_user_data(session, stream_id, 0);
}
/* if we receive data for another handle, wake that up */
if(CF_DATA_CURRENT(cf) != data_s)
- Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
+ Curl_multi_mark_dirty(data_s);
CURL_TRC_CF(data_s, cf, "[%d] status: HTTP/2 %03d",
stream->id, stream->status_code);
}
/* if we receive data for another handle, wake that up */
if(CF_DATA_CURRENT(cf) != data_s)
- Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
+ Curl_multi_mark_dirty(data_s);
CURL_TRC_CF(data_s, cf, "[%d] header: %.*s: %.*s",
stream->id, (int)namelen, name, (int)valuelen, value);
* this may leave data in underlying buffers that will not
* be consumed. */
if(!cf->next || !cf->next->cft->has_data_pending(cf->next, data))
- drain_stream(cf, data, stream);
+ Curl_multi_mark_dirty(data);
break;
}
nghttp2_session_consume(ctx->h2, stream->id, *pnread);
if(stream->closed) {
CURL_TRC_CF(data, cf, "[%d] DRAIN closed stream", stream->id);
- drain_stream(cf, data, stream);
+ Curl_multi_mark_dirty(data);
}
}
* monitor the socket for POLLOUT, but when not SENDING
* any more, we force processing of the transfer. */
if(!CURL_WANT_SEND(data))
- drain_stream(cf, data, stream);
+ Curl_multi_mark_dirty(data);
}
else if(r2) {
result = r2;
* not. We may have already buffered and exhausted the new window
* by operating on things in flight during the handling of other
* transfers. */
- drain_stream(cf, data, stream);
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ Curl_multi_mark_dirty(data);
}
CURL_TRC_CF(data, cf, "[%d] stream now %spaused", stream->id,
pause ? "" : "un");
else {
/* IMAP download */
data->req.maxdownload = size;
- /* force a recv/send check of this connection, as the data might've been
- read off the socket already */
- data->state.select_bits = CURL_CSELECT_IN;
Curl_xfer_setup1(data, CURL_XFER_RECV, size, FALSE);
}
}
Curl_multi_ev_init(multi, ev_hashsize);
Curl_uint_tbl_init(&multi->xfers, NULL);
Curl_uint_bset_init(&multi->process);
+ Curl_uint_bset_init(&multi->dirty);
Curl_uint_bset_init(&multi->pending);
Curl_uint_bset_init(&multi->msgsent);
Curl_hash_init(&multi->proto_hash, 23,
if(Curl_uint_bset_resize(&multi->process, xfer_table_size) ||
Curl_uint_bset_resize(&multi->pending, xfer_table_size) ||
+ Curl_uint_bset_resize(&multi->dirty, xfer_table_size) ||
Curl_uint_bset_resize(&multi->msgsent, xfer_table_size) ||
Curl_uint_tbl_resize(&multi->xfers, xfer_table_size))
goto error;
}
Curl_uint_bset_destroy(&multi->process);
+ Curl_uint_bset_destroy(&multi->dirty);
Curl_uint_bset_destroy(&multi->pending);
Curl_uint_bset_destroy(&multi->msgsent);
Curl_uint_tbl_destroy(&multi->xfers);
* to work properly when larger than the table, but not
* the other way around. */
if(Curl_uint_bset_resize(&multi->process, newsize) ||
+ Curl_uint_bset_resize(&multi->dirty, newsize) ||
Curl_uint_bset_resize(&multi->pending, newsize) ||
Curl_uint_bset_resize(&multi->msgsent, newsize) ||
Curl_uint_tbl_resize(&multi->xfers, newsize))
return CURLM_ABORTED_BY_CALLBACK;
multi->dead = FALSE;
Curl_uint_bset_clear(&multi->process);
+ Curl_uint_bset_clear(&multi->dirty);
Curl_uint_bset_clear(&multi->pending);
Curl_uint_bset_clear(&multi->msgsent);
}
DEBUGASSERT(Curl_uint_tbl_contains(&multi->xfers, mid));
Curl_uint_tbl_remove(&multi->xfers, mid);
Curl_uint_bset_remove(&multi->process, mid);
+ Curl_uint_bset_remove(&multi->dirty, mid);
Curl_uint_bset_remove(&multi->pending, mid);
Curl_uint_bset_remove(&multi->msgsent, mid);
data->multi = NULL;
(Curl_pollset_want_read(data, ps, data->conn->sock[SECONDARYSOCKET]) &&
Curl_conn_data_pending(data, SECONDARYSOCKET)))) {
CURL_TRC_M(data, "%s pollset[] has POLLIN, but there is still "
- "buffered input to consume -> EXPIRE_RUN_NOW", caller);
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ "buffered input to consume -> mark as dirty", caller);
+ Curl_multi_mark_dirty(data);
}
switch(ps->num) {
}
}
}
- else if(data->state.select_bits && !Curl_xfer_is_blocked(data)) {
- /* This avoids CURLM_CALL_MULTI_PERFORM so that a very fast transfer does
- not get stuck on this transfer at the expense of other concurrent
- transfers */
- CURL_TRC_M(data, "EXPIRE_RUN_NOW unblocked, select_bits=%x",
- data->state.select_bits);
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
- }
free(newurl);
*resultp = result;
return rc;
multi_warn_debug(multi, data);
+ /* transfer runs now, clear the dirty bit. This may be set
+ * again during processing, triggering a re-run later. */
+ Curl_uint_bset_remove(&multi->dirty, data->mid);
+
do {
/* A "stream" here is a logical stream if the protocol can handle that
(HTTP/2), or the full connection for older protocols */
}
#endif
Curl_uint_bset_destroy(&multi->process);
+ Curl_uint_bset_destroy(&multi->dirty);
Curl_uint_bset_destroy(&multi->pending);
Curl_uint_bset_destroy(&multi->msgsent);
Curl_uint_tbl_destroy(&multi->xfers);
bool run_cpool;
};
-static CURLMcode multi_run_expired(struct multi_run_ctx *mrc)
+static void multi_mark_expired_as_dirty(struct multi_run_ctx *mrc)
{
struct Curl_multi *multi = mrc->multi;
struct Curl_easy *data = NULL;
struct Curl_tree *t = NULL;
- CURLMcode result = CURLM_OK;
/*
* The loop following here will go on as long as there are expire-times left
extracts a matching node if there is one */
multi->timetree = Curl_splaygetbest(mrc->now, multi->timetree, &t);
if(!t)
- goto out;
+ return;
data = Curl_splayget(t); /* assign this for next loop */
if(!data)
continue;
(void)add_next_timeout(mrc->now, multi, data);
- if(data == multi->admin) {
- mrc->run_cpool = TRUE;
- continue;
- }
+ Curl_multi_mark_dirty(data);
+ }
+}
- mrc->run_xfers++;
- sigpipe_apply(data, &mrc->pipe_st);
- result = multi_runsingle(multi, &mrc->now, data);
+static CURLMcode multi_run_dirty(struct multi_run_ctx *mrc)
+{
+ struct Curl_multi *multi = mrc->multi;
+ CURLMcode result = CURLM_OK;
+ unsigned int mid;
- if(CURLM_OK >= result) {
- /* reassess event handling of data */
- result = Curl_multi_ev_assess_xfer(multi, data);
- if(result)
- goto out;
+ if(Curl_uint_bset_first(&multi->dirty, &mid)) {
+ do {
+ struct Curl_easy *data = Curl_multi_get_easy(multi, mid);
+ if(data) {
+ CURL_TRC_M(data, "multi_run_dirty");
+
+ if(data == multi->admin) {
+ Curl_uint_bset_remove(&multi->dirty, mid);
+ mrc->run_cpool = TRUE;
+ continue;
+ }
+
+ mrc->run_xfers++;
+ sigpipe_apply(data, &mrc->pipe_st);
+ /* runsingle() clears the dirty mid */
+ result = multi_runsingle(multi, &mrc->now, data);
+
+ if(CURLM_OK >= result) {
+ /* reassess event handling of data */
+ result = Curl_multi_ev_assess_xfer(multi, data);
+ if(result)
+ goto out;
+ }
+ }
+ else {
+ CURL_TRC_M(multi->admin, "multi_run_dirty, %u no longer found", mid);
+ Curl_uint_bset_remove(&multi->dirty, mid);
+ }
}
+ while(Curl_uint_bset_next(&multi->dirty, mid, &mid));
}
out:
return result;
}
+
static CURLMcode multi_socket(struct Curl_multi *multi,
bool checkall,
curl_socket_t s,
}
if(s != CURL_SOCKET_TIMEOUT) {
- Curl_multi_ev_expire_xfers(multi, s, &mrc.now, &mrc.run_cpool);
+ /* Mark all transfers of that socket as dirty */
+ Curl_multi_ev_dirty_xfers(multi, s, &mrc.run_cpool);
}
else {
/* Asked to run due to time-out. Clear the 'last_expire_ts' variable to
mrc.run_cpool = TRUE;
}
- result = multi_run_expired(&mrc);
+ multi_mark_expired_as_dirty(&mrc);
+ result = multi_run_dirty(&mrc);
if(result)
goto out;
* Do that only once or it might be unfair to transfers on other
* sockets. */
mrc.now = curlx_now();
- result = multi_run_expired(&mrc);
+ multi_mark_expired_as_dirty(&mrc);
+ result = multi_run_dirty(&mrc);
}
out:
return multi_socket(multi, TRUE, CURL_SOCKET_BAD, 0, running_handles);
}
+
+static bool multi_has_dirties(struct Curl_multi *multi)
+{
+ unsigned int mid;
+ if(Curl_uint_bset_first(&multi->dirty, &mid)) {
+ do {
+ struct Curl_easy *data = Curl_multi_get_easy(multi, mid);
+ if(data) {
+ return TRUE;
+ }
+ else {
+ CURL_TRC_M(multi->admin, "dirty transfer %u no longer found", mid);
+ Curl_uint_bset_remove(&multi->dirty, mid);
+ }
+ }
+ while(Curl_uint_bset_next(&multi->dirty, mid, &mid));
+ }
+ return FALSE;
+}
+
static CURLMcode multi_timeout(struct Curl_multi *multi,
struct curltime *expire_time,
long *timeout_ms)
return CURLM_OK;
}
- if(multi->timetree) {
+ if(multi_has_dirties(multi)) {
+ *expire_time = curlx_now();
+ *timeout_ms = 0;
+ return CURLM_OK;
+ }
+ else if(multi->timetree) {
/* we have a tree of expire times */
struct curltime now = curlx_now();
return multi->xfers_alive;
}
+void Curl_multi_mark_dirty(struct Curl_easy *data)
+{
+ if(data->multi && data->mid != UINT_MAX)
+ Curl_uint_bset_add(&data->multi->dirty, data->mid);
+}
+
#ifdef DEBUGBUILD
static void multi_xfer_dump(struct Curl_multi *multi, unsigned int mid,
void *entry)
return CURLM_OK;
}
-void Curl_multi_ev_expire_xfers(struct Curl_multi *multi,
- curl_socket_t s,
- const struct curltime *nowp,
- bool *run_cpool)
+void Curl_multi_ev_dirty_xfers(struct Curl_multi *multi,
+ curl_socket_t s,
+ bool *run_cpool)
{
struct mev_sh_entry *entry;
do {
data = Curl_multi_get_easy(multi, mid);
if(data) {
- /* Expire with out current now, so we will get it below when
- * asking the splaytree for expired transfers. */
- Curl_expire_ex(data, nowp, 0, EXPIRE_RUN_NOW);
+ Curl_multi_mark_dirty(data);
+ }
+ else {
+ CURL_TRC_M(multi->admin, "socket transfer %u no longer found", mid);
+ Curl_uint_spbset_remove(&entry->xfers, mid);
}
}
while(Curl_uint_spbset_next(&entry->xfers, mid, &mid));
struct Curl_easy *data,
struct connectdata *conn);
-/* Expire all transfers tied to the given socket */
-void Curl_multi_ev_expire_xfers(struct Curl_multi *multi,
- curl_socket_t s,
- const struct curltime *nowp,
- bool *run_cpool);
+/* Mark all transfers tied to the given socket as dirty */
+void Curl_multi_ev_dirty_xfers(struct Curl_multi *multi,
+ curl_socket_t s,
+ bool *run_cpool);
/* Socket will be closed, forget anything we know about it. */
void Curl_multi_ev_socket_done(struct Curl_multi *multi,
struct uint_tbl xfers; /* transfers added to this multi */
/* Each transfer's mid may be present in at most one of these */
struct uint_bset process; /* transfer being processed */
+ struct uint_bset dirty; /* transfer to be run NOW, e.g. ASAP. */
struct uint_bset pending; /* transfers in waiting (conn limit etc.) */
struct uint_bset msgsent; /* transfers done with message for application */
/* Get the # of transfers current in process/pending. */
unsigned int Curl_multi_xfers_running(struct Curl_multi *multi);
+/* Mark a transfer as dirty, e.g. to be rerun at earliest convenience.
+ * A cheap operation, can be done many times repeatedly. */
+void Curl_multi_mark_dirty(struct Curl_easy *data);
+
#endif /* HEADER_CURL_MULTIIF_H */
} while(maxloops--);
- if(!rcvd_eagain || data_pending(data, rcvd_eagain)) {
+ if(!Curl_xfer_is_blocked(data) &&
+ (!rcvd_eagain || data_pending(data, rcvd_eagain))) {
/* Did not read until EAGAIN or there is still data pending
* in buffers. Mark as read-again via simulated SELECT results. */
- data->state.select_bits = CURL_CSELECT_IN;
- if((k->keepon & KEEP_SENDBITS) == KEEP_SEND)
- data->state.select_bits |= CURL_CSELECT_OUT;
- if(!Curl_xfer_is_blocked(data))
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
- CURL_TRC_M(data, "sendrecv_dl() no EAGAIN/pending data, "
- "set select_bits=%x", data->state.select_bits);
+ Curl_multi_mark_dirty(data);
+ CURL_TRC_M(data, "sendrecv_dl() no EAGAIN/pending data, mark as dirty");
}
if(((k->keepon & (KEEP_RECV|KEEP_SEND)) == KEEP_SEND) &&
return CURLE_OK;
}
-static int select_bits_paused(struct Curl_easy *data, int select_bits)
-{
- /* See issue #11982: we really need to be careful not to progress
- * a transfer direction when that direction is paused. Not all parts
- * of our state machine are handling PAUSED transfers correctly. So, we
- * do not want to go there.
- * NOTE: we are only interested in PAUSE, not HOLD. */
-
- /* if there is data in a direction not paused, return false */
- if(((select_bits & CURL_CSELECT_IN) && !Curl_xfer_recv_is_paused(data)) ||
- ((select_bits & CURL_CSELECT_OUT) && !Curl_xfer_send_is_paused(data)))
- return FALSE;
-
- return Curl_xfer_recv_is_paused(data) || Curl_xfer_send_is_paused(data);
-}
-
/*
* Curl_sendrecv() is the low-level function to be called when data is to
* be read and written to/from the connection.
int didwhat = 0;
DEBUGASSERT(nowp);
- if(data->state.select_bits) {
- if(select_bits_paused(data, data->state.select_bits)) {
- /* leave the bits unchanged, so they'll tell us what to do when
- * this transfer gets unpaused. */
- result = CURLE_OK;
- goto out;
- }
- data->state.select_bits = 0;
+ if(Curl_xfer_is_blocked(data)) {
+ result = CURLE_OK;
+ goto out;
}
/* We go ahead and do a read if we have a readable socket or if the stream
#endif
unsigned char httpreq; /* Curl_HttpReq; what kind of HTTP request (if any)
is this */
- unsigned char select_bits; /* != 0 -> bitmask of socket events for this
- transfer overriding anything the socket may
- report */
unsigned int creds_from:2; /* where is the server credentials originating
from, see the CREDS_* defines above */
static void drain_stream_from_other_thread(struct Curl_easy *data,
struct h3_stream_ctx *stream)
{
- unsigned char bits;
-
- /* risky */
- bits = CURL_CSELECT_IN;
- if(stream && !stream->upload_done)
- bits |= CURL_CSELECT_OUT;
- if(data->state.select_bits != bits) {
- data->state.select_bits = bits;
- /* cannot expire from other thread */
- }
+ (void)data;
+ (void)stream;
+ /* cannot expire from other thread.
+ here is the disconnect between msh3 and curl */
}
static void h3_drain_stream(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
- struct cf_msh3_ctx *ctx = cf->ctx;
- struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
- unsigned char bits;
-
(void)cf;
- bits = CURL_CSELECT_IN;
- if(stream && !stream->upload_done)
- bits |= CURL_CSELECT_OUT;
- if(data->state.select_bits != bits) {
- data->state.select_bits = bits;
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
- }
+ Curl_multi_mark_dirty(data);
}
static const MSH3_CONNECTION_IF msh3_conn_if = {
}
}
-static void h3_drain_stream(struct Curl_cfilter *cf,
- struct Curl_easy *data)
-{
- struct cf_ngtcp2_ctx *ctx = cf->ctx;
- struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
- unsigned char bits;
-
- (void)cf;
- bits = CURL_CSELECT_IN;
- if(stream && stream->upload_left && !stream->send_closed)
- bits |= CURL_CSELECT_OUT;
- if(data->state.select_bits != bits) {
- data->state.select_bits = bits;
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
- }
-}
-
/* ngtcp2 default congestion controller does not perform pacing. Limit
the maximum packet burst to MAX_PKT_BURST packets. */
#define MAX_PKT_BURST 10
CURL_TRC_CF(s_data, cf, "[%" FMT_PRId64 "] unblock quic flow",
(curl_int64_t)stream_id);
stream->quic_flow_blocked = FALSE;
- h3_drain_stream(cf, s_data);
+ Curl_multi_mark_dirty(s_data);
}
return 0;
}
else {
CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] CLOSED", stream->id);
}
- h3_drain_stream(cf, data);
+ Curl_multi_mark_dirty(data);
return 0;
}
if(stream->status_code / 100 != 1) {
stream->resp_hds_complete = TRUE;
}
- h3_drain_stream(cf, data);
+ Curl_multi_mark_dirty(data);
return 0;
}
{
/* There seems to exist no API in ngtcp2 to shrink/enlarge the streams
* windows. As we do in HTTP/2. */
- if(!pause) {
- h3_drain_stream(cf, data);
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
- }
+ (void)cf;
+ if(!pause)
+ Curl_multi_mark_dirty(data);
return CURLE_OK;
}
return NULL;
}
-static void h3_drain_stream(struct Curl_cfilter *cf,
- struct Curl_easy *data)
-{
- struct cf_osslq_ctx *ctx = cf->ctx;
- struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
- unsigned char bits;
-
- (void)cf;
- bits = CURL_CSELECT_IN;
- if(stream && stream->upload_left && !stream->send_closed)
- bits |= CURL_CSELECT_OUT;
- if(data->state.select_bits != bits) {
- data->state.select_bits = bits;
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
- }
-}
-
static CURLcode h3_data_pause(struct Curl_cfilter *cf,
struct Curl_easy *data,
bool pause)
{
+ (void)cf;
if(!pause) {
/* unpaused. make it run again right away */
- h3_drain_stream(cf, data);
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ Curl_multi_mark_dirty(data);
}
return CURLE_OK;
}
else {
CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] CLOSED", stream->s.id);
}
- h3_drain_stream(cf, data);
+ Curl_multi_mark_dirty(data);
return 0;
}
stream->download_recvd += (curl_off_t)buflen;
CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] DATA len=%zu, total=%" FMT_OFF_T,
stream->s.id, buflen, stream->download_recvd);
- h3_drain_stream(cf, data);
+ Curl_multi_mark_dirty(data);
return 0;
}
if(stream->status_code / 100 != 1) {
stream->resp_hds_complete = TRUE;
}
- h3_drain_stream(cf, data);
+ Curl_multi_mark_dirty(data);
return 0;
}
if(stream) {
nghttp3_conn_unblock_stream(ctx->h3.conn, stream->s.id);
stream->s.send_blocked = FALSE;
- h3_drain_stream(cf, ctx->curl_items[idx_count]);
+ Curl_multi_mark_dirty(ctx->curl_items[idx_count]);
CURL_TRC_CF(ctx->curl_items[idx_count], cf, "unblocked");
}
result_count--;
}
if(*pnread) {
- h3_drain_stream(cf, data);
+ Curl_multi_mark_dirty(data);
}
else {
if(stream->closed) {
(void)user_data;
if(stream->quic_flow_blocked) {
stream->quic_flow_blocked = FALSE;
- Curl_expire(sdata, 0, EXPIRE_RUN_NOW);
+ Curl_multi_mark_dirty(sdata);
CURL_TRC_CF(sdata, cf, "[%"FMT_PRIu64"] unblock", stream->id);
}
return TRUE;
{
(void)stream;
(void)user_data;
- CURL_TRC_CF(sdata, cf, "conn closed, expire transfer");
- Curl_expire(sdata, 0, EXPIRE_RUN_NOW);
+ CURL_TRC_CF(sdata, cf, "conn closed, mark as dirty");
+ Curl_multi_mark_dirty(sdata);
return TRUE;
}
}
}
-static void h3_drain_stream(struct Curl_cfilter *cf,
- struct Curl_easy *data)
-{
- struct cf_quiche_ctx *ctx = cf->ctx;
- struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
- unsigned char bits;
-
- (void)cf;
- bits = CURL_CSELECT_IN;
- if(stream && !stream->send_closed)
- bits |= CURL_CSELECT_OUT;
- if(data->state.select_bits != bits) {
- data->state.select_bits = bits;
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
- }
-}
-
static void cf_quiche_expire_conn_closed(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
quiche_h3_event *ev)
{
CURLcode result = h3_process_event(cf, data, stream, ev);
- h3_drain_stream(cf, data);
+ Curl_multi_mark_dirty(data);
if(result)
CURL_TRC_CF(data, cf, "error processing event %s "
"for [%"FMT_PRIu64"] -> %d", cf_ev_name(ev),
if(*pnread) {
if(stream->closed)
- h3_drain_stream(cf, data);
+ Curl_multi_mark_dirty(data);
}
else {
if(stream->closed) {
{
/* There seems to exist no API in quiche to shrink/enlarge the streams
* windows. As we do in HTTP/2. */
+ (void)cf;
if(!pause) {
- h3_drain_stream(cf, data);
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ Curl_multi_mark_dirty(data);
}
return CURLE_OK;
}
figure out a "real" bitmask */
sshc->orig_waitfor = data->req.keepon;
- /* we want to use the _sending_ function even when the socket turns
- out readable as the underlying libssh sftp send function will deal
- with both accordingly */
- data->state.select_bits = CURL_CSELECT_OUT;
-
/* since we do not really wait for anything at this point, we want the
- state machine to move on as soon as possible so we set a very short
- timeout here */
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ state machine to move on as soon as possible so we mark this as dirty */
+ Curl_multi_mark_dirty(data);
#if LIBSSH_VERSION_INT > SSH_VERSION_INT(0, 11, 0)
sshc->sftp_send_state = 0;
#endif
/* not set by Curl_xfer_setup to preserve keepon bits */
data->conn->writesockfd = data->conn->sockfd;
- /* we want to use the _receiving_ function even when the socket turns
- out writableable as the underlying libssh recv function will deal
- with both accordingly */
- data->state.select_bits = CURL_CSELECT_IN;
-
sshc->sftp_recv_state = 0;
myssh_to(data, sshc, SSH_STOP);
figure out a "real" bitmask */
sshc->orig_waitfor = data->req.keepon;
- /* we want to use the _sending_ function even when the socket turns
- out readable as the underlying libssh scp send function will deal
- with both accordingly */
- data->state.select_bits = CURL_CSELECT_OUT;
-
myssh_to(data, sshc, SSH_STOP);
break;
/* not set by Curl_xfer_setup to preserve keepon bits */
conn->writesockfd = conn->sockfd;
- /* we want to use the _receiving_ function even when the socket turns
- out writableable as the underlying libssh recv function will deal
- with both accordingly */
- data->state.select_bits = CURL_CSELECT_IN;
-
myssh_to(data, sshc, SSH_STOP);
break;
}
figure out a "real" bitmask */
sshc->orig_waitfor = data->req.keepon;
- /* we want to use the _sending_ function even when the socket turns
- out readable as the underlying libssh2 sftp send function will deal
- with both accordingly */
- data->state.select_bits = CURL_CSELECT_OUT;
-
/* since we do not really wait for anything at this point, we want the
- state machine to move on as soon as possible so we set a very short
- timeout here */
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ state machine to move on as soon as possible so mark this as dirty */
+ Curl_multi_mark_dirty(data);
myssh_state(data, sshc, SSH_STOP);
return CURLE_OK;
/* not set by Curl_xfer_setup to preserve keepon bits */
data->conn->writesockfd = data->conn->sockfd;
- /* we want to use the _receiving_ function even when the socket turns
- out writableable as the underlying libssh2 recv function will deal
- with both accordingly */
- data->state.select_bits = CURL_CSELECT_IN;
myssh_state(data, sshc, SSH_STOP);
return CURLE_OK;
/* not set by Curl_xfer_setup to preserve keepon bits */
data->conn->writesockfd = data->conn->sockfd;
- /* we want to use the _receiving_ function even when the socket turns
- out writableable as the underlying libssh2 recv function will deal
- with both accordingly */
- data->state.select_bits = CURL_CSELECT_IN;
-
myssh_state(data, sshc, SSH_STOP);
return CURLE_OK;
}
figure out a "real" bitmask */
sshc->orig_waitfor = data->req.keepon;
- /* we want to use the _sending_ function even when the socket turns
- out readable as the underlying libssh2 scp send function will deal
- with both accordingly */
- data->state.select_bits = CURL_CSELECT_OUT;
-
myssh_state(data, sshc, SSH_STOP);
return CURLE_OK;
figure out a "real" bitmask */
sshc->orig_waitfor = data->req.keepon;
- /* we want to use the _sending_ function even when the socket turns
- out readable as the underlying libssh2 sftp send function will deal
- with both accordingly */
- data->state.select_bits = CURL_CSELECT_OUT;
-
/* since we do not really wait for anything at this point, we want the
- state machine to move on as soon as possible so we set a very short
- timeout here */
- Curl_expire(data, 0, EXPIRE_RUN_NOW);
+ state machine to move on as soon as possible */
+ Curl_multi_mark_dirty(data);
wssh_state(data, sshc, SSH_STOP);
}
/* not set by Curl_xfer_setup to preserve keepon bits */
conn->writesockfd = conn->sockfd;
- /* we want to use the _receiving_ function even when the socket turns
- out writableable as the underlying libssh2 recv function will deal
- with both accordingly */
- data->state.select_bits = CURL_CSELECT_IN;
-
if(result) {
/* this should never occur; the close state should be entered
at the time the error occurs */