static enum protolayer_cb_result pl_udp_unwrap(struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
{
- if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) {
- /* events should not happen in UDP (currently) */
- return protolayer_continue(ctx);
- }
-
ctx->payload = protolayer_as_buffer(&ctx->payload);
if (kr_fails_assert(ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER)) {
/* unsupported payload */
return 0;
}
-static enum protolayer_cb_result pl_tcp_unwrap_timeout(
- struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
-{
- struct session2 *s = ctx->manager->session;
-
- if (kr_fails_assert(!s->closing))
- return protolayer_continue(ctx);
-
- if (!session2_tasklist_is_empty(s)) {
- int finalized = session2_tasklist_finalize_expired(s);
- the_worker->stats.timeout += finalized;
- /* session2_tasklist_finalize_expired() may call worker_task_finalize().
- * If session is a source session and there were IO errors,
- * worker_task_finalize() can finalize all tasks and close session. */
- if (s->closing)
- return protolayer_continue(ctx);
- }
-
- if (!session2_tasklist_is_empty(s)) {
- session2_timer_stop(s);
- session2_timer_start(s,
- KR_RESOLVE_TIME_LIMIT / 2,
- KR_RESOLVE_TIME_LIMIT / 2);
- } else {
- /* Normally it should not happen,
- * but better to check if there anything in this list. */
- while (!session2_waitinglist_is_empty(s)) {
- struct qr_task *t = session2_waitinglist_pop(s, false);
- worker_task_finalize(t, KR_STATE_FAIL);
- worker_task_unref(t);
- the_worker->stats.timeout += 1;
- if (s->closing)
- return protolayer_continue(ctx);
- }
- uint64_t idle_in_timeout = the_network->tcp.in_idle_timeout;
- uint64_t idle_time = kr_now() - s->last_activity;
- if (idle_time < idle_in_timeout) {
- idle_in_timeout -= idle_time;
- session2_timer_stop(s);
- session2_timer_start(s, idle_in_timeout, idle_in_timeout);
- } else {
- struct sockaddr *peer = session2_get_peer(s);
- char *peer_str = kr_straddr(peer);
- kr_log_debug(IO, "=> closing connection to '%s'\n",
- peer_str ? peer_str : "");
- if (s->outgoing) {
- worker_del_tcp_waiting(peer);
- worker_del_tcp_connected(peer);
- }
- session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
- }
- }
-
- return protolayer_continue(ctx);
-}
-
static enum protolayer_cb_result pl_tcp_unwrap(struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
{
struct session2 *s = ctx->manager->session;
struct pl_tcp_sess_data *tcp = protolayer_sess_data(layer);
struct sockaddr *peer = session2_get_peer(s);
- if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) {
- if (ctx->payload.event.type == PROTOLAYER_EVENT_TIMEOUT)
- return pl_tcp_unwrap_timeout(layer, ctx);
-
- /* pass thru */
- return protolayer_continue(ctx);
- }
-
if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) {
const char *buf = ctx->payload.buffer.buf;
const size_t len = ctx->payload.buffer.len;
kr_straddr(peer));
}
worker_end_tcp(s);
- ctx->payload = protolayer_event_nd(PROTOLAYER_EVENT_CLOSE);
- return protolayer_push(ctx);
+ session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
+ return protolayer_break(ctx, ECONNRESET);
}
ssize_t trimmed = proxy_process_header(&tcp->proxy, data, data_len);
}
}
worker_end_tcp(s);
- ctx->payload = protolayer_event_nd(PROTOLAYER_EVENT_CLOSE);
- return protolayer_push(ctx);
+ session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
+ return protolayer_break(ctx, ECONNRESET);
} else if (trimmed == 0) {
- ctx->payload = protolayer_event_nd(PROTOLAYER_EVENT_CLOSE);
- return protolayer_push(ctx);
+ session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
+ return protolayer_break(ctx, ECONNRESET);
}
if (tcp->proxy.command != PROXY2_CMD_LOCAL && tcp->proxy.family != AF_UNSPEC) {
const void *target,
protolayer_finished_cb cb, void *baton);
static int session2_transport_event(struct session2 *s,
- struct protolayer_event event,
- const void *target,
- protolayer_finished_cb cb, void *baton);
+ enum protolayer_event_type event,
+ void *baton);
struct protolayer_globals protolayer_globals[PROTOLAYER_PROTOCOL_COUNT] = {0};
ctx->finished_cb(ret, session, ctx->finished_cb_target,
ctx->finished_cb_baton);
- /* events bounce back from unwrap to wrap */
- bool bounce_back = (ctx->direction == PROTOLAYER_UNWRAP
- && ret == PROTOLAYER_RET_NORMAL
- && !ctx->status
- && ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT);
- if (bounce_back)
- session2_wrap(session, ctx->payload, NULL, NULL, NULL);
-
free(ctx);
return ret;
if (kr_log_is_debug(PROTOLAYER, NULL)) {
const char *sess_dir = manager->session->outgoing ? "out" : "in";
- const char *event_name = (payload.type == PROTOLAYER_PAYLOAD_EVENT)
- ? protolayer_event_names[payload.event.type]
- : "";
- const char *event_space = (payload.type == PROTOLAYER_PAYLOAD_EVENT) ? " " : "";
- kr_log_debug(PROTOLAYER, "[%s] %s%s%s submitted to grp '%s' in %s direction\n",
+ kr_log_debug(PROTOLAYER, "[%s] %s submitted to grp '%s' in %s direction\n",
sess_dir,
protolayer_payload_names[payload.type],
- event_space, event_name,
protolayer_grp_names[manager->grp],
(direction == PROTOLAYER_UNWRAP) ? "unwrap" : "wrap");
}
ret = session2_transport_pushv(session,
ctx->payload.iovec.iov, ctx->payload.iovec.cnt,
ctx->target, protolayer_push_finished, ctx);
- } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) {
- ret = session2_transport_event(session,
- ctx->payload.event,
- ctx->target, protolayer_push_finished, ctx);
} else {
kr_assert(false && "Invalid payload type");
ret = kr_error(EINVAL);
payload, target, cb, baton);
}
+static void session2_event_wrap(struct session2 *s, enum protolayer_event_type event, void *baton)
+{
+ bool cont;
+ struct protolayer_manager *m = s->layers;
+ for (ssize_t i = m->num_layers - 1; i >= 0; i--) {
+ struct protolayer_data *data = protolayer_manager_get(m, i);
+ struct protolayer_globals *globals = &protolayer_globals[data->protocol];
+ if (globals->event_wrap)
+ cont = globals->event_wrap(event, &baton, m, data);
+ else
+ cont = true;
+
+ if (!cont)
+ return;
+ }
+
+ session2_transport_event(s, event, baton);
+}
+
+void session2_event(struct session2 *s, enum protolayer_event_type event, void *baton)
+{
+ bool cont;
+ struct protolayer_manager *m = s->layers;
+ for (ssize_t i = 0; i < m->num_layers; i++) {
+ struct protolayer_data *data = protolayer_manager_get(m, i);
+ struct protolayer_globals *globals = &protolayer_globals[data->protocol];
+ if (globals->event_unwrap)
+ cont = globals->event_unwrap(event, &baton, m, data);
+ else
+ cont = true;
+
+ if (!cont)
+ return;
+ }
+
+ /* Immediately bounce back in the `wrap` direction.
+ *
+ * TODO: This might be undesirable for cases with sub-sessions - the
+ * current idea is for the layers managing sub-sessions to just return
+ * `false` on `event_unwrap`, but a more "automatic" mechanism may be
+ * added when this is relevant, to make it less error-prone. */
+ session2_event_wrap(s, event, baton);
+}
+
struct parent_pushv_ctx {
struct session2 *session;
session2_transport_single_push_finished, ctx);
}
-struct event_ctx {
- struct session2 *session;
- protolayer_finished_cb cb;
- void *baton;
- const void *target;
-};
-
-static void session2_transport_io_event_finished(uv_handle_t *handle)
-{
- struct session2 *s = handle->data;
- struct event_ctx *ctx = s->data;
- if (ctx->cb)
- ctx->cb(kr_ok(), ctx->session, ctx->target, ctx->baton);
- free(ctx);
-}
-
-static void session2_transport_parent_event_finished(int status,
- struct session2 *session,
- const void *target,
- void *baton)
-{
- struct event_ctx *ctx = baton;
- if (ctx->cb)
- ctx->cb(status, ctx->session, target, ctx->baton);
- free(ctx);
-}
-
-static int session2_handle_close(struct session2 *s, uv_handle_t *handle,
- struct event_ctx *ctx)
+static int session2_handle_close(struct session2 *s, uv_handle_t *handle)
{
io_stop_read(handle);
- s->data = ctx;
- uv_close(handle, session2_transport_io_event_finished);
+ uv_close(handle, NULL);
return kr_ok();
}
static int session2_transport_event(struct session2 *s,
- struct protolayer_event event,
- const void *target,
- protolayer_finished_cb cb, void *baton)
+ enum protolayer_event_type event,
+ void *baton)
{
- if (s->closing) {
- if (cb)
- cb(kr_error(ESTALE), s, target, baton);
+ if (s->closing)
return kr_ok();
- }
- bool is_close_event = (event.type == PROTOLAYER_EVENT_CLOSE ||
- event.type == PROTOLAYER_EVENT_FORCE_CLOSE);
+ bool is_close_event = (event == PROTOLAYER_EVENT_CLOSE ||
+ event == PROTOLAYER_EVENT_FORCE_CLOSE);
if (is_close_event) {
kr_require(session2_is_empty(s));
session2_timer_stop(s);
s->closing = true;
}
- struct event_ctx *ctx = malloc(sizeof(*ctx));
- kr_require(ctx);
- *ctx = (struct event_ctx){
- .session = s,
- .cb = cb,
- .baton = baton,
- .target = target
- };
-
switch (s->transport.type) {
case SESSION2_TRANSPORT_IO:;
uv_handle_t *handle = s->transport.io.handle;
if (kr_fails_assert(handle)) {
- free(ctx);
return kr_error(EINVAL);
}
if (is_close_event)
- return session2_handle_close(s, handle, ctx);
+ return session2_handle_close(s, handle);
- if (ctx->cb)
- ctx->cb(kr_ok(), s, target, baton);
-
- free(ctx);
return kr_ok();
case SESSION2_TRANSPORT_PARENT:;
- int ret = session2_wrap(s, protolayer_event(event), target,
- session2_transport_parent_event_finished, ctx);
- if (ret < 0) {
- free(ctx);
- return ret;
- }
-
+ session2_event_wrap(s, event, baton);
return kr_ok();
default:
kr_assert(false && "Invalid transport");
- free(ctx);
return kr_error(EINVAL);
}
}
typedef void (*protolayer_finished_cb)(int status, struct session2 *session,
const void *target, void *baton);
+
#define PROTOLAYER_EVENT_MAP(XX) \
XX(CLOSE) /**< Signal to gracefully close the session -
* i.e. layers add their standard disconnection
extern char *protolayer_event_names[];
-/** Event, with optional auxiliary data. */
-struct protolayer_event {
- enum protolayer_event_type type;
- union {
- void *ptr;
- char raw[sizeof(void *)];
- } data; /**< Optional data supplied with the event.
- * May be used by a layer. */
-};
#define PROTOLAYER_PAYLOAD_MAP(XX) \
XX(BUFFER, "Buffer") \
XX(IOVEC, "IOVec") \
- XX(EVENT, "Event") \
XX(WIRE_BUF, "Wire buffer")
/** Defines whether the data for a `struct protolayer_cb_ctx` is represented
int cnt;
} iovec;
- /** Only valid if `type` is `_EVENT`. */
- struct protolayer_event event;
-
/** Only valid if `type` is `_WIRE_BUF`. */
struct wire_buf *wire_buf;
};
};
}
-/** Convenience function to get an event-type payload. */
-static inline struct protolayer_payload protolayer_event(struct protolayer_event event)
-{
- return (struct protolayer_payload){
- .type = PROTOLAYER_PAYLOAD_EVENT,
- .event = event
- };
-}
-
-/** Convenience function to get an event-type payload without auxiliary data. */
-static inline struct protolayer_payload protolayer_event_nd(enum protolayer_event_type event)
-{
- return (struct protolayer_payload){
- .type = PROTOLAYER_PAYLOAD_EVENT,
- .event = {
- .type = event
- }
- };
-}
-
/** Convenience function to get a wire-buf-type payload. */
static inline struct protolayer_payload protolayer_wire_buf(struct wire_buf *wire_buf)
{
PROTOLAYER_CB_RESULT_MAGIC = 0x364F392E,
};
+/** Function type for `wrap` and `unwrap` callbacks of layers. Return value
+ * determines the flow of iteration; see the enum docs for more info. */
typedef enum protolayer_cb_result (*protolayer_cb)(
struct protolayer_data *layer, struct protolayer_cb_ctx *ctx);
+
+/** Function type for `event_wrap` and `event_unwrap` callbacks of layers.
+ * `baton` always points to some memory; it may be modified accommodate for
+ * the behaviour of the next layer in the sequence.
+ *
+ * When `true` is returned, iteration proceeds as normal. When `false` is
+ * returned, iteration stops. */
+typedef bool (*protolayer_event_cb)(enum protolayer_event_type event,
+ void **baton,
+ struct protolayer_manager *manager,
+ struct protolayer_data *layer);
+
+/** Function type for (de)initialization callbacks of layers.
+ *
+ * Returning 0 means success, other return values mean error and halt the
+ * initialization. */
typedef int (*protolayer_data_cb)(struct protolayer_manager *manager,
struct protolayer_data *layer);
protolayer_data_cb iter_deinit; /**< Called at the end of a layer
* sequence to deinitialize
* layer-specific iteration data. */
+
protolayer_cb unwrap; /**< Strips the buffer of protocol-specific
* data. E.g. a HTTP layer removes HTTP
* status and headers. */
protolayer_cb wrap; /**< Wraps the buffer into protocol-specific
* data. E.g. a HTTP layer adds HTTP status
* and headers. */
+
+ protolayer_event_cb event_unwrap; /**< Processes events in the unwrap order. */
+ protolayer_event_cb event_wrap; /**< Processes events in the wrap order. */
};
/** Global data about layered protocols. Indexed by `enum protolayer_protocol`. */
/** Resets the valid bytes of the buffer to zero, as well as the error flag. */
int wire_buf_reset(struct wire_buf *wb);
-static void *wire_buf_data(const struct wire_buf *wb)
+static inline void *wire_buf_data(const struct wire_buf *wb)
{
return &wb->buf[wb->start];
}
-static size_t wire_buf_data_length(const struct wire_buf *wb)
+static inline size_t wire_buf_data_length(const struct wire_buf *wb)
{
return wb->end - wb->start;
}
-static void *wire_buf_free_space(const struct wire_buf *wb)
+static inline void *wire_buf_free_space(const struct wire_buf *wb)
{
return &wb->buf[wb->end];
}
-static size_t wire_buf_free_space_length(const struct wire_buf *wb)
+static inline size_t wire_buf_free_space_length(const struct wire_buf *wb)
{
return wb->size - wb->end;
}
int session2_wrap(struct session2 *s, struct protolayer_payload payload,
const void *target, protolayer_finished_cb cb, void *baton);
-/** Convenience function to send the specified event to be processed in the
- * `unwrap` direction. `data` may be `NULL`.
- *
- * See `session2_unwrap` for more information. */
-static inline int session2_event(struct session2 *s,
- enum protolayer_event_type type, void *data)
-{
- struct protolayer_event event = {
- .type = type,
- .data = { .ptr = data }
- };
- return session2_unwrap(s, protolayer_event(event), NULL, NULL, NULL);
-}
+/** Sends an event to be synchronously processed by the protocol layers of the
+ * specified session. The layers are first iterated through in the `_UNWRAP`
+ * direction, then bounced back in the `_WRAP` direction. */
+void session2_event(struct session2 *s, enum protolayer_event_type type, void *baton);
/** Removes the specified request task from the session's tasklist. The session
* must be outgoing. If the session is UDP, a signal to close is also sent to it. */
return knot_pkt_new(buf, buf_len, &the_worker->pkt_pool);
}
-static enum protolayer_cb_result pl_dns_dgram_unwrap_timeout(
- struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
+static bool pl_dns_dgram_event_unwrap(enum protolayer_event_type event,
+ void **baton,
+ struct protolayer_manager *manager,
+ struct protolayer_data *layer)
{
- struct session2 *session = ctx->manager->session;
- kr_assert(session2_get_handle(session)->data == session);
- kr_assert(session2_tasklist_get_len(session) == 1);
- kr_assert(session2_waitinglist_is_empty(session));
+ if (event != PROTOLAYER_EVENT_TIMEOUT)
+ return true;
+
+ struct session2 *session = manager->session;
+ if (session2_tasklist_get_len(session) != 1 ||
+ !session2_waitinglist_is_empty(session))
+ return true;
session2_timer_stop(session);
struct qr_task *task = session2_tasklist_get_first(session);
if (!task)
- return protolayer_continue(ctx);
+ return true;
if (task->leading && task->pending_count > 0) {
struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
the_worker->stats.timeout += 1;
qr_task_step(task, NULL, NULL);
- return protolayer_continue(ctx);
+ return true;
}
static enum protolayer_cb_result pl_dns_dgram_unwrap(
struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
{
-
- if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) {
- if (ctx->payload.event.type == PROTOLAYER_EVENT_TIMEOUT)
- return pl_dns_dgram_unwrap_timeout(layer, ctx);
-
- /* pass thru */
- return protolayer_continue(ctx);
- }
-
struct session2 *session = ctx->manager->session;
if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
return kr_ok();
}
-static enum protolayer_cb_result pl_dns_stream_unwrap_timeout(
- struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
+static bool pl_dns_stream_resolution_timeout(struct session2 *s)
{
- struct session2 *session = ctx->manager->session;
- if (session->connected || session->closing)
- return protolayer_continue(ctx);
+ if (kr_fails_assert(!s->closing))
+ return true;
+
+ if (!session2_tasklist_is_empty(s)) {
+ int finalized = session2_tasklist_finalize_expired(s);
+ the_worker->stats.timeout += finalized;
+ /* session2_tasklist_finalize_expired() may call worker_task_finalize().
+ * If session is a source session and there were IO errors,
+ * worker_task_finalize() can finalize all tasks and close session. */
+ if (s->closing)
+ return true;
+ }
+
+ if (!session2_tasklist_is_empty(s)) {
+ session2_timer_stop(s);
+ session2_timer_start(s,
+ KR_RESOLVE_TIME_LIMIT / 2,
+ KR_RESOLVE_TIME_LIMIT / 2);
+ } else {
+ /* Normally it should not happen,
+ * but better to check if there anything in this list. */
+ while (!session2_waitinglist_is_empty(s)) {
+ struct qr_task *t = session2_waitinglist_pop(s, false);
+ worker_task_finalize(t, KR_STATE_FAIL);
+ worker_task_unref(t);
+ the_worker->stats.timeout += 1;
+ if (s->closing)
+ return true;
+ }
+ uint64_t idle_in_timeout = the_network->tcp.in_idle_timeout;
+ uint64_t idle_time = kr_now() - s->last_activity;
+ if (idle_time < idle_in_timeout) {
+ idle_in_timeout -= idle_time;
+ session2_timer_stop(s);
+ session2_timer_start(s, idle_in_timeout, idle_in_timeout);
+ } else {
+ struct sockaddr *peer = session2_get_peer(s);
+ char *peer_str = kr_straddr(peer);
+ kr_log_debug(IO, "=> closing connection to '%s'\n",
+ peer_str ? peer_str : "");
+ if (s->outgoing) {
+ worker_del_tcp_waiting(peer);
+ worker_del_tcp_connected(peer);
+ }
+ session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
+ }
+ }
+
+ return true;
+}
- /* Connection timeout */
+static bool pl_dns_stream_connection_timeout(struct session2 *session)
+{
session2_timer_stop(session);
kr_assert(session2_tasklist_is_empty(session));
const char *peer_str = kr_straddr(peer);
VERBOSE_MSG(NULL, "=> connection to '%s' failed (internal timeout), empty waitinglist\n",
peer_str ? peer_str : "");
- return protolayer_continue(ctx);
+ return true;
}
struct kr_query *qry = task_get_last_pending_query(task);
* If no, most likely this is timed out connection even if
* it was successful. */
- return protolayer_continue(ctx);
+ return true;
}
-static enum protolayer_cb_result pl_dns_stream_unwrap(
- struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
+static bool pl_dns_stream_event_unwrap(enum protolayer_event_type event,
+ void **baton,
+ struct protolayer_manager *manager,
+ struct protolayer_data *layer)
{
- if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) {
- if (ctx->payload.event.type == PROTOLAYER_EVENT_TIMEOUT)
- return pl_dns_stream_unwrap_timeout(layer, ctx);
+ struct session2 *session = manager->session;
+ if (session->closing || event != PROTOLAYER_EVENT_TIMEOUT)
+ return true;
- /* pass thru */
- return protolayer_continue(ctx);
- }
+ if (session->connected)
+ return pl_dns_stream_resolution_timeout(manager->session);
+ else
+ return pl_dns_stream_connection_timeout(manager->session);
+}
+static enum protolayer_cb_result pl_dns_stream_unwrap(
+ struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
+{
if (kr_fails_assert(ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF)) {
/* DNS stream only works with a wire buffer */
return protolayer_break(ctx, kr_error(EINVAL));
static enum protolayer_cb_result pl_dns_stream_wrap(
struct protolayer_data *layer, struct protolayer_cb_ctx *ctx)
{
- if (ctx->payload.type == PROTOLAYER_PAYLOAD_EVENT) {
- /* pass thru */
- return protolayer_continue(ctx);
- }
-
struct pl_dns_stream_iter_data *stream = protolayer_iter_data(layer);
struct session2 *s = ctx->manager->session;
/* DNS protocol layers */
protolayer_globals[PROTOLAYER_DNS_DGRAM] = (struct protolayer_globals){
.unwrap = pl_dns_dgram_unwrap,
+ .event_unwrap = pl_dns_dgram_event_unwrap
};
const struct protolayer_globals stream_common = {
.sess_size = sizeof(struct pl_dns_stream_sess_data),
.iter_init = pl_dns_stream_iter_init,
.iter_deinit = pl_dns_stream_iter_deinit,
.unwrap = pl_dns_stream_unwrap,
- .wrap = pl_dns_stream_wrap
+ .wrap = pl_dns_stream_wrap,
+ .event_unwrap = pl_dns_stream_event_unwrap
};
protolayer_globals[PROTOLAYER_DNS_MSTREAM] = stream_common;
protolayer_globals[PROTOLAYER_DNS_MSTREAM].sess_init = pl_dns_mstream_sess_init;