/** Gets layer-specific session data for the layer with the specified index
* from the manager. */
static inline struct protolayer_data *protolayer_sess_data_get(
- struct protolayer_manager *m, size_t layer_ix)
+ struct session2 *s, size_t layer_ix)
{
- const struct protolayer_grp *grp = &protolayer_grps[m->proto];
+ const struct protolayer_grp *grp = &protolayer_grps[s->proto];
if (kr_fails_assert(layer_ix < grp->num_layers))
return NULL;
- /* See doc comment of `struct protolayer_manager::data` */
- const ssize_t *offsets = (ssize_t *)m->data;
- char *pl_data_beg = &m->data[2 * grp->num_layers * sizeof(*offsets)];
+ /* See doc comment of `struct session2::layer_data` */
+ const ssize_t *offsets = (ssize_t *)s->layer_data;
+ char *pl_data_beg = &s->layer_data[2 * grp->num_layers * sizeof(*offsets)];
ssize_t offset = offsets[layer_ix];
if (offset < 0) /* No session data for this layer */
static inline struct protolayer_data *protolayer_iter_data_get(
struct protolayer_iter_ctx *ctx, size_t layer_ix)
{
- struct protolayer_manager *m = ctx->manager;
- const struct protolayer_grp *grp = &protolayer_grps[m->proto];
+ struct session2 *s = ctx->session;
+ const struct protolayer_grp *grp = &protolayer_grps[s->proto];
if (kr_fails_assert(layer_ix < grp->num_layers))
return NULL;
- /* See doc comment of `struct protolayer_manager::data` */
- const ssize_t *offsets = (ssize_t *)&m->data[grp->num_layers * sizeof(*offsets)];
+ /* See doc comment of `struct session2::layer_data` */
+ const ssize_t *offsets = (ssize_t *)&s->layer_data[grp->num_layers * sizeof(*offsets)];
ssize_t offset = offsets[layer_ix];
if (offset < 0) /* No iteration data for this layer */
return (struct protolayer_data *)(ctx->data + offset);
}
-static inline ssize_t protolayer_manager_get_protocol(
- struct protolayer_manager *m, enum protolayer_type protocol)
+static inline ssize_t session2_get_protocol(
+ struct session2 *s, enum protolayer_type protocol)
{
- const struct protolayer_grp *grp = &protolayer_grps[m->proto];
+ const struct protolayer_grp *grp = &protolayer_grps[s->proto];
for (ssize_t i = 0; i < grp->num_layers; i++) {
enum protolayer_type found = grp->layers[i];
if (protocol == found)
static inline bool protolayer_iter_ctx_is_last(struct protolayer_iter_ctx *ctx)
{
unsigned int last_ix = (ctx->direction == PROTOLAYER_UNWRAP)
- ? protolayer_grps[ctx->manager->proto].num_layers - 1
+ ? protolayer_grps[ctx->session->proto].num_layers - 1
: 0;
return ctx->layer_ix == last_ix;
}
static inline const char *layer_name_ctx(struct protolayer_iter_ctx *ctx)
{
- return layer_name(ctx->manager->proto, ctx->layer_ix);
+ return layer_name(ctx->session->proto, ctx->layer_ix);
}
static int protolayer_iter_ctx_finish(struct protolayer_iter_ctx *ctx, int ret)
{
- struct session2 *session = ctx->manager->session;
-
- struct protolayer_manager *m = ctx->manager;
- const struct protolayer_globals *globals = &protolayer_globals[m->proto];
- const struct protolayer_grp *grp = &protolayer_grps[m->proto];
+ struct session2 *s = ctx->session;
+ const struct protolayer_globals *globals = &protolayer_globals[s->proto];
+ const struct protolayer_grp *grp = &protolayer_grps[s->proto];
for (size_t i = 0; i < grp->num_layers; i++) {
struct protolayer_data *d = protolayer_iter_data_get(ctx, i);
if (globals->iter_deinit)
- globals->iter_deinit(m, ctx, d);
+ globals->iter_deinit(ctx, d);
}
if (ret)
- VERBOSE_LOG(session, "layer context of group '%s' (on %u: %s) ended with return code %d\n",
- kr_proto_name(ctx->manager->proto),
+ VERBOSE_LOG(s, "layer context of group '%s' (on %u: %s) ended with return code %d\n",
+ kr_proto_name(s->proto),
ctx->layer_ix, layer_name_ctx(ctx), ret);
if (ctx->status)
- VERBOSE_LOG(session, "iteration of group '%s' (on %u: %s) ended with status %d\n",
- kr_proto_name(ctx->manager->proto),
+ VERBOSE_LOG(s, "iteration of group '%s' (on %u: %s) ended with status %d\n",
+ kr_proto_name(s->proto),
ctx->layer_ix, layer_name_ctx(ctx), ctx->status);
if (ctx->finished_cb)
- ctx->finished_cb(ret, session, &ctx->comm,
+ ctx->finished_cb(ret, s, &ctx->comm,
ctx->finished_cb_baton);
- free(ctx->async_buffer);
+ mm_ctx_delete(&ctx->pool);
free(ctx);
return ret;
/** Pushes the specified protocol layer's payload to the session's transport. */
static int protolayer_push(struct protolayer_iter_ctx *ctx)
{
- struct session2 *session = ctx->manager->session;
+ struct session2 *session = ctx->session;
if (ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF) {
ctx->payload = protolayer_payload_as_buffer(&ctx->payload);
if (kr_fails_assert(buf_len))
return;
- void *buf = malloc(buf_len);
+ void *buf = mm_alloc(&ctx->pool, buf_len);
kr_require(buf);
protolayer_payload_copy(buf, &ctx->payload, buf_len);
- ctx->async_buffer = buf;
ctx->payload = protolayer_payload_buffer(buf, buf_len, false);
}
static int protolayer_step(struct protolayer_iter_ctx *ctx)
{
while (true) {
- if (kr_fails_assert(ctx->manager->proto < KR_PROTO_COUNT))
+ if (kr_fails_assert(ctx->session->proto < KR_PROTO_COUNT))
return kr_error(EFAULT);
- enum protolayer_type protocol = protolayer_grps[ctx->manager->proto].layers[ctx->layer_ix];
+ enum protolayer_type protocol = protolayer_grps[ctx->session->proto].layers[ctx->layer_ix];
struct protolayer_globals *globals = &protolayer_globals[protocol];
ctx->async_mode = false;
protolayer_iter_cb cb = (ctx->direction == PROTOLAYER_UNWRAP)
? globals->unwrap : globals->wrap;
- if (ctx->manager->session->closing) {
+ if (ctx->session->closing) {
return protolayer_iter_ctx_finish(
ctx, kr_error(ECANCELED));
}
if (cb) {
struct protolayer_data *sess_data = protolayer_sess_data_get(
- ctx->manager, ctx->layer_ix);
+ ctx->session, ctx->layer_ix);
struct protolayer_data *iter_data = protolayer_iter_data_get(
ctx, ctx->layer_ix);
enum protolayer_iter_cb_result result = cb(sess_data, iter_data, ctx);
/** Submits the specified buffer to the sequence of layers represented by the
* specified protolayer manager. The sequence will be processed in the
- * specified direction.
+ * specified `direction`, starting by the layer specified by `layer_ix`.
*
* Returns PROTOLAYER_RET_NORMAL when all layers have finished,
* PROTOLAYER_RET_ASYNC when some layers are asynchronous and waiting for
* continuation, or a negative number for errors (kr_error). */
-static int protolayer_manager_submit(
- struct protolayer_manager *manager,
+static int session2_submit(
+ struct session2 *session,
enum protolayer_direction direction, size_t layer_ix,
struct protolayer_payload payload, const struct comm_info *comm,
protolayer_finished_cb cb, void *baton)
{
- if (manager->session->closing)
+ if (session->closing)
return kr_error(ECANCELED);
- if (kr_fails_assert(manager->proto < KR_PROTO_COUNT))
+ if (kr_fails_assert(session->proto < KR_PROTO_COUNT))
return kr_error(EFAULT);
- struct protolayer_iter_ctx *ctx = malloc(manager->cb_ctx_size);
+ struct protolayer_iter_ctx *ctx = malloc(session->iter_ctx_size);
kr_require(ctx);
- VERBOSE_LOG(manager->session,
+ VERBOSE_LOG(session,
"%s submitted to grp '%s' in %s direction (%zu: %s)\n",
protolayer_payload_name(payload.type),
- kr_proto_name(manager->proto),
+ kr_proto_name(session->proto),
(direction == PROTOLAYER_UNWRAP) ? "unwrap" : "wrap",
- layer_ix, layer_name(manager->proto, layer_ix));
+ layer_ix, layer_name(session->proto, layer_ix));
*ctx = (struct protolayer_iter_ctx) {
.payload = payload,
- .comm = (comm) ? *comm : manager->session->comm,
+ .comm = (comm) ? *comm : session->comm,
.direction = direction,
.layer_ix = layer_ix,
- .manager = manager,
+ .session = session,
.finished_cb = cb,
.finished_cb_baton = baton
};
+ mm_ctx_mempool(&ctx->pool, CPU_PAGE_SIZE);
- const struct protolayer_grp *grp = &protolayer_grps[manager->proto];
+ const struct protolayer_grp *grp = &protolayer_grps[session->proto];
for (size_t i = 0; i < grp->num_layers; i++) {
struct protolayer_globals *globals = &protolayer_globals[grp->layers[i]];
struct protolayer_data *iter_data = protolayer_iter_data_get(ctx, i);
if (iter_data) {
memset(iter_data, 0, globals->iter_size);
- iter_data->session = manager->session;
+ iter_data->session = session;
}
if (globals->iter_init)
- globals->iter_init(manager, ctx, iter_data);
+ globals->iter_init(ctx, iter_data);
}
return protolayer_step(ctx);
return NULL;
}
-/** Allocates and initializes a new manager. */
-static struct protolayer_manager *protolayer_manager_new(
- struct session2 *s,
- enum kr_proto proto,
- struct protolayer_data_param *layer_param,
- size_t layer_param_count)
-{
- if (kr_fails_assert(s && proto))
- return NULL;
-
- size_t manager_size = sizeof(struct protolayer_manager);
- size_t cb_ctx_size = sizeof(struct protolayer_iter_ctx);
-
- const struct protolayer_grp *grp = &protolayer_grps[proto];
- if (kr_fails_assert(grp->num_layers))
- return NULL;
-
- size_t wire_buf_length = 0;
- size_t wire_buf_max_length = 0;
- ssize_t offsets[2 * grp->num_layers];
- manager_size += sizeof(offsets);
-
- ssize_t *sess_offsets = offsets;
- ssize_t *iter_offsets = &offsets[grp->num_layers];
-
- /* Space for layer-specific data, guaranteeing alignment */
- size_t total_sess_data_size = 0;
- size_t total_iter_data_size = 0;
- for (size_t i = 0; i < grp->num_layers; i++) {
- const struct protolayer_globals *g = &protolayer_globals[grp->layers[i]];
-
- sess_offsets[i] = g->sess_size ? total_sess_data_size : -1;
- total_sess_data_size += ALIGN_TO(g->sess_size, CPU_STRUCT_ALIGN);
-
- iter_offsets[i] = g->iter_size ? total_iter_data_size : -1;
- total_iter_data_size += ALIGN_TO(g->iter_size, CPU_STRUCT_ALIGN);
-
- size_t wire_buf_overhead = (g->wire_buf_overhead_cb)
- ? g->wire_buf_overhead_cb(s->outgoing)
- : g->wire_buf_overhead;
- wire_buf_length += wire_buf_overhead;
- wire_buf_max_length += MAX(g->wire_buf_max_overhead, wire_buf_overhead);
- }
- manager_size += total_sess_data_size;
- cb_ctx_size += total_iter_data_size;
-
- /* Allocate and initialize manager */
- struct protolayer_manager *m = calloc(1, manager_size);
- kr_require(m);
- m->proto = proto;
- m->session = s;
- m->cb_ctx_size = cb_ctx_size;
- memcpy(m->data, offsets, sizeof(offsets));
-
- m->wire_buf_max_length = wire_buf_max_length;
- int ret = wire_buf_init(&m->wire_buf, wire_buf_length);
- kr_require(!ret);
-
- /* Initialize the layer's session data */
- for (size_t i = 0; i < grp->num_layers; i++) {
- struct protolayer_globals *globals = &protolayer_globals[grp->layers[i]];
- struct protolayer_data *sess_data = protolayer_sess_data_get(m, i);
- if (sess_data) {
- memset(sess_data, 0, globals->sess_size);
- sess_data->session = s;
- }
-
- void *param = get_init_param(grp->layers[i], layer_param, layer_param_count);
- if (globals->sess_init)
- globals->sess_init(m, sess_data, param);
- }
-
- return m;
-}
-
-/** Deinitializes all layer data in the manager and deallocates it. */
-static void protolayer_manager_free(struct protolayer_manager *m)
-{
- if (!m) return;
-
- const struct protolayer_grp *grp = &protolayer_grps[m->proto];
- for (size_t i = 0; i < grp->num_layers; i++) {
- struct protolayer_globals *globals = &protolayer_globals[grp->layers[i]];
- if (globals->sess_deinit) {
- struct protolayer_data *sess_data = protolayer_sess_data_get(m, i);
- globals->sess_deinit(m, sess_data);
- }
- }
-
- wire_buf_deinit(&m->wire_buf);
- free(m);
-}
-
enum protolayer_iter_cb_result protolayer_continue(struct protolayer_iter_ctx *ctx)
{
if (ctx->async_mode) {
struct session2 *session2_new(enum session2_transport_type transport_type,
- enum kr_proto layer_grp,
+ enum kr_proto proto,
struct protolayer_data_param *layer_param,
size_t layer_param_count,
bool outgoing)
{
- kr_require(transport_type && layer_grp);
+ kr_require(transport_type && proto);
+
+ size_t session_size = sizeof(struct session2);
+ size_t iter_ctx_size = sizeof(struct protolayer_iter_ctx);
+
+ const struct protolayer_grp *grp = &protolayer_grps[proto];
+ if (kr_fails_assert(grp->num_layers))
+ return NULL;
+
+ size_t wire_buf_length = 0;
+ ssize_t offsets[2 * grp->num_layers];
+ session_size += sizeof(offsets);
+
+ ssize_t *sess_offsets = offsets;
+ ssize_t *iter_offsets = &offsets[grp->num_layers];
+
+ /* Space for layer-specific data, guaranteeing alignment */
+ size_t total_sess_data_size = 0;
+ size_t total_iter_data_size = 0;
+ for (size_t i = 0; i < grp->num_layers; i++) {
+ const struct protolayer_globals *g = &protolayer_globals[grp->layers[i]];
+
+ sess_offsets[i] = g->sess_size ? total_sess_data_size : -1;
+ total_sess_data_size += ALIGN_TO(g->sess_size, CPU_STRUCT_ALIGN);
+
+ iter_offsets[i] = g->iter_size ? total_iter_data_size : -1;
+ total_iter_data_size += ALIGN_TO(g->iter_size, CPU_STRUCT_ALIGN);
- struct session2 *s = malloc(sizeof(*s));
+ size_t wire_buf_overhead = (g->wire_buf_overhead_cb)
+ ? g->wire_buf_overhead_cb(outgoing)
+ : g->wire_buf_overhead;
+ wire_buf_length += wire_buf_overhead;
+ }
+ session_size += total_sess_data_size;
+ iter_ctx_size += total_iter_data_size;
+
+ struct session2 *s = malloc(session_size);
kr_require(s);
*s = (struct session2) {
.log_id = next_log_id++,
.outgoing = outgoing,
.tasks = trie_create(NULL),
- };
- struct protolayer_manager *layers = protolayer_manager_new(s, layer_grp,
- layer_param, layer_param_count);
- if (!layers) {
- free(s);
- return NULL;
- }
- s->layers = layers;
+ .proto = proto,
+ .iter_ctx_size = iter_ctx_size,
+ };
- mm_ctx_mempool(&s->pool, CPU_PAGE_SIZE);
+ memcpy(&s->layer_data, offsets, sizeof(offsets));
queue_init(s->waiting);
+ int ret = wire_buf_init(&s->wire_buf, wire_buf_length);
+ kr_require(!ret);
- int ret = uv_timer_init(uv_default_loop(), &s->timer);
+ ret = uv_timer_init(uv_default_loop(), &s->timer);
kr_require(!ret);
s->timer.data = s;
s->uv_count++; /* Session owns the timer */
+ /* Initialize the layer's session data */
+ for (size_t i = 0; i < grp->num_layers; i++) {
+ struct protolayer_globals *globals = &protolayer_globals[grp->layers[i]];
+ struct protolayer_data *sess_data = protolayer_sess_data_get(s, i);
+ if (sess_data) {
+ memset(sess_data, 0, globals->sess_size);
+ sess_data->session = s;
+ }
+
+ void *param = get_init_param(grp->layers[i], layer_param, layer_param_count);
+ if (globals->sess_init)
+ globals->sess_init(s, sess_data, param);
+ }
+
session2_touch(s);
return s;
* and timer are already closed, otherwise may leak resources. */
static void session2_free(struct session2 *s)
{
- protolayer_manager_free(s->layers);
- mm_ctx_delete(&s->pool);
+ const struct protolayer_grp *grp = &protolayer_grps[s->proto];
+ for (size_t i = 0; i < grp->num_layers; i++) {
+ struct protolayer_globals *globals = &protolayer_globals[grp->layers[i]];
+ if (globals->sess_deinit) {
+ struct protolayer_data *sess_data = protolayer_sess_data_get(s, i);
+ globals->sess_deinit(s, sess_data);
+ }
+ }
+
+ wire_buf_deinit(&s->wire_buf);
trie_free(s->tasks);
queue_deinit(s->waiting);
free(s);
const struct comm_info *comm, protolayer_finished_cb cb,
void *baton)
{
- return protolayer_manager_submit(s->layers, PROTOLAYER_UNWRAP, 0,
- payload, comm, cb, baton);
+ return session2_submit(s, PROTOLAYER_UNWRAP,
+ 0, payload, comm, cb, baton);
}
int session2_unwrap_after(struct session2 *s, enum protolayer_type protocol,
const struct comm_info *comm,
protolayer_finished_cb cb, void *baton)
{
- ssize_t layer_ix = protolayer_manager_get_protocol(s->layers, protocol) + 1;
+ ssize_t layer_ix = session2_get_protocol(s, protocol) + 1;
if (layer_ix < 0)
return layer_ix;
- return protolayer_manager_submit(s->layers, PROTOLAYER_UNWRAP, layer_ix,
- payload, comm, cb, baton);
+ return session2_submit(s, PROTOLAYER_UNWRAP,
+ layer_ix, payload, comm, cb, baton);
}
int session2_wrap(struct session2 *s, struct protolayer_payload payload,
const struct comm_info *comm, protolayer_finished_cb cb,
void *baton)
{
- return protolayer_manager_submit(s->layers, PROTOLAYER_WRAP,
- protolayer_grps[s->layers->proto].num_layers - 1,
+ return session2_submit(s, PROTOLAYER_WRAP,
+ protolayer_grps[s->proto].num_layers - 1,
payload, comm, cb, baton);
}
const struct comm_info *comm,
protolayer_finished_cb cb, void *baton)
{
- ssize_t layer_ix = protolayer_manager_get_protocol(s->layers, protocol) - 1;
+ ssize_t layer_ix = session2_get_protocol(s, protocol) - 1;
if (layer_ix < 0)
return layer_ix;
- return protolayer_manager_submit(s->layers, PROTOLAYER_WRAP, layer_ix,
+ return session2_submit(s, PROTOLAYER_WRAP, layer_ix,
payload, comm, cb, baton);
}
static void session2_event_wrap(struct session2 *s, enum protolayer_event_type event, void *baton)
{
bool cont;
- struct protolayer_manager *m = s->layers;
- const struct protolayer_grp *grp = &protolayer_grps[m->proto];
+ const struct protolayer_grp *grp = &protolayer_grps[s->proto];
for (ssize_t i = grp->num_layers - 1; i >= 0; i--) {
struct protolayer_globals *globals = &protolayer_globals[grp->layers[i]];
if (globals->event_wrap) {
- struct protolayer_data *sess_data = protolayer_sess_data_get(m, i);
- cont = globals->event_wrap(event, &baton, m, sess_data);
+ struct protolayer_data *sess_data = protolayer_sess_data_get(s, i);
+ cont = globals->event_wrap(event, &baton, s, sess_data);
} else {
cont = true;
}
void session2_event_unwrap(struct session2 *s, ssize_t start_ix, enum protolayer_event_type event, void *baton)
{
bool cont;
- struct protolayer_manager *m = s->layers;
- const struct protolayer_grp *grp = &protolayer_grps[m->proto];
+ const struct protolayer_grp *grp = &protolayer_grps[s->proto];
for (ssize_t i = start_ix; i < grp->num_layers; i++) {
struct protolayer_globals *globals = &protolayer_globals[grp->layers[i]];
if (globals->event_unwrap) {
- struct protolayer_data *sess_data = protolayer_sess_data_get(m, i);
- cont = globals->event_unwrap(event, &baton, m, sess_data);
+ struct protolayer_data *sess_data = protolayer_sess_data_get(s, i);
+ cont = globals->event_unwrap(event, &baton, s, sess_data);
} else {
cont = true;
}
void session2_event_after(struct session2 *s, enum protolayer_type protocol,
enum protolayer_event_type event, void *baton)
{
- ssize_t start_ix = protolayer_manager_get_protocol(s->layers, protocol);
+ ssize_t start_ix = session2_get_protocol(s, protocol);
if (kr_fails_assert(start_ix >= 0))
return;
session2_event_unwrap(s, start_ix + 1, event, baton);
void session2_init_request(struct session2 *s, struct kr_request *req)
{
- struct protolayer_manager *m = s->layers;
- const struct protolayer_grp *grp = &protolayer_grps[m->proto];
+ const struct protolayer_grp *grp = &protolayer_grps[s->proto];
for (ssize_t i = 0; i < grp->num_layers; i++) {
struct protolayer_globals *globals = &protolayer_globals[grp->layers[i]];
if (globals->request_init) {
- struct protolayer_data *sess_data = protolayer_sess_data_get(m, i);
- globals->request_init(m, req, sess_data);
+ struct protolayer_data *sess_data = protolayer_sess_data_get(s, i);
+ globals->request_init(s, req, sess_data);
}
}
}
* layer-specific data, and internal information for the protocol layer
* manager. */
struct protolayer_iter_ctx {
-/* read-write: */
+/* read-write for layers: */
/** The payload */
struct protolayer_payload payload;
/** Communication information. Typically written into by one of the
* first layers facilitating transport protocol processing. */
struct comm_info comm;
+ /** Per-iter memory pool. Has no `free` procedure, gets freed as a whole
+ * when the context is being destroyed. Initialized and destroyed
+ * automatically - layers may use it to allocate memory. */
+ knot_mm_t pool;
-/* callback for when the layer iteration has ended - read-only: */
+/* callback for when the layer iteration has ended - read-only for layers: */
protolayer_finished_cb finished_cb;
void *finished_cb_baton;
-/* internal information for the manager - private: */
+/* internal information for the manager - should only be used by the protolayer
+ * system, never by layers: */
enum protolayer_direction direction;
+ /** If `true`, the processing of layers has been paused and is waiting
+ * to be resumed or canceled. */
bool async_mode;
+ /** The index of the layer that is currently being (or has just been)
+ * processed. */
unsigned int layer_ix;
- struct protolayer_manager *manager;
+ struct session2 *session;
+ /** Status passed to the finish callback. */
int status;
enum protolayer_iter_action action;
- void *async_buffer;
/** Contains a sequence of variably-sized CPU-aligned layer-specific
- * structs. See `struct protolayer_manager::data`. */
+ * structs. See `struct session2::layer_data` for details. */
alignas(CPU_STRUCT_ALIGN) char data[];
};
* stops. */
typedef enum protolayer_event_cb_result (*protolayer_event_cb)(
enum protolayer_event_type event, void **baton,
- struct protolayer_manager *manager, void *sess_data);
+ struct session2 *session, void *sess_data);
/** Function type for initialization callbacks of layer session data.
*
*
* Returning 0 means success, other return values mean error and halt the
* initialization. */
-typedef int (*protolayer_data_sess_init_cb)(struct protolayer_manager *manager,
- void *data,
- void *param);
+typedef int (*protolayer_data_sess_init_cb)(struct session2 *session,
+ void *data, void *param);
/** Function type for determining the size of a layer's wire buffer overhead. */
typedef size_t (*protolayer_wire_buf_overhead_cb)(bool outgoing);
*
* Returning 0 means success, other return values mean error and halt the
* initialization. */
-typedef int (*protolayer_iter_data_cb)(struct protolayer_manager *manager,
- struct protolayer_iter_ctx *ctx,
+typedef int (*protolayer_iter_data_cb)(struct protolayer_iter_ctx *ctx,
void *data);
/** Function type for (de)initialization callbacks of layers.
*
* Returning 0 means success, other return values mean error and halt the
* initialization. */
-typedef int (*protolayer_data_cb)(struct protolayer_manager *manager,
- void *data);
+typedef int (*protolayer_data_cb)(struct session2 *session, void *data);
/** Function type for (de)initialization callbacks of DNS requests.
*
* `req` points to the request for initialization.
* `sess_data` points to layer-specific session data struct. */
-typedef void (*protolayer_request_cb)(struct protolayer_manager *manager,
+typedef void (*protolayer_request_cb)(struct session2 *session,
struct kr_request *req,
void *sess_data);
-/** A collection of protocol layers and their layer-specific data, tied to a
- * session. The manager contains a sequence of protocol layers (determined by
- * `grp`), which define how the data processed by the session is to be
- * interpreted. */
-struct protolayer_manager {
- enum kr_proto proto;
- struct wire_buf wire_buf;
- size_t wire_buf_max_length;
- struct session2 *session;
- size_t cb_ctx_size; /**< Size of a single callback context, including
- * layer-specific per-iteration data. */
-
- /** The following flexible array has basically this structure:
- *
- * struct {
- * size_t sess_offsets[num_layers];
- * size_t iter_offsets[num_layers];
- * variably-sized-data sess_data[num_layers];
- * }
- *
- * It is done this way, because different layer groups will have
- * different numbers of layers and differently-sized layer-specific
- * data. C does not have a convenient way to define this in structs, so
- * we do it via this flexible array.
- *
- * `sess_data` is a sequence of variably-sized CPU-aligned
- * layer-specific structs.
- *
- * `sess_offsets` determines data offsets in `sess_data` for pointer
- * retrieval.
- *
- * `iter_offsets` determines data offsets in `struct
- * protolayer_iter_ctx::data` for pointer retrieval. */
- alignas(CPU_STRUCT_ALIGN) char data[];
-};
-
/** Initialization parameters for protocol layer session data. */
struct protolayer_data_param {
enum protolayer_type protocol; /**< Which protocol these parameters
/** A data unit for a single sequential data source. The data may be organized
* as a stream or a sequence of datagrams - this is up to the actual individual
- * protocols used by the session, as defined by the `layers` member - see
- * `struct protolayer_manager` and the types of its members for more info.
+ * protocols used by the session - see `enum kr_proto` and
+ * `protolayer_`-prefixed types and functions for more information.
*
* A session processes data in two directions:
*
};
} transport;
- struct protolayer_manager *layers; /**< Protocol layers of this session. */
- knot_mm_t pool;
uv_timer_t timer; /**< For session-wide timeout events. */
enum protolayer_event_type timer_event; /**< The event fired on timeout. */
trie_t *tasks; /**< List of tasks associated with given session. */
queue_t(struct qr_task *) waiting; /**< List of tasks waiting for
* sending to upstream. */
-
+ struct wire_buf wire_buf;
uint32_t log_id; /**< Session ID for logging. */
int uv_count; /**< Number of unclosed libUV handles owned by this
/** If true, session is being rate-limited. One of the protocol layers
* is going to be the writer for this flag. */
bool throttled : 1;
+
+ /* Protocol layers */
+
+ /** The set of protocol layers used by this session. */
+ enum kr_proto proto;
+ /** The size of a single iteration context
+ * (`struct protolayer_iter_ctx`), including layer-specific data. */
+ size_t iter_ctx_size;
+
+ /** The following flexible array has basically this structure:
+ *
+ * struct {
+ * size_t sess_offsets[num_layers];
+ * size_t iter_offsets[num_layers];
+ * variably-sized-data sess_data[num_layers];
+ * }
+ *
+ * It is done this way, because different layer groups will have
+ * different numbers of layers and differently-sized layer-specific
+ * data. C does not have a convenient way to define this in structs, so
+ * we do it via this flexible array.
+ *
+ * `sess_data` is a sequence of variably-sized CPU-aligned
+ * layer-specific structs.
+ *
+ * `sess_offsets` determines data offsets in `sess_data` for pointer
+ * retrieval.
+ *
+ * `iter_offsets` determines data offsets in `struct
+ * protolayer_iter_ctx::data` for pointer retrieval. */
+ alignas(CPU_STRUCT_ALIGN) char layer_data[];
};
/** Allocates and initializes a new session with the specified protocol layer
static enum protolayer_event_cb_result pl_dns_dgram_event_unwrap(
enum protolayer_event_type event, void **baton,
- struct protolayer_manager *manager, void *sess_data)
+ struct session2 *session, void *sess_data)
{
if (event != PROTOLAYER_EVENT_GENERAL_TIMEOUT)
return PROTOLAYER_EVENT_PROPAGATE;
- struct session2 *session = manager->session;
if (session2_tasklist_get_len(session) != 1 ||
!session2_waitinglist_is_empty(session))
return PROTOLAYER_EVENT_PROPAGATE;
static enum protolayer_iter_cb_result pl_dns_dgram_unwrap(
void *sess_data, void *iter_data, struct protolayer_iter_ctx *ctx)
{
- struct session2 *session = ctx->manager->session;
+ struct session2 *session = ctx->session;
if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
int ret = kr_ok();
bool connected : 1; /**< True: The stream is connected */
};
-struct pl_dns_stream_iter_data {
- struct protolayer_data h;
- struct {
- knot_mm_t *pool;
- void *mem;
- } sent;
-};
-
-static int pl_dns_stream_sess_init(struct protolayer_manager *manager,
- void *sess_data, void *param)
+static int pl_dns_stream_sess_init(struct session2 *session,
+ void *sess_data, void *param)
{
/* _UNSIZED_STREAM and _MULTI_STREAM - don't forget to split if needed
* at some point */
- manager->session->stream = true;
+ session->stream = true;
return kr_ok();
}
-static int pl_dns_single_stream_sess_init(struct protolayer_manager *manager,
+static int pl_dns_single_stream_sess_init(struct session2 *session,
void *sess_data, void *param)
{
- manager->session->stream = true;
+ session->stream = true;
struct pl_dns_stream_sess_data *stream = sess_data;
stream->single = true;
return kr_ok();
}
-static int pl_dns_stream_iter_deinit(struct protolayer_manager *manager,
- struct protolayer_iter_ctx *ctx,
- void *iter_data)
-{
- struct pl_dns_stream_iter_data *stream = iter_data;
- mm_free(stream->sent.pool, stream->sent.mem);
- return kr_ok();
-}
-
static enum protolayer_event_cb_result pl_dns_stream_resolution_timeout(
struct session2 *s)
{
static enum protolayer_event_cb_result pl_dns_stream_event_unwrap(
enum protolayer_event_type event, void **baton,
- struct protolayer_manager *manager, void *sess_data)
+ struct session2 *session, void *sess_data)
{
- struct session2 *session = manager->session;
if (session->closing)
return PROTOLAYER_EVENT_PROPAGATE;
switch (event) {
case PROTOLAYER_EVENT_GENERAL_TIMEOUT:
- return pl_dns_stream_resolution_timeout(manager->session);
+ return pl_dns_stream_resolution_timeout(session);
case PROTOLAYER_EVENT_CONNECT_TIMEOUT:
- return pl_dns_stream_connection_fail(manager->session,
+ return pl_dns_stream_connection_fail(session,
KR_SELECTION_TCP_CONNECT_TIMEOUT);
case PROTOLAYER_EVENT_CONNECT:
enum kr_selection_error err = (*baton)
? *(enum kr_selection_error *)baton
: KR_SELECTION_TCP_CONNECT_FAILED;
- return pl_dns_stream_connection_fail(manager->session, err);
+ return pl_dns_stream_connection_fail(session, err);
case PROTOLAYER_EVENT_DISCONNECT:
case PROTOLAYER_EVENT_CLOSE:
}
int status = kr_ok();
- struct session2 *session = ctx->manager->session;
+ struct session2 *session = ctx->session;
struct pl_dns_stream_sess_data *stream_sess = sess_data;
struct wire_buf *wb = ctx->payload.wire_buf;
static enum protolayer_iter_cb_result pl_dns_stream_wrap(
void *sess_data, void *iter_data, struct protolayer_iter_ctx *ctx)
{
- struct pl_dns_stream_iter_data *stream = iter_data;
- struct session2 *s = ctx->manager->session;
-
- if (kr_fails_assert(!stream->sent.mem))
- return protolayer_break(ctx, kr_error(EINVAL));
-
if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) {
if (kr_fails_assert(ctx->payload.buffer.len <= UINT16_MAX))
return protolayer_break(ctx, kr_error(EMSGSIZE));
const int iovcnt = 2;
- struct sized_iovs *siov = mm_alloc(&s->pool,
+ struct sized_iovs *siov = mm_alloc(&ctx->pool,
sizeof(*siov) + iovcnt * sizeof(struct iovec));
kr_require(siov);
knot_wire_write_u16(siov->nlen, ctx->payload.buffer.len);
.iov_len = ctx->payload.buffer.len
};
- stream->sent.mem = siov;
- stream->sent.pool = &s->pool;
-
ctx->payload = protolayer_payload_iovec(siov->iovs, iovcnt, false);
return protolayer_continue(ctx);
} else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
const int iovcnt = 1 + ctx->payload.iovec.cnt;
- struct sized_iovs *siov = mm_alloc(&s->pool,
+ struct sized_iovs *siov = mm_alloc(&ctx->pool,
sizeof(*siov) + iovcnt * sizeof(struct iovec));
kr_require(siov);
.iov_len = sizeof(siov->nlen)
};
- stream->sent.mem = siov;
- stream->sent.pool = &s->pool;
-
ctx->payload = protolayer_payload_iovec(siov->iovs, iovcnt, false);
return protolayer_continue(ctx);
} else {
}
}
-static void pl_dns_stream_request_init(struct protolayer_manager *manager,
+static void pl_dns_stream_request_init(struct session2 *session,
struct kr_request *req,
void *sess_data)
{
};
const struct protolayer_globals stream_common = {
.sess_size = sizeof(struct pl_dns_stream_sess_data),
- .iter_size = sizeof(struct pl_dns_stream_iter_data),
.wire_buf_overhead = KNOT_WIRE_MAX_PKTSIZE,
.sess_init = NULL, /* replaced in specific layers below */
- .iter_deinit = pl_dns_stream_iter_deinit,
.unwrap = pl_dns_stream_unwrap,
.wrap = pl_dns_stream_wrap,
.event_unwrap = pl_dns_stream_event_unwrap,