]> git.ipfire.org Git - thirdparty/rrdtool-1.x.git/commitdiff
Add support for storing RRD in Ceph through librados 582/head
authorSimon Boulet <simon@nostalgeek.com>
Thu, 29 Jan 2015 21:30:01 +0000 (16:30 -0500)
committerSimon Boulet <simon@nostalgeek.com>
Tue, 3 Feb 2015 18:07:01 +0000 (13:07 -0500)
This patch adds support for creating, updating and retrieving RRD files
on a Ceph cluster. It adds a ceph// prefix to RRD file name (similar to
sql// used by libdbi). It uses librados to talk natively to Ceph.

configure.ac
doc/Makefile.am
doc/rrdrados.pod [new file with mode: 0644]
src/Makefile.am
src/rrd.h
src/rrd_create.c
src/rrd_open.c
src/rrd_rados.c [new file with mode: 0644]
src/rrd_rados.h [new file with mode: 0644]
src/rrd_restore.c
src/rrd_update.c

index 387083feb11eb099f3380f792daca7bef25b360e..f1dd366ff605bfbb10e31052395846471b9582c3 100644 (file)
@@ -542,6 +542,18 @@ AC_ARG_ENABLE(libdbi,AS_HELP_STRING([--disable-libdbi],[do not build in support
 
 AM_CONDITIONAL(BUILD_LIBDBI,[test $have_libdbi != no])
 
+have_librados=no
+
+AC_ARG_ENABLE(librados,AS_HELP_STRING([--disable-librados],[do not build in support for librados]),[],[
+    AC_CHECK_HEADER(rados/librados.h, [
+        AC_DEFINE(HAVE_LIBRADOS,[1],[have got librados installed])
+        LIBS="${LIBS} -lrados"
+        have_librados=yes
+    ])
+])
+
+AM_CONDITIONAL(BUILD_LIBRADOS,[test $have_librados != no])
+
 have_libwrap=no
 
 AC_ARG_ENABLE(libwrap, AS_HELP_STRING([--disable-libwrap], [do not build in support for libwrap (tcp wrapper)]),[],[ 
@@ -1055,6 +1067,7 @@ echo "          Build rrdcgi: $enable_rrdcgi"
 echo "       Build librrd MT: $enable_pthread"
 echo "           Use gettext: $USE_NLS"
 echo "           With libDBI: $have_libdbi"
+echo "         With librados: $have_librados"
 echo "          With libwrap: $have_libwrap"
 echo "      With systemd dir: $with_systemdsystemunitdir"
 echo
index 3a9a0395d513947b0b63693eeaf2f9ded7f8f7b4..ad097cf961e6e6bde960fdf1005ca88164277acb 100644 (file)
@@ -17,6 +17,10 @@ if BUILD_LIBDBI
   POD += rrdgraph_libdbi.pod
 endif
 
+if BUILD_LIBRADOS
+  POD += rrdrados.pod
+endif
+
 if BUILD_LUA
   POD += rrdlua.pod
 endif
diff --git a/doc/rrdrados.pod b/doc/rrdrados.pod
new file mode 100644 (file)
index 0000000..0c64554
--- /dev/null
@@ -0,0 +1,68 @@
+=head1 NAME
+
+rrdrados - Creating, updating and retrieving RRD files from Ceph
+
+=head1 SYNOPSIS
+
+E<lt>rrdfileE<gt> = B<ceph//E<lt>nameE<gt>>
+
+
+=head1 DESCRIPTION
+
+This module adds support for creating, updating and retrieving RRD files
+directly from a Ceph cluster using librados.
+
+It adds a ceph// prefix to RRD file name which is used to instruct rrdtool to
+operate on a file that is stored in Ceph.
+
+Currently the module expects to find the Ceph configuration file in the default
+location at /etc/ceph/ceph.conf. By default it uses Ceph client ID "admin" and a
+Ceph pool named "rrd".
+
+
+=head1 ENVIRONMENT VARIABLES
+
+=over 4
+
+=item B<CEPH_ID>
+
+Sets the Ceph Client ID to use when connecting. By default the client ID "admin"
+is used.
+
+=item B<CEPH_POOL>
+
+Sets the name of the Ceph Pool to connect to. By default, the pool "rrd" is
+used.
+
+=back
+
+
+=head1 EXAMPLES
+
+B<Creating an RRD file on Ceph>
+
+ rrdtool create ceph//temperature.rrd --step 300 \
+   DS:temp:GAUGE:600:-273:5000 \
+   RRA:AVERAGE:0.5:1:1200 \
+   RRA:MIN:0.5:12:2400 \
+   RRA:MAX:0.5:12:2400 \
+   RRA:AVERAGE:0.5:12:2400
+
+
+B<Importing an existing RRD into Ceph>
+
+ rrdtool dump existing.rrd | rrdtool restore - ceph//new.rrd
+
+Or you could also copy the RRD file directly into Ceph using the rados command
+line utility.
+
+
+B<Retrieving RRD data from Ceph>
+
+ rrdtool fetch ceph//file.rrd AVERAGE
+
+
+=head1 AUTHOR
+
+Simon Boulet E<lt>simon@nostalgeek.comE<gt>
+
index 1496d8d5fb0d206ee666b79f9cf5c6a037b2e728..b9f846ae473e3e5cac2c29c0fd7713cb6e0ff83f 100644 (file)
@@ -83,6 +83,10 @@ if BUILD_LIBDBI
 RRD_C_FILES += rrd_fetch_libdbi.c
 endif
 
+if BUILD_LIBRADOS
+UPD_C_FILES += rrd_rados.c
+endif
+
 if BUILD_GETOPT
 noinst_HEADERS += rrd_getopt.h
 UPD_C_FILES += rrd_getopt.c rrd_getopt1.c
index 475720a4de1625adca6f0464e0c1b307e6b2a3db..f7c1dc791701a69c99b74362f8e75cc1e593df71 100644 (file)
--- a/src/rrd.h
+++ b/src/rrd.h
@@ -101,6 +101,9 @@ struct rrd_t;
         size_t     pos;  /* current pos in file */
         void      *pvt;
        struct rrd_t *rrd;              /* the corresponding RRD structure, if any */
+#ifdef HAVE_LIBRADOS
+        struct rrd_rados_t *rados;
+#endif
     } rrd_file_t;
 
 /* information used for the conventional file access methods */
index 50ff122a41e6aa919c27cc719985c08da3683c88..2b77097931fd952d01633a07e3a3ec37ca15a9d7 100644 (file)
 # include <process.h>
 #endif
 
+#ifdef HAVE_LIBRADOS
+#include "rrd_rados.h"
+#endif
+
 static void reset_pdp_prep(rrd_t *rrd);
 static int rrd_init_data(rrd_t *rrd);
 static int rrd_prefill_data(rrd_t *rrd, const GList *sources_rrd_files,
@@ -1317,6 +1321,13 @@ int write_rrd(const char *outfilename, rrd_t *out) {
     char *tmpfilename = NULL;
 
     /* write out the new file */
+#ifdef HAVE_LIBRADOS
+    if (strncmp("ceph//", outfilename, 6) == 0) {
+      rc = rrd_rados_create(outfilename + 6, out);
+      goto done;
+    }
+#endif
+
     FILE *fh = NULL;
     if (strcmp(outfilename, "-") == 0) {
        fh = stdout;
index 5a22ff17a5583e2fc44ee9978506d8b4cbc425a7..614cc3c661627ecd4f384ceb66fe2980c312cb31 100644 (file)
 #include <utime.h>
 #endif
 
+#ifdef HAVE_LIBRADOS
+#include "rrd_rados.h"
+#endif
+
 #define MEMBLK 8192
 
 #ifdef WIN32
@@ -49,7 +53,7 @@
 /* the cast to void* is there to avoid this warning seen on ia64 with certain
    versions of gcc: 'cast increases required alignment of target type'
 */
-#define __rrd_read(dst, dst_t, cnt) { \
+#define __rrd_read_mmap(dst, dst_t, cnt) { \
        size_t wanted = sizeof(dst_t)*(cnt); \
        if (offset + wanted > rrd_file->file_len) { \
                rrd_set_error("reached EOF while loading header " #dst); \
@@ -59,7 +63,7 @@
        offset += wanted; \
     }
 #else
-#define __rrd_read(dst, dst_t, cnt) { \
+#define __rrd_read_seq(dst, dst_t, cnt) { \
        size_t wanted = sizeof(dst_t)*(cnt); \
         size_t got; \
        if ((dst = (dst_t*)malloc(wanted)) == NULL) { \
     }
 #endif
 
+#ifdef HAVE_LIBRADOS
+#define __rrd_read_rados(dst, dst_t, cnt) { \
+       size_t wanted = sizeof(dst_t)*(cnt); \
+        size_t got; \
+       if ((dst = (dst_t*)malloc(wanted)) == NULL) { \
+               rrd_set_error(#dst " malloc"); \
+               goto out_nullify_head; \
+       } \
+        got = rrd_rados_read(rrd_file->rados, dst, wanted, offset); \
+       if (got != wanted) { \
+               rrd_set_error("short read while reading header " #dst); \
+                goto out_nullify_head; \
+       } \
+       offset += got; \
+    }
+#endif
+
+#if defined(HAVE_LIBRADOS) && defined(HAVE_MMAP)
+#define __rrd_read(dst, dst_t, cnt) { \
+    if (rrd_file->rados) \
+      __rrd_read_rados(dst, dst_t, cnt) \
+    else \
+      __rrd_read_mmap(dst, dst_t, cnt) \
+    }
+#elif defined(HAVE_LIBRADOS) && !defined(HAVE_MMAP)
+    if (rrd_file->rados) \
+      __rrd_read_rados(dst, dst_t, cnt) \
+    else \
+      __rrd_read_seq(dst, dst_t, cnt) \
+    }
+#elif defined(HAVE_MMAP)
+#define __rrd_read(dst, dst_t, cnt) \
+    __rrd_read_mmap(dst, dst_t, cnt)
+#else
+#define __rrd_read(dst, dst_t, cnt) \
+    __rrd_read_seq(dst, dst_t, cnt)
+#endif
+
 /* get the address of the start of this page */
 #if defined USE_MADVISE || defined HAVE_POSIX_FADVISE
 #ifndef PAGE_START
@@ -150,6 +192,19 @@ rrd_file_t *rrd_open(
     }
 #endif
 
+#ifdef HAVE_LIBRADOS
+    if (strncmp("ceph//", file_name, 6) == 0) {
+      rrd_file->rados = rrd_rados_open(file_name + 6);
+      if (rrd_file->rados == NULL)
+          goto out_free;
+
+      if (rdwr & RRD_CREAT)
+          goto out_done;
+
+      goto read_check;
+    }
+#endif
+
 #ifdef HAVE_MMAP
     rrd_simple_file->mm_prot = PROT_READ;
     rrd_simple_file->mm_flags = 0;
@@ -316,6 +371,10 @@ rrd_file_t *rrd_open(
     }
 #endif
 
+#ifdef HAVE_LIBRADOS
+read_check:
+#endif
+
     __rrd_read(rrd->stat_head, stat_head_t,
                1);
 
@@ -378,6 +437,13 @@ rrd_file_t *rrd_open(
       size_t  correct_len = rrd_file->header_len +
         sizeof(rrd_value_t) * row_cnt * rrd->stat_head->ds_cnt;
 
+#ifdef HAVE_LIBRADOS
+      /* skip length checking for rados file */
+      if (rrd_file->rados) {
+        rrd_file->file_len = correct_len;
+      }
+#endif
+
       if (correct_len > rrd_file->file_len)
       {
         rrd_set_error("'%s' is too small (should be %ld bytes)",
@@ -478,6 +544,10 @@ int rrd_lock(
     (void)rrd_file;
     return 0;
 #else
+#ifdef HAVE_LIBRADOS
+    if (rrd_file->rados)
+      return rrd_rados_lock(rrd_file->rados);
+#endif
     int       rcstat;
     rrd_simple_file_t *rrd_simple_file;
     rrd_simple_file = (rrd_simple_file_t *)rrd_file->pvt;
@@ -599,8 +669,14 @@ int rrd_close(
     ret = munmap(rrd_simple_file->file_start, rrd_file->file_len);
     if (ret != 0)
         rrd_set_error("munmap rrd_file: %s", rrd_strerror(errno));
+#endif
+#ifdef HAVE_LIBRADOS
+    if (rrd_file->rados)
+        ret = rrd_rados_close(rrd_file->rados);
+    else
 #endif
     ret = close(rrd_simple_file->fd);
+
     if (ret != 0)
         rrd_set_error("closing file: %s", rrd_strerror(errno));
     free(rrd_file->pvt);
@@ -617,6 +693,14 @@ off_t rrd_seek(
     off_t off,
     int whence)
 {
+#ifdef HAVE_LIBRADOS
+  /* no seek for rados */
+  if (rrd_file->rados) {
+    rrd_file->pos = off;
+    return 0;
+  }
+#endif
+
     off_t     ret = 0;
 #ifndef HAVE_MMAP
     rrd_simple_file_t *rrd_simple_file;
@@ -658,6 +742,14 @@ ssize_t rrd_read(
     void *buf,
     size_t count)
 {
+#ifdef HAVE_LIBRADOS
+    if (rrd_file->rados) {
+        ssize_t ret = rrd_rados_read(rrd_file->rados, buf, count, rrd_file->pos);
+        if (ret > 0)
+            rrd_file->pos += ret;
+        return ret;
+    }
+#endif
     rrd_simple_file_t *rrd_simple_file = (rrd_simple_file_t *)rrd_file->pvt;
 #ifdef HAVE_MMAP
     size_t    _cnt = count;
@@ -697,6 +789,14 @@ ssize_t rrd_write(
     const void *buf,
     size_t count)
 {
+#ifdef HAVE_LIBRADOS
+    if (rrd_file->rados) {
+      size_t ret = rrd_rados_write(rrd_file->rados, buf, count, rrd_file->pos);
+      if (ret > 0)
+        rrd_file->pos += count;
+      return ret;
+    }
+#endif
     rrd_simple_file_t *rrd_simple_file = (rrd_simple_file_t *)rrd_file->pvt;
 #ifdef HAVE_MMAP
     size_t old_size = rrd_file->file_len;
diff --git a/src/rrd_rados.c b/src/rrd_rados.c
new file mode 100644 (file)
index 0000000..9fe32df
--- /dev/null
@@ -0,0 +1,168 @@
+#include "rrd_rados.h"
+
+rrd_rados_t* rrd_rados_open(const char *oid) {
+    int err;
+    rrd_rados_t *rrd_rados;
+
+    rrd_rados = (rrd_rados_t*)malloc(sizeof(rrd_rados_t));
+    if (rrd_rados == NULL) {
+      rrd_set_error("allocating rrd_rados descriptor");
+      goto err;
+    }
+
+    memset(rrd_rados, 0, sizeof(rrd_rados_t));
+    rrd_rados->oid = oid;
+
+    const char *ceph_id = getenv("CEPH_ID");
+    if (!ceph_id) ceph_id = "admin";
+    err = rados_create(&rrd_rados->cluster, ceph_id);
+    if (err < 0) {
+        rrd_set_error("cannot create cluster handle: %s", strerror(-err));
+        goto err;
+    }
+
+    err = rados_conf_read_file(rrd_rados->cluster, "/etc/ceph/ceph.conf");
+    if (err < 0) {
+        rrd_set_error("cannot read config file: %s", strerror(-err));
+        goto err;
+    }
+
+    err = rados_connect(rrd_rados->cluster);
+    if (err < 0) {
+        rrd_set_error("cannot connect to cluster: %s", strerror(-err));
+        goto err;
+    }
+
+    rrd_rados->write_op = rados_create_write_op();
+    if (rrd_rados->write_op == NULL) {
+        rrd_set_error("allocating rados_write_op_t");
+        goto err;
+    }
+
+    const char *ceph_pool = getenv("CEPH_POOL");
+    if (!ceph_pool) ceph_pool = "rrd";
+    err = rados_ioctx_create(rrd_rados->cluster, ceph_pool, &rrd_rados->ioctx);
+    if (err < 0) {
+        rrd_set_error("cannot open rados pool: %s", strerror(-err));
+        goto err;
+    }
+
+    return rrd_rados;
+err:
+    if (rrd_rados->write_op)
+      rados_release_write_op(rrd_rados->write_op);
+    if (rrd_rados->ioctx)
+      rados_ioctx_destroy(rrd_rados->ioctx);
+    if (rrd_rados->cluster)
+      rados_shutdown(rrd_rados->cluster);
+    free(rrd_rados);
+    return NULL;
+}
+
+int rrd_rados_close(rrd_rados_t *rrd_rados) {
+    rrd_rados_flush(rrd_rados);
+
+    /* release lock on close, see rrd_rados_lock() */
+    if (rrd_rados->lock) {
+      rados_unlock(rrd_rados->ioctx, rrd_rados->oid, "rrdtool", "");
+    }
+
+    rados_release_write_op(rrd_rados->write_op);
+    rados_ioctx_destroy(rrd_rados->ioctx);
+    rados_shutdown(rrd_rados->cluster);
+    free(rrd_rados);
+
+    return 0;
+}
+
+int rrd_rados_create(const char *oid, rrd_t *rrd) {
+    int err;
+
+    rrd_rados_t *rrd_rados = rrd_rados_open(oid);
+    if (rrd_rados == NULL)
+      return -1;
+
+    rados_write_op_truncate(rrd_rados->write_op, 0);
+
+    /* See write_fh() defined in rrd_create.c */
+
+    if (atoi(rrd->stat_head->version) < 3) {
+        /* we output 3 or higher */
+        strcpy(rrd->stat_head->version, "0003");
+    }
+
+    rados_write_op_append(rrd_rados->write_op, (char*)rrd->stat_head, sizeof(stat_head_t));
+    rados_write_op_append(rrd_rados->write_op, (char*)rrd->ds_def, sizeof(ds_def_t) * rrd->stat_head->ds_cnt);
+    rados_write_op_append(rrd_rados->write_op, (char*)rrd->rra_def, sizeof(rra_def_t) * rrd->stat_head->rra_cnt);
+    rados_write_op_append(rrd_rados->write_op, (char*)rrd->live_head, sizeof(live_head_t));
+    rados_write_op_append(rrd_rados->write_op, (char*)rrd->pdp_prep, sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt);
+    rados_write_op_append(rrd_rados->write_op, (char*)rrd->cdp_prep,
+                    sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt * rrd->stat_head->ds_cnt);
+    rados_write_op_append(rrd_rados->write_op, (char*)rrd->rra_ptr, sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt);
+
+    /* calculate the number of rrd_values to dump */
+    int rra_offset = 0;
+    for (unsigned int i = 0; i < rrd->stat_head->rra_cnt; i++) {
+        unsigned long num_rows = rrd->rra_def[i].row_cnt;
+        unsigned long ds_cnt = rrd->stat_head->ds_cnt;
+        if (num_rows > 0){
+            rados_write_op_append(rrd_rados->write_op, (char*)(rrd->rrd_value + rra_offset * ds_cnt),
+                                  sizeof(rrd_value_t) * num_rows * ds_cnt);
+
+            rra_offset += num_rows;
+        }
+    }
+
+    err = rrd_rados_flush(rrd_rados);
+    if (err < 0)
+        rrd_set_error("rados flush: %s", strerror(-err));
+
+    rrd_rados_close(rrd_rados);
+
+    return err;
+}
+
+size_t rrd_rados_read(rrd_rados_t *rrd_rados, void *data, size_t len, uint64_t offset) {
+    int ret;
+
+    ret = rados_read(rrd_rados->ioctx, rrd_rados->oid, data, len, offset);
+
+    if (ret < 0)
+        rrd_set_error("rados read: %s", strerror(-ret));
+
+    return ret;
+}
+
+size_t rrd_rados_write(rrd_rados_t *rrd_rados, const void *data, size_t len, uint64_t offset) {
+    /* writes are queued in rados write_op and written atomically on
+       close or when explicitly calling flush */
+    rados_write_op_write(rrd_rados->write_op, (char*)data, len, offset);
+
+    /* writes aren't confirmed until flushed */
+    return len;
+}
+
+int rrd_rados_flush(rrd_rados_t *rrd_rados) {
+    return rados_write_op_operate(rrd_rados->write_op, rrd_rados->ioctx, rrd_rados->oid,
+                                  NULL, LIBRADOS_OPERATION_NOFLAG);
+}
+
+int rrd_rados_lock(rrd_rados_t *rrd_rados) {
+    int ret;
+
+    /* prevent dead lock by setting a maximum lock duration */
+    struct timeval tv;
+    tv.tv_sec = 2; // 2 seconds
+    tv.tv_usec = 0;
+
+    ret = rados_lock_exclusive(rrd_rados->ioctx, rrd_rados->oid, "rrdtool", "", "", &tv,  0);
+
+    if (ret < 0) {
+        rrd_set_error("rados lock: %s", strerror(-ret));
+    } else {
+      /* set flag to instruct rrd_rados_close() to release lock */
+      rrd_rados->lock = 1;
+    }
+
+    return ret;
+}
diff --git a/src/rrd_rados.h b/src/rrd_rados.h
new file mode 100644 (file)
index 0000000..135536a
--- /dev/null
@@ -0,0 +1,24 @@
+#ifndef RRD_RADOS_H
+#define RRD_RADOS_H
+
+#include <rados/librados.h>
+
+#include "rrd_tool.h"
+
+typedef struct rrd_rados_t {
+    rados_t cluster;
+    rados_ioctx_t ioctx;
+    const char *oid;
+    rados_write_op_t write_op;
+    int lock;
+} rrd_rados_t;
+
+rrd_rados_t* rrd_rados_open(const char *oid);
+int rrd_rados_close(rrd_rados_t *rrd_rados);
+int rrd_rados_create(const char *oid, rrd_t *rrd);
+size_t rrd_rados_read(rrd_rados_t *rrd_rados, void *data, size_t len, uint64_t offset);
+size_t rrd_rados_write(rrd_rados_t *rrd_rados, const void *data, size_t len, uint64_t offset);
+int rrd_rados_flush(rrd_rados_t *rrd_rados);
+int rrd_rados_lock(rrd_rados_t *rrd_rados);
+
+#endif
index 1475f5e643ef8ff63bab264c1a3fa52cea25624b..87c95f6af690ae46c58b7b54de7b0095b3dc9ea1 100644 (file)
 # define close _close
 #endif
 
+#ifdef HAVE_LIBRADOS
+#include "rrd_rados.h"
+#endif
+
 
 #define ARRAY_LENGTH(a) (sizeof (a) / sizeof ((a)[0]))
 
@@ -1324,6 +1328,12 @@ int write_file(
 {
     FILE     *fh;
 
+#ifdef HAVE_LIBRADOS
+    if (strncmp("ceph//", file_name, 6) == 0) {
+      return rrd_rados_create(file_name + 6, rrd);
+    }
+#endif
+
     if (strcmp("-", file_name) == 0)
         fh = stdout;
     else {
index 7b2d8f1dbc09736c670acc743696f1469fa8e36e..f6314b02378cdfed41df2ad932518bde0e0e160b 100644 (file)
@@ -259,7 +259,7 @@ static int smooth_all_rras(
     rrd_file_t *rrd_file,
     unsigned long rra_begin);
 
-#ifndef HAVE_MMAP
+#if !defined(HAVE_MMAP) || defined(HAVE_LIBRADOS)
 static int write_changes_to_disk(
     rrd_t *rrd,
     rrd_file_t *rrd_file,
@@ -931,6 +931,10 @@ int _rrd_updatex(
     if (rrd_test_error()) {
         goto err_free_structures;
     }
+#ifdef HAVE_LIBRADOS
+    if (rrd_file->rados)
+      write_changes_to_disk(&rrd, rrd_file, version);
+#endif
 #ifndef HAVE_MMAP
     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
         goto err_free_structures;
@@ -2405,7 +2409,7 @@ static int smooth_all_rras(
     return 0;
 }
 
-#ifndef HAVE_MMAP
+#if !defined(HAVE_MMAP) || defined(HAVE_LIBRADOS)
 /*
  * Flush changes to disk (unless we're using mmap)
  *