#include "lib/log.h"
#include "lib/utils.h"
#include "daemon/io.h"
+#include "daemon/udp_queue.h"
#include "daemon/worker.h"
#include "daemon/session2.h"
static int session2_transport_pushv(struct session2 *s,
- const struct iovec *iov, int iovcnt,
+ struct iovec *iov, int iovcnt,
const void *target,
protolayer_finished_cb cb, void *baton);
static inline int session2_transport_push(struct session2 *s,
}
-struct parent_pushv_ctx {
+struct session2_pushv_ctx {
struct session2 *session;
protolayer_finished_cb cb;
const void *target;
const void *target,
void *baton)
{
- struct parent_pushv_ctx *ctx = baton;
+ struct session2_pushv_ctx *ctx = baton;
if (ctx->cb)
ctx->cb(status, ctx->session, target, ctx->baton);
free(ctx->buf);
free(ctx);
}
-static void session2_transport_udp_pushv_finished(uv_udp_send_t *req, int status)
+static void session2_transport_udp_queue_pushv_finished(int status, void *baton)
{
- struct parent_pushv_ctx *ctx = req->data;
+ struct session2_pushv_ctx *ctx = baton;
if (ctx->cb)
ctx->cb(status, ctx->session, ctx->target, ctx->baton);
free(ctx->buf);
free(ctx);
- free(req);
}
-static void session2_transport_stream_pushv_finished(uv_write_t *req, int status)
+static void session2_transport_udp_pushv_finished(uv_udp_send_t *req, int status)
{
- struct parent_pushv_ctx *ctx = req->data;
+ struct session2_pushv_ctx *ctx = req->data;
if (ctx->cb)
ctx->cb(status, ctx->session, ctx->target, ctx->baton);
free(ctx->buf);
free(req);
}
-static int concat_iovs(const struct iovec *iov, int iovcnt, char **buf, size_t *buf_len)
+static void session2_transport_stream_pushv_finished(uv_write_t *req, int status)
{
- if (!iov || iovcnt <= 0)
- return kr_error(ENODATA);
-
- size_t len = 0;
- for (int i = 0; i < iovcnt; i++) {
- size_t old_len = len;
- len += iov[i].iov_len;
- if (kr_fails_assert(len >= old_len)) {
- *buf = NULL;
- return kr_error(EFBIG);
- }
- }
-
- *buf_len = len;
- if (len == 0) {
- *buf = NULL;
- return kr_ok();
- }
-
- *buf = malloc(len);
- kr_require(*buf);
-
- char *c = *buf;
- for (int i = 0; i < iovcnt; i++) {
- if (iov[i].iov_len == 0)
- continue;
- memcpy(c, iov[i].iov_base, iov[i].iov_len);
- c += iov[i].iov_len;
- }
-
- return kr_ok();
+ struct session2_pushv_ctx *ctx = req->data;
+ if (ctx->cb)
+ ctx->cb(status, ctx->session, ctx->target, ctx->baton);
+ free(ctx->buf);
+ free(ctx);
+ free(req);
}
static int session2_transport_pushv(struct session2 *s,
- const struct iovec *iov, int iovcnt,
+ struct iovec *iov, int iovcnt,
const void *target,
protolayer_finished_cb cb, void *baton)
{
if (kr_fails_assert(s))
return kr_error(EINVAL);
- struct parent_pushv_ctx *ctx = malloc(sizeof(*ctx));
+ struct session2_pushv_ctx *ctx = malloc(sizeof(*ctx));
kr_require(ctx);
- *ctx = (struct parent_pushv_ctx){
+ *ctx = (struct session2_pushv_ctx){
.session = s,
.cb = cb,
.baton = baton,
return kr_error(EINVAL);
}
+ /* TODO: XDP */
if (handle->type == UV_UDP) {
- uv_udp_send_t *req = malloc(sizeof(*req));
- req->data = ctx;
- uv_udp_send(req, (uv_udp_t *)handle,
- (uv_buf_t *)iov, iovcnt, target,
- session2_transport_udp_pushv_finished);
- return kr_ok();
+ if (ENABLE_SENDMMSG && !s->outgoing) {
+ int fd;
+ int ret = uv_fileno(handle, &fd);
+ if (kr_fails_assert(!ret))
+ return kr_error(EIO);
+
+ /* TODO: support multiple iovecs properly? */
+ if (kr_fails_assert(iovcnt == 1))
+ return kr_error(EINVAL);
+
+ udp_queue_push(fd, target, iov->iov_base, iov->iov_len,
+ session2_transport_udp_queue_pushv_finished,
+ ctx);
+ return kr_ok();
+ } else {
+ uv_udp_send_t *req = malloc(sizeof(*req));
+ req->data = ctx;
+ uv_udp_send(req, (uv_udp_t *)handle,
+ (uv_buf_t *)iov, iovcnt, target,
+ session2_transport_udp_pushv_finished);
+ return kr_ok();
+ }
} else if (handle->type == UV_TCP) {
uv_write_t *req = malloc(sizeof(*req));
req->data = ctx;
free(ctx);
return kr_error(EINVAL);
}
- int ret = concat_iovs(iov, iovcnt, &ctx->buf, &ctx->buf_len);
- if (ret) {
- free(ctx);
- return ret;
- }
- session2_wrap(parent, protolayer_buffer(ctx->buf, ctx->buf_len),
+ int ret = session2_wrap(parent, protolayer_iovec(iov, iovcnt),
target, session2_transport_parent_pushv_finished,
ctx);
- return kr_ok();
+ return (ret < 0) ? ret : kr_ok();
default:
kr_assert(false && "Invalid transport");
#include "kresconfig.h"
#include "daemon/udp_queue.h"
-#include "daemon/worker.h"
+#include "daemon/session2.h"
#include "lib/generic/array.h"
#include "lib/utils.h"
return 0;
}
/* Appease the linker in case this unused call isn't optimized out. */
-void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task)
+void udp_queue_push(int fd, const struct sockaddr *sa, char *buf, size_t buf_len,
+ udp_queue_cb cb, void *baton)
{
abort();
}
int len; /**< The number of messages in the queue: 0..UDP_QUEUE_LEN */
struct mmsghdr msgvec[UDP_QUEUE_LEN]; /**< Parameter for sendmmsg() */
struct {
- struct qr_task *task; /**< Links for completion callbacks. */
+ udp_queue_cb cb;
+ void *cb_baton;
struct iovec msg_iov[1]; /**< storage for .msgvec[i].msg_iov */
} items[UDP_QUEUE_LEN];
} udp_queue_t;
/* ATM we don't really do anything about failures. */
int err = sent_len < 0 ? errno : EAGAIN /* unknown error, really */;
for (int i = 0; i < q->len; ++i) {
- qr_task_on_send(q->items[i].task, NULL, i < sent_len ? 0 : err);
- worker_task_unref(q->items[i].task);
+ if (q->items[i].cb)
+ q->items[i].cb(i < sent_len ? 0 : err, q->items[i].cb_baton);
}
q->len = 0;
}
return ret;
}
-void udp_queue_push(int fd, struct kr_request *req, struct qr_task *task)
+void udp_queue_push(int fd, const struct sockaddr *sa, char *buf, size_t buf_len,
+ udp_queue_cb cb, void *baton)
{
if (fd < 0) {
kr_log_error(SYSTEM, "ERROR: called udp_queue_push(fd = %d, ...)\n", fd);
abort();
}
- worker_task_ref(task);
/* Get a valid correct queue. */
if (fd >= state.udp_queues_len) {
const int new_len = fd + 1;
udp_queue_t *const q = state.udp_queues[fd];
/* Append to the queue */
- struct sockaddr *sa = (struct sockaddr *)/*const-cast*/req->qsource.comm_addr;
- q->msgvec[q->len].msg_hdr.msg_name = sa;
+ q->msgvec[q->len].msg_hdr.msg_name = (void *)sa;
q->msgvec[q->len].msg_hdr.msg_namelen = kr_sockaddr_len(sa);
- q->items[q->len].task = task;
+ q->items[q->len].cb = cb;
+ q->items[q->len].cb_baton = baton;
q->items[q->len].msg_iov[0] = (struct iovec){
- .iov_base = req->answer->wire,
- .iov_len = req->answer->size,
+ .iov_base = buf,
+ .iov_len = buf_len,
};
if (q->len == 0)
array_push(state.waiting_fds, fd);
#include "daemon/session2.h"
#include "daemon/tls.h"
#include "daemon/http.h"
-#include "daemon/udp_queue.h"
#include "lib/layer.h"
#include "lib/utils.h"
qr_task_ref(task);
/* Send back answer */
- int ret;
- const uv_handle_t *src_handle = session2_get_handle(source_session);
- /* TODO: this should probably just be a _wrap? */
- if (kr_fails_assert(src_handle->type == UV_UDP || src_handle->type == UV_TCP
- || src_handle->type == UV_POLL)) {
- ret = kr_error(EINVAL);
- } else if (src_handle->type == UV_POLL) {
- ret = xdp_push(task, src_handle);
- } else if (src_handle->type == UV_UDP && ENABLE_SENDMMSG) {
- int fd;
- ret = uv_fileno(src_handle, &fd);
- if (ret == 0)
- udp_queue_push(fd, &ctx->req, task);
- else
- kr_assert(false);
- } else {
- ret = qr_task_send(task, source_session, &ctx->source.comm_addr.ip, ctx->req.answer);
- }
+ /* TODO: xdp */
+ int ret = qr_task_send(task, source_session, &ctx->source.comm_addr.ip, ctx->req.answer);
if (ret != kr_ok()) {
(void) qr_task_on_send(task, NULL, kr_error(EIO));