{
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
-
- /* UDP requests are oneshot, always close afterwards */
- if (handle->data && !uv_is_closing((uv_handle_t *)handle)) { /* Do not free master socket */
- io_close((uv_handle_t *)handle);
- }
-
- /* Check the incoming wire length. */
- if (nread > KNOT_WIRE_HEADER_SIZE) {
- knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm);
- worker_exec(worker, (uv_handle_t *)handle, query, addr);
- knot_pkt_free(&query);
- }
+ knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm);
+ worker_exec(worker, (uv_handle_t *)handle, query, addr);
+ knot_pkt_free(&query);
}
int udp_bind(struct endpoint *ep, struct sockaddr *addr)
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
- /* Check for connection close */
- if (nread <= 0) {
+ /* Check for originator connection close */
+ if (nread <= 0 && handle->data == 0) {
io_close((uv_handle_t *)handle);
return;
} else if (nread < 2) {
/* Not enough bytes to read length */
+ worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
return;
}
- /* Set packet size */
/** @todo This is not going to work if the packet is fragmented in the stream ! */
uint16_t nbytes = wire_read_u16((const uint8_t *)buf->base);
-
- /* Check if there's enough data and execute */
if (nbytes + 2 < nread) {
+ worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
return;
}
+
knot_pkt_t *query = knot_pkt_new(buf->base + 2, nbytes, worker->mm);
worker_exec(worker, (uv_handle_t *)handle, query, NULL);
knot_pkt_free(&query);
static void qr_task_timeout(uv_timer_t *req)
{
struct qr_task *task = req->data;
- if (!uv_is_closing(task->next_handle)) {
+ if (task->next_handle) {
io_stop_read(task->next_handle);
- uv_close(task->next_handle, (uv_close_cb) free);
qr_task_step(task, NULL);
}
}
io_start_read(task->next_handle);
}
} else { /* Finalize task */
+ uv_timer_stop(&task->timeout);
uv_close((uv_handle_t *)&task->timeout, qr_task_free);
}
}
static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
{
- /* Cancel timeout if active */
- uv_timer_stop(&task->timeout);
- task->next_handle = NULL;
+ /* Cancel timeout if active, close handle. */
+ if (task->next_handle) {
+ uv_close(task->next_handle, (uv_close_cb) free);
+ uv_timer_stop(&task->timeout);
+ task->next_handle = NULL;
+ }
/* Consume input and produce next query */
int sock_type = -1;
/* We're done, no more iterations needed */
if (state & (KNOT_STATE_DONE|KNOT_STATE_FAIL)) {
return qr_task_finalize(task, state);
- }
-
- /* Iteration limit */
- if (++task->iter_count > KR_ITER_LIMIT) {
+ } else if (++task->iter_count > KR_ITER_LIMIT) {
return qr_task_finalize(task, KNOT_STATE_FAIL);
}
if (sock_type == SOCK_STREAM) {
uv_connect_t *connect = &task->ioreq.connect;
if (uv_tcp_connect(connect, (uv_tcp_t *)task->next_handle, addr, qr_task_on_connect) != 0) {
- uv_close(task->next_handle, (uv_close_cb) free);
return qr_task_step(task, NULL);
}
connect->data = task;
} else {
if (qr_task_send(task, task->next_handle, addr, next_query) != 0) {
- uv_close(task->next_handle, (uv_close_cb) free);
return qr_task_step(task, NULL);
}
}
- /* Start next timeout */
+ /* Start next step with timeout */
uv_timer_start(&task->timeout, qr_task_timeout, KR_CONN_RTT_MAX, 0);
-
return kr_ok();
}
/* Parse query */
int ret = parse_query(query);
- if (ret != 0) {
- return ret;
- }
/* Start new task on master sockets, or resume existing */
struct qr_task *task = handle->data;
bool is_master_socket = (!task);
if (is_master_socket) {
- if (knot_wire_get_qr(query->wire)) {
+ /* Ignore badly formed queries or responses. */
+ if (ret != 0 || knot_wire_get_qr(query->wire)) {
return kr_error(EINVAL); /* Ignore. */
}
task = qr_task_create(worker, handle, addr);