]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
Add CDP updates.
authorVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 17 Dec 2012 18:56:03 +0000 (22:56 +0400)
committerVsevolod Stakhov <vsevolod@rambler-co.ru>
Mon, 17 Dec 2012 18:56:03 +0000 (22:56 +0400)
src/rrd.c

index c82332fda643113133a694f03d0a6c141d2edff8..34d956a1603117385b768f23a3e384a80371f603 100644 (file)
--- a/src/rrd.c
+++ b/src/rrd.c
@@ -587,6 +587,14 @@ rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err)
        return TRUE;
 }
 
+/**
+ * Update pdp_prep data
+ * @param file rrd file
+ * @param vals new values
+ * @param pdp_new new pdp array
+ * @param interval time elapsed from the last update
+ * @return
+ */
 static gboolean
 rspamd_rrd_update_pdp_prep (struct rspamd_rrd_file *file, gdouble *vals, gdouble *pdp_new, gdouble interval)
 {
@@ -638,6 +646,16 @@ rspamd_rrd_update_pdp_prep (struct rspamd_rrd_file *file, gdouble *vals, gdouble
        return TRUE;
 }
 
+/**
+ * Update step for this pdp
+ * @param file
+ * @param pdp_new new pdp array
+ * @param pdp_temp temp pdp array
+ * @param interval time till last update
+ * @param pre_int pre interval
+ * @param post_int post intervall
+ * @param pdp_diff time till last pdp update
+ */
 static void
 rspamd_rrd_update_pdp_step (struct rspamd_rrd_file *file, gdouble *pdp_new, gdouble *pdp_temp, gdouble interval,
                gdouble pre_int, gdouble post_int, gulong pdp_diff)
@@ -675,6 +693,150 @@ rspamd_rrd_update_pdp_step (struct rspamd_rrd_file *file, gdouble *pdp_new, gdou
        }
 }
 
+/**
+ * Update CDP for this rra
+ * @param file rrd file
+ * @param pdp_steps how much pdp steps elapsed from the last update
+ * @param pdp_offset offset from pdp
+ * @param rra_steps how much steps must be updated for this rra
+ * @param rra_index index of desired rra
+ * @param pdp_temp temporary pdp points
+ */
+static void
+rspamd_rrd_update_cdp (struct rspamd_rrd_file *file, gdouble pdp_steps, gdouble pdp_offset, gulong *rra_steps, gulong rra_index,
+               gdouble *pdp_temp)
+{
+       guint                                                                    i;
+       struct rrd_rra_def                                              *rra;
+       rrd_value_t                                                             *scratch;
+       enum rrd_cf_type                                                 cf;
+       gdouble                                                                  last_cdp, cur_cdp;
+       gulong                                                                   pdp_in_cdp;
+
+       rra = &file->rra_def[rra_index];
+       cf = rrd_cf_from_string (rra->cf_nam);
+
+       /* Iterate over all DS for this RRA */
+       for (i = 0; i < file->stat_head->ds_cnt; i ++) {
+               /* Get CDP for this RRA and DS */
+               scratch = file->cdp_prep[rra_index * file->stat_head->ds_cnt + i].scratch;
+               if (rra->pdp_cnt > 1) {
+                       /* Do we have any CDP to update for this rra ? */
+                       if (rra_steps[rra_index] > 0) {
+                               if (isnan (pdp_temp[i])) {
+                                       /* New pdp is nan */
+                                       /* Increment unknown points count */
+                                       scratch[CDP_unkn_pdp_cnt].lv += pdp_offset;
+                                       /* Reset secondary value */
+                                       scratch[CDP_secondary_val].dv = NAN;
+                               }
+                               else {
+                                       scratch[CDP_secondary_val].dv = pdp_temp[i];
+                               }
+
+                               /* Check XFF for this rra */
+                               if (scratch[CDP_unkn_pdp_cnt].lv > rra->pdp_cnt * rra->par[RRA_cdp_xff_val].lv) {
+                                       /* XFF is reached */
+                                       scratch[CDP_primary_val].dv = NAN;
+                               }
+                               else {
+                                       /* Need to initialize CDP using specified consolidation */
+                                       switch (cf) {
+                                       case RRD_CF_AVERAGE:
+                                               last_cdp = isnan (scratch[CDP_val].dv) ? 0.0 : scratch[CDP_val].dv;
+                                               cur_cdp = isnan (pdp_temp[i]) ? 0.0 : pdp_temp[i];
+                                               scratch[CDP_primary_val].dv = (last_cdp + cur_cdp * pdp_offset) / (rra->pdp_cnt - scratch[CDP_unkn_pdp_cnt].lv);
+                                               break;
+                                       case RRD_CF_MAXIMUM:
+                                               last_cdp = isnan (scratch[CDP_val].dv) ? -INFINITY : scratch[CDP_val].dv;
+                                               cur_cdp = isnan (pdp_temp[i]) ? -INFINITY : pdp_temp[i];
+                                               scratch[CDP_primary_val].dv = MAX (last_cdp, cur_cdp);
+                                               break;
+                                       case RRD_CF_MINIMUM:
+                                               last_cdp = isnan (scratch[CDP_val].dv) ? INFINITY : scratch[CDP_val].dv;
+                                               cur_cdp = isnan (pdp_temp[i]) ? INFINITY : pdp_temp[i];
+                                               scratch[CDP_primary_val].dv = MIN (last_cdp, cur_cdp);
+                                               break;
+                                       case RRD_CF_LAST:
+                                       default:
+                                               scratch[CDP_primary_val].dv = pdp_temp[i];
+                                               break;
+                                       }
+                               }
+                               /* Init carry of this CDP */
+                               pdp_in_cdp = (pdp_steps - pdp_offset) / rra->pdp_cnt;
+                               if (pdp_in_cdp == 0 || isnan (pdp_temp[i])) {
+                                       /* Set overflow */
+                                       switch (cf) {
+                                       case RRD_CF_AVERAGE:
+                                               scratch[CDP_val].dv = 0;
+                                               break;
+                                       case RRD_CF_MAXIMUM:
+                                               scratch[CDP_val].dv = -INFINITY;
+                                               break;
+                                       case RRD_CF_MINIMUM:
+                                               scratch[CDP_val].dv = INFINITY;
+                                               break;
+                                       default:
+                                               scratch[CDP_val].dv = NAN;
+                                               break;
+                                       }
+                               }
+                               else {
+                                       /* Special carry for average */
+                                       if (cf == RRD_CF_AVERAGE) {
+                                               scratch[CDP_val].dv = pdp_temp[i] * pdp_in_cdp;
+                                       }
+                                       else {
+                                               scratch[CDP_val].dv = pdp_temp[i];
+                                       }
+                               }
+                       }
+                       /* In this case we just need to update cdp_prep for this RRA */
+                       else {
+                               if (isnan (pdp_temp[i])) {
+                                       /* Just increase undefined zone */
+                                       scratch[CDP_unkn_pdp_cnt].lv += pdp_steps;
+                               }
+                               else {
+                                       /* Calculate cdp value */
+                                       last_cdp = scratch[CDP_val].dv;
+                                       switch (cf) {
+                                       case RRD_CF_AVERAGE:
+                                               if (isnan (last_cdp)) {
+                                                       scratch[CDP_val].dv = pdp_temp[i] * pdp_steps;
+                                               }
+                                               else {
+                                                       scratch[CDP_val].dv = last_cdp + pdp_temp[i] * pdp_steps;
+                                               }
+                                               break;
+                                       case RRD_CF_MAXIMUM:
+                                               scratch[CDP_val].dv = MAX (last_cdp, pdp_temp[i]);
+                                               break;
+                                       case RRD_CF_MINIMUM:
+                                               scratch[CDP_val].dv = MIN (last_cdp, pdp_temp[i]);
+                                               break;
+                                       case RRD_CF_LAST:
+                                               scratch[CDP_val].dv = pdp_temp[i];
+                                               break;
+                                       default:
+                                               scratch[CDP_val].dv = NAN;
+                                               break;
+                                       }
+                               }
+                       }
+               }
+               else {
+                       /* We have nothing to consolidate, but we may miss some pdp */
+                       if (pdp_steps > 2) {
+                               /* Just write PDP value */
+                               scratch[CDP_primary_val].dv = pdp_temp[i];
+                               scratch[CDP_secondary_val].dv = pdp_temp[i];
+                       }
+               }
+       }
+}
+
 /**
  * Add record to rrd file
  * @param file rrd file object
@@ -689,7 +851,7 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin
        gdouble                                                                 *row, interval, *pdp_new, *pdp_temp, pre_int, post_int;
        guint                                                                    i;
        gulong                                                                   pdp_steps, cur_pdp_count, prev_pdp_step, cur_pdp_step,
-                                                                                        prev_pdp_age, cur_pdp_age;
+                                                                                        prev_pdp_age, cur_pdp_age, *rra_steps, pdp_offset;
        struct timeval                                                   tv;
 
        if (file == NULL || file->stat_head->ds_cnt * sizeof (gdouble) != points->len || rra_idx >= file->stat_head->rra_cnt) {
@@ -706,11 +868,14 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin
        /* Update PDP preparation values */
        pdp_new = g_malloc (sizeof (gdouble) * file->stat_head->ds_cnt);
        pdp_temp = g_malloc (sizeof (gdouble) * file->stat_head->ds_cnt);
+       /* How much steps need to be updated in each RRA */
+       rra_steps = g_malloc (sizeof (gulong) * file->stat_head->rra_cnt);
 
        if (!rspamd_rrd_update_pdp_prep (file, (gdouble *)points->data, pdp_new, interval)) {
                g_set_error (err, rrd_error_quark (), EINVAL, "rrd update pdp failed: wrong arguments");
                g_free (pdp_new);
                g_free (pdp_temp);
+               g_free (rra_steps);
                return FALSE;
        }
 
@@ -758,6 +923,21 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin
                /* Update PDP for this step */
                rspamd_rrd_update_pdp_step (file, pdp_new, pdp_temp, interval, pre_int, post_int, pdp_steps * file->stat_head->pdp_step);
 
+               /* Update CDP points for each RRA*/
+               for (i = 0; i < file->stat_head->rra_cnt; i ++) {
+                       /* Calculate pdp offset for this RRA */
+                       pdp_offset = file->rra_def[i].pdp_cnt - cur_pdp_count % file->rra_def[i].pdp_cnt;
+                       /* How much steps we got for this RRA */
+                       if (pdp_offset <= pdp_steps) {
+                               rra_steps[i] = (pdp_steps - pdp_offset) / file->rra_def[i].pdp_cnt + 1;
+                       }
+                       else {
+                               /* This rra have not passed enough pdp steps */
+                               rra_steps[i] = 0;
+                       }
+                       /* Update this specific CDP */
+                       rspamd_rrd_update_cdp (file, pdp_steps, pdp_offset, rra_steps, i, pdp_temp);
+               }
        }
 
        /* Skip unaffected rra */
@@ -778,6 +958,7 @@ rspamd_rrd_add_record (struct rspamd_rrd_file* file, guint rra_idx, GArray *poin
 
        g_free (pdp_new);
        g_free (pdp_temp);
+       g_free (rra_steps);
 
        return TRUE;
 }