* FILE* for convenience functions, or NULL
*/
FILE *file;
+
+ /**
+ * Callback if data is ready to read
+ */
+ stream_cb_t read_cb;
+
+ /**
+ * Data for read-ready callback
+ */
+ void *read_data;
+
+ /**
+ * Callback if write is non-blocking
+ */
+ stream_cb_t write_cb;
+
+ /**
+ * Data for write-ready callback
+ */
+ void *write_data;
+
+
};
METHOD(stream_t, read_, ssize_t,
}
}
+/**
+ * Remove a registered watcher
+ */
+static void remove_watcher(private_stream_t *this)
+{
+ if (this->read_cb || this->write_cb)
+ {
+ lib->watcher->remove(lib->watcher, this->fd);
+ }
+}
+
+/**
+ * Watcher callback
+ */
+static bool watch(private_stream_t *this, int fd, watcher_event_t event)
+{
+ bool keep = FALSE;
+
+ switch (event)
+ {
+ case WATCHER_READ:
+ keep = this->read_cb(this->read_data, &this->public);
+ if (!keep)
+ {
+ this->read_cb = NULL;
+ }
+ break;
+ case WATCHER_WRITE:
+ keep = this->write_cb(this->write_data, &this->public);
+ if (!keep)
+ {
+ this->write_cb = NULL;
+ }
+ break;
+ case WATCHER_EXCEPT:
+ break;
+ }
+ return keep;
+}
+
+/**
+ * Register watcher for stream callbacks
+ */
+static void add_watcher(private_stream_t *this)
+{
+ watcher_event_t events = 0;
+
+ if (this->read_cb)
+ {
+ events |= WATCHER_READ;
+ }
+ if (this->write_cb)
+ {
+ events |= WATCHER_WRITE;
+ }
+ if (events)
+ {
+ lib->watcher->add(lib->watcher, this->fd, events,
+ (watcher_cb_t)watch, this);
+ }
+}
+
+METHOD(stream_t, on_read, void,
+ private_stream_t *this, stream_cb_t cb, void *data)
+{
+ remove_watcher(this);
+
+ this->read_cb = cb;
+ this->read_data = data;
+
+ add_watcher(this);
+}
+
+METHOD(stream_t, on_write, void,
+ private_stream_t *this, stream_cb_t cb, void *data)
+{
+ remove_watcher(this);
+
+ this->write_cb = cb;
+ this->write_data = data;
+
+ add_watcher(this);
+}
+
METHOD(stream_t, vprint, int,
private_stream_t *this, char *format, va_list ap)
{
METHOD(stream_t, destroy, void,
private_stream_t *this)
{
+ remove_watcher(this);
if (this->file)
{
fclose(this->file);
INIT(this,
.public = {
.read = _read_,
+ .on_read = _on_read,
.write = _write_,
+ .on_write = _on_write,
.print = _print,
.vprint = _vprint,
.destroy = _destroy,
*/
typedef stream_t*(*stream_constructor_t)(char *uri);
+/**
+ * Callback function prototype, called when stream is ready.
+ *
+ * It is not allowed to destroy the stream during the callback, this would
+ * deadlock. Instead, return FALSE to destroy the stream. It is not allowed
+ * to call on_read()/on_write() during this callback.
+ *
+ * As select() may return even if a read()/write() would actually block, it is
+ * recommended to use the non-blocking calls and handle return values
+ * appropriately.
+ *
+ * @param data data passed during callback registration
+ * @param stream associated stream
+ * @return FALSE to destroy the stream
+ */
+typedef bool (*stream_cb_t)(void *data, stream_t *stream);
+
/**
* Abstraction of a Berkley socket using stream semantics.
*/
*/
ssize_t (*read)(stream_t *this, void *buf, size_t len, bool block);
+ /**
+ * Register a callback to invoke when stream has data to read.
+ *
+ * @param cb callback function, NULL to unregister
+ * @param data data to pass to callback
+ */
+ void (*on_read)(stream_t *this, stream_cb_t cb, void *data);
+
/**
* Write data to the stream.
*
*/
ssize_t (*write)(stream_t *this, void *buf, size_t len, bool block);
+ /**
+ * Register a callback to invoke when a write would not block.
+ *
+ * @param cb callback function, NULL to unregister
+ * @param data data to pass to callback
+ */
+ void (*on_write)(stream_t *this, stream_cb_t cb, void *data);
+
/**
* printf() convenience function for this stream.
*