*/
#include <uv.h>
+#include <libknot/descriptor.h>
#include "lib/cache.h"
#include "daemon/bindings.h"
register_lib(L, "event", lib);
return 1;
}
+
+static inline struct worker_ctx *wrk_luaget(lua_State *L) {
+ lua_getglobal(L, "__worker");
+ struct worker_ctx *worker = lua_touserdata(L, -1);
+ lua_pop(L, 1);
+ return worker;
+}
+
+static int wrk_resolve(lua_State *L)
+{
+ struct worker_ctx *worker = wrk_luaget(L);
+ if (!worker) {
+ return 0;
+ }
+ /* Create query packet */
+ knot_pkt_t *pkt = knot_pkt_new(NULL, KNOT_EDNS_MAX_UDP_PAYLOAD, NULL);
+ if (!pkt) {
+ lua_pushstring(L, strerror(ENOMEM));
+ lua_error(L);
+ }
+ uint8_t dname[KNOT_DNAME_MAXLEN];
+ knot_dname_from_str(dname, lua_tostring(L, 1), sizeof(dname));
+ /* Check class and type */
+ uint16_t rrtype = lua_tointeger(L, 2);
+ if (!lua_isnumber(L, 2)) {
+ lua_pushstring(L, "invalid RR type");
+ lua_error(L);
+ }
+ uint16_t rrclass = lua_tointeger(L, 3);
+ if (!lua_isnumber(L, 3)) { /* Default class is IN */
+ rrclass = KNOT_CLASS_IN;
+ }
+ knot_pkt_put_question(pkt, dname, rrclass, rrtype);
+ knot_wire_set_rd(pkt->wire);
+ /* Resolve it */
+ int ret = worker_resolve(worker, pkt);
+ knot_pkt_free(&pkt);
+ if (ret != 0) {
+ lua_pushstring(L, kr_strerror(ret));
+ lua_error(L);
+ }
+ lua_pushboolean(L, true);
+ return 1;
+}
+
+/** Return worker statistics. */
+static int wrk_stats(lua_State *L)
+{
+ struct worker_ctx *worker = wrk_luaget(L);
+ if (!worker) {
+ return 0;
+ }
+ lua_newtable(L);
+ lua_pushnumber(L, worker->stats.concurrent);
+ lua_setfield(L, -2, "concurrent");
+ lua_pushnumber(L, worker->stats.udp);
+ lua_setfield(L, -2, "udp");
+ lua_pushnumber(L, worker->stats.tcp);
+ lua_setfield(L, -2, "tcp");
+ return 1;
+}
+
+int lib_worker(lua_State *L)
+{
+ static const luaL_Reg lib[] = {
+ { "resolve", wrk_resolve },
+ { "stats", wrk_stats },
+ { NULL, NULL }
+ };
+ register_lib(L, "worker", lib);
+ return 1;
+}
* @param L scriptable
* @return number of packages to load
*/
-int lib_event(lua_State *L);
\ No newline at end of file
+int lib_event(lua_State *L);
+
+/**
+ * Load worker API.
+ * @param L scriptable
+ * @return number of packages to load
+ */
+int lib_worker(lua_State *L);
\ No newline at end of file
engine_lualib(engine, "cache", lib_cache);
engine_lualib(engine, "event", lib_event);
engine_lualib(engine, "kres", lib_kres);
+ engine_lualib(engine, "worker", lib_worker);
/* Create main worker. */
struct worker_ctx *worker = mm_alloc(pool, sizeof(*worker));
worker->loop = loop;
loop->data = worker;
worker_reserve(worker, MP_FREELIST_SIZE);
+ /* Register worker in Lua thread */
+ lua_pushlightuserdata(engine->L, worker);
+ lua_setglobal(engine->L, "__worker");
return worker;
}
task->next_query = pktbuf;
/* Start resolution */
- uv_timer_init(handle->loop, &task->timeout);
+ uv_timer_init(worker->loop, &task->timeout);
task->timeout.data = task;
kr_resolve_begin(&task->req, &engine->resolver, answer);
+ worker->stats.concurrent += 1;
return task;
}
struct qr_task *task = handle->data;
/* Return handle to the event loop in case
* it was exclusively taken by this task. */
- if (!uv_has_ref(task->source.handle)) {
+ if (task->source.handle && !uv_has_ref(task->source.handle)) {
uv_ref(task->source.handle);
io_start_read(task->source.handle);
}
mp_delete_count = 0;
}
#endif
+ /* Update stats */
+ worker->stats.concurrent -= 1;
}
static void qr_task_timeout(uv_timer_t *req)
static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
{
int ret = 0;
+ if (!handle) {
+ return qr_task_on_send(task, kr_error(EIO));
+ }
if (handle->type == UV_UDP) {
uv_buf_t buf = { (char *)pkt->wire, pkt->size };
ret = uv_udp_try_send((uv_udp_t *)handle, &buf, 1, addr);
+ if (handle != task->source.handle)
+ task->worker->stats.udp += 1;
} else {
uint16_t pkt_size = htons(pkt->size);
uv_buf_t buf[2] = {
{ (char *)pkt->wire, pkt->size }
};
ret = uv_try_write((uv_stream_t *)handle, buf, 2);
+ if (handle != task->source.handle)
+ task->worker->stats.tcp += 1;
}
return qr_task_on_send(task, (ret >= 0) ? 0 : -1);
}
static int qr_task_finalize(struct qr_task *task, int state)
{
kr_resolve_finish(&task->req, state);
+ /* Send back answer */
(void) qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
}
}
/* Create connection for iterative query */
- uv_handle_t *source_handle = task->source.handle;
- task->next_handle = io_create(source_handle->loop, sock_type);
+ task->next_handle = io_create(task->worker->loop, sock_type);
if (task->next_handle == NULL) {
return qr_task_finalize(task, KNOT_STATE_FAIL);
}
}
connect->data = task;
} else {
+ printf("sending: %s %u\n", knot_dname_to_str_alloc(knot_pkt_qname(next_query)), knot_pkt_qtype(next_query));
if (qr_task_send(task, task->next_handle, addr, next_query) != 0) {
return qr_task_step(task, NULL);
}
return qr_task_step(task, query);
}
+int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query)
+{
+ if (!worker) {
+ return kr_error(EINVAL);
+ }
+
+ /* Create task */
+ struct qr_task *task = qr_task_create(worker, NULL, query, NULL);
+ if (!task) {
+ return kr_error(ENOMEM);
+ }
+ return qr_task_step(task, query);
+}
+
int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
{
array_init(worker->pools);
uv_loop_t *loop;
mm_ctx_t *mm;
uint8_t wire_buf[KNOT_WIRE_MAX_PKTSIZE];
+ struct {
+ size_t concurrent;
+ size_t udp;
+ size_t tcp;
+ } stats;
mp_freelist_t pools;
};
/**
- * Resolve query.
- *
- * @param worker
- * @param handle
- * @param answer
- * @param query
- * @param addr
- * @return 0, error code
+ * Process incoming packet (query or answer to subrequest).
+ * @return 0 or an error code
*/
int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr);
+/**
+ * Schedule query for resolution.
+ * @return 0 or an error code
+ */
+int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query);
+
/** Reserve worker buffers */
int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen);