]> git.ipfire.org Git - thirdparty/curl.git/commitdiff
curl: fix --test-event --parallel
authorStefan Eissing <stefan@eissing.org>
Tue, 6 Aug 2024 09:23:04 +0000 (11:23 +0200)
committerDaniel Stenberg <daniel@haxx.se>
Wed, 7 Aug 2024 06:57:05 +0000 (08:57 +0200)
(in debug-builds)

Fix implementation in curl using libuv to process parallel transfers.
Add pytest capabilities to run test cases with --test-event.

- fix uv_timer handling to carry correct 'data' pointing to uv context.
- fix uv_loop handling to reap and add transfers when possible
- fix return code when a transfer errored

Closes #14413

src/tool_operate.c
tests/http/test_02_download.py
tests/http/testenv/client.py
tests/http/testenv/curl.py

index 864ec959b761fee24ca95fd0efb51f4c0925163b..90380063b1f2be807aeb22db45cada93c083c23e 100644 (file)
@@ -2534,6 +2534,9 @@ struct parastate {
 };
 
 #if defined(DEBUGBUILD) && defined(USE_LIBUV)
+
+#define DEBUG_UV    0
+
 /* object to pass to the callbacks */
 struct datauv {
   uv_timer_t timeout;
@@ -2549,9 +2552,24 @@ struct contextuv {
 
 static CURLcode check_finished(struct parastate *s);
 
-static void check_multi_info(struct contextuv *context)
+static void check_multi_info(struct datauv *uv)
 {
-  (void)check_finished(context->uv->s);
+  CURLcode result;
+
+  result = check_finished(uv->s);
+  if(result && !uv->s->result)
+    uv->s->result = result;
+
+  if(uv->s->more_transfers) {
+    result = add_parallel_transfers(uv->s->global, uv->s->multi,
+                                    uv->s->share,
+                                    &uv->s->more_transfers,
+                                    &uv->s->added_transfers);
+    if(result && !uv->s->result)
+      uv->s->result = result;
+    if(result)
+      uv_stop(uv->loop);
+  }
 }
 
 /* callback from libuv on socket activity */
@@ -2567,17 +2585,19 @@ static void on_uv_socket(uv_poll_t *req, int status, int events)
 
   curl_multi_socket_action(c->uv->s->multi, c->sockfd, flags,
                            &c->uv->s->still_running);
-  check_multi_info(c);
 }
 
 /* callback from libuv when timeout expires */
 static void on_uv_timeout(uv_timer_t *req)
 {
-  struct contextuv *c = (struct contextuv *) req->data;
-  if(c) {
-    curl_multi_socket_action(c->uv->s->multi, CURL_SOCKET_TIMEOUT, 0,
-                             &c->uv->s->still_running);
-    check_multi_info(c);
+  struct datauv *uv = (struct datauv *) req->data;
+#if DEBUG_UV
+  fprintf(tool_stderr, "parallel_event: on_uv_timeout\n");
+#endif
+  if(uv && uv->s) {
+    curl_multi_socket_action(uv->s->multi, CURL_SOCKET_TIMEOUT, 0,
+                             &uv->s->still_running);
+    check_multi_info(uv);
   }
 }
 
@@ -2586,6 +2606,9 @@ static int cb_timeout(CURLM *multi, long timeout_ms,
                       struct datauv *uv)
 {
   (void)multi;
+#if DEBUG_UV
+  fprintf(tool_stderr, "parallel_event: cb_timeout=%ld\n", timeout_ms);
+#endif
   if(timeout_ms < 0)
     uv_timer_stop(&uv->timeout);
   else {
@@ -2656,6 +2679,8 @@ static int cb_socket(CURL *easy, curl_socket_t s, int action,
       uv_poll_stop(&c->poll_handle);
       destroy_context(c);
       curl_multi_assign(uv->s->multi, s, NULL);
+      /* check if we can do more now */
+      check_multi_info(uv);
     }
     break;
   default:
@@ -2670,9 +2695,11 @@ static CURLcode parallel_event(struct parastate *s)
   CURLcode result = CURLE_OK;
   struct datauv uv = { 0 };
 
+  s->result = CURLE_OK;
+  uv.s = s;
   uv.loop = uv_default_loop();
   uv_timer_init(uv.loop, &uv.timeout);
-  uv.s = s;
+  uv.timeout.data = &uv;
 
   /* setup event callbacks */
   curl_multi_setopt(s->multi, CURLMOPT_SOCKETFUNCTION, cb_socket);
@@ -2682,10 +2709,49 @@ static CURLcode parallel_event(struct parastate *s)
 
   /* kickstart the thing */
   curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0,
-                           &uv.s->still_running);
-  uv_run(uv.loop, UV_RUN_DEFAULT);
+                           &s->still_running);
 
-  return result;
+  while(!s->mcode && (s->still_running || s->more_transfers)) {
+#if DEBUG_UV
+    fprintf(tool_stderr, "parallel_event: uv_run(), mcode=%d, %d running, "
+            "%d more\n", s->mcode, uv.s->still_running, s->more_transfers);
+#endif
+    uv_run(uv.loop, UV_RUN_DEFAULT);
+#if DEBUG_UV
+    fprintf(tool_stderr, "parallel_event: uv_run() returned\n");
+#endif
+
+    result = check_finished(s);
+    if(result && !s->result)
+      s->result = result;
+
+    /* early exit called */
+    if(s->wrapitup) {
+      if(s->still_running && !s->wrapitup_processed) {
+        struct per_transfer *per;
+        for(per = transfers; per; per = per->next) {
+          if(per->added)
+            per->abort = TRUE;
+        }
+        s->wrapitup_processed = TRUE;
+      }
+      break;
+    }
+
+    if(s->more_transfers) {
+      result = add_parallel_transfers(s->global, s->multi, s->share,
+                                      &s->more_transfers, &s->added_transfers);
+      if(result && !s->result)
+        s->result = result;
+    }
+  }
+
+#if DEBUG_UV
+  fprintf(tool_stderr, "DONE parallel_event -> %d, mcode=%d, %d running, "
+          "%d more\n",
+          s->result, s->mcode, uv.s->still_running, s->more_transfers);
+#endif
+  return s->result;
 }
 
 #endif
index ff6a0bd1464d67ff83712ec0de63ad1ce4b3934d..55d0b0ce0a98bf979b0f9b15f6f5b95ee176e72e 100644 (file)
@@ -87,18 +87,19 @@ class TestDownload:
         r.check_response(http_status=200, count=100, connect_count=1)
 
     # download 100 files parallel
-    @pytest.mark.parametrize("proto", ['h2', 'h3'])
-    def test_02_04_download_100_parallel(self, env: Env,
+    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+    def test_02_04_download_20_parallel(self, env: Env,
                                          httpd, nghttpx, repeat, proto):
         if proto == 'h3' and not env.have_h3():
             pytest.skip("h3 not supported")
-        max_parallel = 50
+        count = 20
+        max_parallel = 10
         curl = CurlClient(env=env)
-        urln = f'https://{env.authority_for(env.domain1, proto)}/data.json?[0-99]'
+        urln = f'https://{env.authority_for(env.domain1, proto)}/data.json?[0-{count-1}]'
         r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[
             '--parallel', '--parallel-max', f'{max_parallel}'
         ])
-        r.check_response(http_status=200, count=100)
+        r.check_response(http_status=200, count=count)
         if proto == 'http/1.1':
             # http/1.1 parallel transfers will open multiple connections
             assert r.total_connects > 1, r.dump_logs()
index e8ffb040aaf74fc0ce0f42a51dd0d9dd544b35d1..0a0030c75e3fdf010215decace35a4388b1b4c6b 100644 (file)
@@ -50,7 +50,7 @@ class LocalClient:
         self.name = name
         self.path = os.path.join(env.project_dir, f'tests/http/clients/{name}')
         self.env = env
-        self._run_env= run_env
+        self._run_env = run_env
         self._timeout = timeout if timeout else env.test_timeout
         self._curl = os.environ['CURL'] if 'CURL' in os.environ else env.curl
         self._run_dir = run_dir if run_dir else os.path.join(env.gen_dir, name)
@@ -92,12 +92,18 @@ class LocalClient:
         exception = None
         myargs = [self.path]
         myargs.extend(args)
+        run_env = None
+        if self._run_env:
+            run_env = self._run_env.copy()
+            for key in ['CURL_DEBUG']:
+                if key in os.environ and key not in run_env:
+                    run_env[key] = os.environ[key]
         try:
             with open(self._stdoutfile, 'w') as cout:
                 with open(self._stderrfile, 'w') as cerr:
                     p = subprocess.run(myargs, stderr=cerr, stdout=cout,
                                        cwd=self._run_dir, shell=False,
-                                       input=None, env=self._run_env,
+                                       input=None, env=run_env,
                                        timeout=self._timeout)
                     exitcode = p.returncode
         except subprocess.TimeoutExpired:
index f89b2c9a8ee17d21886a83c13ef249bb0bc84a2d..320185038733f43cd9b1644f6bb341b61c9902d4 100644 (file)
@@ -824,6 +824,9 @@ class CurlClient:
             urls = [urls]
 
         args = [self._curl, "-s", "--path-as-is"]
+        if 'CURL_TEST_EVENT' in os.environ:
+            args.append('--test-event')
+
         if with_headers:
             args.extend(["-D", self._headerfile])
         if def_tracing is not False and not self._silent: