From: Marek Vavrusa Date: Mon, 23 May 2016 00:56:50 +0000 (-0700) Subject: daemon: support event.socket(fd, cb) for I/O events X-Git-Tag: v1.0.0~11 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c416877a0e2bfc5d66a2e165bb09b8ef2f8b843d;p=thirdparty%2Fknot-resolver.git daemon: support event.socket(fd, cb) for I/O events this allows embedding other event loops or just asynchronous events triggered by socket activity. this is required for things like cooperative HTTP server, monitoring endpoint or remote configuration daemon/controller --- diff --git a/daemon/README.rst b/daemon/README.rst index 888b03de9..3e93b4e48 100644 --- a/daemon/README.rst +++ b/daemon/README.rst @@ -299,8 +299,42 @@ as a parameter, but it's not very useful as you don't have any *non-global* way -- make recurrent event that will cancel after 5 times event.recurrent(5 * minute, pruner()) +Another type of actionable event is activity on a file descriptor. This allows you to embed other +event loops or monitor open files and then fire a callback when an activity is detected. +This allows you to build persistent services like HTTP servers or monitoring probes that cooperate +well with the daemon internal operations. + +For example a simple web server that doesn't block: + +.. code-block:: lua + + local server, headers = require 'http.server', require 'http.headers' + local cqueues = require 'cqueues' + -- Start socket server + local s = server.listen { host = 'localhost', port = 8080 } + assert(s:listen()) + -- Compose per-request coroutine + local cq = cqueues.new() + cq:wrap(function() + s:run(function(stream) + -- Create response headers + local headers = headers.new() + headers:append(':status', '200') + headers:append('connection', 'close') + -- Send response and close connection + assert(stream:write_headers(headers, false)) + assert(stream:write_chunk('OK', true)) + stream:shutdown() + stream.connection:shutdown() + end) + s:close() + end) + -- Hook to socket watcher + event.socket(cq:pollfd(), function (ev, status, events) + cq:step(0) + end) + * File watchers -* Data I/O .. note:: Work in progress, come back later! @@ -826,6 +860,28 @@ For example, ``5 * hour`` represents five hours, or 5*60*60*100 milliseconds. e = event.after(1 * minute, function() print('Hi!') end) event.cancel(e) +Watch for file descriptor activity. This allows embedding other event loops or simply +firing events when a pipe endpoint becomes active. In another words, asynchronous +notifications for daemon. + +.. function:: event.socket(fd, cb) + + :param number fd: file descriptor to watch + :param cb: closure or callback to execute when fd becomes active + :return: event id + + Execute function when there is activity on the file descriptor and calls a closure + with event id as the first parameter, status as second and number of events as third. + + Example: + + .. code-block:: lua + + e = event.socket(0, function(e, status, nevents) + print('activity detected') + end) + e.cancel(e) + Scripting worker ^^^^^^^^^^^^^^^^ diff --git a/daemon/bindings.c b/daemon/bindings.c index 80fba6947..e18ad10a9 100644 --- a/daemon/bindings.c +++ b/daemon/bindings.c @@ -692,7 +692,6 @@ static int execute_callback(lua_State *L, int argc) } /* Clear the stack, there may be event a/o enything returned */ lua_settop(L, 0); - lua_gc(L, LUA_GCCOLLECT, 0); return ret; } @@ -714,6 +713,26 @@ static void event_callback(uv_timer_t *timer) } } +static void event_fdcallback(uv_poll_t* handle, int status, int events) +{ + struct worker_ctx *worker = handle->loop->data; + lua_State *L = worker->engine->L; + + /* Retrieve callback and execute */ + lua_rawgeti(L, LUA_REGISTRYINDEX, (intptr_t) handle->data); + lua_rawgeti(L, -1, 1); + lua_pushinteger(L, (intptr_t) handle->data); + lua_pushinteger(L, status); + lua_pushinteger(L, events); + int ret = execute_callback(L, 3); + /* Free callback if not recurrent or an error */ + if (ret != 0) { + if (!uv_is_closing((uv_handle_t *)handle)) { + uv_close((uv_handle_t *)handle, (uv_close_cb) event_free); + } + } +} + static int event_sched(lua_State *L, unsigned timeout, unsigned repeat) { uv_timer_t *timer = malloc(sizeof(*timer)); @@ -794,12 +813,69 @@ static int event_cancel(lua_State *L) return 1; } +static int event_fdwatch(lua_State *L) +{ + /* Check parameters */ + int n = lua_gettop(L); + if (n < 2 || !lua_isnumber(L, 1) || !lua_isfunction(L, 2)) { + format_error(L, "expected 'socket(number fd, function)'"); + lua_error(L); + } + + uv_poll_t *handle = malloc(sizeof(*handle)); + if (!handle) { + format_error(L, "out of memory"); + lua_error(L); + } + + /* Start timer with the reference */ + int sock = lua_tonumber(L, 1); + uv_loop_t *loop = uv_default_loop(); +#if defined(__APPLE__) || defined(__FreeBSD__) + /* libuv is buggy and fails to create poller for + * kqueue sockets as it can't be fcntl'd to non-blocking mode, + * so we pass it a copy of standard input and then + * switch it with real socket before starting the poller + */ + int decoy_fd = dup(STDIN_FILENO); + int ret = uv_poll_init(loop, handle, decoy_fd); + if (ret == 0) { + handle->io_watcher.fd = sock; + } + close(decoy_fd); +#else + int ret = uv_poll_init(loop, handle, sock); +#endif + if (ret == 0) { + ret = uv_poll_start(handle, UV_READABLE, event_fdcallback); + } + if (ret != 0) { + free(handle); + format_error(L, "couldn't start event poller"); + lua_error(L); + } + + /* Save callback and timer in registry */ + lua_newtable(L); + lua_pushvalue(L, 2); + lua_rawseti(L, -2, 1); + lua_pushlightuserdata(L, handle); + lua_rawseti(L, -2, 2); + int ref = luaL_ref(L, LUA_REGISTRYINDEX); + + /* Save reference to the timer */ + handle->data = (void *) (intptr_t)ref; + lua_pushinteger(L, ref); + return 1; +} + int lib_event(lua_State *L) { static const luaL_Reg lib[] = { { "after", event_after }, { "recurrent", event_recurrent }, { "cancel", event_cancel }, + { "socket", event_fdwatch }, { NULL, NULL } }; diff --git a/daemon/engine.c b/daemon/engine.c index ac7d93710..774fb8cbd 100644 --- a/daemon/engine.c +++ b/daemon/engine.c @@ -68,6 +68,7 @@ static int l_help(lua_State *L) "mode(strict|normal|permissive)\n set resolver strictness level\n" "resolve(name, type[, class, flags, callback])\n resolve query, callback when it's finished\n" "todname(name)\n convert name to wire format\n" + "tojson(val)\n convert value to JSON\n" "net\n network configuration\n" "cache\n network configuration\n" "modules\n modules configuration\n" @@ -319,6 +320,16 @@ static char *l_pack_json(lua_State *L, int top) return result; } +static int l_tojson(lua_State *L) +{ + auto_free char *json_str = l_pack_json(L, 1); + if (!json_str) { + return 0; + } + lua_pushstring(L, json_str); + return 1; +} + /** Trampoline function for module properties. */ static int l_trampoline(lua_State *L) { @@ -431,6 +442,8 @@ static int init_state(struct engine *engine) lua_setglobal(engine->L, "trustanchor"); lua_pushcfunction(engine->L, l_libpath); lua_setglobal(engine->L, "libpath"); + lua_pushcfunction(engine->L, l_tojson); + lua_setglobal(engine->L, "tojson"); lua_pushliteral(engine->L, MODULEDIR); lua_setglobal(engine->L, "moduledir"); lua_pushliteral(engine->L, ETCDIR);