From: Marek VavruĊĦa Date: Sun, 5 Jul 2015 20:09:48 +0000 (+0200) Subject: daemon/bindings: Lua API for worker (resolve, stats) X-Git-Tag: v1.0.0-beta1~88^2~6 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c3ad1f835e96e02acd38977232700fe1bcfaa78d;p=thirdparty%2Fknot-resolver.git daemon/bindings: Lua API for worker (resolve, stats) --- diff --git a/daemon/bindings.c b/daemon/bindings.c index 66ac0454a..4fe3265a5 100644 --- a/daemon/bindings.c +++ b/daemon/bindings.c @@ -15,6 +15,7 @@ */ #include +#include #include "lib/cache.h" #include "daemon/bindings.h" @@ -543,3 +544,75 @@ int lib_event(lua_State *L) register_lib(L, "event", lib); return 1; } + +static inline struct worker_ctx *wrk_luaget(lua_State *L) { + lua_getglobal(L, "__worker"); + struct worker_ctx *worker = lua_touserdata(L, -1); + lua_pop(L, 1); + return worker; +} + +static int wrk_resolve(lua_State *L) +{ + struct worker_ctx *worker = wrk_luaget(L); + if (!worker) { + return 0; + } + /* Create query packet */ + knot_pkt_t *pkt = knot_pkt_new(NULL, KNOT_EDNS_MAX_UDP_PAYLOAD, NULL); + if (!pkt) { + lua_pushstring(L, strerror(ENOMEM)); + lua_error(L); + } + uint8_t dname[KNOT_DNAME_MAXLEN]; + knot_dname_from_str(dname, lua_tostring(L, 1), sizeof(dname)); + /* Check class and type */ + uint16_t rrtype = lua_tointeger(L, 2); + if (!lua_isnumber(L, 2)) { + lua_pushstring(L, "invalid RR type"); + lua_error(L); + } + uint16_t rrclass = lua_tointeger(L, 3); + if (!lua_isnumber(L, 3)) { /* Default class is IN */ + rrclass = KNOT_CLASS_IN; + } + knot_pkt_put_question(pkt, dname, rrclass, rrtype); + knot_wire_set_rd(pkt->wire); + /* Resolve it */ + int ret = worker_resolve(worker, pkt); + knot_pkt_free(&pkt); + if (ret != 0) { + lua_pushstring(L, kr_strerror(ret)); + lua_error(L); + } + lua_pushboolean(L, true); + return 1; +} + +/** Return worker statistics. */ +static int wrk_stats(lua_State *L) +{ + struct worker_ctx *worker = wrk_luaget(L); + if (!worker) { + return 0; + } + lua_newtable(L); + lua_pushnumber(L, worker->stats.concurrent); + lua_setfield(L, -2, "concurrent"); + lua_pushnumber(L, worker->stats.udp); + lua_setfield(L, -2, "udp"); + lua_pushnumber(L, worker->stats.tcp); + lua_setfield(L, -2, "tcp"); + return 1; +} + +int lib_worker(lua_State *L) +{ + static const luaL_Reg lib[] = { + { "resolve", wrk_resolve }, + { "stats", wrk_stats }, + { NULL, NULL } + }; + register_lib(L, "worker", lib); + return 1; +} diff --git a/daemon/bindings.h b/daemon/bindings.h index c114b78da..655b24944 100644 --- a/daemon/bindings.h +++ b/daemon/bindings.h @@ -75,4 +75,11 @@ int lib_cache(lua_State *L); * @param L scriptable * @return number of packages to load */ -int lib_event(lua_State *L); \ No newline at end of file +int lib_event(lua_State *L); + +/** + * Load worker API. + * @param L scriptable + * @return number of packages to load + */ +int lib_worker(lua_State *L); \ No newline at end of file diff --git a/daemon/main.c b/daemon/main.c index 791ec2688..1dc4503ff 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -135,6 +135,7 @@ static struct worker_ctx *init_worker(uv_loop_t *loop, struct engine *engine, mm engine_lualib(engine, "cache", lib_cache); engine_lualib(engine, "event", lib_event); engine_lualib(engine, "kres", lib_kres); + engine_lualib(engine, "worker", lib_worker); /* Create main worker. */ struct worker_ctx *worker = mm_alloc(pool, sizeof(*worker)); @@ -146,6 +147,9 @@ static struct worker_ctx *init_worker(uv_loop_t *loop, struct engine *engine, mm worker->loop = loop; loop->data = worker; worker_reserve(worker, MP_FREELIST_SIZE); + /* Register worker in Lua thread */ + lua_pushlightuserdata(engine->L, worker); + lua_setglobal(engine->L, "__worker"); return worker; } diff --git a/daemon/worker.c b/daemon/worker.c index bfcfd1dc9..728616cf5 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -117,9 +117,10 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha task->next_query = pktbuf; /* Start resolution */ - uv_timer_init(handle->loop, &task->timeout); + uv_timer_init(worker->loop, &task->timeout); task->timeout.data = task; kr_resolve_begin(&task->req, &engine->resolver, answer); + worker->stats.concurrent += 1; return task; } @@ -128,7 +129,7 @@ static void qr_task_free(uv_handle_t *handle) struct qr_task *task = handle->data; /* Return handle to the event loop in case * it was exclusively taken by this task. */ - if (!uv_has_ref(task->source.handle)) { + if (task->source.handle && !uv_has_ref(task->source.handle)) { uv_ref(task->source.handle); io_start_read(task->source.handle); } @@ -149,6 +150,8 @@ static void qr_task_free(uv_handle_t *handle) mp_delete_count = 0; } #endif + /* Update stats */ + worker->stats.concurrent -= 1; } static void qr_task_timeout(uv_timer_t *req) @@ -178,9 +181,14 @@ static int qr_task_on_send(struct qr_task *task, int status) static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt) { int ret = 0; + if (!handle) { + return qr_task_on_send(task, kr_error(EIO)); + } if (handle->type == UV_UDP) { uv_buf_t buf = { (char *)pkt->wire, pkt->size }; ret = uv_udp_try_send((uv_udp_t *)handle, &buf, 1, addr); + if (handle != task->source.handle) + task->worker->stats.udp += 1; } else { uint16_t pkt_size = htons(pkt->size); uv_buf_t buf[2] = { @@ -188,6 +196,8 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad { (char *)pkt->wire, pkt->size } }; ret = uv_try_write((uv_stream_t *)handle, buf, 2); + if (handle != task->source.handle) + task->worker->stats.tcp += 1; } return qr_task_on_send(task, (ret >= 0) ? 0 : -1); } @@ -204,6 +214,7 @@ static void qr_task_on_connect(uv_connect_t *connect, int status) static int qr_task_finalize(struct qr_task *task, int state) { kr_resolve_finish(&task->req, state); + /* Send back answer */ (void) qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer); return state == KNOT_STATE_DONE ? 0 : kr_error(EIO); } @@ -238,8 +249,7 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet) } /* Create connection for iterative query */ - uv_handle_t *source_handle = task->source.handle; - task->next_handle = io_create(source_handle->loop, sock_type); + task->next_handle = io_create(task->worker->loop, sock_type); if (task->next_handle == NULL) { return qr_task_finalize(task, KNOT_STATE_FAIL); } @@ -256,6 +266,7 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet) } connect->data = task; } else { + printf("sending: %s %u\n", knot_dname_to_str_alloc(knot_pkt_qname(next_query)), knot_pkt_qtype(next_query)); if (qr_task_send(task, task->next_handle, addr, next_query) != 0) { return qr_task_step(task, NULL); } @@ -293,6 +304,20 @@ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *quer return qr_task_step(task, query); } +int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query) +{ + if (!worker) { + return kr_error(EINVAL); + } + + /* Create task */ + struct qr_task *task = qr_task_create(worker, NULL, query, NULL); + if (!task) { + return kr_error(ENOMEM); + } + return qr_task_step(task, query); +} + int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen) { array_init(worker->pools); diff --git a/daemon/worker.h b/daemon/worker.h index 8f7df470b..28b9d24da 100644 --- a/daemon/worker.h +++ b/daemon/worker.h @@ -33,21 +33,26 @@ struct worker_ctx { uv_loop_t *loop; mm_ctx_t *mm; uint8_t wire_buf[KNOT_WIRE_MAX_PKTSIZE]; + struct { + size_t concurrent; + size_t udp; + size_t tcp; + } stats; mp_freelist_t pools; }; /** - * Resolve query. - * - * @param worker - * @param handle - * @param answer - * @param query - * @param addr - * @return 0, error code + * Process incoming packet (query or answer to subrequest). + * @return 0 or an error code */ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr); +/** + * Schedule query for resolution. + * @return 0 or an error code + */ +int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query); + /** Reserve worker buffers */ int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen);