]> git.ipfire.org Git - thirdparty/open-vm-tools.git/commitdiff
Reduce Vigor transport socket draining overhead
authorOliver Kurth <okurth@vmware.com>
Wed, 7 Feb 2018 00:32:37 +0000 (16:32 -0800)
committerOliver Kurth <okurth@vmware.com>
Wed, 7 Feb 2018 00:32:37 +0000 (16:32 -0800)
A previous change in VigorPollImpl and asyncsocket to fix a memory
leak that causes a performance regression for parallel reconfiguration of
running VMs (the kind requiring FSR).  During an FSR, the connection
between hostd and the source vmx would drop and hostd would then
reconnect to the destination vmx.  One source of regression is that
the buffer has to be drained when a disconnect happens and the draining
logic may potentially unregister and re-register the recv callback
multiple times.  This change adds an asyncsocket function specifically
for draining recv that reduces the manipulation of callback.  It
reduces the regression by up to ~50% (the effect is more pronounced with
more VMs).

open-vm-tools/lib/asyncsocket/asyncsocket.c
open-vm-tools/lib/include/asyncsocket.h

index ede3ff1de11c4b40d327e37397e18e758f7ba0e9..7e8c6e35e42fb5d295ee071058bf8a7ac7c21c74 100644 (file)
@@ -1,5 +1,5 @@
 /*********************************************************
- * Copyright (C) 2003-2017 VMware, Inc. All rights reserved.
+ * Copyright (C) 2003-2018 VMware, Inc. All rights reserved.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU Lesser General Public License as published
@@ -93,6 +93,7 @@
 #include "util.h"
 #include "msg.h"
 #include "posix.h"
+#include "vm_basic_asm.h"
 #include "vmci_sockets.h"
 #ifndef VMX86_TOOLS
 #include "vmdblib.h"
@@ -3894,10 +3895,12 @@ out:
  * Results:
  *      ASOCKERR_SUCCESS if it worked, ASOCKERR_GENERIC on system call
  *         failures
+ *      ASOCKERR_BUSY if another thread is in the read callback.
  *      ASOCKERR_TIMEOUT if nothing happened in the allotted time.
  *
  * Side effects:
  *      None.
+ *
  *----------------------------------------------------------------------------
  */
 
@@ -3993,6 +3996,142 @@ AsyncTCPSocketDoOneMsg(AsyncSocket *base, // IN
 }
 
 
+/*
+ *----------------------------------------------------------------------------
+ *
+ * AsyncSocket_TCPDrainRecv --
+ *
+ *      This function can be used to drain all the messages from a socket
+ *      disconnected on the remote end.  It spins a socket until the specified
+ *      amount of time has elapsed or an error is encountered, with backoff
+ *      between read attempts if there is a conflict with another thread.  The
+ *      recv callback is restored at the end of this only if not all the
+ *      messages have been read, the socket is still connected and recv callack
+ *      has not been cancelled.
+ *
+ * Results:
+ *      ASOCKERR_SUCCESS if all messages are have been read, or if the callback
+ *      has canceled the recv, or if the socket is closed
+ *      ASOCKERR_GENERIC on system call failures
+ *      ASOCKERR_TIMEOUT if there may still be unread messages at the end of
+ *      the speccified time.
+ *
+ * Side effects:
+ *      None.
+ *
+ *----------------------------------------------------------------------------
+ */
+
+int
+AsyncSocket_TCPDrainRecv(AsyncSocket *base, // IN
+                         int timeoutMS)     // IN
+{
+   AsyncTCPSocket *s = TCPSocket(base);
+   int retVal;
+   Bool cbRemoved = FALSE;
+   Bool releaseLock = FALSE;
+   unsigned count = 0;
+   VmTimeType startMS = Hostinfo_SystemTimerMS();
+   VmTimeType nowMS;
+
+   ASSERT(AsyncTCPSocketGetState(s) == AsyncSocketConnected);
+   ASSERT(s->recvCb); /* We are supposed to call someone... */
+
+   if (!AsyncTCPSocketIsLocked(s) || !Poll_LockingEnabled()) {
+      AsyncTCPSocketLock(s);
+      releaseLock = TRUE;
+   }
+   AsyncTCPSocketAddRef(s);
+
+   while (TRUE) {
+      AsyncTCPSocket *asock = NULL;
+
+      count++;
+      if (s->inRecvLoop) {
+         /*
+          * The recv loop would read the data if there is any and it is
+          * not safe to proceed and race with the recv loop.
+          */
+         TCPSOCKLG0(s, ("busy: another thread in recv loop\n"));
+         retVal = ASOCKERR_BUSY;
+         /* Add a bit of backoff. */
+         AsyncTCPSocketUnlock(s);
+         Util_Usleep(MIN(100 << (mssb32(count) / 2), timeoutMS));
+         AsyncTCPSocketLock(s);
+         goto retry;
+      }
+
+      if (!cbRemoved) {
+         /*
+          * Cancel the recv callback, but pretend that it is still registered
+          * so we know if the callback cancel recv.
+          */
+         AsyncTCPSocketCancelRecvCb(s);
+         s->recvCb = TRUE;
+         cbRemoved = TRUE;
+      }
+
+      s->inBlockingRecv++;
+      retVal = AsyncTCPSocketPoll(s, TRUE, 0, &asock);
+      if (retVal != ASOCKERR_SUCCESS) {
+         if (retVal == ASOCKERR_GENERIC) {
+            TCPSOCKWARN(s, ("%s: failed to poll on the socket during read.\n",
+                       __FUNCTION__));
+         }
+      } else if (AsyncTCPSocketGetState(s) == AsyncSocketConnected) {
+         ASSERT(asock == s);
+         retVal = AsyncTCPSocketFillRecvBuffer(s);
+      }
+      s->inBlockingRecv--;
+
+retry:
+      if (retVal == ASOCKERR_REMOTE_DISCONNECT ||
+          AsyncTCPSocketGetState(s) == AsyncSocketClosed ||
+          !s->recvCb) {
+         /* No more messages to recv. */
+         retVal = ASOCKERR_SUCCESS;
+         break;
+      }
+      if (retVal == ASOCKERR_GENERIC) {
+         break;
+      }
+
+      nowMS = Hostinfo_SystemTimerMS();
+      if (nowMS >= startMS + timeoutMS) {
+         retVal = ASOCKERR_TIMEOUT;
+         break;
+      }
+      timeoutMS -= nowMS - startMS;
+      startMS = nowMS;
+      ASSERT(AsyncTCPSocketGetState(s) == AsyncSocketConnected && s->recvCb);
+   }
+
+   if (cbRemoved) {
+      s->recvCb = FALSE;
+      /*
+       * If AsyncTCPSocketPoll or AsyncTCPSocketFillRecvBuffer fails, do not
+       * add the recv callback as it may never fire.
+       */
+      if (retVal == ASOCKERR_TIMEOUT) {
+         ASSERT(AsyncTCPSocketGetState(s) == AsyncSocketConnected);
+         ASSERT(s->base.refCount > 1); /* We better not be the last user */
+         retVal = AsyncTCPSocketRegisterRecvCb(s);
+         Log("SOCKET reregister recvCb after DrainRecv (ref %d)\n",
+             BaseSocket(s)->refCount);
+      }
+   }
+   if (!s->recvCb) {
+      s->base.recvBuf = NULL;
+   }
+
+   AsyncTCPSocketRelease(s);
+   if (releaseLock) {
+      AsyncTCPSocketUnlock(s);
+   }
+   return retVal;
+}
+
+
 /*
  *----------------------------------------------------------------------------
  *
index 524147ea9ca259b9f77de9b444f2a9cf1d7c2c22..ec9b45aa3c93574f9165b4d8c5d352efc3f6915e 100644 (file)
@@ -1,5 +1,5 @@
 /*********************************************************
- * Copyright (C) 2003-2017 VMware, Inc. All rights reserved.
+ * Copyright (C) 2003-2018 VMware, Inc. All rights reserved.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU Lesser General Public License as published
@@ -592,6 +592,11 @@ int AsyncSocket_WaitForReadMultiple(AsyncSocket **asock, int numSock,
  */
 int AsyncSocket_Flush(AsyncSocket *asock, int timeoutMS);
 
+/*
+ * Drain recv for a remotely disconnected TCP socket.
+ */
+int AsyncSocket_TCPDrainRecv(AsyncSocket *base, int timeoutMS);
+
 /*
  * Specify the exact amount of data to receive and the receive function to call.
  */