]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lib: Create wrapper output stream.
authorStephan Bosch <stephan.bosch@dovecot.fi>
Sun, 25 Nov 2018 20:24:07 +0000 (21:24 +0100)
committermartti.rannanjarvi <martti.rannanjarvi@open-xchange.com>
Sat, 18 Apr 2020 14:55:11 +0000 (14:55 +0000)
This allows creating the stream before the actual output is available, thereby
wrapping the whole process of making a connection, doing protocol handshakes,
and submitting the request. Once there's an actual output stream, data from the
wrapper stream can be sent. Until that time, the wrapper will block or return 0
(based on whether it is blocking or not).

The wrapper stream is a partial implementation that needs to be completed (by in
heritance) by the respective protocol implementation.

src/lib/Makefile.am
src/lib/ostream-wrapper.c [new file with mode: 0644]
src/lib/ostream-wrapper.h [new file with mode: 0644]

index 45f1863dd58291a83a8995946507d0f46b1a5539..91f61b0d0c2844814e250d80496d200aa702325b 100644 (file)
@@ -128,6 +128,7 @@ liblib_la_SOURCES = \
        ostream-null.c \
        ostream-rawlog.c \
        ostream-unix.c \
+       ostream-wrapper.c \
        path-util.c \
        pkcs5.c \
        primes.c \
@@ -281,6 +282,7 @@ headers = \
        ostream-null.h \
        ostream-rawlog.h \
        ostream-unix.h \
+       ostream-wrapper.h \
        path-util.h \
        pkcs5.h \
        primes.h \
diff --git a/src/lib/ostream-wrapper.c b/src/lib/ostream-wrapper.c
new file mode 100644 (file)
index 0000000..55aa017
--- /dev/null
@@ -0,0 +1,1223 @@
+/* Copyright (c) 2018 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "buffer.h"
+#include "ostream-private.h"
+
+#include "ostream-wrapper.h"
+
+static int wrapper_ostream_flush(struct ostream_private *stream);
+static void
+wrapper_ostream_switch_ioloop_to(struct ostream_private *stream,
+                                struct ioloop *ioloop);
+
+/*
+ * Buffer
+ */
+
+/* Determine the optimum buffer size for the wrapper stream itself. */
+static inline size_t
+wrapper_ostream_optimal_size(struct wrapper_ostream *wostream)
+{
+       size_t optimal_size = wostream->ostream.max_buffer_size;
+
+       if (wostream->output != NULL) {
+               optimal_size = I_MIN(
+                       o_stream_get_max_buffer_size(wostream->output),
+                       optimal_size);
+       }
+       if (optimal_size == SIZE_MAX)
+               optimal_size = IO_BLOCK_SIZE;
+
+       return optimal_size;
+}
+
+/* Return the current size of the wrapper output stream buffer. */
+static inline size_t wrapper_ostream_size(struct wrapper_ostream *wostream)
+{
+       buffer_t *buffer = wostream->buffer;
+
+       if (buffer == NULL)
+               return 0;
+       return buffer->used;
+}
+
+/* Return TRUE when the wrapper stream's internal buffer is empty. */
+static inline bool wrapper_ostream_is_empty(struct wrapper_ostream *wostream)
+{
+       return (wrapper_ostream_size(wostream) == 0);
+}
+/* Return TRUE when the wrapper stream's internal buffer is filled to the
+   maximum. */
+static inline bool wrapper_ostream_is_full(struct wrapper_ostream *wostream)
+{
+       return (wrapper_ostream_size(wostream) >=
+               wostream->ostream.max_buffer_size);
+}
+/* Return TRUE when the wrapper stream's internal buffer is filled at or beyond
+   the optimum. */
+static inline bool wrapper_ostream_is_filled(struct wrapper_ostream *wostream)
+{
+       return (wrapper_ostream_size(wostream) >=
+               wrapper_ostream_optimal_size(wostream));
+}
+
+/*
+ * Underlying output
+ */
+
+/* Handle error in the underlying output stream (the parent). */
+static void
+wrapper_ostream_handle_parent_error(struct wrapper_ostream *wostream)
+{
+       i_assert(wostream->output != NULL);
+
+       wostream->ostream.ostream.stream_errno =
+               wostream->output->stream_errno;
+       wostream->ostream.ostream.overflow =
+               wostream->output->overflow;
+       if (wostream->output->closed)
+               o_stream_close(&wostream->ostream.ostream);
+
+       if (wostream->output_error != NULL)
+               wostream->output_error(wostream);
+}
+
+static void wrapper_ostream_closed(struct wrapper_ostream *wostream)
+{
+       wostream->ostream.ostream.closed = TRUE;
+}
+
+/* Drop the underlying output. */
+static void wrapper_ostream_output_close(struct wrapper_ostream *wostream)
+{
+       o_stream_unref(&wostream->output);
+       wostream->output_finished = TRUE;
+       wostream->output_closed = TRUE;
+       wostream->output_closed_api = TRUE;
+}
+
+/* Method calls */
+
+/* Called when the implementation should start making the parent output stream
+   available, e.g. connect to the server (see output_start() method).
+ */
+static void wrapper_ostream_output_start(struct wrapper_ostream *wostream)
+{
+       if (wostream->output_started)
+               return;
+       wostream->output_started = TRUE;
+       if (wostream->output_start != NULL)
+               wostream->output_start(wostream);
+}
+
+/* Returns TRUE when the output is ready for data (see output_ready() method).
+ */
+static bool wrapper_ostream_output_ready(struct wrapper_ostream *wostream)
+{
+       i_assert(wostream->output_ready != NULL);
+       return wostream->output_ready(wostream);
+}
+
+/* Finish the underlying output (see output_finish() method).*/
+static int wrapper_ostream_output_finish(struct wrapper_ostream *wostream)
+{
+       i_assert(wostream->output_finish != NULL);
+       return wostream->output_finish(wostream);
+}
+
+/* Called when the wrapper ostream does not need write to parent output stream.
+   (see output_halt() method).
+ */
+static void wrapper_ostream_output_halt(struct wrapper_ostream *wostream)
+{
+       if (wostream->output_closed)
+               return;
+       if (wostream->output_halt != NULL)
+               wostream->output_halt(wostream);
+}
+
+/* Called when the wrapper ostream has data available for the parent output and
+   wants wrapper_ostream_continue() to be called when the parent stream is
+   writeable (see output_resume() method). */
+static void wrapper_ostream_output_resume(struct wrapper_ostream *wostream)
+{
+       if (wostream->output_closed)
+               return;
+       if (wostream->output_resume != NULL)
+               wostream->output_resume(wostream);
+}
+
+/* Update any timeouts for the underlying (parent) output (see
+  output_update_timeouts() method). */
+static void
+wrapper_ostream_output_update_timeouts(struct wrapper_ostream *wostream)
+{
+       struct ostream_private *stream = &wostream->ostream;
+       bool sender_blocking;
+
+       if (wostream->output_closed)
+               return;
+       if (wostream->output_update_timeouts == NULL)
+               return;
+
+       sender_blocking = (!stream->finished &&
+                          (wrapper_ostream_is_empty(wostream) ||
+                           (stream->corked &&
+                            !wrapper_ostream_is_filled(wostream))));
+       wostream->output_update_timeouts(wostream, sender_blocking);
+}
+
+/*
+ * Wrapper
+ */
+
+/* Halt/resume the underlying output based on the state of the wrapper stream.
+ */
+static void
+wrapper_ostream_output_manage(struct wrapper_ostream *wostream, bool sending)
+{
+       struct ostream_private *stream = &wostream->ostream;
+       bool no_data;
+
+       if (wostream->output_closed)
+               return;
+
+       no_data = (!sending && wrapper_ostream_is_empty(wostream)) ||
+                  (stream->corked && !wostream->flushing &&
+                   !stream->finished && !wrapper_ostream_is_filled(wostream));
+
+       if ((stream->ostream.closed || no_data) && !wostream->flush_pending)
+               wrapper_ostream_output_halt(wostream);
+       else {
+               wrapper_ostream_output_resume(wostream);
+               if (wostream->flush_pending && wostream->output != NULL)
+                       o_stream_set_flush_pending(wostream->output, TRUE);
+       }
+}
+
+/* Handle any pending error by making it available to the application through
+   the output stream API. */
+static int
+wrapper_ostream_handle_pending_error(struct wrapper_ostream *wostream)
+{
+       struct ostream_private *stream = &wostream->ostream;
+
+       if (wostream->pending_errno != 0) {
+               if (wostream->pending_error != NULL) {
+                       io_stream_set_error(&stream->iostream,
+                                           "%s", wostream->pending_error);
+               }
+               stream->ostream.stream_errno = wostream->pending_errno;
+               wostream->pending_errno = 0;
+               wostream->returned_error = TRUE;
+               wrapper_ostream_closed(wostream);
+               i_free_and_null(wostream->pending_error);
+               return -1;
+       }
+       return 0;
+}
+
+/* Called when the wrapper stream is first finished using o_stream_finish(). */
+static int wrapper_ostream_finish(struct wrapper_ostream *wostream)
+{
+       int ret;
+
+       if (wostream->output_closed) {
+               if (wrapper_ostream_handle_pending_error(wostream) < 0)
+                       return -1;
+               return 1;
+       }
+
+       if (!wrapper_ostream_output_ready(wostream)) {
+               return 0;
+       }
+
+       wostream->output_finished = TRUE;
+       if (wostream->output != NULL) {
+               if (o_stream_uncork_flush(wostream->output) < 0) {
+                       wrapper_ostream_handle_parent_error(wostream);
+                       o_stream_unref(&wostream->output);
+                       return -1;
+               }
+       }
+
+       /* Finished sending payload; now also finish the underlying output. */
+       ret = wrapper_ostream_output_finish(wostream);
+       if (ret <= 0)
+               return ret;
+
+       if (wrapper_ostream_handle_pending_error(wostream) < 0)
+               return -1;
+       wrapper_ostream_output_close(wostream);
+       return 1;
+}
+
+/* Wait in ioloop until underlying (parent) output can be flushed. This is
+   called only when the wrapper stream is blocking. */
+static int
+wrapper_ostream_flush_wait(struct wrapper_ostream *wostream)
+{
+       struct ostream_private *stream = &wostream->ostream;
+       struct ioloop *ioloop, *prev_ioloop;
+       bool was_corked = FALSE;
+
+       wrapper_ostream_output_manage(wostream, !wostream->flushing);
+
+       /* Cannot be already waiting */
+       i_assert(!wostream->flush_waiting);
+       i_assert(wostream->flush_ioloop == NULL);
+
+       i_assert(wostream->wait_begin != NULL);
+       i_assert(wostream->wait_end != NULL);
+
+       if (wostream->output != NULL && o_stream_is_corked(wostream->output)) {
+               /* Make sure parent is uncorked here to make sure output IO is
+                  active. */
+               if (o_stream_uncork_flush(wostream->output) < 0) {
+                       wrapper_ostream_handle_parent_error(wostream);
+                       return -1;
+               }
+               was_corked = TRUE;
+       }
+
+       wostream->flush_ioloop = ioloop = io_loop_create();
+       prev_ioloop = wostream->wait_begin(wostream, ioloop);
+       o_stream_switch_ioloop_to(&wostream->ostream.ostream, ioloop);
+
+       /* Either we're waiting for network I/O or we're getting out of a
+          callback using timeout_add_short(0) */
+       i_assert(io_loop_have_ios(ioloop) ||
+                io_loop_have_immediate_timeouts(ioloop));
+
+       wostream->flush_waiting = TRUE;
+       do {
+               e_debug(wostream->event, "Waiting for output flush");
+               io_loop_run(ioloop);
+       } while (wostream->flush_waiting);
+
+       e_debug(wostream->event, "Can now flush output");
+
+       o_stream_switch_ioloop_to(&wostream->ostream.ostream, prev_ioloop);
+       wostream->wait_end(wostream, prev_ioloop);
+       io_loop_destroy(&ioloop);
+       wostream->flush_ioloop = NULL;
+
+       if (stream->ostream.blocking)
+               wrapper_ostream_output_halt(wostream);
+
+       if (was_corked && wostream->output != NULL)
+               o_stream_cork(wostream->output);
+
+       if (wrapper_ostream_handle_pending_error(wostream) < 0) {
+               /* Stream already hit an error */
+               return -1;
+       }
+       return 0;
+}
+
+/* Try to flush the underlying (parent) output. */
+static int wrapper_ostream_flush_parent(struct wrapper_ostream *wostream)
+{
+       struct ostream *parent;
+
+       if (wostream->output_closed) {
+               /* Output already dropped; nothing to flush */
+               return 1;
+       }
+       if (!wrapper_ostream_output_ready(wostream)) {
+               /* There is no parent ostream yet */
+               return 1;
+       }
+
+       parent = wostream->output;
+       if (parent == NULL) {
+               /* There is no parent ostream anymore */
+               i_assert(wostream->buffer == NULL ||
+                        wostream->buffer->used == 0);
+               return 1;
+       }
+       if (o_stream_get_buffer_used_size(parent) >= IO_BLOCK_SIZE) {
+               /* We already have quite a lot of data in parent stream.
+                  unless we can flush it, don't add any more to it or we
+                  could keep wasting memory by just increasing the buffer
+                  size all the time. */
+               if (o_stream_flush(parent) < 0) {
+                       wrapper_ostream_handle_parent_error(wostream);
+                       return -1;
+               }
+               if (o_stream_get_buffer_used_size(parent) >= IO_BLOCK_SIZE)
+                       return 0;
+       }
+
+       return 1;
+}
+
+/* Try to write data to underlying (parent) output. */
+static ssize_t
+wrapper_ostream_writev(struct wrapper_ostream *wostream,
+                      const struct const_iovec *iov, unsigned int iov_count)
+{
+       struct ostream *parent = wostream->output;
+       ssize_t sent;
+
+       i_assert(!wostream->output_closed);
+       i_assert(!wostream->output_finished);
+
+       if (!wrapper_ostream_output_ready(wostream))
+               return 0;
+
+       /* Send more data to parent ostream */
+       i_assert(parent != NULL);
+       o_stream_set_max_buffer_size(parent, IO_BLOCK_SIZE);
+       sent = o_stream_sendv(parent, iov, iov_count);
+       o_stream_set_max_buffer_size(parent, (size_t)-1);
+       if (sent < 0) {
+               wrapper_ostream_handle_parent_error(wostream);
+               return -1;
+       }
+
+       return sent;
+}
+
+/* Try to write data to underlying (parent) output and implement blocking
+   behavior by running an ioloop. */
+static ssize_t
+wrapper_ostream_writev_full(struct wrapper_ostream *wostream,
+                           const struct const_iovec *iov,
+                           unsigned int iov_count)
+{
+       struct ostream_private *stream = &wostream->ostream;
+       unsigned int i;
+       ssize_t sent, sent_total;
+
+       if (!stream->ostream.blocking) {
+               /* Not blocking; send what we can */
+               return wrapper_ostream_writev(wostream, iov, iov_count);
+       }
+
+       /* Blocking; loop and wait until all is sent */
+
+       i = 0;
+       sent_total = 0;
+       for (;;) {
+               struct const_iovec niov;
+               size_t iov_pos;
+
+               i_assert(iov_count > 0);
+
+               /* Send iovec with complete entries */
+               sent = wrapper_ostream_writev(wostream, iov, iov_count);
+               if (sent < 0)
+                       return -1;
+               if (sent == 0) {
+                       if (wrapper_ostream_flush_wait(wostream) < 0)
+                               return -1;
+                       i_assert(!wostream->output_closed);
+                       continue;
+               }
+
+               /* Determine what was sent */
+               sent_total += sent;
+               iov_pos = (size_t)sent;
+               for (i = 0; i < iov_count && iov_pos >= iov[i].iov_len; i++)
+                       iov_pos -= iov[i].iov_len;
+               if (i >= iov_count) {
+                       /* All sent */
+                       i_assert(iov_pos == 0);
+                       return sent_total;
+               }
+
+               iov = &iov[i];
+               iov_count -= i;
+               if (iov_pos == 0) {
+                       /* Nicely sent until an iovec boundary */
+                       continue;
+               }
+
+               /* Send partial iovec entry */
+               i_zero(&niov);
+               niov = iov[0];
+               i_assert(iov_pos < niov.iov_len);
+               niov.iov_base = CONST_PTR_OFFSET(niov.iov_base, iov_pos);
+               niov.iov_len -= iov_pos;
+
+               while (niov.iov_len > 0) {
+                       sent = wrapper_ostream_writev(wostream, &niov, 1);
+                       if (sent < 0)
+                               return sent;
+                       if (sent == 0) {
+                               if (wrapper_ostream_flush_wait(wostream) < 0)
+                                       return -1;
+                               i_assert(!wostream->output_closed);
+                               continue;
+                       }
+                       i_assert((size_t)sent <= niov.iov_len);
+                       niov.iov_base = CONST_PTR_OFFSET(niov.iov_base, sent);
+                       niov.iov_len -= sent;
+                       sent_total += sent;
+               }
+
+               if (iov_count == 1) {
+                       i_assert(sent_total != 0);
+                       return sent_total;
+               }
+
+               /* Now sent until an iovec boundary */
+               iov = &iov[1];
+               iov_count--;
+       }
+
+       i_unreached();
+}
+
+/* Try to flush wrapper stream's buffer content. */
+static int wrapper_ostream_flush_buffer(struct wrapper_ostream *wostream)
+{
+       struct ostream_private *stream = &wostream->ostream;
+       buffer_t *buffer = wostream->buffer;
+       struct const_iovec iov;
+       ssize_t sent;
+
+       if (wostream->output_closed) {
+               /* Ostream already finished */
+               i_assert(wostream->ostream.finished);
+               return 1;
+       }
+
+       if (buffer == NULL || buffer->used == 0) {
+               /* Buffer already empty */
+               return 1;
+       }
+
+       do {
+               /* Try to flush whole buffer */
+               iov.iov_base = buffer->data;
+               iov.iov_len = buffer->used;
+               sent = wrapper_ostream_writev_full(wostream, &iov, 1);
+               if (sent < 0)
+                       return -1;
+
+               /* Remove sent data from buffer */
+               buffer_delete(buffer, 0, sent);
+
+               /* More aggressively flush the buffer when this stream is
+                  finished
+                */
+       } while (wostream->ostream.finished && sent > 0 && buffer->used > 0);
+
+       if (buffer->used == 0 ||
+           (stream->corked && !wrapper_ostream_is_filled(wostream)))
+               wrapper_ostream_output_halt(wostream);
+
+       return (buffer->used == 0 ? 1 : 0);
+}
+
+static int wrapper_ostream_flush_real(struct wrapper_ostream *wostream)
+{
+       struct ostream_private *stream = &wostream->ostream;
+       int ret;
+
+       if (wrapper_ostream_handle_pending_error(wostream) < 0) {
+               /* Stream already hit an error */
+               return -1;
+       }
+       wrapper_ostream_output_start(wostream);
+
+       if ((ret = wrapper_ostream_flush_parent(wostream)) <= 0) {
+               /* Try to flush parent stream first to make room for more
+                  data */
+               return ret;
+       }
+       if ((ret = wrapper_ostream_flush_buffer(wostream)) <= 0) {
+               /* Try sending data we already buffered */
+               return ret;
+       }
+
+       if (wostream->output_closed || wostream->output_finished) {
+               /* Already finished the ostream */
+               i_assert(stream->finished);
+               return 1;
+       }
+
+       if (!wrapper_ostream_output_ready(wostream)) {
+               return ((wostream->buffer == NULL ||
+                        wostream->buffer->used == 0) ? 1 : 0);
+       }
+
+       if (wostream->output == NULL) {
+               i_assert(wrapper_ostream_is_empty(wostream));
+               ret = 1;
+       } else {
+               ret = o_stream_flush(wostream->output);
+               if (ret < 0)
+                       wrapper_ostream_handle_parent_error(wostream);
+       }
+
+       return ret;
+}
+
+static bool
+wrapper_ostream_send_prepare(struct wrapper_ostream *wostream, size_t size)
+{
+       struct ostream_private *stream = &wostream->ostream;
+
+       if (wostream->output_closed || wostream->output_started)
+               return TRUE;
+
+       if (stream->corked && !stream->finished) {
+               if (wostream->buffer == NULL)
+                       return FALSE;
+               if ((wostream->buffer->used + size) < stream->max_buffer_size)
+                       return FALSE;
+       }
+       wrapper_ostream_output_start(wostream);
+       return TRUE;
+}
+
+/* Add data to the wrapper stream's internal buffer. */
+static size_t
+wrapper_ostream_add(struct wrapper_ostream *wostream,
+                   const struct const_iovec *iov,
+                   unsigned int iov_count, unsigned int *iov_idx,
+                   size_t *iov_idx_pos)
+{
+       buffer_t *buffer = wostream->buffer;
+       unsigned int i;
+       size_t added = 0;
+
+       /* Create buffer */
+       if (buffer == NULL) {
+               wostream->buffer = buffer =
+                       buffer_create_dynamic(default_pool, IO_BLOCK_SIZE);
+       }
+
+       for (i = *iov_idx; i < iov_count; i++) {
+               size_t iov_len, iov_add, space;
+               const unsigned char *iov_data;
+
+               iov_len = iov[i].iov_len;
+               iov_data = iov[i].iov_base;
+               space = wostream->ostream.max_buffer_size - buffer->used;
+
+               i_assert(*iov_idx_pos < iov_len);
+               if (*iov_idx_pos > 0) {
+                       iov_len -= *iov_idx_pos;
+                       iov_data += *iov_idx_pos;
+               }
+               iov_add = I_MIN(space, iov_len);
+               buffer_append(buffer, iov_data, iov_add);
+               added += iov_add;
+               if (iov_add < iov_len) {
+                       /* Buffer is full */
+                       *iov_idx_pos += iov_add;
+                       break;
+               }
+               *iov_idx_pos = 0;
+       }
+
+       *iov_idx = i;
+       return added;
+}
+
+static ssize_t
+wrapper_ostream_sendv_real(struct wrapper_ostream *wostream,
+                          const struct const_iovec *iov,
+                          unsigned int iov_count)
+{
+       struct ostream_private *stream = &wostream->ostream;
+       ssize_t written;
+       size_t size, iov_pos, sent;
+       unsigned int i;
+       int ret;
+
+       if (wrapper_ostream_handle_pending_error(wostream) < 0) {
+               /* Stream already hit an error */
+               return -1;
+       }
+
+       i_assert(!wostream->output_closed);
+       i_assert(!wostream->output_finished);
+
+       /* Determine total size of data to send */
+       size = 0;
+       for (i = 0, size = 0; i < iov_count; i++)
+               size += iov[i].iov_len;
+
+       /* Flush buffer if required */
+       ret = 1;
+       if (!wrapper_ostream_is_empty(wostream) &&
+           (!stream->corked || wrapper_ostream_is_filled(wostream)) &&
+           wrapper_ostream_send_prepare(wostream, size) &&
+           (ret = wrapper_ostream_flush_buffer(wostream)) < 0)
+               return -1;
+
+       if (!stream->corked && wrapper_ostream_is_full(wostream)) {
+               /* No space in buffer for more data */
+               i_assert(!stream->ostream.blocking);
+               return 0;
+       }
+
+       /* Send data to connection directly if possible */
+       i = 0;
+       sent = iov_pos = 0;
+       if (wrapper_ostream_is_empty(wostream) &&
+           (!stream->corked ||
+            size >= wrapper_ostream_optimal_size(wostream)) &&
+           wrapper_ostream_send_prepare(wostream, size)) {
+               written = wrapper_ostream_writev_full(wostream, iov, iov_count);
+               if (written < 0)
+                       return -1;
+               sent += written;
+               if (sent == size) {
+                       /* All sent */
+                       return (ssize_t)sent;
+               }
+
+               i_assert(!stream->ostream.blocking);
+
+               /* Determine send position */
+               iov_pos = sent;
+               for (; i < iov_count && iov_pos >= iov[i].iov_len; i++)
+                       iov_pos -= iov[i].iov_len;
+               i_assert(i < iov_count);
+       }
+
+       /* Fill buffer with remainder that was not sent directly */
+       for (;;) {
+               sent += wrapper_ostream_add(wostream, iov, iov_count,
+                                           &i, &iov_pos);
+               i_assert(sent <= size);
+
+               if (!stream->corked || !wrapper_ostream_is_filled(wostream))
+                       break;
+
+               /* Flush corked full buffer */
+               wrapper_ostream_output_start(wostream);
+               if ((ret = wrapper_ostream_flush_buffer(wostream)) < 0)
+                       return -1;
+               if (ret == 0)
+                       break;
+       }
+
+       i_assert(!stream->ostream.blocking || sent == size);
+       return sent;
+}
+
+/* Run the flush callback for the wrapper stream. */
+static int wrapper_ostream_callback(struct wrapper_ostream *wostream)
+{
+       int ret;
+
+       if (wostream->ostream.callback != NULL) {
+               if (wostream->callback_pre != NULL)
+                       wostream->callback_pre(wostream);
+               ret = wostream->ostream.callback(wostream->ostream.context);
+               if (wostream->callback_post != NULL)
+                       wostream->callback_post(wostream);
+       } else {
+               ret = wrapper_ostream_flush(&wostream->ostream);
+       }
+       return ret;
+}
+
+/* Handle an event by running wrapper_ostream_continue(). This called from
+   ioloop on a zero timeout. */
+static void wrapper_ostream_handle_event(struct wrapper_ostream *wostream)
+{
+       timeout_remove(&wostream->to_event);
+       (void)wrapper_ostream_continue(wostream);
+}
+
+/*
+ * iostream methods
+ */
+
+static void
+wrapper_ostream_close(struct iostream_private *stream,
+                     bool close_parent ATTR_UNUSED)
+{
+       struct wrapper_ostream *wostream = (struct wrapper_ostream *)stream;
+
+       timeout_remove(&wostream->to_event);
+       wrapper_ostream_output_close(wostream);
+       if (wostream->close != NULL)
+               wostream->close(wostream);
+}
+
+static void wrapper_ostream_destroy(struct iostream_private *stream)
+{
+       struct wrapper_ostream *wostream = (struct wrapper_ostream *)stream;
+
+       timeout_remove(&wostream->to_event);
+       i_free(wostream->pending_error);
+
+       if (wostream->destroy != NULL)
+               wostream->destroy(wostream);
+       buffer_free(&wostream->buffer);
+       o_stream_unref(&wostream->output);
+       event_unref(&wostream->event);
+}
+
+/*
+ * ostream methods
+ */
+
+static void wrapper_ostream_cork(struct ostream_private *stream, bool set)
+{
+       struct wrapper_ostream *wostream = (struct wrapper_ostream *)stream;
+       int ret;
+
+       if (stream->ostream.closed || wostream->pending_errno != 0)
+               return;
+
+       if (wostream->output_closed) {
+               i_assert(wostream->ostream.finished);
+               return;
+       }
+
+       if (set) {
+               if (wostream->output != NULL)
+                       o_stream_cork(wostream->output);
+       } else {
+               /* Buffer flushing might close the stream */
+               ret = wrapper_ostream_flush_buffer(wostream);
+               stream->last_errors_not_checked = TRUE;
+
+               if (wostream->output != NULL) {
+                       if (o_stream_uncork_flush(wostream->output) < 0) {
+                               wrapper_ostream_handle_parent_error(wostream);
+                               ret = -1;
+                       }
+               }
+               if ((ret == 0 || wostream->flush_pending) &&
+                   !stream->ostream.closed)
+                       wrapper_ostream_output_resume(wostream);
+       }
+       stream->corked = set;
+
+       wrapper_ostream_output_manage(wostream, FALSE);
+}
+
+static ssize_t
+wrapper_ostream_sendv(struct ostream_private *stream,
+                     const struct const_iovec *iov, unsigned int iov_count)
+{
+       struct wrapper_ostream *wostream = (struct wrapper_ostream *)stream;
+       bool must_uncork = FALSE;
+       ssize_t sret;
+
+       if (wrapper_ostream_handle_pending_error(wostream) < 0) {
+               /* Stream already hit an error */
+               return -1;
+       }
+
+       /* Cork parent ostream if necessary */
+       if (!wostream->output_closed && wostream->output != NULL &&
+           !o_stream_is_corked(wostream->output)) {
+               o_stream_cork(wostream->output);
+               must_uncork = TRUE;
+       }
+
+       sret = wrapper_ostream_sendv_real(wostream, iov, iov_count);
+       if (sret > 0)
+               stream->ostream.offset += (ssize_t)sret;
+
+       /* Uncork the parent ostream */
+       if (must_uncork && !wostream->output_closed &&
+           wostream->output != NULL) {
+               if (o_stream_uncork_flush(wostream->output) < 0 &&
+                   sret >= 0) {
+                       wrapper_ostream_handle_parent_error(wostream);
+                       sret = -1;
+               }
+       }
+
+       if (sret >= 0) {
+               wrapper_ostream_output_update_timeouts(wostream);
+               if (!stream->ostream.blocking)
+                       wrapper_ostream_output_manage(wostream, FALSE);
+       }
+
+       return sret;
+}
+
+static int wrapper_ostream_flush(struct ostream_private *stream)
+{
+       struct wrapper_ostream *wostream = (struct wrapper_ostream *)stream;
+       struct ostream *ostream = &stream->ostream;
+       bool must_uncork = FALSE;
+       int ret;
+
+       if (wrapper_ostream_handle_pending_error(wostream) < 0) {
+               /* Stream already hit an error */
+               return -1;
+       }
+
+       if (wostream->output_closed) {
+               if (!stream->finished || !wrapper_ostream_is_empty(wostream)) {
+                       stream->ostream.stream_errno = EPIPE;
+                       return -1;
+               }
+               /* Already finished the ostream */
+               return 1;
+       }
+
+       if (wostream->flushing) {
+               /* Prevent recursion while finishing output */
+               return 1;
+       }
+       wostream->flushing = TRUE;
+       o_stream_ref(ostream);
+
+       /* Cork parent ostream if necessary */
+       if (wostream->output != NULL && !o_stream_is_corked(wostream->output)) {
+               o_stream_cork(wostream->output);
+               must_uncork = TRUE;
+       }
+
+       /* If blocking: loop until all is flushed; otherwise try once */
+       do {
+               /* Try to flush */
+               if ((ret = wrapper_ostream_flush_real(wostream)) < 0) {
+                       ret = -1;
+                       break;
+               }
+
+               if (ret == 0 && stream->ostream.blocking) {
+                       /* Block until we can write more */
+                       if (wrapper_ostream_flush_wait(wostream) < 0) {
+                               ret = -1;
+                               break;
+                       }
+               }
+
+               if (stream->ostream.closed) {
+                       /* Ostream was closed in the mean time */
+                       ret = -1;
+                       break;
+               }
+
+               if (wostream->output_closed) {
+                       /* Already finished the ostream */
+                       i_assert(stream->finished);
+                       ret = 1;
+                       break;
+               }
+       } while (ret == 0 && stream->ostream.blocking);
+
+       if (ret > 0 && stream->finished) {
+               /* This was an o_stream_finish() call or subsequent flush */
+               i_assert(wrapper_ostream_is_empty(wostream));
+               while ((ret = wrapper_ostream_finish(wostream)) == 0) {
+                       if (!stream->ostream.blocking) {
+                               /* Not yet finished completely */
+                               break;
+                       }
+                       /* Block until we can write more */
+                       if (wrapper_ostream_flush_wait(wostream) < 0) {
+                               ret = -1;
+                               break;
+                       }
+               }
+       }
+       wrapper_ostream_output_update_timeouts(wostream);
+       wostream->flushing = FALSE;
+
+       if (ret >= 0 && !ostream->blocking)
+               wrapper_ostream_output_manage(wostream, FALSE);
+
+       if (wostream->output_closed) {
+               i_assert(ret < 0 || ostream->stream_errno == 0 ||
+                        ostream->closed);
+               o_stream_unref(&ostream);
+               return (ret >= 0 ? 1 : -1);
+       }
+
+       if (!must_uncork || wostream->output == NULL) {
+               /* Nothing */
+       } else if (ret >= 0) {
+               /* Uncork the parent ostream */
+               if (o_stream_uncork_flush(wostream->output) < 0) {
+                       wrapper_ostream_handle_parent_error(wostream);
+                       ret = -1;
+               }
+       } else {
+               o_stream_uncork(wostream->output);
+       }
+       o_stream_unref(&ostream);
+
+       return ret;
+}
+
+static void
+wrapper_ostream_set_flush_callback(struct ostream_private *stream,
+                                  stream_flush_callback_t *callback,
+                                  void *context)
+{
+       struct wrapper_ostream *wostream = (struct wrapper_ostream *)stream;
+
+       stream->callback = callback;
+       stream->context = context;
+
+       if (!stream->ostream.blocking && stream->callback == NULL) {
+               /* Application is currently not interested in flush events and
+                  that includes request events like errors. */
+               timeout_remove(&wostream->to_event);
+       } else if (wostream->pending_error != NULL &&
+                  wostream->to_event == NULL) {
+               /* Schedule flush callback to notify application of events */
+               wostream->to_event = timeout_add_short(
+                       0, wrapper_ostream_handle_event, wostream);
+       }
+}
+
+static void
+wrapper_ostream_flush_pending(struct ostream_private *stream, bool set)
+{
+       struct wrapper_ostream *wostream = (struct wrapper_ostream *)stream;
+
+       wostream->flush_pending = set;
+       if (!set)
+               return;
+       if (wostream->output_closed) {
+               i_assert(wostream->ostream.ostream.closed);
+               return;
+       }
+       if (wostream->to_event == NULL) {
+               wostream->to_event = timeout_add_short(
+                       0, wrapper_ostream_handle_event, wostream);
+       }
+}
+
+static size_t
+wrapper_ostream_get_buffer_used_size(const struct ostream_private *stream)
+{
+       struct wrapper_ostream *wostream = (struct wrapper_ostream *)stream;
+       size_t size = 0;
+
+       if (wostream->buffer != NULL)
+               size += wostream->buffer->used;
+       if (wostream->output != NULL)
+               size += o_stream_get_buffer_used_size(wostream->output);
+       return size;
+}
+
+static size_t
+wrapper_ostream_get_buffer_avail_size(const struct ostream_private *stream)
+{
+       struct wrapper_ostream *wostream = (struct wrapper_ostream *)stream;
+       size_t size = 0;
+
+       if (wostream->ostream.max_buffer_size == (size_t)-1)
+               return (size_t)-1;
+
+       if (wostream->buffer == NULL)
+               size = wostream->ostream.max_buffer_size;
+       else if (wostream->buffer->used < wostream->ostream.max_buffer_size) {
+               size = (wostream->ostream.max_buffer_size -
+                       wostream->buffer->used);
+       }
+
+       if (wostream->output != NULL)
+               size += o_stream_get_buffer_avail_size(wostream->output);
+
+       return size;
+}
+
+static void
+wrapper_ostream_switch_ioloop_to(struct ostream_private *stream,
+                                struct ioloop *ioloop)
+{
+       struct wrapper_ostream *wostream = (struct wrapper_ostream *)stream;
+
+       if (wostream->flush_ioloop != ioloop &&
+           wostream->switch_ioloop_to != NULL)
+               wostream->switch_ioloop_to(wostream, ioloop);
+
+       if (wostream->to_event != NULL) {
+               wostream->to_event =
+                       io_loop_move_timeout_to(ioloop, &wostream->to_event);
+       }
+}
+
+/*
+ * API
+ */
+
+struct ostream *
+wrapper_ostream_create(struct wrapper_ostream *wostream,
+                      size_t max_buffer_size, bool blocking,
+                      struct event *event)
+{
+       wostream->ostream.iostream.close = wrapper_ostream_close;
+       wostream->ostream.iostream.destroy = wrapper_ostream_destroy;
+
+       wostream->ostream.ostream.blocking = blocking;
+       wostream->ostream.max_buffer_size = max_buffer_size;
+       wostream->ostream.cork = wrapper_ostream_cork;
+       wostream->ostream.sendv = wrapper_ostream_sendv;
+       wostream->ostream.flush = wrapper_ostream_flush;
+       wostream->ostream.set_flush_callback =
+               wrapper_ostream_set_flush_callback;
+       wostream->ostream.flush_pending = wrapper_ostream_flush_pending;
+       wostream->ostream.get_buffer_used_size =
+               wrapper_ostream_get_buffer_used_size;
+       wostream->ostream.get_buffer_avail_size =
+               wrapper_ostream_get_buffer_avail_size;
+       wostream->ostream.switch_ioloop_to =
+               wrapper_ostream_switch_ioloop_to;
+
+       wostream->event = event_create(event);
+
+       return o_stream_create(&wostream->ostream, NULL, -1);
+}
+
+void wrapper_ostream_continue(struct wrapper_ostream *wostream)
+{
+       struct ostream_private *stream = &wostream->ostream;
+       struct ostream *ostream = &stream->ostream;
+       struct ioloop *ioloop = NULL;
+       bool use_cork = !stream->corked;
+       int ret;
+
+       if (wostream->flush_waiting) {
+               /* Inside wrapper_ostream_flush_wait() */
+               i_assert(!wostream->output_closed);
+               ioloop = wostream->flush_ioloop;
+       }
+       if (stream->ostream.closed ||
+           (stream->finished && wrapper_ostream_is_empty(wostream) &&
+            wostream->output != NULL &&
+            o_stream_get_buffer_used_size(wostream->output) == 0)) {
+               /* Already finished */
+               if (wrapper_ostream_finish(wostream) == 0)
+                       return;
+       }
+       if (wostream->flush_waiting) {
+               i_assert(ioloop != NULL);
+               io_loop_stop(ioloop);
+               wostream->flush_waiting = FALSE;
+               return;
+       }
+
+       /* Set flush_pending = FALSE first before calling the flush callback,
+          and change it to TRUE only if callback returns 0. That way the
+          callback can call o_stream_set_flush_pending() again and we don't
+          forget it even if flush callback returns 1. */
+       wostream->flush_pending = FALSE;
+
+       o_stream_ref(ostream);
+       wostream->continuing = TRUE;
+       for (;;) {
+               if (use_cork)
+                       o_stream_cork(ostream);
+               ret = wrapper_ostream_callback(wostream);
+               if (use_cork && !wostream->output_closed) {
+                       int fret = o_stream_uncork_flush(ostream);
+                       if (ret == 0 && fret > 0)
+                               continue;
+                       if (fret < 0 && ret >= 0) {
+                               i_assert(ostream->stream_errno != 0);
+                               (void)wrapper_ostream_callback(wostream);
+                               ret = -1;
+                       }
+               }
+               break;
+       }
+       wostream->continuing = FALSE;
+       if (wostream->output_closed)
+               o_stream_close(ostream);
+
+       if (ret == 0)
+               wostream->flush_pending = TRUE;
+
+       if (!stream->ostream.blocking)
+               wrapper_ostream_output_manage(wostream, FALSE);
+
+       o_stream_unref(&ostream);
+}
+
+void wrapper_ostream_trigger_flush(struct wrapper_ostream *wostream)
+{
+       struct ostream *ostream = &wostream->ostream.ostream;
+
+       if (ostream->closed)
+               return;
+       if (wostream->to_event != NULL)
+               return;
+       if (!wostream->flush_waiting && wostream->ostream.callback == NULL)
+               return;
+
+       wostream->to_event = timeout_add_short(
+               0, wrapper_ostream_handle_event, wostream);
+}
+
+bool wrapper_ostream_get_buffered_size(struct wrapper_ostream *wostream,
+                                      uoff_t *size_r)
+{
+       buffer_t *buffer = wostream->buffer;
+
+       if (!wostream->ostream.finished)
+               return FALSE;
+
+       *size_r = (buffer == NULL ? 0 : (uoff_t)buffer->used);
+       i_assert(*size_r == wostream->ostream.ostream.offset);
+       return TRUE;
+}
+
+void wrapper_ostream_output_available(struct wrapper_ostream *wostream,
+                                     struct ostream *output)
+{
+       i_assert(!wostream->output_closed);
+       i_assert(!wostream->output_finished);
+       i_assert(wostream->output == NULL);
+       wostream->output = output;
+       if (output != NULL) {
+               if (wostream->ostream.corked)
+                       o_stream_cork(wostream->output);
+               o_stream_ref(output);
+       }
+}
+
+void wrapper_ostream_output_destroyed(struct wrapper_ostream *wostream)
+{
+       struct ostream *ostream = &wostream->ostream.ostream;
+
+       wrapper_ostream_trigger_flush(wostream);
+       o_stream_set_no_error_handling(ostream, TRUE);
+
+       o_stream_unref(&wostream->output);
+       wostream->output_closed = TRUE;
+       wostream->output_finished = TRUE;
+}
+
+void wrapper_ostream_set_error(struct wrapper_ostream *wostream,
+                              int stream_errno, const char *stream_error)
+{
+       struct ostream *ostream = &wostream->ostream.ostream;
+
+       if (ostream->closed || wostream->pending_errno != 0 ||
+           wostream->returned_error)
+               return;
+
+       i_assert(wostream->pending_error == NULL);
+       wostream->pending_errno = stream_errno;
+       wostream->pending_error = i_strdup(stream_error);
+
+       wrapper_ostream_trigger_flush(wostream);
+}
+
+void wrapper_ostream_notify_error(struct wrapper_ostream *wostream)
+{
+       struct ostream *ostream = &wostream->ostream.ostream;
+
+       if (ostream->closed || ostream->blocking ||
+           wostream->output_closed_api || wostream->returned_error ||
+           wostream->continuing)
+               return;
+       if (wostream->pending_errno == 0)
+               return;
+       wostream->returned_error = TRUE;
+       (void)wrapper_ostream_callback(wostream);
+}
diff --git a/src/lib/ostream-wrapper.h b/src/lib/ostream-wrapper.h
new file mode 100644 (file)
index 0000000..22842c0
--- /dev/null
@@ -0,0 +1,164 @@
+#ifndef OSTREAM_WRAPPER_H
+#define OSTREAM_WRAPPER_H
+
+#include "ostream-private.h"
+
+/* The wrapper output stream allows turning any form* of activity involving data
+   output into a standard Dovecot output stream. The wrapper output stream can
+   operate both in blocking and non-blocking mode. When the wrapped activity is
+   non-blocking, a blocking wrapper output stream will implicitly run its own
+   ioloop.
+
+   It is possible to have the wrapper output stream object available even before
+   the data can be written anywhere, even before any form of output object (a
+   connection) exists. In that case, any data written to the wrapper stream is
+   buffered until the buffer is full. Once that happens, the stream will block
+   or refuse writes until the underlying output becomes available.
+
+   The wrapper output stream is not meant to be used directly. Instead, it is
+   to be used as part of the implementation of an application-specific output
+   stream. The wrapper output stream serves as the means to prevent code
+   duplication between similar output stream implementations. It defines several
+   methods that need to be implemented by the application-specific output
+   stream.
+
+   * Currently, the wrapper stream still expects an output stream object when
+     data is to be written somewhere, but that should be easily circumvented
+     once such behavior is needed (FIXME).
+ */
+
+struct wrapper_ostream {
+       struct ostream_private ostream;
+       struct event *event;
+
+       /* Called when the implementation should start making the parent output
+          stream available, e.g. connect to the server. This happens when data
+          was written to the wrapper ostream (when it is corked this only
+          happens when the wrapper ostream buffer is full or the wrapper
+          ostream is finished). */
+       void (*output_start)(struct wrapper_ostream *wostream);
+       /* Returns TRUE when the output is ready for data. */
+       bool (*output_ready)(struct wrapper_ostream *wostream);
+       /* Called when an error occurred while writing to the output stream. */
+       void (*output_error)(struct wrapper_ostream *wostream);
+       /* Called when the wrapper ostream was finished using o_stream_finish()
+          and the wrapper ostream buffer is empty. Also, the parent output
+          was flushed successfully. */
+       int (*output_finish)(struct wrapper_ostream *wostream);
+       /* Called when the wrapper ostream does not need write to parent output
+          stream. This is will e.g. drop the parent output's flush callback or
+          equivalent notification mechanism. */
+       void (*output_halt)(struct wrapper_ostream *wostream);
+       /* Called when the wrapper ostream has data available for the parent
+          output and wants wrapper_ostream_continue() to be called when the
+          parent stream is writeable. */
+       void (*output_resume)(struct wrapper_ostream *wostream);
+       /* Update the timeouts. The sender_blocking parameter indicates which
+          side of the data transfer is blocking, so whether a timeout needs to
+          be set for limiting the time other side is not doing anything. */
+       void (*output_update_timeouts)(struct wrapper_ostream *wostream,
+                                      bool sender_blocking);
+
+       /* Called before and after running ioloop for performing blocking I/O
+          wait. Use these vfuncs to switch to and from the temporary ioloop. */
+       struct ioloop *(*wait_begin)(struct wrapper_ostream *wostream,
+                                    struct ioloop *ioloop);
+       void (*wait_end)(struct wrapper_ostream *wostream,
+                        struct ioloop *prev_ioloop);
+
+       /* Called before and after running the flush callback for the ostream.
+         */
+       void (*callback_pre)(struct wrapper_ostream *wostream);
+       void (*callback_post)(struct wrapper_ostream *wostream);
+
+       /* Called when the ostream is switched to a different ioloop. */
+       void (*switch_ioloop_to)(struct wrapper_ostream *wostream,
+                                struct ioloop *ioloop);
+
+       /* Called when the wrapper ostream is forcibly closed using
+          o_stream_close() (or indirectly through e.g. o_stream_destroy()). */
+       void (*close)(struct wrapper_ostream *wostream);
+       /* Called when the ostream is destroyed. */
+       void (*destroy)(struct wrapper_ostream *wostream);
+
+       buffer_t *buffer; // FIXME: use a ringbuffer instead (file_ostream)
+
+       /* The (parent) output stream. */
+       struct ostream *output;
+
+       /* The ioloop used while flushing/sending output for when the wrapper
+          ostream is blocking. */
+       struct ioloop *flush_ioloop;
+
+       /* Error set using wrapper_ostream_return_error(). This is returned to
+          the application once it continues using the wrapper ostream. */
+       char *pending_error;
+       int pending_errno;
+
+       /* Timeout for delayed execution of wrapper_ostream_continue(). */
+       struct timeout *to_event;
+
+       /* Output was started (output_start() vfunc was called). */
+       bool output_started:1;
+       /* Output was finished (output_finish() vfunc was called). */
+       bool output_finished:1;
+       /* Output was was closed somehow. This means that the output is no
+          longer available. This is not the same as the ostream close flag. */
+       bool output_closed:1;
+       /* Output was closed directly or indirectly by the application action.
+        */
+       bool output_closed_api:1;
+
+       bool flush_pending:1;
+       bool flush_waiting:1;
+       bool flushing:1;
+       bool continuing:1;
+       bool returned_error:1;
+};
+
+/* Create the wrapper output stream. This function calls o_stream_create()
+   internally. The initial maximum buffer size is set to max_buffer_size. When
+   blocking is TRUE, a blocking output stream will be created. The provided
+   event is used internally for debug logging. */
+struct ostream *
+wrapper_ostream_create(struct wrapper_ostream *wostream,
+                      size_t max_buffer_size, bool blocking,
+                      struct event *event) ATTR_NULL(4);
+
+/* Continue sending output. */
+void wrapper_ostream_continue(struct wrapper_ostream *wostream);
+/* Trigger an (asynchronous) flush on the output stream. */
+void wrapper_ostream_trigger_flush(struct wrapper_ostream *wostream);
+
+/* This function returns the size of the data buffered in the wrapper stream,
+   but only when the output stream is finished using o_stream_finish(). When the
+   output stream is finished, the data is complete and this function returns
+   TRUE and size_r is set to the size. If it is not complete, this function
+   returns FALSE and size_r is not assigned. This function is meant to be called
+   just before sending the first block of data internally for deciding between
+   sending the data using a chunked transfer encoding or, when it is already
+   complete, as a single blob with known size. E.g., for HTTP this is the choice
+   between sending the message using the Transfer-Encoding: chunked header or
+   the Content-Length header. */
+bool wrapper_ostream_get_buffered_size(struct wrapper_ostream *wostream,
+                                      uoff_t *size_r);
+
+/* Call this when the underlying output stream first becomes available. */
+void wrapper_ostream_output_available(struct wrapper_ostream *wostream,
+                                     struct ostream *output);
+/* Call this to notify the wrapper that the underlying output is destroyed and
+   no more data can be written ever. */
+void wrapper_ostream_output_destroyed(struct wrapper_ostream *wostream);
+
+/* Call this to notify the wrapper that an error has occurred. It will be
+   returned as such for the next stream write/flush and subsequent
+   o_stream_get_error(). */
+void wrapper_ostream_set_error(struct wrapper_ostream *wostream,
+                              int stream_errno, const char *stream_error);
+/* Notify the application immediately about any error condition set earlier
+   using wrapper_ostream_set_error() by calling the ostream flush callback
+   right now.
+ */
+void wrapper_ostream_notify_error(struct wrapper_ostream *wostream);
+
+#endif