From 471bb7a908e9a8739e6956d61f57177d8f1dce44 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Luk=C3=A1=C5=A1=20Ondr=C3=A1=C4=8Dek?= Date: Thu, 14 Nov 2024 18:25:58 +0100 Subject: [PATCH] daemon/session2: add half-closed TCP connection handling --- daemon/io.c | 12 ++++++++++++ daemon/session2.c | 39 ++++++++++++++++++++++++++------------- daemon/session2.h | 6 ++++++ daemon/tls.c | 6 ++++++ daemon/worker.c | 41 +++++++++++++++++++++++++++++++++-------- 5 files changed, 83 insertions(+), 21 deletions(-) diff --git a/daemon/io.c b/daemon/io.c index 98500c347..02ebd5b42 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -338,6 +338,18 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) return; } + // allow deferring EOF for incoming connections to send answer even if half-closed + if (!s->outgoing && (nread == UV_EOF)) { + if (kr_log_is_debug(IO, NULL)) { + struct sockaddr *peer = session2_get_peer(s); + char *peer_str = kr_straddr(peer); + kr_log_debug(IO, "=> connection to '%s' half-closed by peer (EOF)\n", + peer_str ? peer_str : ""); + } + session2_event(s, PROTOLAYER_EVENT_EOF, NULL); + return; + } + if (nread < 0 || !buf->base) { if (kr_log_is_debug(IO, NULL)) { struct sockaddr *peer = session2_get_peer(s); diff --git a/daemon/session2.c b/daemon/session2.c index 40328e132..c372bfb53 100644 --- a/daemon/session2.c +++ b/daemon/session2.c @@ -307,6 +307,18 @@ bool protolayer_queue_has_payload(const protolayer_iter_ctx_queue_t *queue) return false; } +static inline ssize_t session2_get_protocol( + struct session2 *s, enum protolayer_type protocol) +{ + const struct protolayer_grp *grp = &protolayer_grps[s->proto]; + for (ssize_t i = 0; i < grp->num_layers; i++) { + enum protolayer_type found = grp->layers[i]; + if (protocol == found) + return i; + } + + return -1; +} /** Gets layer-specific session data for the layer with the specified index * from the manager. */ @@ -333,6 +345,14 @@ void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx) return protolayer_sess_data_get(ctx->session, ctx->layer_ix); } +void *protolayer_sess_data_get_proto(struct session2 *s, enum protolayer_type protocol) { + ssize_t layer_ix = session2_get_protocol(s, protocol); + if (layer_ix < 0) + return NULL; + + return protolayer_sess_data_get(s, layer_ix); +} + /** Gets layer-specific iteration data for the layer with the specified index * from the context. */ static inline struct protolayer_data *protolayer_iter_data_get( @@ -358,19 +378,6 @@ void *protolayer_iter_data_get_current(struct protolayer_iter_ctx *ctx) return protolayer_iter_data_get(ctx, ctx->layer_ix); } -static inline ssize_t session2_get_protocol( - struct session2 *s, enum protolayer_type protocol) -{ - const struct protolayer_grp *grp = &protolayer_grps[s->proto]; - for (ssize_t i = 0; i < grp->num_layers; i++) { - enum protolayer_type found = grp->layers[i]; - if (protocol == found) - return i; - } - - return -1; -} - static inline bool protolayer_iter_ctx_is_last(struct protolayer_iter_ctx *ctx) { unsigned int last_ix = (ctx->direction == PROTOLAYER_UNWRAP) @@ -1684,6 +1691,12 @@ static int session2_transport_event(struct session2 *s, if (s->closing) return kr_ok(); + if (event == PROTOLAYER_EVENT_EOF) { + // no layer wanted to retain TCP half-closed state + session2_force_close(s); + return kr_ok(); + } + bool is_close_event = (event == PROTOLAYER_EVENT_CLOSE || event == PROTOLAYER_EVENT_FORCE_CLOSE); if (is_close_event) { diff --git a/daemon/session2.h b/daemon/session2.h index 957df6d9c..73b88d32e 100644 --- a/daemon/session2.h +++ b/daemon/session2.h @@ -334,6 +334,8 @@ typedef void (*protolayer_finished_cb)(int status, struct session2 *session, XX(MALFORMED) \ /** Signal that a connection has ended. */\ XX(DISCONNECT) \ + /** Signal EOF from peer (e.g. half-closed TCP connection). */\ + XX(EOF) \ /** Failed task send - update stats. */\ XX(STATS_SEND_ERR) \ /** Outgoing query submission - update stats. */\ @@ -535,6 +537,10 @@ size_t protolayer_queue_count_payload(const protolayer_iter_ctx_queue_t *queue); * queue iterators, as it does not need to iterate through the whole queue. */ bool protolayer_queue_has_payload(const protolayer_iter_ctx_queue_t *queue); +/** Gets layer-specific session data for the specified protocol layer. + * Returns NULL if the layer is not present in the session. */ +void *protolayer_sess_data_get_proto(struct session2 *s, enum protolayer_type protocol); + /** Gets layer-specific session data for the last processed layer. * To be used after returning from its callback for async continuation but before calling protolayer_continue. */ void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx); diff --git a/daemon/tls.c b/daemon/tls.c index 231bff2d5..6b3436f71 100644 --- a/daemon/tls.c +++ b/daemon/tls.c @@ -1325,6 +1325,12 @@ static enum protolayer_event_cb_result pl_tls_event_unwrap( return PROTOLAYER_EVENT_PROPAGATE; } + if (event == PROTOLAYER_EVENT_EOF) { + // TCP half-closed state not allowed + session2_force_close(s); + return PROTOLAYER_EVENT_CONSUME; + } + if (tls->client_side) { if (event == PROTOLAYER_EVENT_CONNECT) return pl_tls_client_connect_start(tls, s); diff --git a/daemon/worker.c b/daemon/worker.c index c14f927f8..8596e93bf 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -103,6 +103,14 @@ struct qr_task qr_task_free((task)); \ } while (0) +struct pl_dns_stream_sess_data { + struct protolayer_data h; + bool single : 1; /**< True: Stream only allows a single packet */ + bool produced : 1; /**< True: At least one packet has been produced */ + bool connected : 1; /**< True: The stream is connected */ + bool half_closed : 1; /**< True: EOF was received, the stream is half-closed */ +}; + /* Forward decls */ static void qr_task_free(struct qr_task *task); static int qr_task_step(struct qr_task *task, @@ -122,7 +130,6 @@ static struct session2* worker_find_tcp_waiting(const struct sockaddr* addr); static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt); - struct worker_ctx the_worker_value; /**< Static allocation is suitable for the singleton. */ struct worker_ctx *the_worker = NULL; @@ -995,6 +1002,18 @@ static int qr_task_finalize(struct qr_task *task, int state) session2_close(source_session); } + if (source_session->stream && !source_session->closing) { + struct pl_dns_stream_sess_data *stream = + protolayer_sess_data_get_proto(source_session, PROTOLAYER_TYPE_DNS_MULTI_STREAM); + if (!stream) + stream = protolayer_sess_data_get_proto(source_session, PROTOLAYER_TYPE_DNS_UNSIZED_STREAM); + if (!stream) + stream = protolayer_sess_data_get_proto(source_session, PROTOLAYER_TYPE_DNS_SINGLE_STREAM); + if (stream && stream->half_closed) { + session2_force_close(source_session); + } + } + qr_task_unref(task); if (ret != kr_ok() || state != KR_STATE_DONE) @@ -1804,13 +1823,6 @@ static enum protolayer_iter_cb_result pl_dns_dgram_unwrap( } } -struct pl_dns_stream_sess_data { - struct protolayer_data h; - bool single : 1; /**< True: Stream only allows a single packet */ - bool produced : 1; /**< True: At least one packet has been produced */ - bool connected : 1; /**< True: The stream is connected */ -}; - static int pl_dns_stream_sess_init(struct session2 *session, void *sess_data, void *param) { @@ -2015,6 +2027,16 @@ static enum protolayer_event_cb_result pl_dns_stream_disconnected( return PROTOLAYER_EVENT_PROPAGATE; } +static enum protolayer_event_cb_result pl_dns_stream_eof( + struct session2 *session, struct pl_dns_stream_sess_data *stream) +{ + if (!session2_is_empty(session)) { + stream->half_closed = true; + return PROTOLAYER_EVENT_CONSUME; + } + return PROTOLAYER_EVENT_PROPAGATE; +} + static enum protolayer_event_cb_result pl_dns_stream_event_unwrap( enum protolayer_event_type event, void **baton, struct session2 *session, void *sess_data) @@ -2046,6 +2068,9 @@ static enum protolayer_event_cb_result pl_dns_stream_event_unwrap( case PROTOLAYER_EVENT_FORCE_CLOSE: return pl_dns_stream_disconnected(session, stream); + case PROTOLAYER_EVENT_EOF: + return pl_dns_stream_eof(session, stream); + default: return PROTOLAYER_EVENT_PROPAGATE; } -- 2.47.2