]> git.ipfire.org Git - thirdparty/open-vm-tools.git/commitdiff
Tools gdp plugin updates.
authorJohn Wolfe <jwolfe@vmware.com>
Mon, 5 Apr 2021 16:01:42 +0000 (09:01 -0700)
committerJohn Wolfe <jwolfe@vmware.com>
Mon, 5 Apr 2021 16:01:42 +0000 (09:01 -0700)
open-vm-tools/lib/include/conf.h
open-vm-tools/lib/include/vmware/tools/gdp.h
open-vm-tools/services/plugins/gdp/gdpPlugin.c

index 1f98913953d3ded8ac6f9daf4baaceb9ce0859e0..bdbfc605acce0899336a42fe2ff6843a50e4f0c9 100644 (file)
 #define CONFGROUPNAME_GDP "gdp"
 
 /**
- * Defines a custom history cache size (in bytes).
+ * Defines a custom history cache buffer size limit (in bytes).
  *
  * @note Illegal values result in a @c g_warning and fallback to the default
- * cache size 4194304.
+ * cache buffer size limit 1048576.
  *
- * @param int   User-defined cache size within [1048576, 16777216].
- *              Set to 0 to disable caching.
+ * @param int   User-defined cache buffer size limit within [262144, 4194304].
+ *              Set 0 to disable caching.
  */
 #define CONFNAME_GDP_CACHE_SIZE "cacheSize"
 
+/**
+ * Defines a custom history cache item count limit.
+ *
+ * @note Illegal values result in a @c g_warning and fallback to the default
+ * cache item count limit 256.
+ *
+ * @param int   User-defined cache item count limit within [64, 1024].
+ */
+#define CONFNAME_GDP_CACHE_COUNT "cacheCount"
+
 /*
  * END gdp goodies.
  ******************************************************************************
index f7f5da925acebd66eed08c0920f4d55c3235a2e7..2b059b946e858967f20a2d2c7c4fd14423d2da11 100644 (file)
@@ -90,7 +90,8 @@ typedef struct ToolsPluginSvcGdp {
                        const gchar *token,
                        const gchar *category,
                        const gchar *data,
-                       guint32 dataLen);
+                       guint32 dataLen,
+                       gboolean cacheData);
 } ToolsPluginSvcGdp;
 
 
@@ -113,6 +114,7 @@ typedef struct ToolsPluginSvcGdp {
  *                                 "application"
  * @param[in]          data        Buffer containing data to publish
  * @param[in]          dataLen     Buffer length
+ * @param[in]          cacheData   Cache the data if TRUE
  *
  * @return GDP_ERROR_SUCCESS on success.
  * @return Other GdpError code otherwise.
@@ -127,13 +129,14 @@ ToolsPluginSvcGdp_Publish(ToolsAppCtx *ctx,      // IN
                           const gchar *token,    // IN, OPTIONAL
                           const gchar *category, // IN, OPTIONAL
                           const gchar *data,     // IN
-                          guint32 dataLen)       // IN
+                          guint32 dataLen,       // IN
+                          gboolean cacheData)    // IN
 {
    ToolsPluginSvcGdp *svcGdp = NULL;
    g_object_get(ctx->serviceObj, TOOLS_PLUGIN_SVC_PROP_GDP, &svcGdp, NULL);
    if (svcGdp != NULL && svcGdp->publish != NULL) {
       return svcGdp->publish(createTime, topic, token,
-                             category, data, dataLen);
+                             category, data, dataLen, cacheData);
    }
    return GDP_ERROR_GENERAL;
 }
index ec3880a429788e98741b37b4d4c390eddfe8e3d5..6ef6674648707daf4bd6116d6800a1c5799cadd4 100644 (file)
@@ -148,21 +148,34 @@ VM_EMBED_VERSION(VMTOOLSD_VERSION_STRING);
 
 #define GDP_RESULT_REQUIRED_KEYS 3
 
-#define GDP_HISTORY_REQUEST_PAST_SECONDS "pastSeconds" // <uint64>
-#define GDP_HISTORY_REQUEST_ID           "id"          // <uint64>
-                                                       // Subscription ID
+#define GDP_HISTORY_REQUEST_PAST_SECONDS   "pastSeconds"   // <uint64>,
+                                                           // Required
+#define GDP_HISTORY_REQUEST_ID             "id"            // <uint64>,
+                                                           // Subscription ID,
+                                                           // Required
+#define GDP_HISTORY_REQUEST_TOPIC_PREFIXES "topicPrefixes" // <String array>,
+                                                           // Optional
 
 #define GDP_HISTORY_REQUEST_REQUIRED_KEYS 2
 
 /*
- * History guest data cache size limit
+ * History guest data cache buffer size limit
  */
-#define GDP_MAX_CACHE_SIZE_LIMIT     (1 << 24) // 16M
-#define GDP_DEFAULT_CACHE_SIZE_LIMIT (1 << 22) // 4M
-#define GDP_MIN_CACHE_SIZE_LIMIT     (1 << 20) // 1M
+#define GDP_MAX_CACHE_SIZE_LIMIT     (1 << 22) // 4M
+#define GDP_DEFAULT_CACHE_SIZE_LIMIT (1 << 20) // 1M
+#define GDP_MIN_CACHE_SIZE_LIMIT     (1 << 18) // 256K
+
+/*
+ * History guest data cache item count limit
+ */
+#define GDP_MAX_CACHE_COUNT_LIMIT     (1 << 10) // 1024
+#define GDP_DEFAULT_CACHE_COUNT_LIMIT (1 << 8)  // 256
+#define GDP_MIN_CACHE_COUNT_LIMIT     (1 << 6)  // 64
 
 #define GDP_STR_SIZE(s) ((guint32) (s != NULL ? (strlen(s) + 1) : 0))
 
+#define GDP_TOKENS_PER_ALLOC 50
+
 /*
  * GdpError message table.
  */
@@ -220,6 +233,7 @@ typedef struct PublishState {
    const gchar *category;
    const gchar *data;
    guint32 dataLen;
+   gboolean cacheData;
 
    /*
     * The publish event object:
@@ -274,9 +288,10 @@ typedef struct PublishResult {
 } PublishResult;
 
 typedef struct HistoryRequest {
-   gint64 beginCacheTime; /* Begin cacheTime */
-   gint64 endCacheTime;   /* End cacheTime */
-   guint64 id;            /* Subscription ID */
+   gint64 beginCacheTime;    /* Begin cacheTime */
+   gint64 endCacheTime;      /* End cacheTime */
+   guint64 id;               /* Subscription ID */
+   GPtrArray *topicPrefixes; /* Topic prefixes */
 } HistoryRequest;
 
 typedef struct HistoryCacheItem {
@@ -292,8 +307,9 @@ typedef struct HistoryCacheItem {
 
 typedef struct HistoryCache {
    GQueue queue;       /* Container for HistoryCacheItem */
-   guint32 sizeLimit;  /* Cache size limit */
-   guint32 size;       /* Current cache size */
+   guint32 sizeLimit;  /* Cache buffer size limit */
+   guint32 countLimit; /* Cache item count limit */
+   guint32 size;       /* Current cache buffer size */
    GList *currentLink; /* Pointer to the history cache queue link
                           currently being published */
 } HistoryCache;
@@ -375,6 +391,9 @@ GdpRecvFrom(SOCKET sock,                  // IN
             int *bufLen,                  // IN/OUT
             struct sockaddr_vm *srcAddr); // OUT
 
+static inline void
+GdpTopicPrefixFree(gpointer data); // IN
+
 static inline void
 GdpHistoryRequestFree(HistoryRequest *request); // IN
 
@@ -386,7 +405,10 @@ static inline void
 GdpTaskClearHistoryRequestQueue(TaskContext *taskCtx); // IN/OUT
 
 static guint32
-GdpGetHistoryCacheSizeLimit(); // IN
+GdpGetHistoryCacheSizeLimit();
+
+static guint32
+GdpGetHistoryCacheCountLimit();
 
 static inline Bool
 GdpTaskIsHistoryCacheEnabled(TaskContext *taskCtx); // IN
@@ -435,6 +457,10 @@ GdpTaskOkToSend(TaskContext *taskCtx); // IN
 static GdpError
 GdpTaskSendPacket(TaskContext *taskCtx); // IN/OUT
 
+static Bool
+GdpMatchTopicPrefixes(const gchar *topic,              // IN
+                      const GPtrArray *topicPrefixes); // IN
+
 static gchar *
 GdpTaskUpdateHistoryCachePointerAndGetSubscribers(
    TaskContext *taskCtx); // IN/OUT
@@ -446,29 +472,29 @@ static void
 GdpTaskProcessConfigChange(TaskContext *taskCtx); // IN/OUT
 
 static Bool
-GdpJsonIsTokenOfKey(const char *json, // IN
-                    jsmntok_t *token, // IN
-                    const char *key); // IN
+GdpJsonIsTokenOfKey(const char *json,       // IN
+                    const jsmntok_t *token, // IN
+                    const char *key);       // IN
 
 static Bool
-GdpJsonIsPublishResult(const char *json,       // IN
-                       jsmntok_t *tokens,      // IN
-                       int count,              // IN
-                       PublishResult *result); // OUT
+GdpJsonIsPublishResult(const char *json,        // IN
+                       const jsmntok_t *tokens, // IN
+                       int count,               // IN
+                       PublishResult *result);  // OUT
 
 static void
-GdpTaskProcessPublishResult(TaskContext *taskCtx,   // IN/OUT
-                            PublishResult *result); // IN
+GdpTaskProcessPublishResult(TaskContext *taskCtx,         // IN/OUT
+                            const PublishResult *result); // IN
 
 static Bool
 GdpJsonIsHistoryRequest(const char *json,         // IN
-                        jsmntok_t *tokens,        // IN
+                        const jsmntok_t *tokens,  // IN
                         int count,                // IN
                         HistoryRequest *request); // OUT
 
 static void
-GdpTaskPushHistoryRequest(TaskContext *taskCtx,     // IN/OUT
-                          HistoryRequest *request); // IN
+GdpTaskProcessHistoryRequest(TaskContext *taskCtx,     // IN/OUT
+                             HistoryRequest *request); // IN/OUT
 
 static void
 GdpTaskProcessNetwork(TaskContext *taskCtx); // IN/OUT
@@ -1015,6 +1041,26 @@ GdpRecvFrom(SOCKET sock,                 // IN
 }
 
 
+/*
+ *****************************************************************************
+ * GdpTopicPrefixFree --
+ *
+ * Frees topic prefix string buffer.
+ *
+ * @param[in] data  Topic prefix string buffer pointer
+ *
+ *****************************************************************************
+ */
+
+static inline void
+GdpTopicPrefixFree(gpointer data) // IN
+{
+   g_debug("%s: Freeing buffer for topic prefix \"%s\".\n",
+           __FUNCTION__, (const char *) data);
+   free(data);
+}
+
+
 /*
  *****************************************************************************
  * GdpHistoryRequestFree --
@@ -1030,6 +1076,11 @@ static inline void
 GdpHistoryRequestFree(HistoryRequest *request) // IN
 {
    g_debug("%s: Entering ...\n", __FUNCTION__);
+
+   if (request->topicPrefixes != NULL) {
+      g_ptr_array_free(request->topicPrefixes, TRUE);
+   }
+
    free(request);
 }
 
@@ -1077,13 +1128,13 @@ GdpTaskClearHistoryRequestQueue(TaskContext *taskCtx) // IN/OUT
  ******************************************************************************
  * GdpGetHistoryCacheSizeLimit --
  *
- * Gets history cache size limit from tools config.
+ * Gets history cache buffer size limit from tools config.
  *
  ******************************************************************************
  */
 
 static guint32
-GdpGetHistoryCacheSizeLimit() // IN
+GdpGetHistoryCacheSizeLimit()
 {
    gint sizeLimit;
 
@@ -1092,8 +1143,8 @@ GdpGetHistoryCacheSizeLimit() // IN
    if (sizeLimit != 0 &&
        (sizeLimit < GDP_MIN_CACHE_SIZE_LIMIT ||
         sizeLimit > GDP_MAX_CACHE_SIZE_LIMIT)) {
-      g_warning("%s: Configured history cache size limit %d exceeds range, "
-                "set to default value %d.\n",
+      g_warning("%s: Configured history cache buffer size limit %d "
+                "exceeds range, set to default value %d.\n",
                 __FUNCTION__, sizeLimit, GDP_DEFAULT_CACHE_SIZE_LIMIT);
       sizeLimit = GDP_DEFAULT_CACHE_SIZE_LIMIT;
    }
@@ -1102,6 +1153,34 @@ GdpGetHistoryCacheSizeLimit() // IN
 }
 
 
+/*
+ ******************************************************************************
+ * GdpGetHistoryCacheCountLimit --
+ *
+ * Gets history cache item count limit from tools config.
+ *
+ ******************************************************************************
+ */
+
+static guint32
+GdpGetHistoryCacheCountLimit()
+{
+   gint countLimit;
+
+   countLimit = GDP_CONFIG_GET_INT(CONFNAME_GDP_CACHE_COUNT,
+                                   GDP_DEFAULT_CACHE_COUNT_LIMIT);
+   if (countLimit < GDP_MIN_CACHE_COUNT_LIMIT ||
+       countLimit > GDP_MAX_CACHE_COUNT_LIMIT) {
+      g_warning("%s: Configured history cache item count limit %d "
+                "exceeds range, set to default value %d.\n",
+                __FUNCTION__, countLimit, GDP_DEFAULT_CACHE_COUNT_LIMIT);
+      countLimit = GDP_DEFAULT_CACHE_COUNT_LIMIT;
+   }
+
+   return (guint32) countLimit;
+}
+
+
 /*
  ******************************************************************************
  * GdpTaskIsHistoryCacheEnabled --
@@ -1110,7 +1189,7 @@ GdpGetHistoryCacheSizeLimit() // IN
  *
  * @param[in] taskCtx  The task context
  *
- * @return TRUE if history cache size limit is not zero.
+ * @return TRUE if history cache buffer size limit is not zero.
  * @return FALSE otherwise.
  *
  ******************************************************************************
@@ -1267,7 +1346,10 @@ GdpTaskHistoryCachePushItem(TaskContext *taskCtx,  // IN/OUT
                     + GDP_STR_SIZE(item->category)
                     + item->dataLen;
 
-   while (taskCtx->cache.size + item->itemSize > taskCtx->cache.sizeLimit) {
+   while (taskCtx->cache.size + item->itemSize >
+             taskCtx->cache.sizeLimit ||
+          g_queue_get_length(&taskCtx->cache.queue) >=
+             taskCtx->cache.countLimit) {
       GdpTaskDeleteHistoryCacheTail(taskCtx);
    }
 
@@ -1497,6 +1579,45 @@ GdpTaskSendPacket(TaskContext *taskCtx) // IN/OUT
 }
 
 
+/*
+ *****************************************************************************
+ * GdpMatchTopicPrefixes --
+ *
+ * Matches a topic against prefixes.
+ *
+ * @param[in] topic          Topic
+ * @param[in] topicPrefixes  Topic prefixes
+ *
+ * @return TRUE if topic matches any prefix.
+ * @return FALSE otherwise.
+ *
+ *****************************************************************************
+ */
+
+static Bool
+GdpMatchTopicPrefixes(const gchar *topic,             // IN
+                      const GPtrArray *topicPrefixes) // IN
+{
+   guint index;
+
+   ASSERT(topic != NULL);
+   ASSERT(topicPrefixes != NULL);
+
+   for (index = 0; index < topicPrefixes->len; index++) {
+      const gchar *prefix = g_ptr_array_index(topicPrefixes, index);
+      guint prefixLen = (guint) strlen(prefix);
+
+      if (strncmp(topic, prefix, prefixLen) == 0) {
+         if (topic[prefixLen] == '.' || topic[prefixLen] == '\0') {
+            return TRUE;
+         }
+      }
+   }
+
+   return FALSE;
+}
+
+
 /*
  *****************************************************************************
  * GdpTaskUpdateHistoryCachePointerAndGetSubscribers --
@@ -1541,15 +1662,18 @@ GdpTaskUpdateHistoryCachePointerAndGetSubscribers(
             requestDone = TRUE;
          } else if (request->beginCacheTime < item->cacheTime &&
                     item->cacheTime <= request->endCacheTime) {
-            if (subscribers == NULL) {
-               subscribers = g_strdup_printf("%" G_GUINT64_FORMAT,
-                                             request->id);
-            } else {
-               gchar *subscribersNew;
-               subscribersNew = g_strdup_printf("%s,%" G_GUINT64_FORMAT,
-                                                subscribers, request->id);
-               g_free(subscribers);
-               subscribers = subscribersNew;
+            if (request->topicPrefixes == NULL ||
+                GdpMatchTopicPrefixes(item->topic, request->topicPrefixes)) {
+               if (subscribers == NULL) {
+                  subscribers = g_strdup_printf("%" G_GUINT64_FORMAT,
+                                                request->id);
+               } else {
+                  gchar *subscribersNew;
+                  subscribersNew = g_strdup_printf("%s,%" G_GUINT64_FORMAT,
+                                                   subscribers, request->id);
+                  g_free(subscribers);
+                  subscribers = subscribersNew;
+               }
             }
 
             if (item->cacheTime == request->endCacheTime) {
@@ -1678,15 +1802,24 @@ static void
 GdpTaskProcessConfigChange(TaskContext *taskCtx) // IN/OUT
 {
    guint32 sizeLimit = GdpGetHistoryCacheSizeLimit();
+   guint32 countLimit = GdpGetHistoryCacheCountLimit();
 
-   if (taskCtx->cache.sizeLimit == sizeLimit) {
+   if (taskCtx->cache.sizeLimit == sizeLimit &&
+       taskCtx->cache.countLimit == countLimit) {
       return;
    }
 
-   g_debug("%s: Current history cache size limit: %u, new value: %u.\n",
+   g_debug("%s: Current history cache buffer size limit: %u, new value: %u.\n",
            __FUNCTION__, taskCtx->cache.sizeLimit, sizeLimit);
    taskCtx->cache.sizeLimit = sizeLimit;
-   while (taskCtx->cache.size > taskCtx->cache.sizeLimit) {
+
+   g_debug("%s: Current history cache item count limit: %u, new value: %u.\n",
+           __FUNCTION__, taskCtx->cache.countLimit, countLimit);
+   taskCtx->cache.countLimit = countLimit;
+
+   while (taskCtx->cache.size > taskCtx->cache.sizeLimit ||
+          g_queue_get_length(&taskCtx->cache.queue) >
+             taskCtx->cache.countLimit) {
       GdpTaskDeleteHistoryCacheTail(taskCtx);
    }
 }
@@ -1709,9 +1842,9 @@ GdpTaskProcessConfigChange(TaskContext *taskCtx) // IN/OUT
  */
 
 static Bool
-GdpJsonIsTokenOfKey(const char *json, // IN
-                    jsmntok_t *token, // IN
-                    const char *key)  // IN
+GdpJsonIsTokenOfKey(const char *json,       // IN
+                    const jsmntok_t *token, // IN
+                    const char *key)        // IN
 {
    int tokenLen = token->end - token->start;
    if (token->type == JSMN_STRING &&
@@ -1742,10 +1875,10 @@ GdpJsonIsTokenOfKey(const char *json, // IN
  */
 
 static Bool
-GdpJsonIsPublishResult(const char *json,      // IN
-                       jsmntok_t *tokens,     // IN
-                       int count,             // IN
-                       PublishResult *result) // OUT
+GdpJsonIsPublishResult(const char *json,        // IN
+                       const jsmntok_t *tokens, // IN
+                       int count,               // IN
+                       PublishResult *result)   // OUT
 {
    int index;
    int requiredKeys = GDP_RESULT_REQUIRED_KEYS;
@@ -1810,8 +1943,8 @@ GdpJsonIsPublishResult(const char *json,      // IN
  */
 
 static void
-GdpTaskProcessPublishResult(TaskContext *taskCtx,  // IN/OUT
-                            PublishResult *result) // IN
+GdpTaskProcessPublishResult(TaskContext *taskCtx,        // IN/OUT
+                            const PublishResult *result) // IN
 {
    g_debug("%s: Entering ...\n", __FUNCTION__);
 
@@ -1836,7 +1969,8 @@ GdpTaskProcessPublishResult(TaskContext *taskCtx,  // IN/OUT
 
    if (taskCtx->mode == GDP_TASK_MODE_PUBLISH) {
       if (result->statusOk) {
-         if (GdpTaskIsHistoryCacheEnabled(taskCtx)) {
+         if (GdpTaskIsHistoryCacheEnabled(taskCtx) &&
+             gPublishState.cacheData) {
             GdpTaskHistoryCachePushItem(taskCtx,
                                         gPublishState.createTime,
                                         gPublishState.topic,
@@ -1888,7 +2022,7 @@ GdpTaskProcessPublishResult(TaskContext *taskCtx,  // IN/OUT
 
 static Bool
 GdpJsonIsHistoryRequest(const char *json,        // IN
-                        jsmntok_t *tokens,       // IN
+                        const jsmntok_t *tokens, // IN
                         int count,               // IN
                         HistoryRequest *request) // OUT
 {
@@ -1896,6 +2030,8 @@ GdpJsonIsHistoryRequest(const char *json,        // IN
    int requiredKeys = GDP_HISTORY_REQUEST_REQUIRED_KEYS;
    guint64 pastSeconds = 0;
 
+   request->topicPrefixes = NULL;
+
    /*
     * Loops over all keys of the root object
     */
@@ -1910,6 +2046,24 @@ GdpJsonIsHistoryRequest(const char *json,        // IN
          requiredKeys--;
          index++;
          request->id = g_ascii_strtoull(json + tokens[index].start, NULL, 10);
+      } else if (GdpJsonIsTokenOfKey(json, &tokens[index],
+                                     GDP_HISTORY_REQUEST_TOPIC_PREFIXES) &&
+                 tokens[index + 1].type == JSMN_ARRAY) {
+         int prefixCount;
+         int prefixIndex;
+
+         prefixCount = tokens[index + 1].size;
+         request->topicPrefixes = g_ptr_array_new_full(prefixCount,
+                                                       GdpTopicPrefixFree);
+         index += 2;
+         for (prefixIndex = 0; prefixIndex < prefixCount; prefixIndex++) {
+            const jsmntok_t *token = &tokens[index + prefixIndex];
+            g_ptr_array_add(request->topicPrefixes,
+                            Util_SafeStrndup(json + token->start,
+                                             token->end - token->start));
+         }
+
+         index += (prefixCount - 1);
       }
    }
 
@@ -1920,25 +2074,30 @@ GdpJsonIsHistoryRequest(const char *json,        // IN
       return TRUE;
    }
 
+   if (request->topicPrefixes != NULL) {
+      g_ptr_array_free(request->topicPrefixes, TRUE);
+      request->topicPrefixes = NULL;
+   }
    return FALSE;
 }
 
 
 /*
  *****************************************************************************
- * GdpTaskPushHistoryRequest --
+ * GdpTaskProcessHistoryRequest --
  *
- * Pushes new history request to queue and resets history cache pointer.
+ * Validates new history request, pushes the request to queue and
+ * resets history cache pointer.
  *
  * @param[in,out] taskCtx  The task context
- * @param[in]     request  New history request
+ * @param[in,out] request  New history request
  *
  *****************************************************************************
  */
 
 static void
-GdpTaskPushHistoryRequest(TaskContext *taskCtx,    // IN/OUT
-                          HistoryRequest *request) // IN
+GdpTaskProcessHistoryRequest(TaskContext *taskCtx,    // IN/OUT
+                             HistoryRequest *request) // IN/OUT
 {
    HistoryRequest *requestCopy;
 
@@ -1946,12 +2105,12 @@ GdpTaskPushHistoryRequest(TaskContext *taskCtx,    // IN/OUT
 
    if (!GdpTaskIsHistoryCacheEnabled(taskCtx)) {
       g_info("%s: History cache not enabled.\n", __FUNCTION__);
-      return;
+      goto fail;
    }
 
    if (request->beginCacheTime >= request->endCacheTime) {
       g_info("%s: Invalid history request.\n", __FUNCTION__);
-      return;
+      goto fail;
    }
 
    if (request->beginCacheTime < 0) {
@@ -1962,6 +2121,8 @@ GdpTaskPushHistoryRequest(TaskContext *taskCtx,    // IN/OUT
    requestCopy->beginCacheTime = request->beginCacheTime;
    requestCopy->endCacheTime = request->endCacheTime;
    requestCopy->id = request->id;
+   requestCopy->topicPrefixes = request->topicPrefixes;
+   request->topicPrefixes = NULL;
 
    /*
     * Note: each request comes with a unique subscription ID.
@@ -1971,6 +2132,14 @@ GdpTaskPushHistoryRequest(TaskContext *taskCtx,    // IN/OUT
     * Resets history cache pointer.
     */
    taskCtx->cache.currentLink = NULL;
+
+   return;
+
+fail:
+   if (request->topicPrefixes != NULL) {
+      g_ptr_array_free(request->topicPrefixes, TRUE);
+      request->topicPrefixes = NULL;
+   }
 }
 
 
@@ -1992,8 +2161,9 @@ GdpTaskProcessNetwork(TaskContext *taskCtx) // IN/OUT
    char buf[GDP_MAX_PACKET_LEN + 1]; // Adds a space for NULL
    int bufLen = (int) sizeof buf - 1;
    struct sockaddr_vm srcAddr;
+   unsigned int numTokens;
+   jsmntok_t *tokens;
    jsmn_parser parser;
-   jsmntok_t tokens[16]; // We expect no more than 16 tokens
    int retVal;
    Bool isPublishResult;
    Bool isHistoryRequest;
@@ -2016,11 +2186,19 @@ GdpTaskProcessNetwork(TaskContext *taskCtx) // IN/OUT
    buf[bufLen] = '\0';
 
    jsmn_init(&parser);
-   retVal = jsmn_parse(&parser, buf, bufLen,
-                       tokens, (unsigned int) ARRAYSIZE(tokens));
+
+   numTokens = GDP_TOKENS_PER_ALLOC;
+   tokens = Util_SafeMalloc(numTokens * sizeof *tokens);
+   while ((retVal = jsmn_parse(&parser, buf, bufLen, tokens, numTokens))
+          == JSMN_ERROR_NOMEM) {
+      numTokens += GDP_TOKENS_PER_ALLOC;
+      tokens = Util_SafeRealloc(tokens, numTokens * sizeof *tokens);
+   }
+
    if (retVal < 0) {
       g_info("%s: Error %d while parsing JSON:\n%s\n",
              __FUNCTION__, retVal, buf);
+      free(tokens);
       return;
    }
 
@@ -2029,6 +2207,7 @@ GdpTaskProcessNetwork(TaskContext *taskCtx) // IN/OUT
     */
    if (retVal < 1 || tokens[0].type != JSMN_OBJECT) {
       g_info("%s: Invalid JSON:\n%s\n", __FUNCTION__, buf);
+      free(tokens);
       return;
    }
 
@@ -2055,10 +2234,12 @@ GdpTaskProcessNetwork(TaskContext *taskCtx) // IN/OUT
       g_free(result.diagnosis);
    } else if (isHistoryRequest) {
       g_debug("%s: Received history request:\n%s\n", __FUNCTION__, buf);
-      GdpTaskPushHistoryRequest(taskCtx, &request);
+      GdpTaskProcessHistoryRequest(taskCtx, &request);
    } else {
       g_info("%s: Unknown JSON:\n%s\n", __FUNCTION__, buf);
    }
+
+   free(tokens);
 }
 
 
@@ -2503,6 +2684,7 @@ GdpTaskCtxInit(TaskContext *taskCtx) // OUT
 
    g_queue_init(&taskCtx->cache.queue);
    taskCtx->cache.sizeLimit = GdpGetHistoryCacheSizeLimit();
+   taskCtx->cache.countLimit = GdpGetHistoryCacheCountLimit();
    taskCtx->cache.size = 0;
    taskCtx->cache.currentLink = NULL;
 
@@ -2857,6 +3039,7 @@ GdpDestroy(void)
  *                                 "application"
  * @param[in]          data        Buffer containing data to publish
  * @param[in]          dataLen     Buffer length
+ * @param[in]          cacheData   Cache the data if TRUE
  *
  * @return GDP_ERROR_SUCCESS on success.
  * @return Other GdpError code otherwise.
@@ -2870,7 +3053,8 @@ GdpPublish(gint64 createTime,     // IN
            const gchar *token,    // IN, OPTIONAL
            const gchar *category, // IN, OPTIONAL
            const gchar *data,     // IN
-           guint32 dataLen)       // IN
+           guint32 dataLen,       // IN
+           gboolean cacheData)    // IN
 {
    GdpError gdpErr;
 
@@ -2913,6 +3097,7 @@ GdpPublish(gint64 createTime,     // IN
    gPublishState.category = category;
    gPublishState.data = data;
    gPublishState.dataLen = dataLen;
+   gPublishState.cacheData = cacheData;
 
    GdpSetEvent(gPublishState.eventPublish);