/* channel 0 is main channel */
uint8_t cur_channel;
unsigned int remain;
- buffer_t *wbuf;
size_t bufsize;
ARRAY(struct multiplex_ochannel *) channels;
{
struct multiplex_ochannel *channel;
ssize_t ret = 0;
- if (mstream->bufsize <= mstream->wbuf->used + 5)
- return -2;
while((channel = get_next_channel(mstream)) != NULL) {
if (channel->buf->used == 0)
continue;
- size_t tmp = mstream->bufsize - mstream->wbuf->used - 5;
+ if (o_stream_get_buffer_avail_size(mstream->parent) < 6)
+ return 0;
+ /* check parent stream capacity */
+ size_t tmp = o_stream_get_buffer_avail_size(mstream->parent) - 5;
/* ensure it fits into 32 bit int */
size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used));
+ /* ensure amt fits */
if (tmp == 0)
break;
+ /* delay corking here now that we are going to send something */
+ if (!o_stream_is_corked(mstream->parent))
+ o_stream_cork(mstream->parent);
uint32_t len = cpu32_to_be(amt);
- buffer_append(mstream->wbuf, &channel->cid, 1);
- buffer_append(mstream->wbuf, &len, 4);
- buffer_append(mstream->wbuf, channel->buf->data, amt);
- buffer_delete(channel->buf, 0, amt);
- channel->last_sent = ioloop_time;
- }
-
- if (mstream->wbuf->used > 0) {
- ret = o_stream_send(mstream->parent, mstream->wbuf->data,
- mstream->wbuf->used);
- if (ret < 0) {
+ const struct const_iovec vec[] = {
+ { &channel->cid, 1 },
+ { &len, 4 },
+ { channel->buf->data, amt }
+ };
+ if ((ret = o_stream_sendv(mstream->parent, vec, N_ELEMENTS(vec))) < 0) {
+ i_assert(ret != -2);
propagate_error(mstream, mstream->parent->stream_errno);
- return ret;
+ break;
}
- buffer_delete(mstream->wbuf, 0, ret);
+ buffer_delete(channel->buf, 0, amt);
+ channel->last_sent = ioloop_time;
}
- return ret;
+ if (o_stream_is_corked(mstream->parent))
+ o_stream_uncork(mstream->parent);
+ return 0;
}
static int o_stream_multiplex_ochannel_flush(struct ostream_private *stream)
const struct const_iovec *iov, unsigned int iov_count)
{
struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
- ssize_t ret;
- size_t total = 0;
- if (channel->mstream->bufsize <= channel->buf->used)
- return -2;
+ size_t total = 0, avail = o_stream_get_buffer_avail_size(&stream->ostream);
+ size_t optimal_size = I_MIN(IO_BLOCK_SIZE, avail);
+
+ for (unsigned int i = 0; i < iov_count; i++)
+ total += iov[i].iov_len;
+
+ if (avail < total) {
+ if (o_stream_multiplex_sendv(channel->mstream) < 0)
+ return -1;
+ avail = o_stream_get_buffer_avail_size(&stream->ostream);
+ if (avail == 0)
+ return -2;
+ }
+
+ total = 0;
for (unsigned int i = 0; i < iov_count; i++) {
/* copy data to buffer */
- size_t tmp = channel->mstream->bufsize - channel->buf->used;
+ size_t tmp = avail - total;
if (tmp == 0)
break;
buffer_append(channel->buf, iov[i].iov_base,
stream->ostream.offset += total;
/* will send later */
- if (channel->corked)
+ if (channel->corked && channel->buf->used < optimal_size)
return total;
- if ((ret = o_stream_multiplex_sendv(channel->mstream)) < 0)
- return ret;
-
+ if (o_stream_multiplex_sendv(channel->mstream) < 0)
+ return -1;
return total;
}
const struct multiplex_ochannel *channel =
(const struct multiplex_ochannel*)stream;
- return channel->buf->used + channel->mstream->wbuf->used +
+ return channel->buf->used +
o_stream_get_buffer_used_size(channel->mstream->parent);
}
{
const struct multiplex_ochannel *channel =
(const struct multiplex_ochannel*)stream;
+ size_t max_avail = I_MIN(channel->mstream->bufsize,
+ o_stream_get_buffer_avail_size(stream->parent));
- return channel->mstream->bufsize <= channel->buf->used ? 0 :
- channel->mstream->bufsize - channel->buf->used;
+ /* There is 5-byte overhead per message, so take that into account */
+ return max_avail <= (channel->buf->used + 5) ? 0 :
+ max_avail - (channel->buf->used + 5);
}
static void
return;
o_stream_unref(&mstream->parent);
array_free(&mstream->channels);
- buffer_free(&mstream->wbuf);
i_free(mstream);
}
channel->ostream.fd = o_stream_get_fd(mstream->parent);
array_push_back(&channel->mstream->channels, &channel);
- return o_stream_create(&channel->ostream, mstream->parent,
- mstream->bufsize);
+ return o_stream_create(&channel->ostream, mstream->parent, -1);
}
struct ostream *o_stream_multiplex_add_channel(struct ostream *stream, uint8_t cid)
mstream = i_new(struct multiplex_ostream, 1);
mstream->parent = parent;
mstream->bufsize = bufsize;
- mstream->wbuf = buffer_create_dynamic(default_pool, 256);
i_array_init(&mstream->channels, 8);
o_stream_ref(parent);