]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
Added io_add_istream() and related functionality for combining the ioloop/istream.
authorTimo Sirainen <tss@iki.fi>
Thu, 3 Apr 2014 16:51:52 +0000 (19:51 +0300)
committerTimo Sirainen <tss@iki.fi>
Thu, 3 Apr 2014 16:51:52 +0000 (19:51 +0300)
The idea here is that it's possible to implement asynchronous istreams when
there isn't a file descriptor that can be used to reliably receive updates.

src/lib/ioloop-private.h
src/lib/ioloop.c
src/lib/ioloop.h
src/lib/istream-private.h
src/lib/istream.c
src/lib/istream.h

index b5b4dcd7e44780a84a14644d7f64e6809c5b0b47..db6753de43c81f1c8c855543d3aa49af6fd0ff3f 100644 (file)
@@ -52,6 +52,9 @@ struct io_file {
 
        int refcount;
        int fd;
+
+       /* only for io_add_istream(), a bit kludgy to be here.. */
+       struct istream *istream;
 };
 
 struct timeout {
index ce32af4b20904a049f370248f5bd856a637654e3..a853fb733d89bbe050af1ccc210b94022e3b1f31 100644 (file)
@@ -3,6 +3,7 @@
 #include "lib.h"
 #include "array.h"
 #include "time-util.h"
+#include "istream-private.h"
 #include "ioloop-private.h"
 
 #include <unistd.h>
@@ -28,10 +29,10 @@ static void io_loop_initialize_handler(struct ioloop *ioloop)
        io_loop_handler_init(ioloop, initial_fd_count);
 }
 
-#undef io_add
-struct io *io_add(int fd, enum io_condition condition,
-                 unsigned int source_linenum,
-                 io_callback_t *callback, void *context)
+static struct io_file *
+io_add_file(int fd, enum io_condition condition,
+           unsigned int source_linenum,
+           io_callback_t *callback, void *context)
 {
        struct io_file *io;
 
@@ -62,6 +63,31 @@ struct io *io_add(int fd, enum io_condition condition,
                io->next = io->io.ioloop->io_files;
        }
        io->io.ioloop->io_files = io;
+       return io;
+}
+
+#undef io_add
+struct io *io_add(int fd, enum io_condition condition,
+                 unsigned int source_linenum,
+                 io_callback_t *callback, void *context)
+{
+       struct io_file *io;
+
+       io = io_add_file(fd, condition, source_linenum, callback, context);
+       return &io->io;
+}
+
+#undef io_add_istream
+struct io *io_add_istream(struct istream *input, unsigned int source_linenum,
+                         io_callback_t *callback, void *context)
+{
+       struct io_file *io;
+
+       io = io_add_file(i_stream_get_fd(input), IO_READ, source_linenum,
+                        callback, context);
+       io->istream = input;
+       i_stream_ref(io->istream);
+       i_stream_set_io(io->istream, &io->io);
        return &io->io;
 }
 
@@ -106,6 +132,12 @@ static void io_remove_full(struct io **_io, bool closed)
        else {
                struct io_file *io_file = (struct io_file *)io;
 
+               if (io_file->istream != NULL) {
+                       i_stream_unset_io(io_file->istream, io);
+                       i_stream_unref(&io_file->istream);
+                       io_file->istream = NULL;
+               }
+
                io_file_unlink(io_file);
                io_loop_handle_remove(io_file, closed);
        }
@@ -714,8 +746,8 @@ struct ioloop_context *io_loop_get_current_context(struct ioloop *ioloop)
 
 struct io *io_loop_move_io(struct io **_io)
 {
-       struct io *new_io, *old_io = *_io;
-       struct io_file *old_io_file;
+       struct io *old_io = *_io;
+       struct io_file *old_io_file, *new_io_file;
 
        i_assert((old_io->condition & IO_NOTIFY) == 0);
 
@@ -723,13 +755,22 @@ struct io *io_loop_move_io(struct io **_io)
                return old_io;
 
        old_io_file = (struct io_file *)old_io;
-       new_io = io_add(old_io_file->fd, old_io->condition,
-                       old_io->source_linenum,
-                       old_io->callback, old_io->context);
+       new_io_file = io_add_file(old_io_file->fd, old_io->condition,
+                                 old_io->source_linenum,
+                                 old_io->callback, old_io->context);
+       if (old_io_file->istream != NULL) {
+               /* reference before io_remove() */
+               new_io_file->istream = old_io_file->istream;
+               i_stream_ref(new_io_file->istream);
+       }
        if (old_io->pending)
-               io_set_pending(new_io);
+               io_set_pending(&new_io_file->io);
        io_remove(_io);
-       return new_io;
+       if (new_io_file->istream != NULL) {
+               /* update istream io after it was removed with io_remove() */
+               i_stream_set_io(new_io_file->istream, &new_io_file->io);
+       }
+       return &new_io_file->io;
 }
 
 struct timeout *io_loop_move_timeout(struct timeout **_timeout)
index 421a8da0eb2a9d74247d4eb57864ec76072cad20..e0fdd016fbe549d1a7a06f5e0578fdbf0ccab7c4 100644 (file)
@@ -7,6 +7,7 @@
 struct io;
 struct timeout;
 struct ioloop;
+struct istream;
 
 enum io_condition {
        IO_READ         = 0x01,
@@ -60,6 +61,12 @@ io_add_notify(const char *path, io_callback_t *callback,
        io_add_notify(path + \
                CALLBACK_TYPECHECK(callback, void (*)(typeof(context))), \
                (io_callback_t *)callback, context, io_r)
+struct io *io_add_istream(struct istream *input, unsigned int source_linenum,
+                         io_callback_t *callback, void *context) ATTR_NULL(3);
+#define io_add_istream(input, callback, context) \
+       io_add_istream(input, __LINE__ + \
+               CALLBACK_TYPECHECK(callback, void (*)(typeof(context))), \
+               (io_callback_t *)callback, context)
 
 /* Remove I/O handler, and set io pointer to NULL. */
 void io_remove(struct io **io);
index 80ede3021051610e4b5eb7d65bae22b18158b9f0..54a91d83af3647abe30a9b3eadad4a366312d5c6 100644 (file)
@@ -6,6 +6,8 @@
 
 #define I_STREAM_MIN_SIZE IO_BLOCK_SIZE
 
+struct io;
+
 struct istream_private {
 /* inheritance: */
        struct iostream_private iostream;
@@ -17,6 +19,7 @@ struct istream_private {
        void (*sync)(struct istream_private *stream);
        int (*stat)(struct istream_private *stream, bool exact);
        int (*get_size)(struct istream_private *stream, bool exact, uoff_t *size_r);
+       void (*switch_ioloop)(struct istream_private *stream);
 
 /* data: */
        struct istream istream;
@@ -24,6 +27,8 @@ struct istream_private {
        int fd;
        uoff_t abs_start_offset;
        struct stat statbuf;
+       /* added by io_add_istream() -> i_stream_set_io() */
+       struct io *io;
 
        const unsigned char *buffer;
        unsigned char *w_buffer; /* may be NULL */
@@ -66,4 +71,7 @@ ssize_t i_stream_read_copy_from_parent(struct istream *istream);
 void i_stream_default_seek_nonseekable(struct istream_private *stream,
                                       uoff_t v_offset, bool mark);
 
+void i_stream_set_io(struct istream *stream, struct io *io);
+void i_stream_unset_io(struct istream *stream, struct io *io);
+
 #endif
index e35513b65d92566a9a26a7bbc84188d788ec20eb..82b77a76eda4c03a461b3232a463144582589c8d 100644 (file)
@@ -608,6 +608,50 @@ bool i_stream_add_data(struct istream *_stream, const unsigned char *data,
        return TRUE;
 }
 
+void i_stream_set_input_pending(struct istream *stream, bool pending)
+{
+       if (!pending)
+               return;
+
+       while (stream->real_stream->parent != NULL) {
+               i_assert(stream->real_stream->io == NULL);
+               stream = stream->real_stream->parent;
+       }
+       if (stream->real_stream->io != NULL)
+               io_set_pending(stream->real_stream->io);
+}
+
+void i_stream_switch_ioloop(struct istream *stream)
+{
+       do {
+               if (stream->real_stream->switch_ioloop != NULL)
+                       stream->real_stream->switch_ioloop(stream->real_stream);
+               stream = stream->real_stream->parent;
+       } while (stream != NULL);
+}
+
+void i_stream_set_io(struct istream *stream, struct io *io)
+{
+       while (stream->real_stream->parent != NULL) {
+               i_assert(stream->real_stream->io == NULL);
+               stream = stream->real_stream->parent;
+       }
+
+       i_assert(stream->real_stream->io == NULL);
+       stream->real_stream->io = io;
+}
+
+void i_stream_unset_io(struct istream *stream, struct io *io)
+{
+       while (stream->real_stream->parent != NULL) {
+               i_assert(stream->real_stream->io == NULL);
+               stream = stream->real_stream->parent;
+       }
+
+       i_assert(stream->real_stream->io == io);
+       stream->real_stream->io = NULL;
+}
+
 static void
 i_stream_default_set_max_buffer_size(struct iostream_private *stream,
                                     size_t max_size)
index 9c9b8bc040ae321bd37edd2f4b72c13825c1c552..3e6e76e5edb95848c5583a6088fe82ed4655fe61 100644 (file)
@@ -157,4 +157,10 @@ int i_stream_read_data(struct istream *stream, const unsigned char **data_r,
 bool i_stream_add_data(struct istream *stream, const unsigned char *data,
                       size_t size);
 
+void i_stream_set_input_pending(struct istream *stream, bool pending);
+
+/* If there are any I/O loop items associated with the stream, move all of
+   them to current_ioloop. */
+void i_stream_switch_ioloop(struct istream *stream);
+
 #endif