From: Sven Panne Date: Wed, 31 May 2017 12:14:42 +0000 (+0200) Subject: Improved buffering (#792) X-Git-Tag: v1.7.1~131^2~3 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=698db1350e7c15eccc9723f3fdd3a8240897d5eb;p=thirdparty%2Frrdtool-1.x.git Improved buffering (#792) * Encapsulate access to write buffer. Fixed ssize_t vs. size_t confusion on the way. * Improve wbuf_append's time complexity from linear to amortized constant. This is done by the usual technique employed in many, may libraries out there (C++'s vector, Java's ArrayList, Python's list, ...): Distinguish between the size and the capacity of the underlying container. The capacity grows by some *factor* (2 in our case) if it is too small, amortizing the needed allocations/copies over time. In a nutshell: Adding a single character to the buffer can now be done in constant amortized time. * Simplify and improve handle_request_fetch. Now that add_response_info is efficient (thanks to the improved wbuf_append), we can vastly simplify handle_request_fetch and even remove some arbitrary length restrictions on the way. --- diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index bb8faaf5..d33a057e 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -146,8 +146,9 @@ struct listen_socket_s off_t next_cmd; off_t next_read; - char *wbuf; - ssize_t wbuf_len; + char *wbuf_data; + size_t wbuf_size; + size_t wbuf_capacity; uint32_t permissions; @@ -674,29 +675,59 @@ static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */ assert(1==0); } /* }}} char *next_cmd */ +static char *wbuf_data(listen_socket_t *sock) /* {{{ */ +{ + assert(sock != NULL); + return sock->wbuf_data; +} /* }}} static char *wbuf_data */ + +static size_t wbuf_size(listen_socket_t *sock) /* {{{ */ +{ + assert(sock != NULL); + return sock->wbuf_size; +} /* }}} static size_t wbuf_data */ + +static void wbuf_free(listen_socket_t *sock) /* {{{ */ +{ + assert(sock != NULL); + free(sock->wbuf_data); + sock->wbuf_data = NULL; + sock->wbuf_size = 0; + sock->wbuf_capacity = 0; +} /* }}} static void wbuf_free */ + /* add the characters directly to the write buffer */ -static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */ +static int wbuf_append(listen_socket_t *sock, char *str, size_t len) /* {{{ */ { - char *new_buf; + char *new_data; + size_t new_capacity; assert(sock != NULL); - new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1); - if (new_buf == NULL) + new_capacity = sock->wbuf_capacity == 0 ? 4096 : sock->wbuf_capacity; + while (new_capacity <= sock->wbuf_size + len) { - RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed"); - return -1; + new_capacity *= 2; } - memcpy(new_buf + sock->wbuf_len, str, len); - - sock->wbuf = new_buf; - sock->wbuf_len += len; + if (new_capacity != sock->wbuf_capacity) + { + new_data = rrd_realloc(sock->wbuf_data, new_capacity); + if (new_data == NULL) + { + RRDD_LOG(LOG_ERR, "wbuf_append: realloc failed"); + return -1; + } + sock->wbuf_data = new_data; + sock->wbuf_capacity = new_capacity; + } - *(sock->wbuf + sock->wbuf_len)=0; + memcpy(&sock->wbuf_data[sock->wbuf_size], str, len); + sock->wbuf_data[sock->wbuf_size + len] = '\0'; + sock->wbuf_size += len; return 0; -} /* }}} static int add_to_wbuf */ +} /* }}} static int wbuf_append */ /* add the text to the "extra" info that's sent after the status line */ static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */ @@ -721,7 +752,7 @@ static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */ return -1; } - return add_to_wbuf(sock, buffer, len); + return wbuf_append(sock, buffer, len); } /* }}} static int add_response_info */ /* add the binary data to the "extra" info that's sent after the status line */ @@ -743,11 +774,11 @@ static int add_binary_response_info(listen_socket_t *sock, if (res) return res; /* and add it to the buffer */ - res = add_to_wbuf(sock, (char*) data, records * rsize); + res = wbuf_append(sock, (char*) data, records * rsize); if (res) return res; /* and add a newline */ - return add_to_wbuf(sock, "\n", 1); + return wbuf_append(sock, "\n", 1); } /* }}} static int add_binary_response_info */ static int count_lines(char *str) /* {{{ */ @@ -775,7 +806,7 @@ static int send_response (listen_socket_t *sock, response_code rc, va_list argp; char buffer[RRD_CMD_MAX]; int lines; - ssize_t wrote; + size_t wrote; int rclen, len; if (JOURNAL_REPLAY(sock)) return rc; @@ -787,7 +818,7 @@ static int send_response (listen_socket_t *sock, response_code rc, lines = sock->batch_cmd; } else if (rc == RESP_OK) - lines = count_lines(sock->wbuf); + lines = count_lines(wbuf_data(sock)); else if (rc == RESP_OK_BIN) lines = 1; else @@ -814,7 +845,7 @@ static int send_response (listen_socket_t *sock, response_code rc, /* append the result to the wbuf, don't write to the user */ if (sock->batch_start) - return add_to_wbuf(sock, buffer, len); + return wbuf_append(sock, buffer, len); /* first write must be complete */ if (len != write(sock->fd, buffer, len)) @@ -823,12 +854,12 @@ static int send_response (listen_socket_t *sock, response_code rc, return -1; } - if (sock->wbuf != NULL && rc == RESP_OK) + if (wbuf_data(sock) != NULL && rc == RESP_OK) { wrote = 0; - while (wrote < sock->wbuf_len) + while (wrote < wbuf_size(sock)) { - ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote); + ssize_t wb = write(sock->fd, wbuf_data(sock) + wrote, wbuf_size(sock) - wrote); if (wb <= 0) { RRDD_LOG(LOG_INFO, "send_response: could not write results"); @@ -838,8 +869,7 @@ static int send_response (listen_socket_t *sock, response_code rc, } } - free(sock->wbuf); sock->wbuf = NULL; - sock->wbuf_len = 0; + wbuf_free(sock); return 0; } /* }}} */ @@ -1975,21 +2005,6 @@ static int handle_request_fetch_parse (HANDLER_PROTO, return 0; } -#define SSTRCAT(buffer,str,buffer_fill) do { \ - size_t str_len = strlen (str); \ - if ((buffer_fill + str_len) > sizeof (buffer)) \ - str_len = sizeof (buffer) - buffer_fill; \ - if (str_len > 0) { \ - strncpy (buffer + buffer_fill, str, str_len); \ - buffer_fill += str_len; \ - assert (buffer_fill <= sizeof (buffer)); \ - if (buffer_fill == sizeof (buffer)) \ - buffer[buffer_fill - 1] = 0; \ - else \ - buffer[buffer_fill] = 0; \ - } \ - } while (0) - static int handle_request_fetch (HANDLER_PROTO) /* {{{ */ { unsigned long i,j; @@ -2010,22 +2025,15 @@ static int handle_request_fetch (HANDLER_PROTO) /* {{{ */ add_response_info (sock, "End: %lu\n", (unsigned long) parsed.end_tm); add_response_info (sock, "Step: %lu\n", parsed.step); - { /* Add list of DS names */ - char linebuf[1024]; - size_t linebuf_fill; - - memset (linebuf, 0, sizeof (linebuf)); - linebuf_fill = 0; - for (i = 0; i < parsed.field_cnt; i++) - { - if (i > 0) - SSTRCAT (linebuf, " ", linebuf_fill); - SSTRCAT (linebuf, parsed.ds_namv[parsed.field_idx[i]], linebuf_fill); - } - linebuf[sizeof(linebuf) - 1] = 0; - add_response_info (sock, "DSCount: %lu\n", parsed.field_cnt); - add_response_info (sock, "DSName: %s\n", linebuf); + /* Add list of DS names */ + add_response_info (sock, "DSCount: %lu\n", parsed.field_cnt); + add_response_info (sock, "DSName: "); + for (i = 0; i < parsed.field_cnt; i++) + { + add_response_info (sock, (i == 0 ? "%s" :" %s"), + parsed.ds_namv[parsed.field_idx[i]]); } + add_response_info (sock, "\n"); /* Add the actual data */ assert (parsed.step > 0); @@ -2033,37 +2041,18 @@ static int handle_request_fetch (HANDLER_PROTO) /* {{{ */ t <= parsed.end_tm; t += parsed.step,j++) { - char linebuf[1024]; - size_t linebuf_fill; - char tmp[128]; - add_response_info (sock, "%10lu:", (unsigned long) t); - - memset (linebuf, 0, sizeof (linebuf)); - linebuf_fill = 0; for (i = 0; i < parsed.field_cnt; i++) { unsigned int idx = j*parsed.ds_cnt+parsed.field_idx[i]; - snprintf (tmp, sizeof (tmp), " %0.17e", parsed.data[idx]); - tmp[sizeof (tmp) - 1] = 0; - SSTRCAT (linebuf, tmp, linebuf_fill); - if (linebuf_fill>sizeof(linebuf)*9/10) { - add_response_info (sock, linebuf); - memset (linebuf, 0, sizeof (linebuf)); - linebuf_fill = 0; - } - } - - /* only print out a line if parsed something */ - if (i > 0) { - add_response_info (sock, "%s\n", linebuf); + add_response_info (sock, " %0.17e", parsed.data[idx]); } + add_response_info (sock, "\n"); } /* for (t) */ free_fetch_parsed(&parsed); return (send_response (sock, RESP_OK, "Success\n")); } /* }}} int handle_request_fetch */ -#undef SSTRCAT static int handle_request_fetchbin (HANDLER_PROTO) /* {{{ */ { @@ -3536,7 +3525,7 @@ static void free_listen_socket(listen_socket_t *sock) /* {{{ */ assert(sock != NULL); free(sock->rbuf); sock->rbuf = NULL; - free(sock->wbuf); sock->wbuf = NULL; + wbuf_free(sock); free(sock->addr); sock->addr = NULL; free(sock); } /* }}} void free_listen_socket */