}
static void callback_finished_free_baton(int status, struct session2 *session,
- const void *target, void *baton)
+ const struct comm_info *comm, void *baton)
{
free(baton);
}
}
static void udp_on_unwrapped(int status, struct session2 *session,
- const void *target, void *baton)
+ const struct comm_info *comm, void *baton)
{
wire_buf_reset(&session->wire_buf);
}
return;
}
- session2_unwrap(s, protolayer_wire_buf(&s->wire_buf), comm_addr,
+ struct comm_info in_comm = {
+ .comm_addr = comm_addr,
+ .src_addr = comm_addr
+ };
+ session2_unwrap(s, protolayer_wire_buf(&s->wire_buf), &in_comm,
udp_on_unwrapped, NULL);
}
struct pl_udp_iter_data {
PROTOLAYER_DATA_HEADER();
struct proxy_result proxy;
- struct comm_info comm;
bool has_proxy;
};
-static int pl_udp_iter_init(struct protolayer_manager *manager,
- struct protolayer_iter_ctx *ctx,
- void *iter_data)
-{
- struct pl_udp_iter_data *udp = iter_data;
- ctx->comm = &udp->comm;
- return kr_ok();
-}
-
static enum protolayer_iter_cb_result pl_udp_unwrap(
void *sess_data, void *iter_data, struct protolayer_iter_ctx *ctx)
{
char *data = ctx->payload.buffer.buf;
ssize_t data_len = ctx->payload.buffer.len;
- struct comm_info *comm = ctx->comm;
- comm->comm_addr = ctx->target;
- comm->src_addr = ctx->target;
+ struct comm_info *comm = &ctx->comm;
if (!s->outgoing && proxy_header_present(data, data_len)) {
if (!proxy_allowed(comm->comm_addr)) {
kr_log_debug(IO, "<= ignoring PROXYv2 UDP from disallowed address '%s'\n",
bool has_proxy : 1;
};
+static int pl_tcp_sess_init(struct protolayer_manager *manager,
+ void *data,
+ void *param)
+{
+ struct sockaddr *peer = session2_get_peer(manager->session);
+ manager->session->comm = (struct comm_info) {
+ .comm_addr = peer,
+ .src_addr = peer
+ };
+ return 0;
+}
+
static int pl_tcp_sess_deinit(struct protolayer_manager *manager, void *sess_data)
{
struct pl_tcp_sess_data *tcp = sess_data;
char *data = wire_buf_data(ctx->payload.wire_buf); /* layer's or session's wirebuf */
ssize_t data_len = wire_buf_data_length(ctx->payload.wire_buf);
- struct comm_info *comm = ctx->comm;
- comm->src_addr = peer;
- comm->comm_addr = peer;
- comm->dst_addr = NULL;
+ struct comm_info *comm = &ctx->manager->session->comm;
if (!s->outgoing && !tcp->had_data && proxy_header_present(data, data_len)) {
if (!proxy_allowed(comm->src_addr)) {
if (kr_log_is_debug(IO, NULL)) {
}
tcp->had_data = true;
-
+ ctx->comm = ctx->manager->session->comm;
return protolayer_continue(ctx);
}
{
protolayer_globals[PROTOLAYER_UDP] = (struct protolayer_globals){
.iter_size = sizeof(struct pl_udp_iter_data),
- .iter_init = pl_udp_iter_init,
.unwrap = pl_udp_unwrap,
.event_wrap = pl_udp_event_wrap,
};
protolayer_globals[PROTOLAYER_TCP] = (struct protolayer_globals){
.sess_size = sizeof(struct pl_tcp_sess_data),
+ .sess_init = pl_tcp_sess_init,
.sess_deinit = pl_tcp_sess_deinit,
.unwrap = pl_tcp_unwrap,
.event_wrap = pl_tcp_event_wrap,
return 0;
}
-/* TODO: xdp */
-//#if ENABLE_XDP
-//static void xdp_rx(uv_poll_t* handle, int status, int events)
-//{
-// const int XDP_RX_BATCH_SIZE = 64;
-// if (status < 0) {
-// kr_log_error(XDP, "poll status %d: %s\n", status, uv_strerror(status));
-// return;
-// }
-// if (events != UV_READABLE) {
-// kr_log_error(XDP, "poll unexpected events: %d\n", events);
-// return;
-// }
-//
-// xdp_handle_data_t *xhd = handle->data;
-// kr_require(xhd && xhd->session && xhd->socket);
-// uint32_t rcvd;
-// knot_xdp_msg_t msgs[XDP_RX_BATCH_SIZE];
-// int ret = knot_xdp_recv(xhd->socket, msgs, XDP_RX_BATCH_SIZE, &rcvd
-// #if KNOT_VERSION_HEX >= 0x030100
-// , NULL
-// #endif
-// );
-//
-// if (kr_fails_assert(ret == KNOT_EOK)) {
-// /* ATM other error codes can only be returned when called incorrectly */
-// kr_log_error(XDP, "knot_xdp_recv(): %d, %s\n", ret, knot_strerror(ret));
-// return;
-// }
-// kr_log_debug(XDP, "poll triggered, processing a batch of %d packets\n", (int)rcvd);
-// kr_require(rcvd <= XDP_RX_BATCH_SIZE);
-// for (int i = 0; i < rcvd; ++i) {
-// const knot_xdp_msg_t *msg = &msgs[i];
-// kr_require(msg->payload.iov_len <= KNOT_WIRE_MAX_PKTSIZE);
-// knot_pkt_t *kpkt = knot_pkt_new(msg->payload.iov_base, msg->payload.iov_len,
-// &the_worker->pkt_pool);
-// if (kpkt == NULL) {
-// ret = kr_error(ENOMEM);
-// } else {
-// struct io_comm_data comm = {
-// .src_addr = (const struct sockaddr *)&msg->ip_from,
-// .comm_addr = (const struct sockaddr *)&msg->ip_from,
-// .dst_addr = (const struct sockaddr *)&msg->ip_to
-// };
-// ret = worker_submit(xhd->session, &comm,
-// msg->eth_from, msg->eth_to, kpkt);
-// }
-// if (ret)
-// kr_log_debug(XDP, "worker_submit() == %d: %s\n", ret, kr_strerror(ret));
-// mp_flush(the_worker->pkt_pool.ctx);
-// }
-// knot_xdp_recv_finish(xhd->socket, msgs, rcvd);
-//}
-///// Warn if the XDP program is running in emulated mode (XDP_SKB)
-//static void xdp_warn_mode(const char *ifname)
-//{
-// if (kr_fails_assert(ifname))
-// return;
-//
-// const unsigned if_index = if_nametoindex(ifname);
-// if (!if_index) {
-// kr_log_warning(XDP, "warning: interface %s, unexpected error when converting its name: %s\n",
-// ifname, strerror(errno));
-// return;
-// }
-//
-// const knot_xdp_mode_t mode = knot_eth_xdp_mode(if_index);
-// switch (mode) {
-// case KNOT_XDP_MODE_FULL:
-// return;
-// case KNOT_XDP_MODE_EMUL:
-// kr_log_warning(XDP, "warning: interface %s running only with XDP emulation\n",
-// ifname);
-// return;
-// case KNOT_XDP_MODE_NONE: // enum warnings from compiler
-// break;
-// }
-// kr_log_warning(XDP, "warning: interface %s running in unexpected XDP mode %d\n",
-// ifname, (int)mode);
-//}
-//int io_listen_xdp(uv_loop_t *loop, struct endpoint *ep, const char *ifname)
-//{
-// if (!ep || !ep->handle) {
-// return kr_error(EINVAL);
-// }
-//
-// // RLIMIT_MEMLOCK often needs raising when operating on BPF
-// static int ret_limit = 1;
-// if (ret_limit == 1) {
-// struct rlimit no_limit = { RLIM_INFINITY, RLIM_INFINITY };
-// ret_limit = setrlimit(RLIMIT_MEMLOCK, &no_limit)
-// ? kr_error(errno) : 0;
-// }
-// if (ret_limit) return ret_limit;
-//
-// xdp_handle_data_t *xhd = malloc(sizeof(*xhd));
-// if (!xhd) return kr_error(ENOMEM);
-//
-// xhd->socket = NULL; // needed for some reason
-//
-// // This call is a libknot version hell, unfortunately.
-// int ret = knot_xdp_init(&xhd->socket, ifname, ep->nic_queue,
-// #if KNOT_VERSION_HEX < 0x030100
-// ep->port ? ep->port : KNOT_XDP_LISTEN_PORT_ALL,
-// KNOT_XDP_LOAD_BPF_MAYBE
-// #elif KNOT_VERSION_HEX < 0x030200
-// ep->port ? ep->port : (KNOT_XDP_LISTEN_PORT_PASS | 0),
-// KNOT_XDP_LOAD_BPF_MAYBE
-// #else
-// KNOT_XDP_FILTER_UDP | (ep->port ? 0 : KNOT_XDP_FILTER_PASS),
-// ep->port, 0/*quic_port*/,
-// KNOT_XDP_LOAD_BPF_MAYBE,
-// NULL/*xdp_config*/
-// #endif
-// );
-//
-// if (!ret) xdp_warn_mode(ifname);
-//
-// if (!ret) ret = uv_idle_init(loop, &xhd->tx_waker);
-// if (ret || kr_fails_assert(xhd->socket)) {
-// free(xhd);
-// return ret == 0 ? kr_error(EINVAL) : kr_error(ret);
-// }
-// xhd->tx_waker.data = xhd->socket;
-//
-// ep->fd = knot_xdp_socket_fd(xhd->socket); // probably not useful
-// ret = uv_poll_init(loop, (uv_poll_t *)ep->handle, ep->fd);
-// if (ret) {
-// knot_xdp_deinit(xhd->socket);
-// free(xhd);
-// return kr_error(ret);
-// }
-//
-// // beware: this sets poll_handle->data
-// xhd->session = session_new(ep->handle, false, false);
-// kr_require(!session_flags(xhd->session)->outgoing);
-// session_get_sockname(xhd->session)->sa_family = AF_XDP; // to have something in there
-//
-// ep->handle->data = xhd;
-// ret = uv_poll_start((uv_poll_t *)ep->handle, UV_READABLE, xdp_rx);
-// return ret;
-//}
-//#endif
+#if ENABLE_XDP
+static void xdp_rx(uv_poll_t* handle, int status, int events)
+{
+ const int XDP_RX_BATCH_SIZE = 64;
+ if (status < 0) {
+ kr_log_error(XDP, "poll status %d: %s\n", status, uv_strerror(status));
+ return;
+ }
+ if (events != UV_READABLE) {
+ kr_log_error(XDP, "poll unexpected events: %d\n", events);
+ return;
+ }
+
+ xdp_handle_data_t *xhd = handle->data;
+ kr_require(xhd && xhd->session && xhd->socket);
+ uint32_t rcvd;
+ knot_xdp_msg_t msgs[XDP_RX_BATCH_SIZE];
+ int ret = knot_xdp_recv(xhd->socket, msgs, XDP_RX_BATCH_SIZE, &rcvd
+ #if KNOT_VERSION_HEX >= 0x030100
+ , NULL
+ #endif
+ );
+
+ if (kr_fails_assert(ret == KNOT_EOK)) {
+ /* ATM other error codes can only be returned when called incorrectly */
+ kr_log_error(XDP, "knot_xdp_recv(): %d, %s\n", ret, knot_strerror(ret));
+ return;
+ }
+ kr_log_debug(XDP, "poll triggered, processing a batch of %d packets\n", (int)rcvd);
+ kr_require(rcvd <= XDP_RX_BATCH_SIZE);
+ for (int i = 0; i < rcvd; ++i) {
+ knot_xdp_msg_t *msg = &msgs[i];
+ kr_require(msg->payload.iov_len <= KNOT_WIRE_MAX_PKTSIZE);
+ struct comm_info comm = {
+ .src_addr = (const struct sockaddr *)&msg->ip_from,
+ .comm_addr = (const struct sockaddr *)&msg->ip_from,
+ .dst_addr = (const struct sockaddr *)&msg->ip_to,
+ .xdp = true
+ };
+ memcpy(comm.eth_from, msg->eth_from, sizeof(comm.eth_from));
+ memcpy(comm.eth_to, msg->eth_to, sizeof(comm.eth_to));
+ session2_unwrap(xhd->session,
+ protolayer_buffer(msg->payload.iov_base, msg->payload.iov_len),
+ &comm, NULL, NULL);
+ if (ret)
+ kr_log_debug(XDP, "worker_submit() == %d: %s\n", ret, kr_strerror(ret));
+ mp_flush(the_worker->pkt_pool.ctx);
+ }
+ knot_xdp_recv_finish(xhd->socket, msgs, rcvd);
+}
+/// Warn if the XDP program is running in emulated mode (XDP_SKB)
+static void xdp_warn_mode(const char *ifname)
+{
+ if (kr_fails_assert(ifname))
+ return;
+
+ const unsigned if_index = if_nametoindex(ifname);
+ if (!if_index) {
+ kr_log_warning(XDP, "warning: interface %s, unexpected error when converting its name: %s\n",
+ ifname, strerror(errno));
+ return;
+ }
+
+ const knot_xdp_mode_t mode = knot_eth_xdp_mode(if_index);
+ switch (mode) {
+ case KNOT_XDP_MODE_FULL:
+ return;
+ case KNOT_XDP_MODE_EMUL:
+ kr_log_warning(XDP, "warning: interface %s running only with XDP emulation\n",
+ ifname);
+ return;
+ case KNOT_XDP_MODE_NONE: // enum warnings from compiler
+ break;
+ }
+ kr_log_warning(XDP, "warning: interface %s running in unexpected XDP mode %d\n",
+ ifname, (int)mode);
+}
+int io_listen_xdp(uv_loop_t *loop, struct endpoint *ep, const char *ifname)
+{
+ if (!ep || !ep->handle) {
+ return kr_error(EINVAL);
+ }
+
+ // RLIMIT_MEMLOCK often needs raising when operating on BPF
+ static int ret_limit = 1;
+ if (ret_limit == 1) {
+ struct rlimit no_limit = { RLIM_INFINITY, RLIM_INFINITY };
+ ret_limit = setrlimit(RLIMIT_MEMLOCK, &no_limit)
+ ? kr_error(errno) : 0;
+ }
+ if (ret_limit) return ret_limit;
+
+ xdp_handle_data_t *xhd = malloc(sizeof(*xhd));
+ if (!xhd) return kr_error(ENOMEM);
+
+ xhd->socket = NULL; // needed for some reason
+ queue_init(xhd->tx_waker_queue);
+
+ // This call is a libknot version hell, unfortunately.
+ int ret = knot_xdp_init(&xhd->socket, ifname, ep->nic_queue,
+ #if KNOT_VERSION_HEX < 0x030100
+ ep->port ? ep->port : KNOT_XDP_LISTEN_PORT_ALL,
+ KNOT_XDP_LOAD_BPF_MAYBE
+ #elif KNOT_VERSION_HEX < 0x030200
+ ep->port ? ep->port : (KNOT_XDP_LISTEN_PORT_PASS | 0),
+ KNOT_XDP_LOAD_BPF_MAYBE
+ #else
+ KNOT_XDP_FILTER_UDP | (ep->port ? 0 : KNOT_XDP_FILTER_PASS),
+ ep->port, 0/*quic_port*/,
+ KNOT_XDP_LOAD_BPF_MAYBE,
+ NULL/*xdp_config*/
+ #endif
+ );
+
+ if (!ret) xdp_warn_mode(ifname);
+
+ if (!ret) ret = uv_idle_init(loop, &xhd->tx_waker);
+ if (ret || kr_fails_assert(xhd->socket)) {
+ free(xhd);
+ return ret == 0 ? kr_error(EINVAL) : kr_error(ret);
+ }
+ xhd->tx_waker.data = xhd;
+
+ ep->fd = knot_xdp_socket_fd(xhd->socket); // probably not useful
+ ret = uv_poll_init(loop, (uv_poll_t *)ep->handle, ep->fd);
+ if (ret) {
+ knot_xdp_deinit(xhd->socket);
+ free(xhd);
+ return kr_error(ret);
+ }
+
+ xhd->session = session2_new_io(ep->handle, PROTOLAYER_GRP_DOUDP,
+ NULL, 0, false);
+ kr_require(xhd->session);
+ session2_get_sockname(xhd->session)->sa_family = AF_XDP; // to have something in there
+
+ ep->handle->data = xhd;
+ ret = uv_poll_start((uv_poll_t *)ep->handle, UV_READABLE, xdp_rx);
+ return ret;
+}
+#endif
int io_create(uv_loop_t *loop, uv_handle_t *handle, int type, unsigned family,
enum protolayer_grp grp,
uv_close((uv_handle_t *)&xhd->tx_waker, NULL);
session2_free(xhd->session);
knot_xdp_deinit(xhd->socket);
+ queue_deinit(xhd->tx_waker_queue);
free(xhd);
#else
kr_assert(false);
struct knot_xdp_socket *socket;
struct session2 *session;
uv_idle_t tx_waker;
+ queue_t(void *) tx_waker_queue;
} xdp_handle_data_t;
}
if (is_xdp) {
-// #if ENABLE_XDP
-// uv_poll_t *ep_handle = malloc(sizeof(uv_poll_t));
-// ep->handle = (uv_handle_t *)ep_handle;
-// ret = !ep->handle ? ENOMEM
-// : io_listen_xdp(the_network->loop, ep, addr_str);
-// #else
+ #if ENABLE_XDP
+ uv_poll_t *ep_handle = malloc(sizeof(uv_poll_t));
+ ep->handle = (uv_handle_t *)ep_handle;
+ ret = !ep->handle ? ENOMEM
+ : io_listen_xdp(the_network->loop, ep, addr_str);
+ #else
ret = ESOCKTNOSUPPORT;
-// #endif
+ #endif
goto finish_ret;
} /* else */
* SPDX-License-Identifier: GPL-3.0-or-later
*/
+#include "kresconfig.h"
+
#include <ucw/lib.h>
#include <sys/socket.h>
+#if ENABLE_XDP
+ #include <libknot/xdp/xdp.h>
+#endif
+
#include "lib/log.h"
#include "lib/utils.h"
#include "daemon/io.h"
/* Forward decls. */
static int session2_transport_pushv(struct session2 *s,
struct iovec *iov, int iovcnt,
- const void *target,
+ const struct comm_info *comm,
protolayer_finished_cb cb, void *baton);
static inline int session2_transport_push(struct session2 *s,
char *buf, size_t buf_len,
- const void *target,
+ const struct comm_info *comm,
protolayer_finished_cb cb, void *baton);
static int session2_transport_event(struct session2 *s,
enum protolayer_event_type event,
ctx->layer_ix, layer_name_ctx(ctx), ctx->status);
if (ctx->finished_cb)
- ctx->finished_cb(ret, session, ctx->finished_cb_target,
+ ctx->finished_cb(ret, session, &ctx->comm,
ctx->finished_cb_baton);
free(ctx);
return ret;
}
-static void protolayer_push_finished(int status, struct session2 *s, const void *target, void *baton)
+static void protolayer_push_finished(int status, struct session2 *s, const struct comm_info *comm, void *baton)
{
struct protolayer_iter_ctx *ctx = baton;
ctx->status = status;
if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) {
session2_transport_push(session,
ctx->payload.buffer.buf, ctx->payload.buffer.len,
- ctx->target, protolayer_push_finished, ctx);
+ &ctx->comm, protolayer_push_finished, ctx);
} else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
session2_transport_pushv(session,
ctx->payload.iovec.iov, ctx->payload.iovec.cnt,
- ctx->target, protolayer_push_finished, ctx);
+ &ctx->comm, protolayer_push_finished, ctx);
} else {
kr_assert(false && "Invalid payload type");
return kr_error(EINVAL);
static int protolayer_manager_submit(
struct protolayer_manager *manager,
enum protolayer_direction direction, size_t layer_ix,
- struct protolayer_payload payload, const void *target,
+ struct protolayer_payload payload, const struct comm_info *comm,
protolayer_finished_cb cb, void *baton)
{
struct protolayer_iter_ctx *ctx = malloc(manager->cb_ctx_size);
*ctx = (struct protolayer_iter_ctx) {
.payload = payload,
- .target = target,
- .comm = &manager->session->comm,
+ .comm = (comm) ? *comm : manager->session->comm,
.direction = direction,
.layer_ix = layer_ix,
.manager = manager,
.finished_cb = cb,
- .finished_cb_target = target,
.finished_cb_baton = baton
};
return s;
}
-static void session2_timer_on_close(uv_handle_t *handle)
+void session2_free(struct session2 *s)
{
- struct session2 *s = handle->data;
protolayer_manager_free(s->layers);
wire_buf_deinit(&s->wire_buf);
mm_ctx_delete(&s->pool);
free(s);
}
-void session2_free(struct session2 *s)
-{
- uv_close((uv_handle_t *)&s->timer, session2_timer_on_close);
-}
-
int session2_start_read(struct session2 *session)
{
if (session->transport.type == SESSION2_TRANSPORT_IO)
}
int session2_unwrap(struct session2 *s, struct protolayer_payload payload,
- const void *target, protolayer_finished_cb cb, void *baton)
+ const struct comm_info *comm, protolayer_finished_cb cb,
+ void *baton)
{
return protolayer_manager_submit(s->layers, PROTOLAYER_UNWRAP, 0,
- payload, target, cb, baton);
+ payload, comm, cb, baton);
}
int session2_unwrap_after(struct session2 *s, enum protolayer_protocol protocol,
- struct protolayer_payload payload, const void *target,
- protolayer_finished_cb cb, void *baton)
+ struct protolayer_payload payload,
+ const struct comm_info *comm,
+ protolayer_finished_cb cb, void *baton)
{
ssize_t layer_ix = protolayer_manager_get_protocol(s->layers, protocol) + 1;
if (layer_ix < 0)
return layer_ix;
return protolayer_manager_submit(s->layers, PROTOLAYER_UNWRAP, layer_ix,
- payload, target, cb, baton);
+ payload, comm, cb, baton);
}
int session2_wrap(struct session2 *s, struct protolayer_payload payload,
- const void *target, protolayer_finished_cb cb, void *baton)
+ const struct comm_info *comm, protolayer_finished_cb cb,
+ void *baton)
{
return protolayer_manager_submit(s->layers, PROTOLAYER_WRAP,
s->layers->num_layers - 1,
- payload, target, cb, baton);
+ payload, comm, cb, baton);
}
int session2_wrap_after(struct session2 *s, enum protolayer_protocol protocol,
- struct protolayer_payload payload, const void *target,
- protolayer_finished_cb cb, void *baton)
+ struct protolayer_payload payload,
+ const struct comm_info *comm,
+ protolayer_finished_cb cb, void *baton)
{
ssize_t layer_ix = protolayer_manager_get_protocol(s->layers, protocol) - 1;
if (layer_ix < 0)
return layer_ix;
return protolayer_manager_submit(s->layers, PROTOLAYER_WRAP, layer_ix,
- payload, target, cb, baton);
+ payload, comm, cb, baton);
}
static void session2_event_wrap(struct session2 *s, enum protolayer_event_type event, void *baton)
struct session2_pushv_ctx {
struct session2 *session;
protolayer_finished_cb cb;
- const void *target;
+ const struct comm_info *comm;
void *baton;
char *buf;
static void session2_transport_parent_pushv_finished(int status,
struct session2 *session,
- const void *target,
+ const struct comm_info *comm,
void *baton)
{
struct session2_pushv_ctx *ctx = baton;
if (ctx->cb)
- ctx->cb(status, ctx->session, target, ctx->baton);
+ ctx->cb(status, ctx->session, comm, ctx->baton);
free(ctx->buf);
free(ctx);
}
{
struct session2_pushv_ctx *ctx = baton;
if (ctx->cb)
- ctx->cb(status, ctx->session, ctx->target, ctx->baton);
+ ctx->cb(status, ctx->session, ctx->comm, ctx->baton);
free(ctx->buf);
free(ctx);
}
{
struct session2_pushv_ctx *ctx = req->data;
if (ctx->cb)
- ctx->cb(status, ctx->session, ctx->target, ctx->baton);
+ ctx->cb(status, ctx->session, ctx->comm, ctx->baton);
free(ctx->buf);
free(ctx);
free(req);
{
struct session2_pushv_ctx *ctx = req->data;
if (ctx->cb)
- ctx->cb(status, ctx->session, ctx->target, ctx->baton);
+ ctx->cb(status, ctx->session, ctx->comm, ctx->baton);
free(ctx->buf);
free(ctx);
free(req);
}
+#if ENABLE_XDP
+static void xdp_tx_waker(uv_idle_t *handle)
+{
+ xdp_handle_data_t *xhd = handle->data;
+ int ret = knot_xdp_send_finish(xhd->socket);
+ if (ret != KNOT_EAGAIN && ret != KNOT_EOK)
+ kr_log_error(XDP, "check: ret = %d, %s\n", ret, knot_strerror(ret));
+ /* Apparently some drivers need many explicit wake-up calls
+ * even if we push no additional packets (in case they accumulated a lot) */
+ if (ret != KNOT_EAGAIN)
+ uv_idle_stop(handle);
+ knot_xdp_send_prepare(xhd->socket);
+ /* LATER(opt.): it _might_ be better for performance to do these two steps
+ * at different points in time */
+ while (queue_len(xhd->tx_waker_queue)) {
+ struct session2_pushv_ctx *ctx = queue_head(xhd->tx_waker_queue);
+ if (ctx->cb)
+ ctx->cb(kr_ok(), ctx->session, ctx->comm, ctx->baton);
+ free(ctx);
+ queue_pop(xhd->tx_waker_queue);
+ }
+}
+#endif
+
static int session2_transport_pushv(struct session2 *s,
struct iovec *iov, int iovcnt,
- const void *target,
+ const struct comm_info *comm,
protolayer_finished_cb cb, void *baton)
{
if (kr_fails_assert(s))
.session = s,
.cb = cb,
.baton = baton,
- .target = target
+ .comm = comm
};
switch (s->transport.type) {
uv_handle_t *handle = s->transport.io.handle;
if (kr_fails_assert(handle)) {
if (cb)
- cb(kr_error(EINVAL), s, target, baton);
+ cb(kr_error(EINVAL), s, comm, baton);
free(ctx);
return kr_error(EINVAL);
}
- /* TODO: XDP */
if (handle->type == UV_UDP) {
if (ENABLE_SENDMMSG && !s->outgoing) {
int fd;
if (kr_fails_assert(iovcnt == 1))
return kr_error(EINVAL);
- udp_queue_push(fd, target, iov->iov_base, iov->iov_len,
+ udp_queue_push(fd, comm->comm_addr, iov->iov_base, iov->iov_len,
session2_transport_udp_queue_pushv_finished,
ctx);
return kr_ok();
uv_udp_send_t *req = malloc(sizeof(*req));
req->data = ctx;
int ret = uv_udp_send(req, (uv_udp_t *)handle,
- (uv_buf_t *)iov, iovcnt, target,
+ (uv_buf_t *)iov, iovcnt, comm->comm_addr,
session2_transport_udp_pushv_finished);
if (ret) {
if (cb)
- cb(ret, s, target, baton);
+ cb(ret, s, comm, baton);
free(req);
free(ctx);
}
session2_transport_stream_pushv_finished);
if (ret) {
if (cb)
- cb(ret, s, target, baton);
+ cb(ret, s, comm, baton);
free(req);
free(ctx);
}
return ret;
+#if ENABLE_XDP
+ } else if (handle->type == UV_POLL) {
+ xdp_handle_data_t *xhd = handle->data;
+ if (kr_fails_assert(xhd && xhd->socket))
+ return kr_error(EIO);
+
+ /* TODO: support multiple iovecs properly? */
+ if (kr_fails_assert(iovcnt == 1))
+ return kr_error(EINVAL);
+
+ knot_xdp_msg_t msg;
+#if KNOT_VERSION_HEX >= 0x030100
+ /* We don't have a nice way of preserving the _msg_t from frame allocation,
+ * so we manually redo all other parts of knot_xdp_send_alloc() */
+ memset(&msg, 0, sizeof(msg));
+ bool ipv6 = comm->comm_addr->sa_family == AF_INET6;
+ msg.flags = ipv6 ? KNOT_XDP_MSG_IPV6 : 0;
+ memcpy(msg.eth_from, comm->eth_from, sizeof(comm->eth_from));
+ memcpy(msg.eth_to, comm->eth_to, sizeof(comm->eth_to));
+#endif
+ const struct sockaddr *ip_from = comm->dst_addr;
+ const struct sockaddr *ip_to = comm->comm_addr;
+ memcpy(&msg.ip_from, ip_from, kr_sockaddr_len(ip_from));
+ memcpy(&msg.ip_to, ip_to, kr_sockaddr_len(ip_to));
+ msg.payload = *iov;
+
+ uint32_t sent;
+ int ret = knot_xdp_send(xhd->socket, &msg, 1, &sent);
+
+ queue_push(xhd->tx_waker_queue, ctx);
+ uv_idle_start(&xhd->tx_waker, xdp_tx_waker);
+ kr_log_debug(XDP, "pushed a packet, ret = %d\n", ret);
+
+ return kr_ok();
+#endif
} else {
kr_assert(false && "Unsupported handle");
if (cb)
- cb(kr_error(EINVAL), s, target, baton);
+ cb(kr_error(EINVAL), s, comm, baton);
free(ctx);
return kr_error(EINVAL);
}
return kr_error(EINVAL);
}
int ret = session2_wrap(parent, protolayer_iovec(iov, iovcnt),
- target, session2_transport_parent_pushv_finished,
+ comm, session2_transport_parent_pushv_finished,
ctx);
return (ret < 0) ? ret : kr_ok();
static void session2_transport_single_push_finished(int status,
struct session2 *s,
- const void *target,
+ const struct comm_info *comm,
void *baton)
{
struct push_ctx *ctx = baton;
if (ctx->cb)
- ctx->cb(status, s, target, ctx->baton);
+ ctx->cb(status, s, comm, ctx->baton);
free(ctx);
}
static inline int session2_transport_push(struct session2 *s,
char *buf, size_t buf_len,
- const void *target,
+ const struct comm_info *comm,
protolayer_finished_cb cb, void *baton)
{
struct push_ctx *ctx = malloc(sizeof(*ctx));
.baton = baton
};
- return session2_transport_pushv(s, &ctx->iov, 1, target,
+ return session2_transport_pushv(s, &ctx->iov, 1, comm,
session2_transport_single_push_finished, ctx);
}
static int session2_handle_close(struct session2 *s, uv_handle_t *handle)
{
+ if (kr_fails_assert(s->transport.type == SESSION2_TRANSPORT_IO
+ && s->transport.io.handle == handle))
+ return kr_error(EINVAL);
+
io_stop_read(handle);
+ uv_close((uv_handle_t *)&s->timer, NULL);
uv_close(handle, on_session2_handle_close);
return kr_ok();
}
struct session2;
struct protolayer_iter_ctx;
+/** Type of MAC addresses. */
+typedef uint8_t ethaddr_t[6];
+
/** Information about the transport - addresses and proxy. */
struct comm_info {
/** The original address the data came from. May be that of a proxied
/** Data parsed from a PROXY header. May be `NULL` if the communication
* did not come through a proxy, or if the PROXYv2 protocol was not used. */
const struct proxy_result *proxy;
+
+ /** Pointer to protolayer-specific data, e.g. a key to decide, which
+ * sub-session to use. */
+ void *target;
+
+ /* XDP data */
+ ethaddr_t eth_from;
+ ethaddr_t eth_to;
+ bool xdp:1;
};
/** Protocol layer types - the individual implementations of protocol layers.
* function.
* `baton` is the `baton` parameter passed to the `session2_(un)wrap` function. */
typedef void (*protolayer_finished_cb)(int status, struct session2 *session,
- const void *target, void *baton);
+ const struct comm_info *comm, void *baton);
#define PROTOLAYER_EVENT_MAP(XX) \
/* read-write: */
/** The payload */
struct protolayer_payload payload;
- /** Transport information (e.g. UDP sender address). May be `NULL`. */
- const void *target;
/** Communication information. Typically written into by one of the
- * first layers facilitating transport protocol processing.
- * Points to session-wide comm info by default, may be changed
- * by a layer to point elsewhere. */
- struct comm_info *comm;
+ * first layers facilitating transport protocol processing. */
+ struct comm_info comm;
/* callback for when the layer iteration has ended - read-only: */
protolayer_finished_cb finished_cb;
- const void *finished_cb_target;
void *finished_cb_baton;
/* internal information for the manager - private: */
return s;
}
-/** De-allocates the session. */
+/** De-allocates the session. Must only be called once the underlying IO handle
+ * and timer are already closed, otherwise may leak resources. */
void session2_free(struct session2 *s);
/** Start reading from the underlying transport. */
}
/** Sends the specified `payload` to be processed in the `_UNWRAP` direction by
- * the session's protocol layers. The `target` parameter may contain a pointer
- * to transport-specific data, e.g. for UDP, it shall contain a pointer to the
- * sender's `struct sockaddr_*`.
+ * the session's protocol layers.
+ *
+ * The `comm` parameter may contain a pointer to comm data, e.g. for UDP, that
+ * comm data shall contain a pointer to the sender's `struct sockaddr_*`. If
+ * `comm` is `NULL`, session-wide data shall be used.
*
* Note that the payload data may be modified by any of the layers, to avoid
* making copies. Once the payload is passed to this function, the content of
* Returns one of `enum protolayer_ret` or a negative number
* indicating an error. */
int session2_unwrap(struct session2 *s, struct protolayer_payload payload,
- const void *target, protolayer_finished_cb cb, void *baton);
+ const struct comm_info *comm, protolayer_finished_cb cb,
+ void *baton);
/** Same as `session2_unwrap`, but looks up the specified `protocol` in the
* session's assigned protocol group and sends the `payload` to the layer that
* Layers may use this to generate their own data to send in the sequence, e.g.
* for protocol-specific ceremony. */
int session2_unwrap_after(struct session2 *s, enum protolayer_protocol protocol,
- struct protolayer_payload payload, const void *target,
- protolayer_finished_cb cb, void *baton);
+ struct protolayer_payload payload,
+ const struct comm_info *comm,
+ protolayer_finished_cb cb, void *baton);
/** Sends the specified `payload` to be processed in the `_WRAP` direction by
* the session's protocol layers. The `target` parameter may contain a pointer
* Returns one of `enum protolayer_ret` or a negative number
* indicating an error. */
int session2_wrap(struct session2 *s, struct protolayer_payload payload,
- const void *target, protolayer_finished_cb cb, void *baton);
+ const struct comm_info *comm, protolayer_finished_cb cb,
+ void *baton);
/** Same as `session2_wrap`, but looks up the specified `protocol` in the
* session's assigned protocol group and sends the `payload` to the layer that
* Layers may use this to generate their own data to send in the sequence, e.g.
* for protocol-specific ceremony. */
int session2_wrap_after(struct session2 *s, enum protolayer_protocol protocol,
- struct protolayer_payload payload, const void *target,
- protolayer_finished_cb cb, void *baton);
+ struct protolayer_payload payload,
+ const struct comm_info *comm,
+ protolayer_finished_cb cb, void *baton);
/** Sends an event to be synchronously processed by the protocol layers of the
* specified session. The layers are first iterated through in the `_UNWRAP`
};
static void kres_gnutls_push_finished(int status, struct session2 *session,
- const void *target, void *baton)
+ const struct comm_info *comm, void *baton)
{
struct kres_gnutls_push_ctx *push_ctx = baton;
struct pl_tls_sess_data *tls = push_ctx->sess_data;
/** Local address. For AF_XDP we couldn't use session's,
* as the address might be different every time. */
union kr_sockaddr dst_addr;
- /** MAC addresses - ours [0] and router's [1], in case of AF_XDP socket. */
- uint8_t eth_addrs[2][6];
+
+ /** Router's MAC address for XDP. */
+ ethaddr_t eth_from;
+ /** Our MAC address for XDP. */
+ ethaddr_t eth_to;
+ /** Whether XDP was used. */
+ bool xdp : 1;
} source;
};
const struct sockaddr *packet_source,
knot_pkt_t *packet);
static int qr_task_send(struct qr_task *task, struct session2 *session,
- const struct sockaddr *addr, knot_pkt_t *pkt);
+ const struct comm_info *comm, knot_pkt_t *pkt);
static int qr_task_finalize(struct qr_task *task, int state);
static void qr_task_complete(struct qr_task *task);
static int worker_add_tcp_connected(const struct sockaddr* addr, struct session2 *session);
*maxlen = MIN(*maxlen, out.payload.iov_len);
#if KNOT_VERSION_HEX < 0x030100
/* It's most convenient to fill the MAC addresses at this point. */
- memcpy(out.eth_from, &ctx->source.eth_addrs[0], 6);
- memcpy(out.eth_to, &ctx->source.eth_addrs[1], 6);
+ memcpy(out.eth_from, &ctx->source.eth_from, 6);
+ memcpy(out.eth_to, &ctx->source.eth_to, 6);
#endif
return out.payload.iov_base;
}
return;
if (likely(ans->wire == NULL)) /* sent most likely */
return;
+ if (!ctx->source.session)
+ return;
/* We know it's an AF_XDP socket; otherwise alloc_wire_cb isn't assigned. */
uv_handle_t *handle = session2_get_handle(ctx->source.session);
- if (kr_fails_assert(handle->type == UV_POLL))
+ if (!handle || kr_fails_assert(handle->type == UV_POLL))
return;
xdp_handle_data_t *xhd = handle->data;
/* Freeing is done by sending an empty packet (the API won't really send it). */
*/
static struct request_ctx *request_create(struct session2 *session,
struct comm_info *comm,
- const uint8_t *eth_from,
- const uint8_t *eth_to,
uint32_t uid)
{
knot_mm_t pool = {
return NULL;
}
ctx->source.session = session;
- if (kr_fails_assert(!!eth_to == !!eth_from)) {
- pool_release(pool.ctx);
- return NULL;
- }
- const bool is_xdp = eth_to != NULL;
- if (is_xdp) {
+ if (comm && comm->xdp) {
#if ENABLE_XDP
if (kr_fails_assert(session)) {
pool_release(pool.ctx);
return NULL;
}
- memcpy(&ctx->source.eth_addrs[0], eth_to, sizeof(ctx->source.eth_addrs[0]));
- memcpy(&ctx->source.eth_addrs[1], eth_from, sizeof(ctx->source.eth_addrs[1]));
+ memcpy(ctx->source.eth_to, comm->eth_to, sizeof(ctx->source.eth_to));
+ memcpy(ctx->source.eth_from, comm->eth_from, sizeof(ctx->source.eth_from));
ctx->req.alloc_wire_cb = alloc_wire_cb;
#else
kr_assert(!EINVAL);
req->pool = pool;
req->vars_ref = LUA_NOREF;
req->uid = uid;
- req->qsource.comm_flags.xdp = is_xdp;
+ req->qsource.comm_flags.xdp = comm && comm->xdp;
kr_request_set_extended_error(req, KNOT_EDNS_EDE_NONE, NULL);
array_init(req->qsource.headers);
if (session) {
}
static void qr_task_wrap_finished(int status, struct session2 *session,
- const void *target, void *baton)
+ const struct comm_info *comm, void *baton)
{
struct qr_task *task = baton;
qr_task_on_send(task, session, status);
}
static int qr_task_send(struct qr_task *task, struct session2 *session,
- const struct sockaddr *addr, knot_pkt_t *pkt)
+ const struct comm_info *comm, knot_pkt_t *pkt)
{
if (!session)
return qr_task_on_send(task, NULL, kr_error(EIO));
int ret = 0;
- if (addr == NULL)
- addr = session2_get_peer(session);
+ if (comm == NULL)
+ comm = &session->comm;
if (pkt == NULL)
pkt = worker_task_get_pktbuf(task);
qr_task_ref(task);
struct protolayer_payload payload = protolayer_buffer((char *)pkt->wire, pkt->size);
payload.ttl = packet_ttl(pkt);
- ret = session2_wrap(session, payload, addr, qr_task_wrap_finished, task);
+ ret = session2_wrap(session, payload, comm, qr_task_wrap_finished, task);
if (ret >= 0) {
session2_touch(session);
}
/* Update outgoing query statistics */
- if (session->outgoing && addr) {
+ if (session->outgoing && comm) {
session2_event(session, PROTOLAYER_EVENT_STATS_QRY_OUT, NULL);
- if (addr->sa_family == AF_INET6)
+ if (comm->comm_addr->sa_family == AF_INET6)
the_worker->stats.ipv6 += 1;
- else if (addr->sa_family == AF_INET)
+ else if (comm->comm_addr->sa_family == AF_INET)
the_worker->stats.ipv4 += 1;
}
return ret;
kr_require(addr->sa_family == AF_INET || addr->sa_family == AF_INET6);
memcpy(peer, addr, kr_sockaddr_len(addr));
- ret = qr_task_send(task, session, (struct sockaddr *)choice, task->pktbuf);
+ struct comm_info out_comm = {
+ .comm_addr = (struct sockaddr *)choice
+ };
+ ret = qr_task_send(task, session, &out_comm, task->pktbuf);
if (ret) {
session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
return ret;
return true;
}
-//#if ENABLE_XDP
-//static void xdp_tx_waker(uv_idle_t *handle)
-//{
-// int ret = knot_xdp_send_finish(handle->data);
-// if (ret != KNOT_EAGAIN && ret != KNOT_EOK)
-// kr_log_error(XDP, "check: ret = %d, %s\n", ret, knot_strerror(ret));
-// /* Apparently some drivers need many explicit wake-up calls
-// * even if we push no additional packets (in case they accumulated a lot) */
-// if (ret != KNOT_EAGAIN)
-// uv_idle_stop(handle);
-// knot_xdp_send_prepare(handle->data);
-// /* LATER(opt.): it _might_ be better for performance to do these two steps
-// * at different points in time */
-//}
-//#endif
-//
-///** Send an answer packet over XDP. */
-//static int xdp_push(struct qr_task *task, const uv_handle_t *src_handle)
-//{
-//#if ENABLE_XDP
-// struct request_ctx *ctx = task->ctx;
-// xdp_handle_data_t *xhd = src_handle->data;
-// if (kr_fails_assert(xhd && xhd->socket && xhd->session == ctx->source.session))
-// return qr_task_on_send(task, NULL, kr_error(EINVAL));
-//
-// knot_xdp_msg_t msg;
-//#if KNOT_VERSION_HEX >= 0x030100
-// /* We don't have a nice way of preserving the _msg_t from frame allocation,
-// * so we manually redo all other parts of knot_xdp_send_alloc() */
-// memset(&msg, 0, sizeof(msg));
-// bool ipv6 = ctx->source.addr.ip.sa_family == AF_INET6;
-// msg.flags = ipv6 ? KNOT_XDP_MSG_IPV6 : 0;
-// memcpy(msg.eth_from, &ctx->source.eth_addrs[0], 6);
-// memcpy(msg.eth_to, &ctx->source.eth_addrs[1], 6);
-//#endif
-// const struct sockaddr *ip_from = &ctx->source.dst_addr.ip;
-// const struct sockaddr *ip_to = &ctx->source.comm_addr.ip;
-// memcpy(&msg.ip_from, ip_from, kr_sockaddr_len(ip_from));
-// memcpy(&msg.ip_to, ip_to, kr_sockaddr_len(ip_to));
-// msg.payload.iov_base = ctx->req.answer->wire;
-// msg.payload.iov_len = ctx->req.answer->size;
-//
-// uint32_t sent;
-// int ret = knot_xdp_send(xhd->socket, &msg, 1, &sent);
-// ctx->req.answer->wire = NULL; /* it's been freed */
-//
-// uv_idle_start(&xhd->tx_waker, xdp_tx_waker);
-// kr_log_debug(XDP, "pushed a packet, ret = %d\n", ret);
-//
-// return qr_task_on_send(task, xhd->session, ret);
-//#else
-// kr_assert(!EINVAL);
-// return kr_error(EINVAL);
-//#endif
-//}
-
static int qr_task_finalize(struct qr_task *task, int state)
{
kr_require(task && task->leading == false);
qr_task_ref(task);
/* Send back answer */
- /* TODO: xdp */
- int ret = qr_task_send(task, source_session, &ctx->source.comm_addr.ip, ctx->req.answer);
+ struct comm_info out_comm = {
+ .src_addr = &ctx->source.addr.ip,
+ .dst_addr = &ctx->source.dst_addr.ip,
+ .comm_addr = &ctx->source.comm_addr.ip,
+ .xdp = ctx->source.xdp
+ };
+ if (ctx->source.xdp) {
+ memcpy(out_comm.eth_from, ctx->source.eth_from, sizeof(out_comm.eth_from));
+ memcpy(out_comm.eth_to, ctx->source.eth_to, sizeof(out_comm.eth_to));
+ }
+ int ret = qr_task_send(task, source_session, &out_comm, ctx->req.answer);
if (ret != kr_ok()) {
(void) qr_task_on_send(task, NULL, kr_error(EIO));
}
}
-int worker_submit(struct session2 *session, struct comm_info *comm,
- const uint8_t *eth_from, const uint8_t *eth_to, knot_pkt_t *pkt)
+static int worker_submit(struct session2 *session, struct comm_info *comm, knot_pkt_t *pkt)
{
if (!session || !pkt)
return kr_error(EINVAL);
const struct sockaddr *addr = NULL;
if (!is_outgoing) { /* request from a client */
struct request_ctx *ctx =
- request_create(session, comm, eth_from,
- eth_to, knot_wire_get_id(pkt->wire));
+ request_create(session, comm, knot_wire_get_id(pkt->wire));
if (!ctx)
return kr_error(ENOMEM);
return NULL;
- struct request_ctx *ctx = request_create(NULL, NULL, NULL, NULL,
- the_worker->next_request_uid);
+ struct request_ctx *ctx = request_create(NULL, NULL, the_worker->next_request_uid);
if (!ctx)
return NULL;
break;
}
- ret = worker_submit(session, ctx->comm, NULL, NULL, pkt);
+ ret = worker_submit(session, &ctx->comm, pkt);
if (ret)
break;
}
if (!pkt)
return protolayer_break(ctx, KNOT_EMALF);
- int ret = worker_submit(session, ctx->comm, NULL, NULL, pkt);
+ int ret = worker_submit(session, &ctx->comm, pkt);
mp_flush(the_worker->pkt_pool.ctx);
return protolayer_break(ctx, ret);
} else if (ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF) {
if (!pkt)
return protolayer_break(ctx, KNOT_EMALF);
- int ret = worker_submit(session, ctx->comm, NULL, NULL, pkt);
+ int ret = worker_submit(session, &ctx->comm, pkt);
wire_buf_reset(ctx->payload.wire_buf);
mp_flush(the_worker->pkt_pool.ctx);
return protolayer_break(ctx, ret);
if (stream_sess->single && stream_sess->produced) {
if (kr_log_is_debug(WORKER, NULL)) {
kr_log_debug(WORKER, "Unexpected extra data from %s\n",
- kr_straddr(ctx->comm->src_addr));
+ kr_straddr(ctx->comm.src_addr));
}
worker_end_tcp(session);
status = KNOT_EMALF;
goto exit;
}
- int ret = worker_submit(session, ctx->comm, NULL, NULL, pkt);
+ int ret = worker_submit(session, &ctx->comm, pkt);
wire_buf_movestart(wb);
if (ret == kr_ok()) {
iters += 1;