From: Marek VavruĊĦa Date: Mon, 8 Jan 2018 21:32:21 +0000 (-0800) Subject: Merge remote-tracking branch 'origin/master' into query-trace X-Git-Tag: v2.0.0~16^2~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=97729953c8983d0787b01814d0e5278f5a80779e;p=thirdparty%2Fknot-resolver.git Merge remote-tracking branch 'origin/master' into query-trace --- 97729953c8983d0787b01814d0e5278f5a80779e diff --cc daemon/bindings.c index f91f54d1b,9da34be01..42aee6020 --- a/daemon/bindings.c +++ b/daemon/bindings.c @@@ -1240,27 -1431,18 +1423,27 @@@ static int wrk_resolve(lua_State *L knot_wire_set_cd(pkt->wire); } + /* Create task and start with a first question */ + struct qr_task *task = worker_resolve_start(worker, pkt, *options); + if (!task) { + knot_rrset_free(&pkt->opt_rr, NULL); + knot_pkt_free(&pkt); + lua_pushstring(L, "couldn't create a resolution request"); + lua_error(L); + } + + /* Add initialisation callback */ if (lua_isfunction(L, 5)) { - /* Store callback in registry */ lua_pushvalue(L, 5); - lua_pushlightuserdata(L, &task->req); - int cb = luaL_ref(L, LUA_REGISTRYINDEX); - ret = worker_resolve(worker, pkt, *options, resolve_callback, (void *) (intptr_t)cb); - } else { - ret = worker_resolve(worker, pkt, *options, NULL, NULL); ++ lua_pushlightuserdata(L, worker_task_request(task)); + (void) execute_callback(L, 1); } - + + /* Start execution */ + int ret = worker_resolve_exec(task, pkt); + lua_pushboolean(L, ret == 0); knot_rrset_free(&pkt->opt_rr, NULL); knot_pkt_free(&pkt); - lua_pushboolean(L, ret == 0); return 1; } diff --cc daemon/io.c index 5ebd3ac87,0cfe5b3c5..d8a6efb9e --- a/daemon/io.c +++ b/daemon/io.c @@@ -101,10 -108,21 +108,11 @@@ static void session_release(struct work } } --static uv_stream_t *handle_alloc(uv_loop_t *loop) -{ - uv_stream_t *handle = calloc(1, sizeof(uv_handles_t)); - if (!handle) { - return NULL; - } - - return handle; -} - + static uv_stream_t *handle_borrow(uv_loop_t *loop) { - uv_stream_t *handle = calloc(1, sizeof(*handle)); - if (!handle) { + struct worker_ctx *worker = loop->data; + void *req = worker_iohandle_borrow(worker); + if (!req) { return NULL; } @@@ -189,16 -216,12 +206,11 @@@ int udp_bindfd(uv_udp_t *handle, int fd return udp_bind_finalize((uv_handle_t *)handle); } - static void tcp_timeout(uv_handle_t *timer) - { - uv_handle_t *handle = timer->data; - uv_close(handle, io_free); - } - static void tcp_timeout_trigger(uv_timer_t *timer) { - uv_handle_t *handle = timer->data; - struct session *session = handle->data; + struct session *session = timer->data; - struct worker_ctx *worker = timer->loop->data; + + assert(session->outgoing == false); if (session->tasks.len > 0) { uv_timer_again(timer); } else { diff --cc daemon/lua/kres.lua index 99f9ffd85,216911975..dd4e567c2 --- a/daemon/lua/kres.lua +++ b/daemon/lua/kres.lua @@@ -151,26 -181,13 +181,27 @@@ local const_rcode = BADVERS = 16, BADCOOKIE = 23, } - +-- This corresponds to `enum kr_rank`, it's not possible to do this without introspection unfortunately +local const_rank = { + INITIAL = 0, + OMIT = 1, + TRY = 2, + INDET = 4, + BOGUS = 5, + MISMATCH = 6, + MISSING = 7, + INSECURE = 8, + AUTH = 16, + SECURE = 32 +} - -- Create inverse table - local const_rank_tostring = {} - for k, v in pairs(const_rank) do - const_rank_tostring[v] = k - end + -- Constant tables + local const_class_str = itable(const_class) + local const_type_str = itable(const_type) + local const_rcode_str = itable(const_rcode) + local const_opcode_str = itable(const_opcode) + local 
const_section_str = itable(const_section) ++local const_rank_str = itable(const_rank) -- Metatype for RR types to allow anonymous types setmetatable(const_type, { @@@ -204,9 -242,8 +256,9 @@@ local knot_rrset_t = ffi.typeof('knot_r ffi.metatype( knot_rrset_t, { -- beware: `owner` and `rdata` are typed as a plain lua strings -- and not the real types they represent. + __tostring = function(rr) return rr:txt_dump() end, __index = { - owner = function(rr) return ffi.string(rr._owner, knot.knot_dname_size(rr._owner)) end, + owner = function(rr) return dname2wire(rr._owner) end, ttl = function(rr) return tonumber(knot.knot_rrset_ttl(rr)) end, rdata = function(rr, i) local rdata = knot.knot_rdataset_at(rr.rrs, i) @@@ -339,35 -529,6 +544,30 @@@ ffi.metatype( kr_request_t, }, }) +-- C array iterator +local function c_array_iter(t, i) + i = i + 1 + if i >= t.len then return end + return i, t.at[i][0] +end + +-- Metatype for ranked record array +local ranked_rr_array_t = ffi.typeof('ranked_rr_array_t') +ffi.metatype(ranked_rr_array_t, { + __len = function(self) + return tonumber(self.len) + end, + __ipairs = function (self) + return c_array_iter, self, -1 + end, + __index = { + get = function (self, i) + if i < 0 or i > self.len then return nil end + return self.at[i][0] + end, + } +}) + - -- Pretty print for domain name - local function dname2str(dname) - return ffi.string(ffi.gc(C.knot_dname_to_str(nil, dname, 0), C.free)) - end - -- Pretty-print a single RR (which is a table with .owner .ttl .type .rdata) -- Extension: append .comment if exists. local function rr2str(rr, style) @@@ -401,8 -562,16 +601,18 @@@ kres = type = const_type, section = const_section, rcode = const_rcode, + opcode = const_opcode, + rank = const_rank, - rank_tostring = const_rank_tostring, + + -- Constants to strings + tostring = { + class = const_class_str, + type = const_type_str, + section = const_section_str, + rcode = const_rcode_str, + opcode = const_opcode_str, ++ rank = const_rank_str, + }, -- Create a struct kr_qflags from a single flag name or a list of names. mk_qflags = function (names) diff --cc daemon/worker.c index 44e9d0932,522e5803b..fbac669e1 --- a/daemon/worker.c +++ b/daemon/worker.c @@@ -37,29 -38,56 +38,54 @@@ #define VERBOSE_MSG(qry, fmt...) QRVERBOSE(qry, "wrkr", fmt) - /* @internal Union of various libuv objects for freelist. */ - struct req - { - union { - /* Socket handles, these have session as their `handle->data` and own it. */ - uv_udp_t udp; - uv_tcp_t tcp; - /* I/O events, these have only a reference to the task they're operating on. */ - uv_udp_send_t send; - uv_write_t write; - uv_connect_t connect; - /* Timer events */ - uv_timer_t timer; - } as; + /** Client request state. */ + struct request_ctx + { + struct kr_request req; + struct { + union inaddr addr; + union inaddr dst_addr; + /* uv_handle_t *handle; */ + + /** NULL if the request didn't come over network. */ + struct session *session; + } source; + struct worker_ctx *worker; + qr_tasklist_t tasks; + }; + + /** Query resolution task. 
*/ + struct qr_task + { + struct request_ctx *ctx; + knot_pkt_t *pktbuf; + qr_tasklist_t waiting; + uv_handle_t *pending[MAX_PENDING]; + uint16_t pending_count; + uint16_t addrlist_count; + uint16_t addrlist_turn; + uint16_t timeouts; + uint16_t iter_count; + uint16_t bytes_remaining; + struct sockaddr *addrlist; - worker_cb_t on_complete; - void *baton; + uint32_t refs; + bool finished : 1; + bool leading : 1; }; + /* Convenience macros */ #define qr_task_ref(task) \ do { ++(task)->refs; } while(0) #define qr_task_unref(task) \ - do { if (--(task)->refs == 0) { qr_task_free(task); } } while (0) + do { if (task && --(task)->refs == 0) { qr_task_free(task); } } while (0) #define qr_valid_handle(task, checked) \ - (!uv_is_closing((checked)) || (task)->source.handle == (checked)) + (!uv_is_closing((checked)) || (task)->ctx->source.session->handle == (checked)) + + /** @internal get key for tcp session + * @note kr_straddr() return pointer to static string + */ + #define tcpsess_key(addr) kr_straddr(addr) /* Forward decls */ static void qr_task_free(struct qr_task *task); @@@ -71,26 -129,71 +127,69 @@@ static inline struct worker_ctx *get_wo return uv_default_loop()->data; } - static inline struct req *req_borrow(struct worker_ctx *worker) + static inline void *iohandle_borrow(struct worker_ctx *worker) + { + void *h = NULL; + + const size_t size = sizeof(uv_handles_t); + if (worker->pool_iohandles.len > 0) { + h = array_tail(worker->pool_iohandles); + array_pop(worker->pool_iohandles); + kr_asan_unpoison(h, size); + } else { + h = malloc(size); + } + + return h; + } + + static inline void iohandle_release(struct worker_ctx *worker, void *h) + { + assert(h); + - const size_t size = sizeof(uv_handles_t); + if (worker->pool_iohandles.len < MP_FREELIST_SIZE) { + array_push(worker->pool_iohandles, h); - kr_asan_poison(h, size); ++ kr_asan_poison(h, sizeof(uv_handles_t)); + } else { + free(h); + } + } + + void *worker_iohandle_borrow(struct worker_ctx *worker) + { + return iohandle_borrow(worker); + } + + void worker_iohandle_release(struct worker_ctx *worker, void *h) + { + iohandle_release(worker, h); + } + + static inline void *iorequest_borrow(struct worker_ctx *worker) { - struct req *req = NULL; - if (worker->pool_ioreq.len > 0) { - req = array_tail(worker->pool_ioreq); - array_pop(worker->pool_ioreq); - kr_asan_unpoison(req, sizeof(*req)); + void *r = NULL; + + const size_t size = sizeof(uv_reqs_t); + if (worker->pool_ioreqs.len > 0) { + r = array_tail(worker->pool_ioreqs); + array_pop(worker->pool_ioreqs); + kr_asan_unpoison(r, size); } else { - req = malloc(sizeof(*req)); + r = malloc(size); } - return req; + + return r; } - static inline void req_release(struct worker_ctx *worker, struct req *req) + static inline void iorequest_release(struct worker_ctx *worker, void *r) { - if (!req || worker->pool_ioreq.len < 4 * MP_FREELIST_SIZE) { - array_push(worker->pool_ioreq, req); - kr_asan_poison(req, sizeof(*req)); + assert(r); + - const size_t size = sizeof(uv_reqs_t); + if (worker->pool_ioreqs.len < MP_FREELIST_SIZE) { + array_push(worker->pool_ioreqs, r); - kr_asan_poison(r, size); ++ kr_asan_poison(r, sizeof(uv_reqs_t)); } else { - free(req); + free(r); } } @@@ -250,38 -505,30 +501,31 @@@ static struct request_ctx *request_crea .alloc = (knot_mm_alloc_t) mp_alloc }; - /* Create resolution task */ - struct qr_task *task = mm_alloc(&pool, sizeof(*task)); - if (!task) { - mp_delete(pool.ctx); + /* Create request context */ + struct request_ctx *ctx = mm_alloc(&pool, sizeof(*ctx)); + if (!ctx) 
{ + pool_release(worker, pool.ctx); return NULL; } - memset(&task->req, 0, sizeof(task->req)); + - /* Create packet buffers for answer and subrequests */ - task->req.pool = pool; - knot_pkt_t *pktbuf = knot_pkt_new(NULL, pktbuf_max, &task->req.pool); - if (!pktbuf) { - mp_delete(pool.ctx); - return NULL; + memset(ctx, 0, sizeof(*ctx)); + + /* TODO Relocate pool to struct request */ + ctx->worker = worker; + array_init(ctx->tasks); + struct session *session = handle ? handle->data : NULL; + if (session) { + assert(session->outgoing == false); } - pktbuf->size = 0; - task->pktbuf = pktbuf; - array_init(task->waiting); - task->addrlist = NULL; - task->pending_count = 0; - task->bytes_remaining = 0; - task->iter_count = 0; - task->timeouts = 0; - task->refs = 1; - task->finished = false; - task->leading = false; - task->worker = worker; - task->session = NULL; - task->source.handle = handle; - task->timeout = NULL; + ctx->source.session = session; + + struct kr_request *req = &ctx->req; + req->pool = pool; + /* Remember query source addr */ - if (addr) { + if (!addr || (addr->sa_family != AF_INET && addr->sa_family != AF_INET6)) { + ctx->source.addr.ip.sa_family = AF_UNSPEC; + } else { size_t addr_len = sizeof(struct sockaddr_in); if (addr->sa_family == AF_INET6) addr_len = sizeof(struct sockaddr_in6); @@@ -414,12 -756,24 +753,22 @@@ static int qr_task_register(struct qr_t static void qr_task_complete(struct qr_task *task) { + struct request_ctx *ctx = task->ctx; - struct worker_ctx *worker = ctx->worker; ++ /* Kill pending I/O requests */ - ioreq_killall(task); + ioreq_kill_pending(task); assert(task->waiting.len == 0); assert(task->leading == false); - /* Run the completion callback. */ - if (task->on_complete) { - task->on_complete(worker, &ctx->req, task->baton); - } ++ + struct session *source_session = ctx->source.session; + if (source_session) { + assert(source_session->outgoing == false && + source_session->waiting.len == 0); + session_del_tasks(source_session, task); + } ++ /* Release primary reference to task. 
*/ - qr_task_unref(task); + request_del_tasks(ctx, task); } /* This is called when we send subrequest / answer */ @@@ -504,64 -944,283 +939,299 @@@ static int qr_task_send(struct qr_task return ret; } } - /* Send using given protocol */ - if (handle->type == UV_UDP) { - uv_buf_t buf = { (char *)pkt->wire, pkt->size }; - send_req->as.send.data = task; - ret = uv_udp_send(&send_req->as.send, (uv_udp_t *)handle, &buf, 1, addr, &on_send); - } else { - uint16_t pkt_size = htons(pkt->size); - uv_buf_t buf[2] = { - { (char *)&pkt_size, sizeof(pkt_size) }, - { (char *)pkt->wire, pkt->size } - }; - send_req->as.write.data = task; - ret = uv_write(&send_req->as.write, (uv_stream_t *)handle, buf, 2, &on_write); - } - if (ret == 0) { - qr_task_ref(task); /* Pending ioreq on current task */ - } else { - req_release(task->worker, send_req); + /* Send using given protocol */ + if (handle->type == UV_UDP) { + uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq; + uv_buf_t buf = { (char *)pkt->wire, pkt->size }; + send_req->data = task; + ret = uv_udp_send(send_req, (uv_udp_t *)handle, &buf, 1, addr, &on_send); + } else if (handle->type == UV_TCP) { + uv_write_t *write_req = (uv_write_t *)ioreq; + uint16_t pkt_size = htons(pkt->size); + uv_buf_t buf[2] = { + { (char *)&pkt_size, sizeof(pkt_size) }, + { (char *)pkt->wire, pkt->size } + }; + write_req->data = task; + ret = uv_write(write_req, (uv_stream_t *)handle, buf, 2, &on_write); + } else { + assert(false); + } + + if (ret == 0) { + qr_task_ref(task); /* Pending ioreq on current task */ + if (worker->too_many_open && + worker->stats.rconcurrent < + worker->rconcurrent_highwatermark - 10) { + worker->too_many_open = false; + } + } else { + iorequest_release(worker, ioreq); + if (ret == UV_EMFILE) { + worker->too_many_open = true; + worker->rconcurrent_highwatermark = worker->stats.rconcurrent; + } + } + + /* Update statistics */ + if (ctx->source.session && + handle != ctx->source.session->handle && + addr) { + if (handle->type == UV_UDP) + worker->stats.udp += 1; + else + worker->stats.tcp += 1; + if (addr->sa_family == AF_INET6) + worker->stats.ipv6 += 1; + else if (addr->sa_family == AF_INET) + worker->stats.ipv4 += 1; + } + return ret; + } + + static int session_next_waiting_send(struct session *session) + { + union inaddr *peer = &session->peer; + int ret = kr_ok(); + if (session->waiting.len > 0) { + struct qr_task *task = session->waiting.at[0]; + ret = qr_task_send(task, session->handle, &peer->ip, task->pktbuf); + } + session->timeout.data = session; + timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0); + return ret; + } + + static int session_tls_hs_cb(struct session *session, int status) + { + VERBOSE_MSG(NULL, "=> server: '%s' TLS handshake has %s\n", + kr_straddr(&session->peer.ip), status ? 
"failed" : "completed"); + + struct worker_ctx *worker = get_worker(); + union inaddr *peer = &session->peer; + int deletion_res = worker_del_tcp_waiting(worker, &peer->ip); + + if (status) { + for (size_t i = 0; i < session->waiting.len; ++i) { + struct qr_task *task = session->waiting.at[0]; + struct kr_query *qry = array_tail(task->ctx->req.rplan.pending); + kr_nsrep_update_rtt(&qry->ns, &peer->ip, KR_NS_TIMEOUT, + worker->engine->resolver.cache_rtt, KR_NS_UPDATE); + } + } else { + if (deletion_res != 0) { + /* session isn't in list of waiting queries, * + * something gone wrong */ + while (session->waiting.len > 0) { + struct qr_task *task = session->waiting.at[0]; + session_del_tasks(session, task); + array_del(session->waiting, 0); + qr_task_finalize(task, KR_STATE_FAIL); + qr_task_unref(task); + } + assert(session->tasks.len == 0); + session_close(session); + return kr_ok(); + } + + int ret = session_next_waiting_send(session); + if (ret == kr_ok()) { + struct worker_ctx *worker = get_worker(); + union inaddr *peer = &session->peer; + int ret = worker_add_tcp_connected(worker, &peer->ip, session); + assert(ret == 0); + } + } + return kr_ok(); + } + ++static struct kr_query *session_current_query(struct session *session) ++{ ++ if (session->waiting.len == 0) { ++ return NULL; ++ } ++ ++ struct qr_task *task = session->waiting.at[0]; ++ if (task->ctx->req.rplan.pending.len == 0) { ++ return NULL; ++ } ++ ++ return array_tail(task->ctx->req.rplan.pending); ++} ++ + static void on_connect(uv_connect_t *req, int status) + { + struct worker_ctx *worker = get_worker(); + uv_stream_t *handle = req->handle; + struct session *session = handle->data; + + union inaddr *peer = &session->peer; + uv_timer_stop((uv_timer_t *)&session->timeout); + + if (status == UV_ECANCELED) { + worker_del_tcp_waiting(worker, &peer->ip); + assert(session->closing && session->waiting.len == 0 && session->tasks.len == 0); + iorequest_release(worker, req); + return; + } + + if (session->closing) { + worker_del_tcp_waiting(worker, &peer->ip); + assert(session->waiting.len == 0 && session->tasks.len == 0); + iorequest_release(worker, req); + return; + } + + if (status != 0) { + worker_del_tcp_waiting(worker, &peer->ip); + while (session->waiting.len > 0) { + struct qr_task *task = session->waiting.at[0]; + session_del_tasks(session, task); + array_del(session->waiting, 0); + assert(task->refs > 1); + qr_task_unref(task); + qr_task_step(task, NULL, NULL); + } + assert(session->tasks.len == 0); + iorequest_release(worker, req); + session_close(session); + return; + } + + if (!session->has_tls) { + /* if there is a TLS, session still waiting for handshake, + * otherwise remove it from waiting list */ + if (worker_del_tcp_waiting(worker, &peer->ip) != 0) { + /* session isn't in list of waiting queries, * + * something gone wrong */ + while (session->waiting.len > 0) { + struct qr_task *task = session->waiting.at[0]; + session_del_tasks(session, task); + array_del(session->waiting, 0); + qr_task_finalize(task, KR_STATE_FAIL); + qr_task_unref(task); + } + assert(session->tasks.len == 0); + iorequest_release(worker, req); + session_close(session); + return; + } + } + - WITH_VERBOSE { ++ struct kr_query *qry = session_current_query(session); ++ WITH_VERBOSE (qry) { + char addr_str[INET6_ADDRSTRLEN]; + inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip), + addr_str, sizeof(addr_str)); - VERBOSE_MSG(NULL, "=> connected to '%s'\n", addr_str); ++ VERBOSE_MSG(qry, "=> connected to '%s'\n", addr_str); + } + + 
session->connected = true; + session->handle = (uv_handle_t *)handle; + + int ret = kr_ok(); + if (session->has_tls) { + ret = tls_client_connect_start(session->tls_client_ctx, + session, session_tls_hs_cb); + if (ret == kr_error(EAGAIN)) { + iorequest_release(worker, req); + io_start_read(session->handle); + return; + } + } + + if (ret == kr_ok()) { + ret = session_next_waiting_send(session); + if (ret == kr_ok()) { + worker_add_tcp_connected(worker, &session->peer.ip, session); + iorequest_release(worker, req); + return; + } } - /* Update statistics */ - if (handle != task->source.handle && addr) { - if (handle->type == UV_UDP) - task->worker->stats.udp += 1; - else - task->worker->stats.tcp += 1; - if (addr->sa_family == AF_INET6) - task->worker->stats.ipv6 += 1; - else - task->worker->stats.ipv4 += 1; + while (session->waiting.len > 0) { + struct qr_task *task = session->waiting.at[0]; + session_del_tasks(session, task); + array_del(session->waiting, 0); + qr_task_finalize(task, KR_STATE_FAIL); + qr_task_unref(task); } - return ret; + + assert(session->tasks.len == 0); + + iorequest_release(worker, req); + session_close(session); } - static void on_connect(uv_connect_t *req, int status) + static void on_tcp_connect_timeout(uv_timer_t *timer) { + struct session *session = timer->data; + + uv_timer_stop(timer); struct worker_ctx *worker = get_worker(); - struct qr_task *task = req->data; - uv_stream_t *handle = req->handle; - if (qr_valid_handle(task, (uv_handle_t *)req->handle)) { - if (status == 0) { - struct sockaddr_storage addr; - int addr_len = sizeof(addr); - uv_tcp_getpeername((uv_tcp_t *)handle, (struct sockaddr *)&addr, &addr_len); - qr_task_send(task, (uv_handle_t *)handle, (struct sockaddr *)&addr, task->pktbuf); - } else { - qr_task_step(task, task->addrlist, NULL); - } + + assert (session->waiting.len == session->tasks.len); + + union inaddr *peer = &session->peer; + worker_del_tcp_waiting(worker, &peer->ip); + - WITH_VERBOSE { ++ struct kr_query *qry = session_current_query(session); ++ WITH_VERBOSE (qry) { + char addr_str[INET6_ADDRSTRLEN]; + inet_ntop(peer->ip.sa_family, kr_inaddr(&peer->ip), addr_str, sizeof(addr_str)); - VERBOSE_MSG(NULL, "=> connection to '%s' failed\n", addr_str); ++ VERBOSE_MSG(qry, "=> connection to '%s' failed\n", addr_str); } - qr_task_unref(task); - req_release(worker, (struct req *)req); + + while (session->waiting.len > 0) { + struct qr_task *task = session->waiting.at[0]; + struct request_ctx *ctx = task->ctx; + assert(ctx); + task->timeouts += 1; + worker->stats.timeout += 1; + session_del_tasks(session, task); + array_del(session->waiting, 0); + assert(task->refs > 1); + qr_task_unref(task); + qr_task_step(task, NULL, NULL); + } + + assert (session->tasks.len == 0); + session_close(session); } - static void on_timer_close(uv_handle_t *handle) + static void on_tcp_watchdog_timeout(uv_timer_t *timer) { - struct qr_task *task = handle->data; - req_release(task->worker, (struct req *)handle); - qr_task_unref(task); + struct session *session = timer->data; + + assert(session->outgoing); + uv_timer_stop(timer); + struct worker_ctx *worker = get_worker(); + + if (session->outgoing) { + worker_del_tcp_connected(worker, &session->peer.ip); + + while (session->waiting.len > 0) { + struct qr_task *task = session->waiting.at[0]; + task->timeouts += 1; + worker->stats.timeout += 1; + array_del(session->waiting, 0); + session_del_tasks(session, task); + qr_task_finalize(task, KR_STATE_FAIL); + qr_task_unref(task); + } + } + + while (session->tasks.len > 
0) { + struct qr_task *task = session->tasks.at[0]; + task->timeouts += 1; + worker->stats.timeout += 1; + assert(task->refs > 1); + array_del(session->tasks, 0); + qr_task_finalize(task, KR_STATE_FAIL); + qr_task_unref(task); + } + + session_close(session); } /* This is called when I/O timeouts */ @@@ -789,33 -1492,179 +1502,180 @@@ static int qr_task_step(struct qr_task * @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly. */ subreq_lead(task); - } else { - uv_connect_t *conn = (uv_connect_t *)req_borrow(task->worker); - if (!conn) { - return qr_task_step(task, NULL, NULL); + struct session *session = handle->data; + assert(session->handle->type == UV_UDP); + ret = timer_start(session, on_retransmit, timeout, 0); + /* Start next step with timeout, fatal if can't start a timer. */ + if (ret != 0) { + subreq_finalize(task, packet_source, packet); + return qr_task_finalize(task, KR_STATE_FAIL); } + } else { + assert (sock_type == SOCK_STREAM); const struct sockaddr *addr = packet_source ? packet_source : task->addrlist; - uv_handle_t *client = ioreq_spawn(task, sock_type, addr->sa_family); - if (!client) { - req_release(task->worker, (struct req *)conn); - return qr_task_step(task, NULL, NULL); - } - conn->data = task; - if (uv_tcp_connect(conn, (uv_tcp_t *)client, addr , on_connect) != 0) { - req_release(task->worker, (struct req *)conn); - return qr_task_step(task, NULL, NULL); + if (addr->sa_family == AF_UNSPEC) { + subreq_finalize(task, packet_source, packet); + return qr_task_finalize(task, KR_STATE_FAIL); } - qr_task_ref(task); /* Connect request borrows task */ - ret = timer_start(task, on_timeout, KR_CONN_RTT_MAX, 0); - } + struct session* session = NULL; + if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) { + assert(session->outgoing); + if (session->closing) { + subreq_finalize(task, packet_source, packet); + return qr_task_finalize(task, KR_STATE_FAIL); + } + /* There are waiting tasks. + * It means that connection establishing or data sending + * is coming right now. */ + /* Task will be notified in on_connect() or qr_task_on_send(). */ + ret = session_add_waiting(session, task); + if (ret < 0) { + subreq_finalize(task, packet_source, packet); + return qr_task_finalize(task, KR_STATE_FAIL); + } + ret = session_add_tasks(session, task); + if (ret < 0) { + session_del_waiting(session, task); + subreq_finalize(task, packet_source, packet); + return qr_task_finalize(task, KR_STATE_FAIL); + } + } else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) { + /* Connection has been already established */ + assert(session->outgoing); + if (session->closing) { + session_del_tasks(session, task); + subreq_finalize(task, packet_source, packet); + return qr_task_finalize(task, KR_STATE_FAIL); + } - /* Start next step with timeout, fatal if can't start a timer. 
*/ - if (ret != 0) { - subreq_finalize(task, packet_source, packet); - return qr_task_finalize(task, KR_STATE_FAIL); + if (session->tasks.len >= worker->tcp_pipeline_max) { + session_del_tasks(session, task); + subreq_finalize(task, packet_source, packet); + return qr_task_finalize(task, KR_STATE_FAIL); + } + + /* will be removed in qr_task_on_send() */ + ret = session_add_waiting(session, task); + if (ret < 0) { + session_del_tasks(session, task); + subreq_finalize(task, packet_source, packet); + return qr_task_finalize(task, KR_STATE_FAIL); + } + ret = session_add_tasks(session, task); + if (ret < 0) { + session_del_waiting(session, task); + session_del_tasks(session, task); + subreq_finalize(task, packet_source, packet); + return qr_task_finalize(task, KR_STATE_FAIL); + } + if (session->waiting.len == 1) { + ret = qr_task_send(task, session->handle, + &session->peer.ip, task->pktbuf); + if (ret < 0) { + session_del_waiting(session, task); + session_del_tasks(session, task); + subreq_finalize(task, packet_source, packet); + return qr_task_finalize(task, KR_STATE_FAIL); + } + ret = timer_start(session, on_tcp_watchdog_timeout, + KR_CONN_RTT_MAX, 0); + if (ret < 0) { + assert(false); + session_del_waiting(session, task); + session_del_tasks(session, task); + subreq_finalize(task, packet_source, packet); + return qr_task_finalize(task, KR_STATE_FAIL); + } + } + task->pending[task->pending_count] = session->handle; + task->pending_count += 1; + } else { + /* Make connection */ + uv_connect_t *conn = (uv_connect_t *)iorequest_borrow(ctx->worker); + if (!conn) { + return qr_task_step(task, NULL, NULL); + } + uv_handle_t *client = ioreq_spawn(task, sock_type, + addr->sa_family); + if (!client) { + iorequest_release(ctx->worker, conn); + subreq_finalize(task, packet_source, packet); + return qr_task_finalize(task, KR_STATE_FAIL); + } + session = client->data; + ret = worker_add_tcp_waiting(ctx->worker, addr, session); + if (ret < 0) { + session_del_tasks(session, task); + iorequest_release(ctx->worker, conn); + subreq_finalize(task, packet_source, packet); + return qr_task_finalize(task, KR_STATE_FAIL); + } + /* will be removed in qr_task_on_send() */ + ret = session_add_waiting(session, task); + if (ret < 0) { + session_del_tasks(session, task); + worker_del_tcp_waiting(ctx->worker, addr); + iorequest_release(ctx->worker, conn); + subreq_finalize(task, packet_source, packet); + return qr_task_finalize(task, KR_STATE_FAIL); + } + + /* Check if there must be TLS */ + struct engine *engine = ctx->worker->engine; + struct network *net = &engine->net; + const char *key = tcpsess_key(addr); + struct tls_client_paramlist_entry *entry = map_get(&net->tls_client_params, key); + if (entry) { + assert(session->tls_client_ctx == NULL); + struct tls_client_ctx_t *tls_ctx = tls_client_ctx_new(entry); + if (!tls_ctx) { + session_del_tasks(session, task); + session_del_waiting(session, task); + worker_del_tcp_waiting(ctx->worker, addr); + iorequest_release(ctx->worker, conn); + subreq_finalize(task, packet_source, packet); + return qr_task_step(task, NULL, NULL); + } + tls_client_ctx_set_params(tls_ctx, entry, session); + session->tls_client_ctx = tls_ctx; + session->has_tls = true; + } + + conn->data = session; + memcpy(&session->peer, addr, sizeof(session->peer)); + + ret = timer_start(session, on_tcp_connect_timeout, + KR_CONN_RTT_MAX, 0); + if (ret != 0) { + session_del_tasks(session, task); + session_del_waiting(session, task); + worker_del_tcp_waiting(ctx->worker, addr); + iorequest_release(ctx->worker, 
conn); + subreq_finalize(task, packet_source, packet); + return qr_task_finalize(task, KR_STATE_FAIL); + } + - WITH_VERBOSE { ++ struct kr_query *qry = session_current_query(session); ++ WITH_VERBOSE (qry) { + char addr_str[INET6_ADDRSTRLEN]; + inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip), addr_str, sizeof(addr_str)); - VERBOSE_MSG(NULL, "=> connecting to: '%s'\n", addr_str); ++ VERBOSE_MSG(qry, "=> connecting to: '%s'\n", addr_str); + } + + if (uv_tcp_connect(conn, (uv_tcp_t *)client, + addr , on_connect) != 0) { + uv_timer_stop(&session->timeout); + session_del_tasks(session, task); + session_del_waiting(session, task); + worker_del_tcp_waiting(ctx->worker, addr); + iorequest_release(ctx->worker, conn); + subreq_finalize(task, packet_source, packet); + return qr_task_step(task, NULL, NULL); + } + } } - return 0; + return kr_ok(); } static int parse_packet(knot_pkt_t *query) @@@ -917,77 -1889,221 +1900,223 @@@ int worker_process_tcp(struct worker_ct if (len <= 0 || !msg) { /* If we have pending tasks, we must dissociate them from the * connection so they don't try to access closed and freed handle. - * @warning Do not modify task if this is outgoing request as it is shared with originator. + * @warning Do not modify task if this is outgoing request + * as it is shared with originator. */ - if (!session->outgoing) { - for (size_t i = 0; i < session->tasks.len; ++i) { - struct qr_task *task = session->tasks.at[i]; - task->session = NULL; - task->source.handle = NULL; - WITH_VERBOSE { ++ struct kr_query *qry = session_current_query(session); ++ WITH_VERBOSE (qry) { + char addr_str[INET6_ADDRSTRLEN]; + inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip), + addr_str, sizeof(addr_str)); - VERBOSE_MSG(NULL, "=> connection to '%s' closed by peer\n", addr_str); ++ VERBOSE_MSG(qry, "=> connection to '%s' closed by peer\n", addr_str); + } + uv_timer_t *timer = &session->timeout; + uv_timer_stop(timer); + struct sockaddr *peer = &session->peer.ip; + worker_del_tcp_connected(worker, peer); + session->connected = false; + + if (session->tls_client_ctx) { + /* Avoid gnutls_bye() call */ + tls_client_set_hs_state(session->tls_client_ctx, + TLS_HS_NOT_STARTED); + } + + if (session->outgoing && session->buffering) { + session->buffering = NULL; + } + + assert(session->tasks.len >= session->waiting.len); + while (session->waiting.len > 0) { + struct qr_task *task = session->waiting.at[0]; + array_del(session->waiting, 0); + assert(task->refs > 1); + session_del_tasks(session, task); + if (session->outgoing) { + if (task->ctx->req.options.FORWARD) { + /* We are in TCP_FORWARD mode. + * To prevent failing at kr_resolve_consume() + * qry.flags.TCP must be cleared. + * TODO - refactoring is needed. 
*/ + struct kr_request *req = &task->ctx->req; + struct kr_rplan *rplan = &req->rplan; + struct kr_query *qry = array_tail(rplan->pending); + qry->flags.TCP = false; + } + qr_task_step(task, NULL, NULL); + } else { + assert(task->ctx->source.session == session); + task->ctx->source.session = NULL; + } + qr_task_unref(task); + } + while (session->tasks.len > 0) { + struct qr_task *task = session->tasks.at[0]; + if (session->outgoing) { + if (task->ctx->req.options.FORWARD) { + struct kr_request *req = &task->ctx->req; + struct kr_rplan *rplan = &req->rplan; + struct kr_query *qry = array_tail(rplan->pending); + qry->flags.TCP = false; + } + qr_task_step(task, NULL, NULL); + } else { + assert(task->ctx->source.session == session); + task->ctx->source.session = NULL; } - session->tasks.len = 0; + session_del_tasks(session, task); + } + session_close(session); + return kr_ok(); + } + + if (session->outgoing) { + uv_timer_stop(&session->timeout); + timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0); + } + + if (session->bytes_to_skip) { + assert(session->buffering == NULL); + ssize_t min_len = MIN(session->bytes_to_skip, len); + len -= min_len; + msg += min_len; + session->bytes_to_skip -= min_len; + if (len < 0 || session->bytes_to_skip < 0) { + /* Something gone wrong. + * Better kill the connection */ + assert(false); + return kr_error(EILSEQ); + } + if (len == 0) { + return kr_ok(); } - return kr_error(ECONNRESET); + assert(session->bytes_to_skip == 0); } int submitted = 0; struct qr_task *task = session->buffering; + knot_pkt_t *pkt_buf = NULL; + if (task) { + pkt_buf = task->pktbuf; + } else { + /* Update DNS header in session->msg_hdr* */ + assert(session->msg_hdr_idx <= sizeof(session->msg_hdr)); + ssize_t hdr_amount = sizeof(session->msg_hdr) - + session->msg_hdr_idx; + if (hdr_amount > len) { + hdr_amount = len; + } + if (hdr_amount > 0) { + memcpy(session->msg_hdr + session->msg_hdr_idx, msg, hdr_amount); + session->msg_hdr_idx += hdr_amount; + len -= hdr_amount; + msg += hdr_amount; + } + if (len == 0) { /* no data beyond msg_hdr -> not much to do */ + return kr_ok(); + } + assert(session->msg_hdr_idx == sizeof(session->msg_hdr)); + session->msg_hdr_idx = 0; + uint16_t msg_size = get_msg_size(session->msg_hdr); + uint16_t msg_id = knot_wire_get_id(session->msg_hdr + 2); + if (msg_size < KNOT_WIRE_HEADER_SIZE) { + /* better kill the connection; we would probably get out of sync */ + uv_timer_t *timer = &session->timeout; + uv_timer_stop(timer); + while (session->waiting.len > 0) { + struct qr_task *task = session->waiting.at[0]; + if (session->outgoing) { + qr_task_finalize(task, KR_STATE_FAIL); + } else { + assert(task->ctx->source.session == session); + task->ctx->source.session = NULL; + } + array_del(session->waiting, 0); + session_del_tasks(session, task); + qr_task_unref(task); + } + while (session->tasks.len > 0) { + struct qr_task *task = session->tasks.at[0]; + if (session->outgoing) { + qr_task_finalize(task, KR_STATE_FAIL); + } else { + assert(task->ctx->source.session == session); + task->ctx->source.session = NULL; + } + session_del_tasks(session, task); + } + session_close(session); - /* If this is a new query, create a new task that we can use - * to buffer incoming message until it's complete. */ - if (!session->outgoing) { - if (!task) { - /* Get TCP peer name, keep zeroed address if it fails. 
*/ - struct sockaddr_storage addr; - memset(&addr, 0, sizeof(addr)); - int addr_len = sizeof(addr); - uv_tcp_getpeername((uv_tcp_t *)handle, (struct sockaddr *)&addr, &addr_len); - task = qr_task_create(worker, (uv_handle_t *)handle, (struct sockaddr *)&addr); + return kr_ok(); + } + + /* get task */ + if (!session->outgoing) { + /* This is a new query, create a new task that we can use + * to buffer incoming message until it's complete. */ + struct sockaddr *addr = &(session->peer.ip); + assert(addr->sa_family != AF_UNSPEC); + struct request_ctx *ctx = request_create(worker, + (uv_handle_t *)handle, + addr); + if (!ctx) { + assert(false); + return kr_error(ENOMEM); + } + task = qr_task_create(ctx); if (!task) { + assert(false); + request_free(ctx); return kr_error(ENOMEM); } - session->buffering = task; + } else { + /* Start of response from upstream. + * The session task list must contain a task + * with the same msg id. */ + task = find_task(session, msg_id); + /* FIXME: on high load over one connection, it's likely + * that we will get multiple matches sooner or later (!) */ + if (task) { + knot_pkt_clear(task->pktbuf); + assert(task->leading == false); + } else { + session->bytes_to_skip = msg_size - 2; + ssize_t min_len = MIN(session->bytes_to_skip, len); + len -= min_len; + msg += min_len; + session->bytes_to_skip -= min_len; + if (len < 0 || session->bytes_to_skip < 0) { + /* Something gone wrong. + * Better kill the connection */ + assert(false); + return kr_error(EILSEQ); + } + if (len == 0) { + return submitted; + } + assert(session->bytes_to_skip == 0); + int ret = worker_process_tcp(worker, handle, msg, len); + if (ret < 0) { + submitted = ret; + } else { + submitted += ret; + } + return submitted; + } } - } else { - assert(session->tasks.len > 0); - task = array_tail(session->tasks); + + pkt_buf = task->pktbuf; + knot_wire_set_id(pkt_buf->wire, msg_id); + pkt_buf->size = 2; + task->bytes_remaining = msg_size - 2; + assert(session->buffering == NULL); + session->buffering = task; } - /* At this point session must have either created new task or it's already assigned. */ + /* At this point session must have either created new task + * or it's already assigned. */ assert(task); assert(len > 0); - /* Start reading DNS/TCP message length */ - knot_pkt_t *pkt_buf = task->pktbuf; - if (task->bytes_remaining == 0 && pkt_buf->size == 0) { - knot_pkt_clear(pkt_buf); - /* Make sure we can process maximum packet sizes over TCP for outbound queries. - * Previous packet is allocated with mempool, so there's no need to free it manually. */ - if (session->outgoing && pkt_buf->max_size < KNOT_WIRE_MAX_PKTSIZE) { - pkt_buf = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, &task->req.pool); - if (!pkt_buf) { - return kr_error(ENOMEM); - } - task->pktbuf = pkt_buf; - } - /* Read only one byte as TCP fragment may end at a 1B boundary - * which would lead to OOB read or improper reassembly length. */ - pkt_buf->size = 1; - pkt_buf->wire[0] = msg[0]; - len -= 1; - msg += 1; - if (len == 0) { - return 0; - } - } - /* Finish reading DNS/TCP message length. */ - if (task->bytes_remaining == 0 && pkt_buf->size == 1) { - pkt_buf->wire[1] = msg[0]; - ssize_t nbytes = msg_size(pkt_buf->wire); - len -= 1; - msg += 1; - /* Cut off fragment length and start reading DNS message. */ - pkt_buf->size = 0; - task->bytes_remaining = nbytes; - } ++ /* Message is too long, can't process it. 
*/ ssize_t to_read = MIN(len, task->bytes_remaining); if (pkt_buf->size + to_read > pkt_buf->max_size) { @@@ -1036,49 -2177,45 +2190,57 @@@ return submitted; } -int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, - struct kr_qflags options, worker_cb_t on_complete, - void *baton) +struct qr_task *worker_resolve_start(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options) { if (!worker || !query) { - assert(false); - return kr_error(EINVAL); + return NULL; } - struct qr_task *task = qr_task_create(worker, NULL, NULL); + struct request_ctx *ctx = request_create(worker, NULL, NULL); + if (!ctx) { - return kr_error(ENOMEM); ++ return NULL; + } + + /* Create task */ + struct qr_task *task = qr_task_create(ctx); if (!task) { + request_free(ctx); - return kr_error(ENOMEM); + return NULL; } - task->baton = baton; - task->on_complete = on_complete; + - int ret = qr_task_start(task, query); + /* Start task */ + int ret = request_start(ctx, query); + if (ret != 0) { ++ request_free(ctx); + qr_task_unref(task); + return NULL; + } /* Set options late, as qr_task_start() -> kr_resolve_begin() rewrite it. */ - kr_qflags_set(&task->req.options, options); + kr_qflags_set(&task->ctx->req.options, options); + return task; +} - if (ret != 0) { - request_free(ctx); - qr_task_unref(task); - return ret; +int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query) +{ + if (!task) { + return kr_error(EINVAL); } return qr_task_step(task, NULL, query); } - int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options) ++struct kr_request *worker_task_request(struct qr_task *task) +{ - if (!worker || !query) { - return kr_error(EINVAL); ++ if (!task || !task->ctx) { ++ return NULL; + } + - /* Create task */ - struct qr_task *task = worker_resolve_start(worker, query, options); - if (!task) { - return kr_error(ENOMEM); - } ++ return &task->ctx->req; ++} + - return worker_resolve_exec(task, query); + void worker_session_close(struct session *session) + { + session_close(session); } /** Reserve worker buffers */ diff --cc daemon/worker.h index e88cfdd25,231b23975..b77281fa2 --- a/daemon/worker.h +++ b/daemon/worker.h @@@ -21,9 -21,15 +21,12 @@@ #include "lib/generic/map.h" + /** Query resolution task (opaque). */ + struct qr_task; /** Worker state (opaque). */ struct worker_ctx; - struct qr_task; + /** Transport session (opaque). */ + struct session; -/** Union of various libuv objects for freelist. */ -/** Worker callback */ -typedef void (*worker_cb_t)(struct worker_ctx *worker, struct kr_request *req, void *baton); /** Create and initialize the worker. */ struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool, @@@ -52,29 -64,16 +61,22 @@@ int worker_process_tcp(struct worker_ct int worker_end_tcp(struct worker_ctx *worker, uv_handle_t *handle); /** - * Schedule query for resolution. + * Start query resolution with given query. * - * After resolution finishes, invoke on_complete with baton. - * @return 0 or an error code + * @return task or NULL + */ +struct qr_task *worker_resolve_start(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options); + +/** + * Execute a request with given query. + * It expects task to be created with \fn worker_resolve_start. * - * @note the options passed are |-combined with struct kr_context::options - * @todo maybe better semantics for this? 
+ * @return 0 or an error code */ -int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options, - worker_cb_t on_complete, void *baton); +int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query); + - /** - * Schedule query for resolution. - * - * @return 0 or an error code - * - * @note the options passed are |-combined with struct kr_context::options - * @todo maybe better semantics for this? - */ - int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, struct kr_qflags options); ++/** @return struct kr_request associated with opaque task */ ++struct kr_request *worker_task_request(struct qr_task *task); /** Collect worker mempools */ void worker_reclaim(struct worker_ctx *worker); diff --cc lib/layer/iterate.c index e1c0cf106,9a9d31140..aa4d98cd6 --- a/lib/layer/iterate.c +++ b/lib/layer/iterate.c @@@ -824,7 -823,7 +824,7 @@@ int kr_make_query(struct kr_query *quer char name_str[KNOT_DNAME_MAXLEN], type_str[16]; knot_dname_to_str(name_str, query->sname, sizeof(name_str)); knot_rrtype_to_string(query->stype, type_str, sizeof(type_str)); - QVERBOSE_MSG(query, "'%s' type '%s' created outbound query, parent id %hu\n", - QVERBOSE_MSG(query, "'%s' type '%s' id was assigned, parent id %u\n", ++ QVERBOSE_MSG(query, "'%s' type '%s' id was assigned, parent id %hu\n", name_str, type_str, query->parent ? query->parent->id : 0); } return kr_ok(); diff --cc modules/http/http_trace.lua index b57295034,000000000..9e4d4667d mode 100644,000000..100644 --- a/modules/http/http_trace.lua +++ b/modules/http/http_trace.lua @@@ -1,109 -1,0 +1,109 @@@ +local ffi = require('ffi') +local bit = require('bit') +local condition = require('cqueues.condition') + +-- Buffer selected record information to a table +local function add_selected_records(dst, records) + for _, rec in ipairs(records) do + local rank = rec.rank + -- Separate the referral chain verified flag + local verified = bit.band(rec.rank, kres.rank.AUTH) + if verified then + rank = bit.band(rank, bit.bnot(kres.rank.AUTH)) + end - local rank_name = kres.rank_tostring[rank] or tostring(rank) ++ local rank_name = kres.tostring.rank[rank] or tostring(rank) + -- Write out each individual RR + for rr in tostring(rec.rr):gmatch('[^\n]+\n?') do + local row = string.format('cached: %s, rank: %s, record: %s', + rec.cached, rank_name:lower(), rr) + table.insert(dst, row) + end + end +end + +local function format_selected_records(header, records) + if #records == 0 then return '' end + return string.format('%s\n%s\n', header, string.rep('-', #header)) + .. table.concat(records, '') .. '\n' +end + +-- Trace execution of DNS queries +local function serve_trace(h, _) + local path = h:get(':path') + local qname, qtype_str = path:match('/trace/([^/]+)/?([^/]*)') + if not qname then + return 400, 'expected /trace//' + end + + -- Parse query type (or default to A) + if not qtype_str or #qtype_str == 0 then + qtype_str = 'A' + end + + local qtype = kres.type[qtype_str] + if not qtype then + return 400, string.format('unexpected query type: %s', qtype_str) + end + + -- Create logging handler callback + local buffer = {} + local buffer_log_cb = ffi.cast('trace_log_f', function (query, source, msg) + local message = string.format('[%5s] [%s] %s', + query.id, ffi.string(source), ffi.string(msg)) + table.insert(buffer, message) + end) + + -- Wait for the result of the query + -- Note: We can't do non-blocking write to stream directly from resolve callbacks + -- because they don't run inside cqueue. 
+ local answers, authority = {}, {} + local cond = condition.new() + local waiting, done = false, false + local finish_cb = ffi.cast('trace_callback_f', function (req) + req = kres.request_t(req) + add_selected_records(answers, req.answ_selected) + add_selected_records(authority, req.auth_selected) + if waiting then + cond:signal() + end + done = true + end) + + -- Resolve query and buffer logs into table + resolve { + name = qname, + type = qtype, + options = {'TRACE'}, + init = function (req) + req = kres.request_t(req) + req.trace_log = buffer_log_cb + req.trace_finish = finish_cb + end + } + + -- Wait for asynchronous query and free callbacks + if not done then + waiting = true + cond:wait() + end + + buffer_log_cb:free() + finish_cb:free() + + -- Build the result + local result = table.concat(buffer, '') .. '\n' + .. format_selected_records('Used records from answer:', answers) + .. format_selected_records('Used records from authority:', authority) + -- Return buffered data + if not done then + return 504, result + end + return result +end + +-- Export endpoints +return { + endpoints = { + ['/trace'] = {'text/plain', serve_trace}, + } +}
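
The http_trace module above also shows the general pattern for driving the new trace hooks from an ordinary kresd Lua config: the 'init' callback added to resolve() in daemon/bindings.c hands the struct kr_request to Lua before execution starts, so trace_log/trace_finish can be attached there. The sketch below is not part of the patch; it assumes the kresd Lua globals (resolve, kres, ffi) and the 'trace_log_f'/'trace_callback_f' ctypes already used by http_trace.lua, and the query name, query type and print() sink are placeholders only.

    local ffi = require('ffi')

    -- Log sink for per-query trace messages; same 'trace_log_f' signature as in
    -- modules/http/http_trace.lua, but printing straight to stdout.
    local log_cb = ffi.cast('trace_log_f', function (query, source, msg)
        print(string.format('[%5s] [%s] %s',
            query.id, ffi.string(source), ffi.string(msg)))
    end)

    -- Called once the request finishes; iterates the answer records selected by
    -- the resolver (ranked_rr_array_t metatype from daemon/lua/kres.lua).
    local finish_cb = ffi.cast('trace_callback_f', function (req)
        req = kres.request_t(req)
        for _, rec in ipairs(req.answ_selected) do
            print('selected:', tostring(rec.rr))
        end
        -- log_cb/finish_cb should eventually be released with :free(),
        -- as http_trace.lua does once the request is done.
    end)

    resolve {
        name = 'example.com.',      -- placeholder query name
        type = kres.type.AAAA,      -- placeholder query type
        options = {'TRACE'},
        init = function (req)
            req = kres.request_t(req)
            req.trace_log = log_cb
            req.trace_finish = finish_cb
        end,
    }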