]> git.ipfire.org Git - thirdparty/curl.git/commitdiff
multi: multi_wait improvements
authorStefan Eissing <stefan@eissing.org>
Wed, 20 Mar 2024 07:08:43 +0000 (08:08 +0100)
committerDaniel Stenberg <daniel@haxx.se>
Thu, 25 Apr 2024 21:31:59 +0000 (23:31 +0200)
 - only call `multi_getsock()` once for all transfers
 - realloc pollset array on demand
 - fold repeated sockets

Closes #13150

lib/multi.c
tests/http/scorecard.py

index 7e7590d60f8bcbeb67dec8646f5c1c68cc49b50d..ea7961e34a7a5376bb84fb8662fd062f949a3e50 100644 (file)
@@ -1289,6 +1289,29 @@ static void reset_socket_fdwrite(curl_socket_t s)
 }
 #endif
 
+static CURLMcode ufds_increase(struct pollfd **pfds, unsigned int *pfds_len,
+                               unsigned int inc, bool *is_malloced)
+{
+  struct pollfd *new_fds, *old_fds = *pfds;
+  unsigned int new_len = *pfds_len + inc;
+
+  new_fds = calloc(new_len, sizeof(struct pollfd));
+  if(!new_fds) {
+    if(*is_malloced)
+      free(old_fds);
+    *pfds = NULL;
+    *pfds_len = 0;
+    return CURLM_OUT_OF_MEMORY;
+  }
+  memcpy(new_fds, old_fds, (*pfds_len) * sizeof(struct pollfd));
+  if(*is_malloced)
+    free(old_fds);
+  *pfds = new_fds;
+  *pfds_len = new_len;
+  *is_malloced = TRUE;
+  return CURLM_OK;
+}
+
 #define NUM_POLLS_ON_STACK 10
 
 static CURLMcode multi_wait(struct Curl_multi *multi,
@@ -1302,12 +1325,12 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
   struct Curl_easy *data;
   struct easy_pollset ps;
   size_t i;
-  unsigned int nfds = 0;
-  unsigned int curlfds;
   long timeout_internal;
   int retcode = 0;
   struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK];
   struct pollfd *ufds = &a_few_on_stack[0];
+  unsigned int ufds_len = NUM_POLLS_ON_STACK;
+  unsigned int nfds = 0, curl_nfds = 0; /* how many ufds are in use */
   bool ufds_malloc = FALSE;
 #ifdef USE_WINSOCK
   WSANETWORKEVENTS wsa_events;
@@ -1326,13 +1349,6 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
   if(timeout_ms < 0)
     return CURLM_BAD_FUNCTION_ARGUMENT;
 
-  /* Count up how many fds we have from the multi handle */
-  memset(&ps, 0, sizeof(ps));
-  for(data = multi->easyp; data; data = data->next) {
-    multi_getsock(data, &ps);
-    nfds += ps.num;
-  }
-
   /* If the internally desired timeout is actually shorter than requested from
      the outside, then use the shorter time! But only if the internal timer
      is actually larger than -1! */
@@ -1340,70 +1356,61 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
   if((timeout_internal >= 0) && (timeout_internal < (long)timeout_ms))
     timeout_ms = (int)timeout_internal;
 
-  curlfds = nfds; /* number of internal file descriptors */
-  nfds += extra_nfds; /* add the externally provided ones */
-
-#ifdef ENABLE_WAKEUP
-#ifdef USE_WINSOCK
-  if(use_wakeup) {
-#else
-  if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
-#endif
-    ++nfds;
-  }
-#endif
-
-  if(nfds > NUM_POLLS_ON_STACK) {
-    /* 'nfds' is a 32 bit value and 'struct pollfd' is typically 8 bytes
-       big, so at 2^29 sockets this value might wrap. When a process gets
-       the capability to actually handle over 500 million sockets this
-       calculation needs an integer overflow check. */
-    ufds = malloc(nfds * sizeof(struct pollfd));
-    if(!ufds)
-      return CURLM_OUT_OF_MEMORY;
-    ufds_malloc = TRUE;
-  }
   nfds = 0;
+  memset(ufds, 0, ufds_len * sizeof(struct pollfd));
+  memset(&ps, 0, sizeof(ps));
 
-  /* only do the second loop if we found descriptors in the first stage run
-     above */
-
-  if(curlfds) {
-    /* Add the curl handles to our pollfds first */
-    for(data = multi->easyp; data; data = data->next) {
-      multi_getsock(data, &ps);
+  /* Add the curl handles to our pollfds first */
+  for(data = multi->easyp; data; data = data->next) {
+    multi_getsock(data, &ps);
 
-      for(i = 0; i < ps.num; i++) {
-        struct pollfd *ufd = &ufds[nfds++];
+    for(i = 0; i < ps.num; i++) {
+      short events = 0;
 #ifdef USE_WINSOCK
-        long mask = 0;
+      long mask = 0;
 #endif
-        ufd->fd = ps.sockets[i];
-        ufd->events = 0;
-        if(ps.actions[i] & CURL_POLL_IN) {
+      if(ps.actions[i] & CURL_POLL_IN) {
 #ifdef USE_WINSOCK
-          mask |= FD_READ|FD_ACCEPT|FD_CLOSE;
+        mask |= FD_READ|FD_ACCEPT|FD_CLOSE;
 #endif
-          ufd->events |= POLLIN;
-        }
-        if(ps.actions[i] & CURL_POLL_OUT) {
+        events |= POLLIN;
+      }
+      if(ps.actions[i] & CURL_POLL_OUT) {
 #ifdef USE_WINSOCK
-          mask |= FD_WRITE|FD_CONNECT|FD_CLOSE;
-          reset_socket_fdwrite(ps.sockets[i]);
+        mask |= FD_WRITE|FD_CONNECT|FD_CLOSE;
+        reset_socket_fdwrite(ps.sockets[i]);
 #endif
-          ufd->events |= POLLOUT;
+        events |= POLLOUT;
+      }
+      if(events) {
+        if(nfds && ps.sockets[i] == ufds[nfds-1].fd) {
+          ufds[nfds-1].events |= events;
         }
+        else {
+          if(nfds >= ufds_len) {
+            if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
+              return CURLM_OUT_OF_MEMORY;
+          }
+          DEBUGASSERT(nfds < ufds_len);
+          ufds[nfds].fd = ps.sockets[i];
+          ufds[nfds].events = events;
+          ++nfds;
+        }
+      }
 #ifdef USE_WINSOCK
+      if(mask) {
         if(WSAEventSelect(ps.sockets[i], multi->wsa_event, mask) != 0) {
           if(ufds_malloc)
             free(ufds);
           return CURLM_INTERNAL_ERROR;
         }
-#endif
       }
+#endif
     }
   }
 
+  curl_nfds = nfds; /* what curl internally used in ufds */
+
   /* Add external file descriptions from poll-like struct curl_waitfd */
   for(i = 0; i < extra_nfds; i++) {
 #ifdef USE_WINSOCK
@@ -1422,6 +1429,11 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
       return CURLM_INTERNAL_ERROR;
     }
 #endif
+    if(nfds >= ufds_len) {
+      if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
+        return CURLM_OUT_OF_MEMORY;
+    }
+    DEBUGASSERT(nfds < ufds_len);
     ufds[nfds].fd = extra_fds[i].fd;
     ufds[nfds].events = 0;
     if(extra_fds[i].events & CURL_WAIT_POLLIN)
@@ -1436,6 +1448,11 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
 #ifdef ENABLE_WAKEUP
 #ifndef USE_WINSOCK
   if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
+    if(nfds >= ufds_len) {
+      if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
+        return CURLM_OUT_OF_MEMORY;
+    }
+    DEBUGASSERT(nfds < ufds_len);
     ufds[nfds].fd = multi->wakeup_pair[0];
     ufds[nfds].events = POLLIN;
     ++nfds;
@@ -1475,7 +1492,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
          struct, the bit values of the actual underlying poll() implementation
          may not be the same as the ones in the public libcurl API! */
       for(i = 0; i < extra_nfds; i++) {
-        unsigned r = ufds[curlfds + i].revents;
+        unsigned r = ufds[curl_nfds + i].revents;
         unsigned short mask = 0;
 #ifdef USE_WINSOCK
         curl_socket_t s = extra_fds[i].fd;
@@ -1508,7 +1525,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
 #ifdef USE_WINSOCK
       /* Count up all our own sockets that had activity,
          and remove them from the event. */
-      if(curlfds) {
+      if(curl_nfds) {
 
         for(data = multi->easyp; data; data = data->next) {
           multi_getsock(data, &ps);
@@ -1529,7 +1546,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
 #else
 #ifdef ENABLE_WAKEUP
       if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
-        if(ufds[curlfds + extra_nfds].revents & POLLIN) {
+        if(ufds[curl_nfds + extra_nfds].revents & POLLIN) {
           char buf[64];
           ssize_t nread;
           while(1) {
index 446a1bc5cfb839b4d7f5840a7a07ba9d486f6959..d4e19c101debfe27d57425d054dc397ed3071540 100644 (file)
@@ -49,13 +49,15 @@ class ScoreCard:
                  nghttpx: Optional[Nghttpx],
                  caddy: Optional[Caddy],
                  verbose: int,
-                 curl_verbose: int):
+                 curl_verbose: int,
+                 download_parallel: int = 0):
         self.verbose = verbose
         self.env = env
         self.httpd = httpd
         self.nghttpx = nghttpx
         self.caddy = caddy
         self._silent_curl = not curl_verbose
+        self._download_parallel = download_parallel
 
     def info(self, msg):
         if self.verbose > 0:
@@ -138,6 +140,7 @@ class ScoreCard:
         return {
             'count': count,
             'samples': sample_size,
+            'max-parallel': 1,
             'speed': mean(samples) if len(samples) else -1,
             'errors': errors,
             'stats': RunProfile.AverageStats(profiles),
@@ -164,6 +167,7 @@ class ScoreCard:
         return {
             'count': count,
             'samples': sample_size,
+            'max-parallel': 1,
             'speed': mean(samples) if len(samples) else -1,
             'errors': errors,
             'stats': RunProfile.AverageStats(profiles),
@@ -174,6 +178,7 @@ class ScoreCard:
         samples = []
         errors = []
         profiles = []
+        max_parallel = self._download_parallel if self._download_parallel > 0 else count
         url = f'{url}?[0-{count - 1}]'
         self.info(f'parallel...')
         for i in range(sample_size):
@@ -182,7 +187,7 @@ class ScoreCard:
                                    with_headers=False,
                                    with_profile=True,
                                    extra_args=['--parallel',
-                                               '--parallel-max', str(count)])
+                                               '--parallel-max', str(max_parallel)])
             err = self._check_downloads(r, count)
             if err:
                 errors.append(err)
@@ -193,6 +198,7 @@ class ScoreCard:
         return {
             'count': count,
             'samples': sample_size,
+            'max-parallel': max_parallel,
             'speed': mean(samples) if len(samples) else -1,
             'errors': errors,
             'stats': RunProfile.AverageStats(profiles),
@@ -436,7 +442,7 @@ class ScoreCard:
                     for mkey, mval in server_score[sskey].items():
                         if mkey not in measures:
                             measures.append(mkey)
-                            m_names[mkey] = f'{mkey}({mval["count"]}x)'
+                            m_names[mkey] = f'{mkey}({mval["count"]}x{mval["max-parallel"]})'
 
             print('Downloads')
             print(f'  {"Server":<8} {"Size":>8}', end='')
@@ -543,6 +549,8 @@ def main():
                         default=None, help="evaluate download size")
     parser.add_argument("--download-count", action='store', type=int,
                         default=50, help="perform that many downloads")
+    parser.add_argument("--download-parallel", action='store', type=int,
+                        default=0, help="perform that many downloads in parallel (default all)")
     parser.add_argument("-r", "--requests", action='store_true',
                         default=False, help="evaluate requests")
     parser.add_argument("--request-count", action='store', type=int,
@@ -607,7 +615,8 @@ def main():
             assert caddy.start()
 
         card = ScoreCard(env=env, httpd=httpd, nghttpx=nghttpx, caddy=caddy,
-                         verbose=args.verbose, curl_verbose=args.curl_verbose)
+                         verbose=args.verbose, curl_verbose=args.curl_verbose,
+                         download_parallel=args.download_parallel)
         score = card.score_proto(proto=protocol,
                                  handshakes=handshakes,
                                  downloads=downloads,