]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lib: Add istream-try
authorTimo Sirainen <timo.sirainen@dovecot.fi>
Fri, 6 Oct 2017 15:31:12 +0000 (18:31 +0300)
committerTimo Sirainen <timo.sirainen@dovecot.fi>
Thu, 12 Oct 2017 15:26:44 +0000 (18:26 +0300)
This can be used to automatically detect the underlying istream format from
a given list of choices.

src/lib/Makefile.am
src/lib/istream-try.c [new file with mode: 0644]
src/lib/istream-try.h [new file with mode: 0644]
src/lib/test-istream-try.c [new file with mode: 0644]
src/lib/test-lib.c
src/lib/test-lib.h

index 5d16307cde2c3978143c71af2ba4b4da4b1b5c66..7aa0200cd672d3ab55da483c03cd138e2cd6f859 100644 (file)
@@ -77,6 +77,7 @@ liblib_la_SOURCES = \
        istream-seekable.c \
        istream-sized.c \
        istream-tee.c \
+       istream-try.c \
        istream-timeout.c \
        istream-unix.c \
        ioloop.c \
@@ -228,6 +229,7 @@ headers = \
        istream-seekable.h \
        istream-sized.h \
        istream-tee.h \
+       istream-try.h \
        istream-timeout.h \
        istream-unix.h \
        ioloop.h \
@@ -347,6 +349,7 @@ test_lib_SOURCES = \
        test-istream-multiplex.c \
        test-istream-seekable.c \
        test-istream-tee.c \
+       test-istream-try.c \
        test-istream-unix.c \
        test-json-parser.c \
        test-json-tree.c \
diff --git a/src/lib/istream-try.c b/src/lib/istream-try.c
new file mode 100644 (file)
index 0000000..d865efd
--- /dev/null
@@ -0,0 +1,144 @@
+/* Copyright (c) 2013-2017 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "istream-private.h"
+#include "istream-try.h"
+
+struct try_istream {
+       struct istream_private istream;
+
+       unsigned int try_input_count;
+       struct istream **try_input;
+       unsigned int try_idx;
+
+       struct istream *final_input;
+};
+
+static void i_stream_unref_try_inputs(struct try_istream *tstream)
+{
+       for (unsigned int i = 0; i < tstream->try_input_count; i++) {
+               if (tstream->try_input[i] != NULL)
+                       i_stream_unref(&tstream->try_input[i]);
+       }
+       tstream->try_input_count = 0;
+       i_free(tstream->try_input);
+}
+
+static void i_stream_try_close(struct iostream_private *stream,
+                              bool close_parent)
+{
+       struct try_istream *tstream = (struct try_istream *)stream;
+
+       if (close_parent) {
+               if (tstream->istream.parent != NULL)
+                       i_stream_close(tstream->istream.parent);
+               for (unsigned int i = 0; i < tstream->try_input_count; i++) {
+                       if (tstream->try_input[i] != NULL)
+                               i_stream_close(tstream->try_input[i]);
+               }
+       }
+       i_stream_unref_try_inputs(tstream);
+}
+
+static bool i_stream_try_is_buffer_full(struct istream *try_input)
+{
+       /* See if one of the parent istreams have their buffer full.
+          This is mainly intended to check with istream-tee whether its
+          parent is full. That means that the try_input has already seen
+          a full buffer of input, but it hasn't decided to return anything
+          yet. But it also hasn't failed, so we'll assume that the input is
+          correct for it and it simply needs a lot more input before it can
+          return anything (e.g. istream-bzlib). */
+       while (try_input->real_stream->parent != NULL) {
+               try_input = try_input->real_stream->parent;
+               if (try_input->real_stream->pos == try_input->real_stream->buffer_size &&
+                   try_input->real_stream->buffer_size > 0)
+                       return TRUE;
+       }
+       return FALSE;
+}
+
+static int i_stream_try_detect(struct try_istream *tstream)
+{
+       int ret;
+
+       for (; tstream->try_idx < tstream->try_input_count; tstream->try_idx++) {
+               struct istream *try_input =
+                       tstream->try_input[tstream->try_idx];
+
+               ret = i_stream_read(try_input);
+               if (ret == 0 && i_stream_try_is_buffer_full(try_input))
+                       ret = 1;
+               if (ret > 0) {
+                       i_stream_init_parent(&tstream->istream, try_input);
+                       i_stream_unref_try_inputs(tstream);
+                       return 1;
+               }
+               if (ret == 0)
+                       return 0;
+               if (try_input->stream_errno != EINVAL) {
+                       tstream->istream.istream.stream_errno =
+                               try_input->stream_errno;
+                       io_stream_set_error(&tstream->istream.iostream,
+                               "Unexpected error while detecting stream format: %s",
+                               i_stream_get_error(try_input));
+                       return -1;
+               }
+       }
+
+       /* All streams failed with EINVAL. */
+       io_stream_set_error(&tstream->istream.iostream,
+                           "Failed to detect stream format");
+       tstream->istream.istream.stream_errno = EINVAL;
+       return -1;
+}
+
+static ssize_t
+i_stream_try_read(struct istream_private *stream)
+{
+       struct try_istream *tstream = (struct try_istream *)stream;
+       int ret;
+
+       if (stream->parent == NULL) {
+               if ((ret = i_stream_try_detect(tstream)) <= 0)
+                       return ret;
+       }
+
+       i_stream_seek(stream->parent, stream->parent_start_offset +
+                     stream->istream.v_offset);
+       return i_stream_read_copy_from_parent(&stream->istream);
+}
+
+struct istream *istream_try_create(struct istream *const input[])
+{
+       struct try_istream *tstream;
+       unsigned int count;
+       size_t max_buffer_size = I_STREAM_MIN_SIZE;
+       bool blocking = TRUE, seekable = TRUE;
+
+       for (count = 0; input[count] != NULL; count++) {
+               max_buffer_size = I_MAX(max_buffer_size,
+                                       i_stream_get_max_buffer_size(input[count]));
+               if (!input[count]->blocking)
+                       blocking = FALSE;
+               if (!input[count]->seekable)
+                       seekable = FALSE;
+               i_stream_ref(input[count]);
+       }
+       i_assert(count != 0);
+
+       tstream = i_new(struct try_istream, 1);
+       tstream->try_input_count = count;
+       tstream->try_input = p_memdup(default_pool, input,
+                                     sizeof(*input) * count);
+
+       tstream->istream.iostream.close = i_stream_try_close;
+
+       tstream->istream.max_buffer_size = max_buffer_size;
+       tstream->istream.read = i_stream_try_read;
+
+       tstream->istream.istream.readable_fd = FALSE;
+       tstream->istream.istream.blocking = blocking;
+       tstream->istream.istream.seekable = seekable;
+       return i_stream_create(&tstream->istream, NULL, -1);
+}
diff --git a/src/lib/istream-try.h b/src/lib/istream-try.h
new file mode 100644 (file)
index 0000000..f2040b3
--- /dev/null
@@ -0,0 +1,14 @@
+#ifndef ISTREAM_TRY_H
+#define ISTREAM_TRY_H
+
+/* Read from the first input stream that doesn't fail with EINVAL. If any of
+   the streams fail with non-EINVAL, it's treated as a fatal failure and the
+   error is immediately returned. If a stream returns 0, more data is waited
+   for before continuing to the next stream. This allows the last stream to
+   be a fallback stream that always succeeds.
+
+   Once the stream is detected, all the other streams are unreferenced.
+   The streams should usually be children of the same parent tee-istream. */
+struct istream *istream_try_create(struct istream *const input[]);
+
+#endif
diff --git a/src/lib/test-istream-try.c b/src/lib/test-istream-try.c
new file mode 100644 (file)
index 0000000..a9623aa
--- /dev/null
@@ -0,0 +1,130 @@
+/* Copyright (c) 2009-2017 Dovecot authors, see the included COPYING file */
+
+#include "test-lib.h"
+#include "istream.h"
+#include "istream-try.h"
+
+void test_istream_try(void)
+{
+       bool finished = FALSE;
+
+       test_begin("istream try");
+       for (unsigned int test = 0; test <= 10; test++) {
+               struct istream *test_inputs[3], *try_input;
+
+               test_inputs[0] = test_istream_create("1");
+               test_inputs[1] = test_istream_create("2");
+               test_inputs[2] = NULL;
+               test_istream_set_size(test_inputs[0], 0);
+               test_istream_set_size(test_inputs[1], 0);
+               try_input = istream_try_create(test_inputs);
+
+               /* nonblocking read */
+               test_assert_idx(i_stream_read(try_input) == 0, test);
+
+               switch (test) {
+               case 0:
+                       /* stream 0 is available */
+                       test_istream_set_size(test_inputs[0], 1);
+                       test_assert_idx(i_stream_read(try_input) == 1, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[0]) == 1, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 0, test);
+                       break;
+               case 1:
+                       /* stream 1 is available, but not used before 0 */
+                       test_istream_set_size(test_inputs[1], 1);
+                       test_assert_idx(i_stream_read(try_input) == 0, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[0]) == 0, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 0, test);
+                       /* continue failing stream 0 -> 1 is available */
+                       test_inputs[0]->stream_errno = EINVAL;
+                       test_assert_idx(i_stream_read(try_input) == 1, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[0]) == 0, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 1, test);
+                       break;
+               case 2:
+                       /* both streams are available - stream 0 is read */
+                       test_istream_set_size(test_inputs[0], 1);
+                       test_istream_set_size(test_inputs[1], 1);
+                       test_assert_idx(i_stream_read(try_input) == 1, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[0]) == 1, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 0, test);
+                       break;
+               case 3:
+                       /* stream 0 fails */
+                       test_inputs[0]->stream_errno = EINVAL;
+                       test_assert_idx(i_stream_read(try_input) == 0, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[0]) == 0, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 0, test);
+                       /* continue making stream 1 available */
+                       test_istream_set_size(test_inputs[1], 1);
+                       test_assert_idx(i_stream_read(try_input) == 1, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[0]) == 0, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 1, test);
+                       break;
+               case 4:
+                       /* stream 1 fails */
+                       test_inputs[1]->stream_errno = EINVAL;
+                       test_assert_idx(i_stream_read(try_input) == 0, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[0]) == 0, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 0, test);
+                       break;
+               case 5:
+                       /* stream 0 fails, stream 1 is available */
+                       test_inputs[0]->stream_errno = EINVAL;
+                       test_istream_set_size(test_inputs[1], 1);
+                       test_assert_idx(i_stream_read(try_input) == 1, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[0]) == 0, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 1, test);
+                       break;
+               case 6:
+                       /* stream 0 is available, stream 1 fails */
+                       test_inputs[1]->stream_errno = EINVAL;
+                       test_istream_set_size(test_inputs[0], 1);
+                       test_assert_idx(i_stream_read(try_input) == 1, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[0]) == 1, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 0, test);
+                       break;
+               case 7:
+                       /* both streams fail */
+                       test_inputs[0]->stream_errno = EINVAL;
+                       test_inputs[1]->stream_errno = EINVAL;
+                       test_assert_idx(i_stream_read(try_input) == -1, test);
+                       test_assert_idx(try_input->stream_errno == EINVAL, test);
+                       break;
+               case 8:
+                       /* stream 0 fails with EINVAL, stream 1 with EIO */
+                       test_inputs[0]->stream_errno = EINVAL;
+                       test_inputs[1]->stream_errno = EIO;
+                       test_assert_idx(i_stream_read(try_input) == -1, test);
+                       test_assert_idx(try_input->stream_errno == EIO, test);
+                       break;
+               case 9:
+                       /* stream 0 fails with EIO, stream 1 with EINVAL */
+                       test_inputs[0]->stream_errno = EIO;
+                       test_inputs[1]->stream_errno = EINVAL;
+                       test_assert_idx(i_stream_read(try_input) == -1, test);
+                       test_assert_idx(try_input->stream_errno == EIO, test);
+                       break;
+               case 10:
+                       /* stream 0 fails with EIO, stream 1 would work.. */
+                       test_inputs[0]->stream_errno = EIO;
+                       test_istream_set_size(test_inputs[1], 1);
+                       test_assert_idx(i_stream_read(try_input) == -1, test);
+                       test_assert_idx(try_input->stream_errno == EIO, test);
+                       test_assert_idx(i_stream_get_data_size(test_inputs[1]) == 0, test);
+
+                       finished = TRUE;
+                       break;
+               }
+
+               test_assert_idx(test_inputs[0]->v_offset == 0, test);
+               test_assert_idx(test_inputs[1]->v_offset == 0, test);
+
+               i_stream_unref(&test_inputs[0]);
+               i_stream_unref(&test_inputs[1]);
+               i_stream_unref(&try_input);
+       }
+       i_assert(finished);
+       test_end();
+}
index 583feb3d4fc2455e144721a32a28c84c6dea7ed0..7157ae0701a277b64d09b4721c39a9d6e3ccf2c5 100644 (file)
@@ -36,6 +36,7 @@ int main(void)
                test_istream_multiplex,
                test_istream_seekable,
                test_istream_tee,
+               test_istream_try,
                test_istream_unix,
                test_json_parser,
                test_json_tree,
index b667a090fc4a75ee6c945e6da3759c4f57dfc9f9..cb6efc82490941e37602a7e75bc97369c13f9188 100644 (file)
@@ -37,6 +37,7 @@ void test_istream_failure_at(void);
 void test_istream_multiplex(void);
 void test_istream_seekable(void);
 void test_istream_tee(void);
+void test_istream_try(void);
 void test_istream_unix(void);
 void test_json_parser(void);
 void test_json_tree(void);