/*********************************************************
- * Copyright (C) 2003-2017 VMware, Inc. All rights reserved.
+ * Copyright (C) 2003-2018 VMware, Inc. All rights reserved.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
#include "util.h"
#include "msg.h"
#include "posix.h"
+#include "vm_basic_asm.h"
#include "vmci_sockets.h"
#ifndef VMX86_TOOLS
#include "vmdblib.h"
* Results:
* ASOCKERR_SUCCESS if it worked, ASOCKERR_GENERIC on system call
* failures
+ * ASOCKERR_BUSY if another thread is in the read callback.
* ASOCKERR_TIMEOUT if nothing happened in the allotted time.
*
* Side effects:
* None.
+ *
*----------------------------------------------------------------------------
*/
}
+/*
+ *----------------------------------------------------------------------------
+ *
+ * AsyncSocket_TCPDrainRecv --
+ *
+ * This function can be used to drain all the messages from a socket
+ * disconnected on the remote end. It spins a socket until the specified
+ * amount of time has elapsed or an error is encountered, with backoff
+ * between read attempts if there is a conflict with another thread. The
+ * recv callback is restored at the end of this only if not all the
+ * messages have been read, the socket is still connected and recv callack
+ * has not been cancelled.
+ *
+ * Results:
+ * ASOCKERR_SUCCESS if all messages are have been read, or if the callback
+ * has canceled the recv, or if the socket is closed
+ * ASOCKERR_GENERIC on system call failures
+ * ASOCKERR_TIMEOUT if there may still be unread messages at the end of
+ * the speccified time.
+ *
+ * Side effects:
+ * None.
+ *
+ *----------------------------------------------------------------------------
+ */
+
+int
+AsyncSocket_TCPDrainRecv(AsyncSocket *base, // IN
+ int timeoutMS) // IN
+{
+ AsyncTCPSocket *s = TCPSocket(base);
+ int retVal;
+ Bool cbRemoved = FALSE;
+ Bool releaseLock = FALSE;
+ unsigned count = 0;
+ VmTimeType startMS = Hostinfo_SystemTimerMS();
+ VmTimeType nowMS;
+
+ ASSERT(AsyncTCPSocketGetState(s) == AsyncSocketConnected);
+ ASSERT(s->recvCb); /* We are supposed to call someone... */
+
+ if (!AsyncTCPSocketIsLocked(s) || !Poll_LockingEnabled()) {
+ AsyncTCPSocketLock(s);
+ releaseLock = TRUE;
+ }
+ AsyncTCPSocketAddRef(s);
+
+ while (TRUE) {
+ AsyncTCPSocket *asock = NULL;
+
+ count++;
+ if (s->inRecvLoop) {
+ /*
+ * The recv loop would read the data if there is any and it is
+ * not safe to proceed and race with the recv loop.
+ */
+ TCPSOCKLG0(s, ("busy: another thread in recv loop\n"));
+ retVal = ASOCKERR_BUSY;
+ /* Add a bit of backoff. */
+ AsyncTCPSocketUnlock(s);
+ Util_Usleep(MIN(100 << (mssb32(count) / 2), timeoutMS));
+ AsyncTCPSocketLock(s);
+ goto retry;
+ }
+
+ if (!cbRemoved) {
+ /*
+ * Cancel the recv callback, but pretend that it is still registered
+ * so we know if the callback cancel recv.
+ */
+ AsyncTCPSocketCancelRecvCb(s);
+ s->recvCb = TRUE;
+ cbRemoved = TRUE;
+ }
+
+ s->inBlockingRecv++;
+ retVal = AsyncTCPSocketPoll(s, TRUE, 0, &asock);
+ if (retVal != ASOCKERR_SUCCESS) {
+ if (retVal == ASOCKERR_GENERIC) {
+ TCPSOCKWARN(s, ("%s: failed to poll on the socket during read.\n",
+ __FUNCTION__));
+ }
+ } else if (AsyncTCPSocketGetState(s) == AsyncSocketConnected) {
+ ASSERT(asock == s);
+ retVal = AsyncTCPSocketFillRecvBuffer(s);
+ }
+ s->inBlockingRecv--;
+
+retry:
+ if (retVal == ASOCKERR_REMOTE_DISCONNECT ||
+ AsyncTCPSocketGetState(s) == AsyncSocketClosed ||
+ !s->recvCb) {
+ /* No more messages to recv. */
+ retVal = ASOCKERR_SUCCESS;
+ break;
+ }
+ if (retVal == ASOCKERR_GENERIC) {
+ break;
+ }
+
+ nowMS = Hostinfo_SystemTimerMS();
+ if (nowMS >= startMS + timeoutMS) {
+ retVal = ASOCKERR_TIMEOUT;
+ break;
+ }
+ timeoutMS -= nowMS - startMS;
+ startMS = nowMS;
+ ASSERT(AsyncTCPSocketGetState(s) == AsyncSocketConnected && s->recvCb);
+ }
+
+ if (cbRemoved) {
+ s->recvCb = FALSE;
+ /*
+ * If AsyncTCPSocketPoll or AsyncTCPSocketFillRecvBuffer fails, do not
+ * add the recv callback as it may never fire.
+ */
+ if (retVal == ASOCKERR_TIMEOUT) {
+ ASSERT(AsyncTCPSocketGetState(s) == AsyncSocketConnected);
+ ASSERT(s->base.refCount > 1); /* We better not be the last user */
+ retVal = AsyncTCPSocketRegisterRecvCb(s);
+ Log("SOCKET reregister recvCb after DrainRecv (ref %d)\n",
+ BaseSocket(s)->refCount);
+ }
+ }
+ if (!s->recvCb) {
+ s->base.recvBuf = NULL;
+ }
+
+ AsyncTCPSocketRelease(s);
+ if (releaseLock) {
+ AsyncTCPSocketUnlock(s);
+ }
+ return retVal;
+}
+
+
/*
*----------------------------------------------------------------------------
*