]> git.ipfire.org Git - thirdparty/curl.git/commitdiff
cw-out: improved error handling
authorStefan Eissing <stefan@eissing.org>
Wed, 10 Apr 2024 12:52:34 +0000 (14:52 +0200)
committerDaniel Stenberg <daniel@haxx.se>
Tue, 16 Apr 2024 13:52:10 +0000 (15:52 +0200)
- remember error encountered in invoking write callback and always fail
  afterwards without further invokes

- check behaviour in test_02_17 with h2-pausing client

Reported-by: Pavel Kropachev
Fixes #13337
Closes #13340

lib/cw-out.c
lib/cw-out.h
lib/easy.c
lib/multi.c
lib/sendf.c
lib/sendf.h
lib/transfer.c
tests/http/clients/h2-pausing.c
tests/http/test_02_download.py

index 07172b6151fd7acb2af49fe221930079ce650199..4e56c6a1bb911847e361c9c5baec049a4049acbe 100644 (file)
@@ -102,6 +102,8 @@ static void cw_out_buf_free(struct cw_out_buf *cwbuf)
 struct cw_out_ctx {
   struct Curl_cwriter super;
   struct cw_out_buf *buf;
+  BIT(paused);
+  BIT(errored);
 };
 
 static CURLcode cw_out_write(struct Curl_easy *data,
@@ -201,7 +203,10 @@ static CURLcode cw_out_ptr_flush(struct cw_out_ctx *ctx,
   size_t max_write, min_write;
   size_t wlen, nwritten;
 
-  (void)ctx;
+  /* If we errored once, we do not invoke the client callback  again */
+  if(ctx->errored)
+    return CURLE_WRITE_ERROR;
+
   /* write callbacks may get NULLed by the client between calls. */
   cw_get_writefunc(data, otype, &wcb, &wcb_data, &max_write, &min_write);
   if(!wcb) {
@@ -210,7 +215,7 @@ static CURLcode cw_out_ptr_flush(struct cw_out_ctx *ctx,
   }
 
   *pconsumed = 0;
-  while(blen && !(data->req.keepon & KEEP_RECV_PAUSE)) {
+  while(blen && !ctx->paused) {
     if(!flush_all && blen < min_write)
       break;
     wlen = max_write? CURLMIN(blen, max_write) : blen;
@@ -230,10 +235,15 @@ static CURLcode cw_out_ptr_flush(struct cw_out_ctx *ctx,
       }
       /* mark the connection as RECV paused */
       data->req.keepon |= KEEP_RECV_PAUSE;
+      ctx->paused = TRUE;
       CURL_TRC_WRITE(data, "cw_out, PAUSE requested by client");
       break;
     }
-    if(nwritten != wlen) {
+    else if(CURL_WRITEFUNC_ERROR == nwritten) {
+      failf(data, "client returned ERROR on write of %zu bytes", wlen);
+      return CURLE_WRITE_ERROR;
+    }
+    else if(nwritten != wlen) {
       failf(data, "Failure writing output to destination, "
             "passed %zu returned %zd", wlen, nwritten);
       return CURLE_WRITE_ERROR;
@@ -287,7 +297,7 @@ static CURLcode cw_out_flush_chain(struct cw_out_ctx *ctx,
 
   if(!cwbuf)
     return CURLE_OK;
-  if(data->req.keepon & KEEP_RECV_PAUSE)
+  if(ctx->paused)
     return CURLE_OK;
 
   /* write the end of the chain until it blocks or gets empty */
@@ -300,7 +310,7 @@ static CURLcode cw_out_flush_chain(struct cw_out_ctx *ctx,
       return result;
     if(*plast) {
       /* could not write last, paused again? */
-      DEBUGASSERT(data->req.keepon & KEEP_RECV_PAUSE);
+      DEBUGASSERT(ctx->paused);
       return CURLE_OK;
     }
   }
@@ -342,14 +352,14 @@ static CURLcode cw_out_do_write(struct cw_out_ctx *ctx,
                                 bool flush_all,
                                 const char *buf, size_t blen)
 {
-  CURLcode result;
+  CURLcode result = CURLE_OK;
 
   /* if we have buffered data and it is a different type than what
    * we are writing now, try to flush all */
   if(ctx->buf && ctx->buf->type != otype) {
     result = cw_out_flush_chain(ctx, data, &ctx->buf, TRUE);
     if(result)
-      return result;
+      goto out;
   }
 
   if(ctx->buf) {
@@ -359,7 +369,7 @@ static CURLcode cw_out_do_write(struct cw_out_ctx *ctx,
       return result;
     result = cw_out_flush_chain(ctx, data, &ctx->buf, flush_all);
     if(result)
-      return result;
+      goto out;
   }
   else {
     /* nothing buffered, try direct write */
@@ -372,10 +382,18 @@ static CURLcode cw_out_do_write(struct cw_out_ctx *ctx,
       /* did not write all, append the rest */
       result = cw_out_append(ctx, otype, buf + consumed, blen - consumed);
       if(result)
-        return result;
+        goto out;
     }
   }
-  return CURLE_OK;
+
+out:
+  if(result) {
+    /* We do not want to invoked client callbacks a second time after
+     * encountering an error. See issue #13337 */
+    ctx->errored = TRUE;
+    cw_out_bufs_free(ctx);
+  }
+  return result;
 }
 
 static CURLcode cw_out_write(struct Curl_easy *data,
@@ -413,10 +431,12 @@ bool Curl_cw_out_is_paused(struct Curl_easy *data)
     return FALSE;
 
   ctx = (struct cw_out_ctx *)cw_out;
-  return cw_out_bufs_len(ctx) > 0;
+  CURL_TRC_WRITE(data, "cw-out is%spaused", ctx->paused? "" : " not");
+  return ctx->paused;
 }
 
-static CURLcode cw_out_flush(struct Curl_easy *data, bool flush_all)
+static CURLcode cw_out_flush(struct Curl_easy *data,
+                             bool unpause, bool flush_all)
 {
   struct Curl_cwriter *cw_out;
   CURLcode result = CURLE_OK;
@@ -424,18 +444,31 @@ static CURLcode cw_out_flush(struct Curl_easy *data, bool flush_all)
   cw_out = Curl_cwriter_get_by_type(data, &Curl_cwt_out);
   if(cw_out) {
     struct cw_out_ctx *ctx = (struct cw_out_ctx *)cw_out;
+    if(ctx->errored)
+      return CURLE_WRITE_ERROR;
+    if(unpause && ctx->paused)
+      ctx->paused = FALSE;
+    if(ctx->paused)
+      return CURLE_OK;  /* not doing it */
 
     result = cw_out_flush_chain(ctx, data, &ctx->buf, flush_all);
+    if(result) {
+      ctx->errored = TRUE;
+      cw_out_bufs_free(ctx);
+      return result;
+    }
   }
   return result;
 }
 
-CURLcode Curl_cw_out_flush(struct Curl_easy *data)
+CURLcode Curl_cw_out_unpause(struct Curl_easy *data)
 {
-  return cw_out_flush(data, FALSE);
+  CURL_TRC_WRITE(data, "cw-out unpause");
+  return cw_out_flush(data, TRUE, FALSE);
 }
 
 CURLcode Curl_cw_out_done(struct Curl_easy *data)
 {
-  return cw_out_flush(data, TRUE);
+  CURL_TRC_WRITE(data, "cw-out done");
+  return cw_out_flush(data, FALSE, TRUE);
 }
index c13e85380b7e3ac635828eb78abb5283fe1064ae..ca4c2e435d2a780861ee529240bb2c015bcc2651 100644 (file)
@@ -43,7 +43,7 @@ bool Curl_cw_out_is_paused(struct Curl_easy *data);
 /**
  * Flush any buffered date to the client, chunk collation still applies.
  */
-CURLcode Curl_cw_out_flush(struct Curl_easy *data);
+CURLcode Curl_cw_out_unpause(struct Curl_easy *data);
 
 /**
  * Mark EndOfStream reached and flush ALL data to the client.
index 30fd6ca89a6765ead9cfd0e2eea29e967f4892ab..f4f4d2cc634e923a1a3bca9411180152b26b7a4d 100644 (file)
@@ -58,7 +58,6 @@
 #include "multiif.h"
 #include "select.h"
 #include "cfilters.h"
-#include "cw-out.h"
 #include "sendf.h" /* for failf function prototype */
 #include "connect.h" /* for Curl_getconnectinfo */
 #include "slist.h"
@@ -1086,6 +1085,7 @@ CURLcode curl_easy_pause(struct Curl_easy *data, int action)
   int oldstate;
   int newstate;
   bool recursive = FALSE;
+  bool keep_changed, unpause_read, not_all_paused;
 
   if(!GOOD_EASY_HANDLE(data) || !data->conn)
     /* crazy input, don't continue */
@@ -1101,51 +1101,47 @@ CURLcode curl_easy_pause(struct Curl_easy *data, int action)
     ((action & CURLPAUSE_RECV)?KEEP_RECV_PAUSE:0) |
     ((action & CURLPAUSE_SEND)?KEEP_SEND_PAUSE:0);
 
-  if((newstate & (KEEP_RECV_PAUSE| KEEP_SEND_PAUSE)) == oldstate) {
-    /* Not changing any pause state, return */
-    DEBUGF(infof(data, "pause: no change, early return"));
-    return CURLE_OK;
-  }
-
-  /* Unpause parts in active mime tree. */
-  if((k->keepon & ~newstate & KEEP_SEND_PAUSE) &&
-     (data->mstate == MSTATE_PERFORMING ||
-      data->mstate == MSTATE_RATELIMITING)) {
-    result = Curl_creader_unpause(data);
-    if(result)
-      return result;
-  }
-
-  /* put it back in the keepon */
+  keep_changed = ((newstate & (KEEP_RECV_PAUSE| KEEP_SEND_PAUSE)) != oldstate);
+  not_all_paused = (newstate & (KEEP_RECV_PAUSE|KEEP_SEND_PAUSE)) !=
+                   (KEEP_RECV_PAUSE|KEEP_SEND_PAUSE);
+  unpause_read = ((k->keepon & ~newstate & KEEP_SEND_PAUSE) &&
+                  (data->mstate == MSTATE_PERFORMING ||
+                   data->mstate == MSTATE_RATELIMITING));
+  /* Unpausing writes is detected on the next run in
+   * transfer.c:Curl_readwrite(). This is because this may result
+   * in a transfer error if the application's callbacks fail */
+
+  /* Set the new keepon state, so it takes effect no matter what error
+   * may happen afterwards. */
   k->keepon = newstate;
 
-  if(!(newstate & KEEP_RECV_PAUSE)) {
-    Curl_conn_ev_data_pause(data, FALSE);
-    result = Curl_cw_out_flush(data);
-    if(result)
-      return result;
-  }
-
-  /* if there's no error and we're not pausing both directions, we want
-     to have this handle checked soon */
-  if((newstate & (KEEP_RECV_PAUSE|KEEP_SEND_PAUSE)) !=
-     (KEEP_RECV_PAUSE|KEEP_SEND_PAUSE)) {
-    Curl_expire(data, 0, EXPIRE_RUN_NOW); /* get this handle going again */
-
+  /* If not completely pausing both directions now, run again in any case. */
+  if(not_all_paused) {
+    Curl_expire(data, 0, EXPIRE_RUN_NOW);
     /* reset the too-slow time keeper */
     data->state.keeps_speed.tv_sec = 0;
-
-    if(!Curl_cw_out_is_paused(data))
-      /* if not pausing again, force a recv/send check of this connection as
-         the data might've been read off the socket already */
-      data->state.select_bits = CURL_CSELECT_IN | CURL_CSELECT_OUT;
-    if(data->multi) {
-      if(Curl_update_timer(data->multi))
-        return CURLE_ABORTED_BY_CALLBACK;
+    /* Simulate socket events on next run for unpaused directions */
+    if(!(newstate & KEEP_SEND_PAUSE))
+      data->state.select_bits |= CURL_CSELECT_OUT;
+    if(!(newstate & KEEP_RECV_PAUSE))
+      data->state.select_bits |= CURL_CSELECT_IN;
+    /* On changes, tell application to update its timers. */
+    if(keep_changed && data->multi) {
+      if(Curl_update_timer(data->multi)) {
+        result = CURLE_ABORTED_BY_CALLBACK;
+        goto out;
+      }
     }
   }
 
-  if(!data->state.done)
+  if(unpause_read) {
+    result = Curl_creader_unpause(data);
+    if(result)
+      goto out;
+  }
+
+out:
+  if(!result && !data->state.done && keep_changed)
     /* This transfer may have been moved in or out of the bundle, update the
        corresponding socket callback, if used */
     result = Curl_updatesocket(data);
index 63cdd11af2a1b183223a8ba2bc4403997a75b3a1..d9094ae3f243de4f52e1b5d7901813241c2107f1 100644 (file)
@@ -2521,7 +2521,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
         Curl_posttransfer(data);
         multi_done(data, result, TRUE);
       }
-      else if(data->req.done) {
+      else if(data->req.done && !Curl_cwriter_is_paused(data)) {
 
         /* call this even if the readwrite function returned error */
         Curl_posttransfer(data);
index 7e099a2d158d7056ed24ae89ca30ed5973cca7d1..7b00c4ce48ec0074d61a5c3b20bf3aab1f11c0e4 100644 (file)
@@ -506,6 +506,16 @@ void Curl_cwriter_remove_by_name(struct Curl_easy *data,
   }
 }
 
+bool Curl_cwriter_is_paused(struct Curl_easy *data)
+{
+  return Curl_cw_out_is_paused(data);
+}
+
+CURLcode Curl_cwriter_unpause(struct Curl_easy *data)
+{
+  return Curl_cw_out_unpause(data);
+}
+
 CURLcode Curl_creader_read(struct Curl_easy *data,
                            struct Curl_creader *reader,
                            char *buf, size_t blen, size_t *nread, bool *eos)
index d736ce44add3a78165c9699214e245d100a17f5c..3838f876eb96402b323c5e912e0f1e1450739fe7 100644 (file)
@@ -180,6 +180,16 @@ CURLcode Curl_cwriter_write(struct Curl_easy *data,
                             struct Curl_cwriter *writer, int type,
                             const char *buf, size_t nbytes);
 
+/**
+ * Return TRUE iff client writer is paused.
+ */
+bool Curl_cwriter_is_paused(struct Curl_easy *data);
+
+/**
+ * Unpause client writer and flush any buffered date to the client.
+ */
+CURLcode Curl_cwriter_unpause(struct Curl_easy *data);
+
 /**
  * Default implementations for do_init, do_write, do_close that
  * do nothing and pass the data through.
index 4b32b53e99d711ff338a6337133be3356df91fd5..8c7e33dac556f28f170f433fbcd60998aefa118a 100644 (file)
@@ -272,7 +272,7 @@ static CURLcode readwrite_data(struct Curl_easy *data,
         DEBUGF(infof(data, "nread == 0, stream closed, bailing"));
       else
         DEBUGF(infof(data, "nread <= 0, server closed connection, bailing"));
-      k->keepon = 0; /* stop sending as well */
+      k->keepon &= ~(KEEP_RECV|KEEP_SEND); /* stop sending as well */
       if(k->eos_written) /* already did write this to client, leave */
         break;
     }
@@ -409,6 +409,14 @@ CURLcode Curl_readwrite(struct Curl_easy *data)
   int didwhat = 0;
   int select_bits;
 
+  /* Check if client writes had been paused and can resume now. */
+  if(!(k->keepon & KEEP_RECV_PAUSE) && Curl_cwriter_is_paused(data)) {
+    Curl_conn_ev_data_pause(data, FALSE);
+    result = Curl_cwriter_unpause(data);
+    if(result)
+      goto out;
+  }
+
   if(data->state.select_bits) {
     if(select_bits_paused(data, data->state.select_bits)) {
       /* leave the bits unchanged, so they'll tell us what to do when
index 40ae361f1bcedf79b6a627e74d9d72d6e0acd324..c12e8fab0046342e024976029b146d437c20a3b9 100644 (file)
@@ -27,6 +27,7 @@
  */
 /* This is based on the poc client of issue #11982
  */
+#include <assert.h>
 #include <stdio.h>
 #include <string.h>
 #include <sys/time.h>
@@ -141,11 +142,24 @@ static int err(void)
   exit(2);
 }
 
+static void usage(const char *msg)
+{
+  if(msg)
+    fprintf(stderr, "%s\n", msg);
+  fprintf(stderr,
+    "usage: [options] url\n"
+    "  pause downloads with following options:\n"
+    "  -V http_version (http/1.1, h2, h3) http version to use\n"
+  );
+}
+
 struct handle
 {
   int idx;
   int paused;
   int resumed;
+  int errored;
+  int fail_write;
   CURL *h;
 };
 
@@ -165,8 +179,15 @@ static size_t cb(void *data, size_t size, size_t nmemb, void *clientp)
     ++handle->paused;
     fprintf(stderr, "INFO: [%d] write, PAUSING %d time on %lu bytes\n",
             handle->idx, handle->paused, (long)realsize);
+    assert(handle->paused == 1);
     return CURL_WRITEFUNC_PAUSE;
   }
+  if(handle->fail_write) {
+    ++handle->errored;
+    fprintf(stderr, "INFO: [%d] FAIL write of %lu bytes, %d time\n",
+            handle->idx, (long)realsize, handle->errored);
+    return CURL_WRITEFUNC_ERROR;
+  }
   fprintf(stderr, "INFO: [%d] write, accepting %lu bytes\n",
           handle->idx, (long)realsize);
   return realsize;
@@ -186,15 +207,43 @@ int main(int argc, char *argv[])
   char *url, *host = NULL, *port = NULL;
   int all_paused = 0;
   int resume_round = -1;
+  int http_version = CURL_HTTP_VERSION_2_0;
+  int ch;
+
+  while((ch = getopt(argc, argv, "hV:")) != -1) {
+    switch(ch) {
+    case 'h':
+      usage(NULL);
+      return 2;
+    case 'V': {
+      if(!strcmp("http/1.1", optarg))
+        http_version = CURL_HTTP_VERSION_1_1;
+      else if(!strcmp("h2", optarg))
+        http_version = CURL_HTTP_VERSION_2_0;
+      else if(!strcmp("h3", optarg))
+        http_version = CURL_HTTP_VERSION_3ONLY;
+      else {
+        usage("invalid http version");
+        return 1;
+      }
+      break;
+    }
+    default:
+     usage("invalid option");
+     return 1;
+    }
+  }
+  argc -= optind;
+  argv += optind;
 
-  if(argc != 2) {
+  if(argc != 1) {
     fprintf(stderr, "ERROR: need URL as argument\n");
     return 2;
   }
-  url = argv[1];
+  url = argv[0];
 
   curl_global_init(CURL_GLOBAL_DEFAULT);
-  curl_global_trace("ids,time,http/2");
+  curl_global_trace("ids,time,http/2,http/3");
 
   cu = curl_url();
   if(!cu) {
@@ -222,6 +271,8 @@ int main(int argc, char *argv[])
     handles[i].idx = i;
     handles[i].paused = 0;
     handles[i].resumed = 0;
+    handles[i].errored = 0;
+    handles[i].fail_write = 1;
     handles[i].h = curl_easy_init();
     if(!handles[i].h ||
       curl_easy_setopt(handles[i].h, CURLOPT_WRITEFUNCTION, cb) != CURLE_OK ||
@@ -233,9 +284,11 @@ int main(int argc, char *argv[])
         != CURLE_OK ||
       curl_easy_setopt(handles[i].h, CURLOPT_SSL_VERIFYPEER, 0L) != CURLE_OK ||
       curl_easy_setopt(handles[i].h, CURLOPT_RESOLVE, resolve) != CURLE_OK ||
+      curl_easy_setopt(handles[i].h, CURLOPT_PIPEWAIT, 1L) ||
       curl_easy_setopt(handles[i].h, CURLOPT_URL, url) != CURLE_OK) {
       err();
     }
+    curl_easy_setopt(handles[i].h, CURLOPT_HTTP_VERSION, (long)http_version);
   }
 
   multi_handle = curl_multi_init();
@@ -269,6 +322,11 @@ int main(int argc, char *argv[])
           fprintf(stderr, "ERROR: [%d] NOT resumed!\n", i);
           as_expected = 0;
         }
+        else if(handles[i].errored != 1) {
+          fprintf(stderr, "ERROR: [%d] NOT errored once, %d instead!\n",
+                  i, handles[i].errored);
+          as_expected = 0;
+        }
       }
       if(!as_expected) {
         fprintf(stderr, "ERROR: handles not in expected state "
@@ -308,7 +366,7 @@ int main(int argc, char *argv[])
       if(all_paused) {
         fprintf(stderr, "INFO: all transfers paused\n");
         /* give transfer some rounds to mess things up */
-        resume_round = rounds + 3;
+        resume_round = rounds + 2;
       }
     }
     if(resume_round > 0 && rounds == resume_round) {
index 00b4a04a4a48d59941ef269c458ec66b1485c14c..e0010a96ffdfe6321b240f7738c5404d145d3e91 100644 (file)
@@ -445,12 +445,30 @@ class TestDownload:
         r.check_exit_code(0)
 
     # test on paused transfers, based on issue #11982
-    def test_02_27_paused_no_cl(self, env: Env, httpd, nghttpx, repeat):
-        proto = 'h2'
+    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+    def test_02_27a_paused_no_cl(self, env: Env, httpd, nghttpx, proto, repeat):
         url = f'https://{env.authority_for(env.domain1, proto)}' \
-            '/tweak?&chunks=2&chunk_size=16000'
+            '/curltest/tweak/?&chunks=6&chunk_size=8000'
         client = LocalClient(env=env, name='h2-pausing')
-        r = client.run(args=[url])
+        r = client.run(args=['-V', proto, url])
+        r.check_exit_code(0)
+
+    # test on paused transfers, based on issue #11982
+    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+    def test_02_27b_paused_no_cl(self, env: Env, httpd, nghttpx, proto, repeat):
+        url = f'https://{env.authority_for(env.domain1, proto)}' \
+            '/curltest/tweak/?error=502'
+        client = LocalClient(env=env, name='h2-pausing')
+        r = client.run(args=['-V', proto, url])
+        r.check_exit_code(0)
+
+    # test on paused transfers, based on issue #11982
+    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+    def test_02_27c_paused_no_cl(self, env: Env, httpd, nghttpx, proto, repeat):
+        url = f'https://{env.authority_for(env.domain1, proto)}' \
+            '/curltest/tweak/?status=200&chunks=1&chunk_size=100'
+        client = LocalClient(env=env, name='h2-pausing')
+        r = client.run(args=['-V', proto, url])
         r.check_exit_code(0)
 
     @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])