uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
+ /* UDP requests are oneshot, always close afterwards */
+ if (handle->data && !uv_is_closing((uv_handle_t *)handle)) { /* Do not free master socket */
+ uv_close((uv_handle_t *)handle, handle_free);
+ }
+
/* Check the incoming wire length. */
if (nread > KNOT_WIRE_HEADER_SIZE) {
knot_pkt_t *query = knot_pkt_new(buf->base, nread, worker->mm);
worker_exec(worker, (uv_handle_t *)handle, query, addr);
knot_pkt_free(&query);
}
-
- /* UDP requests are oneshot, always close afterwards */
- if (handle->data) { /* Do not free master socket */
- uv_close((uv_handle_t *)handle, handle_free);
- }
}
int udp_bind(struct endpoint *ep, struct sockaddr *addr)
void udp_unbind(struct endpoint *ep)
{
uv_udp_t *handle = &ep->udp;
- uv_udp_recv_stop(handle);
uv_close((uv_handle_t *)handle, NULL);
}
-static void tcp_unbind_handle(uv_handle_t *handle)
-{
- uv_read_stop((uv_stream_t *)handle);
-}
-
static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
uv_loop_t *loop = handle->loop;
return;
}
- uv_read_start(client, handle_getbuf, tcp_recv);
+ io_start_read((uv_handle_t *)client);
}
int tcp_bind(struct endpoint *ep, struct sockaddr *addr)
void tcp_unbind(struct endpoint *ep)
{
- tcp_unbind_handle((uv_handle_t *)&ep->tcp);
uv_close((uv_handle_t *)&ep->tcp, NULL);
}
uv_udp_t *handle = handle_alloc(loop, sizeof(*handle));
if (handle) {
uv_udp_init(loop, handle);
- uv_udp_recv_start(handle, &handle_getbuf, &udp_recv);
}
return (uv_handle_t *)handle;
} else {
return (uv_handle_t *)handle;
}
}
+
+int io_start_read(uv_handle_t *handle)
+{
+ if (handle->type == UV_UDP) {
+ return uv_udp_recv_start((uv_udp_t *)handle, &handle_getbuf, &udp_recv);
+ } else {
+ return uv_read_start((uv_stream_t *)handle, &handle_getbuf, &tcp_recv);
+ }
+}
+
+int io_stop_read(uv_handle_t *handle)
+{
+ if (handle->type == UV_UDP) {
+ return uv_udp_recv_stop((uv_udp_t *)handle);
+ } else {
+ return uv_read_stop((uv_stream_t *)handle);
+ }
+}
\ No newline at end of file
{
struct kr_request req;
knot_pkt_t *next_query;
+ uv_handle_t *next_handle;
+ uv_timer_t timeout;
union {
uv_write_t tcp_send;
uv_udp_send_t udp_send;
task->next_query = next_query;
/* Start resolution */
+ uv_timer_init(handle->loop, &task->timeout);
+ task->timeout.data = task;
kr_resolve_begin(&task->req, &engine->resolver, answer);
return task;
}
+static void qr_task_close(uv_handle_t *handle)
+{
+ struct qr_task *task = handle->data;
+ mp_delete(task->req.pool.ctx);
+}
+
+static void qr_task_timeout(uv_timer_t *req)
+{
+ struct qr_task *task = req->data;
+ if (!uv_is_closing(task->next_handle)) {
+ io_stop_read(task->next_handle);
+ uv_close(task->next_handle, (uv_close_cb) free);
+ qr_task_step(task, NULL);
+ }
+}
+
static void qr_task_on_send(uv_req_t* req, int status)
{
struct qr_task *task = req->data;
if (task) {
- /* Failed to send, invalidate */
- if (status != 0) {
- qr_task_step(task, NULL);
- }
- if (task->req.overlay.state == KNOT_STATE_NOOP) {
- mp_delete(task->req.pool.ctx);
+ /* Start reading answer */
+ if (task->req.overlay.state != KNOT_STATE_NOOP) {
+ if (status == 0 && task->next_handle) {
+ io_start_read(task->next_handle);
+ }
+ } else {
+ /* Finalize task */
+ uv_close((uv_handle_t *)&task->timeout, qr_task_close);
}
}
}
static int qr_task_finalize(struct qr_task *task, int state)
{
kr_resolve_finish(&task->req, state);
+ uv_timer_stop(&task->timeout);
qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
}
static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
{
+ /* Cancel timeout if active */
+ uv_timer_stop(&task->timeout);
+ task->next_handle = NULL;
+
/* Consume input and produce next query */
- assert(task);
int sock_type = -1;
struct sockaddr *addr = NULL;
knot_pkt_t *next_query = task->next_query;
/* Create connection for iterative query */
uv_handle_t *source_handle = task->source.handle;
- uv_handle_t *next_handle = io_create(source_handle->loop, sock_type);
- if (next_handle == NULL) {
+ task->next_handle = io_create(source_handle->loop, sock_type);
+ if (task->next_handle == NULL) {
return qr_task_finalize(task, KNOT_STATE_FAIL);
}
/* Connect or issue query datagram */
- next_handle->data = task;
+ task->next_handle->data = task;
if (sock_type == SOCK_STREAM) {
uv_connect_t *connect = &task->ioreq.connect;
- if (uv_tcp_connect(connect, (uv_tcp_t *)next_handle, addr, qr_task_on_connect) != 0) {
- uv_close(next_handle, (uv_close_cb) free);
+ if (uv_tcp_connect(connect, (uv_tcp_t *)task->next_handle, addr, qr_task_on_connect) != 0) {
+ uv_close(task->next_handle, (uv_close_cb) free);
return qr_task_step(task, NULL);
}
connect->data = task;
} else {
- if (qr_task_send(task, next_handle, addr, next_query) != 0) {
- uv_close(next_handle, (uv_close_cb) free);
+ if (qr_task_send(task, task->next_handle, addr, next_query) != 0) {
+ uv_close(task->next_handle, (uv_close_cb) free);
return qr_task_step(task, NULL);
}
}
+ /* Start next timeout */
+ uv_timer_start(&task->timeout, qr_task_timeout, KR_CONN_RTT_MAX, 0);
+
return kr_ok();
}