for (ds_index = 0 ; ds_index < rrd->stat_head->ds_cnt ; ds_index++) {
strcpy(rrd->pdp_prep[ds_index].last_ds, "U");
- /* interestingly, "U" is associated with a value of 0.0, traditionally.
- * I do not know why. Ask Tobi.
+ /* interestingly, "U" was associated sometimes with a value of 0.0
+ * and sometimes with a value of DNAN traditionally during create.
+ * I do not know why. Ask Tobi. :-)
+ * I choose DNAN here, because it makes more sense, IMHO.
*/
- rrd->pdp_prep[ds_index].scratch[PDP_val].u_val = 0;
+ rrd->pdp_prep[ds_index].scratch[PDP_val].u_val = DNAN;
rrd->pdp_prep[ds_index].scratch[PDP_unkn_sec_cnt].u_cnt =
rrd->live_head->last_up % rrd->stat_head->pdp_step;
}
}
}
-static void prefill_cdp_prep(candidate_t *target, rra_def_t *average_1_pdp_rra)
+static void get_cdp_start_end(const rrd_t *rrd, const rra_def_t *rra, time_t *begin, time_t *end) {
+ int cdp_step = rra->pdp_cnt * rrd->stat_head->pdp_step;
+ *begin = (rrd->live_head->last_up / cdp_step) * cdp_step + 1;
+ *end = *begin + cdp_step - 1;
+}
+
+static void prefill_cdp_prep(candidate_t *target,
+ candidate_t *candidates, int candidate_cnt,
+ long cdp_rra_index)
{
rrd_t *rrd = target->rrd;
- int ds_index = target->extra.l;
-
- int cdp_step = target->rra->pdp_cnt * rrd->stat_head->pdp_step;
-
- time_t current_cdp_begin_time =
- (rrd->live_head->last_up / cdp_step) * cdp_step + 1;
- time_t current_cdp_end_time = current_cdp_begin_time + cdp_step - 1;
-
-
-
-
-
-
-
-
-
-
-
-
-#if 0
+ time_t current_cdp_begin_time, current_cdp_end_time;
+ get_cdp_start_end(rrd, target->rra, ¤t_cdp_begin_time, ¤t_cdp_end_time);
const rrd_t *srrd = NULL;
+
+ /* the list of candidates only contains RRAs with the same PDP count, we
+ * still have to check if the step times are the same and if we
+ * have the time covered by the CDPs...*/
for (int c = 0 ; c < candidate_cnt ; c++) {
const candidate_t *candidate = candidates + c;
- /* only look at each source RRD once. We use the fact that all
- * candidates are block-sorted by the source RRDs */
- if (srrd == candidate->rrd) continue;
srrd = candidate->rrd;
-/*
- if (strncmp(srrd->pdp_prep[candidate->extra.l].last_ds, "U", LAST_DS_LEN) == 0) {
- /* this source RRD does not have a usable last DS value - skip */
- }
-
- */
- time_t source_cdp_begin_time =
- (srrd->live_head->last_up / cdp_step) * cdp_step + 1;
- time_t source_cdp_end_time = source_cdp_begin_time + cdp_step - 1;
-
- if (source_cdp_end_time <= current_cdp_end_time
- && source_cdp_end_time >= current_cdp_begin_time) {
- printf("PREFILL CDP !!!\n");
-
- memcpy(target->cdp + target->extra.l,
- candidate->cdp + candidate->extra.l,
- sizeof(cdp_prep_t));
+
+ if (rrd->stat_head->pdp_step != srrd->stat_head->pdp_step) continue;
+
+ time_t source_cdp_begin_time, source_cdp_end_time;
+ get_cdp_start_end(srrd, candidate->rra, &source_cdp_begin_time, &source_cdp_end_time);
+
+ if (source_cdp_begin_time == current_cdp_begin_time
+ && source_cdp_end_time == current_cdp_end_time) {
+ /* CDPs are fully compatible, but still: do the last update time match the very same bin?
+ * Only then are we able to just copy over data */
- break;
+ if (rrd->live_head->last_up / rrd->stat_head->pdp_step == srrd->live_head->last_up / srrd->stat_head->pdp_step) {
+ // OK, fully compatible...
+ memcpy(target->cdp + target->extra.l,
+ candidate->cdp + candidate->extra.l, sizeof(cdp_prep_t));
+ return;
+ }
}
}
-#endif
+ /* we are still here: we have not found a compatible CDP, use the "CDP RRA"... */
+ /* void init_cdp(const rrd_t *rrd, const rra_def_t *rra_def,
+ * const pdp_prep_t *pdp_prep, cdp_prep_t *cdp_prep) */
+
+ // must find first value index first....
-}
+ int ii, first_cdp_rra_value = 0;
+ for (ii = 0 ; ii < cdp_rra_index ; ii++) {
+ first_cdp_rra_value += rrd->rra_def[ii].row_cnt;
+ }
+
+ init_cdp(rrd, target->rra, rrd->pdp_prep + target->extra.l,
+ target->cdp + target->extra.l);
+ // explicitly set unknown count to 0...
+
+ (target->cdp + target->extra.l)->scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
+
+ enum cf_en tcf = cf_conv(target->rra->cf_nam);
+ time_t t;
+
+ rra_def_t *cdp_rra = rrd->rra_def + cdp_rra_index;
+
+ if (target->rra->pdp_cnt == 1) {
+ /* special case: for pdp_cnt == 1 RRAs, we may just use the last average
+ * value for the CDP primary value */
+ target->cdp[target->extra.l].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
+ target->cdp[target->extra.l].scratch[CDP_primary_val].u_val = 0;
+ target->cdp[target->extra.l].scratch[CDP_secondary_val].u_val = 0;
+
+ int r = row_for_time(rrd, cdp_rra,
+ rrd->rra_ptr[cdp_rra_index].cur_row,
+ current_cdp_begin_time - target->rrd->stat_head->pdp_step);
+
+ if (r >= 0) {
+ rrd_value_t v = rrd->rrd_value[(first_cdp_rra_value + r) * rrd->stat_head->ds_cnt
+ + target->extra.l];
+ target->cdp[target->extra.l].scratch[CDP_primary_val].u_val = v;
+ }
+
+ return;
+ }
+
+ int cdp_step = rrd->stat_head->pdp_step * target->rra->pdp_cnt;
+ for (t = current_cdp_begin_time - target->rra->pdp_cnt * rrd->stat_head->pdp_step ;
+ t < current_cdp_end_time && t < rrd->live_head->last_up ;
+ t += rrd->stat_head->pdp_step) {
+
+ int r = row_for_time(rrd, cdp_rra,
+ rrd->rra_ptr[cdp_rra_index].cur_row,
+ t);
+
+ if (r < 0) continue;
+ rrd_value_t v = rrd->rrd_value[(first_cdp_rra_value + r) * rrd->stat_head->ds_cnt
+ + target->extra.l];
+ int n = target->rra->pdp_cnt - (t % cdp_step) / rrd->stat_head->pdp_step;
+
+ update_cdp(target->cdp[target->extra.l].scratch,
+ tcf,
+ v, // pdp_temp_val
+ n == 1, // rra_step_cnt
+ 1, // elapsed_pdp_st
+ n, // start_pdp_offset
+ target->rra->pdp_cnt, // pdp_cnt
+ target->rra->par[RRA_cdp_xff_val].u_val, // xff
+ 0, 0);
+ }
+}
static unsigned long find_ds_match(const ds_def_t *ds_def, const rrd_t *src_rrd) {
unsigned long source_ds_index;
return src_rrd->stat_head->ds_cnt;
}
-
-
/* Find a set of RRAs among all RRA in the sources list, as matched by the select_func and
* ordered (by source) according to order_func.
*/
*
* If we do not already have something compatible, we add a temporary RRA that
* can/should be removed again after CDP prefilling
+ *
+ * Returns the index of a suitable RRA in the target RRD. If that RRA was
+ * added just for this purpose, the *added flag will be true and false otherwise.
*/
-static long add_temporary_rra_for_cdp_prefilling(rrd_t *rrd)
+static long rra_for_cdp_prefilling(rrd_t *rrd, int *added)
{
// find largest pdp count
unsigned long largest_pdp = 0;
rra_def_t *average_1_pdp_rra = NULL;
- long temp_added_rra_index = -1;
+ long found_rra_index = -1;
unsigned long original_total_rows = 0;
unsigned long rra_index;
if (rra->pdp_cnt == 1 && cf_conv(rra->cf_nam) == CF_AVERAGE) {
if (average_1_pdp_rra == NULL) {
average_1_pdp_rra = rra;
+ found_rra_index = rra_index;
} else if (average_1_pdp_rra->row_cnt < rra->row_cnt) {
average_1_pdp_rra = rra;
+ found_rra_index = rra_index;
}
}
}
-
+ largest_pdp *= 2;
+ *added = 0;
if (average_1_pdp_rra == NULL || average_1_pdp_rra->row_cnt < largest_pdp) {
// add our own temporary average RRA with a PDP count == 1
-
+
/* temporarily extend */
rrd->stat_head->rra_cnt++;
- temp_added_rra_index = rrd->stat_head->rra_cnt - 1;
+ found_rra_index = rrd->stat_head->rra_cnt - 1;
+ *added = 1;
rrd->rra_def = realloc(rrd->rra_def, sizeof(rra_def_t) * rrd->stat_head->rra_cnt);
rrd->rra_ptr = realloc(rrd->rra_ptr, sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt);
return -1;
}
- strcpy(rrd->rra_def[temp_added_rra_index].cf_nam, "AVERAGE");
- rrd->rra_def[temp_added_rra_index].pdp_cnt = 1;
- rrd->rra_def[temp_added_rra_index].row_cnt = largest_pdp;
- rrd->rra_def[temp_added_rra_index].par[RRA_cdp_xff_val].u_val = 0.5;
+ strcpy(rrd->rra_def[found_rra_index].cf_nam, "AVERAGE");
+ rrd->rra_def[found_rra_index].pdp_cnt = 1;
+ rrd->rra_def[found_rra_index].row_cnt = largest_pdp;
+ rrd->rra_def[found_rra_index].par[RRA_cdp_xff_val].u_val = 0.5;
- rrd->rra_ptr[temp_added_rra_index].cur_row = 0; // doesn't matter
+ rrd->rra_ptr[found_rra_index].cur_row = 0; // doesn't matter
unsigned long i, ii;
for (ii = 0 ; ii < rrd->stat_head->ds_cnt ; ii++) {
- init_cdp(rrd, &(rrd->rra_def[temp_added_rra_index]),
- rrd->pdp_prep + temp_added_rra_index,
- rrd->cdp_prep + rrd->stat_head->ds_cnt * temp_added_rra_index + ii);
+ init_cdp(rrd, &(rrd->rra_def[found_rra_index]),
+ rrd->pdp_prep + found_rra_index,
+ rrd->cdp_prep + rrd->stat_head->ds_cnt * found_rra_index + ii);
}
for (ii = 0 ; ii < largest_pdp ; ii++) {
}
}
}
- return temp_added_rra_index;
+ return found_rra_index;
}
static void remove_temporary_rra_for_cdp_prefilling(rrd_t *rrd, long added_index) {
sizeof(rrd_value_t) * (total_rows * rrd->stat_head->ds_cnt));
}
+static int cdp_match(const rra_def_t *tofill, const rra_def_t *maybe) {
+ enum cf_en mcf = cf_conv(maybe->cf_nam);
+ if (cf_conv(tofill->cf_nam) == mcf &&
+ tofill->pdp_cnt == maybe->pdp_cnt) {
+ return 1;
+ }
+ return 0;
+}
static int rrd_prefill_data(rrd_t *rrd, const GList *sources) {
int rc = -1;
- long temp_added_rra_index = -1;
+ long cdp_rra_index = -1;
if (sources == NULL) {
// we are done if there is nothing to copy data from
}
unsigned long rra_index, ds_index;
- unsigned long seen_rows = 0;
+ unsigned long total_rows = 0;
debug_dump_rra(((rrd_file_t *) sources->data)->rrd, 0, 0);
- temp_added_rra_index = add_temporary_rra_for_cdp_prefilling(rrd);
+ int rra_added_temporarily = 0;
+ cdp_rra_index = rra_for_cdp_prefilling(rrd, &rra_added_temporarily);
+
if (rrd_test_error()) goto done;
- unsigned long total_rows = 0;
// process one RRA after the other
for (rra_index = 0 ; rra_index < rrd->stat_head->rra_cnt ; rra_index++) {
rra_def_t *rra_def = rrd->rra_def + rra_index;
.rra = rra_def,
.rra_cf = cf_conv(rra_def->cf_nam),
.rra_index = rra_index,
- .values = rrd->rrd_value + rrd->stat_head->ds_cnt * seen_rows,
+ .values = rrd->rrd_value + rrd->stat_head->ds_cnt * total_rows,
.cdp = rrd->cdp_prep + rrd->stat_head->ds_cnt * rra_index,
};
free(candidates);
}
- seen_rows += rra_def->row_cnt;
+ total_rows += rra_def->row_cnt;
}
/* now we walk all RRAs and DSs again to handle CDP prefilling. Do this AFTER
* to obtain input data for CDP consolidation. And THAT RRA will only be available
* AFTER all bin prefilling done in the previous pass */
- seen_rows = 0;
+ total_rows = 0;
for (rra_index = 0 ; rra_index < rrd->stat_head->rra_cnt ; rra_index++) {
- if ((long)rra_index != temp_added_rra_index) {
- candidate_t target = {
- .rrd = rrd,
- .rra = rrd->rra_def + rra_index,
- .rra_cf = cf_conv(rrd->rra_def[rra_index].cf_nam),
- .rra_index = rra_index,
- .values = rrd->rrd_value + rrd->stat_head->ds_cnt * seen_rows,
- .cdp = rrd->cdp_prep + rrd->stat_head->ds_cnt * rra_index,
- };
-
- for (ds_index = 0 ; ds_index < rrd->stat_head->ds_cnt ; ds_index++) {
- target.extra.l = ds_index;
-
- ds_def_t *ds_def = rrd->ds_def + ds_index;
- GList *src;
- for (src = sources ; src ; src = g_list_next(src)) {
- const rrd_file_t *rrd_file = src->data;
- if (rrd_file == NULL) continue;
+ candidate_t target = {
+ .rrd = rrd,
+ .rra = rrd->rra_def + rra_index,
+ .rra_cf = cf_conv(rrd->rra_def[rra_index].cf_nam),
+ .rra_index = rra_index,
+ .values = rrd->rrd_value + rrd->stat_head->ds_cnt * total_rows,
+ .cdp = rrd->cdp_prep + rrd->stat_head->ds_cnt * rra_index,
+ };
- const rrd_t *src_rrd = rrd_file->rrd;
- if (src_rrd == NULL) continue;
+ for (ds_index = 0 ; ds_index < rrd->stat_head->ds_cnt ; ds_index++) {
+ target.extra.l = ds_index;
- //find_ds_match(ds_def, s)
- }
-
- //prefill_cdp_prep(&target, average_1_pdp_rra);
- }
+ int candidate_cnt = 0;
+ candidate_t *candidates = find_matching_candidates(&target, sources,
+ &candidate_cnt, cdp_match, NULL);
+
+ prefill_cdp_prep(&target, candidates, candidate_cnt, cdp_rra_index);
}
- seen_rows += rrd->rra_def[rra_index].row_cnt;
+ total_rows += rrd->rra_def[rra_index].row_cnt;
}
rc = 0;
* RRA data sets */
debug_dump_rra(rrd, 0, 0);
done:
- remove_temporary_rra_for_cdp_prefilling(rrd, temp_added_rra_index);
+ if (rra_added_temporarily) {
+ remove_temporary_rra_for_cdp_prefilling(rrd, cdp_rra_index);
+ }
return rc;
}