#include "lib.h"
#include "array.h"
#include "time-util.h"
+#include "istream-private.h"
#include "ioloop-private.h"
#include <unistd.h>
io_loop_handler_init(ioloop, initial_fd_count);
}
-#undef io_add
-struct io *io_add(int fd, enum io_condition condition,
- unsigned int source_linenum,
- io_callback_t *callback, void *context)
+static struct io_file *
+io_add_file(int fd, enum io_condition condition,
+ unsigned int source_linenum,
+ io_callback_t *callback, void *context)
{
struct io_file *io;
io->next = io->io.ioloop->io_files;
}
io->io.ioloop->io_files = io;
+ return io;
+}
+
+#undef io_add
+struct io *io_add(int fd, enum io_condition condition,
+ unsigned int source_linenum,
+ io_callback_t *callback, void *context)
+{
+ struct io_file *io;
+
+ io = io_add_file(fd, condition, source_linenum, callback, context);
+ return &io->io;
+}
+
+#undef io_add_istream
+struct io *io_add_istream(struct istream *input, unsigned int source_linenum,
+ io_callback_t *callback, void *context)
+{
+ struct io_file *io;
+
+ io = io_add_file(i_stream_get_fd(input), IO_READ, source_linenum,
+ callback, context);
+ io->istream = input;
+ i_stream_ref(io->istream);
+ i_stream_set_io(io->istream, &io->io);
return &io->io;
}
else {
struct io_file *io_file = (struct io_file *)io;
+ if (io_file->istream != NULL) {
+ i_stream_unset_io(io_file->istream, io);
+ i_stream_unref(&io_file->istream);
+ io_file->istream = NULL;
+ }
+
io_file_unlink(io_file);
io_loop_handle_remove(io_file, closed);
}
struct io *io_loop_move_io(struct io **_io)
{
- struct io *new_io, *old_io = *_io;
- struct io_file *old_io_file;
+ struct io *old_io = *_io;
+ struct io_file *old_io_file, *new_io_file;
i_assert((old_io->condition & IO_NOTIFY) == 0);
return old_io;
old_io_file = (struct io_file *)old_io;
- new_io = io_add(old_io_file->fd, old_io->condition,
- old_io->source_linenum,
- old_io->callback, old_io->context);
+ new_io_file = io_add_file(old_io_file->fd, old_io->condition,
+ old_io->source_linenum,
+ old_io->callback, old_io->context);
+ if (old_io_file->istream != NULL) {
+ /* reference before io_remove() */
+ new_io_file->istream = old_io_file->istream;
+ i_stream_ref(new_io_file->istream);
+ }
if (old_io->pending)
- io_set_pending(new_io);
+ io_set_pending(&new_io_file->io);
io_remove(_io);
- return new_io;
+ if (new_io_file->istream != NULL) {
+ /* update istream io after it was removed with io_remove() */
+ i_stream_set_io(new_io_file->istream, &new_io_file->io);
+ }
+ return &new_io_file->io;
}
struct timeout *io_loop_move_timeout(struct timeout **_timeout)
struct io;
struct timeout;
struct ioloop;
+struct istream;
enum io_condition {
IO_READ = 0x01,
io_add_notify(path + \
CALLBACK_TYPECHECK(callback, void (*)(typeof(context))), \
(io_callback_t *)callback, context, io_r)
+struct io *io_add_istream(struct istream *input, unsigned int source_linenum,
+ io_callback_t *callback, void *context) ATTR_NULL(3);
+#define io_add_istream(input, callback, context) \
+ io_add_istream(input, __LINE__ + \
+ CALLBACK_TYPECHECK(callback, void (*)(typeof(context))), \
+ (io_callback_t *)callback, context)
/* Remove I/O handler, and set io pointer to NULL. */
void io_remove(struct io **io);
#define I_STREAM_MIN_SIZE IO_BLOCK_SIZE
+struct io;
+
struct istream_private {
/* inheritance: */
struct iostream_private iostream;
void (*sync)(struct istream_private *stream);
int (*stat)(struct istream_private *stream, bool exact);
int (*get_size)(struct istream_private *stream, bool exact, uoff_t *size_r);
+ void (*switch_ioloop)(struct istream_private *stream);
/* data: */
struct istream istream;
int fd;
uoff_t abs_start_offset;
struct stat statbuf;
+ /* added by io_add_istream() -> i_stream_set_io() */
+ struct io *io;
const unsigned char *buffer;
unsigned char *w_buffer; /* may be NULL */
void i_stream_default_seek_nonseekable(struct istream_private *stream,
uoff_t v_offset, bool mark);
+void i_stream_set_io(struct istream *stream, struct io *io);
+void i_stream_unset_io(struct istream *stream, struct io *io);
+
#endif
return TRUE;
}
+void i_stream_set_input_pending(struct istream *stream, bool pending)
+{
+ if (!pending)
+ return;
+
+ while (stream->real_stream->parent != NULL) {
+ i_assert(stream->real_stream->io == NULL);
+ stream = stream->real_stream->parent;
+ }
+ if (stream->real_stream->io != NULL)
+ io_set_pending(stream->real_stream->io);
+}
+
+void i_stream_switch_ioloop(struct istream *stream)
+{
+ do {
+ if (stream->real_stream->switch_ioloop != NULL)
+ stream->real_stream->switch_ioloop(stream->real_stream);
+ stream = stream->real_stream->parent;
+ } while (stream != NULL);
+}
+
+void i_stream_set_io(struct istream *stream, struct io *io)
+{
+ while (stream->real_stream->parent != NULL) {
+ i_assert(stream->real_stream->io == NULL);
+ stream = stream->real_stream->parent;
+ }
+
+ i_assert(stream->real_stream->io == NULL);
+ stream->real_stream->io = io;
+}
+
+void i_stream_unset_io(struct istream *stream, struct io *io)
+{
+ while (stream->real_stream->parent != NULL) {
+ i_assert(stream->real_stream->io == NULL);
+ stream = stream->real_stream->parent;
+ }
+
+ i_assert(stream->real_stream->io == io);
+ stream->real_stream->io = NULL;
+}
+
static void
i_stream_default_set_max_buffer_size(struct iostream_private *stream,
size_t max_size)