From: John Wolfe Date: Thu, 4 Mar 2021 21:48:46 +0000 (-0800) Subject: Tools gdp plugin updates. X-Git-Tag: stable-11.3.0~121 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=65d92cd11edafe3e9ff7c96e83d824da6ecf4ae7;p=thirdparty%2Fopen-vm-tools.git Tools gdp plugin updates. --- diff --git a/open-vm-tools/configure.ac b/open-vm-tools/configure.ac index 901935f99..d08593346 100644 --- a/open-vm-tools/configure.ac +++ b/open-vm-tools/configure.ac @@ -1632,6 +1632,7 @@ AC_CONFIG_FILES([ \ lib/rpcVmx/Makefile \ lib/slashProc/Makefile \ lib/string/Makefile \ + lib/jsmn/Makefile \ lib/stubs/Makefile \ lib/syncDriver/Makefile \ lib/system/Makefile \ diff --git a/open-vm-tools/lib/Makefile.am b/open-vm-tools/lib/Makefile.am index dee38ed79..2c7661715 100644 --- a/open-vm-tools/lib/Makefile.am +++ b/open-vm-tools/lib/Makefile.am @@ -1,5 +1,5 @@ ################################################################################ -### Copyright (c) 2007-2016,2020 VMware, Inc. All rights reserved. +### Copyright (c) 2007-2016,2020-2021 VMware, Inc. All rights reserved. ### ### This program is free software; you can redistribute it and/or modify ### it under the terms of version 2 of the GNU General Public License as @@ -66,6 +66,7 @@ if USE_SLASH_PROC SUBDIRS += slashProc endif SUBDIRS += string +SUBDIRS += jsmn SUBDIRS += stubs SUBDIRS += syncDriver SUBDIRS += system diff --git a/open-vm-tools/lib/include/conf.h b/open-vm-tools/lib/include/conf.h index 1a3979940..1f9891395 100644 --- a/open-vm-tools/lib/include/conf.h +++ b/open-vm-tools/lib/include/conf.h @@ -1,5 +1,5 @@ /********************************************************* - * Copyright (C) 2002-2020 VMware, Inc. All rights reserved. + * Copyright (C) 2002-2021 VMware, Inc. All rights reserved. * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published @@ -558,4 +558,31 @@ * END deviceHelper goodies. ****************************************************************************** */ + +/* + ****************************************************************************** + * BEGIN gdp plugin goodies. + */ + +/** + * Defines the string used for the gdp config file group. + */ +#define CONFGROUPNAME_GDP "gdp" + +/** + * Defines a custom history cache size (in bytes). + * + * @note Illegal values result in a @c g_warning and fallback to the default + * cache size 4194304. + * + * @param int User-defined cache size within [1048576, 16777216]. + * Set to 0 to disable caching. + */ +#define CONFNAME_GDP_CACHE_SIZE "cacheSize" + +/* + * END gdp goodies. + ****************************************************************************** + */ + #endif /* __CONF_H__ */ diff --git a/open-vm-tools/lib/include/jsmn.h b/open-vm-tools/lib/include/jsmn.h new file mode 100644 index 000000000..8daeb5184 --- /dev/null +++ b/open-vm-tools/lib/include/jsmn.h @@ -0,0 +1,94 @@ +/********************************************************* + * Copyright (C) 2019 VMware, Inc. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation version 2.1 and no later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the Lesser GNU General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + *********************************************************/ + +#ifndef __JSMN_H_ +#define __JSMN_H_ + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * JSON type identifier. Basic types are: + * o Object + * o Array + * o String + * o Other primitive: number, boolean (true/false) or null + */ +typedef enum { + JSMN_UNDEFINED = 0, + JSMN_OBJECT = 1, + JSMN_ARRAY = 2, + JSMN_STRING = 3, + JSMN_PRIMITIVE = 4 +} jsmntype_t; + +enum jsmnerr { + /* Not enough tokens were provided */ + JSMN_ERROR_NOMEM = -1, + /* Invalid character inside JSON string */ + JSMN_ERROR_INVAL = -2, + /* The string is not a full JSON packet, more bytes expected */ + JSMN_ERROR_PART = -3 +}; + +/** + * JSON token description. + * type type (object, array, string etc.) + * start start position in JSON data string + * end end position in JSON data string + */ +typedef struct { + jsmntype_t type; + int start; + int end; + int size; +#ifdef JSMN_PARENT_LINKS + int parent; +#endif +} jsmntok_t; + +/** + * JSON parser. Contains an array of token blocks available. Also stores + * the string being parsed now and current position in that string + */ +typedef struct { + unsigned int pos; /* offset in the JSON string */ + unsigned int toknext; /* next token to allocate */ + int toksuper; /* superior token node, e.g parent object or array */ +} jsmn_parser; + +/** + * Create JSON parser over an array of tokens + */ +void jsmn_init(jsmn_parser *parser); + +/** + * Run JSON parser. It parses a JSON data string into and array of tokens, each describing + * a single JSON object. + */ +int jsmn_parse(jsmn_parser *parser, const char *js, size_t len, + jsmntok_t *tokens, unsigned int num_tokens); + +#ifdef __cplusplus +} +#endif + +#endif /* __JSMN_H_ */ diff --git a/open-vm-tools/lib/include/vmware/tools/gdp.h b/open-vm-tools/lib/include/vmware/tools/gdp.h index 6f3996ed3..f7f5da925 100644 --- a/open-vm-tools/lib/include/vmware/tools/gdp.h +++ b/open-vm-tools/lib/include/vmware/tools/gdp.h @@ -1,5 +1,5 @@ /********************************************************* - * Copyright (C) 2020 VMware, Inc. All rights reserved. + * Copyright (C) 2020-2021 VMware, Inc. All rights reserved. * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published @@ -33,8 +33,15 @@ * 17 * 4096 - Maximum VMCI datagram size * 24 - VMCI datagram header size */ -#define GDP_MAX_USER_DATA_LEN (17 * 4096 - 24) -#define GDP_SEND_RECV_BUF_LEN (17 * 4096) +#define GDP_MAX_PACKET_LEN (17 * 4096 - 24) + +/* + * Limit GDP packet JSON base64 key value size to (16 * 4096) bytes, then + * the rest JSON content will have (4096 - 24) bytes available. + * + * Base64 (16 * 4096) bytes are (12 * 4096) bytes before encoding. + */ +#define GDP_USER_DATA_LEN (12 * 4096) /* * Property name of the gdp plugin service in the tools @@ -48,18 +55,18 @@ #define GDP_ERR_LIST \ GDP_ERR_ITEM(GDP_ERROR_SUCCESS = 0, \ "No error") \ - GDP_ERR_ITEM(GDP_ERROR_INTERNAL, \ - "Internal system error") \ + GDP_ERR_ITEM(GDP_ERROR_INVALID_DATA, \ + "Invalid data") \ + GDP_ERR_ITEM(GDP_ERROR_DATA_SIZE, \ + "Data size too large") \ + GDP_ERR_ITEM(GDP_ERROR_GENERAL, \ + "General error") \ GDP_ERR_ITEM(GDP_ERROR_STOP, \ "Stopped for vmtoolsd shutdown") \ GDP_ERR_ITEM(GDP_ERROR_UNREACH, \ "Host daemon unreachable") \ GDP_ERR_ITEM(GDP_ERROR_TIMEOUT, \ - "Operation timed out") \ - GDP_ERR_ITEM(GDP_ERROR_SEND_SIZE, \ - "Message to host too large") \ - GDP_ERR_ITEM(GDP_ERROR_RECV_SIZE, \ - "Receive buffer too small") + "Operation timed out") /* * GdpError codes. @@ -78,11 +85,12 @@ typedef enum GdpError { * TOOLS_PLUGIN_SVC_PROP_GDP property. */ typedef struct ToolsPluginSvcGdp { - GdpError (*publish)(const char *msg, - int msgLen, - char *reply, - int *replyLen); - GdpError (*stop)(void); + GdpError (*publish)(gint64 createTime, + const gchar *topic, + const gchar *token, + const gchar *category, + const gchar *data, + guint32 dataLen); } ToolsPluginSvcGdp; @@ -96,64 +104,38 @@ typedef struct ToolsPluginSvcGdp { * pool threads started by ToolsCorePool_StartThread. Do not call the function * in ToolsOnLoad nor in/after TOOLS_CORE_SIG_SHUTDOWN handler. * - * @param[in] ctx The app context - * @param[in] msg Buffer containing guest data to publish - * @param[in] msgLen Guest data length - * @param[out,optional] reply Buffer to receive reply from gdp daemon - * @param[in,out,optional] replyLen NULL when param reply is NULL, otherwise: - * reply buffer length on input, - * reply length on output + * @param[in] ctx The application context + * @param[in] createTime UTC timestamp, in number of micro- + * seconds since January 1, 1970 UTC. + * @param[in] topic Topic + * @param[in,optional] token Token, can be NULL + * @param[in,optional] category Category, can be NULL that defaults to + * "application" + * @param[in] data Buffer containing data to publish + * @param[in] dataLen Buffer length * * @return GDP_ERROR_SUCCESS on success. - * @return Other GdpError codes otherwise. + * @return Other GdpError code otherwise. * ****************************************************************************** */ static inline GdpError -ToolsPluginSvcGdp_Publish(ToolsAppCtx *ctx, // IN - const char *msg, // IN - int msgLen, // IN - char *reply, // OUT, OPTIONAL - int *replyLen) // IN/OUT, OPTIONAL +ToolsPluginSvcGdp_Publish(ToolsAppCtx *ctx, // IN + gint64 createTime, // IN + const gchar *topic, // IN + const gchar *token, // IN, OPTIONAL + const gchar *category, // IN, OPTIONAL + const gchar *data, // IN + guint32 dataLen) // IN { ToolsPluginSvcGdp *svcGdp = NULL; g_object_get(ctx->serviceObj, TOOLS_PLUGIN_SVC_PROP_GDP, &svcGdp, NULL); if (svcGdp != NULL && svcGdp->publish != NULL) { - return svcGdp->publish(msg, msgLen, reply, replyLen); - } - return GDP_ERROR_INTERNAL; -} - - -/* - ****************************************************************************** - * ToolsPluginSvcGdp_Stop -- */ /** - * - * @brief Stops guest data publishing. - * - * This function notifies the pool thread in publish call to stop and return - * immediately. It is intended to be called by the interrupt callback passed - * to ToolsCorePool_StartThread. At vmtoolsd shutdown time, main thread will - * call ToolsCorePool_Shutdown which invokes the interrupt callback. - * - * @param[in] ctx The app context - * - * @return GDP_ERROR_SUCCESS on success. - * @return GDP_ERROR_INTERNAL otherwise. - * - ****************************************************************************** - */ - -static inline GdpError -ToolsPluginSvcGdp_Stop(ToolsAppCtx *ctx) // IN -{ - ToolsPluginSvcGdp *svcGdp = NULL; - g_object_get(ctx->serviceObj, TOOLS_PLUGIN_SVC_PROP_GDP, &svcGdp, NULL); - if (svcGdp != NULL && svcGdp->stop != NULL) { - return svcGdp->stop(); + return svcGdp->publish(createTime, topic, token, + category, data, dataLen); } - return GDP_ERROR_INTERNAL; + return GDP_ERROR_GENERAL; } #endif /* _VMWARE_TOOLS_GDP_H_ */ diff --git a/open-vm-tools/lib/jsmn/LICENSE b/open-vm-tools/lib/jsmn/LICENSE new file mode 100644 index 000000000..c84fb2e97 --- /dev/null +++ b/open-vm-tools/lib/jsmn/LICENSE @@ -0,0 +1,20 @@ +Copyright (c) 2010 Serge A. Zaitsev + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + diff --git a/open-vm-tools/lib/jsmn/Makefile.am b/open-vm-tools/lib/jsmn/Makefile.am new file mode 100644 index 000000000..add87f626 --- /dev/null +++ b/open-vm-tools/lib/jsmn/Makefile.am @@ -0,0 +1,24 @@ +################################################################################ +### Copyright (c) 2021 VMware, Inc. All rights reserved. +### +### This program is free software; you can redistribute it and/or modify +### it under the terms of version 2 of the GNU General Public License as +### published by the Free Software Foundation. +### +### This program is distributed in the hope that it will be useful, +### but WITHOUT ANY WARRANTY; without even the implied warranty of +### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +### GNU General Public License for more details. +### +### You should have received a copy of the GNU General Public License +### along with this program; if not, write to the Free Software +### Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +################################################################################ + +noinst_LTLIBRARIES = libJsmn.la + +libJsmn_la_SOURCES = +libJsmn_la_SOURCES += jsmn.c + +AM_CFLAGS = +AM_CFLAGS += @GLIB2_CPPFLAGS@ diff --git a/open-vm-tools/lib/jsmn/README b/open-vm-tools/lib/jsmn/README new file mode 100644 index 000000000..22f25f759 --- /dev/null +++ b/open-vm-tools/lib/jsmn/README @@ -0,0 +1,8 @@ +This is a local copy of jsmn, a minimalistic json parser. + +jsmn included as src because its tiny, and its been augmented to use +Log() since the original code has no logging. + +https://zserge.com/jsmn.html +https://github.com/zserge/jsmn + diff --git a/open-vm-tools/lib/jsmn/jsmn.c b/open-vm-tools/lib/jsmn/jsmn.c new file mode 100644 index 000000000..e83f3b898 --- /dev/null +++ b/open-vm-tools/lib/jsmn/jsmn.c @@ -0,0 +1,367 @@ +/********************************************************* + * Copyright (C) 2019 VMware, Inc. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation version 2.1 and no later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the Lesser GNU General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + *********************************************************/ + +/* + * From https://zserge.com/jsmn.html + * + * Some local additions (commented with / * vmware * / for logging. + */ + +/* + * Turn on JSMN_PARENT to spend a bit of memory to get a lot of speed + * in large documents. + */ +#define JSMN_PARENT 1 +/* Tune on JSMN_STRICT to force strict json parsing */ +#define JSMN_STRICT 1 +#include "jsmn.h" + +#include "log.h" +#include "vm_assert.h" + +#define LOGLEVEL_MODULE jsmn +#include "loglevel_user.h" + +/** + * Allocates a fresh unused token from the token pool. + */ +static jsmntok_t *jsmn_alloc_token(jsmn_parser *parser, + jsmntok_t *tokens, size_t num_tokens) { + jsmntok_t *tok; + if (parser->toknext >= num_tokens) { + return NULL; + } + tok = &tokens[parser->toknext++]; + tok->start = tok->end = -1; + tok->size = 0; +#ifdef JSMN_PARENT_LINKS + tok->parent = -1; +#endif + return tok; +} + +/** + * Fills token type and boundaries. + */ +static void jsmn_fill_token(jsmntok_t *token, jsmntype_t type, + int start, int end) { + token->type = type; + token->start = start; + token->end = end; + token->size = 0; +} + +/** + * Fills next available token with JSON primitive. + */ +static int jsmn_parse_primitive(jsmn_parser *parser, const char *js, + size_t len, jsmntok_t *tokens, size_t num_tokens) { + jsmntok_t *token; + int start; + + start = parser->pos; + + for (; parser->pos < len && js[parser->pos] != '\0'; parser->pos++) { + switch (js[parser->pos]) { +#ifndef JSMN_STRICT + /* In strict mode primitive must be followed by "," or "}" or "]" */ + case ':': +#endif + case '\t' : case '\r' : case '\n' : case ' ' : + case ',' : case ']' : case '}' : + goto found; + } + if (js[parser->pos] < 32 || js[parser->pos] >= 127) { + parser->pos = start; + /* vmware */ + Log("%s: Unexpected char '%c' in primitive at pos %d\n", + __FUNCTION__, js[parser->pos], parser->pos); + /* vmware */ + return JSMN_ERROR_INVAL; + } + } +#ifdef JSMN_STRICT + /* In strict mode primitive must be followed by a comma/object/array */ + parser->pos = start; + return JSMN_ERROR_PART; +#endif + +found: + if (tokens == NULL) { + parser->pos--; + return 0; + } + token = jsmn_alloc_token(parser, tokens, num_tokens); + if (token == NULL) { + parser->pos = start; + return JSMN_ERROR_NOMEM; + } + jsmn_fill_token(token, JSMN_PRIMITIVE, start, parser->pos); +#ifdef JSMN_PARENT_LINKS + token->parent = parser->toksuper; +#endif + parser->pos--; + return 0; +} + +/** + * Fills next token with JSON string. + */ +static int jsmn_parse_string(jsmn_parser *parser, const char *js, + size_t len, jsmntok_t *tokens, size_t num_tokens) { + jsmntok_t *token; + + int start = parser->pos; + + parser->pos++; + + /* Skip starting quote */ + for (; parser->pos < len && js[parser->pos] != '\0'; parser->pos++) { + char c = js[parser->pos]; + + /* Quote: end of string */ + if (c == '\"') { + if (tokens == NULL) { + return 0; + } + token = jsmn_alloc_token(parser, tokens, num_tokens); + if (token == NULL) { + parser->pos = start; + return JSMN_ERROR_NOMEM; + } + jsmn_fill_token(token, JSMN_STRING, start+1, parser->pos); +#ifdef JSMN_PARENT_LINKS + token->parent = parser->toksuper; +#endif + return 0; + } + + /* Backslash: Quoted symbol expected */ + if (c == '\\' && parser->pos + 1 < len) { + int i; + parser->pos++; + switch (js[parser->pos]) { + /* Allowed escaped symbols */ + case '\"': case '/' : case '\\' : case 'b' : + case 'f' : case 'r' : case 'n' : case 't' : + break; + /* Allows escaped symbol \uXXXX */ + case 'u': + parser->pos++; + for(i = 0; i < 4 && parser->pos < len && js[parser->pos] != '\0'; i++) { + /* If it isn't a hex character we have an error */ + if(!((js[parser->pos] >= 48 && js[parser->pos] <= 57) || /* 0-9 */ + (js[parser->pos] >= 65 && js[parser->pos] <= 70) || /* A-F */ + (js[parser->pos] >= 97 && js[parser->pos] <= 102))) { /* a-f */ + parser->pos = start; + /* vmware */ + Log("%s: Unexpected char '%c' in escaped unicode at pos %d\n", + __FUNCTION__, js[parser->pos], parser->pos); + /* vmware */ + return JSMN_ERROR_INVAL; + } + parser->pos++; + } + parser->pos--; + break; + /* Unexpected symbol */ + default: + parser->pos = start; + /* vmware */ + Log("%s: Unexpected symbol '%c' in primitive at pos %d\n", + __FUNCTION__, js[parser->pos], parser->pos); + /* vmware */ + return JSMN_ERROR_INVAL; + } + } + } + parser->pos = start; + return JSMN_ERROR_PART; +} + +/** + * Parse JSON string and fill tokens. + */ +int jsmn_parse(jsmn_parser *parser, const char *js, size_t len, + jsmntok_t *tokens, unsigned int num_tokens) { + int r; + int i; + jsmntok_t *token; + int count = parser->toknext; + + for (; parser->pos < len && js[parser->pos] != '\0'; parser->pos++) { + char c; + jsmntype_t type; + + c = js[parser->pos]; + switch (c) { + case '{': case '[': + count++; + if (tokens == NULL) { + break; + } + token = jsmn_alloc_token(parser, tokens, num_tokens); + if (token == NULL) + return JSMN_ERROR_NOMEM; + if (parser->toksuper != -1) { + tokens[parser->toksuper].size++; +#ifdef JSMN_PARENT_LINKS + token->parent = parser->toksuper; +#endif + } + token->type = (c == '{' ? JSMN_OBJECT : JSMN_ARRAY); + token->start = parser->pos; + parser->toksuper = parser->toknext - 1; + break; + case '}': case ']': + if (tokens == NULL) + break; + type = (c == '}' ? JSMN_OBJECT : JSMN_ARRAY); +#ifdef JSMN_PARENT_LINKS + if (parser->toknext < 1) { + return JSMN_ERROR_INVAL; + } + token = &tokens[parser->toknext - 1]; + for (;;) { + if (token->start != -1 && token->end == -1) { + if (token->type != type) { + return JSMN_ERROR_INVAL; + } + token->end = parser->pos + 1; + parser->toksuper = token->parent; + break; + } + if (token->parent == -1) { + if(token->type != type || parser->toksuper == -1) { + return JSMN_ERROR_INVAL; + } + break; + } + token = &tokens[token->parent]; + } +#else + for (i = parser->toknext - 1; i >= 0; i--) { + token = &tokens[i]; + if (token->start != -1 && token->end == -1) { + if (token->type != type) { + return JSMN_ERROR_INVAL; + } + parser->toksuper = -1; + token->end = parser->pos + 1; + break; + } + } + /* Error if unmatched closing bracket */ + if (i == -1) return JSMN_ERROR_INVAL; + for (; i >= 0; i--) { + token = &tokens[i]; + if (token->start != -1 && token->end == -1) { + parser->toksuper = i; + break; + } + } +#endif + break; + case '\"': + r = jsmn_parse_string(parser, js, len, tokens, num_tokens); + if (r < 0) return r; + count++; + if (parser->toksuper != -1 && tokens != NULL) + tokens[parser->toksuper].size++; + break; + case '\t' : case '\r' : case '\n' : case ' ': + break; + case ':': + parser->toksuper = parser->toknext - 1; + break; + case ',': + if (tokens != NULL && parser->toksuper != -1 && + tokens[parser->toksuper].type != JSMN_ARRAY && + tokens[parser->toksuper].type != JSMN_OBJECT) { +#ifdef JSMN_PARENT_LINKS + parser->toksuper = tokens[parser->toksuper].parent; +#else + for (i = parser->toknext - 1; i >= 0; i--) { + if (tokens[i].type == JSMN_ARRAY || tokens[i].type == JSMN_OBJECT) { + if (tokens[i].start != -1 && tokens[i].end == -1) { + parser->toksuper = i; + break; + } + } + } +#endif + } + break; +#ifdef JSMN_STRICT + /* In strict mode primitives are: numbers and booleans */ + case '-': case '0': case '1' : case '2': case '3' : case '4': + case '5': case '6': case '7' : case '8': case '9': + case 't': case 'f': case 'n' : + /* And they must not be keys of the object */ + if (tokens != NULL && parser->toksuper != -1) { + jsmntok_t *t = &tokens[parser->toksuper]; + if (t->type == JSMN_OBJECT || + (t->type == JSMN_STRING && t->size != 0)) { + /* vmware */ + Log("%s: Unexpected char '%c' in STRING/OBJECT at pos %d\n", + __FUNCTION__, js[parser->pos], parser->pos); + /* vmware */ + return JSMN_ERROR_INVAL; + } + } +#else + /* In non-strict mode every unquoted value is a primitive */ + default: +#endif + r = jsmn_parse_primitive(parser, js, len, tokens, num_tokens); + if (r < 0) return r; + count++; + if (parser->toksuper != -1 && tokens != NULL) + tokens[parser->toksuper].size++; + break; + +#ifdef JSMN_STRICT + /* Unexpected char in strict mode */ + default: + return JSMN_ERROR_INVAL; +#endif + } + } + + if (tokens != NULL) { + for (i = parser->toknext - 1; i >= 0; i--) { + /* Unmatched opened object or array */ + if (tokens[i].start != -1 && tokens[i].end == -1) { + return JSMN_ERROR_PART; + } + } + } + + return count; +} + +/** + * Creates a new parser based over a given buffer with an array of tokens + * available. + */ +void jsmn_init(jsmn_parser *parser) { + parser->pos = 0; + parser->toknext = 0; + parser->toksuper = -1; +} + diff --git a/open-vm-tools/libvmtools/Makefile.am b/open-vm-tools/libvmtools/Makefile.am index 837c9dae2..6da973622 100644 --- a/open-vm-tools/libvmtools/Makefile.am +++ b/open-vm-tools/libvmtools/Makefile.am @@ -1,5 +1,5 @@ ################################################################################ -### Copyright (c) 2008-2020 VMware, Inc. All rights reserved. +### Copyright (c) 2008-2021 VMware, Inc. All rights reserved. ### ### This program is free software; you can redistribute it and/or modify ### it under the terms of version 2 of the GNU General Public License as @@ -48,6 +48,7 @@ libvmtools_la_LIBADD += ../lib/rpcIn/libRpcIn.la libvmtools_la_LIBADD += ../lib/rpcOut/libRpcOut.la libvmtools_la_LIBADD += ../lib/rpcVmx/libRpcVmx.la libvmtools_la_LIBADD += ../lib/string/libString.la +libvmtools_la_LIBADD += ../lib/jsmn/libJsmn.la libvmtools_la_LIBADD += ../lib/syncDriver/libSyncDriver.la libvmtools_la_LIBADD += ../lib/system/libSystem.la libvmtools_la_LIBADD += ../lib/stubs/libStubsCS.la diff --git a/open-vm-tools/services/plugins/gdp/gdpPlugin.c b/open-vm-tools/services/plugins/gdp/gdpPlugin.c index 71095ef8f..3785ae037 100644 --- a/open-vm-tools/services/plugins/gdp/gdpPlugin.c +++ b/open-vm-tools/services/plugins/gdp/gdpPlugin.c @@ -1,5 +1,5 @@ /********************************************************* - * Copyright (C) 2020 VMware, Inc. All rights reserved. + * Copyright (C) 2020-2021 VMware, Inc. All rights reserved. * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published @@ -36,17 +36,23 @@ #include #endif +#include "vm_basic_defs.h" #include "vm_assert.h" #include "vm_atomic.h" -#include "vm_basic_types.h" #define G_LOG_DOMAIN "gdp" #include "vmware/tools/log.h" -#include "vmware/tools/plugin.h" #include "vmware/tools/utils.h" +#include "vmware/tools/plugin.h" +#include "vmware/tools/threadPool.h" #include "vmware/tools/gdp.h" #include "vmci_defs.h" #include "vmci_sockets.h" #include "vmcheck.h" +#include "base64.h" +#include "util.h" +#include "str.h" +#include "jsmn.h" +#include "conf.h" #include "vm_version.h" #include "embed_version.h" @@ -55,45 +61,107 @@ VM_EMBED_VERSION(VMTOOLSD_VERSION_STRING); #if defined(_WIN32) -# define GetSockErr() WSAGetLastError() +# define GetSysErr() WSAGetLastError() -# define SYSERR_EADDRINUSE WSAEADDRINUSE -# define SYSERR_EHOSTUNREACH WSAEHOSTUNREACH -# define SYSERR_EINTR WSAEINTR -# define SYSERR_EMSGSIZE WSAEMSGSIZE -# define SYSERR_WOULDBLOCK(e) (e == WSAEWOULDBLOCK) +# define SYSERR_EADDRINUSE WSAEADDRINUSE +# define SYSERR_EHOSTUNREACH WSAEHOSTUNREACH +# define SYSERR_EINTR WSAEINTR +# define SYSERR_EMSGSIZE WSAEMSGSIZE +# define SYSERR_WOULDBLOCK(e) (e == WSAEWOULDBLOCK) typedef int socklen_t; # define CloseSocket closesocket -# define SOCK_READ FD_READ -# define SOCK_WRITE FD_WRITE + typedef WSAEVENT GdpEvent; +# define GDP_INVALID_EVENT WSA_INVALID_EVENT #else -# define GetSockErr() errno +# define GetSysErr() errno -# define SYSERR_EADDRINUSE EADDRINUSE -# define SYSERR_EHOSTUNREACH EHOSTUNREACH -# define SYSERR_EINTR EINTR -# define SYSERR_EMSGSIZE EMSGSIZE -# define SYSERR_WOULDBLOCK(e) (e == EAGAIN || e == EWOULDBLOCK) +# define SYSERR_EADDRINUSE EADDRINUSE +# define SYSERR_EHOSTUNREACH EHOSTUNREACH +# define SYSERR_EINTR EINTR +# define SYSERR_EMSGSIZE EMSGSIZE +# define SYSERR_WOULDBLOCK(e) (e == EAGAIN || e == EWOULDBLOCK) typedef int SOCKET; -# define SOCKET_ERROR (-1) -# define INVALID_SOCKET ((SOCKET) -1) -# define CloseSocket close +# define SOCKET_ERROR (-1) +# define INVALID_SOCKET ((SOCKET) -1) +# define CloseSocket close -# define SOCK_READ POLLIN -# define SOCK_WRITE POLLOUT + typedef int GdpEvent; +# define GDP_INVALID_EVENT (-1) #endif -#define PRIVILEGED_PORT_MAX 1023 -#define PRIVILEGED_PORT_MIN 1 +#define GDP_TIMEOUT_AT_INFINITE (-1) +#define GDP_WAIT_INFINITE (-1) +#define GDP_SEND_AFTER_ANY_TIME (-1) + +#define USEC_PER_SECOND (G_GINT64_CONSTANT(1000000)) +#define USEC_PER_MILLISECOND (G_GINT64_CONSTANT(1000)) + +/* + * Macros to read values from tools config + */ +#define GDP_CONFIG_GET_BOOL(key, defVal) \ + VMTools_ConfigGetBoolean(gPluginState.ctx->config, \ + CONFGROUPNAME_GDP, key, defVal) + +#define GDP_CONFIG_GET_INT(key, defVal) \ + VMTools_ConfigGetInteger(gPluginState.ctx->config, \ + CONFGROUPNAME_GDP, key, defVal) + +#define GDPD_RECV_PORT 7777 +#define GDP_RECV_PORT 766 -#define GDPD_LISTEN_PORT 7777 +/* + * The minimum rate limit is 1 pps, corresponding wait interval is 1 second. + */ +#define GDP_WAIT_RESULT_TIMEOUT 1500 // ms + +#define GDP_PACKET_JSON_LINE_SUBSCRIBERS \ + " \"subscribers\":[%s],\n" + +#define GDP_PACKET_JSON \ + "{\n" \ + " \"header\": {\n" \ + " \"sequence\":%" G_GUINT64_FORMAT ",\n" \ + "%s" \ + " \"createTime\":\"%s\",\n" \ + " \"topic\":\"%s\",\n" \ + " \"token\":\"%s\"\n" \ + " },\n" \ + " \"payload\":{\n" \ + " \"category\":\"%s\",\n" \ + " \"base64\":\"%s\"\n" \ + " }\n" \ + "}" + +#define GDP_RESULT_SEQUENCE "sequence" // Required, e.g. 12345 +#define GDP_RESULT_STATUS "status" // Required, "ok", or "bad" for + // malformed and rejected packet +#define GDP_RESULT_DIAGNOSIS "diagnosis" // Optional +#define GDP_RESULT_RATE_LIMIT "rateLimit" // Required, + // e.g. 2 packets per second + +#define GDP_RESULT_STATUS_OK "ok" +#define GDP_RESULT_STATUS_BAD "bad" + +#define GDP_RESULT_REQUIRED_KEYS 3 + +#define GDP_HISTORY_REQUEST_PAST_SECONDS "pastSeconds" // +#define GDP_HISTORY_REQUEST_ID "id" // + // subscription ID + +#define GDP_HISTORY_REQUEST_REQUIRED_KEYS 2 -#define GDP_SEND_TIMEOUT 1000 // ms -#define GDP_RECV_TIMEOUT 3000 // ms +/* + * Historical guest data cache size limit + */ +#define GDP_MAX_CACHE_SIZE_LIMIT (1 << 24) // 16M +#define GDP_DEFAULT_CACHE_SIZE_LIMIT (1 << 22) // 4M +#define GDP_MIN_CACHE_SIZE_LIMIT (1 << 20) // 1M +#define GDP_STR_SIZE(s) ((guint32) (s != NULL ? (strlen(s) + 1) : 0)) /* * GdpError message table. @@ -105,413 +173,2114 @@ GDP_ERR_LIST #undef GDP_ERR_ITEM -typedef struct PluginData { - ToolsAppCtx *ctx; /* The application context */ +typedef struct PluginState { + ToolsAppCtx *ctx; /* Tools application context */ + + Atomic_Bool started; /* TRUE : Guest data publishing is started + FALSE: otherwise + Transitions from FALSE to TRUE only */ + #if defined(_WIN32) Bool wsaStarted; /* TRUE : WSAStartup succeeded, WSACleanup required FALSE: otherwise */ - WSAEVENT eventSendRecv; /* The send-recv event object: - Event object to associate with - network send/recv ready event */ - WSAEVENT eventStop; /* The stop event object: - Event object signalled to stop guest data - publishing for vmtoolsd shutdown */ -#else - int eventStop; /* The stop event fd: - Event fd signalled to stop guest data - publishing for vmtoolsd shutdown */ #endif int vmciFd; /* vSocket address family value fd */ int vmciFamily; /* vSocket address family value */ SOCKET sock; /* Datagram socket for publishing guest data */ +#if defined(_WIN32) + GdpEvent eventNetwork; /* The network event object: + To be associated with network + send/recv ready event */ +#endif + + GdpEvent eventStop; /* The stop event object: + Signalled to stop guest data publishing */ Atomic_Bool stopped; /* TRUE : Guest data publishing is stopped - for vmtoolsd shutdown - FALSE: otherwise */ -} PluginData; + FALSE: otherwise + Transitions from FALSE to TRUE only */ -static PluginData pluginData; + GdpEvent eventConfig; /* The config event object: + Signalled to update config */ +} PluginState; +static PluginState gPluginState; -static Bool GdpInit(ToolsAppCtx *ctx); -static void GdpDestroy(void); -static void GdpSetStopEvent(void); -static Bool GdpCreateSocket(void); -static void GdpCloseSocket(void); -static GdpError GdpEmptyRecvQueue(void); -static GdpError GdpWaitForEvent(int netEvent, int timeout); -static GdpError GdpSend(const char *buf, int len, int timeout); -static GdpError GdpRecv(char *buf, int *len, int timeout); +typedef struct PublishState { + GMutex mutex; /* To sync incoming publish calls */ -/* - ****************************************************************************** - * GdpInit -- - * - * Initializes internal plugin data. - * - * @param[in] ctx The application context - * - * @return TRUE on success. - * @return FALSE otherwise. - * - ****************************************************************************** - */ + /* + * Data passed from the active incoming publish thread to + * the gdp task thread. + */ + gint64 createTime; /* Real wall-clock time, + in microseconds since January 1, 1970 UTC. */ + const gchar *topic; + const gchar *token; + const gchar *category; + const gchar *data; + guint32 dataLen; + + /* + * The publish event object: + * The active incoming publish thread signals this event object + * to notify the gdp task thread to publish new data. + */ + GdpEvent eventPublish; + + /* + * The get-result event object: + * The gdp task thread signals this event object to notify + * the active incoming publish thread to get publish result. + */ + GdpEvent eventGetResult; + + GdpError gdpErr; /* The publish result */ + +} PublishState; + +static PublishState gPublishState; + + +typedef enum GdpTaskEvent { + GDP_TASK_EVENT_NONE, /* No event */ + GDP_TASK_EVENT_STOP, /* Stop event */ + GDP_TASK_EVENT_CONFIG, /* Config event */ + GDP_TASK_EVENT_NETWORK, /* Network event */ + GDP_TASK_EVENT_PUBLISH, /* Publish event */ + GDP_TASK_EVENT_TIMEOUT, /* Wait timed out */ +} GdpTaskEvent; + +typedef enum GdpTaskMode { + GDP_TASK_MODE_NONE, /* Not publishing + valid with GDP_TASK_STATE_IDLE only */ + GDP_TASK_MODE_PUBLISH, /* Publishing new data */ + GDP_TASK_MODE_HISTORY, /* Publishing historical data */ +} GdpTaskMode; + +typedef enum GdpTaskState { + GDP_TASK_STATE_IDLE, /* Not started + valid with GDP_TASK_MODE_NONE only */ + GDP_TASK_STATE_WAIT_TO_SEND, /* Wait to send JSON packet */ + GDP_TASK_STATE_WAIT_FOR_RESULT1, /* Wait for publish result from daemon */ + GDP_TASK_STATE_WAIT_FOR_RESULT2, /* Wait for publish result after re-send */ +} GdpTaskState; + +typedef struct PublishResult { + guint64 sequence; /* Result for the packet with this sequence number */ + Bool statusOk; /* TRUE: ok, FALSE: bad */ + gchar *diagnosis; /* Diagnosis message if statusOk is FALSE */ + gint32 rateLimit; /* VMCI peer rate limit */ +} PublishResult; + +typedef struct HistoryRequest { + gint64 beginCacheTime; /* Begin cacheTime */ + gint64 endCacheTime; /* End cacheTime */ + gchar id[24]; /* Incremental uint64 subscription ID in string + format. + 2^64 - 1: 18446744073709551615 (20 digits) */ +} HistoryRequest; + +typedef struct HistoryCacheItem { + gint64 createTime; /* Guest data - begin */ + gchar *topic; + gchar *token; + gchar *category; + gchar *data; + guint32 dataLen; /* Guest data - end */ + gint64 cacheTime; /* Monotonic time point when item is cached */ + guint32 itemSize; /* Item size in bytes */ +} HistoryCacheItem; + +typedef struct HistoryCache { + GQueue queue; /* Container for HistoryCacheItem */ + guint32 sizeLimit; /* Cache size limit */ + guint32 size; /* Current cache size */ + GList *currentLink; /* Pointer to the history cache queue link + currently being published */ +} HistoryCache; + +typedef struct TaskContext { + /* + * Legal mode & state combination + * + * GDP_TASK_MODE_NONE : GDP_TASK_STATE_IDLE + * (Idle state will never time out) + * + * GDP_TASK_MODE_PUBLISH: GDP_TASK_STATE_WAIT_TO_SEND + * GDP_TASK_STATE_WAIT_FOR_RESULT1 + * GDP_TASK_STATE_WAIT_FOR_RESULT2 + * (All the above wait states will time out) + * + * GDP_TASK_MODE_HISTORY: GDP_TASK_STATE_WAIT_TO_SEND + * GDP_TASK_STATE_WAIT_FOR_RESULT1 + * GDP_TASK_STATE_WAIT_FOR_RESULT2 + * (All the above wait states will time out) + */ + GdpTaskMode mode; + GdpTaskState state; + + /* + * Publish event object signalled while mode is GDP_TASK_MODE_HISTORY. + */ + Bool publishPending; + /* + * History request can be received at any time, + * non-empty requests queue means history request pending. + */ + + HistoryCache cache; /* Container for history cache items */ + GQueue requests; /* Container for HistoryRequest */ + + guint64 sequence; /* Sequence number */ + gchar *packet; /* Formatted JSON packet */ + guint32 packetLen; /* JSON packet length */ + + gint64 timeoutAt; /* Time out at this monotonic time point, + in microseconds. */ + gint64 sendAfter; /* Send to daemon after this monotonic time point, + in microseconds. */ +} TaskContext; + + +static inline GdpEvent +GdpCreateEvent(void); + +static inline void +GdpCloseEvent(GdpEvent *event); // IN/OUT + +static inline void +GdpSetEvent(GdpEvent event); // IN + +static inline void +GdpResetEvent(GdpEvent event); // IN + +static GdpError +GdpWaitForEvent(GdpEvent event, // IN + int timeout); // IN static Bool -GdpInit(ToolsAppCtx *ctx) // IN -{ - Bool retVal = FALSE; +GdpCreateSocket(void); - pluginData.ctx = ctx; -#if defined(_WIN32) - pluginData.wsaStarted = FALSE; - pluginData.eventSendRecv = WSA_INVALID_EVENT; - pluginData.eventStop = WSA_INVALID_EVENT; -#else - pluginData.eventStop = -1; -#endif - pluginData.vmciFd = -1; - pluginData.vmciFamily = -1; - pluginData.sock = INVALID_SOCKET; - Atomic_WriteBool(&pluginData.stopped, FALSE); +static void +GdpCloseSocket(void); -#if defined(_WIN32) - { - WSADATA wsaData; - int res = WSAStartup(MAKEWORD(2, 2), &wsaData); - if (res != 0) { - g_critical("%s: WSAStartup failed: error=%d.\n", - __FUNCTION__, res); - return FALSE; - } +static GdpError +GdpSendTo(SOCKET sock, // IN + const char *buf, // IN + int bufLen, // IN + const struct sockaddr_vm *destAddr); // IN - pluginData.wsaStarted = TRUE; - } +static GdpError +GdpRecvFrom(SOCKET sock, // IN + char *buf, // OUT + int *bufLen, // IN/OUT + struct sockaddr_vm *srcAddr); // OUT - pluginData.eventSendRecv = WSACreateEvent(); - if (pluginData.eventSendRecv == WSA_INVALID_EVENT) { - g_critical("%s: WSACreateEvent for send/recv failed: error=%d.\n", - __FUNCTION__, WSAGetLastError()); - goto exit; - } +static inline void +GdpHistoryRequestFree(HistoryRequest *request); // IN - pluginData.eventStop = WSACreateEvent(); - if (pluginData.eventStop == WSA_INVALID_EVENT) { - g_critical("%s: WSACreateEvent for stop failed: error=%d.\n", - __FUNCTION__, WSAGetLastError()); - goto exit; - } -#else - pluginData.eventStop = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - if (pluginData.eventStop == -1) { - g_critical("%s: eventfd for stop failed: error=%d.\n", - __FUNCTION__, errno); - goto exit; - } -#endif +static void +GdpHistoryRequestFreeGFunc(gpointer item_data, // IN + gpointer user_data); // IN - pluginData.vmciFamily = VMCISock_GetAFValueFd(&pluginData.vmciFd); - if (pluginData.vmciFamily == -1) { - g_critical("%s: Failed to get vSocket address family value.\n", - __FUNCTION__); - goto exit; - } +static inline void +GdpTaskClearHistoryRequestQueue(TaskContext *taskCtx); // IN/OUT - retVal = TRUE; +static guint32 +GdpGetHistoryCacheSizeLimit(); // IN -exit: - if (!retVal) { - GdpDestroy(); - } +static inline Bool +GdpTaskIsHistoryCacheEnabled(TaskContext *taskCtx); // IN - return retVal; -} +static void +GdpHistoryCacheItemFree(HistoryCacheItem *item); // IN +static void +GdpHistoryCacheItemFreeGFunc(gpointer item_data, // IN + gpointer user_data); // IN -/* - ****************************************************************************** - * GdpDestroy -- - * - * Destroys internal plugin data. - * - ****************************************************************************** - */ +static inline void +GdpTaskClearHistoryCacheQueue(TaskContext *taskCtx); // IN/OUT static void -GdpDestroy(void) -{ - GdpCloseSocket(); +GdpTaskDeleteHistoryCacheTail(TaskContext *taskCtx); // IN/OUT - if (pluginData.vmciFd != -1) { - VMCISock_ReleaseAFValueFd(pluginData.vmciFd); - pluginData.vmciFd = -1; - } +static void +GdpTaskHistoryCachePushItem(TaskContext *taskCtx, // IN/OUT + gint64 createTime, // IN + const gchar *topic, // IN + const gchar *token, // IN + const gchar *category, // IN + const gchar *data, // IN + guint32 dataLen); // IN -#if defined(_WIN32) - if (pluginData.eventStop != WSA_INVALID_EVENT) { - WSACloseEvent(pluginData.eventStop); - pluginData.eventStop = WSA_INVALID_EVENT; - } +static gchar * +GdpGetFormattedUtcTime(gint64 utcTime); // IN - if (pluginData.eventSendRecv != WSA_INVALID_EVENT) { - WSACloseEvent(pluginData.eventSendRecv); - pluginData.eventSendRecv = WSA_INVALID_EVENT; - } +static GdpError +GdpTaskBuildPacket(TaskContext *taskCtx, // IN/OUT + gint64 createTime, // IN + const gchar *topic, // IN + const gchar *token, // IN, OPTIONAL + const gchar *category, // IN, OPTIONAL + const gchar *data, // IN + guint32 dataLen, // IN + const gchar *subscribers); // IN, OPTIONAL - if (pluginData.wsaStarted) { - WSACleanup(); - pluginData.wsaStarted = FALSE; - } -#else - if (pluginData.eventStop != -1) { - close(pluginData.eventStop); - pluginData.eventStop = -1; - } -#endif +static inline void +GdpTaskDestroyPacket(TaskContext *taskCtx); // IN/OUT - pluginData.ctx = NULL; -} +static inline Bool +GdpTaskOkToSend(TaskContext *taskCtx); // IN + +static GdpError +GdpTaskSendPacket(TaskContext *taskCtx); // IN/OUT + +static gchar * +GdpTaskUpdateHistoryCachePointerAndGetSubscribers( + TaskContext *taskCtx); // IN/OUT + +static void +GdpTaskPublishHistory(TaskContext *taskCtx); // IN/OUT + +static void +GdpTaskProcessConfigChange(TaskContext *taskCtx); // IN/OUT + +static Bool +GdpJsonIsTokenOfKey(const char *json, // IN + jsmntok_t *token, // IN + const char *key); // IN + +static Bool +GdpJsonIsPublishResult(const char *json, // IN + jsmntok_t *tokens, // IN + int count, // IN + PublishResult *result); // OUT + +static void +GdpTaskProcessPublishResult(TaskContext *taskCtx, // IN/OUT + PublishResult *result); // IN + +static Bool +GdpJsonIsHistoryRequest(const char *json, // IN + jsmntok_t *tokens, // IN + int count, // IN + HistoryRequest *request); // OUT + +static void +GdpTaskPushHistoryRequest(TaskContext *taskCtx, // IN/OUT + HistoryRequest *request); // IN + +static void +GdpTaskProcessNetwork(TaskContext *taskCtx); // IN/OUT + +static void +GdpTaskProcessPublish(TaskContext *taskCtx); // IN/OUT + +static void +GdpTaskProcessTimeout(TaskContext *taskCtx); // IN/OUT + +static int +GdpTaskGetTimeout(TaskContext *taskCtx); // IN + +static void +GdpTaskProcessEvents(TaskContext *taskCtx, // IN/OUT + GdpTaskEvent taskEvent); // IN + +static GdpError +GdpTaskWaitForEvents(int timeout, // IN + GdpTaskEvent *taskEvent); // OUT + +static void +GdpTaskCtxInit(TaskContext *taskCtx); // OUT + +static void +GdpTaskCtxDestroy(TaskContext *taskCtx); // IN/OUT + +static void +GdpThreadTask(ToolsAppCtx *ctx, // IN + void *data); // IN + +static void +GdpThreadInterrupt(ToolsAppCtx *ctx, // IN + void *data); // IN + +static void +GdpInit(ToolsAppCtx *ctx); // IN + +static Bool +GdpStart(void); + +static void +GdpDestroy(void); /* ****************************************************************************** - * GdpSetStopEvent -- + * GdpCreateEvent -- + * + * Creates a new event object/fd. * - * Signals the stop event object/fd. + * @return The new event object handle or fd on success. + * @return GDP_INVALID_EVENT otherwise. * ****************************************************************************** */ -static void -GdpSetStopEvent(void) +static inline GdpEvent +GdpCreateEvent(void) { #if defined(_WIN32) - ASSERT(pluginData.eventStop != WSA_INVALID_EVENT); - WSASetEvent(pluginData.eventStop); + return WSACreateEvent(); #else - eventfd_t val = 1; - ASSERT(pluginData.eventStop != -1); - eventfd_write(pluginData.eventStop, val); + return eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); #endif } /* ****************************************************************************** - * GdpCreateSocket -- - * - * Creates a non-blocking datagram socket for guest data publishing. + * GdpCloseEvent -- * - * The socket is bound to a local privileged port with default remote address - * set to host side gdp daemon endpoint. + * Closes the event object/fd. * - * @return TRUE on success. - * @return FALSE otherwise. + * @param[in,out] event The event object/fd pointer * ****************************************************************************** */ -static Bool -GdpCreateSocket(void) +static inline void +GdpCloseEvent(GdpEvent *event) // IN/OUT { - struct sockaddr_vm localAddr; - struct sockaddr_vm remoteAddr; - int sockErr; - Bool retVal = FALSE; -#if defined(_WIN32) - u_long nbMode = 1; // Non-blocking mode -# define SOCKET_TYPE_PARAM SOCK_DGRAM -#else - /* - * Requires Linux kernel version >= 2.6.27. - */ -# define SOCKET_TYPE_PARAM SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC -#endif - - ASSERT(pluginData.sock == INVALID_SOCKET); - - pluginData.sock = socket(pluginData.vmciFamily, SOCKET_TYPE_PARAM, 0); -#undef SOCKET_TYPE_PARAM - - if (pluginData.sock == INVALID_SOCKET) { - g_warning("%s: socket failed: error=%d.\n", __FUNCTION__, GetSockErr()); - return FALSE; - } + ASSERT(event != NULL); + if (*event != GDP_INVALID_EVENT) { #if defined(_WIN32) - /* - * Set socket to nonblocking mode. - * Note: WSAEventSelect automatically does this if not done. - */ - if (ioctlsocket(pluginData.sock, FIONBIO, &nbMode) != 0) { - sockErr = GetSockErr(); - g_warning("%s: ioctlsocket failed: error=%d.\n", __FUNCTION__, sockErr); - goto exit; - } -#endif - - memset(&localAddr, 0, sizeof localAddr); - localAddr.svm_family = pluginData.vmciFamily; - localAddr.svm_cid = VMCISock_GetLocalCID(); - localAddr.svm_port = PRIVILEGED_PORT_MAX; // No htons - - do { - if (bind(pluginData.sock, (struct sockaddr *)&localAddr, - (socklen_t) sizeof localAddr) == 0) { - sockErr = 0; - break; + if (!WSACloseEvent(*event)) { + g_warning("%s: WSACloseEvent failed: error=%d.\n", + __FUNCTION__, WSAGetLastError()); } - - sockErr = GetSockErr(); - if (sockErr == SYSERR_EADDRINUSE) { - g_info("%s: Local port %d is in use, retry the next one.\n", - __FUNCTION__, localAddr.svm_port); - localAddr.svm_port--; - } else { - g_warning("%s: bind failed: error=%d.\n", __FUNCTION__, sockErr); - goto exit; +#else + if (close(*event) != 0) { + g_warning("%s: close failed: error=%d.\n", + __FUNCTION__, errno); } - } while (localAddr.svm_port >= PRIVILEGED_PORT_MIN); - - if (sockErr != 0) { - goto exit; - } - - memset(&remoteAddr, 0, sizeof remoteAddr); - remoteAddr.svm_family = pluginData.vmciFamily; - remoteAddr.svm_cid = VMCI_HOST_CONTEXT_ID; - remoteAddr.svm_port = GDPD_LISTEN_PORT; // No htons - - /* - * Set default remote address to send to/recv from datagrams. - */ - if (connect(pluginData.sock, (struct sockaddr *)&remoteAddr, - (socklen_t) sizeof remoteAddr) != 0) { - sockErr = GetSockErr(); - g_warning("%s: connect failed: error=%d.\n", __FUNCTION__, sockErr); - goto exit; +#endif + *event = GDP_INVALID_EVENT; } +} - g_debug("%s: Socket created and bound to local port %d.\n", - __FUNCTION__, localAddr.svm_port); - retVal = TRUE; +/* + ****************************************************************************** + * GdpSetEvent -- + * + * Signals the event object/fd. + * + * @param[in] event The event object/fd + * + ****************************************************************************** + */ -exit: - if (!retVal) { - CloseSocket(pluginData.sock); - pluginData.sock = INVALID_SOCKET; +static inline void +GdpSetEvent(GdpEvent event) // IN +{ +#if defined(_WIN32) + ASSERT(event != GDP_INVALID_EVENT); + if (!WSASetEvent(event)) { + g_warning("%s: WSASetEvent failed: error=%d.\n", + __FUNCTION__, WSAGetLastError()); } - - return retVal; +#else + eventfd_t val = 1; + ASSERT(event != GDP_INVALID_EVENT); + if (eventfd_write(event, val) != 0) { + g_warning("%s: eventfd_write failed: error=%d.\n", __FUNCTION__, errno); + } +#endif } /* ****************************************************************************** - * GdpCloseSocket -- + * GdpResetEvent -- + * + * Resets the event object/fd. * - * Closes the guest data publishing socket. + * @param[in] event The event object/fd * ****************************************************************************** */ -static void -GdpCloseSocket(void) +static inline void +GdpResetEvent(GdpEvent event) // IN { - if (pluginData.sock != INVALID_SOCKET) { - g_debug("%s: Closing socket.\n", __FUNCTION__); - if (CloseSocket(pluginData.sock) != 0) { - g_warning("%s: CloseSocket failed: fd=%d, error=%d.\n", - __FUNCTION__, pluginData.sock, GetSockErr()); - } - - pluginData.sock = INVALID_SOCKET; +#if defined(_WIN32) + ASSERT(event != GDP_INVALID_EVENT); + if (!WSAResetEvent(event)) { + g_warning("%s: WSAResetEvent failed: error=%d.\n", + __FUNCTION__, WSAGetLastError()); + } +#else + eventfd_t val; + ASSERT(event != GDP_INVALID_EVENT); + if (eventfd_read(event, &val) != 0) { + g_warning("%s: eventfd_read failed: error=%d.\n", __FUNCTION__, errno); } +#endif } /* ****************************************************************************** - * GdpEmptyRecvQueue -- + * GdpWaitForEvent -- * - * Empties receive queue before publishing new data to host side gdp daemon. - * This is required in case previous receive from daemon timed out and daemon - * did reply later. + * Waits for the event object/fd or timeout. * - * @return GDP_ERROR_SUCCESS on success. - * @return Other GdpError codes otherwise. + * @param[in] event The event object/fd + * @param[in] timeout Timeout value in milliseconds, + * negative value means an infinite timeout, + * zero means no wait. + * + * @return GDP_ERROR_SUCCESS on event signalled. + * @return GDP_ERROR_TIMEOUT on timeout. + * @return Other GdpError code otherwise. * ****************************************************************************** */ static GdpError -GdpEmptyRecvQueue(void) +GdpWaitForEvent(GdpEvent event, // IN + int timeout) // IN { +#if defined(_WIN32) + DWORD localTimeout; + gint64 startTime; GdpError retVal; - ASSERT(pluginData.sock != INVALID_SOCKET); + localTimeout = (DWORD)(timeout >= 0 ? timeout : WSA_INFINITE); + if (timeout > 0) { + startTime = g_get_monotonic_time(); + } else { + startTime = 0; // Deals with [-Werror=maybe-uninitialized] + } + + while (TRUE) { + WSAEVENT eventObjects[] = { event }; + DWORD waitRes; + + waitRes = WSAWaitForMultipleEvents((DWORD) ARRAYSIZE(eventObjects), + eventObjects, + FALSE, localTimeout, TRUE); + if (waitRes == WSA_WAIT_EVENT_0) { + retVal = GDP_ERROR_SUCCESS; + break; + } else if (waitRes == WSA_WAIT_IO_COMPLETION) { + gint64 curTime; + gint64 passedTime; + + if (localTimeout == 0 || + localTimeout == WSA_INFINITE) { + continue; + } + + curTime = g_get_monotonic_time(); + passedTime = (curTime - startTime) / USEC_PER_MILLISECOND; + if (passedTime >= localTimeout) { + retVal = GDP_ERROR_TIMEOUT; + break; + } + + startTime = curTime; + localTimeout -= (DWORD) passedTime; + continue; + } else if (waitRes == WSA_WAIT_TIMEOUT) { + retVal = GDP_ERROR_TIMEOUT; + break; + } else { // WSA_WAIT_FAILED + g_warning("%s: WSAWaitForMultipleEvents failed: error=%d.\n", + __FUNCTION__, WSAGetLastError()); + retVal = GDP_ERROR_GENERAL; + break; + } + } + + return retVal; + +#else + + gint64 startTime; + GdpError retVal; + + if (timeout > 0) { + startTime = g_get_monotonic_time(); + } else { + startTime = 0; // Deals with [-Werror=maybe-uninitialized] + } + + while (TRUE) { + struct pollfd fds[1]; + int res; + + fds[0].fd = event; + fds[0].events = POLLIN; + fds[0].revents = 0; + + res = poll(fds, ARRAYSIZE(fds), timeout); + if (res > 0) { + if (fds[0].revents & POLLIN) { + retVal = GDP_ERROR_SUCCESS; + } else { // Not expected + g_warning("%s: Unexpected event.\n", __FUNCTION__); + retVal = GDP_ERROR_GENERAL; + } + + break; + } else if (res == -1) { + int err = errno; + if (err == EINTR) { + gint64 curTime; + gint64 passedTime; + + if (timeout <= 0) { + continue; + } + + curTime = g_get_monotonic_time(); + passedTime = (curTime - startTime) / USEC_PER_MILLISECOND; + if (passedTime >= timeout) { + retVal = GDP_ERROR_TIMEOUT; + break; + } + + startTime = curTime; + timeout -= (int) passedTime; + continue; + } else { + g_warning("%s: poll failed: error=%d.\n", __FUNCTION__, err); + retVal = GDP_ERROR_GENERAL; + break; + } + } else if (res == 0) { + retVal = GDP_ERROR_TIMEOUT; + break; + } else { + g_warning("%s: Unexpected poll return: %d.\n", __FUNCTION__, res); + retVal = GDP_ERROR_GENERAL; + break; + } + } + + return retVal; +#endif +} + + +/* + ****************************************************************************** + * GdpCreateSocket -- + * + * Creates a non-blocking datagram socket for guest data publishing. + * + * The socket is bound to the gdp guest receive port. + * + * @return TRUE on success. + * @return FALSE otherwise. + * + ****************************************************************************** + */ + +static Bool +GdpCreateSocket(void) +{ + Bool retVal; + struct sockaddr_vm localAddr; +#if defined(_WIN32) + u_long nbMode = 1; // Non-blocking mode +# define SOCKET_TYPE_PARAM SOCK_DGRAM +#else + /* + * Requires Linux kernel version >= 2.6.27. + */ +# define SOCKET_TYPE_PARAM SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC +#endif + + ASSERT(gPluginState.sock == INVALID_SOCKET); + ASSERT(gPluginState.vmciFamily != -1); + + gPluginState.sock = socket(gPluginState.vmciFamily, SOCKET_TYPE_PARAM, 0); +#undef SOCKET_TYPE_PARAM + + if (gPluginState.sock == INVALID_SOCKET) { + g_critical("%s: socket failed: error=%d.\n", __FUNCTION__, GetSysErr()); + return FALSE; + } + + retVal = FALSE; + +#if defined(_WIN32) + /* + * Sets socket to nonblocking mode. + * Note: WSAEventSelect automatically does this if not done. + */ + if (ioctlsocket(gPluginState.sock, FIONBIO, &nbMode) != 0) { + g_critical("%s: ioctlsocket failed: error=%d.\n", + __FUNCTION__, GetSysErr()); + goto exit; + } +#endif + + memset(&localAddr, 0, sizeof localAddr); + localAddr.svm_family = gPluginState.vmciFamily; + localAddr.svm_cid = VMCISock_GetLocalCID(); + localAddr.svm_port = GDP_RECV_PORT; // No htons + + if (bind(gPluginState.sock, (struct sockaddr *) &localAddr, + (socklen_t) sizeof localAddr) != 0) { + g_critical("%s: bind failed: error=%d.\n", + __FUNCTION__, GetSysErr()); + goto exit; + } + + g_debug("%s: Socket created and bound to local port %d.\n", + __FUNCTION__, localAddr.svm_port); + + retVal = TRUE; + +exit: + if (!retVal) { + CloseSocket(gPluginState.sock); + gPluginState.sock = INVALID_SOCKET; + } + + return retVal; +} + + +/* + ****************************************************************************** + * GdpCloseSocket -- + * + * Closes the guest data publishing datagram socket. + * + ****************************************************************************** + */ + +static void +GdpCloseSocket(void) +{ + if (gPluginState.sock != INVALID_SOCKET) { + g_debug("%s: Closing socket.\n", __FUNCTION__); + if (CloseSocket(gPluginState.sock) != 0) { + g_warning("%s: CloseSocket failed: fd=%d, error=%d.\n", + __FUNCTION__, gPluginState.sock, GetSysErr()); + } + + gPluginState.sock = INVALID_SOCKET; + } +} + + +/* + ****************************************************************************** + * GdpSendTo -- + * + * Wrapper of sendto() for VMCI datagram socket. + * + * Datagram send is not buffered, it will not return EWOULDBLOCK. + * If host daemon is not running, error EHOSTUNREACH is returned. + * + * @param[in] sock VMCI datagram socket descriptor + * @param[in] buf Data buffer pointer + * @param[in] bufLen Data length + * @param[in] destAddr Destination VMCI datagram socket address + * + * @return GDP_ERROR_SUCCESS on success. + * @return Other GdpError code otherwise. + * + ****************************************************************************** + */ + +static GdpError +GdpSendTo(SOCKET sock, // IN + const char *buf, // IN + int bufLen, // IN + const struct sockaddr_vm *destAddr) // IN +{ + GdpError retVal; + + ASSERT(sock != INVALID_SOCKET); + ASSERT(buf != NULL && bufLen > 0); + ASSERT(destAddr != NULL); do { long res; - int sockErr; - char buf[1]; // OK to truncate, case of SYSERR_EMSGSIZE. + int err; + socklen_t destAddrLen = (socklen_t) sizeof *destAddr; - /* - * Windows: recv returns -1, first with SYSERR_EMSGSIZE, - * then SYSERR_WOULDBLOCK. - * Linux : recv returns 1 first, then -1 with SYSERR_WOULDBLOCK. - */ - res = recv(pluginData.sock, buf, (int) sizeof buf, 0); - if (res >= 0) { - g_debug("%s: recv returns %d.\n", __FUNCTION__, (int)res); - continue; + res = sendto(sock, buf, bufLen, 0, + (const struct sockaddr *) destAddr, destAddrLen); + if (res == bufLen) { + retVal = GDP_ERROR_SUCCESS; + break; + } else if (res != SOCKET_ERROR) { // No partial send shall happen + g_warning("%s: sendto returned unexpected value %d.\n", + __FUNCTION__, (int) res); + retVal = GDP_ERROR_GENERAL; + break; } - sockErr = GetSockErr(); - if (sockErr == SYSERR_EINTR) { + err = GetSysErr(); + if (err == SYSERR_EINTR) { continue; - } else if (sockErr == SYSERR_EMSGSIZE) { - g_debug("%s: recv truncated.\n", __FUNCTION__); - continue; - } else if (SYSERR_WOULDBLOCK(sockErr)) { - retVal = GDP_ERROR_SUCCESS; // No more message in the recv queue. + } else if (err == SYSERR_EHOSTUNREACH) { + g_info("%s: sendto failed: host daemon unreachable.\n", __FUNCTION__); + retVal = GDP_ERROR_UNREACH; + } else if (err == SYSERR_EMSGSIZE) { + g_warning("%s: sendto failed: message too large.\n", __FUNCTION__); + retVal = GDP_ERROR_DATA_SIZE; } else { - /* - * Note: recv does not return SYSERR_EHOSTUNREACH. - */ - g_info("%s: recv failed: error=%d.\n", __FUNCTION__, sockErr); - retVal = GDP_ERROR_INTERNAL; + g_warning("%s: sendto failed: error=%d.\n", __FUNCTION__, err); + retVal = GDP_ERROR_GENERAL; + } + + break; + + } while (TRUE); + + return retVal; +} + + +/* + ****************************************************************************** + * GdpRecvFrom -- + * + * Wrapper of recvfrom() for VMCI datagram socket. + * + * @param[in] sock VMCI datagram socket descriptor + * @param[out] buf Buffer pointer + * @param[in,out] bufLen Buffer length on input, + * received data length on output + * @param[out] srcAddr Source VMCI datagram socket address + * + * @return GDP_ERROR_SUCCESS on success. + * @return Other GdpError code otherwise. + * + ****************************************************************************** + */ + +static GdpError +GdpRecvFrom(SOCKET sock, // IN + char *buf, // OUT + int *bufLen, // IN/OUT + struct sockaddr_vm *srcAddr) // OUT +{ + GdpError retVal; + + ASSERT(sock != INVALID_SOCKET); + ASSERT(buf != NULL && bufLen != NULL && *bufLen > 0); + ASSERT(srcAddr != NULL); + + do { + long res; + int err; + socklen_t srcAddrLen = (socklen_t) sizeof *srcAddr; + + res = recvfrom(sock, buf, *bufLen, 0, + (struct sockaddr *) srcAddr, &srcAddrLen); + if (res >= 0) { + *bufLen = (int) res; + retVal = GDP_ERROR_SUCCESS; + break; + } + + ASSERT(res == SOCKET_ERROR); + err = GetSysErr(); + if (err == SYSERR_EINTR) { + continue; + } else if (err == SYSERR_EMSGSIZE) { + g_warning("%s: recvfrom failed: buffer size too small.\n", + __FUNCTION__); + retVal = GDP_ERROR_DATA_SIZE; + } else { // Including EWOULDBLOCK + g_warning("%s: recvfrom failed: error=%d.\n", __FUNCTION__, err); + retVal = GDP_ERROR_GENERAL; } - break; + break; + + } while (TRUE); + + return retVal; +} + + +/* + ***************************************************************************** + * GdpHistoryRequestFree -- + * + * Frees history request resources. + * + * @param[in] request History request pointer + * + ***************************************************************************** + */ + +static inline void +GdpHistoryRequestFree(HistoryRequest *request) // IN +{ + g_debug("%s: Entering ...\n", __FUNCTION__); + free(request); +} + + +/* + ***************************************************************************** + * GdpHistoryRequestFreeGFunc -- + * + * GFunc called by GdpTaskClearHistoryRequestQueue. + * + * @param[in] item_data History request pointer + * @param[in] user_data Not used + * + ***************************************************************************** + */ + +static void +GdpHistoryRequestFreeGFunc(gpointer item_data, // IN + gpointer user_data) // IN +{ + GdpHistoryRequestFree((HistoryRequest *) item_data); +} + + +/* + ***************************************************************************** + * GdpTaskClearHistoryRequestQueue -- + * + * Removes all the elements in history request queue. + * + * @param[in,out] taskCtx The task context + * + ***************************************************************************** + */ + +static inline void +GdpTaskClearHistoryRequestQueue(TaskContext *taskCtx) // IN/OUT +{ + g_queue_foreach(&taskCtx->requests, GdpHistoryRequestFreeGFunc, NULL); + g_queue_clear(&taskCtx->requests); +} + + +/* + ****************************************************************************** + * GdpGetHistoryCacheSizeLimit -- + * + * Gets history cache size limit from tools config. + * + ****************************************************************************** + */ + +static guint32 +GdpGetHistoryCacheSizeLimit() // IN +{ + gint sizeLimit; + + sizeLimit = GDP_CONFIG_GET_INT(CONFNAME_GDP_CACHE_SIZE, + GDP_DEFAULT_CACHE_SIZE_LIMIT); + if (sizeLimit != 0 && + (sizeLimit < GDP_MIN_CACHE_SIZE_LIMIT || + sizeLimit > GDP_MAX_CACHE_SIZE_LIMIT)) { + g_warning("%s: Configured history cache size limit %d exceeds range, " + "set to default value %d.\n", + __FUNCTION__, sizeLimit, GDP_DEFAULT_CACHE_SIZE_LIMIT); + sizeLimit = GDP_DEFAULT_CACHE_SIZE_LIMIT; + } + + return (guint32) sizeLimit; +} + + +/* + ****************************************************************************** + * GdpTaskIsHistoryCacheEnabled -- + * + * Checks if history cache is enabled. + * + * @param[in] taskCtx The task context + * + * @return TRUE if history cache size limit is not zero. + * @return FALSE otherwise. + * + ****************************************************************************** + */ + +static inline Bool +GdpTaskIsHistoryCacheEnabled(TaskContext *taskCtx) // IN +{ + return taskCtx->cache.sizeLimit != 0 ? TRUE : FALSE; +} + + +/* + ***************************************************************************** + * GdpHistoryCacheItemFree -- + * + * Frees history cache item resources. + * + * @param[in] item History cache item pointer + * + ***************************************************************************** + */ + +static void +GdpHistoryCacheItemFree(HistoryCacheItem *item) // IN +{ + g_debug("%s: Entering ...\n", __FUNCTION__); + ASSERT(item != NULL); + free(item->topic); + free(item->token); + free(item->category); + free(item->data); + free(item); +} + + +/* + ***************************************************************************** + * GdpHistoryCacheItemFreeGFunc -- + * + * GFunc called by GdpTaskClearHistoryCacheQueue. + * + * @param[in] item_data History cache item pointer + * @param[in] user_data Not used + * + ***************************************************************************** + */ + +static void +GdpHistoryCacheItemFreeGFunc(gpointer item_data, // IN + gpointer user_data) // IN +{ + GdpHistoryCacheItemFree((HistoryCacheItem *) item_data); +} + + +/* + ***************************************************************************** + * GdpTaskClearHistoryCacheQueue -- + * + * Removes all the elements in history cache queue. + * + * @param[in,out] taskCtx The task context + * + ***************************************************************************** + */ + +static inline void +GdpTaskClearHistoryCacheQueue(TaskContext *taskCtx) // IN/OUT +{ + g_queue_foreach(&taskCtx->cache.queue, GdpHistoryCacheItemFreeGFunc, NULL); + g_queue_clear(&taskCtx->cache.queue); + +} + + +/* + ***************************************************************************** + * GdpTaskDeleteHistoryCacheTail -- + * + * Deletes the tail of history cache queue and removes its reference. + * + * @param[in,out] taskCtx The task context + * + ***************************************************************************** + */ + +static void +GdpTaskDeleteHistoryCacheTail(TaskContext *taskCtx) // IN/OUT +{ + HistoryCacheItem *item; + + ASSERT(!g_queue_is_empty(&taskCtx->cache.queue)); + + if (taskCtx->cache.currentLink == + g_queue_peek_tail_link(&taskCtx->cache.queue)) { + taskCtx->cache.currentLink = NULL; + } + + item = (HistoryCacheItem *) g_queue_pop_tail(&taskCtx->cache.queue); + ASSERT(item != NULL); + taskCtx->cache.size -= item->itemSize; + GdpHistoryCacheItemFree(item); +} + + +/* + ****************************************************************************** + * GdpTaskHistoryCachePushItem -- + * + * Pushes the published guest data item into history cache queue. + * + * @param[in,out] taskCtx The task context + * @param[in] createTime UTC timestamp, in number of micro- + * seconds since January 1, 1970 UTC. + * @param[in] topic Topic + * @param[in,optional] token Token, can be NULL + * @param[in,optional] category Category, can be NULL that defaults to + * "application" + * @param[in] data Buffer containing data to publish + * @param[in] dataLen Buffer length + * + ****************************************************************************** + */ + +static void +GdpTaskHistoryCachePushItem(TaskContext *taskCtx, // IN/OUT + gint64 createTime, // IN + const gchar *topic, // IN + const gchar *token, // IN + const gchar *category, // IN + const gchar *data, // IN + guint32 dataLen) // IN +{ + HistoryCacheItem *item; + + ASSERT(topic != NULL); + ASSERT(data != NULL && dataLen > 0); + ASSERT(GdpTaskIsHistoryCacheEnabled(taskCtx)); + + item = (HistoryCacheItem *) Util_SafeMalloc(sizeof(HistoryCacheItem)); + item->createTime = createTime; + item->topic = Util_SafeStrdup(topic); + item->token = Util_SafeStrdup(token); + item->category = Util_SafeStrdup(category); + item->data = (gchar *) Util_SafeMalloc(dataLen); + Util_Memcpy(item->data, data, dataLen); + item->dataLen = dataLen; + + item->cacheTime = g_get_monotonic_time(); + item->itemSize = ((guint32) sizeof(HistoryCacheItem)) + + GDP_STR_SIZE(item->topic) + + GDP_STR_SIZE(item->token) + + GDP_STR_SIZE(item->category) + + item->dataLen; + + while (taskCtx->cache.size + item->itemSize > taskCtx->cache.sizeLimit) { + GdpTaskDeleteHistoryCacheTail(taskCtx); + } + + g_queue_push_head(&taskCtx->cache.queue, item); + taskCtx->cache.size += item->itemSize; + + g_debug("%s: Current history cache size in bytes: %u, item count: %u.\n", + __FUNCTION__, taskCtx->cache.size, + g_queue_get_length(&taskCtx->cache.queue)); +} + + +/* + ****************************************************************************** + * GdpGetFormattedUtcTime -- + * + * Converts UTC time to string in format like "2018-02-22T21:17:38.517Z". + * + * The caller must free the return value using g_free. + * + * @param[in] utcTime Real wall-clock time + * Number of microseconds since January 1, 1970 UTC. + * + * @return Formatted timestamp string on success. + * @return NULL otherwise. + * + ****************************************************************************** + */ + +static gchar * +GdpGetFormattedUtcTime(gint64 utcTime) // IN +{ + gchar *retVal = NULL; + GDateTime *utcDateTime; + + utcDateTime = g_date_time_new_from_unix_utc(utcTime / USEC_PER_SECOND); + if (utcDateTime != NULL) { + gchar *dateToSecond; // YYYY-MM-DDTHH:MM:SS + + dateToSecond = g_date_time_format(utcDateTime, "%FT%T"); + if (dateToSecond != NULL) { + gint msec; // milliseconds + + msec = (utcTime % USEC_PER_SECOND) / USEC_PER_MILLISECOND; + retVal = g_strdup_printf("%s.%03dZ", dateToSecond, msec); + g_free(dateToSecond); + } + + g_date_time_unref(utcDateTime); + } + + return retVal; +} + + +/* + ****************************************************************************** + * GdpTaskBuildPacket -- + * + * Builds JSON packet in the task context. + * + * @param[in,out] taskCtx The task context + * @param[in] createTime UTC timestamp, in number of micro- + * seconds since January 1, 1970 UTC. + * @param[in] topic Topic + * @param[in,optional] token Token, can be NULL + * @param[in,optional] category Category, can be NULL that defaults to + * "application" + * @param[in] data Buffer containing data to publish + * @param[in] dataLen Buffer length + * @param[in,optional] subscribers For history data only, NULL for new data + * + * @return GDP_ERROR_SUCCESS on success. + * @return Other GdpError code otherwise. + * + ****************************************************************************** + */ + +static GdpError +GdpTaskBuildPacket(TaskContext *taskCtx, // IN/OUT + gint64 createTime, // IN + const gchar *topic, // IN + const gchar *token, // IN, OPTIONAL + const gchar *category, // IN, OPTIONAL + const gchar *data, // IN + guint32 dataLen, // IN + const gchar *subscribers) // IN, OPTIONAL +{ + gchar base64Data[GDP_MAX_PACKET_LEN + 1]; // Add a space for NULL + gchar *subscribersLine = NULL; + gchar *formattedTime; + GdpError gdpErr; + + ASSERT(topic != NULL); + ASSERT(data != NULL && dataLen > 0); + + if (!Base64_Encode(data, dataLen, base64Data, sizeof base64Data, NULL)) { + g_info("%s: Base64_Encode failed, data length is %u.\n", + __FUNCTION__, dataLen); + return GDP_ERROR_DATA_SIZE; + } + + if (subscribers != NULL && *subscribers != '\0') { + subscribersLine = g_strdup_printf(GDP_PACKET_JSON_LINE_SUBSCRIBERS, + subscribers); + } + + formattedTime = GdpGetFormattedUtcTime(createTime); + + ASSERT(taskCtx->packet == NULL); + taskCtx->packet = g_strdup_printf(GDP_PACKET_JSON, + ++taskCtx->sequence, + subscribersLine != NULL ? + subscribersLine : "", + formattedTime != NULL ? + formattedTime : "", + topic, + token != NULL ? + token : "", + category != NULL ? + category : "application", + base64Data); + taskCtx->packetLen = (guint32) strlen(taskCtx->packet); + if (taskCtx->packetLen > GDP_MAX_PACKET_LEN) { + g_info("%s: Packet length (%u) exceeds maximum limit (%u).\n", + __FUNCTION__, taskCtx->packetLen, GDP_MAX_PACKET_LEN); + GdpTaskDestroyPacket(taskCtx); + gdpErr = GDP_ERROR_DATA_SIZE; + } else { + gdpErr = GDP_ERROR_SUCCESS; + } + + g_free(formattedTime); + g_free(subscribersLine); + return gdpErr; +} + + +/* + ***************************************************************************** + * GdpTaskDestroyPacket -- + * + * Destroys JSON packet in the task context. + * + * @param[in,out] taskCtx The task context + * + ***************************************************************************** + */ + +static inline void +GdpTaskDestroyPacket(TaskContext *taskCtx) // IN/OUT +{ + g_free(taskCtx->packet); + taskCtx->packet = NULL; + taskCtx->packetLen = 0; +} + + +/* + ****************************************************************************** + * GdpTaskOkToSend -- + * + * Checks if current time is OK to send JSON packet to host side gdp daemon. + * + * @param[in] taskCtx The task context + * + * @return TRUE if current time has passed the task context sendAfter time. + * @return FALSE otherwise. + * + ****************************************************************************** + */ + +static inline Bool +GdpTaskOkToSend(TaskContext *taskCtx) // IN +{ + return g_get_monotonic_time() >= taskCtx->sendAfter ? + TRUE : FALSE; +} + + +/* + ****************************************************************************** + * GdpTaskSendPacket -- + * + * Sends JSON packet in the task context to host side gdp daemon. + * + * Datagram send is not buffered, it will not return EWOULDBLOCK. + * If host daemon is not running, error EHOSTUNREACH is returned. + * + * @param[in,out] taskCtx The task context + * + * @return GDP_ERROR_SUCCESS on success. + * @return Other GdpError code otherwise. + * + ****************************************************************************** + */ + +static GdpError +GdpTaskSendPacket(TaskContext *taskCtx) // IN/OUT +{ + struct sockaddr_vm destAddr; + GdpError gdpErr; + + ASSERT(gPluginState.sock != INVALID_SOCKET); + ASSERT(taskCtx->packet != NULL && taskCtx->packetLen > 0); + + memset(&destAddr, 0, sizeof destAddr); + destAddr.svm_family = gPluginState.vmciFamily; + destAddr.svm_cid = VMCI_HOST_CONTEXT_ID; + destAddr.svm_port = GDPD_RECV_PORT; // No htons + + gdpErr = GdpSendTo(gPluginState.sock, taskCtx->packet, + (int) taskCtx->packetLen, &destAddr); + if (gdpErr == GDP_ERROR_SUCCESS) { + /* + * Updates sendAfter for next send, phase 1 / 2. + */ + taskCtx->sendAfter = g_get_monotonic_time(); + taskCtx->timeoutAt = taskCtx->sendAfter + + GDP_WAIT_RESULT_TIMEOUT * USEC_PER_MILLISECOND; + } else { + GdpTaskDestroyPacket(taskCtx); + taskCtx->timeoutAt = GDP_TIMEOUT_AT_INFINITE; + } + + return gdpErr; +} + + +/* + ***************************************************************************** + * GdpTaskUpdateHistoryCachePointerAndGetSubscribers -- + * + * Updates history cache pointer and gets subscribers of the pointed item. + * + * @param[in,out] taskCtx The task context + * + * @return A string of subscription IDs separated by comma + (to be freed by caller), + * or NULL if no match. + * + ***************************************************************************** + */ + +static gchar * +GdpTaskUpdateHistoryCachePointerAndGetSubscribers( + TaskContext *taskCtx) // IN/OUT +{ + gchar *subscribers = NULL; + + if (taskCtx->cache.currentLink == NULL) { + /* + * Starts with the earliest history cache link. + */ + taskCtx->cache.currentLink = g_queue_peek_tail_link( + &taskCtx->cache.queue); + } + + while (taskCtx->cache.currentLink != NULL && + !g_queue_is_empty(&taskCtx->requests)) { + HistoryCacheItem *item; + GList *requestList; + + item = (HistoryCacheItem *) taskCtx->cache.currentLink->data; + requestList = g_queue_peek_tail_link(&taskCtx->requests); + while (requestList != NULL) { + HistoryRequest *request = (HistoryRequest *) requestList->data; + Bool requestDone = FALSE; + + if (item->cacheTime > request->endCacheTime) { + requestDone = TRUE; + } else if (request->beginCacheTime < item->cacheTime && + item->cacheTime <= request->endCacheTime) { + if (subscribers == NULL) { + subscribers = g_strdup_printf("\"%s\"", request->id); + } else { + gchar *subscribersNew; + subscribersNew = g_strdup_printf("%s,\"%s\"", + subscribers, request->id); + g_free(subscribers); + subscribers = subscribersNew; + } + + if (item->cacheTime == request->endCacheTime) { + requestDone = TRUE; + } else { + request->beginCacheTime = item->cacheTime; + } + } + + if (requestDone) { + GList *requestListPrev = requestList->prev; + GdpHistoryRequestFree(request); + g_queue_delete_link(&taskCtx->requests, requestList); + requestList = requestListPrev; + } else { + requestList = requestList->prev; + } + } + + if (subscribers != NULL) { + break; + } + + taskCtx->cache.currentLink = taskCtx->cache.currentLink->prev; + } + + return subscribers; +} + + +/* + ***************************************************************************** + * GdpTaskPublishHistory -- + * + * Publishes cached history guest data. + * + * @param[in,out] taskCtx The task context + * + ***************************************************************************** + */ + +static void +GdpTaskPublishHistory(TaskContext *taskCtx) // IN/OUT +{ + GdpError gdpErr; + gchar *subscribers; + HistoryCacheItem *item; + + g_debug("%s: Entering ...\n", __FUNCTION__); + + ASSERT(taskCtx->mode == GDP_TASK_MODE_NONE && + taskCtx->state == GDP_TASK_STATE_IDLE); + + subscribers = GdpTaskUpdateHistoryCachePointerAndGetSubscribers(taskCtx); + if (subscribers == NULL) { + g_info("%s: No history item meets subscription requirement.\n", + __FUNCTION__); + goto cleanup; + } + + ASSERT(taskCtx->cache.currentLink != NULL); + item = (HistoryCacheItem *) taskCtx->cache.currentLink->data; + /* + * Updates history cache pointer. + */ + taskCtx->cache.currentLink = taskCtx->cache.currentLink->prev; + + gdpErr = GdpTaskBuildPacket(taskCtx, + item->createTime, + item->topic, + item->token, + item->category, + item->data, + item->dataLen, + subscribers); + if (gdpErr != GDP_ERROR_SUCCESS) { + /* + * Theoretically speaking, too many subscribers could cause JSON packet + * length exceed maximum limit and fail GdpTaskBuildPacket with + * GDP_ERROR_DATA_SIZE. + */ + g_info("%s: Failed to build JSON packet for subscribers: [%s].\n", + __FUNCTION__, subscribers); + g_free(subscribers); + goto cleanup; + } + g_free(subscribers); + + if (GdpTaskOkToSend(taskCtx)) { + gdpErr = GdpTaskSendPacket(taskCtx); + if (gdpErr != GDP_ERROR_SUCCESS) { + g_info("%s: Failed to send history JSON packet.\n", + __FUNCTION__); + goto cleanup; + } + + taskCtx->state = GDP_TASK_STATE_WAIT_FOR_RESULT1; + } else { + taskCtx->timeoutAt = taskCtx->sendAfter; + taskCtx->state = GDP_TASK_STATE_WAIT_TO_SEND; + } + + taskCtx->mode = GDP_TASK_MODE_HISTORY; + g_debug("%s: Updated mode=%d, state=%d.\n", + __FUNCTION__, taskCtx->mode, taskCtx->state); + return; + +cleanup: + taskCtx->cache.currentLink = NULL; + GdpTaskClearHistoryRequestQueue(taskCtx); +} + + +/* + ***************************************************************************** + * GdpTaskProcessConfigChange -- + * + * Processes config change. + * + * @param[in,out] taskCtx The task context + * + ***************************************************************************** + */ + +static void +GdpTaskProcessConfigChange(TaskContext *taskCtx) // IN/OUT +{ + guint32 sizeLimit = GdpGetHistoryCacheSizeLimit(); + + if (taskCtx->cache.sizeLimit == sizeLimit) { + return; + } + + g_debug("%s: Current history cache size limit: %u, new value: %u.\n", + __FUNCTION__, taskCtx->cache.sizeLimit, sizeLimit); + taskCtx->cache.sizeLimit = sizeLimit; + while (taskCtx->cache.size > taskCtx->cache.sizeLimit) { + GdpTaskDeleteHistoryCacheTail(taskCtx); + } +} + + +/* + ****************************************************************************** + * GdpJsonIsTokenOfKey -- + * + * Checks if the JSON token represents the key. + * + * @param[in] json The JSON text + * @param[in] token Token + * @param[in] key Key name + * + * @return TRUE if token represents the key. + * @return FALSE otherwise. + * + ****************************************************************************** + */ + +static Bool +GdpJsonIsTokenOfKey(const char *json, // IN + jsmntok_t *token, // IN + const char *key) // IN +{ + int tokenLen = token->end - token->start; + if (token->type == JSMN_STRING && + token->size == 1 && // The token represents a key + (int) strlen(key) == tokenLen && + strncmp(json + token->start, key, tokenLen) == 0) { + return TRUE; + } + return FALSE; +} + + +/* + ****************************************************************************** + * GdpJsonIsPublishResult -- + * + * Checks if the JSON text represents a publish result. + * + * @param[in] json The JSON text + * @param[in] tokens Token array + * @param[in] count Token array count + * @param[out] request Pointer to publish result + * + * @return TRUE if json represents a publish result. + * @return FALSE otherwise. + * + ****************************************************************************** + */ + +static Bool +GdpJsonIsPublishResult(const char *json, // IN + jsmntok_t *tokens, // IN + int count, // IN + PublishResult *result) // OUT +{ + int index; + int requiredKeys = GDP_RESULT_REQUIRED_KEYS; + gchar *diagnosis = NULL; + + /* + * Loops over all keys of the root object. + */ + for (index = 1; index < count; index++) { + int tokenLen; + + if (GdpJsonIsTokenOfKey(json, &tokens[index], GDP_RESULT_SEQUENCE)) { + requiredKeys--; + index++; + result->sequence = g_ascii_strtoull(json + tokens[index].start, + NULL, 10); + } else if (GdpJsonIsTokenOfKey(json, &tokens[index], + GDP_RESULT_STATUS)) { + requiredKeys--; + index++; + tokenLen = tokens[index].end - tokens[index].start; + if ((int) strlen(GDP_RESULT_STATUS_OK) == tokenLen && + strncmp(json + tokens[index].start, + GDP_RESULT_STATUS_OK, tokenLen) == 0) { + result->statusOk = TRUE; + } else { + result->statusOk = FALSE; + } + } else if (GdpJsonIsTokenOfKey(json, &tokens[index], + GDP_RESULT_DIAGNOSIS)) { + index++; + tokenLen = tokens[index].end - tokens[index].start; + diagnosis = g_strndup(json + tokens[index].start, tokenLen); + } else if (GdpJsonIsTokenOfKey(json, &tokens[index], + GDP_RESULT_RATE_LIMIT)) { + requiredKeys--; + index++; + result->rateLimit = atoi(json + tokens[index].start); + } + } + + if (requiredKeys == 0) { + result->diagnosis = diagnosis; + return TRUE; + } else { + g_free(diagnosis); + return FALSE; + } +} + + +/* + ***************************************************************************** + * GdpTaskProcessPublishResult -- + * + * Processes publish result. + * + * @param[in,out] taskCtx The task context + * @param[in] result Publish result + * + ***************************************************************************** + */ + +static void +GdpTaskProcessPublishResult(TaskContext *taskCtx, // IN/OUT + PublishResult *result) // IN +{ + g_debug("%s: Entering ...\n", __FUNCTION__); + + if (taskCtx->mode == GDP_TASK_MODE_NONE || + (taskCtx->state != GDP_TASK_STATE_WAIT_FOR_RESULT1 && + taskCtx->state != GDP_TASK_STATE_WAIT_FOR_RESULT2)) { + g_info("%s: Publish result not expected at mode=%d, state=%d.\n", + __FUNCTION__, taskCtx->mode, taskCtx->state); + return; + } + + if (taskCtx->sequence != result->sequence) { + g_info("%s: Publish result sequence number not match.\n", + __FUNCTION__); + return; + } + + if (!result->statusOk) { + g_info("%s: Publish failed: %s\n", __FUNCTION__, + result->diagnosis ? result->diagnosis : ""); + } + + if (taskCtx->mode == GDP_TASK_MODE_PUBLISH) { + if (result->statusOk) { + if (GdpTaskIsHistoryCacheEnabled(taskCtx)) { + GdpTaskHistoryCachePushItem(taskCtx, + gPublishState.createTime, + gPublishState.topic, + gPublishState.token, + gPublishState.category, + gPublishState.data, + gPublishState.dataLen); + } + + gPublishState.gdpErr = GDP_ERROR_SUCCESS; + } else { + gPublishState.gdpErr = GDP_ERROR_INVALID_DATA; + } + + GdpSetEvent(gPublishState.eventGetResult); + } + + GdpTaskDestroyPacket(taskCtx); + if (result->rateLimit > 0) { + /* + * Updates sendAfter for next send, phase 2 / 2. + */ + taskCtx->sendAfter += (USEC_PER_SECOND / result->rateLimit); + } + taskCtx->timeoutAt = GDP_TIMEOUT_AT_INFINITE; + taskCtx->mode = GDP_TASK_MODE_NONE; + taskCtx->state = GDP_TASK_STATE_IDLE; + g_debug("%s: Reset mode=%d, state=%d.\n", + __FUNCTION__, taskCtx->mode, taskCtx->state); +} + + +/* + ****************************************************************************** + * GdpJsonIsHistoryRequest -- + * + * Checks if the JSON text represents a history request. + * + * @param[in] json The JSON text + * @param[in] tokens Token array + * @param[in] count Token array count + * @param[out] request Pointer to history request + * + * @return TRUE if json represents a history request. + * @return FALSE otherwise. + * + ****************************************************************************** + */ + +static Bool +GdpJsonIsHistoryRequest(const char *json, // IN + jsmntok_t *tokens, // IN + int count, // IN + HistoryRequest *request) // OUT +{ + int index; + int requiredKeys = GDP_HISTORY_REQUEST_REQUIRED_KEYS; + guint64 pastSeconds = 0; + + /* + * Loops over all keys of the root object + */ + for (index = 1; index < count; index++) { + if (GdpJsonIsTokenOfKey(json, &tokens[index], + GDP_HISTORY_REQUEST_PAST_SECONDS)) { + requiredKeys--; + index++; + pastSeconds = g_ascii_strtoull(json + tokens[index].start, NULL, 10); + } else if (GdpJsonIsTokenOfKey(json, &tokens[index], + GDP_HISTORY_REQUEST_ID)) { + requiredKeys--; + index++; + Str_Strncpy(request->id, sizeof request->id, + json + tokens[index].start, + tokens[index].end - tokens[index].start); + } + } + + if (requiredKeys == 0) { + request->endCacheTime = g_get_monotonic_time(); + request->beginCacheTime = request->endCacheTime - + (gint64)pastSeconds * USEC_PER_SECOND; + return TRUE; + } + + return FALSE; +} + + +/* + ***************************************************************************** + * GdpTaskPushHistoryRequest -- + * + * Pushes new history request to queue and resets history cache pointer. + * + * @param[in,out] taskCtx The task context + * @param[in] request New history request + * + ***************************************************************************** + */ + +static void +GdpTaskPushHistoryRequest(TaskContext *taskCtx, // IN/OUT + HistoryRequest *request) // IN +{ + HistoryRequest *requestCopy; + + g_debug("%s: Entering ...\n", __FUNCTION__); + + if (!GdpTaskIsHistoryCacheEnabled(taskCtx)) { + g_info("%s: History cache not enabled.\n", __FUNCTION__); + return; + } + + if (request->beginCacheTime >= request->endCacheTime) { + g_info("%s: Invalid history request.\n", __FUNCTION__); + return; + } + + if (request->beginCacheTime < 0) { + request->beginCacheTime = 0; + } + + requestCopy = (HistoryRequest *) Util_SafeMalloc(sizeof *request); + Util_Memcpy(requestCopy, request, sizeof *request); + /* + * Note: each request comes with a unique subscription ID. + */ + g_queue_push_head(&taskCtx->requests, requestCopy); + /* + * Resets history cache pointer. + */ + taskCtx->cache.currentLink = NULL; +} + + +/* + ***************************************************************************** + * GdpTaskProcessNetwork -- + * + * Processes the network event. + * + * @param[in,out] taskCtx The task context + * + ***************************************************************************** + */ + +static void +GdpTaskProcessNetwork(TaskContext *taskCtx) // IN/OUT +{ + GdpError gdpErr; + char buf[GDP_MAX_PACKET_LEN + 1]; // Adds a space for NULL + int bufLen = (int) sizeof buf - 1; + struct sockaddr_vm srcAddr; + jsmn_parser parser; + jsmntok_t tokens[16]; // We expect no more than 16 tokens + int retVal; + Bool isPublishResult; + Bool isHistoryRequest; + PublishResult result = { 0 }; + HistoryRequest request = { 0 }; + + g_debug("%s: Entering ...\n", __FUNCTION__); + + gdpErr = GdpRecvFrom(gPluginState.sock, buf, &bufLen, &srcAddr); + if (gdpErr != GDP_ERROR_SUCCESS || bufLen <= 0) { + return; + } + + if (srcAddr.svm_cid != VMCI_HOST_CONTEXT_ID) { + g_info("%s: Unexpected source svm_cid: %u.\n", + __FUNCTION__, srcAddr.svm_cid); + return; + } + + buf[bufLen] = '\0'; + + jsmn_init(&parser); + retVal = jsmn_parse(&parser, buf, bufLen, + tokens, (unsigned int) ARRAYSIZE(tokens)); + if (retVal < 0) { + g_info("%s: Error %d while parsing JSON:\n%s\n", + __FUNCTION__, retVal, buf); + return; + } + + /* + * The top-level element should be an object. + */ + if (retVal < 1 || tokens[0].type != JSMN_OBJECT) { + g_info("%s: Invalid JSON:\n%s\n", __FUNCTION__, buf); + return; + } + + isPublishResult = FALSE; + isHistoryRequest = FALSE; + if (srcAddr.svm_port == GDPD_RECV_PORT) { + isPublishResult = GdpJsonIsPublishResult(buf, tokens, + retVal, &result); + if (!isPublishResult) { + isHistoryRequest = GdpJsonIsHistoryRequest(buf, tokens, + retVal, &request); + } + } else { + isHistoryRequest = GdpJsonIsHistoryRequest(buf, tokens, + retVal, &request); + if (!isHistoryRequest) { + isPublishResult = GdpJsonIsPublishResult(buf, tokens, + retVal, &result); + } + } + + if (isPublishResult) { + GdpTaskProcessPublishResult(taskCtx, &result); + g_free(result.diagnosis); + } else if (isHistoryRequest) { + GdpTaskPushHistoryRequest(taskCtx, &request); + } else { + g_info("%s: Unknown JSON:\n%s\n", __FUNCTION__, buf); + } +} + + +/* + ***************************************************************************** + * GdpTaskProcessPublish -- + * + * Processes the publish event. + * + * @param[in,out] taskCtx The task context + * + ***************************************************************************** + */ + +static void +GdpTaskProcessPublish(TaskContext *taskCtx) // IN/OUT +{ + GdpError gdpErr; + + g_debug("%s: Entering ...\n", __FUNCTION__); + + ASSERT(taskCtx->mode != GDP_TASK_MODE_PUBLISH); + + if (taskCtx->mode != GDP_TASK_MODE_NONE) { + g_debug("%s: Set publish pending.\n", __FUNCTION__); + ASSERT(!taskCtx->publishPending); + taskCtx->publishPending = TRUE; + return; + } + + ASSERT(taskCtx->state == GDP_TASK_STATE_IDLE); + + gdpErr = GdpTaskBuildPacket(taskCtx, + gPublishState.createTime, + gPublishState.topic, + gPublishState.token, + gPublishState.category, + gPublishState.data, + gPublishState.dataLen, + NULL); + if (gdpErr != GDP_ERROR_SUCCESS) { + goto fail; + } + + if (GdpTaskOkToSend(taskCtx)) { + gdpErr = GdpTaskSendPacket(taskCtx); + if (gdpErr != GDP_ERROR_SUCCESS) { + goto fail; + } + + taskCtx->state = GDP_TASK_STATE_WAIT_FOR_RESULT1; + } else { + taskCtx->timeoutAt = taskCtx->sendAfter; + taskCtx->state = GDP_TASK_STATE_WAIT_TO_SEND; + } + + taskCtx->mode = GDP_TASK_MODE_PUBLISH; + g_debug("%s: Updated mode=%d, state=%d.\n", + __FUNCTION__, taskCtx->mode, taskCtx->state); + return; + +fail: + gPublishState.gdpErr = gdpErr; + GdpSetEvent(gPublishState.eventGetResult); +} + + +/* + ***************************************************************************** + * GdpTaskProcessTimeout -- + * + * Processes wait timeout. + * + * @param[in,out] taskCtx The task context + * + ***************************************************************************** + */ + +static void +GdpTaskProcessTimeout(TaskContext *taskCtx) // IN/OUT +{ + GdpError gdpErr; + + g_debug("%s: Entering ...\n", __FUNCTION__); + + ASSERT(taskCtx->mode != GDP_TASK_MODE_NONE && + taskCtx->state != GDP_TASK_STATE_IDLE); + + if (taskCtx->state == GDP_TASK_STATE_WAIT_TO_SEND || + taskCtx->state == GDP_TASK_STATE_WAIT_FOR_RESULT1) { + gdpErr = GdpTaskSendPacket(taskCtx); + if (gdpErr != GDP_ERROR_SUCCESS) { + goto fail; + } + + if (taskCtx->state == GDP_TASK_STATE_WAIT_TO_SEND) { + taskCtx->state = GDP_TASK_STATE_WAIT_FOR_RESULT1; + } else { + taskCtx->state = GDP_TASK_STATE_WAIT_FOR_RESULT2; + } + + g_debug("%s: Updated mode=%d, state=%d.\n", + __FUNCTION__, taskCtx->mode, taskCtx->state); + } else if (taskCtx->state == GDP_TASK_STATE_WAIT_FOR_RESULT2) { + g_warning("%s: Wait for publish result timed out.\n", __FUNCTION__); + GdpTaskDestroyPacket(taskCtx); + taskCtx->timeoutAt = GDP_TIMEOUT_AT_INFINITE; + gdpErr = GDP_ERROR_TIMEOUT; + goto fail; + } else { + NOT_REACHED(); + } + + return; + +fail: + if (taskCtx->mode == GDP_TASK_MODE_PUBLISH) { + gPublishState.gdpErr = gdpErr; + GdpSetEvent(gPublishState.eventGetResult); + } + + taskCtx->state = GDP_TASK_STATE_IDLE; + taskCtx->mode = GDP_TASK_MODE_NONE; + g_debug("%s: Reset mode=%d, state=%d.\n", + __FUNCTION__, taskCtx->mode, taskCtx->state); +} + + +/* + ***************************************************************************** + * GdpTaskGetTimeout -- + * + * Gets timeout value for the next wait call. + * + * @param[in] taskCtx The task context + * + ***************************************************************************** + */ + +static int +GdpTaskGetTimeout(TaskContext *taskCtx) // IN +{ + gint64 curTime; + gint64 timeout; + + if (taskCtx->timeoutAt == GDP_TIMEOUT_AT_INFINITE) { + return GDP_WAIT_INFINITE; + } + + curTime = g_get_monotonic_time(); + if (curTime >= taskCtx->timeoutAt) { + return 0; + } + + timeout = (taskCtx->timeoutAt - curTime) / USEC_PER_MILLISECOND; + return (int)timeout; +} + - } while (TRUE); +/* + ***************************************************************************** + * GdpTaskProcessEvents -- + * + * Processes task events. + * + * @param[in,out] taskCtx The task context + * @param[in] taskEvent Task event to process + * + ***************************************************************************** + */ - return retVal; +static void +GdpTaskProcessEvents(TaskContext *taskCtx, // IN/OUT + GdpTaskEvent taskEvent) // IN +{ + switch (taskEvent) { + case GDP_TASK_EVENT_CONFIG: + GdpResetEvent(gPluginState.eventConfig); + GdpTaskProcessConfigChange(taskCtx); + break; + case GDP_TASK_EVENT_NETWORK: + GdpTaskProcessNetwork(taskCtx); + break; + case GDP_TASK_EVENT_PUBLISH: + GdpResetEvent(gPublishState.eventPublish); + GdpTaskProcessPublish(taskCtx); + break; + case GDP_TASK_EVENT_TIMEOUT: + GdpTaskProcessTimeout(taskCtx); + break; + default: + //GDP_TASK_EVENT_NONE + //GDP_TASK_EVENT_STOP + NOT_REACHED(); + } } /* ****************************************************************************** - * GdpWaitForEvent -- + * GdpTaskWaitForEvents -- * - * Waits for the stop event object/fd signalled for vmtoolsd shutdown, - * network send/receive event ready or timeout. + * Waits for the following events or timeout: + * The stop event + * The config event + * The network event + * The publish event * - * @param[in] netEvent GOS specific network send/receive event flag - * @param[in] timeout Timeout value in milliseconds, + * @param[in] timeout Timeout value in milliseconds, * negative value means an infinite timeout, - * zero means no wait + * zero means no wait. + * @param[out] taskEvent Return which event is signalled + * if no error happens. * - * @return GDP_ERROR_SUCCESS on specified network event ready. - * @return Other GdpError codes otherwise. + * @return GDP_ERROR_SUCCESS if no error happens. + * @return Other GdpError code otherwise. * ****************************************************************************** */ static GdpError -GdpWaitForEvent(int netEvent, int timeout) +GdpTaskWaitForEvents(int timeout, // IN + GdpTaskEvent *taskEvent) // OUT { #if defined(_WIN32) int res; @@ -519,54 +2288,56 @@ GdpWaitForEvent(int netEvent, int timeout) gint64 startTime; GdpError retVal; - ASSERT(netEvent == FD_READ || netEvent == FD_WRITE); - ASSERT(pluginData.sock != INVALID_SOCKET); + ASSERT(Atomic_ReadBool(&gPluginState.started)); /* - * Reset the send-recv event object. + * Resets the network event object. */ - WSAResetEvent(pluginData.eventSendRecv); + WSAResetEvent(gPluginState.eventNetwork); /* - * Associate the send-recv event object with the specified network event + * Associates the network event object with FD_READ network event * in the socket. */ - res = WSAEventSelect(pluginData.sock, pluginData.eventSendRecv, netEvent); + res = WSAEventSelect(gPluginState.sock, gPluginState.eventNetwork, FD_READ); if (res != 0) { - g_info("%s: WSAEventSelect failed: error=%d.\n", - __FUNCTION__, WSAGetLastError()); - return GDP_ERROR_INTERNAL; + g_warning("%s: WSAEventSelect failed: error=%d.\n", + __FUNCTION__, WSAGetLastError()); + return GDP_ERROR_GENERAL; } - localTimeout = (DWORD)(timeout >= 0 ? timeout : WSA_INFINITE); + retVal = GDP_ERROR_SUCCESS; + + localTimeout = (timeout >= 0 ? (DWORD) timeout : WSA_INFINITE); if (timeout > 0) { startTime = g_get_monotonic_time(); } else { - startTime = 0; // Deal with [-Werror=maybe-uninitialized] + startTime = 0; // Deals with [-Werror=maybe-uninitialized] } while (TRUE) { - WSAEVENT eventObjects[] = {pluginData.eventStop, - pluginData.eventSendRecv}; + WSAEVENT eventObjects[] = { gPluginState.eventStop, + gPluginState.eventConfig, + gPluginState.eventNetwork, + gPublishState.eventPublish }; DWORD waitRes; waitRes = WSAWaitForMultipleEvents((DWORD)ARRAYSIZE(eventObjects), eventObjects, FALSE, localTimeout, TRUE); if (waitRes == WSA_WAIT_EVENT_0) { - /* - * Main thread has set the stop event object to interrupt - * pool thread for vmtoolsd shutdown. - */ - retVal = GDP_ERROR_STOP; + *taskEvent = GDP_TASK_EVENT_STOP; break; } else if (waitRes == (WSA_WAIT_EVENT_0 + 1)) { + *taskEvent = GDP_TASK_EVENT_CONFIG; + break; + } else if (waitRes == (WSA_WAIT_EVENT_0 + 2)) { WSANETWORKEVENTS networkEvents; - res = WSAEnumNetworkEvents(pluginData.sock, NULL, &networkEvents); + res = WSAEnumNetworkEvents(gPluginState.sock, NULL, &networkEvents); if (res != 0) { - g_info("%s: WSAEnumNetworkEvents failed: error=%d.\n", - __FUNCTION__, WSAGetLastError()); - retVal = GDP_ERROR_INTERNAL; + g_warning("%s: WSAEnumNetworkEvents failed: error=%d.\n", + __FUNCTION__, WSAGetLastError()); + retVal = GDP_ERROR_GENERAL; break; } @@ -576,14 +2347,17 @@ GdpWaitForEvent(int netEvent, int timeout) * since WSAEnumNetworkEvents should have returned WSAENETDOWN * if the error condition exists. */ - if (networkEvents.lNetworkEvents & netEvent) { - retVal = GDP_ERROR_SUCCESS; + if (networkEvents.lNetworkEvents & FD_READ) { + *taskEvent = GDP_TASK_EVENT_NETWORK; } else { // Not expected - g_info("%s: Unexpected network event from WSAEnumNetworkEvents.\n", - __FUNCTION__); - retVal = GDP_ERROR_INTERNAL; + g_warning("%s: Unexpected network event.\n", + __FUNCTION__); + retVal = GDP_ERROR_GENERAL; } + break; + } else if (waitRes == (WSA_WAIT_EVENT_0 + 3)) { + *taskEvent = GDP_TASK_EVENT_PUBLISH; break; } else if (waitRes == WSA_WAIT_IO_COMPLETION) { gint64 curTime; @@ -595,9 +2369,9 @@ GdpWaitForEvent(int netEvent, int timeout) } curTime = g_get_monotonic_time(); - passedTime = (curTime - startTime) / 1000; + passedTime = (curTime - startTime) / USEC_PER_MILLISECOND; if (passedTime >= localTimeout) { - retVal = GDP_ERROR_TIMEOUT; + *taskEvent = GDP_TASK_EVENT_TIMEOUT; break; } @@ -605,60 +2379,66 @@ GdpWaitForEvent(int netEvent, int timeout) localTimeout -= (DWORD)passedTime; continue; } else if (waitRes == WSA_WAIT_TIMEOUT) { - retVal = GDP_ERROR_TIMEOUT; + *taskEvent = GDP_TASK_EVENT_TIMEOUT; break; } else { // WSA_WAIT_FAILED - g_info("%s: WSAWaitForMultipleEvents failed: error=%d.\n", - __FUNCTION__, WSAGetLastError()); - retVal = GDP_ERROR_INTERNAL; + g_warning("%s: WSAWaitForMultipleEvents failed: error=%d.\n", + __FUNCTION__, WSAGetLastError()); + retVal = GDP_ERROR_GENERAL; break; } } /* - * Cancel the association. + * Cancels the association. */ - WSAEventSelect(pluginData.sock, NULL, 0); + WSAEventSelect(gPluginState.sock, NULL, 0); return retVal; #else gint64 startTime; - GdpError retVal; + GdpError retVal = GDP_ERROR_SUCCESS; - ASSERT(netEvent == POLLIN || netEvent == POLLOUT); - ASSERT(pluginData.sock != INVALID_SOCKET); + ASSERT(Atomic_ReadBool(&gPluginState.started)); if (timeout > 0) { startTime = g_get_monotonic_time(); } else { - startTime = 0; // Deal with [-Werror=maybe-uninitialized] + startTime = 0; // Deals with [-Werror=maybe-uninitialized] } while (TRUE) { - struct pollfd fds[2]; + struct pollfd fds[4]; int res; - fds[0].fd = pluginData.eventStop; + fds[0].fd = gPluginState.eventStop; fds[0].events = POLLIN; fds[0].revents = 0; - fds[1].fd = pluginData.sock; - fds[1].events = (short)netEvent; + fds[1].fd = gPluginState.eventConfig; + fds[1].events = POLLIN; fds[1].revents = 0; + fds[2].fd = gPluginState.sock; + fds[2].events = POLLIN; + fds[2].revents = 0; + fds[3].fd = gPublishState.eventPublish; + fds[3].events = POLLIN; + fds[3].revents = 0; res = poll(fds, ARRAYSIZE(fds), timeout); if (res > 0) { if (fds[0].revents & POLLIN) { - /* - * Check comments in _WIN32. - */ - retVal = GDP_ERROR_STOP; - } else if (fds[1].revents & netEvent) { - retVal = GDP_ERROR_SUCCESS; + *taskEvent = GDP_TASK_EVENT_STOP; + } else if (fds[1].revents & POLLIN) { + *taskEvent = GDP_TASK_EVENT_CONFIG; + } else if (fds[2].revents & POLLIN) { + *taskEvent = GDP_TASK_EVENT_NETWORK; + } else if (fds[3].revents & POLLIN) { + *taskEvent = GDP_TASK_EVENT_PUBLISH; } else { // Not expected - g_info("%s: Unexpected event from poll.\n", __FUNCTION__); - retVal = GDP_ERROR_INTERNAL; + g_warning("%s: Unexpected event.\n", __FUNCTION__); + retVal = GDP_ERROR_GENERAL; } break; @@ -673,26 +2453,26 @@ GdpWaitForEvent(int netEvent, int timeout) } curTime = g_get_monotonic_time(); - passedTime = (curTime - startTime) / 1000; - if (passedTime >= (gint64)timeout) { - retVal = GDP_ERROR_TIMEOUT; + passedTime = (curTime - startTime) / USEC_PER_MILLISECOND; + if (passedTime >= timeout) { + *taskEvent = GDP_TASK_EVENT_TIMEOUT; break; } startTime = curTime; - timeout -= (int)passedTime; + timeout -= (int) passedTime; continue; } else { - g_info("%s: poll failed: error=%d.\n", __FUNCTION__, err); - retVal = GDP_ERROR_INTERNAL; + g_warning("%s: poll failed: error=%d.\n", __FUNCTION__, err); + retVal = GDP_ERROR_GENERAL; break; } } else if (res == 0) { - retVal = GDP_ERROR_TIMEOUT; + *taskEvent = GDP_TASK_EVENT_TIMEOUT; break; } else { - g_info("%s: Unexpected poll return: %d.\n", __FUNCTION__, res); - retVal = GDP_ERROR_INTERNAL; + g_warning("%s: Unexpected poll return: %d.\n", __FUNCTION__, res); + retVal = GDP_ERROR_GENERAL; break; } } @@ -703,265 +2483,480 @@ GdpWaitForEvent(int netEvent, int timeout) /* - ****************************************************************************** - * GdpSend -- + ***************************************************************************** + * GdpTaskCtxInit -- * - * Sends guest data to host side gdp daemon. + * Initializes the task context states and resources. * - * @param[in] buf Send data buffer pointer - * @param[in] len Send data buffer length - * @param[in] timeout Timeout value in milliseconds, - * negative value means an infinite timeout, - * zero means no wait + * @param[out] taskCtx The task context * - * @return GDP_ERROR_SUCCESS on success. - * @return Other GdpError codes otherwise. + ***************************************************************************** + */ + +static void +GdpTaskCtxInit(TaskContext *taskCtx) // OUT +{ + taskCtx->mode = GDP_TASK_MODE_NONE; + taskCtx->state = GDP_TASK_STATE_IDLE; + taskCtx->publishPending = FALSE; + + g_queue_init(&taskCtx->cache.queue); + taskCtx->cache.sizeLimit = GdpGetHistoryCacheSizeLimit(); + taskCtx->cache.size = 0; + taskCtx->cache.currentLink = NULL; + + g_queue_init(&taskCtx->requests); + + taskCtx->sequence = 0; + taskCtx->packet = NULL; + taskCtx->packetLen = 0; + + taskCtx->timeoutAt = GDP_TIMEOUT_AT_INFINITE; + taskCtx->sendAfter = GDP_SEND_AFTER_ANY_TIME; +} + + +/* + ***************************************************************************** + * GdpTaskCtxDestroy -- * - ****************************************************************************** + * Destroys the task context resources. + * + * @param[in,out] taskCtx The task context + * + ***************************************************************************** */ -static GdpError -GdpSend(const char *buf, // IN - int len, // IN - int timeout) // IN +static void +GdpTaskCtxDestroy(TaskContext *taskCtx) // IN/OUT { - GdpError retVal; + GdpTaskClearHistoryCacheQueue(taskCtx); + GdpTaskClearHistoryRequestQueue(taskCtx); + GdpTaskDestroyPacket(taskCtx); +} + + +/* + ***************************************************************************** + * GdpThreadTask -- + * + * The gdp task thread routine. + * + * @param[in] ctx The application context + * @param[in] data Data pointer, not used + * + ***************************************************************************** + */ + +static void +GdpThreadTask(ToolsAppCtx *ctx, // IN + void *data) // IN +{ + TaskContext taskCtx; - ASSERT(buf != NULL && len > 0); - ASSERT(pluginData.sock != INVALID_SOCKET); + g_debug("%s: Entering ...\n", __FUNCTION__); + + GdpTaskCtxInit(&taskCtx); do { - long res; - int sockErr; + int timeout; + GdpError gdpErr; + GdpTaskEvent taskEvent = GDP_TASK_EVENT_NONE; - res = send(pluginData.sock, buf, len, 0); - if (res >= 0) { - retVal = GDP_ERROR_SUCCESS; - break; + /* + * Part 1 inside the loop: + * Checks and processes one pending event at idle state. + */ + + if (taskCtx.mode == GDP_TASK_MODE_NONE) { + ASSERT(taskCtx.state == GDP_TASK_STATE_IDLE); + ASSERT(taskCtx.timeoutAt == GDP_TIMEOUT_AT_INFINITE); + + if (taskCtx.publishPending) { // Higher priority + taskCtx.publishPending = FALSE; + GdpTaskProcessPublish(&taskCtx); + } else if (!g_queue_is_empty(&taskCtx.requests)) { + /* + * History request pending. + * + * GdpTaskPublishHistory clears history request queue if it fails + * to kick off the state machine. + */ + GdpTaskPublishHistory(&taskCtx); + } + } + + /* + * Part 2 inside the loop: + * Gets and processes one event at a time, then loops back to part 1 + * except for the stop event. + */ + + timeout = GdpTaskGetTimeout(&taskCtx); + if (timeout == 0) { + GdpTaskProcessEvents(&taskCtx, GDP_TASK_EVENT_TIMEOUT); + continue; } - sockErr = GetSockErr(); - if (sockErr == SYSERR_EINTR) { + /* + * timeout == GDP_WAIT_INFINITE means taskCtx.mode == GDP_TASK_MODE_NONE + * and taskCtx.state == GDP_TASK_STATE_IDLE. There should be no pending + * publish event or history request in this case. + */ + ASSERT(timeout != GDP_WAIT_INFINITE || + (!taskCtx.publishPending && + g_queue_is_empty(&taskCtx.requests))); + + gdpErr = GdpTaskWaitForEvents(timeout, &taskEvent); + if (gdpErr != GDP_ERROR_SUCCESS) { continue; - } else if (SYSERR_WOULDBLOCK(sockErr)) { + } + + if (taskEvent == GDP_TASK_EVENT_STOP) { /* - * Datagram send is not buffered, if host daemon is not running, - * send returns error EHOSTUNREACH. In theory, this case should - * not happen, we just follow standard async socket programming - * paradigm here. + * In case the publish event comes at the same time, + * only the stop event is handled, so do not check + * (taskCtx.mode == GDP_TASK_MODE_PUBLISH) here. */ - GdpError err; + gPublishState.gdpErr = GDP_ERROR_STOP; + GdpSetEvent(gPublishState.eventGetResult); + break; + } - g_info("%s: Gdp send would block.\n", __FUNCTION__); + GdpTaskProcessEvents(&taskCtx, taskEvent); - err = GdpWaitForEvent(SOCK_WRITE, timeout); - if (err == GDP_ERROR_SUCCESS) { - continue; - } + } while (!Atomic_ReadBool(&gPluginState.stopped)); - retVal = err; - } else if (sockErr == SYSERR_EHOSTUNREACH) { - g_info("%s: send failed: host daemon unreachable.\n", __FUNCTION__); - retVal = GDP_ERROR_UNREACH; - } else if (sockErr == SYSERR_EMSGSIZE) { - g_info("%s: send failed: message too large.\n", __FUNCTION__); - retVal = GDP_ERROR_SEND_SIZE; - } else { - g_info("%s: send failed: error=%d.\n", __FUNCTION__, sockErr); - retVal = GDP_ERROR_INTERNAL; - } + GdpTaskCtxDestroy(&taskCtx); - break; + g_debug("%s: Exiting ...\n", __FUNCTION__); +} - } while (TRUE); - return retVal; +/* + ***************************************************************************** + * GdpThreadInterrupt -- + * + * Interrupts the gdp task thread to exit. + * + * @param[in] ctx The application context + * @param[in] data Data pointer, not used + * + ***************************************************************************** + */ + +static void +GdpThreadInterrupt(ToolsAppCtx *ctx, // IN + void *data) // IN +{ + g_debug("%s: Entering ...\n", __FUNCTION__); + ASSERT(Atomic_ReadBool(&gPluginState.started)); + Atomic_WriteBool(&gPluginState.stopped, TRUE); + GdpSetEvent(gPluginState.eventStop); } /* ****************************************************************************** - * GdpRecv -- - * - * Receives reply from host side gdp daemon. + * GdpInit -- * - * @param[out] buf Receive buffer pointer - * @param[in,out] len Receive buffer length on input, - * reply data length on output - * @param[in] timeout Timeout value in milliseconds, - * negative value means an infinite timeout, - * zero means no wait + * Initializes plugin state data and resources. * - * @return GDP_ERROR_SUCCESS on success. - * @return Other GdpError codes otherwise. + * @param[in] ctx The application context * ****************************************************************************** */ -static GdpError -GdpRecv(char *buf, // OUT - int *len, // IN/OUT - int timeout) // IN +static void +GdpInit(ToolsAppCtx *ctx) // IN { - GdpError retVal; - - ASSERT(buf != NULL && len != NULL && *len > 0); - ASSERT(pluginData.sock != INVALID_SOCKET); + gPluginState.ctx = ctx; + Atomic_WriteBool(&gPluginState.started, FALSE); - do { - long res; - int sockErr; - - res = recv(pluginData.sock, buf, *len, 0); - if (res >= 0) { - *len = (int)res; - retVal = GDP_ERROR_SUCCESS; - break; - } +#if defined(_WIN32) + gPluginState.wsaStarted = FALSE; +#endif - sockErr = GetSockErr(); - if (sockErr == SYSERR_EINTR) { - continue; - } else if (SYSERR_WOULDBLOCK(sockErr)) { - GdpError err = GdpWaitForEvent(SOCK_READ, timeout); - if (err == GDP_ERROR_SUCCESS) { - continue; - } + gPluginState.vmciFd = -1; + gPluginState.vmciFamily = -1; + gPluginState.sock = INVALID_SOCKET; +#if defined(_WIN32) + gPluginState.eventNetwork = GDP_INVALID_EVENT; +#endif - retVal = err; - } else if (sockErr == SYSERR_EMSGSIZE) { - g_info("%s: recv failed: buffer size too small.\n", __FUNCTION__); - retVal = GDP_ERROR_RECV_SIZE; - } else { - g_info("%s: recv failed: error=%d.\n", __FUNCTION__, sockErr); - retVal = GDP_ERROR_INTERNAL; - } + gPluginState.eventStop = GDP_INVALID_EVENT; + Atomic_WriteBool(&gPluginState.stopped, FALSE); - break; + gPluginState.eventConfig = GDP_INVALID_EVENT; - } while (TRUE); + gPublishState.eventPublish = GDP_INVALID_EVENT; - return retVal; + gPublishState.eventGetResult = GDP_INVALID_EVENT; } /* ****************************************************************************** - * GdpPublish -- + * GdpStart -- * - * Publishes guest data to host side gdp daemon. - * - * @param[in] msg Buffer containing guest data to publish - * @param[in] msgLen Guest data length - * @param[out,optional] reply Buffer to receive reply from gdp daemon - * @param[in,out,optional] replyLen NULL when param reply is NULL, otherwise: - * reply buffer length on input, - * reply data length on output + * Creates plugin resources and starts the task thread for data publishing. * - * @return GDP_ERROR_SUCCESS on success. - * @return Other GdpError codes otherwise. + * @return TRUE on success. + * @return FALSE otherwise. * ****************************************************************************** */ -static GdpError -GdpPublish(const char *msg, // IN - int msgLen, // IN - char *reply, // OUT, OPTIONAL - int *replyLen) // IN/OUT, OPTIONAL +static Bool +GdpStart(void) { - static GMutex mutex; - - GdpError err; - char altRecvBuf[GDP_SEND_RECV_BUF_LEN]; // Enough for maximum size datagram - int altRecvBufLen = (int) sizeof altRecvBuf; - char *recvBuf; - int *recvBufLen; + Bool retVal = FALSE; g_debug("%s: Entering ...\n", __FUNCTION__); - ASSERT((reply == NULL && replyLen == NULL) || - (reply != NULL && replyLen != NULL && *replyLen > 0)); + ASSERT(!Atomic_ReadBool(&gPluginState.started)); - g_mutex_lock(&mutex); +#if defined(_WIN32) + { + WSADATA wsaData; + int res = WSAStartup(MAKEWORD(2, 2), &wsaData); + if (res != 0) { + g_critical("%s: WSAStartup failed: error=%d.\n", + __FUNCTION__, res); + return FALSE; + } - if (Atomic_ReadBool(&pluginData.stopped)) { - /* - * Main thread has interrupted pool thread for vmtoolsd shutdown. - */ - err = GDP_ERROR_STOP; + gPluginState.wsaStarted = TRUE; + } +#endif + + gPluginState.vmciFamily = VMCISock_GetAFValueFd(&gPluginState.vmciFd); + if (gPluginState.vmciFamily == -1) { + g_critical("%s: Failed to get vSocket address family value.\n", + __FUNCTION__); goto exit; } - if (pluginData.sock == INVALID_SOCKET && !GdpCreateSocket()) { - err = GDP_ERROR_INTERNAL; + if (!GdpCreateSocket()) { + g_critical("%s: Failed to create VMCI datagram socket.\n", + __FUNCTION__); goto exit; } - err = GdpEmptyRecvQueue(); - if (err != GDP_ERROR_SUCCESS) { +#if defined(_WIN32) + gPluginState.eventNetwork = GdpCreateEvent(); + if (gPluginState.eventNetwork == GDP_INVALID_EVENT) { + g_critical("%s: GdpCreateEvent for network failed: error=%d.\n", + __FUNCTION__, GetSysErr()); goto exit; } +#endif - err = GdpSend(msg, msgLen, - GDP_SEND_TIMEOUT); // Should not time out in theory - if (err != GDP_ERROR_SUCCESS) { - g_info("%s: GdpSend failed: %s.\n", __FUNCTION__, gdpErrMsgs[err]); + gPluginState.eventStop = GdpCreateEvent(); + if (gPluginState.eventStop == GDP_INVALID_EVENT) { + g_critical("%s: GdpCreateEvent for stop failed: error=%d.\n", + __FUNCTION__, GetSysErr()); goto exit; } - if (reply != NULL) { - recvBuf = reply; - recvBufLen = replyLen; - } else { - recvBuf = altRecvBuf; - recvBufLen = &altRecvBufLen; + gPluginState.eventConfig = GdpCreateEvent(); + if (gPluginState.eventConfig == GDP_INVALID_EVENT) { + g_critical("%s: GdpCreateEvent for config failed: error=%d.\n", + __FUNCTION__, GetSysErr()); + goto exit; + } + + gPublishState.eventPublish = GdpCreateEvent(); + if (gPublishState.eventPublish == GDP_INVALID_EVENT) { + g_critical("%s: GdpCreateEvent for publish failed: error=%d.\n", + __FUNCTION__, GetSysErr()); + goto exit; } - err = GdpRecv(recvBuf, recvBufLen, GDP_RECV_TIMEOUT); - if (err != GDP_ERROR_SUCCESS) { - g_info("%s: GdpRecv failed: %s.\n", __FUNCTION__, gdpErrMsgs[err]); + gPublishState.eventGetResult = GdpCreateEvent(); + if (gPublishState.eventGetResult == GDP_INVALID_EVENT) { + g_critical("%s: GdpCreateEvent for get-result failed: error=%d.\n", + __FUNCTION__, GetSysErr()); + goto exit; + } + + if (!ToolsCorePool_StartThread(gPluginState.ctx, "GdpThread", + GdpThreadTask, + GdpThreadInterrupt, + NULL, NULL)) { + g_critical("%s: Failed to start the gdp task thread.\n", __FUNCTION__); + goto exit; } + Atomic_WriteBool(&gPluginState.started, TRUE); + retVal = TRUE; + exit: - if (err == GDP_ERROR_INTERNAL) { - /* - * No need to close and recreate socket for these errors: - * - * GDP_ERROR_UNREACH - * GDP_ERROR_TIMEOUT - * GDP_ERROR_SEND_SIZE - * GDP_ERROR_RECV_SIZE - */ - GdpCloseSocket(); + if (!retVal) { + GdpDestroy(); + } + + return retVal; +} + + +/* + ****************************************************************************** + * GdpDestroy -- + * + * Destroys plugin resources. + * + ****************************************************************************** + */ + +static void +GdpDestroy(void) +{ + g_debug("%s: Entering ...\n", __FUNCTION__); + + GdpCloseSocket(); + + if (gPluginState.vmciFd != -1) { + VMCISock_ReleaseAFValueFd(gPluginState.vmciFd); + gPluginState.vmciFd = -1; } - g_mutex_unlock(&mutex); +#if defined(_WIN32) + GdpCloseEvent(&gPluginState.eventNetwork); +#endif + + GdpCloseEvent(&gPluginState.eventStop); + + GdpCloseEvent(&gPluginState.eventConfig); + + GdpCloseEvent(&gPublishState.eventPublish); - g_debug("%s: Return: %s.\n", __FUNCTION__, gdpErrMsgs[err]); + GdpCloseEvent(&gPublishState.eventGetResult); - return err; +#if defined(_WIN32) + if (gPluginState.wsaStarted) { + WSACleanup(); + gPluginState.wsaStarted = FALSE; + } +#endif } /* ****************************************************************************** - * GdpStop -- + * GdpPublish -- + * + * Publishes guest data to host side gdp daemon. * - * Stops guest data publishing for vmtoolsd shutdown, called by main thread. + * @param[in] createTime UTC timestamp, in number of micro- + * seconds since January 1, 1970 UTC. + * @param[in] topic Topic + * @param[in,optional] token Token, can be NULL + * @param[in,optional] category Category, can be NULL that defaults to + * "application" + * @param[in] data Buffer containing data to publish + * @param[in] dataLen Buffer length * - * @return GDP_ERROR_SUCCESS. + * @return GDP_ERROR_SUCCESS on success. + * @return Other GdpError code otherwise. * ****************************************************************************** */ static GdpError -GdpStop(void) +GdpPublish(gint64 createTime, // IN + const gchar *topic, // IN + const gchar *token, // IN, OPTIONAL + const gchar *category, // IN, OPTIONAL + const gchar *data, // IN + guint32 dataLen) // IN { + GdpError gdpErr; + g_debug("%s: Entering ...\n", __FUNCTION__); - Atomic_WriteBool(&pluginData.stopped, TRUE); - GdpSetStopEvent(); - return GDP_ERROR_SUCCESS; + + if (topic == NULL || *topic == '\0') { + g_info("%s: Missing topic.\n", __FUNCTION__); + return GDP_ERROR_INVALID_DATA; + } + + if (data == NULL || dataLen == 0) { + g_info("%s: Topic '%s' has no data.\n", __FUNCTION__, topic); + return GDP_ERROR_INVALID_DATA; + } + + if (token != NULL && *token == '\0') { + token = NULL; + } + + if (category != NULL && *category == '\0') { + category = NULL; + } + + g_mutex_lock(&gPublishState.mutex); + + if (Atomic_ReadBool(&gPluginState.stopped)) { + gdpErr = GDP_ERROR_STOP; + goto exit; + } + + if (!Atomic_ReadBool(&gPluginState.started) && + !GdpStart()) { + gdpErr = GDP_ERROR_GENERAL; + goto exit; + } + + gPublishState.createTime = createTime; + gPublishState.topic = topic; + gPublishState.token = token; + gPublishState.category = category; + gPublishState.data = data; + gPublishState.dataLen = dataLen; + + GdpSetEvent(gPublishState.eventPublish); + + do { + gdpErr = GdpWaitForEvent(gPublishState.eventGetResult, + GDP_WAIT_INFINITE); + if (gdpErr == GDP_ERROR_SUCCESS) { + gdpErr = gPublishState.gdpErr; + break; + } + } while (!Atomic_ReadBool(&gPluginState.stopped)); + + GdpResetEvent(gPublishState.eventGetResult); + +exit: + g_mutex_unlock(&gPublishState.mutex); + g_debug("%s: Exiting with gdp error: %s.\n", + __FUNCTION__, gdpErrMsgs[gdpErr]); + return gdpErr; +} + + +/* + *----------------------------------------------------------------------------- + * GdpConfReload -- + * + * Handles gdp config change. + * + * @param[in] src The source object, unused + * @param[in] ctx The application context + * @param[in] data Unused + * + *----------------------------------------------------------------------------- + */ + +static void +GdpConfReload(gpointer src, // IN + ToolsAppCtx *ctx, // IN + gpointer data) // IN +{ + if (!Atomic_ReadBool(&gPluginState.started)) { + return; + } + + GdpSetEvent(gPluginState.eventConfig); } @@ -971,9 +2966,9 @@ GdpStop(void) * * Cleans up on shutdown. * - * @param[in] src The source object, unused - * @param[in] ctx The application context - * @param[in] data Unused + * @param[in] src The source object, unused + * @param[in] ctx The application context + * @param[in] data Unused * ****************************************************************************** */ @@ -984,6 +2979,8 @@ GdpShutdown(gpointer src, // IN gpointer data) // IN { g_debug("%s: Entering ...\n", __FUNCTION__); + ASSERT(!Atomic_ReadBool(&gPluginState.started) || + Atomic_ReadBool(&gPluginState.stopped)); g_object_set(ctx->serviceObj, TOOLS_PLUGIN_SVC_PROP_GDP, NULL, NULL); GdpDestroy(); } @@ -995,7 +2992,7 @@ GdpShutdown(gpointer src, // IN * * Plugin entry point. Initializes internal plugin state. * - * @param[in] ctx The application context + * @param[in] ctx The application context * * @return The registration data. * @@ -1006,7 +3003,7 @@ TOOLS_MODULE_EXPORT ToolsPluginData * ToolsOnLoad(ToolsAppCtx *ctx) // IN { /* - * Return NULL to disable the plugin if not running in vmsvc daemon. + * Returns NULL to disable the plugin if not running in vmsvc daemon. */ if (!TOOLS_IS_MAIN_SERVICE(ctx)) { g_info("%s: Not running in vmsvc daemon: container name='%s'.\n", @@ -1015,7 +3012,7 @@ ToolsOnLoad(ToolsAppCtx *ctx) // IN } /* - * Return NULL to disable the plugin if not running in a VMware VM. + * Returns NULL to disable the plugin if not running in a VMware VM. */ if (!ctx->isVMware) { g_info("%s: Not running in a VMware VM.\n", __FUNCTION__); @@ -1023,7 +3020,7 @@ ToolsOnLoad(ToolsAppCtx *ctx) // IN } /* - * Return NULL to disable the plugin if VM is not running on ESX host. + * Returns NULL to disable the plugin if VM is not running on ESX host. */ { uint32 vmxVersion = 0; @@ -1035,18 +3032,16 @@ ToolsOnLoad(ToolsAppCtx *ctx) // IN } } - if (!GdpInit(ctx)) { - g_info("%s: Failed to init plugin.\n", __FUNCTION__); - return NULL; - } + GdpInit(ctx); { - static ToolsPluginSvcGdp svcGdp = { GdpPublish, GdpStop }; + static ToolsPluginSvcGdp svcGdp = { GdpPublish }; static ToolsPluginData regData = { "gdp", NULL, NULL, NULL }; ToolsServiceProperty propGdp = { TOOLS_PLUGIN_SVC_PROP_GDP }; ToolsPluginSignalCb sigs[] = { + { TOOLS_CORE_SIG_CONF_RELOAD, GdpConfReload, NULL }, { TOOLS_CORE_SIG_SHUTDOWN, GdpShutdown, NULL }, }; ToolsAppReg regs[] = {