return PROTOLAYER_EVENT_PROPAGATE;
}
+static int pl_tcp_sess_init(struct session2 *session,
+ void *data, void *param)
+{
+ struct sockaddr *peer = session2_get_peer(session);
+ session->comm_storage = (struct comm_info) {
+ .comm_addr = peer,
+ .src_addr = peer
+ };
+ return 0;
+}
+
static enum protolayer_event_cb_result pl_tcp_event_wrap(
enum protolayer_event_type event, void **baton,
struct session2 *session, void *sess_data)
};
protolayer_globals[PROTOLAYER_TYPE_TCP] = (struct protolayer_globals){
+ .sess_init = pl_tcp_sess_init,
.event_wrap = pl_tcp_event_wrap,
};
}
char *data = ctx->payload.buffer.buf;
ssize_t data_len = ctx->payload.buffer.len;
- struct comm_info *comm = &ctx->comm;
+ struct comm_info *comm = ctx->comm;
if (!s->outgoing && proxy_header_present(data, data_len)) {
if (!proxy_allowed(comm->comm_addr)) {
kr_log_debug(IO, "<= ignoring PROXYv2 UDP from disallowed address '%s'\n",
bool has_proxy : 1;
};
-static int pl_proxyv2_stream_sess_init(struct session2 *session,
- void *data, void *param)
-{
- struct sockaddr *peer = session2_get_peer(session);
- session->comm = (struct comm_info) {
- .comm_addr = peer,
- .src_addr = peer
- };
- return 0;
-}
-
static enum protolayer_iter_cb_result pl_proxyv2_stream_unwrap(
void *sess_data, void *iter_data, struct protolayer_iter_ctx *ctx)
{
char *data = wire_buf_data(ctx->payload.wire_buf); /* layer's or session's wirebuf */
ssize_t data_len = wire_buf_data_length(ctx->payload.wire_buf);
- struct comm_info *comm = &ctx->session->comm;
+ struct comm_info *comm = ctx->comm;
if (!s->outgoing && !tcp->had_data && proxy_header_present(data, data_len)) {
if (!proxy_allowed(comm->src_addr)) {
if (kr_log_is_debug(IO, NULL)) {
}
tcp->had_data = true;
- ctx->comm = ctx->session->comm;
return protolayer_continue(ctx);
}
protolayer_globals[PROTOLAYER_TYPE_PROXYV2_STREAM] = (struct protolayer_globals){
.sess_size = sizeof(struct pl_proxyv2_stream_sess_data),
- .sess_init = pl_proxyv2_stream_sess_init,
.unwrap = pl_proxyv2_stream_unwrap,
};
}
ctx->layer_ix, layer_name_ctx(ctx), ctx->status);
if (ctx->finished_cb)
- ctx->finished_cb(ret, s, &ctx->comm,
+ ctx->finished_cb(ret, s, ctx->comm,
ctx->finished_cb_baton);
mm_ctx_delete(&ctx->pool);
session2_transport_push(session,
ctx->payload.buffer.buf, ctx->payload.buffer.len,
ctx->payload.short_lived,
- &ctx->comm, protolayer_push_finished, ctx);
+ ctx->comm, protolayer_push_finished, ctx);
} else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
session2_transport_pushv(session,
ctx->payload.iovec.iov, ctx->payload.iovec.cnt,
ctx->payload.short_lived,
- &ctx->comm, protolayer_push_finished, ctx);
+ ctx->comm, protolayer_push_finished, ctx);
} else {
kr_assert(false && "Invalid payload type");
return kr_error(EINVAL);
/** Processes as many layers as possible synchronously, returning when either
* a layer has gone asynchronous, or when the whole sequence has finished.
*
- * May be called multiple times on the same `ctx` to continue processing
- * after an asynchronous operation. */
+ * May be called multiple times on the same `ctx` to continue processing after
+ * an asynchronous operation - user code will do this via *layer sequence return
+ * functions*. */
static int protolayer_step(struct protolayer_iter_ctx *ctx)
{
while (true) {
enum protolayer_type protocol = protolayer_grps[ctx->session->proto].layers[ctx->layer_ix];
struct protolayer_globals *globals = &protolayer_globals[protocol];
+ bool was_async = ctx->async_mode;
ctx->async_mode = false;
- ctx->status = 0;
- ctx->action = PROTOLAYER_ITER_ACTION_NULL;
- protolayer_iter_cb cb = (ctx->direction == PROTOLAYER_UNWRAP)
- ? globals->unwrap : globals->wrap;
+ /* Basically if we went asynchronous, we want to "resume" from
+ * underneath this `if` block. */
+ if (!was_async) {
+ ctx->status = 0;
+ ctx->action = PROTOLAYER_ITER_ACTION_NULL;
- if (ctx->session->closing) {
- return protolayer_iter_ctx_finish(
- ctx, kr_error(ECANCELED));
- }
+ protolayer_iter_cb cb = (ctx->direction == PROTOLAYER_UNWRAP)
+ ? globals->unwrap : globals->wrap;
- if (cb) {
- struct protolayer_data *sess_data = protolayer_sess_data_get(
- ctx->session, ctx->layer_ix);
- struct protolayer_data *iter_data = protolayer_iter_data_get(
- ctx, ctx->layer_ix);
- enum protolayer_iter_cb_result result = cb(sess_data, iter_data, ctx);
- if (kr_fails_assert(result == PROTOLAYER_ITER_CB_RESULT_MAGIC)) {
- /* Callback did not use a continuation function to return. */
- return protolayer_iter_ctx_finish(ctx, kr_error(EINVAL));
+ if (ctx->session->closing) {
+ return protolayer_iter_ctx_finish(
+ ctx, kr_error(ECANCELED));
}
- } else {
- ctx->action = PROTOLAYER_ITER_ACTION_CONTINUE;
- }
+ if (cb) {
+ struct protolayer_data *sess_data = protolayer_sess_data_get(
+ ctx->session, ctx->layer_ix);
+ struct protolayer_data *iter_data = protolayer_iter_data_get(
+ ctx, ctx->layer_ix);
+ enum protolayer_iter_cb_result result = cb(sess_data, iter_data, ctx);
+ if (kr_fails_assert(result == PROTOLAYER_ITER_CB_RESULT_MAGIC)) {
+ /* Callback did not use a *layer
+ * sequence return function* (see
+ * glossary). */
+ return protolayer_iter_ctx_finish(ctx, kr_error(EINVAL));
+ }
+ } else {
+ ctx->action = PROTOLAYER_ITER_ACTION_CONTINUE;
+ }
+
+ if (!ctx->action) {
+ /* We're going asynchronous - the next step is
+ * probably going to be from some sort of a
+ * callback and we will "resume" from underneath
+ * this `if` block. */
+ ctx->async_mode = true;
+ protolayer_payload_ensure_long_lived(ctx);
+ return PROTOLAYER_RET_ASYNC;
+ }
+ }
- if (!ctx->action) {
- /* Next step is from a callback */
- ctx->async_mode = true;
- protolayer_payload_ensure_long_lived(ctx);
- return PROTOLAYER_RET_ASYNC;
+ if (kr_fails_assert(ctx->action)) {
+ return protolayer_iter_ctx_finish(ctx, kr_error(EINVAL));
}
if (ctx->action == PROTOLAYER_ITER_ACTION_BREAK) {
if (kr_fails_assert(ctx->status == 0)) {
/* Status should be zero without a BREAK. */
- return protolayer_iter_ctx_finish(ctx, kr_error(ECANCELED));
+ return protolayer_iter_ctx_finish(ctx, kr_error(EINVAL));
}
if (ctx->action == PROTOLAYER_ITER_ACTION_CONTINUE) {
if (kr_fails_assert(session->proto < KR_PROTO_COUNT))
return kr_error(EFAULT);
+ bool had_comm_param = (comm != NULL);
+ if (!had_comm_param)
+ comm = &session->comm_storage;
+
struct protolayer_iter_ctx *ctx = malloc(session->iter_ctx_size);
kr_require(ctx);
*ctx = (struct protolayer_iter_ctx) {
.payload = payload,
- .comm = (comm) ? *comm : session->comm,
.direction = direction,
.layer_ix = layer_ix,
.session = session,
.finished_cb = cb,
.finished_cb_baton = baton
};
+ if (had_comm_param) {
+ struct comm_addr_storage *addrst = &ctx->comm_addr_storage;
+ if (comm->src_addr) {
+ memcpy(&addrst->src_addr.ip, comm->src_addr,
+ kr_sockaddr_len(comm->src_addr));
+ ctx->comm_storage.src_addr = &addrst->src_addr.ip;
+ }
+ if (comm->comm_addr) {
+ memcpy(&addrst->comm_addr.ip, comm->comm_addr,
+ kr_sockaddr_len(comm->comm_addr));
+ ctx->comm_storage.comm_addr = &addrst->comm_addr.ip;
+ }
+ if (comm->dst_addr) {
+ memcpy(&addrst->dst_addr.ip, comm->dst_addr,
+ kr_sockaddr_len(comm->dst_addr));
+ ctx->comm_storage.dst_addr = &addrst->dst_addr.ip;
+ }
+ ctx->comm = &ctx->comm_storage;
+ } else {
+ ctx->comm = &session->comm_storage;
+ }
mm_ctx_mempool(&ctx->pool, CPU_PAGE_SIZE);
const struct protolayer_grp *grp = &protolayer_grps[session->proto];
return NULL;
}
-enum protolayer_iter_cb_result protolayer_continue(struct protolayer_iter_ctx *ctx)
+/** Called by *Layer sequence return functions* to proceed with protolayer
+ * processing. If the */
+static inline void maybe_async_do_step(struct protolayer_iter_ctx *ctx)
{
- if (ctx->async_mode) {
- protolayer_iter_ctx_next(ctx);
+ if (ctx->async_mode)
protolayer_step(ctx);
- } else {
- ctx->action = PROTOLAYER_ITER_ACTION_CONTINUE;
- }
+}
+
+enum protolayer_iter_cb_result protolayer_continue(struct protolayer_iter_ctx *ctx)
+{
+ ctx->action = PROTOLAYER_ITER_ACTION_CONTINUE;
+ maybe_async_do_step(ctx);
return PROTOLAYER_ITER_CB_RESULT_MAGIC;
}
enum protolayer_iter_cb_result protolayer_break(struct protolayer_iter_ctx *ctx, int status)
{
ctx->status = status;
- if (ctx->async_mode) {
- protolayer_iter_ctx_finish(ctx, PROTOLAYER_RET_NORMAL);
- } else {
- ctx->action = PROTOLAYER_ITER_ACTION_BREAK;
- }
+ ctx->action = PROTOLAYER_ITER_ACTION_BREAK;
+ maybe_async_do_step(ctx);
return PROTOLAYER_ITER_CB_RESULT_MAGIC;
}
* processing, it is also the lifetime of `struct protolayer_iter_ctx` and
* layer-specific data contained therein.
*
+ * Layer sequence return function:
+ * - One of `protolayer_break()`, `protolayer_continue()`, or
+ * `protolayer_async()` - a function that a protolayer's `_wrap` or `_unwrap`
+ * callback should call to get its return value. They may either be called
+ * synchronously directly in the callback to end/pause the processing, or, if
+ * the processing went asynchronous, called to resume the iteration of layers.
+ *
* Payload:
* - Data processed by protocol layers in a particular sequence. In the wrap
* direction, this data generally starts as a DNS packet, which is then
bool xdp:1;
};
+/** Just a simple struct able to hold three IPv6 or IPv4 addresses, so that we
+ * can hold them somewhere. */
+struct comm_addr_storage {
+ union kr_sockaddr src_addr;
+ union kr_sockaddr comm_addr;
+ union kr_sockaddr dst_addr;
+};
+
/** A buffer control struct, with indices marking a chunk containing received
* but as of yet unprocessed data - the data in this chunk is called "valid
/* read-write for layers: */
/** The payload */
struct protolayer_payload payload;
- /** Communication information. Typically written into by one of the
- * first layers facilitating transport protocol processing. */
- struct comm_info comm;
+ /** Pointer to communication information. For TCP, this will generally
+ * point to the storage in the session. For UDP, this will generally
+ * point to the storage in this context. */
+ struct comm_info *comm;
+ /** Communication information storage. This will generally be set by one
+ * of the first layers in the sequence, if used, e.g. UDP PROXYv2. */
+ struct comm_info comm_storage;
+ struct comm_addr_storage comm_addr_storage;
/** Per-iter memory pool. Has no `free` procedure, gets freed as a whole
* when the context is being destroyed. Initialized and destroyed
* automatically - layers may use it to allocate memory. */
/* internal information for the manager - should only be used by the protolayer
* system, never by layers: */
enum protolayer_direction direction;
- /** If `true`, the processing of layers has been paused and is waiting
- * to be resumed or canceled. */
+ /** If `true`, the processing of the layer sequence has been paused and
+ * is waiting to be resumed (`protolayer_continue()`) or cancelled
+ * (`protolayer_break()`). */
bool async_mode;
/** The index of the layer that is currently being (or has just been)
* processed. */
};
/** Return value of `protolayer_iter_cb` callbacks. To be returned by *layer
- * sequence return functions* as a sanity check. Not to be used directly by
- * user code. */
+ * sequence return functions* (see glossary) as a sanity check. Not to be used
+ * directly by user code. */
enum protolayer_iter_cb_result {
PROTOLAYER_ITER_CB_RESULT_MAGIC = 0x364F392E,
};
*
* The function (or another function, that the pointed-to function causes to be
* called, directly or through an asynchronous operation), must call one of the
- * *layer sequence return functions* (e.g. `protolayer_continue()`,
- * `protolayer_async()`, ...) to advance (or end) the layer sequence. The
- * function must return the result of such a return function. */
+ * *layer sequence return functions* (see glossary) to advance (or end) the
+ * layer sequence. The function must return the result of such a return
+ * function. */
typedef enum protolayer_iter_cb_result (*protolayer_iter_cb)(
void *sess_data,
void *iter_data,
extern struct protolayer_globals protolayer_globals[PROTOLAYER_TYPE_COUNT];
-/** *Layer sequence return function* - signalizes the protolayer manager to
- * continue processing the next layer. */
+/** *Layer sequence return function* (see glossary) - signalizes the protolayer
+ * manager to continue processing the next layer. */
enum protolayer_iter_cb_result protolayer_continue(struct protolayer_iter_ctx *ctx);
-/** *Layer sequence return function* - signalizes that the layer wants to stop
- * processing of the buffer and clean up, possibly due to an error (indicated
- * by a non-zero `status`). */
+/** *Layer sequence return function* (see glossary) - signalizes that the layer
+ * wants to stop processing of the buffer and clean up, possibly due to an error
+ * (indicated by a non-zero `status`). */
enum protolayer_iter_cb_result protolayer_break(struct protolayer_iter_ctx *ctx, int status);
-/** *Layer sequence return function* - signalizes that the current sequence
- * will continue in an asynchronous manner. The layer should store the context
- * and call another sequence return function at another point. This may be used
- * in layers that work through libraries whose operation is asynchronous, like
- * GnuTLS.
+/** *Layer sequence return function* (see glossary) - signalizes that the
+ * current sequence will continue in an asynchronous manner. The layer should
+ * store the context and call another sequence return function at another point.
+ * This may be used in layers that work through libraries whose operation is
+ * asynchronous, like GnuTLS.
*
- * Note that this return function is just a readability hint - another return
- * function may be called in another stack frame before it (generally during a
- * call to an external library function, e.g. GnuTLS or nghttp2) and the
- * sequence will continue correctly. */
+ * Note that this one is basically just a readability hint - another return
+ * function may be actually called before it (generally during a call to an
+ * external library function, e.g. GnuTLS or nghttp2). This is completely legal
+ * and the sequence will continue correctly. */
static inline enum protolayer_iter_cb_result protolayer_async(void)
{
return PROTOLAYER_ITER_CB_RESULT_MAGIC;
/** Communication information. Typically written into by one of the
* first layers facilitating transport protocol processing.
* Zero-initialized by default. */
- struct comm_info comm;
+ struct comm_info comm_storage;
/** Time of last IO activity (if any occurs). Otherwise session
* creation time. */
* indicating an error. */
int session2_unwrap(struct session2 *s, struct protolayer_payload payload,
const struct comm_info *comm, protolayer_finished_cb cb,
- void *baton);
+ void *baton);
/** Same as `session2_unwrap`, but looks up the specified `protocol` in the
* session's assigned protocol group and sends the `payload` to the layer that
int ret = 0;
- if (comm == NULL)
- comm = &session->comm;
-
if (pkt == NULL)
pkt = worker_task_get_pktbuf(task);
break;
}
- ret = worker_submit(session, &ctx->comm, pkt);
+ ret = worker_submit(session, ctx->comm, pkt);
if (ret)
break;
}
if (!pkt)
return protolayer_break(ctx, KNOT_EMALF);
- int ret = worker_submit(session, &ctx->comm, pkt);
+ int ret = worker_submit(session, ctx->comm, pkt);
mp_flush(the_worker->pkt_pool.ctx);
return protolayer_break(ctx, ret);
} else if (ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF) {
if (!pkt)
return protolayer_break(ctx, KNOT_EMALF);
- int ret = worker_submit(session, &ctx->comm, pkt);
+ int ret = worker_submit(session, ctx->comm, pkt);
wire_buf_reset(ctx->payload.wire_buf);
mp_flush(the_worker->pkt_pool.ctx);
return protolayer_break(ctx, ret);
if (stream_sess->single && stream_sess->produced) {
if (kr_log_is_debug(WORKER, NULL)) {
kr_log_debug(WORKER, "Unexpected extra data from %s\n",
- kr_straddr(ctx->comm.src_addr));
+ kr_straddr(ctx->comm->src_addr));
}
status = KNOT_EMALF;
goto exit;
}
stream_sess->produced = true;
- int ret = worker_submit(session, &ctx->comm, pkt);
+ int ret = worker_submit(session, ctx->comm, pkt);
/* Errors from worker_submit() are intentionally *not* handled
* in order to ensure the entire wire buffer is processed. */