From: Oliver Kurth Date: Fri, 15 Sep 2017 18:23:37 +0000 (-0700) Subject: Add new AsyncSocket API to wait on multiple sockets at once X-Git-Tag: stable-10.2.0~231 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e5f98a856b27e39c06b9d9244adf9e4f195f28b8;p=thirdparty%2Fopen-vm-tools.git Add new AsyncSocket API to wait on multiple sockets at once Currently, there is no way to poll/select on multiple sockets through the asyncsocket library. VRDMA is encountering a use case for such functionality, so add such an API to asyncsocket. It is only defined for AsyncTCPSocket, and is mostly just a wrapper around AsyncTCPSocketPoll. --- diff --git a/open-vm-tools/lib/asyncsocket/asyncSocketInterface.c b/open-vm-tools/lib/asyncsocket/asyncSocketInterface.c index e4cd077fd..e0e1becc5 100644 --- a/open-vm-tools/lib/asyncsocket/asyncSocketInterface.c +++ b/open-vm-tools/lib/asyncsocket/asyncSocketInterface.c @@ -1,5 +1,5 @@ /********************************************************* - * Copyright (C) 2016 VMware, Inc. All rights reserved. + * Copyright (C) 2016-2017 VMware, Inc. All rights reserved. * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published @@ -1459,3 +1459,54 @@ AsyncSocket_WaitForConnection(AsyncSocket *asock, // IN } return ret; } + + +/* + *---------------------------------------------------------------------------- + * + * AsyncSocket_WaitForReadMultiple -- + * + * Waits on a list of sockets, returning when a socket becomes + * available for read, or when the allowed time elapses. + * + * Note, if this function is called by two threads with overlapping + * sets of sockets, a deadlock can occur. The caller should guard + * against such scenarios from happening, or making sure that there + * is a consistent ordering to the lists of sockets. + * + * The caller must also make sure synchronous and asynchronous + * operations do not mix, as this function does not hold locks + * for the entirety of the call. + * + * Results: + * ASOCKERR_SUCCESS if one of the sockets is ready for read, + * ASOCKERR_GENERIC on failures, and ASOCKERR_TIMEOUT if nothing + * happened in the allotted time. + * + * Side effects: + * None. + *---------------------------------------------------------------------------- + */ + +int +AsyncSocket_WaitForReadMultiple(AsyncSocket **asock, // IN + int numSock, // IN + int timeoutMS, // IN + int *outIdx) // OUT +{ + int i; + int ret; + if (numSock > 0 && VALID(asock[0], waitForReadMultiple)) { + for (i = 0; i < numSock; i++) { + AsyncSocketLock(asock[i]); + } + ret = VT(asock[0])->waitForReadMultiple(asock, numSock, + timeoutMS, outIdx); + for (i = numSock - 1; i >= 0; i--) { + AsyncSocketUnlock(asock[i]); + } + } else { + ret = ASOCKERR_INVAL; + } + return ret; +} diff --git a/open-vm-tools/lib/asyncsocket/asyncSocketVTable.h b/open-vm-tools/lib/asyncsocket/asyncSocketVTable.h index b33c6dae3..c1216e039 100644 --- a/open-vm-tools/lib/asyncsocket/asyncSocketVTable.h +++ b/open-vm-tools/lib/asyncsocket/asyncSocketVTable.h @@ -1,5 +1,5 @@ /********************************************************* - * Copyright (C) 2011,2014-2016 VMware, Inc. All rights reserved. + * Copyright (C) 2011,2014-2017 VMware, Inc. All rights reserved. * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published @@ -95,7 +95,8 @@ typedef struct AsyncSocketVTable { char *(*getWebSocketCookie)(AsyncSocket *asock); uint16 (*getWebSocketCloseStatus)(AsyncSocket *asock); const char *(*getWebSocketProtocol)(AsyncSocket *asock); - int (*setWebSocketCookie)(AsyncSocket *asock, void *clientData, const char *path, const char *sessionId); + int (*setWebSocketCookie)(AsyncSocket *asock, void *clientData, + const char *path, const char *sessionId); int (*recvBlocking)(AsyncSocket *s, void *buf, int len, int *received, int timeoutMS); int (*recvPartialBlocking)(AsyncSocket *s, void *buf, int len, @@ -103,7 +104,9 @@ typedef struct AsyncSocketVTable { int (*sendBlocking)(AsyncSocket *s, void *buf, int len, int *sent, int timeoutMS); int (*doOneMsg)(AsyncSocket *s, Bool read, int timeoutMS); - int (*waitForConnection)(AsyncSocket *s, int timeoutMS); // IN: + int (*waitForConnection)(AsyncSocket *s, int timeoutMS); + int (*waitForReadMultiple)(AsyncSocket **asock, int numSock, int timeoutMS, + int *outIdx); /* diff --git a/open-vm-tools/lib/asyncsocket/asyncsocket.c b/open-vm-tools/lib/asyncsocket/asyncsocket.c index 7a2a28db6..2dfbbc448 100644 --- a/open-vm-tools/lib/asyncsocket/asyncsocket.c +++ b/open-vm-tools/lib/asyncsocket/asyncsocket.c @@ -1,5 +1,5 @@ /********************************************************* - * Copyright (C) 2003-2016 VMware, Inc. All rights reserved. + * Copyright (C) 2003-2017 VMware, Inc. All rights reserved. * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published @@ -357,6 +357,8 @@ static int AsyncTCPSocketRecvPartialBlocking(AsyncSocket *s, void *buf, int len, static int AsyncTCPSocketSendBlocking(AsyncSocket *s, void *buf, int len, int *sent, int timeoutMS); static int AsyncTCPSocketDoOneMsg(AsyncSocket *s, Bool read, int timeoutMS); +static int AsyncTCPSocketWaitForReadMultiple(AsyncSocket **asock, int numSock, + int timeoutMS, int *outIdx); static const AsyncSocketVTable asyncTCPSocketVTable = { @@ -398,6 +400,7 @@ static const AsyncSocketVTable asyncTCPSocketVTable = { AsyncTCPSocketSendBlocking, AsyncTCPSocketDoOneMsg, AsyncTCPSocketWaitForConnection, + AsyncTCPSocketWaitForReadMultiple, AsyncTCPSocketDestroy }; @@ -2567,15 +2570,13 @@ AsyncTCPSocketRecvPassedFd(AsyncSocket *base, // IN/OUT: socket /* *---------------------------------------------------------------------------- * - * AsyncTCPSocketPoll -- + * AsyncTCPSocketPollWork -- * - * Blocks on the specified socket until there's data pending or a + * Blocks on the specified sockets until there's data pending or a * timeout occurs. * - * If the specified socket is a dual stack listener, we will poll on all - * listening sockets and will return when one is ready with data for a - * connection. If both socket families happen to race with connect data, - * we will favor IPv6 for the return. + * If the asyncsocket is a dual stack listener, parentSock will not be + * NULL, and the asock array will contain the IPv4 and v6 sockets. * * Results: * ASOCKERR_SUCCESS if it worked, ASOCKERR_GENERIC on system call @@ -2588,46 +2589,32 @@ AsyncTCPSocketRecvPassedFd(AsyncSocket *base, // IN/OUT: socket */ static int -AsyncTCPSocketPoll(AsyncTCPSocket *s, // IN: - Bool read, // IN: - int timeoutMS, // IN: - AsyncTCPSocket **outAsock) // OUT: +AsyncTCPSocketPollWork(AsyncTCPSocket **asock, // IN: + int numSock, // IN: + void *p, // IN: + Bool read, // IN: + int timeoutMS, // IN: + AsyncTCPSocket *parentSock, // IN: + AsyncTCPSocket **outAsock) // OUT: { + AsyncTCPSocket *warnSock = parentSock ? parentSock : asock[0]; #ifndef _WIN32 - struct pollfd p[2]; - int retval; + struct pollfd *pfd = (struct pollfd *)p; #else /* * We use select() to do this on Windows, since there ain't no poll(). * Fortunately, select() doesn't have the 1024 fd value limit. */ - int retval; struct timeval tv; struct fd_set rwfds; struct fd_set exceptfds; #endif - AsyncTCPSocket *asock[2]; - int numSock = 0; int i; + int retval; - ASSERT(*outAsock == NULL); - - if (read && s->fd == -1) { - if (!s->listenAsock4 && !s->listenAsock6) { - TCPSOCKLG0(s, ("%s: Failed to find listener socket.\n", __FUNCTION__)); - return ASOCKERR_GENERIC; - } - - if (s->listenAsock6 && s->listenAsock6->fd != -1) { - asock[numSock++] = s->listenAsock6; - } - if (s->listenAsock4 && s->listenAsock4->fd != -1) { - asock[numSock++] = s->listenAsock4; - } - } else { - asock[numSock++] = s; - } + ASSERT(outAsock != NULL && *outAsock == NULL && asock != NULL && + numSock > 0); for (i = 0; i < numSock; i++) { if (read && SSL_Pending(asock[i]->sslSock)) { @@ -2639,13 +2626,23 @@ AsyncTCPSocketPoll(AsyncTCPSocket *s, // IN: while (1) { #ifndef _WIN32 for (i = 0; i < numSock; i++) { - p[i].fd = asock[i]->fd; - p[i].events = read ? POLLIN : POLLOUT; + pfd[i].fd = asock[i]->fd; + pfd[i].events = read ? POLLIN : POLLOUT; } - AsyncTCPSocketUnlock(s); - retval = poll(p, numSock, timeoutMS); - AsyncTCPSocketLock(s); + if (parentSock != NULL) { + AsyncTCPSocketUnlock(parentSock); + retval = poll(pfd, numSock, timeoutMS); + AsyncTCPSocketLock(parentSock); + } else { + for (i = numSock - 1; i >= 0; i--) { + AsyncTCPSocketUnlock(asock[i]); + } + retval = poll(pfd, numSock, timeoutMS); + for (i = 0; i < numSock; i++) { + AsyncTCPSocketLock(asock[i]); + } + } #else tv.tv_sec = timeoutMS / 1000; tv.tv_usec = (timeoutMS % 1000) * 1000; @@ -2658,20 +2655,64 @@ AsyncTCPSocketPoll(AsyncTCPSocket *s, // IN: FD_SET(asock[i]->fd, &exceptfds); } - AsyncTCPSocketUnlock(s); - retval = select(1, read ? &rwfds : NULL, read ? NULL : &rwfds, - &exceptfds, timeoutMS >= 0 ? &tv : NULL); - AsyncTCPSocketLock(s); + if (parentSock != NULL) { + AsyncTCPSocketLock(parentSock); + retval = select(1, read ? &rwfds : NULL, read ? NULL : &rwfds, + &exceptfds, timeoutMS >= 0 ? &tv : NULL); + AsyncTCPSocketLock(parentSock); + } else { + for (i = numSock - 1; i >= 0; i--) { + AsyncTCPSocketUnlock(asock[i]); + } + retval = select(1, read ? &rwfds : NULL, read ? NULL : &rwfds, + &exceptfds, timeoutMS >= 0 ? &tv : NULL); + for (i = 0; i < numSock; i++) { + AsyncTCPSocketLock(asock[i]); + } + } #endif switch (retval) { - case 1: - case 2: { + case 0: + /* + * No sockets were ready within the specified time. + */ + TCPSOCKLG0(warnSock, ("%s: Timeout waiting for a ready socket.\n", + __FUNCTION__)); + return ASOCKERR_TIMEOUT; + + case -1: { + int sysErr = ASOCK_LASTERROR(); + + if (sysErr == EINTR) { + /* + * We were somehow interrupted by signal. Let's loop and retry. + * XXX: update the timeout by the amount we had previously waited. + */ + + TCPSOCKLG0(warnSock, ("%s: Socket interrupted by a signal.\n", + __FUNCTION__)); + continue; + } + + if (parentSock != NULL) { + parentSock->genericErrno = sysErr; + } else { + for (i = 0; i < numSock; i++) { + asock[i]->genericErrno = sysErr; + } + } + + TCPSOCKLG0(warnSock, ("%s: Failed with error %d: %s\n", __FUNCTION__, + sysErr, Err_Errno2String(sysErr))); + return ASOCKERR_GENERIC; + } + default: { Bool failed = FALSE; #ifndef _WIN32 for (i = 0; i < numSock; i++) { - if (p[i].revents & (POLLERR | POLLNVAL)) { + if (pfd[i].revents & (POLLERR | POLLNVAL)) { failed = TRUE; } } @@ -2717,7 +2758,7 @@ AsyncTCPSocketPoll(AsyncTCPSocket *s, // IN: #ifndef _WIN32 for (i = 0; i < numSock; i++) { - if (p[i].revents & (read ? POLLIN : POLLOUT)) { + if (pfd[i].revents & (read ? POLLIN : POLLOUT)) { *outAsock = asock[i]; return ASOCKERR_SUCCESS; } @@ -2731,41 +2772,123 @@ AsyncTCPSocketPoll(AsyncTCPSocket *s, // IN: } #endif - TCPSOCKWARN(s, ("%s: Failed to return a ready socket.\n", + TCPSOCKWARN(warnSock, ("%s: Failed to return a ready socket.\n", __FUNCTION__)); return ASOCKERR_GENERIC; } - case 0: - /* - * No sockets were ready within the specified time. - */ - TCPSOCKLG0(s, ("%s: Timeout waiting for a ready socket.\n", - __FUNCTION__)); - return ASOCKERR_TIMEOUT; - - case -1: { - int sysErr = ASOCK_LASTERROR(); + } + } +} - if (sysErr == EINTR) { - /* - * We were somehow interrupted by signal. Let's loop and retry. - */ - TCPSOCKLG0(s, ("%s: Socket interrupted by a signal.\n", - __FUNCTION__)); - continue; - } +/* + *---------------------------------------------------------------------------- + * + * AsyncTCPSocketPoll -- + * + * Blocks on the specified socket until there's data pending or a + * timeout occurs. + * + * If the specified socket is a dual stack listener, we will poll on all + * listening sockets and will return when one is ready with data for a + * connection. If both socket families happen to race with connect data, + * we will favor IPv6 for the return. + * + * Results: + * ASOCKERR_SUCCESS if it worked, ASOCKERR_GENERIC on system call + * failures + * ASOCKERR_TIMEOUT if we just didn't receive enough data. + * + * Side effects: + * None. + *---------------------------------------------------------------------------- + */ - s->genericErrno = sysErr; +static int +AsyncTCPSocketPoll(AsyncTCPSocket *s, // IN: + Bool read, // IN: + int timeoutMS, // IN: + AsyncTCPSocket **outAsock) // OUT: +{ + AsyncTCPSocket *asock[2]; +#ifndef _WIN32 + struct pollfd p[2]; +#else + void *p = NULL; +#endif + int numSock = 0; - TCPSOCKLG0(s, ("%s: Failed with error %d: %s\n", __FUNCTION__, sysErr, - Err_Errno2String(sysErr))); + if (read && s->fd == -1) { + if (!s->listenAsock4 && !s->listenAsock6) { + TCPSOCKLG0(s, ("%s: Failed to find listener socket.\n", __FUNCTION__)); return ASOCKERR_GENERIC; } - default: - NOT_REACHED(); + + if (s->listenAsock6 && s->listenAsock6->fd != -1) { + asock[numSock++] = s->listenAsock6; + } + if (s->listenAsock4 && s->listenAsock4->fd != -1) { + asock[numSock++] = s->listenAsock4; + } + } else { + asock[numSock++] = s; + } + + return AsyncTCPSocketPollWork(asock, numSock, p, read, timeoutMS, s, + outAsock); +} + + +/* + *---------------------------------------------------------------------------- + * + * AsyncTCPSocketWaitForReadMultiple -- + * + * Blocks on the list of sockets until there's data readable or a + * timeout occurs. + * + * Please see the comment in asyncSocketInterface.c for more + * information about using this function. + * + * Results: + * ASOCKERR_SUCCESS if it worked, ASOCKERR_GENERIC on system call + * failures + * ASOCKERR_TIMEOUT if no sockets were ready with readable data. + * + * Side effects: + * None. + *---------------------------------------------------------------------------- + */ + +static int +AsyncTCPSocketWaitForReadMultiple(AsyncSocket **asock, // IN: + int numSock, // IN: + int timeoutMS, // IN: + int *outIdx) // OUT: +{ + int i; + int err; + AsyncTCPSocket *outAsock = NULL; +#ifndef _WIN32 + struct pollfd *p = Util_SafeCalloc(numSock, sizeof *p); +#else + void *p = NULL; +#endif + + for (i = 0; i < numSock; i++) { + ASSERT(AsyncTCPSocketIsLocked(TCPSocket(asock[i]))); + } + err = AsyncTCPSocketPollWork((AsyncTCPSocket **)asock, numSock, p, TRUE, + timeoutMS, NULL, &outAsock); + for (i = numSock - 1; i >= 0; i--) { + AsyncTCPSocket *tcpAsock = TCPSocket(asock[i]); + if (outAsock == tcpAsock) { + *outIdx = i; } } + + free(p); + return err; } diff --git a/open-vm-tools/lib/include/asyncsocket.h b/open-vm-tools/lib/include/asyncsocket.h index 07b2b9d68..7939b54ad 100644 --- a/open-vm-tools/lib/include/asyncsocket.h +++ b/open-vm-tools/lib/include/asyncsocket.h @@ -1,5 +1,5 @@ /********************************************************* - * Copyright (C) 2003-2016 VMware, Inc. All rights reserved. + * Copyright (C) 2003-2017 VMware, Inc. All rights reserved. * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published @@ -425,6 +425,12 @@ int AsyncSocket_DoOneMsg(AsyncSocket *s, Bool read, int timeoutMS); */ int AsyncSocket_WaitForConnection(AsyncSocket *s, int timeoutMS); +/* + * Waits until a socket is ready with readable data or times out. + */ +int AsyncSocket_WaitForReadMultiple(AsyncSocket **asock, int numSock, + int timeoutMS, int *outIdx); + /* * Send all pending packets onto the wire or give up after timeoutMS msecs. */