From: Stefan Eissing Date: Wed, 18 Jun 2025 10:34:43 +0000 (+0200) Subject: multi: add dirty bitset X-Git-Tag: curl-8_15_0~197 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=779937f8400910924154afafb85dedbe95b7dffa;p=thirdparty%2Fcurl.git multi: add dirty bitset Add a bitset `dirty` to the multi handle. The presence of a transfer int he "dirty" set means: this transfer has something to do ASAP. "dirty" is set by multiplexing protocols like HTTP/2 and 3 when encountering response data for another transfer than the current one. "dirty" is set by protocols that want to be called. Implementation: * just an additional `uint_bset` in the multi handle * `Curl_multi_mark_dirty()` to add a transfer to the dirty set. * `multi_runsingle()` clears the dirty bit of the transfer at start. Without new dirty marks, this empties the set after al dirty transfers have been run. * `multi_timeout()` immediately gives the current time and timeout_ms == 0 when dirty transfers are present. * multi_event: marks all transfers tracked for a socket as dirty. Then marks all expired transfers as dirty. Then it runs all dirty transfers. With this mechanism: * Most uses of `EXPIRE_RUN_NOW` are replaced by `Curl_multi_mark_dirty()` * `Curl_multi_mark_dirty()` is cheaper than querying if a transfer is already dirty or set for timeout. There is no need to check, just do it. * `data->state.select_bits` is eliminated. We need no longer to simulate a poll event to make a transfer run. Closes #17662 --- diff --git a/lib/cf-h2-proxy.c b/lib/cf-h2-proxy.c index 43eef7b498..6258c18765 100644 --- a/lib/cf-h2-proxy.c +++ b/lib/cf-h2-proxy.c @@ -218,19 +218,10 @@ static void drain_tunnel(struct Curl_cfilter *cf, struct tunnel_stream *tunnel) { struct cf_h2_proxy_ctx *ctx = cf->ctx; - unsigned char bits; - (void)cf; - bits = CURL_CSELECT_IN; if(!tunnel->closed && !tunnel->reset && !Curl_bufq_is_empty(&ctx->tunnel.sendbuf)) - bits |= CURL_CSELECT_OUT; - if(data->state.select_bits != bits) { - CURL_TRC_CF(data, cf, "[%d] DRAIN select_bits=%x", - tunnel->stream_id, bits); - data->state.select_bits = bits; - Curl_expire(data, 0, EXPIRE_RUN_NOW); - } + Curl_multi_mark_dirty(data); } static ssize_t proxy_nw_in_reader(void *reader_ctx, diff --git a/lib/connect.c b/lib/connect.c index 41435235ce..589f8ebfbb 100644 --- a/lib/connect.c +++ b/lib/connect.c @@ -684,7 +684,7 @@ evaluate: /* next attempt was started */ CURL_TRC_CF(data, cf, "%s trying next", baller->name); ++ongoing; - Curl_expire(data, 0, EXPIRE_RUN_NOW); + Curl_multi_mark_dirty(data); } } } diff --git a/lib/doh.c b/lib/doh.c index 0980a01938..376437c8b1 100644 --- a/lib/doh.c +++ b/lib/doh.c @@ -257,7 +257,7 @@ static void doh_probe_done(struct Curl_easy *data, if(!dohp->pending) { /* DoH completed, run the transfer picking up the results */ - Curl_expire(data, 0, EXPIRE_RUN_NOW); + Curl_multi_mark_dirty(data); } } } diff --git a/lib/easy.c b/lib/easy.c index ca32521e32..d540888d3a 100644 --- a/lib/easy.c +++ b/lib/easy.c @@ -1167,11 +1167,6 @@ CURLcode curl_easy_pause(CURL *d, int action) Curl_expire(data, 0, EXPIRE_RUN_NOW); /* reset the too-slow time keeper */ data->state.keeps_speed.tv_sec = 0; - /* Simulate socket events on next run for unpaused directions */ - if(!send_paused_new) - data->state.select_bits |= CURL_CSELECT_OUT; - if(!recv_paused_new) - data->state.select_bits |= CURL_CSELECT_IN; /* On changes, tell application to update its timers. */ if(changed && data->multi) { if(Curl_update_timer(data->multi) && !result) diff --git a/lib/http2.c b/lib/http2.c index 1e583e89dc..58df5fac6e 100644 --- a/lib/http2.c +++ b/lib/http2.c @@ -377,27 +377,6 @@ static CURLcode cf_h2_update_local_win(struct Curl_cfilter *cf, } #endif /* !NGHTTP2_HAS_SET_LOCAL_WINDOW_SIZE */ -/* - * Mark this transfer to get "drained". - */ -static void drain_stream(struct Curl_cfilter *cf, - struct Curl_easy *data, - struct h2_stream_ctx *stream) -{ - unsigned char bits; - - (void)cf; - bits = CURL_CSELECT_IN; - if(!stream->closed && - (!stream->body_eos || !Curl_bufq_is_empty(&stream->sendbuf))) - bits |= CURL_CSELECT_OUT; - if(stream->closed || (data->state.select_bits != bits)) { - CURL_TRC_CF(data, cf, "[%d] DRAIN select_bits=%x", - stream->id, bits); - data->state.select_bits = bits; - Curl_expire(data, 0, EXPIRE_RUN_NOW); - } -} static CURLcode http2_data_setup(struct Curl_cfilter *cf, struct Curl_easy *data, @@ -1200,7 +1179,7 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf, if(stream->status_code / 100 != 1) { stream->resp_hds_complete = TRUE; } - drain_stream(cf, data, stream); + Curl_multi_mark_dirty(data); break; case NGHTTP2_PUSH_PROMISE: rv = push_promise(cf, data, &frame->push_promise); @@ -1223,12 +1202,12 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf, if(frame->rst_stream.error_code) { stream->reset = TRUE; } - drain_stream(cf, data, stream); + Curl_multi_mark_dirty(data); break; case NGHTTP2_WINDOW_UPDATE: if(CURL_WANT_SEND(data) && Curl_bufq_is_empty(&stream->sendbuf)) { /* need more data, force processing of transfer */ - drain_stream(cf, data, stream); + Curl_multi_mark_dirty(data); } else if(!Curl_bufq_is_empty(&stream->sendbuf)) { /* resume the potentially suspended stream */ @@ -1254,7 +1233,7 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf, stream->id, NGHTTP2_STREAM_CLOSED); stream->closed = TRUE; } - drain_stream(cf, data, stream); + Curl_multi_mark_dirty(data); } return CURLE_OK; } @@ -1403,11 +1382,8 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame, * window and *assume* that we treat this like a WINDOW_UPDATE. Some * servers send an explicit WINDOW_UPDATE, but not all seem to do that. * To be safe, we UNHOLD a stream in order not to stall. */ - if(CURL_WANT_SEND(data)) { - struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data); - if(stream) - drain_stream(cf, data, stream); - } + if(CURL_WANT_SEND(data)) + Curl_multi_mark_dirty(data); } break; } @@ -1552,7 +1528,7 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id, stream_id, nghttp2_http2_strerror(error_code), error_code); else CURL_TRC_CF(data_s, cf, "[%d] CLOSED", stream_id); - drain_stream(cf, data_s, stream); + Curl_multi_mark_dirty(data_s); /* remove `data_s` from the nghttp2 stream */ rv = nghttp2_session_set_stream_user_data(session, stream_id, 0); @@ -1746,7 +1722,7 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame, } /* if we receive data for another handle, wake that up */ if(CF_DATA_CURRENT(cf) != data_s) - Curl_expire(data_s, 0, EXPIRE_RUN_NOW); + Curl_multi_mark_dirty(data_s); CURL_TRC_CF(data_s, cf, "[%d] status: HTTP/2 %03d", stream->id, stream->status_code); @@ -1773,7 +1749,7 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame, } /* if we receive data for another handle, wake that up */ if(CF_DATA_CURRENT(cf) != data_s) - Curl_expire(data_s, 0, EXPIRE_RUN_NOW); + Curl_multi_mark_dirty(data_s); CURL_TRC_CF(data_s, cf, "[%d] header: %.*s: %.*s", stream->id, (int)namelen, name, (int)valuelen, value); @@ -2106,7 +2082,7 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf, * this may leave data in underlying buffers that will not * be consumed. */ if(!cf->next || !cf->next->cft->has_data_pending(cf->next, data)) - drain_stream(cf, data, stream); + Curl_multi_mark_dirty(data); break; } @@ -2184,7 +2160,7 @@ static CURLcode cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data, nghttp2_session_consume(ctx->h2, stream->id, *pnread); if(stream->closed) { CURL_TRC_CF(data, cf, "[%d] DRAIN closed stream", stream->id); - drain_stream(cf, data, stream); + Curl_multi_mark_dirty(data); } } @@ -2195,7 +2171,7 @@ out: * monitor the socket for POLLOUT, but when not SENDING * any more, we force processing of the transfer. */ if(!CURL_WANT_SEND(data)) - drain_stream(cf, data, stream); + Curl_multi_mark_dirty(data); } else if(r2) { result = r2; @@ -2712,8 +2688,7 @@ static CURLcode http2_data_pause(struct Curl_cfilter *cf, * not. We may have already buffered and exhausted the new window * by operating on things in flight during the handling of other * transfers. */ - drain_stream(cf, data, stream); - Curl_expire(data, 0, EXPIRE_RUN_NOW); + Curl_multi_mark_dirty(data); } CURL_TRC_CF(data, cf, "[%d] stream now %spaused", stream->id, pause ? "" : "un"); diff --git a/lib/imap.c b/lib/imap.c index 4e6c01475c..267671a56f 100644 --- a/lib/imap.c +++ b/lib/imap.c @@ -1347,9 +1347,6 @@ static CURLcode imap_state_fetch_resp(struct Curl_easy *data, else { /* IMAP download */ data->req.maxdownload = size; - /* force a recv/send check of this connection, as the data might've been - read off the socket already */ - data->state.select_bits = CURL_CSELECT_IN; Curl_xfer_setup1(data, CURL_XFER_RECV, size, FALSE); } } diff --git a/lib/multi.c b/lib/multi.c index 741576ddcc..d0cb60afd2 100644 --- a/lib/multi.c +++ b/lib/multi.c @@ -235,6 +235,7 @@ struct Curl_multi *Curl_multi_handle(unsigned int xfer_table_size, Curl_multi_ev_init(multi, ev_hashsize); Curl_uint_tbl_init(&multi->xfers, NULL); Curl_uint_bset_init(&multi->process); + Curl_uint_bset_init(&multi->dirty); Curl_uint_bset_init(&multi->pending); Curl_uint_bset_init(&multi->msgsent); Curl_hash_init(&multi->proto_hash, 23, @@ -247,6 +248,7 @@ struct Curl_multi *Curl_multi_handle(unsigned int xfer_table_size, if(Curl_uint_bset_resize(&multi->process, xfer_table_size) || Curl_uint_bset_resize(&multi->pending, xfer_table_size) || + Curl_uint_bset_resize(&multi->dirty, xfer_table_size) || Curl_uint_bset_resize(&multi->msgsent, xfer_table_size) || Curl_uint_tbl_resize(&multi->xfers, xfer_table_size)) goto error; @@ -301,6 +303,7 @@ error: } Curl_uint_bset_destroy(&multi->process); + Curl_uint_bset_destroy(&multi->dirty); Curl_uint_bset_destroy(&multi->pending); Curl_uint_bset_destroy(&multi->msgsent); Curl_uint_tbl_destroy(&multi->xfers); @@ -355,6 +358,7 @@ static CURLMcode multi_xfers_add(struct Curl_multi *multi, * to work properly when larger than the table, but not * the other way around. */ if(Curl_uint_bset_resize(&multi->process, newsize) || + Curl_uint_bset_resize(&multi->dirty, newsize) || Curl_uint_bset_resize(&multi->pending, newsize) || Curl_uint_bset_resize(&multi->msgsent, newsize) || Curl_uint_tbl_resize(&multi->xfers, newsize)) @@ -401,6 +405,7 @@ CURLMcode curl_multi_add_handle(CURLM *m, CURL *d) return CURLM_ABORTED_BY_CALLBACK; multi->dead = FALSE; Curl_uint_bset_clear(&multi->process); + Curl_uint_bset_clear(&multi->dirty); Curl_uint_bset_clear(&multi->pending); Curl_uint_bset_clear(&multi->msgsent); } @@ -795,6 +800,7 @@ CURLMcode curl_multi_remove_handle(CURLM *m, CURL *d) DEBUGASSERT(Curl_uint_tbl_contains(&multi->xfers, mid)); Curl_uint_tbl_remove(&multi->xfers, mid); Curl_uint_bset_remove(&multi->process, mid); + Curl_uint_bset_remove(&multi->dirty, mid); Curl_uint_bset_remove(&multi->pending, mid); Curl_uint_bset_remove(&multi->msgsent, mid); data->multi = NULL; @@ -1048,8 +1054,8 @@ void Curl_multi_getsock(struct Curl_easy *data, (Curl_pollset_want_read(data, ps, data->conn->sock[SECONDARYSOCKET]) && Curl_conn_data_pending(data, SECONDARYSOCKET)))) { CURL_TRC_M(data, "%s pollset[] has POLLIN, but there is still " - "buffered input to consume -> EXPIRE_RUN_NOW", caller); - Curl_expire(data, 0, EXPIRE_RUN_NOW); + "buffered input to consume -> mark as dirty", caller); + Curl_multi_mark_dirty(data); } switch(ps->num) { @@ -1965,14 +1971,6 @@ static CURLMcode state_performing(struct Curl_easy *data, } } } - else if(data->state.select_bits && !Curl_xfer_is_blocked(data)) { - /* This avoids CURLM_CALL_MULTI_PERFORM so that a very fast transfer does - not get stuck on this transfer at the expense of other concurrent - transfers */ - CURL_TRC_M(data, "EXPIRE_RUN_NOW unblocked, select_bits=%x", - data->state.select_bits); - Curl_expire(data, 0, EXPIRE_RUN_NOW); - } free(newurl); *resultp = result; return rc; @@ -2297,6 +2295,10 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, multi_warn_debug(multi, data); + /* transfer runs now, clear the dirty bit. This may be set + * again during processing, triggering a re-run later. */ + Curl_uint_bset_remove(&multi->dirty, data->mid); + do { /* A "stream" here is a logical stream if the protocol can handle that (HTTP/2), or the full connection for older protocols */ @@ -2840,6 +2842,7 @@ CURLMcode curl_multi_cleanup(CURLM *m) } #endif Curl_uint_bset_destroy(&multi->process); + Curl_uint_bset_destroy(&multi->dirty); Curl_uint_bset_destroy(&multi->pending); Curl_uint_bset_destroy(&multi->msgsent); Curl_uint_tbl_destroy(&multi->xfers); @@ -2963,12 +2966,11 @@ struct multi_run_ctx { bool run_cpool; }; -static CURLMcode multi_run_expired(struct multi_run_ctx *mrc) +static void multi_mark_expired_as_dirty(struct multi_run_ctx *mrc) { struct Curl_multi *multi = mrc->multi; struct Curl_easy *data = NULL; struct Curl_tree *t = NULL; - CURLMcode result = CURLM_OK; /* * The loop following here will go on as long as there are expire-times left @@ -2980,33 +2982,59 @@ static CURLMcode multi_run_expired(struct multi_run_ctx *mrc) extracts a matching node if there is one */ multi->timetree = Curl_splaygetbest(mrc->now, multi->timetree, &t); if(!t) - goto out; + return; data = Curl_splayget(t); /* assign this for next loop */ if(!data) continue; (void)add_next_timeout(mrc->now, multi, data); - if(data == multi->admin) { - mrc->run_cpool = TRUE; - continue; - } + Curl_multi_mark_dirty(data); + } +} - mrc->run_xfers++; - sigpipe_apply(data, &mrc->pipe_st); - result = multi_runsingle(multi, &mrc->now, data); +static CURLMcode multi_run_dirty(struct multi_run_ctx *mrc) +{ + struct Curl_multi *multi = mrc->multi; + CURLMcode result = CURLM_OK; + unsigned int mid; - if(CURLM_OK >= result) { - /* reassess event handling of data */ - result = Curl_multi_ev_assess_xfer(multi, data); - if(result) - goto out; + if(Curl_uint_bset_first(&multi->dirty, &mid)) { + do { + struct Curl_easy *data = Curl_multi_get_easy(multi, mid); + if(data) { + CURL_TRC_M(data, "multi_run_dirty"); + + if(data == multi->admin) { + Curl_uint_bset_remove(&multi->dirty, mid); + mrc->run_cpool = TRUE; + continue; + } + + mrc->run_xfers++; + sigpipe_apply(data, &mrc->pipe_st); + /* runsingle() clears the dirty mid */ + result = multi_runsingle(multi, &mrc->now, data); + + if(CURLM_OK >= result) { + /* reassess event handling of data */ + result = Curl_multi_ev_assess_xfer(multi, data); + if(result) + goto out; + } + } + else { + CURL_TRC_M(multi->admin, "multi_run_dirty, %u no longer found", mid); + Curl_uint_bset_remove(&multi->dirty, mid); + } } + while(Curl_uint_bset_next(&multi->dirty, mid, &mid)); } out: return result; } + static CURLMcode multi_socket(struct Curl_multi *multi, bool checkall, curl_socket_t s, @@ -3035,7 +3063,8 @@ static CURLMcode multi_socket(struct Curl_multi *multi, } if(s != CURL_SOCKET_TIMEOUT) { - Curl_multi_ev_expire_xfers(multi, s, &mrc.now, &mrc.run_cpool); + /* Mark all transfers of that socket as dirty */ + Curl_multi_ev_dirty_xfers(multi, s, &mrc.run_cpool); } else { /* Asked to run due to time-out. Clear the 'last_expire_ts' variable to @@ -3047,7 +3076,8 @@ static CURLMcode multi_socket(struct Curl_multi *multi, mrc.run_cpool = TRUE; } - result = multi_run_expired(&mrc); + multi_mark_expired_as_dirty(&mrc); + result = multi_run_dirty(&mrc); if(result) goto out; @@ -3058,7 +3088,8 @@ static CURLMcode multi_socket(struct Curl_multi *multi, * Do that only once or it might be unfair to transfers on other * sockets. */ mrc.now = curlx_now(); - result = multi_run_expired(&mrc); + multi_mark_expired_as_dirty(&mrc); + result = multi_run_dirty(&mrc); } out: @@ -3186,6 +3217,26 @@ CURLMcode curl_multi_socket_all(CURLM *m, int *running_handles) return multi_socket(multi, TRUE, CURL_SOCKET_BAD, 0, running_handles); } + +static bool multi_has_dirties(struct Curl_multi *multi) +{ + unsigned int mid; + if(Curl_uint_bset_first(&multi->dirty, &mid)) { + do { + struct Curl_easy *data = Curl_multi_get_easy(multi, mid); + if(data) { + return TRUE; + } + else { + CURL_TRC_M(multi->admin, "dirty transfer %u no longer found", mid); + Curl_uint_bset_remove(&multi->dirty, mid); + } + } + while(Curl_uint_bset_next(&multi->dirty, mid, &mid)); + } + return FALSE; +} + static CURLMcode multi_timeout(struct Curl_multi *multi, struct curltime *expire_time, long *timeout_ms) @@ -3197,7 +3248,12 @@ static CURLMcode multi_timeout(struct Curl_multi *multi, return CURLM_OK; } - if(multi->timetree) { + if(multi_has_dirties(multi)) { + *expire_time = curlx_now(); + *timeout_ms = 0; + return CURLM_OK; + } + else if(multi->timetree) { /* we have a tree of expire times */ struct curltime now = curlx_now(); @@ -3791,6 +3847,12 @@ unsigned int Curl_multi_xfers_running(struct Curl_multi *multi) return multi->xfers_alive; } +void Curl_multi_mark_dirty(struct Curl_easy *data) +{ + if(data->multi && data->mid != UINT_MAX) + Curl_uint_bset_add(&data->multi->dirty, data->mid); +} + #ifdef DEBUGBUILD static void multi_xfer_dump(struct Curl_multi *multi, unsigned int mid, void *entry) diff --git a/lib/multi_ev.c b/lib/multi_ev.c index 21d2867619..0b4c472849 100644 --- a/lib/multi_ev.c +++ b/lib/multi_ev.c @@ -563,10 +563,9 @@ CURLMcode Curl_multi_ev_assign(struct Curl_multi *multi, return CURLM_OK; } -void Curl_multi_ev_expire_xfers(struct Curl_multi *multi, - curl_socket_t s, - const struct curltime *nowp, - bool *run_cpool) +void Curl_multi_ev_dirty_xfers(struct Curl_multi *multi, + curl_socket_t s, + bool *run_cpool) { struct mev_sh_entry *entry; @@ -586,9 +585,11 @@ void Curl_multi_ev_expire_xfers(struct Curl_multi *multi, do { data = Curl_multi_get_easy(multi, mid); if(data) { - /* Expire with out current now, so we will get it below when - * asking the splaytree for expired transfers. */ - Curl_expire_ex(data, nowp, 0, EXPIRE_RUN_NOW); + Curl_multi_mark_dirty(data); + } + else { + CURL_TRC_M(multi->admin, "socket transfer %u no longer found", mid); + Curl_uint_spbset_remove(&entry->xfers, mid); } } while(Curl_uint_spbset_next(&entry->xfers, mid, &mid)); diff --git a/lib/multi_ev.h b/lib/multi_ev.h index 06be842fb9..20c1aeac81 100644 --- a/lib/multi_ev.h +++ b/lib/multi_ev.h @@ -61,11 +61,10 @@ CURLMcode Curl_multi_ev_assess_conn(struct Curl_multi *multi, struct Curl_easy *data, struct connectdata *conn); -/* Expire all transfers tied to the given socket */ -void Curl_multi_ev_expire_xfers(struct Curl_multi *multi, - curl_socket_t s, - const struct curltime *nowp, - bool *run_cpool); +/* Mark all transfers tied to the given socket as dirty */ +void Curl_multi_ev_dirty_xfers(struct Curl_multi *multi, + curl_socket_t s, + bool *run_cpool); /* Socket will be closed, forget anything we know about it. */ void Curl_multi_ev_socket_done(struct Curl_multi *multi, diff --git a/lib/multihandle.h b/lib/multihandle.h index 04db02f0b1..4bf2dd811c 100644 --- a/lib/multihandle.h +++ b/lib/multihandle.h @@ -98,6 +98,7 @@ struct Curl_multi { struct uint_tbl xfers; /* transfers added to this multi */ /* Each transfer's mid may be present in at most one of these */ struct uint_bset process; /* transfer being processed */ + struct uint_bset dirty; /* transfer to be run NOW, e.g. ASAP. */ struct uint_bset pending; /* transfers in waiting (conn limit etc.) */ struct uint_bset msgsent; /* transfers done with message for application */ diff --git a/lib/multiif.h b/lib/multiif.h index eae634ab40..e68a278a8d 100644 --- a/lib/multiif.h +++ b/lib/multiif.h @@ -173,4 +173,8 @@ struct Curl_easy *Curl_multi_get_easy(struct Curl_multi *multi, /* Get the # of transfers current in process/pending. */ unsigned int Curl_multi_xfers_running(struct Curl_multi *multi); +/* Mark a transfer as dirty, e.g. to be rerun at earliest convenience. + * A cheap operation, can be done many times repeatedly. */ +void Curl_multi_mark_dirty(struct Curl_easy *data); + #endif /* HEADER_CURL_MULTIIF_H */ diff --git a/lib/transfer.c b/lib/transfer.c index 603efc4b1f..a348383c9a 100644 --- a/lib/transfer.c +++ b/lib/transfer.c @@ -358,16 +358,12 @@ static CURLcode sendrecv_dl(struct Curl_easy *data, } while(maxloops--); - if(!rcvd_eagain || data_pending(data, rcvd_eagain)) { + if(!Curl_xfer_is_blocked(data) && + (!rcvd_eagain || data_pending(data, rcvd_eagain))) { /* Did not read until EAGAIN or there is still data pending * in buffers. Mark as read-again via simulated SELECT results. */ - data->state.select_bits = CURL_CSELECT_IN; - if((k->keepon & KEEP_SENDBITS) == KEEP_SEND) - data->state.select_bits |= CURL_CSELECT_OUT; - if(!Curl_xfer_is_blocked(data)) - Curl_expire(data, 0, EXPIRE_RUN_NOW); - CURL_TRC_M(data, "sendrecv_dl() no EAGAIN/pending data, " - "set select_bits=%x", data->state.select_bits); + Curl_multi_mark_dirty(data); + CURL_TRC_M(data, "sendrecv_dl() no EAGAIN/pending data, mark as dirty"); } if(((k->keepon & (KEEP_RECV|KEEP_SEND)) == KEEP_SEND) && @@ -403,22 +399,6 @@ static CURLcode sendrecv_ul(struct Curl_easy *data, int *didwhat) return CURLE_OK; } -static int select_bits_paused(struct Curl_easy *data, int select_bits) -{ - /* See issue #11982: we really need to be careful not to progress - * a transfer direction when that direction is paused. Not all parts - * of our state machine are handling PAUSED transfers correctly. So, we - * do not want to go there. - * NOTE: we are only interested in PAUSE, not HOLD. */ - - /* if there is data in a direction not paused, return false */ - if(((select_bits & CURL_CSELECT_IN) && !Curl_xfer_recv_is_paused(data)) || - ((select_bits & CURL_CSELECT_OUT) && !Curl_xfer_send_is_paused(data))) - return FALSE; - - return Curl_xfer_recv_is_paused(data) || Curl_xfer_send_is_paused(data); -} - /* * Curl_sendrecv() is the low-level function to be called when data is to * be read and written to/from the connection. @@ -430,14 +410,9 @@ CURLcode Curl_sendrecv(struct Curl_easy *data, struct curltime *nowp) int didwhat = 0; DEBUGASSERT(nowp); - if(data->state.select_bits) { - if(select_bits_paused(data, data->state.select_bits)) { - /* leave the bits unchanged, so they'll tell us what to do when - * this transfer gets unpaused. */ - result = CURLE_OK; - goto out; - } - data->state.select_bits = 0; + if(Curl_xfer_is_blocked(data)) { + result = CURLE_OK; + goto out; } /* We go ahead and do a read if we have a readable socket or if the stream diff --git a/lib/urldata.h b/lib/urldata.h index 20c8adf281..63bee65dc9 100644 --- a/lib/urldata.h +++ b/lib/urldata.h @@ -1155,9 +1155,6 @@ struct UrlState { #endif unsigned char httpreq; /* Curl_HttpReq; what kind of HTTP request (if any) is this */ - unsigned char select_bits; /* != 0 -> bitmask of socket events for this - transfer overriding anything the socket may - report */ unsigned int creds_from:2; /* where is the server credentials originating from, see the CREDS_* defines above */ diff --git a/lib/vquic/curl_msh3.c b/lib/vquic/curl_msh3.c index 947ce4fbeb..543902dea1 100644 --- a/lib/vquic/curl_msh3.c +++ b/lib/vquic/curl_msh3.c @@ -244,33 +244,17 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data) static void drain_stream_from_other_thread(struct Curl_easy *data, struct h3_stream_ctx *stream) { - unsigned char bits; - - /* risky */ - bits = CURL_CSELECT_IN; - if(stream && !stream->upload_done) - bits |= CURL_CSELECT_OUT; - if(data->state.select_bits != bits) { - data->state.select_bits = bits; - /* cannot expire from other thread */ - } + (void)data; + (void)stream; + /* cannot expire from other thread. + here is the disconnect between msh3 and curl */ } static void h3_drain_stream(struct Curl_cfilter *cf, struct Curl_easy *data) { - struct cf_msh3_ctx *ctx = cf->ctx; - struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data); - unsigned char bits; - (void)cf; - bits = CURL_CSELECT_IN; - if(stream && !stream->upload_done) - bits |= CURL_CSELECT_OUT; - if(data->state.select_bits != bits) { - data->state.select_bits = bits; - Curl_expire(data, 0, EXPIRE_RUN_NOW); - } + Curl_multi_mark_dirty(data); } static const MSH3_CONNECTION_IF msh3_conn_if = { diff --git a/lib/vquic/curl_ngtcp2.c b/lib/vquic/curl_ngtcp2.c index 2cb4469399..444e9ca2bf 100644 --- a/lib/vquic/curl_ngtcp2.c +++ b/lib/vquic/curl_ngtcp2.c @@ -342,23 +342,6 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data) } } -static void h3_drain_stream(struct Curl_cfilter *cf, - struct Curl_easy *data) -{ - struct cf_ngtcp2_ctx *ctx = cf->ctx; - struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data); - unsigned char bits; - - (void)cf; - bits = CURL_CSELECT_IN; - if(stream && stream->upload_left && !stream->send_closed) - bits |= CURL_CSELECT_OUT; - if(data->state.select_bits != bits) { - data->state.select_bits = bits; - Curl_expire(data, 0, EXPIRE_RUN_NOW); - } -} - /* ngtcp2 default congestion controller does not perform pacing. Limit the maximum packet burst to MAX_PKT_BURST packets. */ #define MAX_PKT_BURST 10 @@ -744,7 +727,7 @@ static int cb_extend_max_stream_data(ngtcp2_conn *tconn, int64_t stream_id, CURL_TRC_CF(s_data, cf, "[%" FMT_PRId64 "] unblock quic flow", (curl_int64_t)stream_id); stream->quic_flow_blocked = FALSE; - h3_drain_stream(cf, s_data); + Curl_multi_mark_dirty(s_data); } return 0; } @@ -971,7 +954,7 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t sid, else { CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] CLOSED", stream->id); } - h3_drain_stream(cf, data); + Curl_multi_mark_dirty(data); return 0; } @@ -1072,7 +1055,7 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t sid, if(stream->status_code / 100 != 1) { stream->resp_hds_complete = TRUE; } - h3_drain_stream(cf, data); + Curl_multi_mark_dirty(data); return 0; } @@ -1984,10 +1967,9 @@ static CURLcode h3_data_pause(struct Curl_cfilter *cf, { /* There seems to exist no API in ngtcp2 to shrink/enlarge the streams * windows. As we do in HTTP/2. */ - if(!pause) { - h3_drain_stream(cf, data); - Curl_expire(data, 0, EXPIRE_RUN_NOW); - } + (void)cf; + if(!pause) + Curl_multi_mark_dirty(data); return CURLE_OK; } diff --git a/lib/vquic/curl_osslq.c b/lib/vquic/curl_osslq.c index 5238c582ac..aa93ae70e2 100644 --- a/lib/vquic/curl_osslq.c +++ b/lib/vquic/curl_osslq.c @@ -711,31 +711,14 @@ static struct cf_osslq_stream *cf_osslq_get_qstream(struct Curl_cfilter *cf, return NULL; } -static void h3_drain_stream(struct Curl_cfilter *cf, - struct Curl_easy *data) -{ - struct cf_osslq_ctx *ctx = cf->ctx; - struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data); - unsigned char bits; - - (void)cf; - bits = CURL_CSELECT_IN; - if(stream && stream->upload_left && !stream->send_closed) - bits |= CURL_CSELECT_OUT; - if(data->state.select_bits != bits) { - data->state.select_bits = bits; - Curl_expire(data, 0, EXPIRE_RUN_NOW); - } -} - static CURLcode h3_data_pause(struct Curl_cfilter *cf, struct Curl_easy *data, bool pause) { + (void)cf; if(!pause) { /* unpaused. make it run again right away */ - h3_drain_stream(cf, data); - Curl_expire(data, 0, EXPIRE_RUN_NOW); + Curl_multi_mark_dirty(data); } return CURLE_OK; } @@ -766,7 +749,7 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id, else { CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] CLOSED", stream->s.id); } - h3_drain_stream(cf, data); + Curl_multi_mark_dirty(data); return 0; } @@ -831,7 +814,7 @@ static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id, stream->download_recvd += (curl_off_t)buflen; CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] DATA len=%zu, total=%" FMT_OFF_T, stream->s.id, buflen, stream->download_recvd); - h3_drain_stream(cf, data); + Curl_multi_mark_dirty(data); return 0; } @@ -943,7 +926,7 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t sid, if(stream->status_code / 100 != 1) { stream->resp_hds_complete = TRUE; } - h3_drain_stream(cf, data); + Curl_multi_mark_dirty(data); return 0; } @@ -1566,7 +1549,7 @@ static CURLcode cf_osslq_check_and_unblock(struct Curl_cfilter *cf, if(stream) { nghttp3_conn_unblock_stream(ctx->h3.conn, stream->s.id); stream->s.send_blocked = FALSE; - h3_drain_stream(cf, ctx->curl_items[idx_count]); + Curl_multi_mark_dirty(ctx->curl_items[idx_count]); CURL_TRC_CF(ctx->curl_items[idx_count], cf, "unblocked"); } result_count--; @@ -2163,7 +2146,7 @@ static CURLcode cf_osslq_recv(struct Curl_cfilter *cf, struct Curl_easy *data, } if(*pnread) { - h3_drain_stream(cf, data); + Curl_multi_mark_dirty(data); } else { if(stream->closed) { diff --git a/lib/vquic/curl_quiche.c b/lib/vquic/curl_quiche.c index 129b606043..6b9ab2a0ec 100644 --- a/lib/vquic/curl_quiche.c +++ b/lib/vquic/curl_quiche.c @@ -237,7 +237,7 @@ static bool cf_quiche_do_resume(struct Curl_cfilter *cf, (void)user_data; if(stream->quic_flow_blocked) { stream->quic_flow_blocked = FALSE; - Curl_expire(sdata, 0, EXPIRE_RUN_NOW); + Curl_multi_mark_dirty(sdata); CURL_TRC_CF(sdata, cf, "[%"FMT_PRIu64"] unblock", stream->id); } return TRUE; @@ -250,8 +250,8 @@ static bool cf_quiche_do_expire(struct Curl_cfilter *cf, { (void)stream; (void)user_data; - CURL_TRC_CF(sdata, cf, "conn closed, expire transfer"); - Curl_expire(sdata, 0, EXPIRE_RUN_NOW); + CURL_TRC_CF(sdata, cf, "conn closed, mark as dirty"); + Curl_multi_mark_dirty(sdata); return TRUE; } @@ -307,23 +307,6 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data) } } -static void h3_drain_stream(struct Curl_cfilter *cf, - struct Curl_easy *data) -{ - struct cf_quiche_ctx *ctx = cf->ctx; - struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data); - unsigned char bits; - - (void)cf; - bits = CURL_CSELECT_IN; - if(stream && !stream->send_closed) - bits |= CURL_CSELECT_OUT; - if(data->state.select_bits != bits) { - data->state.select_bits = bits; - Curl_expire(data, 0, EXPIRE_RUN_NOW); - } -} - static void cf_quiche_expire_conn_closed(struct Curl_cfilter *cf, struct Curl_easy *data) { @@ -562,7 +545,7 @@ static CURLcode cf_quiche_ev_process(struct Curl_cfilter *cf, quiche_h3_event *ev) { CURLcode result = h3_process_event(cf, data, stream, ev); - h3_drain_stream(cf, data); + Curl_multi_mark_dirty(data); if(result) CURL_TRC_CF(data, cf, "error processing event %s " "for [%"FMT_PRIu64"] -> %d", cf_ev_name(ev), @@ -917,7 +900,7 @@ static CURLcode cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data, if(*pnread) { if(stream->closed) - h3_drain_stream(cf, data); + Curl_multi_mark_dirty(data); } else { if(stream->closed) { @@ -1229,9 +1212,9 @@ static CURLcode h3_data_pause(struct Curl_cfilter *cf, { /* There seems to exist no API in quiche to shrink/enlarge the streams * windows. As we do in HTTP/2. */ + (void)cf; if(!pause) { - h3_drain_stream(cf, data); - Curl_expire(data, 0, EXPIRE_RUN_NOW); + Curl_multi_mark_dirty(data); } return CURLE_OK; } diff --git a/lib/vssh/libssh.c b/lib/vssh/libssh.c index 84ef9e1ca9..643b8f4987 100644 --- a/lib/vssh/libssh.c +++ b/lib/vssh/libssh.c @@ -1256,15 +1256,9 @@ static int myssh_in_UPLOAD_INIT(struct Curl_easy *data, figure out a "real" bitmask */ sshc->orig_waitfor = data->req.keepon; - /* we want to use the _sending_ function even when the socket turns - out readable as the underlying libssh sftp send function will deal - with both accordingly */ - data->state.select_bits = CURL_CSELECT_OUT; - /* since we do not really wait for anything at this point, we want the - state machine to move on as soon as possible so we set a very short - timeout here */ - Curl_expire(data, 0, EXPIRE_RUN_NOW); + state machine to move on as soon as possible so we mark this as dirty */ + Curl_multi_mark_dirty(data); #if LIBSSH_VERSION_INT > SSH_VERSION_INT(0, 11, 0) sshc->sftp_send_state = 0; #endif @@ -1430,11 +1424,6 @@ static int myssh_in_SFTP_DOWNLOAD_STAT(struct Curl_easy *data, /* not set by Curl_xfer_setup to preserve keepon bits */ data->conn->writesockfd = data->conn->sockfd; - /* we want to use the _receiving_ function even when the socket turns - out writableable as the underlying libssh recv function will deal - with both accordingly */ - data->state.select_bits = CURL_CSELECT_IN; - sshc->sftp_recv_state = 0; myssh_to(data, sshc, SSH_STOP); @@ -2258,11 +2247,6 @@ static CURLcode myssh_statemach_act(struct Curl_easy *data, figure out a "real" bitmask */ sshc->orig_waitfor = data->req.keepon; - /* we want to use the _sending_ function even when the socket turns - out readable as the underlying libssh scp send function will deal - with both accordingly */ - data->state.select_bits = CURL_CSELECT_OUT; - myssh_to(data, sshc, SSH_STOP); break; @@ -2298,11 +2282,6 @@ static CURLcode myssh_statemach_act(struct Curl_easy *data, /* not set by Curl_xfer_setup to preserve keepon bits */ conn->writesockfd = conn->sockfd; - /* we want to use the _receiving_ function even when the socket turns - out writableable as the underlying libssh recv function will deal - with both accordingly */ - data->state.select_bits = CURL_CSELECT_IN; - myssh_to(data, sshc, SSH_STOP); break; } diff --git a/lib/vssh/libssh2.c b/lib/vssh/libssh2.c index 32ed0845b0..c9ca46f561 100644 --- a/lib/vssh/libssh2.c +++ b/lib/vssh/libssh2.c @@ -1209,15 +1209,9 @@ sftp_upload_init(struct Curl_easy *data, figure out a "real" bitmask */ sshc->orig_waitfor = data->req.keepon; - /* we want to use the _sending_ function even when the socket turns - out readable as the underlying libssh2 sftp send function will deal - with both accordingly */ - data->state.select_bits = CURL_CSELECT_OUT; - /* since we do not really wait for anything at this point, we want the - state machine to move on as soon as possible so we set a very short - timeout here */ - Curl_expire(data, 0, EXPIRE_RUN_NOW); + state machine to move on as soon as possible so mark this as dirty */ + Curl_multi_mark_dirty(data); myssh_state(data, sshc, SSH_STOP); return CURLE_OK; @@ -1552,10 +1546,6 @@ sftp_download_stat(struct Curl_easy *data, /* not set by Curl_xfer_setup to preserve keepon bits */ data->conn->writesockfd = data->conn->sockfd; - /* we want to use the _receiving_ function even when the socket turns - out writableable as the underlying libssh2 recv function will deal - with both accordingly */ - data->state.select_bits = CURL_CSELECT_IN; myssh_state(data, sshc, SSH_STOP); return CURLE_OK; @@ -2476,11 +2466,6 @@ static CURLcode ssh_state_scp_download_init(struct Curl_easy *data, /* not set by Curl_xfer_setup to preserve keepon bits */ data->conn->writesockfd = data->conn->sockfd; - /* we want to use the _receiving_ function even when the socket turns - out writableable as the underlying libssh2 recv function will deal - with both accordingly */ - data->state.select_bits = CURL_CSELECT_IN; - myssh_state(data, sshc, SSH_STOP); return CURLE_OK; } @@ -2634,11 +2619,6 @@ static CURLcode ssh_state_scp_upload_init(struct Curl_easy *data, figure out a "real" bitmask */ sshc->orig_waitfor = data->req.keepon; - /* we want to use the _sending_ function even when the socket turns - out readable as the underlying libssh2 scp send function will deal - with both accordingly */ - data->state.select_bits = CURL_CSELECT_OUT; - myssh_state(data, sshc, SSH_STOP); return CURLE_OK; diff --git a/lib/vssh/wolfssh.c b/lib/vssh/wolfssh.c index 642193b385..000ca457f7 100644 --- a/lib/vssh/wolfssh.c +++ b/lib/vssh/wolfssh.c @@ -732,15 +732,9 @@ static CURLcode wssh_statemach_act(struct Curl_easy *data, figure out a "real" bitmask */ sshc->orig_waitfor = data->req.keepon; - /* we want to use the _sending_ function even when the socket turns - out readable as the underlying libssh2 sftp send function will deal - with both accordingly */ - data->state.select_bits = CURL_CSELECT_OUT; - /* since we do not really wait for anything at this point, we want the - state machine to move on as soon as possible so we set a very short - timeout here */ - Curl_expire(data, 0, EXPIRE_RUN_NOW); + state machine to move on as soon as possible */ + Curl_multi_mark_dirty(data); wssh_state(data, sshc, SSH_STOP); } @@ -828,11 +822,6 @@ static CURLcode wssh_statemach_act(struct Curl_easy *data, /* not set by Curl_xfer_setup to preserve keepon bits */ conn->writesockfd = conn->sockfd; - /* we want to use the _receiving_ function even when the socket turns - out writableable as the underlying libssh2 recv function will deal - with both accordingly */ - data->state.select_bits = CURL_CSELECT_IN; - if(result) { /* this should never occur; the close state should be entered at the time the error occurs */