struct tee_istream *tee;
struct tee_child_istream *next;
+
+ unsigned int last_read_waiting:1;
};
static void tee_streams_update_buffer(struct tee_istream *tee)
uoff_t last_high_offset;
ssize_t ret;
+ tstream->last_read_waiting = FALSE;
if (stream->buffer == NULL) {
/* initial read */
tee_streams_update_buffer(tstream->tee);
if (ret == -2 && stream->skip != 0) {
/* someone else is holding the data,
wait for it */
+ tstream->last_read_waiting = TRUE;
return 0;
}
stream->istream.stream_errno = input->stream_errno;
return i_stream_create(&tstream->istream, NULL,
i_stream_get_fd(tee->input));
}
+
+bool tee_i_stream_child_is_waiting(struct istream *input)
+{
+ struct tee_child_istream *tstream =
+ (struct tee_child_istream *)input->real_stream;
+
+ return tstream->last_read_waiting;
+}
If the stream's buffer gets full because some child isn't consuming the
data, other streams get returned 0 by i_stream_read(). */
struct tee_istream *tee_i_stream_create(struct istream *input);
+/* Returns TRUE if last read() operation returned 0, because it was waiting
+ for another tee stream to read more of its data. */
+bool tee_i_stream_child_is_waiting(struct istream *input);
struct istream *tee_i_stream_create_child(struct tee_istream *tee);
test_istream_set_size(test_input, len);
for (i = 0; i < CHILD_COUNT; i++) {
test_assert(i_stream_read(child_input[i]) == 1);
+ test_assert(!tee_i_stream_child_is_waiting(child_input[i]));
test_assert(i_stream_read(child_input[i]) == 0);
+ test_assert(!tee_i_stream_child_is_waiting(child_input[i]));
}
}
for (i = 0; i < CHILD_COUNT; i++) {
test_assert(i_stream_read(child_input[i]) == 1);
test_assert(i_stream_read(child_input[i]) == -2);
+ test_assert(!tee_i_stream_child_is_waiting(child_input[i]));
}
for (len++; len <= TEST_STR_LEN; len++) {
test_istream_set_size(test_input, len);
- for (i = 0; i < CHILD_COUNT; i++)
+ for (i = 0; i < CHILD_COUNT; i++) {
test_assert(i_stream_read(child_input[i]) == -2);
+ test_assert(!tee_i_stream_child_is_waiting(child_input[i]));
+ }
for (i = 0; i < CHILD_COUNT-1; i++) {
i_stream_skip(child_input[i], 1);
test_assert(i_stream_read(child_input[i]) == 0);
+ test_assert(tee_i_stream_child_is_waiting(child_input[i]));
}
i_stream_skip(child_input[i], 1);
for (i = 0; i < CHILD_COUNT; i++) {
test_assert(i_stream_read(child_input[i]) == 1);
test_assert(i_stream_read(child_input[i]) == -2);
+ test_assert(!tee_i_stream_child_is_waiting(child_input[i]));
}
}
- for (i = 0; i < CHILD_COUNT; i++) {
+ for (i = 0; i < CHILD_COUNT-1; i++) {
i_stream_skip(child_input[i], 1);
test_assert(i_stream_read(child_input[i]) == 0);
+ test_assert(tee_i_stream_child_is_waiting(child_input[i]));
}
+ i_stream_skip(child_input[i], 1);
+ test_assert(i_stream_read(child_input[i]) == 0);
+ test_assert(!tee_i_stream_child_is_waiting(child_input[i]));
+
test_istream_set_allow_eof(test_input, TRUE);
for (i = 0; i < CHILD_COUNT; i++) {
test_assert(i_stream_read(child_input[i]) == -1);