From f7e2271ef61ab5a2b2d676f199393197a82e20b4 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Oto=20=C5=A0=C5=A5=C3=A1va?= Date: Thu, 3 Feb 2022 14:06:01 +0100 Subject: [PATCH] daemon: use flags from proxy header + refactor comm data --- daemon/io.c | 34 ++++++++++++---- daemon/io.h | 23 +++++++++++ daemon/lua/kres-gen-30.lua | 1 + daemon/lua/kres-gen-31.lua | 1 + daemon/session.c | 7 ++-- daemon/session.h | 5 +-- daemon/worker.c | 82 +++++++++++++++++++++++++------------- daemon/worker.h | 12 +++--- lib/resolve.h | 3 +- 9 files changed, 118 insertions(+), 50 deletions(-) diff --git a/daemon/io.c b/daemon/io.c index e1e8e8af4..001ed0f50 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -90,6 +90,8 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, ssize_t data_len = nread; const struct sockaddr *src_addr = comm_addr; const struct sockaddr *dst_addr = NULL; + struct proxy_result proxy; + bool has_proxy = false; if (!session_flags(s)->outgoing && proxy_header_present(data, data_len)) { if (!proxy_allowed(&the_worker->engine->net, comm_addr)) { kr_log_debug(IO, "<= ignoring PROXYv2 UDP from disallowed address '%s'\n", @@ -97,7 +99,6 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, return; } - struct proxy_result proxy; ssize_t trimmed = proxy_process_header(&proxy, s, data, data_len); if (trimmed == KNOT_EMALF) { if (kr_log_is_debug(IO, NULL)) { @@ -116,6 +117,7 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, } if (proxy.command == PROXY2_CMD_PROXY && proxy.family != AF_UNSPEC) { + has_proxy = true; src_addr = &proxy.src_addr.ip; dst_addr = &proxy.dst_addr.ip; @@ -133,7 +135,13 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, ssize_t consumed = session_wirebuf_consume(s, data, data_len); kr_assert(consumed == data_len); - session_wirebuf_process(s, src_addr, comm_addr, dst_addr); + struct io_comm_data comm = { + .src_addr = src_addr, + .comm_addr = comm_addr, + .dst_addr = dst_addr, + .proxy = (has_proxy) ? &proxy : NULL + }; + session_wirebuf_process(s, &comm); session_wirebuf_discard(s); mp_flush(the_worker->pkt_pool.ctx); } @@ -342,6 +350,8 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) ssize_t data_len = nread; const struct sockaddr *src_addr = session_get_peer(s); const struct sockaddr *dst_addr = NULL; + struct proxy_result proxy; + bool has_proxy = false; if (!session_flags(s)->outgoing && !session_flags(s)->no_proxy && proxy_header_present(data, data_len)) { if (!proxy_allowed(&the_worker->engine->net, src_addr)) { @@ -354,7 +364,6 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) return; } - struct proxy_result proxy; ssize_t trimmed = proxy_process_header(&proxy, s, data, data_len); if (trimmed < 0) { if (kr_log_is_debug(IO, NULL)) { @@ -375,6 +384,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) } if (proxy.command != PROXY2_CMD_LOCAL && proxy.family != AF_UNSPEC) { + has_proxy = true; src_addr = &proxy.src_addr.ip; dst_addr = &proxy.dst_addr.ip; @@ -437,7 +447,13 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) consumed = session_wirebuf_consume(s, data, data_len); kr_assert(consumed == data_len); - int ret = session_wirebuf_process(s, src_addr, session_get_peer(s), dst_addr); + struct io_comm_data comm = { + .src_addr = src_addr, + .comm_addr = session_get_peer(s), + .dst_addr = dst_addr, + .proxy = (has_proxy) ? &proxy : NULL + }; + int ret = session_wirebuf_process(s, &comm); if (ret < 0) { /* An error has occurred, close the session. */ worker_end_tcp(s); @@ -923,10 +939,12 @@ static void xdp_rx(uv_poll_t* handle, int status, int events) if (kpkt == NULL) { ret = kr_error(ENOMEM); } else { - ret = worker_submit(xhd->session, - (const struct sockaddr *)&msg->ip_from, - (const struct sockaddr *)&msg->ip_from, - (const struct sockaddr *)&msg->ip_to, + struct io_comm_data comm = { + .src_addr = (const struct sockaddr *)&msg->ip_from, + .comm_addr = (const struct sockaddr *)&msg->ip_from, + .dst_addr = (const struct sockaddr *)&msg->ip_to + }; + ret = worker_submit(xhd->session, &comm, msg->eth_from, msg->eth_to, kpkt); } if (ret) diff --git a/daemon/io.h b/daemon/io.h index ff9ba0bce..bc1e800a8 100644 --- a/daemon/io.h +++ b/daemon/io.h @@ -16,6 +16,29 @@ struct tls_ctx; struct tls_client_ctx; struct io_stream_data; +/** Communication data. */ +struct io_comm_data { + /** The original address the data came from. May be that of a proxied + * client, if they came through a proxy. May be `NULL` if + * the communication did not come from network. */ + const struct sockaddr *src_addr; + + /** The actual address the resolver is communicating with. May be + * the address of a proxy if the communication came through one, + * otherwise it will be the same as `src_addr`. May be `NULL` if + * the communication did not come from network. */ + const struct sockaddr *comm_addr; + + /** The original destination address. May be the resolver's address, or + * the address of a proxy if the communication came through one. May be + * `NULL` if the communication did not come from network. */ + const struct sockaddr *dst_addr; + + /** Data parsed from a PROXY header. May be `NULL` if the communication + * did not come through a proxy, or if the PROXYv2 protocol was not used. */ + const struct proxy_result *proxy; +}; + /** Bind address into a file-descriptor (only, no libuv). type is e.g. SOCK_DGRAM */ int io_bind(const struct sockaddr *addr, int type, const endpoint_flags_t *flags); /** Initialize a UDP handle and start listening. */ diff --git a/daemon/lua/kres-gen-30.lua b/daemon/lua/kres-gen-30.lua index a34102e8c..fd5a14aac 100644 --- a/daemon/lua/kres-gen-30.lua +++ b/daemon/lua/kres-gen-30.lua @@ -209,6 +209,7 @@ struct kr_request { const struct sockaddr *dst_addr; const knot_pkt_t *packet; struct kr_request_qsource_flags flags; + struct kr_request_qsource_flags comm_flags; size_t size; int32_t stream_id; kr_http_header_array_t headers; diff --git a/daemon/lua/kres-gen-31.lua b/daemon/lua/kres-gen-31.lua index 0a87bcc7f..d526b3347 100644 --- a/daemon/lua/kres-gen-31.lua +++ b/daemon/lua/kres-gen-31.lua @@ -209,6 +209,7 @@ struct kr_request { const struct sockaddr *dst_addr; const knot_pkt_t *packet; struct kr_request_qsource_flags flags; + struct kr_request_qsource_flags comm_flags; size_t size; int32_t stream_id; kr_http_header_array_t headers; diff --git a/daemon/session.c b/daemon/session.c index 0c1bbac2f..3d14b951c 100644 --- a/daemon/session.c +++ b/daemon/session.c @@ -11,6 +11,7 @@ #include "daemon/http.h" #include "daemon/worker.h" #include "daemon/io.h" +#include "daemon/proxyv2.h" #include "lib/generic/queue.h" #define TLS_CHUNK_SIZE (16 * 1024) @@ -749,9 +750,7 @@ void session_unpoison(struct session *session) kr_asan_unpoison(session, sizeof(*session)); } -int session_wirebuf_process( - struct session *session, const struct sockaddr *src_addr, - const struct sockaddr *comm_addr, const struct sockaddr *dst_addr) +int session_wirebuf_process(struct session *session, struct io_comm_data *comm) { int ret = 0; if (session->wire_buf_start_idx == session->wire_buf_end_idx) @@ -766,7 +765,7 @@ int session_wirebuf_process( (ret < max_iterations)) { if (kr_fails_assert(!session_wirebuf_error(session))) return -1; - int res = worker_submit(session, src_addr, comm_addr, dst_addr, NULL, NULL, pkt); + int res = worker_submit(session, comm, NULL, NULL, pkt); /* Errors from worker_submit() are intentionally *not* handled in order to * ensure the entire wire buffer is processed. */ if (res == kr_ok()) diff --git a/daemon/session.h b/daemon/session.h index abe1ddd1f..723f71d47 100644 --- a/daemon/session.h +++ b/daemon/session.h @@ -13,6 +13,7 @@ struct qr_task; struct worker_ctx; struct session; +struct io_comm_data; struct session_flags { bool outgoing : 1; /**< True: to upstream; false: from a client. */ @@ -131,9 +132,7 @@ size_t session_wirebuf_get_free_size(struct session *session); void session_wirebuf_discard(struct session *session); /** Move all data to the beginning of the buffer. */ void session_wirebuf_compress(struct session *session); -int session_wirebuf_process( - struct session *session, const struct sockaddr *src_addr, - const struct sockaddr *comm_addr, const struct sockaddr *dst_addr); +int session_wirebuf_process(struct session *session, struct io_comm_data *comm); ssize_t session_wirebuf_consume(struct session *session, const uint8_t *data, ssize_t len); /** Trims `len` bytes from the start of the session's wire buffer. diff --git a/daemon/worker.c b/daemon/worker.c index 0f9983d60..0a27547d1 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -27,6 +27,7 @@ #include "daemon/bindings/api.h" #include "daemon/engine.h" #include "daemon/io.h" +#include "daemon/proxyv2.h" #include "daemon/session.h" #include "daemon/tls.h" #include "daemon/http.h" @@ -353,13 +354,11 @@ static inline bool is_tcp_waiting(struct sockaddr *address) { * in case the request didn't come from network. */ static struct request_ctx *request_create(struct worker_ctx *worker, - struct session *session, - const struct sockaddr *addr, - const struct sockaddr *comm_addr, - const struct sockaddr *dst_addr, - const uint8_t *eth_from, - const uint8_t *eth_to, - uint32_t uid) + struct session *session, + struct io_comm_data *comm, + const uint8_t *eth_from, + const uint8_t *eth_to, + uint32_t uid) { knot_mm_t pool = { .ctx = pool_borrow(worker), @@ -405,13 +404,30 @@ static struct request_ctx *request_create(struct worker_ctx *worker, req->pool = pool; req->vars_ref = LUA_NOREF; req->uid = uid; - req->qsource.flags.xdp = is_xdp; + req->qsource.comm_flags.xdp = is_xdp; kr_request_set_extended_error(req, KNOT_EDNS_EDE_NONE, NULL); array_init(req->qsource.headers); if (session) { - req->qsource.flags.tcp = session_get_handle(session)->type == UV_TCP; - req->qsource.flags.tls = session_flags(session)->has_tls; - req->qsource.flags.http = session_flags(session)->has_http; + const struct sockaddr *src_addr = NULL; + const struct sockaddr *comm_addr = NULL; + const struct sockaddr *dst_addr = NULL; + const struct proxy_result *proxy = NULL; + if (comm) { + src_addr = comm->src_addr; + comm_addr = comm->comm_addr; + dst_addr = comm->dst_addr; + proxy = comm->proxy; + } + + req->qsource.comm_flags.tcp = session_get_handle(session)->type == UV_TCP; + req->qsource.comm_flags.tls = session_flags(session)->has_tls; + req->qsource.comm_flags.http = session_flags(session)->has_http; + + req->qsource.flags = req->qsource.comm_flags; + if (proxy) { + req->qsource.flags.tcp = proxy->protocol == SOCK_STREAM; + } + req->qsource.stream_id = -1; #if ENABLE_DOH2 if (req->qsource.flags.http) { @@ -426,16 +442,30 @@ static struct request_ctx *request_create(struct worker_ctx *worker, } #endif /* We need to store a copy of peer address. */ - memcpy(&ctx->source.addr.ip, addr, kr_sockaddr_len(addr)); - req->qsource.addr = &ctx->source.addr.ip; + if (src_addr) { + memcpy(&ctx->source.addr.ip, src_addr, kr_sockaddr_len(src_addr)); + req->qsource.addr = &ctx->source.addr.ip; + } else { + req->qsource.addr = NULL; + } + if (!comm_addr) - comm_addr = addr; - memcpy(&ctx->source.comm_addr.ip, comm_addr, kr_sockaddr_len(comm_addr)); - req->qsource.comm_addr = &ctx->source.comm_addr.ip; + comm_addr = src_addr; + if (comm_addr) { + memcpy(&ctx->source.comm_addr.ip, comm_addr, kr_sockaddr_len(comm_addr)); + req->qsource.comm_addr = &ctx->source.comm_addr.ip; + } else { + req->qsource.comm_addr = NULL; + } + if (!dst_addr) /* We wouldn't have to copy in this case, but for consistency. */ dst_addr = session_get_sockname(session); - memcpy(&ctx->source.dst_addr.ip, dst_addr, kr_sockaddr_len(dst_addr)); - req->qsource.dst_addr = &ctx->source.dst_addr.ip; + if (dst_addr) { + memcpy(&ctx->source.dst_addr.ip, dst_addr, kr_sockaddr_len(dst_addr)); + req->qsource.dst_addr = &ctx->source.dst_addr.ip; + } else { + req->qsource.dst_addr = NULL; + } } req->selection_context.is_tls_capable = is_tls_capable; @@ -1802,10 +1832,8 @@ static int parse_packet(knot_pkt_t *query) return ret; } -int worker_submit(struct session *session, - const struct sockaddr *src_addr, const struct sockaddr *comm_addr, - const struct sockaddr *dst_addr, - const uint8_t *eth_from, const uint8_t *eth_to, knot_pkt_t *pkt) +int worker_submit(struct session *session, struct io_comm_data *comm, + const uint8_t *eth_from, const uint8_t *eth_to, knot_pkt_t *pkt) { if (!session || !pkt) return kr_error(EINVAL); @@ -1849,8 +1877,8 @@ int worker_submit(struct session *session, const struct sockaddr *addr = NULL; if (!is_outgoing) { /* request from a client */ struct request_ctx *ctx = - request_create(the_worker, session, src_addr, comm_addr, dst_addr, - eth_from, eth_to, knot_wire_get_id(pkt->wire)); + request_create(the_worker, session, comm, eth_from, + eth_to, knot_wire_get_id(pkt->wire)); if (http_ctx) queue_pop(http_ctx->streams); if (!ctx) @@ -1881,7 +1909,7 @@ int worker_submit(struct session *session, } if (kr_fails_assert(!session_flags(session)->closing)) return kr_error(EINVAL); - addr = src_addr; + addr = (comm) ? comm->src_addr : NULL; /* Note receive time for RTT calculation */ task->recv_time = kr_now(); } @@ -2089,8 +2117,8 @@ struct qr_task *worker_resolve_start(knot_pkt_t *query, struct kr_qflags options return NULL; - struct request_ctx *ctx = request_create(worker, NULL, NULL, NULL, NULL, NULL, NULL, - worker->next_request_uid); + struct request_ctx *ctx = request_create(worker, NULL, NULL, NULL, NULL, + worker->next_request_uid); if (!ctx) return NULL; diff --git a/daemon/worker.h b/daemon/worker.h index 5543ab705..169a6d52e 100644 --- a/daemon/worker.h +++ b/daemon/worker.h @@ -17,6 +17,8 @@ struct worker_ctx; struct session; /** Zone import context (opaque). */ struct zone_import_ctx; +/** Data about the communication (defined in io.h). */ +struct io_comm_data; /** Pointer to the singleton worker. NULL if not initialized. */ KR_EXPORT extern struct worker_ctx *the_worker; @@ -32,17 +34,13 @@ void worker_deinit(void); * Process an incoming packet (query from a client or answer from upstream). * * @param session session the packet came from, or NULL (not from network) - * @param src_addr original address the packet came from, or NULL (not from network) - * @param comm_addr actual address the packet came from, or NULL (then the same as src_addr). - * May be different from peer if the packet went through a proxy with PROXYv2 enabled. + * @param comm IO communication data (see `struct io_comm_data` docs) * @param eth_* MAC addresses or NULL (they're useful for XDP) * @param pkt the packet, or NULL (an error from the transport layer) * @return 0 or an error code */ -int worker_submit(struct session *session, - const struct sockaddr *src_addr, const struct sockaddr *comm_addr, - const struct sockaddr *dst_addr, - const uint8_t *eth_from, const uint8_t *eth_to, knot_pkt_t *pkt); +int worker_submit(struct session *session, struct io_comm_data *comm, + const uint8_t *eth_from, const uint8_t *eth_to, knot_pkt_t *pkt); /** * End current DNS/TCP session, this disassociates pending tasks from this session diff --git a/lib/resolve.h b/lib/resolve.h index 437812ef5..7b7efb044 100644 --- a/lib/resolve.h +++ b/lib/resolve.h @@ -220,7 +220,8 @@ struct kr_request { * closely related: issue #173. */ const struct sockaddr *dst_addr; const knot_pkt_t *packet; - struct kr_request_qsource_flags flags; /**< See definition above. */ + struct kr_request_qsource_flags flags; /**< Flags for the original client. */ + struct kr_request_qsource_flags comm_flags; /**< Flags for the actual client (may be a proxy). */ size_t size; /**< query packet size */ int32_t stream_id; /**< HTTP/2 stream ID for DoH requests */ kr_http_header_array_t headers; /**< HTTP/2 headers for DoH requests */ -- 2.47.3