From 08c0bffa1c9ccb1ac67689093f3c99cd38cfd3a4 Mon Sep 17 00:00:00 2001 From: Aki Tuomi Date: Thu, 15 Sep 2016 13:57:46 +0300 Subject: [PATCH] lib: Add iostream-pump --- src/lib/Makefile.am | 2 + src/lib/iostream-pump.c | 181 ++++++++++++++++++++++++++++++++++++++++ src/lib/iostream-pump.h | 48 +++++++++++ 3 files changed, 231 insertions(+) create mode 100644 src/lib/iostream-pump.c create mode 100644 src/lib/iostream-pump.h diff --git a/src/lib/Makefile.am b/src/lib/Makefile.am index 57bb6bc9aa..9989fc780a 100644 --- a/src/lib/Makefile.am +++ b/src/lib/Makefile.am @@ -54,6 +54,7 @@ liblib_la_SOURCES = \ imem.c \ ipwd.c \ iostream.c \ + iostream-pump.c \ iostream-rawlog.c \ iostream-temp.c \ iso8601-date.c \ @@ -203,6 +204,7 @@ headers = \ ipwd.h \ iostream.h \ iostream-private.h \ + iostream-pump.h \ iostream-rawlog.h \ iostream-rawlog-private.h \ iostream-temp.h \ diff --git a/src/lib/iostream-pump.c b/src/lib/iostream-pump.c new file mode 100644 index 0000000000..147c006706 --- /dev/null +++ b/src/lib/iostream-pump.c @@ -0,0 +1,181 @@ +/* Copyright (c) 2002-2016 Dovecot authors, see the included COPYING file + */ +#include "lib.h" +#include "buffer.h" +#include "str.h" +#include "iostream-pump.h" +#include "istream.h" +#include "ostream.h" +#include + +#undef iostream_pump_set_completion_callback + +struct iostream_pump { + struct istream *input; + struct ostream *output; + + struct io *io; + + unsigned int ref; + + iostream_pump_callback_t *callback; + void *context; + bool completed; +}; + +static +void iostream_pump_copy(struct iostream_pump *pump) +{ + enum ostream_send_istream_result res; + + size_t old_size = o_stream_get_max_buffer_size(pump->output); + o_stream_set_max_buffer_size(pump->output, + I_MIN(IO_BLOCK_SIZE, + o_stream_get_max_buffer_size(pump->output))); + res = o_stream_send_istream(pump->output, pump->input); + o_stream_set_max_buffer_size(pump->output, old_size); + + switch(res) { + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT: + case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT: + io_remove(&pump->io); + pump->callback(FALSE, pump->context); + return; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT: + io_remove(&pump->io); + return; + case OSTREAM_SEND_ISTREAM_RESULT_FINISHED: + io_remove(&pump->io); + /* flush it */ + switch (o_stream_flush(pump->output)) { + case -1: + pump->callback(FALSE, pump->context); + break; + case 0: + pump->completed = TRUE; + break; + default: + pump->callback(TRUE, pump->context); + break; + } + return; + case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT: + return; + } + i_unreached(); +} + +static +int iostream_pump_flush(struct iostream_pump *pump) +{ + int ret; + if ((ret = o_stream_flush(pump->output)) <= 0) { + if (ret < 0) + pump->callback(FALSE, pump->context); + return ret; + } + if (pump->completed) { + pump->callback(TRUE, pump->context); + return 1; + } + + if (pump->io == NULL) { + pump->io = io_add_istream(pump->input, iostream_pump_copy, pump); + io_set_pending(pump->io); + } + return ret; +} + +struct iostream_pump * +iostream_pump_create(struct istream *input, struct ostream *output) +{ + i_assert(input != NULL && + output != NULL); + + /* ref streams */ + i_stream_ref(input); + o_stream_ref(output); + + /* create pump */ + struct iostream_pump *pump = i_new(struct iostream_pump, 1); + pump->input = input; + pump->output = output; + + pump->ref = 1; + + return pump; +} + +void iostream_pump_start(struct iostream_pump *pump) +{ + i_assert(pump != NULL); + i_assert(pump->callback != NULL); + + /* add flush handler */ + o_stream_set_flush_callback(pump->output, iostream_pump_flush, pump); + + /* make IO objects */ + pump->io = io_add_istream(pump->input, iostream_pump_copy, pump); + + /* make sure we do first read right away */ + io_set_pending(pump->io); +} + +struct istream *iostream_pump_get_input(struct iostream_pump *pump) +{ + i_assert(pump != NULL); + return pump->input; +} + +struct ostream *iostream_pump_get_output(struct iostream_pump *pump) +{ + i_assert(pump != NULL); + return pump->output; +} + +void iostream_pump_set_completion_callback(struct iostream_pump *pump, + iostream_pump_callback_t *callback, void *context) +{ + i_assert(pump != NULL); + pump->callback = callback; + pump->context = context; +} + +void iostream_pump_ref(struct iostream_pump *pump) +{ + i_assert(pump != NULL && pump->ref > 0); + pump->ref++; +} + +void iostream_pump_unref(struct iostream_pump **pump_r) +{ + i_assert(pump_r != NULL && *pump_r != NULL); + struct iostream_pump *pump = *pump_r; + *pump_r = NULL; + + i_assert(pump->ref > 0); + if (--pump->ref == 0) { + iostream_pump_stop(pump); + o_stream_unref(&pump->output); + i_stream_unref(&pump->input); + i_free(pump); + } +} + +void iostream_pump_stop(struct iostream_pump *pump) +{ + i_assert(pump != NULL); + + o_stream_unset_flush_callback(pump->output); + + if (pump->io != NULL) + io_remove(&pump->io); +} + +void iostream_pump_switch_ioloop(struct iostream_pump *pump) +{ + i_assert(pump != NULL); + pump->io = io_loop_move_io(&pump->io); + o_stream_switch_ioloop(pump->output); + i_stream_switch_ioloop(pump->input); +} diff --git a/src/lib/iostream-pump.h b/src/lib/iostream-pump.h new file mode 100644 index 0000000000..8c606fcc75 --- /dev/null +++ b/src/lib/iostream-pump.h @@ -0,0 +1,48 @@ +/* Copyright (c) 2002-2016 Dovecot authors, see the included COPYING file + */ +#ifndef IOSTREAM_PUMP_H +#define IOSTREAM_PUMP_H 1 + +/** + +iostream-pump +============= + +This construct pumps data from istream to ostream asynchronously. + +The pump requires you to provide completion callback. The +completion callback is called with success parameter to +indicate whether it ended with error. + +The istream and ostream are reffed on creation and unreffed +on unref. + +**/ + +struct istream; +struct ostream; +struct iostream_pump; + +typedef void iostream_pump_callback_t(bool success, void *context); + +struct iostream_pump * +iostream_pump_create(struct istream *input, struct ostream *output); + +struct istream *iostream_pump_get_input(struct iostream_pump *pump); +struct ostream *iostream_pump_get_output(struct iostream_pump *pump); + +void iostream_pump_start(struct iostream_pump *pump); +void iostream_pump_stop(struct iostream_pump *pump); + +void iostream_pump_ref(struct iostream_pump *pump); +void iostream_pump_unref(struct iostream_pump **pump_r); + +void iostream_pump_set_completion_callback(struct iostream_pump *pump, + iostream_pump_callback_t *callback, void *context); +#define iostream_pump_set_completion_callback(pump, callback, context) \ + iostream_pump_set_completion_callback(pump, (iostream_pump_callback_t *)callback, context + \ + CALLBACK_TYPECHECK(callback, void (*)(bool, typeof(context)))) + +void iostream_pump_switch_ioloop(struct iostream_pump *pump); + +#endif -- 2.47.3