http2: do flow window accounting for cancelled streams
author     Stefan Eissing <stefan@eissing.org>
           Fri, 28 Apr 2023 09:27:25 +0000 (11:27 +0200)
committer  Daniel Stenberg <daniel@haxx.se>
           Fri, 28 Apr 2023 11:55:39 +0000 (13:55 +0200)
- nghttp2 does not free the connection-level flow control
  window for aborted streams
- when closing transfers, make sure that any buffered
  response data is "given back" to the flow control window
  (see the sketch below)
- add tests test_02_22 and test_02_23 to reproduce

Closes #11052
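
The rule being enforced: with automatic WINDOW_UPDATE disabled (curl sets
nghttp2_option_set_no_auto_window_update()), every received DATA byte counts
against the stream and connection flow-control windows until it is passed to
nghttp2_session_consume(). Bytes still sitting in a local buffer when a
stream is cancelled must be consumed as well, or that part of the connection
window is lost for the lifetime of the connection. Below is a minimal,
stand-alone sketch of that cleanup step, not the curl code itself;
drop_stream() and `buffered` are illustrative names:

#include <stddef.h>
#include <nghttp2/nghttp2.h>

/* Cancel a stream and hand any undelivered bytes back to the
 * flow-control windows. `buffered` is the number of bytes received
 * for this stream that were never delivered to the application. */
static void drop_stream(nghttp2_session *session, int32_t stream_id,
                        size_t buffered)
{
  /* tell the peer we are no longer interested in this stream */
  (void)nghttp2_submit_rst_stream(session, NGHTTP2_FLAG_NONE, stream_id,
                                  NGHTTP2_STREAM_CLOSED);
  if(buffered) {
    /* without this, the connection-level window space held by the
     * buffered bytes is never returned */
    (void)nghttp2_session_consume(session, stream_id, buffered);
  }
  /* let nghttp2 send the RST_STREAM and any pending WINDOW_UPDATE */
  (void)nghttp2_session_send(session);
}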

lib/http2.c
tests/http/clients/h2-download.c
tests/http/test_02_download.py

diff --git a/lib/http2.c b/lib/http2.c
index 0e361ab487e0ead29f9bf00ea6027a002cdcfb29..539b1fc815dc467616db1796ba06dc240e4daad4 100644
@@ -160,6 +160,9 @@ static void cf_h2_ctx_free(struct cf_h2_ctx *ctx)
   }
 }
 
+static CURLcode h2_progress_egress(struct Curl_cfilter *cf,
+                                  struct Curl_easy *data);
+
 /**
  * All about the H3 internals of a stream
  */
@@ -272,6 +275,16 @@ static void http2_data_done(struct Curl_cfilter *cf,
                                     stream->id, NGHTTP2_STREAM_CLOSED))
         (void)nghttp2_session_send(ctx->h2);
     }
+    if(!Curl_bufq_is_empty(&stream->recvbuf)) {
+      /* Anything in the recvbuf is still being counted
+       * in stream and connection window flow control. Need
+       * to free that space or the connection window might get
+       * exhausted eventually. */
+      nghttp2_session_consume(ctx->h2, stream->id,
+                              Curl_bufq_len(&stream->recvbuf));
+      /* give WINDOW_UPDATE a chance to be sent */
+      h2_progress_egress(cf, data);
+    }
 
     /* -1 means unassigned and 0 means cleared */
     if(nghttp2_session_get_stream_user_data(ctx->h2, stream->id)) {
@@ -1825,7 +1838,7 @@ out:
                   ctx->h2, stream->id),
                 nghttp2_session_get_stream_effective_local_window_size(
                   ctx->h2, stream->id),
-                nghttp2_session_get_effective_local_window_size(ctx->h2),
+                nghttp2_session_get_local_window_size(ctx->h2),
                 HTTP2_HUGE_WINDOW_SIZE));
 
   CF_DATA_RESTORE(cf, save);
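
The log change above swaps nghttp2_session_get_effective_local_window_size()
for nghttp2_session_get_local_window_size(). As I understand the nghttp2 API,
the former reports the connection window size this endpoint has advertised,
while the latter also accounts for data received but not yet consumed, i.e.
what actually remains of the window, which is what makes a leak visible in
the debug log. A small sketch of the two views side by side;
log_conn_window() is an illustrative helper, not part of curl:

#include <stdio.h>
#include <stdint.h>
#include <nghttp2/nghttp2.h>

/* Log both views of the connection-level receive window. With a leak
 * like the one fixed here, `remaining` keeps shrinking across
 * transfers while `advertised` stays constant. */
static void log_conn_window(nghttp2_session *session)
{
  int32_t advertised =
    nghttp2_session_get_effective_local_window_size(session);
  int32_t remaining = nghttp2_session_get_local_window_size(session);
  fprintf(stderr, "conn window: advertised=%d, remaining=%d, diff=%d\n",
          (int)advertised, (int)remaining, (int)(advertised - remaining));
}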
diff --git a/tests/http/clients/h2-download.c b/tests/http/clients/h2-download.c
index dd621d3f4f51e92555f59bcb38137b5e315c6988..24ccedbddd58c8200ad78bcebf644e9e2bc688cc 100644
@@ -90,12 +90,13 @@ struct transfer {
   FILE *out;
   curl_off_t recv_size;
   curl_off_t pause_at;
+  int started;
   int paused;
   int resumed;
   int done;
 };
 
-static size_t transfer_count;
+static size_t transfer_count = 1;
 static struct transfer *transfers;
 
 static struct transfer *get_transfer_for_easy(CURL *easy)
@@ -117,7 +118,7 @@ static size_t my_write_cb(char *buf, size_t nitems, size_t buflen,
   if(!t->resumed &&
      t->recv_size < t->pause_at &&
      ((curl_off_t)(t->recv_size + (nitems * buflen)) >= t->pause_at)) {
-    fprintf(stderr, "transfer %d: PAUSE\n", t->idx);
+    fprintf(stderr, "[t-%d] PAUSE\n", t->idx);
     t->paused = 1;
     return CURL_WRITEFUNC_PAUSE;
   }
@@ -132,7 +133,7 @@ static size_t my_write_cb(char *buf, size_t nitems, size_t buflen,
 
   nwritten = fwrite(buf, nitems, buflen, t->out);
   if(nwritten < 0) {
-    fprintf(stderr, "transfer %d: write failure\n", t->idx);
+    fprintf(stderr, "[t-%d] write failure\n", t->idx);
     return 0;
   }
   t->recv_size += nwritten;
@@ -162,27 +163,65 @@ static int setup(CURL *hnd, const char *url, struct transfer *t)
   return 0; /* all is good */
 }
 
+static void usage(const char *msg)
+{
+  if(msg)
+    fprintf(stderr, "%s\n", msg);
+  fprintf(stderr,
+    "usage: [options] url\n"
+    "  download a url with following options:\n"
+    "  -m number  max parallel downloads\n"
+    "  -n number  total downloads\n"
+    "  -p number  pause transfer after `number` response bytes\n"
+  );
+}
+
 /*
  * Download a file over HTTP/2, take care of server push.
  */
 int main(int argc, char *argv[])
 {
   CURLM *multi_handle;
-  int active_transfers;
   struct CURLMsg *m;
   const char *url;
-  size_t i;
-  long pause_offset;
+  size_t i, n, max_parallel = 1;
+  size_t active_transfers;
+  long pause_offset = 0;
+  int abort_paused = 0;
   struct transfer *t;
+  int ch;
 
-  if(argc != 4) {
-    fprintf(stderr, "usage: h2-download count pause-offset url\n");
-    return 2;
+  while((ch = getopt(argc, argv, "ahm:n:P:")) != -1) {
+    switch(ch) {
+    case 'h':
+      usage(NULL);
+      return 2;
+      break;
+    case 'a':
+      abort_paused = 1;
+      break;
+    case 'm':
+      max_parallel = (size_t)strtol(optarg, NULL, 10);
+      break;
+    case 'n':
+      transfer_count = (size_t)strtol(optarg, NULL, 10);
+      break;
+    case 'P':
+      pause_offset = strtol(optarg, NULL, 10);
+      break;
+    default:
+     usage("invalid option");
+     return 1;
+    }
   }
+  argc -= optind;
+  argv += optind;
 
-  transfer_count = (size_t)strtol(argv[1], NULL, 10);
-  pause_offset = strtol(argv[2], NULL, 10);
-  url = argv[3];
+  if(argc != 1) {
+    usage("not enough arguments");
+    return 2;
+  }
+  url = argv[0];
 
   transfers = calloc(transfer_count, sizeof(*transfers));
   if(!transfers) {
@@ -198,13 +237,20 @@ int main(int argc, char *argv[])
     t = &transfers[i];
     t->idx = (int)i;
     t->pause_at = (curl_off_t)pause_offset * i;
+  }
+
+  n = (max_parallel < transfer_count)? max_parallel : transfer_count;
+  for(i = 0; i < n; ++i) {
+    t = &transfers[i];
     t->easy = curl_easy_init();
     if(!t->easy || setup(t->easy, url, t)) {
-      fprintf(stderr, "setup of transfer #%d failed\n", (int)i);
+      fprintf(stderr, "[t-%d] FAILED setup\n", (int)i);
       return 1;
     }
     curl_multi_add_handle(multi_handle, t->easy);
+    t->started = 1;
     ++active_transfers;
+    fprintf(stderr, "[t-%d] STARTED\n", t->idx);
   }
 
   do {
@@ -220,11 +266,6 @@ int main(int argc, char *argv[])
     if(mc)
       break;
 
-    /*
-     * A little caution when doing server push is that libcurl itself has
-     * created and added one or more easy handles but we need to clean them up
-     * when we are done.
-     */
     do {
       int msgq = 0;
       m = curl_multi_info_read(multi_handle, &msgq);
@@ -240,18 +281,53 @@ int main(int argc, char *argv[])
           curl_easy_cleanup(e);
       }
       else {
-        /* nothing happending, resume one paused transfer if there is one */
-        for(i = 0; i < transfer_count; ++i) {
-          t = &transfers[i];
-          if(!t->done && t->paused) {
-            t->resumed = 1;
-            t->paused = 0;
-            curl_easy_pause(t->easy, CURLPAUSE_CONT);
-            fprintf(stderr, "transfer %d: RESUME\n", t->idx);
-            break;
+        /* nothing happening, maintenance */
+        if(abort_paused) {
+          /* abort paused transfers */
+          for(i = 0; i < transfer_count; ++i) {
+            t = &transfers[i];
+            if(!t->done && t->paused && t->easy) {
+              curl_multi_remove_handle(multi_handle, t->easy);
+              t->done = 1;
+              active_transfers--;
+              fprintf(stderr, "[t-%d] ABORTED\n", t->idx);
+            }
+          }
+        }
+        else {
+          /* resume one paused transfer */
+          for(i = 0; i < transfer_count; ++i) {
+            t = &transfers[i];
+            if(!t->done && t->paused) {
+              t->resumed = 1;
+              t->paused = 0;
+              curl_easy_pause(t->easy, CURLPAUSE_CONT);
+              fprintf(stderr, "[t-%d] RESUMED\n", t->idx);
+              break;
+            }
           }
         }
 
+        while(active_transfers < max_parallel) {
+          for(i = 0; i < transfer_count; ++i) {
+            t = &transfers[i];
+            if(!t->started) {
+              t->easy = curl_easy_init();
+              if(!t->easy || setup(t->easy, url, t)) {
+                fprintf(stderr, "[t-%d] FAILEED setup\n", (int)i);
+                return 1;
+              }
+              curl_multi_add_handle(multi_handle, t->easy);
+              t->started = 1;
+              ++active_transfers;
+              fprintf(stderr, "[t-%d] STARTED\n", t->idx);
+              break;
+            }
+          }
+          /* all started */
+          if(i == transfer_count)
+            break;
+        }
       }
     } while(m);
 
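
For context, the failure mode test_02_23 drives, reduced to its core: a
transfer is paused from its write callback, further response data keeps
arriving and is buffered inside libcurl, and the paused transfer is then
removed from the multi handle. Before this commit those buffered bytes were
never handed back to the connection window. A hedged sketch using only
public libcurl API; pause_cb() and abort_paused() are illustrative names:

#include <curl/curl.h>

/* Write callback that pauses the transfer on first delivery; any DATA
 * arriving afterwards stays buffered inside libcurl. */
static size_t pause_cb(char *buf, size_t size, size_t nmemb, void *userp)
{
  (void)buf; (void)size; (void)nmemb; (void)userp;
  return CURL_WRITEFUNC_PAUSE;
}

/* Abort the paused transfer; this cancels the HTTP/2 stream while
 * unconsumed response data is still buffered, which is exactly the case
 * the lib/http2.c change above handles. */
static void abort_paused(CURLM *multi, CURL *easy)
{
  curl_multi_remove_handle(multi, easy);
  curl_easy_cleanup(easy);
}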
diff --git a/tests/http/test_02_download.py b/tests/http/test_02_download.py
index bd99d2a209aaad6cdd559f5bd4740c1f41512078..8336f5ffc3374f5e9a89a863c4e7bad4ba0fff9c 100644
@@ -281,25 +281,65 @@ class TestDownload:
         assert httpd.stop()
         assert httpd.start()
 
-    # download via lib client, pause/resume at different offsets
+    # download via lib client, 1 at a time, pause/resume at different offsets
     @pytest.mark.parametrize("pause_offset", [0, 10*1024, 100*1023, 640000])
-    def test_02_21_h2_lib_download(self, env: Env, httpd, nghttpx, pause_offset, repeat):
+    def test_02_21_h2_lib_serial(self, env: Env, httpd, nghttpx, pause_offset, repeat):
         count = 10
         docname = 'data-10m'
         url = f'https://localhost:{env.https_port}/{docname}'
         client = LocalClient(name='h2-download', env=env)
         if not client.exists():
             pytest.skip(f'example client not built: {client.name}')
-        r = client.run(args=[str(count), str(pause_offset), url])
+        r = client.run(args=[
+             '-n', f'{count}', '-P', f'{pause_offset}', url
+        ])
+        r.check_exit_code(0)
+        srcfile = os.path.join(httpd.docs_dir, docname)
+        self.check_downloads(client, srcfile, count)
+
+    # download via lib client, several at a time, pause/resume
+    @pytest.mark.parametrize("pause_offset", [100*1023])
+    def test_02_22_h2_lib_parallel_resume(self, env: Env, httpd, nghttpx, pause_offset, repeat):
+        count = 10
+        max_parallel = 5
+        docname = 'data-10m'
+        url = f'https://localhost:{env.https_port}/{docname}'
+        client = LocalClient(name='h2-download', env=env)
+        if not client.exists():
+            pytest.skip(f'example client not built: {client.name}')
+        r = client.run(args=[
+            '-n', f'{count}', '-m', f'{max_parallel}',
+            '-P', f'{pause_offset}', url
+        ])
         r.check_exit_code(0)
         srcfile = os.path.join(httpd.docs_dir, docname)
         self.check_downloads(client, srcfile, count)
 
-    def check_downloads(self, client, srcfile: str, count: int):
+    # download, several at a time, pause and abort paused
+    @pytest.mark.parametrize("pause_offset", [100*1023])
+    def test_02_23_h2_lib_parallel_abort(self, env: Env, httpd, nghttpx, pause_offset, repeat):
+        count = 200
+        max_parallel = 100
+        docname = 'data-10m'
+        url = f'https://localhost:{env.https_port}/{docname}'
+        client = LocalClient(name='h2-download', env=env)
+        if not client.exists():
+            pytest.skip(f'example client not built: {client.name}')
+        r = client.run(args=[
+            '-n', f'{count}', '-m', f'{max_parallel}', '-a',
+            '-P', f'{pause_offset}', url
+        ])
+        r.check_exit_code(0)
+        srcfile = os.path.join(httpd.docs_dir, docname)
+        # downloads should be there, but not necessarily complete
+        self.check_downloads(client, srcfile, count, complete=False)
+
+    def check_downloads(self, client, srcfile: str, count: int,
+                        complete: bool = True):
         for i in range(count):
             dfile = client.download_file(i)
             assert os.path.exists(dfile)
-            if not filecmp.cmp(srcfile, dfile, shallow=False):
+            if complete and not filecmp.cmp(srcfile, dfile, shallow=False):
                 diff = "".join(difflib.unified_diff(a=open(srcfile).readlines(),
                                                     b=open(dfile).readlines(),
                                                     fromfile=srcfile,