]> git.ipfire.org Git - thirdparty/open-vm-tools.git/commitdiff
Tools gdp plugin updates.
authorJohn Wolfe <jwolfe@vmware.com>
Thu, 4 Mar 2021 21:48:46 +0000 (13:48 -0800)
committerJohn Wolfe <jwolfe@vmware.com>
Thu, 4 Mar 2021 21:48:46 +0000 (13:48 -0800)
open-vm-tools/configure.ac
open-vm-tools/lib/Makefile.am
open-vm-tools/lib/include/conf.h
open-vm-tools/lib/include/jsmn.h [new file with mode: 0644]
open-vm-tools/lib/include/vmware/tools/gdp.h
open-vm-tools/lib/jsmn/LICENSE [new file with mode: 0644]
open-vm-tools/lib/jsmn/Makefile.am [new file with mode: 0644]
open-vm-tools/lib/jsmn/README [new file with mode: 0644]
open-vm-tools/lib/jsmn/jsmn.c [new file with mode: 0644]
open-vm-tools/libvmtools/Makefile.am
open-vm-tools/services/plugins/gdp/gdpPlugin.c

index 901935f99ab0d69fc9f7ef4d63a26ff9923dab57..d085933462be1b0091205880be5eb0831872975f 100644 (file)
@@ -1632,6 +1632,7 @@ AC_CONFIG_FILES([                      \
    lib/rpcVmx/Makefile                 \
    lib/slashProc/Makefile              \
    lib/string/Makefile                 \
+   lib/jsmn/Makefile                   \
    lib/stubs/Makefile                  \
    lib/syncDriver/Makefile             \
    lib/system/Makefile                 \
index dee38ed79197a173247009dcc97f0a2f1dbbdd9e..2c7661715df8a7d09f2f3c261bea80cef8328179 100644 (file)
@@ -1,5 +1,5 @@
 ################################################################################
-### Copyright (c) 2007-2016,2020 VMware, Inc.  All rights reserved.
+### Copyright (c) 2007-2016,2020-2021 VMware, Inc.  All rights reserved.
 ###
 ### This program is free software; you can redistribute it and/or modify
 ### it under the terms of version 2 of the GNU General Public License as
@@ -66,6 +66,7 @@ if USE_SLASH_PROC
    SUBDIRS += slashProc
 endif
 SUBDIRS += string
+SUBDIRS += jsmn
 SUBDIRS += stubs
 SUBDIRS += syncDriver
 SUBDIRS += system
index 1a39799404e059066abdffff3bd88a8025a75451..1f98913953d3ded8ac6f9daf4baaceb9ce0859e0 100644 (file)
@@ -1,5 +1,5 @@
 /*********************************************************
- * Copyright (C) 2002-2020 VMware, Inc. All rights reserved.
+ * Copyright (C) 2002-2021 VMware, Inc. All rights reserved.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU Lesser General Public License as published
  * END deviceHelper goodies.
  ******************************************************************************
  */
+
+/*
+ ******************************************************************************
+ * BEGIN gdp plugin goodies.
+ */
+
+/**
+ * Defines the string used for the gdp config file group.
+ */
+#define CONFGROUPNAME_GDP "gdp"
+
+/**
+ * Defines a custom history cache size (in bytes).
+ *
+ * @note Illegal values result in a @c g_warning and fallback to the default
+ * cache size 4194304.
+ *
+ * @param int   User-defined cache size within [1048576, 16777216].
+ *              Set to 0 to disable caching.
+ */
+#define CONFNAME_GDP_CACHE_SIZE "cacheSize"
+
+/*
+ * END gdp goodies.
+ ******************************************************************************
+ */
+
 #endif /* __CONF_H__ */
diff --git a/open-vm-tools/lib/include/jsmn.h b/open-vm-tools/lib/include/jsmn.h
new file mode 100644 (file)
index 0000000..8daeb51
--- /dev/null
@@ -0,0 +1,94 @@
+/*********************************************************
+ * Copyright (C) 2019 VMware, Inc. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation version 2.1 and no later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the Lesser GNU General Public
+ * License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA.
+ *
+ *********************************************************/
+
+#ifndef __JSMN_H_
+#define __JSMN_H_
+
+#include <stddef.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * JSON type identifier. Basic types are:
+ *    o Object
+ *    o Array
+ *    o String
+ *    o Other primitive: number, boolean (true/false) or null
+ */
+typedef enum {
+   JSMN_UNDEFINED = 0,
+   JSMN_OBJECT = 1,
+   JSMN_ARRAY = 2,
+   JSMN_STRING = 3,
+   JSMN_PRIMITIVE = 4
+} jsmntype_t;
+
+enum jsmnerr {
+   /* Not enough tokens were provided */
+   JSMN_ERROR_NOMEM = -1,
+   /* Invalid character inside JSON string */
+   JSMN_ERROR_INVAL = -2,
+   /* The string is not a full JSON packet, more bytes expected */
+   JSMN_ERROR_PART = -3
+};
+
+/**
+ * JSON token description.
+ * type     type (object, array, string etc.)
+ * start start position in JSON data string
+ * end      end position in JSON data string
+ */
+typedef struct {
+   jsmntype_t type;
+   int start;
+   int end;
+   int size;
+#ifdef JSMN_PARENT_LINKS
+   int parent;
+#endif
+} jsmntok_t;
+
+/**
+ * JSON parser. Contains an array of token blocks available. Also stores
+ * the string being parsed now and current position in that string
+ */
+typedef struct {
+   unsigned int pos; /* offset in the JSON string */
+   unsigned int toknext; /* next token to allocate */
+   int toksuper; /* superior token node, e.g parent object or array */
+} jsmn_parser;
+
+/**
+ * Create JSON parser over an array of tokens
+ */
+void jsmn_init(jsmn_parser *parser);
+
+/**
+ * Run JSON parser. It parses a JSON data string into and array of tokens, each describing
+ * a single JSON object.
+ */
+int jsmn_parse(jsmn_parser *parser, const char *js, size_t len,
+      jsmntok_t *tokens, unsigned int num_tokens);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* __JSMN_H_ */
index 6f3996ed3e7e2d7db17bd85d1a96f4823cc7377b..f7f5da925acebd66eed08c0920f4d55c3235a2e7 100644 (file)
@@ -1,5 +1,5 @@
 /*********************************************************
- * Copyright (C) 2020 VMware, Inc. All rights reserved.
+ * Copyright (C) 2020-2021 VMware, Inc. All rights reserved.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU Lesser General Public License as published
  * 17 * 4096 - Maximum VMCI datagram size
  *        24 - VMCI datagram header size
  */
-#define GDP_MAX_USER_DATA_LEN (17 * 4096 - 24)
-#define GDP_SEND_RECV_BUF_LEN (17 * 4096)
+#define GDP_MAX_PACKET_LEN (17 * 4096 - 24)
+
+/*
+ * Limit GDP packet JSON base64 key value size to (16 * 4096) bytes, then
+ * the rest JSON content will have (4096 - 24) bytes available.
+ *
+ * Base64 (16 * 4096) bytes are (12 * 4096) bytes before encoding.
+ */
+#define GDP_USER_DATA_LEN (12 * 4096)
 
 /*
  * Property name of the gdp plugin service in the tools
 #define GDP_ERR_LIST                                      \
    GDP_ERR_ITEM(GDP_ERROR_SUCCESS = 0,                    \
                 "No error")                               \
-   GDP_ERR_ITEM(GDP_ERROR_INTERNAL,                       \
-                "Internal system error")                  \
+   GDP_ERR_ITEM(GDP_ERROR_INVALID_DATA,                   \
+                "Invalid data")                           \
+   GDP_ERR_ITEM(GDP_ERROR_DATA_SIZE,                      \
+                "Data size too large")                    \
+   GDP_ERR_ITEM(GDP_ERROR_GENERAL,                        \
+                "General error")                          \
    GDP_ERR_ITEM(GDP_ERROR_STOP,                           \
                 "Stopped for vmtoolsd shutdown")          \
    GDP_ERR_ITEM(GDP_ERROR_UNREACH,                        \
                 "Host daemon unreachable")                \
    GDP_ERR_ITEM(GDP_ERROR_TIMEOUT,                        \
-                "Operation timed out")                    \
-   GDP_ERR_ITEM(GDP_ERROR_SEND_SIZE,                      \
-                "Message to host too large")              \
-   GDP_ERR_ITEM(GDP_ERROR_RECV_SIZE,                      \
-                "Receive buffer too small")
+                "Operation timed out")
 
 /*
  * GdpError codes.
@@ -78,11 +85,12 @@ typedef enum GdpError {
  * TOOLS_PLUGIN_SVC_PROP_GDP property.
  */
 typedef struct ToolsPluginSvcGdp {
-   GdpError (*publish)(const char *msg,
-                       int msgLen,
-                       char *reply,
-                       int *replyLen);
-   GdpError (*stop)(void);
+   GdpError (*publish)(gint64 createTime,
+                       const gchar *topic,
+                       const gchar *token,
+                       const gchar *category,
+                       const gchar *data,
+                       guint32 dataLen);
 } ToolsPluginSvcGdp;
 
 
@@ -96,64 +104,38 @@ typedef struct ToolsPluginSvcGdp {
  * pool threads started by ToolsCorePool_StartThread. Do not call the function
  * in ToolsOnLoad nor in/after TOOLS_CORE_SIG_SHUTDOWN handler.
  *
- * @param[in]              ctx       The app context
- * @param[in]              msg       Buffer containing guest data to publish
- * @param[in]              msgLen    Guest data length
- * @param[out,optional]    reply     Buffer to receive reply from gdp daemon
- * @param[in,out,optional] replyLen  NULL when param reply is NULL, otherwise:
- *                                   reply buffer length on input,
- *                                   reply length on output
+ * @param[in]          ctx         The application context
+ * @param[in]          createTime  UTC timestamp, in number of micro-
+ *                                 seconds since January 1, 1970 UTC.
+ * @param[in]          topic       Topic
+ * @param[in,optional] token       Token, can be NULL
+ * @param[in,optional] category    Category, can be NULL that defaults to
+ *                                 "application"
+ * @param[in]          data        Buffer containing data to publish
+ * @param[in]          dataLen     Buffer length
  *
  * @return GDP_ERROR_SUCCESS on success.
- * @return Other GdpError codes otherwise.
+ * @return Other GdpError code otherwise.
  *
  ******************************************************************************
  */
 
 static inline GdpError
-ToolsPluginSvcGdp_Publish(ToolsAppCtx *ctx, // IN
-                          const char *msg,  // IN
-                          int msgLen,       // IN
-                          char *reply,      // OUT, OPTIONAL
-                          int *replyLen)    // IN/OUT, OPTIONAL
+ToolsPluginSvcGdp_Publish(ToolsAppCtx *ctx,      // IN
+                          gint64 createTime,     // IN
+                          const gchar *topic,    // IN
+                          const gchar *token,    // IN, OPTIONAL
+                          const gchar *category, // IN, OPTIONAL
+                          const gchar *data,     // IN
+                          guint32 dataLen)       // IN
 {
    ToolsPluginSvcGdp *svcGdp = NULL;
    g_object_get(ctx->serviceObj, TOOLS_PLUGIN_SVC_PROP_GDP, &svcGdp, NULL);
    if (svcGdp != NULL && svcGdp->publish != NULL) {
-      return svcGdp->publish(msg, msgLen, reply, replyLen);
-   }
-   return GDP_ERROR_INTERNAL;
-}
-
-
-/*
- ******************************************************************************
- * ToolsPluginSvcGdp_Stop --                                             */ /**
- *
- * @brief Stops guest data publishing.
- *
- * This function notifies the pool thread in publish call to stop and return
- * immediately. It is intended to be called by the interrupt callback passed
- * to ToolsCorePool_StartThread. At vmtoolsd shutdown time, main thread will
- * call ToolsCorePool_Shutdown which invokes the interrupt callback.
- *
- * @param[in]              ctx       The app context
- *
- * @return GDP_ERROR_SUCCESS on success.
- * @return GDP_ERROR_INTERNAL otherwise.
- *
- ******************************************************************************
- */
-
-static inline GdpError
-ToolsPluginSvcGdp_Stop(ToolsAppCtx *ctx) // IN
-{
-   ToolsPluginSvcGdp *svcGdp = NULL;
-   g_object_get(ctx->serviceObj, TOOLS_PLUGIN_SVC_PROP_GDP, &svcGdp, NULL);
-   if (svcGdp != NULL && svcGdp->stop != NULL) {
-      return svcGdp->stop();
+      return svcGdp->publish(createTime, topic, token,
+                             category, data, dataLen);
    }
-   return GDP_ERROR_INTERNAL;
+   return GDP_ERROR_GENERAL;
 }
 
 #endif /* _VMWARE_TOOLS_GDP_H_ */
diff --git a/open-vm-tools/lib/jsmn/LICENSE b/open-vm-tools/lib/jsmn/LICENSE
new file mode 100644 (file)
index 0000000..c84fb2e
--- /dev/null
@@ -0,0 +1,20 @@
+Copyright (c) 2010 Serge A. Zaitsev
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+
diff --git a/open-vm-tools/lib/jsmn/Makefile.am b/open-vm-tools/lib/jsmn/Makefile.am
new file mode 100644 (file)
index 0000000..add87f6
--- /dev/null
@@ -0,0 +1,24 @@
+################################################################################
+### Copyright (c) 2021 VMware, Inc.  All rights reserved.
+###
+### This program is free software; you can redistribute it and/or modify
+### it under the terms of version 2 of the GNU General Public License as
+### published by the Free Software Foundation.
+###
+### This program is distributed in the hope that it will be useful,
+### but WITHOUT ANY WARRANTY; without even the implied warranty of
+### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+### GNU General Public License for more details.
+###
+### You should have received a copy of the GNU General Public License
+### along with this program; if not, write to the Free Software
+### Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+################################################################################
+
+noinst_LTLIBRARIES = libJsmn.la
+
+libJsmn_la_SOURCES =
+libJsmn_la_SOURCES += jsmn.c
+
+AM_CFLAGS =
+AM_CFLAGS += @GLIB2_CPPFLAGS@
diff --git a/open-vm-tools/lib/jsmn/README b/open-vm-tools/lib/jsmn/README
new file mode 100644 (file)
index 0000000..22f25f7
--- /dev/null
@@ -0,0 +1,8 @@
+This is a local copy of jsmn, a minimalistic json parser.
+
+jsmn included as src because its tiny, and its been augmented to use
+Log() since the original code has no logging.
+
+https://zserge.com/jsmn.html
+https://github.com/zserge/jsmn
+
diff --git a/open-vm-tools/lib/jsmn/jsmn.c b/open-vm-tools/lib/jsmn/jsmn.c
new file mode 100644 (file)
index 0000000..e83f3b8
--- /dev/null
@@ -0,0 +1,367 @@
+/*********************************************************
+ * Copyright (C) 2019 VMware, Inc. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation version 2.1 and no later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the Lesser GNU General Public
+ * License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA.
+ *
+ *********************************************************/
+
+/*
+ * From https://zserge.com/jsmn.html
+ *
+ * Some local additions (commented with / * vmware * / for logging.
+ */
+
+/*
+ * Turn on JSMN_PARENT to spend a bit of memory to get a lot of speed
+ * in large documents.
+ */
+#define JSMN_PARENT  1
+/* Tune on JSMN_STRICT to force strict json parsing */
+#define JSMN_STRICT  1
+#include "jsmn.h"
+
+#include "log.h"
+#include "vm_assert.h"
+
+#define LOGLEVEL_MODULE jsmn
+#include "loglevel_user.h"
+
+/**
+ * Allocates a fresh unused token from the token pool.
+ */
+static jsmntok_t *jsmn_alloc_token(jsmn_parser *parser,
+      jsmntok_t *tokens, size_t num_tokens) {
+   jsmntok_t *tok;
+   if (parser->toknext >= num_tokens) {
+      return NULL;
+   }
+   tok = &tokens[parser->toknext++];
+   tok->start = tok->end = -1;
+   tok->size = 0;
+#ifdef JSMN_PARENT_LINKS
+   tok->parent = -1;
+#endif
+   return tok;
+}
+
+/**
+ * Fills token type and boundaries.
+ */
+static void jsmn_fill_token(jsmntok_t *token, jsmntype_t type,
+                            int start, int end) {
+   token->type = type;
+   token->start = start;
+   token->end = end;
+   token->size = 0;
+}
+
+/**
+ * Fills next available token with JSON primitive.
+ */
+static int jsmn_parse_primitive(jsmn_parser *parser, const char *js,
+      size_t len, jsmntok_t *tokens, size_t num_tokens) {
+   jsmntok_t *token;
+   int start;
+
+   start = parser->pos;
+
+   for (; parser->pos < len && js[parser->pos] != '\0'; parser->pos++) {
+      switch (js[parser->pos]) {
+#ifndef JSMN_STRICT
+         /* In strict mode primitive must be followed by "," or "}" or "]" */
+         case ':':
+#endif
+         case '\t' : case '\r' : case '\n' : case ' ' :
+         case ','  : case ']'  : case '}' :
+            goto found;
+      }
+      if (js[parser->pos] < 32 || js[parser->pos] >= 127) {
+         parser->pos = start;
+         /* vmware */
+         Log("%s: Unexpected char '%c' in primitive at pos %d\n",
+             __FUNCTION__, js[parser->pos], parser->pos);
+         /* vmware */
+         return JSMN_ERROR_INVAL;
+      }
+   }
+#ifdef JSMN_STRICT
+   /* In strict mode primitive must be followed by a comma/object/array */
+   parser->pos = start;
+   return JSMN_ERROR_PART;
+#endif
+
+found:
+   if (tokens == NULL) {
+      parser->pos--;
+      return 0;
+   }
+   token = jsmn_alloc_token(parser, tokens, num_tokens);
+   if (token == NULL) {
+      parser->pos = start;
+      return JSMN_ERROR_NOMEM;
+   }
+   jsmn_fill_token(token, JSMN_PRIMITIVE, start, parser->pos);
+#ifdef JSMN_PARENT_LINKS
+   token->parent = parser->toksuper;
+#endif
+   parser->pos--;
+   return 0;
+}
+
+/**
+ * Fills next token with JSON string.
+ */
+static int jsmn_parse_string(jsmn_parser *parser, const char *js,
+      size_t len, jsmntok_t *tokens, size_t num_tokens) {
+   jsmntok_t *token;
+
+   int start = parser->pos;
+
+   parser->pos++;
+
+   /* Skip starting quote */
+   for (; parser->pos < len && js[parser->pos] != '\0'; parser->pos++) {
+      char c = js[parser->pos];
+
+      /* Quote: end of string */
+      if (c == '\"') {
+         if (tokens == NULL) {
+            return 0;
+         }
+         token = jsmn_alloc_token(parser, tokens, num_tokens);
+         if (token == NULL) {
+            parser->pos = start;
+            return JSMN_ERROR_NOMEM;
+         }
+         jsmn_fill_token(token, JSMN_STRING, start+1, parser->pos);
+#ifdef JSMN_PARENT_LINKS
+         token->parent = parser->toksuper;
+#endif
+         return 0;
+      }
+
+      /* Backslash: Quoted symbol expected */
+      if (c == '\\' && parser->pos + 1 < len) {
+         int i;
+         parser->pos++;
+         switch (js[parser->pos]) {
+            /* Allowed escaped symbols */
+            case '\"': case '/' : case '\\' : case 'b' :
+            case 'f' : case 'r' : case 'n'  : case 't' :
+               break;
+            /* Allows escaped symbol \uXXXX */
+            case 'u':
+               parser->pos++;
+               for(i = 0; i < 4 && parser->pos < len && js[parser->pos] != '\0'; i++) {
+                  /* If it isn't a hex character we have an error */
+                  if(!((js[parser->pos] >= 48 && js[parser->pos] <= 57) || /* 0-9 */
+                           (js[parser->pos] >= 65 && js[parser->pos] <= 70) || /* A-F */
+                           (js[parser->pos] >= 97 && js[parser->pos] <= 102))) { /* a-f */
+                     parser->pos = start;
+                     /* vmware */
+                     Log("%s: Unexpected char '%c' in escaped unicode at pos %d\n",
+                         __FUNCTION__, js[parser->pos], parser->pos);
+                     /* vmware */
+                     return JSMN_ERROR_INVAL;
+                  }
+                  parser->pos++;
+               }
+               parser->pos--;
+               break;
+            /* Unexpected symbol */
+            default:
+               parser->pos = start;
+               /* vmware */
+               Log("%s: Unexpected symbol '%c' in primitive at pos %d\n",
+                   __FUNCTION__, js[parser->pos], parser->pos);
+               /* vmware */
+               return JSMN_ERROR_INVAL;
+         }
+      }
+   }
+   parser->pos = start;
+   return JSMN_ERROR_PART;
+}
+
+/**
+ * Parse JSON string and fill tokens.
+ */
+int jsmn_parse(jsmn_parser *parser, const char *js, size_t len,
+      jsmntok_t *tokens, unsigned int num_tokens) {
+   int r;
+   int i;
+   jsmntok_t *token;
+   int count = parser->toknext;
+
+   for (; parser->pos < len && js[parser->pos] != '\0'; parser->pos++) {
+      char c;
+      jsmntype_t type;
+
+      c = js[parser->pos];
+      switch (c) {
+         case '{': case '[':
+            count++;
+            if (tokens == NULL) {
+               break;
+            }
+            token = jsmn_alloc_token(parser, tokens, num_tokens);
+            if (token == NULL)
+               return JSMN_ERROR_NOMEM;
+            if (parser->toksuper != -1) {
+               tokens[parser->toksuper].size++;
+#ifdef JSMN_PARENT_LINKS
+               token->parent = parser->toksuper;
+#endif
+            }
+            token->type = (c == '{' ? JSMN_OBJECT : JSMN_ARRAY);
+            token->start = parser->pos;
+            parser->toksuper = parser->toknext - 1;
+            break;
+         case '}': case ']':
+            if (tokens == NULL)
+               break;
+            type = (c == '}' ? JSMN_OBJECT : JSMN_ARRAY);
+#ifdef JSMN_PARENT_LINKS
+            if (parser->toknext < 1) {
+               return JSMN_ERROR_INVAL;
+            }
+            token = &tokens[parser->toknext - 1];
+            for (;;) {
+               if (token->start != -1 && token->end == -1) {
+                  if (token->type != type) {
+                     return JSMN_ERROR_INVAL;
+                  }
+                  token->end = parser->pos + 1;
+                  parser->toksuper = token->parent;
+                  break;
+               }
+               if (token->parent == -1) {
+                  if(token->type != type || parser->toksuper == -1) {
+                     return JSMN_ERROR_INVAL;
+                  }
+                  break;
+               }
+               token = &tokens[token->parent];
+            }
+#else
+            for (i = parser->toknext - 1; i >= 0; i--) {
+               token = &tokens[i];
+               if (token->start != -1 && token->end == -1) {
+                  if (token->type != type) {
+                     return JSMN_ERROR_INVAL;
+                  }
+                  parser->toksuper = -1;
+                  token->end = parser->pos + 1;
+                  break;
+               }
+            }
+            /* Error if unmatched closing bracket */
+            if (i == -1) return JSMN_ERROR_INVAL;
+            for (; i >= 0; i--) {
+               token = &tokens[i];
+               if (token->start != -1 && token->end == -1) {
+                  parser->toksuper = i;
+                  break;
+               }
+            }
+#endif
+            break;
+         case '\"':
+            r = jsmn_parse_string(parser, js, len, tokens, num_tokens);
+            if (r < 0) return r;
+            count++;
+            if (parser->toksuper != -1 && tokens != NULL)
+               tokens[parser->toksuper].size++;
+            break;
+         case '\t' : case '\r' : case '\n' : case ' ':
+            break;
+         case ':':
+            parser->toksuper = parser->toknext - 1;
+            break;
+         case ',':
+            if (tokens != NULL && parser->toksuper != -1 &&
+                  tokens[parser->toksuper].type != JSMN_ARRAY &&
+                  tokens[parser->toksuper].type != JSMN_OBJECT) {
+#ifdef JSMN_PARENT_LINKS
+               parser->toksuper = tokens[parser->toksuper].parent;
+#else
+               for (i = parser->toknext - 1; i >= 0; i--) {
+                  if (tokens[i].type == JSMN_ARRAY || tokens[i].type == JSMN_OBJECT) {
+                     if (tokens[i].start != -1 && tokens[i].end == -1) {
+                        parser->toksuper = i;
+                        break;
+                     }
+                  }
+               }
+#endif
+            }
+            break;
+#ifdef JSMN_STRICT
+         /* In strict mode primitives are: numbers and booleans */
+         case '-': case '0': case '1' : case '2': case '3' : case '4':
+         case '5': case '6': case '7' : case '8': case '9':
+         case 't': case 'f': case 'n' :
+            /* And they must not be keys of the object */
+            if (tokens != NULL && parser->toksuper != -1) {
+               jsmntok_t *t = &tokens[parser->toksuper];
+               if (t->type == JSMN_OBJECT ||
+                     (t->type == JSMN_STRING && t->size != 0)) {
+                  /* vmware */
+                  Log("%s: Unexpected char '%c' in STRING/OBJECT at pos %d\n",
+                      __FUNCTION__, js[parser->pos], parser->pos);
+                  /* vmware */
+                  return JSMN_ERROR_INVAL;
+               }
+            }
+#else
+         /* In non-strict mode every unquoted value is a primitive */
+         default:
+#endif
+            r = jsmn_parse_primitive(parser, js, len, tokens, num_tokens);
+            if (r < 0) return r;
+            count++;
+            if (parser->toksuper != -1 && tokens != NULL)
+               tokens[parser->toksuper].size++;
+            break;
+
+#ifdef JSMN_STRICT
+         /* Unexpected char in strict mode */
+         default:
+            return JSMN_ERROR_INVAL;
+#endif
+      }
+   }
+
+   if (tokens != NULL) {
+      for (i = parser->toknext - 1; i >= 0; i--) {
+         /* Unmatched opened object or array */
+         if (tokens[i].start != -1 && tokens[i].end == -1) {
+            return JSMN_ERROR_PART;
+         }
+      }
+   }
+
+   return count;
+}
+
+/**
+ * Creates a new parser based over a given  buffer with an array of tokens
+ * available.
+ */
+void jsmn_init(jsmn_parser *parser) {
+   parser->pos = 0;
+   parser->toknext = 0;
+   parser->toksuper = -1;
+}
+
index 837c9dae2678453295b370ee9bfa2506cf5bbf14..6da9736227e6d14422d4433119a843adfc4dac5b 100644 (file)
@@ -1,5 +1,5 @@
 ################################################################################
-### Copyright (c) 2008-2020 VMware, Inc.  All rights reserved.
+### Copyright (c) 2008-2021 VMware, Inc.  All rights reserved.
 ###
 ### This program is free software; you can redistribute it and/or modify
 ### it under the terms of version 2 of the GNU General Public License as
@@ -48,6 +48,7 @@ libvmtools_la_LIBADD += ../lib/rpcIn/libRpcIn.la
 libvmtools_la_LIBADD += ../lib/rpcOut/libRpcOut.la
 libvmtools_la_LIBADD += ../lib/rpcVmx/libRpcVmx.la
 libvmtools_la_LIBADD += ../lib/string/libString.la
+libvmtools_la_LIBADD += ../lib/jsmn/libJsmn.la
 libvmtools_la_LIBADD += ../lib/syncDriver/libSyncDriver.la
 libvmtools_la_LIBADD += ../lib/system/libSystem.la
 libvmtools_la_LIBADD += ../lib/stubs/libStubsCS.la
index 71095ef8f8a8691c4b07ab4225779e770a49a6c4..3785ae0376cf609a2a229ca58ce459483b5614a5 100644 (file)
@@ -1,5 +1,5 @@
 /*********************************************************
- * Copyright (C) 2020 VMware, Inc. All rights reserved.
+ * Copyright (C) 2020-2021 VMware, Inc. All rights reserved.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU Lesser General Public License as published
 #include <unistd.h>
 #endif
 
+#include "vm_basic_defs.h"
 #include "vm_assert.h"
 #include "vm_atomic.h"
-#include "vm_basic_types.h"
 #define  G_LOG_DOMAIN  "gdp"
 #include "vmware/tools/log.h"
-#include "vmware/tools/plugin.h"
 #include "vmware/tools/utils.h"
+#include "vmware/tools/plugin.h"
+#include "vmware/tools/threadPool.h"
 #include "vmware/tools/gdp.h"
 #include "vmci_defs.h"
 #include "vmci_sockets.h"
 #include "vmcheck.h"
+#include "base64.h"
+#include "util.h"
+#include "str.h"
+#include "jsmn.h"
+#include "conf.h"
 
 #include "vm_version.h"
 #include "embed_version.h"
@@ -55,45 +61,107 @@ VM_EMBED_VERSION(VMTOOLSD_VERSION_STRING);
 
 
 #if defined(_WIN32)
-#   define GetSockErr()  WSAGetLastError()
+#   define GetSysErr() WSAGetLastError()
 
-#   define SYSERR_EADDRINUSE     WSAEADDRINUSE
-#   define SYSERR_EHOSTUNREACH   WSAEHOSTUNREACH
-#   define SYSERR_EINTR          WSAEINTR
-#   define SYSERR_EMSGSIZE       WSAEMSGSIZE
-#   define SYSERR_WOULDBLOCK(e)  (e == WSAEWOULDBLOCK)
+#   define SYSERR_EADDRINUSE    WSAEADDRINUSE
+#   define SYSERR_EHOSTUNREACH  WSAEHOSTUNREACH
+#   define SYSERR_EINTR         WSAEINTR
+#   define SYSERR_EMSGSIZE      WSAEMSGSIZE
+#   define SYSERR_WOULDBLOCK(e) (e == WSAEWOULDBLOCK)
 
     typedef int socklen_t;
 #   define CloseSocket closesocket
 
-#   define SOCK_READ   FD_READ
-#   define SOCK_WRITE  FD_WRITE
+    typedef WSAEVENT GdpEvent;
+#   define GDP_INVALID_EVENT WSA_INVALID_EVENT
 #else
-#   define GetSockErr()  errno
+#   define GetSysErr() errno
 
-#   define SYSERR_EADDRINUSE     EADDRINUSE
-#   define SYSERR_EHOSTUNREACH   EHOSTUNREACH
-#   define SYSERR_EINTR          EINTR
-#   define SYSERR_EMSGSIZE       EMSGSIZE
-#   define SYSERR_WOULDBLOCK(e)  (e == EAGAIN || e == EWOULDBLOCK)
+#   define SYSERR_EADDRINUSE    EADDRINUSE
+#   define SYSERR_EHOSTUNREACH  EHOSTUNREACH
+#   define SYSERR_EINTR         EINTR
+#   define SYSERR_EMSGSIZE      EMSGSIZE
+#   define SYSERR_WOULDBLOCK(e) (e == EAGAIN || e == EWOULDBLOCK)
 
     typedef int SOCKET;
-#   define SOCKET_ERROR    (-1)
-#   define INVALID_SOCKET  ((SOCKET) -1)
-#   define CloseSocket     close
+#   define SOCKET_ERROR   (-1)
+#   define INVALID_SOCKET ((SOCKET) -1)
+#   define CloseSocket    close
 
-#   define SOCK_READ   POLLIN
-#   define SOCK_WRITE  POLLOUT
+    typedef int GdpEvent;
+#   define GDP_INVALID_EVENT (-1)
 #endif
 
-#define PRIVILEGED_PORT_MAX  1023
-#define PRIVILEGED_PORT_MIN  1
+#define GDP_TIMEOUT_AT_INFINITE (-1)
+#define GDP_WAIT_INFINITE       (-1)
+#define GDP_SEND_AFTER_ANY_TIME (-1)
+
+#define USEC_PER_SECOND      (G_GINT64_CONSTANT(1000000))
+#define USEC_PER_MILLISECOND (G_GINT64_CONSTANT(1000))
+
+/*
+ * Macros to read values from tools config
+ */
+#define GDP_CONFIG_GET_BOOL(key, defVal) \
+   VMTools_ConfigGetBoolean(gPluginState.ctx->config, \
+                            CONFGROUPNAME_GDP, key, defVal)
+
+#define GDP_CONFIG_GET_INT(key, defVal) \
+   VMTools_ConfigGetInteger(gPluginState.ctx->config, \
+                            CONFGROUPNAME_GDP, key, defVal)
+
+#define GDPD_RECV_PORT 7777
+#define GDP_RECV_PORT  766
 
-#define GDPD_LISTEN_PORT  7777
+/*
+ * The minimum rate limit is 1 pps, corresponding wait interval is 1 second.
+ */
+#define GDP_WAIT_RESULT_TIMEOUT 1500 // ms
+
+#define GDP_PACKET_JSON_LINE_SUBSCRIBERS \
+   "      \"subscribers\":[%s],\n"
+
+#define GDP_PACKET_JSON \
+   "{\n" \
+   "   \"header\": {\n" \
+   "      \"sequence\":%" G_GUINT64_FORMAT ",\n" \
+   "%s" \
+   "      \"createTime\":\"%s\",\n" \
+   "      \"topic\":\"%s\",\n" \
+   "      \"token\":\"%s\"\n" \
+   "   },\n" \
+   "   \"payload\":{\n" \
+   "      \"category\":\"%s\",\n" \
+   "      \"base64\":\"%s\"\n" \
+   "   }\n" \
+   "}"
+
+#define GDP_RESULT_SEQUENCE   "sequence"  // Required, <uint64> e.g. 12345
+#define GDP_RESULT_STATUS     "status"    // Required,  "ok", or "bad" for
+                                          // malformed and rejected packet
+#define GDP_RESULT_DIAGNOSIS  "diagnosis" // Optional
+#define GDP_RESULT_RATE_LIMIT "rateLimit" // Required,  <int>
+                                          // e.g. 2 packets per second
+
+#define GDP_RESULT_STATUS_OK  "ok"
+#define GDP_RESULT_STATUS_BAD "bad"
+
+#define GDP_RESULT_REQUIRED_KEYS 3
+
+#define GDP_HISTORY_REQUEST_PAST_SECONDS "pastSeconds" // <uint64>
+#define GDP_HISTORY_REQUEST_ID           "id"          // <uint64>
+                                                       // subscription ID
+
+#define GDP_HISTORY_REQUEST_REQUIRED_KEYS 2
 
-#define GDP_SEND_TIMEOUT  1000 // ms
-#define GDP_RECV_TIMEOUT  3000 // ms
+/*
+ * Historical guest data cache size limit
+ */
+#define GDP_MAX_CACHE_SIZE_LIMIT     (1 << 24) // 16M
+#define GDP_DEFAULT_CACHE_SIZE_LIMIT (1 << 22) // 4M
+#define GDP_MIN_CACHE_SIZE_LIMIT     (1 << 20) // 1M
 
+#define GDP_STR_SIZE(s) ((guint32) (s != NULL ? (strlen(s) + 1) : 0))
 
 /*
  * GdpError message table.
@@ -105,413 +173,2114 @@ GDP_ERR_LIST
 #undef GDP_ERR_ITEM
 
 
-typedef struct PluginData {
-   ToolsAppCtx *ctx;       /* The application context */
+typedef struct PluginState {
+   ToolsAppCtx *ctx;       /* Tools application context */
+
+   Atomic_Bool started;    /* TRUE : Guest data publishing is started
+                              FALSE: otherwise
+                              Transitions from FALSE to TRUE only */
+
 #if defined(_WIN32)
    Bool wsaStarted;        /* TRUE : WSAStartup succeeded, WSACleanup required
                               FALSE: otherwise */
-   WSAEVENT eventSendRecv; /* The send-recv event object:
-                              Event object to associate with
-                              network send/recv ready event */
-   WSAEVENT eventStop;     /* The stop event object:
-                              Event object signalled to stop guest data
-                              publishing for vmtoolsd shutdown */
-#else
-   int eventStop;          /* The stop event fd:
-                              Event fd signalled to stop guest data
-                              publishing for vmtoolsd shutdown */
 #endif
    int vmciFd;             /* vSocket address family value fd */
    int vmciFamily;         /* vSocket address family value */
    SOCKET sock;            /* Datagram socket for publishing guest data */
+#if defined(_WIN32)
+   GdpEvent eventNetwork;  /* The network event object:
+                              To be associated with network
+                              send/recv ready event */
+#endif
+
+   GdpEvent eventStop;     /* The stop event object:
+                              Signalled to stop guest data publishing */
    Atomic_Bool stopped;    /* TRUE : Guest data publishing is stopped
-                                     for vmtoolsd shutdown
-                              FALSE: otherwise */
-} PluginData;
+                              FALSE: otherwise
+                              Transitions from FALSE to TRUE only */
 
-static PluginData pluginData;
+   GdpEvent eventConfig;   /* The config event object:
+                              Signalled to update config */
+} PluginState;
 
+static PluginState gPluginState;
 
-static Bool GdpInit(ToolsAppCtx *ctx);
-static void GdpDestroy(void);
-static void GdpSetStopEvent(void);
-static Bool GdpCreateSocket(void);
-static void GdpCloseSocket(void);
-static GdpError GdpEmptyRecvQueue(void);
-static GdpError GdpWaitForEvent(int netEvent, int timeout);
-static GdpError GdpSend(const char *buf, int len, int timeout);
-static GdpError GdpRecv(char *buf, int *len, int timeout);
 
+typedef struct PublishState {
+   GMutex mutex; /* To sync incoming publish calls */
 
-/*
- ******************************************************************************
- * GdpInit --
- *
- * Initializes internal plugin data.
- *
- * @param[in]  ctx   The application context
- *
- * @return TRUE on success.
- * @return FALSE otherwise.
- *
- ******************************************************************************
- */
+   /*
+    * Data passed from the active incoming publish thread to
+    * the gdp task thread.
+    */
+   gint64 createTime; /* Real wall-clock time,
+                         in microseconds since January 1, 1970 UTC. */
+   const gchar *topic;
+   const gchar *token;
+   const gchar *category;
+   const gchar *data;
+   guint32 dataLen;
+
+   /*
+    * The publish event object:
+    * The active incoming publish thread signals this event object
+    * to notify the gdp task thread to publish new data.
+    */
+   GdpEvent eventPublish;
+
+   /*
+    * The get-result event object:
+    * The gdp task thread signals this event object to notify
+    * the active incoming publish thread to get publish result.
+    */
+   GdpEvent eventGetResult;
+
+   GdpError gdpErr; /* The publish result */
+
+} PublishState;
+
+static PublishState gPublishState;
+
+
+typedef enum GdpTaskEvent {
+   GDP_TASK_EVENT_NONE,    /* No event */
+   GDP_TASK_EVENT_STOP,    /* Stop event */
+   GDP_TASK_EVENT_CONFIG,  /* Config event */
+   GDP_TASK_EVENT_NETWORK, /* Network event */
+   GDP_TASK_EVENT_PUBLISH, /* Publish event */
+   GDP_TASK_EVENT_TIMEOUT, /* Wait timed out */
+} GdpTaskEvent;
+
+typedef enum GdpTaskMode {
+   GDP_TASK_MODE_NONE,    /* Not publishing
+                             valid with GDP_TASK_STATE_IDLE only */
+   GDP_TASK_MODE_PUBLISH, /* Publishing new data */
+   GDP_TASK_MODE_HISTORY, /* Publishing historical data */
+} GdpTaskMode;
+
+typedef enum GdpTaskState {
+   GDP_TASK_STATE_IDLE,             /* Not started
+                                       valid with GDP_TASK_MODE_NONE only */
+   GDP_TASK_STATE_WAIT_TO_SEND,     /* Wait to send JSON packet */
+   GDP_TASK_STATE_WAIT_FOR_RESULT1, /* Wait for publish result from daemon */
+   GDP_TASK_STATE_WAIT_FOR_RESULT2, /* Wait for publish result after re-send */
+} GdpTaskState;
+
+typedef struct PublishResult {
+   guint64 sequence; /* Result for the packet with this sequence number */
+   Bool statusOk;    /* TRUE: ok, FALSE: bad */
+   gchar *diagnosis; /* Diagnosis message if statusOk is FALSE */
+   gint32 rateLimit; /* VMCI peer rate limit */
+} PublishResult;
+
+typedef struct HistoryRequest {
+   gint64 beginCacheTime; /* Begin cacheTime */
+   gint64 endCacheTime;   /* End cacheTime */
+   gchar id[24];          /* Incremental uint64 subscription ID in string
+                             format.
+                             2^64 - 1: 18446744073709551615 (20 digits) */
+} HistoryRequest;
+
+typedef struct HistoryCacheItem {
+   gint64 createTime; /* Guest data - begin */
+   gchar *topic;
+   gchar *token;
+   gchar *category;
+   gchar *data;
+   guint32 dataLen;   /* Guest data - end */
+   gint64 cacheTime;  /* Monotonic time point when item is cached */
+   guint32 itemSize;  /* Item size in bytes */
+} HistoryCacheItem;
+
+typedef struct HistoryCache {
+   GQueue queue;       /* Container for HistoryCacheItem */
+   guint32 sizeLimit;  /* Cache size limit */
+   guint32 size;       /* Current cache size */
+   GList *currentLink; /* Pointer to the history cache queue link
+                          currently being published */
+} HistoryCache;
+
+typedef struct TaskContext {
+   /*
+    * Legal mode & state combination
+    *
+    * GDP_TASK_MODE_NONE   : GDP_TASK_STATE_IDLE
+    *                        (Idle state will never time out)
+    *
+    * GDP_TASK_MODE_PUBLISH: GDP_TASK_STATE_WAIT_TO_SEND
+    *                        GDP_TASK_STATE_WAIT_FOR_RESULT1
+    *                        GDP_TASK_STATE_WAIT_FOR_RESULT2
+    *                        (All the above wait states will time out)
+    *
+    * GDP_TASK_MODE_HISTORY: GDP_TASK_STATE_WAIT_TO_SEND
+    *                        GDP_TASK_STATE_WAIT_FOR_RESULT1
+    *                        GDP_TASK_STATE_WAIT_FOR_RESULT2
+    *                        (All the above wait states will time out)
+    */
+   GdpTaskMode mode;
+   GdpTaskState state;
+
+   /*
+    * Publish event object signalled while mode is GDP_TASK_MODE_HISTORY.
+    */
+   Bool publishPending;
+   /*
+    * History request can be received at any time,
+    * non-empty requests queue means history request pending.
+    */
+
+   HistoryCache cache;  /* Container for history cache items */
+   GQueue requests;     /* Container for HistoryRequest */
+
+   guint64 sequence;    /* Sequence number */
+   gchar *packet;       /* Formatted JSON packet */
+   guint32 packetLen;   /* JSON packet length */
+
+   gint64 timeoutAt;    /* Time out at this monotonic time point,
+                           in microseconds. */
+   gint64 sendAfter;    /* Send to daemon after this monotonic time point,
+                           in microseconds. */
+} TaskContext;
+
+
+static inline GdpEvent
+GdpCreateEvent(void);
+
+static inline void
+GdpCloseEvent(GdpEvent *event); // IN/OUT
+
+static inline void
+GdpSetEvent(GdpEvent event); // IN
+
+static inline void
+GdpResetEvent(GdpEvent event); // IN
+
+static GdpError
+GdpWaitForEvent(GdpEvent event, // IN
+                int timeout);   // IN
 
 static Bool
-GdpInit(ToolsAppCtx *ctx) // IN
-{
-   Bool retVal = FALSE;
+GdpCreateSocket(void);
 
-   pluginData.ctx = ctx;
-#if defined(_WIN32)
-   pluginData.wsaStarted = FALSE;
-   pluginData.eventSendRecv = WSA_INVALID_EVENT;
-   pluginData.eventStop = WSA_INVALID_EVENT;
-#else
-   pluginData.eventStop = -1;
-#endif
-   pluginData.vmciFd = -1;
-   pluginData.vmciFamily = -1;
-   pluginData.sock = INVALID_SOCKET;
-   Atomic_WriteBool(&pluginData.stopped, FALSE);
+static void
+GdpCloseSocket(void);
 
-#if defined(_WIN32)
-   {
-      WSADATA wsaData;
-      int res = WSAStartup(MAKEWORD(2, 2), &wsaData);
-      if (res != 0) {
-         g_critical("%s: WSAStartup failed: error=%d.\n",
-                     __FUNCTION__, res);
-         return FALSE;
-      }
+static GdpError
+GdpSendTo(SOCKET sock,                         // IN
+          const char *buf,                     // IN
+          int bufLen,                          // IN
+          const struct sockaddr_vm *destAddr); // IN
 
-      pluginData.wsaStarted = TRUE;
-   }
+static GdpError
+GdpRecvFrom(SOCKET sock,                  // IN
+            char *buf,                    // OUT
+            int *bufLen,                  // IN/OUT
+            struct sockaddr_vm *srcAddr); // OUT
 
-   pluginData.eventSendRecv = WSACreateEvent();
-   if (pluginData.eventSendRecv == WSA_INVALID_EVENT) {
-      g_critical("%s: WSACreateEvent for send/recv failed: error=%d.\n",
-                 __FUNCTION__, WSAGetLastError());
-      goto exit;
-   }
+static inline void
+GdpHistoryRequestFree(HistoryRequest *request); // IN
 
-   pluginData.eventStop = WSACreateEvent();
-   if (pluginData.eventStop == WSA_INVALID_EVENT) {
-      g_critical("%s: WSACreateEvent for stop failed: error=%d.\n",
-                 __FUNCTION__, WSAGetLastError());
-      goto exit;
-   }
-#else
-   pluginData.eventStop = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
-   if (pluginData.eventStop == -1) {
-      g_critical("%s: eventfd for stop failed: error=%d.\n",
-                 __FUNCTION__, errno);
-      goto exit;
-   }
-#endif
+static void
+GdpHistoryRequestFreeGFunc(gpointer item_data,  // IN
+                           gpointer user_data); // IN
 
-   pluginData.vmciFamily = VMCISock_GetAFValueFd(&pluginData.vmciFd);
-   if (pluginData.vmciFamily == -1) {
-      g_critical("%s: Failed to get vSocket address family value.\n",
-                 __FUNCTION__);
-      goto exit;
-   }
+static inline void
+GdpTaskClearHistoryRequestQueue(TaskContext *taskCtx); // IN/OUT
 
-   retVal = TRUE;
+static guint32
+GdpGetHistoryCacheSizeLimit(); // IN
 
-exit:
-   if (!retVal) {
-      GdpDestroy();
-   }
+static inline Bool
+GdpTaskIsHistoryCacheEnabled(TaskContext *taskCtx); // IN
 
-   return retVal;
-}
+static void
+GdpHistoryCacheItemFree(HistoryCacheItem *item); // IN
 
+static void
+GdpHistoryCacheItemFreeGFunc(gpointer item_data,  // IN
+                             gpointer user_data); // IN
 
-/*
- ******************************************************************************
- * GdpDestroy --
- *
- * Destroys internal plugin data.
- *
- ******************************************************************************
- */
+static inline void
+GdpTaskClearHistoryCacheQueue(TaskContext *taskCtx); // IN/OUT
 
 static void
-GdpDestroy(void)
-{
-   GdpCloseSocket();
+GdpTaskDeleteHistoryCacheTail(TaskContext *taskCtx); // IN/OUT
 
-   if (pluginData.vmciFd != -1) {
-      VMCISock_ReleaseAFValueFd(pluginData.vmciFd);
-      pluginData.vmciFd = -1;
-   }
+static void
+GdpTaskHistoryCachePushItem(TaskContext *taskCtx,  // IN/OUT
+                            gint64 createTime,     // IN
+                            const gchar *topic,    // IN
+                            const gchar *token,    // IN
+                            const gchar *category, // IN
+                            const gchar *data,     // IN
+                            guint32 dataLen);      // IN
 
-#if defined(_WIN32)
-   if (pluginData.eventStop != WSA_INVALID_EVENT) {
-      WSACloseEvent(pluginData.eventStop);
-      pluginData.eventStop = WSA_INVALID_EVENT;
-   }
+static gchar *
+GdpGetFormattedUtcTime(gint64 utcTime); // IN
 
-   if (pluginData.eventSendRecv != WSA_INVALID_EVENT) {
-      WSACloseEvent(pluginData.eventSendRecv);
-      pluginData.eventSendRecv = WSA_INVALID_EVENT;
-   }
+static GdpError
+GdpTaskBuildPacket(TaskContext *taskCtx,      // IN/OUT
+                   gint64 createTime,         // IN
+                   const gchar *topic,        // IN
+                   const gchar *token,        // IN, OPTIONAL
+                   const gchar *category,     // IN, OPTIONAL
+                   const gchar *data,         // IN
+                   guint32 dataLen,           // IN
+                   const gchar *subscribers); // IN, OPTIONAL
 
-   if (pluginData.wsaStarted) {
-      WSACleanup();
-      pluginData.wsaStarted = FALSE;
-   }
-#else
-   if (pluginData.eventStop != -1) {
-      close(pluginData.eventStop);
-      pluginData.eventStop = -1;
-   }
-#endif
+static inline void
+GdpTaskDestroyPacket(TaskContext *taskCtx); // IN/OUT
 
-   pluginData.ctx = NULL;
-}
+static inline Bool
+GdpTaskOkToSend(TaskContext *taskCtx); // IN
+
+static GdpError
+GdpTaskSendPacket(TaskContext *taskCtx); // IN/OUT
+
+static gchar *
+GdpTaskUpdateHistoryCachePointerAndGetSubscribers(
+   TaskContext *taskCtx); // IN/OUT
+
+static void
+GdpTaskPublishHistory(TaskContext *taskCtx); // IN/OUT
+
+static void
+GdpTaskProcessConfigChange(TaskContext *taskCtx); // IN/OUT
+
+static Bool
+GdpJsonIsTokenOfKey(const char *json, // IN
+                    jsmntok_t *token, // IN
+                    const char *key); // IN
+
+static Bool
+GdpJsonIsPublishResult(const char *json,       // IN
+                       jsmntok_t *tokens,      // IN
+                       int count,              // IN
+                       PublishResult *result); // OUT
+
+static void
+GdpTaskProcessPublishResult(TaskContext *taskCtx,   // IN/OUT
+                            PublishResult *result); // IN
+
+static Bool
+GdpJsonIsHistoryRequest(const char *json,         // IN
+                        jsmntok_t *tokens,        // IN
+                        int count,                // IN
+                        HistoryRequest *request); // OUT
+
+static void
+GdpTaskPushHistoryRequest(TaskContext *taskCtx,     // IN/OUT
+                          HistoryRequest *request); // IN
+
+static void
+GdpTaskProcessNetwork(TaskContext *taskCtx); // IN/OUT
+
+static void
+GdpTaskProcessPublish(TaskContext *taskCtx); // IN/OUT
+
+static void
+GdpTaskProcessTimeout(TaskContext *taskCtx); // IN/OUT
+
+static int
+GdpTaskGetTimeout(TaskContext *taskCtx); // IN
+
+static void
+GdpTaskProcessEvents(TaskContext *taskCtx,    // IN/OUT
+                     GdpTaskEvent taskEvent); // IN
+
+static GdpError
+GdpTaskWaitForEvents(int timeout,              // IN
+                     GdpTaskEvent *taskEvent); // OUT
+
+static void
+GdpTaskCtxInit(TaskContext *taskCtx); // OUT
+
+static void
+GdpTaskCtxDestroy(TaskContext *taskCtx); // IN/OUT
+
+static void
+GdpThreadTask(ToolsAppCtx *ctx, // IN
+              void *data);      // IN
+
+static void
+GdpThreadInterrupt(ToolsAppCtx *ctx, // IN
+                   void *data);      // IN
+
+static void
+GdpInit(ToolsAppCtx *ctx); // IN
+
+static Bool
+GdpStart(void);
+
+static void
+GdpDestroy(void);
 
 
 /*
  ******************************************************************************
- * GdpSetStopEvent --
+ * GdpCreateEvent --
+ *
+ * Creates a new event object/fd.
  *
- * Signals the stop event object/fd.
+ * @return The new event object handle or fd on success.
+ * @return GDP_INVALID_EVENT otherwise.
  *
  ******************************************************************************
  */
 
-static void
-GdpSetStopEvent(void)
+static inline GdpEvent
+GdpCreateEvent(void)
 {
 #if defined(_WIN32)
-   ASSERT(pluginData.eventStop != WSA_INVALID_EVENT);
-   WSASetEvent(pluginData.eventStop);
+   return WSACreateEvent();
 #else
-   eventfd_t val = 1;
-   ASSERT(pluginData.eventStop != -1);
-   eventfd_write(pluginData.eventStop, val);
+   return eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
 #endif
 }
 
 
 /*
  ******************************************************************************
- * GdpCreateSocket --
- *
- * Creates a non-blocking datagram socket for guest data publishing.
+ * GdpCloseEvent --
  *
- * The socket is bound to a local privileged port with default remote address
- * set to host side gdp daemon endpoint.
+ * Closes the event object/fd.
  *
- * @return TRUE on success.
- * @return FALSE otherwise.
+ * @param[in,out] event  The event object/fd pointer
  *
  ******************************************************************************
  */
 
-static Bool
-GdpCreateSocket(void)
+static inline void
+GdpCloseEvent(GdpEvent *event) // IN/OUT
 {
-   struct sockaddr_vm localAddr;
-   struct sockaddr_vm remoteAddr;
-   int    sockErr;
-   Bool   retVal = FALSE;
-#if defined(_WIN32)
-   u_long nbMode = 1; // Non-blocking mode
-#  define SOCKET_TYPE_PARAM SOCK_DGRAM
-#else
-   /*
-    * Requires Linux kernel version >= 2.6.27.
-    */
-#  define SOCKET_TYPE_PARAM SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC
-#endif
-
-   ASSERT(pluginData.sock == INVALID_SOCKET);
-
-   pluginData.sock = socket(pluginData.vmciFamily, SOCKET_TYPE_PARAM, 0);
-#undef SOCKET_TYPE_PARAM
-
-   if (pluginData.sock == INVALID_SOCKET) {
-      g_warning("%s: socket failed: error=%d.\n", __FUNCTION__, GetSockErr());
-      return FALSE;
-   }
+   ASSERT(event != NULL);
 
+   if (*event != GDP_INVALID_EVENT) {
 #if defined(_WIN32)
-   /*
-    * Set socket to nonblocking mode.
-    * Note: WSAEventSelect automatically does this if not done.
-    */
-   if (ioctlsocket(pluginData.sock, FIONBIO, &nbMode) != 0) {
-      sockErr = GetSockErr();
-      g_warning("%s: ioctlsocket failed: error=%d.\n", __FUNCTION__, sockErr);
-      goto exit;
-   }
-#endif
-
-   memset(&localAddr, 0, sizeof localAddr);
-   localAddr.svm_family = pluginData.vmciFamily;
-   localAddr.svm_cid = VMCISock_GetLocalCID();
-   localAddr.svm_port = PRIVILEGED_PORT_MAX; // No htons
-
-   do {
-      if (bind(pluginData.sock, (struct sockaddr *)&localAddr,
-               (socklen_t) sizeof localAddr) == 0) {
-         sockErr = 0;
-         break;
+      if (!WSACloseEvent(*event)) {
+         g_warning("%s: WSACloseEvent failed: error=%d.\n",
+                   __FUNCTION__, WSAGetLastError());
       }
-
-      sockErr = GetSockErr();
-      if (sockErr == SYSERR_EADDRINUSE) {
-         g_info("%s: Local port %d is in use, retry the next one.\n",
-                __FUNCTION__, localAddr.svm_port);
-         localAddr.svm_port--;
-      } else {
-         g_warning("%s: bind failed: error=%d.\n", __FUNCTION__, sockErr);
-         goto exit;
+#else
+      if (close(*event) != 0) {
+         g_warning("%s: close failed: error=%d.\n",
+                   __FUNCTION__, errno);
       }
-   } while (localAddr.svm_port >= PRIVILEGED_PORT_MIN);
-
-   if (sockErr != 0) {
-      goto exit;
-   }
-
-   memset(&remoteAddr, 0, sizeof remoteAddr);
-   remoteAddr.svm_family = pluginData.vmciFamily;
-   remoteAddr.svm_cid = VMCI_HOST_CONTEXT_ID;
-   remoteAddr.svm_port = GDPD_LISTEN_PORT; // No htons
-
-   /*
-    * Set default remote address to send to/recv from datagrams.
-    */
-   if (connect(pluginData.sock, (struct sockaddr *)&remoteAddr,
-               (socklen_t) sizeof remoteAddr) != 0) {
-      sockErr = GetSockErr();
-      g_warning("%s: connect failed: error=%d.\n", __FUNCTION__, sockErr);
-      goto exit;
+#endif
+      *event = GDP_INVALID_EVENT;
    }
+}
 
-   g_debug("%s: Socket created and bound to local port %d.\n",
-           __FUNCTION__, localAddr.svm_port);
 
-   retVal = TRUE;
+/*
+ ******************************************************************************
+ * GdpSetEvent --
+ *
+ * Signals the event object/fd.
+ *
+ * @param[in] event  The event object/fd
+ *
+ ******************************************************************************
+ */
 
-exit:
-   if (!retVal) {
-      CloseSocket(pluginData.sock);
-      pluginData.sock = INVALID_SOCKET;
+static inline void
+GdpSetEvent(GdpEvent event) // IN
+{
+#if defined(_WIN32)
+   ASSERT(event != GDP_INVALID_EVENT);
+   if (!WSASetEvent(event)) {
+      g_warning("%s: WSASetEvent failed: error=%d.\n",
+                __FUNCTION__, WSAGetLastError());
    }
-
-   return retVal;
+#else
+   eventfd_t val = 1;
+   ASSERT(event != GDP_INVALID_EVENT);
+   if (eventfd_write(event, val) != 0) {
+      g_warning("%s: eventfd_write failed: error=%d.\n", __FUNCTION__, errno);
+   }
+#endif
 }
 
 
 /*
  ******************************************************************************
- * GdpCloseSocket --
+ * GdpResetEvent --
+ *
+ * Resets the event object/fd.
  *
- * Closes the guest data publishing socket.
+ * @param[in] event  The event object/fd
  *
  ******************************************************************************
  */
 
-static void
-GdpCloseSocket(void)
+static inline void
+GdpResetEvent(GdpEvent event) // IN
 {
-   if (pluginData.sock != INVALID_SOCKET) {
-      g_debug("%s: Closing socket.\n", __FUNCTION__);
-      if (CloseSocket(pluginData.sock) != 0) {
-         g_warning("%s: CloseSocket failed: fd=%d, error=%d.\n",
-                   __FUNCTION__, pluginData.sock, GetSockErr());
-      }
-
-      pluginData.sock = INVALID_SOCKET;
+#if defined(_WIN32)
+   ASSERT(event != GDP_INVALID_EVENT);
+   if (!WSAResetEvent(event)) {
+      g_warning("%s: WSAResetEvent failed: error=%d.\n",
+                __FUNCTION__, WSAGetLastError());
+   }
+#else
+   eventfd_t val;
+   ASSERT(event != GDP_INVALID_EVENT);
+   if (eventfd_read(event, &val) != 0) {
+      g_warning("%s: eventfd_read failed: error=%d.\n", __FUNCTION__, errno);
    }
+#endif
 }
 
 
 /*
  ******************************************************************************
- * GdpEmptyRecvQueue --
+ * GdpWaitForEvent --
  *
- * Empties receive queue before publishing new data to host side gdp daemon.
- * This is required in case previous receive from daemon timed out and daemon
- * did reply later.
+ * Waits for the event object/fd or timeout.
  *
- * @return GDP_ERROR_SUCCESS on success.
- * @return Other GdpError codes otherwise.
+ * @param[in] event    The event object/fd
+ * @param[in] timeout  Timeout value in milliseconds,
+ *                     negative value means an infinite timeout,
+ *                     zero means no wait.
+ *
+ * @return GDP_ERROR_SUCCESS on event signalled.
+ * @return GDP_ERROR_TIMEOUT on timeout.
+ * @return Other GdpError code otherwise.
  *
  ******************************************************************************
  */
 
 static GdpError
-GdpEmptyRecvQueue(void)
+GdpWaitForEvent(GdpEvent event, // IN
+                int timeout)    // IN
 {
+#if defined(_WIN32)
+   DWORD localTimeout;
+   gint64 startTime;
    GdpError retVal;
 
-   ASSERT(pluginData.sock != INVALID_SOCKET);
+   localTimeout = (DWORD)(timeout >= 0 ? timeout : WSA_INFINITE);
+   if (timeout > 0) {
+      startTime = g_get_monotonic_time();
+   } else {
+      startTime = 0; // Deals with [-Werror=maybe-uninitialized]
+   }
+
+   while (TRUE) {
+      WSAEVENT eventObjects[] = { event };
+      DWORD waitRes;
+
+      waitRes = WSAWaitForMultipleEvents((DWORD) ARRAYSIZE(eventObjects),
+                                         eventObjects,
+                                         FALSE, localTimeout, TRUE);
+      if (waitRes == WSA_WAIT_EVENT_0) {
+         retVal = GDP_ERROR_SUCCESS;
+         break;
+      } else if (waitRes == WSA_WAIT_IO_COMPLETION) {
+         gint64 curTime;
+         gint64 passedTime;
+
+         if (localTimeout == 0 ||
+             localTimeout == WSA_INFINITE) {
+            continue;
+         }
+
+         curTime = g_get_monotonic_time();
+         passedTime = (curTime - startTime) / USEC_PER_MILLISECOND;
+         if (passedTime >= localTimeout) {
+            retVal = GDP_ERROR_TIMEOUT;
+            break;
+         }
+
+         startTime = curTime;
+         localTimeout -= (DWORD) passedTime;
+         continue;
+      } else if (waitRes == WSA_WAIT_TIMEOUT) {
+         retVal = GDP_ERROR_TIMEOUT;
+         break;
+      } else { // WSA_WAIT_FAILED
+         g_warning("%s: WSAWaitForMultipleEvents failed: error=%d.\n",
+                   __FUNCTION__, WSAGetLastError());
+         retVal = GDP_ERROR_GENERAL;
+         break;
+      }
+   }
+
+   return retVal;
+
+#else
+
+   gint64 startTime;
+   GdpError retVal;
+
+   if (timeout > 0) {
+      startTime = g_get_monotonic_time();
+   } else {
+      startTime = 0; // Deals with [-Werror=maybe-uninitialized]
+   }
+
+   while (TRUE) {
+      struct pollfd fds[1];
+      int res;
+
+      fds[0].fd = event;
+      fds[0].events = POLLIN;
+      fds[0].revents = 0;
+
+      res = poll(fds, ARRAYSIZE(fds), timeout);
+      if (res > 0) {
+         if (fds[0].revents & POLLIN) {
+            retVal = GDP_ERROR_SUCCESS;
+         } else { // Not expected
+            g_warning("%s: Unexpected event.\n", __FUNCTION__);
+            retVal = GDP_ERROR_GENERAL;
+         }
+
+         break;
+      } else if (res == -1) {
+         int err = errno;
+         if (err == EINTR) {
+            gint64 curTime;
+            gint64 passedTime;
+
+            if (timeout <= 0) {
+               continue;
+            }
+
+            curTime = g_get_monotonic_time();
+            passedTime = (curTime - startTime) / USEC_PER_MILLISECOND;
+            if (passedTime >= timeout) {
+               retVal = GDP_ERROR_TIMEOUT;
+               break;
+            }
+
+            startTime = curTime;
+            timeout -= (int) passedTime;
+            continue;
+         } else {
+            g_warning("%s: poll failed: error=%d.\n", __FUNCTION__, err);
+            retVal = GDP_ERROR_GENERAL;
+            break;
+         }
+      } else if (res == 0) {
+         retVal = GDP_ERROR_TIMEOUT;
+         break;
+      } else {
+         g_warning("%s: Unexpected poll return: %d.\n", __FUNCTION__, res);
+         retVal = GDP_ERROR_GENERAL;
+         break;
+      }
+   }
+
+   return retVal;
+#endif
+}
+
+
+/*
+ ******************************************************************************
+ * GdpCreateSocket --
+ *
+ * Creates a non-blocking datagram socket for guest data publishing.
+ *
+ * The socket is bound to the gdp guest receive port.
+ *
+ * @return TRUE on success.
+ * @return FALSE otherwise.
+ *
+ ******************************************************************************
+ */
+
+static Bool
+GdpCreateSocket(void)
+{
+   Bool retVal;
+   struct sockaddr_vm localAddr;
+#if defined(_WIN32)
+   u_long nbMode = 1; // Non-blocking mode
+#  define SOCKET_TYPE_PARAM SOCK_DGRAM
+#else
+   /*
+    * Requires Linux kernel version >= 2.6.27.
+    */
+#  define SOCKET_TYPE_PARAM SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC
+#endif
+
+   ASSERT(gPluginState.sock == INVALID_SOCKET);
+   ASSERT(gPluginState.vmciFamily != -1);
+
+   gPluginState.sock = socket(gPluginState.vmciFamily, SOCKET_TYPE_PARAM, 0);
+#undef SOCKET_TYPE_PARAM
+
+   if (gPluginState.sock == INVALID_SOCKET) {
+      g_critical("%s: socket failed: error=%d.\n", __FUNCTION__, GetSysErr());
+      return FALSE;
+   }
+
+   retVal = FALSE;
+
+#if defined(_WIN32)
+   /*
+    * Sets socket to nonblocking mode.
+    * Note: WSAEventSelect automatically does this if not done.
+    */
+   if (ioctlsocket(gPluginState.sock, FIONBIO, &nbMode) != 0) {
+      g_critical("%s: ioctlsocket failed: error=%d.\n",
+                 __FUNCTION__, GetSysErr());
+      goto exit;
+   }
+#endif
+
+   memset(&localAddr, 0, sizeof localAddr);
+   localAddr.svm_family = gPluginState.vmciFamily;
+   localAddr.svm_cid = VMCISock_GetLocalCID();
+   localAddr.svm_port = GDP_RECV_PORT; // No htons
+
+   if (bind(gPluginState.sock, (struct sockaddr *) &localAddr,
+            (socklen_t) sizeof localAddr) != 0) {
+      g_critical("%s: bind failed: error=%d.\n",
+                 __FUNCTION__, GetSysErr());
+      goto exit;
+   }
+
+   g_debug("%s: Socket created and bound to local port %d.\n",
+           __FUNCTION__, localAddr.svm_port);
+
+   retVal = TRUE;
+
+exit:
+   if (!retVal) {
+      CloseSocket(gPluginState.sock);
+      gPluginState.sock = INVALID_SOCKET;
+   }
+
+   return retVal;
+}
+
+
+/*
+ ******************************************************************************
+ * GdpCloseSocket --
+ *
+ * Closes the guest data publishing datagram socket.
+ *
+ ******************************************************************************
+ */
+
+static void
+GdpCloseSocket(void)
+{
+   if (gPluginState.sock != INVALID_SOCKET) {
+      g_debug("%s: Closing socket.\n", __FUNCTION__);
+      if (CloseSocket(gPluginState.sock) != 0) {
+         g_warning("%s: CloseSocket failed: fd=%d, error=%d.\n",
+                   __FUNCTION__, gPluginState.sock, GetSysErr());
+      }
+
+      gPluginState.sock = INVALID_SOCKET;
+   }
+}
+
+
+/*
+ ******************************************************************************
+ * GdpSendTo --
+ *
+ * Wrapper of sendto() for VMCI datagram socket.
+ *
+ * Datagram send is not buffered, it will not return EWOULDBLOCK.
+ * If host daemon is not running, error EHOSTUNREACH is returned.
+ *
+ * @param[in] sock      VMCI datagram socket descriptor
+ * @param[in] buf       Data buffer pointer
+ * @param[in] bufLen    Data length
+ * @param[in] destAddr  Destination VMCI datagram socket address
+ *
+ * @return GDP_ERROR_SUCCESS on success.
+ * @return Other GdpError code otherwise.
+ *
+ ******************************************************************************
+ */
+
+static GdpError
+GdpSendTo(SOCKET sock,                        // IN
+          const char *buf,                    // IN
+          int bufLen,                         // IN
+          const struct sockaddr_vm *destAddr) // IN
+{
+   GdpError retVal;
+
+   ASSERT(sock != INVALID_SOCKET);
+   ASSERT(buf != NULL && bufLen > 0);
+   ASSERT(destAddr != NULL);
 
    do {
       long res;
-      int sockErr;
-      char buf[1]; // OK to truncate, case of SYSERR_EMSGSIZE.
+      int err;
+      socklen_t destAddrLen = (socklen_t) sizeof *destAddr;
 
-      /*
-       * Windows: recv returns -1, first with SYSERR_EMSGSIZE,
-       *          then SYSERR_WOULDBLOCK.
-       * Linux  : recv returns 1 first, then -1 with SYSERR_WOULDBLOCK.
-       */
-      res = recv(pluginData.sock, buf, (int) sizeof buf, 0);
-      if (res >= 0) {
-         g_debug("%s: recv returns %d.\n", __FUNCTION__, (int)res);
-         continue;
+      res = sendto(sock, buf, bufLen, 0,
+                   (const struct sockaddr *) destAddr, destAddrLen);
+      if (res == bufLen) {
+         retVal = GDP_ERROR_SUCCESS;
+         break;
+      } else if (res != SOCKET_ERROR) { // No partial send shall happen
+         g_warning("%s: sendto returned unexpected value %d.\n",
+                   __FUNCTION__, (int) res);
+         retVal = GDP_ERROR_GENERAL;
+         break;
       }
 
-      sockErr = GetSockErr();
-      if (sockErr == SYSERR_EINTR) {
+      err = GetSysErr();
+      if (err == SYSERR_EINTR) {
          continue;
-      } else if (sockErr == SYSERR_EMSGSIZE) {
-         g_debug("%s: recv truncated.\n", __FUNCTION__);
-         continue;
-      } else if (SYSERR_WOULDBLOCK(sockErr)) {
-         retVal = GDP_ERROR_SUCCESS; // No more message in the recv queue.
+      } else if (err == SYSERR_EHOSTUNREACH) {
+         g_info("%s: sendto failed: host daemon unreachable.\n", __FUNCTION__);
+         retVal = GDP_ERROR_UNREACH;
+      } else if (err == SYSERR_EMSGSIZE) {
+         g_warning("%s: sendto failed: message too large.\n", __FUNCTION__);
+         retVal = GDP_ERROR_DATA_SIZE;
       } else {
-         /*
-          * Note: recv does not return SYSERR_EHOSTUNREACH.
-          */
-         g_info("%s: recv failed: error=%d.\n", __FUNCTION__, sockErr);
-         retVal = GDP_ERROR_INTERNAL;
+         g_warning("%s: sendto failed: error=%d.\n", __FUNCTION__, err);
+         retVal = GDP_ERROR_GENERAL;
+      }
+
+      break;
+
+   } while (TRUE);
+
+   return retVal;
+}
+
+
+/*
+ ******************************************************************************
+ * GdpRecvFrom --
+ *
+ * Wrapper of recvfrom() for VMCI datagram socket.
+ *
+ * @param[in]     sock     VMCI datagram socket descriptor
+ * @param[out]    buf      Buffer pointer
+ * @param[in,out] bufLen   Buffer length on input,
+ *                         received data length on output
+ * @param[out]    srcAddr  Source VMCI datagram socket address
+ *
+ * @return GDP_ERROR_SUCCESS on success.
+ * @return Other GdpError code otherwise.
+ *
+ ******************************************************************************
+ */
+
+static GdpError
+GdpRecvFrom(SOCKET sock,                 // IN
+            char *buf,                   // OUT
+            int *bufLen,                 // IN/OUT
+            struct sockaddr_vm *srcAddr) // OUT
+{
+   GdpError retVal;
+
+   ASSERT(sock != INVALID_SOCKET);
+   ASSERT(buf != NULL && bufLen != NULL && *bufLen > 0);
+   ASSERT(srcAddr != NULL);
+
+   do {
+      long res;
+      int err;
+      socklen_t srcAddrLen = (socklen_t) sizeof *srcAddr;
+
+      res = recvfrom(sock, buf, *bufLen, 0,
+                     (struct sockaddr *) srcAddr, &srcAddrLen);
+      if (res >= 0) {
+         *bufLen = (int) res;
+         retVal = GDP_ERROR_SUCCESS;
+         break;
+      }
+
+      ASSERT(res == SOCKET_ERROR);
+      err = GetSysErr();
+      if (err == SYSERR_EINTR) {
+         continue;
+      } else if (err == SYSERR_EMSGSIZE) {
+         g_warning("%s: recvfrom failed: buffer size too small.\n",
+                   __FUNCTION__);
+         retVal = GDP_ERROR_DATA_SIZE;
+      } else { // Including EWOULDBLOCK
+         g_warning("%s: recvfrom failed: error=%d.\n", __FUNCTION__, err);
+         retVal = GDP_ERROR_GENERAL;
       }
 
-      break;
+      break;
+
+   } while (TRUE);
+
+   return retVal;
+}
+
+
+/*
+ *****************************************************************************
+ * GdpHistoryRequestFree --
+ *
+ * Frees history request resources.
+ *
+ * @param[in] request  History request pointer
+ *
+ *****************************************************************************
+ */
+
+static inline void
+GdpHistoryRequestFree(HistoryRequest *request) // IN
+{
+   g_debug("%s: Entering ...\n", __FUNCTION__);
+   free(request);
+}
+
+
+/*
+ *****************************************************************************
+ * GdpHistoryRequestFreeGFunc --
+ *
+ * GFunc called by GdpTaskClearHistoryRequestQueue.
+ *
+ * @param[in] item_data  History request pointer
+ * @param[in] user_data  Not used
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpHistoryRequestFreeGFunc(gpointer item_data, // IN
+                           gpointer user_data) // IN
+{
+   GdpHistoryRequestFree((HistoryRequest *) item_data);
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskClearHistoryRequestQueue --
+ *
+ * Removes all the elements in history request queue.
+ *
+ * @param[in,out] taskCtx  The task context
+ *
+ *****************************************************************************
+ */
+
+static inline void
+GdpTaskClearHistoryRequestQueue(TaskContext *taskCtx) // IN/OUT
+{
+   g_queue_foreach(&taskCtx->requests, GdpHistoryRequestFreeGFunc, NULL);
+   g_queue_clear(&taskCtx->requests);
+}
+
+
+/*
+ ******************************************************************************
+ * GdpGetHistoryCacheSizeLimit --
+ *
+ * Gets history cache size limit from tools config.
+ *
+ ******************************************************************************
+ */
+
+static guint32
+GdpGetHistoryCacheSizeLimit() // IN
+{
+   gint sizeLimit;
+
+   sizeLimit = GDP_CONFIG_GET_INT(CONFNAME_GDP_CACHE_SIZE,
+                                  GDP_DEFAULT_CACHE_SIZE_LIMIT);
+   if (sizeLimit != 0 &&
+       (sizeLimit < GDP_MIN_CACHE_SIZE_LIMIT ||
+        sizeLimit > GDP_MAX_CACHE_SIZE_LIMIT)) {
+      g_warning("%s: Configured history cache size limit %d exceeds range, "
+                "set to default value %d.\n",
+                __FUNCTION__, sizeLimit, GDP_DEFAULT_CACHE_SIZE_LIMIT);
+      sizeLimit = GDP_DEFAULT_CACHE_SIZE_LIMIT;
+   }
+
+   return (guint32) sizeLimit;
+}
+
+
+/*
+ ******************************************************************************
+ * GdpTaskIsHistoryCacheEnabled --
+ *
+ * Checks if history cache is enabled.
+ *
+ * @param[in] taskCtx  The task context
+ *
+ * @return TRUE if history cache size limit is not zero.
+ * @return FALSE otherwise.
+ *
+ ******************************************************************************
+ */
+
+static inline Bool
+GdpTaskIsHistoryCacheEnabled(TaskContext *taskCtx) // IN
+{
+   return taskCtx->cache.sizeLimit != 0 ? TRUE : FALSE;
+}
+
+
+/*
+ *****************************************************************************
+ * GdpHistoryCacheItemFree --
+ *
+ * Frees history cache item resources.
+ *
+ * @param[in] item  History cache item pointer
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpHistoryCacheItemFree(HistoryCacheItem *item) // IN
+{
+   g_debug("%s: Entering ...\n", __FUNCTION__);
+   ASSERT(item != NULL);
+   free(item->topic);
+   free(item->token);
+   free(item->category);
+   free(item->data);
+   free(item);
+}
+
+
+/*
+ *****************************************************************************
+ * GdpHistoryCacheItemFreeGFunc --
+ *
+ * GFunc called by GdpTaskClearHistoryCacheQueue.
+ *
+ * @param[in] item_data  History cache item pointer
+ * @param[in] user_data  Not used
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpHistoryCacheItemFreeGFunc(gpointer item_data, // IN
+                             gpointer user_data) // IN
+{
+   GdpHistoryCacheItemFree((HistoryCacheItem *) item_data);
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskClearHistoryCacheQueue --
+ *
+ * Removes all the elements in history cache queue.
+ *
+ * @param[in,out] taskCtx  The task context
+ *
+ *****************************************************************************
+ */
+
+static inline void
+GdpTaskClearHistoryCacheQueue(TaskContext *taskCtx) // IN/OUT
+{
+   g_queue_foreach(&taskCtx->cache.queue, GdpHistoryCacheItemFreeGFunc, NULL);
+   g_queue_clear(&taskCtx->cache.queue);
+
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskDeleteHistoryCacheTail --
+ *
+ * Deletes the tail of history cache queue and removes its reference.
+ *
+ * @param[in,out] taskCtx  The task context
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpTaskDeleteHistoryCacheTail(TaskContext *taskCtx) // IN/OUT
+{
+   HistoryCacheItem *item;
+
+   ASSERT(!g_queue_is_empty(&taskCtx->cache.queue));
+
+   if (taskCtx->cache.currentLink ==
+       g_queue_peek_tail_link(&taskCtx->cache.queue)) {
+      taskCtx->cache.currentLink = NULL;
+   }
+
+   item = (HistoryCacheItem *) g_queue_pop_tail(&taskCtx->cache.queue);
+   ASSERT(item != NULL);
+   taskCtx->cache.size -= item->itemSize;
+   GdpHistoryCacheItemFree(item);
+}
+
+
+/*
+ ******************************************************************************
+ * GdpTaskHistoryCachePushItem --
+ *
+ * Pushes the published guest data item into history cache queue.
+ *
+ * @param[in,out]      taskCtx     The task context
+ * @param[in]          createTime  UTC timestamp, in number of micro-
+ *                                 seconds since January 1, 1970 UTC.
+ * @param[in]          topic       Topic
+ * @param[in,optional] token       Token, can be NULL
+ * @param[in,optional] category    Category, can be NULL that defaults to
+ *                                 "application"
+ * @param[in]          data        Buffer containing data to publish
+ * @param[in]          dataLen     Buffer length
+ *
+ ******************************************************************************
+ */
+
+static void
+GdpTaskHistoryCachePushItem(TaskContext *taskCtx,  // IN/OUT
+                            gint64 createTime,     // IN
+                            const gchar *topic,    // IN
+                            const gchar *token,    // IN
+                            const gchar *category, // IN
+                            const gchar *data,     // IN
+                            guint32 dataLen)       // IN
+{
+   HistoryCacheItem *item;
+
+   ASSERT(topic != NULL);
+   ASSERT(data != NULL && dataLen > 0);
+   ASSERT(GdpTaskIsHistoryCacheEnabled(taskCtx));
+
+   item = (HistoryCacheItem *) Util_SafeMalloc(sizeof(HistoryCacheItem));
+   item->createTime = createTime;
+   item->topic = Util_SafeStrdup(topic);
+   item->token = Util_SafeStrdup(token);
+   item->category = Util_SafeStrdup(category);
+   item->data = (gchar *) Util_SafeMalloc(dataLen);
+   Util_Memcpy(item->data, data, dataLen);
+   item->dataLen = dataLen;
+
+   item->cacheTime = g_get_monotonic_time();
+   item->itemSize = ((guint32) sizeof(HistoryCacheItem))
+                    + GDP_STR_SIZE(item->topic)
+                    + GDP_STR_SIZE(item->token)
+                    + GDP_STR_SIZE(item->category)
+                    + item->dataLen;
+
+   while (taskCtx->cache.size + item->itemSize > taskCtx->cache.sizeLimit) {
+      GdpTaskDeleteHistoryCacheTail(taskCtx);
+   }
+
+   g_queue_push_head(&taskCtx->cache.queue, item);
+   taskCtx->cache.size += item->itemSize;
+
+   g_debug("%s: Current history cache size in bytes: %u, item count: %u.\n",
+           __FUNCTION__, taskCtx->cache.size,
+           g_queue_get_length(&taskCtx->cache.queue));
+}
+
+
+/*
+ ******************************************************************************
+ * GdpGetFormattedUtcTime --
+ *
+ * Converts UTC time to string in format like "2018-02-22T21:17:38.517Z".
+ *
+ * The caller must free the return value using g_free.
+ *
+ * @param[in] utcTime  Real wall-clock time
+ *                     Number of microseconds since January 1, 1970 UTC.
+ *
+ * @return Formatted timestamp string on success.
+ * @return NULL otherwise.
+ *
+ ******************************************************************************
+ */
+
+static gchar *
+GdpGetFormattedUtcTime(gint64 utcTime) // IN
+{
+   gchar *retVal = NULL;
+   GDateTime *utcDateTime;
+
+   utcDateTime = g_date_time_new_from_unix_utc(utcTime / USEC_PER_SECOND);
+   if (utcDateTime != NULL) {
+      gchar *dateToSecond; // YYYY-MM-DDTHH:MM:SS
+
+      dateToSecond = g_date_time_format(utcDateTime, "%FT%T");
+      if (dateToSecond != NULL) {
+         gint msec; // milliseconds
+
+         msec = (utcTime % USEC_PER_SECOND) / USEC_PER_MILLISECOND;
+         retVal = g_strdup_printf("%s.%03dZ", dateToSecond, msec);
+         g_free(dateToSecond);
+      }
+
+      g_date_time_unref(utcDateTime);
+   }
+
+   return retVal;
+}
+
+
+/*
+ ******************************************************************************
+ * GdpTaskBuildPacket --
+ *
+ * Builds JSON packet in the task context.
+ *
+ * @param[in,out]      taskCtx      The task context
+ * @param[in]          createTime   UTC timestamp, in number of micro-
+ *                                  seconds since January 1, 1970 UTC.
+ * @param[in]          topic        Topic
+ * @param[in,optional] token        Token, can be NULL
+ * @param[in,optional] category     Category, can be NULL that defaults to
+ *                                  "application"
+ * @param[in]          data         Buffer containing data to publish
+ * @param[in]          dataLen      Buffer length
+ * @param[in,optional] subscribers  For history data only, NULL for new data
+ *
+ * @return GDP_ERROR_SUCCESS on success.
+ * @return Other GdpError code otherwise.
+ *
+ ******************************************************************************
+ */
+
+static GdpError
+GdpTaskBuildPacket(TaskContext *taskCtx,     // IN/OUT
+                   gint64 createTime,        // IN
+                   const gchar *topic,       // IN
+                   const gchar *token,       // IN, OPTIONAL
+                   const gchar *category,    // IN, OPTIONAL
+                   const gchar *data,        // IN
+                   guint32 dataLen,          // IN
+                   const gchar *subscribers) // IN, OPTIONAL
+{
+   gchar base64Data[GDP_MAX_PACKET_LEN + 1]; // Add a space for NULL
+   gchar *subscribersLine = NULL;
+   gchar *formattedTime;
+   GdpError gdpErr;
+
+   ASSERT(topic != NULL);
+   ASSERT(data != NULL && dataLen > 0);
+
+   if (!Base64_Encode(data, dataLen, base64Data, sizeof base64Data, NULL)) {
+      g_info("%s: Base64_Encode failed, data length is %u.\n",
+             __FUNCTION__, dataLen);
+      return GDP_ERROR_DATA_SIZE;
+   }
+
+   if (subscribers != NULL && *subscribers != '\0') {
+      subscribersLine = g_strdup_printf(GDP_PACKET_JSON_LINE_SUBSCRIBERS,
+                                        subscribers);
+   }
+
+   formattedTime = GdpGetFormattedUtcTime(createTime);
+
+   ASSERT(taskCtx->packet == NULL);
+   taskCtx->packet = g_strdup_printf(GDP_PACKET_JSON,
+                                     ++taskCtx->sequence,
+                                     subscribersLine != NULL ?
+                                        subscribersLine : "",
+                                     formattedTime != NULL ?
+                                        formattedTime : "",
+                                     topic,
+                                     token != NULL ?
+                                        token : "",
+                                     category != NULL ?
+                                        category : "application",
+                                     base64Data);
+   taskCtx->packetLen = (guint32) strlen(taskCtx->packet);
+   if (taskCtx->packetLen > GDP_MAX_PACKET_LEN) {
+      g_info("%s: Packet length (%u) exceeds maximum limit (%u).\n",
+             __FUNCTION__, taskCtx->packetLen, GDP_MAX_PACKET_LEN);
+      GdpTaskDestroyPacket(taskCtx);
+      gdpErr = GDP_ERROR_DATA_SIZE;
+   } else {
+      gdpErr = GDP_ERROR_SUCCESS;
+   }
+
+   g_free(formattedTime);
+   g_free(subscribersLine);
+   return gdpErr;
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskDestroyPacket --
+ *
+ * Destroys JSON packet in the task context.
+ *
+ * @param[in,out] taskCtx  The task context
+ *
+ *****************************************************************************
+ */
+
+static inline void
+GdpTaskDestroyPacket(TaskContext *taskCtx) // IN/OUT
+{
+   g_free(taskCtx->packet);
+   taskCtx->packet = NULL;
+   taskCtx->packetLen = 0;
+}
+
+
+/*
+ ******************************************************************************
+ * GdpTaskOkToSend --
+ *
+ * Checks if current time is OK to send JSON packet to host side gdp daemon.
+ *
+ * @param[in] taskCtx  The task context
+ *
+ * @return TRUE if current time has passed the task context sendAfter time.
+ * @return FALSE otherwise.
+ *
+ ******************************************************************************
+ */
+
+static inline Bool
+GdpTaskOkToSend(TaskContext *taskCtx) // IN
+{
+   return g_get_monotonic_time() >= taskCtx->sendAfter ?
+             TRUE : FALSE;
+}
+
+
+/*
+ ******************************************************************************
+ * GdpTaskSendPacket --
+ *
+ * Sends JSON packet in the task context to host side gdp daemon.
+ *
+ * Datagram send is not buffered, it will not return EWOULDBLOCK.
+ * If host daemon is not running, error EHOSTUNREACH is returned.
+ *
+ * @param[in,out] taskCtx  The task context
+ *
+ * @return GDP_ERROR_SUCCESS on success.
+ * @return Other GdpError code otherwise.
+ *
+ ******************************************************************************
+ */
+
+static GdpError
+GdpTaskSendPacket(TaskContext *taskCtx) // IN/OUT
+{
+   struct sockaddr_vm destAddr;
+   GdpError gdpErr;
+
+   ASSERT(gPluginState.sock != INVALID_SOCKET);
+   ASSERT(taskCtx->packet != NULL && taskCtx->packetLen > 0);
+
+   memset(&destAddr, 0, sizeof destAddr);
+   destAddr.svm_family = gPluginState.vmciFamily;
+   destAddr.svm_cid = VMCI_HOST_CONTEXT_ID;
+   destAddr.svm_port = GDPD_RECV_PORT; // No htons
+
+   gdpErr = GdpSendTo(gPluginState.sock, taskCtx->packet,
+                      (int) taskCtx->packetLen, &destAddr);
+   if (gdpErr == GDP_ERROR_SUCCESS) {
+      /*
+       * Updates sendAfter for next send, phase 1 / 2.
+       */
+      taskCtx->sendAfter = g_get_monotonic_time();
+      taskCtx->timeoutAt = taskCtx->sendAfter +
+                           GDP_WAIT_RESULT_TIMEOUT * USEC_PER_MILLISECOND;
+   } else {
+      GdpTaskDestroyPacket(taskCtx);
+      taskCtx->timeoutAt = GDP_TIMEOUT_AT_INFINITE;
+   }
+
+   return gdpErr;
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskUpdateHistoryCachePointerAndGetSubscribers --
+ *
+ * Updates history cache pointer and gets subscribers of the pointed item.
+ *
+ * @param[in,out] taskCtx  The task context
+ *
+ * @return A string of subscription IDs separated by comma
+           (to be freed by caller),
+ *         or NULL if no match.
+ *
+ *****************************************************************************
+ */
+
+static gchar *
+GdpTaskUpdateHistoryCachePointerAndGetSubscribers(
+   TaskContext *taskCtx) // IN/OUT
+{
+   gchar *subscribers = NULL;
+
+   if (taskCtx->cache.currentLink == NULL) {
+      /*
+       * Starts with the earliest history cache link.
+       */
+      taskCtx->cache.currentLink = g_queue_peek_tail_link(
+                                      &taskCtx->cache.queue);
+   }
+
+   while (taskCtx->cache.currentLink != NULL &&
+          !g_queue_is_empty(&taskCtx->requests)) {
+      HistoryCacheItem *item;
+      GList *requestList;
+
+      item = (HistoryCacheItem *) taskCtx->cache.currentLink->data;
+      requestList = g_queue_peek_tail_link(&taskCtx->requests);
+      while (requestList != NULL) {
+         HistoryRequest *request = (HistoryRequest *) requestList->data;
+         Bool requestDone = FALSE;
+
+         if (item->cacheTime > request->endCacheTime) {
+            requestDone = TRUE;
+         } else if (request->beginCacheTime < item->cacheTime &&
+                    item->cacheTime <= request->endCacheTime) {
+            if (subscribers == NULL) {
+               subscribers = g_strdup_printf("\"%s\"", request->id);
+            } else {
+               gchar *subscribersNew;
+               subscribersNew = g_strdup_printf("%s,\"%s\"",
+                                                subscribers, request->id);
+               g_free(subscribers);
+               subscribers = subscribersNew;
+            }
+
+            if (item->cacheTime == request->endCacheTime) {
+               requestDone = TRUE;
+            } else {
+               request->beginCacheTime = item->cacheTime;
+            }
+         }
+
+         if (requestDone) {
+            GList *requestListPrev = requestList->prev;
+            GdpHistoryRequestFree(request);
+            g_queue_delete_link(&taskCtx->requests, requestList);
+            requestList = requestListPrev;
+         } else {
+            requestList = requestList->prev;
+         }
+      }
+
+      if (subscribers != NULL) {
+         break;
+      }
+
+      taskCtx->cache.currentLink = taskCtx->cache.currentLink->prev;
+   }
+
+   return subscribers;
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskPublishHistory --
+ *
+ * Publishes cached history guest data.
+ *
+ * @param[in,out] taskCtx  The task context
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpTaskPublishHistory(TaskContext *taskCtx) // IN/OUT
+{
+   GdpError gdpErr;
+   gchar *subscribers;
+   HistoryCacheItem *item;
+
+   g_debug("%s: Entering ...\n", __FUNCTION__);
+
+   ASSERT(taskCtx->mode == GDP_TASK_MODE_NONE &&
+          taskCtx->state == GDP_TASK_STATE_IDLE);
+
+   subscribers = GdpTaskUpdateHistoryCachePointerAndGetSubscribers(taskCtx);
+   if (subscribers == NULL) {
+      g_info("%s: No history item meets subscription requirement.\n",
+             __FUNCTION__);
+      goto cleanup;
+   }
+
+   ASSERT(taskCtx->cache.currentLink != NULL);
+   item = (HistoryCacheItem *) taskCtx->cache.currentLink->data;
+   /*
+    * Updates history cache pointer.
+    */
+   taskCtx->cache.currentLink = taskCtx->cache.currentLink->prev;
+
+   gdpErr = GdpTaskBuildPacket(taskCtx,
+                               item->createTime,
+                               item->topic,
+                               item->token,
+                               item->category,
+                               item->data,
+                               item->dataLen,
+                               subscribers);
+   if (gdpErr != GDP_ERROR_SUCCESS) {
+      /*
+       * Theoretically speaking, too many subscribers could cause JSON packet
+       * length exceed maximum limit and fail GdpTaskBuildPacket with
+       * GDP_ERROR_DATA_SIZE.
+       */
+      g_info("%s: Failed to build JSON packet for subscribers: [%s].\n",
+             __FUNCTION__, subscribers);
+      g_free(subscribers);
+      goto cleanup;
+   }
+   g_free(subscribers);
+
+   if (GdpTaskOkToSend(taskCtx)) {
+      gdpErr = GdpTaskSendPacket(taskCtx);
+      if (gdpErr != GDP_ERROR_SUCCESS) {
+         g_info("%s: Failed to send history JSON packet.\n",
+                __FUNCTION__);
+         goto cleanup;
+      }
+
+      taskCtx->state = GDP_TASK_STATE_WAIT_FOR_RESULT1;
+   } else {
+      taskCtx->timeoutAt = taskCtx->sendAfter;
+      taskCtx->state = GDP_TASK_STATE_WAIT_TO_SEND;
+   }
+
+   taskCtx->mode = GDP_TASK_MODE_HISTORY;
+   g_debug("%s: Updated mode=%d, state=%d.\n",
+           __FUNCTION__, taskCtx->mode, taskCtx->state);
+   return;
+
+cleanup:
+   taskCtx->cache.currentLink = NULL;
+   GdpTaskClearHistoryRequestQueue(taskCtx);
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskProcessConfigChange --
+ *
+ * Processes config change.
+ *
+ * @param[in,out] taskCtx  The task context
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpTaskProcessConfigChange(TaskContext *taskCtx) // IN/OUT
+{
+   guint32 sizeLimit = GdpGetHistoryCacheSizeLimit();
+
+   if (taskCtx->cache.sizeLimit == sizeLimit) {
+      return;
+   }
+
+   g_debug("%s: Current history cache size limit: %u, new value: %u.\n",
+           __FUNCTION__, taskCtx->cache.sizeLimit, sizeLimit);
+   taskCtx->cache.sizeLimit = sizeLimit;
+   while (taskCtx->cache.size > taskCtx->cache.sizeLimit) {
+      GdpTaskDeleteHistoryCacheTail(taskCtx);
+   }
+}
+
+
+/*
+ ******************************************************************************
+ * GdpJsonIsTokenOfKey --
+ *
+ * Checks if the JSON token represents the key.
+ *
+ * @param[in]  json   The JSON text
+ * @param[in]  token  Token
+ * @param[in]  key    Key name
+ *
+ * @return TRUE if token represents the key.
+ * @return FALSE otherwise.
+ *
+ ******************************************************************************
+ */
+
+static Bool
+GdpJsonIsTokenOfKey(const char *json, // IN
+                    jsmntok_t *token, // IN
+                    const char *key)  // IN
+{
+   int tokenLen = token->end - token->start;
+   if (token->type == JSMN_STRING &&
+       token->size == 1 && // The token represents a key
+       (int) strlen(key) == tokenLen &&
+       strncmp(json + token->start, key, tokenLen) == 0) {
+      return TRUE;
+   }
+   return FALSE;
+}
+
+
+/*
+ ******************************************************************************
+ * GdpJsonIsPublishResult --
+ *
+ * Checks if the JSON text represents a publish result.
+ *
+ * @param[in]  json     The JSON text
+ * @param[in]  tokens   Token array
+ * @param[in]  count    Token array count
+ * @param[out] request  Pointer to publish result
+ *
+ * @return TRUE if json represents a publish result.
+ * @return FALSE otherwise.
+ *
+ ******************************************************************************
+ */
+
+static Bool
+GdpJsonIsPublishResult(const char *json,      // IN
+                       jsmntok_t *tokens,     // IN
+                       int count,             // IN
+                       PublishResult *result) // OUT
+{
+   int index;
+   int requiredKeys = GDP_RESULT_REQUIRED_KEYS;
+   gchar *diagnosis = NULL;
+
+   /*
+    * Loops over all keys of the root object.
+    */
+   for (index = 1; index < count; index++) {
+      int tokenLen;
+
+      if (GdpJsonIsTokenOfKey(json, &tokens[index], GDP_RESULT_SEQUENCE)) {
+         requiredKeys--;
+         index++;
+         result->sequence = g_ascii_strtoull(json + tokens[index].start,
+                                             NULL, 10);
+      } else if (GdpJsonIsTokenOfKey(json, &tokens[index],
+                                     GDP_RESULT_STATUS)) {
+         requiredKeys--;
+         index++;
+         tokenLen = tokens[index].end - tokens[index].start;
+         if ((int) strlen(GDP_RESULT_STATUS_OK) == tokenLen &&
+             strncmp(json + tokens[index].start,
+                     GDP_RESULT_STATUS_OK, tokenLen) == 0) {
+            result->statusOk = TRUE;
+         } else {
+            result->statusOk = FALSE;
+         }
+      } else if (GdpJsonIsTokenOfKey(json, &tokens[index],
+                                     GDP_RESULT_DIAGNOSIS)) {
+         index++;
+         tokenLen = tokens[index].end - tokens[index].start;
+         diagnosis = g_strndup(json + tokens[index].start, tokenLen);
+      } else if (GdpJsonIsTokenOfKey(json, &tokens[index],
+                                     GDP_RESULT_RATE_LIMIT)) {
+         requiredKeys--;
+         index++;
+         result->rateLimit = atoi(json + tokens[index].start);
+      }
+   }
+
+   if (requiredKeys == 0) {
+      result->diagnosis = diagnosis;
+      return TRUE;
+   } else {
+      g_free(diagnosis);
+      return FALSE;
+   }
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskProcessPublishResult --
+ *
+ * Processes publish result.
+ *
+ * @param[in,out] taskCtx  The task context
+ * @param[in]     result   Publish result
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpTaskProcessPublishResult(TaskContext *taskCtx,  // IN/OUT
+                            PublishResult *result) // IN
+{
+   g_debug("%s: Entering ...\n", __FUNCTION__);
+
+   if (taskCtx->mode == GDP_TASK_MODE_NONE ||
+       (taskCtx->state != GDP_TASK_STATE_WAIT_FOR_RESULT1 &&
+        taskCtx->state != GDP_TASK_STATE_WAIT_FOR_RESULT2)) {
+      g_info("%s: Publish result not expected at mode=%d, state=%d.\n",
+             __FUNCTION__, taskCtx->mode, taskCtx->state);
+      return;
+   }
+
+   if (taskCtx->sequence != result->sequence) {
+      g_info("%s: Publish result sequence number not match.\n",
+             __FUNCTION__);
+      return;
+   }
+
+   if (!result->statusOk) {
+      g_info("%s: Publish failed: %s\n", __FUNCTION__,
+             result->diagnosis ? result->diagnosis : "");
+   }
+
+   if (taskCtx->mode == GDP_TASK_MODE_PUBLISH) {
+      if (result->statusOk) {
+         if (GdpTaskIsHistoryCacheEnabled(taskCtx)) {
+            GdpTaskHistoryCachePushItem(taskCtx,
+                                        gPublishState.createTime,
+                                        gPublishState.topic,
+                                        gPublishState.token,
+                                        gPublishState.category,
+                                        gPublishState.data,
+                                        gPublishState.dataLen);
+         }
+
+         gPublishState.gdpErr = GDP_ERROR_SUCCESS;
+      } else {
+         gPublishState.gdpErr = GDP_ERROR_INVALID_DATA;
+      }
+
+      GdpSetEvent(gPublishState.eventGetResult);
+   }
+
+   GdpTaskDestroyPacket(taskCtx);
+   if (result->rateLimit > 0) {
+      /*
+       * Updates sendAfter for next send, phase 2 / 2.
+       */
+      taskCtx->sendAfter += (USEC_PER_SECOND / result->rateLimit);
+   }
+   taskCtx->timeoutAt = GDP_TIMEOUT_AT_INFINITE;
+   taskCtx->mode = GDP_TASK_MODE_NONE;
+   taskCtx->state = GDP_TASK_STATE_IDLE;
+   g_debug("%s: Reset mode=%d, state=%d.\n",
+           __FUNCTION__, taskCtx->mode, taskCtx->state);
+}
+
+
+/*
+ ******************************************************************************
+ * GdpJsonIsHistoryRequest --
+ *
+ * Checks if the JSON text represents a history request.
+ *
+ * @param[in]  json     The JSON text
+ * @param[in]  tokens   Token array
+ * @param[in]  count    Token array count
+ * @param[out] request  Pointer to history request
+ *
+ * @return TRUE if json represents a history request.
+ * @return FALSE otherwise.
+ *
+ ******************************************************************************
+ */
+
+static Bool
+GdpJsonIsHistoryRequest(const char *json,        // IN
+                        jsmntok_t *tokens,       // IN
+                        int count,               // IN
+                        HistoryRequest *request) // OUT
+{
+   int index;
+   int requiredKeys = GDP_HISTORY_REQUEST_REQUIRED_KEYS;
+   guint64 pastSeconds = 0;
+
+   /*
+    * Loops over all keys of the root object
+    */
+   for (index = 1; index < count; index++) {
+      if (GdpJsonIsTokenOfKey(json, &tokens[index],
+                              GDP_HISTORY_REQUEST_PAST_SECONDS)) {
+         requiredKeys--;
+         index++;
+         pastSeconds = g_ascii_strtoull(json + tokens[index].start, NULL, 10);
+      } else if (GdpJsonIsTokenOfKey(json, &tokens[index],
+                                     GDP_HISTORY_REQUEST_ID)) {
+         requiredKeys--;
+         index++;
+         Str_Strncpy(request->id, sizeof request->id,
+                     json + tokens[index].start,
+                     tokens[index].end - tokens[index].start);
+      }
+   }
+
+   if (requiredKeys == 0) {
+      request->endCacheTime = g_get_monotonic_time();
+      request->beginCacheTime = request->endCacheTime -
+                                (gint64)pastSeconds * USEC_PER_SECOND;
+      return TRUE;
+   }
+
+   return FALSE;
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskPushHistoryRequest --
+ *
+ * Pushes new history request to queue and resets history cache pointer.
+ *
+ * @param[in,out] taskCtx  The task context
+ * @param[in]     request  New history request
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpTaskPushHistoryRequest(TaskContext *taskCtx,    // IN/OUT
+                          HistoryRequest *request) // IN
+{
+   HistoryRequest *requestCopy;
+
+   g_debug("%s: Entering ...\n", __FUNCTION__);
+
+   if (!GdpTaskIsHistoryCacheEnabled(taskCtx)) {
+      g_info("%s: History cache not enabled.\n", __FUNCTION__);
+      return;
+   }
+
+   if (request->beginCacheTime >= request->endCacheTime) {
+      g_info("%s: Invalid history request.\n", __FUNCTION__);
+      return;
+   }
+
+   if (request->beginCacheTime < 0) {
+      request->beginCacheTime = 0;
+   }
+
+   requestCopy = (HistoryRequest *) Util_SafeMalloc(sizeof *request);
+   Util_Memcpy(requestCopy, request, sizeof *request);
+   /*
+    * Note: each request comes with a unique subscription ID.
+    */
+   g_queue_push_head(&taskCtx->requests, requestCopy);
+   /*
+    * Resets history cache pointer.
+    */
+   taskCtx->cache.currentLink = NULL;
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskProcessNetwork --
+ *
+ * Processes the network event.
+ *
+ * @param[in,out] taskCtx  The task context
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpTaskProcessNetwork(TaskContext *taskCtx) // IN/OUT
+{
+   GdpError gdpErr;
+   char buf[GDP_MAX_PACKET_LEN + 1]; // Adds a space for NULL
+   int bufLen = (int) sizeof buf - 1;
+   struct sockaddr_vm srcAddr;
+   jsmn_parser parser;
+   jsmntok_t tokens[16]; // We expect no more than 16 tokens
+   int retVal;
+   Bool isPublishResult;
+   Bool isHistoryRequest;
+   PublishResult result = { 0 };
+   HistoryRequest request = { 0 };
+
+   g_debug("%s: Entering ...\n", __FUNCTION__);
+
+   gdpErr = GdpRecvFrom(gPluginState.sock, buf, &bufLen, &srcAddr);
+   if (gdpErr != GDP_ERROR_SUCCESS || bufLen <= 0) {
+      return;
+   }
+
+   if (srcAddr.svm_cid != VMCI_HOST_CONTEXT_ID) {
+      g_info("%s: Unexpected source svm_cid: %u.\n",
+             __FUNCTION__, srcAddr.svm_cid);
+      return;
+   }
+
+   buf[bufLen] = '\0';
+
+   jsmn_init(&parser);
+   retVal = jsmn_parse(&parser, buf, bufLen,
+                       tokens, (unsigned int) ARRAYSIZE(tokens));
+   if (retVal < 0) {
+      g_info("%s: Error %d while parsing JSON:\n%s\n",
+             __FUNCTION__, retVal, buf);
+      return;
+   }
+
+   /*
+    * The top-level element should be an object.
+    */
+   if (retVal < 1 || tokens[0].type != JSMN_OBJECT) {
+      g_info("%s: Invalid JSON:\n%s\n", __FUNCTION__, buf);
+      return;
+   }
+
+   isPublishResult = FALSE;
+   isHistoryRequest = FALSE;
+   if (srcAddr.svm_port == GDPD_RECV_PORT) {
+      isPublishResult = GdpJsonIsPublishResult(buf, tokens,
+                                               retVal, &result);
+      if (!isPublishResult) {
+         isHistoryRequest = GdpJsonIsHistoryRequest(buf, tokens,
+                                                    retVal, &request);
+      }
+   } else {
+      isHistoryRequest = GdpJsonIsHistoryRequest(buf, tokens,
+                                                 retVal, &request);
+      if (!isHistoryRequest) {
+         isPublishResult = GdpJsonIsPublishResult(buf, tokens,
+                                                  retVal, &result);
+      }
+   }
+
+   if (isPublishResult) {
+      GdpTaskProcessPublishResult(taskCtx, &result);
+      g_free(result.diagnosis);
+   } else if (isHistoryRequest) {
+      GdpTaskPushHistoryRequest(taskCtx, &request);
+   } else {
+      g_info("%s: Unknown JSON:\n%s\n", __FUNCTION__, buf);
+   }
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskProcessPublish --
+ *
+ * Processes the publish event.
+ *
+ * @param[in,out] taskCtx  The task context
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpTaskProcessPublish(TaskContext *taskCtx) // IN/OUT
+{
+   GdpError gdpErr;
+
+   g_debug("%s: Entering ...\n", __FUNCTION__);
+
+   ASSERT(taskCtx->mode != GDP_TASK_MODE_PUBLISH);
+
+   if (taskCtx->mode != GDP_TASK_MODE_NONE) {
+      g_debug("%s: Set publish pending.\n", __FUNCTION__);
+      ASSERT(!taskCtx->publishPending);
+      taskCtx->publishPending = TRUE;
+      return;
+   }
+
+   ASSERT(taskCtx->state == GDP_TASK_STATE_IDLE);
+
+   gdpErr = GdpTaskBuildPacket(taskCtx,
+                               gPublishState.createTime,
+                               gPublishState.topic,
+                               gPublishState.token,
+                               gPublishState.category,
+                               gPublishState.data,
+                               gPublishState.dataLen,
+                               NULL);
+   if (gdpErr != GDP_ERROR_SUCCESS) {
+      goto fail;
+   }
+
+   if (GdpTaskOkToSend(taskCtx)) {
+      gdpErr = GdpTaskSendPacket(taskCtx);
+      if (gdpErr != GDP_ERROR_SUCCESS) {
+         goto fail;
+      }
+
+      taskCtx->state = GDP_TASK_STATE_WAIT_FOR_RESULT1;
+   } else {
+      taskCtx->timeoutAt = taskCtx->sendAfter;
+      taskCtx->state = GDP_TASK_STATE_WAIT_TO_SEND;
+   }
+
+   taskCtx->mode = GDP_TASK_MODE_PUBLISH;
+   g_debug("%s: Updated mode=%d, state=%d.\n",
+           __FUNCTION__, taskCtx->mode, taskCtx->state);
+   return;
+
+fail:
+   gPublishState.gdpErr = gdpErr;
+   GdpSetEvent(gPublishState.eventGetResult);
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskProcessTimeout --
+ *
+ * Processes wait timeout.
+ *
+ * @param[in,out] taskCtx  The task context
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpTaskProcessTimeout(TaskContext *taskCtx) // IN/OUT
+{
+   GdpError gdpErr;
+
+   g_debug("%s: Entering ...\n", __FUNCTION__);
+
+   ASSERT(taskCtx->mode != GDP_TASK_MODE_NONE &&
+          taskCtx->state != GDP_TASK_STATE_IDLE);
+
+   if (taskCtx->state == GDP_TASK_STATE_WAIT_TO_SEND ||
+       taskCtx->state == GDP_TASK_STATE_WAIT_FOR_RESULT1) {
+      gdpErr = GdpTaskSendPacket(taskCtx);
+      if (gdpErr != GDP_ERROR_SUCCESS) {
+         goto fail;
+      }
+
+      if (taskCtx->state == GDP_TASK_STATE_WAIT_TO_SEND) {
+         taskCtx->state = GDP_TASK_STATE_WAIT_FOR_RESULT1;
+      } else {
+         taskCtx->state = GDP_TASK_STATE_WAIT_FOR_RESULT2;
+      }
+
+      g_debug("%s: Updated mode=%d, state=%d.\n",
+              __FUNCTION__, taskCtx->mode, taskCtx->state);
+   } else if (taskCtx->state == GDP_TASK_STATE_WAIT_FOR_RESULT2) {
+      g_warning("%s: Wait for publish result timed out.\n", __FUNCTION__);
+      GdpTaskDestroyPacket(taskCtx);
+      taskCtx->timeoutAt = GDP_TIMEOUT_AT_INFINITE;
+      gdpErr = GDP_ERROR_TIMEOUT;
+      goto fail;
+   } else {
+      NOT_REACHED();
+   }
+
+   return;
+
+fail:
+   if (taskCtx->mode == GDP_TASK_MODE_PUBLISH) {
+      gPublishState.gdpErr = gdpErr;
+      GdpSetEvent(gPublishState.eventGetResult);
+   }
+
+   taskCtx->state = GDP_TASK_STATE_IDLE;
+   taskCtx->mode = GDP_TASK_MODE_NONE;
+   g_debug("%s: Reset mode=%d, state=%d.\n",
+           __FUNCTION__, taskCtx->mode, taskCtx->state);
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskGetTimeout --
+ *
+ * Gets timeout value for the next wait call.
+ *
+ * @param[in] taskCtx  The task context
+ *
+ *****************************************************************************
+ */
+
+static int
+GdpTaskGetTimeout(TaskContext *taskCtx) // IN
+{
+   gint64 curTime;
+   gint64 timeout;
+
+   if (taskCtx->timeoutAt == GDP_TIMEOUT_AT_INFINITE) {
+      return GDP_WAIT_INFINITE;
+   }
+
+   curTime = g_get_monotonic_time();
+   if (curTime >= taskCtx->timeoutAt) {
+      return 0;
+   }
+
+   timeout = (taskCtx->timeoutAt - curTime) / USEC_PER_MILLISECOND;
+   return (int)timeout;
+}
+
 
-   } while (TRUE);
+/*
+ *****************************************************************************
+ * GdpTaskProcessEvents --
+ *
+ * Processes task events.
+ *
+ * @param[in,out] taskCtx    The task context
+ * @param[in]     taskEvent  Task event to process
+ *
+ *****************************************************************************
+ */
 
-   return retVal;
+static void
+GdpTaskProcessEvents(TaskContext *taskCtx,   // IN/OUT
+                     GdpTaskEvent taskEvent) // IN
+{
+   switch (taskEvent) {
+      case GDP_TASK_EVENT_CONFIG:
+         GdpResetEvent(gPluginState.eventConfig);
+         GdpTaskProcessConfigChange(taskCtx);
+         break;
+      case GDP_TASK_EVENT_NETWORK:
+         GdpTaskProcessNetwork(taskCtx);
+         break;
+      case GDP_TASK_EVENT_PUBLISH:
+         GdpResetEvent(gPublishState.eventPublish);
+         GdpTaskProcessPublish(taskCtx);
+         break;
+      case GDP_TASK_EVENT_TIMEOUT:
+         GdpTaskProcessTimeout(taskCtx);
+         break;
+      default:
+         //GDP_TASK_EVENT_NONE
+         //GDP_TASK_EVENT_STOP
+         NOT_REACHED();
+   }
 }
 
 
 /*
  ******************************************************************************
- * GdpWaitForEvent --
+ * GdpTaskWaitForEvents --
  *
- * Waits for the stop event object/fd signalled for vmtoolsd shutdown,
- * network send/receive event ready or timeout.
+ * Waits for the following events or timeout:
+ *    The stop event
+ *    The config event
+ *    The network event
+ *    The publish event
  *
- * @param[in]    netEvent GOS specific network send/receive event flag
- * @param[in]    timeout  Timeout value in milliseconds,
+ * @param[in]  timeout    Timeout value in milliseconds,
  *                        negative value means an infinite timeout,
- *                        zero means no wait
+ *                        zero means no wait.
+ * @param[out] taskEvent  Return which event is signalled
+ *                        if no error happens.
  *
- * @return GDP_ERROR_SUCCESS on specified network event ready.
- * @return Other GdpError codes otherwise.
+ * @return GDP_ERROR_SUCCESS if no error happens.
+ * @return Other GdpError code otherwise.
  *
  ******************************************************************************
  */
 
 static GdpError
-GdpWaitForEvent(int netEvent, int timeout)
+GdpTaskWaitForEvents(int timeout,             // IN
+                     GdpTaskEvent *taskEvent) // OUT
 {
 #if defined(_WIN32)
    int res;
@@ -519,54 +2288,56 @@ GdpWaitForEvent(int netEvent, int timeout)
    gint64 startTime;
    GdpError retVal;
 
-   ASSERT(netEvent == FD_READ || netEvent == FD_WRITE);
-   ASSERT(pluginData.sock != INVALID_SOCKET);
+   ASSERT(Atomic_ReadBool(&gPluginState.started));
 
    /*
-    * Reset the send-recv event object.
+    * Resets the network event object.
     */
-   WSAResetEvent(pluginData.eventSendRecv);
+   WSAResetEvent(gPluginState.eventNetwork);
 
    /*
-    * Associate the send-recv event object with the specified network event
+    * Associates the network event object with FD_READ network event
     * in the socket.
     */
-   res = WSAEventSelect(pluginData.sock, pluginData.eventSendRecv, netEvent);
+   res = WSAEventSelect(gPluginState.sock, gPluginState.eventNetwork, FD_READ);
    if (res != 0) {
-      g_info("%s: WSAEventSelect failed: error=%d.\n",
-             __FUNCTION__, WSAGetLastError());
-      return GDP_ERROR_INTERNAL;
+      g_warning("%s: WSAEventSelect failed: error=%d.\n",
+                __FUNCTION__, WSAGetLastError());
+      return GDP_ERROR_GENERAL;
    }
 
-   localTimeout = (DWORD)(timeout >= 0 ? timeout : WSA_INFINITE);
+   retVal = GDP_ERROR_SUCCESS;
+
+   localTimeout = (timeout >= 0 ? (DWORD) timeout : WSA_INFINITE);
    if (timeout > 0) {
       startTime = g_get_monotonic_time();
    } else {
-      startTime = 0; // Deal with [-Werror=maybe-uninitialized]
+      startTime = 0; // Deals with [-Werror=maybe-uninitialized]
    }
 
    while (TRUE) {
-      WSAEVENT eventObjects[] = {pluginData.eventStop,
-                                 pluginData.eventSendRecv};
+      WSAEVENT eventObjects[] = { gPluginState.eventStop,
+                                  gPluginState.eventConfig,
+                                  gPluginState.eventNetwork,
+                                  gPublishState.eventPublish };
       DWORD waitRes;
 
       waitRes = WSAWaitForMultipleEvents((DWORD)ARRAYSIZE(eventObjects),
                                          eventObjects,
                                          FALSE, localTimeout, TRUE);
       if (waitRes == WSA_WAIT_EVENT_0) {
-         /*
-          * Main thread has set the stop event object to interrupt
-          * pool thread for vmtoolsd shutdown.
-          */
-         retVal = GDP_ERROR_STOP;
+         *taskEvent = GDP_TASK_EVENT_STOP;
          break;
       } else if (waitRes == (WSA_WAIT_EVENT_0 + 1)) {
+         *taskEvent = GDP_TASK_EVENT_CONFIG;
+         break;
+      } else if (waitRes == (WSA_WAIT_EVENT_0 + 2)) {
          WSANETWORKEVENTS networkEvents;
-         res = WSAEnumNetworkEvents(pluginData.sock, NULL, &networkEvents);
+         res = WSAEnumNetworkEvents(gPluginState.sock, NULL, &networkEvents);
          if (res != 0) {
-            g_info("%s: WSAEnumNetworkEvents failed: error=%d.\n",
-                   __FUNCTION__, WSAGetLastError());
-            retVal = GDP_ERROR_INTERNAL;
+            g_warning("%s: WSAEnumNetworkEvents failed: error=%d.\n",
+                      __FUNCTION__, WSAGetLastError());
+            retVal = GDP_ERROR_GENERAL;
             break;
          }
 
@@ -576,14 +2347,17 @@ GdpWaitForEvent(int netEvent, int timeout)
           * since WSAEnumNetworkEvents should have returned WSAENETDOWN
           * if the error condition exists.
           */
-         if (networkEvents.lNetworkEvents & netEvent) {
-            retVal = GDP_ERROR_SUCCESS;
+         if (networkEvents.lNetworkEvents & FD_READ) {
+            *taskEvent = GDP_TASK_EVENT_NETWORK;
          } else { // Not expected
-            g_info("%s: Unexpected network event from WSAEnumNetworkEvents.\n",
-                   __FUNCTION__);
-            retVal = GDP_ERROR_INTERNAL;
+            g_warning("%s: Unexpected network event.\n",
+                      __FUNCTION__);
+            retVal = GDP_ERROR_GENERAL;
          }
 
+         break;
+      } else if (waitRes == (WSA_WAIT_EVENT_0 + 3)) {
+         *taskEvent = GDP_TASK_EVENT_PUBLISH;
          break;
       } else if (waitRes == WSA_WAIT_IO_COMPLETION) {
          gint64 curTime;
@@ -595,9 +2369,9 @@ GdpWaitForEvent(int netEvent, int timeout)
          }
 
          curTime = g_get_monotonic_time();
-         passedTime = (curTime - startTime) / 1000;
+         passedTime = (curTime - startTime) / USEC_PER_MILLISECOND;
          if (passedTime >= localTimeout) {
-            retVal = GDP_ERROR_TIMEOUT;
+            *taskEvent = GDP_TASK_EVENT_TIMEOUT;
             break;
          }
 
@@ -605,60 +2379,66 @@ GdpWaitForEvent(int netEvent, int timeout)
          localTimeout -= (DWORD)passedTime;
          continue;
       } else if (waitRes == WSA_WAIT_TIMEOUT) {
-         retVal = GDP_ERROR_TIMEOUT;
+         *taskEvent = GDP_TASK_EVENT_TIMEOUT;
          break;
       } else { // WSA_WAIT_FAILED
-         g_info("%s: WSAWaitForMultipleEvents failed: error=%d.\n",
-                __FUNCTION__, WSAGetLastError());
-         retVal = GDP_ERROR_INTERNAL;
+         g_warning("%s: WSAWaitForMultipleEvents failed: error=%d.\n",
+                   __FUNCTION__, WSAGetLastError());
+         retVal = GDP_ERROR_GENERAL;
          break;
       }
    }
 
    /*
-    * Cancel the association.
+    * Cancels the association.
     */
-   WSAEventSelect(pluginData.sock, NULL, 0);
+   WSAEventSelect(gPluginState.sock, NULL, 0);
 
    return retVal;
 
 #else
 
    gint64 startTime;
-   GdpError retVal;
+   GdpError retVal = GDP_ERROR_SUCCESS;
 
-   ASSERT(netEvent == POLLIN || netEvent == POLLOUT);
-   ASSERT(pluginData.sock != INVALID_SOCKET);
+   ASSERT(Atomic_ReadBool(&gPluginState.started));
 
    if (timeout > 0) {
       startTime = g_get_monotonic_time();
    } else {
-      startTime = 0; // Deal with [-Werror=maybe-uninitialized]
+      startTime = 0; // Deals with [-Werror=maybe-uninitialized]
    }
 
    while (TRUE) {
-      struct pollfd fds[2];
+      struct pollfd fds[4];
       int res;
 
-      fds[0].fd = pluginData.eventStop;
+      fds[0].fd = gPluginState.eventStop;
       fds[0].events = POLLIN;
       fds[0].revents = 0;
-      fds[1].fd = pluginData.sock;
-      fds[1].events = (short)netEvent;
+      fds[1].fd = gPluginState.eventConfig;
+      fds[1].events = POLLIN;
       fds[1].revents = 0;
+      fds[2].fd = gPluginState.sock;
+      fds[2].events = POLLIN;
+      fds[2].revents = 0;
+      fds[3].fd = gPublishState.eventPublish;
+      fds[3].events = POLLIN;
+      fds[3].revents = 0;
 
       res = poll(fds, ARRAYSIZE(fds), timeout);
       if (res > 0) {
          if (fds[0].revents & POLLIN) {
-            /*
-             * Check comments in _WIN32.
-             */
-            retVal = GDP_ERROR_STOP;
-         } else if (fds[1].revents & netEvent) {
-            retVal = GDP_ERROR_SUCCESS;
+            *taskEvent = GDP_TASK_EVENT_STOP;
+         } else if (fds[1].revents & POLLIN) {
+            *taskEvent = GDP_TASK_EVENT_CONFIG;
+         } else if (fds[2].revents & POLLIN) {
+            *taskEvent = GDP_TASK_EVENT_NETWORK;
+         } else if (fds[3].revents & POLLIN) {
+            *taskEvent = GDP_TASK_EVENT_PUBLISH;
          } else { // Not expected
-            g_info("%s: Unexpected event from poll.\n", __FUNCTION__);
-            retVal = GDP_ERROR_INTERNAL;
+            g_warning("%s: Unexpected event.\n", __FUNCTION__);
+            retVal = GDP_ERROR_GENERAL;
          }
 
          break;
@@ -673,26 +2453,26 @@ GdpWaitForEvent(int netEvent, int timeout)
             }
 
             curTime = g_get_monotonic_time();
-            passedTime = (curTime - startTime) / 1000;
-            if (passedTime >= (gint64)timeout) {
-               retVal = GDP_ERROR_TIMEOUT;
+            passedTime = (curTime - startTime) / USEC_PER_MILLISECOND;
+            if (passedTime >= timeout) {
+               *taskEvent = GDP_TASK_EVENT_TIMEOUT;
                break;
             }
 
             startTime = curTime;
-            timeout -= (int)passedTime;
+            timeout -= (int) passedTime;
             continue;
          } else {
-            g_info("%s: poll failed: error=%d.\n", __FUNCTION__, err);
-            retVal = GDP_ERROR_INTERNAL;
+            g_warning("%s: poll failed: error=%d.\n", __FUNCTION__, err);
+            retVal = GDP_ERROR_GENERAL;
             break;
          }
       } else if (res == 0) {
-         retVal = GDP_ERROR_TIMEOUT;
+         *taskEvent = GDP_TASK_EVENT_TIMEOUT;
          break;
       } else {
-         g_info("%s: Unexpected poll return: %d.\n", __FUNCTION__, res);
-         retVal = GDP_ERROR_INTERNAL;
+         g_warning("%s: Unexpected poll return: %d.\n", __FUNCTION__, res);
+         retVal = GDP_ERROR_GENERAL;
          break;
       }
    }
@@ -703,265 +2483,480 @@ GdpWaitForEvent(int netEvent, int timeout)
 
 
 /*
- ******************************************************************************
- * GdpSend --
+ *****************************************************************************
+ * GdpTaskCtxInit --
  *
- * Sends guest data to host side gdp daemon.
+ * Initializes the task context states and resources.
  *
- * @param[in]    buf      Send data buffer pointer
- * @param[in]    len      Send data buffer length
- * @param[in]    timeout  Timeout value in milliseconds,
- *                        negative value means an infinite timeout,
- *                        zero means no wait
+ * @param[out] taskCtx  The task context
  *
- * @return GDP_ERROR_SUCCESS on success.
- * @return Other GdpError codes otherwise.
+ *****************************************************************************
+ */
+
+static void
+GdpTaskCtxInit(TaskContext *taskCtx) // OUT
+{
+   taskCtx->mode = GDP_TASK_MODE_NONE;
+   taskCtx->state = GDP_TASK_STATE_IDLE;
+   taskCtx->publishPending = FALSE;
+
+   g_queue_init(&taskCtx->cache.queue);
+   taskCtx->cache.sizeLimit = GdpGetHistoryCacheSizeLimit();
+   taskCtx->cache.size = 0;
+   taskCtx->cache.currentLink = NULL;
+
+   g_queue_init(&taskCtx->requests);
+
+   taskCtx->sequence = 0;
+   taskCtx->packet = NULL;
+   taskCtx->packetLen = 0;
+
+   taskCtx->timeoutAt = GDP_TIMEOUT_AT_INFINITE;
+   taskCtx->sendAfter = GDP_SEND_AFTER_ANY_TIME;
+}
+
+
+/*
+ *****************************************************************************
+ * GdpTaskCtxDestroy --
  *
- ******************************************************************************
+ * Destroys the task context resources.
+ *
+ * @param[in,out] taskCtx  The task context
+ *
+ *****************************************************************************
  */
 
-static GdpError
-GdpSend(const char *buf, // IN
-        int len,         // IN
-        int timeout)     // IN
+static void
+GdpTaskCtxDestroy(TaskContext *taskCtx) // IN/OUT
 {
-   GdpError retVal;
+   GdpTaskClearHistoryCacheQueue(taskCtx);
+   GdpTaskClearHistoryRequestQueue(taskCtx);
+   GdpTaskDestroyPacket(taskCtx);
+}
+
+
+/*
+ *****************************************************************************
+ * GdpThreadTask --
+ *
+ * The gdp task thread routine.
+ *
+ * @param[in] ctx   The application context
+ * @param[in] data  Data pointer, not used
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpThreadTask(ToolsAppCtx *ctx, // IN
+              void *data)       // IN
+{
+   TaskContext taskCtx;
 
-   ASSERT(buf != NULL && len > 0);
-   ASSERT(pluginData.sock != INVALID_SOCKET);
+   g_debug("%s: Entering ...\n", __FUNCTION__);
+
+   GdpTaskCtxInit(&taskCtx);
 
    do {
-      long res;
-      int sockErr;
+      int timeout;
+      GdpError gdpErr;
+      GdpTaskEvent taskEvent = GDP_TASK_EVENT_NONE;
 
-      res = send(pluginData.sock, buf, len, 0);
-      if (res >= 0) {
-         retVal = GDP_ERROR_SUCCESS;
-         break;
+      /*
+       * Part 1 inside the loop:
+       * Checks and processes one pending event at idle state.
+       */
+
+      if (taskCtx.mode == GDP_TASK_MODE_NONE) {
+         ASSERT(taskCtx.state == GDP_TASK_STATE_IDLE);
+         ASSERT(taskCtx.timeoutAt == GDP_TIMEOUT_AT_INFINITE);
+
+         if (taskCtx.publishPending) { // Higher priority
+            taskCtx.publishPending = FALSE;
+            GdpTaskProcessPublish(&taskCtx);
+         } else if (!g_queue_is_empty(&taskCtx.requests)) {
+            /*
+             * History request pending.
+             *
+             * GdpTaskPublishHistory clears history request queue if it fails
+             * to kick off the state machine.
+             */
+            GdpTaskPublishHistory(&taskCtx);
+         }
+      }
+
+      /*
+       * Part 2 inside the loop:
+       * Gets and processes one event at a time, then loops back to part 1
+       * except for the stop event.
+       */
+
+      timeout = GdpTaskGetTimeout(&taskCtx);
+      if (timeout == 0) {
+         GdpTaskProcessEvents(&taskCtx, GDP_TASK_EVENT_TIMEOUT);
+         continue;
       }
 
-      sockErr = GetSockErr();
-      if (sockErr == SYSERR_EINTR) {
+      /*
+       * timeout == GDP_WAIT_INFINITE means taskCtx.mode == GDP_TASK_MODE_NONE
+       * and taskCtx.state == GDP_TASK_STATE_IDLE. There should be no pending
+       * publish event or history request in this case.
+       */
+      ASSERT(timeout != GDP_WAIT_INFINITE ||
+             (!taskCtx.publishPending &&
+              g_queue_is_empty(&taskCtx.requests)));
+
+      gdpErr = GdpTaskWaitForEvents(timeout, &taskEvent);
+      if (gdpErr != GDP_ERROR_SUCCESS) {
          continue;
-      } else if (SYSERR_WOULDBLOCK(sockErr)) {
+      }
+
+      if (taskEvent == GDP_TASK_EVENT_STOP) {
          /*
-          * Datagram send is not buffered, if host daemon is not running,
-          * send returns error EHOSTUNREACH. In theory, this case should
-          * not happen, we just follow standard async socket programming
-          * paradigm here.
+          * In case the publish event comes at the same time,
+          * only the stop event is handled, so do not check
+          * (taskCtx.mode == GDP_TASK_MODE_PUBLISH) here.
           */
-         GdpError err;
+         gPublishState.gdpErr = GDP_ERROR_STOP;
+         GdpSetEvent(gPublishState.eventGetResult);
+         break;
+      }
 
-         g_info("%s: Gdp send would block.\n", __FUNCTION__);
+      GdpTaskProcessEvents(&taskCtx, taskEvent);
 
-         err = GdpWaitForEvent(SOCK_WRITE, timeout);
-         if (err == GDP_ERROR_SUCCESS) {
-            continue;
-         }
+   } while (!Atomic_ReadBool(&gPluginState.stopped));
 
-         retVal = err;
-      } else if (sockErr == SYSERR_EHOSTUNREACH) {
-         g_info("%s: send failed: host daemon unreachable.\n", __FUNCTION__);
-         retVal = GDP_ERROR_UNREACH;
-      } else if (sockErr == SYSERR_EMSGSIZE) {
-         g_info("%s: send failed: message too large.\n", __FUNCTION__);
-         retVal = GDP_ERROR_SEND_SIZE;
-      } else {
-         g_info("%s: send failed: error=%d.\n", __FUNCTION__, sockErr);
-         retVal = GDP_ERROR_INTERNAL;
-      }
+   GdpTaskCtxDestroy(&taskCtx);
 
-      break;
+   g_debug("%s: Exiting ...\n", __FUNCTION__);
+}
 
-   } while (TRUE);
 
-   return retVal;
+/*
+ *****************************************************************************
+ * GdpThreadInterrupt --
+ *
+ * Interrupts the gdp task thread to exit.
+ *
+ * @param[in] ctx   The application context
+ * @param[in] data  Data pointer, not used
+ *
+ *****************************************************************************
+ */
+
+static void
+GdpThreadInterrupt(ToolsAppCtx *ctx, // IN
+                   void *data)       // IN
+{
+   g_debug("%s: Entering ...\n", __FUNCTION__);
+   ASSERT(Atomic_ReadBool(&gPluginState.started));
+   Atomic_WriteBool(&gPluginState.stopped, TRUE);
+   GdpSetEvent(gPluginState.eventStop);
 }
 
 
 /*
  ******************************************************************************
- * GdpRecv --
- *
- * Receives reply from host side gdp daemon.
+ * GdpInit --
  *
- * @param[out]    buf      Receive buffer pointer
- * @param[in,out] len      Receive buffer length on input,
- *                         reply data length on output
- * @param[in]     timeout  Timeout value in milliseconds,
- *                         negative value means an infinite timeout,
- *                         zero means no wait
+ * Initializes plugin state data and resources.
  *
- * @return GDP_ERROR_SUCCESS on success.
- * @return Other GdpError codes otherwise.
+ * @param[in] ctx  The application context
  *
  ******************************************************************************
  */
 
-static GdpError
-GdpRecv(char *buf,   // OUT
-        int *len,    // IN/OUT
-        int timeout) // IN
+static void
+GdpInit(ToolsAppCtx *ctx) // IN
 {
-   GdpError retVal;
-
-   ASSERT(buf != NULL && len != NULL && *len > 0);
-   ASSERT(pluginData.sock != INVALID_SOCKET);
+   gPluginState.ctx = ctx;
+   Atomic_WriteBool(&gPluginState.started, FALSE);
 
-   do {
-      long res;
-      int sockErr;
-
-      res = recv(pluginData.sock, buf, *len, 0);
-      if (res >= 0) {
-         *len = (int)res;
-         retVal = GDP_ERROR_SUCCESS;
-         break;
-      }
+#if defined(_WIN32)
+   gPluginState.wsaStarted = FALSE;
+#endif
 
-      sockErr = GetSockErr();
-      if (sockErr == SYSERR_EINTR) {
-         continue;
-      } else if (SYSERR_WOULDBLOCK(sockErr)) {
-         GdpError err = GdpWaitForEvent(SOCK_READ, timeout);
-         if (err == GDP_ERROR_SUCCESS) {
-            continue;
-         }
+   gPluginState.vmciFd = -1;
+   gPluginState.vmciFamily = -1;
+   gPluginState.sock = INVALID_SOCKET;
+#if defined(_WIN32)
+   gPluginState.eventNetwork = GDP_INVALID_EVENT;
+#endif
 
-         retVal = err;
-      } else if (sockErr == SYSERR_EMSGSIZE) {
-         g_info("%s: recv failed: buffer size too small.\n", __FUNCTION__);
-         retVal = GDP_ERROR_RECV_SIZE;
-      } else {
-         g_info("%s: recv failed: error=%d.\n", __FUNCTION__, sockErr);
-         retVal = GDP_ERROR_INTERNAL;
-      }
+   gPluginState.eventStop = GDP_INVALID_EVENT;
+   Atomic_WriteBool(&gPluginState.stopped, FALSE);
 
-      break;
+   gPluginState.eventConfig = GDP_INVALID_EVENT;
 
-   } while (TRUE);
+   gPublishState.eventPublish = GDP_INVALID_EVENT;
 
-   return retVal;
+   gPublishState.eventGetResult = GDP_INVALID_EVENT;
 }
 
 
 /*
  ******************************************************************************
- * GdpPublish --
+ * GdpStart --
  *
- * Publishes guest data to host side gdp daemon.
- *
- * @param[in]              msg       Buffer containing guest data to publish
- * @param[in]              msgLen    Guest data length
- * @param[out,optional]    reply     Buffer to receive reply from gdp daemon
- * @param[in,out,optional] replyLen  NULL when param reply is NULL, otherwise:
- *                                   reply buffer length on input,
- *                                   reply data length on output
+ * Creates plugin resources and starts the task thread for data publishing.
  *
- * @return GDP_ERROR_SUCCESS on success.
- * @return Other GdpError codes otherwise.
+ * @return TRUE on success.
+ * @return FALSE otherwise.
  *
  ******************************************************************************
  */
 
-static GdpError
-GdpPublish(const char *msg, // IN
-           int msgLen,      // IN
-           char *reply,     // OUT, OPTIONAL
-           int *replyLen)   // IN/OUT, OPTIONAL
+static Bool
+GdpStart(void)
 {
-   static GMutex mutex;
-
-   GdpError err;
-   char altRecvBuf[GDP_SEND_RECV_BUF_LEN]; // Enough for maximum size datagram
-   int altRecvBufLen = (int) sizeof altRecvBuf;
-   char *recvBuf;
-   int *recvBufLen;
+   Bool retVal = FALSE;
 
    g_debug("%s: Entering ...\n", __FUNCTION__);
 
-   ASSERT((reply == NULL && replyLen == NULL) ||
-          (reply != NULL && replyLen != NULL && *replyLen > 0));
+   ASSERT(!Atomic_ReadBool(&gPluginState.started));
 
-   g_mutex_lock(&mutex);
+#if defined(_WIN32)
+   {
+      WSADATA wsaData;
+      int res = WSAStartup(MAKEWORD(2, 2), &wsaData);
+      if (res != 0) {
+         g_critical("%s: WSAStartup failed: error=%d.\n",
+                    __FUNCTION__, res);
+         return FALSE;
+      }
 
-   if (Atomic_ReadBool(&pluginData.stopped)) {
-      /*
-       * Main thread has interrupted pool thread for vmtoolsd shutdown.
-       */
-      err = GDP_ERROR_STOP;
+      gPluginState.wsaStarted = TRUE;
+   }
+#endif
+
+   gPluginState.vmciFamily = VMCISock_GetAFValueFd(&gPluginState.vmciFd);
+   if (gPluginState.vmciFamily == -1) {
+      g_critical("%s: Failed to get vSocket address family value.\n",
+                 __FUNCTION__);
       goto exit;
    }
 
-   if (pluginData.sock == INVALID_SOCKET && !GdpCreateSocket()) {
-      err = GDP_ERROR_INTERNAL;
+   if (!GdpCreateSocket()) {
+      g_critical("%s: Failed to create VMCI datagram socket.\n",
+                 __FUNCTION__);
       goto exit;
    }
 
-   err = GdpEmptyRecvQueue();
-   if (err != GDP_ERROR_SUCCESS) {
+#if defined(_WIN32)
+   gPluginState.eventNetwork = GdpCreateEvent();
+   if (gPluginState.eventNetwork == GDP_INVALID_EVENT) {
+      g_critical("%s: GdpCreateEvent for network failed: error=%d.\n",
+                 __FUNCTION__, GetSysErr());
       goto exit;
    }
+#endif
 
-   err = GdpSend(msg, msgLen,
-                 GDP_SEND_TIMEOUT); // Should not time out in theory
-   if (err != GDP_ERROR_SUCCESS) {
-      g_info("%s: GdpSend failed: %s.\n", __FUNCTION__, gdpErrMsgs[err]);
+   gPluginState.eventStop = GdpCreateEvent();
+   if (gPluginState.eventStop == GDP_INVALID_EVENT) {
+      g_critical("%s: GdpCreateEvent for stop failed: error=%d.\n",
+                 __FUNCTION__, GetSysErr());
       goto exit;
    }
 
-   if (reply != NULL) {
-      recvBuf = reply;
-      recvBufLen = replyLen;
-   } else {
-      recvBuf = altRecvBuf;
-      recvBufLen = &altRecvBufLen;
+   gPluginState.eventConfig = GdpCreateEvent();
+   if (gPluginState.eventConfig == GDP_INVALID_EVENT) {
+      g_critical("%s: GdpCreateEvent for config failed: error=%d.\n",
+                 __FUNCTION__, GetSysErr());
+      goto exit;
+   }
+
+   gPublishState.eventPublish = GdpCreateEvent();
+   if (gPublishState.eventPublish == GDP_INVALID_EVENT) {
+      g_critical("%s: GdpCreateEvent for publish failed: error=%d.\n",
+                 __FUNCTION__, GetSysErr());
+      goto exit;
    }
 
-   err = GdpRecv(recvBuf, recvBufLen, GDP_RECV_TIMEOUT);
-   if (err != GDP_ERROR_SUCCESS) {
-      g_info("%s: GdpRecv failed: %s.\n", __FUNCTION__, gdpErrMsgs[err]);
+   gPublishState.eventGetResult = GdpCreateEvent();
+   if (gPublishState.eventGetResult == GDP_INVALID_EVENT) {
+      g_critical("%s: GdpCreateEvent for get-result failed: error=%d.\n",
+                 __FUNCTION__, GetSysErr());
+      goto exit;
+   }
+
+   if (!ToolsCorePool_StartThread(gPluginState.ctx, "GdpThread",
+                                  GdpThreadTask,
+                                  GdpThreadInterrupt,
+                                  NULL, NULL)) {
+      g_critical("%s: Failed to start the gdp task thread.\n", __FUNCTION__);
+      goto exit;
    }
 
+   Atomic_WriteBool(&gPluginState.started, TRUE);
+   retVal = TRUE;
+
 exit:
-   if (err == GDP_ERROR_INTERNAL) {
-      /*
-       * No need to close and recreate socket for these errors:
-       *
-       * GDP_ERROR_UNREACH
-       * GDP_ERROR_TIMEOUT
-       * GDP_ERROR_SEND_SIZE
-       * GDP_ERROR_RECV_SIZE
-       */
-      GdpCloseSocket();
+   if (!retVal) {
+      GdpDestroy();
+   }
+
+   return retVal;
+}
+
+
+/*
+ ******************************************************************************
+ * GdpDestroy --
+ *
+ * Destroys plugin resources.
+ *
+ ******************************************************************************
+ */
+
+static void
+GdpDestroy(void)
+{
+   g_debug("%s: Entering ...\n", __FUNCTION__);
+
+   GdpCloseSocket();
+
+   if (gPluginState.vmciFd != -1) {
+      VMCISock_ReleaseAFValueFd(gPluginState.vmciFd);
+      gPluginState.vmciFd = -1;
    }
 
-   g_mutex_unlock(&mutex);
+#if defined(_WIN32)
+   GdpCloseEvent(&gPluginState.eventNetwork);
+#endif
+
+   GdpCloseEvent(&gPluginState.eventStop);
+
+   GdpCloseEvent(&gPluginState.eventConfig);
+
+   GdpCloseEvent(&gPublishState.eventPublish);
 
-   g_debug("%s: Return: %s.\n", __FUNCTION__, gdpErrMsgs[err]);
+   GdpCloseEvent(&gPublishState.eventGetResult);
 
-   return err;
+#if defined(_WIN32)
+   if (gPluginState.wsaStarted) {
+      WSACleanup();
+      gPluginState.wsaStarted = FALSE;
+   }
+#endif
 }
 
 
 /*
  ******************************************************************************
- * GdpStop --
+ * GdpPublish --
+ *
+ * Publishes guest data to host side gdp daemon.
  *
- * Stops guest data publishing for vmtoolsd shutdown, called by main thread.
+ * @param[in]          createTime  UTC timestamp, in number of micro-
+ *                                 seconds since January 1, 1970 UTC.
+ * @param[in]          topic       Topic
+ * @param[in,optional] token       Token, can be NULL
+ * @param[in,optional] category    Category, can be NULL that defaults to
+ *                                 "application"
+ * @param[in]          data        Buffer containing data to publish
+ * @param[in]          dataLen     Buffer length
  *
- * @return GDP_ERROR_SUCCESS.
+ * @return GDP_ERROR_SUCCESS on success.
+ * @return Other GdpError code otherwise.
  *
  ******************************************************************************
  */
 
 static GdpError
-GdpStop(void)
+GdpPublish(gint64 createTime,     // IN
+           const gchar *topic,    // IN
+           const gchar *token,    // IN, OPTIONAL
+           const gchar *category, // IN, OPTIONAL
+           const gchar *data,     // IN
+           guint32 dataLen)       // IN
 {
+   GdpError gdpErr;
+
    g_debug("%s: Entering ...\n", __FUNCTION__);
-   Atomic_WriteBool(&pluginData.stopped, TRUE);
-   GdpSetStopEvent();
-   return GDP_ERROR_SUCCESS;
+
+   if (topic == NULL || *topic == '\0') {
+      g_info("%s: Missing topic.\n", __FUNCTION__);
+      return GDP_ERROR_INVALID_DATA;
+   }
+
+   if (data == NULL || dataLen == 0) {
+      g_info("%s: Topic '%s' has no data.\n", __FUNCTION__, topic);
+      return GDP_ERROR_INVALID_DATA;
+   }
+
+   if (token != NULL && *token == '\0') {
+      token = NULL;
+   }
+
+   if (category != NULL && *category == '\0') {
+      category = NULL;
+   }
+
+   g_mutex_lock(&gPublishState.mutex);
+
+   if (Atomic_ReadBool(&gPluginState.stopped)) {
+      gdpErr = GDP_ERROR_STOP;
+      goto exit;
+   }
+
+   if (!Atomic_ReadBool(&gPluginState.started) &&
+       !GdpStart()) {
+      gdpErr = GDP_ERROR_GENERAL;
+      goto exit;
+   }
+
+   gPublishState.createTime = createTime;
+   gPublishState.topic = topic;
+   gPublishState.token = token;
+   gPublishState.category = category;
+   gPublishState.data = data;
+   gPublishState.dataLen = dataLen;
+
+   GdpSetEvent(gPublishState.eventPublish);
+
+   do {
+      gdpErr = GdpWaitForEvent(gPublishState.eventGetResult,
+                               GDP_WAIT_INFINITE);
+      if (gdpErr == GDP_ERROR_SUCCESS) {
+         gdpErr = gPublishState.gdpErr;
+         break;
+      }
+   } while (!Atomic_ReadBool(&gPluginState.stopped));
+
+   GdpResetEvent(gPublishState.eventGetResult);
+
+exit:
+   g_mutex_unlock(&gPublishState.mutex);
+   g_debug("%s: Exiting with gdp error: %s.\n",
+           __FUNCTION__, gdpErrMsgs[gdpErr]);
+   return gdpErr;
+}
+
+
+/*
+ *-----------------------------------------------------------------------------
+ * GdpConfReload --
+ *
+ * Handles gdp config change.
+ *
+ * @param[in] src   The source object, unused
+ * @param[in] ctx   The application context
+ * @param[in] data  Unused
+ *
+ *-----------------------------------------------------------------------------
+ */
+
+static void
+GdpConfReload(gpointer src,     // IN
+              ToolsAppCtx *ctx, // IN
+              gpointer data)    // IN
+{
+   if (!Atomic_ReadBool(&gPluginState.started)) {
+      return;
+   }
+
+   GdpSetEvent(gPluginState.eventConfig);
 }
 
 
@@ -971,9 +2966,9 @@ GdpStop(void)
  *
  * Cleans up on shutdown.
  *
- * @param[in]  src     The source object, unused
- * @param[in]  ctx     The application context
- * @param[in]  data    Unused
+ * @param[in] src   The source object, unused
+ * @param[in] ctx   The application context
+ * @param[in] data  Unused
  *
  ******************************************************************************
  */
@@ -984,6 +2979,8 @@ GdpShutdown(gpointer src,     // IN
             gpointer data)    // IN
 {
    g_debug("%s: Entering ...\n", __FUNCTION__);
+   ASSERT(!Atomic_ReadBool(&gPluginState.started) ||
+          Atomic_ReadBool(&gPluginState.stopped));
    g_object_set(ctx->serviceObj, TOOLS_PLUGIN_SVC_PROP_GDP, NULL, NULL);
    GdpDestroy();
 }
@@ -995,7 +2992,7 @@ GdpShutdown(gpointer src,     // IN
  *
  * Plugin entry point. Initializes internal plugin state.
  *
- * @param[in]  ctx   The application context
+ * @param[in] ctx  The application context
  *
  * @return The registration data.
  *
@@ -1006,7 +3003,7 @@ TOOLS_MODULE_EXPORT ToolsPluginData *
 ToolsOnLoad(ToolsAppCtx *ctx) // IN
 {
    /*
-    * Return NULL to disable the plugin if not running in vmsvc daemon.
+    * Returns NULL to disable the plugin if not running in vmsvc daemon.
     */
    if (!TOOLS_IS_MAIN_SERVICE(ctx)) {
       g_info("%s: Not running in vmsvc daemon: container name='%s'.\n",
@@ -1015,7 +3012,7 @@ ToolsOnLoad(ToolsAppCtx *ctx) // IN
    }
 
    /*
-    * Return NULL to disable the plugin if not running in a VMware VM.
+    * Returns NULL to disable the plugin if not running in a VMware VM.
     */
    if (!ctx->isVMware) {
       g_info("%s: Not running in a VMware VM.\n", __FUNCTION__);
@@ -1023,7 +3020,7 @@ ToolsOnLoad(ToolsAppCtx *ctx) // IN
    }
 
    /*
-    * Return NULL to disable the plugin if VM is not running on ESX host.
+    * Returns NULL to disable the plugin if VM is not running on ESX host.
     */
    {
       uint32 vmxVersion = 0;
@@ -1035,18 +3032,16 @@ ToolsOnLoad(ToolsAppCtx *ctx) // IN
       }
    }
 
-   if (!GdpInit(ctx)) {
-      g_info("%s: Failed to init plugin.\n", __FUNCTION__);
-      return NULL;
-   }
+   GdpInit(ctx);
 
    {
-      static ToolsPluginSvcGdp svcGdp = { GdpPublish, GdpStop };
+      static ToolsPluginSvcGdp svcGdp = { GdpPublish };
       static ToolsPluginData regData = { "gdp", NULL, NULL, NULL };
 
       ToolsServiceProperty propGdp = { TOOLS_PLUGIN_SVC_PROP_GDP };
 
       ToolsPluginSignalCb sigs[] = {
+         { TOOLS_CORE_SIG_CONF_RELOAD, GdpConfReload, NULL },
          { TOOLS_CORE_SIG_SHUTDOWN, GdpShutdown, NULL },
       };
       ToolsAppReg regs[] = {