]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lib: iostream-pump: Properly implement running a pump with one of the streams blocking.
authorStephan Bosch <stephan.bosch@dovecot.fi>
Sun, 25 Feb 2018 20:45:17 +0000 (21:45 +0100)
committerAki Tuomi <aki.tuomi@dovecot.fi>
Sun, 18 Mar 2018 10:53:18 +0000 (12:53 +0200)
Having both streams blocking is not useful and that is now explicitly forbidden.

src/lib/iostream-pump.c
src/lib/test-iostream-pump.c

index 85fe8eb405449ac02f9deb935d8e6f27d652ef59..dee0687522af61e3ecb9be019da23c1e917ee7c4 100644 (file)
@@ -51,6 +51,7 @@ static void iostream_pump_copy(struct iostream_pump *pump)
                               pump->context);
                return;
        case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
+               i_assert(!pump->output->blocking);
                pump->waiting_output = TRUE;
                io_remove(&pump->io);
                return;
@@ -74,6 +75,7 @@ static void iostream_pump_copy(struct iostream_pump *pump)
                }
                return;
        case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
+               i_assert(!pump->input->blocking);
                pump->waiting_output = FALSE;
                return;
        }
@@ -97,7 +99,9 @@ static int iostream_pump_flush(struct iostream_pump *pump)
                return 1;
        }
 
-       if (pump->io == NULL) {
+       if (pump->input->blocking)
+               iostream_pump_copy(pump);
+       else if (pump->io == NULL) {
                pump->io = io_add_istream(pump->input,
                                          iostream_pump_copy, pump);
                io_set_pending(pump->io);
@@ -112,6 +116,7 @@ iostream_pump_create(struct istream *input, struct ostream *output)
 
        i_assert(input != NULL &&
                 output != NULL);
+       i_assert(!input->blocking || !output->blocking);
 
        /* ref streams */
        i_stream_ref(input);
@@ -132,13 +137,20 @@ void iostream_pump_start(struct iostream_pump *pump)
        i_assert(pump->callback != NULL);
 
        /* add flush handler */
-       o_stream_set_flush_callback(pump->output, iostream_pump_flush, pump);
+       if (!pump->output->blocking) {
+               o_stream_set_flush_callback(pump->output,
+                                           iostream_pump_flush, pump);
+       }
 
        /* make IO objects */
-       pump->io = io_add_istream(pump->input, iostream_pump_copy, pump);
-
-       /* make sure we do first read right away */
-       io_set_pending(pump->io);
+       if (pump->input->blocking) {
+               i_assert(!pump->output->blocking);
+               o_stream_set_flush_pending(pump->output, TRUE);
+       } else {
+               pump->io = io_add_istream(pump->input,
+                                         iostream_pump_copy, pump);
+               io_set_pending(pump->io);
+       }
 }
 
 struct istream *iostream_pump_get_input(struct iostream_pump *pump)
index 55522e4073ab12d0f0b98f503f1d8f7cd0285e7d..2a5aea687a8010ed2d41fb0736a24e675b20b30e 100644 (file)
@@ -42,6 +42,8 @@ static void pump_nonblocking_timeout(struct nonblock_ctx *ctx)
                break;
        case 1:
                /* allow more input */
+               if (ctx->in->blocking)
+                       break;
                if (ctx->pos/4 == ctx->max_size+1)
                        test_istream_set_allow_eof(ctx->in, TRUE);
                else
@@ -53,6 +55,8 @@ static void pump_nonblocking_timeout(struct nonblock_ctx *ctx)
        case 3: {
                /* allow more output. give always one byte less than the
                   input size so there's something in internal buffer. */
+               if (ctx->out->blocking)
+                       break;
                size_t size = ctx->pos/4;
                if (size > 0)
                        test_ostream_set_max_output_size(ctx->out, size-1);
@@ -76,7 +80,11 @@ run_pump(struct istream *in, struct ostream *out, int *counter,
                test_assert(i_stream_get_size(in, TRUE, &ctx.max_size) > 0);
                test_istream_set_size(in, 0);
                test_istream_set_allow_eof(in, FALSE);
+       }
+       if (!out->blocking) {
                test_ostream_set_max_output_size(out, 0);
+       }
+       if (!in->blocking || !out->blocking) {
                to2 = timeout_add_short(0, pump_nonblocking_timeout, &ctx);
        }
 
@@ -98,7 +106,7 @@ run_pump(struct istream *in, struct ostream *out, int *counter,
 
        test_assert(*counter == 0);
 
-       if (!ctx.in->blocking && ctx.in->stream_errno != 0 &&
+       if (!ctx.out->blocking && ctx.in->stream_errno != 0 &&
            ctx.out->stream_errno == 0) {
                /* input failed, finish flushing output */
                test_ostream_set_max_output_size(ctx.out, (size_t)-1);
@@ -111,37 +119,39 @@ run_pump(struct istream *in, struct ostream *out, int *counter,
 
        iostream_pump_unref(&pump);
        io_loop_destroy(&ioloop);
-
        return ret;
 }
 
 static void
-test_iostream_setup(bool block, struct istream **in_r,
-                   struct ostream **out_r, buffer_t **out_buffer_r)
+test_iostream_setup(bool in_block, bool out_block,
+                   struct istream **in_r, struct ostream **out_r,
+                   buffer_t **out_buffer_r)
 {
        *out_buffer_r = t_buffer_create(128);
 
        *in_r = test_istream_create_data(data, sizeof(data));
-       (*in_r)->blocking = block;
+       (*in_r)->blocking = in_block;
 
-       if (block)
+       if (out_block)
                *out_r = test_ostream_create(*out_buffer_r);
        else
                *out_r = test_ostream_create_nonblocking(*out_buffer_r, 1);
 }
 
 static void
-test_iostream_pump_simple(bool block)
+test_iostream_pump_simple(bool in_block, bool out_block)
 {
        int counter;
        struct istream *in;
        struct ostream *out;
        buffer_t *buffer;
 
-       test_begin(t_strdup_printf("iostream_pump (%sblocking)",
-                                  block ? "" : "non-"));
+       test_begin(t_strdup_printf("iostream_pump "
+                                  "(in=%sblocking, out=%sblocking)",
+                                  (in_block ? "" : "non-"),
+                                  (out_block ? "" : "non-")));
 
-       test_iostream_setup(block, &in, &out, &buffer);
+       test_iostream_setup(in_block, out_block, &in, &out, &buffer);
        counter = 1;
 
        test_assert(strcmp(run_pump(in, out, &counter, buffer),
@@ -151,7 +161,7 @@ test_iostream_pump_simple(bool block)
 }
 
 static void
-test_iostream_pump_failure_start_read(bool block)
+test_iostream_pump_failure_start_read(bool in_block, bool out_block)
 {
        int counter;
        struct istream *in, *in_2;
@@ -159,9 +169,11 @@ test_iostream_pump_failure_start_read(bool block)
        buffer_t *buffer;
 
        test_begin(t_strdup_printf("iostream_pump failure start-read "
-                                  "(%sblocking)", block ? "" : "non-"));
+                                  "(in=%sblocking, out=%sblocking)",
+                                  (in_block ? "" : "non-"),
+                                  (out_block ? "" : "non-")));
 
-       test_iostream_setup(block, &in_2, &out, &buffer);
+       test_iostream_setup(in_block, out_block, &in_2, &out, &buffer);
        in = i_stream_create_failure_at(in_2, 0, EIO, "test pump fail");
        i_stream_unref(&in_2);
        counter = 2;
@@ -171,7 +183,7 @@ test_iostream_pump_failure_start_read(bool block)
 }
 
 static void
-test_iostream_pump_failure_mid_read(bool block)
+test_iostream_pump_failure_mid_read(bool in_block, bool out_block)
 {
        int counter;
        struct istream *in, *in_2;
@@ -179,9 +191,11 @@ test_iostream_pump_failure_mid_read(bool block)
        buffer_t *buffer;
 
        test_begin(t_strdup_printf("iostream_pump failure mid-read "
-                                  "(%sblocking)", block ? "" : "non-"));
+                                  "(in=%sblocking, out=%sblocking)",
+                                  (in_block ? "" : "non-"),
+                                  (out_block ? "" : "non-")));
 
-       test_iostream_setup(block, &in_2, &out, &buffer);
+       test_iostream_setup(in_block, out_block, &in_2, &out, &buffer);
        in = i_stream_create_failure_at(in_2, 4, EIO, "test pump fail");
        i_stream_unref(&in_2);
        counter = 2;
@@ -191,7 +205,7 @@ test_iostream_pump_failure_mid_read(bool block)
 }
 
 static void
-test_iostream_pump_failure_end_read(bool block)
+test_iostream_pump_failure_end_read(bool in_block, bool out_block)
 {
        int counter;
        struct istream *in, *in_2;
@@ -199,9 +213,11 @@ test_iostream_pump_failure_end_read(bool block)
        buffer_t *buffer;
 
        test_begin(t_strdup_printf("iostream_pump failure mid-read "
-                  "(%sblocking)", block ? "" : "non-"));
+                                  "(in=%sblocking, out=%sblocking)",
+                                  (in_block ? "" : "non-"),
+                                  (out_block ? "" : "non-")));
 
-       test_iostream_setup(block, &in_2, &out, &buffer);
+       test_iostream_setup(in_block, out_block, &in_2, &out, &buffer);
        in = i_stream_create_failure_at_eof(in_2, EIO, "test pump fail");
        i_stream_unref(&in_2);
        counter = 2;
@@ -212,7 +228,7 @@ test_iostream_pump_failure_end_read(bool block)
 }
 
 static void
-test_iostream_pump_failure_start_write(bool block)
+test_iostream_pump_failure_start_write(bool in_block, bool out_block)
 {
        int counter;
        struct istream *in;
@@ -220,9 +236,11 @@ test_iostream_pump_failure_start_write(bool block)
        buffer_t *buffer;
 
        test_begin(t_strdup_printf("iostream_pump failure start-write "
-                                  "(%sblocking)", block ? "" : "non-"));
+                                  "(in=%sblocking, out=%sblocking)",
+                                  (in_block ? "" : "non-"),
+                                  (out_block ? "" : "non-")));
 
-       test_iostream_setup(block, &in, &out_2, &buffer);
+       test_iostream_setup(in_block, out_block, &in, &out_2, &buffer);
        out = o_stream_create_failure_at(out_2, 0, "test pump fail");
        o_stream_unref(&out_2);
        counter = 2;
@@ -232,7 +250,7 @@ test_iostream_pump_failure_start_write(bool block)
 }
 
 static void
-test_iostream_pump_failure_mid_write(bool block)
+test_iostream_pump_failure_mid_write(bool in_block, bool out_block)
 {
        int counter;
        struct istream *in;
@@ -240,36 +258,39 @@ test_iostream_pump_failure_mid_write(bool block)
        buffer_t *buffer;
 
        test_begin(t_strdup_printf("iostream_pump failure mid-write "
-                                  "(%sblocking)", block ? "" : "non-"));
+                                  "(in=%sblocking, out=%sblocking)",
+                                  (in_block ? "" : "non-"),
+                                  (out_block ? "" : "non-")));
 
-       test_iostream_setup(block, &in, &out_2, &buffer);
+       test_iostream_setup(in_block, out_block, &in, &out_2, &buffer);
        out = o_stream_create_failure_at(out_2, 4, "test pump fail");
        o_stream_unref(&out_2);
        counter = 2;
 
        /* "hel" because the last byte is only in internal buffer */
        test_assert(strcmp(run_pump(in, out, &counter, buffer),
-                          (block ? "" : "hel")) == 0);
+                          (out_block ? (in_block ? "" : "hell") :
+                                       "hel")) == 0);
 
        test_end();
 }
 
 static void
-test_iostream_pump_failure_end_write(bool block)
+test_iostream_pump_failure_end_write(bool in_block, bool out_block)
 {
        int counter;
        struct istream *in;
        struct ostream *out, *out_2;
        buffer_t *buffer;
 
-       if (!block) {
+       if (!out_block || !in_block) {
                /* we'll get flushes constantly */
                return;
        }
 
        test_begin("iostream_pump failure end-write (blocking)");
 
-       test_iostream_setup(block, &in, &out_2, &buffer);
+       test_iostream_setup(in_block, out_block, &in, &out_2, &buffer);
        out = o_stream_create_failure_at_flush(out_2, "test pump fail");
        o_stream_unref(&out_2);
        counter = 2;
@@ -279,17 +300,26 @@ test_iostream_pump_failure_end_write(bool block)
        test_end();
 }
 
+static void
+test_iostream_pump_real(void)
+{
+       for(int i = 0; i < 3; i++) {
+               bool in_block = ((i & BIT(0)) != 0); 
+               bool out_block = ((i & BIT(1)) != 0);
+
+               test_iostream_pump_simple(in_block, out_block);
+               test_iostream_pump_failure_start_read(in_block, out_block);
+               test_iostream_pump_failure_mid_read(in_block, out_block);
+               test_iostream_pump_failure_end_read(in_block, out_block);
+               test_iostream_pump_failure_start_write(in_block, out_block);
+               test_iostream_pump_failure_mid_write(in_block, out_block);
+               test_iostream_pump_failure_end_write(in_block, out_block);
+       }
+}
+
 void test_iostream_pump(void)
 {
        T_BEGIN {
-               for(int i = 0; i < 2; i++) {
-                       test_iostream_pump_simple(i < 1);
-                       test_iostream_pump_failure_start_read(i < 1);
-                       test_iostream_pump_failure_mid_read(i < 1);
-                       test_iostream_pump_failure_end_read(i < 1);
-                       test_iostream_pump_failure_start_write(i < 1);
-                       test_iostream_pump_failure_mid_write(i < 1);
-                       test_iostream_pump_failure_end_write(i < 1);
-               }
+               test_iostream_pump_real();
        } T_END;
 }