]> git.ipfire.org Git - thirdparty/curl.git/commitdiff
socketpair: cleaner interface
authorStefan Eissing <stefan@eissing.org>
Fri, 16 Jan 2026 12:59:03 +0000 (13:59 +0100)
committerDaniel Stenberg <daniel@haxx.se>
Fri, 16 Jan 2026 15:43:43 +0000 (16:43 +0100)
Declutter the ifdefs in socketpair.h. Introduce Curl_wakeup_*()
function that encapsulate the details about how the socketpair
is implemented.

This moves the EVENTFD specials from the using code into socketpair
implemenatation, avoiding duplications in three places.

Closes #20340

lib/asyn-thrdd.c
lib/multi.c
lib/socketpair.c
lib/socketpair.h

index b29651040cd8e551fc2d6997e5fa0af6c8af8535..51bd098169ea1151d43ea3786c149ea1b120a333 100644 (file)
@@ -133,12 +133,7 @@ static void addr_ctx_unlink(struct async_thrdd_addr_ctx **paddr_ctx,
     curlx_free(addr_ctx->hostname);
     if(addr_ctx->res)
       Curl_freeaddrinfo(addr_ctx->res);
-#ifndef CURL_DISABLE_SOCKETPAIR
-#ifndef USE_EVENTFD
-    wakeup_close(addr_ctx->sock_pair[1]);
-#endif
-    wakeup_close(addr_ctx->sock_pair[0]);
-#endif
+    Curl_wakeup_destroy(addr_ctx->sock_pair);
     curlx_free(addr_ctx);
   }
   *paddr_ctx = NULL;
@@ -169,7 +164,7 @@ addr_ctx_create(struct Curl_easy *data,
 
 #ifndef CURL_DISABLE_SOCKETPAIR
   /* create socket pair or pipe */
-  if(wakeup_create(addr_ctx->sock_pair, FALSE) < 0) {
+  if(Curl_wakeup_init(addr_ctx->sock_pair, FALSE) < 0) {
     addr_ctx->sock_pair[0] = CURL_SOCKET_BAD;
     addr_ctx->sock_pair[1] = CURL_SOCKET_BAD;
     goto err_exit;
@@ -231,15 +226,11 @@ static CURL_THREAD_RETURN_T CURL_STDCALL getaddrinfo_thread(void *arg)
     Curl_mutex_release(&addr_ctx->mutx);
 #ifndef CURL_DISABLE_SOCKETPAIR
     if(!do_abort) {
-#ifdef USE_EVENTFD
-      const uint64_t buf[1] = { 1 };
-#else
-      const char buf[1] = { 1 };
-#endif
       /* Thread is done, notify transfer */
-      if(wakeup_write(addr_ctx->sock_pair[1], buf, sizeof(buf)) < 0) {
+      int err = Curl_wakeup_signal(addr_ctx->sock_pair);
+      if(err) {
         /* update sock_error to errno */
-        addr_ctx->sock_error = SOCKERRNO;
+        addr_ctx->sock_error = err;
       }
     }
 #endif
@@ -276,15 +267,10 @@ static CURL_THREAD_RETURN_T CURL_STDCALL gethostbyname_thread(void *arg)
     Curl_mutex_release(&addr_ctx->mutx);
 #ifndef CURL_DISABLE_SOCKETPAIR
     if(!do_abort) {
-#ifdef USE_EVENTFD
-      const uint64_t buf[1] = { 1 };
-#else
-      const char buf[1] = { 1 };
-#endif
-      /* Thread is done, notify transfer */
-      if(wakeup_write(addr_ctx->sock_pair[1], buf, sizeof(buf)) < 0) {
+      int err = Curl_wakeup_signal(addr_ctx->sock_pair);
+      if(err) {
         /* update sock_error to errno */
-        addr_ctx->sock_error = SOCKERRNO;
+        addr_ctx->sock_error = err;
       }
     }
 #endif
index 0c99e1b653a98e963dd216f6d3dd65aec7535ffb..e0e37f8edd543ea4b2983de6f3dad3b5c5d849bf 100644 (file)
@@ -294,7 +294,7 @@ struct Curl_multi *Curl_multi_handle(uint32_t xfer_table_size,
   if(multi->wsa_event == WSA_INVALID_EVENT)
     goto error;
 #elif defined(ENABLE_WAKEUP)
-  if(wakeup_create(multi->wakeup_pair, TRUE) < 0) {
+  if(Curl_wakeup_init(multi->wakeup_pair, TRUE) < 0) {
     multi->wakeup_pair[0] = CURL_SOCKET_BAD;
     multi->wakeup_pair[1] = CURL_SOCKET_BAD;
   }
@@ -1533,20 +1533,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
 #ifdef ENABLE_WAKEUP
       if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
         if(cpfds.pfds[curl_nfds + extra_nfds].revents & POLLIN) {
-          char buf[64];
-          ssize_t nread;
-          while(1) {
-            /* the reading socket is non-blocking, try to read
-               data from it until it receives an error (except EINTR).
-               In normal cases it will get EAGAIN or EWOULDBLOCK
-               when there is no more data, breaking the loop. */
-            nread = wakeup_read(multi->wakeup_pair[0], buf, sizeof(buf));
-            if(nread <= 0) {
-              if(nread < 0 && SOCKEINTR == SOCKERRNO)
-                continue;
-              break;
-            }
-          }
+          (void)Curl_wakeup_consume(multi->wakeup_pair, TRUE);
           /* do not count the wakeup socket into the returned value */
           retcode--;
         }
@@ -1623,38 +1610,9 @@ CURLMcode curl_multi_wakeup(CURLM *m)
      making it safe to access from another thread after the init part
      and before cleanup */
   if(multi->wakeup_pair[1] != CURL_SOCKET_BAD) {
-    while(1) {
-#ifdef USE_EVENTFD
-      /* eventfd has a stringent rule of requiring the 8-byte buffer when
-         calling write(2) on it */
-      const uint64_t buf[1] = { 1 };
-#else
-      const char buf[1] = { 1 };
-#endif
-      /* swrite() is not thread-safe in general, because concurrent calls
-         can have their messages interleaved, but in this case the content
-         of the messages does not matter, which makes it ok to call.
-
-         The write socket is set to non-blocking, this way this function
-         cannot block, making it safe to call even from the same thread
-         that will call curl_multi_wait(). If swrite() returns that it
-         would block, it is considered successful because it means that
-         previous calls to this function will wake up the poll(). */
-      if(wakeup_write(multi->wakeup_pair[1], buf, sizeof(buf)) < 0) {
-        int err = SOCKERRNO;
-        int return_success;
-#ifdef USE_WINSOCK
-        return_success = SOCKEWOULDBLOCK == err;
-#else
-        if(SOCKEINTR == err)
-          continue;
-        return_success = SOCKEWOULDBLOCK == err || EAGAIN == err;
-#endif
-        if(!return_success)
-          return CURLM_WAKEUP_FAILURE;
-      }
-      return CURLM_OK;
-    }
+    if(Curl_wakeup_signal(multi->wakeup_pair))
+      return CURLM_WAKEUP_FAILURE;
+    return CURLM_OK;
   }
 #endif
 #endif
@@ -2929,10 +2887,7 @@ CURLMcode curl_multi_cleanup(CURLM *m)
     WSACloseEvent(multi->wsa_event);
 #else
 #ifdef ENABLE_WAKEUP
-    wakeup_close(multi->wakeup_pair[0]);
-#ifndef USE_EVENTFD
-    wakeup_close(multi->wakeup_pair[1]);
-#endif
+  Curl_wakeup_destroy(multi->wakeup_pair);
 #endif
 #endif
 
index bdbe956233529588bc5c64556be16dea5b705379..ec45926237e8611f334caf0f4c801f29335e89c4 100644 (file)
 #include "rand.h"
 #include "curlx/nonblock.h"
 
+#ifndef CURL_DISABLE_SOCKETPAIR
+
+/* choose implementation */
 #ifdef USE_EVENTFD
 
 #include <sys/eventfd.h>
 
-int Curl_eventfd(curl_socket_t socks[2], bool nonblocking)
+static int wakeup_eventfd(curl_socket_t socks[2], bool nonblocking)
 {
   int efd = eventfd(0, nonblocking ? EFD_CLOEXEC | EFD_NONBLOCK : EFD_CLOEXEC);
   if(efd == -1) {
@@ -49,7 +52,7 @@ int Curl_eventfd(curl_socket_t socks[2], bool nonblocking)
 #include <fcntl.h>
 #endif
 
-int Curl_pipe(curl_socket_t socks[2], bool nonblocking)
+static int wakeup_pipe(curl_socket_t socks[2], bool nonblocking)
 {
 #ifdef HAVE_PIPE2
   int flags = nonblocking ? O_NONBLOCK | O_CLOEXEC : O_CLOEXEC;
@@ -81,18 +84,24 @@ int Curl_pipe(curl_socket_t socks[2], bool nonblocking)
   return 0;
 }
 
-#endif /* USE_EVENTFD */
+#elif defined(HAVE_SOCKETPAIR)  /* !USE_EVENTFD && !HAVE_PIPE */
 
-#ifndef CURL_DISABLE_SOCKETPAIR
-#ifdef HAVE_SOCKETPAIR
-#ifdef USE_SOCKETPAIR
-int Curl_socketpair(int domain, int type, int protocol,
-                    curl_socket_t socks[2], bool nonblocking)
+#ifndef USE_UNIX_SOCKETS
+#error "unsupported Unix domain and socketpair build combo"
+#endif
+
+static int wakeup_socketpair(curl_socket_t socks[2], bool nonblocking)
 {
+  int type = SOCK_STREAM;
+#ifdef SOCK_CLOEXEC
+  type |= SOCK_CLOEXEC;
+#endif
 #ifdef SOCK_NONBLOCK
-  type = nonblocking ? type | SOCK_NONBLOCK : type;
+  if(nonblocking)
+    type |= SOCK_NONBLOCK;
 #endif
-  if(CURL_SOCKETPAIR(domain, type, protocol, socks))
+
+  if(CURL_SOCKETPAIR(AF_UNIX, type, 0, socks))
     return -1;
 #ifndef SOCK_NONBLOCK
   if(nonblocking) {
@@ -106,8 +115,8 @@ int Curl_socketpair(int domain, int type, int protocol,
 #endif
   return 0;
 }
-#endif /* USE_SOCKETPAIR */
-#else /* !HAVE_SOCKETPAIR */
+
+#else /* !USE_EVENTFD && !HAVE_PIPE && !HAVE_SOCKETPAIR */
 
 #ifdef HAVE_NETDB_H
 #include <netdb.h>
@@ -125,8 +134,7 @@ int Curl_socketpair(int domain, int type, int protocol,
 
 #include "select.h"   /* for Curl_poll */
 
-int Curl_socketpair(int domain, int type, int protocol,
-                    curl_socket_t socks[2], bool nonblocking)
+static int wakeup_inet(curl_socket_t socks[2], bool nonblocking)
 {
   union {
     struct sockaddr_in inaddr;
@@ -136,9 +144,6 @@ int Curl_socketpair(int domain, int type, int protocol,
   curl_socklen_t addrlen = sizeof(a.inaddr);
   int reuse = 1;
   struct pollfd pfd[1];
-  (void)domain;
-  (void)type;
-  (void)protocol;
 
   listener = CURL_SOCKET(AF_INET, SOCK_STREAM, IPPROTO_TCP);
   if(listener == CURL_SOCKET_BAD)
@@ -257,5 +262,99 @@ error:
   sclose(socks[1]);
   return -1;
 }
+
+#endif /* choose implementation */
+
+int Curl_wakeup_init(curl_socket_t socks[2], bool nonblocking)
+{
+#ifdef USE_EVENTFD
+  return wakeup_eventfd(socks, nonblocking);
+#elif defined(HAVE_PIPE)
+  return wakeup_pipe(socks, nonblocking);
+#elif defined(HAVE_SOCKETPAIR)
+  return wakeup_socketpair(socks, nonblocking);
+#else
+  return wakeup_inet(socks, nonblocking);
+#endif
+}
+
+#if defined(USE_EVENTFD) || defined(HAVE_PIPE)
+
+#define wakeup_write        write
+#define wakeup_read         read
+#define wakeup_close        close
+
+#else /* !USE_EVENTFD && !HAVE_PIPE */
+
+#define wakeup_write        swrite
+#define wakeup_read         sread
+#define wakeup_close        sclose
+
+#endif
+
+int Curl_wakeup_signal(curl_socket_t socks[2])
+{
+  int err = 0;
+#ifdef USE_EVENTFD
+  const uint64_t buf[1] = { 1 };
+#else
+  const char buf[1] = { 1 };
+#endif
+
+  while(1) {
+    if(wakeup_write(socks[1], buf, sizeof(buf)) < 0) {
+      err = SOCKERRNO;
+#ifdef USE_WINSOCK
+      if(err == SOCKEWOULDBLOCK)
+        err = 0; /* wakeup is already ongoing */
+#else
+      if(SOCKEINTR == err)
+        continue;
+      if((err == SOCKEWOULDBLOCK) || (err == EAGAIN))
+        err = 0; /* wakeup is already ongoing */
+#endif
+    }
+    break;
+  }
+  return err;
+}
+
+CURLcode Curl_wakeup_consume(curl_socket_t socks[2], bool all)
+{
+  char buf[64];
+  ssize_t rc;
+  CURLcode result = CURLE_OK;
+
+  do {
+    rc = wakeup_read(socks[0], buf, sizeof(buf));
+    if(!rc)
+      break;
+    else if(rc < 0) {
+#ifdef USE_WINSOCK
+      if(SOCKERRNO == SOCKEWOULDBLOCK)
+        break;
+#else
+      if(SOCKEINTR == SOCKERRNO)
+        continue;
+      if((SOCKERRNO == SOCKEWOULDBLOCK) || (SOCKERRNO == EAGAIN))
+        break;
+#endif
+      result = CURLE_READ_ERROR;
+      break;
+    }
+  } while(all);
+  return result;
+}
+
+void Curl_wakeup_destroy(curl_socket_t socks[2])
+{
+#ifndef USE_EVENTFD
+  if(socks[1] != CURL_SOCKET_BAD)
+    wakeup_close(socks[1]);
 #endif
+  if(socks[0] != CURL_SOCKET_BAD)
+    wakeup_close(socks[0]);
+  socks[0] = socks[1] = CURL_SOCKET_BAD;
+}
+
 #endif /* !CURL_DISABLE_SOCKETPAIR */
index 36685e71a5f49b8bca628fb8975ef2603d31c725..2aa173396bdaebab6231077c8d4717f2c80fcb11 100644 (file)
  ***************************************************************************/
 #include "curl_setup.h"
 
-#ifdef USE_EVENTFD
-
-#define wakeup_write         write
-#define wakeup_read          read
-#define wakeup_close         close
-#define wakeup_create(p, nb) Curl_eventfd(p, nb)
-
-int Curl_eventfd(curl_socket_t socks[2], bool nonblocking);
-
-#elif defined(HAVE_PIPE)
-
-#define wakeup_write         write
-#define wakeup_read          read
-#define wakeup_close         close
-#define wakeup_create(p, nb) Curl_pipe(p, nb)
+#ifndef CURL_DISABLE_SOCKETPAIR
 
-int Curl_pipe(curl_socket_t socks[2], bool nonblocking);
+/* return < 0 for failure to initialise */
+int Curl_wakeup_init(curl_socket_t socks[2], bool nonblocking);
+void Curl_wakeup_destroy(curl_socket_t socks[2]);
 
-#else /* !USE_EVENTFD && !HAVE_PIPE */
+/* return 0 on success or errno on failure */
+int Curl_wakeup_signal(curl_socket_t socks[2]);
 
-#define wakeup_write     swrite
-#define wakeup_read      sread
-#define wakeup_close     sclose
+CURLcode Curl_wakeup_consume(curl_socket_t socks[2], bool all);
 
-#if defined(USE_UNIX_SOCKETS) && defined(HAVE_SOCKETPAIR)
-#define SOCKETPAIR_FAMILY AF_UNIX
-#elif !defined(HAVE_SOCKETPAIR)
-#define SOCKETPAIR_FAMILY 0 /* not used */
 #else
-#error "unsupported Unix domain and socketpair build combo"
-#endif
-
-#ifdef SOCK_CLOEXEC
-#define SOCKETPAIR_TYPE (SOCK_STREAM | SOCK_CLOEXEC)
-#else
-#define SOCKETPAIR_TYPE SOCK_STREAM
-#endif
-
-#define USE_SOCKETPAIR
-#define wakeup_create(p, nb)                                    \
-  Curl_socketpair(SOCKETPAIR_FAMILY, SOCKETPAIR_TYPE, 0, p, nb)
-
-#endif /* USE_EVENTFD */
-
-#ifndef CURL_DISABLE_SOCKETPAIR
-int Curl_socketpair(int domain, int type, int protocol,
-                    curl_socket_t socks[2], bool nonblocking);
+#define Curl_wakeup_destroy(x)  Curl_nop_stmt
 #endif
 
 #endif /* HEADER_CURL_SOCKETPAIR_H */