From 57b574c5c7920fa83109971ddf894b4751ba1f49 Mon Sep 17 00:00:00 2001 From: Stephan Bosch Date: Sun, 25 Feb 2018 21:45:17 +0100 Subject: [PATCH] lib: iostream-pump: Properly implement running a pump with one of the streams blocking. Having both streams blocking is not useful and that is now explicitly forbidden. --- src/lib/iostream-pump.c | 24 ++++++-- src/lib/test-iostream-pump.c | 106 ++++++++++++++++++++++------------- 2 files changed, 86 insertions(+), 44 deletions(-) diff --git a/src/lib/iostream-pump.c b/src/lib/iostream-pump.c index 85fe8eb405..dee0687522 100644 --- a/src/lib/iostream-pump.c +++ b/src/lib/iostream-pump.c @@ -51,6 +51,7 @@ static void iostream_pump_copy(struct iostream_pump *pump) pump->context); return; case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: + i_assert(!pump->output->blocking); pump->waiting_output = TRUE; io_remove(&pump->io); return; @@ -74,6 +75,7 @@ static void iostream_pump_copy(struct iostream_pump *pump) } return; case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: + i_assert(!pump->input->blocking); pump->waiting_output = FALSE; return; } @@ -97,7 +99,9 @@ static int iostream_pump_flush(struct iostream_pump *pump) return 1; } - if (pump->io == NULL) { + if (pump->input->blocking) + iostream_pump_copy(pump); + else if (pump->io == NULL) { pump->io = io_add_istream(pump->input, iostream_pump_copy, pump); io_set_pending(pump->io); @@ -112,6 +116,7 @@ iostream_pump_create(struct istream *input, struct ostream *output) i_assert(input != NULL && output != NULL); + i_assert(!input->blocking || !output->blocking); /* ref streams */ i_stream_ref(input); @@ -132,13 +137,20 @@ void iostream_pump_start(struct iostream_pump *pump) i_assert(pump->callback != NULL); /* add flush handler */ - o_stream_set_flush_callback(pump->output, iostream_pump_flush, pump); + if (!pump->output->blocking) { + o_stream_set_flush_callback(pump->output, + iostream_pump_flush, pump); + } /* make IO objects */ - pump->io = io_add_istream(pump->input, iostream_pump_copy, pump); - - /* make sure we do first read right away */ - io_set_pending(pump->io); + if (pump->input->blocking) { + i_assert(!pump->output->blocking); + o_stream_set_flush_pending(pump->output, TRUE); + } else { + pump->io = io_add_istream(pump->input, + iostream_pump_copy, pump); + io_set_pending(pump->io); + } } struct istream *iostream_pump_get_input(struct iostream_pump *pump) diff --git a/src/lib/test-iostream-pump.c b/src/lib/test-iostream-pump.c index 55522e4073..2a5aea687a 100644 --- a/src/lib/test-iostream-pump.c +++ b/src/lib/test-iostream-pump.c @@ -42,6 +42,8 @@ static void pump_nonblocking_timeout(struct nonblock_ctx *ctx) break; case 1: /* allow more input */ + if (ctx->in->blocking) + break; if (ctx->pos/4 == ctx->max_size+1) test_istream_set_allow_eof(ctx->in, TRUE); else @@ -53,6 +55,8 @@ static void pump_nonblocking_timeout(struct nonblock_ctx *ctx) case 3: { /* allow more output. give always one byte less than the input size so there's something in internal buffer. */ + if (ctx->out->blocking) + break; size_t size = ctx->pos/4; if (size > 0) test_ostream_set_max_output_size(ctx->out, size-1); @@ -76,7 +80,11 @@ run_pump(struct istream *in, struct ostream *out, int *counter, test_assert(i_stream_get_size(in, TRUE, &ctx.max_size) > 0); test_istream_set_size(in, 0); test_istream_set_allow_eof(in, FALSE); + } + if (!out->blocking) { test_ostream_set_max_output_size(out, 0); + } + if (!in->blocking || !out->blocking) { to2 = timeout_add_short(0, pump_nonblocking_timeout, &ctx); } @@ -98,7 +106,7 @@ run_pump(struct istream *in, struct ostream *out, int *counter, test_assert(*counter == 0); - if (!ctx.in->blocking && ctx.in->stream_errno != 0 && + if (!ctx.out->blocking && ctx.in->stream_errno != 0 && ctx.out->stream_errno == 0) { /* input failed, finish flushing output */ test_ostream_set_max_output_size(ctx.out, (size_t)-1); @@ -111,37 +119,39 @@ run_pump(struct istream *in, struct ostream *out, int *counter, iostream_pump_unref(&pump); io_loop_destroy(&ioloop); - return ret; } static void -test_iostream_setup(bool block, struct istream **in_r, - struct ostream **out_r, buffer_t **out_buffer_r) +test_iostream_setup(bool in_block, bool out_block, + struct istream **in_r, struct ostream **out_r, + buffer_t **out_buffer_r) { *out_buffer_r = t_buffer_create(128); *in_r = test_istream_create_data(data, sizeof(data)); - (*in_r)->blocking = block; + (*in_r)->blocking = in_block; - if (block) + if (out_block) *out_r = test_ostream_create(*out_buffer_r); else *out_r = test_ostream_create_nonblocking(*out_buffer_r, 1); } static void -test_iostream_pump_simple(bool block) +test_iostream_pump_simple(bool in_block, bool out_block) { int counter; struct istream *in; struct ostream *out; buffer_t *buffer; - test_begin(t_strdup_printf("iostream_pump (%sblocking)", - block ? "" : "non-")); + test_begin(t_strdup_printf("iostream_pump " + "(in=%sblocking, out=%sblocking)", + (in_block ? "" : "non-"), + (out_block ? "" : "non-"))); - test_iostream_setup(block, &in, &out, &buffer); + test_iostream_setup(in_block, out_block, &in, &out, &buffer); counter = 1; test_assert(strcmp(run_pump(in, out, &counter, buffer), @@ -151,7 +161,7 @@ test_iostream_pump_simple(bool block) } static void -test_iostream_pump_failure_start_read(bool block) +test_iostream_pump_failure_start_read(bool in_block, bool out_block) { int counter; struct istream *in, *in_2; @@ -159,9 +169,11 @@ test_iostream_pump_failure_start_read(bool block) buffer_t *buffer; test_begin(t_strdup_printf("iostream_pump failure start-read " - "(%sblocking)", block ? "" : "non-")); + "(in=%sblocking, out=%sblocking)", + (in_block ? "" : "non-"), + (out_block ? "" : "non-"))); - test_iostream_setup(block, &in_2, &out, &buffer); + test_iostream_setup(in_block, out_block, &in_2, &out, &buffer); in = i_stream_create_failure_at(in_2, 0, EIO, "test pump fail"); i_stream_unref(&in_2); counter = 2; @@ -171,7 +183,7 @@ test_iostream_pump_failure_start_read(bool block) } static void -test_iostream_pump_failure_mid_read(bool block) +test_iostream_pump_failure_mid_read(bool in_block, bool out_block) { int counter; struct istream *in, *in_2; @@ -179,9 +191,11 @@ test_iostream_pump_failure_mid_read(bool block) buffer_t *buffer; test_begin(t_strdup_printf("iostream_pump failure mid-read " - "(%sblocking)", block ? "" : "non-")); + "(in=%sblocking, out=%sblocking)", + (in_block ? "" : "non-"), + (out_block ? "" : "non-"))); - test_iostream_setup(block, &in_2, &out, &buffer); + test_iostream_setup(in_block, out_block, &in_2, &out, &buffer); in = i_stream_create_failure_at(in_2, 4, EIO, "test pump fail"); i_stream_unref(&in_2); counter = 2; @@ -191,7 +205,7 @@ test_iostream_pump_failure_mid_read(bool block) } static void -test_iostream_pump_failure_end_read(bool block) +test_iostream_pump_failure_end_read(bool in_block, bool out_block) { int counter; struct istream *in, *in_2; @@ -199,9 +213,11 @@ test_iostream_pump_failure_end_read(bool block) buffer_t *buffer; test_begin(t_strdup_printf("iostream_pump failure mid-read " - "(%sblocking)", block ? "" : "non-")); + "(in=%sblocking, out=%sblocking)", + (in_block ? "" : "non-"), + (out_block ? "" : "non-"))); - test_iostream_setup(block, &in_2, &out, &buffer); + test_iostream_setup(in_block, out_block, &in_2, &out, &buffer); in = i_stream_create_failure_at_eof(in_2, EIO, "test pump fail"); i_stream_unref(&in_2); counter = 2; @@ -212,7 +228,7 @@ test_iostream_pump_failure_end_read(bool block) } static void -test_iostream_pump_failure_start_write(bool block) +test_iostream_pump_failure_start_write(bool in_block, bool out_block) { int counter; struct istream *in; @@ -220,9 +236,11 @@ test_iostream_pump_failure_start_write(bool block) buffer_t *buffer; test_begin(t_strdup_printf("iostream_pump failure start-write " - "(%sblocking)", block ? "" : "non-")); + "(in=%sblocking, out=%sblocking)", + (in_block ? "" : "non-"), + (out_block ? "" : "non-"))); - test_iostream_setup(block, &in, &out_2, &buffer); + test_iostream_setup(in_block, out_block, &in, &out_2, &buffer); out = o_stream_create_failure_at(out_2, 0, "test pump fail"); o_stream_unref(&out_2); counter = 2; @@ -232,7 +250,7 @@ test_iostream_pump_failure_start_write(bool block) } static void -test_iostream_pump_failure_mid_write(bool block) +test_iostream_pump_failure_mid_write(bool in_block, bool out_block) { int counter; struct istream *in; @@ -240,36 +258,39 @@ test_iostream_pump_failure_mid_write(bool block) buffer_t *buffer; test_begin(t_strdup_printf("iostream_pump failure mid-write " - "(%sblocking)", block ? "" : "non-")); + "(in=%sblocking, out=%sblocking)", + (in_block ? "" : "non-"), + (out_block ? "" : "non-"))); - test_iostream_setup(block, &in, &out_2, &buffer); + test_iostream_setup(in_block, out_block, &in, &out_2, &buffer); out = o_stream_create_failure_at(out_2, 4, "test pump fail"); o_stream_unref(&out_2); counter = 2; /* "hel" because the last byte is only in internal buffer */ test_assert(strcmp(run_pump(in, out, &counter, buffer), - (block ? "" : "hel")) == 0); + (out_block ? (in_block ? "" : "hell") : + "hel")) == 0); test_end(); } static void -test_iostream_pump_failure_end_write(bool block) +test_iostream_pump_failure_end_write(bool in_block, bool out_block) { int counter; struct istream *in; struct ostream *out, *out_2; buffer_t *buffer; - if (!block) { + if (!out_block || !in_block) { /* we'll get flushes constantly */ return; } test_begin("iostream_pump failure end-write (blocking)"); - test_iostream_setup(block, &in, &out_2, &buffer); + test_iostream_setup(in_block, out_block, &in, &out_2, &buffer); out = o_stream_create_failure_at_flush(out_2, "test pump fail"); o_stream_unref(&out_2); counter = 2; @@ -279,17 +300,26 @@ test_iostream_pump_failure_end_write(bool block) test_end(); } +static void +test_iostream_pump_real(void) +{ + for(int i = 0; i < 3; i++) { + bool in_block = ((i & BIT(0)) != 0); + bool out_block = ((i & BIT(1)) != 0); + + test_iostream_pump_simple(in_block, out_block); + test_iostream_pump_failure_start_read(in_block, out_block); + test_iostream_pump_failure_mid_read(in_block, out_block); + test_iostream_pump_failure_end_read(in_block, out_block); + test_iostream_pump_failure_start_write(in_block, out_block); + test_iostream_pump_failure_mid_write(in_block, out_block); + test_iostream_pump_failure_end_write(in_block, out_block); + } +} + void test_iostream_pump(void) { T_BEGIN { - for(int i = 0; i < 2; i++) { - test_iostream_pump_simple(i < 1); - test_iostream_pump_failure_start_read(i < 1); - test_iostream_pump_failure_mid_read(i < 1); - test_iostream_pump_failure_end_read(i < 1); - test_iostream_pump_failure_start_write(i < 1); - test_iostream_pump_failure_mid_write(i < 1); - test_iostream_pump_failure_end_write(i < 1); - } + test_iostream_pump_real(); } T_END; } -- 2.47.3