]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lib: ostream-multiplex - Fix flush callback handling
authorTimo Sirainen <timo.sirainen@open-xchange.com>
Tue, 14 Apr 2020 12:38:56 +0000 (15:38 +0300)
committertimo.sirainen <timo.sirainen@open-xchange.com>
Fri, 24 Apr 2020 08:00:42 +0000 (08:00 +0000)
Using flush-callbacks didn't work very well. Even if it wasn't overridden,
it could have caused a hang if there was data left inside the internal
buffer that couldn't be flushed to parent ostream. At the end of the
ostream there was nothing that triggered flushing the internal buffer.

src/lib/ostream-multiplex.c
src/lib/test-ostream-multiplex.c

index 3f5bc0a9d551ecd10eaacd22b4580888fc1535d3..b248995fdbf4553dc05ee20d8005931087287377 100644 (file)
@@ -23,6 +23,9 @@ struct multiplex_ochannel {
 struct multiplex_ostream {
        struct ostream *parent;
 
+       stream_flush_callback_t *old_flush_callback;
+       void *old_flush_context;
+
        /* channel 0 is main channel */
        uint8_t cur_channel;
        unsigned int remain;
@@ -70,17 +73,20 @@ static struct multiplex_ochannel *get_next_channel(struct multiplex_ostream *mst
        return channel;
 }
 
-static void
+static bool
 o_stream_multiplex_sendv(struct multiplex_ostream *mstream)
 {
        struct multiplex_ochannel *channel;
        ssize_t ret = 0;
+       bool all_sent = TRUE;
 
        while((channel = get_next_channel(mstream)) != NULL) {
                if (channel->buf->used == 0)
                        continue;
-               if (o_stream_get_buffer_avail_size(mstream->parent) < 6)
+               if (o_stream_get_buffer_avail_size(mstream->parent) < 6) {
+                       all_sent = FALSE;
                        break;
+               }
                /* check parent stream capacity */
                size_t tmp = o_stream_get_buffer_avail_size(mstream->parent) - 5;
                /* ensure it fits into 32 bit int */
@@ -106,6 +112,33 @@ o_stream_multiplex_sendv(struct multiplex_ostream *mstream)
        }
        if (o_stream_is_corked(mstream->parent))
                o_stream_uncork(mstream->parent);
+       return all_sent;
+}
+
+static int o_stream_multiplex_flush(struct multiplex_ostream *mstream)
+{
+       int ret = o_stream_flush(mstream->parent);
+       if (ret >= 0) {
+               if (!o_stream_multiplex_sendv(mstream))
+                       ret = 0;
+       }
+       if (ret <= 0)
+               return ret;
+
+       /* Everything is flushed. See if one of the callbacks' flush callbacks
+          wants to write more data. */
+       struct multiplex_ochannel **channelp;
+       bool unfinished = FALSE;
+       array_foreach_modifiable(&mstream->channels, channelp) {
+               if (*channelp != NULL && (*channelp)->ostream.callback != NULL) {
+                       ret = (*channelp)->ostream.callback((*channelp)->ostream.context);
+                       if (ret < 0)
+                               return -1;
+                       if (ret == 0)
+                               unfinished = TRUE;
+               }
+       }
+       return unfinished ? 0 : 1;
 }
 
 static int o_stream_multiplex_ochannel_flush(struct ostream_private *stream)
@@ -179,6 +212,16 @@ o_stream_multiplex_ochannel_sendv(struct ostream_private *stream,
        return total;
 }
 
+static void
+o_stream_multiplex_ochannel_set_flush_callback(struct ostream_private *stream,
+                                              stream_flush_callback_t *callback,
+                                              void *context)
+{
+       /* We have overwritten our parent's flush-callback. Don't change it. */
+       stream->callback = callback;
+       stream->context = context;
+}
+
 static size_t
 o_stream_multiplex_ochannel_get_buffer_used_size(const struct ostream_private *stream)
 {
@@ -224,6 +267,12 @@ static void o_stream_multiplex_try_destroy(struct multiplex_ostream *mstream)
        array_foreach_modifiable(&mstream->channels, channelp)
                if (*channelp != NULL)
                        return;
+
+       i_assert(mstream->parent->real_stream->callback ==
+                (stream_flush_callback_t *)o_stream_multiplex_flush);
+       o_stream_set_flush_callback(mstream->parent,
+                                   *mstream->old_flush_callback,
+                                   mstream->old_flush_context);
        o_stream_unref(&mstream->parent);
        array_free(&mstream->channels);
        i_free(mstream);
@@ -256,6 +305,8 @@ o_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid)
        channel->ostream.cork = o_stream_multiplex_ochannel_cork;
        channel->ostream.flush = o_stream_multiplex_ochannel_flush;
        channel->ostream.sendv = o_stream_multiplex_ochannel_sendv;
+       channel->ostream.set_flush_callback =
+               o_stream_multiplex_ochannel_set_flush_callback;
        channel->ostream.get_buffer_used_size =
                o_stream_multiplex_ochannel_get_buffer_used_size;
        channel->ostream.get_buffer_avail_size =
@@ -265,7 +316,13 @@ o_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid)
        channel->ostream.fd = o_stream_get_fd(mstream->parent);
        array_push_back(&channel->mstream->channels, &channel);
 
-       return o_stream_create(&channel->ostream, mstream->parent, -1);
+       (void)o_stream_create(&channel->ostream, mstream->parent, -1);
+       /* o_stream_create() defaults the flush_callback to parent's callback.
+          Here it points to o_stream_multiplex_flush(), which just causes
+          infinite looping. */
+       channel->ostream.callback = NULL;
+       channel->ostream.context = NULL;
+       return &channel->ostream.ostream;
 }
 
 struct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid)
@@ -284,6 +341,9 @@ struct ostream *o_stream_create_multiplex(struct ostream *parent, size_t bufsize
        mstream = i_new(struct multiplex_ostream, 1);
        mstream->parent = parent;
        mstream->bufsize = bufsize;
+       mstream->old_flush_callback = parent->real_stream->callback;
+       mstream->old_flush_context = parent->real_stream->context;
+       o_stream_set_flush_callback(parent, o_stream_multiplex_flush, mstream);
        i_array_init(&mstream->channels, 8);
        o_stream_ref(parent);
 
index 20feab98aa77cb01c317fd653373dd0be5bd6a68..96d798a97a3561b5bc73b3fabcdb7b379fb98edc 100644 (file)
@@ -5,6 +5,7 @@
 #include "str.h"
 #include "istream.h"
 #include "ostream-private.h"
+#include "istream-multiplex.h"
 #include "ostream-multiplex.h"
 #include "ostream.h"
 #include <unistd.h>
@@ -181,9 +182,223 @@ static void test_ostream_multiplex_cork(void)
        test_end();
 }
 
+struct test_hang_context {
+       struct istream *input1, *input2;
+       size_t sent_bytes, sent2_bytes;
+       size_t read_bytes, read2_bytes;
+};
+
+static void test_hang_input(struct test_hang_context *ctx)
+{
+       ssize_t ret, ret2;
+
+       do {
+               ret = i_stream_read(ctx->input1);
+               if (ret > 0) {
+                       i_stream_skip(ctx->input1, ret);
+                       ctx->read_bytes += ret;
+               }
+               ret2 = i_stream_read(ctx->input2);
+               if (ret2 > 0) {
+                       i_stream_skip(ctx->input2, ret2);
+                       ctx->read2_bytes += ret2;
+               }
+       } while (ret > 0 || ret2 > 0);
+
+       test_assert(ret == 0 && ret2 == 0);
+       if (ctx->read_bytes == ctx->sent_bytes &&
+           ctx->read2_bytes == ctx->sent2_bytes)
+               io_loop_stop(current_ioloop);
+}
+
+static void test_ostream_multiplex_hang(void)
+{
+       int fd[2];
+
+       test_begin("ostream multiplex hang");
+       if (pipe(fd) < 0)
+               i_fatal("pipe() failed: %m");
+       fd_set_nonblock(fd[0], TRUE);
+       fd_set_nonblock(fd[1], TRUE);
+
+       struct ioloop *ioloop = io_loop_create();
+       struct ostream *file_output = o_stream_create_fd(fd[1], 1024);
+       o_stream_set_no_error_handling(file_output, TRUE);
+       struct ostream *channel = o_stream_create_multiplex(file_output, 4096);
+       struct ostream *channel2 = o_stream_multiplex_add_channel(channel, 1);
+       char buf[256];
+
+       /* send multiplex output until the buffer is full */
+       ssize_t ret, ret2;
+       size_t sent_bytes = 0, sent2_bytes = 0;
+       i_zero(&buf);
+       o_stream_cork(channel);
+       o_stream_cork(channel2);
+       while ((ret = o_stream_send(channel, buf, sizeof(buf))) > 0) {
+               sent_bytes += ret;
+               ret2 = o_stream_send(channel2, buf, sizeof(buf));
+               if (ret2 <= 0)
+                       break;
+               sent2_bytes += ret2;
+       }
+       test_assert(o_stream_finish(channel) == 0);
+       test_assert(o_stream_finish(channel2) == 0);
+       o_stream_uncork(channel);
+       o_stream_uncork(channel2);
+       /* We expect the first channel to have data buffered */
+       test_assert(o_stream_get_buffer_used_size(channel) >=
+                   o_stream_get_buffer_used_size(file_output));
+       test_assert(o_stream_get_buffer_used_size(channel) -
+                   o_stream_get_buffer_used_size(file_output) > 0);
+
+       /* read everything that was already sent */
+       struct istream *file_input = i_stream_create_fd(fd[0], 1024);
+       struct istream *input = i_stream_create_multiplex(file_input, 4096);
+       struct istream *input2 = i_stream_multiplex_add_channel(input, 1);
+
+       struct test_hang_context ctx = {
+               .input1 = input,
+               .input2 = input2,
+               .sent_bytes = sent_bytes,
+               .sent2_bytes = sent2_bytes,
+       };
+
+       struct timeout *to = timeout_add(5000, io_loop_stop, current_ioloop);
+       struct io *io = io_add_istream(file_input, test_hang_input, &ctx);
+       io_loop_run(ioloop);
+       io_remove(&io);
+       timeout_remove(&to);
+
+       /* everything that was sent should have been received now.
+          ostream-multiplex's internal buffer is also supposed to have
+          been sent. */
+       test_assert(input->v_offset == sent_bytes);
+       test_assert(input2->v_offset == sent2_bytes);
+       test_assert(o_stream_get_buffer_used_size(channel) == 0);
+       test_assert(o_stream_get_buffer_used_size(channel2) == 0);
+
+       i_stream_unref(&file_input);
+       i_stream_unref(&input);
+       i_stream_unref(&input2);
+       o_stream_unref(&channel);
+       o_stream_unref(&channel2);
+       o_stream_unref(&file_output);
+       io_loop_destroy(&ioloop);
+       test_end();
+}
+
+#define FLUSH_CALLBACK_TOTAL_BYTES 10240
+
+struct test_flush_context {
+       struct ostream *output1, *output2;
+       struct istream *input1, *input2;
+};
+
+static int flush_callback1(struct test_flush_context *ctx)
+{
+       char buf[32];
+
+       i_assert(ctx->output1->offset <= FLUSH_CALLBACK_TOTAL_BYTES);
+       size_t bytes_left = FLUSH_CALLBACK_TOTAL_BYTES - ctx->output1->offset;
+
+       memset(buf, '1', sizeof(buf));
+       if (o_stream_send(ctx->output1, buf, I_MIN(sizeof(buf), bytes_left)) < 0)
+               return -1;
+       return ctx->output1->offset < FLUSH_CALLBACK_TOTAL_BYTES ? 0 : 1;
+}
+
+static int flush_callback2(struct test_flush_context *ctx)
+{
+       char buf[64];
+
+       i_assert(ctx->output2->offset <= FLUSH_CALLBACK_TOTAL_BYTES);
+       size_t bytes_left = FLUSH_CALLBACK_TOTAL_BYTES - ctx->output2->offset;
+
+       memset(buf, '2', sizeof(buf));
+       if (o_stream_send(ctx->output2, buf, I_MIN(sizeof(buf), bytes_left)) < 0)
+               return -1;
+       return ctx->output2->offset < FLUSH_CALLBACK_TOTAL_BYTES ? 0 : 1;
+}
+
+static void test_flush_input(struct test_flush_context *ctx)
+{
+       ssize_t ret, ret2;
+
+       do {
+               ret = i_stream_read(ctx->input1);
+               if (ret > 0)
+                       i_stream_skip(ctx->input1, ret);
+               ret2 = i_stream_read(ctx->input2);
+               if (ret2 > 0)
+                       i_stream_skip(ctx->input2, ret2);
+       } while (ret > 0 || ret2 > 0);
+
+       test_assert(ret == 0 && ret2 == 0);
+       if (ctx->input1->v_offset == FLUSH_CALLBACK_TOTAL_BYTES &&
+           ctx->input2->v_offset == FLUSH_CALLBACK_TOTAL_BYTES)
+               io_loop_stop(current_ioloop);
+}
+
+static void test_ostream_multiplex_flush_callback(void)
+{
+       int fd[2];
+
+       test_begin("ostream multiplex flush callback");
+       if (pipe(fd) < 0)
+               i_fatal("pipe() failed: %m");
+       fd_set_nonblock(fd[0], TRUE);
+       fd_set_nonblock(fd[1], TRUE);
+
+       struct ioloop *ioloop = io_loop_create();
+       struct ostream *file_output = o_stream_create_fd(fd[1], 1024);
+       o_stream_set_no_error_handling(file_output, TRUE);
+       struct ostream *channel = o_stream_create_multiplex(file_output, 4096);
+       struct ostream *channel2 = o_stream_multiplex_add_channel(channel, 1);
+
+       struct istream *file_input = i_stream_create_fd(fd[0], 1024);
+       struct istream *input = i_stream_create_multiplex(file_input, 4096);
+       struct istream *input2 = i_stream_multiplex_add_channel(input, 1);
+
+       struct test_flush_context ctx = {
+               .output1 = channel,
+               .output2 = channel2,
+               .input1 = input,
+               .input2 = input2,
+       };
+       o_stream_set_flush_callback(channel, flush_callback1, &ctx);
+       o_stream_set_flush_callback(channel2, flush_callback2, &ctx);
+       o_stream_set_flush_pending(channel, TRUE);
+       o_stream_set_flush_pending(channel2, TRUE);
+
+       struct timeout *to = timeout_add(5000, io_loop_stop, current_ioloop);
+       struct io *io = io_add_istream(file_input, test_flush_input, &ctx);
+       io_loop_run(ioloop);
+       io_remove(&io);
+       timeout_remove(&to);
+
+       test_assert(channel->offset == FLUSH_CALLBACK_TOTAL_BYTES);
+       test_assert(channel2->offset == FLUSH_CALLBACK_TOTAL_BYTES);
+       test_assert(input->v_offset == FLUSH_CALLBACK_TOTAL_BYTES);
+       test_assert(input2->v_offset == FLUSH_CALLBACK_TOTAL_BYTES);
+
+       test_assert(o_stream_finish(channel) == 1);
+       test_assert(o_stream_finish(channel2) == 1);
+
+       i_stream_unref(&file_input);
+       i_stream_unref(&input);
+       i_stream_unref(&input2);
+       o_stream_unref(&channel);
+       o_stream_unref(&channel2);
+       o_stream_unref(&file_output);
+       io_loop_destroy(&ioloop);
+       test_end();
+}
+
 void test_ostream_multiplex(void)
 {
        test_ostream_multiplex_simple();
        test_ostream_multiplex_stream();
        test_ostream_multiplex_cork();
+       test_ostream_multiplex_hang();
+       test_ostream_multiplex_flush_callback();
 }