]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lib: ostream-multiplex - Implement cork support
authorAki Tuomi <aki.tuomi@open-xchange.com>
Tue, 21 May 2019 08:45:57 +0000 (11:45 +0300)
committerVille Savolainen <ville.savolainen@dovecot.fi>
Tue, 10 Sep 2019 07:01:57 +0000 (10:01 +0300)
Support corking for multiplex channels

src/lib/ostream-multiplex.c

index a4bdc398f70c02e0cd6cf7b74a147a98502e0d7d..60805b8c5a6568e15ccf39b1b3bdce674b126e43 100644 (file)
@@ -17,6 +17,7 @@ struct multiplex_ochannel {
        buffer_t *buf;
        time_t last_sent;
        bool closed:1;
+       bool corked:1;
 };
 
 struct multiplex_ostream {
@@ -73,6 +74,8 @@ o_stream_multiplex_sendv(struct multiplex_ostream *mstream)
                return -2;
 
        while((channel = get_next_channel(mstream)) != NULL) {
+               if (channel->buf->used == 0)
+                       continue;
                size_t tmp = mstream->bufsize - mstream->wbuf->used - 5;
                /* ensure it fits into 32 bit int */
                size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used));
@@ -98,6 +101,38 @@ o_stream_multiplex_sendv(struct multiplex_ostream *mstream)
        return ret;
 }
 
+static int o_stream_multiplex_ochannel_flush(struct ostream_private *stream)
+{
+       ssize_t ret;
+       struct multiplex_ochannel *channel = (struct multiplex_ochannel *)stream;
+       struct multiplex_ostream *mstream = channel->mstream;
+
+       /* flush parent stream always, so there is room for more. */
+       if ((ret = o_stream_flush(mstream->parent)) <= 0) {
+               if (ret == -1)
+                       propagate_error(mstream, mstream->parent->stream_errno);
+               return ret;
+       }
+
+       /* send all channels */
+       if (o_stream_multiplex_sendv(mstream) < 0)
+               return -1;
+
+       if (channel->buf->used > 0)
+               return 0;
+       return 1;
+}
+
+static void o_stream_multiplex_ochannel_cork(struct ostream_private *stream, bool set)
+{
+       struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
+       if (channel->corked != set && !set) {
+               /* flush */
+               (void)o_stream_multiplex_ochannel_flush(stream);
+       }
+       channel->corked = set;
+}
+
 static ssize_t
 o_stream_multiplex_ochannel_sendv(struct ostream_private *stream,
                                 const struct const_iovec *iov, unsigned int iov_count)
@@ -120,6 +155,10 @@ o_stream_multiplex_ochannel_sendv(struct ostream_private *stream,
 
        stream->ostream.offset += total;
 
+       /* will send later */
+       if (channel->corked)
+               return total;
+
        if ((ret = o_stream_multiplex_sendv(channel->mstream)) < 0)
                return ret;
 
@@ -198,6 +237,8 @@ o_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid)
        channel->cid = cid;
        channel->buf = buffer_create_dynamic(default_pool, 256);
        channel->mstream = mstream;
+       channel->ostream.cork = o_stream_multiplex_ochannel_cork;
+       channel->ostream.flush = o_stream_multiplex_ochannel_flush;
        channel->ostream.sendv = o_stream_multiplex_ochannel_sendv;
        channel->ostream.get_buffer_used_size =
                o_stream_multiplex_ochannel_get_buffer_used_size;