]> git.ipfire.org Git - thirdparty/curl.git/commitdiff
multi: add dirty bitset
authorStefan Eissing <stefan@eissing.org>
Wed, 18 Jun 2025 10:34:43 +0000 (12:34 +0200)
committerDaniel Stenberg <daniel@haxx.se>
Sat, 21 Jun 2025 15:19:11 +0000 (17:19 +0200)
Add a bitset `dirty` to the multi handle. The presence of a transfer int
he "dirty" set means: this transfer has something to do ASAP.

"dirty" is set by multiplexing protocols like HTTP/2 and 3 when
encountering response data for another transfer than the current one.
"dirty" is set by protocols that want to be called.

Implementation:

* just an additional `uint_bset` in the multi handle
* `Curl_multi_mark_dirty()` to add a transfer to the dirty set.
* `multi_runsingle()` clears the dirty bit of the transfer at
   start. Without new dirty marks, this empties the set after
   al dirty transfers have been run.
* `multi_timeout()` immediately gives the current time and
   timeout_ms == 0 when dirty transfers are present.
* multi_event: marks all transfers tracked for a socket as dirty.
  Then marks all expired transfers as dirty. Then it runs
  all dirty transfers.

With this mechanism:

* Most uses of `EXPIRE_RUN_NOW` are replaced by `Curl_multi_mark_dirty()`
* `Curl_multi_mark_dirty()` is cheaper than querying if a transfer is
  already dirty or set for timeout. There is no need to check, just do it.
* `data->state.select_bits` is eliminated. We need no longer to
  simulate a poll event to make a transfer run.

Closes #17662

20 files changed:
lib/cf-h2-proxy.c
lib/connect.c
lib/doh.c
lib/easy.c
lib/http2.c
lib/imap.c
lib/multi.c
lib/multi_ev.c
lib/multi_ev.h
lib/multihandle.h
lib/multiif.h
lib/transfer.c
lib/urldata.h
lib/vquic/curl_msh3.c
lib/vquic/curl_ngtcp2.c
lib/vquic/curl_osslq.c
lib/vquic/curl_quiche.c
lib/vssh/libssh.c
lib/vssh/libssh2.c
lib/vssh/wolfssh.c

index 43eef7b4980c084ba2211106e81b6b0627a847d2..6258c18765888379457edd87a6a18156daebe411 100644 (file)
@@ -218,19 +218,10 @@ static void drain_tunnel(struct Curl_cfilter *cf,
                          struct tunnel_stream *tunnel)
 {
   struct cf_h2_proxy_ctx *ctx = cf->ctx;
-  unsigned char bits;
-
   (void)cf;
-  bits = CURL_CSELECT_IN;
   if(!tunnel->closed && !tunnel->reset &&
      !Curl_bufq_is_empty(&ctx->tunnel.sendbuf))
-    bits |= CURL_CSELECT_OUT;
-  if(data->state.select_bits != bits) {
-    CURL_TRC_CF(data, cf, "[%d] DRAIN select_bits=%x",
-                tunnel->stream_id, bits);
-    data->state.select_bits = bits;
-    Curl_expire(data, 0, EXPIRE_RUN_NOW);
-  }
+    Curl_multi_mark_dirty(data);
 }
 
 static ssize_t proxy_nw_in_reader(void *reader_ctx,
index 41435235cefb8830cbd4a7061bbb5bd2acc21d09..589f8ebfbb687d5a53d618f72b9861557f9cd03b 100644 (file)
@@ -684,7 +684,7 @@ evaluate:
         /* next attempt was started */
         CURL_TRC_CF(data, cf, "%s trying next", baller->name);
         ++ongoing;
-        Curl_expire(data, 0, EXPIRE_RUN_NOW);
+        Curl_multi_mark_dirty(data);
       }
     }
   }
index 0980a01938f39a8d8f242662a3c23acbe344ad57..376437c8b1098e93f45864b91c33c4a80de5e47d 100644 (file)
--- a/lib/doh.c
+++ b/lib/doh.c
@@ -257,7 +257,7 @@ static void doh_probe_done(struct Curl_easy *data,
 
     if(!dohp->pending) {
       /* DoH completed, run the transfer picking up the results */
-      Curl_expire(data, 0, EXPIRE_RUN_NOW);
+      Curl_multi_mark_dirty(data);
     }
   }
 }
index ca32521e3296050d06a30ed92eb5f1f80c3fadb8..d540888d3aac4938cbd86f51bdf31053dde4e3c2 100644 (file)
@@ -1167,11 +1167,6 @@ CURLcode curl_easy_pause(CURL *d, int action)
     Curl_expire(data, 0, EXPIRE_RUN_NOW);
     /* reset the too-slow time keeper */
     data->state.keeps_speed.tv_sec = 0;
-    /* Simulate socket events on next run for unpaused directions */
-    if(!send_paused_new)
-      data->state.select_bits |= CURL_CSELECT_OUT;
-    if(!recv_paused_new)
-      data->state.select_bits |= CURL_CSELECT_IN;
     /* On changes, tell application to update its timers. */
     if(changed && data->multi) {
       if(Curl_update_timer(data->multi) && !result)
index 1e583e89dc7f958535a94ce50294381e5c1bf016..58df5fac6e6722e1627c7cc6cd829ff1f1eab2ac 100644 (file)
@@ -377,27 +377,6 @@ static CURLcode cf_h2_update_local_win(struct Curl_cfilter *cf,
 }
 #endif /* !NGHTTP2_HAS_SET_LOCAL_WINDOW_SIZE */
 
-/*
- * Mark this transfer to get "drained".
- */
-static void drain_stream(struct Curl_cfilter *cf,
-                         struct Curl_easy *data,
-                         struct h2_stream_ctx *stream)
-{
-  unsigned char bits;
-
-  (void)cf;
-  bits = CURL_CSELECT_IN;
-  if(!stream->closed &&
-     (!stream->body_eos || !Curl_bufq_is_empty(&stream->sendbuf)))
-    bits |= CURL_CSELECT_OUT;
-  if(stream->closed || (data->state.select_bits != bits)) {
-    CURL_TRC_CF(data, cf, "[%d] DRAIN select_bits=%x",
-                stream->id, bits);
-    data->state.select_bits = bits;
-    Curl_expire(data, 0, EXPIRE_RUN_NOW);
-  }
-}
 
 static CURLcode http2_data_setup(struct Curl_cfilter *cf,
                                  struct Curl_easy *data,
@@ -1200,7 +1179,7 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
     if(stream->status_code / 100 != 1) {
       stream->resp_hds_complete = TRUE;
     }
-    drain_stream(cf, data, stream);
+    Curl_multi_mark_dirty(data);
     break;
   case NGHTTP2_PUSH_PROMISE:
     rv = push_promise(cf, data, &frame->push_promise);
@@ -1223,12 +1202,12 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
     if(frame->rst_stream.error_code) {
       stream->reset = TRUE;
     }
-    drain_stream(cf, data, stream);
+    Curl_multi_mark_dirty(data);
     break;
   case NGHTTP2_WINDOW_UPDATE:
     if(CURL_WANT_SEND(data) && Curl_bufq_is_empty(&stream->sendbuf)) {
       /* need more data, force processing of transfer */
-      drain_stream(cf, data, stream);
+      Curl_multi_mark_dirty(data);
     }
     else if(!Curl_bufq_is_empty(&stream->sendbuf)) {
       /* resume the potentially suspended stream */
@@ -1254,7 +1233,7 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
                                 stream->id, NGHTTP2_STREAM_CLOSED);
       stream->closed = TRUE;
     }
-    drain_stream(cf, data, stream);
+    Curl_multi_mark_dirty(data);
   }
   return CURLE_OK;
 }
@@ -1403,11 +1382,8 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
          * window and *assume* that we treat this like a WINDOW_UPDATE. Some
          * servers send an explicit WINDOW_UPDATE, but not all seem to do that.
          * To be safe, we UNHOLD a stream in order not to stall. */
-        if(CURL_WANT_SEND(data)) {
-          struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
-          if(stream)
-            drain_stream(cf, data, stream);
-        }
+        if(CURL_WANT_SEND(data))
+          Curl_multi_mark_dirty(data);
       }
       break;
     }
@@ -1552,7 +1528,7 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id,
               stream_id, nghttp2_http2_strerror(error_code), error_code);
   else
     CURL_TRC_CF(data_s, cf, "[%d] CLOSED", stream_id);
-  drain_stream(cf, data_s, stream);
+  Curl_multi_mark_dirty(data_s);
 
   /* remove `data_s` from the nghttp2 stream */
   rv = nghttp2_session_set_stream_user_data(session, stream_id, 0);
@@ -1746,7 +1722,7 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
     }
     /* if we receive data for another handle, wake that up */
     if(CF_DATA_CURRENT(cf) != data_s)
-      Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
+      Curl_multi_mark_dirty(data_s);
 
     CURL_TRC_CF(data_s, cf, "[%d] status: HTTP/2 %03d",
                 stream->id, stream->status_code);
@@ -1773,7 +1749,7 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
   }
   /* if we receive data for another handle, wake that up */
   if(CF_DATA_CURRENT(cf) != data_s)
-    Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
+    Curl_multi_mark_dirty(data_s);
 
   CURL_TRC_CF(data_s, cf, "[%d] header: %.*s: %.*s",
               stream->id, (int)namelen, name, (int)valuelen, value);
@@ -2106,7 +2082,7 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf,
        * this may leave data in underlying buffers that will not
        * be consumed. */
       if(!cf->next || !cf->next->cft->has_data_pending(cf->next, data))
-        drain_stream(cf, data, stream);
+        Curl_multi_mark_dirty(data);
       break;
     }
 
@@ -2184,7 +2160,7 @@ static CURLcode cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
     nghttp2_session_consume(ctx->h2, stream->id, *pnread);
     if(stream->closed) {
       CURL_TRC_CF(data, cf, "[%d] DRAIN closed stream", stream->id);
-      drain_stream(cf, data, stream);
+      Curl_multi_mark_dirty(data);
     }
   }
 
@@ -2195,7 +2171,7 @@ out:
      * monitor the socket for POLLOUT, but when not SENDING
      * any more, we force processing of the transfer. */
     if(!CURL_WANT_SEND(data))
-      drain_stream(cf, data, stream);
+      Curl_multi_mark_dirty(data);
   }
   else if(r2) {
     result = r2;
@@ -2712,8 +2688,7 @@ static CURLcode http2_data_pause(struct Curl_cfilter *cf,
        * not. We may have already buffered and exhausted the new window
        * by operating on things in flight during the handling of other
        * transfers. */
-      drain_stream(cf, data, stream);
-      Curl_expire(data, 0, EXPIRE_RUN_NOW);
+      Curl_multi_mark_dirty(data);
     }
     CURL_TRC_CF(data, cf, "[%d] stream now %spaused", stream->id,
                 pause ? "" : "un");
index 4e6c01475c3ddb04b160bcca09f39188dd526af5..267671a56f33db0c78eb07302a2ad19ce11ead77 100644 (file)
@@ -1347,9 +1347,6 @@ static CURLcode imap_state_fetch_resp(struct Curl_easy *data,
     else {
       /* IMAP download */
       data->req.maxdownload = size;
-      /* force a recv/send check of this connection, as the data might've been
-       read off the socket already */
-      data->state.select_bits = CURL_CSELECT_IN;
       Curl_xfer_setup1(data, CURL_XFER_RECV, size, FALSE);
     }
   }
index 741576ddcc7aa80e73ace8632c6e9df116587eed..d0cb60afd2aad0e6eb336f590ff64682864d6bf1 100644 (file)
@@ -235,6 +235,7 @@ struct Curl_multi *Curl_multi_handle(unsigned int xfer_table_size,
   Curl_multi_ev_init(multi, ev_hashsize);
   Curl_uint_tbl_init(&multi->xfers, NULL);
   Curl_uint_bset_init(&multi->process);
+  Curl_uint_bset_init(&multi->dirty);
   Curl_uint_bset_init(&multi->pending);
   Curl_uint_bset_init(&multi->msgsent);
   Curl_hash_init(&multi->proto_hash, 23,
@@ -247,6 +248,7 @@ struct Curl_multi *Curl_multi_handle(unsigned int xfer_table_size,
 
   if(Curl_uint_bset_resize(&multi->process, xfer_table_size) ||
      Curl_uint_bset_resize(&multi->pending, xfer_table_size) ||
+     Curl_uint_bset_resize(&multi->dirty, xfer_table_size) ||
      Curl_uint_bset_resize(&multi->msgsent, xfer_table_size) ||
      Curl_uint_tbl_resize(&multi->xfers, xfer_table_size))
     goto error;
@@ -301,6 +303,7 @@ error:
   }
 
   Curl_uint_bset_destroy(&multi->process);
+  Curl_uint_bset_destroy(&multi->dirty);
   Curl_uint_bset_destroy(&multi->pending);
   Curl_uint_bset_destroy(&multi->msgsent);
   Curl_uint_tbl_destroy(&multi->xfers);
@@ -355,6 +358,7 @@ static CURLMcode multi_xfers_add(struct Curl_multi *multi,
      * to work properly when larger than the table, but not
      * the other way around. */
     if(Curl_uint_bset_resize(&multi->process, newsize) ||
+       Curl_uint_bset_resize(&multi->dirty, newsize) ||
        Curl_uint_bset_resize(&multi->pending, newsize) ||
        Curl_uint_bset_resize(&multi->msgsent, newsize) ||
        Curl_uint_tbl_resize(&multi->xfers, newsize))
@@ -401,6 +405,7 @@ CURLMcode curl_multi_add_handle(CURLM *m, CURL *d)
       return CURLM_ABORTED_BY_CALLBACK;
     multi->dead = FALSE;
     Curl_uint_bset_clear(&multi->process);
+    Curl_uint_bset_clear(&multi->dirty);
     Curl_uint_bset_clear(&multi->pending);
     Curl_uint_bset_clear(&multi->msgsent);
   }
@@ -795,6 +800,7 @@ CURLMcode curl_multi_remove_handle(CURLM *m, CURL *d)
   DEBUGASSERT(Curl_uint_tbl_contains(&multi->xfers, mid));
   Curl_uint_tbl_remove(&multi->xfers, mid);
   Curl_uint_bset_remove(&multi->process, mid);
+  Curl_uint_bset_remove(&multi->dirty, mid);
   Curl_uint_bset_remove(&multi->pending, mid);
   Curl_uint_bset_remove(&multi->msgsent, mid);
   data->multi = NULL;
@@ -1048,8 +1054,8 @@ void Curl_multi_getsock(struct Curl_easy *data,
       (Curl_pollset_want_read(data, ps, data->conn->sock[SECONDARYSOCKET]) &&
        Curl_conn_data_pending(data, SECONDARYSOCKET)))) {
     CURL_TRC_M(data, "%s pollset[] has POLLIN, but there is still "
-               "buffered input to consume -> EXPIRE_RUN_NOW", caller);
-    Curl_expire(data, 0, EXPIRE_RUN_NOW);
+               "buffered input to consume -> mark as dirty", caller);
+    Curl_multi_mark_dirty(data);
   }
 
   switch(ps->num) {
@@ -1965,14 +1971,6 @@ static CURLMcode state_performing(struct Curl_easy *data,
       }
     }
   }
-  else if(data->state.select_bits && !Curl_xfer_is_blocked(data)) {
-    /* This avoids CURLM_CALL_MULTI_PERFORM so that a very fast transfer does
-       not get stuck on this transfer at the expense of other concurrent
-       transfers */
-    CURL_TRC_M(data, "EXPIRE_RUN_NOW unblocked, select_bits=%x",
-               data->state.select_bits);
-    Curl_expire(data, 0, EXPIRE_RUN_NOW);
-  }
   free(newurl);
   *resultp = result;
   return rc;
@@ -2297,6 +2295,10 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
 
   multi_warn_debug(multi, data);
 
+  /* transfer runs now, clear the dirty bit. This may be set
+   * again during processing, triggering a re-run later. */
+  Curl_uint_bset_remove(&multi->dirty, data->mid);
+
   do {
     /* A "stream" here is a logical stream if the protocol can handle that
        (HTTP/2), or the full connection for older protocols */
@@ -2840,6 +2842,7 @@ CURLMcode curl_multi_cleanup(CURLM *m)
     }
 #endif
     Curl_uint_bset_destroy(&multi->process);
+    Curl_uint_bset_destroy(&multi->dirty);
     Curl_uint_bset_destroy(&multi->pending);
     Curl_uint_bset_destroy(&multi->msgsent);
     Curl_uint_tbl_destroy(&multi->xfers);
@@ -2963,12 +2966,11 @@ struct multi_run_ctx {
   bool run_cpool;
 };
 
-static CURLMcode multi_run_expired(struct multi_run_ctx *mrc)
+static void multi_mark_expired_as_dirty(struct multi_run_ctx *mrc)
 {
   struct Curl_multi *multi = mrc->multi;
   struct Curl_easy *data = NULL;
   struct Curl_tree *t = NULL;
-  CURLMcode result = CURLM_OK;
 
   /*
    * The loop following here will go on as long as there are expire-times left
@@ -2980,33 +2982,59 @@ static CURLMcode multi_run_expired(struct multi_run_ctx *mrc)
        extracts a matching node if there is one */
     multi->timetree = Curl_splaygetbest(mrc->now, multi->timetree, &t);
     if(!t)
-      goto out;
+      return;
 
     data = Curl_splayget(t); /* assign this for next loop */
     if(!data)
       continue;
 
     (void)add_next_timeout(mrc->now, multi, data);
-    if(data == multi->admin) {
-      mrc->run_cpool = TRUE;
-      continue;
-    }
+    Curl_multi_mark_dirty(data);
+  }
+}
 
-    mrc->run_xfers++;
-    sigpipe_apply(data, &mrc->pipe_st);
-    result = multi_runsingle(multi, &mrc->now, data);
+static CURLMcode multi_run_dirty(struct multi_run_ctx *mrc)
+{
+  struct Curl_multi *multi = mrc->multi;
+  CURLMcode result = CURLM_OK;
+  unsigned int mid;
 
-    if(CURLM_OK >= result) {
-      /* reassess event handling of data */
-      result = Curl_multi_ev_assess_xfer(multi, data);
-      if(result)
-        goto out;
+  if(Curl_uint_bset_first(&multi->dirty, &mid)) {
+    do {
+      struct Curl_easy *data = Curl_multi_get_easy(multi, mid);
+      if(data) {
+        CURL_TRC_M(data, "multi_run_dirty");
+
+        if(data == multi->admin) {
+          Curl_uint_bset_remove(&multi->dirty, mid);
+          mrc->run_cpool = TRUE;
+          continue;
+        }
+
+        mrc->run_xfers++;
+        sigpipe_apply(data, &mrc->pipe_st);
+        /* runsingle() clears the dirty mid */
+        result = multi_runsingle(multi, &mrc->now, data);
+
+        if(CURLM_OK >= result) {
+          /* reassess event handling of data */
+          result = Curl_multi_ev_assess_xfer(multi, data);
+          if(result)
+            goto out;
+        }
+      }
+      else {
+        CURL_TRC_M(multi->admin, "multi_run_dirty, %u no longer found", mid);
+        Curl_uint_bset_remove(&multi->dirty, mid);
+      }
     }
+    while(Curl_uint_bset_next(&multi->dirty, mid, &mid));
   }
 
 out:
   return result;
 }
+
 static CURLMcode multi_socket(struct Curl_multi *multi,
                               bool checkall,
                               curl_socket_t s,
@@ -3035,7 +3063,8 @@ static CURLMcode multi_socket(struct Curl_multi *multi,
   }
 
   if(s != CURL_SOCKET_TIMEOUT) {
-    Curl_multi_ev_expire_xfers(multi, s, &mrc.now, &mrc.run_cpool);
+    /* Mark all transfers of that socket as dirty */
+    Curl_multi_ev_dirty_xfers(multi, s, &mrc.run_cpool);
   }
   else {
     /* Asked to run due to time-out. Clear the 'last_expire_ts' variable to
@@ -3047,7 +3076,8 @@ static CURLMcode multi_socket(struct Curl_multi *multi,
     mrc.run_cpool = TRUE;
   }
 
-  result = multi_run_expired(&mrc);
+  multi_mark_expired_as_dirty(&mrc);
+  result = multi_run_dirty(&mrc);
   if(result)
     goto out;
 
@@ -3058,7 +3088,8 @@ static CURLMcode multi_socket(struct Curl_multi *multi,
      * Do that only once or it might be unfair to transfers on other
      * sockets. */
     mrc.now = curlx_now();
-    result = multi_run_expired(&mrc);
+    multi_mark_expired_as_dirty(&mrc);
+    result = multi_run_dirty(&mrc);
   }
 
 out:
@@ -3186,6 +3217,26 @@ CURLMcode curl_multi_socket_all(CURLM *m, int *running_handles)
   return multi_socket(multi, TRUE, CURL_SOCKET_BAD, 0, running_handles);
 }
 
+
+static bool multi_has_dirties(struct Curl_multi *multi)
+{
+  unsigned int mid;
+  if(Curl_uint_bset_first(&multi->dirty, &mid)) {
+    do {
+      struct Curl_easy *data = Curl_multi_get_easy(multi, mid);
+      if(data) {
+        return TRUE;
+      }
+      else {
+        CURL_TRC_M(multi->admin, "dirty transfer %u no longer found", mid);
+        Curl_uint_bset_remove(&multi->dirty, mid);
+      }
+    }
+    while(Curl_uint_bset_next(&multi->dirty, mid, &mid));
+  }
+  return FALSE;
+}
+
 static CURLMcode multi_timeout(struct Curl_multi *multi,
                                struct curltime *expire_time,
                                long *timeout_ms)
@@ -3197,7 +3248,12 @@ static CURLMcode multi_timeout(struct Curl_multi *multi,
     return CURLM_OK;
   }
 
-  if(multi->timetree) {
+  if(multi_has_dirties(multi)) {
+    *expire_time = curlx_now();
+    *timeout_ms = 0;
+    return CURLM_OK;
+  }
+  else if(multi->timetree) {
     /* we have a tree of expire times */
     struct curltime now = curlx_now();
 
@@ -3791,6 +3847,12 @@ unsigned int Curl_multi_xfers_running(struct Curl_multi *multi)
   return multi->xfers_alive;
 }
 
+void Curl_multi_mark_dirty(struct Curl_easy *data)
+{
+  if(data->multi && data->mid != UINT_MAX)
+    Curl_uint_bset_add(&data->multi->dirty, data->mid);
+}
+
 #ifdef DEBUGBUILD
 static void multi_xfer_dump(struct Curl_multi *multi, unsigned int mid,
                              void *entry)
index 21d28676195fce89a7580c6cc44ca008dcff4309..0b4c472849d4ad72e8bee201b7d6bdd69423e1f9 100644 (file)
@@ -563,10 +563,9 @@ CURLMcode Curl_multi_ev_assign(struct Curl_multi *multi,
   return CURLM_OK;
 }
 
-void Curl_multi_ev_expire_xfers(struct Curl_multi *multi,
-                                curl_socket_t s,
-                                const struct curltime *nowp,
-                                bool *run_cpool)
+void Curl_multi_ev_dirty_xfers(struct Curl_multi *multi,
+                               curl_socket_t s,
+                               bool *run_cpool)
 {
   struct mev_sh_entry *entry;
 
@@ -586,9 +585,11 @@ void Curl_multi_ev_expire_xfers(struct Curl_multi *multi,
       do {
         data = Curl_multi_get_easy(multi, mid);
         if(data) {
-          /* Expire with out current now, so we will get it below when
-           * asking the splaytree for expired transfers. */
-          Curl_expire_ex(data, nowp, 0, EXPIRE_RUN_NOW);
+          Curl_multi_mark_dirty(data);
+        }
+        else {
+          CURL_TRC_M(multi->admin, "socket transfer %u no longer found", mid);
+          Curl_uint_spbset_remove(&entry->xfers, mid);
         }
       }
       while(Curl_uint_spbset_next(&entry->xfers, mid, &mid));
index 06be842fb96ebe218861dce2fa8d28ce9e0261c1..20c1aeac81a68722953219f0a544ad73b2846e65 100644 (file)
@@ -61,11 +61,10 @@ CURLMcode Curl_multi_ev_assess_conn(struct Curl_multi *multi,
                                     struct Curl_easy *data,
                                     struct connectdata *conn);
 
-/* Expire all transfers tied to the given socket */
-void Curl_multi_ev_expire_xfers(struct Curl_multi *multi,
-                                curl_socket_t s,
-                                const struct curltime *nowp,
-                                bool *run_cpool);
+/* Mark all transfers tied to the given socket as dirty */
+void Curl_multi_ev_dirty_xfers(struct Curl_multi *multi,
+                               curl_socket_t s,
+                               bool *run_cpool);
 
 /* Socket will be closed, forget anything we know about it. */
 void Curl_multi_ev_socket_done(struct Curl_multi *multi,
index 04db02f0b160c3c86967376980202dcfb86afcd1..4bf2dd811c24b9b7e19f0c63dff99e11b04b6682 100644 (file)
@@ -98,6 +98,7 @@ struct Curl_multi {
   struct uint_tbl xfers; /* transfers added to this multi */
   /* Each transfer's mid may be present in at most one of these */
   struct uint_bset process; /* transfer being processed */
+  struct uint_bset dirty; /* transfer to be run NOW, e.g. ASAP. */
   struct uint_bset pending; /* transfers in waiting (conn limit etc.) */
   struct uint_bset msgsent; /* transfers done with message for application */
 
index eae634ab40577993e25d2dc8de367434df4cf76c..e68a278a8da5412fbd727dc5f0318260dbbfec94 100644 (file)
@@ -173,4 +173,8 @@ struct Curl_easy *Curl_multi_get_easy(struct Curl_multi *multi,
 /* Get the # of transfers current in process/pending. */
 unsigned int Curl_multi_xfers_running(struct Curl_multi *multi);
 
+/* Mark a transfer as dirty, e.g. to be rerun at earliest convenience.
+ * A cheap operation, can be done many times repeatedly. */
+void Curl_multi_mark_dirty(struct Curl_easy *data);
+
 #endif /* HEADER_CURL_MULTIIF_H */
index 603efc4b1f1a542824046350eeae1bb5bccb6144..a348383c9ac3d849b769f0abc8393e6559cb48f2 100644 (file)
@@ -358,16 +358,12 @@ static CURLcode sendrecv_dl(struct Curl_easy *data,
 
   } while(maxloops--);
 
-  if(!rcvd_eagain || data_pending(data, rcvd_eagain)) {
+  if(!Curl_xfer_is_blocked(data) &&
+     (!rcvd_eagain || data_pending(data, rcvd_eagain))) {
     /* Did not read until EAGAIN or there is still data pending
      * in buffers. Mark as read-again via simulated SELECT results. */
-    data->state.select_bits = CURL_CSELECT_IN;
-    if((k->keepon & KEEP_SENDBITS) == KEEP_SEND)
-      data->state.select_bits |= CURL_CSELECT_OUT;
-    if(!Curl_xfer_is_blocked(data))
-      Curl_expire(data, 0, EXPIRE_RUN_NOW);
-    CURL_TRC_M(data, "sendrecv_dl() no EAGAIN/pending data, "
-               "set select_bits=%x", data->state.select_bits);
+    Curl_multi_mark_dirty(data);
+    CURL_TRC_M(data, "sendrecv_dl() no EAGAIN/pending data, mark as dirty");
   }
 
   if(((k->keepon & (KEEP_RECV|KEEP_SEND)) == KEEP_SEND) &&
@@ -403,22 +399,6 @@ static CURLcode sendrecv_ul(struct Curl_easy *data, int *didwhat)
   return CURLE_OK;
 }
 
-static int select_bits_paused(struct Curl_easy *data, int select_bits)
-{
-  /* See issue #11982: we really need to be careful not to progress
-   * a transfer direction when that direction is paused. Not all parts
-   * of our state machine are handling PAUSED transfers correctly. So, we
-   * do not want to go there.
-   * NOTE: we are only interested in PAUSE, not HOLD. */
-
-  /* if there is data in a direction not paused, return false */
-  if(((select_bits & CURL_CSELECT_IN) && !Curl_xfer_recv_is_paused(data)) ||
-     ((select_bits & CURL_CSELECT_OUT) && !Curl_xfer_send_is_paused(data)))
-    return FALSE;
-
-  return Curl_xfer_recv_is_paused(data) || Curl_xfer_send_is_paused(data);
-}
-
 /*
  * Curl_sendrecv() is the low-level function to be called when data is to
  * be read and written to/from the connection.
@@ -430,14 +410,9 @@ CURLcode Curl_sendrecv(struct Curl_easy *data, struct curltime *nowp)
   int didwhat = 0;
 
   DEBUGASSERT(nowp);
-  if(data->state.select_bits) {
-    if(select_bits_paused(data, data->state.select_bits)) {
-      /* leave the bits unchanged, so they'll tell us what to do when
-       * this transfer gets unpaused. */
-      result = CURLE_OK;
-      goto out;
-    }
-    data->state.select_bits = 0;
+  if(Curl_xfer_is_blocked(data)) {
+    result = CURLE_OK;
+    goto out;
   }
 
   /* We go ahead and do a read if we have a readable socket or if the stream
index 20c8adf281c7815870d789ff8a8657bc4b2f4a7a..63bee65dc9d26d0de01650f19cba128b1ccf02d0 100644 (file)
@@ -1155,9 +1155,6 @@ struct UrlState {
 #endif
   unsigned char httpreq; /* Curl_HttpReq; what kind of HTTP request (if any)
                             is this */
-  unsigned char select_bits; /* != 0 -> bitmask of socket events for this
-                                 transfer overriding anything the socket may
-                                 report */
   unsigned int creds_from:2; /* where is the server credentials originating
                                 from, see the CREDS_* defines above */
 
index 947ce4fbebf65a82a641374c741f9111e5f15263..543902dea116c8590f26269cd36300666e86754d 100644 (file)
@@ -244,33 +244,17 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
 static void drain_stream_from_other_thread(struct Curl_easy *data,
                                            struct h3_stream_ctx *stream)
 {
-  unsigned char bits;
-
-  /* risky */
-  bits = CURL_CSELECT_IN;
-  if(stream && !stream->upload_done)
-    bits |= CURL_CSELECT_OUT;
-  if(data->state.select_bits != bits) {
-    data->state.select_bits = bits;
-    /* cannot expire from other thread */
-  }
+  (void)data;
+  (void)stream;
+  /* cannot expire from other thread.
+     here is the disconnect between msh3 and curl */
 }
 
 static void h3_drain_stream(struct Curl_cfilter *cf,
                             struct Curl_easy *data)
 {
-  struct cf_msh3_ctx *ctx = cf->ctx;
-  struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
-  unsigned char bits;
-
   (void)cf;
-  bits = CURL_CSELECT_IN;
-  if(stream && !stream->upload_done)
-    bits |= CURL_CSELECT_OUT;
-  if(data->state.select_bits != bits) {
-    data->state.select_bits = bits;
-    Curl_expire(data, 0, EXPIRE_RUN_NOW);
-  }
+  Curl_multi_mark_dirty(data);
 }
 
 static const MSH3_CONNECTION_IF msh3_conn_if = {
index 2cb44693990efa2638c0eea1b8b87713b00e17b9..444e9ca2bf46a55b9f8bbad58e51c6f0a2c6bbc3 100644 (file)
@@ -342,23 +342,6 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
   }
 }
 
-static void h3_drain_stream(struct Curl_cfilter *cf,
-                            struct Curl_easy *data)
-{
-  struct cf_ngtcp2_ctx *ctx = cf->ctx;
-  struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
-  unsigned char bits;
-
-  (void)cf;
-  bits = CURL_CSELECT_IN;
-  if(stream && stream->upload_left && !stream->send_closed)
-    bits |= CURL_CSELECT_OUT;
-  if(data->state.select_bits != bits) {
-    data->state.select_bits = bits;
-    Curl_expire(data, 0, EXPIRE_RUN_NOW);
-  }
-}
-
 /* ngtcp2 default congestion controller does not perform pacing. Limit
    the maximum packet burst to MAX_PKT_BURST packets. */
 #define MAX_PKT_BURST 10
@@ -744,7 +727,7 @@ static int cb_extend_max_stream_data(ngtcp2_conn *tconn, int64_t stream_id,
     CURL_TRC_CF(s_data, cf, "[%" FMT_PRId64 "] unblock quic flow",
                 (curl_int64_t)stream_id);
     stream->quic_flow_blocked = FALSE;
-    h3_drain_stream(cf, s_data);
+    Curl_multi_mark_dirty(s_data);
   }
   return 0;
 }
@@ -971,7 +954,7 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t sid,
   else {
     CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] CLOSED", stream->id);
   }
-  h3_drain_stream(cf, data);
+  Curl_multi_mark_dirty(data);
   return 0;
 }
 
@@ -1072,7 +1055,7 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t sid,
   if(stream->status_code / 100 != 1) {
     stream->resp_hds_complete = TRUE;
   }
-  h3_drain_stream(cf, data);
+  Curl_multi_mark_dirty(data);
   return 0;
 }
 
@@ -1984,10 +1967,9 @@ static CURLcode h3_data_pause(struct Curl_cfilter *cf,
 {
   /* There seems to exist no API in ngtcp2 to shrink/enlarge the streams
    * windows. As we do in HTTP/2. */
-  if(!pause) {
-    h3_drain_stream(cf, data);
-    Curl_expire(data, 0, EXPIRE_RUN_NOW);
-  }
+  (void)cf;
+  if(!pause)
+    Curl_multi_mark_dirty(data);
   return CURLE_OK;
 }
 
index 5238c582acff6f868525a0390a985836062af508..aa93ae70e2da3a138092abc936e7617e0b6ff113 100644 (file)
@@ -711,31 +711,14 @@ static struct cf_osslq_stream *cf_osslq_get_qstream(struct Curl_cfilter *cf,
   return NULL;
 }
 
-static void h3_drain_stream(struct Curl_cfilter *cf,
-                            struct Curl_easy *data)
-{
-  struct cf_osslq_ctx *ctx = cf->ctx;
-  struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
-  unsigned char bits;
-
-  (void)cf;
-  bits = CURL_CSELECT_IN;
-  if(stream && stream->upload_left && !stream->send_closed)
-    bits |= CURL_CSELECT_OUT;
-  if(data->state.select_bits != bits) {
-    data->state.select_bits = bits;
-    Curl_expire(data, 0, EXPIRE_RUN_NOW);
-  }
-}
-
 static CURLcode h3_data_pause(struct Curl_cfilter *cf,
                               struct Curl_easy *data,
                               bool pause)
 {
+  (void)cf;
   if(!pause) {
     /* unpaused. make it run again right away */
-    h3_drain_stream(cf, data);
-    Curl_expire(data, 0, EXPIRE_RUN_NOW);
+    Curl_multi_mark_dirty(data);
   }
   return CURLE_OK;
 }
@@ -766,7 +749,7 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id,
   else {
     CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] CLOSED", stream->s.id);
   }
-  h3_drain_stream(cf, data);
+  Curl_multi_mark_dirty(data);
   return 0;
 }
 
@@ -831,7 +814,7 @@ static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id,
   stream->download_recvd += (curl_off_t)buflen;
   CURL_TRC_CF(data, cf, "[%" FMT_PRId64 "] DATA len=%zu, total=%" FMT_OFF_T,
               stream->s.id, buflen, stream->download_recvd);
-  h3_drain_stream(cf, data);
+  Curl_multi_mark_dirty(data);
   return 0;
 }
 
@@ -943,7 +926,7 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t sid,
   if(stream->status_code / 100 != 1) {
     stream->resp_hds_complete = TRUE;
   }
-  h3_drain_stream(cf, data);
+  Curl_multi_mark_dirty(data);
   return 0;
 }
 
@@ -1566,7 +1549,7 @@ static CURLcode cf_osslq_check_and_unblock(struct Curl_cfilter *cf,
           if(stream) {
             nghttp3_conn_unblock_stream(ctx->h3.conn, stream->s.id);
             stream->s.send_blocked = FALSE;
-            h3_drain_stream(cf, ctx->curl_items[idx_count]);
+            Curl_multi_mark_dirty(ctx->curl_items[idx_count]);
             CURL_TRC_CF(ctx->curl_items[idx_count], cf, "unblocked");
           }
           result_count--;
@@ -2163,7 +2146,7 @@ static CURLcode cf_osslq_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
   }
 
   if(*pnread) {
-    h3_drain_stream(cf, data);
+    Curl_multi_mark_dirty(data);
   }
   else {
     if(stream->closed) {
index 129b60604363ae2b2e6eb228cd0aa2ba43de0b9c..6b9ab2a0ece65862ef247c06f57c6698e580272a 100644 (file)
@@ -237,7 +237,7 @@ static bool cf_quiche_do_resume(struct Curl_cfilter *cf,
   (void)user_data;
   if(stream->quic_flow_blocked) {
     stream->quic_flow_blocked = FALSE;
-    Curl_expire(sdata, 0, EXPIRE_RUN_NOW);
+    Curl_multi_mark_dirty(sdata);
     CURL_TRC_CF(sdata, cf, "[%"FMT_PRIu64"] unblock", stream->id);
   }
   return TRUE;
@@ -250,8 +250,8 @@ static bool cf_quiche_do_expire(struct Curl_cfilter *cf,
 {
   (void)stream;
   (void)user_data;
-  CURL_TRC_CF(sdata, cf, "conn closed, expire transfer");
-  Curl_expire(sdata, 0, EXPIRE_RUN_NOW);
+  CURL_TRC_CF(sdata, cf, "conn closed, mark as dirty");
+  Curl_multi_mark_dirty(sdata);
   return TRUE;
 }
 
@@ -307,23 +307,6 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
   }
 }
 
-static void h3_drain_stream(struct Curl_cfilter *cf,
-                            struct Curl_easy *data)
-{
-  struct cf_quiche_ctx *ctx = cf->ctx;
-  struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
-  unsigned char bits;
-
-  (void)cf;
-  bits = CURL_CSELECT_IN;
-  if(stream && !stream->send_closed)
-    bits |= CURL_CSELECT_OUT;
-  if(data->state.select_bits != bits) {
-    data->state.select_bits = bits;
-    Curl_expire(data, 0, EXPIRE_RUN_NOW);
-  }
-}
-
 static void cf_quiche_expire_conn_closed(struct Curl_cfilter *cf,
                                          struct Curl_easy *data)
 {
@@ -562,7 +545,7 @@ static CURLcode cf_quiche_ev_process(struct Curl_cfilter *cf,
                                      quiche_h3_event *ev)
 {
   CURLcode result = h3_process_event(cf, data, stream, ev);
-  h3_drain_stream(cf, data);
+  Curl_multi_mark_dirty(data);
   if(result)
     CURL_TRC_CF(data, cf, "error processing event %s "
                 "for [%"FMT_PRIu64"] -> %d", cf_ev_name(ev),
@@ -917,7 +900,7 @@ static CURLcode cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
 
   if(*pnread) {
     if(stream->closed)
-      h3_drain_stream(cf, data);
+      Curl_multi_mark_dirty(data);
   }
   else {
     if(stream->closed) {
@@ -1229,9 +1212,9 @@ static CURLcode h3_data_pause(struct Curl_cfilter *cf,
 {
   /* There seems to exist no API in quiche to shrink/enlarge the streams
    * windows. As we do in HTTP/2. */
+  (void)cf;
   if(!pause) {
-    h3_drain_stream(cf, data);
-    Curl_expire(data, 0, EXPIRE_RUN_NOW);
+    Curl_multi_mark_dirty(data);
   }
   return CURLE_OK;
 }
index 84ef9e1ca941f558e8860d8b189baa84a116a236..643b8f4987f7a0bea212f2ca70f35fd0719ef1d3 100644 (file)
@@ -1256,15 +1256,9 @@ static int myssh_in_UPLOAD_INIT(struct Curl_easy *data,
      figure out a "real" bitmask */
   sshc->orig_waitfor = data->req.keepon;
 
-  /* we want to use the _sending_ function even when the socket turns
-     out readable as the underlying libssh sftp send function will deal
-     with both accordingly */
-  data->state.select_bits = CURL_CSELECT_OUT;
-
   /* since we do not really wait for anything at this point, we want the
-     state machine to move on as soon as possible so we set a very short
-     timeout here */
-  Curl_expire(data, 0, EXPIRE_RUN_NOW);
+     state machine to move on as soon as possible so we mark this as dirty */
+  Curl_multi_mark_dirty(data);
 #if LIBSSH_VERSION_INT > SSH_VERSION_INT(0, 11, 0)
   sshc->sftp_send_state = 0;
 #endif
@@ -1430,11 +1424,6 @@ static int myssh_in_SFTP_DOWNLOAD_STAT(struct Curl_easy *data,
   /* not set by Curl_xfer_setup to preserve keepon bits */
   data->conn->writesockfd = data->conn->sockfd;
 
-  /* we want to use the _receiving_ function even when the socket turns
-     out writableable as the underlying libssh recv function will deal
-     with both accordingly */
-  data->state.select_bits = CURL_CSELECT_IN;
-
   sshc->sftp_recv_state = 0;
   myssh_to(data, sshc, SSH_STOP);
 
@@ -2258,11 +2247,6 @@ static CURLcode myssh_statemach_act(struct Curl_easy *data,
          figure out a "real" bitmask */
       sshc->orig_waitfor = data->req.keepon;
 
-      /* we want to use the _sending_ function even when the socket turns
-         out readable as the underlying libssh scp send function will deal
-         with both accordingly */
-      data->state.select_bits = CURL_CSELECT_OUT;
-
       myssh_to(data, sshc, SSH_STOP);
 
       break;
@@ -2298,11 +2282,6 @@ static CURLcode myssh_statemach_act(struct Curl_easy *data,
         /* not set by Curl_xfer_setup to preserve keepon bits */
         conn->writesockfd = conn->sockfd;
 
-        /* we want to use the _receiving_ function even when the socket turns
-           out writableable as the underlying libssh recv function will deal
-           with both accordingly */
-        data->state.select_bits = CURL_CSELECT_IN;
-
         myssh_to(data, sshc, SSH_STOP);
         break;
     }
index 32ed0845b046ff7b97fc7efcd5e2cb7d95e58a79..c9ca46f5619970f8f9c7cde9f3d9ad89d65eced2 100644 (file)
@@ -1209,15 +1209,9 @@ sftp_upload_init(struct Curl_easy *data,
      figure out a "real" bitmask */
   sshc->orig_waitfor = data->req.keepon;
 
-  /* we want to use the _sending_ function even when the socket turns
-     out readable as the underlying libssh2 sftp send function will deal
-     with both accordingly */
-  data->state.select_bits = CURL_CSELECT_OUT;
-
   /* since we do not really wait for anything at this point, we want the
-     state machine to move on as soon as possible so we set a very short
-     timeout here */
-  Curl_expire(data, 0, EXPIRE_RUN_NOW);
+     state machine to move on as soon as possible so mark this as dirty */
+  Curl_multi_mark_dirty(data);
 
   myssh_state(data, sshc, SSH_STOP);
   return CURLE_OK;
@@ -1552,10 +1546,6 @@ sftp_download_stat(struct Curl_easy *data,
   /* not set by Curl_xfer_setup to preserve keepon bits */
   data->conn->writesockfd = data->conn->sockfd;
 
-  /* we want to use the _receiving_ function even when the socket turns
-     out writableable as the underlying libssh2 recv function will deal
-     with both accordingly */
-  data->state.select_bits = CURL_CSELECT_IN;
   myssh_state(data, sshc, SSH_STOP);
 
   return CURLE_OK;
@@ -2476,11 +2466,6 @@ static CURLcode ssh_state_scp_download_init(struct Curl_easy *data,
   /* not set by Curl_xfer_setup to preserve keepon bits */
   data->conn->writesockfd = data->conn->sockfd;
 
-  /* we want to use the _receiving_ function even when the socket turns
-     out writableable as the underlying libssh2 recv function will deal
-     with both accordingly */
-  data->state.select_bits = CURL_CSELECT_IN;
-
   myssh_state(data, sshc, SSH_STOP);
   return CURLE_OK;
 }
@@ -2634,11 +2619,6 @@ static CURLcode ssh_state_scp_upload_init(struct Curl_easy *data,
      figure out a "real" bitmask */
   sshc->orig_waitfor = data->req.keepon;
 
-  /* we want to use the _sending_ function even when the socket turns
-     out readable as the underlying libssh2 scp send function will deal
-     with both accordingly */
-  data->state.select_bits = CURL_CSELECT_OUT;
-
   myssh_state(data, sshc, SSH_STOP);
 
   return CURLE_OK;
index 642193b385e2f9557e6dc7a3dd49976eb1e8bc07..000ca457f74e4e2ae8b0c932f5940a1caa249a54 100644 (file)
@@ -732,15 +732,9 @@ static CURLcode wssh_statemach_act(struct Curl_easy *data,
            figure out a "real" bitmask */
         sshc->orig_waitfor = data->req.keepon;
 
-        /* we want to use the _sending_ function even when the socket turns
-           out readable as the underlying libssh2 sftp send function will deal
-           with both accordingly */
-        data->state.select_bits = CURL_CSELECT_OUT;
-
         /* since we do not really wait for anything at this point, we want the
-           state machine to move on as soon as possible so we set a very short
-           timeout here */
-        Curl_expire(data, 0, EXPIRE_RUN_NOW);
+           state machine to move on as soon as possible */
+        Curl_multi_mark_dirty(data);
 
         wssh_state(data, sshc, SSH_STOP);
       }
@@ -828,11 +822,6 @@ static CURLcode wssh_statemach_act(struct Curl_easy *data,
       /* not set by Curl_xfer_setup to preserve keepon bits */
       conn->writesockfd = conn->sockfd;
 
-      /* we want to use the _receiving_ function even when the socket turns
-         out writableable as the underlying libssh2 recv function will deal
-         with both accordingly */
-      data->state.select_bits = CURL_CSELECT_IN;
-
       if(result) {
         /* this should never occur; the close state should be entered
            at the time the error occurs */