From: Martin Sperl Date: Thu, 2 Oct 2014 08:06:18 +0000 (+0000) Subject: implemented fetchbin protocol X-Git-Tag: v1.5.0-rc1~24^2^2~5 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=805c38279a0bf6376d0abffd226966a926b4de6b;p=thirdparty%2Frrdtool-1.x.git implemented fetchbin protocol as well as fixing a bug in fetch --- diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index c80827f3..8dad4ee1 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -1566,6 +1566,7 @@ struct fetch_parsed{ time_t start_tm; time_t end_tm; unsigned long step; + unsigned long steps; unsigned long ds_cnt; char **ds_namv; @@ -1701,6 +1702,8 @@ static int handle_request_fetch_parse (HANDLER_PROTO, return (send_response(sock, RESP_ERR, "rrd_fetch_r failed: %s\n", rrd_get_error ())); + parsed->steps = (parsed->end_tm - parsed->start_tm) / parsed->step; + /* prepare field index */ { unsigned int i; @@ -1718,6 +1721,9 @@ static int handle_request_fetch_parse (HANDLER_PROTO, "too many fields given - duplicates!\n" )); } + /* if the field is empty, then next */ + if (field[0]==0) + continue; /* try to find the string */ unsigned int found=parsed->ds_cnt; for(i=0; i < parsed->ds_cnt; i++) { @@ -1814,6 +1820,8 @@ static int handle_request_fetch (HANDLER_PROTO) /* {{{ */ size_t linebuf_fill; char tmp[128]; + add_response_info (sock, "%10lu:", (unsigned long) t); + memset (linebuf, 0, sizeof (linebuf)); linebuf_fill = 0; for (i = 0; i < parsed.field_cnt; i++) @@ -1822,21 +1830,86 @@ static int handle_request_fetch (HANDLER_PROTO) /* {{{ */ snprintf (tmp, sizeof (tmp), " %0.17e", parsed.data[idx]); tmp[sizeof (tmp) - 1] = 0; SSTRCAT (linebuf, tmp, linebuf_fill); + if (linebuf_fill>sizeof(linebuf)*9/10) { + add_response_info (sock, linebuf); + memset (linebuf, 0, sizeof (linebuf)); + linebuf_fill = 0; + } + } + if (linebuf_fill>0) { + add_response_info (sock, "%s\n", linebuf); } - - add_response_info (sock, "%10lu:%s\n", (unsigned long) t, linebuf); } /* for (t) */ free_fetch_parsed(&parsed); return (send_response (sock, RESP_OK, "Success\n")); } /* }}} int handle_request_fetch */ +#undef SSTRCAT static int handle_request_fetchbin (HANDLER_PROTO) /* {{{ */ { - return handle_request_fetch(cmd,sock,now,buffer,buffer_size); -} /* }}} int handle_request_fetchbin */ + unsigned long i,j; -#undef SSTRCAT + time_t t; + int status; + + struct fetch_parsed parsed; + + double *dbuffer; + size_t dbuffer_size; + + status = handle_request_fetch_parse (cmd, sock, now, + buffer, buffer_size, + &parsed); + if (status != 0) + return status; + + /* create a buffer for the full binary line */ + dbuffer_size = sizeof(double) * parsed.steps; + dbuffer=calloc(1,dbuffer_size); + if (!dbuffer) { + return (send_response (sock, RESP_ERR, + "Failed memory allocation\n")); + } + + assert (parsed.step > 0); + + add_response_info (sock, "FlushVersion: %lu\n", 1); + add_response_info (sock, "Start: %lu\n", (unsigned long) parsed.start_tm); + add_response_info (sock, "End: %lu\n", (unsigned long) parsed.end_tm); + add_response_info (sock, "Step: %lu\n", parsed.step); + add_response_info (sock, "DSCount: %lu\n", parsed.field_cnt); + + /* now iterate the parsed fields */ + for (i = 0; i < parsed.field_cnt; i++) + { + add_response_info (sock, + "DSBINDATA%s: %s %i\n", +#ifdef WORDS_BIGENDIAN + "BIG" +#else + "LITTLE", +#endif + parsed.ds_namv[parsed.field_idx[i]], + dbuffer_size + ); + for (t = parsed.start_tm + parsed.step, j=0; + t <= parsed.end_tm; + t += parsed.step,j++) + { + unsigned int idx = j*parsed.ds_cnt+parsed.field_idx[i]; + dbuffer[j] = parsed.data[idx]; + } + /* and add it to the buffer */ + add_to_wbuf(sock, (char*) dbuffer, dbuffer_size); + /* and add a newline */ + add_to_wbuf(sock, "\n", 1); + } + + free_fetch_parsed(&parsed); + + return (send_response (sock, RESP_OK, "Success\n")); +} /* }}} int handle_request_fetchbin */ /* we came across a "WROTE" entry during journal replay. * throw away any values that we have accumulated for this file