From: Timo Sirainen Date: Tue, 21 Oct 2025 10:33:56 +0000 (+0300) Subject: lib: iostream-temp - Optimize writing many small writes X-Git-Tag: 2.4.2~26 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=0e5890dd0f3fe66fe64b510756e1a7fed85b2f46;p=thirdparty%2Fdovecot%2Fcore.git lib: iostream-temp - Optimize writing many small writes There was no buffering for o_stream_send*() calls after the initial in-memory buffer size was exceeded. Each one was immediately written to disk. This fixes inefficiencies at least with doveadm-server HTTP responses. --- diff --git a/src/lib/iostream-temp.c b/src/lib/iostream-temp.c index b518367c45..7b460796f3 100644 --- a/src/lib/iostream-temp.c +++ b/src/lib/iostream-temp.c @@ -10,6 +10,7 @@ #include "iostream-temp.h" #include +#include #define IOSTREAM_TEMP_MAX_BUF_SIZE_DEFAULT (1024*128) @@ -20,11 +21,16 @@ struct temp_ostream { enum iostream_temp_flags flags; size_t max_mem_size; + /* writev() wrapper - overridden for unit tests */ + ssize_t (*writev)(int fd, const struct iovec *iov, + unsigned int iov_count); + struct istream *dupstream; uoff_t dupstream_offset, dupstream_start_offset; char *name; buffer_t *buf; + buffer_t *fd_buf; int fd; bool fd_tried; uoff_t fd_size; @@ -42,6 +48,7 @@ o_stream_temp_close(struct iostream_private *stream, i_close_fd(&tstream->fd); buffer_free(&tstream->buf); + buffer_free(&tstream->fd_buf); i_free(tstream->temp_path_prefix); i_free(tstream->name); } @@ -74,11 +81,15 @@ static int o_stream_temp_move_to_fd(struct temp_ostream *tstream) e.g. for unit tests */ tstream->ostream.fd = tstream->fd; tstream->fd_size = tstream->buf->used; + /* max_mem_size is smaller than IO_BLOCK_SIZE only in unit tests */ + size_t fd_buf_size = I_MIN(IO_BLOCK_SIZE, tstream->max_mem_size); + tstream->fd_buf = buffer_create_dynamic_max(default_pool, + fd_buf_size, fd_buf_size); buffer_free(&tstream->buf); return 0; } -int o_stream_temp_move_to_memory(struct ostream *output) +static int o_stream_temp_move_to_memory(struct ostream *output) { struct temp_ostream *tstream = container_of(output->real_stream, struct temp_ostream, ostream); @@ -88,10 +99,7 @@ int o_stream_temp_move_to_memory(struct ostream *output) i_assert(tstream->buf == NULL); tstream->buf = buffer_create_dynamic(default_pool, 8192); - while (offset < tstream->ostream.ostream.offset && - (ret = pread(tstream->fd, buf, sizeof(buf), offset)) > 0) { - if ((size_t)ret > tstream->ostream.ostream.offset - offset) - ret = tstream->ostream.ostream.offset - offset; + while ((ret = pread(tstream->fd, buf, sizeof(buf), offset)) > 0) { buffer_append(tstream->buf, buf, ret); offset += ret; } @@ -109,32 +117,97 @@ int o_stream_temp_move_to_memory(struct ostream *output) } static ssize_t -o_stream_temp_fd_sendv(struct temp_ostream *tstream, - const struct const_iovec *iov, unsigned int iov_count) +o_stream_temp_fd_buf_sendv(struct temp_ostream *tstream, + const struct const_iovec *iov, + unsigned int iov_count, bool flush) { - size_t bytes = 0; - unsigned int i; + i_assert(tstream->buf == NULL); - for (i = 0; i < iov_count; i++) { - if (write_full(tstream->fd, iov[i].iov_base, iov[i].iov_len) < 0) { - i_error("iostream-temp %s: write(%s*) failed: %m - moving to memory", - o_stream_get_name(&tstream->ostream.ostream), - tstream->temp_path_prefix); - if (o_stream_temp_move_to_memory(&tstream->ostream.ostream) < 0) - return -1; - for (; i < iov_count; i++) { - buffer_append(tstream->buf, iov[i].iov_base, iov[i].iov_len); - bytes += iov[i].iov_len; - tstream->ostream.ostream.offset += iov[i].iov_len; - } - i_assert(tstream->fd_tried); - return bytes; + /* if the amount of data fits into fd_buf, put it there */ + size_t total_size = 0; + for (unsigned int i = 0; i < iov_count; i++) + total_size += iov[i].iov_len; + if (total_size + tstream->fd_buf->used <= + buffer_get_writable_size(tstream->fd_buf) && !flush) { + for (unsigned int i = 0; i < iov_count; i++) { + buffer_append(tstream->fd_buf, iov[i].iov_base, + iov[i].iov_len); + } + return total_size; + } + + struct const_iovec iov_copy[iov_count + 1]; + struct const_iovec *new_iov = iov_copy; + + /* Use writev() to send all the pieces */ + size_t fd_buf_used = tstream->fd_buf->used; + if (fd_buf_used == 0) + memcpy(new_iov, iov, sizeof(*new_iov) * iov_count); + else { + /* Create iovec that is prefixed by the already buffered data */ + new_iov[0].iov_base = tstream->fd_buf->data; + new_iov[0].iov_len = tstream->fd_buf->used; + memcpy(new_iov + 1, iov, sizeof(*new_iov) * iov_count); + iov_count++; + } + + size_t bytes_sent = 0; + ssize_t ret; + while ((ret = tstream->writev(tstream->fd, + (const struct iovec *)new_iov, + I_MIN(iov_count, IOV_MAX))) > 0) { + bytes_sent += ret; + i_assert(bytes_sent <= total_size + fd_buf_used); + if (bytes_sent == total_size + fd_buf_used) + break; + /* partial write, try again */ + while ((size_t)ret >= new_iov->iov_len) { + ret -= new_iov->iov_len; + new_iov++; + iov_count--; + i_assert(iov_count > 0); } - bytes += iov[i].iov_len; - tstream->ostream.ostream.offset += iov[i].iov_len; + new_iov->iov_len -= ret; + new_iov->iov_base = CONST_PTR_OFFSET(new_iov->iov_base, ret); } - tstream->fd_size += bytes; - return bytes; + if (ret == 0) { + /* shouldn't happen - assume it's out of disk space */ + errno = ENOSPC; + ret = -1; + } + if (ret < 0) { + i_error("iostream-temp %s: write(%s*) failed: %m - moving to memory", + o_stream_get_name(&tstream->ostream.ostream), + tstream->temp_path_prefix); + if (o_stream_temp_move_to_memory(&tstream->ostream.ostream) < 0) + return -1; + for (unsigned int i = 0; i < iov_count; i++) { + buffer_append(tstream->buf, new_iov[i].iov_base, + new_iov[i].iov_len); + bytes_sent += new_iov[i].iov_len; + } + i_assert(bytes_sent == total_size + fd_buf_used); + buffer_free(&tstream->fd_buf); + } else { + buffer_set_used_size(tstream->fd_buf, 0); + } + return total_size; +} + +static ssize_t +o_stream_temp_fd_sendv(struct temp_ostream *tstream, + const struct const_iovec *iov, unsigned int iov_count) +{ + i_assert(tstream->fd_buf != NULL); + /* max_mem_size has been reached, and we're writing to a temp file. */ + ssize_t ret = o_stream_temp_fd_buf_sendv(tstream, iov, iov_count, FALSE); + if (ret < 0) + return -1; + + if (tstream->fd != -1) + tstream->fd_size += ret; + tstream->ostream.ostream.offset += ret; + return ret; } static ssize_t @@ -157,6 +230,8 @@ o_stream_temp_sendv(struct ostream_private *stream, if (tstream->fd != -1) return o_stream_temp_fd_sendv(tstream, iov, iov_count); + /* Either max_mem_size has not been reached yet, or we failed + to write to a temp file (e.g. out of disk space). */ for (i = 0; i < iov_count; i++) { if (tstream->buf->used + iov[i].iov_len > tstream->max_mem_size) { if (o_stream_temp_move_to_fd(tstream) == 0) { @@ -210,6 +285,18 @@ static bool o_stream_temp_dup_cancel(struct temp_ostream *tstream, return ret; } +static int o_stream_temp_flush(struct ostream_private *stream) +{ + struct temp_ostream *tstream = + container_of(stream, struct temp_ostream, ostream); + + if (tstream->fd_buf == NULL || tstream->fd_buf->used == 0) + return 1; + + struct const_iovec iov = { 0, 0 }; + return o_stream_temp_fd_buf_sendv(tstream, &iov, 0, TRUE) < 0 ? -1 : 1; +} + static bool o_stream_temp_dup_istream(struct temp_ostream *outstream, struct istream *instream, @@ -264,6 +351,21 @@ o_stream_temp_send_istream(struct ostream_private *_outstream, return io_stream_copy(&outstream->ostream.ostream, instream); } +static ssize_t iostream_temp_writev(int fd, const struct iovec *iov, + unsigned int iov_count) +{ + return writev(fd, iov, iov_count); +} + +void o_stream_temp_set_writev(struct ostream *output, + ssize_t (*func)(int fd, const struct iovec *iov, + unsigned int iov_count)) +{ + struct temp_ostream *tstream = + container_of(output->real_stream, struct temp_ostream, ostream); + tstream->writev = func; +} + struct ostream *iostream_temp_create(const char *temp_path_prefix, enum iostream_temp_flags flags) { @@ -290,12 +392,14 @@ struct ostream *iostream_temp_create_sized(const char *temp_path_prefix, tstream->ostream.ostream.blocking = TRUE; tstream->ostream.sendv = o_stream_temp_sendv; tstream->ostream.send_istream = o_stream_temp_send_istream; + tstream->ostream.flush = o_stream_temp_flush; tstream->ostream.iostream.close = o_stream_temp_close; tstream->temp_path_prefix = i_strdup(temp_path_prefix); tstream->flags = flags; tstream->max_mem_size = max_mem_size; tstream->buf = buffer_create_dynamic(default_pool, 8192); tstream->fd = -1; + tstream->writev = iostream_temp_writev; output = o_stream_create(&tstream->ostream, NULL, -1); tstream->name = i_strdup(name); @@ -323,14 +427,21 @@ struct istream *iostream_temp_finish(struct ostream **output, struct istream *input, *input2; uoff_t abs_offset, size; const char *for_path; - int fd; + int fd, ret; if (tstream->name[0] == '\0') for_path = ""; else for_path = t_strdup_printf(" for %s", tstream->name); - if (tstream->dupstream != NULL && !tstream->dupstream->closed) { + ret = o_stream_finish(*output); + if (ret <= 0) { + i_assert(ret < 0); + input = i_stream_create_error_str((*output)->stream_errno, "%s", + o_stream_get_error(*output)); + i_stream_set_name(input, t_strdup_printf( + "(Temp file in %s)", tstream->temp_path_prefix)); + } else if (tstream->dupstream != NULL && !tstream->dupstream->closed) { abs_offset = i_stream_get_absolute_offset(tstream->dupstream) - tstream->dupstream->v_offset + tstream->dupstream_start_offset; diff --git a/src/lib/iostream-temp.h b/src/lib/iostream-temp.h index bf5a45d15d..a6d84e2ffc 100644 --- a/src/lib/iostream-temp.h +++ b/src/lib/iostream-temp.h @@ -26,6 +26,8 @@ struct istream *iostream_temp_finish(struct ostream **output, size_t max_buffer_size); /* For internal testing: */ -int o_stream_temp_move_to_memory(struct ostream *output); +void o_stream_temp_set_writev(struct ostream *output, + ssize_t (*func)(int fd, const struct iovec *iov, + unsigned int iov_count)); #endif diff --git a/src/lib/test-iostream-temp.c b/src/lib/test-iostream-temp.c index cfb8658604..99b5b8d85a 100644 --- a/src/lib/test-iostream-temp.c +++ b/src/lib/test-iostream-temp.c @@ -7,7 +7,53 @@ #include #include +#include +static size_t test_writev_fail_at_left = 0; + +static ssize_t +test_writev_fail(int fd ATTR_UNUSED, const struct iovec *iov ATTR_UNUSED, + unsigned int iov_count ATTR_UNUSED) +{ + errno = EIO; + return -1; +} + +static ssize_t +test_writev_fail_at(int fd, const struct iovec *iov, + unsigned int iov_count) +{ + struct iovec iov_copy[iov_count]; + memcpy(iov_copy, iov, sizeof(*iov) * iov_count); + + unsigned int i; + for (i = 0; i < iov_count; i++) { + if (test_writev_fail_at_left < iov_copy[i].iov_len) { + iov_copy[i].iov_len = test_writev_fail_at_left; + test_writev_fail_at_left = 0; + i++; + break; + } + test_writev_fail_at_left -= iov_copy[i].iov_len; + } + if (i == 1 && iov_copy[0].iov_len == 0) { + errno = EIO; + return -1; + } + return writev(fd, iov_copy, i); +} + +static const char *test_iostream_temp_finish(struct ostream *output) +{ + struct istream *input = iostream_temp_finish(&output, 128); + + (void)i_stream_read(input); + size_t size; + const unsigned char *data = i_stream_get_data(input, &size); + const char *str = t_strndup(data, size); + i_stream_destroy(&input); + return str; +} static void test_iostream_temp_create_sized_memory(void) { struct ostream *output; @@ -26,7 +72,7 @@ static void test_iostream_temp_create_sized_memory(void) test_expect_no_more_errors(); test_assert(o_stream_get_fd(output) == -1); - o_stream_destroy(&output); + test_assert_strcmp(test_iostream_temp_finish(output), "12345"); test_end(); } @@ -44,31 +90,109 @@ static void test_iostream_temp_create_sized_disk(void) test_assert(o_stream_send(output, "5", 1) == 1); test_assert(output->offset == 5); test_assert(o_stream_get_fd(output) != -1); - o_stream_destroy(&output); + test_assert_strcmp(test_iostream_temp_finish(output), "12345"); test_end(); } -static void test_iostream_temp_create_write_error(void) +static void test_iostream_temp_create_sized_disk_mixed(void) { struct ostream *output; - test_begin("iostream_temp_create_sized() write error"); - output = iostream_temp_create_sized(".", 0, "test", 1); - - test_assert(o_stream_send(output, "123", 3) == 3); + test_begin("iostream_temp_create_sized() disk (mixed)"); + output = iostream_temp_create_sized(".", 0, "test", 2); + test_assert(o_stream_send(output, "12", 2) == 2); + test_assert(o_stream_get_fd(output) == -1); + test_assert(o_stream_send(output, "3", 1) == 1); test_assert(o_stream_get_fd(output) != -1); test_assert(output->offset == 3); - test_assert(o_stream_temp_move_to_memory(output) == 0); - test_assert(o_stream_get_fd(output) == -1); test_assert(o_stream_send(output, "45", 2) == 2); + test_assert(o_stream_send(output, "6", 1) == 1); + test_assert_strcmp(test_iostream_temp_finish(output), "123456"); + test_end(); +} + +static void test_iostream_temp_create_write_error_middle(void) +{ + struct ostream *output; + + test_begin("iostream_temp_create_sized() write error (middle)"); + /* 2 bytes before it's first flushed to disk + 2 bytes in-memory buffer + before more data is written to disk. */ + output = iostream_temp_create_sized(".", 0, "test", 2); + + test_assert(o_stream_send(output, "12", 2) == 2); + test_assert(o_stream_get_fd(output) == -1); + test_assert(o_stream_send(output, "34", 2) == 2); + test_assert(o_stream_get_fd(output) != -1); + test_assert(output->offset == 4); + + o_stream_temp_set_writev(output, test_writev_fail); + + test_expect_error_string("iostream-temp (temp iostream in . for test): write(.*) failed: Input/output error - moving to memory"); + test_assert(o_stream_send(output, "5", 1) == 1); + test_expect_no_more_errors(); + + test_assert(o_stream_get_fd(output) == -1); test_assert(output->offset == 5); - const unsigned char *data; - size_t size; - struct istream *input = iostream_temp_finish(&output, 128); - test_assert(i_stream_read_bytes(input, &data, &size, 5) == 1 && - memcmp(data, "12345", 5) == 0); - i_stream_destroy(&input); + test_assert(o_stream_send(output, "6", 1) == 1); + test_assert_strcmp(test_iostream_temp_finish(output), "123456"); + + test_end(); +} + +static void test_iostream_temp_create_write_error_finish(void) +{ + struct ostream *output; + + test_begin("iostream_temp_create_sized() write error (finish)"); + + output = iostream_temp_create_sized(".", 0, "test", 2); + + test_assert(o_stream_send(output, "12", 2) == 2); + test_assert(o_stream_get_fd(output) == -1); + test_assert(o_stream_send(output, "34", 2) == 2); + test_assert(o_stream_get_fd(output) != -1); + test_assert(output->offset == 4); + + o_stream_temp_set_writev(output, test_writev_fail); + + test_expect_error_string("iostream-temp (temp iostream in . for test): write(.*) failed: Input/output error - moving to memory"); + test_assert_strcmp(test_iostream_temp_finish(output), "1234"); + test_expect_no_more_errors(); + + test_end(); +} + +static void test_iostream_temp_create_write_error_mixed(void) +{ + struct ostream *output; + + test_begin("iostream_temp_create_sized() write error (mixed)"); + + for (unsigned int i = 0; i < 5; i++) { + output = iostream_temp_create_sized(".", 0, "test", 2); + test_assert_idx(o_stream_send(output, "12", 2) == 2, i); + test_assert_idx(o_stream_get_fd(output) == -1, i); + test_assert_idx(o_stream_send(output, "3", 1) == 1, i); + test_assert_idx(o_stream_get_fd(output) != -1, i); + test_assert_idx(output->offset == 3, i); + + struct const_iovec iov[2] = { + { "45", 2 }, + { "67", 2 }, + }; + test_writev_fail_at_left = i; + o_stream_temp_set_writev(output, test_writev_fail_at); + + test_expect_error_string("iostream-temp (temp iostream in . for test): write(.*) failed: Input/output error - moving to memory"); + test_assert_idx(o_stream_sendv(output, iov, 2) == 4, i); + test_expect_no_more_errors(); + + test_assert_idx(o_stream_get_fd(output) == -1, i); + test_assert_idx(output->offset == 7, i); + test_assert_strcmp_idx(test_iostream_temp_finish(output), "1234567", i); + } test_end(); } @@ -145,6 +269,9 @@ void test_iostream_temp(void) { test_iostream_temp_create_sized_memory(); test_iostream_temp_create_sized_disk(); - test_iostream_temp_create_write_error(); + test_iostream_temp_create_sized_disk_mixed(); + test_iostream_temp_create_write_error_middle(); + test_iostream_temp_create_write_error_finish(); + test_iostream_temp_create_write_error_mixed(); test_iostream_temp_istream(); }