]> git.ipfire.org Git - thirdparty/open-vm-tools.git/commitdiff
Change in Guest Data Producer plugin API and Guest Data Producer
authorKaty Feng <fkaty@vmware.com>
Thu, 16 Nov 2023 17:21:20 +0000 (09:21 -0800)
committerKaty Feng <fkaty@vmware.com>
Thu, 16 Nov 2023 17:21:20 +0000 (09:21 -0800)
protocol/handshake between the host and guest to support ‘no subscribers’
error feedback.

The Service Discovery plugin was updated for API compatibility.

Changes in the GDP protocol/handshake introduce versioning of the protocol.
The new version is “2” and introduces new attributes (‘version’ and
‘requireSubs’) in the guest request header sent to the host publisher service.
The response from the host publisher service to the guest producer client is
also versioned and changes more significantly: V2 response messages have new
attributes (‘version’, ‘error-id’, and ‘error-text’) and have the ‘status’
attributes type change from string (‘ok’, ‘bad’) to Boolean (‘true’, ‘false’).
The ‘error-id’, when present, contains a string identifier for an error
type/code and ‘error-text’, when present, contains textual details for the error.

The host publisher supports all protocol versions up to the version it provides
for backward and forward compatibility. The host publisher service responds with
the same version of protocol as the incoming request or its highest protocol
version when the incoming request version is higher than it supports.
The guest producer client supports all response versions up to the version it
provides for backward and forward compatibility. The guest producer client
always sends request to the host publisher service using the highest protocol
version it supports.

open-vm-tools/lib/include/vmware/tools/gdp.h
open-vm-tools/services/plugins/gdp/gdpPlugin.c
open-vm-tools/services/plugins/serviceDiscovery/serviceDiscovery.c

index c071cd5320c1216ecb1ee1996f38ea59c29ffd51..ec59e06b583818bb76b12b015f6373b5101ba436 100644 (file)
@@ -40,6 +40,31 @@ extern "C++" {
 
 #include "vmware/tools/plugin.h"
 
+/*
+ * GDP Protocol version
+ */
+#define GDP_PROTOCOL_VERSION 2
+
+/*
+ * GDP Protocol version before versioning was introduced.
+ */
+#define GDP_PROTOCOL_UNVERSIONED_VERSION 1
+
+/*
+ * First GDP Protocol version to support versioning.
+ */
+#define GDP_PROTOCOL_VERSIONED_VERSION 2
+
+/*
+ * Maximum GDP Data Message protocol version generated by GDP plugin.
+ */
+#define GDP_PROTOCOL_DM_MAX_VERSION GDP_PROTOCOL_VERSION
+
+/*
+ * Maximum GDP Data Message Response protocol version handled by GDP plugin.
+ */
+#define GDP_PROTOCOL_DM_RESP_MAX_VERSION GDP_PROTOCOL_VERSION
+
 /*
  * Size in bytes:
  * 17 * 4096 - Maximum VMCI datagram size
@@ -63,33 +88,48 @@ extern "C++" {
 
 /*
  * GdpError definitions.
+ * The GDP_ERR_ITEM Tuple is:
+ *   - GdpEnum name
+ *   - error-id string id
+ *   - Default error message string
  */
 #define GDP_ERR_LIST                                      \
    GDP_ERR_ITEM(GDP_ERROR_SUCCESS = 0,                    \
+                "success",                                \
                 "No error")                               \
    GDP_ERR_ITEM(GDP_ERROR_INVALID_DATA,                   \
+                "invalid-data",                           \
                 "Invalid data")                           \
    GDP_ERR_ITEM(GDP_ERROR_DATA_SIZE,                      \
+                "data-size",                              \
                 "Data size too large")                    \
    GDP_ERR_ITEM(GDP_ERROR_GENERAL,                        \
+                "error",                                  \
                 "General error")                          \
    GDP_ERR_ITEM(GDP_ERROR_STOP,                           \
+                "stopped-for-shutdown",                   \
                 "Stopped for vmtoolsd shutdown")          \
    GDP_ERR_ITEM(GDP_ERROR_UNREACH,                        \
+                "publisher-unreachable",                  \
                 "Host daemon unreachable")                \
    GDP_ERR_ITEM(GDP_ERROR_TIMEOUT,                        \
-                "Operation timed out")
+                "timeout",                                \
+                "Operation timed out")                    \
+   GDP_ERR_ITEM(GDP_ERROR_NO_SUBSCRIBERS,                 \
+                "no-subscribers",                         \
+                "No subscribers for data")
 
 /*
- * GdpError codes.
+ * GdpError codes enum.
  */
-#define GDP_ERR_ITEM(a, b) a,
+#define GDP_ERR_ITEM(a, b, c) a,
 typedef enum GdpError {
    GDP_ERR_LIST
    GDP_ERR_MAX
 } GdpError;
 #undef GDP_ERR_ITEM
 
+
 /**
  * @brief Type of the public interface of the gdp plugin service.
  *
@@ -103,7 +143,8 @@ typedef struct ToolsPluginSvcGdp {
                        const gchar *category,
                        const gchar *data,
                        guint32 dataLen,
-                       gboolean cacheData);
+                       gboolean cacheData,
+                       gboolean requireSubs);
 } ToolsPluginSvcGdp;
 
 
@@ -127,6 +168,7 @@ typedef struct ToolsPluginSvcGdp {
  * @param[in]          data        Buffer containing data to publish
  * @param[in]          dataLen     Buffer length
  * @param[in]          cacheData   Cache the data if TRUE
+ * @param[in]          requireSubs Require subscriber(s) if TRUE
  *
  * @return GDP_ERROR_SUCCESS on success.
  * @return Other GdpError code otherwise.
@@ -142,13 +184,14 @@ ToolsPluginSvcGdp_Publish(ToolsAppCtx *ctx,      // IN
                           const gchar *category, // IN, OPTIONAL
                           const gchar *data,     // IN
                           guint32 dataLen,       // IN
-                          gboolean cacheData)    // IN
+                          gboolean cacheData,    // IN
+                          gboolean requireSubs)  // IN
 {
    ToolsPluginSvcGdp *svcGdp = NULL;
    g_object_get(ctx->serviceObj, TOOLS_PLUGIN_SVC_PROP_GDP, &svcGdp, NULL);
    if (svcGdp != NULL && svcGdp->publish != NULL) {
       return svcGdp->publish(createTime, topic, token,
-                             category, data, dataLen, cacheData);
+                             category, data, dataLen, cacheData, requireSubs);
    }
    return GDP_ERROR_GENERAL;
 }
index 96e6d717bb96816a6bf8153ebfb2e9e3ef6b5a32..0e67ccac580c28e26c7c69a962791f20f068e2e2 100644 (file)
@@ -1,5 +1,5 @@
 /*********************************************************
- * Copyright (c) 2020-2021 VMware, Inc. All rights reserved.
+ * Copyright (c) 2020-2021,2023 VMware, Inc. All rights reserved.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU Lesser General Public License as published
@@ -118,13 +118,130 @@ VM_EMBED_VERSION(VMTOOLSD_VERSION_STRING);
  */
 #define GDP_WAIT_RESULT_TIMEOUT 1500 // ms
 
-#define GDP_PACKET_JSON_LINE_SUBSCRIBERS \
-   "      \"subscribers\":[%s],\n"
+/*
+ * GDP Protocol
+ *
+ * The Producer (Guest) to Publisher (Host) protocol specification and
+ * implementation aim to be backward compatible.
+ *
+ * Newer gdp plugin and clients using it must handle operation with hosts using
+ * older versions of the protocol and implementation.
+ *
+ * The GDP protocol messages supported (newest to oldest) are described below:
+ *
+ * V2 --- Data Message
+ * {
+ *    "header": {
+ *       "sequence":uint64,
+ *       "version":int,
+ *       "subscribers":[int-array],
+ *       "requireSubs":Boolean,
+ *       "createTime":"time-string",
+ *       "topic":"string",
+ *       "token":"string"
+ *    }
+ *    "payload": {
+ *       "category":"string",
+ *       "base64":"base64-encoded-string"
+ *    }
+ * }
+ * NOTES:
+ *   - Required attributes:
+ *       o In header: sequence, version (new in V2), createTime, topic, token
+ *       o In payload: category, base64
+ *   - Optional fields
+ *       o In header:
+ *           + subscribers: Present only in History Data Message.
+ *           + requireSubs: Present only when Data Producer requires subscriber
+ *                          to be present/subscribed (new in V2).
+ *
+ * V2 --- Response
+ * {
+ *    "sequence":uint64,
+ *    "version":int,
+ *    "status":Boolean,
+ *    "error-id":"id-string",
+ *    "error-text":"string",
+ *    "rateLimit":int
+ * }
+ * NOTES:
+ *   - Required attributes:
+ *       o sequence, status, rateLimit
+ *   - Required but optional for Backward compatibility.
+ *       o version: Required when the request header was version >= 2 AND the
+ *                  host gdp daemon supports versions >= 2.
+ *   - Optional fields:
+ *       o error-id: Required when version >= 2 AND status==false; else ignored.
+ *       o error-text: Same conditions as error-id.
+ *
+ * -------------------------------------
+ *
+ * Unversioned/original -- Data Message
+ * {
+ *    "header": {
+ *       "sequence":uint64,
+ *       "subscribers":[int-array],
+ *       "createTime":"time-string",
+ *       "topic":"string",
+ *       "token":"string"
+ *    }
+ *    "payload": {
+ *       "category":"string",
+ *       "base64":"base64-encoded-string"
+ *    }
+ * }
+ * NOTES:
+ *   - Required attributes:
+ *       o In header: sequence, createTime, topic, token
+ *       o In payload: category, base64
+ *   - Optional fields
+ *       o In header:
+ *           + subscribers: Present only in History Data Message.
+ *
+ * Unversioned/original -- Response
+ * {
+ *    "sequence":uint64,
+ *    "status":"string-{ok|bad}",
+ *    "diagnosis":"formatted-string",
+ *    "rateLimit":int
+ * }
+ * NOTES:
+ *   - Required attributes:
+ *       o sequence, status, rateLimit
+ *   - Optional attributes:
+ *       o diagnosis: Present when error (status is 'bad') or for diagnostics
+ *                    (debug only)
+ *   - The 'status' field type changes from 'string' in V1 to 'boolean' in V2.
+ *     This adds some complexity in format validation.
+ *
+ * -------------------------------------
+ * Backward Compatibility
+ *   Response parsing can handle all message version, required attributes
+ *   expectations must be met and rely on the provided version field (or the
+ *   default version) to evaluate if a valid response was received.
+ *
+ *   A V2 request can receive an unversioned ('V1') response from an old host
+ *   and must handle the format. The impact is some functionality will not be
+ *   available to the gdp plugin clients.
+ *
+ * Forward Compatibility
+ *   Unknown attributes are ignored.
+ *
+ *   A host will not send a response with an higher version field value than the
+ *   data message it is responding to. But if it did, the gdp plugin will ignore
+ *   unknown attributes.
+ *
+ *   When receiving a response with a version field value lower then what the
+ *   gdp plugin can support, this indicate the highest protocol version the host
+ *   supports.
+ */
 
 #define GDP_PACKET_JSON \
    "{\n" \
    "   \"header\": {\n" \
    "      \"sequence\":%" G_GUINT64_FORMAT ",\n" \
+   "      \"version\":%" G_GUINT64_FORMAT ",\n" \
+   "%s" \
    "%s" \
    "      \"createTime\":\"%s\",\n" \
    "      \"topic\":\"%s\",\n" \
@@ -136,17 +253,32 @@ VM_EMBED_VERSION(VMTOOLSD_VERSION_STRING);
    "   }\n" \
    "}"
 
+#define GDP_PACKET_JSON_LINE_SUBSCRIBERS \
+   "      \"subscribers\":[%s],\n"
+
+#define GDP_PACKET_JSON_LINE_REQUIRE_SUBS \
+   "      \"requireSubs\":%s,\n"
+
 #define GDP_RESULT_SEQUENCE   "sequence"  // Required, <uint64> e.g. 12345
-#define GDP_RESULT_STATUS     "status"    // Required,  "ok", or "bad" for
+#define GDP_RESULT_VERSION    "version"   // V2: Required, <uint> e.g. 2
+#define GDP_RESULT_STATUS     "status"    // V1: Required,  "ok", or "bad" for
                                           // malformed and rejected packet
+                                          // V2: Required, true, or false for
+                                          // malformed and rejected packet, and
+                                          // other errors (see error-id and
+                                          // error-text).
 #define GDP_RESULT_DIAGNOSIS  "diagnosis" // Optional
+#define GDP_RESULT_ERROR_ID   "error-id"  // V2: Required Error ID to map to
+                                          // GdpError enum.
+#define GDP_RESULT_ERROR_TEXT "error-text"// V2: Optional Provided error message.
 #define GDP_RESULT_RATE_LIMIT "rateLimit" // Required,  <int>
                                           // e.g. 2 packets per second
 
-#define GDP_RESULT_STATUS_OK  "ok"
-#define GDP_RESULT_STATUS_BAD "bad"
+#define GDP_RESULT_STATUS_OK  "ok"        // V1: possible value for status
+#define GDP_RESULT_STATUS_BAD "bad"       // V1: possible value for status
 
 #define GDP_RESULT_REQUIRED_KEYS 3
+#define GDP_RESULT_V2_REQUIRED_KEYS 4
 
 #define GDP_HISTORY_REQUEST_PAST_SECONDS   "pastSeconds"   // <uint64>,
                                                            // Required
@@ -155,7 +287,6 @@ VM_EMBED_VERSION(VMTOOLSD_VERSION_STRING);
                                                            // Required
 #define GDP_HISTORY_REQUEST_TOPIC_PREFIXES "topicPrefixes" // <String array>,
                                                            // Optional
-
 #define GDP_HISTORY_REQUEST_REQUIRED_KEYS 2
 
 /*
@@ -176,44 +307,64 @@ VM_EMBED_VERSION(VMTOOLSD_VERSION_STRING);
 
 #define GDP_TOKENS_PER_ALLOC 50
 
+
+/*
+ * Gdp protocol 'error-id' response attribute table.
+ * From GDP_ERR_ITEM tuple:
+ *   - GdpEnum name
+ *   - error-id string id
+ *   - Default error message string
+ */
+#define GDP_ERR_ITEM(a, b, c) b,
+static const char * const gdpErrIds[] = {
+GDP_ERR_LIST
+};
+#undef GDP_ERR_ITEM
+
 /*
  * GdpError message table.
+ * From GDP_ERR_ITEM tuple:
+ *   - GdpEnum name
+ *   - error-id string id
+ *   - Default error message string
  */
-#define GDP_ERR_ITEM(a, b) b,
+#define GDP_ERR_ITEM(a, b, c) c,
 static const char * const gdpErrMsgs[] = {
 GDP_ERR_LIST
 };
 #undef GDP_ERR_ITEM
 
+static GdpError GetGdpErrorFromErrorId(const char *erroIdStr); // IN
+
 
 typedef struct PluginState {
    ToolsAppCtx *ctx;       /* Tools application context */
 
    Atomic_Bool started;    /* TRUE : Guest data publishing is started
-                              FALSE: otherwise
-                              Transitions from FALSE to TRUE only */
+                            * FALSE: otherwise
+                            * Transitions from FALSE to TRUE only */
 
 #if defined(_WIN32)
    Bool wsaStarted;        /* TRUE : WSAStartup succeeded, WSACleanup required
-                              FALSE: otherwise */
+                            * FALSE: otherwise */
 #endif
    int vmciFd;             /* vSocket address family value fd */
    int vmciFamily;         /* vSocket address family value */
    SOCKET sock;            /* Datagram socket for publishing guest data */
 #if defined(_WIN32)
    GdpEvent eventNetwork;  /* The network event object:
-                              To be associated with network
-                              send/recv ready event */
+                            * To be associated with network
+                            * send/recv ready event */
 #endif
 
    GdpEvent eventStop;     /* The stop event object:
-                              Signalled to stop guest data publishing */
+                            * Signalled to stop guest data publishing */
    Atomic_Bool stopped;    /* TRUE : Guest data publishing is stopped
-                              FALSE: otherwise
-                              Transitions from FALSE to TRUE only */
+                            * FALSE: otherwise
+                            * Transitions from FALSE to TRUE only */
 
    GdpEvent eventConfig;   /* The config event object:
-                              Signalled to update config */
+                            * Signalled to update config */
 } PluginState;
 
 static PluginState gPluginState;
@@ -227,13 +378,14 @@ typedef struct PublishState {
     * the gdp task thread.
     */
    gint64 createTime; /* Real wall-clock time,
-                         in microseconds since January 1, 1970 UTC. */
+                       * in microseconds since January 1, 1970 UTC. */
    const gchar *topic;
    const gchar *token;
    const gchar *category;
    const gchar *data;
    guint32 dataLen;
    gboolean cacheData;
+   gboolean requireSubs; /* Subscriber presence required by publisher */
 
    /*
     * The publish event object:
@@ -267,14 +419,14 @@ typedef enum GdpTaskEvent {
 
 typedef enum GdpTaskMode {
    GDP_TASK_MODE_NONE,    /* Not publishing
-                             valid with GDP_TASK_STATE_IDLE only */
+                           * valid with GDP_TASK_STATE_IDLE only */
    GDP_TASK_MODE_PUBLISH, /* Publishing new data */
    GDP_TASK_MODE_HISTORY, /* Publishing history data */
 } GdpTaskMode;
 
 typedef enum GdpTaskState {
    GDP_TASK_STATE_IDLE,             /* Not started
-                                       valid with GDP_TASK_MODE_NONE only */
+                                     * valid with GDP_TASK_MODE_NONE only */
    GDP_TASK_STATE_WAIT_TO_SEND,     /* Wait to send JSON packet */
    GDP_TASK_STATE_WAIT_FOR_RESULT1, /* Wait for publish result from daemon */
    GDP_TASK_STATE_WAIT_FOR_RESULT2, /* Wait for publish result after re-send */
@@ -282,9 +434,14 @@ typedef enum GdpTaskState {
 
 typedef struct PublishResult {
    guint64 sequence; /* Result for the packet with this sequence number */
-   Bool statusOk;    /* TRUE: ok, FALSE: bad */
+   Bool statusOk;    /* TRUE: ok|true, FALSE: bad|false */
    gchar *diagnosis; /* Diagnosis message if statusOk is FALSE */
    gint32 rateLimit; /* VMCI peer rate limit */
+   guint64 version;  /* Response message protocol version, since v2 */
+   GdpError errorId; /* GdpError from error-id when statusOk is FALSE,
+                      * GDP_ERROR_SUCCESS otherwise */
+   gchar *errorText; /* error-text; can be provided when when statusOk is
+                      * FALSE; NULL otherwise */
 } PublishResult;
 
 typedef struct HistoryRequest {
@@ -301,6 +458,9 @@ typedef struct HistoryCacheItem {
    gchar *category;
    gchar *data;
    guint32 dataLen;   /* Guest data - end */
+   gboolean requireSubs; /* Publisher requires subscriber to publish Guest Data.
+                          * Requires a V2 (and up) protocol compatible host;
+                          * ignored otherwise */
    gint64 cacheTime;  /* Monotonic time point when item is cached */
    guint32 itemSize;  /* Item size in bytes */
 } HistoryCacheItem;
@@ -311,7 +471,7 @@ typedef struct HistoryCache {
    guint32 countLimit; /* Cache item count limit */
    guint32 size;       /* Current cache buffer size */
    GList *currentLink; /* Pointer to the history cache queue link
-                          currently being published */
+                        * currently being published */
 } HistoryCache;
 
 typedef struct TaskContext {
@@ -351,9 +511,9 @@ typedef struct TaskContext {
    guint32 packetLen;   /* JSON packet length */
 
    gint64 timeoutAt;    /* Time out at this monotonic time point,
-                           in microseconds. */
+                         * in microseconds. */
    gint64 sendAfter;    /* Send to daemon after this monotonic time point,
-                           in microseconds. */
+                         * in microseconds. */
 } TaskContext;
 
 
@@ -433,7 +593,8 @@ GdpTaskHistoryCachePushItem(TaskContext *taskCtx,  // IN/OUT
                             const gchar *token,    // IN
                             const gchar *category, // IN
                             const gchar *data,     // IN
-                            guint32 dataLen);      // IN
+                            guint32 dataLen,       // IN
+                            gboolean requireSubs); // IN
 
 static gchar *
 GdpGetFormattedUtcTime(gint64 utcTime); // IN
@@ -446,6 +607,7 @@ GdpTaskBuildPacket(TaskContext *taskCtx,      // IN/OUT
                    const gchar *category,     // IN, OPTIONAL
                    const gchar *data,         // IN
                    guint32 dataLen,           // IN
+                   gboolean requireSubs,      // IN
                    const gchar *subscribers); // IN, OPTIONAL
 
 static inline void
@@ -540,6 +702,38 @@ static void
 GdpDestroy(void);
 
 
+/*
+ ******************************************************************************
+ * GetGdpErrorFromErrorId --
+ *
+ * Translate an error-id string to a GdpError enumeration.
+ *
+ * @return The GdpError match
+ * @return GDP_ERR_MAX otherwise.
+ *
+ ******************************************************************************
+ */
+
+static GdpError
+GetGdpErrorFromErrorId(const char *errorIdStr) // IN
+{
+   if (errorIdStr != NULL && errorIdStr[0] != '\0') {
+
+      int i; // Err: 'for' loop initial declarations only allowed in C99 mode
+
+      for (i = 0; i < GDP_ERR_MAX; i++) {
+         if (strcmp(gdpErrIds[i], errorIdStr) == 0) {
+            // Match
+            return i;
+         }
+      }
+   }
+
+   // Return GDP_ERR_MAX for not found/invalid
+   return GDP_ERR_MAX;
+}
+
+
 /*
  ******************************************************************************
  * GdpCreateEvent --
@@ -1311,6 +1505,7 @@ GdpTaskDeleteHistoryCacheTail(TaskContext *taskCtx) // IN/OUT
  *                                 "application"
  * @param[in]          data        Buffer containing data to publish
  * @param[in]          dataLen     Buffer length
+ * @param[in]          requireSubs Subscriber required flag
  *
  ******************************************************************************
  */
@@ -1322,7 +1517,8 @@ GdpTaskHistoryCachePushItem(TaskContext *taskCtx,  // IN/OUT
                             const gchar *token,    // IN
                             const gchar *category, // IN
                             const gchar *data,     // IN
-                            guint32 dataLen)       // IN
+                            guint32 dataLen,       // IN
+                            gboolean requireSubs)  // IN
 {
    HistoryCacheItem *item;
 
@@ -1338,6 +1534,7 @@ GdpTaskHistoryCachePushItem(TaskContext *taskCtx,  // IN/OUT
    item->data = (gchar *) Util_SafeMalloc(dataLen);
    Util_Memcpy(item->data, data, dataLen);
    item->dataLen = dataLen;
+   item->requireSubs = requireSubs;
 
    item->cacheTime = g_get_monotonic_time();
    item->itemSize = ((guint32) sizeof(HistoryCacheItem))
@@ -1420,6 +1617,10 @@ GdpGetFormattedUtcTime(gint64 utcTime) // IN
  *                                  "application"
  * @param[in]          data         Buffer containing data to publish
  * @param[in]          dataLen      Buffer length
+ * @param[in]          requireSubs  Indicate whether the publisher requires or
+ *                                  not that subscribers be present. Expected to
+ *                                  be false for history data. Ignored by pre-V2
+ *                                  protocol host.
  * @param[in,optional] subscribers  For history data only, NULL for new data
  *
  * @return GDP_ERROR_SUCCESS on success.
@@ -1436,10 +1637,12 @@ GdpTaskBuildPacket(TaskContext *taskCtx,     // IN/OUT
                    const gchar *category,    // IN, OPTIONAL
                    const gchar *data,        // IN
                    guint32 dataLen,          // IN
+                   const gboolean requireSubs, // IN
                    const gchar *subscribers) // IN, OPTIONAL
 {
    gchar base64Data[GDP_MAX_PACKET_LEN + 1]; // Add a space for NULL
    gchar *subscribersLine = NULL;
+   gchar *subscribersRequiredLine = NULL;
    gchar *formattedTime;
    GdpError gdpErr;
 
@@ -1457,13 +1660,24 @@ GdpTaskBuildPacket(TaskContext *taskCtx,     // IN/OUT
                                         subscribers);
    }
 
+   // subscribers required is optional in (V2).
+   // since: GDP_PROTOCOL_VERSIONED_VERSION
+   if (requireSubs) {
+      subscribersRequiredLine =
+         g_strdup_printf(GDP_PACKET_JSON_LINE_REQUIRE_SUBS,
+                         "true");
+   }
+
    formattedTime = GdpGetFormattedUtcTime(createTime);
 
    ASSERT(taskCtx->packet == NULL);
    taskCtx->packet = g_strdup_printf(GDP_PACKET_JSON,
                                      ++taskCtx->sequence,
+                                     (uint64) GDP_PROTOCOL_VERSION,
                                      subscribersLine != NULL ?
                                         subscribersLine : "",
+                                     subscribersRequiredLine != NULL ?
+                                        subscribersRequiredLine : "",
                                      formattedTime != NULL ?
                                         formattedTime : "",
                                      topic,
@@ -1484,6 +1698,7 @@ GdpTaskBuildPacket(TaskContext *taskCtx,     // IN/OUT
 
    g_free(formattedTime);
    g_free(subscribersLine);
+   g_free(subscribersRequiredLine);
    return gdpErr;
 }
 
@@ -1748,6 +1963,7 @@ GdpTaskPublishHistory(TaskContext *taskCtx) // IN/OUT
                                item->category,
                                item->data,
                                item->dataLen,
+                               item->requireSubs,
                                subscribers);
    if (gdpErr != GDP_ERROR_SUCCESS) {
       /*
@@ -1881,8 +2097,12 @@ GdpJsonIsPublishResult(const char *json,        // IN
                        PublishResult *result)   // OUT
 {
    int index;
-   int requiredKeys = GDP_RESULT_REQUIRED_KEYS;
+   int requiredKeysCount = 0;
    gchar *diagnosis = NULL;
+   Bool handledRateLimit = FALSE;
+   Bool handledStatus = FALSE;
+
+   result->errorId = GDP_ERR_MAX; /* result usually zeroed in caller */
 
    /*
     * Loops over all keys of the root object.
@@ -1891,41 +2111,117 @@ GdpJsonIsPublishResult(const char *json,        // IN
       int tokenLen;
 
       if (GdpJsonIsTokenOfKey(json, &tokens[index], GDP_RESULT_SEQUENCE)) {
-         requiredKeys--;
+         ASSERT(result->sequence == 0); /* duplicate check */
+         requiredKeysCount++;
          index++;
          result->sequence = g_ascii_strtoull(json + tokens[index].start,
                                              NULL, 10);
+         ASSERT(result->sequence != 0); /* no! */
+      } else if (GdpJsonIsTokenOfKey(json, &tokens[index],
+                                     GDP_RESULT_VERSION)) {
+         ASSERT(result->version == 0); /* duplicate check */
+         requiredKeysCount++;
+         index++;
+         result->version = g_ascii_strtoull(json + tokens[index].start,
+                                            NULL, 10);
+         ASSERT(result->version > 0); /* version == 0 is bad */
       } else if (GdpJsonIsTokenOfKey(json, &tokens[index],
                                      GDP_RESULT_STATUS)) {
-         requiredKeys--;
+         ASSERT(handledStatus == FALSE); /* duplicate check */
+         handledStatus = TRUE;
+         requiredKeysCount++;
          index++;
          tokenLen = tokens[index].end - tokens[index].start;
+         /*
+          * V1 status: 'ok' or V2 status: 'true'
+          */
          if ((int) strlen(GDP_RESULT_STATUS_OK) == tokenLen &&
              strncmp(json + tokens[index].start,
                      GDP_RESULT_STATUS_OK, tokenLen) == 0) {
+            /*
+             * Success V1
+             */
+            result->statusOk = TRUE;
+         } else if (4 == tokenLen &&
+                    strncmp(json + tokens[index].start,
+                            "true", tokenLen) == 0) {
+            /*
+             * Success V2
+             */
             result->statusOk = TRUE;
          } else {
+            /*
+             * Anything else is error response
+             */
             result->statusOk = FALSE;
          }
       } else if (GdpJsonIsTokenOfKey(json, &tokens[index],
                                      GDP_RESULT_DIAGNOSIS)) {
+         ASSERT(diagnosis == NULL); /* duplicate check */
          index++;
-         ASSERT(diagnosis == NULL);
          tokenLen = tokens[index].end - tokens[index].start;
          diagnosis = g_strndup(json + tokens[index].start, tokenLen);
       } else if (GdpJsonIsTokenOfKey(json, &tokens[index],
                                      GDP_RESULT_RATE_LIMIT)) {
-         requiredKeys--;
+         ASSERT(handledRateLimit == FALSE); /* duplicate check */
+         handledRateLimit = TRUE;
+         requiredKeysCount++;
          index++;
          result->rateLimit = atoi(json + tokens[index].start);
+      } else if (GdpJsonIsTokenOfKey(json, &tokens[index],
+                                     GDP_RESULT_ERROR_ID)) {
+         gchar *errorIdStr = NULL;
+         ASSERT(result->errorId == GDP_ERR_MAX); /* duplicate check */
+         /*
+          * Forward compatible error-id lookup, set to a valid error
+          */
+         index++;
+         tokenLen = tokens[index].end - tokens[index].start;
+         errorIdStr = g_strndup(json + tokens[index].start, tokenLen);
+         result->errorId = GetGdpErrorFromErrorId(errorIdStr);
+         if (result->errorId == GDP_ERR_MAX) {
+            /*
+             * Unknown error-id error
+             */
+            g_warning("%s: Unknown error-id: '%s' converting to '%s'",
+                      __FUNCTION__, errorIdStr, gdpErrIds[GDP_ERROR_GENERAL]);
+            result->errorId = GDP_ERROR_GENERAL;
+         }
+         g_free(errorIdStr);
+      } else if (GdpJsonIsTokenOfKey(json, &tokens[index],
+                                     GDP_RESULT_ERROR_TEXT)) {
+         ASSERT(result->errorText == NULL); /* duplicate check */
+         index++;
+         tokenLen = tokens[index].end - tokens[index].start;
+         result->errorText = g_strndup(json + tokens[index].start, tokenLen);
       }
-   }
+   } /* for index */
 
-   if (requiredKeys == 0) {
+   /*
+    * Make sure requiredKeysCount for Unversioned or versioned protocol matches
+    * their respective expectations.
+    */
+   if (result->version < GDP_PROTOCOL_VERSIONED_VERSION &&
+       requiredKeysCount == GDP_RESULT_REQUIRED_KEYS) {
+      /*
+       * Assign diagnosis for unversioned protocol
+       */
       result->diagnosis = diagnosis;
       return TRUE;
+   } else if (result->version >= GDP_PROTOCOL_VERSIONED_VERSION &&
+              requiredKeysCount == GDP_RESULT_V2_REQUIRED_KEYS) {
+      /*
+       * diagnosis not used for versioned protocol, clean it up;
+       * error-text is assigned during parsing when present.
+       */
+      g_free(diagnosis);
+      return TRUE;
    } else {
       g_free(diagnosis);
+      if (result->errorText != NULL) {
+         g_free(result->errorText);
+         result->errorText = NULL;
+      }
       return FALSE;
    }
 }
@@ -1964,28 +2260,49 @@ GdpTaskProcessPublishResult(TaskContext *taskCtx,        // IN/OUT
    }
 
    if (!result->statusOk) {
-      g_info("%s: Publish failed: %s\n", __FUNCTION__,
-             result->diagnosis ? result->diagnosis : "");
+      if (result->version >= GDP_PROTOCOL_VERSIONED_VERSION) {
+         // V2 and up; use result->errorId and errorText
+         g_info("%s: Publish failed: Id(%d), Message: %s\n", __FUNCTION__,
+                result->errorId, result->errorText ? result->errorText : "");
+      } else {
+         g_info("%s: Publish failed: %s\n", __FUNCTION__,
+                result->diagnosis ? result->diagnosis : "");
+      }
    }
 
    if (taskCtx->mode == GDP_TASK_MODE_PUBLISH) {
-      if (result->statusOk) {
-         if (GdpTaskIsHistoryCacheEnabled(taskCtx) &&
-             gPublishState.cacheData) {
-            GdpTaskHistoryCachePushItem(taskCtx,
-                                        gPublishState.createTime,
-                                        gPublishState.topic,
-                                        gPublishState.token,
-                                        gPublishState.category,
-                                        gPublishState.data,
-                                        gPublishState.dataLen);
-         }
+      Bool addToHistory = FALSE;
 
+      if (result->statusOk) {
          gPublishState.gdpErr = GDP_ERROR_SUCCESS;
+         addToHistory = TRUE;
+      } else if (result->version >= GDP_PROTOCOL_VERSIONED_VERSION) {
+         // V2 and up; use result->errorId
+         gPublishState.gdpErr = result->errorId;
+
+         if (gPublishState.requireSubs &&
+             result->errorId == GDP_ERROR_NO_SUBSCRIBERS) {
+            // Add Data Message to history on no-subscriber error.
+            addToHistory = TRUE;
+         }
       } else {
+         // Unversioned/original - default error response.
          gPublishState.gdpErr = GDP_ERROR_INVALID_DATA;
       }
 
+      if (addToHistory &&
+          GdpTaskIsHistoryCacheEnabled(taskCtx) &&
+          gPublishState.cacheData) {
+         GdpTaskHistoryCachePushItem(taskCtx,
+                                     gPublishState.createTime,
+                                     gPublishState.topic,
+                                     gPublishState.token,
+                                     gPublishState.category,
+                                     gPublishState.data,
+                                     gPublishState.dataLen,
+                                     gPublishState.requireSubs);
+      }
+
       GdpSetEvent(gPublishState.eventGetResult);
    }
 
@@ -2233,6 +2550,7 @@ GdpTaskProcessNetwork(TaskContext *taskCtx) // IN/OUT
    if (isPublishResult) {
       GdpTaskProcessPublishResult(taskCtx, &result);
       g_free(result.diagnosis);
+      g_free(result.errorText);
    } else if (isHistoryRequest) {
       g_debug("%s: Received history request:\n%s\n", __FUNCTION__, buf);
       GdpTaskProcessHistoryRequest(taskCtx, &request);
@@ -2280,6 +2598,7 @@ GdpTaskProcessPublish(TaskContext *taskCtx) // IN/OUT
                                gPublishState.category,
                                gPublishState.data,
                                gPublishState.dataLen,
+                               gPublishState.requireSubs,
                                NULL);
    if (gdpErr != GDP_ERROR_SUCCESS) {
       goto fail;
@@ -3041,6 +3360,7 @@ GdpDestroy(void)
  * @param[in]          data        Buffer containing data to publish
  * @param[in]          dataLen     Buffer length
  * @param[in]          cacheData   Cache the data if TRUE
+ * @param[in]          requireSubs Require subscribers if TRUE
  *
  * @return GDP_ERROR_SUCCESS on success.
  * @return Other GdpError code otherwise.
@@ -3055,7 +3375,8 @@ GdpPublish(gint64 createTime,     // IN
            const gchar *category, // IN, OPTIONAL
            const gchar *data,     // IN
            guint32 dataLen,       // IN
-           gboolean cacheData)    // IN
+           gboolean cacheData,    // IN
+           gboolean requireSubs)  // IN
 {
    GdpError gdpErr;
 
@@ -3099,6 +3420,7 @@ GdpPublish(gint64 createTime,     // IN
    gPublishState.data = data;
    gPublishState.dataLen = dataLen;
    gPublishState.cacheData = cacheData;
+   gPublishState.requireSubs = requireSubs;
 
    GdpSetEvent(gPublishState.eventPublish);
 
index 103cf14ee714f5d8f1fb088c19df115f5468f04a..e35f1c00800372b360818d8dca7bb212a44964fe 100644 (file)
@@ -1,5 +1,5 @@
 /*********************************************************
- * Copyright (C) 2020-2021,2023 VMware, Inc. All rights reserved.
+ * Copyright (c) 2020-2021,2023 VMware, Inc. All rights reserved.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU Lesser General Public License as published
@@ -105,7 +105,7 @@ static gchar* scriptInstallDir = NULL;
 #define SERVICE_DISCOVERY_POLL_INTERVAL 300000
 
 /*
- * Time shift for comparision of time read from the signal and
+ * Time shift for comparison of time read from the signal and
  * current system time in milliseconds.
  */
 #define SERVICE_DISCOVERY_WRITE_DELTA 60000
@@ -119,23 +119,31 @@ static gchar* scriptInstallDir = NULL;
  * Defines the configuration to cache data in gdp plugin
  */
 #define CONFNAME_SERVICEDISCOVERY_CACHEDATA "cache-data"
-
 #define SERVICE_DISCOVERY_CONF_DEFAULT_CACHEDATA TRUE
 
+/*
+ * Define the configuration to require at least one subscriber subscribed for
+ * the gdp message.
+ *
+ * TODO: SD maintainer to update default to TRUE when ready.
+ */
+#define CONFNAME_SERVICEDISCOVERY_REQUIRESUBS "require-subscribers"
+#define SERVICE_DISCOVERY_CONF_DEFAULT_REQUIRESUBS FALSE
+
 #define SERVICE_DISCOVERY_TOPIC_PREFIX "serviceDiscovery"
 
 #if defined(VMX86_DEBUG)
 /*
  * Defines the configuration to identify whether is in GDP debug mode
  *
- * Tools daemon restart is required to apply this setting's cahnge
+ * Tools daemon restart is required to apply this setting's change
  */
 #define CONFNAME_SERVICEDISCOVERY_GDP_DEBUG "gdp-debug"
 
 /*
  * Defines the configuration to customize polling interval for GDP debug
  *
- * Tools daemon restart is required to apply this setting's cahnge
+ * Tools daemon restart is required to apply this setting's change
  */
 #define CONFNAME_SERVICEDISCOVERY_GDP_POLL_INTERVAL "poll-interval"
 
@@ -159,8 +167,12 @@ static Bool isGDPDebug = FALSE;
 
 /*
  * GdpError message table.
+ * From GDP_ERR_ITEM tuple:
+ *   - GdpEnum name
+ *   - error-id string id
+ *   - Default error message string
  */
-#define GDP_ERR_ITEM(a, b) b,
+#define GDP_ERR_ITEM(a, b, c) c,
 static const char * const gdpErrMsgs[] = {
 GDP_ERR_LIST
 };
@@ -317,7 +329,7 @@ SendRpcMessage(ToolsAppCtx *ctx,
  * @param[in] createTime  Data create time
  * @param[in] topic       Data topic
  * @param[in] data        Service data
- * @param[in] len         Service data len
+ * @param[in] len         Service data length
  *
  * @retval TRUE  On success.
  * @retval FALSE Failed.
@@ -334,22 +346,30 @@ SendData(ToolsAppCtx *ctx,
 {
    GdpError gdpErr;
    Bool status = FALSE;
-   Bool cacheData = VMTools_ConfigGetBoolean(ctx->config,
-                                             CONFGROUPNAME_SERVICEDISCOVERY,
-                                             CONFNAME_SERVICEDISCOVERY_CACHEDATA,
-                                             SERVICE_DISCOVERY_CONF_DEFAULT_CACHEDATA);
+   Bool cacheData = VMTools_ConfigGetBoolean(
+                       ctx->config,
+                       CONFGROUPNAME_SERVICEDISCOVERY,
+                       CONFNAME_SERVICEDISCOVERY_CACHEDATA,
+                       SERVICE_DISCOVERY_CONF_DEFAULT_CACHEDATA);
+   Bool requireSubs = VMTools_ConfigGetBoolean(
+                         ctx->config,
+                         CONFGROUPNAME_SERVICEDISCOVERY,
+                         CONFNAME_SERVICEDISCOVERY_REQUIRESUBS,
+                         SERVICE_DISCOVERY_CONF_DEFAULT_REQUIRESUBS);
 
    gdpErr = ToolsPluginSvcGdp_Publish(ctx,
                                       createTime,
                                       topic,
-                                      NULL,
-                                      NULL,
+                                      NULL, /* token (optional) */
+                                      NULL, /* category (optional) */
                                       data,
                                       len,
-                                      cacheData);
+                                      cacheData,
+                                      requireSubs);
    if (gdpErr != GDP_ERROR_SUCCESS) {
       g_info("%s: ToolsPluginSvcGdp_Publish error: %s\n",
              __FUNCTION__, gdpErrMsgs[gdpErr]);
+      /* NOTE to SD maintainer: gdpErr == GDP_ERROR_NO_SUBSCRIBERS to be handled here when ready*/
       if (gdpErr == GDP_ERROR_STOP ||
           gdpErr == GDP_ERROR_UNREACH ||
           gdpErr == GDP_ERROR_TIMEOUT) {
@@ -368,7 +388,7 @@ SendData(ToolsAppCtx *ctx,
  *
  * A wrapper of C runtime library fread() with almost same signature except
  * the item size is always 1 byte. It ensures that when the returned number
- * of bytes is less than the input buffer size in bytes, an error has occured
+ * of bytes is less than the input buffer size in bytes, an error has occurred
  * or the end of the file is encountered.
  *
  * @param [out] buf     Pointer to a block of memory with a size of at least
@@ -922,9 +942,8 @@ ServiceDiscoveryTask(ToolsAppCtx *ctx,
    if (isNDBWriteReady) {
       gint64 previousWriteTime = gLastWriteTime;
 
-
       /*
-       * We are going to write to Namespace DB, update glastWriteTime
+       * We are going to write to Namespace DB, update gLastWriteTime
        */
       gLastWriteTime = GetGuestTimeInMillis();
 
@@ -960,6 +979,7 @@ ServiceDiscoveryTask(ToolsAppCtx *ctx,
          }
       }
    }
+
    if (isGDPWriteReady && !gSkipThisTask) {
       gchar* readyData = g_strdup_printf("%"FMTSZ"u", readBytesPerCycle);
       g_debug("%s: Sending ready flag with number of read bytes :%s\n",
@@ -998,7 +1018,8 @@ ServiceDiscoveryTask(ToolsAppCtx *ctx,
  * has elapsed since the last write operation.
  *
  * @param[in] ctx           The application context.
- * @param[in] signalKey     Signal key to check the write redinness of Namespace DB or gdp.
+ * @param[in] signalKey     Signal key to check the write readiness of
+ *                          Namespace DB or gdp.
  *
  * @retval TRUE  Execute scripts and write service data to Namespace DB or gdp
  * @retval FALSE Omit this cycle wihtout any script running.