#include "iostream-temp.h"
#include <unistd.h>
+#include <sys/uio.h>
#define IOSTREAM_TEMP_MAX_BUF_SIZE_DEFAULT (1024*128)
enum iostream_temp_flags flags;
size_t max_mem_size;
+ /* writev() wrapper - overridden for unit tests */
+ ssize_t (*writev)(int fd, const struct iovec *iov,
+ unsigned int iov_count);
+
struct istream *dupstream;
uoff_t dupstream_offset, dupstream_start_offset;
char *name;
buffer_t *buf;
+ buffer_t *fd_buf;
int fd;
bool fd_tried;
uoff_t fd_size;
i_close_fd(&tstream->fd);
buffer_free(&tstream->buf);
+ buffer_free(&tstream->fd_buf);
i_free(tstream->temp_path_prefix);
i_free(tstream->name);
}
e.g. for unit tests */
tstream->ostream.fd = tstream->fd;
tstream->fd_size = tstream->buf->used;
+ /* max_mem_size is smaller than IO_BLOCK_SIZE only in unit tests */
+ size_t fd_buf_size = I_MIN(IO_BLOCK_SIZE, tstream->max_mem_size);
+ tstream->fd_buf = buffer_create_dynamic_max(default_pool,
+ fd_buf_size, fd_buf_size);
buffer_free(&tstream->buf);
return 0;
}
-int o_stream_temp_move_to_memory(struct ostream *output)
+static int o_stream_temp_move_to_memory(struct ostream *output)
{
struct temp_ostream *tstream =
container_of(output->real_stream, struct temp_ostream, ostream);
i_assert(tstream->buf == NULL);
tstream->buf = buffer_create_dynamic(default_pool, 8192);
- while (offset < tstream->ostream.ostream.offset &&
- (ret = pread(tstream->fd, buf, sizeof(buf), offset)) > 0) {
- if ((size_t)ret > tstream->ostream.ostream.offset - offset)
- ret = tstream->ostream.ostream.offset - offset;
+ while ((ret = pread(tstream->fd, buf, sizeof(buf), offset)) > 0) {
buffer_append(tstream->buf, buf, ret);
offset += ret;
}
}
static ssize_t
-o_stream_temp_fd_sendv(struct temp_ostream *tstream,
- const struct const_iovec *iov, unsigned int iov_count)
+o_stream_temp_fd_buf_sendv(struct temp_ostream *tstream,
+ const struct const_iovec *iov,
+ unsigned int iov_count, bool flush)
{
- size_t bytes = 0;
- unsigned int i;
+ i_assert(tstream->buf == NULL);
- for (i = 0; i < iov_count; i++) {
- if (write_full(tstream->fd, iov[i].iov_base, iov[i].iov_len) < 0) {
- i_error("iostream-temp %s: write(%s*) failed: %m - moving to memory",
- o_stream_get_name(&tstream->ostream.ostream),
- tstream->temp_path_prefix);
- if (o_stream_temp_move_to_memory(&tstream->ostream.ostream) < 0)
- return -1;
- for (; i < iov_count; i++) {
- buffer_append(tstream->buf, iov[i].iov_base, iov[i].iov_len);
- bytes += iov[i].iov_len;
- tstream->ostream.ostream.offset += iov[i].iov_len;
- }
- i_assert(tstream->fd_tried);
- return bytes;
+ /* if the amount of data fits into fd_buf, put it there */
+ size_t total_size = 0;
+ for (unsigned int i = 0; i < iov_count; i++)
+ total_size += iov[i].iov_len;
+ if (total_size + tstream->fd_buf->used <=
+ buffer_get_writable_size(tstream->fd_buf) && !flush) {
+ for (unsigned int i = 0; i < iov_count; i++) {
+ buffer_append(tstream->fd_buf, iov[i].iov_base,
+ iov[i].iov_len);
+ }
+ return total_size;
+ }
+
+ struct const_iovec iov_copy[iov_count + 1];
+ struct const_iovec *new_iov = iov_copy;
+
+ /* Use writev() to send all the pieces */
+ size_t fd_buf_used = tstream->fd_buf->used;
+ if (fd_buf_used == 0)
+ memcpy(new_iov, iov, sizeof(*new_iov) * iov_count);
+ else {
+ /* Create iovec that is prefixed by the already buffered data */
+ new_iov[0].iov_base = tstream->fd_buf->data;
+ new_iov[0].iov_len = tstream->fd_buf->used;
+ memcpy(new_iov + 1, iov, sizeof(*new_iov) * iov_count);
+ iov_count++;
+ }
+
+ size_t bytes_sent = 0;
+ ssize_t ret;
+ while ((ret = tstream->writev(tstream->fd,
+ (const struct iovec *)new_iov,
+ I_MIN(iov_count, IOV_MAX))) > 0) {
+ bytes_sent += ret;
+ i_assert(bytes_sent <= total_size + fd_buf_used);
+ if (bytes_sent == total_size + fd_buf_used)
+ break;
+ /* partial write, try again */
+ while ((size_t)ret >= new_iov->iov_len) {
+ ret -= new_iov->iov_len;
+ new_iov++;
+ iov_count--;
+ i_assert(iov_count > 0);
}
- bytes += iov[i].iov_len;
- tstream->ostream.ostream.offset += iov[i].iov_len;
+ new_iov->iov_len -= ret;
+ new_iov->iov_base = CONST_PTR_OFFSET(new_iov->iov_base, ret);
}
- tstream->fd_size += bytes;
- return bytes;
+ if (ret == 0) {
+ /* shouldn't happen - assume it's out of disk space */
+ errno = ENOSPC;
+ ret = -1;
+ }
+ if (ret < 0) {
+ i_error("iostream-temp %s: write(%s*) failed: %m - moving to memory",
+ o_stream_get_name(&tstream->ostream.ostream),
+ tstream->temp_path_prefix);
+ if (o_stream_temp_move_to_memory(&tstream->ostream.ostream) < 0)
+ return -1;
+ for (unsigned int i = 0; i < iov_count; i++) {
+ buffer_append(tstream->buf, new_iov[i].iov_base,
+ new_iov[i].iov_len);
+ bytes_sent += new_iov[i].iov_len;
+ }
+ i_assert(bytes_sent == total_size + fd_buf_used);
+ buffer_free(&tstream->fd_buf);
+ } else {
+ buffer_set_used_size(tstream->fd_buf, 0);
+ }
+ return total_size;
+}
+
+static ssize_t
+o_stream_temp_fd_sendv(struct temp_ostream *tstream,
+ const struct const_iovec *iov, unsigned int iov_count)
+{
+ i_assert(tstream->fd_buf != NULL);
+ /* max_mem_size has been reached, and we're writing to a temp file. */
+ ssize_t ret = o_stream_temp_fd_buf_sendv(tstream, iov, iov_count, FALSE);
+ if (ret < 0)
+ return -1;
+
+ if (tstream->fd != -1)
+ tstream->fd_size += ret;
+ tstream->ostream.ostream.offset += ret;
+ return ret;
}
static ssize_t
if (tstream->fd != -1)
return o_stream_temp_fd_sendv(tstream, iov, iov_count);
+ /* Either max_mem_size has not been reached yet, or we failed
+ to write to a temp file (e.g. out of disk space). */
for (i = 0; i < iov_count; i++) {
if (tstream->buf->used + iov[i].iov_len > tstream->max_mem_size) {
if (o_stream_temp_move_to_fd(tstream) == 0) {
return ret;
}
+static int o_stream_temp_flush(struct ostream_private *stream)
+{
+ struct temp_ostream *tstream =
+ container_of(stream, struct temp_ostream, ostream);
+
+ if (tstream->fd_buf == NULL || tstream->fd_buf->used == 0)
+ return 1;
+
+ struct const_iovec iov = { 0, 0 };
+ return o_stream_temp_fd_buf_sendv(tstream, &iov, 0, TRUE) < 0 ? -1 : 1;
+}
+
static bool
o_stream_temp_dup_istream(struct temp_ostream *outstream,
struct istream *instream,
return io_stream_copy(&outstream->ostream.ostream, instream);
}
+static ssize_t iostream_temp_writev(int fd, const struct iovec *iov,
+ unsigned int iov_count)
+{
+ return writev(fd, iov, iov_count);
+}
+
+void o_stream_temp_set_writev(struct ostream *output,
+ ssize_t (*func)(int fd, const struct iovec *iov,
+ unsigned int iov_count))
+{
+ struct temp_ostream *tstream =
+ container_of(output->real_stream, struct temp_ostream, ostream);
+ tstream->writev = func;
+}
+
struct ostream *iostream_temp_create(const char *temp_path_prefix,
enum iostream_temp_flags flags)
{
tstream->ostream.ostream.blocking = TRUE;
tstream->ostream.sendv = o_stream_temp_sendv;
tstream->ostream.send_istream = o_stream_temp_send_istream;
+ tstream->ostream.flush = o_stream_temp_flush;
tstream->ostream.iostream.close = o_stream_temp_close;
tstream->temp_path_prefix = i_strdup(temp_path_prefix);
tstream->flags = flags;
tstream->max_mem_size = max_mem_size;
tstream->buf = buffer_create_dynamic(default_pool, 8192);
tstream->fd = -1;
+ tstream->writev = iostream_temp_writev;
output = o_stream_create(&tstream->ostream, NULL, -1);
tstream->name = i_strdup(name);
struct istream *input, *input2;
uoff_t abs_offset, size;
const char *for_path;
- int fd;
+ int fd, ret;
if (tstream->name[0] == '\0')
for_path = "";
else
for_path = t_strdup_printf(" for %s", tstream->name);
- if (tstream->dupstream != NULL && !tstream->dupstream->closed) {
+ ret = o_stream_finish(*output);
+ if (ret <= 0) {
+ i_assert(ret < 0);
+ input = i_stream_create_error_str((*output)->stream_errno, "%s",
+ o_stream_get_error(*output));
+ i_stream_set_name(input, t_strdup_printf(
+ "(Temp file in %s)", tstream->temp_path_prefix));
+ } else if (tstream->dupstream != NULL && !tstream->dupstream->closed) {
abs_offset = i_stream_get_absolute_offset(tstream->dupstream) -
tstream->dupstream->v_offset +
tstream->dupstream_start_offset;
#include <unistd.h>
#include <fcntl.h>
+#include <sys/uio.h>
+static size_t test_writev_fail_at_left = 0;
+
+static ssize_t
+test_writev_fail(int fd ATTR_UNUSED, const struct iovec *iov ATTR_UNUSED,
+ unsigned int iov_count ATTR_UNUSED)
+{
+ errno = EIO;
+ return -1;
+}
+
+static ssize_t
+test_writev_fail_at(int fd, const struct iovec *iov,
+ unsigned int iov_count)
+{
+ struct iovec iov_copy[iov_count];
+ memcpy(iov_copy, iov, sizeof(*iov) * iov_count);
+
+ unsigned int i;
+ for (i = 0; i < iov_count; i++) {
+ if (test_writev_fail_at_left < iov_copy[i].iov_len) {
+ iov_copy[i].iov_len = test_writev_fail_at_left;
+ test_writev_fail_at_left = 0;
+ i++;
+ break;
+ }
+ test_writev_fail_at_left -= iov_copy[i].iov_len;
+ }
+ if (i == 1 && iov_copy[0].iov_len == 0) {
+ errno = EIO;
+ return -1;
+ }
+ return writev(fd, iov_copy, i);
+}
+
+static const char *test_iostream_temp_finish(struct ostream *output)
+{
+ struct istream *input = iostream_temp_finish(&output, 128);
+
+ (void)i_stream_read(input);
+ size_t size;
+ const unsigned char *data = i_stream_get_data(input, &size);
+ const char *str = t_strndup(data, size);
+ i_stream_destroy(&input);
+ return str;
+}
static void test_iostream_temp_create_sized_memory(void)
{
struct ostream *output;
test_expect_no_more_errors();
test_assert(o_stream_get_fd(output) == -1);
- o_stream_destroy(&output);
+ test_assert_strcmp(test_iostream_temp_finish(output), "12345");
test_end();
}
test_assert(o_stream_send(output, "5", 1) == 1);
test_assert(output->offset == 5);
test_assert(o_stream_get_fd(output) != -1);
- o_stream_destroy(&output);
+ test_assert_strcmp(test_iostream_temp_finish(output), "12345");
test_end();
}
-static void test_iostream_temp_create_write_error(void)
+static void test_iostream_temp_create_sized_disk_mixed(void)
{
struct ostream *output;
- test_begin("iostream_temp_create_sized() write error");
- output = iostream_temp_create_sized(".", 0, "test", 1);
-
- test_assert(o_stream_send(output, "123", 3) == 3);
+ test_begin("iostream_temp_create_sized() disk (mixed)");
+ output = iostream_temp_create_sized(".", 0, "test", 2);
+ test_assert(o_stream_send(output, "12", 2) == 2);
+ test_assert(o_stream_get_fd(output) == -1);
+ test_assert(o_stream_send(output, "3", 1) == 1);
test_assert(o_stream_get_fd(output) != -1);
test_assert(output->offset == 3);
- test_assert(o_stream_temp_move_to_memory(output) == 0);
- test_assert(o_stream_get_fd(output) == -1);
test_assert(o_stream_send(output, "45", 2) == 2);
+ test_assert(o_stream_send(output, "6", 1) == 1);
+ test_assert_strcmp(test_iostream_temp_finish(output), "123456");
+ test_end();
+}
+
+static void test_iostream_temp_create_write_error_middle(void)
+{
+ struct ostream *output;
+
+ test_begin("iostream_temp_create_sized() write error (middle)");
+ /* 2 bytes before it's first flushed to disk + 2 bytes in-memory buffer
+ before more data is written to disk. */
+ output = iostream_temp_create_sized(".", 0, "test", 2);
+
+ test_assert(o_stream_send(output, "12", 2) == 2);
+ test_assert(o_stream_get_fd(output) == -1);
+ test_assert(o_stream_send(output, "34", 2) == 2);
+ test_assert(o_stream_get_fd(output) != -1);
+ test_assert(output->offset == 4);
+
+ o_stream_temp_set_writev(output, test_writev_fail);
+
+ test_expect_error_string("iostream-temp (temp iostream in . for test): write(.*) failed: Input/output error - moving to memory");
+ test_assert(o_stream_send(output, "5", 1) == 1);
+ test_expect_no_more_errors();
+
+ test_assert(o_stream_get_fd(output) == -1);
test_assert(output->offset == 5);
- const unsigned char *data;
- size_t size;
- struct istream *input = iostream_temp_finish(&output, 128);
- test_assert(i_stream_read_bytes(input, &data, &size, 5) == 1 &&
- memcmp(data, "12345", 5) == 0);
- i_stream_destroy(&input);
+ test_assert(o_stream_send(output, "6", 1) == 1);
+ test_assert_strcmp(test_iostream_temp_finish(output), "123456");
+
+ test_end();
+}
+
+static void test_iostream_temp_create_write_error_finish(void)
+{
+ struct ostream *output;
+
+ test_begin("iostream_temp_create_sized() write error (finish)");
+
+ output = iostream_temp_create_sized(".", 0, "test", 2);
+
+ test_assert(o_stream_send(output, "12", 2) == 2);
+ test_assert(o_stream_get_fd(output) == -1);
+ test_assert(o_stream_send(output, "34", 2) == 2);
+ test_assert(o_stream_get_fd(output) != -1);
+ test_assert(output->offset == 4);
+
+ o_stream_temp_set_writev(output, test_writev_fail);
+
+ test_expect_error_string("iostream-temp (temp iostream in . for test): write(.*) failed: Input/output error - moving to memory");
+ test_assert_strcmp(test_iostream_temp_finish(output), "1234");
+ test_expect_no_more_errors();
+
+ test_end();
+}
+
+static void test_iostream_temp_create_write_error_mixed(void)
+{
+ struct ostream *output;
+
+ test_begin("iostream_temp_create_sized() write error (mixed)");
+
+ for (unsigned int i = 0; i < 5; i++) {
+ output = iostream_temp_create_sized(".", 0, "test", 2);
+ test_assert_idx(o_stream_send(output, "12", 2) == 2, i);
+ test_assert_idx(o_stream_get_fd(output) == -1, i);
+ test_assert_idx(o_stream_send(output, "3", 1) == 1, i);
+ test_assert_idx(o_stream_get_fd(output) != -1, i);
+ test_assert_idx(output->offset == 3, i);
+
+ struct const_iovec iov[2] = {
+ { "45", 2 },
+ { "67", 2 },
+ };
+ test_writev_fail_at_left = i;
+ o_stream_temp_set_writev(output, test_writev_fail_at);
+
+ test_expect_error_string("iostream-temp (temp iostream in . for test): write(.*) failed: Input/output error - moving to memory");
+ test_assert_idx(o_stream_sendv(output, iov, 2) == 4, i);
+ test_expect_no_more_errors();
+
+ test_assert_idx(o_stream_get_fd(output) == -1, i);
+ test_assert_idx(output->offset == 7, i);
+ test_assert_strcmp_idx(test_iostream_temp_finish(output), "1234567", i);
+ }
test_end();
}
{
test_iostream_temp_create_sized_memory();
test_iostream_temp_create_sized_disk();
- test_iostream_temp_create_write_error();
+ test_iostream_temp_create_sized_disk_mixed();
+ test_iostream_temp_create_write_error_middle();
+ test_iostream_temp_create_write_error_finish();
+ test_iostream_temp_create_write_error_mixed();
test_iostream_temp_istream();
}