/*********************************************************
- * Copyright (C) 2020 VMware, Inc. All rights reserved.
+ * Copyright (C) 2020-2021 VMware, Inc. All rights reserved.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
#include <unistd.h>
#endif
+#include "vm_basic_defs.h"
#include "vm_assert.h"
#include "vm_atomic.h"
-#include "vm_basic_types.h"
#define G_LOG_DOMAIN "gdp"
#include "vmware/tools/log.h"
-#include "vmware/tools/plugin.h"
#include "vmware/tools/utils.h"
+#include "vmware/tools/plugin.h"
+#include "vmware/tools/threadPool.h"
#include "vmware/tools/gdp.h"
#include "vmci_defs.h"
#include "vmci_sockets.h"
#include "vmcheck.h"
+#include "base64.h"
+#include "util.h"
+#include "str.h"
+#include "jsmn.h"
+#include "conf.h"
#include "vm_version.h"
#include "embed_version.h"
#if defined(_WIN32)
-# define GetSockErr() WSAGetLastError()
+# define GetSysErr() WSAGetLastError()
-# define SYSERR_EADDRINUSE WSAEADDRINUSE
-# define SYSERR_EHOSTUNREACH WSAEHOSTUNREACH
-# define SYSERR_EINTR WSAEINTR
-# define SYSERR_EMSGSIZE WSAEMSGSIZE
-# define SYSERR_WOULDBLOCK(e) (e == WSAEWOULDBLOCK)
+# define SYSERR_EADDRINUSE WSAEADDRINUSE
+# define SYSERR_EHOSTUNREACH WSAEHOSTUNREACH
+# define SYSERR_EINTR WSAEINTR
+# define SYSERR_EMSGSIZE WSAEMSGSIZE
+# define SYSERR_WOULDBLOCK(e) (e == WSAEWOULDBLOCK)
typedef int socklen_t;
# define CloseSocket closesocket
-# define SOCK_READ FD_READ
-# define SOCK_WRITE FD_WRITE
+ typedef WSAEVENT GdpEvent;
+# define GDP_INVALID_EVENT WSA_INVALID_EVENT
#else
-# define GetSockErr() errno
+# define GetSysErr() errno
-# define SYSERR_EADDRINUSE EADDRINUSE
-# define SYSERR_EHOSTUNREACH EHOSTUNREACH
-# define SYSERR_EINTR EINTR
-# define SYSERR_EMSGSIZE EMSGSIZE
-# define SYSERR_WOULDBLOCK(e) (e == EAGAIN || e == EWOULDBLOCK)
+# define SYSERR_EADDRINUSE EADDRINUSE
+# define SYSERR_EHOSTUNREACH EHOSTUNREACH
+# define SYSERR_EINTR EINTR
+# define SYSERR_EMSGSIZE EMSGSIZE
+# define SYSERR_WOULDBLOCK(e) (e == EAGAIN || e == EWOULDBLOCK)
typedef int SOCKET;
-# define SOCKET_ERROR (-1)
-# define INVALID_SOCKET ((SOCKET) -1)
-# define CloseSocket close
+# define SOCKET_ERROR (-1)
+# define INVALID_SOCKET ((SOCKET) -1)
+# define CloseSocket close
-# define SOCK_READ POLLIN
-# define SOCK_WRITE POLLOUT
+ typedef int GdpEvent;
+# define GDP_INVALID_EVENT (-1)
#endif
-#define PRIVILEGED_PORT_MAX 1023
-#define PRIVILEGED_PORT_MIN 1
+#define GDP_TIMEOUT_AT_INFINITE (-1)
+#define GDP_WAIT_INFINITE (-1)
+#define GDP_SEND_AFTER_ANY_TIME (-1)
+
+#define USEC_PER_SECOND (G_GINT64_CONSTANT(1000000))
+#define USEC_PER_MILLISECOND (G_GINT64_CONSTANT(1000))
+
+/*
+ * Macros to read values from tools config
+ */
+#define GDP_CONFIG_GET_BOOL(key, defVal) \
+ VMTools_ConfigGetBoolean(gPluginState.ctx->config, \
+ CONFGROUPNAME_GDP, key, defVal)
+
+#define GDP_CONFIG_GET_INT(key, defVal) \
+ VMTools_ConfigGetInteger(gPluginState.ctx->config, \
+ CONFGROUPNAME_GDP, key, defVal)
+
+#define GDPD_RECV_PORT 7777
+#define GDP_RECV_PORT 766
-#define GDPD_LISTEN_PORT 7777
+/*
+ * The minimum rate limit is 1 pps, corresponding wait interval is 1 second.
+ */
+#define GDP_WAIT_RESULT_TIMEOUT 1500 // ms
+
+#define GDP_PACKET_JSON_LINE_SUBSCRIBERS \
+ " \"subscribers\":[%s],\n"
+
+#define GDP_PACKET_JSON \
+ "{\n" \
+ " \"header\": {\n" \
+ " \"sequence\":%" G_GUINT64_FORMAT ",\n" \
+ "%s" \
+ " \"createTime\":\"%s\",\n" \
+ " \"topic\":\"%s\",\n" \
+ " \"token\":\"%s\"\n" \
+ " },\n" \
+ " \"payload\":{\n" \
+ " \"category\":\"%s\",\n" \
+ " \"base64\":\"%s\"\n" \
+ " }\n" \
+ "}"
+
+#define GDP_RESULT_SEQUENCE "sequence" // Required, <uint64> e.g. 12345
+#define GDP_RESULT_STATUS "status" // Required, "ok", or "bad" for
+ // malformed and rejected packet
+#define GDP_RESULT_DIAGNOSIS "diagnosis" // Optional
+#define GDP_RESULT_RATE_LIMIT "rateLimit" // Required, <int>
+ // e.g. 2 packets per second
+
+#define GDP_RESULT_STATUS_OK "ok"
+#define GDP_RESULT_STATUS_BAD "bad"
+
+#define GDP_RESULT_REQUIRED_KEYS 3
+
+#define GDP_HISTORY_REQUEST_PAST_SECONDS "pastSeconds" // <uint64>
+#define GDP_HISTORY_REQUEST_ID "id" // <uint64>
+ // subscription ID
+
+#define GDP_HISTORY_REQUEST_REQUIRED_KEYS 2
-#define GDP_SEND_TIMEOUT 1000 // ms
-#define GDP_RECV_TIMEOUT 3000 // ms
+/*
+ * Historical guest data cache size limit
+ */
+#define GDP_MAX_CACHE_SIZE_LIMIT (1 << 24) // 16M
+#define GDP_DEFAULT_CACHE_SIZE_LIMIT (1 << 22) // 4M
+#define GDP_MIN_CACHE_SIZE_LIMIT (1 << 20) // 1M
+#define GDP_STR_SIZE(s) ((guint32) (s != NULL ? (strlen(s) + 1) : 0))
/*
* GdpError message table.
#undef GDP_ERR_ITEM
-typedef struct PluginData {
- ToolsAppCtx *ctx; /* The application context */
+typedef struct PluginState {
+ ToolsAppCtx *ctx; /* Tools application context */
+
+ Atomic_Bool started; /* TRUE : Guest data publishing is started
+ FALSE: otherwise
+ Transitions from FALSE to TRUE only */
+
#if defined(_WIN32)
Bool wsaStarted; /* TRUE : WSAStartup succeeded, WSACleanup required
FALSE: otherwise */
- WSAEVENT eventSendRecv; /* The send-recv event object:
- Event object to associate with
- network send/recv ready event */
- WSAEVENT eventStop; /* The stop event object:
- Event object signalled to stop guest data
- publishing for vmtoolsd shutdown */
-#else
- int eventStop; /* The stop event fd:
- Event fd signalled to stop guest data
- publishing for vmtoolsd shutdown */
#endif
int vmciFd; /* vSocket address family value fd */
int vmciFamily; /* vSocket address family value */
SOCKET sock; /* Datagram socket for publishing guest data */
+#if defined(_WIN32)
+ GdpEvent eventNetwork; /* The network event object:
+ To be associated with network
+ send/recv ready event */
+#endif
+
+ GdpEvent eventStop; /* The stop event object:
+ Signalled to stop guest data publishing */
Atomic_Bool stopped; /* TRUE : Guest data publishing is stopped
- for vmtoolsd shutdown
- FALSE: otherwise */
-} PluginData;
+ FALSE: otherwise
+ Transitions from FALSE to TRUE only */
-static PluginData pluginData;
+ GdpEvent eventConfig; /* The config event object:
+ Signalled to update config */
+} PluginState;
+static PluginState gPluginState;
-static Bool GdpInit(ToolsAppCtx *ctx);
-static void GdpDestroy(void);
-static void GdpSetStopEvent(void);
-static Bool GdpCreateSocket(void);
-static void GdpCloseSocket(void);
-static GdpError GdpEmptyRecvQueue(void);
-static GdpError GdpWaitForEvent(int netEvent, int timeout);
-static GdpError GdpSend(const char *buf, int len, int timeout);
-static GdpError GdpRecv(char *buf, int *len, int timeout);
+typedef struct PublishState {
+ GMutex mutex; /* To sync incoming publish calls */
-/*
- ******************************************************************************
- * GdpInit --
- *
- * Initializes internal plugin data.
- *
- * @param[in] ctx The application context
- *
- * @return TRUE on success.
- * @return FALSE otherwise.
- *
- ******************************************************************************
- */
+ /*
+ * Data passed from the active incoming publish thread to
+ * the gdp task thread.
+ */
+ gint64 createTime; /* Real wall-clock time,
+ in microseconds since January 1, 1970 UTC. */
+ const gchar *topic;
+ const gchar *token;
+ const gchar *category;
+ const gchar *data;
+ guint32 dataLen;
+
+ /*
+ * The publish event object:
+ * The active incoming publish thread signals this event object
+ * to notify the gdp task thread to publish new data.
+ */
+ GdpEvent eventPublish;
+
+ /*
+ * The get-result event object:
+ * The gdp task thread signals this event object to notify
+ * the active incoming publish thread to get publish result.
+ */
+ GdpEvent eventGetResult;
+
+ GdpError gdpErr; /* The publish result */
+
+} PublishState;
+
+static PublishState gPublishState;
+
+
+typedef enum GdpTaskEvent {
+ GDP_TASK_EVENT_NONE, /* No event */
+ GDP_TASK_EVENT_STOP, /* Stop event */
+ GDP_TASK_EVENT_CONFIG, /* Config event */
+ GDP_TASK_EVENT_NETWORK, /* Network event */
+ GDP_TASK_EVENT_PUBLISH, /* Publish event */
+ GDP_TASK_EVENT_TIMEOUT, /* Wait timed out */
+} GdpTaskEvent;
+
+typedef enum GdpTaskMode {
+ GDP_TASK_MODE_NONE, /* Not publishing
+ valid with GDP_TASK_STATE_IDLE only */
+ GDP_TASK_MODE_PUBLISH, /* Publishing new data */
+ GDP_TASK_MODE_HISTORY, /* Publishing historical data */
+} GdpTaskMode;
+
+typedef enum GdpTaskState {
+ GDP_TASK_STATE_IDLE, /* Not started
+ valid with GDP_TASK_MODE_NONE only */
+ GDP_TASK_STATE_WAIT_TO_SEND, /* Wait to send JSON packet */
+ GDP_TASK_STATE_WAIT_FOR_RESULT1, /* Wait for publish result from daemon */
+ GDP_TASK_STATE_WAIT_FOR_RESULT2, /* Wait for publish result after re-send */
+} GdpTaskState;
+
+typedef struct PublishResult {
+ guint64 sequence; /* Result for the packet with this sequence number */
+ Bool statusOk; /* TRUE: ok, FALSE: bad */
+ gchar *diagnosis; /* Diagnosis message if statusOk is FALSE */
+ gint32 rateLimit; /* VMCI peer rate limit */
+} PublishResult;
+
+typedef struct HistoryRequest {
+ gint64 beginCacheTime; /* Begin cacheTime */
+ gint64 endCacheTime; /* End cacheTime */
+ gchar id[24]; /* Incremental uint64 subscription ID in string
+ format.
+ 2^64 - 1: 18446744073709551615 (20 digits) */
+} HistoryRequest;
+
+typedef struct HistoryCacheItem {
+ gint64 createTime; /* Guest data - begin */
+ gchar *topic;
+ gchar *token;
+ gchar *category;
+ gchar *data;
+ guint32 dataLen; /* Guest data - end */
+ gint64 cacheTime; /* Monotonic time point when item is cached */
+ guint32 itemSize; /* Item size in bytes */
+} HistoryCacheItem;
+
+typedef struct HistoryCache {
+ GQueue queue; /* Container for HistoryCacheItem */
+ guint32 sizeLimit; /* Cache size limit */
+ guint32 size; /* Current cache size */
+ GList *currentLink; /* Pointer to the history cache queue link
+ currently being published */
+} HistoryCache;
+
+typedef struct TaskContext {
+ /*
+ * Legal mode & state combination
+ *
+ * GDP_TASK_MODE_NONE : GDP_TASK_STATE_IDLE
+ * (Idle state will never time out)
+ *
+ * GDP_TASK_MODE_PUBLISH: GDP_TASK_STATE_WAIT_TO_SEND
+ * GDP_TASK_STATE_WAIT_FOR_RESULT1
+ * GDP_TASK_STATE_WAIT_FOR_RESULT2
+ * (All the above wait states will time out)
+ *
+ * GDP_TASK_MODE_HISTORY: GDP_TASK_STATE_WAIT_TO_SEND
+ * GDP_TASK_STATE_WAIT_FOR_RESULT1
+ * GDP_TASK_STATE_WAIT_FOR_RESULT2
+ * (All the above wait states will time out)
+ */
+ GdpTaskMode mode;
+ GdpTaskState state;
+
+ /*
+ * Publish event object signalled while mode is GDP_TASK_MODE_HISTORY.
+ */
+ Bool publishPending;
+ /*
+ * History request can be received at any time,
+ * non-empty requests queue means history request pending.
+ */
+
+ HistoryCache cache; /* Container for history cache items */
+ GQueue requests; /* Container for HistoryRequest */
+
+ guint64 sequence; /* Sequence number */
+ gchar *packet; /* Formatted JSON packet */
+ guint32 packetLen; /* JSON packet length */
+
+ gint64 timeoutAt; /* Time out at this monotonic time point,
+ in microseconds. */
+ gint64 sendAfter; /* Send to daemon after this monotonic time point,
+ in microseconds. */
+} TaskContext;
+
+
+static inline GdpEvent
+GdpCreateEvent(void);
+
+static inline void
+GdpCloseEvent(GdpEvent *event); // IN/OUT
+
+static inline void
+GdpSetEvent(GdpEvent event); // IN
+
+static inline void
+GdpResetEvent(GdpEvent event); // IN
+
+static GdpError
+GdpWaitForEvent(GdpEvent event, // IN
+ int timeout); // IN
static Bool
-GdpInit(ToolsAppCtx *ctx) // IN
-{
- Bool retVal = FALSE;
+GdpCreateSocket(void);
- pluginData.ctx = ctx;
-#if defined(_WIN32)
- pluginData.wsaStarted = FALSE;
- pluginData.eventSendRecv = WSA_INVALID_EVENT;
- pluginData.eventStop = WSA_INVALID_EVENT;
-#else
- pluginData.eventStop = -1;
-#endif
- pluginData.vmciFd = -1;
- pluginData.vmciFamily = -1;
- pluginData.sock = INVALID_SOCKET;
- Atomic_WriteBool(&pluginData.stopped, FALSE);
+static void
+GdpCloseSocket(void);
-#if defined(_WIN32)
- {
- WSADATA wsaData;
- int res = WSAStartup(MAKEWORD(2, 2), &wsaData);
- if (res != 0) {
- g_critical("%s: WSAStartup failed: error=%d.\n",
- __FUNCTION__, res);
- return FALSE;
- }
+static GdpError
+GdpSendTo(SOCKET sock, // IN
+ const char *buf, // IN
+ int bufLen, // IN
+ const struct sockaddr_vm *destAddr); // IN
- pluginData.wsaStarted = TRUE;
- }
+static GdpError
+GdpRecvFrom(SOCKET sock, // IN
+ char *buf, // OUT
+ int *bufLen, // IN/OUT
+ struct sockaddr_vm *srcAddr); // OUT
- pluginData.eventSendRecv = WSACreateEvent();
- if (pluginData.eventSendRecv == WSA_INVALID_EVENT) {
- g_critical("%s: WSACreateEvent for send/recv failed: error=%d.\n",
- __FUNCTION__, WSAGetLastError());
- goto exit;
- }
+static inline void
+GdpHistoryRequestFree(HistoryRequest *request); // IN
- pluginData.eventStop = WSACreateEvent();
- if (pluginData.eventStop == WSA_INVALID_EVENT) {
- g_critical("%s: WSACreateEvent for stop failed: error=%d.\n",
- __FUNCTION__, WSAGetLastError());
- goto exit;
- }
-#else
- pluginData.eventStop = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
- if (pluginData.eventStop == -1) {
- g_critical("%s: eventfd for stop failed: error=%d.\n",
- __FUNCTION__, errno);
- goto exit;
- }
-#endif
+static void
+GdpHistoryRequestFreeGFunc(gpointer item_data, // IN
+ gpointer user_data); // IN
- pluginData.vmciFamily = VMCISock_GetAFValueFd(&pluginData.vmciFd);
- if (pluginData.vmciFamily == -1) {
- g_critical("%s: Failed to get vSocket address family value.\n",
- __FUNCTION__);
- goto exit;
- }
+static inline void
+GdpTaskClearHistoryRequestQueue(TaskContext *taskCtx); // IN/OUT
- retVal = TRUE;
+static guint32
+GdpGetHistoryCacheSizeLimit(); // IN
-exit:
- if (!retVal) {
- GdpDestroy();
- }
+static inline Bool
+GdpTaskIsHistoryCacheEnabled(TaskContext *taskCtx); // IN
- return retVal;
-}
+static void
+GdpHistoryCacheItemFree(HistoryCacheItem *item); // IN
+static void
+GdpHistoryCacheItemFreeGFunc(gpointer item_data, // IN
+ gpointer user_data); // IN
-/*
- ******************************************************************************
- * GdpDestroy --
- *
- * Destroys internal plugin data.
- *
- ******************************************************************************
- */
+static inline void
+GdpTaskClearHistoryCacheQueue(TaskContext *taskCtx); // IN/OUT
static void
-GdpDestroy(void)
-{
- GdpCloseSocket();
+GdpTaskDeleteHistoryCacheTail(TaskContext *taskCtx); // IN/OUT
- if (pluginData.vmciFd != -1) {
- VMCISock_ReleaseAFValueFd(pluginData.vmciFd);
- pluginData.vmciFd = -1;
- }
+static void
+GdpTaskHistoryCachePushItem(TaskContext *taskCtx, // IN/OUT
+ gint64 createTime, // IN
+ const gchar *topic, // IN
+ const gchar *token, // IN
+ const gchar *category, // IN
+ const gchar *data, // IN
+ guint32 dataLen); // IN
-#if defined(_WIN32)
- if (pluginData.eventStop != WSA_INVALID_EVENT) {
- WSACloseEvent(pluginData.eventStop);
- pluginData.eventStop = WSA_INVALID_EVENT;
- }
+static gchar *
+GdpGetFormattedUtcTime(gint64 utcTime); // IN
- if (pluginData.eventSendRecv != WSA_INVALID_EVENT) {
- WSACloseEvent(pluginData.eventSendRecv);
- pluginData.eventSendRecv = WSA_INVALID_EVENT;
- }
+static GdpError
+GdpTaskBuildPacket(TaskContext *taskCtx, // IN/OUT
+ gint64 createTime, // IN
+ const gchar *topic, // IN
+ const gchar *token, // IN, OPTIONAL
+ const gchar *category, // IN, OPTIONAL
+ const gchar *data, // IN
+ guint32 dataLen, // IN
+ const gchar *subscribers); // IN, OPTIONAL
- if (pluginData.wsaStarted) {
- WSACleanup();
- pluginData.wsaStarted = FALSE;
- }
-#else
- if (pluginData.eventStop != -1) {
- close(pluginData.eventStop);
- pluginData.eventStop = -1;
- }
-#endif
+static inline void
+GdpTaskDestroyPacket(TaskContext *taskCtx); // IN/OUT
- pluginData.ctx = NULL;
-}
+static inline Bool
+GdpTaskOkToSend(TaskContext *taskCtx); // IN
+
+static GdpError
+GdpTaskSendPacket(TaskContext *taskCtx); // IN/OUT
+
+static gchar *
+GdpTaskUpdateHistoryCachePointerAndGetSubscribers(
+ TaskContext *taskCtx); // IN/OUT
+
+static void
+GdpTaskPublishHistory(TaskContext *taskCtx); // IN/OUT
+
+static void
+GdpTaskProcessConfigChange(TaskContext *taskCtx); // IN/OUT
+
+static Bool
+GdpJsonIsTokenOfKey(const char *json, // IN
+ jsmntok_t *token, // IN
+ const char *key); // IN
+
+static Bool
+GdpJsonIsPublishResult(const char *json, // IN
+ jsmntok_t *tokens, // IN
+ int count, // IN
+ PublishResult *result); // OUT
+
+static void
+GdpTaskProcessPublishResult(TaskContext *taskCtx, // IN/OUT
+ PublishResult *result); // IN
+
+static Bool
+GdpJsonIsHistoryRequest(const char *json, // IN
+ jsmntok_t *tokens, // IN
+ int count, // IN
+ HistoryRequest *request); // OUT
+
+static void
+GdpTaskPushHistoryRequest(TaskContext *taskCtx, // IN/OUT
+ HistoryRequest *request); // IN
+
+static void
+GdpTaskProcessNetwork(TaskContext *taskCtx); // IN/OUT
+
+static void
+GdpTaskProcessPublish(TaskContext *taskCtx); // IN/OUT
+
+static void
+GdpTaskProcessTimeout(TaskContext *taskCtx); // IN/OUT
+
+static int
+GdpTaskGetTimeout(TaskContext *taskCtx); // IN
+
+static void
+GdpTaskProcessEvents(TaskContext *taskCtx, // IN/OUT
+ GdpTaskEvent taskEvent); // IN
+
+static GdpError
+GdpTaskWaitForEvents(int timeout, // IN
+ GdpTaskEvent *taskEvent); // OUT
+
+static void
+GdpTaskCtxInit(TaskContext *taskCtx); // OUT
+
+static void
+GdpTaskCtxDestroy(TaskContext *taskCtx); // IN/OUT
+
+static void
+GdpThreadTask(ToolsAppCtx *ctx, // IN
+ void *data); // IN
+
+static void
+GdpThreadInterrupt(ToolsAppCtx *ctx, // IN
+ void *data); // IN
+
+static void
+GdpInit(ToolsAppCtx *ctx); // IN
+
+static Bool
+GdpStart(void);
+
+static void
+GdpDestroy(void);
/*
******************************************************************************
- * GdpSetStopEvent --
+ * GdpCreateEvent --
+ *
+ * Creates a new event object/fd.
*
- * Signals the stop event object/fd.
+ * @return The new event object handle or fd on success.
+ * @return GDP_INVALID_EVENT otherwise.
*
******************************************************************************
*/
-static void
-GdpSetStopEvent(void)
+static inline GdpEvent
+GdpCreateEvent(void)
{
#if defined(_WIN32)
- ASSERT(pluginData.eventStop != WSA_INVALID_EVENT);
- WSASetEvent(pluginData.eventStop);
+ return WSACreateEvent();
#else
- eventfd_t val = 1;
- ASSERT(pluginData.eventStop != -1);
- eventfd_write(pluginData.eventStop, val);
+ return eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
#endif
}
/*
******************************************************************************
- * GdpCreateSocket --
- *
- * Creates a non-blocking datagram socket for guest data publishing.
+ * GdpCloseEvent --
*
- * The socket is bound to a local privileged port with default remote address
- * set to host side gdp daemon endpoint.
+ * Closes the event object/fd.
*
- * @return TRUE on success.
- * @return FALSE otherwise.
+ * @param[in,out] event The event object/fd pointer
*
******************************************************************************
*/
-static Bool
-GdpCreateSocket(void)
+static inline void
+GdpCloseEvent(GdpEvent *event) // IN/OUT
{
- struct sockaddr_vm localAddr;
- struct sockaddr_vm remoteAddr;
- int sockErr;
- Bool retVal = FALSE;
-#if defined(_WIN32)
- u_long nbMode = 1; // Non-blocking mode
-# define SOCKET_TYPE_PARAM SOCK_DGRAM
-#else
- /*
- * Requires Linux kernel version >= 2.6.27.
- */
-# define SOCKET_TYPE_PARAM SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC
-#endif
-
- ASSERT(pluginData.sock == INVALID_SOCKET);
-
- pluginData.sock = socket(pluginData.vmciFamily, SOCKET_TYPE_PARAM, 0);
-#undef SOCKET_TYPE_PARAM
-
- if (pluginData.sock == INVALID_SOCKET) {
- g_warning("%s: socket failed: error=%d.\n", __FUNCTION__, GetSockErr());
- return FALSE;
- }
+ ASSERT(event != NULL);
+ if (*event != GDP_INVALID_EVENT) {
#if defined(_WIN32)
- /*
- * Set socket to nonblocking mode.
- * Note: WSAEventSelect automatically does this if not done.
- */
- if (ioctlsocket(pluginData.sock, FIONBIO, &nbMode) != 0) {
- sockErr = GetSockErr();
- g_warning("%s: ioctlsocket failed: error=%d.\n", __FUNCTION__, sockErr);
- goto exit;
- }
-#endif
-
- memset(&localAddr, 0, sizeof localAddr);
- localAddr.svm_family = pluginData.vmciFamily;
- localAddr.svm_cid = VMCISock_GetLocalCID();
- localAddr.svm_port = PRIVILEGED_PORT_MAX; // No htons
-
- do {
- if (bind(pluginData.sock, (struct sockaddr *)&localAddr,
- (socklen_t) sizeof localAddr) == 0) {
- sockErr = 0;
- break;
+ if (!WSACloseEvent(*event)) {
+ g_warning("%s: WSACloseEvent failed: error=%d.\n",
+ __FUNCTION__, WSAGetLastError());
}
-
- sockErr = GetSockErr();
- if (sockErr == SYSERR_EADDRINUSE) {
- g_info("%s: Local port %d is in use, retry the next one.\n",
- __FUNCTION__, localAddr.svm_port);
- localAddr.svm_port--;
- } else {
- g_warning("%s: bind failed: error=%d.\n", __FUNCTION__, sockErr);
- goto exit;
+#else
+ if (close(*event) != 0) {
+ g_warning("%s: close failed: error=%d.\n",
+ __FUNCTION__, errno);
}
- } while (localAddr.svm_port >= PRIVILEGED_PORT_MIN);
-
- if (sockErr != 0) {
- goto exit;
- }
-
- memset(&remoteAddr, 0, sizeof remoteAddr);
- remoteAddr.svm_family = pluginData.vmciFamily;
- remoteAddr.svm_cid = VMCI_HOST_CONTEXT_ID;
- remoteAddr.svm_port = GDPD_LISTEN_PORT; // No htons
-
- /*
- * Set default remote address to send to/recv from datagrams.
- */
- if (connect(pluginData.sock, (struct sockaddr *)&remoteAddr,
- (socklen_t) sizeof remoteAddr) != 0) {
- sockErr = GetSockErr();
- g_warning("%s: connect failed: error=%d.\n", __FUNCTION__, sockErr);
- goto exit;
+#endif
+ *event = GDP_INVALID_EVENT;
}
+}
- g_debug("%s: Socket created and bound to local port %d.\n",
- __FUNCTION__, localAddr.svm_port);
- retVal = TRUE;
+/*
+ ******************************************************************************
+ * GdpSetEvent --
+ *
+ * Signals the event object/fd.
+ *
+ * @param[in] event The event object/fd
+ *
+ ******************************************************************************
+ */
-exit:
- if (!retVal) {
- CloseSocket(pluginData.sock);
- pluginData.sock = INVALID_SOCKET;
+static inline void
+GdpSetEvent(GdpEvent event) // IN
+{
+#if defined(_WIN32)
+ ASSERT(event != GDP_INVALID_EVENT);
+ if (!WSASetEvent(event)) {
+ g_warning("%s: WSASetEvent failed: error=%d.\n",
+ __FUNCTION__, WSAGetLastError());
}
-
- return retVal;
+#else
+ eventfd_t val = 1;
+ ASSERT(event != GDP_INVALID_EVENT);
+ if (eventfd_write(event, val) != 0) {
+ g_warning("%s: eventfd_write failed: error=%d.\n", __FUNCTION__, errno);
+ }
+#endif
}
/*
******************************************************************************
- * GdpCloseSocket --
+ * GdpResetEvent --
+ *
+ * Resets the event object/fd.
*
- * Closes the guest data publishing socket.
+ * @param[in] event The event object/fd
*
******************************************************************************
*/
-static void
-GdpCloseSocket(void)
+static inline void
+GdpResetEvent(GdpEvent event) // IN
{
- if (pluginData.sock != INVALID_SOCKET) {
- g_debug("%s: Closing socket.\n", __FUNCTION__);
- if (CloseSocket(pluginData.sock) != 0) {
- g_warning("%s: CloseSocket failed: fd=%d, error=%d.\n",
- __FUNCTION__, pluginData.sock, GetSockErr());
- }
-
- pluginData.sock = INVALID_SOCKET;
+#if defined(_WIN32)
+ ASSERT(event != GDP_INVALID_EVENT);
+ if (!WSAResetEvent(event)) {
+ g_warning("%s: WSAResetEvent failed: error=%d.\n",
+ __FUNCTION__, WSAGetLastError());
+ }
+#else
+ eventfd_t val;
+ ASSERT(event != GDP_INVALID_EVENT);
+ if (eventfd_read(event, &val) != 0) {
+ g_warning("%s: eventfd_read failed: error=%d.\n", __FUNCTION__, errno);
}
+#endif
}
/*
******************************************************************************
- * GdpEmptyRecvQueue --
+ * GdpWaitForEvent --
*
- * Empties receive queue before publishing new data to host side gdp daemon.
- * This is required in case previous receive from daemon timed out and daemon
- * did reply later.
+ * Waits for the event object/fd or timeout.
*
- * @return GDP_ERROR_SUCCESS on success.
- * @return Other GdpError codes otherwise.
+ * @param[in] event The event object/fd
+ * @param[in] timeout Timeout value in milliseconds,
+ * negative value means an infinite timeout,
+ * zero means no wait.
+ *
+ * @return GDP_ERROR_SUCCESS on event signalled.
+ * @return GDP_ERROR_TIMEOUT on timeout.
+ * @return Other GdpError code otherwise.
*
******************************************************************************
*/
static GdpError
-GdpEmptyRecvQueue(void)
+GdpWaitForEvent(GdpEvent event, // IN
+ int timeout) // IN
{
+#if defined(_WIN32)
+ DWORD localTimeout;
+ gint64 startTime;
GdpError retVal;
- ASSERT(pluginData.sock != INVALID_SOCKET);
+ localTimeout = (DWORD)(timeout >= 0 ? timeout : WSA_INFINITE);
+ if (timeout > 0) {
+ startTime = g_get_monotonic_time();
+ } else {
+ startTime = 0; // Deals with [-Werror=maybe-uninitialized]
+ }
+
+ while (TRUE) {
+ WSAEVENT eventObjects[] = { event };
+ DWORD waitRes;
+
+ waitRes = WSAWaitForMultipleEvents((DWORD) ARRAYSIZE(eventObjects),
+ eventObjects,
+ FALSE, localTimeout, TRUE);
+ if (waitRes == WSA_WAIT_EVENT_0) {
+ retVal = GDP_ERROR_SUCCESS;
+ break;
+ } else if (waitRes == WSA_WAIT_IO_COMPLETION) {
+ gint64 curTime;
+ gint64 passedTime;
+
+ if (localTimeout == 0 ||
+ localTimeout == WSA_INFINITE) {
+ continue;
+ }
+
+ curTime = g_get_monotonic_time();
+ passedTime = (curTime - startTime) / USEC_PER_MILLISECOND;
+ if (passedTime >= localTimeout) {
+ retVal = GDP_ERROR_TIMEOUT;
+ break;
+ }
+
+ startTime = curTime;
+ localTimeout -= (DWORD) passedTime;
+ continue;
+ } else if (waitRes == WSA_WAIT_TIMEOUT) {
+ retVal = GDP_ERROR_TIMEOUT;
+ break;
+ } else { // WSA_WAIT_FAILED
+ g_warning("%s: WSAWaitForMultipleEvents failed: error=%d.\n",
+ __FUNCTION__, WSAGetLastError());
+ retVal = GDP_ERROR_GENERAL;
+ break;
+ }
+ }
+
+ return retVal;
+
+#else
+
+ gint64 startTime;
+ GdpError retVal;
+
+ if (timeout > 0) {
+ startTime = g_get_monotonic_time();
+ } else {
+ startTime = 0; // Deals with [-Werror=maybe-uninitialized]
+ }
+
+ while (TRUE) {
+ struct pollfd fds[1];
+ int res;
+
+ fds[0].fd = event;
+ fds[0].events = POLLIN;
+ fds[0].revents = 0;
+
+ res = poll(fds, ARRAYSIZE(fds), timeout);
+ if (res > 0) {
+ if (fds[0].revents & POLLIN) {
+ retVal = GDP_ERROR_SUCCESS;
+ } else { // Not expected
+ g_warning("%s: Unexpected event.\n", __FUNCTION__);
+ retVal = GDP_ERROR_GENERAL;
+ }
+
+ break;
+ } else if (res == -1) {
+ int err = errno;
+ if (err == EINTR) {
+ gint64 curTime;
+ gint64 passedTime;
+
+ if (timeout <= 0) {
+ continue;
+ }
+
+ curTime = g_get_monotonic_time();
+ passedTime = (curTime - startTime) / USEC_PER_MILLISECOND;
+ if (passedTime >= timeout) {
+ retVal = GDP_ERROR_TIMEOUT;
+ break;
+ }
+
+ startTime = curTime;
+ timeout -= (int) passedTime;
+ continue;
+ } else {
+ g_warning("%s: poll failed: error=%d.\n", __FUNCTION__, err);
+ retVal = GDP_ERROR_GENERAL;
+ break;
+ }
+ } else if (res == 0) {
+ retVal = GDP_ERROR_TIMEOUT;
+ break;
+ } else {
+ g_warning("%s: Unexpected poll return: %d.\n", __FUNCTION__, res);
+ retVal = GDP_ERROR_GENERAL;
+ break;
+ }
+ }
+
+ return retVal;
+#endif
+}
+
+
+/*
+ ******************************************************************************
+ * GdpCreateSocket --
+ *
+ * Creates a non-blocking datagram socket for guest data publishing.
+ *
+ * The socket is bound to the gdp guest receive port.
+ *
+ * @return TRUE on success.
+ * @return FALSE otherwise.
+ *
+ ******************************************************************************
+ */
+
+static Bool
+GdpCreateSocket(void)
+{
+ Bool retVal;
+ struct sockaddr_vm localAddr;
+#if defined(_WIN32)
+ u_long nbMode = 1; // Non-blocking mode
+# define SOCKET_TYPE_PARAM SOCK_DGRAM
+#else
+ /*
+ * Requires Linux kernel version >= 2.6.27.
+ */
+# define SOCKET_TYPE_PARAM SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC
+#endif
+
+ ASSERT(gPluginState.sock == INVALID_SOCKET);
+ ASSERT(gPluginState.vmciFamily != -1);
+
+ gPluginState.sock = socket(gPluginState.vmciFamily, SOCKET_TYPE_PARAM, 0);
+#undef SOCKET_TYPE_PARAM
+
+ if (gPluginState.sock == INVALID_SOCKET) {
+ g_critical("%s: socket failed: error=%d.\n", __FUNCTION__, GetSysErr());
+ return FALSE;
+ }
+
+ retVal = FALSE;
+
+#if defined(_WIN32)
+ /*
+ * Sets socket to nonblocking mode.
+ * Note: WSAEventSelect automatically does this if not done.
+ */
+ if (ioctlsocket(gPluginState.sock, FIONBIO, &nbMode) != 0) {
+ g_critical("%s: ioctlsocket failed: error=%d.\n",
+ __FUNCTION__, GetSysErr());
+ goto exit;
+ }
+#endif
+
+ memset(&localAddr, 0, sizeof localAddr);
+ localAddr.svm_family = gPluginState.vmciFamily;
+ localAddr.svm_cid = VMCISock_GetLocalCID();
+ localAddr.svm_port = GDP_RECV_PORT; // No htons
+
+ if (bind(gPluginState.sock, (struct sockaddr *) &localAddr,
+ (socklen_t) sizeof localAddr) != 0) {
+ g_critical("%s: bind failed: error=%d.\n",
+ __FUNCTION__, GetSysErr());
+ goto exit;
+ }
+
+ g_debug("%s: Socket created and bound to local port %d.\n",
+ __FUNCTION__, localAddr.svm_port);
+
+ retVal = TRUE;
+
+exit:
+ if (!retVal) {
+ CloseSocket(gPluginState.sock);
+ gPluginState.sock = INVALID_SOCKET;
+ }
+
+ return retVal;
+}
+
+
+/*
+ ******************************************************************************
+ * GdpCloseSocket --
+ *
+ * Closes the guest data publishing datagram socket.
+ *
+ ******************************************************************************
+ */
+
+static void
+GdpCloseSocket(void)
+{
+ if (gPluginState.sock != INVALID_SOCKET) {
+ g_debug("%s: Closing socket.\n", __FUNCTION__);
+ if (CloseSocket(gPluginState.sock) != 0) {
+ g_warning("%s: CloseSocket failed: fd=%d, error=%d.\n",
+ __FUNCTION__, gPluginState.sock, GetSysErr());
+ }
+
+ gPluginState.sock = INVALID_SOCKET;
+ }
+}
+
+
+/*
+ ******************************************************************************
+ * GdpSendTo --
+ *
+ * Wrapper of sendto() for VMCI datagram socket.
+ *
+ * Datagram send is not buffered, it will not return EWOULDBLOCK.
+ * If host daemon is not running, error EHOSTUNREACH is returned.
+ *
+ * @param[in] sock VMCI datagram socket descriptor
+ * @param[in] buf Data buffer pointer
+ * @param[in] bufLen Data length
+ * @param[in] destAddr Destination VMCI datagram socket address
+ *
+ * @return GDP_ERROR_SUCCESS on success.
+ * @return Other GdpError code otherwise.
+ *
+ ******************************************************************************
+ */
+
+static GdpError
+GdpSendTo(SOCKET sock, // IN
+ const char *buf, // IN
+ int bufLen, // IN
+ const struct sockaddr_vm *destAddr) // IN
+{
+ GdpError retVal;
+
+ ASSERT(sock != INVALID_SOCKET);
+ ASSERT(buf != NULL && bufLen > 0);
+ ASSERT(destAddr != NULL);
do {
long res;
- int sockErr;
- char buf[1]; // OK to truncate, case of SYSERR_EMSGSIZE.
+ int err;
+ socklen_t destAddrLen = (socklen_t) sizeof *destAddr;
- /*
- * Windows: recv returns -1, first with SYSERR_EMSGSIZE,
- * then SYSERR_WOULDBLOCK.
- * Linux : recv returns 1 first, then -1 with SYSERR_WOULDBLOCK.
- */
- res = recv(pluginData.sock, buf, (int) sizeof buf, 0);
- if (res >= 0) {
- g_debug("%s: recv returns %d.\n", __FUNCTION__, (int)res);
- continue;
+ res = sendto(sock, buf, bufLen, 0,
+ (const struct sockaddr *) destAddr, destAddrLen);
+ if (res == bufLen) {
+ retVal = GDP_ERROR_SUCCESS;
+ break;
+ } else if (res != SOCKET_ERROR) { // No partial send shall happen
+ g_warning("%s: sendto returned unexpected value %d.\n",
+ __FUNCTION__, (int) res);
+ retVal = GDP_ERROR_GENERAL;
+ break;
}
- sockErr = GetSockErr();
- if (sockErr == SYSERR_EINTR) {
+ err = GetSysErr();
+ if (err == SYSERR_EINTR) {
continue;
- } else if (sockErr == SYSERR_EMSGSIZE) {
- g_debug("%s: recv truncated.\n", __FUNCTION__);
- continue;
- } else if (SYSERR_WOULDBLOCK(sockErr)) {
- retVal = GDP_ERROR_SUCCESS; // No more message in the recv queue.
+ } else if (err == SYSERR_EHOSTUNREACH) {
+ g_info("%s: sendto failed: host daemon unreachable.\n", __FUNCTION__);
+ retVal = GDP_ERROR_UNREACH;
+ } else if (err == SYSERR_EMSGSIZE) {
+ g_warning("%s: sendto failed: message too large.\n", __FUNCTION__);
+ retVal = GDP_ERROR_DATA_SIZE;
} else {
- /*
- * Note: recv does not return SYSERR_EHOSTUNREACH.
- */
- g_info("%s: recv failed: error=%d.\n", __FUNCTION__, sockErr);
- retVal = GDP_ERROR_INTERNAL;
+ g_warning("%s: sendto failed: error=%d.\n", __FUNCTION__, err);
+ retVal = GDP_ERROR_GENERAL;
+ }
+
+ break;
+
+ } while (TRUE);
+
+ return retVal;
+}
+
+
+/*
+ ******************************************************************************
+ * GdpRecvFrom --
+ *
+ * Wrapper of recvfrom() for VMCI datagram socket.
+ *
+ * @param[in] sock VMCI datagram socket descriptor
+ * @param[out] buf Buffer pointer
+ * @param[in,out] bufLen Buffer length on input,
+ * received data length on output
+ * @param[out] srcAddr Source VMCI datagram socket address
+ *
+ * @return GDP_ERROR_SUCCESS on success.
+ * @return Other GdpError code otherwise.
+ *
+ ******************************************************************************
+ */
+
+static GdpError
+GdpRecvFrom(SOCKET sock, // IN
+ char *buf, // OUT
+ int *bufLen, // IN/OUT
+ struct sockaddr_vm *srcAddr) // OUT
+{
+ GdpError retVal;
+
+ ASSERT(sock != INVALID_SOCKET);
+ ASSERT(buf != NULL && bufLen != NULL && *bufLen > 0);
+ ASSERT(srcAddr != NULL);
+
+ do {
+ long res;
+ int err;
+ socklen_t srcAddrLen = (socklen_t) sizeof *srcAddr;
+
+ res = recvfrom(sock, buf, *bufLen, 0,
+ (struct sockaddr *) srcAddr, &srcAddrLen);
+ if (res >= 0) {
+ *bufLen = (int) res;
+ retVal = GDP_ERROR_SUCCESS;
+ break;
+ }
+
+ ASSERT(res == SOCKET_ERROR);
+ err = GetSysErr();
+ if (err == SYSERR_EINTR) {
+ continue;
+ } else if (err == SYSERR_EMSGSIZE) {
+ g_warning("%s: recvfrom failed: buffer size too small.\n",
+ __FUNCTION__);
+ retVal = GDP_ERROR_DATA_SIZE;
+ } else { // Including EWOULDBLOCK
+ g_warning("%s: recvfrom failed: error=%d.\n", __FUNCTION__, err);
+ retVal = GDP_ERROR_GENERAL;
}
- break;
+ break;
+
+ } while (TRUE);
+
+ return retVal;
+}
+
+
+/*
+ *****************************************************************************
+ * GdpHistoryRequestFree --
+ *
+ * Frees history request resources.
+ *
+ * @param[in] request History request pointer
+ *
+ *****************************************************************************
+ */
+
+static inline void
+GdpHistoryRequestFree(HistoryRequest *request) // IN
+{
+ g_debug("%s: Entering ...\n", __FUNCTION__);
+ free(request);
+}
+
+
+/*
+ *****************************************************************************
+ * GdpHistoryRequestFreeGFunc --
+ *
+ * GFunc called by GdpTaskClearHistoryRequestQueue.
+ *
+ * @param[in] item_data History request pointer
+ * @param[in] user_data Not used
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpHistoryRequestFreeGFunc(gpointer item_data, // IN
+ gpointer user_data) // IN
+{
+ GdpHistoryRequestFree((HistoryRequest *) item_data);
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskClearHistoryRequestQueue --
+ *
+ * Removes all the elements in history request queue.
+ *
+ * @param[in,out] taskCtx The task context
+ *
+ *****************************************************************************
+ */
+
+static inline void
+GdpTaskClearHistoryRequestQueue(TaskContext *taskCtx) // IN/OUT
+{
+ g_queue_foreach(&taskCtx->requests, GdpHistoryRequestFreeGFunc, NULL);
+ g_queue_clear(&taskCtx->requests);
+}
+
+
+/*
+ ******************************************************************************
+ * GdpGetHistoryCacheSizeLimit --
+ *
+ * Gets history cache size limit from tools config.
+ *
+ ******************************************************************************
+ */
+
+static guint32
+GdpGetHistoryCacheSizeLimit() // IN
+{
+ gint sizeLimit;
+
+ sizeLimit = GDP_CONFIG_GET_INT(CONFNAME_GDP_CACHE_SIZE,
+ GDP_DEFAULT_CACHE_SIZE_LIMIT);
+ if (sizeLimit != 0 &&
+ (sizeLimit < GDP_MIN_CACHE_SIZE_LIMIT ||
+ sizeLimit > GDP_MAX_CACHE_SIZE_LIMIT)) {
+ g_warning("%s: Configured history cache size limit %d exceeds range, "
+ "set to default value %d.\n",
+ __FUNCTION__, sizeLimit, GDP_DEFAULT_CACHE_SIZE_LIMIT);
+ sizeLimit = GDP_DEFAULT_CACHE_SIZE_LIMIT;
+ }
+
+ return (guint32) sizeLimit;
+}
+
+
+/*
+ ******************************************************************************
+ * GdpTaskIsHistoryCacheEnabled --
+ *
+ * Checks if history cache is enabled.
+ *
+ * @param[in] taskCtx The task context
+ *
+ * @return TRUE if history cache size limit is not zero.
+ * @return FALSE otherwise.
+ *
+ ******************************************************************************
+ */
+
+static inline Bool
+GdpTaskIsHistoryCacheEnabled(TaskContext *taskCtx) // IN
+{
+ return taskCtx->cache.sizeLimit != 0 ? TRUE : FALSE;
+}
+
+
+/*
+ *****************************************************************************
+ * GdpHistoryCacheItemFree --
+ *
+ * Frees history cache item resources.
+ *
+ * @param[in] item History cache item pointer
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpHistoryCacheItemFree(HistoryCacheItem *item) // IN
+{
+ g_debug("%s: Entering ...\n", __FUNCTION__);
+ ASSERT(item != NULL);
+ free(item->topic);
+ free(item->token);
+ free(item->category);
+ free(item->data);
+ free(item);
+}
+
+
+/*
+ *****************************************************************************
+ * GdpHistoryCacheItemFreeGFunc --
+ *
+ * GFunc called by GdpTaskClearHistoryCacheQueue.
+ *
+ * @param[in] item_data History cache item pointer
+ * @param[in] user_data Not used
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpHistoryCacheItemFreeGFunc(gpointer item_data, // IN
+ gpointer user_data) // IN
+{
+ GdpHistoryCacheItemFree((HistoryCacheItem *) item_data);
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskClearHistoryCacheQueue --
+ *
+ * Removes all the elements in history cache queue.
+ *
+ * @param[in,out] taskCtx The task context
+ *
+ *****************************************************************************
+ */
+
+static inline void
+GdpTaskClearHistoryCacheQueue(TaskContext *taskCtx) // IN/OUT
+{
+ g_queue_foreach(&taskCtx->cache.queue, GdpHistoryCacheItemFreeGFunc, NULL);
+ g_queue_clear(&taskCtx->cache.queue);
+
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskDeleteHistoryCacheTail --
+ *
+ * Deletes the tail of history cache queue and removes its reference.
+ *
+ * @param[in,out] taskCtx The task context
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpTaskDeleteHistoryCacheTail(TaskContext *taskCtx) // IN/OUT
+{
+ HistoryCacheItem *item;
+
+ ASSERT(!g_queue_is_empty(&taskCtx->cache.queue));
+
+ if (taskCtx->cache.currentLink ==
+ g_queue_peek_tail_link(&taskCtx->cache.queue)) {
+ taskCtx->cache.currentLink = NULL;
+ }
+
+ item = (HistoryCacheItem *) g_queue_pop_tail(&taskCtx->cache.queue);
+ ASSERT(item != NULL);
+ taskCtx->cache.size -= item->itemSize;
+ GdpHistoryCacheItemFree(item);
+}
+
+
+/*
+ ******************************************************************************
+ * GdpTaskHistoryCachePushItem --
+ *
+ * Pushes the published guest data item into history cache queue.
+ *
+ * @param[in,out] taskCtx The task context
+ * @param[in] createTime UTC timestamp, in number of micro-
+ * seconds since January 1, 1970 UTC.
+ * @param[in] topic Topic
+ * @param[in,optional] token Token, can be NULL
+ * @param[in,optional] category Category, can be NULL that defaults to
+ * "application"
+ * @param[in] data Buffer containing data to publish
+ * @param[in] dataLen Buffer length
+ *
+ ******************************************************************************
+ */
+
+static void
+GdpTaskHistoryCachePushItem(TaskContext *taskCtx, // IN/OUT
+ gint64 createTime, // IN
+ const gchar *topic, // IN
+ const gchar *token, // IN
+ const gchar *category, // IN
+ const gchar *data, // IN
+ guint32 dataLen) // IN
+{
+ HistoryCacheItem *item;
+
+ ASSERT(topic != NULL);
+ ASSERT(data != NULL && dataLen > 0);
+ ASSERT(GdpTaskIsHistoryCacheEnabled(taskCtx));
+
+ item = (HistoryCacheItem *) Util_SafeMalloc(sizeof(HistoryCacheItem));
+ item->createTime = createTime;
+ item->topic = Util_SafeStrdup(topic);
+ item->token = Util_SafeStrdup(token);
+ item->category = Util_SafeStrdup(category);
+ item->data = (gchar *) Util_SafeMalloc(dataLen);
+ Util_Memcpy(item->data, data, dataLen);
+ item->dataLen = dataLen;
+
+ item->cacheTime = g_get_monotonic_time();
+ item->itemSize = ((guint32) sizeof(HistoryCacheItem))
+ + GDP_STR_SIZE(item->topic)
+ + GDP_STR_SIZE(item->token)
+ + GDP_STR_SIZE(item->category)
+ + item->dataLen;
+
+ while (taskCtx->cache.size + item->itemSize > taskCtx->cache.sizeLimit) {
+ GdpTaskDeleteHistoryCacheTail(taskCtx);
+ }
+
+ g_queue_push_head(&taskCtx->cache.queue, item);
+ taskCtx->cache.size += item->itemSize;
+
+ g_debug("%s: Current history cache size in bytes: %u, item count: %u.\n",
+ __FUNCTION__, taskCtx->cache.size,
+ g_queue_get_length(&taskCtx->cache.queue));
+}
+
+
+/*
+ ******************************************************************************
+ * GdpGetFormattedUtcTime --
+ *
+ * Converts UTC time to string in format like "2018-02-22T21:17:38.517Z".
+ *
+ * The caller must free the return value using g_free.
+ *
+ * @param[in] utcTime Real wall-clock time
+ * Number of microseconds since January 1, 1970 UTC.
+ *
+ * @return Formatted timestamp string on success.
+ * @return NULL otherwise.
+ *
+ ******************************************************************************
+ */
+
+static gchar *
+GdpGetFormattedUtcTime(gint64 utcTime) // IN
+{
+ gchar *retVal = NULL;
+ GDateTime *utcDateTime;
+
+ utcDateTime = g_date_time_new_from_unix_utc(utcTime / USEC_PER_SECOND);
+ if (utcDateTime != NULL) {
+ gchar *dateToSecond; // YYYY-MM-DDTHH:MM:SS
+
+ dateToSecond = g_date_time_format(utcDateTime, "%FT%T");
+ if (dateToSecond != NULL) {
+ gint msec; // milliseconds
+
+ msec = (utcTime % USEC_PER_SECOND) / USEC_PER_MILLISECOND;
+ retVal = g_strdup_printf("%s.%03dZ", dateToSecond, msec);
+ g_free(dateToSecond);
+ }
+
+ g_date_time_unref(utcDateTime);
+ }
+
+ return retVal;
+}
+
+
+/*
+ ******************************************************************************
+ * GdpTaskBuildPacket --
+ *
+ * Builds JSON packet in the task context.
+ *
+ * @param[in,out] taskCtx The task context
+ * @param[in] createTime UTC timestamp, in number of micro-
+ * seconds since January 1, 1970 UTC.
+ * @param[in] topic Topic
+ * @param[in,optional] token Token, can be NULL
+ * @param[in,optional] category Category, can be NULL that defaults to
+ * "application"
+ * @param[in] data Buffer containing data to publish
+ * @param[in] dataLen Buffer length
+ * @param[in,optional] subscribers For history data only, NULL for new data
+ *
+ * @return GDP_ERROR_SUCCESS on success.
+ * @return Other GdpError code otherwise.
+ *
+ ******************************************************************************
+ */
+
+static GdpError
+GdpTaskBuildPacket(TaskContext *taskCtx, // IN/OUT
+ gint64 createTime, // IN
+ const gchar *topic, // IN
+ const gchar *token, // IN, OPTIONAL
+ const gchar *category, // IN, OPTIONAL
+ const gchar *data, // IN
+ guint32 dataLen, // IN
+ const gchar *subscribers) // IN, OPTIONAL
+{
+ gchar base64Data[GDP_MAX_PACKET_LEN + 1]; // Add a space for NULL
+ gchar *subscribersLine = NULL;
+ gchar *formattedTime;
+ GdpError gdpErr;
+
+ ASSERT(topic != NULL);
+ ASSERT(data != NULL && dataLen > 0);
+
+ if (!Base64_Encode(data, dataLen, base64Data, sizeof base64Data, NULL)) {
+ g_info("%s: Base64_Encode failed, data length is %u.\n",
+ __FUNCTION__, dataLen);
+ return GDP_ERROR_DATA_SIZE;
+ }
+
+ if (subscribers != NULL && *subscribers != '\0') {
+ subscribersLine = g_strdup_printf(GDP_PACKET_JSON_LINE_SUBSCRIBERS,
+ subscribers);
+ }
+
+ formattedTime = GdpGetFormattedUtcTime(createTime);
+
+ ASSERT(taskCtx->packet == NULL);
+ taskCtx->packet = g_strdup_printf(GDP_PACKET_JSON,
+ ++taskCtx->sequence,
+ subscribersLine != NULL ?
+ subscribersLine : "",
+ formattedTime != NULL ?
+ formattedTime : "",
+ topic,
+ token != NULL ?
+ token : "",
+ category != NULL ?
+ category : "application",
+ base64Data);
+ taskCtx->packetLen = (guint32) strlen(taskCtx->packet);
+ if (taskCtx->packetLen > GDP_MAX_PACKET_LEN) {
+ g_info("%s: Packet length (%u) exceeds maximum limit (%u).\n",
+ __FUNCTION__, taskCtx->packetLen, GDP_MAX_PACKET_LEN);
+ GdpTaskDestroyPacket(taskCtx);
+ gdpErr = GDP_ERROR_DATA_SIZE;
+ } else {
+ gdpErr = GDP_ERROR_SUCCESS;
+ }
+
+ g_free(formattedTime);
+ g_free(subscribersLine);
+ return gdpErr;
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskDestroyPacket --
+ *
+ * Destroys JSON packet in the task context.
+ *
+ * @param[in,out] taskCtx The task context
+ *
+ *****************************************************************************
+ */
+
+static inline void
+GdpTaskDestroyPacket(TaskContext *taskCtx) // IN/OUT
+{
+ g_free(taskCtx->packet);
+ taskCtx->packet = NULL;
+ taskCtx->packetLen = 0;
+}
+
+
+/*
+ ******************************************************************************
+ * GdpTaskOkToSend --
+ *
+ * Checks if current time is OK to send JSON packet to host side gdp daemon.
+ *
+ * @param[in] taskCtx The task context
+ *
+ * @return TRUE if current time has passed the task context sendAfter time.
+ * @return FALSE otherwise.
+ *
+ ******************************************************************************
+ */
+
+static inline Bool
+GdpTaskOkToSend(TaskContext *taskCtx) // IN
+{
+ return g_get_monotonic_time() >= taskCtx->sendAfter ?
+ TRUE : FALSE;
+}
+
+
+/*
+ ******************************************************************************
+ * GdpTaskSendPacket --
+ *
+ * Sends JSON packet in the task context to host side gdp daemon.
+ *
+ * Datagram send is not buffered, it will not return EWOULDBLOCK.
+ * If host daemon is not running, error EHOSTUNREACH is returned.
+ *
+ * @param[in,out] taskCtx The task context
+ *
+ * @return GDP_ERROR_SUCCESS on success.
+ * @return Other GdpError code otherwise.
+ *
+ ******************************************************************************
+ */
+
+static GdpError
+GdpTaskSendPacket(TaskContext *taskCtx) // IN/OUT
+{
+ struct sockaddr_vm destAddr;
+ GdpError gdpErr;
+
+ ASSERT(gPluginState.sock != INVALID_SOCKET);
+ ASSERT(taskCtx->packet != NULL && taskCtx->packetLen > 0);
+
+ memset(&destAddr, 0, sizeof destAddr);
+ destAddr.svm_family = gPluginState.vmciFamily;
+ destAddr.svm_cid = VMCI_HOST_CONTEXT_ID;
+ destAddr.svm_port = GDPD_RECV_PORT; // No htons
+
+ gdpErr = GdpSendTo(gPluginState.sock, taskCtx->packet,
+ (int) taskCtx->packetLen, &destAddr);
+ if (gdpErr == GDP_ERROR_SUCCESS) {
+ /*
+ * Updates sendAfter for next send, phase 1 / 2.
+ */
+ taskCtx->sendAfter = g_get_monotonic_time();
+ taskCtx->timeoutAt = taskCtx->sendAfter +
+ GDP_WAIT_RESULT_TIMEOUT * USEC_PER_MILLISECOND;
+ } else {
+ GdpTaskDestroyPacket(taskCtx);
+ taskCtx->timeoutAt = GDP_TIMEOUT_AT_INFINITE;
+ }
+
+ return gdpErr;
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskUpdateHistoryCachePointerAndGetSubscribers --
+ *
+ * Updates history cache pointer and gets subscribers of the pointed item.
+ *
+ * @param[in,out] taskCtx The task context
+ *
+ * @return A string of subscription IDs separated by comma
+ (to be freed by caller),
+ * or NULL if no match.
+ *
+ *****************************************************************************
+ */
+
+static gchar *
+GdpTaskUpdateHistoryCachePointerAndGetSubscribers(
+ TaskContext *taskCtx) // IN/OUT
+{
+ gchar *subscribers = NULL;
+
+ if (taskCtx->cache.currentLink == NULL) {
+ /*
+ * Starts with the earliest history cache link.
+ */
+ taskCtx->cache.currentLink = g_queue_peek_tail_link(
+ &taskCtx->cache.queue);
+ }
+
+ while (taskCtx->cache.currentLink != NULL &&
+ !g_queue_is_empty(&taskCtx->requests)) {
+ HistoryCacheItem *item;
+ GList *requestList;
+
+ item = (HistoryCacheItem *) taskCtx->cache.currentLink->data;
+ requestList = g_queue_peek_tail_link(&taskCtx->requests);
+ while (requestList != NULL) {
+ HistoryRequest *request = (HistoryRequest *) requestList->data;
+ Bool requestDone = FALSE;
+
+ if (item->cacheTime > request->endCacheTime) {
+ requestDone = TRUE;
+ } else if (request->beginCacheTime < item->cacheTime &&
+ item->cacheTime <= request->endCacheTime) {
+ if (subscribers == NULL) {
+ subscribers = g_strdup_printf("\"%s\"", request->id);
+ } else {
+ gchar *subscribersNew;
+ subscribersNew = g_strdup_printf("%s,\"%s\"",
+ subscribers, request->id);
+ g_free(subscribers);
+ subscribers = subscribersNew;
+ }
+
+ if (item->cacheTime == request->endCacheTime) {
+ requestDone = TRUE;
+ } else {
+ request->beginCacheTime = item->cacheTime;
+ }
+ }
+
+ if (requestDone) {
+ GList *requestListPrev = requestList->prev;
+ GdpHistoryRequestFree(request);
+ g_queue_delete_link(&taskCtx->requests, requestList);
+ requestList = requestListPrev;
+ } else {
+ requestList = requestList->prev;
+ }
+ }
+
+ if (subscribers != NULL) {
+ break;
+ }
+
+ taskCtx->cache.currentLink = taskCtx->cache.currentLink->prev;
+ }
+
+ return subscribers;
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskPublishHistory --
+ *
+ * Publishes cached history guest data.
+ *
+ * @param[in,out] taskCtx The task context
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpTaskPublishHistory(TaskContext *taskCtx) // IN/OUT
+{
+ GdpError gdpErr;
+ gchar *subscribers;
+ HistoryCacheItem *item;
+
+ g_debug("%s: Entering ...\n", __FUNCTION__);
+
+ ASSERT(taskCtx->mode == GDP_TASK_MODE_NONE &&
+ taskCtx->state == GDP_TASK_STATE_IDLE);
+
+ subscribers = GdpTaskUpdateHistoryCachePointerAndGetSubscribers(taskCtx);
+ if (subscribers == NULL) {
+ g_info("%s: No history item meets subscription requirement.\n",
+ __FUNCTION__);
+ goto cleanup;
+ }
+
+ ASSERT(taskCtx->cache.currentLink != NULL);
+ item = (HistoryCacheItem *) taskCtx->cache.currentLink->data;
+ /*
+ * Updates history cache pointer.
+ */
+ taskCtx->cache.currentLink = taskCtx->cache.currentLink->prev;
+
+ gdpErr = GdpTaskBuildPacket(taskCtx,
+ item->createTime,
+ item->topic,
+ item->token,
+ item->category,
+ item->data,
+ item->dataLen,
+ subscribers);
+ if (gdpErr != GDP_ERROR_SUCCESS) {
+ /*
+ * Theoretically speaking, too many subscribers could cause JSON packet
+ * length exceed maximum limit and fail GdpTaskBuildPacket with
+ * GDP_ERROR_DATA_SIZE.
+ */
+ g_info("%s: Failed to build JSON packet for subscribers: [%s].\n",
+ __FUNCTION__, subscribers);
+ g_free(subscribers);
+ goto cleanup;
+ }
+ g_free(subscribers);
+
+ if (GdpTaskOkToSend(taskCtx)) {
+ gdpErr = GdpTaskSendPacket(taskCtx);
+ if (gdpErr != GDP_ERROR_SUCCESS) {
+ g_info("%s: Failed to send history JSON packet.\n",
+ __FUNCTION__);
+ goto cleanup;
+ }
+
+ taskCtx->state = GDP_TASK_STATE_WAIT_FOR_RESULT1;
+ } else {
+ taskCtx->timeoutAt = taskCtx->sendAfter;
+ taskCtx->state = GDP_TASK_STATE_WAIT_TO_SEND;
+ }
+
+ taskCtx->mode = GDP_TASK_MODE_HISTORY;
+ g_debug("%s: Updated mode=%d, state=%d.\n",
+ __FUNCTION__, taskCtx->mode, taskCtx->state);
+ return;
+
+cleanup:
+ taskCtx->cache.currentLink = NULL;
+ GdpTaskClearHistoryRequestQueue(taskCtx);
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskProcessConfigChange --
+ *
+ * Processes config change.
+ *
+ * @param[in,out] taskCtx The task context
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpTaskProcessConfigChange(TaskContext *taskCtx) // IN/OUT
+{
+ guint32 sizeLimit = GdpGetHistoryCacheSizeLimit();
+
+ if (taskCtx->cache.sizeLimit == sizeLimit) {
+ return;
+ }
+
+ g_debug("%s: Current history cache size limit: %u, new value: %u.\n",
+ __FUNCTION__, taskCtx->cache.sizeLimit, sizeLimit);
+ taskCtx->cache.sizeLimit = sizeLimit;
+ while (taskCtx->cache.size > taskCtx->cache.sizeLimit) {
+ GdpTaskDeleteHistoryCacheTail(taskCtx);
+ }
+}
+
+
+/*
+ ******************************************************************************
+ * GdpJsonIsTokenOfKey --
+ *
+ * Checks if the JSON token represents the key.
+ *
+ * @param[in] json The JSON text
+ * @param[in] token Token
+ * @param[in] key Key name
+ *
+ * @return TRUE if token represents the key.
+ * @return FALSE otherwise.
+ *
+ ******************************************************************************
+ */
+
+static Bool
+GdpJsonIsTokenOfKey(const char *json, // IN
+ jsmntok_t *token, // IN
+ const char *key) // IN
+{
+ int tokenLen = token->end - token->start;
+ if (token->type == JSMN_STRING &&
+ token->size == 1 && // The token represents a key
+ (int) strlen(key) == tokenLen &&
+ strncmp(json + token->start, key, tokenLen) == 0) {
+ return TRUE;
+ }
+ return FALSE;
+}
+
+
+/*
+ ******************************************************************************
+ * GdpJsonIsPublishResult --
+ *
+ * Checks if the JSON text represents a publish result.
+ *
+ * @param[in] json The JSON text
+ * @param[in] tokens Token array
+ * @param[in] count Token array count
+ * @param[out] request Pointer to publish result
+ *
+ * @return TRUE if json represents a publish result.
+ * @return FALSE otherwise.
+ *
+ ******************************************************************************
+ */
+
+static Bool
+GdpJsonIsPublishResult(const char *json, // IN
+ jsmntok_t *tokens, // IN
+ int count, // IN
+ PublishResult *result) // OUT
+{
+ int index;
+ int requiredKeys = GDP_RESULT_REQUIRED_KEYS;
+ gchar *diagnosis = NULL;
+
+ /*
+ * Loops over all keys of the root object.
+ */
+ for (index = 1; index < count; index++) {
+ int tokenLen;
+
+ if (GdpJsonIsTokenOfKey(json, &tokens[index], GDP_RESULT_SEQUENCE)) {
+ requiredKeys--;
+ index++;
+ result->sequence = g_ascii_strtoull(json + tokens[index].start,
+ NULL, 10);
+ } else if (GdpJsonIsTokenOfKey(json, &tokens[index],
+ GDP_RESULT_STATUS)) {
+ requiredKeys--;
+ index++;
+ tokenLen = tokens[index].end - tokens[index].start;
+ if ((int) strlen(GDP_RESULT_STATUS_OK) == tokenLen &&
+ strncmp(json + tokens[index].start,
+ GDP_RESULT_STATUS_OK, tokenLen) == 0) {
+ result->statusOk = TRUE;
+ } else {
+ result->statusOk = FALSE;
+ }
+ } else if (GdpJsonIsTokenOfKey(json, &tokens[index],
+ GDP_RESULT_DIAGNOSIS)) {
+ index++;
+ tokenLen = tokens[index].end - tokens[index].start;
+ diagnosis = g_strndup(json + tokens[index].start, tokenLen);
+ } else if (GdpJsonIsTokenOfKey(json, &tokens[index],
+ GDP_RESULT_RATE_LIMIT)) {
+ requiredKeys--;
+ index++;
+ result->rateLimit = atoi(json + tokens[index].start);
+ }
+ }
+
+ if (requiredKeys == 0) {
+ result->diagnosis = diagnosis;
+ return TRUE;
+ } else {
+ g_free(diagnosis);
+ return FALSE;
+ }
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskProcessPublishResult --
+ *
+ * Processes publish result.
+ *
+ * @param[in,out] taskCtx The task context
+ * @param[in] result Publish result
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpTaskProcessPublishResult(TaskContext *taskCtx, // IN/OUT
+ PublishResult *result) // IN
+{
+ g_debug("%s: Entering ...\n", __FUNCTION__);
+
+ if (taskCtx->mode == GDP_TASK_MODE_NONE ||
+ (taskCtx->state != GDP_TASK_STATE_WAIT_FOR_RESULT1 &&
+ taskCtx->state != GDP_TASK_STATE_WAIT_FOR_RESULT2)) {
+ g_info("%s: Publish result not expected at mode=%d, state=%d.\n",
+ __FUNCTION__, taskCtx->mode, taskCtx->state);
+ return;
+ }
+
+ if (taskCtx->sequence != result->sequence) {
+ g_info("%s: Publish result sequence number not match.\n",
+ __FUNCTION__);
+ return;
+ }
+
+ if (!result->statusOk) {
+ g_info("%s: Publish failed: %s\n", __FUNCTION__,
+ result->diagnosis ? result->diagnosis : "");
+ }
+
+ if (taskCtx->mode == GDP_TASK_MODE_PUBLISH) {
+ if (result->statusOk) {
+ if (GdpTaskIsHistoryCacheEnabled(taskCtx)) {
+ GdpTaskHistoryCachePushItem(taskCtx,
+ gPublishState.createTime,
+ gPublishState.topic,
+ gPublishState.token,
+ gPublishState.category,
+ gPublishState.data,
+ gPublishState.dataLen);
+ }
+
+ gPublishState.gdpErr = GDP_ERROR_SUCCESS;
+ } else {
+ gPublishState.gdpErr = GDP_ERROR_INVALID_DATA;
+ }
+
+ GdpSetEvent(gPublishState.eventGetResult);
+ }
+
+ GdpTaskDestroyPacket(taskCtx);
+ if (result->rateLimit > 0) {
+ /*
+ * Updates sendAfter for next send, phase 2 / 2.
+ */
+ taskCtx->sendAfter += (USEC_PER_SECOND / result->rateLimit);
+ }
+ taskCtx->timeoutAt = GDP_TIMEOUT_AT_INFINITE;
+ taskCtx->mode = GDP_TASK_MODE_NONE;
+ taskCtx->state = GDP_TASK_STATE_IDLE;
+ g_debug("%s: Reset mode=%d, state=%d.\n",
+ __FUNCTION__, taskCtx->mode, taskCtx->state);
+}
+
+
+/*
+ ******************************************************************************
+ * GdpJsonIsHistoryRequest --
+ *
+ * Checks if the JSON text represents a history request.
+ *
+ * @param[in] json The JSON text
+ * @param[in] tokens Token array
+ * @param[in] count Token array count
+ * @param[out] request Pointer to history request
+ *
+ * @return TRUE if json represents a history request.
+ * @return FALSE otherwise.
+ *
+ ******************************************************************************
+ */
+
+static Bool
+GdpJsonIsHistoryRequest(const char *json, // IN
+ jsmntok_t *tokens, // IN
+ int count, // IN
+ HistoryRequest *request) // OUT
+{
+ int index;
+ int requiredKeys = GDP_HISTORY_REQUEST_REQUIRED_KEYS;
+ guint64 pastSeconds = 0;
+
+ /*
+ * Loops over all keys of the root object
+ */
+ for (index = 1; index < count; index++) {
+ if (GdpJsonIsTokenOfKey(json, &tokens[index],
+ GDP_HISTORY_REQUEST_PAST_SECONDS)) {
+ requiredKeys--;
+ index++;
+ pastSeconds = g_ascii_strtoull(json + tokens[index].start, NULL, 10);
+ } else if (GdpJsonIsTokenOfKey(json, &tokens[index],
+ GDP_HISTORY_REQUEST_ID)) {
+ requiredKeys--;
+ index++;
+ Str_Strncpy(request->id, sizeof request->id,
+ json + tokens[index].start,
+ tokens[index].end - tokens[index].start);
+ }
+ }
+
+ if (requiredKeys == 0) {
+ request->endCacheTime = g_get_monotonic_time();
+ request->beginCacheTime = request->endCacheTime -
+ (gint64)pastSeconds * USEC_PER_SECOND;
+ return TRUE;
+ }
+
+ return FALSE;
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskPushHistoryRequest --
+ *
+ * Pushes new history request to queue and resets history cache pointer.
+ *
+ * @param[in,out] taskCtx The task context
+ * @param[in] request New history request
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpTaskPushHistoryRequest(TaskContext *taskCtx, // IN/OUT
+ HistoryRequest *request) // IN
+{
+ HistoryRequest *requestCopy;
+
+ g_debug("%s: Entering ...\n", __FUNCTION__);
+
+ if (!GdpTaskIsHistoryCacheEnabled(taskCtx)) {
+ g_info("%s: History cache not enabled.\n", __FUNCTION__);
+ return;
+ }
+
+ if (request->beginCacheTime >= request->endCacheTime) {
+ g_info("%s: Invalid history request.\n", __FUNCTION__);
+ return;
+ }
+
+ if (request->beginCacheTime < 0) {
+ request->beginCacheTime = 0;
+ }
+
+ requestCopy = (HistoryRequest *) Util_SafeMalloc(sizeof *request);
+ Util_Memcpy(requestCopy, request, sizeof *request);
+ /*
+ * Note: each request comes with a unique subscription ID.
+ */
+ g_queue_push_head(&taskCtx->requests, requestCopy);
+ /*
+ * Resets history cache pointer.
+ */
+ taskCtx->cache.currentLink = NULL;
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskProcessNetwork --
+ *
+ * Processes the network event.
+ *
+ * @param[in,out] taskCtx The task context
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpTaskProcessNetwork(TaskContext *taskCtx) // IN/OUT
+{
+ GdpError gdpErr;
+ char buf[GDP_MAX_PACKET_LEN + 1]; // Adds a space for NULL
+ int bufLen = (int) sizeof buf - 1;
+ struct sockaddr_vm srcAddr;
+ jsmn_parser parser;
+ jsmntok_t tokens[16]; // We expect no more than 16 tokens
+ int retVal;
+ Bool isPublishResult;
+ Bool isHistoryRequest;
+ PublishResult result = { 0 };
+ HistoryRequest request = { 0 };
+
+ g_debug("%s: Entering ...\n", __FUNCTION__);
+
+ gdpErr = GdpRecvFrom(gPluginState.sock, buf, &bufLen, &srcAddr);
+ if (gdpErr != GDP_ERROR_SUCCESS || bufLen <= 0) {
+ return;
+ }
+
+ if (srcAddr.svm_cid != VMCI_HOST_CONTEXT_ID) {
+ g_info("%s: Unexpected source svm_cid: %u.\n",
+ __FUNCTION__, srcAddr.svm_cid);
+ return;
+ }
+
+ buf[bufLen] = '\0';
+
+ jsmn_init(&parser);
+ retVal = jsmn_parse(&parser, buf, bufLen,
+ tokens, (unsigned int) ARRAYSIZE(tokens));
+ if (retVal < 0) {
+ g_info("%s: Error %d while parsing JSON:\n%s\n",
+ __FUNCTION__, retVal, buf);
+ return;
+ }
+
+ /*
+ * The top-level element should be an object.
+ */
+ if (retVal < 1 || tokens[0].type != JSMN_OBJECT) {
+ g_info("%s: Invalid JSON:\n%s\n", __FUNCTION__, buf);
+ return;
+ }
+
+ isPublishResult = FALSE;
+ isHistoryRequest = FALSE;
+ if (srcAddr.svm_port == GDPD_RECV_PORT) {
+ isPublishResult = GdpJsonIsPublishResult(buf, tokens,
+ retVal, &result);
+ if (!isPublishResult) {
+ isHistoryRequest = GdpJsonIsHistoryRequest(buf, tokens,
+ retVal, &request);
+ }
+ } else {
+ isHistoryRequest = GdpJsonIsHistoryRequest(buf, tokens,
+ retVal, &request);
+ if (!isHistoryRequest) {
+ isPublishResult = GdpJsonIsPublishResult(buf, tokens,
+ retVal, &result);
+ }
+ }
+
+ if (isPublishResult) {
+ GdpTaskProcessPublishResult(taskCtx, &result);
+ g_free(result.diagnosis);
+ } else if (isHistoryRequest) {
+ GdpTaskPushHistoryRequest(taskCtx, &request);
+ } else {
+ g_info("%s: Unknown JSON:\n%s\n", __FUNCTION__, buf);
+ }
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskProcessPublish --
+ *
+ * Processes the publish event.
+ *
+ * @param[in,out] taskCtx The task context
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpTaskProcessPublish(TaskContext *taskCtx) // IN/OUT
+{
+ GdpError gdpErr;
+
+ g_debug("%s: Entering ...\n", __FUNCTION__);
+
+ ASSERT(taskCtx->mode != GDP_TASK_MODE_PUBLISH);
+
+ if (taskCtx->mode != GDP_TASK_MODE_NONE) {
+ g_debug("%s: Set publish pending.\n", __FUNCTION__);
+ ASSERT(!taskCtx->publishPending);
+ taskCtx->publishPending = TRUE;
+ return;
+ }
+
+ ASSERT(taskCtx->state == GDP_TASK_STATE_IDLE);
+
+ gdpErr = GdpTaskBuildPacket(taskCtx,
+ gPublishState.createTime,
+ gPublishState.topic,
+ gPublishState.token,
+ gPublishState.category,
+ gPublishState.data,
+ gPublishState.dataLen,
+ NULL);
+ if (gdpErr != GDP_ERROR_SUCCESS) {
+ goto fail;
+ }
+
+ if (GdpTaskOkToSend(taskCtx)) {
+ gdpErr = GdpTaskSendPacket(taskCtx);
+ if (gdpErr != GDP_ERROR_SUCCESS) {
+ goto fail;
+ }
+
+ taskCtx->state = GDP_TASK_STATE_WAIT_FOR_RESULT1;
+ } else {
+ taskCtx->timeoutAt = taskCtx->sendAfter;
+ taskCtx->state = GDP_TASK_STATE_WAIT_TO_SEND;
+ }
+
+ taskCtx->mode = GDP_TASK_MODE_PUBLISH;
+ g_debug("%s: Updated mode=%d, state=%d.\n",
+ __FUNCTION__, taskCtx->mode, taskCtx->state);
+ return;
+
+fail:
+ gPublishState.gdpErr = gdpErr;
+ GdpSetEvent(gPublishState.eventGetResult);
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskProcessTimeout --
+ *
+ * Processes wait timeout.
+ *
+ * @param[in,out] taskCtx The task context
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpTaskProcessTimeout(TaskContext *taskCtx) // IN/OUT
+{
+ GdpError gdpErr;
+
+ g_debug("%s: Entering ...\n", __FUNCTION__);
+
+ ASSERT(taskCtx->mode != GDP_TASK_MODE_NONE &&
+ taskCtx->state != GDP_TASK_STATE_IDLE);
+
+ if (taskCtx->state == GDP_TASK_STATE_WAIT_TO_SEND ||
+ taskCtx->state == GDP_TASK_STATE_WAIT_FOR_RESULT1) {
+ gdpErr = GdpTaskSendPacket(taskCtx);
+ if (gdpErr != GDP_ERROR_SUCCESS) {
+ goto fail;
+ }
+
+ if (taskCtx->state == GDP_TASK_STATE_WAIT_TO_SEND) {
+ taskCtx->state = GDP_TASK_STATE_WAIT_FOR_RESULT1;
+ } else {
+ taskCtx->state = GDP_TASK_STATE_WAIT_FOR_RESULT2;
+ }
+
+ g_debug("%s: Updated mode=%d, state=%d.\n",
+ __FUNCTION__, taskCtx->mode, taskCtx->state);
+ } else if (taskCtx->state == GDP_TASK_STATE_WAIT_FOR_RESULT2) {
+ g_warning("%s: Wait for publish result timed out.\n", __FUNCTION__);
+ GdpTaskDestroyPacket(taskCtx);
+ taskCtx->timeoutAt = GDP_TIMEOUT_AT_INFINITE;
+ gdpErr = GDP_ERROR_TIMEOUT;
+ goto fail;
+ } else {
+ NOT_REACHED();
+ }
+
+ return;
+
+fail:
+ if (taskCtx->mode == GDP_TASK_MODE_PUBLISH) {
+ gPublishState.gdpErr = gdpErr;
+ GdpSetEvent(gPublishState.eventGetResult);
+ }
+
+ taskCtx->state = GDP_TASK_STATE_IDLE;
+ taskCtx->mode = GDP_TASK_MODE_NONE;
+ g_debug("%s: Reset mode=%d, state=%d.\n",
+ __FUNCTION__, taskCtx->mode, taskCtx->state);
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskGetTimeout --
+ *
+ * Gets timeout value for the next wait call.
+ *
+ * @param[in] taskCtx The task context
+ *
+ *****************************************************************************
+ */
+
+static int
+GdpTaskGetTimeout(TaskContext *taskCtx) // IN
+{
+ gint64 curTime;
+ gint64 timeout;
+
+ if (taskCtx->timeoutAt == GDP_TIMEOUT_AT_INFINITE) {
+ return GDP_WAIT_INFINITE;
+ }
+
+ curTime = g_get_monotonic_time();
+ if (curTime >= taskCtx->timeoutAt) {
+ return 0;
+ }
+
+ timeout = (taskCtx->timeoutAt - curTime) / USEC_PER_MILLISECOND;
+ return (int)timeout;
+}
+
- } while (TRUE);
+/*
+ *****************************************************************************
+ * GdpTaskProcessEvents --
+ *
+ * Processes task events.
+ *
+ * @param[in,out] taskCtx The task context
+ * @param[in] taskEvent Task event to process
+ *
+ *****************************************************************************
+ */
- return retVal;
+static void
+GdpTaskProcessEvents(TaskContext *taskCtx, // IN/OUT
+ GdpTaskEvent taskEvent) // IN
+{
+ switch (taskEvent) {
+ case GDP_TASK_EVENT_CONFIG:
+ GdpResetEvent(gPluginState.eventConfig);
+ GdpTaskProcessConfigChange(taskCtx);
+ break;
+ case GDP_TASK_EVENT_NETWORK:
+ GdpTaskProcessNetwork(taskCtx);
+ break;
+ case GDP_TASK_EVENT_PUBLISH:
+ GdpResetEvent(gPublishState.eventPublish);
+ GdpTaskProcessPublish(taskCtx);
+ break;
+ case GDP_TASK_EVENT_TIMEOUT:
+ GdpTaskProcessTimeout(taskCtx);
+ break;
+ default:
+ //GDP_TASK_EVENT_NONE
+ //GDP_TASK_EVENT_STOP
+ NOT_REACHED();
+ }
}
/*
******************************************************************************
- * GdpWaitForEvent --
+ * GdpTaskWaitForEvents --
*
- * Waits for the stop event object/fd signalled for vmtoolsd shutdown,
- * network send/receive event ready or timeout.
+ * Waits for the following events or timeout:
+ * The stop event
+ * The config event
+ * The network event
+ * The publish event
*
- * @param[in] netEvent GOS specific network send/receive event flag
- * @param[in] timeout Timeout value in milliseconds,
+ * @param[in] timeout Timeout value in milliseconds,
* negative value means an infinite timeout,
- * zero means no wait
+ * zero means no wait.
+ * @param[out] taskEvent Return which event is signalled
+ * if no error happens.
*
- * @return GDP_ERROR_SUCCESS on specified network event ready.
- * @return Other GdpError codes otherwise.
+ * @return GDP_ERROR_SUCCESS if no error happens.
+ * @return Other GdpError code otherwise.
*
******************************************************************************
*/
static GdpError
-GdpWaitForEvent(int netEvent, int timeout)
+GdpTaskWaitForEvents(int timeout, // IN
+ GdpTaskEvent *taskEvent) // OUT
{
#if defined(_WIN32)
int res;
gint64 startTime;
GdpError retVal;
- ASSERT(netEvent == FD_READ || netEvent == FD_WRITE);
- ASSERT(pluginData.sock != INVALID_SOCKET);
+ ASSERT(Atomic_ReadBool(&gPluginState.started));
/*
- * Reset the send-recv event object.
+ * Resets the network event object.
*/
- WSAResetEvent(pluginData.eventSendRecv);
+ WSAResetEvent(gPluginState.eventNetwork);
/*
- * Associate the send-recv event object with the specified network event
+ * Associates the network event object with FD_READ network event
* in the socket.
*/
- res = WSAEventSelect(pluginData.sock, pluginData.eventSendRecv, netEvent);
+ res = WSAEventSelect(gPluginState.sock, gPluginState.eventNetwork, FD_READ);
if (res != 0) {
- g_info("%s: WSAEventSelect failed: error=%d.\n",
- __FUNCTION__, WSAGetLastError());
- return GDP_ERROR_INTERNAL;
+ g_warning("%s: WSAEventSelect failed: error=%d.\n",
+ __FUNCTION__, WSAGetLastError());
+ return GDP_ERROR_GENERAL;
}
- localTimeout = (DWORD)(timeout >= 0 ? timeout : WSA_INFINITE);
+ retVal = GDP_ERROR_SUCCESS;
+
+ localTimeout = (timeout >= 0 ? (DWORD) timeout : WSA_INFINITE);
if (timeout > 0) {
startTime = g_get_monotonic_time();
} else {
- startTime = 0; // Deal with [-Werror=maybe-uninitialized]
+ startTime = 0; // Deals with [-Werror=maybe-uninitialized]
}
while (TRUE) {
- WSAEVENT eventObjects[] = {pluginData.eventStop,
- pluginData.eventSendRecv};
+ WSAEVENT eventObjects[] = { gPluginState.eventStop,
+ gPluginState.eventConfig,
+ gPluginState.eventNetwork,
+ gPublishState.eventPublish };
DWORD waitRes;
waitRes = WSAWaitForMultipleEvents((DWORD)ARRAYSIZE(eventObjects),
eventObjects,
FALSE, localTimeout, TRUE);
if (waitRes == WSA_WAIT_EVENT_0) {
- /*
- * Main thread has set the stop event object to interrupt
- * pool thread for vmtoolsd shutdown.
- */
- retVal = GDP_ERROR_STOP;
+ *taskEvent = GDP_TASK_EVENT_STOP;
break;
} else if (waitRes == (WSA_WAIT_EVENT_0 + 1)) {
+ *taskEvent = GDP_TASK_EVENT_CONFIG;
+ break;
+ } else if (waitRes == (WSA_WAIT_EVENT_0 + 2)) {
WSANETWORKEVENTS networkEvents;
- res = WSAEnumNetworkEvents(pluginData.sock, NULL, &networkEvents);
+ res = WSAEnumNetworkEvents(gPluginState.sock, NULL, &networkEvents);
if (res != 0) {
- g_info("%s: WSAEnumNetworkEvents failed: error=%d.\n",
- __FUNCTION__, WSAGetLastError());
- retVal = GDP_ERROR_INTERNAL;
+ g_warning("%s: WSAEnumNetworkEvents failed: error=%d.\n",
+ __FUNCTION__, WSAGetLastError());
+ retVal = GDP_ERROR_GENERAL;
break;
}
* since WSAEnumNetworkEvents should have returned WSAENETDOWN
* if the error condition exists.
*/
- if (networkEvents.lNetworkEvents & netEvent) {
- retVal = GDP_ERROR_SUCCESS;
+ if (networkEvents.lNetworkEvents & FD_READ) {
+ *taskEvent = GDP_TASK_EVENT_NETWORK;
} else { // Not expected
- g_info("%s: Unexpected network event from WSAEnumNetworkEvents.\n",
- __FUNCTION__);
- retVal = GDP_ERROR_INTERNAL;
+ g_warning("%s: Unexpected network event.\n",
+ __FUNCTION__);
+ retVal = GDP_ERROR_GENERAL;
}
+ break;
+ } else if (waitRes == (WSA_WAIT_EVENT_0 + 3)) {
+ *taskEvent = GDP_TASK_EVENT_PUBLISH;
break;
} else if (waitRes == WSA_WAIT_IO_COMPLETION) {
gint64 curTime;
}
curTime = g_get_monotonic_time();
- passedTime = (curTime - startTime) / 1000;
+ passedTime = (curTime - startTime) / USEC_PER_MILLISECOND;
if (passedTime >= localTimeout) {
- retVal = GDP_ERROR_TIMEOUT;
+ *taskEvent = GDP_TASK_EVENT_TIMEOUT;
break;
}
localTimeout -= (DWORD)passedTime;
continue;
} else if (waitRes == WSA_WAIT_TIMEOUT) {
- retVal = GDP_ERROR_TIMEOUT;
+ *taskEvent = GDP_TASK_EVENT_TIMEOUT;
break;
} else { // WSA_WAIT_FAILED
- g_info("%s: WSAWaitForMultipleEvents failed: error=%d.\n",
- __FUNCTION__, WSAGetLastError());
- retVal = GDP_ERROR_INTERNAL;
+ g_warning("%s: WSAWaitForMultipleEvents failed: error=%d.\n",
+ __FUNCTION__, WSAGetLastError());
+ retVal = GDP_ERROR_GENERAL;
break;
}
}
/*
- * Cancel the association.
+ * Cancels the association.
*/
- WSAEventSelect(pluginData.sock, NULL, 0);
+ WSAEventSelect(gPluginState.sock, NULL, 0);
return retVal;
#else
gint64 startTime;
- GdpError retVal;
+ GdpError retVal = GDP_ERROR_SUCCESS;
- ASSERT(netEvent == POLLIN || netEvent == POLLOUT);
- ASSERT(pluginData.sock != INVALID_SOCKET);
+ ASSERT(Atomic_ReadBool(&gPluginState.started));
if (timeout > 0) {
startTime = g_get_monotonic_time();
} else {
- startTime = 0; // Deal with [-Werror=maybe-uninitialized]
+ startTime = 0; // Deals with [-Werror=maybe-uninitialized]
}
while (TRUE) {
- struct pollfd fds[2];
+ struct pollfd fds[4];
int res;
- fds[0].fd = pluginData.eventStop;
+ fds[0].fd = gPluginState.eventStop;
fds[0].events = POLLIN;
fds[0].revents = 0;
- fds[1].fd = pluginData.sock;
- fds[1].events = (short)netEvent;
+ fds[1].fd = gPluginState.eventConfig;
+ fds[1].events = POLLIN;
fds[1].revents = 0;
+ fds[2].fd = gPluginState.sock;
+ fds[2].events = POLLIN;
+ fds[2].revents = 0;
+ fds[3].fd = gPublishState.eventPublish;
+ fds[3].events = POLLIN;
+ fds[3].revents = 0;
res = poll(fds, ARRAYSIZE(fds), timeout);
if (res > 0) {
if (fds[0].revents & POLLIN) {
- /*
- * Check comments in _WIN32.
- */
- retVal = GDP_ERROR_STOP;
- } else if (fds[1].revents & netEvent) {
- retVal = GDP_ERROR_SUCCESS;
+ *taskEvent = GDP_TASK_EVENT_STOP;
+ } else if (fds[1].revents & POLLIN) {
+ *taskEvent = GDP_TASK_EVENT_CONFIG;
+ } else if (fds[2].revents & POLLIN) {
+ *taskEvent = GDP_TASK_EVENT_NETWORK;
+ } else if (fds[3].revents & POLLIN) {
+ *taskEvent = GDP_TASK_EVENT_PUBLISH;
} else { // Not expected
- g_info("%s: Unexpected event from poll.\n", __FUNCTION__);
- retVal = GDP_ERROR_INTERNAL;
+ g_warning("%s: Unexpected event.\n", __FUNCTION__);
+ retVal = GDP_ERROR_GENERAL;
}
break;
}
curTime = g_get_monotonic_time();
- passedTime = (curTime - startTime) / 1000;
- if (passedTime >= (gint64)timeout) {
- retVal = GDP_ERROR_TIMEOUT;
+ passedTime = (curTime - startTime) / USEC_PER_MILLISECOND;
+ if (passedTime >= timeout) {
+ *taskEvent = GDP_TASK_EVENT_TIMEOUT;
break;
}
startTime = curTime;
- timeout -= (int)passedTime;
+ timeout -= (int) passedTime;
continue;
} else {
- g_info("%s: poll failed: error=%d.\n", __FUNCTION__, err);
- retVal = GDP_ERROR_INTERNAL;
+ g_warning("%s: poll failed: error=%d.\n", __FUNCTION__, err);
+ retVal = GDP_ERROR_GENERAL;
break;
}
} else if (res == 0) {
- retVal = GDP_ERROR_TIMEOUT;
+ *taskEvent = GDP_TASK_EVENT_TIMEOUT;
break;
} else {
- g_info("%s: Unexpected poll return: %d.\n", __FUNCTION__, res);
- retVal = GDP_ERROR_INTERNAL;
+ g_warning("%s: Unexpected poll return: %d.\n", __FUNCTION__, res);
+ retVal = GDP_ERROR_GENERAL;
break;
}
}
/*
- ******************************************************************************
- * GdpSend --
+ *****************************************************************************
+ * GdpTaskCtxInit --
*
- * Sends guest data to host side gdp daemon.
+ * Initializes the task context states and resources.
*
- * @param[in] buf Send data buffer pointer
- * @param[in] len Send data buffer length
- * @param[in] timeout Timeout value in milliseconds,
- * negative value means an infinite timeout,
- * zero means no wait
+ * @param[out] taskCtx The task context
*
- * @return GDP_ERROR_SUCCESS on success.
- * @return Other GdpError codes otherwise.
+ *****************************************************************************
+ */
+
+static void
+GdpTaskCtxInit(TaskContext *taskCtx) // OUT
+{
+ taskCtx->mode = GDP_TASK_MODE_NONE;
+ taskCtx->state = GDP_TASK_STATE_IDLE;
+ taskCtx->publishPending = FALSE;
+
+ g_queue_init(&taskCtx->cache.queue);
+ taskCtx->cache.sizeLimit = GdpGetHistoryCacheSizeLimit();
+ taskCtx->cache.size = 0;
+ taskCtx->cache.currentLink = NULL;
+
+ g_queue_init(&taskCtx->requests);
+
+ taskCtx->sequence = 0;
+ taskCtx->packet = NULL;
+ taskCtx->packetLen = 0;
+
+ taskCtx->timeoutAt = GDP_TIMEOUT_AT_INFINITE;
+ taskCtx->sendAfter = GDP_SEND_AFTER_ANY_TIME;
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskCtxDestroy --
*
- ******************************************************************************
+ * Destroys the task context resources.
+ *
+ * @param[in,out] taskCtx The task context
+ *
+ *****************************************************************************
*/
-static GdpError
-GdpSend(const char *buf, // IN
- int len, // IN
- int timeout) // IN
+static void
+GdpTaskCtxDestroy(TaskContext *taskCtx) // IN/OUT
{
- GdpError retVal;
+ GdpTaskClearHistoryCacheQueue(taskCtx);
+ GdpTaskClearHistoryRequestQueue(taskCtx);
+ GdpTaskDestroyPacket(taskCtx);
+}
+
+
+/*
+ *****************************************************************************
+ * GdpThreadTask --
+ *
+ * The gdp task thread routine.
+ *
+ * @param[in] ctx The application context
+ * @param[in] data Data pointer, not used
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpThreadTask(ToolsAppCtx *ctx, // IN
+ void *data) // IN
+{
+ TaskContext taskCtx;
- ASSERT(buf != NULL && len > 0);
- ASSERT(pluginData.sock != INVALID_SOCKET);
+ g_debug("%s: Entering ...\n", __FUNCTION__);
+
+ GdpTaskCtxInit(&taskCtx);
do {
- long res;
- int sockErr;
+ int timeout;
+ GdpError gdpErr;
+ GdpTaskEvent taskEvent = GDP_TASK_EVENT_NONE;
- res = send(pluginData.sock, buf, len, 0);
- if (res >= 0) {
- retVal = GDP_ERROR_SUCCESS;
- break;
+ /*
+ * Part 1 inside the loop:
+ * Checks and processes one pending event at idle state.
+ */
+
+ if (taskCtx.mode == GDP_TASK_MODE_NONE) {
+ ASSERT(taskCtx.state == GDP_TASK_STATE_IDLE);
+ ASSERT(taskCtx.timeoutAt == GDP_TIMEOUT_AT_INFINITE);
+
+ if (taskCtx.publishPending) { // Higher priority
+ taskCtx.publishPending = FALSE;
+ GdpTaskProcessPublish(&taskCtx);
+ } else if (!g_queue_is_empty(&taskCtx.requests)) {
+ /*
+ * History request pending.
+ *
+ * GdpTaskPublishHistory clears history request queue if it fails
+ * to kick off the state machine.
+ */
+ GdpTaskPublishHistory(&taskCtx);
+ }
+ }
+
+ /*
+ * Part 2 inside the loop:
+ * Gets and processes one event at a time, then loops back to part 1
+ * except for the stop event.
+ */
+
+ timeout = GdpTaskGetTimeout(&taskCtx);
+ if (timeout == 0) {
+ GdpTaskProcessEvents(&taskCtx, GDP_TASK_EVENT_TIMEOUT);
+ continue;
}
- sockErr = GetSockErr();
- if (sockErr == SYSERR_EINTR) {
+ /*
+ * timeout == GDP_WAIT_INFINITE means taskCtx.mode == GDP_TASK_MODE_NONE
+ * and taskCtx.state == GDP_TASK_STATE_IDLE. There should be no pending
+ * publish event or history request in this case.
+ */
+ ASSERT(timeout != GDP_WAIT_INFINITE ||
+ (!taskCtx.publishPending &&
+ g_queue_is_empty(&taskCtx.requests)));
+
+ gdpErr = GdpTaskWaitForEvents(timeout, &taskEvent);
+ if (gdpErr != GDP_ERROR_SUCCESS) {
continue;
- } else if (SYSERR_WOULDBLOCK(sockErr)) {
+ }
+
+ if (taskEvent == GDP_TASK_EVENT_STOP) {
/*
- * Datagram send is not buffered, if host daemon is not running,
- * send returns error EHOSTUNREACH. In theory, this case should
- * not happen, we just follow standard async socket programming
- * paradigm here.
+ * In case the publish event comes at the same time,
+ * only the stop event is handled, so do not check
+ * (taskCtx.mode == GDP_TASK_MODE_PUBLISH) here.
*/
- GdpError err;
+ gPublishState.gdpErr = GDP_ERROR_STOP;
+ GdpSetEvent(gPublishState.eventGetResult);
+ break;
+ }
- g_info("%s: Gdp send would block.\n", __FUNCTION__);
+ GdpTaskProcessEvents(&taskCtx, taskEvent);
- err = GdpWaitForEvent(SOCK_WRITE, timeout);
- if (err == GDP_ERROR_SUCCESS) {
- continue;
- }
+ } while (!Atomic_ReadBool(&gPluginState.stopped));
- retVal = err;
- } else if (sockErr == SYSERR_EHOSTUNREACH) {
- g_info("%s: send failed: host daemon unreachable.\n", __FUNCTION__);
- retVal = GDP_ERROR_UNREACH;
- } else if (sockErr == SYSERR_EMSGSIZE) {
- g_info("%s: send failed: message too large.\n", __FUNCTION__);
- retVal = GDP_ERROR_SEND_SIZE;
- } else {
- g_info("%s: send failed: error=%d.\n", __FUNCTION__, sockErr);
- retVal = GDP_ERROR_INTERNAL;
- }
+ GdpTaskCtxDestroy(&taskCtx);
- break;
+ g_debug("%s: Exiting ...\n", __FUNCTION__);
+}
- } while (TRUE);
- return retVal;
+/*
+ *****************************************************************************
+ * GdpThreadInterrupt --
+ *
+ * Interrupts the gdp task thread to exit.
+ *
+ * @param[in] ctx The application context
+ * @param[in] data Data pointer, not used
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpThreadInterrupt(ToolsAppCtx *ctx, // IN
+ void *data) // IN
+{
+ g_debug("%s: Entering ...\n", __FUNCTION__);
+ ASSERT(Atomic_ReadBool(&gPluginState.started));
+ Atomic_WriteBool(&gPluginState.stopped, TRUE);
+ GdpSetEvent(gPluginState.eventStop);
}
/*
******************************************************************************
- * GdpRecv --
- *
- * Receives reply from host side gdp daemon.
+ * GdpInit --
*
- * @param[out] buf Receive buffer pointer
- * @param[in,out] len Receive buffer length on input,
- * reply data length on output
- * @param[in] timeout Timeout value in milliseconds,
- * negative value means an infinite timeout,
- * zero means no wait
+ * Initializes plugin state data and resources.
*
- * @return GDP_ERROR_SUCCESS on success.
- * @return Other GdpError codes otherwise.
+ * @param[in] ctx The application context
*
******************************************************************************
*/
-static GdpError
-GdpRecv(char *buf, // OUT
- int *len, // IN/OUT
- int timeout) // IN
+static void
+GdpInit(ToolsAppCtx *ctx) // IN
{
- GdpError retVal;
-
- ASSERT(buf != NULL && len != NULL && *len > 0);
- ASSERT(pluginData.sock != INVALID_SOCKET);
+ gPluginState.ctx = ctx;
+ Atomic_WriteBool(&gPluginState.started, FALSE);
- do {
- long res;
- int sockErr;
-
- res = recv(pluginData.sock, buf, *len, 0);
- if (res >= 0) {
- *len = (int)res;
- retVal = GDP_ERROR_SUCCESS;
- break;
- }
+#if defined(_WIN32)
+ gPluginState.wsaStarted = FALSE;
+#endif
- sockErr = GetSockErr();
- if (sockErr == SYSERR_EINTR) {
- continue;
- } else if (SYSERR_WOULDBLOCK(sockErr)) {
- GdpError err = GdpWaitForEvent(SOCK_READ, timeout);
- if (err == GDP_ERROR_SUCCESS) {
- continue;
- }
+ gPluginState.vmciFd = -1;
+ gPluginState.vmciFamily = -1;
+ gPluginState.sock = INVALID_SOCKET;
+#if defined(_WIN32)
+ gPluginState.eventNetwork = GDP_INVALID_EVENT;
+#endif
- retVal = err;
- } else if (sockErr == SYSERR_EMSGSIZE) {
- g_info("%s: recv failed: buffer size too small.\n", __FUNCTION__);
- retVal = GDP_ERROR_RECV_SIZE;
- } else {
- g_info("%s: recv failed: error=%d.\n", __FUNCTION__, sockErr);
- retVal = GDP_ERROR_INTERNAL;
- }
+ gPluginState.eventStop = GDP_INVALID_EVENT;
+ Atomic_WriteBool(&gPluginState.stopped, FALSE);
- break;
+ gPluginState.eventConfig = GDP_INVALID_EVENT;
- } while (TRUE);
+ gPublishState.eventPublish = GDP_INVALID_EVENT;
- return retVal;
+ gPublishState.eventGetResult = GDP_INVALID_EVENT;
}
/*
******************************************************************************
- * GdpPublish --
+ * GdpStart --
*
- * Publishes guest data to host side gdp daemon.
- *
- * @param[in] msg Buffer containing guest data to publish
- * @param[in] msgLen Guest data length
- * @param[out,optional] reply Buffer to receive reply from gdp daemon
- * @param[in,out,optional] replyLen NULL when param reply is NULL, otherwise:
- * reply buffer length on input,
- * reply data length on output
+ * Creates plugin resources and starts the task thread for data publishing.
*
- * @return GDP_ERROR_SUCCESS on success.
- * @return Other GdpError codes otherwise.
+ * @return TRUE on success.
+ * @return FALSE otherwise.
*
******************************************************************************
*/
-static GdpError
-GdpPublish(const char *msg, // IN
- int msgLen, // IN
- char *reply, // OUT, OPTIONAL
- int *replyLen) // IN/OUT, OPTIONAL
+static Bool
+GdpStart(void)
{
- static GMutex mutex;
-
- GdpError err;
- char altRecvBuf[GDP_SEND_RECV_BUF_LEN]; // Enough for maximum size datagram
- int altRecvBufLen = (int) sizeof altRecvBuf;
- char *recvBuf;
- int *recvBufLen;
+ Bool retVal = FALSE;
g_debug("%s: Entering ...\n", __FUNCTION__);
- ASSERT((reply == NULL && replyLen == NULL) ||
- (reply != NULL && replyLen != NULL && *replyLen > 0));
+ ASSERT(!Atomic_ReadBool(&gPluginState.started));
- g_mutex_lock(&mutex);
+#if defined(_WIN32)
+ {
+ WSADATA wsaData;
+ int res = WSAStartup(MAKEWORD(2, 2), &wsaData);
+ if (res != 0) {
+ g_critical("%s: WSAStartup failed: error=%d.\n",
+ __FUNCTION__, res);
+ return FALSE;
+ }
- if (Atomic_ReadBool(&pluginData.stopped)) {
- /*
- * Main thread has interrupted pool thread for vmtoolsd shutdown.
- */
- err = GDP_ERROR_STOP;
+ gPluginState.wsaStarted = TRUE;
+ }
+#endif
+
+ gPluginState.vmciFamily = VMCISock_GetAFValueFd(&gPluginState.vmciFd);
+ if (gPluginState.vmciFamily == -1) {
+ g_critical("%s: Failed to get vSocket address family value.\n",
+ __FUNCTION__);
goto exit;
}
- if (pluginData.sock == INVALID_SOCKET && !GdpCreateSocket()) {
- err = GDP_ERROR_INTERNAL;
+ if (!GdpCreateSocket()) {
+ g_critical("%s: Failed to create VMCI datagram socket.\n",
+ __FUNCTION__);
goto exit;
}
- err = GdpEmptyRecvQueue();
- if (err != GDP_ERROR_SUCCESS) {
+#if defined(_WIN32)
+ gPluginState.eventNetwork = GdpCreateEvent();
+ if (gPluginState.eventNetwork == GDP_INVALID_EVENT) {
+ g_critical("%s: GdpCreateEvent for network failed: error=%d.\n",
+ __FUNCTION__, GetSysErr());
goto exit;
}
+#endif
- err = GdpSend(msg, msgLen,
- GDP_SEND_TIMEOUT); // Should not time out in theory
- if (err != GDP_ERROR_SUCCESS) {
- g_info("%s: GdpSend failed: %s.\n", __FUNCTION__, gdpErrMsgs[err]);
+ gPluginState.eventStop = GdpCreateEvent();
+ if (gPluginState.eventStop == GDP_INVALID_EVENT) {
+ g_critical("%s: GdpCreateEvent for stop failed: error=%d.\n",
+ __FUNCTION__, GetSysErr());
goto exit;
}
- if (reply != NULL) {
- recvBuf = reply;
- recvBufLen = replyLen;
- } else {
- recvBuf = altRecvBuf;
- recvBufLen = &altRecvBufLen;
+ gPluginState.eventConfig = GdpCreateEvent();
+ if (gPluginState.eventConfig == GDP_INVALID_EVENT) {
+ g_critical("%s: GdpCreateEvent for config failed: error=%d.\n",
+ __FUNCTION__, GetSysErr());
+ goto exit;
+ }
+
+ gPublishState.eventPublish = GdpCreateEvent();
+ if (gPublishState.eventPublish == GDP_INVALID_EVENT) {
+ g_critical("%s: GdpCreateEvent for publish failed: error=%d.\n",
+ __FUNCTION__, GetSysErr());
+ goto exit;
}
- err = GdpRecv(recvBuf, recvBufLen, GDP_RECV_TIMEOUT);
- if (err != GDP_ERROR_SUCCESS) {
- g_info("%s: GdpRecv failed: %s.\n", __FUNCTION__, gdpErrMsgs[err]);
+ gPublishState.eventGetResult = GdpCreateEvent();
+ if (gPublishState.eventGetResult == GDP_INVALID_EVENT) {
+ g_critical("%s: GdpCreateEvent for get-result failed: error=%d.\n",
+ __FUNCTION__, GetSysErr());
+ goto exit;
+ }
+
+ if (!ToolsCorePool_StartThread(gPluginState.ctx, "GdpThread",
+ GdpThreadTask,
+ GdpThreadInterrupt,
+ NULL, NULL)) {
+ g_critical("%s: Failed to start the gdp task thread.\n", __FUNCTION__);
+ goto exit;
}
+ Atomic_WriteBool(&gPluginState.started, TRUE);
+ retVal = TRUE;
+
exit:
- if (err == GDP_ERROR_INTERNAL) {
- /*
- * No need to close and recreate socket for these errors:
- *
- * GDP_ERROR_UNREACH
- * GDP_ERROR_TIMEOUT
- * GDP_ERROR_SEND_SIZE
- * GDP_ERROR_RECV_SIZE
- */
- GdpCloseSocket();
+ if (!retVal) {
+ GdpDestroy();
+ }
+
+ return retVal;
+}
+
+
+/*
+ ******************************************************************************
+ * GdpDestroy --
+ *
+ * Destroys plugin resources.
+ *
+ ******************************************************************************
+ */
+
+static void
+GdpDestroy(void)
+{
+ g_debug("%s: Entering ...\n", __FUNCTION__);
+
+ GdpCloseSocket();
+
+ if (gPluginState.vmciFd != -1) {
+ VMCISock_ReleaseAFValueFd(gPluginState.vmciFd);
+ gPluginState.vmciFd = -1;
}
- g_mutex_unlock(&mutex);
+#if defined(_WIN32)
+ GdpCloseEvent(&gPluginState.eventNetwork);
+#endif
+
+ GdpCloseEvent(&gPluginState.eventStop);
+
+ GdpCloseEvent(&gPluginState.eventConfig);
+
+ GdpCloseEvent(&gPublishState.eventPublish);
- g_debug("%s: Return: %s.\n", __FUNCTION__, gdpErrMsgs[err]);
+ GdpCloseEvent(&gPublishState.eventGetResult);
- return err;
+#if defined(_WIN32)
+ if (gPluginState.wsaStarted) {
+ WSACleanup();
+ gPluginState.wsaStarted = FALSE;
+ }
+#endif
}
/*
******************************************************************************
- * GdpStop --
+ * GdpPublish --
+ *
+ * Publishes guest data to host side gdp daemon.
*
- * Stops guest data publishing for vmtoolsd shutdown, called by main thread.
+ * @param[in] createTime UTC timestamp, in number of micro-
+ * seconds since January 1, 1970 UTC.
+ * @param[in] topic Topic
+ * @param[in,optional] token Token, can be NULL
+ * @param[in,optional] category Category, can be NULL that defaults to
+ * "application"
+ * @param[in] data Buffer containing data to publish
+ * @param[in] dataLen Buffer length
*
- * @return GDP_ERROR_SUCCESS.
+ * @return GDP_ERROR_SUCCESS on success.
+ * @return Other GdpError code otherwise.
*
******************************************************************************
*/
static GdpError
-GdpStop(void)
+GdpPublish(gint64 createTime, // IN
+ const gchar *topic, // IN
+ const gchar *token, // IN, OPTIONAL
+ const gchar *category, // IN, OPTIONAL
+ const gchar *data, // IN
+ guint32 dataLen) // IN
{
+ GdpError gdpErr;
+
g_debug("%s: Entering ...\n", __FUNCTION__);
- Atomic_WriteBool(&pluginData.stopped, TRUE);
- GdpSetStopEvent();
- return GDP_ERROR_SUCCESS;
+
+ if (topic == NULL || *topic == '\0') {
+ g_info("%s: Missing topic.\n", __FUNCTION__);
+ return GDP_ERROR_INVALID_DATA;
+ }
+
+ if (data == NULL || dataLen == 0) {
+ g_info("%s: Topic '%s' has no data.\n", __FUNCTION__, topic);
+ return GDP_ERROR_INVALID_DATA;
+ }
+
+ if (token != NULL && *token == '\0') {
+ token = NULL;
+ }
+
+ if (category != NULL && *category == '\0') {
+ category = NULL;
+ }
+
+ g_mutex_lock(&gPublishState.mutex);
+
+ if (Atomic_ReadBool(&gPluginState.stopped)) {
+ gdpErr = GDP_ERROR_STOP;
+ goto exit;
+ }
+
+ if (!Atomic_ReadBool(&gPluginState.started) &&
+ !GdpStart()) {
+ gdpErr = GDP_ERROR_GENERAL;
+ goto exit;
+ }
+
+ gPublishState.createTime = createTime;
+ gPublishState.topic = topic;
+ gPublishState.token = token;
+ gPublishState.category = category;
+ gPublishState.data = data;
+ gPublishState.dataLen = dataLen;
+
+ GdpSetEvent(gPublishState.eventPublish);
+
+ do {
+ gdpErr = GdpWaitForEvent(gPublishState.eventGetResult,
+ GDP_WAIT_INFINITE);
+ if (gdpErr == GDP_ERROR_SUCCESS) {
+ gdpErr = gPublishState.gdpErr;
+ break;
+ }
+ } while (!Atomic_ReadBool(&gPluginState.stopped));
+
+ GdpResetEvent(gPublishState.eventGetResult);
+
+exit:
+ g_mutex_unlock(&gPublishState.mutex);
+ g_debug("%s: Exiting with gdp error: %s.\n",
+ __FUNCTION__, gdpErrMsgs[gdpErr]);
+ return gdpErr;
+}
+
+
+/*
+ *-----------------------------------------------------------------------------
+ * GdpConfReload --
+ *
+ * Handles gdp config change.
+ *
+ * @param[in] src The source object, unused
+ * @param[in] ctx The application context
+ * @param[in] data Unused
+ *
+ *-----------------------------------------------------------------------------
+ */
+
+static void
+GdpConfReload(gpointer src, // IN
+ ToolsAppCtx *ctx, // IN
+ gpointer data) // IN
+{
+ if (!Atomic_ReadBool(&gPluginState.started)) {
+ return;
+ }
+
+ GdpSetEvent(gPluginState.eventConfig);
}
*
* Cleans up on shutdown.
*
- * @param[in] src The source object, unused
- * @param[in] ctx The application context
- * @param[in] data Unused
+ * @param[in] src The source object, unused
+ * @param[in] ctx The application context
+ * @param[in] data Unused
*
******************************************************************************
*/
gpointer data) // IN
{
g_debug("%s: Entering ...\n", __FUNCTION__);
+ ASSERT(!Atomic_ReadBool(&gPluginState.started) ||
+ Atomic_ReadBool(&gPluginState.stopped));
g_object_set(ctx->serviceObj, TOOLS_PLUGIN_SVC_PROP_GDP, NULL, NULL);
GdpDestroy();
}
*
* Plugin entry point. Initializes internal plugin state.
*
- * @param[in] ctx The application context
+ * @param[in] ctx The application context
*
* @return The registration data.
*
ToolsOnLoad(ToolsAppCtx *ctx) // IN
{
/*
- * Return NULL to disable the plugin if not running in vmsvc daemon.
+ * Returns NULL to disable the plugin if not running in vmsvc daemon.
*/
if (!TOOLS_IS_MAIN_SERVICE(ctx)) {
g_info("%s: Not running in vmsvc daemon: container name='%s'.\n",
}
/*
- * Return NULL to disable the plugin if not running in a VMware VM.
+ * Returns NULL to disable the plugin if not running in a VMware VM.
*/
if (!ctx->isVMware) {
g_info("%s: Not running in a VMware VM.\n", __FUNCTION__);
}
/*
- * Return NULL to disable the plugin if VM is not running on ESX host.
+ * Returns NULL to disable the plugin if VM is not running on ESX host.
*/
{
uint32 vmxVersion = 0;
}
}
- if (!GdpInit(ctx)) {
- g_info("%s: Failed to init plugin.\n", __FUNCTION__);
- return NULL;
- }
+ GdpInit(ctx);
{
- static ToolsPluginSvcGdp svcGdp = { GdpPublish, GdpStop };
+ static ToolsPluginSvcGdp svcGdp = { GdpPublish };
static ToolsPluginData regData = { "gdp", NULL, NULL, NULL };
ToolsServiceProperty propGdp = { TOOLS_PLUGIN_SVC_PROP_GDP };
ToolsPluginSignalCb sigs[] = {
+ { TOOLS_CORE_SIG_CONF_RELOAD, GdpConfReload, NULL },
{ TOOLS_CORE_SIG_SHUTDOWN, GdpShutdown, NULL },
};
ToolsAppReg regs[] = {