From: Timo Sirainen Date: Thu, 3 Apr 2014 16:51:52 +0000 (+0300) Subject: Added io_add_istream() and related functionality for combining the ioloop/istream. X-Git-Tag: 2.2.13.rc1~175 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4c096615cb86a826fda377b87df22c579bfe5525;p=thirdparty%2Fdovecot%2Fcore.git Added io_add_istream() and related functionality for combining the ioloop/istream. The idea here is that it's possible to implement asynchronous istreams when there isn't a file descriptor that can be used to reliably receive updates. --- diff --git a/src/lib/ioloop-private.h b/src/lib/ioloop-private.h index b5b4dcd7e4..db6753de43 100644 --- a/src/lib/ioloop-private.h +++ b/src/lib/ioloop-private.h @@ -52,6 +52,9 @@ struct io_file { int refcount; int fd; + + /* only for io_add_istream(), a bit kludgy to be here.. */ + struct istream *istream; }; struct timeout { diff --git a/src/lib/ioloop.c b/src/lib/ioloop.c index ce32af4b20..a853fb733d 100644 --- a/src/lib/ioloop.c +++ b/src/lib/ioloop.c @@ -3,6 +3,7 @@ #include "lib.h" #include "array.h" #include "time-util.h" +#include "istream-private.h" #include "ioloop-private.h" #include @@ -28,10 +29,10 @@ static void io_loop_initialize_handler(struct ioloop *ioloop) io_loop_handler_init(ioloop, initial_fd_count); } -#undef io_add -struct io *io_add(int fd, enum io_condition condition, - unsigned int source_linenum, - io_callback_t *callback, void *context) +static struct io_file * +io_add_file(int fd, enum io_condition condition, + unsigned int source_linenum, + io_callback_t *callback, void *context) { struct io_file *io; @@ -62,6 +63,31 @@ struct io *io_add(int fd, enum io_condition condition, io->next = io->io.ioloop->io_files; } io->io.ioloop->io_files = io; + return io; +} + +#undef io_add +struct io *io_add(int fd, enum io_condition condition, + unsigned int source_linenum, + io_callback_t *callback, void *context) +{ + struct io_file *io; + + io = io_add_file(fd, condition, source_linenum, callback, context); + return &io->io; +} + +#undef io_add_istream +struct io *io_add_istream(struct istream *input, unsigned int source_linenum, + io_callback_t *callback, void *context) +{ + struct io_file *io; + + io = io_add_file(i_stream_get_fd(input), IO_READ, source_linenum, + callback, context); + io->istream = input; + i_stream_ref(io->istream); + i_stream_set_io(io->istream, &io->io); return &io->io; } @@ -106,6 +132,12 @@ static void io_remove_full(struct io **_io, bool closed) else { struct io_file *io_file = (struct io_file *)io; + if (io_file->istream != NULL) { + i_stream_unset_io(io_file->istream, io); + i_stream_unref(&io_file->istream); + io_file->istream = NULL; + } + io_file_unlink(io_file); io_loop_handle_remove(io_file, closed); } @@ -714,8 +746,8 @@ struct ioloop_context *io_loop_get_current_context(struct ioloop *ioloop) struct io *io_loop_move_io(struct io **_io) { - struct io *new_io, *old_io = *_io; - struct io_file *old_io_file; + struct io *old_io = *_io; + struct io_file *old_io_file, *new_io_file; i_assert((old_io->condition & IO_NOTIFY) == 0); @@ -723,13 +755,22 @@ struct io *io_loop_move_io(struct io **_io) return old_io; old_io_file = (struct io_file *)old_io; - new_io = io_add(old_io_file->fd, old_io->condition, - old_io->source_linenum, - old_io->callback, old_io->context); + new_io_file = io_add_file(old_io_file->fd, old_io->condition, + old_io->source_linenum, + old_io->callback, old_io->context); + if (old_io_file->istream != NULL) { + /* reference before io_remove() */ + new_io_file->istream = old_io_file->istream; + i_stream_ref(new_io_file->istream); + } if (old_io->pending) - io_set_pending(new_io); + io_set_pending(&new_io_file->io); io_remove(_io); - return new_io; + if (new_io_file->istream != NULL) { + /* update istream io after it was removed with io_remove() */ + i_stream_set_io(new_io_file->istream, &new_io_file->io); + } + return &new_io_file->io; } struct timeout *io_loop_move_timeout(struct timeout **_timeout) diff --git a/src/lib/ioloop.h b/src/lib/ioloop.h index 421a8da0eb..e0fdd016fb 100644 --- a/src/lib/ioloop.h +++ b/src/lib/ioloop.h @@ -7,6 +7,7 @@ struct io; struct timeout; struct ioloop; +struct istream; enum io_condition { IO_READ = 0x01, @@ -60,6 +61,12 @@ io_add_notify(const char *path, io_callback_t *callback, io_add_notify(path + \ CALLBACK_TYPECHECK(callback, void (*)(typeof(context))), \ (io_callback_t *)callback, context, io_r) +struct io *io_add_istream(struct istream *input, unsigned int source_linenum, + io_callback_t *callback, void *context) ATTR_NULL(3); +#define io_add_istream(input, callback, context) \ + io_add_istream(input, __LINE__ + \ + CALLBACK_TYPECHECK(callback, void (*)(typeof(context))), \ + (io_callback_t *)callback, context) /* Remove I/O handler, and set io pointer to NULL. */ void io_remove(struct io **io); diff --git a/src/lib/istream-private.h b/src/lib/istream-private.h index 80ede30210..54a91d83af 100644 --- a/src/lib/istream-private.h +++ b/src/lib/istream-private.h @@ -6,6 +6,8 @@ #define I_STREAM_MIN_SIZE IO_BLOCK_SIZE +struct io; + struct istream_private { /* inheritance: */ struct iostream_private iostream; @@ -17,6 +19,7 @@ struct istream_private { void (*sync)(struct istream_private *stream); int (*stat)(struct istream_private *stream, bool exact); int (*get_size)(struct istream_private *stream, bool exact, uoff_t *size_r); + void (*switch_ioloop)(struct istream_private *stream); /* data: */ struct istream istream; @@ -24,6 +27,8 @@ struct istream_private { int fd; uoff_t abs_start_offset; struct stat statbuf; + /* added by io_add_istream() -> i_stream_set_io() */ + struct io *io; const unsigned char *buffer; unsigned char *w_buffer; /* may be NULL */ @@ -66,4 +71,7 @@ ssize_t i_stream_read_copy_from_parent(struct istream *istream); void i_stream_default_seek_nonseekable(struct istream_private *stream, uoff_t v_offset, bool mark); +void i_stream_set_io(struct istream *stream, struct io *io); +void i_stream_unset_io(struct istream *stream, struct io *io); + #endif diff --git a/src/lib/istream.c b/src/lib/istream.c index e35513b65d..82b77a76ed 100644 --- a/src/lib/istream.c +++ b/src/lib/istream.c @@ -608,6 +608,50 @@ bool i_stream_add_data(struct istream *_stream, const unsigned char *data, return TRUE; } +void i_stream_set_input_pending(struct istream *stream, bool pending) +{ + if (!pending) + return; + + while (stream->real_stream->parent != NULL) { + i_assert(stream->real_stream->io == NULL); + stream = stream->real_stream->parent; + } + if (stream->real_stream->io != NULL) + io_set_pending(stream->real_stream->io); +} + +void i_stream_switch_ioloop(struct istream *stream) +{ + do { + if (stream->real_stream->switch_ioloop != NULL) + stream->real_stream->switch_ioloop(stream->real_stream); + stream = stream->real_stream->parent; + } while (stream != NULL); +} + +void i_stream_set_io(struct istream *stream, struct io *io) +{ + while (stream->real_stream->parent != NULL) { + i_assert(stream->real_stream->io == NULL); + stream = stream->real_stream->parent; + } + + i_assert(stream->real_stream->io == NULL); + stream->real_stream->io = io; +} + +void i_stream_unset_io(struct istream *stream, struct io *io) +{ + while (stream->real_stream->parent != NULL) { + i_assert(stream->real_stream->io == NULL); + stream = stream->real_stream->parent; + } + + i_assert(stream->real_stream->io == io); + stream->real_stream->io = NULL; +} + static void i_stream_default_set_max_buffer_size(struct iostream_private *stream, size_t max_size) diff --git a/src/lib/istream.h b/src/lib/istream.h index 9c9b8bc040..3e6e76e5ed 100644 --- a/src/lib/istream.h +++ b/src/lib/istream.h @@ -157,4 +157,10 @@ int i_stream_read_data(struct istream *stream, const unsigned char **data_r, bool i_stream_add_data(struct istream *stream, const unsigned char *data, size_t size); +void i_stream_set_input_pending(struct istream *stream, bool pending); + +/* If there are any I/O loop items associated with the stream, move all of + them to current_ioloop. */ +void i_stream_switch_ioloop(struct istream *stream); + #endif