From: Aki Tuomi Date: Tue, 21 May 2019 08:45:57 +0000 (+0300) Subject: lib: ostream-multiplex - Implement cork support X-Git-Tag: 2.3.9~377 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a25c91bfa1c1ed1d6e8b3134ab8e011bafd5628e;p=thirdparty%2Fdovecot%2Fcore.git lib: ostream-multiplex - Implement cork support Support corking for multiplex channels --- diff --git a/src/lib/ostream-multiplex.c b/src/lib/ostream-multiplex.c index a4bdc398f7..60805b8c5a 100644 --- a/src/lib/ostream-multiplex.c +++ b/src/lib/ostream-multiplex.c @@ -17,6 +17,7 @@ struct multiplex_ochannel { buffer_t *buf; time_t last_sent; bool closed:1; + bool corked:1; }; struct multiplex_ostream { @@ -73,6 +74,8 @@ o_stream_multiplex_sendv(struct multiplex_ostream *mstream) return -2; while((channel = get_next_channel(mstream)) != NULL) { + if (channel->buf->used == 0) + continue; size_t tmp = mstream->bufsize - mstream->wbuf->used - 5; /* ensure it fits into 32 bit int */ size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used)); @@ -98,6 +101,38 @@ o_stream_multiplex_sendv(struct multiplex_ostream *mstream) return ret; } +static int o_stream_multiplex_ochannel_flush(struct ostream_private *stream) +{ + ssize_t ret; + struct multiplex_ochannel *channel = (struct multiplex_ochannel *)stream; + struct multiplex_ostream *mstream = channel->mstream; + + /* flush parent stream always, so there is room for more. */ + if ((ret = o_stream_flush(mstream->parent)) <= 0) { + if (ret == -1) + propagate_error(mstream, mstream->parent->stream_errno); + return ret; + } + + /* send all channels */ + if (o_stream_multiplex_sendv(mstream) < 0) + return -1; + + if (channel->buf->used > 0) + return 0; + return 1; +} + +static void o_stream_multiplex_ochannel_cork(struct ostream_private *stream, bool set) +{ + struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream; + if (channel->corked != set && !set) { + /* flush */ + (void)o_stream_multiplex_ochannel_flush(stream); + } + channel->corked = set; +} + static ssize_t o_stream_multiplex_ochannel_sendv(struct ostream_private *stream, const struct const_iovec *iov, unsigned int iov_count) @@ -120,6 +155,10 @@ o_stream_multiplex_ochannel_sendv(struct ostream_private *stream, stream->ostream.offset += total; + /* will send later */ + if (channel->corked) + return total; + if ((ret = o_stream_multiplex_sendv(channel->mstream)) < 0) return ret; @@ -198,6 +237,8 @@ o_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid) channel->cid = cid; channel->buf = buffer_create_dynamic(default_pool, 256); channel->mstream = mstream; + channel->ostream.cork = o_stream_multiplex_ochannel_cork; + channel->ostream.flush = o_stream_multiplex_ochannel_flush; channel->ostream.sendv = o_stream_multiplex_ochannel_sendv; channel->ostream.get_buffer_used_size = o_stream_multiplex_ochannel_get_buffer_used_size;