From e0ec619bfa066e136afef5fd0969fcc94366f04c Mon Sep 17 00:00:00 2001 From: Simon Boulet Date: Thu, 29 Jan 2015 16:30:01 -0500 Subject: [PATCH] Add support for storing RRD in Ceph through librados This patch adds support for creating, updating and retrieving RRD files on a Ceph cluster. It adds a ceph// prefix to RRD file name (similar to sql// used by libdbi). It uses librados to talk natively to Ceph. --- configure.ac | 13 ++++ doc/Makefile.am | 4 ++ doc/rrdrados.pod | 68 +++++++++++++++++++ src/Makefile.am | 4 ++ src/rrd.h | 3 + src/rrd_create.c | 11 +++ src/rrd_open.c | 104 +++++++++++++++++++++++++++- src/rrd_rados.c | 168 ++++++++++++++++++++++++++++++++++++++++++++++ src/rrd_rados.h | 24 +++++++ src/rrd_restore.c | 10 +++ src/rrd_update.c | 8 ++- 11 files changed, 413 insertions(+), 4 deletions(-) create mode 100644 doc/rrdrados.pod create mode 100644 src/rrd_rados.c create mode 100644 src/rrd_rados.h diff --git a/configure.ac b/configure.ac index 387083fe..f1dd366f 100644 --- a/configure.ac +++ b/configure.ac @@ -542,6 +542,18 @@ AC_ARG_ENABLE(libdbi,AS_HELP_STRING([--disable-libdbi],[do not build in support AM_CONDITIONAL(BUILD_LIBDBI,[test $have_libdbi != no]) +have_librados=no + +AC_ARG_ENABLE(librados,AS_HELP_STRING([--disable-librados],[do not build in support for librados]),[],[ + AC_CHECK_HEADER(rados/librados.h, [ + AC_DEFINE(HAVE_LIBRADOS,[1],[have got librados installed]) + LIBS="${LIBS} -lrados" + have_librados=yes + ]) +]) + +AM_CONDITIONAL(BUILD_LIBRADOS,[test $have_librados != no]) + have_libwrap=no AC_ARG_ENABLE(libwrap, AS_HELP_STRING([--disable-libwrap], [do not build in support for libwrap (tcp wrapper)]),[],[ @@ -1055,6 +1067,7 @@ echo " Build rrdcgi: $enable_rrdcgi" echo " Build librrd MT: $enable_pthread" echo " Use gettext: $USE_NLS" echo " With libDBI: $have_libdbi" +echo " With librados: $have_librados" echo " With libwrap: $have_libwrap" echo " With systemd dir: $with_systemdsystemunitdir" echo diff --git a/doc/Makefile.am b/doc/Makefile.am index 3a9a0395..ad097cf9 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -17,6 +17,10 @@ if BUILD_LIBDBI POD += rrdgraph_libdbi.pod endif +if BUILD_LIBRADOS + POD += rrdrados.pod +endif + if BUILD_LUA POD += rrdlua.pod endif diff --git a/doc/rrdrados.pod b/doc/rrdrados.pod new file mode 100644 index 00000000..0c645548 --- /dev/null +++ b/doc/rrdrados.pod @@ -0,0 +1,68 @@ +=head1 NAME + +rrdrados - Creating, updating and retrieving RRD files from Ceph + +=head1 SYNOPSIS + +ErrdfileE = BnameE> + + +=head1 DESCRIPTION + +This module adds support for creating, updating and retrieving RRD files +directly from a Ceph cluster using librados. + +It adds a ceph// prefix to RRD file name which is used to instruct rrdtool to +operate on a file that is stored in Ceph. + +Currently the module expects to find the Ceph configuration file in the default +location at /etc/ceph/ceph.conf. By default it uses Ceph client ID "admin" and a +Ceph pool named "rrd". + + +=head1 ENVIRONMENT VARIABLES + +=over 4 + +=item B + +Sets the Ceph Client ID to use when connecting. By default the client ID "admin" +is used. + +=item B + +Sets the name of the Ceph Pool to connect to. By default, the pool "rrd" is +used. + +=back + + +=head1 EXAMPLES + +B + + rrdtool create ceph//temperature.rrd --step 300 \ + DS:temp:GAUGE:600:-273:5000 \ + RRA:AVERAGE:0.5:1:1200 \ + RRA:MIN:0.5:12:2400 \ + RRA:MAX:0.5:12:2400 \ + RRA:AVERAGE:0.5:12:2400 + + +B + + rrdtool dump existing.rrd | rrdtool restore - ceph//new.rrd + +Or you could also copy the RRD file directly into Ceph using the rados command +line utility. + + +B + + rrdtool fetch ceph//file.rrd AVERAGE + + +=head1 AUTHOR + +Simon Boulet Esimon@nostalgeek.comE + diff --git a/src/Makefile.am b/src/Makefile.am index 1496d8d5..b9f846ae 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -83,6 +83,10 @@ if BUILD_LIBDBI RRD_C_FILES += rrd_fetch_libdbi.c endif +if BUILD_LIBRADOS +UPD_C_FILES += rrd_rados.c +endif + if BUILD_GETOPT noinst_HEADERS += rrd_getopt.h UPD_C_FILES += rrd_getopt.c rrd_getopt1.c diff --git a/src/rrd.h b/src/rrd.h index 475720a4..f7c1dc79 100644 --- a/src/rrd.h +++ b/src/rrd.h @@ -101,6 +101,9 @@ struct rrd_t; size_t pos; /* current pos in file */ void *pvt; struct rrd_t *rrd; /* the corresponding RRD structure, if any */ +#ifdef HAVE_LIBRADOS + struct rrd_rados_t *rados; +#endif } rrd_file_t; /* information used for the conventional file access methods */ diff --git a/src/rrd_create.c b/src/rrd_create.c index 50ff122a..2b770979 100644 --- a/src/rrd_create.c +++ b/src/rrd_create.c @@ -44,6 +44,10 @@ # include #endif +#ifdef HAVE_LIBRADOS +#include "rrd_rados.h" +#endif + static void reset_pdp_prep(rrd_t *rrd); static int rrd_init_data(rrd_t *rrd); static int rrd_prefill_data(rrd_t *rrd, const GList *sources_rrd_files, @@ -1317,6 +1321,13 @@ int write_rrd(const char *outfilename, rrd_t *out) { char *tmpfilename = NULL; /* write out the new file */ +#ifdef HAVE_LIBRADOS + if (strncmp("ceph//", outfilename, 6) == 0) { + rc = rrd_rados_create(outfilename + 6, out); + goto done; + } +#endif + FILE *fh = NULL; if (strcmp(outfilename, "-") == 0) { fh = stdout; diff --git a/src/rrd_open.c b/src/rrd_open.c index 5a22ff17..614cc3c6 100644 --- a/src/rrd_open.c +++ b/src/rrd_open.c @@ -21,6 +21,10 @@ #include #endif +#ifdef HAVE_LIBRADOS +#include "rrd_rados.h" +#endif + #define MEMBLK 8192 #ifdef WIN32 @@ -49,7 +53,7 @@ /* the cast to void* is there to avoid this warning seen on ia64 with certain versions of gcc: 'cast increases required alignment of target type' */ -#define __rrd_read(dst, dst_t, cnt) { \ +#define __rrd_read_mmap(dst, dst_t, cnt) { \ size_t wanted = sizeof(dst_t)*(cnt); \ if (offset + wanted > rrd_file->file_len) { \ rrd_set_error("reached EOF while loading header " #dst); \ @@ -59,7 +63,7 @@ offset += wanted; \ } #else -#define __rrd_read(dst, dst_t, cnt) { \ +#define __rrd_read_seq(dst, dst_t, cnt) { \ size_t wanted = sizeof(dst_t)*(cnt); \ size_t got; \ if ((dst = (dst_t*)malloc(wanted)) == NULL) { \ @@ -75,6 +79,44 @@ } #endif +#ifdef HAVE_LIBRADOS +#define __rrd_read_rados(dst, dst_t, cnt) { \ + size_t wanted = sizeof(dst_t)*(cnt); \ + size_t got; \ + if ((dst = (dst_t*)malloc(wanted)) == NULL) { \ + rrd_set_error(#dst " malloc"); \ + goto out_nullify_head; \ + } \ + got = rrd_rados_read(rrd_file->rados, dst, wanted, offset); \ + if (got != wanted) { \ + rrd_set_error("short read while reading header " #dst); \ + goto out_nullify_head; \ + } \ + offset += got; \ + } +#endif + +#if defined(HAVE_LIBRADOS) && defined(HAVE_MMAP) +#define __rrd_read(dst, dst_t, cnt) { \ + if (rrd_file->rados) \ + __rrd_read_rados(dst, dst_t, cnt) \ + else \ + __rrd_read_mmap(dst, dst_t, cnt) \ + } +#elif defined(HAVE_LIBRADOS) && !defined(HAVE_MMAP) + if (rrd_file->rados) \ + __rrd_read_rados(dst, dst_t, cnt) \ + else \ + __rrd_read_seq(dst, dst_t, cnt) \ + } +#elif defined(HAVE_MMAP) +#define __rrd_read(dst, dst_t, cnt) \ + __rrd_read_mmap(dst, dst_t, cnt) +#else +#define __rrd_read(dst, dst_t, cnt) \ + __rrd_read_seq(dst, dst_t, cnt) +#endif + /* get the address of the start of this page */ #if defined USE_MADVISE || defined HAVE_POSIX_FADVISE #ifndef PAGE_START @@ -150,6 +192,19 @@ rrd_file_t *rrd_open( } #endif +#ifdef HAVE_LIBRADOS + if (strncmp("ceph//", file_name, 6) == 0) { + rrd_file->rados = rrd_rados_open(file_name + 6); + if (rrd_file->rados == NULL) + goto out_free; + + if (rdwr & RRD_CREAT) + goto out_done; + + goto read_check; + } +#endif + #ifdef HAVE_MMAP rrd_simple_file->mm_prot = PROT_READ; rrd_simple_file->mm_flags = 0; @@ -316,6 +371,10 @@ rrd_file_t *rrd_open( } #endif +#ifdef HAVE_LIBRADOS +read_check: +#endif + __rrd_read(rrd->stat_head, stat_head_t, 1); @@ -378,6 +437,13 @@ rrd_file_t *rrd_open( size_t correct_len = rrd_file->header_len + sizeof(rrd_value_t) * row_cnt * rrd->stat_head->ds_cnt; +#ifdef HAVE_LIBRADOS + /* skip length checking for rados file */ + if (rrd_file->rados) { + rrd_file->file_len = correct_len; + } +#endif + if (correct_len > rrd_file->file_len) { rrd_set_error("'%s' is too small (should be %ld bytes)", @@ -478,6 +544,10 @@ int rrd_lock( (void)rrd_file; return 0; #else +#ifdef HAVE_LIBRADOS + if (rrd_file->rados) + return rrd_rados_lock(rrd_file->rados); +#endif int rcstat; rrd_simple_file_t *rrd_simple_file; rrd_simple_file = (rrd_simple_file_t *)rrd_file->pvt; @@ -599,8 +669,14 @@ int rrd_close( ret = munmap(rrd_simple_file->file_start, rrd_file->file_len); if (ret != 0) rrd_set_error("munmap rrd_file: %s", rrd_strerror(errno)); +#endif +#ifdef HAVE_LIBRADOS + if (rrd_file->rados) + ret = rrd_rados_close(rrd_file->rados); + else #endif ret = close(rrd_simple_file->fd); + if (ret != 0) rrd_set_error("closing file: %s", rrd_strerror(errno)); free(rrd_file->pvt); @@ -617,6 +693,14 @@ off_t rrd_seek( off_t off, int whence) { +#ifdef HAVE_LIBRADOS + /* no seek for rados */ + if (rrd_file->rados) { + rrd_file->pos = off; + return 0; + } +#endif + off_t ret = 0; #ifndef HAVE_MMAP rrd_simple_file_t *rrd_simple_file; @@ -658,6 +742,14 @@ ssize_t rrd_read( void *buf, size_t count) { +#ifdef HAVE_LIBRADOS + if (rrd_file->rados) { + ssize_t ret = rrd_rados_read(rrd_file->rados, buf, count, rrd_file->pos); + if (ret > 0) + rrd_file->pos += ret; + return ret; + } +#endif rrd_simple_file_t *rrd_simple_file = (rrd_simple_file_t *)rrd_file->pvt; #ifdef HAVE_MMAP size_t _cnt = count; @@ -697,6 +789,14 @@ ssize_t rrd_write( const void *buf, size_t count) { +#ifdef HAVE_LIBRADOS + if (rrd_file->rados) { + size_t ret = rrd_rados_write(rrd_file->rados, buf, count, rrd_file->pos); + if (ret > 0) + rrd_file->pos += count; + return ret; + } +#endif rrd_simple_file_t *rrd_simple_file = (rrd_simple_file_t *)rrd_file->pvt; #ifdef HAVE_MMAP size_t old_size = rrd_file->file_len; diff --git a/src/rrd_rados.c b/src/rrd_rados.c new file mode 100644 index 00000000..9fe32df7 --- /dev/null +++ b/src/rrd_rados.c @@ -0,0 +1,168 @@ +#include "rrd_rados.h" + +rrd_rados_t* rrd_rados_open(const char *oid) { + int err; + rrd_rados_t *rrd_rados; + + rrd_rados = (rrd_rados_t*)malloc(sizeof(rrd_rados_t)); + if (rrd_rados == NULL) { + rrd_set_error("allocating rrd_rados descriptor"); + goto err; + } + + memset(rrd_rados, 0, sizeof(rrd_rados_t)); + rrd_rados->oid = oid; + + const char *ceph_id = getenv("CEPH_ID"); + if (!ceph_id) ceph_id = "admin"; + err = rados_create(&rrd_rados->cluster, ceph_id); + if (err < 0) { + rrd_set_error("cannot create cluster handle: %s", strerror(-err)); + goto err; + } + + err = rados_conf_read_file(rrd_rados->cluster, "/etc/ceph/ceph.conf"); + if (err < 0) { + rrd_set_error("cannot read config file: %s", strerror(-err)); + goto err; + } + + err = rados_connect(rrd_rados->cluster); + if (err < 0) { + rrd_set_error("cannot connect to cluster: %s", strerror(-err)); + goto err; + } + + rrd_rados->write_op = rados_create_write_op(); + if (rrd_rados->write_op == NULL) { + rrd_set_error("allocating rados_write_op_t"); + goto err; + } + + const char *ceph_pool = getenv("CEPH_POOL"); + if (!ceph_pool) ceph_pool = "rrd"; + err = rados_ioctx_create(rrd_rados->cluster, ceph_pool, &rrd_rados->ioctx); + if (err < 0) { + rrd_set_error("cannot open rados pool: %s", strerror(-err)); + goto err; + } + + return rrd_rados; +err: + if (rrd_rados->write_op) + rados_release_write_op(rrd_rados->write_op); + if (rrd_rados->ioctx) + rados_ioctx_destroy(rrd_rados->ioctx); + if (rrd_rados->cluster) + rados_shutdown(rrd_rados->cluster); + free(rrd_rados); + return NULL; +} + +int rrd_rados_close(rrd_rados_t *rrd_rados) { + rrd_rados_flush(rrd_rados); + + /* release lock on close, see rrd_rados_lock() */ + if (rrd_rados->lock) { + rados_unlock(rrd_rados->ioctx, rrd_rados->oid, "rrdtool", ""); + } + + rados_release_write_op(rrd_rados->write_op); + rados_ioctx_destroy(rrd_rados->ioctx); + rados_shutdown(rrd_rados->cluster); + free(rrd_rados); + + return 0; +} + +int rrd_rados_create(const char *oid, rrd_t *rrd) { + int err; + + rrd_rados_t *rrd_rados = rrd_rados_open(oid); + if (rrd_rados == NULL) + return -1; + + rados_write_op_truncate(rrd_rados->write_op, 0); + + /* See write_fh() defined in rrd_create.c */ + + if (atoi(rrd->stat_head->version) < 3) { + /* we output 3 or higher */ + strcpy(rrd->stat_head->version, "0003"); + } + + rados_write_op_append(rrd_rados->write_op, (char*)rrd->stat_head, sizeof(stat_head_t)); + rados_write_op_append(rrd_rados->write_op, (char*)rrd->ds_def, sizeof(ds_def_t) * rrd->stat_head->ds_cnt); + rados_write_op_append(rrd_rados->write_op, (char*)rrd->rra_def, sizeof(rra_def_t) * rrd->stat_head->rra_cnt); + rados_write_op_append(rrd_rados->write_op, (char*)rrd->live_head, sizeof(live_head_t)); + rados_write_op_append(rrd_rados->write_op, (char*)rrd->pdp_prep, sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt); + rados_write_op_append(rrd_rados->write_op, (char*)rrd->cdp_prep, + sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt * rrd->stat_head->ds_cnt); + rados_write_op_append(rrd_rados->write_op, (char*)rrd->rra_ptr, sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt); + + /* calculate the number of rrd_values to dump */ + int rra_offset = 0; + for (unsigned int i = 0; i < rrd->stat_head->rra_cnt; i++) { + unsigned long num_rows = rrd->rra_def[i].row_cnt; + unsigned long ds_cnt = rrd->stat_head->ds_cnt; + if (num_rows > 0){ + rados_write_op_append(rrd_rados->write_op, (char*)(rrd->rrd_value + rra_offset * ds_cnt), + sizeof(rrd_value_t) * num_rows * ds_cnt); + + rra_offset += num_rows; + } + } + + err = rrd_rados_flush(rrd_rados); + if (err < 0) + rrd_set_error("rados flush: %s", strerror(-err)); + + rrd_rados_close(rrd_rados); + + return err; +} + +size_t rrd_rados_read(rrd_rados_t *rrd_rados, void *data, size_t len, uint64_t offset) { + int ret; + + ret = rados_read(rrd_rados->ioctx, rrd_rados->oid, data, len, offset); + + if (ret < 0) + rrd_set_error("rados read: %s", strerror(-ret)); + + return ret; +} + +size_t rrd_rados_write(rrd_rados_t *rrd_rados, const void *data, size_t len, uint64_t offset) { + /* writes are queued in rados write_op and written atomically on + close or when explicitly calling flush */ + rados_write_op_write(rrd_rados->write_op, (char*)data, len, offset); + + /* writes aren't confirmed until flushed */ + return len; +} + +int rrd_rados_flush(rrd_rados_t *rrd_rados) { + return rados_write_op_operate(rrd_rados->write_op, rrd_rados->ioctx, rrd_rados->oid, + NULL, LIBRADOS_OPERATION_NOFLAG); +} + +int rrd_rados_lock(rrd_rados_t *rrd_rados) { + int ret; + + /* prevent dead lock by setting a maximum lock duration */ + struct timeval tv; + tv.tv_sec = 2; // 2 seconds + tv.tv_usec = 0; + + ret = rados_lock_exclusive(rrd_rados->ioctx, rrd_rados->oid, "rrdtool", "", "", &tv, 0); + + if (ret < 0) { + rrd_set_error("rados lock: %s", strerror(-ret)); + } else { + /* set flag to instruct rrd_rados_close() to release lock */ + rrd_rados->lock = 1; + } + + return ret; +} diff --git a/src/rrd_rados.h b/src/rrd_rados.h new file mode 100644 index 00000000..135536af --- /dev/null +++ b/src/rrd_rados.h @@ -0,0 +1,24 @@ +#ifndef RRD_RADOS_H +#define RRD_RADOS_H + +#include + +#include "rrd_tool.h" + +typedef struct rrd_rados_t { + rados_t cluster; + rados_ioctx_t ioctx; + const char *oid; + rados_write_op_t write_op; + int lock; +} rrd_rados_t; + +rrd_rados_t* rrd_rados_open(const char *oid); +int rrd_rados_close(rrd_rados_t *rrd_rados); +int rrd_rados_create(const char *oid, rrd_t *rrd); +size_t rrd_rados_read(rrd_rados_t *rrd_rados, void *data, size_t len, uint64_t offset); +size_t rrd_rados_write(rrd_rados_t *rrd_rados, const void *data, size_t len, uint64_t offset); +int rrd_rados_flush(rrd_rados_t *rrd_rados); +int rrd_rados_lock(rrd_rados_t *rrd_rados); + +#endif diff --git a/src/rrd_restore.c b/src/rrd_restore.c index 1475f5e6..87c95f6a 100644 --- a/src/rrd_restore.c +++ b/src/rrd_restore.c @@ -37,6 +37,10 @@ # define close _close #endif +#ifdef HAVE_LIBRADOS +#include "rrd_rados.h" +#endif + #define ARRAY_LENGTH(a) (sizeof (a) / sizeof ((a)[0])) @@ -1324,6 +1328,12 @@ int write_file( { FILE *fh; +#ifdef HAVE_LIBRADOS + if (strncmp("ceph//", file_name, 6) == 0) { + return rrd_rados_create(file_name + 6, rrd); + } +#endif + if (strcmp("-", file_name) == 0) fh = stdout; else { diff --git a/src/rrd_update.c b/src/rrd_update.c index 7b2d8f1d..f6314b02 100644 --- a/src/rrd_update.c +++ b/src/rrd_update.c @@ -259,7 +259,7 @@ static int smooth_all_rras( rrd_file_t *rrd_file, unsigned long rra_begin); -#ifndef HAVE_MMAP +#if !defined(HAVE_MMAP) || defined(HAVE_LIBRADOS) static int write_changes_to_disk( rrd_t *rrd, rrd_file_t *rrd_file, @@ -931,6 +931,10 @@ int _rrd_updatex( if (rrd_test_error()) { goto err_free_structures; } +#ifdef HAVE_LIBRADOS + if (rrd_file->rados) + write_changes_to_disk(&rrd, rrd_file, version); +#endif #ifndef HAVE_MMAP if (write_changes_to_disk(&rrd, rrd_file, version) == -1) { goto err_free_structures; @@ -2405,7 +2409,7 @@ static int smooth_all_rras( return 0; } -#ifndef HAVE_MMAP +#if !defined(HAVE_MMAP) || defined(HAVE_LIBRADOS) /* * Flush changes to disk (unless we're using mmap) * -- 2.47.2