]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
dpdk_telemetry plugin: add plugin for DPDK metrics via DPDK Telemetry library
authorReshma Pattan <reshma.pattan@intel.com>
Thu, 16 May 2019 11:00:38 +0000 (12:00 +0100)
committerReshma Pattan <reshma.pattan@intel.com>
Fri, 11 Oct 2019 16:39:57 +0000 (17:39 +0100)
This patch introduces a new plugin for collectd, which consumes DPDK metrics
via the dpdk_telemetry library. The collectd plugin here provides an
easy way to use the DPDK telemetry API to query ethernet device metrics.

The collectd plugin retrieves metrics from a DPDK packet forwarding
application by sending a JSON formatted message via a UNIX domain
socket. The DPDK telemetry component will respond with a JSON formatted
reply, delivering the requested metrics. The dpdk_telemetry collectd
plugin parses the JSON data, and publishes the metric values to collectd
for further use.

This plugin has a dependency on the DPDK Telemetry library, as it must be
"in sync" with the DPDK Telemetry implementation.

Change-Id: If3343aae4c5473f0574465fab0395b7672fa2488
Signed-off-by: Emma Kenny <emma.kenny@intel.com>
Signed-off-by: Brian Archbold <brian.archbold@intel.com>
Signed-off-by: Reshma Pattan <reshma.pattan@intel.com>
Makefile.am
README
configure.ac
src/collectd.conf.in
src/collectd.conf.pod
src/dpdk_telemetry.c [new file with mode: 0755]
src/types.db

index bdb95a1b367eca87908eac31f097968769c24b68..22988a811163546aef4561b2d5c0f46d68b4dea2 100644 (file)
@@ -963,6 +963,13 @@ dpdkstat_la_CFLAGS = $(AM_CFLAGS) $(LIBDPDK_CFLAGS)
 dpdkstat_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(LIBDPDK_LDFLAGS)
 dpdkstat_la_LIBADD = $(LIBDPDK_LIBS)
 endif
+if BUILD_PLUGIN_DPDK_TELEMETRY
+pkglib_LTLIBRARIES += dpdk_telemetry.la
+dpdk_telemetry_la_SOURCES = src/dpdk_telemetry.c
+dpdk_telemetry_la_CFLAGS = $(AM_CFLAGS)
+dpdk_telemetry_la_LDFLAGS = $(PLUGIN_LDFLAGS) $(BUILD_WITH_LIBJANSSON_LDFLAGS)
+dpdk_telemetry_la_LIBADD = $(BUILD_WITH_LIBJANSSON_LIBS)
+endif
 
 if BUILD_PLUGIN_DRBD
 pkglib_LTLIBRARIES += drbd.la
diff --git a/README b/README
index f77efd219583ee1f72589ce12ff98b04c40a1407..b5ce45dfd0e8706537ebb414146f2fc942de9e12 100644 (file)
--- a/README
+++ b/README
@@ -106,6 +106,15 @@ Features
       This plugin should be compiled with compiler defenses enabled, for
       example -fstack-protector.
 
+    - dpdk_telemetry
+      Collect DPDK interface, application and global statistics.
+      This plugin can be used as substitute to dpdkstat plugin.
+
+      This plugin is dependent on DPDK 19.08 release and must be used
+      along with the DPDK application.
+
+      Also, this plugin has dependency on Jansson library.
+
     - drbd
       Collect individual drbd resource statistics.
 
@@ -824,6 +833,9 @@ Prerequisites
     For querying iptables counters.
     <http://netfilter.org/>
 
+  * libjansson (optional)
+    Used by the `dpdk_telemetry` plugin.
+
   * libjevents (optional)
     The jevents library is used by the `intel_pmu' plugin to access the Linux
     kernel perf interface.
index 4e585bd2165c2c73d0b728a56b2d0ebc38959403..5a349f4a4b1725e8354096f6052c7e0b1b70c46d 100644 (file)
@@ -4552,6 +4552,51 @@ AC_SUBST([BUILD_WITH_LIBPQOS_LDFLAGS])
 AC_SUBST([BUILD_WITH_LIBPQOS_LIBS])
 # }}}
 
+# --with-libjansson {{{
+AC_ARG_WITH([libjansson],
+[AS_HELP_STRING([--with-libjansson@<:@=PREFIX@:>@], [Path to libjansson.])],
+[
+  if test "x$withval" != "xno" && test "x$withval" != "xyes"; then
+   with_libjansson_cppflags="-I$withval/include"
+   with_libjansson_ldflags="-L$withval/lib"
+   with_libjansson="yes"
+  else
+      with_libjansson="$withval"
+  fi
+],
+  [with_libjansson="yes"]
+)
+
+if test "x$with_libjansson" = "xyes"; then
+  SAVE_CPPFLAGS="$CPPFLAGS"
+  CPPFLAGS="$CPPFLAGS $with_libjansson_cppflags"
+
+  AC_CHECK_HEADERS([jansson.h],
+    [with_libjansson="yes"],
+    [with_libjansson="no (jansson.h not found)"]
+  )
+ CPPFLAGS="$SAVE_CPPFLAGS"
+fi
+if test "x$with_libjansson" = "xyes"; then
+  SAVE_LDFLAGS="$LDFLAGS"
+  LDFLAGS="$LDFLAGS $with_jansson_ldflags"
+
+  AC_CHECK_LIB([jansson], [json_is_object],
+    [with_jansson="yes"],
+    [with_jansson="no (Symbol 'json_is_object' not found)"]
+  )
+
+  LDFLAGS="$SAVE_LDFLAGS"
+fi
+
+BUILD_WITH_LIBJANSSON_CPPFLAGS="$with_libjansson_cppflags"
+BUILD_WITH_LIBJANSSON_LDFLAGS="$with_libjansson_ldflags"
+BUILD_WITH_LIBJANSSON_LIBS="-ljansson"
+AC_SUBST(BUILD_WITH_LIBJANSSON_CPPFLAGS)
+AC_SUBST(BUILD_WITH_LIBJANSSON_LDFLAGS)
+AC_SUBST(BUILD_WITH_LIBJANSSON_LIBS)
+# }}}
+
 # --with-libjevents {{{
 with_libjevents_cppflags=""
 with_libjevents_ldflags=""
@@ -6386,6 +6431,7 @@ plugin_disk="no"
 plugin_drbd="no"
 plugin_dpdkevents="no"
 plugin_dpdkstat="no"
+plugin_dpdk_telemetry="no"
 plugin_entropy="no"
 plugin_ethstat="no"
 plugin_fhcount="no"
@@ -6773,6 +6819,10 @@ if test "x$with_libdpdk" = "xyes"; then
   plugin_dpdkstat="yes"
 fi
 
+if test "x$with_libjansson" = "xyes"; then
+  plugin_dpdk_telemetry="yes"
+fi
+
 m4_divert_once([HELP_ENABLE], [
 collectd plugins:])
 
@@ -6824,6 +6874,7 @@ AC_PLUGIN([dns],                 [$with_libpcap],             [DNS traffic analy
 AC_PLUGIN([dpdkevents],          [$plugin_dpdkevents],        [Events from DPDK])
 AC_PLUGIN([dpdkstat],            [$plugin_dpdkstat],          [Stats from DPDK])
 AC_PLUGIN([drbd],                [$plugin_drbd],              [DRBD statistics])
+AC_PLUGIN([dpdk_telemetry],      [$plugin_dpdk_telemetry],    [Metrics from DPDK Telemetry])
 AC_PLUGIN([email],               [yes],                       [EMail statistics])
 AC_PLUGIN([entropy],             [$plugin_entropy],           [Entropy statistics])
 AC_PLUGIN([ethstat],             [$plugin_ethstat],           [Stats from NIC driver])
@@ -7164,6 +7215,7 @@ AC_MSG_RESULT([    libhiredis  . . . . . $with_libhiredis])
 AC_MSG_RESULT([    libi2c-dev  . . . . . $with_libi2c])
 AC_MSG_RESULT([    libiokit  . . . . . . $with_libiokit])
 AC_MSG_RESULT([    libiptc . . . . . . . $with_libiptc])
+AC_MSG_RESULT([    libjansson  . . . . . $with_libjansson])
 AC_MSG_RESULT([    libjevents  . . . . . $with_libjevents])
 AC_MSG_RESULT([    libjvm  . . . . . . . $with_java])
 AC_MSG_RESULT([    libkstat  . . . . . . $with_kstat])
@@ -7257,6 +7309,7 @@ AC_MSG_RESULT([    dns . . . . . . . . . $enable_dns])
 AC_MSG_RESULT([    dpdkevents. . . . . . $enable_dpdkevents])
 AC_MSG_RESULT([    dpdkstat  . . . . . . $enable_dpdkstat])
 AC_MSG_RESULT([    drbd  . . . . . . . . $enable_drbd])
+AC_MSG_RESULT([    dpdk_telemetry. . . . $enable_dpdk_telemetry])
 AC_MSG_RESULT([    email . . . . . . . . $enable_email])
 AC_MSG_RESULT([    entropy . . . . . . . $enable_entropy])
 AC_MSG_RESULT([    ethstat . . . . . . . $enable_ethstat])
index a194b4752bf8d5772f0271de41ad9dcd521fccf0..cf5bbd116e90fb8bb6a37308cda292c61e38bdb2 100644 (file)
 #@BUILD_PLUGIN_DNS_TRUE@LoadPlugin dns
 #@BUILD_PLUGIN_DPDKEVENTS_TRUE@LoadPlugin dpdkevents
 #@BUILD_PLUGIN_DPDKSTAT_TRUE@LoadPlugin dpdkstat
+#@BUILD_PLUGIN_DPDK_TELEMETRY_TRUE@LoadPlugin dpdk_telemetry
 #@BUILD_PLUGIN_DRBD_TRUE@LoadPlugin drbd
 #@BUILD_PLUGIN_EMAIL_TRUE@LoadPlugin email
 #@BUILD_PLUGIN_ENTROPY_TRUE@LoadPlugin entropy
 #  PortName "interface2"
 #</Plugin>
 
+#<Plugin dpdk_telemetry>
+#      ClientSocketPath "/var/run/.client"
+#      DpdkSocketPath "/var/run/dpdk/rte/telemetry"
+#</Plugin>
+
 #<Plugin email>
 #      SocketFile "@localstatedir@/run/@PACKAGE_NAME@-email"
 #      SocketGroup "collectd"
index cda1002c5de721ebde9d3ed6ada459ae1c255e0a..0b7fbe8f922e3e27a9489bbf72b4a68c6a6913ab 100644 (file)
@@ -2944,6 +2944,41 @@ convention will be used for the additional ports.
 
 =back
 
+=head2 Plugin C<dpdk_telemetry>
+
+
+The I< dpdk_telemetry > plugin collects DPDK ethernet device metrics via
+dpdk_telemetry library.
+
+The plugin retrieves metrics from a DPDK packet forwarding application
+by sending the JSON formatted message via a UNIX domain socket.
+The DPDK telemetry component will respond with a JSON formatted reply,
+delivering the requested metrics. The plugin parses the JSON data,
+and publishes the metric values to collectd for further use.
+
+B<Synopsis:>
+
+  <Plugin dpdk_telemetry>
+    ClientSocketPath "/var/run/.client"
+    DpdkSocketPath "/var/run/dpdk/rte/telemetry"
+  </Plugin>
+
+B<Options:>
+
+=over 2
+
+=item B<ClientSocketPath> I<Client_Path>
+
+The UNIX domain client socket at I<Client_Path> to receive messages from DPDK
+telemetry library. Defaults to B<"/var/run/.client">.
+
+=item B<DpdkSocketPath> I<Dpdk_Path>
+
+The UNIX domain DPDK telemetry socket to be connected at I<Dpdk_Path> to send
+messages. Defaults to B<"/var/run/dpdk/rte/telemetry">.
+
+=back
+
 =head2 Plugin C<email>
 
 =over 4
diff --git a/src/dpdk_telemetry.c b/src/dpdk_telemetry.c
new file mode 100755 (executable)
index 0000000..9ae9fd3
--- /dev/null
@@ -0,0 +1,434 @@
+/*-
+ * collectd - src/dpdk_telemetry.c
+ * MIT License
+ *
+ * Copyright(c) 2019 Intel Corporation. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of
+ * this software and associated documentation files (the "Software"), to deal in
+ * the Software without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is furnished to
+ * do
+ * so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ */
+
+#include "collectd.h"
+#include "plugin.h"
+#include "utils/common/common.h"
+#include "utils_time.h"
+
+#include <errno.h>
+#include <jansson.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/queue.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <sys/unistd.h>
+
+#define BUF_SIZE 100000
+#define PLUGIN_NAME "dpdk_telemetry"
+#define DEFAULT_DPDK_PATH "/var/run/dpdk/rte/telemetry"
+#define DEFAULT_CLIENT_PATH "/var/run/.client"
+#define MAX_COMMANDS 2
+
+struct client_info {
+  int s_send;
+  int s_recv;
+  int fd;
+  const char *dpdk_path;
+  const char *client_path;
+  struct sockaddr_un addr;
+  struct sockaddr_un addrs;
+};
+
+typedef struct client_info client_info_t;
+
+static client_info_t client;
+static char g_client_path[BUF_SIZE];
+static char g_dpdk_path[BUF_SIZE];
+cdtime_t dpdk_tel_interval = 0;
+
+static int dpdk_telemetry_config(oconfig_item_t *ci) {
+  int ret, i;
+
+  INFO(PLUGIN_NAME ": %s:%d", __FUNCTION__, __LINE__);
+
+  for (i = 0; i < ci->children_num; i++) {
+    oconfig_item_t *child = ci->children + i;
+
+    if (strcasecmp("ClientSocketPath", child->key) == 0) {
+      ret = cf_util_get_string_buffer(child, g_client_path,
+                                      sizeof(g_client_path));
+    } else if (strcasecmp("DpdkSocketPath", child->key) == 0) {
+      ret = cf_util_get_string_buffer(child, g_dpdk_path, sizeof(g_dpdk_path));
+    } else {
+      ERROR(PLUGIN_NAME ": Unknown configuration parameter"
+                        "\"%s\"",
+            child->key);
+      ret = -1;
+    }
+
+    if (ret < 0) {
+      INFO(PLUGIN_NAME ": %s:%d ret =%d", __FUNCTION__, __LINE__, ret);
+      return ret;
+    }
+  }
+
+  dpdk_tel_interval = plugin_get_interval();
+  return 0;
+}
+
+static int dpdk_telemetry_parse(json_t *stats, json_t *port) {
+  json_t *statsArrayObj;
+  int portid;
+
+  if (!stats) {
+    ERROR(PLUGIN_NAME ": Stats pointer is invalid");
+    return -1;
+  }
+
+  if (!port) {
+    ERROR(PLUGIN_NAME ":  Port pointer is invalid");
+    return -1;
+  }
+  portid = json_integer_value(port);
+
+  if (portid < -1) {
+    ERROR(PLUGIN_NAME ": portid is invalid");
+    return -1;
+  }
+
+  json_t *name, *value;
+  const char *name_string;
+  int statslen, i;
+  uint64_t value_int;
+  statslen = json_array_size(stats);
+  for (i = 0; i < statslen; i++) {
+    statsArrayObj = json_array_get(stats, i);
+    name = json_object_get(statsArrayObj, "name");
+    value = json_object_get(statsArrayObj, "value");
+    if (!name) {
+      ERROR(PLUGIN_NAME ": Request does not have name field");
+      return -1;
+    }
+    if (!json_is_string(name)) {
+      ERROR(PLUGIN_NAME ": Metric name is not a string");
+      return -1;
+    }
+    name_string = json_string_value(name);
+    if (!value) {
+      ERROR(PLUGIN_NAME ": Request does not have value name");
+      return -1;
+    }
+    if (!json_is_integer(value)) {
+      ERROR(PLUGIN_NAME ": Metric value is not an integer");
+      return -1;
+    }
+
+    char dev_name[DATA_MAX_NAME_LEN];
+    if (portid == -1)
+      snprintf(dev_name, sizeof(dev_name), "%s", name_string);
+    else
+      snprintf(dev_name, sizeof(dev_name), "%s.%d", name_string, portid);
+    value_int = json_integer_value(value);
+    value_t dpdk_telemetry_values[1];
+    value_list_t dpdk_telemetry_vl = VALUE_LIST_INIT;
+    dpdk_telemetry_values[0].counter = value_int;
+    dpdk_telemetry_vl.values = dpdk_telemetry_values;
+    dpdk_telemetry_vl.values_len = 1;
+    dpdk_telemetry_vl.time = cdtime();
+    snprintf(dpdk_telemetry_vl.host, sizeof(dpdk_telemetry_vl.host), "%s",
+             hostname_g);
+    snprintf(dpdk_telemetry_vl.plugin, sizeof(dpdk_telemetry_vl.plugin),
+             "dpdk_telemetry");
+    sstrncpy(dpdk_telemetry_vl.plugin_instance, dev_name,
+             sizeof(dpdk_telemetry_vl.plugin_instance));
+    snprintf(dpdk_telemetry_vl.type, sizeof(dpdk_telemetry_vl.type),
+             "dpdk_telemetry");
+    snprintf(dpdk_telemetry_vl.type_instance,
+             sizeof(dpdk_telemetry_vl.type_instance), "%s", name_string);
+
+    int ret = plugin_dispatch_values(&dpdk_telemetry_vl);
+    if (ret < 0) {
+      ERROR(PLUGIN_NAME ": Failed to dispatch values");
+      return -1;
+    }
+  }
+  return 0;
+}
+
+static int parse_json(char *buf) {
+
+  if (!buf) {
+    ERROR(PLUGIN_NAME ": buf pointer is invalid");
+    return -1;
+  }
+  json_error_t error;
+  json_t *root = json_loads(buf, 0, &error);
+  int arraylen, i;
+  json_t *status, *dataArray, *stats, *dataArrayObj;
+  stats = NULL;
+
+  if (!root) {
+    ERROR(PLUGIN_NAME ": Could not load JSON object from data passed in"
+                      " : %s",
+          error.text);
+    return -1;
+  } else if (!json_is_object(root)) {
+    ERROR(PLUGIN_NAME ": JSON Request is not a JSON object");
+    json_decref(root);
+    return -1;
+  }
+
+  status = json_object_get(root, "status_code");
+  if (!status) {
+    ERROR(PLUGIN_NAME ": Request does not have status field");
+    return -1;
+  } else if (!json_is_string(status)) {
+    ERROR(PLUGIN_NAME ": Status value is not a string");
+    return -1;
+  }
+  dataArray = json_object_get(root, "data");
+  if (!dataArray) {
+    ERROR(PLUGIN_NAME ": Request does not have data field");
+    return -1;
+  }
+  arraylen = json_array_size(dataArray);
+  if (!arraylen) {
+    ERROR(PLUGIN_NAME ": No data to get");
+    return -1;
+  }
+
+  for (i = 0; i < arraylen; i++) {
+    json_t *port;
+    dataArrayObj = json_array_get(dataArray, i);
+    port = json_object_get(dataArrayObj, "port");
+    stats = json_object_get(dataArrayObj, "stats");
+    if (!port) {
+      ERROR(PLUGIN_NAME ": Request does not have port field");
+      return -1;
+    }
+    if (!json_is_integer(port)) {
+      ERROR(PLUGIN_NAME ": Port value is not an integer");
+      return -1;
+    }
+
+    if (!stats) {
+      ERROR(PLUGIN_NAME ": Request does not have stats field");
+      return -1;
+    }
+    dpdk_telemetry_parse(stats, port);
+  }
+  return 0;
+}
+
+static int dpdk_telemetry_cleanup(void) {
+  close(client.s_send);
+  close(client.s_recv);
+  close(client.fd);
+  return 0;
+}
+
+static int dpdk_telemetry_socket_init(void) {
+  INFO(PLUGIN_NAME ": %s:%d", __FUNCTION__, __LINE__);
+  char message[BUF_SIZE];
+
+  /* Here we look up the length of the g_dpdk_path string
+   * If it has a length we use it, otherwise we fall back to default
+   * See dpdk_telemetry_config() for details
+   */
+  client.dpdk_path = (strlen(g_dpdk_path)) ? g_dpdk_path : DEFAULT_DPDK_PATH;
+  client.client_path =
+      (strlen(g_client_path)) ? g_client_path : DEFAULT_CLIENT_PATH;
+  client.s_send = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+  if (client.s_send < 0) {
+    ERROR(PLUGIN_NAME ": Failed to open socket errno(%d), error(%s)", errno,
+          strerror(errno));
+    dpdk_telemetry_cleanup();
+    return -1;
+  }
+  client.s_recv = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+  if (client.s_recv < 0) {
+    ERROR(PLUGIN_NAME ": Failed to open message socket errno(%d), error(%s)",
+          errno, strerror(errno));
+    dpdk_telemetry_cleanup();
+    return -1;
+  }
+  client.addr.sun_family = AF_UNIX;
+  snprintf(client.addr.sun_path, sizeof(client.addr.sun_path), "%s",
+           client.dpdk_path);
+  if (connect(client.s_send, (struct sockaddr *)&client.addr,
+              sizeof(client.addr)) < 0) {
+    ERROR(PLUGIN_NAME ": Failed to connect errno(%d), error(%s)", errno,
+          strerror(errno));
+    dpdk_telemetry_cleanup();
+    return -1;
+  }
+  client.addrs.sun_family = AF_UNIX;
+  snprintf(client.addrs.sun_path, sizeof(client.addrs.sun_path), "%s",
+           client.client_path);
+  unlink(client.client_path);
+  if (bind(client.s_recv, (struct sockaddr *)&client.addrs,
+           sizeof(client.addrs)) < 0) {
+    ERROR(PLUGIN_NAME ": Failed to bind errno(%d), error(%s)", errno,
+          strerror(errno));
+    dpdk_telemetry_cleanup();
+    return -1;
+  }
+  if (listen(client.s_recv, 1) < 0) {
+    ERROR(PLUGIN_NAME ": Listen failed errno(%d), error(%s)", errno,
+          strerror(errno));
+    dpdk_telemetry_cleanup();
+    return -1;
+  }
+  snprintf(message, sizeof(message),
+           "{\"action\":1,\"command\":\"clients\""
+           ",\"data\":{\"client_path\":\"%s\"}}",
+           client.client_path);
+  if (send(client.s_send, message, strlen(message), 0) < 0) {
+    ERROR(PLUGIN_NAME ": Could not send register message errno(%d), error(%s)",
+          errno, strerror(errno));
+    dpdk_telemetry_cleanup();
+    return -1;
+  }
+  client.fd = accept(client.s_recv, NULL, NULL);
+  if (client.fd < 0) {
+    ERROR(PLUGIN_NAME ": Failed to accept errno(%d), error(%s)", errno,
+          strerror(errno));
+    dpdk_telemetry_cleanup();
+    return -1;
+  }
+  return 0;
+}
+
+static int dpdk_telemetry_shutdown(void) {
+  INFO(PLUGIN_NAME ": %s:%d", __FUNCTION__, __LINE__);
+  char msg[BUF_SIZE];
+  int ret;
+
+  snprintf(msg, sizeof(msg),
+           "{\"action\":2,\"command\":\"clients\""
+           ",\"data\":{\"client_path\":\"%s\"}}",
+           client.client_path);
+  ret = send(client.fd, msg, strlen(msg), 0);
+  if (ret < 0) {
+    ERROR(PLUGIN_NAME ": Could not send unregister message");
+    dpdk_telemetry_cleanup();
+    return -1;
+  }
+  dpdk_telemetry_cleanup();
+  return 0;
+}
+
+static int dpdk_telemetry_read(user_data_t *ud) {
+  INFO(PLUGIN_NAME ": %s:%d", __FUNCTION__, __LINE__);
+  char buffer[BUF_SIZE];
+  int bytes = 0, ret;
+  cdtime_t prev = 0;
+  char *json_string[MAX_COMMANDS] = {"{\"action\":0,\"command\":"
+                                     "\"ports_all_stat_values\",\"data\":null}",
+                                     "{\"action\":0,\"command\":"
+                                     "\"global_stat_values\",\"data\":null}"};
+  size_t size = sizeof(json_string) / sizeof(json_string[0]);
+
+  prev = cdtime();
+  for (int i = 0; i < size; i++) {
+    /* Exit the read function after passing the half of the
+     * plugin interval to avoid blocking of other collectd plugins.
+     */
+    if ((cdtime() - prev) > (dpdk_tel_interval / 2)) {
+      ERROR(PLUGIN_NAME ":Cannot read after the interval %d", __LINE__);
+      return -1;
+    }
+    while (1) {
+      /* Exit the read function after passing the half of the plugin
+       * interval in order to avoid blocking of other collectd
+       * plugins.
+       *
+       * For send() and parse_json() failures, try reconnecting
+       * to the socket, as we don't know what exactly happened on
+       * server side.
+       */
+      if ((cdtime() - prev) > (dpdk_tel_interval / 2)) {
+        ERROR(PLUGIN_NAME ":Cannot read after interval %d", __LINE__);
+        return -1;
+      }
+      if (send(client.fd, json_string[i], strlen(json_string[i]), 0) < 0) {
+        ERROR(PLUGIN_NAME ": Could not send stats errno(%d), error(%s)", errno,
+              strerror(errno));
+        if (errno == EBADF || errno == ECONNRESET || errno == ENOTCONN ||
+            errno == EPIPE) {
+          dpdk_telemetry_cleanup();
+          if (dpdk_telemetry_socket_init() < 0)
+            continue;
+        }
+      } else {
+        bytes = recv(client.fd, buffer, sizeof(buffer) - 1, 0);
+        if (bytes < 0) {
+          ERROR(PLUGIN_NAME ": Could not receive stats errno(%d), error(%s)",
+                errno, strerror(errno));
+          continue;
+        } else {
+          buffer[bytes] = '\0';
+          ret = parse_json(buffer);
+          if (ret < 0) {
+            ERROR(PLUGIN_NAME ": Parsing failed");
+            dpdk_telemetry_cleanup();
+            dpdk_telemetry_socket_init();
+            continue;
+          }
+          break;
+        }
+      }
+    }
+  }
+  return 0;
+}
+
+static int dpdk_telemetry_init(void) {
+  cdtime_t prev = 0;
+
+  prev = cdtime();
+  while (1) {
+    /* If server socket is already not running, try connecting
+     * for period of 6 seconds to provide resiliency.
+     * If server doesn't shows up even after  6seconds, mark the
+     * initialization as failure.
+     */
+    if ((cdtime() - prev) > dpdk_tel_interval) {
+      ERROR(PLUGIN_NAME ":Cannot retry initialization after the interval");
+      return -1;
+    }
+    if (dpdk_telemetry_socket_init() < 0) {
+      ERROR(PLUGIN_NAME ": Socket initialization failed");
+      continue;
+    }
+    break;
+  }
+  plugin_register_complex_read(NULL, PLUGIN_NAME, dpdk_telemetry_read,
+                               dpdk_tel_interval, NULL);
+  return 0;
+}
+
+void module_register(void) {
+  plugin_register_init(PLUGIN_NAME, dpdk_telemetry_init);
+  plugin_register_complex_config(PLUGIN_NAME, dpdk_telemetry_config);
+  plugin_register_shutdown(PLUGIN_NAME, dpdk_telemetry_shutdown);
+}
index 6c08936e948d7865c2c768f4cb47666c51a3788f..22cc5621b1fdd287b606435c5085c2097d81a845 100644 (file)
@@ -79,6 +79,7 @@ dns_transfer            value:DERIVE:0:U
 dns_update              value:DERIVE:0:U
 dns_zops                value:DERIVE:0:U
 domain_state            state:GAUGE:0:U, reason:GAUGE:0:U
+dpdk_telemetry          value:COUNTER:0:U
 drbd_resource           value:DERIVE:0:U
 duration                seconds:GAUGE:0:U
 email_check             value:GAUGE:0:U