]> git.ipfire.org Git - thirdparty/curl.git/commitdiff
multi: prepare multi_wait() for future shutdown usage
authorStefan Eissing <stefan@eissing.org>
Thu, 6 Jun 2024 10:40:38 +0000 (12:40 +0200)
committerDaniel Stenberg <daniel@haxx.se>
Mon, 10 Jun 2024 11:11:05 +0000 (13:11 +0200)
- new struct curl_pollfds and struct curl_waitfds
- add structs and methods to init/add/cleanup an array of pollfd and
  struct curl_waitfd. Use in multi_wait() and multi_waitfds() to
  populate the sets for polling.
- place USE_WINSOCK WSAEventSelect() setting into a separate loop over
  all collected pfds

Closes #13900

lib/multi.c
lib/select.c
lib/select.h

index 4f6f6d3f770dd9213c1d4b70630de45d9cf8680d..88841c71c7781376a9abe39d1559967bdecf412b 100644 (file)
@@ -1227,12 +1227,8 @@ CURLMcode curl_multi_waitfds(struct Curl_multi *multi,
                              unsigned int *fd_count)
 {
   struct Curl_easy *data;
-  unsigned int nfds = 0;
+  struct curl_waitfds cwfds;
   struct easy_pollset ps;
-  unsigned int i;
-  CURLMcode result = CURLM_OK;
-  struct curl_waitfd *ufd;
-  unsigned int j;
 
   if(!ufds)
     return CURLM_BAD_FUNCTION_ARGUMENT;
@@ -1243,44 +1239,17 @@ CURLMcode curl_multi_waitfds(struct Curl_multi *multi,
   if(multi->in_callback)
     return CURLM_RECURSIVE_API_CALL;
 
+  Curl_waitfds_init(&cwfds, ufds, size);
   memset(&ps, 0, sizeof(ps));
   for(data = multi->easyp; data; data = data->next) {
     multi_getsock(data, &ps);
-
-    for(i = 0; i < ps.num; i++) {
-      if(nfds < size) {
-        curl_socket_t fd = ps.sockets[i];
-        int fd_idx = -1;
-
-        /* Simple linear search to skip an already added descriptor */
-        for(j = 0; j < nfds; j++) {
-          if(ufds[j].fd == fd) {
-            fd_idx = (int)j;
-            break;
-          }
-        }
-
-        if(fd_idx < 0) {
-          ufd = &ufds[nfds++];
-          ufd->fd = ps.sockets[i];
-          ufd->events = 0;
-        }
-        else
-          ufd = &ufds[fd_idx];
-
-        if(ps.actions[i] & CURL_POLL_IN)
-          ufd->events |= CURL_WAIT_POLLIN;
-        if(ps.actions[i] & CURL_POLL_OUT)
-          ufd->events |= CURL_WAIT_POLLOUT;
-      }
-      else
-        return CURLM_OUT_OF_MEMORY;
-    }
+    if(Curl_waitfds_add_ps(&cwfds, &ps))
+      return CURLM_OUT_OF_MEMORY;
   }
 
   if(fd_count)
-    *fd_count = nfds;
-  return result;
+    *fd_count = cwfds.n;
+  return CURLM_OK;
 }
 
 #ifdef USE_WINSOCK
@@ -1299,29 +1268,6 @@ static void reset_socket_fdwrite(curl_socket_t s)
 }
 #endif
 
-static CURLMcode ufds_increase(struct pollfd **pfds, unsigned int *pfds_len,
-                               unsigned int inc, bool *is_malloced)
-{
-  struct pollfd *new_fds, *old_fds = *pfds;
-  unsigned int new_len = *pfds_len + inc;
-
-  new_fds = calloc(new_len, sizeof(struct pollfd));
-  if(!new_fds) {
-    if(*is_malloced)
-      free(old_fds);
-    *pfds = NULL;
-    *pfds_len = 0;
-    return CURLM_OUT_OF_MEMORY;
-  }
-  memcpy(new_fds, old_fds, (*pfds_len) * sizeof(struct pollfd));
-  if(*is_malloced)
-    free(old_fds);
-  *pfds = new_fds;
-  *pfds_len = new_len;
-  *is_malloced = TRUE;
-  return CURLM_OK;
-}
-
 #define NUM_POLLS_ON_STACK 10
 
 static CURLMcode multi_wait(struct Curl_multi *multi,
@@ -1338,10 +1284,8 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
   long timeout_internal;
   int retcode = 0;
   struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK];
-  struct pollfd *ufds = &a_few_on_stack[0];
-  unsigned int ufds_len = NUM_POLLS_ON_STACK;
-  unsigned int nfds = 0, curl_nfds = 0; /* how many ufds are in use */
-  bool ufds_malloc = FALSE;
+  struct curl_pollfds cpfds;
+  unsigned int curl_nfds = 0; /* how many pfds are for curl transfers */
 #ifdef USE_WINSOCK
   WSANETWORKEVENTS wsa_events;
   DEBUGASSERT(multi->wsa_event != WSA_INVALID_EVENT);
@@ -1359,105 +1303,62 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
   if(timeout_ms < 0)
     return CURLM_BAD_FUNCTION_ARGUMENT;
 
-  memset(ufds, 0, ufds_len * sizeof(struct pollfd));
+  Curl_pollfds_init(&cpfds, a_few_on_stack, NUM_POLLS_ON_STACK);
   memset(&ps, 0, sizeof(ps));
 
   /* Add the curl handles to our pollfds first */
   for(data = multi->easyp; data; data = data->next) {
     multi_getsock(data, &ps);
-
-    for(i = 0; i < ps.num; i++) {
-      short events = 0;
-#ifdef USE_WINSOCK
-      long mask = 0;
-#endif
-      if(ps.actions[i] & CURL_POLL_IN) {
-#ifdef USE_WINSOCK
-        mask |= FD_READ|FD_ACCEPT|FD_CLOSE;
-#endif
-        events |= POLLIN;
-      }
-      if(ps.actions[i] & CURL_POLL_OUT) {
-#ifdef USE_WINSOCK
-        mask |= FD_WRITE|FD_CONNECT|FD_CLOSE;
-        reset_socket_fdwrite(ps.sockets[i]);
-#endif
-        events |= POLLOUT;
-      }
-      if(events) {
-        if(nfds && ps.sockets[i] == ufds[nfds-1].fd) {
-          ufds[nfds-1].events |= events;
-        }
-        else {
-          if(nfds >= ufds_len) {
-            if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
-              return CURLM_OUT_OF_MEMORY;
-          }
-          DEBUGASSERT(nfds < ufds_len);
-          ufds[nfds].fd = ps.sockets[i];
-          ufds[nfds].events = events;
-          ++nfds;
-        }
-      }
-#ifdef USE_WINSOCK
-      if(mask) {
-        if(WSAEventSelect(ps.sockets[i], multi->wsa_event, mask) != 0) {
-          if(ufds_malloc)
-            free(ufds);
-          return CURLM_INTERNAL_ERROR;
-        }
-      }
-#endif
+    if(Curl_pollfds_add_ps(&cpfds, &ps)) {
+      Curl_pollfds_cleanup(&cpfds);
+      return CURLM_OUT_OF_MEMORY;
     }
   }
 
-  curl_nfds = nfds; /* what curl internally used in ufds */
-
+  curl_nfds = cpfds.n; /* what curl internally uses in cpfds */
   /* Add external file descriptions from poll-like struct curl_waitfd */
   for(i = 0; i < extra_nfds; i++) {
+    unsigned short events = 0;
+    if(extra_fds[i].events & CURL_WAIT_POLLIN)
+      events |= POLLIN;
+    if(extra_fds[i].events & CURL_WAIT_POLLPRI)
+      events |= POLLPRI;
+    if(extra_fds[i].events & CURL_WAIT_POLLOUT)
+      events |= POLLOUT;
+    if(Curl_pollfds_add_sock(&cpfds, extra_fds[i].fd, events)) {
+      Curl_pollfds_cleanup(&cpfds);
+      return CURLM_OUT_OF_MEMORY;
+    }
+  }
+
 #ifdef USE_WINSOCK
+  /* Set the WSA events based on the collected pollds */
+  for(i = 0; i < cpfds.n; i++) {
     long mask = 0;
-    if(extra_fds[i].events & CURL_WAIT_POLLIN)
+    if(cpfds.pfds[i].events & POLLIN)
       mask |= FD_READ|FD_ACCEPT|FD_CLOSE;
-    if(extra_fds[i].events & CURL_WAIT_POLLPRI)
+    if(cpfds.pfds[i].events & POLLPRI)
       mask |= FD_OOB;
-    if(extra_fds[i].events & CURL_WAIT_POLLOUT) {
+    if(cpfds.pfds[i].events & POLLOUT) {
       mask |= FD_WRITE|FD_CONNECT|FD_CLOSE;
-      reset_socket_fdwrite(extra_fds[i].fd);
-    }
-    if(WSAEventSelect(extra_fds[i].fd, multi->wsa_event, mask) != 0) {
-      if(ufds_malloc)
-        free(ufds);
-      return CURLM_INTERNAL_ERROR;
+      reset_socket_fdwrite(cpfds.pfds[i].fd);
     }
-#endif
-    if(nfds >= ufds_len) {
-      if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
-        return CURLM_OUT_OF_MEMORY;
+    if(mask) {
+      if(WSAEventSelect(cpfds.pfds[i].fd, multi->wsa_event, mask) != 0) {
+        Curl_pollfds_cleanup(&cpfds);
+        return CURLM_INTERNAL_ERROR;
+      }
     }
-    DEBUGASSERT(nfds < ufds_len);
-    ufds[nfds].fd = extra_fds[i].fd;
-    ufds[nfds].events = 0;
-    if(extra_fds[i].events & CURL_WAIT_POLLIN)
-      ufds[nfds].events |= POLLIN;
-    if(extra_fds[i].events & CURL_WAIT_POLLPRI)
-      ufds[nfds].events |= POLLPRI;
-    if(extra_fds[i].events & CURL_WAIT_POLLOUT)
-      ufds[nfds].events |= POLLOUT;
-    ++nfds;
   }
+#endif
 
 #ifdef ENABLE_WAKEUP
 #ifndef USE_WINSOCK
   if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
-    if(nfds >= ufds_len) {
-      if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
-        return CURLM_OUT_OF_MEMORY;
+    if(Curl_pollfds_add_sock(&cpfds, multi->wakeup_pair[0], POLLIN)) {
+      Curl_pollfds_cleanup(&cpfds);
+      return CURLM_OUT_OF_MEMORY;
     }
-    DEBUGASSERT(nfds < ufds_len);
-    ufds[nfds].fd = multi->wakeup_pair[0];
-    ufds[nfds].events = POLLIN;
-    ++nfds;
   }
 #endif
 #endif
@@ -1471,21 +1372,23 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
     timeout_ms = (int)timeout_internal;
 
 #if defined(ENABLE_WAKEUP) && defined(USE_WINSOCK)
-  if(nfds || use_wakeup) {
+  if(cpfds.n || use_wakeup) {
 #else
-  if(nfds) {
+  if(cpfds.n) {
 #endif
     int pollrc;
 #ifdef USE_WINSOCK
-    if(nfds)
-      pollrc = Curl_poll(ufds, nfds, 0); /* just pre-check with WinSock */
+    if(cpfds.n)         /* just pre-check with WinSock */
+      pollrc = Curl_poll(cpfds.pfds, cpfds.n, 0);
     else
       pollrc = 0;
 #else
-    pollrc = Curl_poll(ufds, nfds, timeout_ms); /* wait... */
+    pollrc = Curl_poll(cpfds.pfds, cpfds.n, timeout_ms); /* wait... */
 #endif
-    if(pollrc < 0)
+    if(pollrc < 0) {
+      Curl_pollfds_cleanup(&cpfds);
       return CURLM_UNRECOVERABLE_POLL;
+    }
 
     if(pollrc > 0) {
       retcode = pollrc;
@@ -1503,7 +1406,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
          struct, the bit values of the actual underlying poll() implementation
          may not be the same as the ones in the public libcurl API! */
       for(i = 0; i < extra_nfds; i++) {
-        unsigned r = (unsigned)ufds[curl_nfds + i].revents;
+        unsigned r = (unsigned)cpfds.pfds[curl_nfds + i].revents;
         unsigned short mask = 0;
 #ifdef USE_WINSOCK
         curl_socket_t s = extra_fds[i].fd;
@@ -1557,7 +1460,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
 #else
 #ifdef ENABLE_WAKEUP
       if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
-        if(ufds[curl_nfds + extra_nfds].revents & POLLIN) {
+        if(cpfds.pfds[curl_nfds + extra_nfds].revents & POLLIN) {
           char buf[64];
           ssize_t nread;
           while(1) {
@@ -1581,14 +1484,12 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
     }
   }
 
-  if(ufds_malloc)
-    free(ufds);
   if(ret)
     *ret = retcode;
 #if defined(ENABLE_WAKEUP) && defined(USE_WINSOCK)
-  if(extrawait && !nfds && !use_wakeup) {
+  if(extrawait && !cpfds.n && !use_wakeup) {
 #else
-  if(extrawait && !nfds) {
+  if(extrawait && !cpfds.n) {
 #endif
     long sleep_ms = 0;
 
@@ -1604,6 +1505,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
     }
   }
 
+  Curl_pollfds_cleanup(&cpfds);
   return CURLM_OK;
 }
 
index 1a5ab8572c44e651fc0f6e427107a051db217a72..d5127d74323ff41c197e60f5242a804c1dc78d78 100644 (file)
 #include "select.h"
 #include "timediff.h"
 #include "warnless.h"
+/* The last 3 #include files should be in this order */
+#include "curl_printf.h"
+#include "curl_memory.h"
+#include "memdebug.h"
 
 /*
  * Internal function used for waiting a specific amount of ms
@@ -401,3 +405,147 @@ int Curl_poll(struct pollfd ufds[], unsigned int nfds, timediff_t timeout_ms)
 
   return r;
 }
+
+void Curl_pollfds_init(struct curl_pollfds *cpfds,
+                       struct pollfd *static_pfds,
+                       unsigned int static_count)
+{
+  DEBUGASSERT(cpfds);
+  memset(cpfds, 0, sizeof(*cpfds));
+  if(static_pfds && static_count) {
+    cpfds->pfds = static_pfds;
+    cpfds->count = static_count;
+  }
+}
+
+void Curl_pollfds_cleanup(struct curl_pollfds *cpfds)
+{
+  DEBUGASSERT(cpfds);
+  if(cpfds->allocated_pfds) {
+    free(cpfds->pfds);
+  }
+  memset(cpfds, 0, sizeof(*cpfds));
+}
+
+static CURLcode cpfds_increase(struct curl_pollfds *cpfds, unsigned int inc)
+{
+  struct pollfd *new_fds;
+  unsigned int new_count = cpfds->count + inc;
+
+  new_fds = calloc(new_count, sizeof(struct pollfd));
+  if(!new_fds)
+    return CURLE_OUT_OF_MEMORY;
+
+  memcpy(new_fds, cpfds->pfds, cpfds->count * sizeof(struct pollfd));
+  if(cpfds->allocated_pfds)
+    free(cpfds->pfds);
+  cpfds->pfds = new_fds;
+  cpfds->count = new_count;
+  cpfds->allocated_pfds = TRUE;
+  return CURLE_OK;
+}
+
+static CURLcode cpfds_add_sock(struct curl_pollfds *cpfds,
+                               curl_socket_t sock, short events, bool fold)
+{
+  int i;
+
+  if(fold && cpfds->n <= INT_MAX) {
+    for(i = (int)cpfds->n - 1; i >= 0; --i) {
+      if(sock == cpfds->pfds[i].fd) {
+        cpfds->pfds[i].events |= events;
+        return CURLE_OK;
+      }
+    }
+  }
+  /* not folded, add new entry */
+  if(cpfds->n >= cpfds->count) {
+    if(cpfds_increase(cpfds, 100))
+      return CURLE_OUT_OF_MEMORY;
+  }
+  cpfds->pfds[cpfds->n].fd = sock;
+  cpfds->pfds[cpfds->n].events = events;
+  ++cpfds->n;
+  return CURLE_OK;
+}
+
+CURLcode Curl_pollfds_add_sock(struct curl_pollfds *cpfds,
+                               curl_socket_t sock, short events)
+{
+  return cpfds_add_sock(cpfds, sock, events, FALSE);
+}
+
+CURLcode Curl_pollfds_add_ps(struct curl_pollfds *cpfds,
+                             struct easy_pollset *ps)
+{
+  size_t i;
+
+  DEBUGASSERT(cpfds);
+  DEBUGASSERT(ps);
+  for(i = 0; i < ps->num; i++) {
+    short events = 0;
+    if(ps->actions[i] & CURL_POLL_IN)
+      events |= POLLIN;
+    if(ps->actions[i] & CURL_POLL_OUT)
+      events |= POLLOUT;
+    if(events) {
+      if(cpfds_add_sock(cpfds, ps->sockets[i], events, TRUE))
+        return CURLE_OUT_OF_MEMORY;
+    }
+  }
+  return CURLE_OK;
+}
+
+void Curl_waitfds_init(struct curl_waitfds *cwfds,
+                       struct curl_waitfd *static_wfds,
+                       unsigned int static_count)
+{
+  DEBUGASSERT(cwfds);
+  DEBUGASSERT(static_wfds);
+  memset(cwfds, 0, sizeof(*cwfds));
+  cwfds->wfds = static_wfds;
+  cwfds->count = static_count;
+}
+
+static CURLcode cwfds_add_sock(struct curl_waitfds *cwfds,
+                               curl_socket_t sock, short events)
+{
+  int i;
+
+  if(cwfds->n <= INT_MAX) {
+    for(i = (int)cwfds->n - 1; i >= 0; --i) {
+      if(sock == cwfds->wfds[i].fd) {
+        cwfds->wfds[i].events |= events;
+        return CURLE_OK;
+      }
+    }
+  }
+  /* not folded, add new entry */
+  if(cwfds->n >= cwfds->count)
+    return CURLE_OUT_OF_MEMORY;
+  cwfds->wfds[cwfds->n].fd = sock;
+  cwfds->wfds[cwfds->n].events = events;
+  ++cwfds->n;
+  return CURLE_OK;
+}
+
+CURLcode Curl_waitfds_add_ps(struct curl_waitfds *cwfds,
+                             struct easy_pollset *ps)
+{
+  size_t i;
+
+  DEBUGASSERT(cwfds);
+  DEBUGASSERT(ps);
+  for(i = 0; i < ps->num; i++) {
+    short events = 0;
+    if(ps->actions[i] & CURL_POLL_IN)
+      events |= CURL_WAIT_POLLIN;
+    if(ps->actions[i] & CURL_POLL_OUT)
+      events |= CURL_WAIT_POLLOUT;
+    if(events) {
+      if(cwfds_add_sock(cwfds, ps->sockets[i], events))
+        return CURLE_OUT_OF_MEMORY;
+    }
+  }
+  return CURLE_OK;
+}
index 5b1ca23eb1b2507b92eb13303c42a76870494886..f01acbdefc21d3dd935dca5863356e4b29f76e82 100644 (file)
@@ -111,4 +111,37 @@ int Curl_wait_ms(timediff_t timeout_ms);
   } while(0)
 #endif
 
+struct curl_pollfds {
+  struct pollfd *pfds;
+  unsigned int n;
+  unsigned int count;
+  BIT(allocated_pfds);
+};
+
+void Curl_pollfds_init(struct curl_pollfds *cpfds,
+                       struct pollfd *static_pfds,
+                       unsigned int static_count);
+
+void Curl_pollfds_cleanup(struct curl_pollfds *cpfds);
+
+CURLcode Curl_pollfds_add_ps(struct curl_pollfds *cpfds,
+                             struct easy_pollset *ps);
+
+CURLcode Curl_pollfds_add_sock(struct curl_pollfds *cpfds,
+                               curl_socket_t sock, short events);
+
+struct curl_waitfds {
+  struct curl_waitfd *wfds;
+  unsigned int n;
+  unsigned int count;
+};
+
+void Curl_waitfds_init(struct curl_waitfds *cwfds,
+                       struct curl_waitfd *static_wfds,
+                       unsigned int static_count);
+
+CURLcode Curl_waitfds_add_ps(struct curl_waitfds *cwfds,
+                             struct easy_pollset *ps);
+
+
 #endif /* HEADER_CURL_SELECT_H */