-- make recurrent event that will cancel after 5 times
event.recurrent(5 * minute, pruner())
+Another type of actionable event is activity on a file descriptor. This allows you to embed other
+event loops or monitor open files and then fire a callback when an activity is detected.
+This allows you to build persistent services like HTTP servers or monitoring probes that cooperate
+well with the daemon internal operations.
+
+For example a simple web server that doesn't block:
+
+.. code-block:: lua
+
+ local server, headers = require 'http.server', require 'http.headers'
+ local cqueues = require 'cqueues'
+ -- Start socket server
+ local s = server.listen { host = 'localhost', port = 8080 }
+ assert(s:listen())
+ -- Compose per-request coroutine
+ local cq = cqueues.new()
+ cq:wrap(function()
+ s:run(function(stream)
+ -- Create response headers
+ local headers = headers.new()
+ headers:append(':status', '200')
+ headers:append('connection', 'close')
+ -- Send response and close connection
+ assert(stream:write_headers(headers, false))
+ assert(stream:write_chunk('OK', true))
+ stream:shutdown()
+ stream.connection:shutdown()
+ end)
+ s:close()
+ end)
+ -- Hook to socket watcher
+ event.socket(cq:pollfd(), function (ev, status, events)
+ cq:step(0)
+ end)
+
* File watchers
-* Data I/O
.. note:: Work in progress, come back later!
e = event.after(1 * minute, function() print('Hi!') end)
event.cancel(e)
+Watch for file descriptor activity. This allows embedding other event loops or simply
+firing events when a pipe endpoint becomes active. In another words, asynchronous
+notifications for daemon.
+
+.. function:: event.socket(fd, cb)
+
+ :param number fd: file descriptor to watch
+ :param cb: closure or callback to execute when fd becomes active
+ :return: event id
+
+ Execute function when there is activity on the file descriptor and calls a closure
+ with event id as the first parameter, status as second and number of events as third.
+
+ Example:
+
+ .. code-block:: lua
+
+ e = event.socket(0, function(e, status, nevents)
+ print('activity detected')
+ end)
+ e.cancel(e)
+
Scripting worker
^^^^^^^^^^^^^^^^
}
/* Clear the stack, there may be event a/o enything returned */
lua_settop(L, 0);
- lua_gc(L, LUA_GCCOLLECT, 0);
return ret;
}
}
}
+static void event_fdcallback(uv_poll_t* handle, int status, int events)
+{
+ struct worker_ctx *worker = handle->loop->data;
+ lua_State *L = worker->engine->L;
+
+ /* Retrieve callback and execute */
+ lua_rawgeti(L, LUA_REGISTRYINDEX, (intptr_t) handle->data);
+ lua_rawgeti(L, -1, 1);
+ lua_pushinteger(L, (intptr_t) handle->data);
+ lua_pushinteger(L, status);
+ lua_pushinteger(L, events);
+ int ret = execute_callback(L, 3);
+ /* Free callback if not recurrent or an error */
+ if (ret != 0) {
+ if (!uv_is_closing((uv_handle_t *)handle)) {
+ uv_close((uv_handle_t *)handle, (uv_close_cb) event_free);
+ }
+ }
+}
+
static int event_sched(lua_State *L, unsigned timeout, unsigned repeat)
{
uv_timer_t *timer = malloc(sizeof(*timer));
return 1;
}
+static int event_fdwatch(lua_State *L)
+{
+ /* Check parameters */
+ int n = lua_gettop(L);
+ if (n < 2 || !lua_isnumber(L, 1) || !lua_isfunction(L, 2)) {
+ format_error(L, "expected 'socket(number fd, function)'");
+ lua_error(L);
+ }
+
+ uv_poll_t *handle = malloc(sizeof(*handle));
+ if (!handle) {
+ format_error(L, "out of memory");
+ lua_error(L);
+ }
+
+ /* Start timer with the reference */
+ int sock = lua_tonumber(L, 1);
+ uv_loop_t *loop = uv_default_loop();
+#if defined(__APPLE__) || defined(__FreeBSD__)
+ /* libuv is buggy and fails to create poller for
+ * kqueue sockets as it can't be fcntl'd to non-blocking mode,
+ * so we pass it a copy of standard input and then
+ * switch it with real socket before starting the poller
+ */
+ int decoy_fd = dup(STDIN_FILENO);
+ int ret = uv_poll_init(loop, handle, decoy_fd);
+ if (ret == 0) {
+ handle->io_watcher.fd = sock;
+ }
+ close(decoy_fd);
+#else
+ int ret = uv_poll_init(loop, handle, sock);
+#endif
+ if (ret == 0) {
+ ret = uv_poll_start(handle, UV_READABLE, event_fdcallback);
+ }
+ if (ret != 0) {
+ free(handle);
+ format_error(L, "couldn't start event poller");
+ lua_error(L);
+ }
+
+ /* Save callback and timer in registry */
+ lua_newtable(L);
+ lua_pushvalue(L, 2);
+ lua_rawseti(L, -2, 1);
+ lua_pushlightuserdata(L, handle);
+ lua_rawseti(L, -2, 2);
+ int ref = luaL_ref(L, LUA_REGISTRYINDEX);
+
+ /* Save reference to the timer */
+ handle->data = (void *) (intptr_t)ref;
+ lua_pushinteger(L, ref);
+ return 1;
+}
+
int lib_event(lua_State *L)
{
static const luaL_Reg lib[] = {
{ "after", event_after },
{ "recurrent", event_recurrent },
{ "cancel", event_cancel },
+ { "socket", event_fdwatch },
{ NULL, NULL }
};
"mode(strict|normal|permissive)\n set resolver strictness level\n"
"resolve(name, type[, class, flags, callback])\n resolve query, callback when it's finished\n"
"todname(name)\n convert name to wire format\n"
+ "tojson(val)\n convert value to JSON\n"
"net\n network configuration\n"
"cache\n network configuration\n"
"modules\n modules configuration\n"
return result;
}
+static int l_tojson(lua_State *L)
+{
+ auto_free char *json_str = l_pack_json(L, 1);
+ if (!json_str) {
+ return 0;
+ }
+ lua_pushstring(L, json_str);
+ return 1;
+}
+
/** Trampoline function for module properties. */
static int l_trampoline(lua_State *L)
{
lua_setglobal(engine->L, "trustanchor");
lua_pushcfunction(engine->L, l_libpath);
lua_setglobal(engine->L, "libpath");
+ lua_pushcfunction(engine->L, l_tojson);
+ lua_setglobal(engine->L, "tojson");
lua_pushliteral(engine->L, MODULEDIR);
lua_setglobal(engine->L, "moduledir");
lua_pushliteral(engine->L, ETCDIR);