for (unsigned int i = 0; i < iov_count; i++) {
/* does it actually fit there */
- if (zstream->output.pos + iov[i].iov_len >= zstream->output.size)
- break;
ZSTD_inBuffer input = {
.src = iov[i].iov_base,
.pos = 0,
.size = iov[i].iov_len
};
- ret = ZSTD_compressStream(zstream->cstream, &zstream->output,
- &input);
- if (ZSTD_isError(ret) != 0) {
- o_stream_zstd_write_error(zstream, ret);
- return -1;
+ bool flush_attempted = FALSE;
+ for (;;) {
+ size_t prev_pos = input.pos;
+ ret = ZSTD_compressStream(zstream->cstream, &zstream->output,
+ &input);
+ if (ZSTD_isError(ret) != 0) {
+ o_stream_zstd_write_error(zstream, ret);
+ return -1;
+ }
+ size_t new_input_size = input.pos - prev_pos;
+ if (new_input_size == 0 && flush_attempted) {
+ /* non-blocking output buffer full */
+ return total;
+ }
+ stream->ostream.offset += new_input_size;
+ total += new_input_size;
+ if (input.pos == input.size)
+ break;
+ /* output buffer full. try to flush it. */
+ if (o_stream_zstd_send_outbuf(zstream) < 0)
+ return -1;
+ flush_attempted = TRUE;
}
- total += input.pos;
}
if (o_stream_zstd_send_outbuf(zstream) < 0)
return -1;
- stream->ostream.offset += total;
return total;
}