]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/bindings: Lua API for worker (resolve, stats)
authorMarek Vavruša <marek.vavrusa@nic.cz>
Sun, 5 Jul 2015 20:09:48 +0000 (22:09 +0200)
committerMarek Vavruša <marek.vavrusa@nic.cz>
Sun, 5 Jul 2015 20:09:48 +0000 (22:09 +0200)
daemon/bindings.c
daemon/bindings.h
daemon/main.c
daemon/worker.c
daemon/worker.h

index 66ac0454a91254e398a0f6347b4b9f9db00ebffe..4fe3265a57071b555447c6adbf9fbfcc1789783b 100644 (file)
@@ -15,6 +15,7 @@
  */
 
 #include <uv.h>
+#include <libknot/descriptor.h>
 
 #include "lib/cache.h"
 #include "daemon/bindings.h"
@@ -543,3 +544,75 @@ int lib_event(lua_State *L)
        register_lib(L, "event", lib);
        return 1;
 }
+
+static inline struct worker_ctx *wrk_luaget(lua_State *L) {
+       lua_getglobal(L, "__worker");
+       struct worker_ctx *worker = lua_touserdata(L, -1);
+       lua_pop(L, 1);
+       return worker;
+}
+
+static int wrk_resolve(lua_State *L)
+{
+       struct worker_ctx *worker = wrk_luaget(L);
+       if (!worker) {
+               return 0;
+       }
+       /* Create query packet */
+       knot_pkt_t *pkt = knot_pkt_new(NULL, KNOT_EDNS_MAX_UDP_PAYLOAD, NULL);
+       if (!pkt) {
+               lua_pushstring(L, strerror(ENOMEM));
+               lua_error(L);
+       }
+       uint8_t dname[KNOT_DNAME_MAXLEN];
+       knot_dname_from_str(dname, lua_tostring(L, 1), sizeof(dname));
+       /* Check class and type */
+       uint16_t rrtype = lua_tointeger(L, 2);
+       if (!lua_isnumber(L, 2)) {
+               lua_pushstring(L, "invalid RR type");
+               lua_error(L);
+       }
+       uint16_t rrclass = lua_tointeger(L, 3);
+       if (!lua_isnumber(L, 3)) { /* Default class is IN */
+               rrclass = KNOT_CLASS_IN;
+       }
+       knot_pkt_put_question(pkt, dname, rrclass, rrtype);
+       knot_wire_set_rd(pkt->wire);
+       /* Resolve it */
+       int ret = worker_resolve(worker, pkt);
+       knot_pkt_free(&pkt);
+       if (ret != 0) {
+               lua_pushstring(L, kr_strerror(ret));
+               lua_error(L);
+       }
+       lua_pushboolean(L, true);
+       return 1;
+}
+
+/** Return worker statistics. */
+static int wrk_stats(lua_State *L)
+{
+       struct worker_ctx *worker = wrk_luaget(L);
+       if (!worker) {
+               return 0;
+       }
+       lua_newtable(L);
+       lua_pushnumber(L, worker->stats.concurrent);
+       lua_setfield(L, -2, "concurrent");
+       lua_pushnumber(L, worker->stats.udp);
+       lua_setfield(L, -2, "udp");
+       lua_pushnumber(L, worker->stats.tcp);
+       lua_setfield(L, -2, "tcp");
+       return 1;
+}
+
+int lib_worker(lua_State *L)
+{
+       static const luaL_Reg lib[] = {
+               { "resolve",  wrk_resolve },
+               { "stats",    wrk_stats },
+               { NULL, NULL }
+       };
+       register_lib(L, "worker", lib);
+       return 1;
+}
index c114b78da18f1766bc59a7dad1430a7b7a6b9ebf..655b249443d183e96808b26a02f0f51134a467b6 100644 (file)
@@ -75,4 +75,11 @@ int lib_cache(lua_State *L);
  * @param  L scriptable
  * @return   number of packages to load
  */
-int lib_event(lua_State *L);
\ No newline at end of file
+int lib_event(lua_State *L);
+
+/**
+ * Load worker API.
+ * @param  L scriptable
+ * @return   number of packages to load
+ */
+int lib_worker(lua_State *L);
\ No newline at end of file
index 791ec26883e0b1409d8ba6b434b6df7289a7b5ba..1dc4503ffe0ef56dbd87e1867f254f76be4fea26 100644 (file)
@@ -135,6 +135,7 @@ static struct worker_ctx *init_worker(uv_loop_t *loop, struct engine *engine, mm
        engine_lualib(engine, "cache",   lib_cache);
        engine_lualib(engine, "event",   lib_event);
        engine_lualib(engine, "kres",    lib_kres);
+       engine_lualib(engine, "worker",  lib_worker);
 
        /* Create main worker. */
        struct worker_ctx *worker = mm_alloc(pool, sizeof(*worker));
@@ -146,6 +147,9 @@ static struct worker_ctx *init_worker(uv_loop_t *loop, struct engine *engine, mm
        worker->loop = loop;
        loop->data = worker;
        worker_reserve(worker, MP_FREELIST_SIZE);
+       /* Register worker in Lua thread */
+       lua_pushlightuserdata(engine->L, worker);
+       lua_setglobal(engine->L, "__worker");
        return worker;
 }
 
index bfcfd1dc9a01bca7f29e7b4b2ad2ac0d8b2f4bb3..728616cf5183215b99d66ce6ea90df4a81f9647f 100644 (file)
@@ -117,9 +117,10 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
        task->next_query = pktbuf;
 
        /* Start resolution */
-       uv_timer_init(handle->loop, &task->timeout);
+       uv_timer_init(worker->loop, &task->timeout);
        task->timeout.data = task;
        kr_resolve_begin(&task->req, &engine->resolver, answer);
+       worker->stats.concurrent += 1;
        return task;
 }
 
@@ -128,7 +129,7 @@ static void qr_task_free(uv_handle_t *handle)
        struct qr_task *task = handle->data;
        /* Return handle to the event loop in case
         * it was exclusively taken by this task. */
-       if (!uv_has_ref(task->source.handle)) {
+       if (task->source.handle && !uv_has_ref(task->source.handle)) {
                uv_ref(task->source.handle);
                io_start_read(task->source.handle);
        }
@@ -149,6 +150,8 @@ static void qr_task_free(uv_handle_t *handle)
                mp_delete_count = 0;
        }
 #endif
+       /* Update stats */
+       worker->stats.concurrent -= 1;
 }
 
 static void qr_task_timeout(uv_timer_t *req)
@@ -178,9 +181,14 @@ static int qr_task_on_send(struct qr_task *task, int status)
 static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
 {
        int ret = 0;
+       if (!handle) {
+               return qr_task_on_send(task, kr_error(EIO));
+       }
        if (handle->type == UV_UDP) {
                uv_buf_t buf = { (char *)pkt->wire, pkt->size };
                ret = uv_udp_try_send((uv_udp_t *)handle, &buf, 1, addr);
+               if (handle != task->source.handle)
+                       task->worker->stats.udp += 1;
        } else {
                uint16_t pkt_size = htons(pkt->size);
                uv_buf_t buf[2] = {
@@ -188,6 +196,8 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad
                        { (char *)pkt->wire, pkt->size }
                };
                ret = uv_try_write((uv_stream_t *)handle, buf, 2);
+               if (handle != task->source.handle)
+                       task->worker->stats.tcp += 1;
        }
        return qr_task_on_send(task, (ret >= 0) ? 0 : -1);
 }
@@ -204,6 +214,7 @@ static void qr_task_on_connect(uv_connect_t *connect, int status)
 static int qr_task_finalize(struct qr_task *task, int state)
 {
        kr_resolve_finish(&task->req, state);
+       /* Send back answer */
        (void) qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
        return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
 }
@@ -238,8 +249,7 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
        }
 
        /* Create connection for iterative query */
-       uv_handle_t *source_handle = task->source.handle;
-       task->next_handle = io_create(source_handle->loop, sock_type);
+       task->next_handle = io_create(task->worker->loop, sock_type);
        if (task->next_handle == NULL) {
                return qr_task_finalize(task, KNOT_STATE_FAIL);
        }
@@ -256,6 +266,7 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
                }
                connect->data = task;
        } else {
+               printf("sending: %s %u\n", knot_dname_to_str_alloc(knot_pkt_qname(next_query)), knot_pkt_qtype(next_query));
                if (qr_task_send(task, task->next_handle, addr, next_query) != 0) {
                        return qr_task_step(task, NULL);
                }
@@ -293,6 +304,20 @@ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *quer
        return qr_task_step(task, query);
 }
 
+int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query)
+{
+       if (!worker) {
+               return kr_error(EINVAL);
+       }
+
+       /* Create task */
+       struct qr_task *task = qr_task_create(worker, NULL, query, NULL);
+       if (!task) {
+               return kr_error(ENOMEM);
+       }
+       return qr_task_step(task, query);
+}
+
 int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
 {
        array_init(worker->pools);
index 8f7df470b6bcca13b7b5abe677bd2203dd0c251b..28b9d24da7bfb9cdd6518ca72766fa84782f6342 100644 (file)
@@ -33,21 +33,26 @@ struct worker_ctx {
        uv_loop_t *loop;
        mm_ctx_t *mm;
        uint8_t wire_buf[KNOT_WIRE_MAX_PKTSIZE];
+       struct {
+               size_t concurrent;
+               size_t udp;
+               size_t tcp;
+       } stats;
        mp_freelist_t pools;
 };
 
 /**
- * Resolve query.
- *
- * @param worker
- * @param handle
- * @param answer
- * @param query
- * @param addr
- * @return 0, error code
+ * Process incoming packet (query or answer to subrequest).
+ * @return 0 or an error code
  */
 int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr);
 
+/**
+ * Schedule query for resolution.
+ * @return 0 or an error code
+ */
+int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query);
+
 /** Reserve worker buffers */
 int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen);