/*********************************************************
- * Copyright (C) 2020 VMware, Inc. All rights reserved.
+ * Copyright (C) 2020-2021 VMware, Inc. All rights reserved.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* serviceDiscovery.c --
*
* Captures the information about services inside the guest
- * and writes it to Namespace DB.
+ * and writes it to either host-side gdp daemon or Namespace DB
*/
#include <string.h>
#define NSDB_PRIV_GET_VALUES_CMD "namespace-priv-get-values"
#define NSDB_PRIV_SET_KEYS_CMD "namespace-priv-set-keys"
-
#if defined (_WIN32)
#define SCRIPT_EXTN ".bat"
*/
#define SERVICE_DISCOVERY_SCRIPT_PERFORMANCE_METRICS \
"get-listening-process-perf-metrics" SCRIPT_EXTN
+
+#define _get_errno(p) (*p = errno)
+
#endif
/*
*/
#define SERVICE_DISCOVERY_RPC_WAIT_TIME 100
+/*
+ * Defines the configuration to cache data in gdp plugin
+ */
+#define CONFNAME_SERVICEDISCOVERY_CACHEDATA "cache-data"
+
+#define SERVICE_DISCOVERY_CONF_DEFAULT_CACHEDATA TRUE
+
+#define SERVICE_DISCOVERY_TOPIC_PREFIX "serviceDiscovery"
+
+
/*
* Maximum number of keys that can be deleted by one operation
*/
#define SERVICE_DISCOVERY_DELETE_CHUNK_SIZE 25
+/*
+ * GdpError message table.
+ */
+#define GDP_ERR_ITEM(a, b) b,
+static const char * const gdpErrMsgs[] = {
+GDP_ERR_LIST
+};
+#undef GDP_ERR_ITEM
typedef struct {
gchar *keyName;
static GArray *gFullPaths = NULL;
static volatile Bool gTaskSubmitted = FALSE;
+static size_t readBytesPerCycle = 0;
+static size_t cycle = 0;
+static Bool isGDPWriteReady = TRUE;
+static Bool isNDBWriteReady = TRUE;
+
+static Bool gSkipThisTask = FALSE; // Skip this task on some gdp errors.
/*
*****************************************************************************
return status;
}
+/*
+ *****************************************************************************
+ * SendData --
+ *
+ * Sends guest data to host-side gdp daemon.
+ *
+ * @param[in] ctx The application context
+ * @param[in] createTime Data create time
+ * @param[in] topic Data topic
+ * @param[in] data Service data
+ * @param[in] len Service data len
+ *
+ * @retval TRUE On success.
+ * @retval FALSE Failed.
+ *
+ *****************************************************************************
+ */
+
+Bool
+SendData(ToolsAppCtx *ctx,
+ gint64 createTime,
+ const char *topic,
+ const char *data,
+ const int len)
+{
+ GdpError gdpErr;
+ Bool status = FALSE;
+ Bool cacheData = VMTools_ConfigGetBoolean(ctx->config,
+ CONFGROUPNAME_SERVICEDISCOVERY,
+ CONFNAME_SERVICEDISCOVERY_CACHEDATA,
+ SERVICE_DISCOVERY_CONF_DEFAULT_CACHEDATA);
+
+ gdpErr = ToolsPluginSvcGdp_Publish(ctx,
+ createTime,
+ topic,
+ NULL,
+ NULL,
+ data,
+ len,
+ cacheData);
+ if (gdpErr != GDP_ERROR_SUCCESS) {
+ g_info("%s: ToolsPluginSvcGdp_Publish error: %s\n",
+ __FUNCTION__, gdpErrMsgs[gdpErr]);
+ if (gdpErr == GDP_ERROR_STOP ||
+ gdpErr == GDP_ERROR_UNREACH ||
+ gdpErr == GDP_ERROR_TIMEOUT) {
+ gSkipThisTask = TRUE;
+ }
+ } else {
+ status = TRUE;
+ }
+
+ return status;
+}
+
+/*
+ *****************************************************************************
+ * fread_safe --
+ *
+ * A wrapper of C runtime library fread() with almost same signature except
+ * the item size is always 1 byte. It ensures that when the returned number
+ * of bytes is less than the input buffer size in bytes, an error has occured
+ * or the end of the file is encountered.
+ *
+ * @param [out] buf Pointer to a block of memory with a size of at least
+ * (size) bytes, converted to a void*.
+ * @param [in] size Size, in bytes, of each element to be read.
+ * @param [in] stream Pointer to a FILE object that specifies an input stream.
+ * @param [out] eof Indicates whether end of file is reached.
+ *
+ * @retval The total number of elements successfully read is returned.
+ *
+ *****************************************************************************
+ */
+
+static size_t
+fread_safe(void *buf,
+ size_t size,
+ FILE *stream,
+ Bool *eof)
+{
+ size_t readBytes = 0;
+
+ while (readBytes < size) {
+ size_t localReadBytes;
+
+ /*
+ * fread is a blocking call.
+ */
+ localReadBytes = fread((char *)buf + readBytes, 1,
+ size - readBytes, stream);
+
+ if (ferror(stream)) {
+ int error_code = 0;
+ _get_errno(&error_code);
+ g_info("%s: fread returned %"FMTSZ"u with errno=%d\n",
+ __FUNCTION__, localReadBytes, error_code);
+ break;
+ }
+
+ readBytes += localReadBytes;
+
+ if (feof(stream)) {
+ g_debug("%s: fread reached end of file\n",
+ __FUNCTION__);
+ *eof = TRUE;
+ break;
+ }
+ }
+
+ return readBytes;
+}
/*
*****************************************************************************
return status;
}
+/*
+ *****************************************************************************
+ * DepleteReadFromStream --
+ *
+ * Reads from stream and appends to the output dynamic buffer
+ *
+ * @param[in] s Stream
+ * @param[out] out Output buffer
+ *
+ *****************************************************************************
+ */
+
+void
+DepleteReadFromStream(FILE *s,
+ DynBuf *out)
+{
+ for (;;) {
+ size_t readBytes;
+ char buf[SERVICE_DISCOVERY_VALUE_MAX_SIZE];
+
+ readBytes = fread(buf, 1, sizeof(buf), s);
+
+ g_debug("%s: readBytes = %"FMTSZ"u\n", __FUNCTION__, readBytes);
+
+ if (readBytes > 0) {
+ DynBuf_Append(out, buf, readBytes);
+ }
+
+ if (readBytes < sizeof(buf)) {
+ break;
+ }
+ }
+}
+
/*
*****************************************************************************
return status;
}
+/*
+ *****************************************************************************
+ * SendScriptOutput --
+ *
+ * Reads script child process stdout stream, sends output to
+ * host-side gdp daemon and/or namespace DB.
+ *
+ * Stream output data are cut into chunks with chunk size of 16K for namespace
+ * DB and 48K for gdp daemon. If there are multiple chunks of data, each chunk
+ * is sent to gdp daemon/namespace db separately with its chunk number in the
+ * topic.
+ *
+ * @param[in] ctx The application context
+ * @param[in] key Script name
+ * @param[in] childStdout Stream to read child process stdout
+ *
+ * @retval TRUE Successfully sent output.
+ * @retval FALSE Otherwise.
+ *
+ *****************************************************************************
+ */
+
+Bool
+SendScriptOutput(ToolsAppCtx *ctx,
+ const char *key,
+ FILE* childStdout)
+{
+ Bool status = TRUE;
+ Bool gdp_status = TRUE;
+ int i = 0;
+ size_t totalReadBytes = 0;
+ gint64 createTime = g_get_real_time();
+ size_t ndbBufSize = SERVICE_DISCOVERY_VALUE_MAX_SIZE * sizeof(char);
+ for (;;) {
+ size_t readBytes;
+ char buf[GDP_USER_DATA_LEN];
+ Bool eof = FALSE;
+ readBytes = fread_safe(buf, sizeof(buf), childStdout, &eof);
+
+ totalReadBytes += readBytes;
+ g_debug("%s: DB readBytes = %"FMTSZ"u\n", __FUNCTION__,
+ readBytes);
+ if (isGDPWriteReady && gdp_status && readBytes > 0) {
+ g_debug("%s:%s Write to GDP readBytes = %"FMTSZ"u\n",
+ __FUNCTION__, key, readBytes);
+ gchar* topic;
+
+ if (eof || readBytes < sizeof(buf)) {
+ topic = g_strdup_printf(SERVICE_DISCOVERY_TOPIC_PREFIX ".%s.%"FMTSZ
+ "u.%"FMTSZ"u", key, cycle, totalReadBytes);
+ } else {
+ topic = g_strdup_printf(SERVICE_DISCOVERY_TOPIC_PREFIX ".%s.%"FMTSZ
+ "u", key, cycle);
+ }
+ gdp_status = SendData(ctx, createTime, topic, buf, (int)readBytes);
+ readBytesPerCycle += readBytes;
+ g_free(topic);
+ }
+
+ if (isNDBWriteReady) {
+ size_t ndbReadBytes = 0;
+ size_t j;
+ for (j = 0; j < readBytes; j += ndbReadBytes) {
+ if (j + ndbBufSize > readBytes) {
+ ndbReadBytes = readBytes - j;
+ } else {
+ ndbReadBytes = ndbBufSize;
+ }
+ if (status && ndbReadBytes > 0) {
+ g_debug("%s:%s Write to Namespace DB readBytes = %"FMTSZ"u\n",
+ __FUNCTION__, key, ndbReadBytes);
+
+ gchar* msg = g_strdup_printf("%s-%d", key, ++i);
+ status = WriteData(ctx, msg, buf + j, ndbReadBytes);
+ if (!status) {
+ g_warning("%s: Failed to store data\n", __FUNCTION__);
+ }
+ g_free(msg);
+ }
+ }
+ }
+
+ /*
+ * Exit the loop only after childStdout is not readable any more.
+ * Otherwise, the child process may be blocked in writing its stdout
+ * and hang.
+ */
+ if (eof || readBytes < sizeof(buf)) {
+ break;
+ }
+
+ }
+
+ if (isNDBWriteReady && status) {
+ gchar *chunkCount = g_strdup_printf("%d", i);
+ status = WriteData(ctx, key, chunkCount, strlen(chunkCount));
+ if (status) {
+ g_debug("%s: Written key %s chunks %s\n", __FUNCTION__, key, chunkCount);
+ }
+ g_free(chunkCount);
+ }
+
+ return status && gdp_status;
+}
/*
*****************************************************************************
ServiceDiscoveryTask(ToolsAppCtx *ctx,
void *data)
{
- Bool status = FALSE;
int i;
- gint64 previousWriteTime = gLastWriteTime;
-
gTaskSubmitted = TRUE;
+ Bool status = FALSE;
+ if (isGDPWriteReady) {
+ gSkipThisTask = FALSE;
+ }
+ if (isNDBWriteReady) {
+ gint64 previousWriteTime = gLastWriteTime;
- /*
- * We are going to write to Namespace DB, update glastWriteTime
- */
- gLastWriteTime = GetGuestTimeInMillis();
- /*
- * Reset "ready" flag to stop readers until all data is written
- */
- status = WriteData(ctx, SERVICE_DISCOVERY_KEY_READY, "FALSE", 5);
- if (!status) {
- gLastWriteTime = previousWriteTime;
- g_warning("%s: Failed to reset %s flag", __FUNCTION__,
- SERVICE_DISCOVERY_KEY_READY);
- goto out;
- }
+ /*
+ * We are going to write to Namespace DB, update glastWriteTime
+ */
+ gLastWriteTime = GetGuestTimeInMillis();
- /*
- * Remove chunks written to DB in the previous iteration
- */
- CleanupNamespaceDB(ctx);
+ /*
+ * Reset "ready" flag to stop readers until all data is written
+ */
+ status = WriteData(ctx, SERVICE_DISCOVERY_KEY_READY, "FALSE", 5);
+ if (!status) {
+ gLastWriteTime = previousWriteTime;
+ g_warning("%s: Failed to reset %s flag", __FUNCTION__,
+ SERVICE_DISCOVERY_KEY_READY);
+ gTaskSubmitted = FALSE;
+ if (!isGDPWriteReady) {
+ return;
+ }
+ }
+
+ /*
+ * Remove chunks written to DB in the previous iteration
+ */
+ CleanupNamespaceDB(ctx);
+ }
+ readBytesPerCycle = 0;
+ cycle++;
for (i = 0; i < gFullPaths->len; i++) {
KeyNameValue tmp = g_array_index(gFullPaths, KeyNameValue, i);
- if (!PublishScriptOutputToNamespaceDB(ctx, tmp.keyName, tmp.val)) {
- g_debug("%s: PublishScriptOutputToNamespaceDB failed for script %s\n",
- __FUNCTION__, tmp.val);
+ if (!ExecuteScript(ctx, tmp.keyName, tmp.val)) {
+ g_debug("%s: ExecuteScript failed for script %s\n",
+ __FUNCTION__, tmp.val);
+ if (isGDPWriteReady && gSkipThisTask && !isNDBWriteReady) {
+ break;
+ }
}
}
+ if (isGDPWriteReady && !gSkipThisTask) {
+ gchar* readyData = g_strdup_printf("%"FMTSZ"u", readBytesPerCycle);
+ g_debug("%s: Sending ready flag with number of read bytes :%s\n",
+ __FUNCTION__, readyData);
+ gchar* topic = g_strdup_printf(SERVICE_DISCOVERY_TOPIC_PREFIX ".%s.%"
+ FMTSZ"u", "ready", cycle);
+ SendData(ctx, g_get_real_time(), topic, readyData, strlen(readyData));
+ g_free(topic);
+ g_free(readyData);
+ }
- /*
- * Update ready flag
- */
- status = WriteData(ctx, SERVICE_DISCOVERY_KEY_READY, "TRUE", 4);
- if (!status) {
- g_warning("%s: Failed to update ready flag", __FUNCTION__);
+ if (isNDBWriteReady) {
+ /*
+ * Update ready flag
+ */
+ status = WriteData(ctx, SERVICE_DISCOVERY_KEY_READY, "TRUE", 4);
+ if (!status) {
+ g_warning("%s: Failed to update ready flag", __FUNCTION__);
+ }
}
-out:
gTaskSubmitted = FALSE;
}
-
/*
*****************************************************************************
* checkForWrite --
* Second check - checks if time greater than interval read from Namespace DB
* has elapsed since the last write operation.
*
- * @param[in] ctx The application context.
+ * @param[in] ctx The application context.
+ * @param[in] signalKey Signal key to check the write redinness of Namespace DB or gdp.
*
- * @retval TRUE Execute scripts and write service data to Namespace DB.
+ * @retval TRUE Execute scripts and write service data to Namespace DB or gdp
* @retval FALSE Omit this cycle wihtout any script running.
*
*****************************************************************************
*/
static Bool
-checkForWrite(ToolsAppCtx *ctx)
+checkForWrite(ToolsAppCtx *ctx, const char *signalKey)
{
char *signal = NULL;
size_t signalLen = 0;
/*
* Read signal from Namespace DB
*/
- if (!ReadData(ctx, SERVICE_DISCOVERY_KEY_SIGNAL, &signal, &signalLen)) {
+ if (!ReadData(ctx, signalKey, &signal, &signalLen)) {
g_debug("%s: Failed to read necessary information from Namespace DB\n",
__FUNCTION__);
} else {
return result;
}
-
/*
*****************************************************************************
* ServiceDiscoveryThread --
*
- * Creates a new thread that collects all the desired application related
- * information and updates the Namespace DB.
+ * Creates a new task thread that gathers discovered services' data and
+ * publishes the data to either Namespace DB or host-side gdp daemon.
*
* @param[in] data The application context.
*
ServiceDiscoveryThread(gpointer data)
{
ToolsAppCtx *ctx = data;
+ isGDPWriteReady = checkForWrite(ctx, SERVICE_DISCOVERY_KEY_GDP_SIGNAL);
+ isNDBWriteReady = checkForWrite(ctx, SERVICE_DISCOVERY_KEY_SIGNAL);
/*
* First check for taskSubmitted, if it is true automatically omit this
* cycle even without checking for write to avoid resetting last write
* time.
*/
- if (gTaskSubmitted || !checkForWrite(ctx)) {
+ if ((gTaskSubmitted || !isNDBWriteReady) && !isGDPWriteReady) {
g_debug("%s: Data should not be written taskSubmitted = %s\n",
__FUNCTION__, gTaskSubmitted ? "True" : "False");
} else {
{
if (gServiceDiscoveryTimeoutSource == NULL) {
gServiceDiscoveryTimeoutSource =
- g_timeout_source_new(SERVICE_DISCOVERY_POLL_INTERVAL);
+ g_timeout_source_new(SERVICE_DISCOVERY_POLL_INTERVAL);
VMTOOLSAPP_ATTACH_SOURCE(ctx, gServiceDiscoveryTimeoutSource,
ServiceDiscoveryThread, ctx, NULL);
g_source_unref(gServiceDiscoveryTimeoutSource);
/*********************************************************
- * Copyright (C) 2020 VMware, Inc. All rights reserved.
+ * Copyright (C) 2020-2021 VMware, Inc. All rights reserved.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
__FUNCTION__, errno);
}
} else {
- for (;;) {
- size_t readBytes;
- char buf[SERVICE_DISCOVERY_VALUE_MAX_SIZE];
- readBytes = fread(buf, 1, sizeof(buf), f);
- g_debug("%s: readBytes = %" G_GSSIZE_FORMAT "\n", __FUNCTION__,
- readBytes);
-
- if (readBytes > 0) {
- DynBuf_Append(out, buf, readBytes);
- }
-
- if (readBytes < sizeof(buf)) {
- break;
- }
- }
+ DepleteReadFromStream(f, out);
if (fclose(f) != 0) {
g_warning("%s: Failed to close file stream, errno=%d",
__FUNCTION__, errno);
}
}
-
/*
*****************************************************************************
- * PublishScriptOutputToNamespaceDB --
- *
- * Spawns child process for script, reads stdout from pipe, writes
- * generated chunks to Namespace DB.
+ * ExecuteScript --
*
- * Chunk count will be written to Namespace DB using received
- * "key" (ie. get-listening-process-info).
+ * Spawns child process for script, reads child process stdout stream and
+ * sends data to Namespace DB and/or host-side gdp daemon.
*
- * Chunks will be written to Namespace DB with keys constructed by "key-[i]"
- * template (ie. get-listening-process-info-1, get-listening-process-info-2)
- *
- * @param[in] ctx Application context.
- * @param[in] key Key used for chunk count
+ * @param[in] ctx The application context
+ * @param[in] key Script name
* @param[in] script Script to be executed
*
- * @retval TRUE Script execution and Namespace DB write over RPC succeeded.
- * @retval FALSE Either of script execution or Namespace DB write failed.
+ * @retval TRUE Successfully executed script and sent output to gdp daemon.
+ * @retval FALSE Otherwise.
*
*****************************************************************************
*/
Bool
-PublishScriptOutputToNamespaceDB(ToolsAppCtx *ctx,
- const char *key,
- const char *script)
+ExecuteScript(ToolsAppCtx *ctx,
+ const char *key,
+ const char *script)
{
- Bool status = FALSE;
- GPid pid;
+ Bool status;
gchar *command = g_strdup(script);
gchar *cmd[] = { command, NULL };
gint child_stdout = -1;
gint child_stderr = -1;
- FILE* child_stdout_f;
+ FILE *child_stdout_f;
GError *p_error = NULL;
DynBuf err;
- int i = 0;
- status = g_spawn_async_with_pipes(NULL, cmd, NULL, G_SPAWN_DEFAULT, NULL,
- NULL, &pid, NULL, &child_stdout, &child_stderr,
- &p_error);
+ status = g_spawn_async_with_pipes(NULL, // const gchar *working_directory
+ cmd, // gchar **argv
+ NULL, // gchar **envp
+ G_SPAWN_DEFAULT, // GSpawnFlags flags
+ NULL, // GSpawnChildSetupFunc child_setup
+ NULL, // gpointer user_data
+ NULL, // GPid *child_pid
+ NULL, // gint *standard_input
+ &child_stdout, // gint *standard_output
+ &child_stderr, // gint *standard_error
+ &p_error); // GError **error
if (!status) {
if (p_error != NULL) {
- g_warning("%s: Error during script exec %s\n", __FUNCTION__,
- p_error->message);
- g_error_free(p_error);
+ g_warning("%s: Error during script exec %s\n",
+ __FUNCTION__, p_error->message);
+ g_clear_error(&p_error);
} else {
g_warning("%s: Command not run\n", __FUNCTION__);
}
+
+ /*
+ * If an error occurs, child_pid, standard_input, standard_output,
+ * and standard_error will not be filled with valid values.
+ */
g_free(command);
return status;
}
g_debug("%s: Child process spawned for %s\n", __FUNCTION__, key);
+ status = FALSE;
+
child_stdout_f = fdopen(child_stdout, "r");
if (child_stdout_f == NULL) {
g_warning("%s: Failed to create file stream for child stdout, errno=%d",
- __FUNCTION__, errno);
- status = FALSE;
+ __FUNCTION__, errno);
goto out;
}
- for (;;) {
- char buf[SERVICE_DISCOVERY_VALUE_MAX_SIZE];
- size_t readBytes = fread(buf, 1, sizeof(buf), child_stdout_f);
-
- g_debug("%s: readBytes = %" G_GSSIZE_FORMAT " status = %s\n",
- __FUNCTION__, readBytes, status ? "TRUE" : "FALSE");
- // At first iteration we are sure that status is true
- if (status && readBytes > 0) {
- gchar* msg = g_strdup_printf("%s-%d", key, ++i);
- status = WriteData(ctx, msg, buf, readBytes);
- if (!status) {
- g_warning("%s: Failed to store data\n", __FUNCTION__);
- }
- g_free(msg);
- }
-
- if (readBytes < sizeof(buf)) {
- if (!status) {
- g_warning("%s: Data read finished but failed to store\n", __FUNCTION__);
- }
- break;
- }
- }
-
- if (status) {
- gchar *chunkCount = g_strdup_printf("%d", i);
- status = WriteData(ctx, key, chunkCount, strlen(chunkCount));
- if (status) {
- g_debug("%s: Written key %s chunks %s\n", __FUNCTION__, key, chunkCount);
- }
- g_free(chunkCount);
- }
+ status = SendScriptOutput(ctx, key, child_stdout_f);
DynBuf_Init(&err);
ReadFromHandle(child_stderr, &err);
child_stderr = -1;
if (DynBuf_GetSize(&err) != 0) {
DynBuf_AppendString(&err, "");
- g_debug("%s: stderr=%s\n", __FUNCTION__, (const char *) DynBuf_Get(&err));
+ g_debug("%s: stderr=%s\n",
+ __FUNCTION__, (const char *) DynBuf_Get(&err));
}
DynBuf_Destroy(&err);
+
out:
- g_free(command);
if (child_stdout_f != NULL) {
if (fclose(child_stdout_f) != 0) {
g_warning("%s: Failed to close child stdout file stream, errno=%d",
- __FUNCTION__, errno);
+ __FUNCTION__, errno);
}
} else if (close(child_stdout) != 0) {
g_warning("%s: Failed to close child stdout handle, errno=%d",
- __FUNCTION__, errno);
+ __FUNCTION__, errno);
}
if (child_stderr != -1 && close(child_stderr) != 0) {
g_warning("%s: Failed to close child process stderr handle, errno=%d",
- __FUNCTION__, errno);
+ __FUNCTION__, errno);
}
+
+ g_free(command);
return status;
}