git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lib: iostream-temp - Optimize writing many small writes
author Timo Sirainen <timo.sirainen@open-xchange.com>
Tue, 21 Oct 2025 10:33:56 +0000 (13:33 +0300)
committer Timo Sirainen <timo.sirainen@open-xchange.com>
Wed, 22 Oct 2025 08:23:23 +0000 (11:23 +0300)
There was no buffering for o_stream_send*() calls after the initial
in-memory buffer size was exceeded. Each one was immediately written to
disk.

This fixes inefficiencies at least with doveadm-server HTTP responses.

src/lib/iostream-temp.c
src/lib/iostream-temp.h
src/lib/test-iostream-temp.c

index b518367c4511a1570cc6e49419c34d1913aa911e..7b460796f3dd8eaaf707bb4baea6415ed25c1c01 100644 (file)
@@ -10,6 +10,7 @@
 #include "iostream-temp.h"
 
 #include <unistd.h>
+#include <sys/uio.h>
 
 #define IOSTREAM_TEMP_MAX_BUF_SIZE_DEFAULT (1024*128)
 
@@ -20,11 +21,16 @@ struct temp_ostream {
        enum iostream_temp_flags flags;
        size_t max_mem_size;
 
+       /* writev() wrapper - overridden for unit tests */
+       ssize_t (*writev)(int fd, const struct iovec *iov,
+                         unsigned int iov_count);
+
        struct istream *dupstream;
        uoff_t dupstream_offset, dupstream_start_offset;
        char *name;
 
        buffer_t *buf;
+       buffer_t *fd_buf;
        int fd;
        bool fd_tried;
        uoff_t fd_size;
@@ -42,6 +48,7 @@ o_stream_temp_close(struct iostream_private *stream,
 
        i_close_fd(&tstream->fd);
        buffer_free(&tstream->buf);
+       buffer_free(&tstream->fd_buf);
        i_free(tstream->temp_path_prefix);
        i_free(tstream->name);
 }
@@ -74,11 +81,15 @@ static int o_stream_temp_move_to_fd(struct temp_ostream *tstream)
           e.g. for unit tests */
        tstream->ostream.fd = tstream->fd;
        tstream->fd_size = tstream->buf->used;
+       /* max_mem_size is smaller than IO_BLOCK_SIZE only in unit tests */
+       size_t fd_buf_size = I_MIN(IO_BLOCK_SIZE, tstream->max_mem_size);
+       tstream->fd_buf = buffer_create_dynamic_max(default_pool,
+                                                   fd_buf_size, fd_buf_size);
        buffer_free(&tstream->buf);
        return 0;
 }
 
-int o_stream_temp_move_to_memory(struct ostream *output)
+static int o_stream_temp_move_to_memory(struct ostream *output)
 {
        struct temp_ostream *tstream =
                container_of(output->real_stream, struct temp_ostream, ostream);
@@ -88,10 +99,7 @@ int o_stream_temp_move_to_memory(struct ostream *output)
 
        i_assert(tstream->buf == NULL);
        tstream->buf = buffer_create_dynamic(default_pool, 8192);
-       while (offset < tstream->ostream.ostream.offset &&
-              (ret = pread(tstream->fd, buf, sizeof(buf), offset)) > 0) {
-               if ((size_t)ret > tstream->ostream.ostream.offset - offset)
-                       ret = tstream->ostream.ostream.offset - offset;
+       while ((ret = pread(tstream->fd, buf, sizeof(buf), offset)) > 0) {
                buffer_append(tstream->buf, buf, ret);
                offset += ret;
        }
@@ -109,32 +117,97 @@ int o_stream_temp_move_to_memory(struct ostream *output)
 }
 
 static ssize_t
-o_stream_temp_fd_sendv(struct temp_ostream *tstream,
-                      const struct const_iovec *iov, unsigned int iov_count)
+o_stream_temp_fd_buf_sendv(struct temp_ostream *tstream,
+                          const struct const_iovec *iov,
+                          unsigned int iov_count, bool flush)
 {
-       size_t bytes = 0;
-       unsigned int i;
+       i_assert(tstream->buf == NULL);
 
-       for (i = 0; i < iov_count; i++) {
-               if (write_full(tstream->fd, iov[i].iov_base, iov[i].iov_len) < 0) {
-                       i_error("iostream-temp %s: write(%s*) failed: %m - moving to memory",
-                               o_stream_get_name(&tstream->ostream.ostream),
-                               tstream->temp_path_prefix);
-                       if (o_stream_temp_move_to_memory(&tstream->ostream.ostream) < 0)
-                               return -1;
-                       for (; i < iov_count; i++) {
-                               buffer_append(tstream->buf, iov[i].iov_base, iov[i].iov_len);
-                               bytes += iov[i].iov_len;
-                               tstream->ostream.ostream.offset += iov[i].iov_len;
-                       }
-                       i_assert(tstream->fd_tried);
-                       return bytes;
+       /* if the amount of data fits into fd_buf, put it there */
+       size_t total_size = 0;
+       for (unsigned int i = 0; i < iov_count; i++)
+               total_size += iov[i].iov_len;
+       if (total_size + tstream->fd_buf->used <=
+           buffer_get_writable_size(tstream->fd_buf) && !flush) {
+               for (unsigned int i = 0; i < iov_count; i++) {
+                       buffer_append(tstream->fd_buf, iov[i].iov_base,
+                                     iov[i].iov_len);
+               }
+               return total_size;
+       }
+
+       struct const_iovec iov_copy[iov_count + 1];
+       struct const_iovec *new_iov = iov_copy;
+
+       /* Use writev() to send all the pieces */
+       size_t fd_buf_used = tstream->fd_buf->used;
+       if (fd_buf_used == 0)
+               memcpy(new_iov, iov, sizeof(*new_iov) * iov_count);
+       else {
+               /* Create iovec that is prefixed by the already buffered data */
+               new_iov[0].iov_base = tstream->fd_buf->data;
+               new_iov[0].iov_len = tstream->fd_buf->used;
+               memcpy(new_iov + 1, iov, sizeof(*new_iov) * iov_count);
+               iov_count++;
+       }
+
+       size_t bytes_sent = 0;
+       ssize_t ret;
+       while ((ret = tstream->writev(tstream->fd,
+                                     (const struct iovec *)new_iov,
+                                     I_MIN(iov_count, IOV_MAX))) > 0) {
+               bytes_sent += ret;
+               i_assert(bytes_sent <= total_size + fd_buf_used);
+               if (bytes_sent == total_size + fd_buf_used)
+                       break;
+               /* partial write, try again */
+               while ((size_t)ret >= new_iov->iov_len) {
+                       ret -= new_iov->iov_len;
+                       new_iov++;
+                       iov_count--;
+                       i_assert(iov_count > 0);
                }
-               bytes += iov[i].iov_len;
-               tstream->ostream.ostream.offset += iov[i].iov_len;
+               new_iov->iov_len -= ret;
+               new_iov->iov_base = CONST_PTR_OFFSET(new_iov->iov_base, ret);
        }
-       tstream->fd_size += bytes;
-       return bytes;
+       if (ret == 0) {
+               /* shouldn't happen - assume it's out of disk space */
+               errno = ENOSPC;
+               ret = -1;
+       }
+       if (ret < 0) {
+               i_error("iostream-temp %s: write(%s*) failed: %m - moving to memory",
+                       o_stream_get_name(&tstream->ostream.ostream),
+                       tstream->temp_path_prefix);
+               if (o_stream_temp_move_to_memory(&tstream->ostream.ostream) < 0)
+                       return -1;
+               for (unsigned int i = 0; i < iov_count; i++) {
+                       buffer_append(tstream->buf, new_iov[i].iov_base,
+                                     new_iov[i].iov_len);
+                       bytes_sent += new_iov[i].iov_len;
+               }
+               i_assert(bytes_sent == total_size + fd_buf_used);
+               buffer_free(&tstream->fd_buf);
+       } else {
+               buffer_set_used_size(tstream->fd_buf, 0);
+       }
+       return total_size;
+}
+
+static ssize_t
+o_stream_temp_fd_sendv(struct temp_ostream *tstream,
+                      const struct const_iovec *iov, unsigned int iov_count)
+{
+       i_assert(tstream->fd_buf != NULL);
+       /* max_mem_size has been reached, and we're writing to a temp file. */
+       ssize_t ret = o_stream_temp_fd_buf_sendv(tstream, iov, iov_count, FALSE);
+       if (ret < 0)
+               return -1;
+
+       if (tstream->fd != -1)
+               tstream->fd_size += ret;
+       tstream->ostream.ostream.offset += ret;
+       return ret;
 }
 
 static ssize_t
@@ -157,6 +230,8 @@ o_stream_temp_sendv(struct ostream_private *stream,
        if (tstream->fd != -1)
                return o_stream_temp_fd_sendv(tstream, iov, iov_count);
 
+       /* Either max_mem_size has not been reached yet, or we failed
+          to write to a temp file (e.g. out of disk space). */
        for (i = 0; i < iov_count; i++) {
                if (tstream->buf->used + iov[i].iov_len > tstream->max_mem_size) {
                        if (o_stream_temp_move_to_fd(tstream) == 0) {
@@ -210,6 +285,18 @@ static bool o_stream_temp_dup_cancel(struct temp_ostream *tstream,
        return ret;
 }
 
+static int o_stream_temp_flush(struct ostream_private *stream)
+{
+       struct temp_ostream *tstream =
+               container_of(stream, struct temp_ostream, ostream);
+
+       if (tstream->fd_buf == NULL || tstream->fd_buf->used == 0)
+               return 1;
+
+       struct const_iovec iov = { 0, 0 };
+       return o_stream_temp_fd_buf_sendv(tstream, &iov, 0, TRUE) < 0 ? -1 : 1;
+}
+
 static bool
 o_stream_temp_dup_istream(struct temp_ostream *outstream,
                          struct istream *instream,
@@ -264,6 +351,21 @@ o_stream_temp_send_istream(struct ostream_private *_outstream,
        return io_stream_copy(&outstream->ostream.ostream, instream);
 }
 
+static ssize_t iostream_temp_writev(int fd, const struct iovec *iov,
+                                   unsigned int iov_count)
+{
+       return writev(fd, iov, iov_count);
+}
+
+void o_stream_temp_set_writev(struct ostream *output,
+                             ssize_t (*func)(int fd, const struct iovec *iov,
+                                             unsigned int iov_count))
+{
+       struct temp_ostream *tstream =
+               container_of(output->real_stream, struct temp_ostream, ostream);
+       tstream->writev = func;
+}
+
 struct ostream *iostream_temp_create(const char *temp_path_prefix,
                                     enum iostream_temp_flags flags)
 {
@@ -290,12 +392,14 @@ struct ostream *iostream_temp_create_sized(const char *temp_path_prefix,
        tstream->ostream.ostream.blocking = TRUE;
        tstream->ostream.sendv = o_stream_temp_sendv;
        tstream->ostream.send_istream = o_stream_temp_send_istream;
+       tstream->ostream.flush = o_stream_temp_flush;
        tstream->ostream.iostream.close = o_stream_temp_close;
        tstream->temp_path_prefix = i_strdup(temp_path_prefix);
        tstream->flags = flags;
        tstream->max_mem_size = max_mem_size;
        tstream->buf = buffer_create_dynamic(default_pool, 8192);
        tstream->fd = -1;
+       tstream->writev = iostream_temp_writev;
 
        output = o_stream_create(&tstream->ostream, NULL, -1);
        tstream->name = i_strdup(name);
@@ -323,14 +427,21 @@ struct istream *iostream_temp_finish(struct ostream **output,
        struct istream *input, *input2;
        uoff_t abs_offset, size;
        const char *for_path;
-       int fd;
+       int fd, ret;
 
        if (tstream->name[0] == '\0')
                for_path = "";
        else
                for_path = t_strdup_printf(" for %s", tstream->name);
 
-       if (tstream->dupstream != NULL && !tstream->dupstream->closed) {
+       ret = o_stream_finish(*output);
+       if (ret <= 0) {
+               i_assert(ret < 0);
+               input = i_stream_create_error_str((*output)->stream_errno, "%s",
+                                                 o_stream_get_error(*output));
+               i_stream_set_name(input, t_strdup_printf(
+                       "(Temp file in %s)", tstream->temp_path_prefix));
+       } else if (tstream->dupstream != NULL && !tstream->dupstream->closed) {
                abs_offset = i_stream_get_absolute_offset(tstream->dupstream) -
                        tstream->dupstream->v_offset +
                        tstream->dupstream_start_offset;
index bf5a45d15d29958243e44409d057dc29ac8ba1d4..a6d84e2ffc1015b62c77d06c2bfef987817f43ac 100644 (file)
@@ -26,6 +26,8 @@ struct istream *iostream_temp_finish(struct ostream **output,
                                     size_t max_buffer_size);
 
 /* For internal testing: */
-int o_stream_temp_move_to_memory(struct ostream *output);
+void o_stream_temp_set_writev(struct ostream *output,
+                             ssize_t (*func)(int fd, const struct iovec *iov,
+                                             unsigned int iov_count));
 
 #endif
index cfb8658604bfd67b83f9f6bf76c9ba9cb239c477..99b5b8d85af72c944a4a53e2515c124e1597fe5c 100644 (file)
@@ -7,7 +7,53 @@
 
 #include <unistd.h>
 #include <fcntl.h>
+#include <sys/uio.h>
 
+static size_t test_writev_fail_at_left = 0;
+
+static ssize_t
+test_writev_fail(int fd ATTR_UNUSED, const struct iovec *iov ATTR_UNUSED,
+                unsigned int iov_count ATTR_UNUSED)
+{
+       errno = EIO;
+       return -1;
+}
+
+static ssize_t
+test_writev_fail_at(int fd, const struct iovec *iov,
+                   unsigned int iov_count)
+{
+       struct iovec iov_copy[iov_count];
+       memcpy(iov_copy, iov, sizeof(*iov) * iov_count);
+
+       unsigned int i;
+       for (i = 0; i < iov_count; i++) {
+               if (test_writev_fail_at_left < iov_copy[i].iov_len) {
+                       iov_copy[i].iov_len = test_writev_fail_at_left;
+                       test_writev_fail_at_left = 0;
+                       i++;
+                       break;
+               }
+               test_writev_fail_at_left -= iov_copy[i].iov_len;
+       }
+       if (i == 1 && iov_copy[0].iov_len == 0) {
+               errno = EIO;
+               return -1;
+       }
+       return writev(fd, iov_copy, i);
+}
+
+static const char *test_iostream_temp_finish(struct ostream *output)
+{
+       struct istream *input = iostream_temp_finish(&output, 128);
+
+       (void)i_stream_read(input);
+       size_t size;
+       const unsigned char *data = i_stream_get_data(input, &size);
+       const char *str = t_strndup(data, size);
+       i_stream_destroy(&input);
+       return str;
+}
 static void test_iostream_temp_create_sized_memory(void)
 {
        struct ostream *output;
@@ -26,7 +72,7 @@ static void test_iostream_temp_create_sized_memory(void)
        test_expect_no_more_errors();
 
        test_assert(o_stream_get_fd(output) == -1);
-       o_stream_destroy(&output);
+       test_assert_strcmp(test_iostream_temp_finish(output), "12345");
        test_end();
 }
 
@@ -44,31 +90,109 @@ static void test_iostream_temp_create_sized_disk(void)
        test_assert(o_stream_send(output, "5", 1) == 1);
        test_assert(output->offset == 5);
        test_assert(o_stream_get_fd(output) != -1);
-       o_stream_destroy(&output);
+       test_assert_strcmp(test_iostream_temp_finish(output), "12345");
        test_end();
 }
 
-static void test_iostream_temp_create_write_error(void)
+static void test_iostream_temp_create_sized_disk_mixed(void)
 {
        struct ostream *output;
 
-       test_begin("iostream_temp_create_sized() write error");
-       output = iostream_temp_create_sized(".", 0, "test", 1);
-
-       test_assert(o_stream_send(output, "123", 3) == 3);
+       test_begin("iostream_temp_create_sized() disk (mixed)");
+       output = iostream_temp_create_sized(".", 0, "test", 2);
+       test_assert(o_stream_send(output, "12", 2) == 2);
+       test_assert(o_stream_get_fd(output) == -1);
+       test_assert(o_stream_send(output, "3", 1) == 1);
        test_assert(o_stream_get_fd(output) != -1);
        test_assert(output->offset == 3);
-       test_assert(o_stream_temp_move_to_memory(output) == 0);
-       test_assert(o_stream_get_fd(output) == -1);
        test_assert(o_stream_send(output, "45", 2) == 2);
+       test_assert(o_stream_send(output, "6", 1) == 1);
+       test_assert_strcmp(test_iostream_temp_finish(output), "123456");
+       test_end();
+}
+
+static void test_iostream_temp_create_write_error_middle(void)
+{
+       struct ostream *output;
+
+       test_begin("iostream_temp_create_sized() write error (middle)");
+       /* 2 bytes before it's first flushed to disk + 2 bytes in-memory buffer
+          before more data is written to disk. */
+       output = iostream_temp_create_sized(".", 0, "test", 2);
+
+       test_assert(o_stream_send(output, "12", 2) == 2);
+       test_assert(o_stream_get_fd(output) == -1);
+       test_assert(o_stream_send(output, "34", 2) == 2);
+       test_assert(o_stream_get_fd(output) != -1);
+       test_assert(output->offset == 4);
+
+       o_stream_temp_set_writev(output, test_writev_fail);
+
+       test_expect_error_string("iostream-temp (temp iostream in . for test): write(.*) failed: Input/output error - moving to memory");
+       test_assert(o_stream_send(output, "5", 1) == 1);
+       test_expect_no_more_errors();
+
+       test_assert(o_stream_get_fd(output) == -1);
        test_assert(output->offset == 5);
 
-       const unsigned char *data;
-       size_t size;
-       struct istream *input = iostream_temp_finish(&output, 128);
-       test_assert(i_stream_read_bytes(input, &data, &size, 5) == 1 &&
-                   memcmp(data, "12345", 5) == 0);
-       i_stream_destroy(&input);
+       test_assert(o_stream_send(output, "6", 1) == 1);
+       test_assert_strcmp(test_iostream_temp_finish(output), "123456");
+
+       test_end();
+}
+
+static void test_iostream_temp_create_write_error_finish(void)
+{
+       struct ostream *output;
+
+       test_begin("iostream_temp_create_sized() write error (finish)");
+
+       output = iostream_temp_create_sized(".", 0, "test", 2);
+
+       test_assert(o_stream_send(output, "12", 2) == 2);
+       test_assert(o_stream_get_fd(output) == -1);
+       test_assert(o_stream_send(output, "34", 2) == 2);
+       test_assert(o_stream_get_fd(output) != -1);
+       test_assert(output->offset == 4);
+
+       o_stream_temp_set_writev(output, test_writev_fail);
+
+       test_expect_error_string("iostream-temp (temp iostream in . for test): write(.*) failed: Input/output error - moving to memory");
+       test_assert_strcmp(test_iostream_temp_finish(output), "1234");
+       test_expect_no_more_errors();
+
+       test_end();
+}
+
+static void test_iostream_temp_create_write_error_mixed(void)
+{
+       struct ostream *output;
+
+       test_begin("iostream_temp_create_sized() write error (mixed)");
+
+       for (unsigned int i = 0; i < 5; i++) {
+               output = iostream_temp_create_sized(".", 0, "test", 2);
+               test_assert_idx(o_stream_send(output, "12", 2) == 2, i);
+               test_assert_idx(o_stream_get_fd(output) == -1, i);
+               test_assert_idx(o_stream_send(output, "3", 1) == 1, i);
+               test_assert_idx(o_stream_get_fd(output) != -1, i);
+               test_assert_idx(output->offset == 3, i);
+
+               struct const_iovec iov[2] = {
+                       { "45", 2 },
+                       { "67", 2 },
+               };
+               test_writev_fail_at_left = i;
+               o_stream_temp_set_writev(output, test_writev_fail_at);
+
+               test_expect_error_string("iostream-temp (temp iostream in . for test): write(.*) failed: Input/output error - moving to memory");
+               test_assert_idx(o_stream_sendv(output, iov, 2) == 4, i);
+               test_expect_no_more_errors();
+
+               test_assert_idx(o_stream_get_fd(output) == -1, i);
+               test_assert_idx(output->offset == 7, i);
+               test_assert_strcmp_idx(test_iostream_temp_finish(output), "1234567", i);
+       }
 
        test_end();
 }
@@ -145,6 +269,9 @@ void test_iostream_temp(void)
 {
        test_iostream_temp_create_sized_memory();
        test_iostream_temp_create_sized_disk();
-       test_iostream_temp_create_write_error();
+       test_iostream_temp_create_sized_disk_mixed();
+       test_iostream_temp_create_write_error_middle();
+       test_iostream_temp_create_write_error_finish();
+       test_iostream_temp_create_write_error_mixed();
        test_iostream_temp_istream();
 }