]> git.ipfire.org Git - thirdparty/curl.git/commitdiff
http3/ngtcp2: upload EAGAIN handling
authorStefan Eissing <stefan@eissing.org>
Tue, 27 Jun 2023 10:06:21 +0000 (12:06 +0200)
committerDaniel Stenberg <daniel@haxx.se>
Sun, 9 Jul 2023 16:53:25 +0000 (18:53 +0200)
- refs #11389 where IDLE timeouts on upload are reported
- reword ngtcp2 expiry handling to apply to both send+recv
  calls into the filter
- EAGAIN uploads similar to the recent changes in HTTP/2, e.g.
  report success only when send data was ACKed.
- HOLD sending of EAGAINed uploads to avoid cpu busy loops
- rename internal function for consistency with HTTP/2
  implementation

Fixes #11389
Closes #11390

lib/vquic/curl_ngtcp2.c

index 34d16013d67ce984ccb881416316723895f8c3e6..0fb0daae54ffd12e3ea071d302b1784a08222323 100644 (file)
@@ -177,6 +177,7 @@ struct h3_stream_ctx {
   struct bufq sendbuf;   /* h3 request body */
   struct bufq recvbuf;   /* h3 response body */
   size_t sendbuf_len_in_flight; /* sendbuf amount "in flight" */
+  size_t upload_blocked_len; /* the amount written last and EGAINed */
   size_t recv_buf_nonflow; /* buffered bytes, not counting for flow control */
   uint64_t error3; /* HTTP/3 stream error code */
   curl_off_t upload_left; /* number of request bytes left to upload */
@@ -272,12 +273,12 @@ static void pktx_init(struct pkt_io_ctx *pktx,
   ngtcp2_path_storage_zero(&pktx->ps);
 }
 
-static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
+static CURLcode cf_progress_ingress(struct Curl_cfilter *cf,
+                                    struct Curl_easy *data,
+                                    struct pkt_io_ctx *pktx);
+static CURLcode cf_progress_egress(struct Curl_cfilter *cf,
                                    struct Curl_easy *data,
                                    struct pkt_io_ctx *pktx);
-static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
-                                struct Curl_easy *data,
-                                struct pkt_io_ctx *pktx);
 static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id,
                                    uint64_t datalen, void *user_data,
                                    void *stream_user_data);
@@ -985,6 +986,63 @@ static ngtcp2_callbacks ng_callbacks = {
   NULL, /* early_data_rejected */
 };
 
+/**
+ * Connection maintenance like timeouts on packet ACKs etc. are done by us, not
+ * the OS like for TCP. POLL events on the socket therefore are not
+ * sufficient.
+ * ngtcp2 tells us when it wants to be invoked again. We handle that via
+ * the `Curl_expire()` mechanisms.
+ */
+static CURLcode check_and_set_expiry(struct Curl_cfilter *cf,
+                                     struct Curl_easy *data,
+                                     struct pkt_io_ctx *pktx)
+{
+  struct cf_ngtcp2_ctx *ctx = cf->ctx;
+  struct pkt_io_ctx local_pktx;
+  ngtcp2_tstamp expiry;
+  ngtcp2_duration timeout;
+
+  if(!pktx) {
+    pktx_init(&local_pktx, cf, data);
+    pktx = &local_pktx;
+  }
+  else {
+    pktx->ts = timestamp();
+  }
+
+  expiry = ngtcp2_conn_get_expiry(ctx->qconn);
+  if(expiry != UINT64_MAX) {
+    if(expiry <= pktx->ts) {
+      CURLcode result;
+      int rv = ngtcp2_conn_handle_expiry(ctx->qconn, pktx->ts);
+      if(rv) {
+        failf(data, "ngtcp2_conn_handle_expiry returned error: %s",
+              ngtcp2_strerror(rv));
+        ngtcp2_ccerr_set_liberr(&ctx->last_error, rv, NULL, 0);
+        return CURLE_SEND_ERROR;
+      }
+      timeout = 0;
+      result = cf_progress_ingress(cf, data, pktx);
+      if(result)
+        return result;
+      result = cf_progress_egress(cf, data, pktx);
+      if(result)
+        return result;
+      /* ask again, things might have changed */
+      expiry = ngtcp2_conn_get_expiry(ctx->qconn);
+    }
+
+    if(expiry > pktx->ts) {
+      timeout = expiry - pktx->ts;
+      if(timeout % NGTCP2_MILLISECONDS) {
+        timeout += NGTCP2_MILLISECONDS;
+      }
+      Curl_expire(data, timeout / NGTCP2_MILLISECONDS, EXPIRE_QUIC);
+    }
+  }
+  return CURLE_OK;
+}
+
 static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf,
                                       struct Curl_easy *data,
                                       curl_socket_t *socks)
@@ -1022,7 +1080,7 @@ static void h3_drain_stream(struct Curl_cfilter *cf,
 
   (void)cf;
   bits = CURL_CSELECT_IN;
-  if(stream && !stream->send_closed && stream->upload_left)
+  if(stream && stream->upload_left && !stream->send_closed)
     bits |= CURL_CSELECT_OUT;
   if(data->state.dselect_bits != bits) {
     data->state.dselect_bits = bits;
@@ -1420,7 +1478,7 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
     report_consumed_data(cf, data, nread);
   }
 
-  if(cf_process_ingress(cf, data, &pktx)) {
+  if(cf_progress_ingress(cf, data, &pktx)) {
     *err = CURLE_RECV_ERROR;
     nread = -1;
     goto out;
@@ -1450,10 +1508,17 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
   }
 
 out:
-  if(cf_flush_egress(cf, data, &pktx)) {
+  if(cf_progress_egress(cf, data, &pktx)) {
     *err = CURLE_SEND_ERROR;
     nread = -1;
   }
+  else {
+    CURLcode result2 = check_and_set_expiry(cf, data, &pktx);
+    if(result2) {
+      *err = result2;
+      nread = -1;
+    }
+  }
   DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_recv(len=%zu) -> %zd, %d",
                 stream? stream->id : -1, len, nread, *err));
   CF_DATA_RESTORE(cf, save);
@@ -1482,10 +1547,8 @@ static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id,
   Curl_bufq_skip(&stream->sendbuf, skiplen);
   stream->sendbuf_len_in_flight -= skiplen;
 
-  /* `sendbuf` *might* now have more room. If so, resume this
-   * possibly paused stream. And also tell our transfer engine that
-   * it may continue KEEP_SEND if told to PAUSE. */
-  if(!Curl_bufq_is_full(&stream->sendbuf)) {
+  /* Everything ACKed, we resume upload processing */
+  if(!stream->sendbuf_len_in_flight) {
     int rv = nghttp3_conn_resume_stream(conn, stream_id);
     if(rv) {
       return NGTCP2_ERR_CALLBACK_FAILURE;
@@ -1644,16 +1707,19 @@ static ssize_t h3_stream_open(struct Curl_cfilter *cf,
     else
       /* data sending without specifying the data amount up front */
       stream->upload_left = -1; /* unknown */
-    reader.read_data = cb_h3_read_req_body;
-    preader = &reader;
     break;
   default:
     /* there is not request body */
     stream->upload_left = 0; /* no request body */
-    preader = NULL;
     break;
   }
 
+  stream->send_closed = (stream->upload_left == 0);
+  if(!stream->send_closed) {
+    reader.read_data = cb_h3_read_req_body;
+    preader = &reader;
+  }
+
   rc = nghttp3_conn_submit_request(ctx->h3conn, stream->id,
                                    nva, nheader, preader, data);
   if(rc) {
@@ -1691,6 +1757,7 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
   ssize_t sent = 0;
   struct cf_call_data save;
   struct pkt_io_ctx pktx;
+  CURLcode result;
 
   CF_DATA_SAVE(save, cf, data);
   DEBUGASSERT(cf->connected);
@@ -1699,10 +1766,10 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
   pktx_init(&pktx, cf, data);
   *err = CURLE_OK;
 
-  if(stream && stream->closed) {
-    *err = CURLE_HTTP3;
+  result = cf_progress_ingress(cf, data, &pktx);
+  if(result) {
+    *err = result;
     sent = -1;
-    goto out;
   }
 
   if(!stream || stream->id < 0) {
@@ -1712,32 +1779,64 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
       goto out;
     }
   }
+  else if(stream->upload_blocked_len) {
+    /* the data in `buf` has alread been submitted or added to the
+     * buffers, but have been EAGAINed on the last invocation. */
+    DEBUGASSERT(len >= stream->upload_blocked_len);
+    if(len < stream->upload_blocked_len) {
+      /* Did we get called again with a smaller `len`? This should not
+       * happen. We are not prepared to handle that. */
+      failf(data, "HTTP/3 send again with decreased length");
+      *err = CURLE_HTTP3;
+      sent = -1;
+      goto out;
+    }
+    sent = (ssize_t)stream->upload_blocked_len;
+    stream->upload_blocked_len = 0;
+  }
+  else if(stream->closed) {
+    *err = CURLE_HTTP3;
+    sent = -1;
+    goto out;
+  }
   else {
     sent = Curl_bufq_write(&stream->sendbuf, buf, len, err);
     DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_send, add to "
                   "sendbuf(len=%zu) -> %zd, %d",
                   stream->id, len, sent, *err));
     if(sent < 0) {
-      if(*err == CURLE_AGAIN) {
-        /* Can't add more to the send buf, needs to drain first.
-         * Pause the sending to avoid a busy loop. */
-        data->req.keepon |= KEEP_SEND_HOLD;
-        DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] pause send",
-                      stream->id));
-      }
       goto out;
     }
 
     (void)nghttp3_conn_resume_stream(ctx->h3conn, stream->id);
   }
 
-  if(cf_flush_egress(cf, data, &pktx)) {
-    *err = CURLE_SEND_ERROR;
+  result = cf_progress_egress(cf, data, &pktx);
+  if(result) {
+    *err = result;
     sent = -1;
-    goto out;
+  }
+
+  if(stream && sent > 0 && stream->sendbuf_len_in_flight) {
+    /* We have unacknowledged DATA and cannot report success to our
+     * caller. Instead we EAGAIN and remember how much we have already
+     * "written" into our various internal connection buffers.
+     * We put the stream upload on HOLD, until this gets ACKed. */
+    stream->upload_blocked_len = sent;
+    DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] cf_send(len=%zu), "
+                  "%zu bytes in flight -> EGAIN", stream->id, len,
+                  stream->sendbuf_len_in_flight));
+    *err = CURLE_AGAIN;
+    sent = -1;
+    data->req.keepon |= KEEP_SEND_HOLD;
   }
 
 out:
+  result = check_and_set_expiry(cf, data, &pktx);
+  if(result) {
+    *err = result;
+    sent = -1;
+  }
   CF_DATA_RESTORE(cf, save);
   return sent;
 }
@@ -1837,15 +1936,15 @@ static CURLcode recv_pkt(const unsigned char *pkt, size_t pktlen,
   return CURLE_OK;
 }
 
-static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
-                                   struct Curl_easy *data,
-                                   struct pkt_io_ctx *pktx)
+static CURLcode cf_progress_ingress(struct Curl_cfilter *cf,
+                                    struct Curl_easy *data,
+                                    struct pkt_io_ctx *pktx)
 {
   struct cf_ngtcp2_ctx *ctx = cf->ctx;
   struct pkt_io_ctx local_pktx;
   size_t pkts_chunk = 128, i;
   size_t pkts_max = 10 * pkts_chunk;
-  CURLcode result;
+  CURLcode result = CURLE_OK;
 
   if(!pktx) {
     pktx_init(&local_pktx, cf, data);
@@ -1864,7 +1963,9 @@ static CURLcode cf_process_ingress(struct Curl_cfilter *cf,
     if(pktx->pkt_count < pkts_chunk) /* got less than we could */
       break;
     /* give egress a chance before we receive more */
-    result = cf_flush_egress(cf, data, pktx);
+    result = cf_progress_egress(cf, data, pktx);
+    if(result) /* error */
+      break;
   }
   return result;
 }
@@ -1976,18 +2077,15 @@ out:
   return nwritten;
 }
 
-static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
-                                struct Curl_easy *data,
-                                struct pkt_io_ctx *pktx)
+static CURLcode cf_progress_egress(struct Curl_cfilter *cf,
+                                   struct Curl_easy *data,
+                                   struct pkt_io_ctx *pktx)
 {
   struct cf_ngtcp2_ctx *ctx = cf->ctx;
-  int rv;
   ssize_t nread;
   size_t max_payload_size, path_max_payload_size, max_pktcnt;
   size_t pktcnt = 0;
   size_t gsolen = 0;  /* this disables gso until we have a clue */
-  ngtcp2_tstamp expiry;
-  ngtcp2_duration timeout;
   CURLcode curlcode;
   struct pkt_io_ctx local_pktx;
 
@@ -2000,14 +2098,6 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
     ngtcp2_path_storage_zero(&pktx->ps);
   }
 
-  rv = ngtcp2_conn_handle_expiry(ctx->qconn, pktx->ts);
-  if(rv) {
-    failf(data, "ngtcp2_conn_handle_expiry returned error: %s",
-          ngtcp2_strerror(rv));
-    ngtcp2_ccerr_set_liberr(&ctx->last_error, rv, NULL, 0);
-    return CURLE_SEND_ERROR;
-  }
-
   curlcode = vquic_flush(cf, data, &ctx->q);
   if(curlcode) {
     if(curlcode == CURLE_AGAIN) {
@@ -2098,21 +2188,6 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
   }
 
 out:
-  /* non-errored exit. check when we should run again. */
-  expiry = ngtcp2_conn_get_expiry(ctx->qconn);
-  if(expiry != UINT64_MAX) {
-    if(expiry <= pktx->ts) {
-      timeout = 0;
-    }
-    else {
-      timeout = expiry - pktx->ts;
-      if(timeout % NGTCP2_MILLISECONDS) {
-        timeout += NGTCP2_MILLISECONDS;
-      }
-    }
-    Curl_expire(data, timeout / NGTCP2_MILLISECONDS, EXPIRE_QUIC);
-  }
-
   return CURLE_OK;
 }
 
@@ -2172,11 +2247,7 @@ static CURLcode cf_ngtcp2_data_event(struct Curl_cfilter *cf,
     break;
   }
   case CF_CTRL_DATA_IDLE:
-    if(timestamp() >= ngtcp2_conn_get_expiry(ctx->qconn)) {
-      if(cf_flush_egress(cf, data, NULL)) {
-        result = CURLE_SEND_ERROR;
-      }
-    }
+    result = check_and_set_expiry(cf, data, NULL);
     break;
   default:
     break;
@@ -2398,16 +2469,16 @@ static CURLcode cf_ngtcp2_connect(struct Curl_cfilter *cf,
     result = cf_connect_start(cf, data, &pktx);
     if(result)
       goto out;
-    result = cf_flush_egress(cf, data, &pktx);
+    result = cf_progress_egress(cf, data, &pktx);
     /* we do not expect to be able to recv anything yet */
     goto out;
   }
 
-  result = cf_process_ingress(cf, data, &pktx);
+  result = cf_progress_ingress(cf, data, &pktx);
   if(result)
     goto out;
 
-  result = cf_flush_egress(cf, data, &pktx);
+  result = cf_progress_egress(cf, data, &pktx);
   if(result)
     goto out;
 
@@ -2464,6 +2535,9 @@ out:
           r_ip, r_port, curl_easy_strerror(result));
   }
 #endif
+  if(!result && ctx->qconn) {
+    result = check_and_set_expiry(cf, data, &pktx);
+  }
   DEBUGF(LOG_CF(data, cf, "connect -> %d, done=%d", result, *done));
   CF_DATA_RESTORE(cf, save);
   return result;
@@ -2535,7 +2609,7 @@ static bool cf_ngtcp2_conn_is_alive(struct Curl_cfilter *cf,
        not in use by any other transfer, there shouldn't be any data here,
        only "protocol frames" */
     *input_pending = FALSE;
-    if(cf_process_ingress(cf, data, NULL))
+    if(cf_progress_ingress(cf, data, NULL))
       alive = FALSE;
     else {
       alive = TRUE;