]> git.ipfire.org Git - thirdparty/curl.git/commitdiff
curl: use libuv for parallel transfers with --test-event
authorDaniel Stenberg <daniel@haxx.se>
Mon, 5 Aug 2024 07:12:04 +0000 (09:12 +0200)
committerDaniel Stenberg <daniel@haxx.se>
Mon, 5 Aug 2024 07:12:09 +0000 (09:12 +0200)
add --with-libuv to configure to (optionally) use it in debug-builds to
drive the event-based API

Use curl_multi_socket_action() and friends to drive parallel transfers.

tests/README has brief documentation for this

Closes #14298

configure.ac
src/tool_operate.c
tests/README.md

index cee96e541f3c14fe80baae03509cbfab9eff93aa..a991a0d285f76585961527c011c82978ec8b4bd6 100644 (file)
@@ -3511,6 +3511,93 @@ if test X"$want_msh3" != Xno; then
   )
 fi
 
+dnl **********************************************************************
+dnl libuv is only ever used for debug purposes
+dnl **********************************************************************
+
+OPT_LIBUV=no
+AC_ARG_WITH(libuv,
+AS_HELP_STRING([--with-libuv=PATH],[Enable libuv])
+AS_HELP_STRING([--without-libuv],[Disable libuv]),
+  [OPT_LIBUV=$withval])
+case "$OPT_LIBUV" in
+  no)
+    dnl --without-libuv option used
+    want_libuv="no"
+    ;;
+  yes)
+    dnl --with-libuv option used without path
+    want_libuv="default"
+    want_libuv_path=""
+    ;;
+  *)
+    dnl --with-libuv option used with path
+    want_libuv="yes"
+    want_libuv_path="$withval"
+    ;;
+esac
+
+if test X"$want_libuv" != Xno; then
+  if test x$want_debug != xyes; then
+    AC_MSG_ERROR([Using libuv without debug support enabled is useless])
+  fi
+
+  dnl backup the pre-libuv variables
+  CLEANLDFLAGS="$LDFLAGS"
+  CLEANCPPFLAGS="$CPPFLAGS"
+  CLEANLIBS="$LIBS"
+
+  CURL_CHECK_PKGCONFIG(libuv, $want_libuv_path)
+
+  if test "$PKGCONFIG" != "no" ; then
+    LIB_LIBUV=`CURL_EXPORT_PCDIR([$want_libuv_path])
+      $PKGCONFIG --libs-only-l libuv`
+    AC_MSG_NOTICE([-l is $LIB_LIBUV])
+
+    CPP_LIBUV=`CURL_EXPORT_PCDIR([$want_libuv_path]) dnl
+      $PKGCONFIG --cflags-only-I libuv`
+    AC_MSG_NOTICE([-I is $CPP_LIBUV])
+
+    LD_LIBUV=`CURL_EXPORT_PCDIR([$want_libuv_path])
+      $PKGCONFIG --libs-only-L libuv`
+    AC_MSG_NOTICE([-L is $LD_LIBUV])
+
+    LDFLAGS="$LDFLAGS $LD_LIBUV"
+    CPPFLAGS="$CPPFLAGS $CPP_LIBUV"
+    LIBS="$LIB_LIBUV $LIBS"
+
+    if test "x$cross_compiling" != "xyes"; then
+      DIR_LIBUV=`echo $LD_LIBUV | $SED -e 's/^-L//'`
+    fi
+    AC_CHECK_LIB(uv, uv_default_loop,
+      [
+       AC_CHECK_HEADERS(uv.h,
+          LIBUV_ENABLED=1
+          AC_DEFINE(USE_LIBUV, 1, [if libuv is in use])
+          AC_SUBST(USE_LIBUV, [1])
+          CURL_LIBRARY_PATH="$CURL_LIBRARY_PATH:$DIR_LIBUV"
+          export CURL_LIBRARY_PATH
+          AC_MSG_NOTICE([Added $DIR_LIBUV to CURL_LIBRARY_PATH])
+          LIBCURL_PC_REQUIRES_PRIVATE="$LIBCURL_PC_REQUIRES_PRIVATE libuv"
+       )
+      ],
+        dnl not found, revert back to clean variables
+        LDFLAGS=$CLEANLDFLAGS
+        CPPFLAGS=$CLEANCPPFLAGS
+        LIBS=$CLEANLIBS
+    )
+
+  else
+    dnl no libuv pkg-config found, deal with it
+    if test X"$want_libuv" != Xdefault; then
+      dnl To avoid link errors, we do not allow --with-libuv without
+      dnl a pkgconfig file
+      AC_MSG_ERROR([--with-libuv was specified but could not find libuv pkg-config file.])
+    fi
+  fi
+
+fi
+
 dnl **********************************************************************
 dnl Check for zsh completion path
 dnl **********************************************************************
index a3fff2a51aba50b0a8f4dcd0cda744038b918a2e..48cc6db63361258b34b84ecd2ebe6ae72479d3a2 100644 (file)
 #  include <netinet/in.h>
 #endif
 
+#ifdef HAVE_UV_H
+/* this is for libuv-enabled debug builds only */
+#include <uv.h>
+#endif
+
 #define ENABLE_CURLX_PRINTF
 /* use our own printf() functions */
 #include "curlx.h"
@@ -2445,6 +2450,10 @@ static CURLcode add_parallel_transfers(struct GlobalConfig *global,
     (void)curl_easy_setopt(per->curl, CURLOPT_XFERINFOFUNCTION, xferinfo_cb);
     (void)curl_easy_setopt(per->curl, CURLOPT_XFERINFODATA, per);
     (void)curl_easy_setopt(per->curl, CURLOPT_NOPROGRESS, 0L);
+#ifdef DEBUGBUILD
+    if(getenv("CURL_FORBID_REUSE"))
+      (void)curl_easy_setopt(per->curl, CURLOPT_FORBID_REUSE, 1L);
+#endif
 
     mcode = curl_multi_add_handle(multi, per->curl);
     if(mcode) {
@@ -2469,136 +2478,320 @@ static CURLcode add_parallel_transfers(struct GlobalConfig *global,
   return CURLE_OK;
 }
 
-static CURLcode parallel_transfers(struct GlobalConfig *global,
-                                   CURLSH *share)
-{
+struct parastate {
+  struct GlobalConfig *global;
   CURLM *multi;
-  CURLMcode mcode = CURLM_OK;
-  CURLcode result = CURLE_OK;
-  int still_running = 1;
-  struct timeval start = tvnow();
+  CURLSH *share;
+  CURLMcode mcode;
+  CURLcode result;
+  int still_running;
+  struct timeval start;
   bool more_transfers;
   bool added_transfers;
   /* wrapitup is set TRUE after a critical error occurs to end all transfers */
-  bool wrapitup = FALSE;
+  bool wrapitup;
   /* wrapitup_processed is set TRUE after the per transfer abort flag is set */
-  bool wrapitup_processed = FALSE;
-  time_t tick = time(NULL);
+  bool wrapitup_processed;
+  time_t tick;
+};
+
+#if defined(DEBUGBUILD) && defined(USE_LIBUV)
+/* object to pass to the callbacks */
+struct datauv {
+  uv_timer_t timeout;
+  uv_loop_t *loop;
+  struct parastate *s;
+};
+
+struct contextuv {
+  uv_poll_t poll_handle;
+  curl_socket_t sockfd;
+  struct datauv *uv;
+};
+
+static CURLcode check_finished(struct parastate *s);
+
+static void check_multi_info(struct contextuv *context)
+{
+  (void)check_finished(context->uv->s);
+}
+
+/* callback from libuv on socket activity */
+static void on_uv_socket(uv_poll_t *req, int status, int events)
+{
+  int flags = 0;
+  struct contextuv *c = (struct contextuv *) req->data;
+  (void)status;
+  if(events & UV_READABLE)
+    flags |= CURL_CSELECT_IN;
+  if(events & UV_WRITABLE)
+    flags |= CURL_CSELECT_OUT;
+
+  curl_multi_socket_action(c->uv->s->multi, c->sockfd, flags,
+                           &c->uv->s->still_running);
+  check_multi_info(c);
+}
+
+/* callback from libuv when timeout expires */
+static void on_uv_timeout(uv_timer_t *req)
+{
+  struct contextuv *c = (struct contextuv *) req->data;
+  if(c) {
+    curl_multi_socket_action(c->uv->s->multi, CURL_SOCKET_TIMEOUT, 0,
+                             &c->uv->s->still_running);
+    check_multi_info(c);
+  }
+}
+
+/* callback from libcurl to update the timeout expiry */
+static int cb_timeout(CURLM *multi, long timeout_ms,
+                      struct datauv *uv)
+{
+  (void)multi;
+  if(timeout_ms < 0)
+    uv_timer_stop(&uv->timeout);
+  else {
+    if(timeout_ms == 0)
+      timeout_ms = 1; /* 0 means call curl_multi_socket_action asap but NOT
+                         within the callback itself */
+    uv_timer_start(&uv->timeout, on_uv_timeout, timeout_ms,
+                   0); /* do not repeat */
+  }
+  return 0;
+}
+
+static struct contextuv *create_context(curl_socket_t sockfd,
+                                        struct datauv *uv)
+{
+  struct contextuv *c;
+
+  c = (struct contextuv *) malloc(sizeof(*c));
+
+  c->sockfd = sockfd;
+  c->uv = uv;
+
+  uv_poll_init_socket(uv->loop, &c->poll_handle, sockfd);
+  c->poll_handle.data = c;
+
+  return c;
+}
+
+static void close_cb(uv_handle_t *handle)
+{
+  struct contextuv *c = (struct contextuv *) handle->data;
+  free(c);
+}
+
+static void destroy_context(struct contextuv *c)
+{
+  uv_close((uv_handle_t *) &c->poll_handle, close_cb);
+}
+
+/* callback from libcurl to update socket activity to wait for */
+static int cb_socket(CURL *easy, curl_socket_t s, int action,
+                     struct datauv *uv,
+                     void *socketp)
+{
+  struct contextuv *c;
+  int events = 0;
+  (void)easy;
+
+  switch(action) {
+  case CURL_POLL_IN:
+  case CURL_POLL_OUT:
+  case CURL_POLL_INOUT:
+    c = socketp ?
+      (struct contextuv *) socketp : create_context(s, uv);
+
+    curl_multi_assign(uv->s->multi, s, c);
+
+    if(action != CURL_POLL_IN)
+      events |= UV_WRITABLE;
+    if(action != CURL_POLL_OUT)
+      events |= UV_READABLE;
+
+    uv_poll_start(&c->poll_handle, events, on_uv_socket);
+    break;
+  case CURL_POLL_REMOVE:
+    if(socketp) {
+      c = (struct contextuv *)socketp;
+      uv_poll_stop(&c->poll_handle);
+      destroy_context(c);
+      curl_multi_assign(uv->s->multi, s, NULL);
+    }
+    break;
+  default:
+    abort();
+  }
+
+  return 0;
+}
 
-  multi = curl_multi_init();
-  if(!multi)
+static CURLcode parallel_event(struct parastate *s)
+{
+  CURLcode result = CURLE_OK;
+  struct datauv uv = { 0 };
+
+  uv.loop = uv_default_loop();
+  uv_timer_init(uv.loop, &uv.timeout);
+  uv.s = s;
+
+  /* setup event callbacks */
+  curl_multi_setopt(s->multi, CURLMOPT_SOCKETFUNCTION, cb_socket);
+  curl_multi_setopt(s->multi, CURLMOPT_SOCKETDATA, &uv);
+  curl_multi_setopt(s->multi, CURLMOPT_TIMERFUNCTION, cb_timeout);
+  curl_multi_setopt(s->multi, CURLMOPT_TIMERDATA, &uv);
+
+  /* kickstart the thing */
+  curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0,
+                           &uv.s->still_running);
+  uv_run(uv.loop, UV_RUN_DEFAULT);
+
+  return result;
+}
+
+#endif
+
+static CURLcode check_finished(struct parastate *s)
+{
+  CURLcode result = CURLE_OK;
+  int rc;
+  CURLMsg *msg;
+  bool checkmore = FALSE;
+  struct GlobalConfig *global = s->global;
+  progress_meter(global, &s->start, FALSE);
+  do {
+    msg = curl_multi_info_read(s->multi, &rc);
+    if(msg) {
+      bool retry;
+      long delay;
+      struct per_transfer *ended;
+      CURL *easy = msg->easy_handle;
+      CURLcode tres = msg->data.result;
+      curl_easy_getinfo(easy, CURLINFO_PRIVATE, (void *)&ended);
+      curl_multi_remove_handle(s->multi, easy);
+
+      if(ended->abort && (tres == CURLE_ABORTED_BY_CALLBACK) &&
+         ended->errorbuffer) {
+        msnprintf(ended->errorbuffer, CURL_ERROR_SIZE,
+                  "Transfer aborted due to critical error "
+                  "in another transfer");
+      }
+      tres = post_per_transfer(global, ended, tres, &retry, &delay);
+      progress_finalize(ended); /* before it goes away */
+      all_added--; /* one fewer added */
+      checkmore = TRUE;
+      if(retry) {
+        ended->added = FALSE; /* add it again */
+        /* we delay retries in full integer seconds only */
+        ended->startat = delay ? time(NULL) + delay/1000 : 0;
+      }
+      else {
+        /* result receives this transfer's error unless the transfer was
+           marked for abort due to a critical error in another transfer */
+        if(tres && (!ended->abort || !result))
+          result = tres;
+        if(is_fatal_error(result) || (result && global->fail_early))
+          s->wrapitup = TRUE;
+        (void)del_per_transfer(ended);
+      }
+    }
+  } while(msg);
+  if(!s->wrapitup) {
+    if(!checkmore) {
+      time_t tock = time(NULL);
+      if(s->tick != tock) {
+        checkmore = TRUE;
+        s->tick = tock;
+      }
+    }
+    if(checkmore) {
+      /* one or more transfers completed, add more! */
+      CURLcode tres = add_parallel_transfers(global, s->multi, s->share,
+                                             &s->more_transfers,
+                                             &s->added_transfers);
+      if(tres)
+        result = tres;
+      if(s->added_transfers)
+        /* we added new ones, make sure the loop does not exit yet */
+        s->still_running = 1;
+    }
+    if(is_fatal_error(result) || (result && global->fail_early))
+      s->wrapitup = TRUE;
+  }
+  return result;
+}
+
+static CURLcode parallel_transfers(struct GlobalConfig *global,
+                                   CURLSH *share)
+{
+  CURLcode result;
+  struct parastate p;
+  struct parastate *s = &p;
+  s->share = share;
+  s->mcode = CURLM_OK;
+  s->result = CURLE_OK;
+  s->still_running = 1;
+  s->start = tvnow();
+  s->wrapitup = FALSE;
+  s->wrapitup_processed = FALSE;
+  s->tick = time(NULL);
+  s->global = global;
+  s->multi = curl_multi_init();
+  if(!s->multi)
     return CURLE_OUT_OF_MEMORY;
 
-  result = add_parallel_transfers(global, multi, share,
-                                  &more_transfers, &added_transfers);
+  result = add_parallel_transfers(global, s->multi, s->share,
+                                  &s->more_transfers, &s->added_transfers);
   if(result) {
-    curl_multi_cleanup(multi);
+    curl_multi_cleanup(s->multi);
     return result;
   }
 
-  while(!mcode && (still_running || more_transfers)) {
+#ifdef DEBUGBUILD
+  if(global->test_event_based)
+#ifdef USE_LIBUV
+    result = parallel_event(s);
+#else
+    errorf(global, "Testing --parallel event-based requires libuv");
+#endif
+  else
+#endif
+  while(!s->mcode && (s->still_running || s->more_transfers)) {
     /* If stopping prematurely (eg due to a --fail-early condition) then signal
        that any transfers in the multi should abort (via progress callback). */
-    if(wrapitup) {
-      if(!still_running)
+    if(s->wrapitup) {
+      if(!s->still_running)
         break;
-      if(!wrapitup_processed) {
+      if(!s->wrapitup_processed) {
         struct per_transfer *per;
         for(per = transfers; per; per = per->next) {
           if(per->added)
             per->abort = TRUE;
         }
-        wrapitup_processed = TRUE;
+        s->wrapitup_processed = TRUE;
       }
     }
 
-    mcode = curl_multi_poll(multi, NULL, 0, 1000, NULL);
-    if(!mcode)
-      mcode = curl_multi_perform(multi, &still_running);
-
-    progress_meter(global, &start, FALSE);
-
-    if(!mcode) {
-      int rc;
-      CURLMsg *msg;
-      bool checkmore = FALSE;
-      do {
-        msg = curl_multi_info_read(multi, &rc);
-        if(msg) {
-          bool retry;
-          long delay;
-          struct per_transfer *ended;
-          CURL *easy = msg->easy_handle;
-          CURLcode tres = msg->data.result;
-          curl_easy_getinfo(easy, CURLINFO_PRIVATE, (void *)&ended);
-          curl_multi_remove_handle(multi, easy);
-
-          if(ended->abort && (tres == CURLE_ABORTED_BY_CALLBACK) &&
-             ended->errorbuffer) {
-            msnprintf(ended->errorbuffer, CURL_ERROR_SIZE,
-                      "Transfer aborted due to critical error "
-                      "in another transfer");
-          }
-          tres = post_per_transfer(global, ended, tres, &retry, &delay);
-          progress_finalize(ended); /* before it goes away */
-          all_added--; /* one fewer added */
-          checkmore = TRUE;
-          if(retry) {
-            ended->added = FALSE; /* add it again */
-            /* we delay retries in full integer seconds only */
-            ended->startat = delay ? time(NULL) + delay/1000 : 0;
-          }
-          else {
-            /* result receives this transfer's error unless the transfer was
-               marked for abort due to a critical error in another transfer */
-            if(tres && (!ended->abort || !result))
-              result = tres;
-            if(is_fatal_error(result) || (result && global->fail_early))
-              wrapitup = TRUE;
-            (void)del_per_transfer(ended);
-          }
-        }
-      } while(msg);
-      if(wrapitup) {
-        if(still_running)
-          continue;
-        else
-          break;
-      }
-      if(!checkmore) {
-        time_t tock = time(NULL);
-        if(tick != tock) {
-          checkmore = TRUE;
-          tick = tock;
-        }
-      }
-      if(checkmore) {
-        /* one or more transfers completed, add more! */
-        CURLcode tres = add_parallel_transfers(global, multi, share,
-                                               &more_transfers,
-                                               &added_transfers);
-        if(tres)
-          result = tres;
-        if(added_transfers)
-          /* we added new ones, make sure the loop does not exit yet */
-          still_running = 1;
-      }
-      if(is_fatal_error(result) || (result && global->fail_early))
-        wrapitup = TRUE;
-    }
+    s->mcode = curl_multi_poll(s->multi, NULL, 0, 1000, NULL);
+    if(!s->mcode)
+      s->mcode = curl_multi_perform(s->multi, &s->still_running);
+
+    if(!s->mcode)
+      result = check_finished(s);
   }
 
-  (void)progress_meter(global, &start, TRUE);
+  (void)progress_meter(global, &s->start, TRUE);
 
   /* Make sure to return some kind of error if there was a multi problem */
-  if(mcode) {
-    result = (mcode == CURLM_OUT_OF_MEMORY) ? CURLE_OUT_OF_MEMORY :
+  if(s->mcode) {
+    result = (s->mcode == CURLM_OUT_OF_MEMORY) ? CURLE_OUT_OF_MEMORY :
       /* The other multi errors should never happen, so return
          something suitably generic */
       CURLE_BAD_FUNCTION_ARGUMENT;
   }
 
-  curl_multi_cleanup(multi);
+  curl_multi_cleanup(s->multi);
 
   return result;
 }
index f450bbea3de74cefcd752e0c3c10ff8bdc628c89..0eb7b8948b44f140c0e7c7ba8b877637ffad0a42 100644 (file)
@@ -80,6 +80,22 @@ SPDX-License-Identifier: curl
   You may also need to manually install the Python package 'six'
   as that may be a missing requirement for impacket on Python 3.
 
+## Event-based
+
+  If curl is built with `Debug` enabled (see below), then the `runtests.pl`
+  script offers a `-e` option that makes it perform *event-based*. Such tests
+  invokes the curl tool with `--test-event`, a debug-only option made for this
+  purpose.
+
+  Performing event-based means that the curl tool uses the
+  `curl_multi_socket_action()` API call to drive the transfer(s), instead of
+  the otherwise "normal" functions it would use. This allows us to test drive
+  the socket_action API. Transfers done this way should work exactly the same
+  as with the non-event based API.
+
+  To be able to use `--test-event` together with `--parallel`, curl requires
+  *libuv* to be present and enabled in the build: `configure --enable-libuv`
+
 ### Port numbers used by test servers
 
   All test servers run on "random" port numbers. All tests should be written