]> git.ipfire.org Git - thirdparty/curl.git/commitdiff
h2/h3: replace `state.drain` counter with `state.dselect_bits`
authorStefan Eissing <stefan@eissing.org>
Fri, 21 Apr 2023 10:04:46 +0000 (12:04 +0200)
committerDaniel Stenberg <daniel@haxx.se>
Tue, 25 Apr 2023 15:49:28 +0000 (17:49 +0200)
- `drain` was used by http/2 and http/3 implementations to indicate
  that the transfer requires send/recv independant from its socket
  poll state. Intended as a counter, it was used as bool flag only.
- a similar mechanism exists on `connectdata->cselect_bits` where
  specific protocols can indicate something similar, only for the
  whole connection.
- `cselect_bits` are cleard in transfer.c on use and, importantly,
  also set when the transfer loop expended its `maxloops` tries.
  `drain` was not cleared by transfer and the http2/3 implementations
  had to take care of that.
- `dselect_bits` is cleared *and* set by the transfer loop. http2/3
  does no longer clear it, only set when new events happen.

This change unifies the handling of socket poll overrides, extending
`cselect_bits` by a easy handle specific value and a common treatment in
transfers.

Closes #11005

lib/http2.c
lib/transfer.c
lib/urldata.h
lib/vquic/curl_msh3.c
lib/vquic/curl_ngtcp2.c
lib/vquic/curl_quiche.c
lib/vquic/vquic.c
tests/http/test_05_errors.py

index 4940918f8679b84a1257786313e9c984a46bdc34..9da3cae17b128925d49ce748ea428db7170da4a8 100644 (file)
@@ -154,34 +154,6 @@ static void cf_h2_ctx_free(struct cf_h2_ctx *ctx)
   }
 }
 
-/*
- * This specific transfer on this connection has been "drained".
- */
-static void drained_transfer(struct Curl_cfilter *cf,
-                             struct Curl_easy *data)
-{
-  if(data->state.drain) {
-    struct cf_h2_ctx *ctx = cf->ctx;
-    DEBUGASSERT(ctx->drain_total > 0);
-    ctx->drain_total--;
-    data->state.drain = 0;
-  }
-}
-
-/*
- * Mark this transfer to get "drained".
- */
-static void drain_this(struct Curl_cfilter *cf,
-                       struct Curl_easy *data)
-{
-  if(!data->state.drain) {
-    struct cf_h2_ctx *ctx = cf->ctx;
-    data->state.drain = 1;
-    ctx->drain_total++;
-    DEBUGASSERT(ctx->drain_total > 0);
-  }
-}
-
 /**
  * All about the H3 internals of a stream
  */
@@ -213,6 +185,25 @@ struct stream_ctx {
 #define H2_STREAM_ID(d)     (H2_STREAM_CTX(d)? \
                              H2_STREAM_CTX(d)->id : -2)
 
+/*
+ * Mark this transfer to get "drained".
+ */
+static void drain_stream(struct Curl_cfilter *cf,
+                         struct Curl_easy *data,
+                         struct stream_ctx *stream)
+{
+  int bits;
+
+  (void)cf;
+  bits = CURL_CSELECT_IN;
+  if(stream->upload_left)
+    bits |= CURL_CSELECT_OUT;
+  if(data->state.dselect_bits != bits) {
+    data->state.dselect_bits = bits;
+    Curl_expire(data, 0, EXPIRE_RUN_NOW);
+  }
+}
+
 static CURLcode http2_data_setup(struct Curl_cfilter *cf,
                                  struct Curl_easy *data,
                                  struct stream_ctx **pstream)
@@ -276,8 +267,6 @@ static void http2_data_done(struct Curl_cfilter *cf,
         (void)nghttp2_session_send(ctx->h2);
     }
 
-    drained_transfer(cf, data);
-
     /* -1 means unassigned and 0 means cleared */
     if(nghttp2_session_get_stream_user_data(ctx->h2, stream->id)) {
       int rv = nghttp2_session_set_stream_user_data(ctx->h2,
@@ -515,8 +504,6 @@ static int h2_process_pending_input(struct Curl_cfilter *cf,
   while(Curl_bufq_peek(&ctx->inbufq, &buf, &blen)) {
 
     rv = nghttp2_session_mem_recv(ctx->h2, (const uint8_t *)buf, blen);
-    DEBUGF(LOG_CF(data, cf,
-                 "fed %zu bytes from nw to nghttp2 -> %zd", blen, rv));
     if(rv < 0) {
       failf(data,
             "process_pending_input: nghttp2_session_mem_recv() returned "
@@ -526,7 +513,6 @@ static int h2_process_pending_input(struct Curl_cfilter *cf,
     }
     Curl_bufq_skip(&ctx->inbufq, (size_t)rv);
     if(Curl_bufq_is_empty(&ctx->inbufq)) {
-      DEBUGF(LOG_CF(data, cf, "all data in connection buffer processed"));
       break;
     }
     else {
@@ -975,8 +961,7 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
       }
     }
     if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
-      drain_this(cf, data);
-      Curl_expire(data, 0, EXPIRE_RUN_NOW);
+      drain_stream(cf, data, stream);
     }
     break;
   case NGHTTP2_HEADERS:
@@ -1005,10 +990,7 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
 
     DEBUGF(LOG_CF(data, cf, "[h2sid=%d] %zu header bytes",
                   stream_id, Curl_bufq_len(&stream->recvbuf)));
-    if(CF_DATA_CURRENT(cf) != data) {
-      drain_this(cf, data);
-      Curl_expire(data, 0, EXPIRE_RUN_NOW);
-    }
+    drain_stream(cf, data, stream);
     break;
   case NGHTTP2_PUSH_PROMISE:
     DEBUGF(LOG_CF(data, cf, "[h2sid=%d] recv PUSH_PROMISE", stream_id));
@@ -1031,16 +1013,14 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
     DEBUGF(LOG_CF(data, cf, "[h2sid=%d] recv RST", stream_id));
     stream->closed = TRUE;
     stream->reset = TRUE;
-    drain_this(cf, data);
-    Curl_expire(data, 0, EXPIRE_RUN_NOW);
+    drain_stream(cf, data, stream);
     break;
   case NGHTTP2_WINDOW_UPDATE:
     DEBUGF(LOG_CF(data, cf, "[h2sid=%d] recv WINDOW_UPDATE", stream_id));
     if((data->req.keepon & KEEP_SEND_HOLD) &&
        (data->req.keepon & KEEP_SEND)) {
       data->req.keepon &= ~KEEP_SEND_HOLD;
-      drain_this(cf, data);
-      Curl_expire(data, 0, EXPIRE_RUN_NOW);
+      drain_stream(cf, data, stream);
       DEBUGF(LOG_CF(data, cf, "[h2sid=%d] un-holding after win update",
                     stream_id));
     }
@@ -1156,10 +1136,7 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
   }
 
   /* if we receive data for another handle, wake that up */
-  if(CF_DATA_CURRENT(cf) != data_s) {
-    drain_this(cf, data_s);
-    Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
-  }
+  drain_stream(cf, data_s, stream);
 
   DEBUGASSERT((size_t)nwritten == len);
   DEBUGF(LOG_CF(data_s, cf, "[h2sid=%d] %zd/%zu DATA recvd, "
@@ -1196,10 +1173,7 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id,
   if(stream->error)
     stream->reset = TRUE;
 
-  if(CF_DATA_CURRENT(cf) != data_s) {
-    drain_this(cf, data_s);
-    Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
-  }
+  drain_stream(cf, data_s, stream);
 
   /* remove `data_s` from the nghttp2 stream */
   rv = nghttp2_session_set_stream_user_data(session, stream_id, 0);
@@ -1529,7 +1503,7 @@ static CURLcode http2_data_done_send(struct Curl_cfilter *cf,
     /* resume sending here to trigger the callback to get called again so
        that it can signal EOF to nghttp2 */
     (void)nghttp2_session_resume_data(ctx->h2, stream->id);
-    drain_this(cf, data);
+    drain_stream(cf, data, stream);
   }
 
 out:
@@ -1543,14 +1517,17 @@ static ssize_t http2_handle_stream_close(struct Curl_cfilter *cf,
   struct stream_ctx *stream = H2_STREAM_CTX(data);
   ssize_t rv = 0;
 
-  drained_transfer(cf, data);
-
   if(stream->error == NGHTTP2_REFUSED_STREAM) {
     DEBUGF(LOG_CF(data, cf, "[h2sid=%d] REFUSED_STREAM, try again on a new "
                   "connection", stream->id));
     connclose(cf->conn, "REFUSED_STREAM"); /* don't use this anymore */
     data->state.refused_stream = TRUE;
-    *err = CURLE_RECV_ERROR; /* trigger Curl_retry_request() later */
+    *err = CURLE_SEND_ERROR; /* trigger Curl_retry_request() later */
+    return -1;
+  }
+  else if(stream->reset) {
+    failf(data, "HTTP/2 stream %u was reset", stream->id);
+    *err = stream->bodystarted? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR;
     return -1;
   }
   else if(stream->error != NGHTTP2_NO_ERROR) {
@@ -1560,11 +1537,6 @@ static ssize_t http2_handle_stream_close(struct Curl_cfilter *cf,
     *err = CURLE_HTTP2_STREAM;
     return -1;
   }
-  else if(stream->reset) {
-    failf(data, "HTTP/2 stream %u was reset", stream->id);
-    *err = stream->bodystarted? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR;
-    return -1;
-  }
 
   if(!stream->bodystarted) {
     failf(data, "HTTP/2 stream %u was closed cleanly, but before getting "
@@ -1691,7 +1663,6 @@ static ssize_t stream_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
   ssize_t nread = -1;
 
   *err = CURLE_AGAIN;
-  drained_transfer(cf, data);
   if(!Curl_bufq_is_empty(&stream->recvbuf)) {
     nread = Curl_bufq_read(&stream->recvbuf,
                            (unsigned char *)buf, len, err);
@@ -1755,8 +1726,8 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf,
     }
 
     nread = Curl_bufq_slurp(&ctx->inbufq, nw_in_reader, cf, &result);
-    DEBUGF(LOG_CF(data, cf, "read %zd bytes nw data -> %zd, %d",
-                  Curl_bufq_len(&ctx->inbufq), nread, result));
+    /* DEBUGF(LOG_CF(data, cf, "read %zd bytes nw data -> %zd, %d",
+                  Curl_bufq_len(&ctx->inbufq), nread, result)); */
     if(nread < 0) {
       if(result != CURLE_AGAIN) {
         failf(data, "Failed receiving HTTP2 data");
@@ -1832,7 +1803,7 @@ static ssize_t cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
     if(stream->closed) {
       DEBUGF(LOG_CF(data, cf, "[h2sid=%d] closed stream, set drain",
                     stream->id));
-      drain_this(cf, data);
+      drain_stream(cf, data, stream);
     }
   }
 
@@ -2040,9 +2011,14 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
     }
 
     if(should_close_session(ctx)) {
-      DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
-      *err = CURLE_HTTP2;
-      nwritten = -1;
+      if(stream->closed) {
+        nwritten = http2_handle_stream_close(cf, data, err);
+      }
+      else {
+        DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
+        *err = CURLE_HTTP2;
+        nwritten = -1;
+      }
       goto out;
     }
 
@@ -2085,9 +2061,14 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
     }
 
     if(should_close_session(ctx)) {
-      DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
-      *err = CURLE_HTTP2;
-      nwritten = -1;
+      if(stream->closed) {
+        nwritten = http2_handle_stream_close(cf, data, err);
+      }
+      else {
+        DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
+        *err = CURLE_HTTP2;
+        nwritten = -1;
+      }
       goto out;
     }
   }
index cb69f3365855af156e42d4aca2c44a7d90e68aff..947070956d24a3201e8ece09bf4ea37a1f99a154 100644 (file)
@@ -753,7 +753,7 @@ static CURLcode readwrite_data(struct Curl_easy *data,
 
   if(maxloops <= 0) {
     /* we mark it as read-again-please */
-    conn->cselect_bits = CURL_CSELECT_IN;
+    data->state.dselect_bits = CURL_CSELECT_IN;
     *comeback = TRUE;
   }
 
@@ -1065,40 +1065,36 @@ CURLcode Curl_readwrite(struct connectdata *conn,
   CURLcode result;
   struct curltime now;
   int didwhat = 0;
+  int select_bits;
 
-  curl_socket_t fd_read;
-  curl_socket_t fd_write;
-  int select_res = conn->cselect_bits;
 
-  conn->cselect_bits = 0;
-
-  /* only use the proper socket if the *_HOLD bit is not set simultaneously as
-     then we are in rate limiting state in that transfer direction */
-
-  if((k->keepon & KEEP_RECVBITS) == KEEP_RECV)
-    fd_read = conn->sockfd;
-  else
-    fd_read = CURL_SOCKET_BAD;
-
-  if((k->keepon & KEEP_SENDBITS) == KEEP_SEND)
-    fd_write = conn->writesockfd;
-  else
-    fd_write = CURL_SOCKET_BAD;
+  if(data->state.dselect_bits) {
+    select_bits = data->state.dselect_bits;
+    data->state.dselect_bits = 0;
+  }
+  else if(conn->cselect_bits) {
+    select_bits = conn->cselect_bits;
+    conn->cselect_bits = 0;
+  }
+  else {
+    curl_socket_t fd_read;
+    curl_socket_t fd_write;
+    /* only use the proper socket if the *_HOLD bit is not set simultaneously
+       as then we are in rate limiting state in that transfer direction */
+    if((k->keepon & KEEP_RECVBITS) == KEEP_RECV)
+      fd_read = conn->sockfd;
+    else
+      fd_read = CURL_SOCKET_BAD;
 
-#if defined(USE_HTTP2) || defined(USE_HTTP3)
-  if(data->state.drain) {
-    select_res |= CURL_CSELECT_IN;
-    DEBUGF(infof(data, "Curl_readwrite: forcibly told to drain data"));
     if((k->keepon & KEEP_SENDBITS) == KEEP_SEND)
-      select_res |= CURL_CSELECT_OUT;
-  }
-#endif
+      fd_write = conn->writesockfd;
+    else
+      fd_write = CURL_SOCKET_BAD;
 
-  if(!select_res) /* Call for select()/poll() only, if read/write/error
-                     status is not known. */
-    select_res = Curl_socket_check(fd_read, CURL_SOCKET_BAD, fd_write, 0);
+    select_bits = Curl_socket_check(fd_read, CURL_SOCKET_BAD, fd_write, 0);
+  }
 
-  if(select_res == CURL_CSELECT_ERR) {
+  if(select_bits == CURL_CSELECT_ERR) {
     failf(data, "select/poll returned error");
     result = CURLE_SEND_ERROR;
     goto out;
@@ -1106,7 +1102,7 @@ CURLcode Curl_readwrite(struct connectdata *conn,
 
 #ifdef USE_HYPER
   if(conn->datastream) {
-    result = conn->datastream(data, conn, &didwhat, done, select_res);
+    result = conn->datastream(data, conn, &didwhat, done, select_bits);
     if(result || *done)
       goto out;
   }
@@ -1115,14 +1111,14 @@ CURLcode Curl_readwrite(struct connectdata *conn,
   /* We go ahead and do a read if we have a readable socket or if
      the stream was rewound (in which case we have data in a
      buffer) */
-  if((k->keepon & KEEP_RECV) && (select_res & CURL_CSELECT_IN)) {
+  if((k->keepon & KEEP_RECV) && (select_bits & CURL_CSELECT_IN)) {
     result = readwrite_data(data, conn, k, &didwhat, done, comeback);
     if(result || *done)
       goto out;
   }
 
   /* If we still have writing to do, we check if we have a writable socket. */
-  if((k->keepon & KEEP_SEND) && (select_res & CURL_CSELECT_OUT)) {
+  if((k->keepon & KEEP_SEND) && (select_bits & CURL_CSELECT_OUT)) {
     /* write */
 
     result = readwrite_upload(data, conn, &didwhat);
index a8580bdb66fe8708f37a4c69ba14c8d52f73ab2a..777bc36f96650b3497fcef0bc57d21002d7b7b4d 100644 (file)
@@ -1319,6 +1319,8 @@ struct UrlState {
   char *scratch; /* huge buffer[set.buffer_size*2] for upload CRLF replacing */
   long followlocation; /* redirect counter */
   int requests; /* request counter: redirects + authentication retakes */
+  int dselect_bits; /* != 0 -> bitmask of socket events for this transfer
+                     * overriding anything the socket may report */
 #ifdef HAVE_SIGNAL
   /* storage for the previous bag^H^H^HSIGPIPE signal handler :-) */
   void (*prev_signal)(int sig);
@@ -1374,9 +1376,6 @@ struct UrlState {
   curl_off_t infilesize; /* size of file to upload, -1 means unknown.
                             Copied from set.filesize at start of operation */
 #if defined(USE_HTTP2) || defined(USE_HTTP3)
-  size_t drain; /* Increased when this stream has data to read, even if its
-                   socket is not necessarily is readable. Decreased when
-                   checked. */
   struct Curl_data_priority priority; /* shallow copy of data->set */
 #endif
 
index 1e1a15a8f4b7b54ec36618a7f9d1d5b39182865c..34ea0bcf625941b03d3e1f561fea227b0e2da6f4 100644 (file)
@@ -189,12 +189,33 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
   }
 }
 
-static void notify_drain(struct Curl_cfilter *cf,
+static void drain_stream_from_other_thread(struct Curl_easy *data,
+                                           struct stream_ctx *stream)
+{
+  int bits;
+
+  /* risky */
+  bits = CURL_CSELECT_IN;
+  if(stream && !stream->upload_done)
+    bits |= CURL_CSELECT_OUT;
+  if(data->state.dselect_bits != bits) {
+    data->state.dselect_bits = bits;
+    /* cannot expire from other thread */
+  }
+}
+
+static void drain_stream(struct Curl_cfilter *cf,
                          struct Curl_easy *data)
 {
+  struct stream_ctx *stream = H3_STREAM_CTX(data);
+  int bits;
+
   (void)cf;
-  if(!data->state.drain) {
-    data->state.drain = 1;
+  bits = CURL_CSELECT_IN;
+  if(stream && !stream->upload_done)
+    bits |= CURL_CSELECT_OUT;
+  if(data->state.dselect_bits != bits) {
+    data->state.dselect_bits = bits;
     Curl_expire(data, 0, EXPIRE_RUN_NOW);
   }
 }
@@ -350,7 +371,7 @@ static void MSH3_CALL msh3_header_received(MSH3_REQUEST *Request,
     }
   }
 
-  data->state.drain = 1;
+  drain_stream_from_other_thread(data, stream);
   msh3_lock_release(&stream->recv_lock);
 }
 
@@ -469,7 +490,6 @@ static ssize_t recv_closed_stream(struct Curl_cfilter *cf,
   nread = 0;
 
 out:
-  data->state.drain = 0;
   return nread;
 }
 
@@ -508,7 +528,6 @@ static ssize_t cf_msh3_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
 
   if(stream->recv_error) {
     failf(data, "request aborted");
-    data->state.drain = 0;
     *err = stream->recv_error;
     goto out;
   }
@@ -522,10 +541,8 @@ static ssize_t cf_msh3_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
                   len, nread, *err));
     if(nread < 0)
       goto out;
-    if(!Curl_bufq_is_empty(&stream->recvbuf) ||
-       stream->closed) {
-       notify_drain(cf, data);
-    }
+    if(stream->closed)
+       drain_stream(cf, data);
   }
   else if(stream->closed) {
     nread = recv_closed_stream(cf, data, err);
@@ -669,15 +686,14 @@ static int cf_msh3_get_select_socks(struct Curl_cfilter *cf,
 
     if(stream->recv_error) {
       bitmap |= GETSOCK_READSOCK(0);
-      notify_drain(cf, data);
+      drain_stream(cf, data);
     }
     else if(stream->req) {
       bitmap |= GETSOCK_READSOCK(0);
-      notify_drain(cf, data);
+      drain_stream(cf, data);
     }
   }
-  DEBUGF(LOG_CF(data, cf, "select_sock %u -> %d",
-                (uint32_t)data->state.drain, bitmap));
+  DEBUGF(LOG_CF(data, cf, "select_sock -> %d", bitmap));
   CF_DATA_RESTORE(cf, save);
   return bitmap;
 }
@@ -698,6 +714,8 @@ static bool cf_msh3_data_pending(struct Curl_cfilter *cf,
                   Curl_bufq_len(&stream->recvbuf)));
     pending = !Curl_bufq_is_empty(&stream->recvbuf);
     msh3_lock_release(&stream->recv_lock);
+    if(pending)
+      drain_stream(cf, (struct Curl_easy *)data);
   }
 
   CF_DATA_RESTORE(cf, save);
index 9c0c223b4abb3f0aeabd647bd023a1b5ecc435af..2f4b4cdb481a61e712ef520031fec7e697a513cb 100644 (file)
@@ -709,11 +709,6 @@ static void report_consumed_data(struct Curl_cfilter *cf,
                                          consumed);
     ngtcp2_conn_extend_max_offset(ctx->qconn, consumed);
   }
-  if(!stream->closed && data->state.drain &&
-     Curl_bufq_is_empty(&stream->recvbuf)) {
-     /* nothing buffered any more */
-     data->state.drain = 0;
-  }
 }
 
 static int cb_recv_stream_data(ngtcp2_conn *tconn, uint32_t flags,
@@ -995,12 +990,18 @@ static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf,
   return rv;
 }
 
-static void notify_drain(struct Curl_cfilter *cf,
+static void drain_stream(struct Curl_cfilter *cf,
                          struct Curl_easy *data)
 {
+  struct stream_ctx *stream = H3_STREAM_CTX(data);
+  int bits;
+
   (void)cf;
-  if(!data->state.drain) {
-    data->state.drain = 1;
+  bits = CURL_CSELECT_IN;
+  if(stream && !stream->upload_done)
+    bits |= CURL_CSELECT_OUT;
+  if(data->state.dselect_bits != bits) {
+    data->state.dselect_bits = bits;
     Curl_expire(data, 0, EXPIRE_RUN_NOW);
   }
 }
@@ -1028,7 +1029,7 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id,
   if(app_error_code == NGHTTP3_H3_INTERNAL_ERROR) {
     stream->reset = TRUE;
   }
-  notify_drain(cf, data);
+  drain_stream(cf, data);
   return 0;
 }
 
@@ -1082,9 +1083,7 @@ static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id,
   (void)stream3_id;
 
   result = write_resp_raw(cf, data, buf, buflen, TRUE);
-  if(CF_DATA_CURRENT(cf) != data) {
-    notify_drain(cf, data);
-  }
+  drain_stream(cf, data);
   return result? -1 : 0;
 }
 
@@ -1129,9 +1128,7 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t stream_id,
   if(stream->status_code / 100 != 1) {
     stream->resp_hds_complete = TRUE;
   }
-  if(CF_DATA_CURRENT(cf) != data) {
-    notify_drain(cf, data);
-  }
+  drain_stream(cf, data);
   return 0;
 }
 
@@ -1358,7 +1355,6 @@ static ssize_t recv_closed_stream(struct Curl_cfilter *cf,
   nread = 0;
 
 out:
-  data->state.drain = 0;
   return nread;
 }
 
@@ -1413,16 +1409,13 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
   }
 
   if(nread > 0) {
-    if(1 || !Curl_bufq_is_empty(&stream->recvbuf)) {
-      notify_drain(cf, data);
-    }
+    drain_stream(cf, data);
   }
   else {
     if(stream->closed) {
       nread = recv_closed_stream(cf, data, err);
       goto out;
     }
-    data->state.drain = FALSE;
     *err = CURLE_AGAIN;
     nread = -1;
   }
@@ -1468,7 +1461,7 @@ static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id,
     if((data->req.keepon & KEEP_SEND_HOLD) &&
        (data->req.keepon & KEEP_SEND)) {
       data->req.keepon &= ~KEEP_SEND_HOLD;
-      notify_drain(cf, data);
+      drain_stream(cf, data);
       DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] unpausing acks",
                     stream_id));
     }
index 1a6838b0127f8b649c059373f3745d6f237f7fe1..afd446b2e477cfc93d69fe5a20798c3e236038a7 100644 (file)
@@ -299,11 +299,18 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
   }
 }
 
-static void notify_drain(struct Curl_cfilter *cf, struct Curl_easy *data)
+static void drain_stream(struct Curl_cfilter *cf,
+                         struct Curl_easy *data)
 {
+  struct stream_ctx *stream = H3_STREAM_CTX(data);
+  int bits;
+
   (void)cf;
-  if(!data->state.drain) {
-    data->state.drain = 1;
+  bits = CURL_CSELECT_IN;
+  if(stream && !stream->upload_done)
+    bits |= CURL_CSELECT_OUT;
+  if(data->state.dselect_bits != bits) {
+    data->state.dselect_bits = bits;
     Curl_expire(data, 0, EXPIRE_RUN_NOW);
   }
 }
@@ -579,9 +586,7 @@ static CURLcode cf_poll_events(struct Curl_cfilter *cf,
     }
     else {
       result = h3_process_event(cf, sdata, stream3_id, ev);
-      if(sdata != data) {
-        notify_drain(cf, sdata);
-      }
+      drain_stream(cf, sdata);
       if(result) {
         DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] error processing event %s "
                       "for [h3sid=%"PRId64"] -> %d",
@@ -848,15 +853,20 @@ static ssize_t cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
   }
 
   if(nread > 0) {
-    data->state.drain = (!Curl_bufq_is_empty(&stream->recvbuf) ||
-                         stream->closed);
+    if(stream->closed)
+      drain_stream(cf, data);
   }
   else {
-    data->state.drain = FALSE;
     if(stream->closed) {
       nread = recv_closed_stream(cf, data, err);
       goto out;
     }
+    else if(quiche_conn_is_draining(ctx->qconn)) {
+      failf(data, "QUIC connection is draining");
+      *err = CURLE_HTTP3;
+      nread = -1;
+      goto out;
+    }
     *err = CURLE_AGAIN;
     nread = -1;
   }
@@ -1065,24 +1075,9 @@ static bool stream_is_writeable(struct Curl_cfilter *cf,
 {
   struct cf_quiche_ctx *ctx = cf->ctx;
   struct stream_ctx *stream = H3_STREAM_CTX(data);
-  quiche_stream_iter *qiter;
-  bool is_writable = FALSE;
 
-  if(!stream)
-    return FALSE;
-  /* surely, there must be a better way */
-  qiter = quiche_conn_writable(ctx->qconn);
-  if(qiter) {
-    uint64_t stream_id;
-    while(quiche_stream_iter_next(qiter, &stream_id)) {
-      if(stream_id == (uint64_t)stream->id) {
-        is_writable = TRUE;
-        break;
-      }
-    }
-    quiche_stream_iter_free(qiter);
-  }
-  return is_writable;
+  return stream &&
+         quiche_conn_stream_writable(ctx->qconn, (uint64_t)stream->id, 1);
 }
 
 static int cf_quiche_get_select_socks(struct Curl_cfilter *cf,
@@ -1152,7 +1147,8 @@ static CURLcode cf_quiche_data_event(struct Curl_cfilter *cf,
   }
   case CF_CTRL_DATA_IDLE:
     result = cf_flush_egress(cf, data);
-    DEBUGF(LOG_CF(data, cf, "data idle, flush egress -> %d", result));
+    if(result)
+      DEBUGF(LOG_CF(data, cf, "data idle, flush egress -> %d", result));
     break;
   default:
     break;
index 87dd1a75de6b80ef6a89bc1bc3abb4f9535e94b7..a51ebfa7d972b99ba5c2c8daaccd61ea91aefd22 100644 (file)
@@ -398,7 +398,6 @@ static CURLcode recvmsg_packets(struct Curl_cfilter *cf,
       ;
     if(nread == -1) {
       if(SOCKERRNO == EAGAIN || SOCKERRNO == EWOULDBLOCK) {
-        DEBUGF(LOG_CF(data, cf, "ingress, recvmsg -> EAGAIN"));
         goto out;
       }
       if(!cf->connected && SOCKERRNO == ECONNREFUSED) {
index 587ba33c49114d6e0d287a6838f8dcc0664875ce..219faf3ccf24eb9052e5d9ecf61bba2f86fa79a2 100644 (file)
@@ -89,6 +89,6 @@ class TestErrors:
         assert len(r.stats) == count, f'did not get all stats: {r}'
         invalid_stats = []
         for idx, s in enumerate(r.stats):
-            if 'exitcode' not in s or s['exitcode'] not in [18, 56, 92, 95]:
+            if 'exitcode' not in s or s['exitcode'] not in [18, 55, 56, 92, 95]:
                 invalid_stats.append(f'request {idx} exit with {s["exitcode"]}\n{s}')
         assert len(invalid_stats) == 0, f'failed: {invalid_stats}'