]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lib: Add iostream-pump
authorAki Tuomi <aki.tuomi@dovecot.fi>
Thu, 15 Sep 2016 10:57:46 +0000 (13:57 +0300)
committerAki Tuomi <aki.tuomi@dovecot.fi>
Thu, 17 Nov 2016 20:15:19 +0000 (22:15 +0200)
src/lib/Makefile.am
src/lib/iostream-pump.c [new file with mode: 0644]
src/lib/iostream-pump.h [new file with mode: 0644]

index 57bb6bc9aaff9aec7dd93fb97fd252dbbfc98827..9989fc780af945a7b78cac877546477d7c78095c 100644 (file)
@@ -54,6 +54,7 @@ liblib_la_SOURCES = \
        imem.c \
        ipwd.c \
        iostream.c \
+       iostream-pump.c \
        iostream-rawlog.c \
        iostream-temp.c \
        iso8601-date.c \
@@ -203,6 +204,7 @@ headers = \
        ipwd.h \
        iostream.h \
        iostream-private.h \
+       iostream-pump.h \
        iostream-rawlog.h \
        iostream-rawlog-private.h \
        iostream-temp.h \
diff --git a/src/lib/iostream-pump.c b/src/lib/iostream-pump.c
new file mode 100644 (file)
index 0000000..147c006
--- /dev/null
@@ -0,0 +1,181 @@
+/* Copyright (c) 2002-2016 Dovecot authors, see the included COPYING file
+ */
+#include "lib.h"
+#include "buffer.h"
+#include "str.h"
+#include "iostream-pump.h"
+#include "istream.h"
+#include "ostream.h"
+#include <unistd.h>
+
+#undef iostream_pump_set_completion_callback
+
+struct iostream_pump {
+       struct istream *input;
+       struct ostream *output;
+
+       struct io *io;
+
+       unsigned int ref;
+
+       iostream_pump_callback_t *callback;
+       void *context;
+       bool completed;
+};
+
+static
+void iostream_pump_copy(struct iostream_pump *pump)
+{
+       enum ostream_send_istream_result res;
+
+       size_t old_size = o_stream_get_max_buffer_size(pump->output);
+       o_stream_set_max_buffer_size(pump->output,
+                                    I_MIN(IO_BLOCK_SIZE,
+                                          o_stream_get_max_buffer_size(pump->output)));
+       res = o_stream_send_istream(pump->output, pump->input);
+       o_stream_set_max_buffer_size(pump->output, old_size);
+
+       switch(res) {
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_INPUT:
+       case OSTREAM_SEND_ISTREAM_RESULT_ERROR_OUTPUT:
+               io_remove(&pump->io);
+               pump->callback(FALSE, pump->context);
+               return;
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_OUTPUT:
+               io_remove(&pump->io);
+               return;
+       case OSTREAM_SEND_ISTREAM_RESULT_FINISHED:
+               io_remove(&pump->io);
+               /* flush it */
+               switch (o_stream_flush(pump->output)) {
+               case -1:
+                       pump->callback(FALSE, pump->context);
+                       break;
+               case 0:
+                       pump->completed = TRUE;
+                       break;
+               default:
+                       pump->callback(TRUE, pump->context);
+                       break;
+               }
+               return;
+       case OSTREAM_SEND_ISTREAM_RESULT_WAIT_INPUT:
+               return;
+       }
+       i_unreached();
+}
+
+static
+int iostream_pump_flush(struct iostream_pump *pump)
+{
+       int ret;
+       if ((ret = o_stream_flush(pump->output)) <= 0) {
+               if (ret < 0)
+                       pump->callback(FALSE, pump->context);
+               return ret;
+       }
+       if (pump->completed) {
+               pump->callback(TRUE, pump->context);
+               return 1;
+       }
+
+       if (pump->io == NULL) {
+               pump->io = io_add_istream(pump->input, iostream_pump_copy, pump);
+               io_set_pending(pump->io);
+       }
+       return ret;
+}
+
+struct iostream_pump *
+iostream_pump_create(struct istream *input, struct ostream *output)
+{
+       i_assert(input != NULL &&
+                output != NULL);
+
+       /* ref streams */
+       i_stream_ref(input);
+       o_stream_ref(output);
+
+       /* create pump */
+       struct iostream_pump *pump = i_new(struct iostream_pump, 1);
+       pump->input = input;
+       pump->output = output;
+
+       pump->ref = 1;
+
+       return pump;
+}
+
+void iostream_pump_start(struct iostream_pump *pump)
+{
+       i_assert(pump != NULL);
+       i_assert(pump->callback != NULL);
+
+       /* add flush handler */
+       o_stream_set_flush_callback(pump->output, iostream_pump_flush, pump);
+
+       /* make IO objects */
+       pump->io = io_add_istream(pump->input, iostream_pump_copy, pump);
+
+       /* make sure we do first read right away */
+       io_set_pending(pump->io);
+}
+
+struct istream *iostream_pump_get_input(struct iostream_pump *pump)
+{
+       i_assert(pump != NULL);
+       return pump->input;
+}
+
+struct ostream *iostream_pump_get_output(struct iostream_pump *pump)
+{
+       i_assert(pump != NULL);
+       return pump->output;
+}
+
+void iostream_pump_set_completion_callback(struct iostream_pump *pump,
+                                          iostream_pump_callback_t *callback, void *context)
+{
+       i_assert(pump != NULL);
+       pump->callback = callback;
+       pump->context = context;
+}
+
+void iostream_pump_ref(struct iostream_pump *pump)
+{
+       i_assert(pump != NULL && pump->ref > 0);
+       pump->ref++;
+}
+
+void iostream_pump_unref(struct iostream_pump **pump_r)
+{
+       i_assert(pump_r != NULL && *pump_r != NULL);
+       struct iostream_pump *pump = *pump_r;
+       *pump_r = NULL;
+
+       i_assert(pump->ref > 0);
+       if (--pump->ref == 0) {
+               iostream_pump_stop(pump);
+               o_stream_unref(&pump->output);
+               i_stream_unref(&pump->input);
+               i_free(pump);
+       }
+}
+
+void iostream_pump_stop(struct iostream_pump *pump)
+{
+       i_assert(pump != NULL);
+
+       o_stream_unset_flush_callback(pump->output);
+
+       if (pump->io != NULL)
+               io_remove(&pump->io);
+}
+
+void iostream_pump_switch_ioloop(struct iostream_pump *pump)
+{
+       i_assert(pump != NULL);
+       pump->io = io_loop_move_io(&pump->io);
+       o_stream_switch_ioloop(pump->output);
+       i_stream_switch_ioloop(pump->input);
+}
diff --git a/src/lib/iostream-pump.h b/src/lib/iostream-pump.h
new file mode 100644 (file)
index 0000000..8c606fc
--- /dev/null
@@ -0,0 +1,48 @@
+/* Copyright (c) 2002-2016 Dovecot authors, see the included COPYING file
+ */
+#ifndef IOSTREAM_PUMP_H
+#define IOSTREAM_PUMP_H 1
+
+/**
+
+iostream-pump
+=============
+
+This construct pumps data from istream to ostream asynchronously.
+
+The pump requires you to provide completion callback. The
+completion callback is called with success parameter to
+indicate whether it ended with error.
+
+The istream and ostream are reffed on creation and unreffed
+on unref.
+
+**/
+
+struct istream;
+struct ostream;
+struct iostream_pump;
+
+typedef void iostream_pump_callback_t(bool success, void *context);
+
+struct iostream_pump *
+iostream_pump_create(struct istream *input, struct ostream *output);
+
+struct istream *iostream_pump_get_input(struct iostream_pump *pump);
+struct ostream *iostream_pump_get_output(struct iostream_pump *pump);
+
+void iostream_pump_start(struct iostream_pump *pump);
+void iostream_pump_stop(struct iostream_pump *pump);
+
+void iostream_pump_ref(struct iostream_pump *pump);
+void iostream_pump_unref(struct iostream_pump **pump_r);
+
+void iostream_pump_set_completion_callback(struct iostream_pump *pump,
+                                          iostream_pump_callback_t *callback, void *context);
+#define iostream_pump_set_completion_callback(pump, callback, context) \
+       iostream_pump_set_completion_callback(pump, (iostream_pump_callback_t *)callback, context + \
+               CALLBACK_TYPECHECK(callback, void (*)(bool, typeof(context))))
+
+void iostream_pump_switch_ioloop(struct iostream_pump *pump);
+
+#endif