AM_CONDITIONAL(BUILD_LIBDBI,[test $have_libdbi != no])
+have_librados=no
+
+AC_ARG_ENABLE(librados,AS_HELP_STRING([--disable-librados],[do not build in support for librados]),[],[
+ AC_CHECK_HEADER(rados/librados.h, [
+ AC_DEFINE(HAVE_LIBRADOS,[1],[have got librados installed])
+ LIBS="${LIBS} -lrados"
+ have_librados=yes
+ ])
+])
+
+AM_CONDITIONAL(BUILD_LIBRADOS,[test $have_librados != no])
+
have_libwrap=no
AC_ARG_ENABLE(libwrap, AS_HELP_STRING([--disable-libwrap], [do not build in support for libwrap (tcp wrapper)]),[],[
echo " Build librrd MT: $enable_pthread"
echo " Use gettext: $USE_NLS"
echo " With libDBI: $have_libdbi"
+echo " With librados: $have_librados"
echo " With libwrap: $have_libwrap"
echo " With systemd dir: $with_systemdsystemunitdir"
echo
POD += rrdgraph_libdbi.pod
endif
+if BUILD_LIBRADOS
+ POD += rrdrados.pod
+endif
+
if BUILD_LUA
POD += rrdlua.pod
endif
--- /dev/null
+=head1 NAME
+
+rrdrados - Creating, updating and retrieving RRD files from Ceph
+
+=head1 SYNOPSIS
+
+E<lt>rrdfileE<gt> = B<ceph//E<lt>nameE<gt>>
+
+
+=head1 DESCRIPTION
+
+This module adds support for creating, updating and retrieving RRD files
+directly from a Ceph cluster using librados.
+
+It adds a ceph// prefix to RRD file name which is used to instruct rrdtool to
+operate on a file that is stored in Ceph.
+
+Currently the module expects to find the Ceph configuration file in the default
+location at /etc/ceph/ceph.conf. By default it uses Ceph client ID "admin" and a
+Ceph pool named "rrd".
+
+
+=head1 ENVIRONMENT VARIABLES
+
+=over 4
+
+=item B<CEPH_ID>
+
+Sets the Ceph Client ID to use when connecting. By default the client ID "admin"
+is used.
+
+=item B<CEPH_POOL>
+
+Sets the name of the Ceph Pool to connect to. By default, the pool "rrd" is
+used.
+
+=back
+
+
+=head1 EXAMPLES
+
+B<Creating an RRD file on Ceph>
+
+ rrdtool create ceph//temperature.rrd --step 300 \
+ DS:temp:GAUGE:600:-273:5000 \
+ RRA:AVERAGE:0.5:1:1200 \
+ RRA:MIN:0.5:12:2400 \
+ RRA:MAX:0.5:12:2400 \
+ RRA:AVERAGE:0.5:12:2400
+
+
+B<Importing an existing RRD into Ceph>
+
+ rrdtool dump existing.rrd | rrdtool restore - ceph//new.rrd
+
+Or you could also copy the RRD file directly into Ceph using the rados command
+line utility.
+
+
+B<Retrieving RRD data from Ceph>
+
+ rrdtool fetch ceph//file.rrd AVERAGE
+
+
+=head1 AUTHOR
+
+Simon Boulet E<lt>simon@nostalgeek.comE<gt>
+
RRD_C_FILES += rrd_fetch_libdbi.c
endif
+if BUILD_LIBRADOS
+UPD_C_FILES += rrd_rados.c
+endif
+
if BUILD_GETOPT
noinst_HEADERS += rrd_getopt.h
UPD_C_FILES += rrd_getopt.c rrd_getopt1.c
size_t pos; /* current pos in file */
void *pvt;
struct rrd_t *rrd; /* the corresponding RRD structure, if any */
+#ifdef HAVE_LIBRADOS
+ struct rrd_rados_t *rados;
+#endif
} rrd_file_t;
/* information used for the conventional file access methods */
# include <process.h>
#endif
+#ifdef HAVE_LIBRADOS
+#include "rrd_rados.h"
+#endif
+
static void reset_pdp_prep(rrd_t *rrd);
static int rrd_init_data(rrd_t *rrd);
static int rrd_prefill_data(rrd_t *rrd, const GList *sources_rrd_files,
char *tmpfilename = NULL;
/* write out the new file */
+#ifdef HAVE_LIBRADOS
+ if (strncmp("ceph//", outfilename, 6) == 0) {
+ rc = rrd_rados_create(outfilename + 6, out);
+ goto done;
+ }
+#endif
+
FILE *fh = NULL;
if (strcmp(outfilename, "-") == 0) {
fh = stdout;
#include <utime.h>
#endif
+#ifdef HAVE_LIBRADOS
+#include "rrd_rados.h"
+#endif
+
#define MEMBLK 8192
#ifdef WIN32
/* the cast to void* is there to avoid this warning seen on ia64 with certain
versions of gcc: 'cast increases required alignment of target type'
*/
-#define __rrd_read(dst, dst_t, cnt) { \
+#define __rrd_read_mmap(dst, dst_t, cnt) { \
size_t wanted = sizeof(dst_t)*(cnt); \
if (offset + wanted > rrd_file->file_len) { \
rrd_set_error("reached EOF while loading header " #dst); \
offset += wanted; \
}
#else
-#define __rrd_read(dst, dst_t, cnt) { \
+#define __rrd_read_seq(dst, dst_t, cnt) { \
size_t wanted = sizeof(dst_t)*(cnt); \
size_t got; \
if ((dst = (dst_t*)malloc(wanted)) == NULL) { \
}
#endif
+#ifdef HAVE_LIBRADOS
+#define __rrd_read_rados(dst, dst_t, cnt) { \
+ size_t wanted = sizeof(dst_t)*(cnt); \
+ size_t got; \
+ if ((dst = (dst_t*)malloc(wanted)) == NULL) { \
+ rrd_set_error(#dst " malloc"); \
+ goto out_nullify_head; \
+ } \
+ got = rrd_rados_read(rrd_file->rados, dst, wanted, offset); \
+ if (got != wanted) { \
+ rrd_set_error("short read while reading header " #dst); \
+ goto out_nullify_head; \
+ } \
+ offset += got; \
+ }
+#endif
+
+#if defined(HAVE_LIBRADOS) && defined(HAVE_MMAP)
+#define __rrd_read(dst, dst_t, cnt) { \
+ if (rrd_file->rados) \
+ __rrd_read_rados(dst, dst_t, cnt) \
+ else \
+ __rrd_read_mmap(dst, dst_t, cnt) \
+ }
+#elif defined(HAVE_LIBRADOS) && !defined(HAVE_MMAP)
+ if (rrd_file->rados) \
+ __rrd_read_rados(dst, dst_t, cnt) \
+ else \
+ __rrd_read_seq(dst, dst_t, cnt) \
+ }
+#elif defined(HAVE_MMAP)
+#define __rrd_read(dst, dst_t, cnt) \
+ __rrd_read_mmap(dst, dst_t, cnt)
+#else
+#define __rrd_read(dst, dst_t, cnt) \
+ __rrd_read_seq(dst, dst_t, cnt)
+#endif
+
/* get the address of the start of this page */
#if defined USE_MADVISE || defined HAVE_POSIX_FADVISE
#ifndef PAGE_START
}
#endif
+#ifdef HAVE_LIBRADOS
+ if (strncmp("ceph//", file_name, 6) == 0) {
+ rrd_file->rados = rrd_rados_open(file_name + 6);
+ if (rrd_file->rados == NULL)
+ goto out_free;
+
+ if (rdwr & RRD_CREAT)
+ goto out_done;
+
+ goto read_check;
+ }
+#endif
+
#ifdef HAVE_MMAP
rrd_simple_file->mm_prot = PROT_READ;
rrd_simple_file->mm_flags = 0;
}
#endif
+#ifdef HAVE_LIBRADOS
+read_check:
+#endif
+
__rrd_read(rrd->stat_head, stat_head_t,
1);
size_t correct_len = rrd_file->header_len +
sizeof(rrd_value_t) * row_cnt * rrd->stat_head->ds_cnt;
+#ifdef HAVE_LIBRADOS
+ /* skip length checking for rados file */
+ if (rrd_file->rados) {
+ rrd_file->file_len = correct_len;
+ }
+#endif
+
if (correct_len > rrd_file->file_len)
{
rrd_set_error("'%s' is too small (should be %ld bytes)",
(void)rrd_file;
return 0;
#else
+#ifdef HAVE_LIBRADOS
+ if (rrd_file->rados)
+ return rrd_rados_lock(rrd_file->rados);
+#endif
int rcstat;
rrd_simple_file_t *rrd_simple_file;
rrd_simple_file = (rrd_simple_file_t *)rrd_file->pvt;
ret = munmap(rrd_simple_file->file_start, rrd_file->file_len);
if (ret != 0)
rrd_set_error("munmap rrd_file: %s", rrd_strerror(errno));
+#endif
+#ifdef HAVE_LIBRADOS
+ if (rrd_file->rados)
+ ret = rrd_rados_close(rrd_file->rados);
+ else
#endif
ret = close(rrd_simple_file->fd);
+
if (ret != 0)
rrd_set_error("closing file: %s", rrd_strerror(errno));
free(rrd_file->pvt);
off_t off,
int whence)
{
+#ifdef HAVE_LIBRADOS
+ /* no seek for rados */
+ if (rrd_file->rados) {
+ rrd_file->pos = off;
+ return 0;
+ }
+#endif
+
off_t ret = 0;
#ifndef HAVE_MMAP
rrd_simple_file_t *rrd_simple_file;
void *buf,
size_t count)
{
+#ifdef HAVE_LIBRADOS
+ if (rrd_file->rados) {
+ ssize_t ret = rrd_rados_read(rrd_file->rados, buf, count, rrd_file->pos);
+ if (ret > 0)
+ rrd_file->pos += ret;
+ return ret;
+ }
+#endif
rrd_simple_file_t *rrd_simple_file = (rrd_simple_file_t *)rrd_file->pvt;
#ifdef HAVE_MMAP
size_t _cnt = count;
const void *buf,
size_t count)
{
+#ifdef HAVE_LIBRADOS
+ if (rrd_file->rados) {
+ size_t ret = rrd_rados_write(rrd_file->rados, buf, count, rrd_file->pos);
+ if (ret > 0)
+ rrd_file->pos += count;
+ return ret;
+ }
+#endif
rrd_simple_file_t *rrd_simple_file = (rrd_simple_file_t *)rrd_file->pvt;
#ifdef HAVE_MMAP
size_t old_size = rrd_file->file_len;
--- /dev/null
+#include "rrd_rados.h"
+
+rrd_rados_t* rrd_rados_open(const char *oid) {
+ int err;
+ rrd_rados_t *rrd_rados;
+
+ rrd_rados = (rrd_rados_t*)malloc(sizeof(rrd_rados_t));
+ if (rrd_rados == NULL) {
+ rrd_set_error("allocating rrd_rados descriptor");
+ goto err;
+ }
+
+ memset(rrd_rados, 0, sizeof(rrd_rados_t));
+ rrd_rados->oid = oid;
+
+ const char *ceph_id = getenv("CEPH_ID");
+ if (!ceph_id) ceph_id = "admin";
+ err = rados_create(&rrd_rados->cluster, ceph_id);
+ if (err < 0) {
+ rrd_set_error("cannot create cluster handle: %s", strerror(-err));
+ goto err;
+ }
+
+ err = rados_conf_read_file(rrd_rados->cluster, "/etc/ceph/ceph.conf");
+ if (err < 0) {
+ rrd_set_error("cannot read config file: %s", strerror(-err));
+ goto err;
+ }
+
+ err = rados_connect(rrd_rados->cluster);
+ if (err < 0) {
+ rrd_set_error("cannot connect to cluster: %s", strerror(-err));
+ goto err;
+ }
+
+ rrd_rados->write_op = rados_create_write_op();
+ if (rrd_rados->write_op == NULL) {
+ rrd_set_error("allocating rados_write_op_t");
+ goto err;
+ }
+
+ const char *ceph_pool = getenv("CEPH_POOL");
+ if (!ceph_pool) ceph_pool = "rrd";
+ err = rados_ioctx_create(rrd_rados->cluster, ceph_pool, &rrd_rados->ioctx);
+ if (err < 0) {
+ rrd_set_error("cannot open rados pool: %s", strerror(-err));
+ goto err;
+ }
+
+ return rrd_rados;
+err:
+ if (rrd_rados->write_op)
+ rados_release_write_op(rrd_rados->write_op);
+ if (rrd_rados->ioctx)
+ rados_ioctx_destroy(rrd_rados->ioctx);
+ if (rrd_rados->cluster)
+ rados_shutdown(rrd_rados->cluster);
+ free(rrd_rados);
+ return NULL;
+}
+
+int rrd_rados_close(rrd_rados_t *rrd_rados) {
+ rrd_rados_flush(rrd_rados);
+
+ /* release lock on close, see rrd_rados_lock() */
+ if (rrd_rados->lock) {
+ rados_unlock(rrd_rados->ioctx, rrd_rados->oid, "rrdtool", "");
+ }
+
+ rados_release_write_op(rrd_rados->write_op);
+ rados_ioctx_destroy(rrd_rados->ioctx);
+ rados_shutdown(rrd_rados->cluster);
+ free(rrd_rados);
+
+ return 0;
+}
+
+int rrd_rados_create(const char *oid, rrd_t *rrd) {
+ int err;
+
+ rrd_rados_t *rrd_rados = rrd_rados_open(oid);
+ if (rrd_rados == NULL)
+ return -1;
+
+ rados_write_op_truncate(rrd_rados->write_op, 0);
+
+ /* See write_fh() defined in rrd_create.c */
+
+ if (atoi(rrd->stat_head->version) < 3) {
+ /* we output 3 or higher */
+ strcpy(rrd->stat_head->version, "0003");
+ }
+
+ rados_write_op_append(rrd_rados->write_op, (char*)rrd->stat_head, sizeof(stat_head_t));
+ rados_write_op_append(rrd_rados->write_op, (char*)rrd->ds_def, sizeof(ds_def_t) * rrd->stat_head->ds_cnt);
+ rados_write_op_append(rrd_rados->write_op, (char*)rrd->rra_def, sizeof(rra_def_t) * rrd->stat_head->rra_cnt);
+ rados_write_op_append(rrd_rados->write_op, (char*)rrd->live_head, sizeof(live_head_t));
+ rados_write_op_append(rrd_rados->write_op, (char*)rrd->pdp_prep, sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt);
+ rados_write_op_append(rrd_rados->write_op, (char*)rrd->cdp_prep,
+ sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt * rrd->stat_head->ds_cnt);
+ rados_write_op_append(rrd_rados->write_op, (char*)rrd->rra_ptr, sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt);
+
+ /* calculate the number of rrd_values to dump */
+ int rra_offset = 0;
+ for (unsigned int i = 0; i < rrd->stat_head->rra_cnt; i++) {
+ unsigned long num_rows = rrd->rra_def[i].row_cnt;
+ unsigned long ds_cnt = rrd->stat_head->ds_cnt;
+ if (num_rows > 0){
+ rados_write_op_append(rrd_rados->write_op, (char*)(rrd->rrd_value + rra_offset * ds_cnt),
+ sizeof(rrd_value_t) * num_rows * ds_cnt);
+
+ rra_offset += num_rows;
+ }
+ }
+
+ err = rrd_rados_flush(rrd_rados);
+ if (err < 0)
+ rrd_set_error("rados flush: %s", strerror(-err));
+
+ rrd_rados_close(rrd_rados);
+
+ return err;
+}
+
+size_t rrd_rados_read(rrd_rados_t *rrd_rados, void *data, size_t len, uint64_t offset) {
+ int ret;
+
+ ret = rados_read(rrd_rados->ioctx, rrd_rados->oid, data, len, offset);
+
+ if (ret < 0)
+ rrd_set_error("rados read: %s", strerror(-ret));
+
+ return ret;
+}
+
+size_t rrd_rados_write(rrd_rados_t *rrd_rados, const void *data, size_t len, uint64_t offset) {
+ /* writes are queued in rados write_op and written atomically on
+ close or when explicitly calling flush */
+ rados_write_op_write(rrd_rados->write_op, (char*)data, len, offset);
+
+ /* writes aren't confirmed until flushed */
+ return len;
+}
+
+int rrd_rados_flush(rrd_rados_t *rrd_rados) {
+ return rados_write_op_operate(rrd_rados->write_op, rrd_rados->ioctx, rrd_rados->oid,
+ NULL, LIBRADOS_OPERATION_NOFLAG);
+}
+
+int rrd_rados_lock(rrd_rados_t *rrd_rados) {
+ int ret;
+
+ /* prevent dead lock by setting a maximum lock duration */
+ struct timeval tv;
+ tv.tv_sec = 2; // 2 seconds
+ tv.tv_usec = 0;
+
+ ret = rados_lock_exclusive(rrd_rados->ioctx, rrd_rados->oid, "rrdtool", "", "", &tv, 0);
+
+ if (ret < 0) {
+ rrd_set_error("rados lock: %s", strerror(-ret));
+ } else {
+ /* set flag to instruct rrd_rados_close() to release lock */
+ rrd_rados->lock = 1;
+ }
+
+ return ret;
+}
--- /dev/null
+#ifndef RRD_RADOS_H
+#define RRD_RADOS_H
+
+#include <rados/librados.h>
+
+#include "rrd_tool.h"
+
+typedef struct rrd_rados_t {
+ rados_t cluster;
+ rados_ioctx_t ioctx;
+ const char *oid;
+ rados_write_op_t write_op;
+ int lock;
+} rrd_rados_t;
+
+rrd_rados_t* rrd_rados_open(const char *oid);
+int rrd_rados_close(rrd_rados_t *rrd_rados);
+int rrd_rados_create(const char *oid, rrd_t *rrd);
+size_t rrd_rados_read(rrd_rados_t *rrd_rados, void *data, size_t len, uint64_t offset);
+size_t rrd_rados_write(rrd_rados_t *rrd_rados, const void *data, size_t len, uint64_t offset);
+int rrd_rados_flush(rrd_rados_t *rrd_rados);
+int rrd_rados_lock(rrd_rados_t *rrd_rados);
+
+#endif
# define close _close
#endif
+#ifdef HAVE_LIBRADOS
+#include "rrd_rados.h"
+#endif
+
#define ARRAY_LENGTH(a) (sizeof (a) / sizeof ((a)[0]))
{
FILE *fh;
+#ifdef HAVE_LIBRADOS
+ if (strncmp("ceph//", file_name, 6) == 0) {
+ return rrd_rados_create(file_name + 6, rrd);
+ }
+#endif
+
if (strcmp("-", file_name) == 0)
fh = stdout;
else {
rrd_file_t *rrd_file,
unsigned long rra_begin);
-#ifndef HAVE_MMAP
+#if !defined(HAVE_MMAP) || defined(HAVE_LIBRADOS)
static int write_changes_to_disk(
rrd_t *rrd,
rrd_file_t *rrd_file,
if (rrd_test_error()) {
goto err_free_structures;
}
+#ifdef HAVE_LIBRADOS
+ if (rrd_file->rados)
+ write_changes_to_disk(&rrd, rrd_file, version);
+#endif
#ifndef HAVE_MMAP
if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
goto err_free_structures;
return 0;
}
-#ifndef HAVE_MMAP
+#if !defined(HAVE_MMAP) || defined(HAVE_LIBRADOS)
/*
* Flush changes to disk (unless we're using mmap)
*