From e6194caa38b5cc3c5e015a8404e53fcea74dae40 Mon Sep 17 00:00:00 2001 From: Peter Stamfest Date: Mon, 3 Mar 2014 08:25:12 +0100 Subject: [PATCH] Make RRA modifications work in presence of simultanous DS modifications --- src/rrd_modify.c | 86 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 62 insertions(+), 24 deletions(-) diff --git a/src/rrd_modify.c b/src/rrd_modify.c index fc7628b6..f421e896 100644 --- a/src/rrd_modify.c +++ b/src/rrd_modify.c @@ -38,7 +38,8 @@ static int positive_mod(int a, int b) { // prototypes static int write_rrd(const char *outfilename, rrd_t *out); -static int add_rras(rrd_t *out, rra_mod_op_t *rra_mod_ops, int rra_mod_ops_cnt, unsigned long hash); +static int add_rras(const rrd_t *in, rrd_t *out, const int *ds_map, + rra_mod_op_t *rra_mod_ops, int rra_mod_ops_cnt, unsigned long hash); /* a convenience realloc/memcpy combo */ static void * copy_over_realloc(void *dest, int dest_index, @@ -68,7 +69,7 @@ static void * copy_over_realloc(void *dest, int dest_index, typedef struct { - rrd_t *rrd; + const rrd_t *rrd; int rra_index; rrd_value_t *values; } candidate_t; @@ -146,16 +147,25 @@ static int row_for_time(const rrd_t *rrd, } /* - rrd .. the RRD to use for the search of other RRAs to populate the new RRA + in_rrd .. the RRD to use for the search of other RRAs to populate the new RRA + out_rrd .. the RRD new_rra is part of new_rra .. the RRA to populate. It is assumed, that this RRA will become part of rrd. This means that all meta settings (step size, last update time, etc.) not part of the RRA definition can be taken from rrd. + cur_row .. The cur_row value for new_rra. This is not kept with the def, so it + has to be passed separately + values .. Pointer to the 0-index row of the RRA populate_start .. the first row to populate in new_rra populate_cnt .. the number of rows to populate in new_rra, starting at populate_start + ds_map .. maps the DS indices from the ones used in new_rra to the ones used in + rrd. If NULL, an identity mapping is used. This is needed to support + DS addition/removal from the rrd to new_rra. */ -static int populate_row(rrd_t *rrd, +static int populate_row(const rrd_t *in_rrd, + const rrd_t *out_rrd, + const int *ds_map, rra_def_t *new_rra, int cur_row, rrd_value_t *values, @@ -163,7 +173,7 @@ static int populate_row(rrd_t *rrd, int populate_cnt) { int rc = -1; - if (rrd->stat_head->rra_cnt <= 1) return 0; + if (in_rrd->stat_head->rra_cnt <= 1) return 0; enum cf_en cf = cf_conv(new_rra->cf_nam); switch (cf) { @@ -176,7 +186,7 @@ static int populate_row(rrd_t *rrd, return 0; } - int ds_cnt = rrd->stat_head->ds_cnt; + int ds_cnt = in_rrd->stat_head->ds_cnt; candidate_t *candidates = NULL; int candidates_cnt = 0; @@ -186,8 +196,8 @@ static int populate_row(rrd_t *rrd, /* find other rows with the same CF or an RRA with CF_AVERAGE and a stepping of 1 as possible candidates for filling */ - for (i = 0 ; i < (int) rrd->stat_head->rra_cnt ; i++) { - rra_def_t *other_rra = rrd->rra_def + i; + for (i = 0 ; i < (int) in_rrd->stat_head->rra_cnt ; i++) { + rra_def_t *other_rra = in_rrd->rra_def + i; // can't use our own data if (other_rra == new_rra) { @@ -198,9 +208,9 @@ static int populate_row(rrd_t *rrd, if (other_cf == cf || (other_cf == CF_AVERAGE && other_rra->pdp_cnt == 1)) { candidate_t c = { - .rrd = rrd, + .rrd = in_rrd, .rra_index = i, - .values = rrd->rrd_value + ds_cnt * total_rows + .values = in_rrd->rrd_value + ds_cnt * total_rows }; candidates = copy_over_realloc(candidates, candidates_cnt, &c, 0, sizeof(c)); @@ -228,9 +238,9 @@ static int populate_row(rrd_t *rrd, for (ri = 0 ; ri < populate_cnt ; ri++) { int row = populate_start + ri; - time_t new_timeslot = new_rra->pdp_cnt * rrd->stat_head->pdp_step; + time_t new_timeslot = new_rra->pdp_cnt * out_rrd->stat_head->pdp_step; - time_t row_end_time = end_time_for_row(rrd, new_rra, cur_row, row); + time_t row_end_time = end_time_for_row(out_rrd, new_rra, cur_row, row); time_t row_start_time = row_end_time - new_timeslot + 1; /* now walk all candidates */ @@ -241,8 +251,8 @@ static int populate_row(rrd_t *rrd, int cand_cur_row = c->rrd->rra_ptr[c->rra_index].cur_row; /* find a matching range of rows */ - int cand_row_start = row_for_time(rrd, r, cand_cur_row, row_start_time); - int cand_row_end = row_for_time(rrd, r, cand_cur_row, row_end_time); + int cand_row_start = row_for_time(in_rrd, r, cand_cur_row, row_start_time); + int cand_row_end = row_for_time(in_rrd, r, cand_cur_row, row_end_time); if (cand_row_start == -1 && cand_row_end != -1) { cand_row_start = cand_cur_row; @@ -268,16 +278,19 @@ static int populate_row(rrd_t *rrd, int cand_timeslot = r->pdp_cnt * c->rrd->stat_head->pdp_step; - for (int k = 0 ; k < ds_cnt ; k++) { + int out_ds_cnt = out_rrd->stat_head->ds_cnt; + for (int k = 0 ; k < out_ds_cnt ; k++) { int cand_row, ci ; rrd_value_t tmp = DNAN, final = DNAN; int covered = 0; + int in_k = ds_map ? ds_map[k] : k; + for (cand_row = cand_row_start, ci = 0 ; ci < cand_rows ; ci++, cand_row = (cand_row + 1) % r->row_cnt) { - rrd_value_t v = c->values[cand_row * ds_cnt + k]; + rrd_value_t v = c->values[cand_row * ds_cnt + in_k]; if (isnan(v)) continue; @@ -301,7 +314,7 @@ static int populate_row(rrd_t *rrd, } } - values[row * ds_cnt + k] = final; + values[row * out_ds_cnt + k] = final; } } } @@ -339,6 +352,7 @@ static int rrd_modify_r(const char *infilename, char *old_locale = NULL; char *ops = NULL; unsigned int ops_cnt = 0; + int *ds_map = NULL; old_locale = setlocale(LC_NUMERIC, NULL); setlocale(LC_NUMERIC, "C"); @@ -497,6 +511,25 @@ static int rrd_modify_r(const char *infilename, } } + /* prepare explicit data source index to map from output index to + input index */ + + ds_map = malloc(sizeof(int) * out.stat_head->ds_cnt); + + j = 0; + for (i = 0 ; i < ops_cnt ; i++) { + switch (ops[i]) { + case 'c': + ds_map[j++] = i; + break; + case 'd': + break; + case 'a': + ds_map[j++] = -1; + break; + } + } + /* now take care to copy all RRAs, removing and adding columns for every row as needed for the requested DS changes */ @@ -749,15 +782,11 @@ static int rrd_modify_r(const char *infilename, } // now try to populate the newly added rows - populate_row(&in, + populate_row(&in, &out, ds_map, out.rra_def + out_rra, out.rra_ptr[i].cur_row, out.rrd_value + total_cnt_out, 0, row_count); - /* - static int populate_row(rrd_t *rrd, rra_def_t *new_rra, int new_step, - int populate_start, int populate_cnt) { - */ break; default: @@ -813,7 +842,7 @@ static int rrd_modify_r(const char *infilename, unsigned long hashed_name = FnvHash(outfilename); - rc = add_rras(&out, rra_mod_ops, rra_mod_ops_cnt, hashed_name); + rc = add_rras(&in, &out, ds_map, rra_mod_ops, rra_mod_ops_cnt, hashed_name); if (rc != 0) goto done; @@ -831,11 +860,13 @@ done: rrd_free(&out); if (ops != NULL) free(ops); + if (ds_map != NULL) free(ds_map); return rc; } -static int add_rras(rrd_t *out, rra_mod_op_t *rra_mod_ops, int rra_mod_ops_cnt, unsigned long hash) +static int add_rras(const rrd_t *in, rrd_t *out, const int *ds_map, + rra_mod_op_t *rra_mod_ops, int rra_mod_ops_cnt, unsigned long hash) { int rc = -1; @@ -952,6 +983,13 @@ static int add_rras(rrd_t *out, rra_mod_op_t *rra_mod_ops, int rra_mod_ops_cnt, int rra_values_out = out->stat_head->ds_cnt * rra_def->row_cnt; // ssize_t rra_size_out = sizeof(rrd_value_t) * rra_values_out; + // now try to populate the newly added rows + populate_row(in, out, ds_map, + rra_def, + out->rra_ptr[last_rra_cnt].cur_row, + out->rrd_value + total_cnt_out, + 0, rra_def->row_cnt); + // rra_start_out += rra_size_out; total_cnt_out += rra_values_out; } -- 2.47.3