From: Aki Tuomi Date: Tue, 21 May 2019 09:43:46 +0000 (+0300) Subject: lib: ostream-multiplex - Optimize writing X-Git-Tag: 2.3.8~187 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=8a55fddfd1ab8bef8dc7b7d3174b28fd747e770a;p=thirdparty%2Fdovecot%2Fcore.git lib: ostream-multiplex - Optimize writing --- diff --git a/src/lib/ostream-multiplex.c b/src/lib/ostream-multiplex.c index 60805b8c5a..71cb871be1 100644 --- a/src/lib/ostream-multiplex.c +++ b/src/lib/ostream-multiplex.c @@ -26,7 +26,6 @@ struct multiplex_ostream { /* channel 0 is main channel */ uint8_t cur_channel; unsigned int remain; - buffer_t *wbuf; size_t bufsize; ARRAY(struct multiplex_ochannel *) channels; @@ -70,35 +69,39 @@ o_stream_multiplex_sendv(struct multiplex_ostream *mstream) { struct multiplex_ochannel *channel; ssize_t ret = 0; - if (mstream->bufsize <= mstream->wbuf->used + 5) - return -2; while((channel = get_next_channel(mstream)) != NULL) { if (channel->buf->used == 0) continue; - size_t tmp = mstream->bufsize - mstream->wbuf->used - 5; + if (o_stream_get_buffer_avail_size(mstream->parent) < 6) + return 0; + /* check parent stream capacity */ + size_t tmp = o_stream_get_buffer_avail_size(mstream->parent) - 5; /* ensure it fits into 32 bit int */ size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used)); + /* ensure amt fits */ if (tmp == 0) break; + /* delay corking here now that we are going to send something */ + if (!o_stream_is_corked(mstream->parent)) + o_stream_cork(mstream->parent); uint32_t len = cpu32_to_be(amt); - buffer_append(mstream->wbuf, &channel->cid, 1); - buffer_append(mstream->wbuf, &len, 4); - buffer_append(mstream->wbuf, channel->buf->data, amt); - buffer_delete(channel->buf, 0, amt); - channel->last_sent = ioloop_time; - } - - if (mstream->wbuf->used > 0) { - ret = o_stream_send(mstream->parent, mstream->wbuf->data, - mstream->wbuf->used); - if (ret < 0) { + const struct const_iovec vec[] = { + { &channel->cid, 1 }, + { &len, 4 }, + { channel->buf->data, amt } + }; + if ((ret = o_stream_sendv(mstream->parent, vec, N_ELEMENTS(vec))) < 0) { + i_assert(ret != -2); propagate_error(mstream, mstream->parent->stream_errno); - return ret; + break; } - buffer_delete(mstream->wbuf, 0, ret); + buffer_delete(channel->buf, 0, amt); + channel->last_sent = ioloop_time; } - return ret; + if (o_stream_is_corked(mstream->parent)) + o_stream_uncork(mstream->parent); + return 0; } static int o_stream_multiplex_ochannel_flush(struct ostream_private *stream) @@ -138,14 +141,25 @@ o_stream_multiplex_ochannel_sendv(struct ostream_private *stream, const struct const_iovec *iov, unsigned int iov_count) { struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream; - ssize_t ret; - size_t total = 0; - if (channel->mstream->bufsize <= channel->buf->used) - return -2; + size_t total = 0, avail = o_stream_get_buffer_avail_size(&stream->ostream); + size_t optimal_size = I_MIN(IO_BLOCK_SIZE, avail); + + for (unsigned int i = 0; i < iov_count; i++) + total += iov[i].iov_len; + + if (avail < total) { + if (o_stream_multiplex_sendv(channel->mstream) < 0) + return -1; + avail = o_stream_get_buffer_avail_size(&stream->ostream); + if (avail == 0) + return -2; + } + + total = 0; for (unsigned int i = 0; i < iov_count; i++) { /* copy data to buffer */ - size_t tmp = channel->mstream->bufsize - channel->buf->used; + size_t tmp = avail - total; if (tmp == 0) break; buffer_append(channel->buf, iov[i].iov_base, @@ -156,12 +170,11 @@ o_stream_multiplex_ochannel_sendv(struct ostream_private *stream, stream->ostream.offset += total; /* will send later */ - if (channel->corked) + if (channel->corked && channel->buf->used < optimal_size) return total; - if ((ret = o_stream_multiplex_sendv(channel->mstream)) < 0) - return ret; - + if (o_stream_multiplex_sendv(channel->mstream) < 0) + return -1; return total; } @@ -171,7 +184,7 @@ o_stream_multiplex_ochannel_get_buffer_used_size(const struct ostream_private *s const struct multiplex_ochannel *channel = (const struct multiplex_ochannel*)stream; - return channel->buf->used + channel->mstream->wbuf->used + + return channel->buf->used + o_stream_get_buffer_used_size(channel->mstream->parent); } @@ -180,9 +193,12 @@ o_stream_multiplex_ochannel_get_buffer_avail_size(const struct ostream_private * { const struct multiplex_ochannel *channel = (const struct multiplex_ochannel*)stream; + size_t max_avail = I_MIN(channel->mstream->bufsize, + o_stream_get_buffer_avail_size(stream->parent)); - return channel->mstream->bufsize <= channel->buf->used ? 0 : - channel->mstream->bufsize - channel->buf->used; + /* There is 5-byte overhead per message, so take that into account */ + return max_avail <= (channel->buf->used + 5) ? 0 : + max_avail - (channel->buf->used + 5); } static void @@ -209,7 +225,6 @@ static void o_stream_multiplex_try_destroy(struct multiplex_ostream *mstream) return; o_stream_unref(&mstream->parent); array_free(&mstream->channels); - buffer_free(&mstream->wbuf); i_free(mstream); } @@ -249,8 +264,7 @@ o_stream_add_channel_real(struct multiplex_ostream *mstream, uint8_t cid) channel->ostream.fd = o_stream_get_fd(mstream->parent); array_push_back(&channel->mstream->channels, &channel); - return o_stream_create(&channel->ostream, mstream->parent, - mstream->bufsize); + return o_stream_create(&channel->ostream, mstream->parent, -1); } struct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid) @@ -269,7 +283,6 @@ struct ostream *o_stream_create_multiplex(struct ostream *parent, size_t bufsize mstream = i_new(struct multiplex_ostream, 1); mstream->parent = parent; mstream->bufsize = bufsize; - mstream->wbuf = buffer_create_dynamic(default_pool, 256); i_array_init(&mstream->channels, 8); o_stream_ref(parent);