From: Martin Willi Date: Thu, 27 Jun 2013 13:49:11 +0000 (+0200) Subject: stream: support async operation using watcher X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f63f4f86e6f1b54c08ce47dc9dfec246d7a7970a;p=thirdparty%2Fstrongswan.git stream: support async operation using watcher --- diff --git a/src/libstrongswan/networking/streams/stream.c b/src/libstrongswan/networking/streams/stream.c index 144792e08c..d3b67761ed 100644 --- a/src/libstrongswan/networking/streams/stream.c +++ b/src/libstrongswan/networking/streams/stream.c @@ -38,6 +38,28 @@ struct private_stream_t { * FILE* for convenience functions, or NULL */ FILE *file; + + /** + * Callback if data is ready to read + */ + stream_cb_t read_cb; + + /** + * Data for read-ready callback + */ + void *read_data; + + /** + * Callback if write is non-blocking + */ + stream_cb_t write_cb; + + /** + * Data for write-ready callback + */ + void *write_data; + + }; METHOD(stream_t, read_, ssize_t, @@ -96,6 +118,90 @@ METHOD(stream_t, write_, ssize_t, } } +/** + * Remove a registered watcher + */ +static void remove_watcher(private_stream_t *this) +{ + if (this->read_cb || this->write_cb) + { + lib->watcher->remove(lib->watcher, this->fd); + } +} + +/** + * Watcher callback + */ +static bool watch(private_stream_t *this, int fd, watcher_event_t event) +{ + bool keep = FALSE; + + switch (event) + { + case WATCHER_READ: + keep = this->read_cb(this->read_data, &this->public); + if (!keep) + { + this->read_cb = NULL; + } + break; + case WATCHER_WRITE: + keep = this->write_cb(this->write_data, &this->public); + if (!keep) + { + this->write_cb = NULL; + } + break; + case WATCHER_EXCEPT: + break; + } + return keep; +} + +/** + * Register watcher for stream callbacks + */ +static void add_watcher(private_stream_t *this) +{ + watcher_event_t events = 0; + + if (this->read_cb) + { + events |= WATCHER_READ; + } + if (this->write_cb) + { + events |= WATCHER_WRITE; + } + if (events) + { + lib->watcher->add(lib->watcher, this->fd, events, + (watcher_cb_t)watch, this); + } +} + +METHOD(stream_t, on_read, void, + private_stream_t *this, stream_cb_t cb, void *data) +{ + remove_watcher(this); + + this->read_cb = cb; + this->read_data = data; + + add_watcher(this); +} + +METHOD(stream_t, on_write, void, + private_stream_t *this, stream_cb_t cb, void *data) +{ + remove_watcher(this); + + this->write_cb = cb; + this->write_data = data; + + add_watcher(this); +} + METHOD(stream_t, vprint, int, private_stream_t *this, char *format, va_list ap) { @@ -126,6 +232,7 @@ METHOD(stream_t, print, int, METHOD(stream_t, destroy, void, private_stream_t *this) { + remove_watcher(this); if (this->file) { fclose(this->file); @@ -147,7 +254,9 @@ stream_t *stream_create_from_fd(int fd) INIT(this, .public = { .read = _read_, + .on_read = _on_read, .write = _write_, + .on_write = _on_write, .print = _print, .vprint = _vprint, .destroy = _destroy, diff --git a/src/libstrongswan/networking/streams/stream.h b/src/libstrongswan/networking/streams/stream.h index bcf7fb4143..4e0a67a079 100644 --- a/src/libstrongswan/networking/streams/stream.h +++ b/src/libstrongswan/networking/streams/stream.h @@ -33,6 +33,23 @@ typedef struct stream_t stream_t; */ typedef stream_t*(*stream_constructor_t)(char *uri); +/** + * Callback function prototype, called when stream is ready. + * + * It is not allowed to destroy the stream during the callback, this would + * deadlock. Instead, return FALSE to destroy the stream. It is not allowed + * to call on_read()/on_write() during this callback. + * + * As select() may return even if a read()/write() would actually block, it is + * recommended to use the non-blocking calls and handle return values + * appropriately. + * + * @param data data passed during callback registration + * @param stream associated stream + * @return FALSE to destroy the stream + */ +typedef bool (*stream_cb_t)(void *data, stream_t *stream); + /** * Abstraction of a Berkley socket using stream semantics. */ @@ -51,6 +68,14 @@ struct stream_t { */ ssize_t (*read)(stream_t *this, void *buf, size_t len, bool block); + /** + * Register a callback to invoke when stream has data to read. + * + * @param cb callback function, NULL to unregister + * @param data data to pass to callback + */ + void (*on_read)(stream_t *this, stream_cb_t cb, void *data); + /** * Write data to the stream. * @@ -64,6 +89,14 @@ struct stream_t { */ ssize_t (*write)(stream_t *this, void *buf, size_t len, bool block); + /** + * Register a callback to invoke when a write would not block. + * + * @param cb callback function, NULL to unregister + * @param data data to pass to callback + */ + void (*on_write)(stream_t *this, stream_cb_t cb, void *data); + /** * printf() convenience function for this stream. *