]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon: support event.socket(fd, cb) for I/O events
authorMarek Vavrusa <marek@vavrusa.com>
Mon, 23 May 2016 00:56:50 +0000 (17:56 -0700)
committerMarek Vavrusa <marek@vavrusa.com>
Mon, 23 May 2016 00:56:50 +0000 (17:56 -0700)
this allows embedding other event loops or just
asynchronous events triggered by socket activity.
this is required for things like cooperative
HTTP server, monitoring endpoint or remote
configuration daemon/controller

daemon/README.rst
daemon/bindings.c
daemon/engine.c

index 888b03de975a05339fd846755084b54461b0f641..3e93b4e48a1f2168600e13436f5480f37f50fc4c 100644 (file)
@@ -299,8 +299,42 @@ as a parameter, but it's not very useful as you don't have any *non-global* way
        -- make recurrent event that will cancel after 5 times
        event.recurrent(5 * minute, pruner())
 
+Another type of actionable event is activity on a file descriptor. This allows you to embed other
+event loops or monitor open files and then fire a callback when an activity is detected.
+This allows you to build persistent services like HTTP servers or monitoring probes that cooperate
+well with the daemon internal operations.
+
+For example a simple web server that doesn't block:
+
+.. code-block:: lua
+
+   local server, headers = require 'http.server', require 'http.headers'
+   local cqueues = require 'cqueues'
+   -- Start socket server
+   local s = server.listen { host = 'localhost', port = 8080 }
+   assert(s:listen())
+   -- Compose per-request coroutine
+   local cq = cqueues.new()
+   cq:wrap(function()
+      s:run(function(stream)
+         -- Create response headers
+         local headers = headers.new()
+         headers:append(':status', '200')
+         headers:append('connection', 'close')
+         -- Send response and close connection
+         assert(stream:write_headers(headers, false))
+         assert(stream:write_chunk('OK', true))
+         stream:shutdown()
+         stream.connection:shutdown()
+      end)
+      s:close()
+   end)
+   -- Hook to socket watcher
+   event.socket(cq:pollfd(), function (ev, status, events)
+      cq:step(0)
+   end)
+
 * File watchers
-* Data I/O
 
 .. note:: Work in progress, come back later!
 
@@ -826,6 +860,28 @@ For example, ``5 * hour`` represents five hours, or 5*60*60*100 milliseconds.
        e = event.after(1 * minute, function() print('Hi!') end)
        event.cancel(e)
 
+Watch for file descriptor activity. This allows embedding other event loops or simply
+firing events when a pipe endpoint becomes active. In another words, asynchronous
+notifications for daemon.
+
+.. function:: event.socket(fd, cb)
+
+   :param number fd: file descriptor to watch
+   :param cb: closure or callback to execute when fd becomes active
+   :return: event id
+
+   Execute function when there is activity on the file descriptor and calls a closure
+   with event id as the first parameter, status as second and number of events as third.
+
+   Example:
+
+   .. code-block:: lua
+
+   e = event.socket(0, function(e, status, nevents)
+      print('activity detected')
+   end)
+   e.cancel(e)
+
 Scripting worker
 ^^^^^^^^^^^^^^^^
 
index 80fba694724f35890f5eaa65fb184566cb9a088d..e18ad10a9e224850c306bed9b255f829e432654b 100644 (file)
@@ -692,7 +692,6 @@ static int execute_callback(lua_State *L, int argc)
        }
        /* Clear the stack, there may be event a/o enything returned */
        lua_settop(L, 0);
-       lua_gc(L, LUA_GCCOLLECT, 0);
        return ret;
 }
 
@@ -714,6 +713,26 @@ static void event_callback(uv_timer_t *timer)
        }
 }
 
+static void event_fdcallback(uv_poll_t* handle, int status, int events)
+{
+       struct worker_ctx *worker = handle->loop->data;
+       lua_State *L = worker->engine->L;
+
+       /* Retrieve callback and execute */
+       lua_rawgeti(L, LUA_REGISTRYINDEX, (intptr_t) handle->data);
+       lua_rawgeti(L, -1, 1);
+       lua_pushinteger(L, (intptr_t) handle->data);
+       lua_pushinteger(L, status);
+       lua_pushinteger(L, events);
+       int ret = execute_callback(L, 3);
+       /* Free callback if not recurrent or an error */
+       if (ret != 0) {
+               if (!uv_is_closing((uv_handle_t *)handle)) {
+                       uv_close((uv_handle_t *)handle, (uv_close_cb) event_free);
+               }
+       }
+}
+
 static int event_sched(lua_State *L, unsigned timeout, unsigned repeat)
 {
        uv_timer_t *timer = malloc(sizeof(*timer));
@@ -794,12 +813,69 @@ static int event_cancel(lua_State *L)
        return 1;
 }
 
+static int event_fdwatch(lua_State *L)
+{
+       /* Check parameters */
+       int n = lua_gettop(L);
+       if (n < 2 || !lua_isnumber(L, 1) || !lua_isfunction(L, 2)) {
+               format_error(L, "expected 'socket(number fd, function)'");
+               lua_error(L);
+       }
+
+       uv_poll_t *handle = malloc(sizeof(*handle));
+       if (!handle) {
+               format_error(L, "out of memory");
+               lua_error(L);
+       }
+
+       /* Start timer with the reference */
+       int sock = lua_tonumber(L, 1);
+       uv_loop_t *loop = uv_default_loop();
+#if defined(__APPLE__) || defined(__FreeBSD__)
+       /* libuv is buggy and fails to create poller for
+        * kqueue sockets as it can't be fcntl'd to non-blocking mode,
+        * so we pass it a copy of standard input and then
+        * switch it with real socket before starting the poller
+        */
+       int decoy_fd = dup(STDIN_FILENO);
+       int ret = uv_poll_init(loop, handle, decoy_fd);
+       if (ret == 0) {
+               handle->io_watcher.fd = sock;
+       }
+       close(decoy_fd);
+#else
+       int ret = uv_poll_init(loop, handle, sock);
+#endif
+       if (ret == 0) {
+               ret = uv_poll_start(handle, UV_READABLE, event_fdcallback);
+       }
+       if (ret != 0) {
+               free(handle);
+               format_error(L, "couldn't start event poller");
+               lua_error(L);
+       }
+
+       /* Save callback and timer in registry */
+       lua_newtable(L);
+       lua_pushvalue(L, 2);
+       lua_rawseti(L, -2, 1);
+       lua_pushlightuserdata(L, handle);
+       lua_rawseti(L, -2, 2);
+       int ref = luaL_ref(L, LUA_REGISTRYINDEX);
+
+       /* Save reference to the timer */
+       handle->data = (void *) (intptr_t)ref;
+       lua_pushinteger(L, ref);
+       return 1;
+}
+
 int lib_event(lua_State *L)
 {
        static const luaL_Reg lib[] = {
                { "after",      event_after },
                { "recurrent",  event_recurrent },
                { "cancel",     event_cancel },
+               { "socket",     event_fdwatch },
                { NULL, NULL }
        };
 
index ac7d9371037c6dfb77e21bef507abe072176b163..774fb8cbdbf2f14b809f2c8d1b7195b7ec5019cf 100644 (file)
@@ -68,6 +68,7 @@ static int l_help(lua_State *L)
                "mode(strict|normal|permissive)\n    set resolver strictness level\n"
                "resolve(name, type[, class, flags, callback])\n    resolve query, callback when it's finished\n"
                "todname(name)\n    convert name to wire format\n"
+               "tojson(val)\n    convert value to JSON\n"
                "net\n    network configuration\n"
                "cache\n    network configuration\n"
                "modules\n    modules configuration\n"
@@ -319,6 +320,16 @@ static char *l_pack_json(lua_State *L, int top)
        return result;
 }
 
+static int l_tojson(lua_State *L)
+{
+       auto_free char *json_str = l_pack_json(L, 1);
+       if (!json_str) {
+               return 0;
+       }
+       lua_pushstring(L, json_str);
+       return 1;
+}
+
 /** Trampoline function for module properties. */
 static int l_trampoline(lua_State *L)
 {
@@ -431,6 +442,8 @@ static int init_state(struct engine *engine)
        lua_setglobal(engine->L, "trustanchor");
        lua_pushcfunction(engine->L, l_libpath);
        lua_setglobal(engine->L, "libpath");
+       lua_pushcfunction(engine->L, l_tojson);
+       lua_setglobal(engine->L, "tojson");
        lua_pushliteral(engine->L, MODULEDIR);
        lua_setglobal(engine->L, "moduledir");
        lua_pushliteral(engine->L, ETCDIR);