From: Stefan Eissing Date: Fri, 16 Jan 2026 12:59:03 +0000 (+0100) Subject: socketpair: cleaner interface X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6c8956c1cbf5cffcd2fd4571cf277e2eec280578;p=thirdparty%2Fcurl.git socketpair: cleaner interface Declutter the ifdefs in socketpair.h. Introduce Curl_wakeup_*() function that encapsulate the details about how the socketpair is implemented. This moves the EVENTFD specials from the using code into socketpair implemenatation, avoiding duplications in three places. Closes #20340 --- diff --git a/lib/asyn-thrdd.c b/lib/asyn-thrdd.c index b29651040c..51bd098169 100644 --- a/lib/asyn-thrdd.c +++ b/lib/asyn-thrdd.c @@ -133,12 +133,7 @@ static void addr_ctx_unlink(struct async_thrdd_addr_ctx **paddr_ctx, curlx_free(addr_ctx->hostname); if(addr_ctx->res) Curl_freeaddrinfo(addr_ctx->res); -#ifndef CURL_DISABLE_SOCKETPAIR -#ifndef USE_EVENTFD - wakeup_close(addr_ctx->sock_pair[1]); -#endif - wakeup_close(addr_ctx->sock_pair[0]); -#endif + Curl_wakeup_destroy(addr_ctx->sock_pair); curlx_free(addr_ctx); } *paddr_ctx = NULL; @@ -169,7 +164,7 @@ addr_ctx_create(struct Curl_easy *data, #ifndef CURL_DISABLE_SOCKETPAIR /* create socket pair or pipe */ - if(wakeup_create(addr_ctx->sock_pair, FALSE) < 0) { + if(Curl_wakeup_init(addr_ctx->sock_pair, FALSE) < 0) { addr_ctx->sock_pair[0] = CURL_SOCKET_BAD; addr_ctx->sock_pair[1] = CURL_SOCKET_BAD; goto err_exit; @@ -231,15 +226,11 @@ static CURL_THREAD_RETURN_T CURL_STDCALL getaddrinfo_thread(void *arg) Curl_mutex_release(&addr_ctx->mutx); #ifndef CURL_DISABLE_SOCKETPAIR if(!do_abort) { -#ifdef USE_EVENTFD - const uint64_t buf[1] = { 1 }; -#else - const char buf[1] = { 1 }; -#endif /* Thread is done, notify transfer */ - if(wakeup_write(addr_ctx->sock_pair[1], buf, sizeof(buf)) < 0) { + int err = Curl_wakeup_signal(addr_ctx->sock_pair); + if(err) { /* update sock_error to errno */ - addr_ctx->sock_error = SOCKERRNO; + addr_ctx->sock_error = err; } } #endif @@ -276,15 +267,10 @@ static CURL_THREAD_RETURN_T CURL_STDCALL gethostbyname_thread(void *arg) Curl_mutex_release(&addr_ctx->mutx); #ifndef CURL_DISABLE_SOCKETPAIR if(!do_abort) { -#ifdef USE_EVENTFD - const uint64_t buf[1] = { 1 }; -#else - const char buf[1] = { 1 }; -#endif - /* Thread is done, notify transfer */ - if(wakeup_write(addr_ctx->sock_pair[1], buf, sizeof(buf)) < 0) { + int err = Curl_wakeup_signal(addr_ctx->sock_pair); + if(err) { /* update sock_error to errno */ - addr_ctx->sock_error = SOCKERRNO; + addr_ctx->sock_error = err; } } #endif diff --git a/lib/multi.c b/lib/multi.c index 0c99e1b653..e0e37f8edd 100644 --- a/lib/multi.c +++ b/lib/multi.c @@ -294,7 +294,7 @@ struct Curl_multi *Curl_multi_handle(uint32_t xfer_table_size, if(multi->wsa_event == WSA_INVALID_EVENT) goto error; #elif defined(ENABLE_WAKEUP) - if(wakeup_create(multi->wakeup_pair, TRUE) < 0) { + if(Curl_wakeup_init(multi->wakeup_pair, TRUE) < 0) { multi->wakeup_pair[0] = CURL_SOCKET_BAD; multi->wakeup_pair[1] = CURL_SOCKET_BAD; } @@ -1533,20 +1533,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi, #ifdef ENABLE_WAKEUP if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) { if(cpfds.pfds[curl_nfds + extra_nfds].revents & POLLIN) { - char buf[64]; - ssize_t nread; - while(1) { - /* the reading socket is non-blocking, try to read - data from it until it receives an error (except EINTR). - In normal cases it will get EAGAIN or EWOULDBLOCK - when there is no more data, breaking the loop. */ - nread = wakeup_read(multi->wakeup_pair[0], buf, sizeof(buf)); - if(nread <= 0) { - if(nread < 0 && SOCKEINTR == SOCKERRNO) - continue; - break; - } - } + (void)Curl_wakeup_consume(multi->wakeup_pair, TRUE); /* do not count the wakeup socket into the returned value */ retcode--; } @@ -1623,38 +1610,9 @@ CURLMcode curl_multi_wakeup(CURLM *m) making it safe to access from another thread after the init part and before cleanup */ if(multi->wakeup_pair[1] != CURL_SOCKET_BAD) { - while(1) { -#ifdef USE_EVENTFD - /* eventfd has a stringent rule of requiring the 8-byte buffer when - calling write(2) on it */ - const uint64_t buf[1] = { 1 }; -#else - const char buf[1] = { 1 }; -#endif - /* swrite() is not thread-safe in general, because concurrent calls - can have their messages interleaved, but in this case the content - of the messages does not matter, which makes it ok to call. - - The write socket is set to non-blocking, this way this function - cannot block, making it safe to call even from the same thread - that will call curl_multi_wait(). If swrite() returns that it - would block, it is considered successful because it means that - previous calls to this function will wake up the poll(). */ - if(wakeup_write(multi->wakeup_pair[1], buf, sizeof(buf)) < 0) { - int err = SOCKERRNO; - int return_success; -#ifdef USE_WINSOCK - return_success = SOCKEWOULDBLOCK == err; -#else - if(SOCKEINTR == err) - continue; - return_success = SOCKEWOULDBLOCK == err || EAGAIN == err; -#endif - if(!return_success) - return CURLM_WAKEUP_FAILURE; - } - return CURLM_OK; - } + if(Curl_wakeup_signal(multi->wakeup_pair)) + return CURLM_WAKEUP_FAILURE; + return CURLM_OK; } #endif #endif @@ -2929,10 +2887,7 @@ CURLMcode curl_multi_cleanup(CURLM *m) WSACloseEvent(multi->wsa_event); #else #ifdef ENABLE_WAKEUP - wakeup_close(multi->wakeup_pair[0]); -#ifndef USE_EVENTFD - wakeup_close(multi->wakeup_pair[1]); -#endif + Curl_wakeup_destroy(multi->wakeup_pair); #endif #endif diff --git a/lib/socketpair.c b/lib/socketpair.c index bdbe956233..ec45926237 100644 --- a/lib/socketpair.c +++ b/lib/socketpair.c @@ -28,11 +28,14 @@ #include "rand.h" #include "curlx/nonblock.h" +#ifndef CURL_DISABLE_SOCKETPAIR + +/* choose implementation */ #ifdef USE_EVENTFD #include -int Curl_eventfd(curl_socket_t socks[2], bool nonblocking) +static int wakeup_eventfd(curl_socket_t socks[2], bool nonblocking) { int efd = eventfd(0, nonblocking ? EFD_CLOEXEC | EFD_NONBLOCK : EFD_CLOEXEC); if(efd == -1) { @@ -49,7 +52,7 @@ int Curl_eventfd(curl_socket_t socks[2], bool nonblocking) #include #endif -int Curl_pipe(curl_socket_t socks[2], bool nonblocking) +static int wakeup_pipe(curl_socket_t socks[2], bool nonblocking) { #ifdef HAVE_PIPE2 int flags = nonblocking ? O_NONBLOCK | O_CLOEXEC : O_CLOEXEC; @@ -81,18 +84,24 @@ int Curl_pipe(curl_socket_t socks[2], bool nonblocking) return 0; } -#endif /* USE_EVENTFD */ +#elif defined(HAVE_SOCKETPAIR) /* !USE_EVENTFD && !HAVE_PIPE */ -#ifndef CURL_DISABLE_SOCKETPAIR -#ifdef HAVE_SOCKETPAIR -#ifdef USE_SOCKETPAIR -int Curl_socketpair(int domain, int type, int protocol, - curl_socket_t socks[2], bool nonblocking) +#ifndef USE_UNIX_SOCKETS +#error "unsupported Unix domain and socketpair build combo" +#endif + +static int wakeup_socketpair(curl_socket_t socks[2], bool nonblocking) { + int type = SOCK_STREAM; +#ifdef SOCK_CLOEXEC + type |= SOCK_CLOEXEC; +#endif #ifdef SOCK_NONBLOCK - type = nonblocking ? type | SOCK_NONBLOCK : type; + if(nonblocking) + type |= SOCK_NONBLOCK; #endif - if(CURL_SOCKETPAIR(domain, type, protocol, socks)) + + if(CURL_SOCKETPAIR(AF_UNIX, type, 0, socks)) return -1; #ifndef SOCK_NONBLOCK if(nonblocking) { @@ -106,8 +115,8 @@ int Curl_socketpair(int domain, int type, int protocol, #endif return 0; } -#endif /* USE_SOCKETPAIR */ -#else /* !HAVE_SOCKETPAIR */ + +#else /* !USE_EVENTFD && !HAVE_PIPE && !HAVE_SOCKETPAIR */ #ifdef HAVE_NETDB_H #include @@ -125,8 +134,7 @@ int Curl_socketpair(int domain, int type, int protocol, #include "select.h" /* for Curl_poll */ -int Curl_socketpair(int domain, int type, int protocol, - curl_socket_t socks[2], bool nonblocking) +static int wakeup_inet(curl_socket_t socks[2], bool nonblocking) { union { struct sockaddr_in inaddr; @@ -136,9 +144,6 @@ int Curl_socketpair(int domain, int type, int protocol, curl_socklen_t addrlen = sizeof(a.inaddr); int reuse = 1; struct pollfd pfd[1]; - (void)domain; - (void)type; - (void)protocol; listener = CURL_SOCKET(AF_INET, SOCK_STREAM, IPPROTO_TCP); if(listener == CURL_SOCKET_BAD) @@ -257,5 +262,99 @@ error: sclose(socks[1]); return -1; } + +#endif /* choose implementation */ + +int Curl_wakeup_init(curl_socket_t socks[2], bool nonblocking) +{ +#ifdef USE_EVENTFD + return wakeup_eventfd(socks, nonblocking); +#elif defined(HAVE_PIPE) + return wakeup_pipe(socks, nonblocking); +#elif defined(HAVE_SOCKETPAIR) + return wakeup_socketpair(socks, nonblocking); +#else + return wakeup_inet(socks, nonblocking); +#endif +} + +#if defined(USE_EVENTFD) || defined(HAVE_PIPE) + +#define wakeup_write write +#define wakeup_read read +#define wakeup_close close + +#else /* !USE_EVENTFD && !HAVE_PIPE */ + +#define wakeup_write swrite +#define wakeup_read sread +#define wakeup_close sclose + +#endif + +int Curl_wakeup_signal(curl_socket_t socks[2]) +{ + int err = 0; +#ifdef USE_EVENTFD + const uint64_t buf[1] = { 1 }; +#else + const char buf[1] = { 1 }; +#endif + + while(1) { + if(wakeup_write(socks[1], buf, sizeof(buf)) < 0) { + err = SOCKERRNO; +#ifdef USE_WINSOCK + if(err == SOCKEWOULDBLOCK) + err = 0; /* wakeup is already ongoing */ +#else + if(SOCKEINTR == err) + continue; + if((err == SOCKEWOULDBLOCK) || (err == EAGAIN)) + err = 0; /* wakeup is already ongoing */ +#endif + } + break; + } + return err; +} + +CURLcode Curl_wakeup_consume(curl_socket_t socks[2], bool all) +{ + char buf[64]; + ssize_t rc; + CURLcode result = CURLE_OK; + + do { + rc = wakeup_read(socks[0], buf, sizeof(buf)); + if(!rc) + break; + else if(rc < 0) { +#ifdef USE_WINSOCK + if(SOCKERRNO == SOCKEWOULDBLOCK) + break; +#else + if(SOCKEINTR == SOCKERRNO) + continue; + if((SOCKERRNO == SOCKEWOULDBLOCK) || (SOCKERRNO == EAGAIN)) + break; +#endif + result = CURLE_READ_ERROR; + break; + } + } while(all); + return result; +} + +void Curl_wakeup_destroy(curl_socket_t socks[2]) +{ +#ifndef USE_EVENTFD + if(socks[1] != CURL_SOCKET_BAD) + wakeup_close(socks[1]); #endif + if(socks[0] != CURL_SOCKET_BAD) + wakeup_close(socks[0]); + socks[0] = socks[1] = CURL_SOCKET_BAD; +} + #endif /* !CURL_DISABLE_SOCKETPAIR */ diff --git a/lib/socketpair.h b/lib/socketpair.h index 36685e71a5..2aa173396b 100644 --- a/lib/socketpair.h +++ b/lib/socketpair.h @@ -25,53 +25,19 @@ ***************************************************************************/ #include "curl_setup.h" -#ifdef USE_EVENTFD - -#define wakeup_write write -#define wakeup_read read -#define wakeup_close close -#define wakeup_create(p, nb) Curl_eventfd(p, nb) - -int Curl_eventfd(curl_socket_t socks[2], bool nonblocking); - -#elif defined(HAVE_PIPE) - -#define wakeup_write write -#define wakeup_read read -#define wakeup_close close -#define wakeup_create(p, nb) Curl_pipe(p, nb) +#ifndef CURL_DISABLE_SOCKETPAIR -int Curl_pipe(curl_socket_t socks[2], bool nonblocking); +/* return < 0 for failure to initialise */ +int Curl_wakeup_init(curl_socket_t socks[2], bool nonblocking); +void Curl_wakeup_destroy(curl_socket_t socks[2]); -#else /* !USE_EVENTFD && !HAVE_PIPE */ +/* return 0 on success or errno on failure */ +int Curl_wakeup_signal(curl_socket_t socks[2]); -#define wakeup_write swrite -#define wakeup_read sread -#define wakeup_close sclose +CURLcode Curl_wakeup_consume(curl_socket_t socks[2], bool all); -#if defined(USE_UNIX_SOCKETS) && defined(HAVE_SOCKETPAIR) -#define SOCKETPAIR_FAMILY AF_UNIX -#elif !defined(HAVE_SOCKETPAIR) -#define SOCKETPAIR_FAMILY 0 /* not used */ #else -#error "unsupported Unix domain and socketpair build combo" -#endif - -#ifdef SOCK_CLOEXEC -#define SOCKETPAIR_TYPE (SOCK_STREAM | SOCK_CLOEXEC) -#else -#define SOCKETPAIR_TYPE SOCK_STREAM -#endif - -#define USE_SOCKETPAIR -#define wakeup_create(p, nb) \ - Curl_socketpair(SOCKETPAIR_FAMILY, SOCKETPAIR_TYPE, 0, p, nb) - -#endif /* USE_EVENTFD */ - -#ifndef CURL_DISABLE_SOCKETPAIR -int Curl_socketpair(int domain, int type, int protocol, - curl_socket_t socks[2], bool nonblocking); +#define Curl_wakeup_destroy(x) Curl_nop_stmt #endif #endif /* HEADER_CURL_SOCKETPAIR_H */