]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
Simplified creating filter ostreams.
authorTimo Sirainen <tss@iki.fi>
Wed, 21 Sep 2011 11:40:35 +0000 (14:40 +0300)
committerTimo Sirainen <tss@iki.fi>
Wed, 21 Sep 2011 11:40:35 +0000 (14:40 +0300)
src/lib-fs/ostream-cmp.c
src/lib-ssl-iostream/ostream-openssl.c
src/lib/ostream-buffer.c
src/lib/ostream-file.c
src/lib/ostream-private.h
src/lib/ostream.c
src/plugins/zlib/ostream-bzlib.c
src/plugins/zlib/ostream-zlib.c

index 70dc23d510b84018caa644e99f9b6037a6874c94..c54488186835315fb996865c7e11ac59ebced13c 100644 (file)
@@ -9,41 +9,18 @@ struct cmp_ostream {
        struct ostream_private ostream;
 
        struct istream *input;
-       struct ostream *output;
        bool equals;
 };
 
-static void cstream_copy_error(struct cmp_ostream *cstream)
-{
-       struct ostream *src = cstream->output;
-       struct ostream *dest = &cstream->ostream.ostream;
-
-       dest->stream_errno = src->stream_errno;
-       dest->last_failed_errno = src->last_failed_errno;
-       dest->overflow = src->overflow;
-}
-
 static void o_stream_cmp_close(struct iostream_private *stream)
 {
        struct cmp_ostream *cstream = (struct cmp_ostream *)stream;
 
-       if (cstream->output == NULL)
+       if (cstream->input == NULL)
                return;
 
        i_stream_unref(&cstream->input);
        o_stream_flush(&cstream->ostream.ostream);
-       o_stream_unref(&cstream->output);
-}
-
-static int o_stream_cmp_flush(struct ostream_private *stream)
-{
-       struct cmp_ostream *cstream = (struct cmp_ostream *)stream;
-       int ret;
-
-       ret = o_stream_flush(cstream->output);
-       if (ret < 0)
-               cstream_copy_error(cstream);
-       return ret;
 }
 
 bool stream_cmp_block(struct istream *input,
@@ -82,8 +59,8 @@ o_stream_cmp_sendv(struct ostream_private *stream,
                }
        }
 
-       if ((ret = o_stream_sendv(cstream->output, iov, iov_count)) < 0) {
-               cstream_copy_error(cstream);
+       if ((ret = o_stream_sendv(stream->parent, iov, iov_count)) < 0) {
+               o_stream_copy_error_from_parent(stream);
                return -1;
        }
 
@@ -98,15 +75,12 @@ o_stream_create_cmp(struct ostream *output, struct istream *input)
 
        cstream = i_new(struct cmp_ostream, 1);
        cstream->ostream.sendv = o_stream_cmp_sendv;
-       cstream->ostream.flush = o_stream_cmp_flush;
        cstream->ostream.iostream.close = o_stream_cmp_close;
        cstream->input = input;
-       cstream->output = output;
        cstream->equals = TRUE;
        i_stream_ref(input);
-       o_stream_ref(output);
 
-       return o_stream_create(&cstream->ostream);
+       return o_stream_create(&cstream->ostream, output);
 }
 
 bool o_stream_cmp_equals(struct ostream *_output)
index 2ebf05cea5cfb56c8467ce4592d7f41cc9d1ed53..8a1904a984ef4b71468af6988cd9b00d216f18e5 100644 (file)
@@ -253,5 +253,5 @@ struct ostream *o_stream_create_ssl(struct ssl_iostream *ssl_io)
        o_stream_set_flush_callback(ssl_io->plain_output,
                                    plain_flush_callback, sstream);
 
-       return o_stream_create(&sstream->ostream);
+       return o_stream_create(&sstream->ostream, NULL);
 }
index 7acb7f75add0ab40789bf3ffc567c756d8ef69de..2dd6a119c73e25fb8444e615474128cd46261101 100644 (file)
@@ -59,5 +59,5 @@ struct ostream *o_stream_create_buffer(buffer_t *buf)
        bstream->ostream.write_at = o_stream_buffer_write_at;
 
        bstream->buf = buf;
-       return o_stream_create(&bstream->ostream);
+       return o_stream_create(&bstream->ostream, NULL);
 }
index 36fb53f701d42e9d5d63e1f4ccf5d8e4b7d34f68..0703a3a788c8f33c8069392f485c4144771bf32b 100644 (file)
@@ -933,7 +933,7 @@ o_stream_create_fd(int fd, size_t max_buffer_size, bool autoclose_fd)
 
        fstream = o_stream_create_fd_common(fd, autoclose_fd);
        fstream->ostream.max_buffer_size = max_buffer_size;
-       ostream = o_stream_create(&fstream->ostream);
+       ostream = o_stream_create(&fstream->ostream, NULL);
 
        offset = lseek(fd, 0, SEEK_CUR);
        if (offset >= 0) {
@@ -969,7 +969,7 @@ o_stream_create_fd_file(int fd, uoff_t offset, bool autoclose_fd)
        fstream->real_offset = offset;
        fstream->buffer_offset = offset;
 
-       ostream = o_stream_create(&fstream->ostream);
+       ostream = o_stream_create(&fstream->ostream, NULL);
        ostream->offset = offset;
        return ostream;
 }
index e86a060ddd34f9fc60bed077017f5ad5381f3517..5b8e7152b12579b6972e5a0ca3b180b7214c2f26 100644 (file)
@@ -11,6 +11,9 @@ struct ostream_private {
 /* methods: */
        void (*cork)(struct ostream_private *stream, bool set);
        int (*flush)(struct ostream_private *stream);
+       void (*set_flush_callback)(struct ostream_private *stream,
+                                  stream_flush_callback_t *callback,
+                                  void *context);
        void (*flush_pending)(struct ostream_private *stream, bool set);
        size_t (*get_used_size)(const struct ostream_private *stream);
        int (*seek)(struct ostream_private *stream, uoff_t offset);
@@ -27,15 +30,20 @@ struct ostream_private {
        struct ostream ostream;
        size_t max_buffer_size;
 
+       struct ostream *parent; /* for filter streams */
+
        stream_flush_callback_t *callback;
        void *context;
 
        unsigned int corked:1;
 };
 
-struct ostream *o_stream_create(struct ostream_private *_stream);
+struct ostream *
+o_stream_create(struct ostream_private *_stream, struct ostream *parent);
 
 off_t io_stream_copy(struct ostream *outstream, struct istream *instream,
                     size_t block_size);
 
+void o_stream_copy_error_from_parent(struct ostream_private *_stream);
+
 #endif
index 48e5df6a568b194272a0557293562ac2450d5aa0..bd0aa8d806fa777de22b0a73b8ffe361e7474ee2 100644 (file)
@@ -12,8 +12,12 @@ void o_stream_set_name(struct ostream *stream, const char *name)
 
 const char *o_stream_get_name(struct ostream *stream)
 {
-       return stream->real_stream->iostream.name == NULL ? "" :
-               stream->real_stream->iostream.name;
+       while (stream->real_stream->iostream.name == NULL) {
+               stream = stream->real_stream->parent;
+               if (stream == NULL)
+                       return "";
+       }
+       return stream->real_stream->iostream.name;
 }
 
 void o_stream_destroy(struct ostream **stream)
@@ -46,26 +50,17 @@ void o_stream_set_flush_callback(struct ostream *stream,
 {
        struct ostream_private *_stream = stream->real_stream;
 
-       _stream->callback = callback;
-       _stream->context = context;
+       _stream->set_flush_callback(_stream, callback, context);
 }
 
 void o_stream_unset_flush_callback(struct ostream *stream)
 {
-       struct ostream_private *_stream = stream->real_stream;
-
-       _stream->callback = NULL;
-       _stream->context = NULL;
+       o_stream_set_flush_callback(stream, NULL, NULL);
 }
 
 void o_stream_set_max_buffer_size(struct ostream *stream, size_t max_size)
 {
-       if (stream->real_stream->iostream.set_max_buffer_size != NULL) {
-               io_stream_set_max_buffer_size(&stream->real_stream->iostream,
-                                             max_size);
-       } else {
-               stream->real_stream->max_buffer_size = max_size;
-       }
+       io_stream_set_max_buffer_size(&stream->real_stream->iostream, max_size);
 }
 
 void o_stream_cork(struct ostream *stream)
@@ -75,10 +70,7 @@ void o_stream_cork(struct ostream *stream)
        if (unlikely(stream->closed))
                return;
 
-       if (_stream->cork != NULL)
-               _stream->cork(_stream, TRUE);
-       else
-               _stream->corked = TRUE;
+       _stream->cork(_stream, TRUE);
 }
 
 void o_stream_uncork(struct ostream *stream)
@@ -88,12 +80,7 @@ void o_stream_uncork(struct ostream *stream)
        if (unlikely(stream->closed))
                return;
 
-       if (_stream->cork != NULL)
-               _stream->cork(_stream, FALSE);
-       else {
-               _stream->corked = FALSE;
-               (void)o_stream_flush(stream);
-       }
+       _stream->cork(_stream, FALSE);
 }
 
 int o_stream_flush(struct ostream *stream)
@@ -105,11 +92,9 @@ int o_stream_flush(struct ostream *stream)
                return -1;
 
        stream->stream_errno = 0;
-       if (_stream->flush != NULL) {
-               if (unlikely((ret = _stream->flush(_stream)) < 0)) {
-                       i_assert(stream->stream_errno != 0);
-                       stream->last_failed_errno = stream->stream_errno;
-               }
+       if (unlikely((ret = _stream->flush(_stream)) < 0)) {
+               i_assert(stream->stream_errno != 0);
+               stream->last_failed_errno = stream->stream_errno;
        }
        return ret;
 }
@@ -121,16 +106,14 @@ void o_stream_set_flush_pending(struct ostream *stream, bool set)
        if (unlikely(stream->closed))
                return;
 
-       if (_stream->flush_pending != NULL)
-               _stream->flush_pending(_stream, set);
+       _stream->flush_pending(_stream, set);
 }
 
 size_t o_stream_get_buffer_used_size(const struct ostream *stream)
 {
        const struct ostream_private *_stream = stream->real_stream;
 
-       return _stream->get_used_size == NULL ? 0 :
-               _stream->get_used_size(_stream);
+       return _stream->get_used_size(_stream);
 }
 
 size_t o_stream_get_buffer_avail_size(const struct ostream *stream)
@@ -149,14 +132,9 @@ int o_stream_seek(struct ostream *stream, uoff_t offset)
                return -1;
 
        stream->stream_errno = 0;
-       if (_stream->seek != NULL) {
-               if (unlikely(_stream->seek(_stream, offset) < 0)) {
-                       i_assert(stream->stream_errno != 0);
-                       stream->last_failed_errno = stream->stream_errno;
-               }
-       } else {
-               stream->stream_errno = EPIPE;
-               stream->last_failed_errno = EPIPE;
+       if (unlikely(_stream->seek(_stream, offset) < 0)) {
+               i_assert(stream->stream_errno != 0);
+               stream->last_failed_errno = stream->stream_errno;
                return -1;
        }
        return 1;
@@ -230,11 +208,6 @@ int o_stream_pwrite(struct ostream *stream, const void *data, size_t size,
        if (unlikely(stream->closed))
                return -1;
 
-       if (stream->real_stream->write_at == NULL) {
-               /* stream doesn't support seeking */
-               stream->stream_errno = EPIPE;
-               return -1;
-       }
        ret = stream->real_stream->write_at(stream->real_stream,
                                            data, size, offset);
        if (unlikely(ret < 0)) {
@@ -244,22 +217,6 @@ int o_stream_pwrite(struct ostream *stream, const void *data, size_t size,
        return ret;
 }
 
-static off_t o_stream_default_send_istream(struct ostream_private *outstream,
-                                          struct istream *instream)
-{
-       return io_stream_copy(&outstream->ostream, instream, IO_BLOCK_SIZE);
-}
-
-struct ostream *o_stream_create(struct ostream_private *_stream)
-{
-       _stream->ostream.real_stream = _stream;
-       if (_stream->send_istream == NULL)
-               _stream->send_istream = o_stream_default_send_istream;
-
-       io_stream_init(&_stream->iostream);
-       return &_stream->ostream;
-}
-
 off_t io_stream_copy(struct ostream *outstream, struct istream *instream,
                     size_t block_size)
 {
@@ -297,6 +254,169 @@ void o_stream_switch_ioloop(struct ostream *stream)
 {
        struct ostream_private *_stream = stream->real_stream;
 
-       if (_stream->switch_ioloop != NULL)
-               _stream->switch_ioloop(_stream);
+       _stream->switch_ioloop(_stream);
+}
+
+static void o_stream_default_close(struct iostream_private *stream)
+{
+       struct ostream_private *_stream = (struct ostream_private *)stream;
+
+       (void)o_stream_flush(&_stream->ostream);
+}
+
+static void o_stream_default_destroy(struct iostream_private *stream)
+{
+       struct ostream_private *_stream = (struct ostream_private *)stream;
+
+       if (_stream->parent != NULL)
+               o_stream_unref(&_stream->parent);
+}
+
+static void
+o_stream_default_set_max_buffer_size(struct iostream_private *stream,
+                                    size_t max_size)
+{
+       struct ostream_private *_stream = (struct ostream_private *)stream;
+
+       if (_stream->parent != NULL)
+               o_stream_set_max_buffer_size(_stream->parent, max_size);
+       _stream->max_buffer_size = max_size;
+}
+
+static void o_stream_default_cork(struct ostream_private *_stream, bool set)
+{
+       _stream->corked = set;
+       if (set) {
+               if (_stream->parent != NULL)
+                       o_stream_cork(_stream->parent);
+       } else {
+               (void)o_stream_flush(&_stream->ostream);
+               if (_stream->parent != NULL)
+                       o_stream_uncork(_stream->parent);
+       }
+}
+
+void o_stream_copy_error_from_parent(struct ostream_private *_stream)
+{
+       struct ostream *src = _stream->parent;
+       struct ostream *dest = &_stream->ostream;
+
+       dest->stream_errno = src->stream_errno;
+       dest->last_failed_errno = src->last_failed_errno;
+       dest->overflow = src->overflow;
+}
+
+static int o_stream_default_flush(struct ostream_private *_stream)
+{
+       int ret;
+
+       if (_stream->parent == NULL)
+               return 1;
+
+       if ((ret = o_stream_flush(_stream->parent)) < 0)
+               o_stream_copy_error_from_parent(_stream);
+       return ret;
+}
+
+static void
+o_stream_default_set_flush_callback(struct ostream_private *_stream,
+                                   stream_flush_callback_t *callback,
+                                   void *context)
+{
+       if (_stream->parent == NULL) {
+               _stream->callback = callback;
+               _stream->context = context;
+       } else {
+               /* this is a filter stream, we don't have a flush
+                  callback ourself */
+               o_stream_set_flush_callback(_stream->parent, callback, context);
+       }
+}
+
+static void
+o_stream_default_set_flush_pending(struct ostream_private *_stream, bool set)
+{
+       if (_stream->parent != NULL)
+               o_stream_set_flush_pending(_stream->parent, set);
+}
+
+static size_t
+o_stream_default_get_used_size(const struct ostream_private *_stream)
+{
+       if (_stream->parent == NULL)
+               return 0;
+       else
+               return o_stream_get_buffer_used_size(_stream->parent);
+}
+
+static int
+o_stream_default_seek(struct ostream_private *_stream,
+                     uoff_t offset ATTR_UNUSED)
+{
+       _stream->ostream.stream_errno = EPIPE;
+       return -1;
+}
+
+static int
+o_stream_default_write_at(struct ostream_private *_stream,
+                         const void *data ATTR_UNUSED,
+                         size_t size ATTR_UNUSED, uoff_t offset ATTR_UNUSED)
+{
+       _stream->ostream.stream_errno = EPIPE;
+       return -1;
+}
+
+static off_t o_stream_default_send_istream(struct ostream_private *outstream,
+                                          struct istream *instream)
+{
+       return io_stream_copy(&outstream->ostream, instream, IO_BLOCK_SIZE);
+}
+
+static void o_stream_default_switch_ioloop(struct ostream_private *_stream)
+{
+       if (_stream->parent != NULL)
+               o_stream_switch_ioloop(_stream->parent);
+}
+
+struct ostream *
+o_stream_create(struct ostream_private *_stream, struct ostream *parent)
+{
+       _stream->ostream.real_stream = _stream;
+       if (parent != NULL) {
+               _stream->parent = parent;
+               o_stream_ref(parent);
+       }
+
+       if (_stream->iostream.close == NULL)
+               _stream->iostream.close = o_stream_default_close;
+       if (_stream->iostream.destroy == NULL)
+               _stream->iostream.destroy = o_stream_default_destroy;
+       if (_stream->iostream.set_max_buffer_size == NULL) {
+               _stream->iostream.set_max_buffer_size =
+                       o_stream_default_set_max_buffer_size;
+       }
+
+       if (_stream->cork == NULL)
+               _stream->cork = o_stream_default_cork;
+       if (_stream->flush == NULL)
+               _stream->flush = o_stream_default_flush;
+       if (_stream->set_flush_callback == NULL) {
+               _stream->set_flush_callback =
+                       o_stream_default_set_flush_callback;
+       }
+       if (_stream->flush_pending == NULL)
+               _stream->flush_pending = o_stream_default_set_flush_pending;
+       if (_stream->get_used_size == NULL)
+               _stream->get_used_size = o_stream_default_get_used_size;
+       if (_stream->seek == NULL)
+               _stream->seek = o_stream_default_seek;
+       if (_stream->write_at == NULL)
+               _stream->write_at = o_stream_default_write_at;
+       if (_stream->send_istream == NULL)
+               _stream->send_istream = o_stream_default_send_istream;
+       if (_stream->switch_ioloop == NULL)
+               _stream->switch_ioloop = o_stream_default_switch_ioloop;
+
+       io_stream_init(&_stream->iostream);
+       return &_stream->ostream;
 }
index e7b40301d68caf8dcd6ca6d42c9d16ba8578084b..070cdc3c4616b4f514199bffe2a2ca72d4b6c899 100644 (file)
@@ -20,25 +20,11 @@ struct bzlib_ostream {
        unsigned int flushed:1;
 };
 
-static void zstream_copy_error(struct bzlib_ostream *zstream)
-{
-       struct ostream *src = zstream->output;
-       struct ostream *dest = &zstream->ostream.ostream;
-
-       dest->stream_errno = src->stream_errno;
-       dest->last_failed_errno = src->last_failed_errno;
-       dest->overflow = src->overflow;
-}
-
 static void o_stream_bzlib_close(struct iostream_private *stream)
 {
        struct bzlib_ostream *zstream = (struct bzlib_ostream *)stream;
 
-       if (zstream->output == NULL)
-               return;
-
        o_stream_flush(&zstream->ostream.ostream);
-       o_stream_unref(&zstream->output);
        (void)BZ2_bzCompressEnd(&zstream->zs);
 }
 
@@ -56,10 +42,11 @@ o_stream_bzlib_send_chunk(struct bzlib_ostream *zstream,
                        zs->next_out = zstream->outbuf;
                        zs->avail_out = sizeof(zstream->outbuf);
 
-                       ret = o_stream_send(zstream->output, zstream->outbuf,
+                       ret = o_stream_send(zstream->ostream.parent,
+                                           zstream->outbuf,
                                            sizeof(zstream->outbuf));
                        if (ret != (ssize_t)sizeof(zstream->outbuf)) {
-                               zstream_copy_error(zstream);
+                               o_stream_copy_error_from_parent(&zstream->ostream);
                                return -1;
                        }
                }
@@ -93,10 +80,10 @@ static int o_stream_bzlib_send_flush(struct bzlib_ostream *zstream)
                        zs->next_out = zstream->outbuf;
                        zs->avail_out = sizeof(zstream->outbuf);
 
-                       ret = o_stream_send(zstream->output,
+                       ret = o_stream_send(zstream->ostream.parent,
                                            zstream->outbuf, len);
                        if (ret != (int)len) {
-                               zstream_copy_error(zstream);
+                               o_stream_copy_error_from_parent(&zstream->ostream);
                                return -1;
                        }
                        if (done)
@@ -119,19 +106,6 @@ static int o_stream_bzlib_send_flush(struct bzlib_ostream *zstream)
        return 0;
 }
 
-static void o_stream_bzlib_cork(struct ostream_private *stream, bool set)
-{
-       struct bzlib_ostream *zstream = (struct bzlib_ostream *)stream;
-
-       stream->corked = set;
-       if (set)
-               o_stream_cork(zstream->output);
-       else {
-               (void)o_stream_flush(&stream->ostream);
-               o_stream_uncork(zstream->output);
-       }
-}
-
 static int o_stream_bzlib_flush(struct ostream_private *stream)
 {
        struct bzlib_ostream *zstream = (struct bzlib_ostream *)stream;
@@ -140,19 +114,12 @@ static int o_stream_bzlib_flush(struct ostream_private *stream)
        if (o_stream_bzlib_send_flush(zstream) < 0)
                return -1;
 
-       ret = o_stream_flush(zstream->output);
+       ret = o_stream_flush(stream->parent);
        if (ret < 0)
-               zstream_copy_error(zstream);
+               o_stream_copy_error_from_parent(stream);
        return ret;
 }
 
-static void o_stream_bzlib_switch_ioloop(struct ostream_private *stream)
-{
-       struct bzlib_ostream *zstream = (struct bzlib_ostream *)stream;
-
-       o_stream_switch_ioloop(zstream->output);
-}
-
 static ssize_t
 o_stream_bzlib_sendv(struct ostream_private *stream,
                    const struct const_iovec *iov, unsigned int iov_count)
@@ -181,12 +148,8 @@ struct ostream *o_stream_create_bz2(struct ostream *output, int level)
 
        zstream = i_new(struct bzlib_ostream, 1);
        zstream->ostream.sendv = o_stream_bzlib_sendv;
-       zstream->ostream.cork = o_stream_bzlib_cork;
        zstream->ostream.flush = o_stream_bzlib_flush;
-       zstream->ostream.switch_ioloop = o_stream_bzlib_switch_ioloop;
        zstream->ostream.iostream.close = o_stream_bzlib_close;
-       zstream->output = output;
-       o_stream_ref(output);
 
        ret = BZ2_bzCompressInit(&zstream->zs, level, 0, 0);
        switch (ret) {
@@ -205,6 +168,6 @@ struct ostream *o_stream_create_bz2(struct ostream *output, int level)
 
        zstream->zs.next_out = zstream->outbuf;
        zstream->zs.avail_out = sizeof(zstream->outbuf);
-       return o_stream_create(&zstream->ostream);
+       return o_stream_create(&zstream->ostream, output);
 }
 #endif
index 34a1a775840f14f8fc0bb0c0a4efdd51f39e5a61..a834afebea13bb214f7ce013ebfdc764d16df278 100644 (file)
@@ -19,7 +19,6 @@ struct zlib_ostream {
        unsigned char gz_header[10];
        unsigned char outbuf[CHUNK_SIZE];
 
-       struct ostream *output;
        uint32_t crc, bytes32;
 
        unsigned int gz:1;
@@ -27,25 +26,11 @@ struct zlib_ostream {
        unsigned int flushed:1;
 };
 
-static void zstream_copy_error(struct zlib_ostream *zstream)
-{
-       struct ostream *src = zstream->output;
-       struct ostream *dest = &zstream->ostream.ostream;
-
-       dest->stream_errno = src->stream_errno;
-       dest->last_failed_errno = src->last_failed_errno;
-       dest->overflow = src->overflow;
-}
-
 static void o_stream_zlib_close(struct iostream_private *stream)
 {
        struct zlib_ostream *zstream = (struct zlib_ostream *)stream;
 
-       if (zstream->output == NULL)
-               return;
-
        o_stream_flush(&zstream->ostream.ostream);
-       o_stream_unref(&zstream->output);
        (void)deflateEnd(&zstream->zs);
 }
 
@@ -53,10 +38,10 @@ static int o_stream_zlib_send_gz_header(struct zlib_ostream *zstream)
 {
        ssize_t ret;
 
-       ret = o_stream_send(zstream->output, zstream->gz_header,
+       ret = o_stream_send(zstream->ostream.parent, zstream->gz_header,
                            sizeof(zstream->gz_header));
        if ((size_t)ret != sizeof(zstream->gz_header)) {
-               zstream_copy_error(zstream);
+               o_stream_copy_error_from_parent(&zstream->ostream);
                return -1;
        }
        zstream->header_sent = TRUE;
@@ -79,12 +64,14 @@ static int o_stream_zlib_lsb_uint32(struct ostream *output, uint32_t num)
 
 static int o_stream_zlib_send_gz_trailer(struct zlib_ostream *zstream)
 {
+       struct ostream *output = zstream->ostream.parent;
+
        if (!zstream->gz)
                return 0;
 
-       if (o_stream_zlib_lsb_uint32(zstream->output, zstream->crc) < 0 ||
-           o_stream_zlib_lsb_uint32(zstream->output, zstream->bytes32) < 0) {
-               zstream_copy_error(zstream);
+       if (o_stream_zlib_lsb_uint32(output, zstream->crc) < 0 ||
+           o_stream_zlib_lsb_uint32(output, zstream->bytes32) < 0) {
+               o_stream_copy_error_from_parent(&zstream->ostream);
                return -1;
        }
        return 0;
@@ -111,10 +98,11 @@ o_stream_zlib_send_chunk(struct zlib_ostream *zstream,
                        zs->next_out = zstream->outbuf;
                        zs->avail_out = sizeof(zstream->outbuf);
 
-                       ret = o_stream_send(zstream->output, zstream->outbuf,
+                       ret = o_stream_send(zstream->ostream.parent,
+                                           zstream->outbuf,
                                            sizeof(zstream->outbuf));
                        if (ret != (ssize_t)sizeof(zstream->outbuf)) {
-                               zstream_copy_error(zstream);
+                               o_stream_copy_error_from_parent(&zstream->ostream);
                                return -1;
                        }
                }
@@ -154,10 +142,10 @@ static int o_stream_zlib_send_flush(struct zlib_ostream *zstream)
                        zs->next_out = zstream->outbuf;
                        zs->avail_out = sizeof(zstream->outbuf);
 
-                       ret = o_stream_send(zstream->output,
+                       ret = o_stream_send(zstream->ostream.parent,
                                            zstream->outbuf, len);
                        if (ret != (int)len) {
-                               zstream_copy_error(zstream);
+                               o_stream_copy_error_from_parent(&zstream->ostream);
                                return -1;
                        }
                        if (done)
@@ -182,19 +170,6 @@ static int o_stream_zlib_send_flush(struct zlib_ostream *zstream)
        return 0;
 }
 
-static void o_stream_zlib_cork(struct ostream_private *stream, bool set)
-{
-       struct zlib_ostream *zstream = (struct zlib_ostream *)stream;
-
-       stream->corked = set;
-       if (set)
-               o_stream_cork(zstream->output);
-       else {
-               (void)o_stream_flush(&stream->ostream);
-               o_stream_uncork(zstream->output);
-       }
-}
-
 static int o_stream_zlib_flush(struct ostream_private *stream)
 {
        struct zlib_ostream *zstream = (struct zlib_ostream *)stream;
@@ -203,19 +178,12 @@ static int o_stream_zlib_flush(struct ostream_private *stream)
        if (o_stream_zlib_send_flush(zstream) < 0)
                return -1;
 
-       ret = o_stream_flush(zstream->output);
+       ret = o_stream_flush(stream->parent);
        if (ret < 0)
-               zstream_copy_error(zstream);
+               o_stream_copy_error_from_parent(stream);
        return ret;
 }
 
-static void o_stream_zlib_switch_ioloop(struct ostream_private *stream)
-{
-       struct zlib_ostream *zstream = (struct zlib_ostream *)stream;
-
-       o_stream_switch_ioloop(zstream->output);
-}
-
 static ssize_t
 o_stream_zlib_sendv(struct ostream_private *stream,
                    const struct const_iovec *iov, unsigned int iov_count)
@@ -265,16 +233,12 @@ o_stream_create_zlib(struct ostream *output, int level, bool gz)
 
        zstream = i_new(struct zlib_ostream, 1);
        zstream->ostream.sendv = o_stream_zlib_sendv;
-       zstream->ostream.cork = o_stream_zlib_cork;
        zstream->ostream.flush = o_stream_zlib_flush;
-       zstream->ostream.switch_ioloop = o_stream_zlib_switch_ioloop;
        zstream->ostream.iostream.close = o_stream_zlib_close;
-       zstream->output = output;
        zstream->crc = 0;
        zstream->gz = gz;
        if (!gz)
                zstream->header_sent = TRUE;
-       o_stream_ref(output);
 
        o_stream_zlib_init_gz_header(zstream, level, strategy);
        ret = deflateInit2(&zstream->zs, level, Z_DEFLATED, -15, 8, strategy);
@@ -293,7 +257,7 @@ o_stream_create_zlib(struct ostream *output, int level, bool gz)
 
        zstream->zs.next_out = zstream->outbuf;
        zstream->zs.avail_out = sizeof(zstream->outbuf);
-       return o_stream_create(&zstream->ostream);
+       return o_stream_create(&zstream->ostream, output);
 }
 
 struct ostream *o_stream_create_gz(struct ostream *output, int level)