curlx_free(addr_ctx->hostname);
if(addr_ctx->res)
Curl_freeaddrinfo(addr_ctx->res);
-#ifndef CURL_DISABLE_SOCKETPAIR
-#ifndef USE_EVENTFD
- wakeup_close(addr_ctx->sock_pair[1]);
-#endif
- wakeup_close(addr_ctx->sock_pair[0]);
-#endif
+ Curl_wakeup_destroy(addr_ctx->sock_pair);
curlx_free(addr_ctx);
}
*paddr_ctx = NULL;
#ifndef CURL_DISABLE_SOCKETPAIR
/* create socket pair or pipe */
- if(wakeup_create(addr_ctx->sock_pair, FALSE) < 0) {
+ if(Curl_wakeup_init(addr_ctx->sock_pair, FALSE) < 0) {
addr_ctx->sock_pair[0] = CURL_SOCKET_BAD;
addr_ctx->sock_pair[1] = CURL_SOCKET_BAD;
goto err_exit;
Curl_mutex_release(&addr_ctx->mutx);
#ifndef CURL_DISABLE_SOCKETPAIR
if(!do_abort) {
-#ifdef USE_EVENTFD
- const uint64_t buf[1] = { 1 };
-#else
- const char buf[1] = { 1 };
-#endif
/* Thread is done, notify transfer */
- if(wakeup_write(addr_ctx->sock_pair[1], buf, sizeof(buf)) < 0) {
+ int err = Curl_wakeup_signal(addr_ctx->sock_pair);
+ if(err) {
/* update sock_error to errno */
- addr_ctx->sock_error = SOCKERRNO;
+ addr_ctx->sock_error = err;
}
}
#endif
Curl_mutex_release(&addr_ctx->mutx);
#ifndef CURL_DISABLE_SOCKETPAIR
if(!do_abort) {
-#ifdef USE_EVENTFD
- const uint64_t buf[1] = { 1 };
-#else
- const char buf[1] = { 1 };
-#endif
- /* Thread is done, notify transfer */
- if(wakeup_write(addr_ctx->sock_pair[1], buf, sizeof(buf)) < 0) {
+ int err = Curl_wakeup_signal(addr_ctx->sock_pair);
+ if(err) {
/* update sock_error to errno */
- addr_ctx->sock_error = SOCKERRNO;
+ addr_ctx->sock_error = err;
}
}
#endif
if(multi->wsa_event == WSA_INVALID_EVENT)
goto error;
#elif defined(ENABLE_WAKEUP)
- if(wakeup_create(multi->wakeup_pair, TRUE) < 0) {
+ if(Curl_wakeup_init(multi->wakeup_pair, TRUE) < 0) {
multi->wakeup_pair[0] = CURL_SOCKET_BAD;
multi->wakeup_pair[1] = CURL_SOCKET_BAD;
}
#ifdef ENABLE_WAKEUP
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
if(cpfds.pfds[curl_nfds + extra_nfds].revents & POLLIN) {
- char buf[64];
- ssize_t nread;
- while(1) {
- /* the reading socket is non-blocking, try to read
- data from it until it receives an error (except EINTR).
- In normal cases it will get EAGAIN or EWOULDBLOCK
- when there is no more data, breaking the loop. */
- nread = wakeup_read(multi->wakeup_pair[0], buf, sizeof(buf));
- if(nread <= 0) {
- if(nread < 0 && SOCKEINTR == SOCKERRNO)
- continue;
- break;
- }
- }
+ (void)Curl_wakeup_consume(multi->wakeup_pair, TRUE);
/* do not count the wakeup socket into the returned value */
retcode--;
}
making it safe to access from another thread after the init part
and before cleanup */
if(multi->wakeup_pair[1] != CURL_SOCKET_BAD) {
- while(1) {
-#ifdef USE_EVENTFD
- /* eventfd has a stringent rule of requiring the 8-byte buffer when
- calling write(2) on it */
- const uint64_t buf[1] = { 1 };
-#else
- const char buf[1] = { 1 };
-#endif
- /* swrite() is not thread-safe in general, because concurrent calls
- can have their messages interleaved, but in this case the content
- of the messages does not matter, which makes it ok to call.
-
- The write socket is set to non-blocking, this way this function
- cannot block, making it safe to call even from the same thread
- that will call curl_multi_wait(). If swrite() returns that it
- would block, it is considered successful because it means that
- previous calls to this function will wake up the poll(). */
- if(wakeup_write(multi->wakeup_pair[1], buf, sizeof(buf)) < 0) {
- int err = SOCKERRNO;
- int return_success;
-#ifdef USE_WINSOCK
- return_success = SOCKEWOULDBLOCK == err;
-#else
- if(SOCKEINTR == err)
- continue;
- return_success = SOCKEWOULDBLOCK == err || EAGAIN == err;
-#endif
- if(!return_success)
- return CURLM_WAKEUP_FAILURE;
- }
- return CURLM_OK;
- }
+ if(Curl_wakeup_signal(multi->wakeup_pair))
+ return CURLM_WAKEUP_FAILURE;
+ return CURLM_OK;
}
#endif
#endif
WSACloseEvent(multi->wsa_event);
#else
#ifdef ENABLE_WAKEUP
- wakeup_close(multi->wakeup_pair[0]);
-#ifndef USE_EVENTFD
- wakeup_close(multi->wakeup_pair[1]);
-#endif
+ Curl_wakeup_destroy(multi->wakeup_pair);
#endif
#endif
#include "rand.h"
#include "curlx/nonblock.h"
+#ifndef CURL_DISABLE_SOCKETPAIR
+
+/* choose implementation */
#ifdef USE_EVENTFD
#include <sys/eventfd.h>
-int Curl_eventfd(curl_socket_t socks[2], bool nonblocking)
+static int wakeup_eventfd(curl_socket_t socks[2], bool nonblocking)
{
int efd = eventfd(0, nonblocking ? EFD_CLOEXEC | EFD_NONBLOCK : EFD_CLOEXEC);
if(efd == -1) {
#include <fcntl.h>
#endif
-int Curl_pipe(curl_socket_t socks[2], bool nonblocking)
+static int wakeup_pipe(curl_socket_t socks[2], bool nonblocking)
{
#ifdef HAVE_PIPE2
int flags = nonblocking ? O_NONBLOCK | O_CLOEXEC : O_CLOEXEC;
return 0;
}
-#endif /* USE_EVENTFD */
+#elif defined(HAVE_SOCKETPAIR) /* !USE_EVENTFD && !HAVE_PIPE */
-#ifndef CURL_DISABLE_SOCKETPAIR
-#ifdef HAVE_SOCKETPAIR
-#ifdef USE_SOCKETPAIR
-int Curl_socketpair(int domain, int type, int protocol,
- curl_socket_t socks[2], bool nonblocking)
+#ifndef USE_UNIX_SOCKETS
+#error "unsupported Unix domain and socketpair build combo"
+#endif
+
+static int wakeup_socketpair(curl_socket_t socks[2], bool nonblocking)
{
+ int type = SOCK_STREAM;
+#ifdef SOCK_CLOEXEC
+ type |= SOCK_CLOEXEC;
+#endif
#ifdef SOCK_NONBLOCK
- type = nonblocking ? type | SOCK_NONBLOCK : type;
+ if(nonblocking)
+ type |= SOCK_NONBLOCK;
#endif
- if(CURL_SOCKETPAIR(domain, type, protocol, socks))
+
+ if(CURL_SOCKETPAIR(AF_UNIX, type, 0, socks))
return -1;
#ifndef SOCK_NONBLOCK
if(nonblocking) {
#endif
return 0;
}
-#endif /* USE_SOCKETPAIR */
-#else /* !HAVE_SOCKETPAIR */
+
+#else /* !USE_EVENTFD && !HAVE_PIPE && !HAVE_SOCKETPAIR */
#ifdef HAVE_NETDB_H
#include <netdb.h>
#include "select.h" /* for Curl_poll */
-int Curl_socketpair(int domain, int type, int protocol,
- curl_socket_t socks[2], bool nonblocking)
+static int wakeup_inet(curl_socket_t socks[2], bool nonblocking)
{
union {
struct sockaddr_in inaddr;
curl_socklen_t addrlen = sizeof(a.inaddr);
int reuse = 1;
struct pollfd pfd[1];
- (void)domain;
- (void)type;
- (void)protocol;
listener = CURL_SOCKET(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if(listener == CURL_SOCKET_BAD)
sclose(socks[1]);
return -1;
}
+
+#endif /* choose implementation */
+
+int Curl_wakeup_init(curl_socket_t socks[2], bool nonblocking)
+{
+#ifdef USE_EVENTFD
+ return wakeup_eventfd(socks, nonblocking);
+#elif defined(HAVE_PIPE)
+ return wakeup_pipe(socks, nonblocking);
+#elif defined(HAVE_SOCKETPAIR)
+ return wakeup_socketpair(socks, nonblocking);
+#else
+ return wakeup_inet(socks, nonblocking);
+#endif
+}
+
+#if defined(USE_EVENTFD) || defined(HAVE_PIPE)
+
+#define wakeup_write write
+#define wakeup_read read
+#define wakeup_close close
+
+#else /* !USE_EVENTFD && !HAVE_PIPE */
+
+#define wakeup_write swrite
+#define wakeup_read sread
+#define wakeup_close sclose
+
+#endif
+
+int Curl_wakeup_signal(curl_socket_t socks[2])
+{
+ int err = 0;
+#ifdef USE_EVENTFD
+ const uint64_t buf[1] = { 1 };
+#else
+ const char buf[1] = { 1 };
+#endif
+
+ while(1) {
+ if(wakeup_write(socks[1], buf, sizeof(buf)) < 0) {
+ err = SOCKERRNO;
+#ifdef USE_WINSOCK
+ if(err == SOCKEWOULDBLOCK)
+ err = 0; /* wakeup is already ongoing */
+#else
+ if(SOCKEINTR == err)
+ continue;
+ if((err == SOCKEWOULDBLOCK) || (err == EAGAIN))
+ err = 0; /* wakeup is already ongoing */
+#endif
+ }
+ break;
+ }
+ return err;
+}
+
+CURLcode Curl_wakeup_consume(curl_socket_t socks[2], bool all)
+{
+ char buf[64];
+ ssize_t rc;
+ CURLcode result = CURLE_OK;
+
+ do {
+ rc = wakeup_read(socks[0], buf, sizeof(buf));
+ if(!rc)
+ break;
+ else if(rc < 0) {
+#ifdef USE_WINSOCK
+ if(SOCKERRNO == SOCKEWOULDBLOCK)
+ break;
+#else
+ if(SOCKEINTR == SOCKERRNO)
+ continue;
+ if((SOCKERRNO == SOCKEWOULDBLOCK) || (SOCKERRNO == EAGAIN))
+ break;
+#endif
+ result = CURLE_READ_ERROR;
+ break;
+ }
+ } while(all);
+ return result;
+}
+
+void Curl_wakeup_destroy(curl_socket_t socks[2])
+{
+#ifndef USE_EVENTFD
+ if(socks[1] != CURL_SOCKET_BAD)
+ wakeup_close(socks[1]);
#endif
+ if(socks[0] != CURL_SOCKET_BAD)
+ wakeup_close(socks[0]);
+ socks[0] = socks[1] = CURL_SOCKET_BAD;
+}
+
#endif /* !CURL_DISABLE_SOCKETPAIR */
***************************************************************************/
#include "curl_setup.h"
-#ifdef USE_EVENTFD
-
-#define wakeup_write write
-#define wakeup_read read
-#define wakeup_close close
-#define wakeup_create(p, nb) Curl_eventfd(p, nb)
-
-int Curl_eventfd(curl_socket_t socks[2], bool nonblocking);
-
-#elif defined(HAVE_PIPE)
-
-#define wakeup_write write
-#define wakeup_read read
-#define wakeup_close close
-#define wakeup_create(p, nb) Curl_pipe(p, nb)
+#ifndef CURL_DISABLE_SOCKETPAIR
-int Curl_pipe(curl_socket_t socks[2], bool nonblocking);
+/* return < 0 for failure to initialise */
+int Curl_wakeup_init(curl_socket_t socks[2], bool nonblocking);
+void Curl_wakeup_destroy(curl_socket_t socks[2]);
-#else /* !USE_EVENTFD && !HAVE_PIPE */
+/* return 0 on success or errno on failure */
+int Curl_wakeup_signal(curl_socket_t socks[2]);
-#define wakeup_write swrite
-#define wakeup_read sread
-#define wakeup_close sclose
+CURLcode Curl_wakeup_consume(curl_socket_t socks[2], bool all);
-#if defined(USE_UNIX_SOCKETS) && defined(HAVE_SOCKETPAIR)
-#define SOCKETPAIR_FAMILY AF_UNIX
-#elif !defined(HAVE_SOCKETPAIR)
-#define SOCKETPAIR_FAMILY 0 /* not used */
#else
-#error "unsupported Unix domain and socketpair build combo"
-#endif
-
-#ifdef SOCK_CLOEXEC
-#define SOCKETPAIR_TYPE (SOCK_STREAM | SOCK_CLOEXEC)
-#else
-#define SOCKETPAIR_TYPE SOCK_STREAM
-#endif
-
-#define USE_SOCKETPAIR
-#define wakeup_create(p, nb) \
- Curl_socketpair(SOCKETPAIR_FAMILY, SOCKETPAIR_TYPE, 0, p, nb)
-
-#endif /* USE_EVENTFD */
-
-#ifndef CURL_DISABLE_SOCKETPAIR
-int Curl_socketpair(int domain, int type, int protocol,
- curl_socket_t socks[2], bool nonblocking);
+#define Curl_wakeup_destroy(x) Curl_nop_stmt
#endif
#endif /* HEADER_CURL_SOCKETPAIR_H */