]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lib: ostream-multiplex - Optimize writing
authorAki Tuomi <aki.tuomi@open-xchange.com>
Tue, 21 May 2019 09:43:46 +0000 (12:43 +0300)
committerVille Savolainen <ville.savolainen@dovecot.fi>
Tue, 10 Sep 2019 07:01:57 +0000 (10:01 +0300)
src/lib/ostream-multiplex.c

index 60805b8c5a6568e15ccf39b1b3bdce674b126e43..71cb871be1a6d05917a056975e886fe980f5c3c1 100644 (file)
@@ -26,7 +26,6 @@ struct multiplex_ostream {
        /* channel 0 is main channel */
        uint8_t cur_channel;
        unsigned int remain;
-       buffer_t *wbuf;
        size_t bufsize;
        ARRAY(struct multiplex_ochannel *) channels;
 
@@ -70,35 +69,39 @@ o_stream_multiplex_sendv(struct multiplex_ostream *mstream)
 {
        struct multiplex_ochannel *channel;
        ssize_t ret = 0;
-       if (mstream->bufsize <= mstream->wbuf->used + 5)
-               return -2;
 
        while((channel = get_next_channel(mstream)) != NULL) {
                if (channel->buf->used == 0)
                        continue;
-               size_t tmp = mstream->bufsize - mstream->wbuf->used - 5;
+               if (o_stream_get_buffer_avail_size(mstream->parent) < 6)
+                       return 0;
+               /* check parent stream capacity */
+               size_t tmp = o_stream_get_buffer_avail_size(mstream->parent) - 5;
                /* ensure it fits into 32 bit int */
                size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used));
+               /* ensure amt fits */
                if (tmp == 0)
                        break;
+               /* delay corking here now that we are going to send something */
+               if (!o_stream_is_corked(mstream->parent))
+                       o_stream_cork(mstream->parent);
                uint32_t len = cpu32_to_be(amt);
-               buffer_append(mstream->wbuf, &channel->cid, 1);
-               buffer_append(mstream->wbuf, &len, 4);
-               buffer_append(mstream->wbuf, channel->buf->data, amt);
-               buffer_delete(channel->buf, 0, amt);
-               channel->last_sent = ioloop_time;
-       }
-
-       if (mstream->wbuf->used > 0) {
-               ret = o_stream_send(mstream->parent, mstream->wbuf->data,
-                                   mstream->wbuf->used);
-               if (ret < 0) {
+               const struct const_iovec vec[] = {
+                       { &channel->cid, 1 },
+                       { &len, 4 },
+                       { channel->buf->data, amt }
+               };
+               if ((ret = o_stream_sendv(mstream->parent, vec, N_ELEMENTS(vec))) < 0) {
+                       i_assert(ret != -2);
                        propagate_error(mstream, mstream->parent->stream_errno);
-                       return ret;
+                       break;
                }
-               buffer_delete(mstream->wbuf, 0, ret);
+               buffer_delete(channel->buf, 0, amt);
+               channel->last_sent = ioloop_time;
        }
-       return ret;
+       if (o_stream_is_corked(mstream->parent))
+               o_stream_uncork(mstream->parent);
+       return 0;
 }
 
 static int o_stream_multiplex_ochannel_flush(struct ostream_private *stream)
@@ -138,14 +141,25 @@ o_stream_multiplex_ochannel_sendv(struct ostream_private *stream,
                                 const struct const_iovec *iov, unsigned int iov_count)
 {
        struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
-       ssize_t ret;
-       size_t total = 0;
-       if (channel->mstream->bufsize <= channel->buf->used)
-               return -2;
+       size_t total = 0, avail = o_stream_get_buffer_avail_size(&stream->ostream);
+       size_t optimal_size = I_MIN(IO_BLOCK_SIZE, avail);
+
+       for (unsigned int i = 0; i < iov_count; i++)
+               total += iov[i].iov_len;
+
+       if (avail < total) {
+               if (o_stream_multiplex_sendv(channel->mstream) < 0)
+                       return -1;
+               avail = o_stream_get_buffer_avail_size(&stream->ostream);
+               if (avail == 0)
+                       return -2;
+       }
+
+       total = 0;
 
        for (unsigned int i = 0; i < iov_count; i++) {
                /* copy data to buffer */
-               size_t tmp = channel->mstream->bufsize - channel->buf->used;
+               size_t tmp = avail - total;
                if (tmp == 0)
                        break;
                buffer_append(channel->buf, iov[i].iov_base,
@@ -156,12 +170,11 @@ o_stream_multiplex_ochannel_sendv(struct ostream_private *stream,
        stream->ostream.offset += total;
 
        /* will send later */
-       if (channel->corked)
+       if (channel->corked && channel->buf->used < optimal_size)
                return total;
 
-       if ((ret = o_stream_multiplex_sendv(channel->mstream)) < 0)
-               return ret;
-
+       if (o_stream_multiplex_sendv(channel->mstream) < 0)
+               return -1;
        return total;
 }
 
@@ -171,7 +184,7 @@ o_stream_multiplex_ochannel_get_buffer_used_size(const struct ostream_private *s
        const struct multiplex_ochannel *channel =
                (const struct multiplex_ochannel*)stream;
 
-       return channel->buf->used + channel->mstream->wbuf->used +
+       return channel->buf->used +
                o_stream_get_buffer_used_size(channel->mstream->parent);
 }
 
@@ -180,9 +193,12 @@ o_stream_multiplex_ochannel_get_buffer_avail_size(const struct ostream_private *
 {
        const struct multiplex_ochannel *channel =
                (const struct multiplex_ochannel*)stream;
+       size_t max_avail = I_MIN(channel->mstream->bufsize,
+                                o_stream_get_buffer_avail_size(stream->parent));
 
-       return channel->mstream->bufsize <= channel->buf->used ? 0 :
-               channel->mstream->bufsize - channel->buf->used;
+       /* There is 5-byte overhead per message, so take that into account */
+       return max_avail <= (channel->buf->used + 5) ? 0 :
+               max_avail - (channel->buf->used + 5);
 }
 
 static void
@@ -209,7 +225,6 @@ static void o_stream_multiplex_try_destroy(struct multiplex_ostream *mstream)
                        return;
        o_stream_unref(&mstream->parent);
        array_free(&mstream->channels);
-       buffer_free(&mstream->wbuf);
        i_free(mstream);
 }
 
@@ -249,8 +264,7 @@ o_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid)
        channel->ostream.fd = o_stream_get_fd(mstream->parent);
        array_push_back(&channel->mstream->channels, &channel);
 
-       return o_stream_create(&channel->ostream, mstream->parent,
-                              mstream->bufsize);
+       return o_stream_create(&channel->ostream, mstream->parent, -1);
 }
 
 struct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid)
@@ -269,7 +283,6 @@ struct ostream *o_stream_create_multiplex(struct ostream *parent, size_t bufsize
        mstream = i_new(struct multiplex_ostream, 1);
        mstream->parent = parent;
        mstream->bufsize = bufsize;
-       mstream->wbuf = buffer_create_dynamic(default_pool, 256);
        i_array_init(&mstream->channels, 8);
        o_stream_ref(parent);