struct multiplex_ostream {
struct ostream *parent;
+ stream_flush_callback_t *old_flush_callback;
+ void *old_flush_context;
+
/* channel 0 is main channel */
uint8_t cur_channel;
unsigned int remain;
return channel;
}
-static void
+static bool
o_stream_multiplex_sendv(struct multiplex_ostream *mstream)
{
struct multiplex_ochannel *channel;
ssize_t ret = 0;
+ bool all_sent = TRUE;
while((channel = get_next_channel(mstream)) != NULL) {
if (channel->buf->used == 0)
continue;
- if (o_stream_get_buffer_avail_size(mstream->parent) < 6)
+ if (o_stream_get_buffer_avail_size(mstream->parent) < 6) {
+ all_sent = FALSE;
break;
+ }
/* check parent stream capacity */
size_t tmp = o_stream_get_buffer_avail_size(mstream->parent) - 5;
/* ensure it fits into 32 bit int */
}
if (o_stream_is_corked(mstream->parent))
o_stream_uncork(mstream->parent);
+ return all_sent;
+}
+
+static int o_stream_multiplex_flush(struct multiplex_ostream *mstream)
+{
+ int ret = o_stream_flush(mstream->parent);
+ if (ret >= 0) {
+ if (!o_stream_multiplex_sendv(mstream))
+ ret = 0;
+ }
+ if (ret <= 0)
+ return ret;
+
+ /* Everything is flushed. See if one of the callbacks' flush callbacks
+ wants to write more data. */
+ struct multiplex_ochannel **channelp;
+ bool unfinished = FALSE;
+ array_foreach_modifiable(&mstream->channels, channelp) {
+ if (*channelp != NULL && (*channelp)->ostream.callback != NULL) {
+ ret = (*channelp)->ostream.callback((*channelp)->ostream.context);
+ if (ret < 0)
+ return -1;
+ if (ret == 0)
+ unfinished = TRUE;
+ }
+ }
+ return unfinished ? 0 : 1;
}
static int o_stream_multiplex_ochannel_flush(struct ostream_private *stream)
return total;
}
+static void
+o_stream_multiplex_ochannel_set_flush_callback(struct ostream_private *stream,
+ stream_flush_callback_t *callback,
+ void *context)
+{
+ /* We have overwritten our parent's flush-callback. Don't change it. */
+ stream->callback = callback;
+ stream->context = context;
+}
+
static size_t
o_stream_multiplex_ochannel_get_buffer_used_size(const struct ostream_private *stream)
{
array_foreach_modifiable(&mstream->channels, channelp)
if (*channelp != NULL)
return;
+
+ i_assert(mstream->parent->real_stream->callback ==
+ (stream_flush_callback_t *)o_stream_multiplex_flush);
+ o_stream_set_flush_callback(mstream->parent,
+ *mstream->old_flush_callback,
+ mstream->old_flush_context);
o_stream_unref(&mstream->parent);
array_free(&mstream->channels);
i_free(mstream);
channel->ostream.cork = o_stream_multiplex_ochannel_cork;
channel->ostream.flush = o_stream_multiplex_ochannel_flush;
channel->ostream.sendv = o_stream_multiplex_ochannel_sendv;
+ channel->ostream.set_flush_callback =
+ o_stream_multiplex_ochannel_set_flush_callback;
channel->ostream.get_buffer_used_size =
o_stream_multiplex_ochannel_get_buffer_used_size;
channel->ostream.get_buffer_avail_size =
channel->ostream.fd = o_stream_get_fd(mstream->parent);
array_push_back(&channel->mstream->channels, &channel);
- return o_stream_create(&channel->ostream, mstream->parent, -1);
+ (void)o_stream_create(&channel->ostream, mstream->parent, -1);
+ /* o_stream_create() defaults the flush_callback to parent's callback.
+ Here it points to o_stream_multiplex_flush(), which just causes
+ infinite looping. */
+ channel->ostream.callback = NULL;
+ channel->ostream.context = NULL;
+ return &channel->ostream.ostream;
}
struct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid)
mstream = i_new(struct multiplex_ostream, 1);
mstream->parent = parent;
mstream->bufsize = bufsize;
+ mstream->old_flush_callback = parent->real_stream->callback;
+ mstream->old_flush_context = parent->real_stream->context;
+ o_stream_set_flush_callback(parent, o_stream_multiplex_flush, mstream);
i_array_init(&mstream->channels, 8);
o_stream_ref(parent);
#include "str.h"
#include "istream.h"
#include "ostream-private.h"
+#include "istream-multiplex.h"
#include "ostream-multiplex.h"
#include "ostream.h"
#include <unistd.h>
test_end();
}
+struct test_hang_context {
+ struct istream *input1, *input2;
+ size_t sent_bytes, sent2_bytes;
+ size_t read_bytes, read2_bytes;
+};
+
+static void test_hang_input(struct test_hang_context *ctx)
+{
+ ssize_t ret, ret2;
+
+ do {
+ ret = i_stream_read(ctx->input1);
+ if (ret > 0) {
+ i_stream_skip(ctx->input1, ret);
+ ctx->read_bytes += ret;
+ }
+ ret2 = i_stream_read(ctx->input2);
+ if (ret2 > 0) {
+ i_stream_skip(ctx->input2, ret2);
+ ctx->read2_bytes += ret2;
+ }
+ } while (ret > 0 || ret2 > 0);
+
+ test_assert(ret == 0 && ret2 == 0);
+ if (ctx->read_bytes == ctx->sent_bytes &&
+ ctx->read2_bytes == ctx->sent2_bytes)
+ io_loop_stop(current_ioloop);
+}
+
+static void test_ostream_multiplex_hang(void)
+{
+ int fd[2];
+
+ test_begin("ostream multiplex hang");
+ if (pipe(fd) < 0)
+ i_fatal("pipe() failed: %m");
+ fd_set_nonblock(fd[0], TRUE);
+ fd_set_nonblock(fd[1], TRUE);
+
+ struct ioloop *ioloop = io_loop_create();
+ struct ostream *file_output = o_stream_create_fd(fd[1], 1024);
+ o_stream_set_no_error_handling(file_output, TRUE);
+ struct ostream *channel = o_stream_create_multiplex(file_output, 4096);
+ struct ostream *channel2 = o_stream_multiplex_add_channel(channel, 1);
+ char buf[256];
+
+ /* send multiplex output until the buffer is full */
+ ssize_t ret, ret2;
+ size_t sent_bytes = 0, sent2_bytes = 0;
+ i_zero(&buf);
+ o_stream_cork(channel);
+ o_stream_cork(channel2);
+ while ((ret = o_stream_send(channel, buf, sizeof(buf))) > 0) {
+ sent_bytes += ret;
+ ret2 = o_stream_send(channel2, buf, sizeof(buf));
+ if (ret2 <= 0)
+ break;
+ sent2_bytes += ret2;
+ }
+ test_assert(o_stream_finish(channel) == 0);
+ test_assert(o_stream_finish(channel2) == 0);
+ o_stream_uncork(channel);
+ o_stream_uncork(channel2);
+ /* We expect the first channel to have data buffered */
+ test_assert(o_stream_get_buffer_used_size(channel) >=
+ o_stream_get_buffer_used_size(file_output));
+ test_assert(o_stream_get_buffer_used_size(channel) -
+ o_stream_get_buffer_used_size(file_output) > 0);
+
+ /* read everything that was already sent */
+ struct istream *file_input = i_stream_create_fd(fd[0], 1024);
+ struct istream *input = i_stream_create_multiplex(file_input, 4096);
+ struct istream *input2 = i_stream_multiplex_add_channel(input, 1);
+
+ struct test_hang_context ctx = {
+ .input1 = input,
+ .input2 = input2,
+ .sent_bytes = sent_bytes,
+ .sent2_bytes = sent2_bytes,
+ };
+
+ struct timeout *to = timeout_add(5000, io_loop_stop, current_ioloop);
+ struct io *io = io_add_istream(file_input, test_hang_input, &ctx);
+ io_loop_run(ioloop);
+ io_remove(&io);
+ timeout_remove(&to);
+
+ /* everything that was sent should have been received now.
+ ostream-multiplex's internal buffer is also supposed to have
+ been sent. */
+ test_assert(input->v_offset == sent_bytes);
+ test_assert(input2->v_offset == sent2_bytes);
+ test_assert(o_stream_get_buffer_used_size(channel) == 0);
+ test_assert(o_stream_get_buffer_used_size(channel2) == 0);
+
+ i_stream_unref(&file_input);
+ i_stream_unref(&input);
+ i_stream_unref(&input2);
+ o_stream_unref(&channel);
+ o_stream_unref(&channel2);
+ o_stream_unref(&file_output);
+ io_loop_destroy(&ioloop);
+ test_end();
+}
+
+#define FLUSH_CALLBACK_TOTAL_BYTES 10240
+
+struct test_flush_context {
+ struct ostream *output1, *output2;
+ struct istream *input1, *input2;
+};
+
+static int flush_callback1(struct test_flush_context *ctx)
+{
+ char buf[32];
+
+ i_assert(ctx->output1->offset <= FLUSH_CALLBACK_TOTAL_BYTES);
+ size_t bytes_left = FLUSH_CALLBACK_TOTAL_BYTES - ctx->output1->offset;
+
+ memset(buf, '1', sizeof(buf));
+ if (o_stream_send(ctx->output1, buf, I_MIN(sizeof(buf), bytes_left)) < 0)
+ return -1;
+ return ctx->output1->offset < FLUSH_CALLBACK_TOTAL_BYTES ? 0 : 1;
+}
+
+static int flush_callback2(struct test_flush_context *ctx)
+{
+ char buf[64];
+
+ i_assert(ctx->output2->offset <= FLUSH_CALLBACK_TOTAL_BYTES);
+ size_t bytes_left = FLUSH_CALLBACK_TOTAL_BYTES - ctx->output2->offset;
+
+ memset(buf, '2', sizeof(buf));
+ if (o_stream_send(ctx->output2, buf, I_MIN(sizeof(buf), bytes_left)) < 0)
+ return -1;
+ return ctx->output2->offset < FLUSH_CALLBACK_TOTAL_BYTES ? 0 : 1;
+}
+
+static void test_flush_input(struct test_flush_context *ctx)
+{
+ ssize_t ret, ret2;
+
+ do {
+ ret = i_stream_read(ctx->input1);
+ if (ret > 0)
+ i_stream_skip(ctx->input1, ret);
+ ret2 = i_stream_read(ctx->input2);
+ if (ret2 > 0)
+ i_stream_skip(ctx->input2, ret2);
+ } while (ret > 0 || ret2 > 0);
+
+ test_assert(ret == 0 && ret2 == 0);
+ if (ctx->input1->v_offset == FLUSH_CALLBACK_TOTAL_BYTES &&
+ ctx->input2->v_offset == FLUSH_CALLBACK_TOTAL_BYTES)
+ io_loop_stop(current_ioloop);
+}
+
+static void test_ostream_multiplex_flush_callback(void)
+{
+ int fd[2];
+
+ test_begin("ostream multiplex flush callback");
+ if (pipe(fd) < 0)
+ i_fatal("pipe() failed: %m");
+ fd_set_nonblock(fd[0], TRUE);
+ fd_set_nonblock(fd[1], TRUE);
+
+ struct ioloop *ioloop = io_loop_create();
+ struct ostream *file_output = o_stream_create_fd(fd[1], 1024);
+ o_stream_set_no_error_handling(file_output, TRUE);
+ struct ostream *channel = o_stream_create_multiplex(file_output, 4096);
+ struct ostream *channel2 = o_stream_multiplex_add_channel(channel, 1);
+
+ struct istream *file_input = i_stream_create_fd(fd[0], 1024);
+ struct istream *input = i_stream_create_multiplex(file_input, 4096);
+ struct istream *input2 = i_stream_multiplex_add_channel(input, 1);
+
+ struct test_flush_context ctx = {
+ .output1 = channel,
+ .output2 = channel2,
+ .input1 = input,
+ .input2 = input2,
+ };
+ o_stream_set_flush_callback(channel, flush_callback1, &ctx);
+ o_stream_set_flush_callback(channel2, flush_callback2, &ctx);
+ o_stream_set_flush_pending(channel, TRUE);
+ o_stream_set_flush_pending(channel2, TRUE);
+
+ struct timeout *to = timeout_add(5000, io_loop_stop, current_ioloop);
+ struct io *io = io_add_istream(file_input, test_flush_input, &ctx);
+ io_loop_run(ioloop);
+ io_remove(&io);
+ timeout_remove(&to);
+
+ test_assert(channel->offset == FLUSH_CALLBACK_TOTAL_BYTES);
+ test_assert(channel2->offset == FLUSH_CALLBACK_TOTAL_BYTES);
+ test_assert(input->v_offset == FLUSH_CALLBACK_TOTAL_BYTES);
+ test_assert(input2->v_offset == FLUSH_CALLBACK_TOTAL_BYTES);
+
+ test_assert(o_stream_finish(channel) == 1);
+ test_assert(o_stream_finish(channel2) == 1);
+
+ i_stream_unref(&file_input);
+ i_stream_unref(&input);
+ i_stream_unref(&input2);
+ o_stream_unref(&channel);
+ o_stream_unref(&channel2);
+ o_stream_unref(&file_output);
+ io_loop_destroy(&ioloop);
+ test_end();
+}
+
void test_ostream_multiplex(void)
{
test_ostream_multiplex_simple();
test_ostream_multiplex_stream();
test_ostream_multiplex_cork();
+ test_ostream_multiplex_hang();
+ test_ostream_multiplex_flush_callback();
}