]> git.ipfire.org Git - thirdparty/open-vm-tools.git/commitdiff
Add new AsyncSocket API to wait on multiple sockets at once
authorOliver Kurth <okurth@vmware.com>
Fri, 15 Sep 2017 18:23:37 +0000 (11:23 -0700)
committerOliver Kurth <okurth@vmware.com>
Fri, 15 Sep 2017 18:23:37 +0000 (11:23 -0700)
Currently, there is no way to poll/select on multiple sockets through
the asyncsocket library. VRDMA is encountering a use case for such
functionality, so add such an API to asyncsocket.  It is only defined
for AsyncTCPSocket, and is mostly just a wrapper around AsyncTCPSocketPoll.

open-vm-tools/lib/asyncsocket/asyncSocketInterface.c
open-vm-tools/lib/asyncsocket/asyncSocketVTable.h
open-vm-tools/lib/asyncsocket/asyncsocket.c
open-vm-tools/lib/include/asyncsocket.h

index e4cd077fdc2bf3dbf464c5e1d5b3527ec0f55c8c..e0e1becc5181241a496a41de0e114f777ccb6cd6 100644 (file)
@@ -1,5 +1,5 @@
 /*********************************************************
- * Copyright (C) 2016 VMware, Inc. All rights reserved.
+ * Copyright (C) 2016-2017 VMware, Inc. All rights reserved.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU Lesser General Public License as published
@@ -1459,3 +1459,54 @@ AsyncSocket_WaitForConnection(AsyncSocket *asock,         // IN
    }
    return ret;
 }
+
+
+/*
+ *----------------------------------------------------------------------------
+ *
+ * AsyncSocket_WaitForReadMultiple --
+ *
+ *      Waits on a list of sockets, returning when a socket becomes
+ *      available for read, or when the allowed time elapses.
+ *
+ *      Note, if this function is called by two threads with overlapping
+ *      sets of sockets, a deadlock can occur. The caller should guard
+ *      against such scenarios from happening, or making sure that there
+ *      is a consistent ordering to the lists of sockets.
+ *
+ *      The caller must also make sure synchronous and asynchronous
+ *      operations do not mix, as this function does not hold locks
+ *      for the entirety of the call.
+ *
+ * Results:
+ *      ASOCKERR_SUCCESS if one of the sockets is ready for read,
+ *      ASOCKERR_GENERIC on failures, and ASOCKERR_TIMEOUT if nothing
+ *      happened in the allotted time.
+ *
+ * Side effects:
+ *      None.
+ *----------------------------------------------------------------------------
+ */
+
+int
+AsyncSocket_WaitForReadMultiple(AsyncSocket **asock,  // IN
+                                int numSock,          // IN
+                                int timeoutMS,        // IN
+                                int *outIdx)          // OUT
+{
+   int i;
+   int ret;
+   if (numSock > 0 && VALID(asock[0], waitForReadMultiple)) {
+      for (i = 0; i < numSock; i++) {
+         AsyncSocketLock(asock[i]);
+      }
+      ret = VT(asock[0])->waitForReadMultiple(asock, numSock,
+                                              timeoutMS, outIdx);
+      for (i = numSock - 1; i >= 0; i--) {
+         AsyncSocketUnlock(asock[i]);
+      }
+   } else {
+      ret = ASOCKERR_INVAL;
+   }
+   return ret;
+}
index b33c6dae390241b58d7036d311dc63354084c509..c1216e03987c90bbf079f441eead6c5bfb1e3223 100644 (file)
@@ -1,5 +1,5 @@
 /*********************************************************
- * Copyright (C) 2011,2014-2016 VMware, Inc. All rights reserved.
+ * Copyright (C) 2011,2014-2017 VMware, Inc. All rights reserved.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU Lesser General Public License as published
@@ -95,7 +95,8 @@ typedef struct AsyncSocketVTable {
    char *(*getWebSocketCookie)(AsyncSocket *asock);
    uint16 (*getWebSocketCloseStatus)(AsyncSocket *asock);
    const char *(*getWebSocketProtocol)(AsyncSocket *asock);
-   int (*setWebSocketCookie)(AsyncSocket *asock, void *clientData, const char *path, const char *sessionId);
+   int (*setWebSocketCookie)(AsyncSocket *asock, void *clientData,
+                             const char *path, const char *sessionId);
    int (*recvBlocking)(AsyncSocket *s, void *buf, int len, int *received,
                       int timeoutMS);
    int (*recvPartialBlocking)(AsyncSocket *s, void *buf, int len,
@@ -103,7 +104,9 @@ typedef struct AsyncSocketVTable {
    int (*sendBlocking)(AsyncSocket *s, void *buf, int len, int *sent,
                        int timeoutMS);
    int (*doOneMsg)(AsyncSocket *s, Bool read, int timeoutMS);
-   int (*waitForConnection)(AsyncSocket *s, int timeoutMS);   // IN:
+   int (*waitForConnection)(AsyncSocket *s, int timeoutMS);
+   int (*waitForReadMultiple)(AsyncSocket **asock, int numSock, int timeoutMS,
+                              int *outIdx);
 
 
    /*
index 7a2a28db64cba61637cbc9dd2a3b68c8f087b511..2dfbbc448eb86c0688ed2b6c2943df22d58583c0 100644 (file)
@@ -1,5 +1,5 @@
 /*********************************************************
- * Copyright (C) 2003-2016 VMware, Inc. All rights reserved.
+ * Copyright (C) 2003-2017 VMware, Inc. All rights reserved.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU Lesser General Public License as published
@@ -357,6 +357,8 @@ static int AsyncTCPSocketRecvPartialBlocking(AsyncSocket *s, void *buf, int len,
 static int AsyncTCPSocketSendBlocking(AsyncSocket *s, void *buf, int len,
                                       int *sent, int timeoutMS);
 static int AsyncTCPSocketDoOneMsg(AsyncSocket *s, Bool read, int timeoutMS);
+static int AsyncTCPSocketWaitForReadMultiple(AsyncSocket **asock, int numSock,
+                                             int timeoutMS, int *outIdx);
 
 
 static const AsyncSocketVTable asyncTCPSocketVTable = {
@@ -398,6 +400,7 @@ static const AsyncSocketVTable asyncTCPSocketVTable = {
    AsyncTCPSocketSendBlocking,
    AsyncTCPSocketDoOneMsg,
    AsyncTCPSocketWaitForConnection,
+   AsyncTCPSocketWaitForReadMultiple,
    AsyncTCPSocketDestroy
 };
 
@@ -2567,15 +2570,13 @@ AsyncTCPSocketRecvPassedFd(AsyncSocket *base,   // IN/OUT: socket
 /*
  *----------------------------------------------------------------------------
  *
- * AsyncTCPSocketPoll --
+ * AsyncTCPSocketPollWork --
  *
- *      Blocks on the specified socket until there's data pending or a
+ *      Blocks on the specified sockets until there's data pending or a
  *      timeout occurs.
  *
- *      If the specified socket is a dual stack listener, we will poll on all
- *      listening sockets and will return when one is ready with data for a
- *      connection. If both socket families happen to race with connect data,
- *      we will favor IPv6 for the return.
+ *      If the asyncsocket is a dual stack listener, parentSock will not be
+ *      NULL, and the asock array will contain the IPv4 and v6 sockets.
  *
  * Results:
  *      ASOCKERR_SUCCESS if it worked, ASOCKERR_GENERIC on system call
@@ -2588,46 +2589,32 @@ AsyncTCPSocketRecvPassedFd(AsyncSocket *base,   // IN/OUT: socket
  */
 
 static int
-AsyncTCPSocketPoll(AsyncTCPSocket *s,          // IN:
-                   Bool read,                  // IN:
-                   int timeoutMS,              // IN:
-                   AsyncTCPSocket **outAsock)  // OUT:
+AsyncTCPSocketPollWork(AsyncTCPSocket **asock,     // IN:
+                       int numSock,                // IN:
+                       void *p,                    // IN:
+                       Bool read,                  // IN:
+                       int timeoutMS,              // IN:
+                       AsyncTCPSocket *parentSock, // IN:
+                       AsyncTCPSocket **outAsock)  // OUT:
 {
+   AsyncTCPSocket *warnSock = parentSock ? parentSock : asock[0];
 #ifndef _WIN32
-   struct pollfd p[2];
-   int retval;
+   struct pollfd *pfd = (struct pollfd *)p;
 #else
    /*
     * We use select() to do this on Windows, since there ain't no poll().
     * Fortunately, select() doesn't have the 1024 fd value limit.
     */
 
-   int retval;
    struct timeval tv;
    struct fd_set rwfds;
    struct fd_set exceptfds;
 #endif
-   AsyncTCPSocket *asock[2];
-   int numSock = 0;
    int i;
+   int retval;
 
-   ASSERT(*outAsock == NULL);
-
-   if (read && s->fd == -1) {
-      if (!s->listenAsock4 && !s->listenAsock6) {
-         TCPSOCKLG0(s, ("%s: Failed to find listener socket.\n", __FUNCTION__));
-         return ASOCKERR_GENERIC;
-      }
-
-      if (s->listenAsock6 && s->listenAsock6->fd != -1) {
-         asock[numSock++] = s->listenAsock6;
-      }
-      if (s->listenAsock4 && s->listenAsock4->fd != -1) {
-         asock[numSock++] = s->listenAsock4;
-      }
-   } else {
-      asock[numSock++] = s;
-   }
+   ASSERT(outAsock != NULL && *outAsock == NULL && asock != NULL &&
+          numSock > 0);
 
    for (i = 0; i < numSock; i++) {
       if (read && SSL_Pending(asock[i]->sslSock)) {
@@ -2639,13 +2626,23 @@ AsyncTCPSocketPoll(AsyncTCPSocket *s,          // IN:
    while (1) {
 #ifndef _WIN32
       for (i = 0; i < numSock; i++) {
-         p[i].fd = asock[i]->fd;
-         p[i].events = read ? POLLIN : POLLOUT;
+         pfd[i].fd = asock[i]->fd;
+         pfd[i].events = read ? POLLIN : POLLOUT;
       }
 
-      AsyncTCPSocketUnlock(s);
-      retval = poll(p, numSock, timeoutMS);
-      AsyncTCPSocketLock(s);
+      if (parentSock != NULL) {
+         AsyncTCPSocketUnlock(parentSock);
+         retval = poll(pfd, numSock, timeoutMS);
+         AsyncTCPSocketLock(parentSock);
+      } else {
+         for (i = numSock - 1; i >= 0; i--) {
+            AsyncTCPSocketUnlock(asock[i]);
+         }
+         retval = poll(pfd, numSock, timeoutMS);
+         for (i = 0; i < numSock; i++) {
+            AsyncTCPSocketLock(asock[i]);
+         }
+      }
 #else
       tv.tv_sec = timeoutMS / 1000;
       tv.tv_usec = (timeoutMS % 1000) * 1000;
@@ -2658,20 +2655,64 @@ AsyncTCPSocketPoll(AsyncTCPSocket *s,          // IN:
          FD_SET(asock[i]->fd, &exceptfds);
       }
 
-      AsyncTCPSocketUnlock(s);
-      retval = select(1, read ? &rwfds : NULL, read ? NULL : &rwfds,
-                      &exceptfds, timeoutMS >= 0 ? &tv : NULL);
-      AsyncTCPSocketLock(s);
+      if (parentSock != NULL) {
+         AsyncTCPSocketLock(parentSock);
+         retval = select(1, read ? &rwfds : NULL, read ? NULL : &rwfds,
+                         &exceptfds, timeoutMS >= 0 ? &tv : NULL);
+         AsyncTCPSocketLock(parentSock);
+      } else {
+         for (i = numSock - 1; i >= 0; i--) {
+            AsyncTCPSocketUnlock(asock[i]);
+         }
+         retval = select(1, read ? &rwfds : NULL, read ? NULL : &rwfds,
+                         &exceptfds, timeoutMS >= 0 ? &tv : NULL);
+         for (i = 0; i < numSock; i++) {
+            AsyncTCPSocketLock(asock[i]);
+         }
+      }
 #endif
 
       switch (retval) {
-      case 1:
-      case 2: {
+      case 0:
+         /*
+          * No sockets were ready within the specified time.
+          */
+         TCPSOCKLG0(warnSock, ("%s: Timeout waiting for a ready socket.\n",
+                      __FUNCTION__));
+         return ASOCKERR_TIMEOUT;
+
+      case -1: {
+         int sysErr = ASOCK_LASTERROR();
+
+         if (sysErr == EINTR) {
+            /*
+             * We were somehow interrupted by signal. Let's loop and retry.
+             * XXX: update the timeout by the amount we had previously waited.
+             */
+
+            TCPSOCKLG0(warnSock, ("%s: Socket interrupted by a signal.\n",
+                         __FUNCTION__));
+            continue;
+         }
+
+         if (parentSock != NULL) {
+            parentSock->genericErrno = sysErr;
+         } else {
+            for (i = 0; i < numSock; i++) {
+               asock[i]->genericErrno = sysErr;
+            }
+         }
+
+         TCPSOCKLG0(warnSock, ("%s: Failed with error %d: %s\n", __FUNCTION__,
+                   sysErr, Err_Errno2String(sysErr)));
+         return ASOCKERR_GENERIC;
+      }
+      default: {
          Bool failed = FALSE;
 
 #ifndef _WIN32
          for (i = 0; i < numSock; i++) {
-            if (p[i].revents & (POLLERR | POLLNVAL)) {
+            if (pfd[i].revents & (POLLERR | POLLNVAL)) {
                failed = TRUE;
             }
          }
@@ -2717,7 +2758,7 @@ AsyncTCPSocketPoll(AsyncTCPSocket *s,          // IN:
 
 #ifndef _WIN32
          for (i = 0; i < numSock; i++) {
-            if (p[i].revents & (read ? POLLIN : POLLOUT)) {
+            if (pfd[i].revents & (read ? POLLIN : POLLOUT)) {
                *outAsock = asock[i];
                return ASOCKERR_SUCCESS;
             }
@@ -2731,41 +2772,123 @@ AsyncTCPSocketPoll(AsyncTCPSocket *s,          // IN:
          }
 #endif
 
-         TCPSOCKWARN(s, ("%s: Failed to return a ready socket.\n",
+         TCPSOCKWARN(warnSock, ("%s: Failed to return a ready socket.\n",
                          __FUNCTION__));
          return ASOCKERR_GENERIC;
       }
-      case 0:
-         /*
-          * No sockets were ready within the specified time.
-          */
-         TCPSOCKLG0(s, ("%s: Timeout waiting for a ready socket.\n",
-                      __FUNCTION__));
-         return ASOCKERR_TIMEOUT;
-
-      case -1: {
-         int sysErr = ASOCK_LASTERROR();
+      }
+   }
+}
 
-         if (sysErr == EINTR) {
-            /*
-             * We were somehow interrupted by signal. Let's loop and retry.
-             */
 
-            TCPSOCKLG0(s, ("%s: Socket interrupted by a signal.\n",
-                         __FUNCTION__));
-            continue;
-         }
+/*
+ *----------------------------------------------------------------------------
+ *
+ * AsyncTCPSocketPoll --
+ *
+ *      Blocks on the specified socket until there's data pending or a
+ *      timeout occurs.
+ *
+ *      If the specified socket is a dual stack listener, we will poll on all
+ *      listening sockets and will return when one is ready with data for a
+ *      connection. If both socket families happen to race with connect data,
+ *      we will favor IPv6 for the return.
+ *
+ * Results:
+ *      ASOCKERR_SUCCESS if it worked, ASOCKERR_GENERIC on system call
+ *        failures
+ *      ASOCKERR_TIMEOUT if we just didn't receive enough data.
+ *
+ * Side effects:
+ *      None.
+ *----------------------------------------------------------------------------
+ */
 
-         s->genericErrno = sysErr;
+static int
+AsyncTCPSocketPoll(AsyncTCPSocket *s,          // IN:
+                   Bool read,                  // IN:
+                   int timeoutMS,              // IN:
+                   AsyncTCPSocket **outAsock)  // OUT:
+{
+   AsyncTCPSocket *asock[2];
+#ifndef _WIN32
+   struct pollfd p[2];
+#else
+   void *p = NULL;
+#endif
+   int numSock = 0;
 
-         TCPSOCKLG0(s, ("%s: Failed with error %d: %s\n", __FUNCTION__, sysErr,
-                   Err_Errno2String(sysErr)));
+   if (read && s->fd == -1) {
+      if (!s->listenAsock4 && !s->listenAsock6) {
+         TCPSOCKLG0(s, ("%s: Failed to find listener socket.\n", __FUNCTION__));
          return ASOCKERR_GENERIC;
       }
-      default:
-         NOT_REACHED();
+
+      if (s->listenAsock6 && s->listenAsock6->fd != -1) {
+         asock[numSock++] = s->listenAsock6;
+      }
+      if (s->listenAsock4 && s->listenAsock4->fd != -1) {
+         asock[numSock++] = s->listenAsock4;
+      }
+   } else {
+      asock[numSock++] = s;
+   }
+
+   return AsyncTCPSocketPollWork(asock, numSock, p, read, timeoutMS, s,
+                                 outAsock);
+}
+
+
+/*
+ *----------------------------------------------------------------------------
+ *
+ * AsyncTCPSocketWaitForReadMultiple --
+ *
+ *      Blocks on the list of sockets until there's data readable or a
+ *      timeout occurs.
+ *
+ *      Please see the comment in asyncSocketInterface.c for more
+ *      information about using this function.
+ *
+ * Results:
+ *      ASOCKERR_SUCCESS if it worked, ASOCKERR_GENERIC on system call
+ *        failures
+ *      ASOCKERR_TIMEOUT if no sockets were ready with readable data.
+ *
+ * Side effects:
+ *      None.
+ *----------------------------------------------------------------------------
+ */
+
+static int
+AsyncTCPSocketWaitForReadMultiple(AsyncSocket **asock,   // IN:
+                                  int numSock,           // IN:
+                                  int timeoutMS,         // IN:
+                                  int *outIdx)           // OUT:
+{
+   int i;
+   int err;
+   AsyncTCPSocket *outAsock  = NULL;
+#ifndef _WIN32
+   struct pollfd *p          = Util_SafeCalloc(numSock, sizeof *p);
+#else
+   void *p                   = NULL;
+#endif
+
+   for (i = 0; i < numSock; i++) {
+      ASSERT(AsyncTCPSocketIsLocked(TCPSocket(asock[i])));
+   }
+   err = AsyncTCPSocketPollWork((AsyncTCPSocket **)asock, numSock, p, TRUE,
+                                timeoutMS, NULL, &outAsock);
+   for (i = numSock - 1; i >= 0; i--) {
+      AsyncTCPSocket *tcpAsock = TCPSocket(asock[i]);
+      if (outAsock == tcpAsock) {
+         *outIdx = i;
       }
    }
+
+   free(p);
+   return err;
 }
 
 
index 07b2b9d6890e25917d5d104cca9df04ec586548c..7939b54adbb381bd779c78cad4021b5422dfa528 100644 (file)
@@ -1,5 +1,5 @@
 /*********************************************************
- * Copyright (C) 2003-2016 VMware, Inc. All rights reserved.
+ * Copyright (C) 2003-2017 VMware, Inc. All rights reserved.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU Lesser General Public License as published
@@ -425,6 +425,12 @@ int AsyncSocket_DoOneMsg(AsyncSocket *s, Bool read, int timeoutMS);
  */
 int AsyncSocket_WaitForConnection(AsyncSocket *s, int timeoutMS);
 
+/*
+ * Waits until a socket is ready with readable data or times out.
+ */
+int AsyncSocket_WaitForReadMultiple(AsyncSocket **asock, int numSock,
+                                    int timeoutMS, int *outIdx);
+
 /*
  * Send all pending packets onto the wire or give up after timeoutMS msecs.
  */