{
struct kr_request req;
struct worker_ctx *worker;
- knot_pkt_t *next_query;
- uv_handle_t *next_handle;
+ knot_pkt_t *pktbuf;
+ uv_req_t *ioreq;
+ uv_handle_t *iohandle;
uv_timer_t timeout;
struct {
union {
/* Forward decls */
static int qr_task_step(struct qr_task *task, knot_pkt_t *packet);
-static int parse_query(knot_pkt_t *query)
+/** @internal Get singleton worker. */
+static inline struct worker_ctx *get_worker(void)
{
- /* Parse query packet. */
- int ret = knot_pkt_parse(query, 0);
- if (ret != KNOT_EOK) {
- return kr_error(EPROTO); /* Ignore malformed query. */
- }
-
- /* Check if at least header is parsed. */
- if (query->parsed < query->size) {
- return kr_error(EMSGSIZE);
- }
-
- return kr_ok();
+ return uv_default_loop()->data;
}
static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr *addr)
return NULL;
}
task->req.answer = answer;
- task->next_query = pktbuf;
- task->next_handle = NULL;
+ task->pktbuf = pktbuf;
+ task->ioreq = NULL;
+ task->iohandle = NULL;
task->iter_count = 0;
task->flags = 0;
task->worker = worker;
static void qr_task_timeout(uv_timer_t *req)
{
struct qr_task *task = req->data;
- if (task->next_handle) { /* Handle data may be stale when it completes */
- task->next_handle->data = NULL;
+ if (!uv_is_closing((uv_handle_t *)req)) {
+ if (task->ioreq) { /* Invalidate pending IO request. */
+ task->ioreq->data = NULL;
+ }
qr_task_step(task, NULL);
}
}
-static int qr_task_on_send(struct qr_task *task, int status)
+static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
{
- if (task) {
- /* Start reading answer */
- if (task->req.overlay.state != KNOT_STATE_NOOP) {
- if (status == 0 && task->next_handle) {
- io_start_read(task->next_handle);
- }
- } else { /* Finalize task */
- uv_timer_stop(&task->timeout);
- uv_close((uv_handle_t *)&task->timeout, qr_task_free);
+ if (task->req.overlay.state != KNOT_STATE_NOOP) {
+ if (status == 0 && handle) {
+ io_start_read(handle); /* Start reading answer */
}
+ } else { /* Finalize task */
+ uv_timer_stop(&task->timeout);
+ uv_close((uv_handle_t *)&task->timeout, qr_task_free);
}
return status;
}
static void on_close(uv_handle_t *handle)
{
- struct qr_task *task = handle->data;
- if (task) {
- ioreq_release(task->worker, (struct ioreq *)handle);
- } else free(handle);
+ struct worker_ctx *worker = get_worker();
+ ioreq_release(worker, (struct ioreq *)handle);
}
static void on_send(uv_udp_send_t *req, int status)
{
+ struct worker_ctx *worker = get_worker();
struct qr_task *task = req->data;
if (task) {
- qr_task_on_send(task, status);
- ioreq_release(task->worker, (struct ioreq *)req);
- } else free(req);
+ qr_task_on_send(task, (uv_handle_t *)req->handle, status);
+ task->ioreq = NULL;
+ }
+ ioreq_release(worker, (struct ioreq *)req);
}
static void on_write(uv_write_t *req, int status)
{
+ struct worker_ctx *worker = get_worker();
struct qr_task *task = req->data;
if (task) {
- qr_task_on_send(task, status);
- ioreq_release(task->worker, (struct ioreq *)req);
- } else free(req);
+ qr_task_on_send(task, (uv_handle_t *)req->handle, status);
+ task->ioreq = NULL;
+ }
+ ioreq_release(worker, (struct ioreq *)req);
}
static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
{
int ret = 0;
if (!handle) {
- return qr_task_on_send(task, kr_error(EIO));
+ return qr_task_on_send(task, handle, kr_error(EIO));
}
- struct ioreq *req = ioreq_take(task->worker);
- if (!req) {
- return qr_task_on_send(task, kr_error(ENOMEM));
+ struct ioreq *send_req = ioreq_take(task->worker);
+ if (!send_req) {
+ return qr_task_on_send(task, handle, kr_error(ENOMEM));
}
+
/* Send using given protocol */
if (handle->type == UV_UDP) {
uv_buf_t buf = { (char *)pkt->wire, pkt->size };
- req->as.send.data = task;
- ret = uv_udp_send(&req->as.send, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
+ send_req->as.send.data = task;
+ ret = uv_udp_send(&send_req->as.send, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
} else {
uint16_t pkt_size = htons(pkt->size);
uv_buf_t buf[2] = {
{ (char *)&pkt_size, sizeof(pkt_size) },
{ (char *)pkt->wire, pkt->size }
};
- req->as.write.data = task;
- ret = uv_write(&req->as.write, (uv_stream_t *)handle, buf, 2, &on_write);
+ send_req->as.write.data = task;
+ ret = uv_write(&send_req->as.write, (uv_stream_t *)handle, buf, 2, &on_write);
+ }
+ if (ret == 0) {
+ task->ioreq = (uv_req_t *)send_req;
+ } else {
+ ioreq_release(task->worker, send_req);
}
+
/* Update statistics */
if (handle != task->source.handle && addr) {
if (handle->type == UV_UDP)
task->worker->stats.udp += 1;
- else task->worker->stats.tcp += 1;
+ else
+ task->worker->stats.tcp += 1;
if (addr->sa_family == AF_INET6)
task->worker->stats.ipv6 += 1;
- else task->worker->stats.ipv4 += 1;
- }
- if (ret != 0) {
- ioreq_release(task->worker, req);
+ else
+ task->worker->stats.ipv4 += 1;
}
return ret;
}
static void on_connect(uv_connect_t *req, int status)
{
+ struct worker_ctx *worker = get_worker();
struct qr_task *task = req->data;
- if (status == 0) {
- /* Retrieve endpoint IP for statistics */
- struct sockaddr_in6 addr;
- int addrlen = sizeof(addr);
- uv_tcp_getpeername((uv_tcp_t *)req->handle, (struct sockaddr *)&addr, &addrlen);
- qr_task_send(task, (uv_handle_t *)req->handle, (struct sockaddr *)&addr, task->next_query);
- ioreq_release(task->worker, (struct ioreq *)req);
- } else { /* Must not recycle, as 'task' may be freed. */
- free(req);
+ if (task) {
+ task->ioreq = NULL;
+ if (status == 0) {
+ struct sockaddr_in6 addr;
+ int addrlen = sizeof(addr); /* Retrieve endpoint IP for statistics */
+ uv_stream_t *handle = req->handle;
+ uv_tcp_getpeername((uv_tcp_t *)handle, (struct sockaddr *)&addr, &addrlen);
+ qr_task_send(task, (uv_handle_t *)handle, (struct sockaddr *)&addr, task->pktbuf);
+ } else {
+ qr_task_step(task, NULL);
+ }
}
+ ioreq_release(worker, (struct ioreq *)req);
}
static int qr_task_finalize(struct qr_task *task, int state)
static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
{
- /* Cancel timeout if active, close handle. */
- if (task->next_handle) {
- if (!uv_is_closing(task->next_handle)) {
- io_stop_read(task->next_handle);
- uv_close(task->next_handle, on_close);
- }
- uv_timer_stop(&task->timeout);
- task->next_handle = NULL;
+ /* Close subrequest handle. */
+ uv_timer_stop(&task->timeout);
+ if (task->iohandle && !uv_is_closing(task->iohandle)) {
+ io_stop_read(task->iohandle);
+ uv_close(task->iohandle, on_close);
+ task->iohandle = NULL;
}
/* Consume input and produce next query */
int sock_type = -1;
struct sockaddr *addr = NULL;
- knot_pkt_t *next_query = task->next_query;
+ knot_pkt_t *pktbuf = task->pktbuf;
int state = kr_resolve_consume(&task->req, packet);
while (state == KNOT_STATE_PRODUCE) {
- state = kr_resolve_produce(&task->req, &addr, &sock_type, next_query);
+ state = kr_resolve_produce(&task->req, &addr, &sock_type, pktbuf);
if (unlikely(++task->iter_count > KR_ITER_LIMIT)) {
return qr_task_finalize(task, KNOT_STATE_FAIL);
}
}
/* Create connection for iterative query */
- task->next_handle = (uv_handle_t *)ioreq_take(task->worker);
- if (!task->next_handle) {
+ uv_handle_t *subreq = (uv_handle_t *)ioreq_take(task->worker);
+ if (!subreq) {
return qr_task_finalize(task, KNOT_STATE_FAIL);
}
+ io_create(task->worker->loop, subreq, sock_type);
+ subreq->data = task;
/* Connect or issue query datagram */
- io_create(task->worker->loop, task->next_handle, sock_type);
- task->next_handle->data = task;
+ task->iohandle = subreq;
if (sock_type == SOCK_DGRAM) {
- if (qr_task_send(task, task->next_handle, addr, next_query) != 0) {
+ if (qr_task_send(task, subreq, addr, pktbuf) != 0) {
return qr_task_step(task, NULL);
}
} else {
- struct ioreq *req = ioreq_take(task->worker);
- if (!req || uv_tcp_connect(&req->as.connect, (uv_tcp_t *)task->next_handle, addr, on_connect) != 0) {
- ioreq_release(task->worker, req);
+ struct ioreq *conn_req = ioreq_take(task->worker);
+ conn_req->as.connect.data = task;
+ if (!conn_req || uv_tcp_connect(&conn_req->as.connect, (uv_tcp_t *)subreq, addr, on_connect) != 0) {
+ ioreq_release(task->worker, conn_req);
return qr_task_step(task, NULL);
}
- req->as.connect.data = task;
+ task->ioreq = (uv_req_t *)conn_req;
}
/* Start next step with timeout */
return kr_ok();
}
+static int parse_query(knot_pkt_t *query)
+{
+ /* Parse query packet. */
+ int ret = knot_pkt_parse(query, 0);
+ if (ret != KNOT_EOK) {
+ return kr_error(EPROTO); /* Ignore malformed query. */
+ }
+
+ /* Check if at least header is parsed. */
+ if (query->parsed < query->size) {
+ return kr_error(EMSGSIZE);
+ }
+
+ return kr_ok();
+}
+
int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr)
{
if (!worker) {