struct ostream_private ostream;
struct istream *input;
- struct ostream *output;
bool equals;
};
-static void cstream_copy_error(struct cmp_ostream *cstream)
-{
- struct ostream *src = cstream->output;
- struct ostream *dest = &cstream->ostream.ostream;
-
- dest->stream_errno = src->stream_errno;
- dest->last_failed_errno = src->last_failed_errno;
- dest->overflow = src->overflow;
-}
-
static void o_stream_cmp_close(struct iostream_private *stream)
{
struct cmp_ostream *cstream = (struct cmp_ostream *)stream;
- if (cstream->output == NULL)
+ if (cstream->input == NULL)
return;
i_stream_unref(&cstream->input);
o_stream_flush(&cstream->ostream.ostream);
- o_stream_unref(&cstream->output);
-}
-
-static int o_stream_cmp_flush(struct ostream_private *stream)
-{
- struct cmp_ostream *cstream = (struct cmp_ostream *)stream;
- int ret;
-
- ret = o_stream_flush(cstream->output);
- if (ret < 0)
- cstream_copy_error(cstream);
- return ret;
}
bool stream_cmp_block(struct istream *input,
}
}
- if ((ret = o_stream_sendv(cstream->output, iov, iov_count)) < 0) {
- cstream_copy_error(cstream);
+ if ((ret = o_stream_sendv(stream->parent, iov, iov_count)) < 0) {
+ o_stream_copy_error_from_parent(stream);
return -1;
}
cstream = i_new(struct cmp_ostream, 1);
cstream->ostream.sendv = o_stream_cmp_sendv;
- cstream->ostream.flush = o_stream_cmp_flush;
cstream->ostream.iostream.close = o_stream_cmp_close;
cstream->input = input;
- cstream->output = output;
cstream->equals = TRUE;
i_stream_ref(input);
- o_stream_ref(output);
- return o_stream_create(&cstream->ostream);
+ return o_stream_create(&cstream->ostream, output);
}
bool o_stream_cmp_equals(struct ostream *_output)
o_stream_set_flush_callback(ssl_io->plain_output,
plain_flush_callback, sstream);
- return o_stream_create(&sstream->ostream);
+ return o_stream_create(&sstream->ostream, NULL);
}
bstream->ostream.write_at = o_stream_buffer_write_at;
bstream->buf = buf;
- return o_stream_create(&bstream->ostream);
+ return o_stream_create(&bstream->ostream, NULL);
}
fstream = o_stream_create_fd_common(fd, autoclose_fd);
fstream->ostream.max_buffer_size = max_buffer_size;
- ostream = o_stream_create(&fstream->ostream);
+ ostream = o_stream_create(&fstream->ostream, NULL);
offset = lseek(fd, 0, SEEK_CUR);
if (offset >= 0) {
fstream->real_offset = offset;
fstream->buffer_offset = offset;
- ostream = o_stream_create(&fstream->ostream);
+ ostream = o_stream_create(&fstream->ostream, NULL);
ostream->offset = offset;
return ostream;
}
/* methods: */
void (*cork)(struct ostream_private *stream, bool set);
int (*flush)(struct ostream_private *stream);
+ void (*set_flush_callback)(struct ostream_private *stream,
+ stream_flush_callback_t *callback,
+ void *context);
void (*flush_pending)(struct ostream_private *stream, bool set);
size_t (*get_used_size)(const struct ostream_private *stream);
int (*seek)(struct ostream_private *stream, uoff_t offset);
struct ostream ostream;
size_t max_buffer_size;
+ struct ostream *parent; /* for filter streams */
+
stream_flush_callback_t *callback;
void *context;
unsigned int corked:1;
};
-struct ostream *o_stream_create(struct ostream_private *_stream);
+struct ostream *
+o_stream_create(struct ostream_private *_stream, struct ostream *parent);
off_t io_stream_copy(struct ostream *outstream, struct istream *instream,
size_t block_size);
+void o_stream_copy_error_from_parent(struct ostream_private *_stream);
+
#endif
const char *o_stream_get_name(struct ostream *stream)
{
- return stream->real_stream->iostream.name == NULL ? "" :
- stream->real_stream->iostream.name;
+ while (stream->real_stream->iostream.name == NULL) {
+ stream = stream->real_stream->parent;
+ if (stream == NULL)
+ return "";
+ }
+ return stream->real_stream->iostream.name;
}
void o_stream_destroy(struct ostream **stream)
{
struct ostream_private *_stream = stream->real_stream;
- _stream->callback = callback;
- _stream->context = context;
+ _stream->set_flush_callback(_stream, callback, context);
}
void o_stream_unset_flush_callback(struct ostream *stream)
{
- struct ostream_private *_stream = stream->real_stream;
-
- _stream->callback = NULL;
- _stream->context = NULL;
+ o_stream_set_flush_callback(stream, NULL, NULL);
}
void o_stream_set_max_buffer_size(struct ostream *stream, size_t max_size)
{
- if (stream->real_stream->iostream.set_max_buffer_size != NULL) {
- io_stream_set_max_buffer_size(&stream->real_stream->iostream,
- max_size);
- } else {
- stream->real_stream->max_buffer_size = max_size;
- }
+ io_stream_set_max_buffer_size(&stream->real_stream->iostream, max_size);
}
void o_stream_cork(struct ostream *stream)
if (unlikely(stream->closed))
return;
- if (_stream->cork != NULL)
- _stream->cork(_stream, TRUE);
- else
- _stream->corked = TRUE;
+ _stream->cork(_stream, TRUE);
}
void o_stream_uncork(struct ostream *stream)
if (unlikely(stream->closed))
return;
- if (_stream->cork != NULL)
- _stream->cork(_stream, FALSE);
- else {
- _stream->corked = FALSE;
- (void)o_stream_flush(stream);
- }
+ _stream->cork(_stream, FALSE);
}
int o_stream_flush(struct ostream *stream)
return -1;
stream->stream_errno = 0;
- if (_stream->flush != NULL) {
- if (unlikely((ret = _stream->flush(_stream)) < 0)) {
- i_assert(stream->stream_errno != 0);
- stream->last_failed_errno = stream->stream_errno;
- }
+ if (unlikely((ret = _stream->flush(_stream)) < 0)) {
+ i_assert(stream->stream_errno != 0);
+ stream->last_failed_errno = stream->stream_errno;
}
return ret;
}
if (unlikely(stream->closed))
return;
- if (_stream->flush_pending != NULL)
- _stream->flush_pending(_stream, set);
+ _stream->flush_pending(_stream, set);
}
size_t o_stream_get_buffer_used_size(const struct ostream *stream)
{
const struct ostream_private *_stream = stream->real_stream;
- return _stream->get_used_size == NULL ? 0 :
- _stream->get_used_size(_stream);
+ return _stream->get_used_size(_stream);
}
size_t o_stream_get_buffer_avail_size(const struct ostream *stream)
return -1;
stream->stream_errno = 0;
- if (_stream->seek != NULL) {
- if (unlikely(_stream->seek(_stream, offset) < 0)) {
- i_assert(stream->stream_errno != 0);
- stream->last_failed_errno = stream->stream_errno;
- }
- } else {
- stream->stream_errno = EPIPE;
- stream->last_failed_errno = EPIPE;
+ if (unlikely(_stream->seek(_stream, offset) < 0)) {
+ i_assert(stream->stream_errno != 0);
+ stream->last_failed_errno = stream->stream_errno;
return -1;
}
return 1;
if (unlikely(stream->closed))
return -1;
- if (stream->real_stream->write_at == NULL) {
- /* stream doesn't support seeking */
- stream->stream_errno = EPIPE;
- return -1;
- }
ret = stream->real_stream->write_at(stream->real_stream,
data, size, offset);
if (unlikely(ret < 0)) {
return ret;
}
-static off_t o_stream_default_send_istream(struct ostream_private *outstream,
- struct istream *instream)
-{
- return io_stream_copy(&outstream->ostream, instream, IO_BLOCK_SIZE);
-}
-
-struct ostream *o_stream_create(struct ostream_private *_stream)
-{
- _stream->ostream.real_stream = _stream;
- if (_stream->send_istream == NULL)
- _stream->send_istream = o_stream_default_send_istream;
-
- io_stream_init(&_stream->iostream);
- return &_stream->ostream;
-}
-
off_t io_stream_copy(struct ostream *outstream, struct istream *instream,
size_t block_size)
{
{
struct ostream_private *_stream = stream->real_stream;
- if (_stream->switch_ioloop != NULL)
- _stream->switch_ioloop(_stream);
+ _stream->switch_ioloop(_stream);
+}
+
+static void o_stream_default_close(struct iostream_private *stream)
+{
+ struct ostream_private *_stream = (struct ostream_private *)stream;
+
+ (void)o_stream_flush(&_stream->ostream);
+}
+
+static void o_stream_default_destroy(struct iostream_private *stream)
+{
+ struct ostream_private *_stream = (struct ostream_private *)stream;
+
+ if (_stream->parent != NULL)
+ o_stream_unref(&_stream->parent);
+}
+
+static void
+o_stream_default_set_max_buffer_size(struct iostream_private *stream,
+ size_t max_size)
+{
+ struct ostream_private *_stream = (struct ostream_private *)stream;
+
+ if (_stream->parent != NULL)
+ o_stream_set_max_buffer_size(_stream->parent, max_size);
+ _stream->max_buffer_size = max_size;
+}
+
+static void o_stream_default_cork(struct ostream_private *_stream, bool set)
+{
+ _stream->corked = set;
+ if (set) {
+ if (_stream->parent != NULL)
+ o_stream_cork(_stream->parent);
+ } else {
+ (void)o_stream_flush(&_stream->ostream);
+ if (_stream->parent != NULL)
+ o_stream_uncork(_stream->parent);
+ }
+}
+
+void o_stream_copy_error_from_parent(struct ostream_private *_stream)
+{
+ struct ostream *src = _stream->parent;
+ struct ostream *dest = &_stream->ostream;
+
+ dest->stream_errno = src->stream_errno;
+ dest->last_failed_errno = src->last_failed_errno;
+ dest->overflow = src->overflow;
+}
+
+static int o_stream_default_flush(struct ostream_private *_stream)
+{
+ int ret;
+
+ if (_stream->parent == NULL)
+ return 1;
+
+ if ((ret = o_stream_flush(_stream->parent)) < 0)
+ o_stream_copy_error_from_parent(_stream);
+ return ret;
+}
+
+static void
+o_stream_default_set_flush_callback(struct ostream_private *_stream,
+ stream_flush_callback_t *callback,
+ void *context)
+{
+ if (_stream->parent == NULL) {
+ _stream->callback = callback;
+ _stream->context = context;
+ } else {
+ /* this is a filter stream, we don't have a flush
+ callback ourself */
+ o_stream_set_flush_callback(_stream->parent, callback, context);
+ }
+}
+
+static void
+o_stream_default_set_flush_pending(struct ostream_private *_stream, bool set)
+{
+ if (_stream->parent != NULL)
+ o_stream_set_flush_pending(_stream->parent, set);
+}
+
+static size_t
+o_stream_default_get_used_size(const struct ostream_private *_stream)
+{
+ if (_stream->parent == NULL)
+ return 0;
+ else
+ return o_stream_get_buffer_used_size(_stream->parent);
+}
+
+static int
+o_stream_default_seek(struct ostream_private *_stream,
+ uoff_t offset ATTR_UNUSED)
+{
+ _stream->ostream.stream_errno = EPIPE;
+ return -1;
+}
+
+static int
+o_stream_default_write_at(struct ostream_private *_stream,
+ const void *data ATTR_UNUSED,
+ size_t size ATTR_UNUSED, uoff_t offset ATTR_UNUSED)
+{
+ _stream->ostream.stream_errno = EPIPE;
+ return -1;
+}
+
+static off_t o_stream_default_send_istream(struct ostream_private *outstream,
+ struct istream *instream)
+{
+ return io_stream_copy(&outstream->ostream, instream, IO_BLOCK_SIZE);
+}
+
+static void o_stream_default_switch_ioloop(struct ostream_private *_stream)
+{
+ if (_stream->parent != NULL)
+ o_stream_switch_ioloop(_stream->parent);
+}
+
+struct ostream *
+o_stream_create(struct ostream_private *_stream, struct ostream *parent)
+{
+ _stream->ostream.real_stream = _stream;
+ if (parent != NULL) {
+ _stream->parent = parent;
+ o_stream_ref(parent);
+ }
+
+ if (_stream->iostream.close == NULL)
+ _stream->iostream.close = o_stream_default_close;
+ if (_stream->iostream.destroy == NULL)
+ _stream->iostream.destroy = o_stream_default_destroy;
+ if (_stream->iostream.set_max_buffer_size == NULL) {
+ _stream->iostream.set_max_buffer_size =
+ o_stream_default_set_max_buffer_size;
+ }
+
+ if (_stream->cork == NULL)
+ _stream->cork = o_stream_default_cork;
+ if (_stream->flush == NULL)
+ _stream->flush = o_stream_default_flush;
+ if (_stream->set_flush_callback == NULL) {
+ _stream->set_flush_callback =
+ o_stream_default_set_flush_callback;
+ }
+ if (_stream->flush_pending == NULL)
+ _stream->flush_pending = o_stream_default_set_flush_pending;
+ if (_stream->get_used_size == NULL)
+ _stream->get_used_size = o_stream_default_get_used_size;
+ if (_stream->seek == NULL)
+ _stream->seek = o_stream_default_seek;
+ if (_stream->write_at == NULL)
+ _stream->write_at = o_stream_default_write_at;
+ if (_stream->send_istream == NULL)
+ _stream->send_istream = o_stream_default_send_istream;
+ if (_stream->switch_ioloop == NULL)
+ _stream->switch_ioloop = o_stream_default_switch_ioloop;
+
+ io_stream_init(&_stream->iostream);
+ return &_stream->ostream;
}
unsigned int flushed:1;
};
-static void zstream_copy_error(struct bzlib_ostream *zstream)
-{
- struct ostream *src = zstream->output;
- struct ostream *dest = &zstream->ostream.ostream;
-
- dest->stream_errno = src->stream_errno;
- dest->last_failed_errno = src->last_failed_errno;
- dest->overflow = src->overflow;
-}
-
static void o_stream_bzlib_close(struct iostream_private *stream)
{
struct bzlib_ostream *zstream = (struct bzlib_ostream *)stream;
- if (zstream->output == NULL)
- return;
-
o_stream_flush(&zstream->ostream.ostream);
- o_stream_unref(&zstream->output);
(void)BZ2_bzCompressEnd(&zstream->zs);
}
zs->next_out = zstream->outbuf;
zs->avail_out = sizeof(zstream->outbuf);
- ret = o_stream_send(zstream->output, zstream->outbuf,
+ ret = o_stream_send(zstream->ostream.parent,
+ zstream->outbuf,
sizeof(zstream->outbuf));
if (ret != (ssize_t)sizeof(zstream->outbuf)) {
- zstream_copy_error(zstream);
+ o_stream_copy_error_from_parent(&zstream->ostream);
return -1;
}
}
zs->next_out = zstream->outbuf;
zs->avail_out = sizeof(zstream->outbuf);
- ret = o_stream_send(zstream->output,
+ ret = o_stream_send(zstream->ostream.parent,
zstream->outbuf, len);
if (ret != (int)len) {
- zstream_copy_error(zstream);
+ o_stream_copy_error_from_parent(&zstream->ostream);
return -1;
}
if (done)
return 0;
}
-static void o_stream_bzlib_cork(struct ostream_private *stream, bool set)
-{
- struct bzlib_ostream *zstream = (struct bzlib_ostream *)stream;
-
- stream->corked = set;
- if (set)
- o_stream_cork(zstream->output);
- else {
- (void)o_stream_flush(&stream->ostream);
- o_stream_uncork(zstream->output);
- }
-}
-
static int o_stream_bzlib_flush(struct ostream_private *stream)
{
struct bzlib_ostream *zstream = (struct bzlib_ostream *)stream;
if (o_stream_bzlib_send_flush(zstream) < 0)
return -1;
- ret = o_stream_flush(zstream->output);
+ ret = o_stream_flush(stream->parent);
if (ret < 0)
- zstream_copy_error(zstream);
+ o_stream_copy_error_from_parent(stream);
return ret;
}
-static void o_stream_bzlib_switch_ioloop(struct ostream_private *stream)
-{
- struct bzlib_ostream *zstream = (struct bzlib_ostream *)stream;
-
- o_stream_switch_ioloop(zstream->output);
-}
-
static ssize_t
o_stream_bzlib_sendv(struct ostream_private *stream,
const struct const_iovec *iov, unsigned int iov_count)
zstream = i_new(struct bzlib_ostream, 1);
zstream->ostream.sendv = o_stream_bzlib_sendv;
- zstream->ostream.cork = o_stream_bzlib_cork;
zstream->ostream.flush = o_stream_bzlib_flush;
- zstream->ostream.switch_ioloop = o_stream_bzlib_switch_ioloop;
zstream->ostream.iostream.close = o_stream_bzlib_close;
- zstream->output = output;
- o_stream_ref(output);
ret = BZ2_bzCompressInit(&zstream->zs, level, 0, 0);
switch (ret) {
zstream->zs.next_out = zstream->outbuf;
zstream->zs.avail_out = sizeof(zstream->outbuf);
- return o_stream_create(&zstream->ostream);
+ return o_stream_create(&zstream->ostream, output);
}
#endif
unsigned char gz_header[10];
unsigned char outbuf[CHUNK_SIZE];
- struct ostream *output;
uint32_t crc, bytes32;
unsigned int gz:1;
unsigned int flushed:1;
};
-static void zstream_copy_error(struct zlib_ostream *zstream)
-{
- struct ostream *src = zstream->output;
- struct ostream *dest = &zstream->ostream.ostream;
-
- dest->stream_errno = src->stream_errno;
- dest->last_failed_errno = src->last_failed_errno;
- dest->overflow = src->overflow;
-}
-
static void o_stream_zlib_close(struct iostream_private *stream)
{
struct zlib_ostream *zstream = (struct zlib_ostream *)stream;
- if (zstream->output == NULL)
- return;
-
o_stream_flush(&zstream->ostream.ostream);
- o_stream_unref(&zstream->output);
(void)deflateEnd(&zstream->zs);
}
{
ssize_t ret;
- ret = o_stream_send(zstream->output, zstream->gz_header,
+ ret = o_stream_send(zstream->ostream.parent, zstream->gz_header,
sizeof(zstream->gz_header));
if ((size_t)ret != sizeof(zstream->gz_header)) {
- zstream_copy_error(zstream);
+ o_stream_copy_error_from_parent(&zstream->ostream);
return -1;
}
zstream->header_sent = TRUE;
static int o_stream_zlib_send_gz_trailer(struct zlib_ostream *zstream)
{
+ struct ostream *output = zstream->ostream.parent;
+
if (!zstream->gz)
return 0;
- if (o_stream_zlib_lsb_uint32(zstream->output, zstream->crc) < 0 ||
- o_stream_zlib_lsb_uint32(zstream->output, zstream->bytes32) < 0) {
- zstream_copy_error(zstream);
+ if (o_stream_zlib_lsb_uint32(output, zstream->crc) < 0 ||
+ o_stream_zlib_lsb_uint32(output, zstream->bytes32) < 0) {
+ o_stream_copy_error_from_parent(&zstream->ostream);
return -1;
}
return 0;
zs->next_out = zstream->outbuf;
zs->avail_out = sizeof(zstream->outbuf);
- ret = o_stream_send(zstream->output, zstream->outbuf,
+ ret = o_stream_send(zstream->ostream.parent,
+ zstream->outbuf,
sizeof(zstream->outbuf));
if (ret != (ssize_t)sizeof(zstream->outbuf)) {
- zstream_copy_error(zstream);
+ o_stream_copy_error_from_parent(&zstream->ostream);
return -1;
}
}
zs->next_out = zstream->outbuf;
zs->avail_out = sizeof(zstream->outbuf);
- ret = o_stream_send(zstream->output,
+ ret = o_stream_send(zstream->ostream.parent,
zstream->outbuf, len);
if (ret != (int)len) {
- zstream_copy_error(zstream);
+ o_stream_copy_error_from_parent(&zstream->ostream);
return -1;
}
if (done)
return 0;
}
-static void o_stream_zlib_cork(struct ostream_private *stream, bool set)
-{
- struct zlib_ostream *zstream = (struct zlib_ostream *)stream;
-
- stream->corked = set;
- if (set)
- o_stream_cork(zstream->output);
- else {
- (void)o_stream_flush(&stream->ostream);
- o_stream_uncork(zstream->output);
- }
-}
-
static int o_stream_zlib_flush(struct ostream_private *stream)
{
struct zlib_ostream *zstream = (struct zlib_ostream *)stream;
if (o_stream_zlib_send_flush(zstream) < 0)
return -1;
- ret = o_stream_flush(zstream->output);
+ ret = o_stream_flush(stream->parent);
if (ret < 0)
- zstream_copy_error(zstream);
+ o_stream_copy_error_from_parent(stream);
return ret;
}
-static void o_stream_zlib_switch_ioloop(struct ostream_private *stream)
-{
- struct zlib_ostream *zstream = (struct zlib_ostream *)stream;
-
- o_stream_switch_ioloop(zstream->output);
-}
-
static ssize_t
o_stream_zlib_sendv(struct ostream_private *stream,
const struct const_iovec *iov, unsigned int iov_count)
zstream = i_new(struct zlib_ostream, 1);
zstream->ostream.sendv = o_stream_zlib_sendv;
- zstream->ostream.cork = o_stream_zlib_cork;
zstream->ostream.flush = o_stream_zlib_flush;
- zstream->ostream.switch_ioloop = o_stream_zlib_switch_ioloop;
zstream->ostream.iostream.close = o_stream_zlib_close;
- zstream->output = output;
zstream->crc = 0;
zstream->gz = gz;
if (!gz)
zstream->header_sent = TRUE;
- o_stream_ref(output);
o_stream_zlib_init_gz_header(zstream, level, strategy);
ret = deflateInit2(&zstream->zs, level, Z_DEFLATED, -15, 8, strategy);
zstream->zs.next_out = zstream->outbuf;
zstream->zs.avail_out = sizeof(zstream->outbuf);
- return o_stream_create(&zstream->ostream);
+ return o_stream_create(&zstream->ostream, output);
}
struct ostream *o_stream_create_gz(struct ostream *output, int level)