]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
* RRD API initial implementation.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 14 Dec 2012 18:26:27 +0000 (22:26 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Fri, 14 Dec 2012 18:26:27 +0000 (22:26 +0400)
src/rrd.c
src/rrd.h

index b14369f9a05be9526bcd8e7dc77013627708904e..f24b4c3d5b44b38116c759c5101f11a64c3f386e 100644 (file)
--- a/src/rrd.c
+++ b/src/rrd.c
 #include "config.h"
 #include "rrd.h"
 
+static GQuark
+rrd_error_quark (void)
+{
+       return g_quark_from_static_string ("rrd-error");
+}
+
+/**
+ * Check rrd file for correctness (size, cookies, etc)
+ */
+static gboolean
+rspamd_rrd_check_file (const gchar *filename, gboolean need_data, GError **err)
+{
+       gint                                                             fd, i;
+       struct stat                                                      st;
+       struct rrd_file_head                             head;
+       struct rrd_rra_def                                       rra;
+       gint                                                             head_size;
+
+       fd = open (filename, O_RDWR);
+       if (fd == -1) {
+               g_set_error (err, rrd_error_quark (), errno, "rrd open error: %s", strerror (errno));
+               return FALSE;
+       }
+
+       if (fstat (fd, &st) == -1) {
+               g_set_error (err, rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno));
+               close (fd);
+               return FALSE;
+       }
+       if (st.st_size < (goffset)sizeof (struct rrd_file_head)) {
+               /* We have trimmed file */
+               g_set_error (err, rrd_error_quark (), EINVAL, "rrd size is bad: %ud", (guint)st.st_size);
+               close (fd);
+               return FALSE;
+       }
+
+       /* Try to read header */
+       if (read (fd, &head, sizeof (head)) != sizeof (head)) {
+               g_set_error (err, rrd_error_quark (), errno, "rrd read head error: %s", strerror (errno));
+               close (fd);
+               return FALSE;
+       }
+       /* Check magic */
+       if (memcmp (head.cookie, RRD_COOKIE, sizeof (head.cookie)) != 0 ||
+                       memcmp (head.version, RRD_VERSION, sizeof (head.version)) != 0 ||
+                       head.float_cookie != RRD_FLOAT_COOKIE) {
+               g_set_error (err, rrd_error_quark (), EINVAL, "rrd head cookies error: %s", strerror (errno));
+               close (fd);
+               return FALSE;
+       }
+       /* Check for other params */
+       if (head.ds_cnt <= 0 || head.rra_cnt <= 0) {
+               g_set_error (err, rrd_error_quark (), EINVAL, "rrd head cookies error: %s", strerror (errno));
+               close (fd);
+               return FALSE;
+       }
+       /* Now we can calculate the overall size of rrd */
+       head_size = sizeof (struct rrd_file_head) +
+                       sizeof (struct rrd_ds_def) * head.ds_cnt +
+                       sizeof (struct rrd_rra_def) * head.rra_cnt +
+                       sizeof (struct rrd_live_head) +
+                       sizeof (struct rrd_pdp_prep) * head.ds_cnt +
+                       sizeof (struct rrd_cdp_prep) * head.ds_cnt * head.rra_cnt +
+                       sizeof (struct rrd_rra_ptr) * head.rra_cnt;
+       if (st.st_size < (goffset)head_size) {
+               g_set_error (err, rrd_error_quark (), errno, "rrd file seems to have stripped header: %d", head_size);
+               close (fd);
+               return FALSE;
+       }
+
+       if (need_data) {
+               /* Now check rra */
+               if (lseek (fd, sizeof (struct rrd_ds_def) * head.ds_cnt, SEEK_CUR) == -1) {
+                       g_set_error (err, rrd_error_quark (), errno, "rrd head lseek error: %s", strerror (errno));
+                       close (fd);
+                       return FALSE;
+               }
+               for (i = 0; i < (gint)head.rra_cnt; i ++) {
+                       if (read (fd, &rra, sizeof (rra)) != sizeof (rra)) {
+                               g_set_error (err, rrd_error_quark (), errno, "rrd read rra error: %s", strerror (errno));
+                               close (fd);
+                               return FALSE;
+                       }
+                       head_size += rra.row_cnt * head.ds_cnt;
+               }
+
+               if (st.st_size != head_size) {
+                       g_set_error (err, rrd_error_quark (), EINVAL, "rrd file seems to have incorrect size: %d, must be %d", (gint)st.st_size, head_size);
+                       close (fd);
+                       return FALSE;
+               }
+       }
+
+       close (fd);
+       return TRUE;
+}
+
+/**
+ * Adjust pointers in mmapped rrd file
+ * @param file
+ */
+static void
+rspamd_rrd_adjust_pointers (struct rspamd_rrd_file *file, gboolean completed)
+{
+       guint8                                                                          *ptr;
+
+       ptr = file->map;
+       file->stat_head = (struct rrd_file_head *)ptr;
+       ptr += sizeof (struct rrd_file_head);
+       file->ds_def = (struct rrd_ds_def *)ptr;
+       ptr += sizeof (struct rrd_ds_def) * file->stat_head->ds_cnt;
+       file->rra_def = (struct rrd_rra_def *)ptr;
+       ptr += sizeof (struct rrd_rra_def) * file->stat_head->rra_cnt;
+       file->live_head = (struct rrd_live_head *)ptr;
+       ptr += sizeof (struct rrd_live_head);
+       file->pdp_prep = (struct rrd_pdp_prep *)ptr;
+       ptr += sizeof (struct rrd_pdp_prep) * file->stat_head->ds_cnt;
+       file->cdp_prep = (struct rrd_cdp_prep *)ptr;
+       ptr += sizeof (struct rrd_cdp_prep) * file->stat_head->rra_cnt * file->stat_head->ds_cnt;
+       file->rra_ptr = (struct rrd_rra_ptr *)ptr;
+       if (completed) {
+               ptr += sizeof (struct rrd_rra_ptr) * file->stat_head->rra_cnt;
+               file->rrd_value = (gdouble *)ptr;
+       }
+       else {
+               file->rrd_value = NULL;
+       }
+}
+
+/**
+ * Open completed or incompleted rrd file
+ * @param filename
+ * @param completed
+ * @param err
+ * @return
+ */
+static struct rspamd_rrd_file*
+rspamd_rrd_open_common (const gchar *filename, gboolean completed, GError **err)
+{
+       struct rspamd_rrd_file                                          *new;
+       gint                                                                             fd;
+       struct stat                                                                      st;
+
+       if (!rspamd_rrd_check_file (filename, completed, err)) {
+               return NULL;
+       }
+
+       new = g_slice_alloc0 (sizeof (struct rspamd_rrd_file));
+
+       if (new == NULL) {
+               g_set_error (err, rrd_error_quark (), ENOMEM, "not enough memory");
+               return NULL;
+       }
+
+       /* Open file */
+       fd = open (filename, O_RDWR);
+       if (fd == -1) {
+               g_set_error (err, rrd_error_quark (), errno, "rrd open error: %s", strerror (errno));
+               return FALSE;
+       }
+
+       if (fstat (fd, &st) == -1) {
+               g_set_error (err, rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno));
+               close (fd);
+               return FALSE;
+       }
+       /* Mmap file */
+       new->size = st.st_size;
+       if ((new->map = mmap (NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED) {
+               close (fd);
+               g_set_error (err, rrd_error_quark (), ENOMEM, "mmap failed: %s", strerror (errno));
+               g_slice_free1 (sizeof (struct rspamd_rrd_file), new);
+               return NULL;
+       }
+
+       close (fd);
+
+       /* Adjust pointers */
+       rspamd_rrd_adjust_pointers (new, completed);
+
+       /* Mark it as finalized */
+       new->finalized = completed;
+
+       new->filename = g_strdup (filename);
+
+       return new;
+}
+
 /**
  * Open (and mmap) existing RRD file
  * @param filename path
 struct rspamd_rrd_file*
 rspamd_rrd_open (const gchar *filename, GError **err)
 {
-       return NULL;
+       return rspamd_rrd_open_common (filename, TRUE, err);
 }
 
 /**
@@ -45,10 +233,117 @@ rspamd_rrd_open (const gchar *filename, GError **err)
  * @param err error pointer
  * @return TRUE if file has been created
  */
-gboolean
+struct rspamd_rrd_file*
 rspamd_rrd_create (const gchar *filename, gulong ds_count, gulong rra_count, gulong pdp_step, GError **err)
 {
-       return FALSE;
+       struct rspamd_rrd_file                                          *new;
+       struct rrd_file_head                                             head;
+       struct rrd_ds_def                                                        ds;
+       struct rrd_rra_def                                                       rra;
+       struct rrd_live_head                                             lh;
+       struct rrd_pdp_prep                                                      pdp;
+       struct rrd_cdp_prep                                                      cdp;
+       struct rrd_rra_ptr                                                       rra_ptr;
+       gint                                                                             fd;
+       guint                                                                            i, j;
+
+       /* Open file */
+       fd = open (filename, O_RDWR | O_CREAT | O_TRUNC, 0644);
+       if (fd == -1) {
+               g_set_error (err, rrd_error_quark (), errno, "rrd create error: %s", strerror (errno));
+               return NULL;
+       }
+
+       /* Fill header */
+       memset (&head, 0, sizeof (head));
+       head.rra_cnt = rra_count;
+       head.ds_cnt = ds_count;
+       head.pdp_step = pdp_step;
+       memcpy (head.cookie, RRD_COOKIE, sizeof (head.cookie));
+       memcpy (head.version, RRD_VERSION, sizeof (head.version));
+       head.float_cookie = RRD_FLOAT_COOKIE;
+
+       if (write (fd, &head, sizeof (head)) != sizeof (head)) {
+               close (fd);
+               g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
+               return NULL;
+       }
+
+       /* Fill DS section */
+       memset (&ds.ds_nam, 0, sizeof (ds.ds_nam));
+       memcpy (&ds.dst, "COUNTER", sizeof ("COUNTER"));
+       memset (&ds.par, 0, sizeof (ds.par));
+       for (i = 0; i < ds_count; i ++) {
+               if (write (fd, &ds, sizeof (ds)) != sizeof (ds)) {
+                       close (fd);
+                       g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
+                       return NULL;
+               }
+       }
+
+       /* Fill RRA section */
+       memcpy (&rra.cf_nam, "AVERAGE", sizeof ("AVERAGE"));
+       rra.pdp_cnt = 1;
+       memset (&rra.par, 0, sizeof (rra.par));
+       for (i = 0; i < rra_count; i ++) {
+               if (write (fd, &rra, sizeof (rra)) != sizeof (rra)) {
+                       close (fd);
+                       g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
+                       return NULL;
+               }
+       }
+
+       /* Fill live header */
+       lh.last_up = time (NULL) - 10;
+       lh.last_up_usec = 0;
+
+       if (write (fd, &lh, sizeof (lh)) != sizeof (lh)) {
+               close (fd);
+               g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
+               return NULL;
+       }
+
+       /* Fill pdp prep */
+       memcpy (&pdp.last_ds, "U", sizeof ("U"));
+       memset (&pdp.scratch, 0, sizeof (pdp.scratch));
+       pdp.scratch[PDP_val].dv = 0.;
+       pdp.scratch[PDP_unkn_sec_cnt].lv = lh.last_up % pdp_step;
+       for (i = 0; i < ds_count; i ++) {
+               if (write (fd, &pdp, sizeof (pdp)) != sizeof (pdp)) {
+                       close (fd);
+                       g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
+                       return NULL;
+               }
+       }
+
+       /* Fill cdp prep */
+       memset (&cdp.scratch, 0, sizeof (cdp.scratch));
+       cdp.scratch[CDP_val].dv = NAN;
+       for (i = 0; i < rra_count; i ++) {
+               cdp.scratch[CDP_unkn_pdp_cnt].lv = ((lh.last_up - pdp.scratch[PDP_unkn_sec_cnt].lv) % (pdp_step * rra.pdp_cnt)) / pdp_step;
+               for (j = 0; j < ds_count; j ++) {
+                       if (write (fd, &cdp, sizeof (cdp)) != sizeof (cdp)) {
+                               close (fd);
+                               g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
+                               return NULL;
+                       }
+               }
+       }
+
+       /* Set row pointers */
+       memset (&rra_ptr, 0, sizeof (rra_ptr));
+       for (i = 0; i < rra_count; i ++) {
+               if (write (fd, &rra_ptr, sizeof (rra_ptr)) != sizeof (rra_ptr)) {
+                       close (fd);
+                       g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
+                       return NULL;
+               }
+       }
+
+       close (fd);
+       new = rspamd_rrd_open_common (filename, FALSE, err);
+
+       return new;
 }
 
 /**
@@ -59,9 +354,18 @@ rspamd_rrd_create (const gchar *filename, gulong ds_count, gulong rra_count, gul
  * @return TRUE if data sources were added
  */
 gboolean
-rspamd_rrd_add_ds (const gchar *filename, GArray *ds, GError **err)
+rspamd_rrd_add_ds (struct rspamd_rrd_file *file, GArray *ds, GError **err)
 {
-       return FALSE;
+
+       if (file == NULL || file->stat_head->ds_cnt != ds->len * sizeof (struct rrd_ds_def)) {
+               g_set_error (err, rrd_error_quark (), EINVAL, "rrd add ds failed: wrong arguments");
+               return FALSE;
+       }
+
+       /* Straightforward memcpy */
+       memcpy (file->ds_def, ds->data, ds->len);
+
+       return TRUE;
 }
 
 /**
@@ -72,9 +376,17 @@ rspamd_rrd_add_ds (const gchar *filename, GArray *ds, GError **err)
  * @return TRUE if archives were added
  */
 gboolean
-rspamd_rrd_add_rra (const gchar *filename, GArray *rra, GError **err)
+rspamd_rrd_add_rra (struct rspamd_rrd_file *file, GArray *rra, GError **err)
 {
-       return FALSE;
+       if (file == NULL || file->stat_head->rra_cnt != rra->len * sizeof (struct rrd_rra_def)) {
+               g_set_error (err, rrd_error_quark (), EINVAL, "rrd add rra failed: wrong arguments");
+               return FALSE;
+       }
+
+       /* Straightforward memcpy */
+       memcpy (file->rra_def, rra->data, rra->len);
+
+       return TRUE;
 }
 
 /**
@@ -84,9 +396,76 @@ rspamd_rrd_add_rra (const gchar *filename, GArray *rra, GError **err)
  * @return TRUE if rrd file is ready for use
  */
 gboolean
-rspamd_rrd_finalize (const gchar *filename, GError **err)
+rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err)
 {
-       return FALSE;
+       gint                                                                             fd;
+       guint                                                                            i, count = 0;
+       gdouble                                                                          vbuf[1024];
+       struct stat                                                                      st;
+
+       if (file == NULL || file->filename == NULL) {
+               g_set_error (err, rrd_error_quark (), EINVAL, "rrd add rra failed: wrong arguments");
+               return FALSE;
+       }
+
+       fd = open (file->filename, O_RDWR);
+       if (fd == -1) {
+               g_set_error (err, rrd_error_quark (), errno, "rrd open error: %s", strerror (errno));
+               return FALSE;
+       }
+
+       if (lseek (fd, 0, SEEK_END) == -1) {
+               g_set_error (err, rrd_error_quark (), errno, "rrd seek error: %s", strerror (errno));
+               close (fd);
+               return FALSE;
+       }
+
+       /* Adjust CDP */
+       for (i = 0; i < file->stat_head->rra_cnt; i ++) {
+               file->cdp_prep->scratch[CDP_unkn_pdp_cnt].lv =
+                               ((file->live_head->last_up - file->pdp_prep->scratch[PDP_unkn_sec_cnt].lv) % (file->stat_head->pdp_step *
+                                               file->rra_def[i].pdp_cnt)) / file->stat_head->pdp_step;
+               /* Randomize row pointer */
+               file->rra_ptr->cur_row = g_random_int () % file->rra_def[i].row_cnt;
+               /* Calculate values count */
+               count += file->rra_def[i].row_cnt * file->stat_head->ds_cnt;
+       }
+
+       munmap (file->map, file->size);
+       /* Write values */
+       for (i = 0; i < G_N_ELEMENTS (vbuf); i ++) {
+               vbuf[i] = NAN;
+       }
+
+       while (count > 0) {
+               /* Write values in buffered matter */
+               if (write (fd, vbuf, MIN (G_N_ELEMENTS (vbuf), count) * sizeof (gdouble)) == -1) {
+                       g_set_error (err, rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
+                       close (fd);
+                       return FALSE;
+               }
+               count -= G_N_ELEMENTS (vbuf);
+       }
+
+       if (fstat (fd, &st) == -1) {
+               g_set_error (err, rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno));
+               close (fd);
+               return FALSE;
+       }
+
+       /* Mmap again */
+       file->size = st.st_size;
+       if ((file->map = mmap (NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)) == MAP_FAILED) {
+               close (fd);
+               g_set_error (err, rrd_error_quark (), ENOMEM, "mmap failed: %s", strerror (errno));
+               g_slice_free1 (sizeof (struct rspamd_rrd_file), file);
+               return FALSE;
+       }
+       close (fd);
+       /* Adjust pointers */
+       rspamd_rrd_adjust_pointers (file, TRUE);
+
+       return TRUE;
 }
 
 /**
@@ -100,5 +479,52 @@ rspamd_rrd_finalize (const gchar *filename, GError **err)
 gboolean
 rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *points, GError **err)
 {
-       return FALSE;
+       gdouble                                                                 *row;
+       guint                                                                    i;
+
+       if (file == NULL || file->stat_head->ds_cnt != points->len * sizeof (gdouble) || rra_idx >= file->stat_head->rra_cnt) {
+               g_set_error (err, rrd_error_quark (), EINVAL, "rrd add points failed: wrong arguments");
+               return FALSE;
+       }
+
+       row = file->rrd_value;
+       /* Skip unaffected rra */
+       for (i = 0; i < rra_idx; i ++) {
+               row += file->rra_def[i].row_cnt * file->stat_head->ds_cnt;
+       }
+
+       row += file->rra_ptr[rra_idx].cur_row * file->stat_head->ds_cnt;
+
+       /* Increase row index */
+       file->rra_ptr[rra_idx].cur_row ++;
+       if (file->rra_ptr[rra_idx].cur_row >= file->rra_def[rra_idx].row_cnt) {
+               file->rra_ptr[rra_idx].cur_row = 0;
+       }
+
+       /* Write data */
+       memcpy (row, points, points->len);
+
+       return TRUE;
+}
+
+/**
+ * Close rrd file
+ * @param file
+ * @return
+ */
+gint
+rspamd_rrd_close (struct rspamd_rrd_file* file)
+{
+       if (file == NULL) {
+               errno = EINVAL;
+               return -1;
+       }
+
+       munmap (file->map, file->size);
+       if (file->filename != NULL) {
+               g_free (file->filename);
+       }
+       g_slice_free1 (sizeof (struct rspamd_rrd_file), file);
+
+       return 0;
 }
index be22fa35467e2e9887b1f13607acb041ea98c2d9..214f36d03ec7fe16d5de7796d3677c3c4f397a0a 100644 (file)
--- a/src/rrd.h
+++ b/src/rrd.h
@@ -260,15 +260,20 @@ struct rrd_rra_ptr {
 
 /* Final rrd file structure */
 struct rspamd_rrd_file {
-    struct rrd_file_head *stat_head; /* the static header */
-    struct rrd_ds_def *ds_def;   /* list of data source definitions */
-    struct rrd_rra_def *rra_def; /* list of round robin archive def */
-    struct rrd_live_head *live_head; /* rrd v >= 3 last_up with us */
-    struct rrd_pdp_prep *pdp_prep;   /* pdp data prep area */
-    struct rrd_cdp_prep *cdp_prep;   /* cdp prep area */
-    struct rrd_rra_ptr *rra_ptr; /* list of rra pointers */
-    rrd_value_t *rrd_value; /* list of rrd values */
-} rrd_t;
+       struct rrd_file_head *stat_head; /* the static header */
+       struct rrd_ds_def *ds_def;   /* list of data source definitions */
+       struct rrd_rra_def *rra_def; /* list of round robin archive def */
+       struct rrd_live_head *live_head; /* rrd v >= 3 last_up with us */
+       struct rrd_pdp_prep *pdp_prep;   /* pdp data prep area */
+       struct rrd_cdp_prep *cdp_prep;   /* cdp prep area */
+       struct rrd_rra_ptr *rra_ptr; /* list of rra pointers */
+       gdouble *rrd_value; /* list of rrd values */
+
+       gchar *filename;
+       guint8* map; /* mmapped area */
+       gsize size; /* its size */
+       gboolean finalized;
+};
 
 
 /* Public API */
@@ -290,7 +295,7 @@ struct rspamd_rrd_file* rspamd_rrd_open (const gchar *filename, GError **err);
  * @param err error pointer
  * @return TRUE if file has been created
  */
-gboolean rspamd_rrd_create (const gchar *filename, gulong ds_count, gulong rra_count, gulong pdp_step, GError **err);
+struct rspamd_rrd_file* rspamd_rrd_create (const gchar *filename, gulong ds_count, gulong rra_count, gulong pdp_step, GError **err);
 
 /**
  * Add data sources to rrd file
@@ -299,7 +304,7 @@ gboolean rspamd_rrd_create (const gchar *filename, gulong ds_count, gulong rra_c
  * @param err error pointer
  * @return TRUE if data sources were added
  */
-gboolean rspamd_rrd_add_ds (const gchar *filename, GArray *ds, GError **err);
+gboolean rspamd_rrd_add_ds (struct rspamd_rrd_file* file, GArray *ds, GError **err);
 
 /**
  * Add round robin archives to rrd file
@@ -308,7 +313,7 @@ gboolean rspamd_rrd_add_ds (const gchar *filename, GArray *ds, GError **err);
  * @param err error pointer
  * @return TRUE if archives were added
  */
-gboolean rspamd_rrd_add_rra (const gchar *filename, GArray *rra, GError **err);
+gboolean rspamd_rrd_add_rra (struct rspamd_rrd_file *file, GArray *rra, GError **err);
 
 /**
  * Finalize rrd file header and initialize all RRA in the file
@@ -316,7 +321,7 @@ gboolean rspamd_rrd_add_rra (const gchar *filename, GArray *rra, GError **err);
  * @param err error pointer
  * @return TRUE if rrd file is ready for use
  */
-gboolean rspamd_rrd_finalize (const gchar *filename, GError **err);
+gboolean rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err);
 
 /**
  * Add record to rrd file
@@ -328,4 +333,11 @@ gboolean rspamd_rrd_finalize (const gchar *filename, GError **err);
  */
 gboolean rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *points, GError **err);
 
+/**
+ * Close rrd file
+ * @param file
+ * @return
+ */
+gint rspamd_rrd_close (struct rspamd_rrd_file* file);
+
 #endif /* RRD_H_ */