From: Timo Sirainen Date: Tue, 14 Apr 2020 12:38:56 +0000 (+0300) Subject: lib: ostream-multiplex - Fix flush callback handling X-Git-Tag: 2.3.11.2~228 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=aac7ba91b7837d6f8930f0406ed01736cefbc56c;p=thirdparty%2Fdovecot%2Fcore.git lib: ostream-multiplex - Fix flush callback handling Using flush-callbacks didn't work very well. Even if it wasn't overridden, it could have caused a hang if there was data left inside the internal buffer that couldn't be flushed to parent ostream. At the end of the ostream there was nothing that triggered flushing the internal buffer. --- diff --git a/src/lib/ostream-multiplex.c b/src/lib/ostream-multiplex.c index 3f5bc0a9d5..b248995fdb 100644 --- a/src/lib/ostream-multiplex.c +++ b/src/lib/ostream-multiplex.c @@ -23,6 +23,9 @@ struct multiplex_ochannel { struct multiplex_ostream { struct ostream *parent; + stream_flush_callback_t *old_flush_callback; + void *old_flush_context; + /* channel 0 is main channel */ uint8_t cur_channel; unsigned int remain; @@ -70,17 +73,20 @@ static struct multiplex_ochannel *get_next_channel(struct multiplex_ostream *mst return channel; } -static void +static bool o_stream_multiplex_sendv(struct multiplex_ostream *mstream) { struct multiplex_ochannel *channel; ssize_t ret = 0; + bool all_sent = TRUE; while((channel = get_next_channel(mstream)) != NULL) { if (channel->buf->used == 0) continue; - if (o_stream_get_buffer_avail_size(mstream->parent) < 6) + if (o_stream_get_buffer_avail_size(mstream->parent) < 6) { + all_sent = FALSE; break; + } /* check parent stream capacity */ size_t tmp = o_stream_get_buffer_avail_size(mstream->parent) - 5; /* ensure it fits into 32 bit int */ @@ -106,6 +112,33 @@ o_stream_multiplex_sendv(struct multiplex_ostream *mstream) } if (o_stream_is_corked(mstream->parent)) o_stream_uncork(mstream->parent); + return all_sent; +} + +static int o_stream_multiplex_flush(struct multiplex_ostream *mstream) +{ + int ret = o_stream_flush(mstream->parent); + if (ret >= 0) { + if (!o_stream_multiplex_sendv(mstream)) + ret = 0; + } + if (ret <= 0) + return ret; + + /* Everything is flushed. See if one of the callbacks' flush callbacks + wants to write more data. */ + struct multiplex_ochannel **channelp; + bool unfinished = FALSE; + array_foreach_modifiable(&mstream->channels, channelp) { + if (*channelp != NULL && (*channelp)->ostream.callback != NULL) { + ret = (*channelp)->ostream.callback((*channelp)->ostream.context); + if (ret < 0) + return -1; + if (ret == 0) + unfinished = TRUE; + } + } + return unfinished ? 0 : 1; } static int o_stream_multiplex_ochannel_flush(struct ostream_private *stream) @@ -179,6 +212,16 @@ o_stream_multiplex_ochannel_sendv(struct ostream_private *stream, return total; } +static void +o_stream_multiplex_ochannel_set_flush_callback(struct ostream_private *stream, + stream_flush_callback_t *callback, + void *context) +{ + /* We have overwritten our parent's flush-callback. Don't change it. */ + stream->callback = callback; + stream->context = context; +} + static size_t o_stream_multiplex_ochannel_get_buffer_used_size(const struct ostream_private *stream) { @@ -224,6 +267,12 @@ static void o_stream_multiplex_try_destroy(struct multiplex_ostream *mstream) array_foreach_modifiable(&mstream->channels, channelp) if (*channelp != NULL) return; + + i_assert(mstream->parent->real_stream->callback == + (stream_flush_callback_t *)o_stream_multiplex_flush); + o_stream_set_flush_callback(mstream->parent, + *mstream->old_flush_callback, + mstream->old_flush_context); o_stream_unref(&mstream->parent); array_free(&mstream->channels); i_free(mstream); @@ -256,6 +305,8 @@ o_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid) channel->ostream.cork = o_stream_multiplex_ochannel_cork; channel->ostream.flush = o_stream_multiplex_ochannel_flush; channel->ostream.sendv = o_stream_multiplex_ochannel_sendv; + channel->ostream.set_flush_callback = + o_stream_multiplex_ochannel_set_flush_callback; channel->ostream.get_buffer_used_size = o_stream_multiplex_ochannel_get_buffer_used_size; channel->ostream.get_buffer_avail_size = @@ -265,7 +316,13 @@ o_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid) channel->ostream.fd = o_stream_get_fd(mstream->parent); array_push_back(&channel->mstream->channels, &channel); - return o_stream_create(&channel->ostream, mstream->parent, -1); + (void)o_stream_create(&channel->ostream, mstream->parent, -1); + /* o_stream_create() defaults the flush_callback to parent's callback. + Here it points to o_stream_multiplex_flush(), which just causes + infinite looping. */ + channel->ostream.callback = NULL; + channel->ostream.context = NULL; + return &channel->ostream.ostream; } struct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid) @@ -284,6 +341,9 @@ struct ostream *o_stream_create_multiplex(struct ostream *parent, size_t bufsize mstream = i_new(struct multiplex_ostream, 1); mstream->parent = parent; mstream->bufsize = bufsize; + mstream->old_flush_callback = parent->real_stream->callback; + mstream->old_flush_context = parent->real_stream->context; + o_stream_set_flush_callback(parent, o_stream_multiplex_flush, mstream); i_array_init(&mstream->channels, 8); o_stream_ref(parent); diff --git a/src/lib/test-ostream-multiplex.c b/src/lib/test-ostream-multiplex.c index 20feab98aa..96d798a97a 100644 --- a/src/lib/test-ostream-multiplex.c +++ b/src/lib/test-ostream-multiplex.c @@ -5,6 +5,7 @@ #include "str.h" #include "istream.h" #include "ostream-private.h" +#include "istream-multiplex.h" #include "ostream-multiplex.h" #include "ostream.h" #include @@ -181,9 +182,223 @@ static void test_ostream_multiplex_cork(void) test_end(); } +struct test_hang_context { + struct istream *input1, *input2; + size_t sent_bytes, sent2_bytes; + size_t read_bytes, read2_bytes; +}; + +static void test_hang_input(struct test_hang_context *ctx) +{ + ssize_t ret, ret2; + + do { + ret = i_stream_read(ctx->input1); + if (ret > 0) { + i_stream_skip(ctx->input1, ret); + ctx->read_bytes += ret; + } + ret2 = i_stream_read(ctx->input2); + if (ret2 > 0) { + i_stream_skip(ctx->input2, ret2); + ctx->read2_bytes += ret2; + } + } while (ret > 0 || ret2 > 0); + + test_assert(ret == 0 && ret2 == 0); + if (ctx->read_bytes == ctx->sent_bytes && + ctx->read2_bytes == ctx->sent2_bytes) + io_loop_stop(current_ioloop); +} + +static void test_ostream_multiplex_hang(void) +{ + int fd[2]; + + test_begin("ostream multiplex hang"); + if (pipe(fd) < 0) + i_fatal("pipe() failed: %m"); + fd_set_nonblock(fd[0], TRUE); + fd_set_nonblock(fd[1], TRUE); + + struct ioloop *ioloop = io_loop_create(); + struct ostream *file_output = o_stream_create_fd(fd[1], 1024); + o_stream_set_no_error_handling(file_output, TRUE); + struct ostream *channel = o_stream_create_multiplex(file_output, 4096); + struct ostream *channel2 = o_stream_multiplex_add_channel(channel, 1); + char buf[256]; + + /* send multiplex output until the buffer is full */ + ssize_t ret, ret2; + size_t sent_bytes = 0, sent2_bytes = 0; + i_zero(&buf); + o_stream_cork(channel); + o_stream_cork(channel2); + while ((ret = o_stream_send(channel, buf, sizeof(buf))) > 0) { + sent_bytes += ret; + ret2 = o_stream_send(channel2, buf, sizeof(buf)); + if (ret2 <= 0) + break; + sent2_bytes += ret2; + } + test_assert(o_stream_finish(channel) == 0); + test_assert(o_stream_finish(channel2) == 0); + o_stream_uncork(channel); + o_stream_uncork(channel2); + /* We expect the first channel to have data buffered */ + test_assert(o_stream_get_buffer_used_size(channel) >= + o_stream_get_buffer_used_size(file_output)); + test_assert(o_stream_get_buffer_used_size(channel) - + o_stream_get_buffer_used_size(file_output) > 0); + + /* read everything that was already sent */ + struct istream *file_input = i_stream_create_fd(fd[0], 1024); + struct istream *input = i_stream_create_multiplex(file_input, 4096); + struct istream *input2 = i_stream_multiplex_add_channel(input, 1); + + struct test_hang_context ctx = { + .input1 = input, + .input2 = input2, + .sent_bytes = sent_bytes, + .sent2_bytes = sent2_bytes, + }; + + struct timeout *to = timeout_add(5000, io_loop_stop, current_ioloop); + struct io *io = io_add_istream(file_input, test_hang_input, &ctx); + io_loop_run(ioloop); + io_remove(&io); + timeout_remove(&to); + + /* everything that was sent should have been received now. + ostream-multiplex's internal buffer is also supposed to have + been sent. */ + test_assert(input->v_offset == sent_bytes); + test_assert(input2->v_offset == sent2_bytes); + test_assert(o_stream_get_buffer_used_size(channel) == 0); + test_assert(o_stream_get_buffer_used_size(channel2) == 0); + + i_stream_unref(&file_input); + i_stream_unref(&input); + i_stream_unref(&input2); + o_stream_unref(&channel); + o_stream_unref(&channel2); + o_stream_unref(&file_output); + io_loop_destroy(&ioloop); + test_end(); +} + +#define FLUSH_CALLBACK_TOTAL_BYTES 10240 + +struct test_flush_context { + struct ostream *output1, *output2; + struct istream *input1, *input2; +}; + +static int flush_callback1(struct test_flush_context *ctx) +{ + char buf[32]; + + i_assert(ctx->output1->offset <= FLUSH_CALLBACK_TOTAL_BYTES); + size_t bytes_left = FLUSH_CALLBACK_TOTAL_BYTES - ctx->output1->offset; + + memset(buf, '1', sizeof(buf)); + if (o_stream_send(ctx->output1, buf, I_MIN(sizeof(buf), bytes_left)) < 0) + return -1; + return ctx->output1->offset < FLUSH_CALLBACK_TOTAL_BYTES ? 0 : 1; +} + +static int flush_callback2(struct test_flush_context *ctx) +{ + char buf[64]; + + i_assert(ctx->output2->offset <= FLUSH_CALLBACK_TOTAL_BYTES); + size_t bytes_left = FLUSH_CALLBACK_TOTAL_BYTES - ctx->output2->offset; + + memset(buf, '2', sizeof(buf)); + if (o_stream_send(ctx->output2, buf, I_MIN(sizeof(buf), bytes_left)) < 0) + return -1; + return ctx->output2->offset < FLUSH_CALLBACK_TOTAL_BYTES ? 0 : 1; +} + +static void test_flush_input(struct test_flush_context *ctx) +{ + ssize_t ret, ret2; + + do { + ret = i_stream_read(ctx->input1); + if (ret > 0) + i_stream_skip(ctx->input1, ret); + ret2 = i_stream_read(ctx->input2); + if (ret2 > 0) + i_stream_skip(ctx->input2, ret2); + } while (ret > 0 || ret2 > 0); + + test_assert(ret == 0 && ret2 == 0); + if (ctx->input1->v_offset == FLUSH_CALLBACK_TOTAL_BYTES && + ctx->input2->v_offset == FLUSH_CALLBACK_TOTAL_BYTES) + io_loop_stop(current_ioloop); +} + +static void test_ostream_multiplex_flush_callback(void) +{ + int fd[2]; + + test_begin("ostream multiplex flush callback"); + if (pipe(fd) < 0) + i_fatal("pipe() failed: %m"); + fd_set_nonblock(fd[0], TRUE); + fd_set_nonblock(fd[1], TRUE); + + struct ioloop *ioloop = io_loop_create(); + struct ostream *file_output = o_stream_create_fd(fd[1], 1024); + o_stream_set_no_error_handling(file_output, TRUE); + struct ostream *channel = o_stream_create_multiplex(file_output, 4096); + struct ostream *channel2 = o_stream_multiplex_add_channel(channel, 1); + + struct istream *file_input = i_stream_create_fd(fd[0], 1024); + struct istream *input = i_stream_create_multiplex(file_input, 4096); + struct istream *input2 = i_stream_multiplex_add_channel(input, 1); + + struct test_flush_context ctx = { + .output1 = channel, + .output2 = channel2, + .input1 = input, + .input2 = input2, + }; + o_stream_set_flush_callback(channel, flush_callback1, &ctx); + o_stream_set_flush_callback(channel2, flush_callback2, &ctx); + o_stream_set_flush_pending(channel, TRUE); + o_stream_set_flush_pending(channel2, TRUE); + + struct timeout *to = timeout_add(5000, io_loop_stop, current_ioloop); + struct io *io = io_add_istream(file_input, test_flush_input, &ctx); + io_loop_run(ioloop); + io_remove(&io); + timeout_remove(&to); + + test_assert(channel->offset == FLUSH_CALLBACK_TOTAL_BYTES); + test_assert(channel2->offset == FLUSH_CALLBACK_TOTAL_BYTES); + test_assert(input->v_offset == FLUSH_CALLBACK_TOTAL_BYTES); + test_assert(input2->v_offset == FLUSH_CALLBACK_TOTAL_BYTES); + + test_assert(o_stream_finish(channel) == 1); + test_assert(o_stream_finish(channel2) == 1); + + i_stream_unref(&file_input); + i_stream_unref(&input); + i_stream_unref(&input2); + o_stream_unref(&channel); + o_stream_unref(&channel2); + o_stream_unref(&file_output); + io_loop_destroy(&ioloop); + test_end(); +} + void test_ostream_multiplex(void) { test_ostream_multiplex_simple(); test_ostream_multiplex_stream(); test_ostream_multiplex_cork(); + test_ostream_multiplex_hang(); + test_ostream_multiplex_flush_callback(); }