]> git.ipfire.org Git - thirdparty/rrdtool-1.x.git/commitdiff
Improved buffering (#792)
authorSven Panne <svenpanne@gmail.com>
Wed, 31 May 2017 12:14:42 +0000 (14:14 +0200)
committerTobias Oetiker <tobi@oetiker.ch>
Wed, 31 May 2017 12:14:42 +0000 (14:14 +0200)
* Encapsulate access to write buffer.

Fixed ssize_t vs. size_t confusion on the way.

* Improve wbuf_append's time complexity from linear to amortized constant.

This is done by the usual technique employed in many, may libraries out
there (C++'s vector, Java's ArrayList, Python's list, ...): Distinguish
between the size and the capacity of the underlying container.  The capacity
grows by some *factor* (2 in our case) if it is too small, amortizing the
needed allocations/copies over time.

In a nutshell: Adding a single character to the buffer can now be done in
constant amortized time.

* Simplify and improve handle_request_fetch.

Now that add_response_info is efficient (thanks to the improved
wbuf_append), we can vastly simplify handle_request_fetch and even remove
some arbitrary length restrictions on the way.

src/rrd_daemon.c

index bb8faaf569e5176587ed478d200ae18289acccb4..d33a057e65884d671706d75415e980212a3f3552 100644 (file)
@@ -146,8 +146,9 @@ struct listen_socket_s
   off_t next_cmd;
   off_t next_read;
 
-  char *wbuf;
-  ssize_t wbuf_len;
+  char *wbuf_data;
+  size_t wbuf_size;
+  size_t wbuf_capacity;
 
   uint32_t permissions;
 
@@ -674,29 +675,59 @@ static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */
   assert(1==0);
 } /* }}} char *next_cmd */
 
+static char *wbuf_data(listen_socket_t *sock) /* {{{ */
+{
+  assert(sock != NULL);
+  return sock->wbuf_data;
+} /* }}} static char *wbuf_data */
+
+static size_t wbuf_size(listen_socket_t *sock) /* {{{ */
+{
+  assert(sock != NULL);
+  return sock->wbuf_size;
+} /* }}} static size_t wbuf_data */
+
+static void wbuf_free(listen_socket_t *sock) /* {{{ */
+{
+  assert(sock != NULL);
+  free(sock->wbuf_data);
+  sock->wbuf_data = NULL;
+  sock->wbuf_size = 0;
+  sock->wbuf_capacity = 0;
+} /* }}} static void wbuf_free */
+
 /* add the characters directly to the write buffer */
-static int add_to_wbuf(listen_socket_t *sock, char *str, size_t len) /* {{{ */
+static int wbuf_append(listen_socket_t *sock, char *str, size_t len) /* {{{ */
 {
-  char *new_buf;
+  char *new_data;
+  size_t new_capacity;
 
   assert(sock != NULL);
 
-  new_buf = rrd_realloc(sock->wbuf, sock->wbuf_len + len + 1);
-  if (new_buf == NULL)
+  new_capacity = sock->wbuf_capacity == 0 ? 4096 : sock->wbuf_capacity;
+  while (new_capacity <= sock->wbuf_size + len)
   {
-    RRDD_LOG(LOG_ERR, "add_to_wbuf: realloc failed");
-    return -1;
+    new_capacity *= 2;
   }
 
-  memcpy(new_buf + sock->wbuf_len, str, len);
-
-  sock->wbuf = new_buf;
-  sock->wbuf_len += len;
+  if (new_capacity != sock->wbuf_capacity)
+  {
+    new_data = rrd_realloc(sock->wbuf_data, new_capacity);
+    if (new_data == NULL)
+    {
+      RRDD_LOG(LOG_ERR, "wbuf_append: realloc failed");
+      return -1;
+    }
+    sock->wbuf_data = new_data;
+    sock->wbuf_capacity = new_capacity;
+  }
 
-  *(sock->wbuf + sock->wbuf_len)=0;
+  memcpy(&sock->wbuf_data[sock->wbuf_size], str, len);
+  sock->wbuf_data[sock->wbuf_size + len] = '\0';
+  sock->wbuf_size += len;
 
   return 0;
-} /* }}} static int add_to_wbuf */
+} /* }}} static int wbuf_append */
 
 /* add the text to the "extra" info that's sent after the status line */
 static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
@@ -721,7 +752,7 @@ static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */
     return -1;
   }
 
-  return add_to_wbuf(sock, buffer, len);
+  return wbuf_append(sock, buffer, len);
 } /* }}} static int add_response_info */
 
 /* add the binary data to the "extra" info that's sent after the status line */
@@ -743,11 +774,11 @@ static int add_binary_response_info(listen_socket_t *sock,
        if (res)
                return res;
        /* and add it to the buffer */
-       res = add_to_wbuf(sock, (char*) data, records * rsize);
+       res = wbuf_append(sock, (char*) data, records * rsize);
        if (res)
                return res;
        /* and add a newline */
-       return add_to_wbuf(sock, "\n", 1);
+       return wbuf_append(sock, "\n", 1);
 } /* }}} static int add_binary_response_info */
 
 static int count_lines(char *str) /* {{{ */
@@ -775,7 +806,7 @@ static int send_response (listen_socket_t *sock, response_code rc,
   va_list argp;
   char buffer[RRD_CMD_MAX];
   int lines;
-  ssize_t wrote;
+  size_t wrote;
   int rclen, len;
 
   if (JOURNAL_REPLAY(sock)) return rc;
@@ -787,7 +818,7 @@ static int send_response (listen_socket_t *sock, response_code rc,
     lines = sock->batch_cmd;
   }
   else if (rc == RESP_OK)
-    lines = count_lines(sock->wbuf);
+    lines = count_lines(wbuf_data(sock));
   else if (rc == RESP_OK_BIN)
     lines = 1;
   else
@@ -814,7 +845,7 @@ static int send_response (listen_socket_t *sock, response_code rc,
 
   /* append the result to the wbuf, don't write to the user */
   if (sock->batch_start)
-    return add_to_wbuf(sock, buffer, len);
+    return wbuf_append(sock, buffer, len);
 
   /* first write must be complete */
   if (len != write(sock->fd, buffer, len))
@@ -823,12 +854,12 @@ static int send_response (listen_socket_t *sock, response_code rc,
     return -1;
   }
 
-  if (sock->wbuf != NULL && rc == RESP_OK)
+  if (wbuf_data(sock) != NULL && rc == RESP_OK)
   {
     wrote = 0;
-    while (wrote < sock->wbuf_len)
+    while (wrote < wbuf_size(sock))
     {
-      ssize_t wb = write(sock->fd, sock->wbuf + wrote, sock->wbuf_len - wrote);
+      ssize_t wb = write(sock->fd, wbuf_data(sock) + wrote, wbuf_size(sock) - wrote);
       if (wb <= 0)
       {
         RRDD_LOG(LOG_INFO, "send_response: could not write results");
@@ -838,8 +869,7 @@ static int send_response (listen_socket_t *sock, response_code rc,
     }
   }
 
-  free(sock->wbuf); sock->wbuf = NULL;
-  sock->wbuf_len = 0;
+  wbuf_free(sock);
 
   return 0;
 } /* }}} */
@@ -1975,21 +2005,6 @@ static int handle_request_fetch_parse (HANDLER_PROTO,
   return 0;
 }
 
-#define SSTRCAT(buffer,str,buffer_fill) do { \
-    size_t str_len = strlen (str); \
-    if ((buffer_fill + str_len) > sizeof (buffer)) \
-      str_len = sizeof (buffer) - buffer_fill; \
-    if (str_len > 0) { \
-      strncpy (buffer + buffer_fill, str, str_len); \
-      buffer_fill += str_len; \
-      assert (buffer_fill <= sizeof (buffer)); \
-      if (buffer_fill == sizeof (buffer)) \
-        buffer[buffer_fill - 1] = 0; \
-      else \
-        buffer[buffer_fill] = 0; \
-    } \
-  } while (0)
-
 static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
 {
   unsigned long i,j;
@@ -2010,22 +2025,15 @@ static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
   add_response_info (sock, "End: %lu\n", (unsigned long) parsed.end_tm);
   add_response_info (sock, "Step: %lu\n", parsed.step);
 
-  { /* Add list of DS names */
-    char linebuf[1024];
-    size_t linebuf_fill;
-
-    memset (linebuf, 0, sizeof (linebuf));
-    linebuf_fill = 0;
-    for (i = 0; i < parsed.field_cnt; i++)
-    {
-      if (i > 0)
-        SSTRCAT (linebuf, " ", linebuf_fill);
-      SSTRCAT (linebuf, parsed.ds_namv[parsed.field_idx[i]], linebuf_fill);
-    }
-    linebuf[sizeof(linebuf) - 1] = 0;
-    add_response_info (sock, "DSCount: %lu\n", parsed.field_cnt);
-    add_response_info (sock, "DSName: %s\n", linebuf);
+  /* Add list of DS names */
+  add_response_info (sock, "DSCount: %lu\n", parsed.field_cnt);
+  add_response_info (sock, "DSName: ");
+  for (i = 0; i < parsed.field_cnt; i++)
+  {
+    add_response_info (sock, (i == 0 ? "%s" :" %s"),
+                       parsed.ds_namv[parsed.field_idx[i]]);
   }
+  add_response_info (sock, "\n");
 
   /* Add the actual data */
   assert (parsed.step > 0);
@@ -2033,37 +2041,18 @@ static int handle_request_fetch (HANDLER_PROTO) /* {{{ */
        t <= parsed.end_tm;
        t += parsed.step,j++)
   {
-    char linebuf[1024];
-    size_t linebuf_fill;
-    char tmp[128];
-
     add_response_info (sock, "%10lu:", (unsigned long) t);
-
-    memset (linebuf, 0, sizeof (linebuf));
-    linebuf_fill = 0;
     for (i = 0; i < parsed.field_cnt; i++)
     {
       unsigned int idx = j*parsed.ds_cnt+parsed.field_idx[i];
-      snprintf (tmp, sizeof (tmp), " %0.17e", parsed.data[idx]);
-      tmp[sizeof (tmp) - 1] = 0;
-      SSTRCAT (linebuf, tmp, linebuf_fill);
-      if (linebuf_fill>sizeof(linebuf)*9/10) {
-        add_response_info (sock, linebuf);
-       memset (linebuf, 0, sizeof (linebuf));
-       linebuf_fill = 0;
-      }
-    }
-
-    /* only print out a line if parsed something */
-    if (i > 0) {
-      add_response_info (sock, "%s\n", linebuf);
+      add_response_info (sock, " %0.17e", parsed.data[idx]);
     }
+    add_response_info (sock, "\n");
   } /* for (t) */
   free_fetch_parsed(&parsed);
 
   return (send_response (sock, RESP_OK, "Success\n"));
 } /* }}} int handle_request_fetch */
-#undef SSTRCAT
 
 static int handle_request_fetchbin (HANDLER_PROTO) /* {{{ */
 {
@@ -3536,7 +3525,7 @@ static void free_listen_socket(listen_socket_t *sock) /* {{{ */
   assert(sock != NULL);
 
   free(sock->rbuf);  sock->rbuf = NULL;
-  free(sock->wbuf);  sock->wbuf = NULL;
+  wbuf_free(sock);
   free(sock->addr);  sock->addr = NULL;
   free(sock);
 } /* }}} void free_listen_socket */