#define GDP_RESULT_REQUIRED_KEYS 3
-#define GDP_HISTORY_REQUEST_PAST_SECONDS "pastSeconds" // <uint64>
-#define GDP_HISTORY_REQUEST_ID "id" // <uint64>
- // Subscription ID
+#define GDP_HISTORY_REQUEST_PAST_SECONDS "pastSeconds" // <uint64>,
+ // Required
+#define GDP_HISTORY_REQUEST_ID "id" // <uint64>,
+ // Subscription ID,
+ // Required
+#define GDP_HISTORY_REQUEST_TOPIC_PREFIXES "topicPrefixes" // <String array>,
+ // Optional
#define GDP_HISTORY_REQUEST_REQUIRED_KEYS 2
/*
- * History guest data cache size limit
+ * History guest data cache buffer size limit
*/
-#define GDP_MAX_CACHE_SIZE_LIMIT (1 << 24) // 16M
-#define GDP_DEFAULT_CACHE_SIZE_LIMIT (1 << 22) // 4M
-#define GDP_MIN_CACHE_SIZE_LIMIT (1 << 20) // 1M
+#define GDP_MAX_CACHE_SIZE_LIMIT (1 << 22) // 4M
+#define GDP_DEFAULT_CACHE_SIZE_LIMIT (1 << 20) // 1M
+#define GDP_MIN_CACHE_SIZE_LIMIT (1 << 18) // 256K
+
+/*
+ * History guest data cache item count limit
+ */
+#define GDP_MAX_CACHE_COUNT_LIMIT (1 << 10) // 1024
+#define GDP_DEFAULT_CACHE_COUNT_LIMIT (1 << 8) // 256
+#define GDP_MIN_CACHE_COUNT_LIMIT (1 << 6) // 64
#define GDP_STR_SIZE(s) ((guint32) (s != NULL ? (strlen(s) + 1) : 0))
+#define GDP_TOKENS_PER_ALLOC 50
+
/*
* GdpError message table.
*/
const gchar *category;
const gchar *data;
guint32 dataLen;
+ gboolean cacheData;
/*
* The publish event object:
} PublishResult;
typedef struct HistoryRequest {
- gint64 beginCacheTime; /* Begin cacheTime */
- gint64 endCacheTime; /* End cacheTime */
- guint64 id; /* Subscription ID */
+ gint64 beginCacheTime; /* Begin cacheTime */
+ gint64 endCacheTime; /* End cacheTime */
+ guint64 id; /* Subscription ID */
+ GPtrArray *topicPrefixes; /* Topic prefixes */
} HistoryRequest;
typedef struct HistoryCacheItem {
typedef struct HistoryCache {
GQueue queue; /* Container for HistoryCacheItem */
- guint32 sizeLimit; /* Cache size limit */
- guint32 size; /* Current cache size */
+ guint32 sizeLimit; /* Cache buffer size limit */
+ guint32 countLimit; /* Cache item count limit */
+ guint32 size; /* Current cache buffer size */
GList *currentLink; /* Pointer to the history cache queue link
currently being published */
} HistoryCache;
int *bufLen, // IN/OUT
struct sockaddr_vm *srcAddr); // OUT
+static inline void
+GdpTopicPrefixFree(gpointer data); // IN
+
static inline void
GdpHistoryRequestFree(HistoryRequest *request); // IN
GdpTaskClearHistoryRequestQueue(TaskContext *taskCtx); // IN/OUT
static guint32
-GdpGetHistoryCacheSizeLimit(); // IN
+GdpGetHistoryCacheSizeLimit();
+
+static guint32
+GdpGetHistoryCacheCountLimit();
static inline Bool
GdpTaskIsHistoryCacheEnabled(TaskContext *taskCtx); // IN
static GdpError
GdpTaskSendPacket(TaskContext *taskCtx); // IN/OUT
+static Bool
+GdpMatchTopicPrefixes(const gchar *topic, // IN
+ const GPtrArray *topicPrefixes); // IN
+
static gchar *
GdpTaskUpdateHistoryCachePointerAndGetSubscribers(
TaskContext *taskCtx); // IN/OUT
GdpTaskProcessConfigChange(TaskContext *taskCtx); // IN/OUT
static Bool
-GdpJsonIsTokenOfKey(const char *json, // IN
- jsmntok_t *token, // IN
- const char *key); // IN
+GdpJsonIsTokenOfKey(const char *json, // IN
+ const jsmntok_t *token, // IN
+ const char *key); // IN
static Bool
-GdpJsonIsPublishResult(const char *json, // IN
- jsmntok_t *tokens, // IN
- int count, // IN
- PublishResult *result); // OUT
+GdpJsonIsPublishResult(const char *json, // IN
+ const jsmntok_t *tokens, // IN
+ int count, // IN
+ PublishResult *result); // OUT
static void
-GdpTaskProcessPublishResult(TaskContext *taskCtx, // IN/OUT
- PublishResult *result); // IN
+GdpTaskProcessPublishResult(TaskContext *taskCtx, // IN/OUT
+ const PublishResult *result); // IN
static Bool
GdpJsonIsHistoryRequest(const char *json, // IN
- jsmntok_t *tokens, // IN
+ const jsmntok_t *tokens, // IN
int count, // IN
HistoryRequest *request); // OUT
static void
-GdpTaskPushHistoryRequest(TaskContext *taskCtx, // IN/OUT
- HistoryRequest *request); // IN
+GdpTaskProcessHistoryRequest(TaskContext *taskCtx, // IN/OUT
+ HistoryRequest *request); // IN/OUT
static void
GdpTaskProcessNetwork(TaskContext *taskCtx); // IN/OUT
}
+/*
+ *****************************************************************************
+ * GdpTopicPrefixFree --
+ *
+ * Frees topic prefix string buffer.
+ *
+ * @param[in] data Topic prefix string buffer pointer
+ *
+ *****************************************************************************
+ */
+
+static inline void
+GdpTopicPrefixFree(gpointer data) // IN
+{
+ g_debug("%s: Freeing buffer for topic prefix \"%s\".\n",
+ __FUNCTION__, (const char *) data);
+ free(data);
+}
+
+
/*
*****************************************************************************
* GdpHistoryRequestFree --
GdpHistoryRequestFree(HistoryRequest *request) // IN
{
g_debug("%s: Entering ...\n", __FUNCTION__);
+
+ if (request->topicPrefixes != NULL) {
+ g_ptr_array_free(request->topicPrefixes, TRUE);
+ }
+
free(request);
}
******************************************************************************
* GdpGetHistoryCacheSizeLimit --
*
- * Gets history cache size limit from tools config.
+ * Gets history cache buffer size limit from tools config.
*
******************************************************************************
*/
static guint32
-GdpGetHistoryCacheSizeLimit() // IN
+GdpGetHistoryCacheSizeLimit()
{
gint sizeLimit;
if (sizeLimit != 0 &&
(sizeLimit < GDP_MIN_CACHE_SIZE_LIMIT ||
sizeLimit > GDP_MAX_CACHE_SIZE_LIMIT)) {
- g_warning("%s: Configured history cache size limit %d exceeds range, "
- "set to default value %d.\n",
+ g_warning("%s: Configured history cache buffer size limit %d "
+ "exceeds range, set to default value %d.\n",
__FUNCTION__, sizeLimit, GDP_DEFAULT_CACHE_SIZE_LIMIT);
sizeLimit = GDP_DEFAULT_CACHE_SIZE_LIMIT;
}
}
+/*
+ ******************************************************************************
+ * GdpGetHistoryCacheCountLimit --
+ *
+ * Gets history cache item count limit from tools config.
+ *
+ ******************************************************************************
+ */
+
+static guint32
+GdpGetHistoryCacheCountLimit()
+{
+ gint countLimit;
+
+ countLimit = GDP_CONFIG_GET_INT(CONFNAME_GDP_CACHE_COUNT,
+ GDP_DEFAULT_CACHE_COUNT_LIMIT);
+ if (countLimit < GDP_MIN_CACHE_COUNT_LIMIT ||
+ countLimit > GDP_MAX_CACHE_COUNT_LIMIT) {
+ g_warning("%s: Configured history cache item count limit %d "
+ "exceeds range, set to default value %d.\n",
+ __FUNCTION__, countLimit, GDP_DEFAULT_CACHE_COUNT_LIMIT);
+ countLimit = GDP_DEFAULT_CACHE_COUNT_LIMIT;
+ }
+
+ return (guint32) countLimit;
+}
+
+
/*
******************************************************************************
* GdpTaskIsHistoryCacheEnabled --
*
* @param[in] taskCtx The task context
*
- * @return TRUE if history cache size limit is not zero.
+ * @return TRUE if history cache buffer size limit is not zero.
* @return FALSE otherwise.
*
******************************************************************************
+ GDP_STR_SIZE(item->category)
+ item->dataLen;
- while (taskCtx->cache.size + item->itemSize > taskCtx->cache.sizeLimit) {
+ while (taskCtx->cache.size + item->itemSize >
+ taskCtx->cache.sizeLimit ||
+ g_queue_get_length(&taskCtx->cache.queue) >=
+ taskCtx->cache.countLimit) {
GdpTaskDeleteHistoryCacheTail(taskCtx);
}
}
+/*
+ *****************************************************************************
+ * GdpMatchTopicPrefixes --
+ *
+ * Matches a topic against prefixes.
+ *
+ * @param[in] topic Topic
+ * @param[in] topicPrefixes Topic prefixes
+ *
+ * @return TRUE if topic matches any prefix.
+ * @return FALSE otherwise.
+ *
+ *****************************************************************************
+ */
+
+static Bool
+GdpMatchTopicPrefixes(const gchar *topic, // IN
+ const GPtrArray *topicPrefixes) // IN
+{
+ guint index;
+
+ ASSERT(topic != NULL);
+ ASSERT(topicPrefixes != NULL);
+
+ for (index = 0; index < topicPrefixes->len; index++) {
+ const gchar *prefix = g_ptr_array_index(topicPrefixes, index);
+ guint prefixLen = (guint) strlen(prefix);
+
+ if (strncmp(topic, prefix, prefixLen) == 0) {
+ if (topic[prefixLen] == '.' || topic[prefixLen] == '\0') {
+ return TRUE;
+ }
+ }
+ }
+
+ return FALSE;
+}
+
+
/*
*****************************************************************************
* GdpTaskUpdateHistoryCachePointerAndGetSubscribers --
requestDone = TRUE;
} else if (request->beginCacheTime < item->cacheTime &&
item->cacheTime <= request->endCacheTime) {
- if (subscribers == NULL) {
- subscribers = g_strdup_printf("%" G_GUINT64_FORMAT,
- request->id);
- } else {
- gchar *subscribersNew;
- subscribersNew = g_strdup_printf("%s,%" G_GUINT64_FORMAT,
- subscribers, request->id);
- g_free(subscribers);
- subscribers = subscribersNew;
+ if (request->topicPrefixes == NULL ||
+ GdpMatchTopicPrefixes(item->topic, request->topicPrefixes)) {
+ if (subscribers == NULL) {
+ subscribers = g_strdup_printf("%" G_GUINT64_FORMAT,
+ request->id);
+ } else {
+ gchar *subscribersNew;
+ subscribersNew = g_strdup_printf("%s,%" G_GUINT64_FORMAT,
+ subscribers, request->id);
+ g_free(subscribers);
+ subscribers = subscribersNew;
+ }
}
if (item->cacheTime == request->endCacheTime) {
GdpTaskProcessConfigChange(TaskContext *taskCtx) // IN/OUT
{
guint32 sizeLimit = GdpGetHistoryCacheSizeLimit();
+ guint32 countLimit = GdpGetHistoryCacheCountLimit();
- if (taskCtx->cache.sizeLimit == sizeLimit) {
+ if (taskCtx->cache.sizeLimit == sizeLimit &&
+ taskCtx->cache.countLimit == countLimit) {
return;
}
- g_debug("%s: Current history cache size limit: %u, new value: %u.\n",
+ g_debug("%s: Current history cache buffer size limit: %u, new value: %u.\n",
__FUNCTION__, taskCtx->cache.sizeLimit, sizeLimit);
taskCtx->cache.sizeLimit = sizeLimit;
- while (taskCtx->cache.size > taskCtx->cache.sizeLimit) {
+
+ g_debug("%s: Current history cache item count limit: %u, new value: %u.\n",
+ __FUNCTION__, taskCtx->cache.countLimit, countLimit);
+ taskCtx->cache.countLimit = countLimit;
+
+ while (taskCtx->cache.size > taskCtx->cache.sizeLimit ||
+ g_queue_get_length(&taskCtx->cache.queue) >
+ taskCtx->cache.countLimit) {
GdpTaskDeleteHistoryCacheTail(taskCtx);
}
}
*/
static Bool
-GdpJsonIsTokenOfKey(const char *json, // IN
- jsmntok_t *token, // IN
- const char *key) // IN
+GdpJsonIsTokenOfKey(const char *json, // IN
+ const jsmntok_t *token, // IN
+ const char *key) // IN
{
int tokenLen = token->end - token->start;
if (token->type == JSMN_STRING &&
*/
static Bool
-GdpJsonIsPublishResult(const char *json, // IN
- jsmntok_t *tokens, // IN
- int count, // IN
- PublishResult *result) // OUT
+GdpJsonIsPublishResult(const char *json, // IN
+ const jsmntok_t *tokens, // IN
+ int count, // IN
+ PublishResult *result) // OUT
{
int index;
int requiredKeys = GDP_RESULT_REQUIRED_KEYS;
*/
static void
-GdpTaskProcessPublishResult(TaskContext *taskCtx, // IN/OUT
- PublishResult *result) // IN
+GdpTaskProcessPublishResult(TaskContext *taskCtx, // IN/OUT
+ const PublishResult *result) // IN
{
g_debug("%s: Entering ...\n", __FUNCTION__);
if (taskCtx->mode == GDP_TASK_MODE_PUBLISH) {
if (result->statusOk) {
- if (GdpTaskIsHistoryCacheEnabled(taskCtx)) {
+ if (GdpTaskIsHistoryCacheEnabled(taskCtx) &&
+ gPublishState.cacheData) {
GdpTaskHistoryCachePushItem(taskCtx,
gPublishState.createTime,
gPublishState.topic,
static Bool
GdpJsonIsHistoryRequest(const char *json, // IN
- jsmntok_t *tokens, // IN
+ const jsmntok_t *tokens, // IN
int count, // IN
HistoryRequest *request) // OUT
{
int requiredKeys = GDP_HISTORY_REQUEST_REQUIRED_KEYS;
guint64 pastSeconds = 0;
+ request->topicPrefixes = NULL;
+
/*
* Loops over all keys of the root object
*/
requiredKeys--;
index++;
request->id = g_ascii_strtoull(json + tokens[index].start, NULL, 10);
+ } else if (GdpJsonIsTokenOfKey(json, &tokens[index],
+ GDP_HISTORY_REQUEST_TOPIC_PREFIXES) &&
+ tokens[index + 1].type == JSMN_ARRAY) {
+ int prefixCount;
+ int prefixIndex;
+
+ prefixCount = tokens[index + 1].size;
+ request->topicPrefixes = g_ptr_array_new_full(prefixCount,
+ GdpTopicPrefixFree);
+ index += 2;
+ for (prefixIndex = 0; prefixIndex < prefixCount; prefixIndex++) {
+ const jsmntok_t *token = &tokens[index + prefixIndex];
+ g_ptr_array_add(request->topicPrefixes,
+ Util_SafeStrndup(json + token->start,
+ token->end - token->start));
+ }
+
+ index += (prefixCount - 1);
}
}
return TRUE;
}
+ if (request->topicPrefixes != NULL) {
+ g_ptr_array_free(request->topicPrefixes, TRUE);
+ request->topicPrefixes = NULL;
+ }
return FALSE;
}
/*
*****************************************************************************
- * GdpTaskPushHistoryRequest --
+ * GdpTaskProcessHistoryRequest --
*
- * Pushes new history request to queue and resets history cache pointer.
+ * Validates new history request, pushes the request to queue and
+ * resets history cache pointer.
*
* @param[in,out] taskCtx The task context
- * @param[in] request New history request
+ * @param[in,out] request New history request
*
*****************************************************************************
*/
static void
-GdpTaskPushHistoryRequest(TaskContext *taskCtx, // IN/OUT
- HistoryRequest *request) // IN
+GdpTaskProcessHistoryRequest(TaskContext *taskCtx, // IN/OUT
+ HistoryRequest *request) // IN/OUT
{
HistoryRequest *requestCopy;
if (!GdpTaskIsHistoryCacheEnabled(taskCtx)) {
g_info("%s: History cache not enabled.\n", __FUNCTION__);
- return;
+ goto fail;
}
if (request->beginCacheTime >= request->endCacheTime) {
g_info("%s: Invalid history request.\n", __FUNCTION__);
- return;
+ goto fail;
}
if (request->beginCacheTime < 0) {
requestCopy->beginCacheTime = request->beginCacheTime;
requestCopy->endCacheTime = request->endCacheTime;
requestCopy->id = request->id;
+ requestCopy->topicPrefixes = request->topicPrefixes;
+ request->topicPrefixes = NULL;
/*
* Note: each request comes with a unique subscription ID.
* Resets history cache pointer.
*/
taskCtx->cache.currentLink = NULL;
+
+ return;
+
+fail:
+ if (request->topicPrefixes != NULL) {
+ g_ptr_array_free(request->topicPrefixes, TRUE);
+ request->topicPrefixes = NULL;
+ }
}
char buf[GDP_MAX_PACKET_LEN + 1]; // Adds a space for NULL
int bufLen = (int) sizeof buf - 1;
struct sockaddr_vm srcAddr;
+ unsigned int numTokens;
+ jsmntok_t *tokens;
jsmn_parser parser;
- jsmntok_t tokens[16]; // We expect no more than 16 tokens
int retVal;
Bool isPublishResult;
Bool isHistoryRequest;
buf[bufLen] = '\0';
jsmn_init(&parser);
- retVal = jsmn_parse(&parser, buf, bufLen,
- tokens, (unsigned int) ARRAYSIZE(tokens));
+
+ numTokens = GDP_TOKENS_PER_ALLOC;
+ tokens = Util_SafeMalloc(numTokens * sizeof *tokens);
+ while ((retVal = jsmn_parse(&parser, buf, bufLen, tokens, numTokens))
+ == JSMN_ERROR_NOMEM) {
+ numTokens += GDP_TOKENS_PER_ALLOC;
+ tokens = Util_SafeRealloc(tokens, numTokens * sizeof *tokens);
+ }
+
if (retVal < 0) {
g_info("%s: Error %d while parsing JSON:\n%s\n",
__FUNCTION__, retVal, buf);
+ free(tokens);
return;
}
*/
if (retVal < 1 || tokens[0].type != JSMN_OBJECT) {
g_info("%s: Invalid JSON:\n%s\n", __FUNCTION__, buf);
+ free(tokens);
return;
}
g_free(result.diagnosis);
} else if (isHistoryRequest) {
g_debug("%s: Received history request:\n%s\n", __FUNCTION__, buf);
- GdpTaskPushHistoryRequest(taskCtx, &request);
+ GdpTaskProcessHistoryRequest(taskCtx, &request);
} else {
g_info("%s: Unknown JSON:\n%s\n", __FUNCTION__, buf);
}
+
+ free(tokens);
}
g_queue_init(&taskCtx->cache.queue);
taskCtx->cache.sizeLimit = GdpGetHistoryCacheSizeLimit();
+ taskCtx->cache.countLimit = GdpGetHistoryCacheCountLimit();
taskCtx->cache.size = 0;
taskCtx->cache.currentLink = NULL;
* "application"
* @param[in] data Buffer containing data to publish
* @param[in] dataLen Buffer length
+ * @param[in] cacheData Cache the data if TRUE
*
* @return GDP_ERROR_SUCCESS on success.
* @return Other GdpError code otherwise.
const gchar *token, // IN, OPTIONAL
const gchar *category, // IN, OPTIONAL
const gchar *data, // IN
- guint32 dataLen) // IN
+ guint32 dataLen, // IN
+ gboolean cacheData) // IN
{
GdpError gdpErr;
gPublishState.category = category;
gPublishState.data = data;
gPublishState.dataLen = dataLen;
+ gPublishState.cacheData = cacheData;
GdpSetEvent(gPublishState.eventPublish);