From 552bafe5d6ee06187759a6ef65561db4c0dc43ad Mon Sep 17 00:00:00 2001 From: Peter Stamfest Date: Thu, 28 Aug 2014 23:49:58 +0200 Subject: [PATCH] CDP prefilling should now mostly work... --- src/rrd_create.c | 255 ++++++++++++++++++++++-------------- tests/tune2-testa-mod1.dump | 4 +- tests/tune2-testorg.dump | 4 +- 3 files changed, 162 insertions(+), 101 deletions(-) diff --git a/src/rrd_create.c b/src/rrd_create.c index 0877ff61..c2725944 100644 --- a/src/rrd_create.c +++ b/src/rrd_create.c @@ -951,10 +951,12 @@ static void reset_pdp_prep(rrd_t *rrd) { for (ds_index = 0 ; ds_index < rrd->stat_head->ds_cnt ; ds_index++) { strcpy(rrd->pdp_prep[ds_index].last_ds, "U"); - /* interestingly, "U" is associated with a value of 0.0, traditionally. - * I do not know why. Ask Tobi. + /* interestingly, "U" was associated sometimes with a value of 0.0 + * and sometimes with a value of DNAN traditionally during create. + * I do not know why. Ask Tobi. :-) + * I choose DNAN here, because it makes more sense, IMHO. 
*/ - rrd->pdp_prep[ds_index].scratch[PDP_val].u_val = 0; + rrd->pdp_prep[ds_index].scratch[PDP_val].u_val = DNAN; rrd->pdp_prep[ds_index].scratch[PDP_unkn_sec_cnt].u_cnt = rrd->live_head->last_up % rrd->stat_head->pdp_step; } @@ -1734,64 +1736,117 @@ static void prefill_pdp_prep(candidate_t *target, const candidate_t *candidates, } } -static void prefill_cdp_prep(candidate_t *target, rra_def_t *average_1_pdp_rra) +static void get_cdp_start_end(const rrd_t *rrd, const rra_def_t *rra, time_t *begin, time_t *end) { + int cdp_step = rra->pdp_cnt * rrd->stat_head->pdp_step; + *begin = (rrd->live_head->last_up / cdp_step) * cdp_step + 1; + *end = *begin + cdp_step - 1; +} + +static void prefill_cdp_prep(candidate_t *target, + candidate_t *candidates, int candidate_cnt, + long cdp_rra_index) { rrd_t *rrd = target->rrd; - int ds_index = target->extra.l; - - int cdp_step = target->rra->pdp_cnt * rrd->stat_head->pdp_step; - - time_t current_cdp_begin_time = - (rrd->live_head->last_up / cdp_step) * cdp_step + 1; - time_t current_cdp_end_time = current_cdp_begin_time + cdp_step - 1; - - - - - - - - - - - - -#if 0 + time_t current_cdp_begin_time, current_cdp_end_time; + get_cdp_start_end(rrd, target->rra, &current_cdp_begin_time, &current_cdp_end_time); const rrd_t *srrd = NULL; + + /* the list of candidates only contains RRAs with the same PDP count, we + * still have to check if the step times are the same and if we + * have the time covered by the CDPs...*/ for (int c = 0 ; c < candidate_cnt ; c++) { const candidate_t *candidate = candidates + c; - /* only look at each source RRD once. 
We use the fact that all - * candidates are block-sorted by the source RRDs */ - if (srrd == candidate->rrd) continue; srrd = candidate->rrd; -/* - if (strncmp(srrd->pdp_prep[candidate->extra.l].last_ds, "U", LAST_DS_LEN) == 0) { - /* this source RRD does not have a usable last DS value - skip */ - } - - */ - time_t source_cdp_begin_time = - (srrd->live_head->last_up / cdp_step) * cdp_step + 1; - time_t source_cdp_end_time = source_cdp_begin_time + cdp_step - 1; - - if (source_cdp_end_time <= current_cdp_end_time - && source_cdp_end_time >= current_cdp_begin_time) { - printf("PREFILL CDP !!!\n"); - - memcpy(target->cdp + target->extra.l, - candidate->cdp + candidate->extra.l, - sizeof(cdp_prep_t)); + + if (rrd->stat_head->pdp_step != srrd->stat_head->pdp_step) continue; + + time_t source_cdp_begin_time, source_cdp_end_time; + get_cdp_start_end(srrd, candidate->rra, &source_cdp_begin_time, &source_cdp_end_time); + + if (source_cdp_begin_time == current_cdp_begin_time + && source_cdp_end_time == current_cdp_end_time) { + /* CDPs are fully compatible, but still: do the last update time match the very same bin? + * Only then are we able to just copy over data */ - break; + if (rrd->live_head->last_up / rrd->stat_head->pdp_step == srrd->live_head->last_up / srrd->stat_head->pdp_step) { + // OK, fully compatible... + memcpy(target->cdp + target->extra.l, + candidate->cdp + candidate->extra.l, sizeof(cdp_prep_t)); + return; + } } } -#endif + /* we are still here: we have not found a compatible CDP, use the "CDP RRA"... */ + /* void init_cdp(const rrd_t *rrd, const rra_def_t *rra_def, + * const pdp_prep_t *pdp_prep, cdp_prep_t *cdp_prep) */ + + // must find first value index first.... -} + int ii, first_cdp_rra_value = 0; + for (ii = 0 ; ii < cdp_rra_index ; ii++) { + first_cdp_rra_value += rrd->rra_def[ii].row_cnt; + } + + init_cdp(rrd, target->rra, rrd->pdp_prep + target->extra.l, + target->cdp + target->extra.l); + // explicitly set unknown count to 0... 
+ + (target->cdp + target->extra.l)->scratch[CDP_unkn_pdp_cnt].u_cnt = 0; + + enum cf_en tcf = cf_conv(target->rra->cf_nam); + time_t t; + + rra_def_t *cdp_rra = rrd->rra_def + cdp_rra_index; + + if (target->rra->pdp_cnt == 1) { + /* special case: for pdp_cnt == 1 RRAs, we may just use the last average + * value for the CDP primary value */ + target->cdp[target->extra.l].scratch[CDP_unkn_pdp_cnt].u_cnt = 0; + target->cdp[target->extra.l].scratch[CDP_primary_val].u_val = 0; + target->cdp[target->extra.l].scratch[CDP_secondary_val].u_val = 0; + + int r = row_for_time(rrd, cdp_rra, + rrd->rra_ptr[cdp_rra_index].cur_row, + current_cdp_begin_time - target->rrd->stat_head->pdp_step); + + if (r >= 0) { + rrd_value_t v = rrd->rrd_value[(first_cdp_rra_value + r) * rrd->stat_head->ds_cnt + + target->extra.l]; + target->cdp[target->extra.l].scratch[CDP_primary_val].u_val = v; + } + + return; + } + + int cdp_step = rrd->stat_head->pdp_step * target->rra->pdp_cnt; + for (t = current_cdp_begin_time - target->rra->pdp_cnt * rrd->stat_head->pdp_step ; + t < current_cdp_end_time && t < rrd->live_head->last_up ; + t += rrd->stat_head->pdp_step) { + + int r = row_for_time(rrd, cdp_rra, + rrd->rra_ptr[cdp_rra_index].cur_row, + t); + + if (r < 0) continue; + rrd_value_t v = rrd->rrd_value[(first_cdp_rra_value + r) * rrd->stat_head->ds_cnt + + target->extra.l]; + int n = target->rra->pdp_cnt - (t % cdp_step) / rrd->stat_head->pdp_step; + + update_cdp(target->cdp[target->extra.l].scratch, + tcf, + v, // pdp_temp_val + n == 1, // rra_step_cnt + 1, // elapsed_pdp_st + n, // start_pdp_offset + target->rra->pdp_cnt, // pdp_cnt + target->rra->par[RRA_cdp_xff_val].u_val, // xff + 0, 0); + } +} static unsigned long find_ds_match(const ds_def_t *ds_def, const rrd_t *src_rrd) { unsigned long source_ds_index; @@ -1804,8 +1859,6 @@ static unsigned long find_ds_match(const ds_def_t *ds_def, const rrd_t *src_rrd) return src_rrd->stat_head->ds_cnt; } - - /* Find a set of RRAs among all RRA in the 
sources list, as matched by the select_func and * ordered (by source) according to order_func. */ @@ -1878,15 +1931,18 @@ done: * * If we do not already have something compatible, we add a temporary RRA that * can/should be removed again after CDP prefilling + * + * Returns the index of a suitable RRA in the target RRD. If that RRA was + * added just for this purpose, the *added flag will be true and false otherwise. */ -static long add_temporary_rra_for_cdp_prefilling(rrd_t *rrd) +static long rra_for_cdp_prefilling(rrd_t *rrd, int *added) { // find largest pdp count unsigned long largest_pdp = 0; rra_def_t *average_1_pdp_rra = NULL; - long temp_added_rra_index = -1; + long found_rra_index = -1; unsigned long original_total_rows = 0; unsigned long rra_index; @@ -1899,19 +1955,23 @@ static long add_temporary_rra_for_cdp_prefilling(rrd_t *rrd) if (rra->pdp_cnt == 1 && cf_conv(rra->cf_nam) == CF_AVERAGE) { if (average_1_pdp_rra == NULL) { average_1_pdp_rra = rra; + found_rra_index = rra_index; } else if (average_1_pdp_rra->row_cnt < rra->row_cnt) { average_1_pdp_rra = rra; + found_rra_index = rra_index; } } } - + largest_pdp *= 2; + *added = 0; if (average_1_pdp_rra == NULL || average_1_pdp_rra->row_cnt < largest_pdp) { // add our own temporary average RRA with a PDP count == 1 - + /* temporarily extend */ rrd->stat_head->rra_cnt++; - temp_added_rra_index = rrd->stat_head->rra_cnt - 1; + found_rra_index = rrd->stat_head->rra_cnt - 1; + *added = 1; rrd->rra_def = realloc(rrd->rra_def, sizeof(rra_def_t) * rrd->stat_head->rra_cnt); rrd->rra_ptr = realloc(rrd->rra_ptr, sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt); @@ -1926,18 +1986,18 @@ static long add_temporary_rra_for_cdp_prefilling(rrd_t *rrd) return -1; } - strcpy(rrd->rra_def[temp_added_rra_index].cf_nam, "AVERAGE"); - rrd->rra_def[temp_added_rra_index].pdp_cnt = 1; - rrd->rra_def[temp_added_rra_index].row_cnt = largest_pdp; - rrd->rra_def[temp_added_rra_index].par[RRA_cdp_xff_val].u_val = 0.5; + 
strcpy(rrd->rra_def[found_rra_index].cf_nam, "AVERAGE"); + rrd->rra_def[found_rra_index].pdp_cnt = 1; + rrd->rra_def[found_rra_index].row_cnt = largest_pdp; + rrd->rra_def[found_rra_index].par[RRA_cdp_xff_val].u_val = 0.5; - rrd->rra_ptr[temp_added_rra_index].cur_row = 0; // doesn't matter + rrd->rra_ptr[found_rra_index].cur_row = 0; // doesn't matter unsigned long i, ii; for (ii = 0 ; ii < rrd->stat_head->ds_cnt ; ii++) { - init_cdp(rrd, &(rrd->rra_def[temp_added_rra_index]), - rrd->pdp_prep + temp_added_rra_index, - rrd->cdp_prep + rrd->stat_head->ds_cnt * temp_added_rra_index + ii); + init_cdp(rrd, &(rrd->rra_def[found_rra_index]), + rrd->pdp_prep + found_rra_index, + rrd->cdp_prep + rrd->stat_head->ds_cnt * found_rra_index + ii); } for (ii = 0 ; ii < largest_pdp ; ii++) { @@ -1946,7 +2006,7 @@ static long add_temporary_rra_for_cdp_prefilling(rrd_t *rrd) } } } - return temp_added_rra_index; + return found_rra_index; } static void remove_temporary_rra_for_cdp_prefilling(rrd_t *rrd, long added_index) { @@ -1972,10 +2032,18 @@ static void remove_temporary_rra_for_cdp_prefilling(rrd_t *rrd, long added_index sizeof(rrd_value_t) * (total_rows * rrd->stat_head->ds_cnt)); } +static int cdp_match(const rra_def_t *tofill, const rra_def_t *maybe) { + enum cf_en mcf = cf_conv(maybe->cf_nam); + if (cf_conv(tofill->cf_nam) == mcf && + tofill->pdp_cnt == maybe->pdp_cnt) { + return 1; + } + return 0; +} static int rrd_prefill_data(rrd_t *rrd, const GList *sources) { int rc = -1; - long temp_added_rra_index = -1; + long cdp_rra_index = -1; if (sources == NULL) { // we are done if there is nothing to copy data from @@ -1984,14 +2052,15 @@ static int rrd_prefill_data(rrd_t *rrd, const GList *sources) { } unsigned long rra_index, ds_index; - unsigned long seen_rows = 0; + unsigned long total_rows = 0; debug_dump_rra(((rrd_file_t *) sources->data)->rrd, 0, 0); - temp_added_rra_index = add_temporary_rra_for_cdp_prefilling(rrd); + int rra_added_temporarily = 0; + cdp_rra_index = 
rra_for_cdp_prefilling(rrd, &rra_added_temporarily); + if (rrd_test_error()) goto done; - unsigned long total_rows = 0; // process one RRA after the other for (rra_index = 0 ; rra_index < rrd->stat_head->rra_cnt ; rra_index++) { rra_def_t *rra_def = rrd->rra_def + rra_index; @@ -2007,7 +2076,7 @@ static int rrd_prefill_data(rrd_t *rrd, const GList *sources) { .rra = rra_def, .rra_cf = cf_conv(rra_def->cf_nam), .rra_index = rra_index, - .values = rrd->rrd_value + rrd->stat_head->ds_cnt * seen_rows, + .values = rrd->rrd_value + rrd->stat_head->ds_cnt * total_rows, .cdp = rrd->cdp_prep + rrd->stat_head->ds_cnt * rra_index, }; @@ -2055,7 +2124,7 @@ static int rrd_prefill_data(rrd_t *rrd, const GList *sources) { free(candidates); } - seen_rows += rra_def->row_cnt; + total_rows += rra_def->row_cnt; } /* now we walk all RRAs and DSs again to handle CDP prefilling. Do this AFTER @@ -2063,37 +2132,27 @@ static int rrd_prefill_data(rrd_t *rrd, const GList *sources) { * to obtain input data for CDP consolidation. 
And THAT RRA will only be available * AFTER all bin prefilling done in the previous pass */ - seen_rows = 0; + total_rows = 0; for (rra_index = 0 ; rra_index < rrd->stat_head->rra_cnt ; rra_index++) { - if ((long)rra_index != temp_added_rra_index) { - candidate_t target = { - .rrd = rrd, - .rra = rrd->rra_def + rra_index, - .rra_cf = cf_conv(rrd->rra_def[rra_index].cf_nam), - .rra_index = rra_index, - .values = rrd->rrd_value + rrd->stat_head->ds_cnt * seen_rows, - .cdp = rrd->cdp_prep + rrd->stat_head->ds_cnt * rra_index, - }; - - for (ds_index = 0 ; ds_index < rrd->stat_head->ds_cnt ; ds_index++) { - target.extra.l = ds_index; - - ds_def_t *ds_def = rrd->ds_def + ds_index; - GList *src; - for (src = sources ; src ; src = g_list_next(src)) { - const rrd_file_t *rrd_file = src->data; - if (rrd_file == NULL) continue; + candidate_t target = { + .rrd = rrd, + .rra = rrd->rra_def + rra_index, + .rra_cf = cf_conv(rrd->rra_def[rra_index].cf_nam), + .rra_index = rra_index, + .values = rrd->rrd_value + rrd->stat_head->ds_cnt * total_rows, + .cdp = rrd->cdp_prep + rrd->stat_head->ds_cnt * rra_index, + }; - const rrd_t *src_rrd = rrd_file->rrd; - if (src_rrd == NULL) continue; + for (ds_index = 0 ; ds_index < rrd->stat_head->ds_cnt ; ds_index++) { + target.extra.l = ds_index; - //find_ds_match(ds_def, s) - } - - //prefill_cdp_prep(&target, average_1_pdp_rra); - } + int candidate_cnt = 0; + candidate_t *candidates = find_matching_candidates(&target, sources, + &candidate_cnt, cdp_match, NULL); + + prefill_cdp_prep(&target, candidates, candidate_cnt, cdp_rra_index); } - seen_rows += rrd->rra_def[rra_index].row_cnt; + total_rows += rrd->rra_def[rra_index].row_cnt; } rc = 0; @@ -2104,7 +2163,9 @@ static int rrd_prefill_data(rrd_t *rrd, const GList *sources) { * RRA data sets */ debug_dump_rra(rrd, 0, 0); done: - remove_temporary_rra_for_cdp_prefilling(rrd, temp_added_rra_index); + if (rra_added_temporarily) { + remove_temporary_rra_for_cdp_prefilling(rrd, cdp_rra_index); + } 
return rc; } diff --git a/tests/tune2-testa-mod1.dump b/tests/tune2-testa-mod1.dump index d9656e5f..66f43eab 100644 --- a/tests/tune2-testa-mod1.dump +++ b/tests/tune2-testa-mod1.dump @@ -15,7 +15,7 @@ U - 0.0000000000e+00 + NaN 40 @@ -28,7 +28,7 @@ U - 0.0000000000e+00 + NaN 40 diff --git a/tests/tune2-testorg.dump b/tests/tune2-testorg.dump index e6726a7f..37b87f20 100644 --- a/tests/tune2-testorg.dump +++ b/tests/tune2-testorg.dump @@ -15,7 +15,7 @@ U - 0.0000000000e+00 + NaN 40 @@ -28,7 +28,7 @@ U - 0.0000000000e+00 + NaN 40 -- 2.47.3