buffer_t *buf;
time_t last_sent;
bool closed:1;
+ bool corked:1;
};
struct multiplex_ostream {
return -2;
while((channel = get_next_channel(mstream)) != NULL) {
+ if (channel->buf->used == 0)
+ continue;
size_t tmp = mstream->bufsize - mstream->wbuf->used - 5;
/* ensure it fits into 32 bit int */
size_t amt = I_MIN(UINT_MAX, I_MIN(tmp, channel->buf->used));
return ret;
}
+static int o_stream_multiplex_ochannel_flush(struct ostream_private *stream)
+{
+ ssize_t ret;
+ struct multiplex_ochannel *channel = (struct multiplex_ochannel *)stream;
+ struct multiplex_ostream *mstream = channel->mstream;
+
+ /* flush parent stream always, so there is room for more. */
+ if ((ret = o_stream_flush(mstream->parent)) <= 0) {
+ if (ret == -1)
+ propagate_error(mstream, mstream->parent->stream_errno);
+ return ret;
+ }
+
+ /* send all channels */
+ if (o_stream_multiplex_sendv(mstream) < 0)
+ return -1;
+
+ if (channel->buf->used > 0)
+ return 0;
+ return 1;
+}
+
+static void o_stream_multiplex_ochannel_cork(struct ostream_private *stream, bool set)
+{
+ struct multiplex_ochannel *channel = (struct multiplex_ochannel*)stream;
+ if (channel->corked != set && !set) {
+ /* flush */
+ (void)o_stream_multiplex_ochannel_flush(stream);
+ }
+ channel->corked = set;
+}
+
static ssize_t
o_stream_multiplex_ochannel_sendv(struct ostream_private *stream,
const struct const_iovec *iov, unsigned int iov_count)
stream->ostream.offset += total;
+ /* will send later */
+ if (channel->corked)
+ return total;
+
if ((ret = o_stream_multiplex_sendv(channel->mstream)) < 0)
return ret;
channel->cid = cid;
channel->buf = buffer_create_dynamic(default_pool, 256);
channel->mstream = mstream;
+ channel->ostream.cork = o_stream_multiplex_ochannel_cork;
+ channel->ostream.flush = o_stream_multiplex_ochannel_flush;
channel->ostream.sendv = o_stream_multiplex_ochannel_sendv;
channel->ostream.get_buffer_used_size =
o_stream_multiplex_ochannel_get_buffer_used_size;