From: Peter Stamfest Date: Wed, 5 Mar 2014 18:58:17 +0000 (+0100) Subject: Second round of refactoring: Ripping apart rrd_modify_r into more digestable X-Git-Tag: v1.5.0-rc1~129^2~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=28d853e86051263864e700584c35fb5af62e18a0;p=thirdparty%2Frrdtool-1.x.git Second round of refactoring: Ripping apart rrd_modify_r into more digestable pieces. --- diff --git a/src/rrd_modify.c b/src/rrd_modify.c index 18255464..f7a6beac 100644 --- a/src/rrd_modify.c +++ b/src/rrd_modify.c @@ -12,6 +12,7 @@ #include "rrd_restore.h" /* write_file */ #include "rrd_create.h" /* parseDS */ #include "rrd_update.h" /* update_cdp */ +#include "unused.h" #include "fnv.h" @@ -263,13 +264,141 @@ done: return rc; } +/* + Handle all RRAs definitions (and associated CDP data structures), taking care + of RRA removals, and RRA row additions and removals. NOTE: data copying is + NOT done by this function, but it DOES calculate overall total_row + information needed for sizing the data area. + + returns the total number out RRA rows for both the in and out RRDs in the + variables pointed to by total_in_rra_rows and total_out_rra_rows respectively + */ +static int handle_rra_defs(const rrd_t *in, rrd_t *out, + rra_mod_op_t *rra_mod_ops, unsigned int rra_mod_ops_cnt, + const char *ds_ops, unsigned int ds_ops_cnt, + int *total_in_rra_rows, int *total_out_rra_rows) +{ + int rc = -1; + unsigned int j, r; + rra_ptr_t rra_0_ptr = { .cur_row = 0 }; + cdp_prep_t empty_cdp_prep; + memset(&empty_cdp_prep, 0, sizeof(empty_cdp_prep)); + + for (j = 0 ; j < in->stat_head->rra_cnt ; j++) { + if (total_in_rra_rows) + *total_in_rra_rows += in->rra_def[j].row_cnt; + + rra_mod_op_t *rra_op = NULL; + for (r = 0 ; r < rra_mod_ops_cnt ; r++) { + if (rra_mod_ops[r].index == (int) j) { + rra_op = rra_mod_ops + r; + break; + } + } + + int final_row_count = in->rra_def[j].row_cnt; + if (rra_op) { + switch (rra_op->op) { + case '=': + final_row_count = rra_op->row_count; + break; + case '-': + final_row_count -= rra_op->row_count; + break; + case '+': + final_row_count += rra_op->row_count; + break; + } + if (final_row_count < 0) final_row_count = 0; + /* record the final row_count. I don't like this, because + it changes the data passed to us via an argument: */ + + rra_op->final_row_count = final_row_count; + } + + // do we have to keep the RRA at all?? + if (final_row_count == 0) { + // delete the RRA! - just skip processing this RRA.... + continue; + } + + out->cdp_prep = realloc(out->cdp_prep, + sizeof(cdp_prep_t) * out->stat_head->ds_cnt + * (out->stat_head->rra_cnt + 1)); + + if (out->cdp_prep == NULL) { + rrd_set_error("Cannot allocate memory"); + goto done; + } + + /* for every RRA copy only those CDPs in the prep area where we keep + the DS! */ + + int start_index_in = in->stat_head->ds_cnt * j; + int start_index_out = out->stat_head->ds_cnt * out->stat_head->rra_cnt; + + out->rra_def = copy_over_realloc(out->rra_def, out->stat_head->rra_cnt, + in->rra_def, j, + sizeof(rra_def_t)); + if (out->rra_def == NULL) goto done; + + // adapt row count: + out->rra_def[out->stat_head->rra_cnt].row_cnt = final_row_count; + + out->rra_ptr = copy_over_realloc(out->rra_ptr, out->stat_head->rra_cnt, + &rra_0_ptr, 0, + sizeof(rra_ptr_t)); + if (out->rra_ptr == NULL) goto done; + + out->rra_ptr[out->stat_head->rra_cnt].cur_row = final_row_count - 1; + + unsigned int i, ii; + for (i = ii = 0 ; i < ds_ops_cnt ; i++) { + switch (ds_ops[i]) { + case 'c': { + memcpy(out->cdp_prep + start_index_out + ii, + in->cdp_prep + start_index_in + i, + sizeof(cdp_prep_t)); + ii++; + break; + } + case 'a': { + cdp_prep_t *cdp_prep = out->cdp_prep + start_index_out + ii; + memcpy(cdp_prep, + &empty_cdp_prep, sizeof(cdp_prep_t)); + + init_cdp(out, + out->rra_def + out->stat_head->rra_cnt, + cdp_prep); + ii++; + break; + } + case 'd': + break; + default: + rrd_set_error("internal error: invalid ops"); + goto done; + } + } + + if (total_out_rra_rows) + *total_out_rra_rows += out->rra_def[out->stat_head->rra_cnt].row_cnt; + + out->stat_head->rra_cnt++; + } + rc = 0; +done: + return rc; +} + + /* add datasources as specified in the addDS array of DS specs as deocumented in rrdcreate(1) Returns the number of DSs added or -1 on error */ -static int add_dss(const rrd_t *in, rrd_t *out, +static int add_dss(const rrd_t UNUSED(*in), rrd_t *out, const char **addDS) { if (addDS == NULL) return 0; @@ -353,7 +482,8 @@ static int populate_row(const rrd_t *in_rrd, int cur_row, rrd_value_t *values, int populate_start, - int populate_cnt) { + int populate_cnt) +{ int rc = -1; if (in_rrd->stat_head->rra_cnt < 1) return 0; @@ -499,6 +629,142 @@ static int populate_row(const rrd_t *in_rrd, return rc; } +static int mod_rras(const rrd_t *in, rrd_t *out, const int *ds_map, + const rra_mod_op_t *rra_mod_ops, unsigned int rra_mod_ops_cnt, + const char *ds_ops, unsigned int ds_ops_cnt) +{ + int rc = -1; + unsigned int rra_index, j; + int total_cnt = 0, total_cnt_out = 0; + int out_rra = 0; // index of currently copied RRA + + for (rra_index = 0; rra_index < in->stat_head->rra_cnt; rra_index++) { + const rra_mod_op_t *rra_op = NULL; + for (unsigned int op_index = 0 ; op_index < rra_mod_ops_cnt ; op_index++) { + if (rra_mod_ops[op_index].index == (int)rra_index) { + rra_op = rra_mod_ops + op_index; + break; + } + } + + if (rra_op && rra_op->final_row_count == 0) { + // RRA deleted - skip ! + continue; + } + + /* number and sizes of all the data in an RRA */ + int rra_values = in->stat_head->ds_cnt * in->rra_def[rra_index].row_cnt; + int rra_values_out = out->stat_head->ds_cnt * out->rra_def[out_rra].row_cnt; + + /* we now have all the data for the current RRA available, now + start to transfer it to the output RRD: For every row copy + the data corresponding to copied DSes, add NaN values for newly + added DSes. */ + + unsigned int ii = 0, jj, oi = 0; + + /* we have to decide beforehand about row addition and + deletion, because this takes place in the front of the + rrd_value array.... + */ + + if (rra_op) { + char op = rra_op->op; + unsigned int row_count = rra_op->row_count; + + // rewrite '=' ops into '-' or '+' for better code-reuse... + if (op == '=') { + if (row_count < in->rra_def[rra_index].row_cnt) { + row_count = in->rra_def[rra_index].row_cnt - row_count; + op = '-'; + } else if (row_count > in->rra_def[rra_index].row_cnt) { + row_count = row_count - in->rra_def[rra_index].row_cnt; + op = '+'; + } else { + // equal - special case: nothing to do... + } + } + + switch (op) { + case '=': + // no op + break; + case '-': + // remove rows: just skip the first couple of rows! + ii = row_count; + break; + case '+': + // add rows: insert the requested number of rows! + // currently, just add the all as NaN values... + + for ( ; oi < row_count ; oi++) { + for (j = 0 ; j < out->stat_head->ds_cnt ; j++) { + out->rrd_value[total_cnt_out + + oi * out->stat_head->ds_cnt + + j] = DNAN; + } + } + + // now try to populate the newly added rows + populate_row(in, out, ds_map, + out->rra_def + out_rra, + out->rra_ptr[rra_index].cur_row, + out->rrd_value + total_cnt_out, + 0, row_count); + + break; + default: + rrd_set_error("RRA modification operation '%c' " + "not (yet) supported", rra_op->op); + goto done; + } + } + + /* now do the actual copying of data */ + + for ( ; ii < in->rra_def[rra_index].row_cnt + && oi < out->rra_def[out_rra].row_cnt ; ii++, oi++) { + int real_ii = (ii + in->rra_ptr[rra_index].cur_row + 1) % in->rra_def[rra_index].row_cnt; + for (j = jj = 0 ; j < ds_ops_cnt ; j++) { + switch (ds_ops[j]) { + case 'c': { + out->rrd_value[total_cnt_out + oi * out->stat_head->ds_cnt + jj] = + in->rrd_value[total_cnt + real_ii * in->stat_head->ds_cnt + j]; + + /* it might be better to use memcpy, actually (to + treat them opaquely)... so keep the code for + the time being */ + /* + memcpy((void*) (out.rrd_value + total_cnt_out + oi * out.stat_head->ds_cnt + jj), + (void*) (in.rrd_value + total_cnt + real_ii * in.stat_head->ds_cnt + j), sizeof(rrd_value_t)); + */ + jj++; + break; + } + case 'a': { + out->rrd_value[total_cnt_out + oi * out->stat_head->ds_cnt + jj] = DNAN; + jj++; + break; + } + case 'd': + break; + default: + rrd_set_error("internal error: invalid ops"); + goto done; + } + } + } + + total_cnt += rra_values; + total_cnt_out += rra_values_out; + + out_rra++; + } + rc = 0; +done: + return rc; +} + /* copies the RRD named by infilename to a new RRD called outfilename. @@ -514,7 +780,8 @@ static int rrd_modify_r(const char *infilename, const char *outfilename, const char **removeDS, const char **addDS, - const rra_mod_op_t *rra_mod_ops, int rra_mod_ops_cnt) { + rra_mod_op_t *rra_mod_ops, int rra_mod_ops_cnt) +{ rrd_t in, out; int rc = -1; unsigned int i, j; @@ -596,7 +863,7 @@ static int rrd_modify_r(const char *infilename, // record DSs to be deleted in ds_ops if (removeDS != NULL) { - for (int in_ds = 0 ; in_ds < in.stat_head->ds_cnt ; in_ds++) { + for (unsigned int in_ds = 0 ; in_ds < in.stat_head->ds_cnt ; in_ds++) { const char *c; for (j = 0, c = removeDS[j] ; c ; j++, c = removeDS[j]) { if (strcmp(in.ds_def[in_ds].ds_nam, c) == 0) { @@ -613,21 +880,19 @@ static int rrd_modify_r(const char *infilename, } /* now add any DS definitions to be added */ - if (addDS) { - int added_cnt = add_dss(&in, &out, addDS); - if (added_cnt < 0) { - // error - goto done; - } - if (added_cnt > 0) { - // and extend the ds_ops array as well - ds_ops = realloc(ds_ops, ds_ops_cnt + added_cnt); - for(; added_cnt > 0 ; added_cnt--) { - ds_ops[ds_ops_cnt++] = 'a'; - } + int added_cnt = add_dss(&in, &out, addDS); + if (added_cnt < 0) { + // error + goto done; + } + if (added_cnt > 0) { + // and extend the ds_ops array as well + ds_ops = realloc(ds_ops, ds_ops_cnt + added_cnt); + for(; added_cnt > 0 ; added_cnt--) { + ds_ops[ds_ops_cnt++] = 'a'; } } - + /* prepare explicit data source index to map from output index to input index */ @@ -652,116 +917,13 @@ static int rrd_modify_r(const char *infilename, /* we also reorder all rows, adding/removing rows as needed */ - rra_ptr_t rra_0_ptr = { .cur_row = 0 }; - - cdp_prep_t empty_cdp_prep; - memset(&empty_cdp_prep, 0, sizeof(empty_cdp_prep)); - + /* later on, we'll need to know the total number of rows for both RRDs in + order to allocate memory. Luckily, handle_rra_defs will give that to us. */ int total_out_rra_rows = 0, total_in_rra_rows = 0; - rra_mod_op_t *rra_op = NULL; - int r; - for (j = 0 ; j < in.stat_head->rra_cnt ; j++) { - total_in_rra_rows += in.rra_def[j].row_cnt; - - rra_op = NULL; - for (r = 0 ; r < rra_mod_ops_cnt ; r++) { - if (rra_mod_ops[r].index == (int) j) { - rra_op = rra_mod_ops + r; - break; - } - } - - int final_row_count = in.rra_def[j].row_cnt; - if (rra_op) { - switch (rra_op->op) { - case '=': - final_row_count = rra_op->row_count; - break; - case '-': - final_row_count -= rra_op->row_count; - break; - case '+': - final_row_count += rra_op->row_count; - break; - } - if (final_row_count < 0) final_row_count = 0; - /* record the final row_count. I don't like this, because - it changes the data passed to us via an argument: */ - - rra_op->final_row_count = final_row_count; - } - - // do we have to keep the RRA at all?? - if (final_row_count == 0) { - // delete the RRA! - just skip processing this RRA.... - continue; - } - - out.cdp_prep = realloc(out.cdp_prep, - sizeof(cdp_prep_t) * out.stat_head->ds_cnt - * (out.stat_head->rra_cnt + 1)); - - if (out.cdp_prep == NULL) { - rrd_set_error("Cannot allocate memory"); - goto done; - } - - /* for every RRA copy only those CDPs in the prep area where we keep - the DS! */ - - int start_index_in = in.stat_head->ds_cnt * j; - int start_index_out = out.stat_head->ds_cnt * out.stat_head->rra_cnt; - - out.rra_def = copy_over_realloc(out.rra_def, out.stat_head->rra_cnt, - in.rra_def, j, - sizeof(rra_def_t)); - if (out.rra_def == NULL) goto done; - - // adapt row count: - out.rra_def[out.stat_head->rra_cnt].row_cnt = final_row_count; - - out.rra_ptr = copy_over_realloc(out.rra_ptr, out.stat_head->rra_cnt, - &rra_0_ptr, 0, - sizeof(rra_ptr_t)); - if (out.rra_ptr == NULL) goto done; - - out.rra_ptr[out.stat_head->rra_cnt].cur_row = final_row_count - 1; - - int ii; - for (i = ii = 0 ; i < ds_ops_cnt ; i++) { - switch (ds_ops[i]) { - case 'c': { - memcpy(out.cdp_prep + start_index_out + ii, - in.cdp_prep + start_index_in + i, - sizeof(cdp_prep_t)); - ii++; - break; - } - case 'a': { - cdp_prep_t *cdp_prep = out.cdp_prep + start_index_out + ii; - memcpy(cdp_prep, - &empty_cdp_prep, sizeof(cdp_prep_t)); - - init_cdp(&out, - out.rra_def + out.stat_head->rra_cnt, - cdp_prep); - ii++; - break; - } - case 'd': - break; - default: - rrd_set_error("internal error: invalid ops"); - goto done; - } - } - - total_out_rra_rows += out.rra_def[out.stat_head->rra_cnt].row_cnt; - - out.stat_head->rra_cnt++; - } - + rc = handle_rra_defs(&in, &out, rra_mod_ops, rra_mod_ops_cnt, ds_ops, ds_ops_cnt, &total_in_rra_rows, &total_out_rra_rows); + if (rc != 0) goto done; + /* read and process all data ... */ /* there seem to be two function in the current rrdtool codebase @@ -776,15 +938,12 @@ static int rrd_modify_r(const char *infilename, - why we reset cur_row in RRAs and reorder data to be cronological */ - char *all_data = NULL; - - /* prepare space to read data in */ - all_data = realloc(all_data, - total_in_rra_rows * in.stat_head->ds_cnt - * sizeof(rrd_value_t)); - in.rrd_value = (void *) all_data; + /* prepare space to read data into */ + in.rrd_value = realloc(in.rrd_value, + total_in_rra_rows * in.stat_head->ds_cnt + * sizeof(rrd_value_t)); - if (all_data == NULL) { + if (in.rrd_value == NULL) { rrd_set_error("out of memory"); goto done; } @@ -799,13 +958,6 @@ static int rrd_modify_r(const char *infilename, goto done; } - ssize_t rra_start = 0, rra_start_out = 0; - - int total_cnt = 0, total_cnt_out = 0; - - int out_rra = 0; - - /* Before we do any other operation on RRAs, we read in all data. This is important, because in the second pass we may @@ -816,151 +968,19 @@ static int rrd_modify_r(const char *infilename, size_t to_read = total_in_rra_rows * sizeof(rrd_value_t) * in.stat_head->ds_cnt; size_t read_bytes; - read_bytes = rrd_read(rrd_file, all_data, to_read); + read_bytes = rrd_read(rrd_file, in.rrd_value, to_read); if (read_bytes != to_read) { rrd_set_error("short read 2"); goto done; } - total_cnt = 0; - - for (i = 0; i < in.stat_head->rra_cnt; i++) { - rra_op = NULL; - for (r = 0 ; r < rra_mod_ops_cnt ; r++) { - if (rra_mod_ops[r].index == (int)i) { - rra_op = rra_mod_ops + r; - break; - } - } - - if (rra_op) { - if (rra_op->final_row_count == 0) { - // RRA deleted - skip ! - continue; - } - } - - /* number and sizes of all the data in an RRA */ - int rra_values = in.stat_head->ds_cnt * in.rra_def[i].row_cnt; - int rra_values_out = out.stat_head->ds_cnt * out.rra_def[out_rra].row_cnt; - - ssize_t rra_size = sizeof(rrd_value_t) * rra_values; - ssize_t rra_size_out = sizeof(rrd_value_t) * rra_values_out; - - - /* we now have all the data for the current RRA available, now - start to transfer it to the output RRD: For every row copy - the data corresponding to copied DSes, add NaN values for newly - added DSes. */ - - unsigned int ii = 0, jj, oi = 0; - - /* we have to decide beforehand about row addition and - deletion, because this takes place in the front of the - rrd_value array.... - */ - - if (rra_op) { - char op = rra_op->op; - unsigned int row_count = rra_op->row_count; - - // rewrite '=' ops into '-' or '+' for better code-reuse... - if (op == '=') { - if (row_count < in.rra_def[i].row_cnt) { - row_count = in.rra_def[i].row_cnt - row_count; - op = '-'; - } else if (row_count > in.rra_def[i].row_cnt) { - row_count = row_count - in.rra_def[i].row_cnt; - op = '+'; - } else { - // equal - special case: nothing to do... - } - } - - switch (op) { - case '=': - // no op - break; - case '-': - // remove rows: just skip the first couple of rows! - ii = row_count; - break; - case '+': - // add rows: insert the requested number of rows! - // currently, just add the all as NaN values... - - for ( ; oi < row_count ; oi++) { - for (j = 0 ; j < out.stat_head->ds_cnt ; j++) { - out.rrd_value[total_cnt_out + - oi * out.stat_head->ds_cnt + - j] = DNAN; - } - } - - // now try to populate the newly added rows - populate_row(&in, &out, ds_map, - out.rra_def + out_rra, - out.rra_ptr[i].cur_row, - out.rrd_value + total_cnt_out, - 0, row_count); - - break; - default: - rrd_set_error("RRA modification operation '%c' " - "not (yet) supported", rra_op->op); - goto done; - } - } - - /* now do the actual copying of data */ - - for ( ; ii < in.rra_def[i].row_cnt - && oi < out.rra_def[out_rra].row_cnt ; ii++, oi++) { - int real_ii = (ii + in.rra_ptr[i].cur_row + 1) % in.rra_def[i].row_cnt; - for (j = jj = 0 ; j < ds_ops_cnt ; j++) { - switch (ds_ops[j]) { - case 'c': { - out.rrd_value[total_cnt_out + oi * out.stat_head->ds_cnt + jj] = - in.rrd_value[total_cnt + real_ii * in.stat_head->ds_cnt + j]; - - /* it might be better to use memcpy, actually (to - treat them opaquely)... so keep the code for - the time being */ - /* - memcpy((void*) (out.rrd_value + total_cnt_out + oi * out.stat_head->ds_cnt + jj), - (void*) (in.rrd_value + total_cnt + real_ii * in.stat_head->ds_cnt + j), sizeof(rrd_value_t)); - */ - jj++; - break; - } - case 'a': { - out.rrd_value[total_cnt_out + oi * out.stat_head->ds_cnt + jj] = DNAN; - jj++; - break; - } - case 'd': - break; - default: - rrd_set_error("internal error: invalid ops"); - goto done; - } - } - } - - rra_start += rra_size; - rra_start_out += rra_size_out; - - total_cnt += rra_values; - total_cnt_out += rra_values_out; - - out_rra++; - } + rc = mod_rras(&in, &out, ds_map, rra_mod_ops, rra_mod_ops_cnt, ds_ops, ds_ops_cnt); + if (rc != 0) goto done; unsigned long hashed_name = FnvHash(outfilename); rc = add_rras(&in, &out, ds_map, rra_mod_ops, rra_mod_ops_cnt, hashed_name); - if (rc != 0) goto done; rc = write_rrd(outfilename, &out); @@ -1143,6 +1163,7 @@ static void prepare_CDPs(const rrd_t *in, rrd_t *out, if (candidates) free(candidates); } + static int add_rras(const rrd_t *in, rrd_t *out, const int *ds_map, const rra_mod_op_t *rra_mod_ops, int rra_mod_ops_cnt, unsigned long hash) { @@ -1265,7 +1286,6 @@ done: return rc; } - static int write_rrd(const char *outfilename, rrd_t *out) { int rc = -1; char *tmpfile = NULL;