From: Oto Šťáva Date: Wed, 3 Aug 2022 08:03:44 +0000 (+0200) Subject: daemon: TCP connection timeouts with protolayers X-Git-Tag: v6.0.2~42^2~65 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ea90ceaea90bbcf57daea19688abf9b492c3b3a8;p=thirdparty%2Fknot-resolver.git daemon: TCP connection timeouts with protolayers --- diff --git a/daemon/io.c b/daemon/io.c index c9fcc0eb2..8a498174b 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -241,7 +241,6 @@ static int pl_tcp_sess_deinit(struct protolayer_manager *manager, struct protola static enum protolayer_cb_result pl_tcp_unwrap_timeout( struct protolayer_data *layer, struct protolayer_cb_ctx *ctx) { - /* TODO - connecting timeout? */ struct session2 *s = ctx->manager->session; if (kr_fails_assert(!s->closing)) @@ -261,8 +260,7 @@ static enum protolayer_cb_result pl_tcp_unwrap_timeout( session2_timer_stop(s); session2_timer_start(s, KR_RESOLVE_TIME_LIMIT / 2, - KR_RESOLVE_TIME_LIMIT / 2, - PROTOLAYER_UNWRAP); + KR_RESOLVE_TIME_LIMIT / 2); } else { /* Normally it should not happen, * but better to check if there anything in this list. */ @@ -279,9 +277,7 @@ static enum protolayer_cb_result pl_tcp_unwrap_timeout( if (idle_time < idle_in_timeout) { idle_in_timeout -= idle_time; session2_timer_stop(s); - session2_timer_start(s, - idle_in_timeout, idle_in_timeout, - PROTOLAYER_UNWRAP); + session2_timer_start(s, idle_in_timeout, idle_in_timeout); } else { struct sockaddr *peer = session2_get_peer(s); char *peer_str = kr_straddr(peer); @@ -291,7 +287,7 @@ static enum protolayer_cb_result pl_tcp_unwrap_timeout( worker_del_tcp_waiting(peer); worker_del_tcp_connected(peer); } - session2_unwrap(s, protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), NULL, NULL, NULL); + session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL); } } @@ -553,8 +549,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) uv_strerror(nread)); } worker_end_tcp(s); - session2_unwrap(s, protolayer_event_nd(PROTOLAYER_EVENT_FORCE_CLOSE), - NULL, NULL, NULL); + session2_event(s, PROTOLAYER_EVENT_FORCE_CLOSE, NULL); return; } @@ -688,9 +683,7 @@ static void _tcp_accept(uv_stream_t *master, int status, enum protolayer_grp grp if (uv_accept(master, (uv_stream_t *)client) != 0) { /* close session, close underlying uv handles and * deallocate (or return to memory pool) memory. */ - session2_unwrap(s, - protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), - NULL, NULL, NULL); + session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL); return; } @@ -700,18 +693,14 @@ static void _tcp_accept(uv_stream_t *master, int status, enum protolayer_grp grp int sa_len = sizeof(struct sockaddr_in6); int ret = uv_tcp_getpeername(client, sa, &sa_len); if (ret || sa->sa_family == AF_UNSPEC) { - session2_unwrap(s, - protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), - NULL, NULL, NULL); + session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL); return; } sa = session2_get_sockname(s); sa_len = sizeof(struct sockaddr_in6); ret = uv_tcp_getsockname(client, sa, &sa_len); if (ret || sa->sa_family == AF_UNSPEC) { - session2_unwrap(s, - protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), - NULL, NULL, NULL); + session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL); return; } @@ -776,7 +765,7 @@ static void _tcp_accept(uv_stream_t *master, int status, enum protolayer_grp grp // } // } //#endif - session2_timer_start(s, timeout, idle_in_timeout, PROTOLAYER_UNWRAP); + session2_timer_start(s, timeout, idle_in_timeout); io_start_read((uv_handle_t *)client); } diff --git a/daemon/session2.c b/daemon/session2.c index 49dcecbda..9637b785f 100644 --- a/daemon/session2.c +++ b/daemon/session2.c @@ -206,9 +206,11 @@ static int protolayer_step(struct protolayer_cb_ctx *ctx) result = cb(ldata, ctx); else ctx->action = PROTOLAYER_CB_ACTION_CONTINUE; + ldata->processed = true; } else { - kr_assert(false && "Repeated protocol layer step"); + //kr_assert(false && "Repeated protocol layer step"); + kr_log_debug(PROTOLAYER, "Repeated protocol layer step\n"); } if (kr_fails_assert(result == PROTOLAYER_CB_RESULT_MAGIC)) { @@ -522,8 +524,8 @@ int wire_buf_movestart(struct wire_buf *wb) size_t len = wire_buf_data_length(wb); if (len) memmove(wb->buf, wire_buf_data(wb), len); - wb->end -= wb->start; wb->start = 0; + wb->end = len; return kr_ok(); } @@ -642,15 +644,11 @@ uv_handle_t *session2_get_handle(struct session2 *s) static void session2_on_timeout(uv_timer_t *timer) { struct session2 *s = timer->data; - protolayer_manager_submit(s->layers, s->timer_direction, - protolayer_event_nd(PROTOLAYER_EVENT_TIMEOUT), - NULL, NULL, NULL); + session2_event(s, PROTOLAYER_EVENT_TIMEOUT, NULL); } -int session2_timer_start(struct session2 *s, uint64_t timeout, uint64_t repeat, - enum protolayer_direction direction) +int session2_timer_start(struct session2 *s, uint64_t timeout, uint64_t repeat) { - s->timer_direction = direction; return uv_timer_start(&s->timer, session2_on_timeout, timeout, repeat); } @@ -1079,7 +1077,8 @@ struct event_ctx { static void session2_transport_io_event_finished(uv_handle_t *handle) { - struct event_ctx *ctx = handle->data; + struct session2 *s = handle->data; + struct event_ctx *ctx = s->data; if (ctx->cb) ctx->cb(kr_ok(), ctx->session, ctx->target, ctx->baton); free(ctx); @@ -1100,7 +1099,7 @@ static int session2_handle_close(struct session2 *s, uv_handle_t *handle, struct event_ctx *ctx) { io_stop_read(handle); - handle->data = ctx; + s->data = ctx; uv_close(handle, session2_transport_io_event_finished); return kr_ok(); @@ -1145,11 +1144,20 @@ static int session2_transport_event(struct session2 *s, if (is_close_event) return session2_handle_close(s, handle, ctx); + if (ctx->cb) + ctx->cb(kr_ok(), s, target, baton); + + free(ctx); return kr_ok(); - case SESSION2_TRANSPORT_PARENT: - session2_wrap(s, protolayer_event(event), target, + case SESSION2_TRANSPORT_PARENT:; + int ret = session2_wrap(s, protolayer_event(event), target, session2_transport_parent_event_finished, ctx); + if (ret < 0) { + free(ctx); + return ret; + } + return kr_ok(); default: @@ -1169,7 +1177,5 @@ void session2_kill_ioreq(struct session2 *session, struct qr_task *task) return; session2_tasklist_del(session, task); if (session->transport.io.handle->type == UV_UDP) - session2_unwrap(session, - protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), - NULL, NULL, NULL); + session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL); } diff --git a/daemon/session2.h b/daemon/session2.h index efb40897a..fbbd2b194 100644 --- a/daemon/session2.h +++ b/daemon/session2.h @@ -145,7 +145,8 @@ typedef void (*protolayer_finished_cb)(int status, struct session2 *session, * session - i.e. layers SHOULD NOT add * any disconnection ceremony, if * avoidable. */\ - XX(TIMEOUT) /**< Signal that the session has timed out. */ + XX(TIMEOUT) /**< Signal that the session has timed out. */\ + XX(CONNECT) /**< Signal that a connection has been established. */ /** Event type, to be interpreted by a layer. */ enum protolayer_event_type { @@ -506,21 +507,28 @@ struct session2 { struct protolayer_manager *layers; /**< Protocol layers of this session. */ knot_mm_t pool; - uv_timer_t timer; - enum protolayer_direction timer_direction; /**< Timeout event direction. */ + uv_timer_t timer; /**< For session-wide timeout events. */ - trie_t *tasks; /**< list of tasks associated with given session. */ - queue_t(struct qr_task *) waiting; /**< list of tasks waiting for sending to upstream. */ + trie_t *tasks; /**< List of tasks associated with given session. */ + queue_t(struct qr_task *) waiting; /**< List of tasks waiting for + * sending to upstream. */ struct wire_buf wire_buf; uint64_t last_activity; /**< Time of last IO activity (if any occurs). * Otherwise session creation time. */ - bool closing : 1; - bool throttled : 1; - bool outgoing : 1; - bool secure : 1; /**< Whether encryption takes place in this session. + void *data; /**< Pointer to arbitrary data for callbacks. */ + + bool outgoing : 1; /**< True: session's transport is towards an upstream + * server. Otherwise, it is towards a client + * connected to the resolver. */ + bool closing : 1; /**< True: session is at the end of its lifecycle and + * is going to close. */ + bool connected : 1; /**< For connection-based sessions. True: connection + * is established. */ + bool throttled : 1; /**< True: session is being rate-limited. */ + bool secure : 1; /**< True: encryption takes place in this session. * Layers may use this to determine whether padding * should be applied. */ }; @@ -582,12 +590,11 @@ struct sockaddr *session2_get_sockname(struct session2 *s); * needed. * * May return `NULL` if no peer is set. */ -uv_handle_t *session2_get_handle(struct session2 *s); +KR_EXPORT uv_handle_t *session2_get_handle(struct session2 *s); /** Start the session timer. When the timer ends, a `_TIMEOUT` event is sent - * in the specified `direction`. */ -int session2_timer_start(struct session2 *s, uint64_t timeout, uint64_t repeat, - enum protolayer_direction direction); + * in the `_UNWRAP` direction. */ +int session2_timer_start(struct session2 *s, uint64_t timeout, uint64_t repeat); /** Restart the session timer without changing any of its parameters. */ int session2_timer_restart(struct session2 *s); @@ -663,6 +670,20 @@ int session2_unwrap(struct session2 *s, struct protolayer_payload payload, int session2_wrap(struct session2 *s, struct protolayer_payload payload, const void *target, protolayer_finished_cb cb, void *baton); +/** Convenience function to send the specified event to be processed in the + * `unwrap` direction. `data` may be `NULL`. + * + * See `session2_unwrap` for more information. */ +static inline int session2_event(struct session2 *s, + enum protolayer_event_type type, void *data) +{ + struct protolayer_event event = { + .type = type, + .data = { .ptr = data } + }; + return session2_unwrap(s, protolayer_event(event), NULL, NULL, NULL); +} + /** Removes the specified request task from the session's tasklist. The session * must be outgoing. If the session is UDP, a signal to close is also sent to it. */ void session2_kill_ioreq(struct session2 *session, struct qr_task *task); diff --git a/daemon/worker.c b/daemon/worker.c index f946fc761..24fd8b326 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -105,8 +105,6 @@ static int qr_task_send(struct qr_task *task, struct session2 *session, static int qr_task_finalize(struct qr_task *task, int state); static void qr_task_complete(struct qr_task *task); static int worker_add_tcp_waiting(const struct sockaddr* addr, struct session2 *session); -static void on_tcp_connect_timeout(uv_timer_t *timer); -static void on_udp_timeout(uv_timer_t *timer); static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt); @@ -128,9 +126,8 @@ static uv_handle_t *ioreq_spawn(int socktype, sa_family_t family, /* Create connection for iterative query */ uv_handle_t *handle = malloc(socktype == SOCK_DGRAM ? sizeof(uv_udp_t) : sizeof(uv_tcp_t)); - if (!handle) { - return NULL; - } + kr_require(handle); + int ret = io_create(the_worker->loop, handle, socktype, family, grp, true); if (ret) { if (ret == UV_EMFILE) { @@ -351,6 +348,7 @@ static struct request_ctx *request_create(struct session2 *session, req->qsource.comm_flags.tcp = session2_get_handle(session)->type == UV_TCP; req->qsource.comm_flags.tls = session->secure; // req->qsource.comm_flags.http = session->has_http; /* TODO */ + req->qsource.comm_flags.http = false; req->qsource.flags = req->qsource.comm_flags; if (proxy) { @@ -590,7 +588,7 @@ int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status) if (kr_fails_assert(qry && task->transport)) return status; size_t timeout = task->transport->timeout; - int ret = session2_timer_start(s, timeout, 0, PROTOLAYER_UNWRAP); + int ret = session2_timer_start(s, timeout, 0); /* Start next step with timeout, fatal if can't start a timer. */ if (ret != 0) { subreq_finalize(task, &task->transport->address.ip, task->pktbuf); @@ -615,6 +613,7 @@ int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status) peer_str, uv_strerror(status)); } worker_end_tcp(s); + session2_event(s, PROTOLAYER_EVENT_FORCE_CLOSE, NULL); return status; } @@ -633,24 +632,6 @@ int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status) return status; } -static void on_send(uv_udp_send_t *req, int status) -{ - struct qr_task *task = req->data; - uv_handle_t *h = (uv_handle_t *)req->handle; - qr_task_on_send(task, h, status); - qr_task_unref(task); - free(req); -} - -static void on_write(uv_write_t *req, int status) -{ - struct qr_task *task = req->data; - uv_handle_t *h = (uv_handle_t *)req->handle; - qr_task_on_send(task, h, status); - qr_task_unref(task); - free(req); -} - static void qr_task_wrap_finished(int status, struct session2 *session, const void *target, void *baton) { struct qr_task *task = baton; @@ -933,9 +914,7 @@ static int send_waiting(struct session2 *session) session2_waitinglist_finalize(session, KR_STATE_FAIL); session2_tasklist_finalize(session, KR_STATE_FAIL); worker_del_tcp_connected(peer); - session2_unwrap(session, - protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), - NULL, NULL, NULL); + session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL); break; } session2_waitinglist_pop(session, true); @@ -978,9 +957,7 @@ static void on_connect(uv_connect_t *req, int status) } kr_assert(session2_tasklist_is_empty(session)); session2_waitinglist_retry(session, false); - session2_unwrap(session, - protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), - NULL, NULL, NULL); + session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL); return; } @@ -997,9 +974,7 @@ static void on_connect(uv_connect_t *req, int status) } kr_assert(session2_tasklist_is_empty(session)); session2_waitinglist_retry(session, false); - session2_unwrap(session, - protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), - NULL, NULL, NULL); + session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL); return; } @@ -1020,9 +995,7 @@ static void on_connect(uv_connect_t *req, int status) } kr_assert(session2_tasklist_is_empty(session)); session2_waitinglist_retry(session, false); - session2_unwrap(session, - protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), - NULL, NULL, NULL); + session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL); return; } @@ -1034,9 +1007,7 @@ static void on_connect(uv_connect_t *req, int status) * something gone wrong */ session2_waitinglist_finalize(session, KR_STATE_FAIL); kr_assert(session2_tasklist_is_empty(session)); - session2_unwrap(session, - protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), - NULL, NULL, NULL); + session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL); return; } } @@ -1047,7 +1018,7 @@ static void on_connect(uv_connect_t *req, int status) } /* TODO */ -// session->connected = true; + session->connected = true; session2_start_read(session); int ret = kr_ok(); @@ -1070,76 +1041,7 @@ static void on_connect(uv_connect_t *req, int status) } session2_timer_stop(session); - session2_timer_start(session, - MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY, - PROTOLAYER_UNWRAP); -} - -static void on_tcp_connect_timeout(uv_timer_t *timer) -{ - struct session2 *session = timer->data; - - uv_timer_stop(timer); - kr_require(the_worker); - - kr_assert(session2_tasklist_is_empty(session)); - - struct sockaddr *peer = session2_get_peer(session); - worker_del_tcp_waiting(peer); - - struct qr_task *task = session2_waitinglist_get(session); - if (!task) { - /* Normally shouldn't happen. */ - const char *peer_str = kr_straddr(peer); - VERBOSE_MSG(NULL, "=> connection to '%s' failed (internal timeout), empty waitinglist\n", - peer_str ? peer_str : ""); - return; - } - - struct kr_query *qry = task_get_last_pending_query(task); - if (kr_log_is_debug_qry(WORKER, qry)) { - const char *peer_str = kr_straddr(peer); - VERBOSE_MSG(qry, "=> connection to '%s' failed (internal timeout)\n", - peer_str ? peer_str : ""); - } - - qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_TIMEOUT); - - the_worker->stats.timeout += session2_waitinglist_get_len(session); - session2_waitinglist_retry(session, true); - kr_assert(session2_tasklist_is_empty(session)); - /* uv_cancel() doesn't support uv_connect_t request, - * so that we can't cancel it. - * There still exists possibility of successful connection - * for this request. - * So connection callback (on_connect()) must check - * if connection is in the list of waiting connection. - * If no, most likely this is timed out connection even if - * it was successful. */ -} - -/* This is called when I/O timeouts */ -static void on_udp_timeout(uv_timer_t *timer) -{ - struct session2 *session = timer->data; - kr_assert(session2_get_handle(session)->data == session); - kr_assert(session2_tasklist_get_len(session) == 1); - kr_assert(session2_waitinglist_is_empty(session)); - - uv_timer_stop(timer); - - struct qr_task *task = session2_tasklist_get_first(session); - if (!task) - return; - - if (task->leading && task->pending_count > 0) { - struct kr_query *qry = array_tail(task->ctx->req.rplan.pending); - qry->server_selection.error(qry, task->transport, KR_SELECTION_QUERY_TIMEOUT); - } - - task->timeouts += 1; - the_worker->stats.timeout += 1; - qr_task_step(task, NULL, NULL); + session2_timer_start(session, MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY); } static uv_handle_t *transmit(struct qr_task *task) @@ -1174,9 +1076,7 @@ static uv_handle_t *transmit(struct qr_task *task) memcpy(peer, addr, kr_sockaddr_len(addr)); if (qr_task_send(task, session, (struct sockaddr *)choice, task->pktbuf) != 0) { - session2_unwrap(session, - protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), - NULL, NULL, NULL); + session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL); ret = NULL; } else { task->pending[task->pending_count] = session; @@ -1388,9 +1288,7 @@ static int qr_task_finalize(struct qr_task *task, int state) * (ie. task->leading is true) */ worker_task_unref(t); } - session2_unwrap(source_session, - protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), - NULL, NULL, NULL); + session2_event(source_session, PROTOLAYER_EVENT_CLOSE, NULL); } qr_task_unref(task); @@ -1459,9 +1357,7 @@ static int tcp_task_existing_connection(struct session2 *session, struct qr_task * close connection to upstream. */ session2_tasklist_finalize(session, KR_STATE_FAIL); worker_del_tcp_connected(session2_get_peer(session)); - session2_unwrap(session, - protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), - NULL, NULL, NULL); + session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL); return kr_error(EINVAL); } @@ -1471,6 +1367,7 @@ static int tcp_task_existing_connection(struct session2 *session, struct qr_task static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr *addr) { /* Check if there must be TLS */ + /* TODO: tls */ // struct tls_client_ctx *tls_ctx = NULL; // tls_client_param_t *entry = tls_client_param_get( // the_network->tls_client_params, addr); @@ -1515,9 +1412,7 @@ static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr int ret = worker_add_tcp_waiting(addr, session); if (ret < 0) { free(conn); - session2_unwrap(session, - protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), - NULL, NULL, NULL); + session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL); return kr_error(EINVAL); } @@ -1527,14 +1422,11 @@ static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr memcpy(peer, addr, kr_sockaddr_len(addr)); /* Start watchdog to catch eventual connection timeout. */ - ret = session2_timer_start(session, - KR_CONN_RTT_MAX, 0, PROTOLAYER_UNWRAP); + ret = session2_timer_start(session, KR_CONN_RTT_MAX, 0); if (ret != 0) { worker_del_tcp_waiting(addr); free(conn); - session2_unwrap(session, - protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), - NULL, NULL, NULL); + session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL); return kr_error(EINVAL); } @@ -1550,9 +1442,7 @@ static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr session2_timer_stop(session); worker_del_tcp_waiting(addr); free(conn); - session2_unwrap(session, - protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), - NULL, NULL, NULL); + session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL); qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_FAILED); return kr_error(EAGAIN); } @@ -1564,9 +1454,7 @@ static int tcp_task_make_connection(struct qr_task *task, const struct sockaddr session2_timer_stop(session); worker_del_tcp_waiting(addr); free(conn); - session2_unwrap(session, - protolayer_event_nd(PROTOLAYER_EVENT_CLOSE), - NULL, NULL, NULL); + session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL); return kr_error(EINVAL); } @@ -1752,7 +1640,7 @@ int worker_submit(struct session2 *session, struct comm_info *comm, struct http_ctx *http_ctx = NULL; #if ENABLE_DOH2 - /* TODO: devise a way to do this... don't know yet */ + /* TODO: http. Devise a way to do this... don't know yet */ // http_ctx = session_http_get_server_ctx(session); // // /* Badly formed query when using DoH leads to a Bad Request */ @@ -1912,6 +1800,7 @@ int worker_end_tcp(struct session2 *session) worker_del_tcp_waiting(peer); worker_del_tcp_connected(peer); + session->connected = false; while (!session2_waitinglist_is_empty(session)) { struct qr_task *task = session2_waitinglist_pop(session, false); @@ -2136,7 +2025,8 @@ void worker_task_pkt_set_msgid(struct qr_task *task, uint16_t msgid) knot_pkt_t *pktbuf = worker_task_get_pktbuf(task); knot_wire_set_id(pktbuf->wire, msgid); struct kr_query *q = task_get_last_pending_query(task); - q->id = msgid; + if (q) + q->id = msgid; } uint64_t worker_task_creation_time(struct qr_task *task) @@ -2190,16 +2080,46 @@ static inline knot_pkt_t *produce_packet_dgram(char *buf, size_t buf_len) return knot_pkt_new(buf, buf_len, &the_worker->pkt_pool); } -static enum protolayer_cb_result pl_dns_dgram_unwrap( +static enum protolayer_cb_result pl_dns_dgram_unwrap_timeout( struct protolayer_data *layer, struct protolayer_cb_ctx *ctx) { struct session2 *session = ctx->manager->session; + kr_assert(session2_get_handle(session)->data == session); + kr_assert(session2_tasklist_get_len(session) == 1); + kr_assert(session2_waitinglist_is_empty(session)); + + session2_timer_stop(session); + + struct qr_task *task = session2_tasklist_get_first(session); + if (!task) + return protolayer_continue(ctx); + + if (task->leading && task->pending_count > 0) { + struct kr_query *qry = array_tail(task->ctx->req.rplan.pending); + qry->server_selection.error(qry, task->transport, KR_SELECTION_QUERY_TIMEOUT); + } + + task->timeouts += 1; + the_worker->stats.timeout += 1; + qr_task_step(task, NULL, NULL); + + return protolayer_continue(ctx); +} + +static enum protolayer_cb_result pl_dns_dgram_unwrap( + struct protolayer_data *layer, struct protolayer_cb_ctx *ctx) +{ if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) { + if (ctx->payload.event.type == PROTOLAYER_EVENT_TIMEOUT) + return pl_dns_dgram_unwrap_timeout(layer, ctx); + /* pass thru */ return protolayer_continue(ctx); } + struct session2 *session = ctx->manager->session; + if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) { int ret = kr_ok(); for (int i = 0; i < ctx->payload.iovec.cnt; i++) { @@ -2294,10 +2214,61 @@ static int pl_dns_stream_iter_deinit(struct protolayer_manager *manager, return kr_ok(); } +static enum protolayer_cb_result pl_dns_stream_unwrap_timeout( + struct protolayer_data *layer, struct protolayer_cb_ctx *ctx) +{ + struct session2 *session = ctx->manager->session; + if (session->connected || session->closing) + return protolayer_continue(ctx); + + /* Connection timeout */ + session2_timer_stop(session); + + kr_assert(session2_tasklist_is_empty(session)); + + struct sockaddr *peer = session2_get_peer(session); + worker_del_tcp_waiting(peer); + + struct qr_task *task = session2_waitinglist_get(session); + if (!task) { + /* Normally shouldn't happen. */ + const char *peer_str = kr_straddr(peer); + VERBOSE_MSG(NULL, "=> connection to '%s' failed (internal timeout), empty waitinglist\n", + peer_str ? peer_str : ""); + return protolayer_continue(ctx); + } + + struct kr_query *qry = task_get_last_pending_query(task); + if (kr_log_is_debug_qry(WORKER, qry)) { + const char *peer_str = kr_straddr(peer); + VERBOSE_MSG(qry, "=> connection to '%s' failed (internal timeout)\n", + peer_str ? peer_str : ""); + } + + qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_TIMEOUT); + + the_worker->stats.timeout += session2_waitinglist_get_len(session); + session2_waitinglist_retry(session, true); + kr_assert(session2_tasklist_is_empty(session)); + /* uv_cancel() doesn't support uv_connect_t request, + * so that we can't cancel it. + * There still exists possibility of successful connection + * for this request. + * So connection callback (on_connect()) must check + * if connection is in the list of waiting connection. + * If no, most likely this is timed out connection even if + * it was successful. */ + + return protolayer_continue(ctx); +} + static enum protolayer_cb_result pl_dns_stream_unwrap( struct protolayer_data *layer, struct protolayer_cb_ctx *ctx) { if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) { + if (ctx->payload.event.type == PROTOLAYER_EVENT_TIMEOUT) + return pl_dns_stream_unwrap_timeout(layer, ctx); + /* pass thru */ return protolayer_continue(ctx); }