--- /dev/null
+/* Copyright (C) CZ.NIC, z.s.p.o. <knot-resolver@labs.nic.cz>
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ */
+
+/* HINT: If you are looking to implement a new protocol, start with the doc
+ * comment of the `PROTOLAYER_PROTOCOL_MAP` macro and continue from there. */
+
+/* GLOSSARY:
+ *
+ * Event:
+ * - An Event may be processed by the protocol layer sequence much like a
+ * Payload, but with a special callback. Events may be used to notify layers
+ * that e.g. a connection has been established; a timeout has occurred; a
+ * malformed packet has been received, etc. Events are generally not sent
+ * through the transport - they may, however, trigger a new payload to be
+ * sent, e.g. a HTTP error status response.
+ *
+ * Iteration:
+ * - The processing of Payload data or an event using a particular sequence
+ * of Protocol layers, either in Wrap or Unwrap direction. For payload
+ * processing, it is also the lifetime of `struct protolayer_iter_ctx` and
+ * layer-specific data contained therein.
+ *
+ * Payload:
+ * - Data processed by protocol layers in a particular sequence. In the wrap
+ * direction, this data generally starts as a DNS packet, which is then
+ * wrapped in protocol ceremony data by each layer. In the unwrap direction,
+ * the opposite takes place - ceremony data is removed until a raw DNS packet
+ * is retrieved.
+ *
+ * Protocol layer:
+ * - An implementation of a particular protocol. A layer transforms payloads
+ * to conform to a particular protocol, e.g. UDP, TCP, TLS, HTTP, QUIC, etc.
+ * While transforming a payload, a layer may also modify metadata - e.g. the
+ * UDP and TCP layers in the Unwrap direction implement the PROXYv2 protocol,
+ * using which they retrieve the IP address of the actual originating client
+ * and store it in the appropriate struct.
+ *
+ * Protolayer:
+ * - Same as 'Protocol layer'.
+ *
+ * Unwrap:
+ * - The direction of data transformation, starting with the transport (e.g.
+ * data that came from the network), ending with an internal subsystem (e.g.
+ * DNS query resolution).
+ *
+ * Wrap:
+ * - The direction of data transformation, starting with an internal
+ * subsystem (e.g. an answer to a resolved DNS query), ending with the
+ * transport (e.g. data that is going to be sent to the client). */
+
+#pragma once
+
+#include <stdalign.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <uv.h>
+
+#include "contrib/mempattern.h"
+#include "lib/generic/queue.h"
+#include "lib/generic/trie.h"
+#include "lib/utils.h"
+
+/* Forward declarations */
+struct session2;
+struct protolayer_iter_ctx;
+
+/** Type of MAC addresses. */
+typedef uint8_t ethaddr_t[6];
+
+/** Information about the transport - addresses and proxy. */
+struct comm_info {
+ /** The original address the data came from. May be that of a proxied
+ * client, if they came through a proxy. May be `NULL` if
+ * the communication did not come from network. */
+ const struct sockaddr *src_addr;
+
+ /** The actual address the resolver is communicating with. May be
+ * the address of a proxy if the communication came through one,
+ * otherwise it will be the same as `src_addr`. May be `NULL` if
+ * the communication did not come from network. */
+ const struct sockaddr *comm_addr;
+
+ /** The original destination address. May be the resolver's address, or
+ * the address of a proxy if the communication came through one. May be
+ * `NULL` if the communication did not come from network. */
+ const struct sockaddr *dst_addr;
+
+ /** Data parsed from a PROXY header. May be `NULL` if the communication
+ * did not come through a proxy, or if the PROXYv2 protocol was not used. */
+ const struct proxy_result *proxy;
+
+ /** Pointer to protolayer-specific data, e.g. a key to decide, which
+ * sub-session to use. */
+ void *target;
+
+ /* XDP data */
+ ethaddr_t eth_from;
+ ethaddr_t eth_to;
+ bool xdp:1;
+};
+
+/** Protocol layer types map - an enumeration of individual protocol layer
+ * implementations
+ *
+ * This macro is used to generate `enum protolayer_protocol` as well as other
+ * additional data on protocols, e.g. name string constants.
+ *
+ * To define a new protocol, add a new identifier to this macro, and, within
+ * some logical compilation unit (e.g. `daemon/worker.c` for DNS layers),
+ * initialize the protocol's `protolayer_globals[]`, ideally in a function
+ * called at the start of the program (e.g. `worker_init()`). See the docs of
+ * `struct protolayer_globals` for details on what data this structure should
+ * contain.
+ *
+ * To use protocols within sessions, protocol layer groups also need to be
+ * defined, to indicate the order in which individual protocols are to be
+ * processed. See `PROTOLAYER_GRP_MAP` below for more details. */
+#define PROTOLAYER_PROTOCOL_MAP(XX) \
+ /* General transport protocols */\
+ XX(UDP)\
+ XX(TCP)\
+ XX(TLS)\
+ XX(HTTP)\
+ \
+ /* DNS (`worker`) */\
+ XX(DNS_DGRAM) /**< Packets WITHOUT prepended size, one per (un)wrap,
+ * limited to UDP sizes, multiple sources (single
+ * session for multiple clients). */\
+ XX(DNS_UNSIZED_STREAM) /**< Singular packet WITHOUT prepended size, one
+ * per (un)wrap, no UDP limits, single source. */\
+ XX(DNS_MULTI_STREAM) /**< Multiple packets WITH prepended sizes in a
+ * stream (may span multiple (un)wraps). */\
+ XX(DNS_SINGLE_STREAM) /**< Singular packet WITH prepended size in a
+ * stream (may span multiple (un)wraps). */
+
+/** The identifiers of protocol layer types. */
+enum protolayer_protocol {
+ PROTOLAYER_NULL = 0,
+#define XX(cid) PROTOLAYER_##cid,
+ PROTOLAYER_PROTOCOL_MAP(XX)
+#undef XX
+ PROTOLAYER_PROTOCOL_COUNT /* must be the last! */
+};
+
+/** Maps protocol layer type IDs to string names.
+ * E.g. PROTOLAYER_HTTP has name 'HTTP'. */
+extern const char *protolayer_protocol_names[];
+
+/** Protocol layer group map
+ *
+ * This macro is used to generate `enum protolayer_grp` as well as other
+ * additional data on protocol layer groups, e.g. name string constants.
+ *
+ * Each group represents a sequence of layers in the unwrap direction (wrap
+ * direction being the opposite). The sequence dictates the order in which
+ * individual layers are processed. This macro is used to generate global data
+ * about groups.
+ *
+ * For defining new groups, see the docs of `protolayer_grps[]` in
+ * `daemon/session2.h`.
+ *
+ * Parameters are:
+ * 1. Constant name (for e.g. PROTOLAYER_GRP_* enum values)
+ * 2. Variable name (for e.g. protolayer_grp_* arrays - in `session2.c`)
+ * 3. Human-readable name for logging */
+#define PROTOLAYER_GRP_MAP(XX) \
+ XX(DOUDP, doudp, "DNS UDP") \
+ XX(DOTCP, dotcp, "DNS TCP") \
+ XX(DOTLS, dot, "DNS-over-TLS") \
+ XX(DOHTTPS, doh, "DNS-over-HTTPS")
+
+/** The identifiers of pre-defined protocol layer sequences. */
+enum protolayer_grp {
+ PROTOLAYER_GRP_NULL = 0,
+#define XX(cid, vid, name) PROTOLAYER_GRP_##cid,
+ PROTOLAYER_GRP_MAP(XX)
+#undef XX
+ PROTOLAYER_GRP_COUNT
+};
+
+/** Maps protocol layer group IDs to human-readable descriptions.
+ * E.g. PROTOLAYER_GRP_DOH has description 'DNS-over-HTTPS'. */
+extern const char *protolayer_grp_names[];
+
+/** Flow control indicators for protocol layer `wrap` and `unwrap` callbacks.
+ * Use via `protolayer_continue`, `protolayer_break`, and `protolayer_push`
+ * functions. */
+enum protolayer_iter_action {
+ PROTOLAYER_ITER_ACTION_NULL = 0,
+
+ PROTOLAYER_ITER_ACTION_CONTINUE,
+ PROTOLAYER_ITER_ACTION_BREAK,
+};
+
+/** Direction of layer sequence processing. */
+enum protolayer_direction {
+ /** Processes buffers in order of layers as defined in the layer group.
+ * In this direction, protocol ceremony data should be removed from the
+ * buffer, parsing additional data provided by the protocol. */
+ PROTOLAYER_UNWRAP,
+
+ /** Processes buffers in reverse order of layers as defined in the
+ * layer group. In this direction, protocol ceremony data should be
+ * added. */
+ PROTOLAYER_WRAP,
+};
+
+enum protolayer_ret {
+ /** Returned when a protolayer context iteration has finished
+ * processing, i.e. with _BREAK. */
+ PROTOLAYER_RET_NORMAL = 0,
+
+ /** Returned when a protolayer context iteration is waiting for an
+ * asynchronous callback to a continuation function. This will never be
+ * passed to `protolayer_finished_cb`, only returned by
+ * `session2_unwrap` or `session2_wrap`. */
+ PROTOLAYER_RET_ASYNC,
+
+ /** Returned when a protolayer context iteration has ended on a layer
+ * that needs more data from another buffer. */
+ PROTOLAYER_RET_WAITING,
+};
+
+/** Called when a payload iteration (started by `session2_unwrap` or
+ * `session2_wrap`) has ended - i.e. the input buffer will not be processed any
+ * further.
+ *
+ * `status` may be one of `enum protolayer_ret` or a negative number indicating
+ * an error.
+ * `target` is the `target` parameter passed to the `session2_(un)wrap`
+ * function.
+ * `baton` is the `baton` parameter passed to the `session2_(un)wrap` function. */
+typedef void (*protolayer_finished_cb)(int status, struct session2 *session,
+ const struct comm_info *comm, void *baton);
+
+
+/** Protocol layer event type map
+ *
+ * This macro is used to generate `enum protolayer_event_type` as well as the
+ * relevant name string constants for each event type.
+ *
+ * Event types are used to distinguish different events that can be passed to
+ * sessions using `session2_event()`. */
+#define PROTOLAYER_EVENT_MAP(XX) \
+ XX(CLOSE) /**< Signal to gracefully close the session -
+ * i.e. layers add their standard disconnection
+ * ceremony (e.g. `gnutls_bye()`). */\
+ XX(FORCE_CLOSE) /**< Signal to forcefully close the
+ * session - i.e. layers SHOULD NOT add
+ * any disconnection ceremony, if
+ * avoidable. */\
+ XX(CONNECT_TIMEOUT) /**< Signal that a connection could not be
+ * established due to a timeout. */\
+ XX(GENERAL_TIMEOUT) /**< Signal that a general application-defined
+ * timeout has occurred. */\
+ XX(CONNECT) /**< Signal that a connection has been established. */\
+ XX(CONNECT_FAIL) /**< Signal that a connection could not have been
+ * established. */\
+ XX(MALFORMED) /**< Signal that a malformed request has been received. */\
+ XX(DISCONNECT) /**< Signal that a connection has ended. */\
+ XX(STATS_SEND_ERR) /**< Failed task send - update stats. */\
+ XX(STATS_QRY_OUT) /**< Outgoing query submission - update stats. */
+
+/** Event type, to be interpreted by a layer. */
+enum protolayer_event_type {
+ PROTOLAYER_EVENT_NULL = 0,
+#define XX(cid) PROTOLAYER_EVENT_##cid,
+ PROTOLAYER_EVENT_MAP(XX)
+#undef XX
+ PROTOLAYER_EVENT_COUNT
+};
+
+/** Maps event types to their names. */
+extern const char *protolayer_event_names[];
+
+
+/** Payload types.
+ *
+ * Parameters are:
+ * 1. Constant name
+ * 2. Human-readable name for logging */
+#define PROTOLAYER_PAYLOAD_MAP(XX) \
+ XX(BUFFER, "Buffer") \
+ XX(IOVEC, "IOVec") \
+ XX(WIRE_BUF, "Wire buffer")
+
+/** Determines which union member of `struct protolayer_payload` is currently
+ * valid. */
+enum protolayer_payload_type {
+ PROTOLAYER_PAYLOAD_NULL = 0,
+#define XX(cid, name) PROTOLAYER_PAYLOAD_##cid,
+ PROTOLAYER_PAYLOAD_MAP(XX)
+#undef XX
+ PROTOLAYER_PAYLOAD_COUNT
+};
+
+/** Maps payload type IDs to human-readable names. */
+extern const char *protolayer_payload_names[];
+
+/** Data processed by the sequence of layers. All pointed-to memory is always
+ * owned by its creator. It is also the layer (group) implementor's
+ * responsibility to keep data compatible in between layers. No payload memory
+ * is ever (de-)allocated by the protolayer manager! */
+struct protolayer_payload {
+ enum protolayer_payload_type type;
+ unsigned int ttl; /**< time-to-live hint (e.g. for HTTP Cache-Control) */
+ union {
+ /** Only valid if `type` is `_BUFFER`. */
+ struct {
+ void *buf;
+ size_t len;
+ } buffer;
+
+ /** Only valid if `type` is `_IOVEC`. */
+ struct {
+ struct iovec *iov;
+ int cnt;
+ } iovec;
+
+ /** Only valid if `type` is `_WIRE_BUF`. */
+ struct wire_buf *wire_buf;
+ };
+};
+
+/** Context for protocol layer iterations, containing payload data,
+ * layer-specific data, and internal information for the protocol layer
+ * manager. */
+struct protolayer_iter_ctx {
+/* read-write: */
+ /** The payload */
+ struct protolayer_payload payload;
+ /** Communication information. Typically written into by one of the
+ * first layers facilitating transport protocol processing. */
+ struct comm_info comm;
+
+/* callback for when the layer iteration has ended - read-only: */
+ protolayer_finished_cb finished_cb;
+ void *finished_cb_baton;
+
+/* internal information for the manager - private: */
+ enum protolayer_direction direction;
+ bool async_mode;
+ unsigned int layer_ix;
+ struct protolayer_manager *manager;
+ int status;
+ enum protolayer_iter_action action;
+
+ /** Contains a sequence of variably-sized CPU-aligned layer-specific
+ * structs. See `struct protolayer_manager::data`. */
+ alignas(CPU_STRUCT_ALIGN) char data[];
+};
+
+/** Gets the total size of the data in the specified payload. */
+size_t protolayer_payload_size(const struct protolayer_payload *payload);
+
+/** Copies the specified payload to `dest`. Only `max_len` or the size of the
+ * payload is written, whichever is less.
+ *
+ * Returns the actual length of copied data. */
+size_t protolayer_payload_copy(void *dest,
+ const struct protolayer_payload *payload,
+ size_t max_len);
+
+/** Convenience function to get a buffer-type payload. */
+static inline struct protolayer_payload protolayer_buffer(void *buf, size_t len)
+{
+ return (struct protolayer_payload){
+ .type = PROTOLAYER_PAYLOAD_BUFFER,
+ .buffer = {
+ .buf = buf,
+ .len = len
+ }
+ };
+}
+
+/** Convenience function to get an iovec-type payload. */
+static inline struct protolayer_payload protolayer_iovec(
+ struct iovec *iov, int iovcnt)
+{
+ return (struct protolayer_payload){
+ .type = PROTOLAYER_PAYLOAD_IOVEC,
+ .iovec = {
+ .iov = iov,
+ .cnt = iovcnt
+ }
+ };
+}
+
+/** Convenience function to get a wire-buf-type payload. */
+static inline struct protolayer_payload protolayer_wire_buf(struct wire_buf *wire_buf)
+{
+ return (struct protolayer_payload){
+ .type = PROTOLAYER_PAYLOAD_WIRE_BUF,
+ .wire_buf = wire_buf
+ };
+}
+
+/** Convenience function to represent the specified payload as a buffer-type.
+ * Supports only `_BUFFER` and `_WIRE_BUF` on the input, otherwise returns
+ * `_NULL` type or aborts on assertion if allowed.
+ *
+ * If the input payload is `_WIRE_BUF`, the pointed-to wire buffer is reset to
+ * indicate that all of its contents have been used up, and the buffer is ready
+ * to be reused. */
+struct protolayer_payload protolayer_as_buffer(const struct protolayer_payload *payload);
+
+/** A predefined queue type for iteration context. */
+typedef queue_t(struct protolayer_iter_ctx *) protolayer_iter_ctx_queue_t;
+
+/** Iterates through the specified `queue` and gets the sum of all payloads
+ * available in it. */
+size_t protolayer_queue_count_payload(const protolayer_iter_ctx_queue_t *queue);
+
+/** Mandatory header members for any layer-specific data. */
+#define PROTOLAYER_DATA_HEADER() struct {\
+ struct session2 *session; /**< Pointer to the owner session. */\
+}
+
+/** Layer-specific data - the generic struct. */
+struct protolayer_data {
+ PROTOLAYER_DATA_HEADER();
+};
+
+/** Return value of `protolayer_iter_cb` callbacks. To be returned by *layer
+ * sequence return functions* as a sanity check. Not to be used directly by
+ * user code. */
+enum protolayer_iter_cb_result {
+ PROTOLAYER_ITER_CB_RESULT_MAGIC = 0x364F392E,
+};
+
+/** Function type for `struct protolayer_globals::wrap` and `struct
+ * protolayer_globals::unwrap`. The function processes the provided
+ * `ctx->payload` and decides the next action for the currently processed
+ * sequence.
+ *
+ * The function (or another function, that the pointed-to function causes to be
+ * called, directly or through an asynchronous operation), must call one of the
+ * *layer sequence return functions* (e.g. `protolayer_continue()`,
+ * `protolayer_async()`, ...) to advance (or end) the layer sequence. The
+ * function must return the result of such a return function. */
+typedef enum protolayer_iter_cb_result (*protolayer_iter_cb)(
+ void *sess_data,
+ void *iter_data,
+ struct protolayer_iter_ctx *ctx);
+
+/** Function type for `struct protolayer_globals::event_wrap` and `struct
+ * protolayer_globals::event_unwrap` callbacks of layers. The `baton` parameter
+ * points to the mutable, iteration-specific baton pointer, initialized by the
+ * `baton` parameter of one of the `session2_event` functions. The pointed-to
+ * value of `baton` may be modified to accommodate for the next layer in the
+ * sequence.
+ *
+ * When `true` is returned, iteration over the sequence of layers continues.
+ * When `false` is returned, iteration stops. */
+typedef bool (*protolayer_event_cb)(enum protolayer_event_type event,
+ void **baton,
+ struct protolayer_manager *manager,
+ void *sess_data);
+
+/** Function type for initialization callbacks of layer session data.
+ *
+ * The `param` value is the one associated with the currently initialized
+ * layer, from the `layer_param` array of `session2_new()` - may be NULL if
+ * none is provided for the current layer.
+ *
+ * `data` points to the layer-specific data struct.
+ *
+ * Returning 0 means success, other return values mean error and halt the
+ * initialization. */
+typedef int (*protolayer_data_sess_init_cb)(struct protolayer_manager *manager,
+ void *data,
+ void *param);
+
+/** Function type for (de)initialization callback of layer iteration data.
+ *
+ * `ctx` points to the iteration context that `data` belongs to.
+ *
+ * `data` points to the layer-specific data struct.
+ *
+ * Returning 0 means success, other return values mean error and halt the
+ * initialization. */
+typedef int (*protolayer_iter_data_cb)(struct protolayer_manager *manager,
+ struct protolayer_iter_ctx *ctx,
+ void *data);
+
+/** Function type for (de)initialization callbacks of layers.
+ *
+ * `data` points to the layer-specific data struct.
+ *
+ * Returning 0 means success, other return values mean error and halt the
+ * initialization. */
+typedef int (*protolayer_data_cb)(struct protolayer_manager *manager,
+ void *data);
+
+/** Function type for (de)initialization callbacks of DNS requests.
+ *
+ * `req` points to the request for initialization.
+ * `sess_data` points to layer-specific session data struct. */
+typedef void (*protolayer_request_cb)(struct protolayer_manager *manager,
+ struct kr_request *req,
+ void *sess_data);
+
+/** A collection of protocol layers and their layer-specific data, tied to a
+ * session. The manager contains a sequence of protocol layers (determined by
+ * `grp`), which define how the data processed by the session is to be
+ * interpreted. */
+struct protolayer_manager {
+ enum protolayer_grp grp;
+ struct session2 *session;
+ size_t num_layers;
+ size_t cb_ctx_size; /**< Size of a single callback context, including
+ * layer-specific per-iteration data. */
+
+ /** The following flexible array has basically this structure:
+ *
+ * struct {
+ * size_t sess_offsets[num_layers];
+ * size_t iter_offsets[num_layers];
+ * variably-sized-data sess_data[num_layers];
+ * }
+ *
+ * It is done this way, because different layer groups will have
+ * different numbers of layers and differently-sized layer-specific
+ * data. C does not have a convenient way to define this in structs, so
+ * we do it via this flexible array.
+ *
+ * `sess_data` is a sequence of variably-sized CPU-aligned
+ * layer-specific structs.
+ *
+ * `sess_offsets` determines data offsets in `sess_data` for pointer
+ * retrieval.
+ *
+ * `iter_offsets` determines data offsets in `struct
+ * protolayer_iter_ctx::data` for pointer retrieval. */
+ alignas(CPU_STRUCT_ALIGN) char data[];
+};
+
+/** Initialization parameters for protocol layer session data. */
+struct protolayer_data_param {
+ enum protolayer_protocol protocol; /**< Which protocol these parameters
+ * are meant for. */
+ void *param; /**< Pointer to protolayer-related initialization
+ * parameters. Only needs to be valid during session
+ * initialization. */
+};
+
+/** Global data for a specific layered protocol. This is to be initialized in
+ * the `protolayer_globals` global array (below) during the the resolver's
+ * startup. It contains pointers to functions implementing a particular
+ * protocol, as well as other importand data.
+ *
+ * Every member of this struct is allowed to be zero/NULL if a particular
+ * protocol has no use for it. */
+struct protolayer_globals {
+ /** Size of the layer-specific data struct, valid per-session.
+ *
+ * The struct MUST begin with the `PROTOLAYER_DATA_HEADER()` macro. If
+ * no session struct is used by the layer, the value may be zero. */
+ size_t sess_size;
+
+ /** Size of the layer-specific data struct, valid per-iteration. It
+ * gets created and destroyed together with a `struct
+ * protolayer_iter_ctx`.
+ *
+ * The struct MUST begin with the `PROTOLAYER_DATA_HEADER()` macro. If
+ * no iteration struct is used by the layer, the value may be zero. */
+ size_t iter_size;
+
+ /** Called during session creation to initialize
+ * layer-specific session data. The data is always provided
+ * zero-initialized to this function. */
+ protolayer_data_sess_init_cb sess_init;
+
+ /** Called during session destruction to deinitialize
+ * layer-specific session data. */
+ protolayer_data_cb sess_deinit;
+
+ /** Called at the beginning of a non-event layer sequence to initialize
+ * layer-specific iteration data. The data is always zero-initialized
+ * during iteration context initialization. */
+ protolayer_iter_data_cb iter_init;
+
+ /** Called at the end of a non-event layer sequence to deinitialize
+ * layer-specific iteration data. */
+ protolayer_iter_data_cb iter_deinit;
+
+ /** Strips the buffer of protocol-specific data. E.g. a HTTP layer
+ * removes HTTP status and headers. Optional - iteration continues
+ * automatically if this is NULL. */
+ protolayer_iter_cb unwrap;
+
+ /** Wraps the buffer into protocol-specific data. E.g. a HTTP layer
+ * adds HTTP status and headers. Optional - iteration continues
+ * automatically if this is NULL. */
+ protolayer_iter_cb wrap;
+
+ /** Processes events in the unwrap order (sent from the outside).
+ * Optional - iteration continues automatically if this is NULL. */
+ protolayer_event_cb event_unwrap;
+
+ /** Processes events in the wrap order (bounced back by the session).
+ * Optional - iteration continues automatically if this is NULL. */
+ protolayer_event_cb event_wrap;
+
+ /** Modifies the provided request for use with the layer. Mostly for
+ * setting `struct kr_request::qsource.comm_flags`. */
+ protolayer_request_cb request_init;
+};
+
+/** Global data about layered protocols. Mapped by `enum protolayer_protocol`.
+ * Individual protocols are to be initialized during resolver startup. */
+extern struct protolayer_globals protolayer_globals[PROTOLAYER_PROTOCOL_COUNT];
+
+
+/** *Layer sequence return function* - signalizes the protolayer manager to
+ * continue processing the next layer. */
+enum protolayer_iter_cb_result protolayer_continue(struct protolayer_iter_ctx *ctx);
+
+/** *Layer sequence return function* - signalizes that the layer wants to stop
+ * processing of the buffer and clean up, possibly due to an error (indicated
+ * by a non-zero `status`). */
+enum protolayer_iter_cb_result protolayer_break(struct protolayer_iter_ctx *ctx, int status);
+
+/** *Layer sequence return function* - signalizes that the current sequence
+ * will continue in an asynchronous manner. The layer should store the context
+ * and call another sequence return function at another point. This may be used
+ * in layers that work through libraries whose operation is asynchronous, like
+ * GnuTLS.
+ *
+ * Note that this return function is just a readability hint - another return
+ * function may be called in another stack frame before it (generally during a
+ * call to an external library function, e.g. GnuTLS or nghttp2) and the
+ * sequence will continue correctly. */
+static inline enum protolayer_iter_cb_result protolayer_async(void)
+{
+ return PROTOLAYER_ITER_CB_RESULT_MAGIC;
+}
+
+
+/** A buffer, with indices marking the chunk containing valid data.
+ *
+ * May be initialized in two possible ways:
+ * - via `wire_buf_init`
+ * - to zero, then reserved via `wire_buf_reserve`. */
+struct wire_buf {
+ char *buf; /**< Buffer memory. */
+ size_t size; /**< Current size of the buffer memory. */
+ size_t start; /**< Index at which the valid data of the buffer starts (inclusive). */
+ size_t end; /**< Index at which the valid data of the buffer ends (exclusive). */
+};
+
+/** Initializes the wire buffer with the specified `initial_size` and allocates
+ * the underlying memory. */
+int wire_buf_init(struct wire_buf *wb, size_t initial_size);
+
+/** De-allocates the wire buffer's underlying memory (the struct itself is left
+ * intact). */
+void wire_buf_deinit(struct wire_buf *wb);
+
+/** Ensures that the wire buffer's size is at least `size`. `*wb` must be
+ * initialized, either to zero or via `wire_buf_init`. */
+int wire_buf_reserve(struct wire_buf *wb, size_t size);
+
+/** Adds `length` to the end index of the valid data, marking `length` more
+ * bytes as valid.
+ *
+ * Returns 0 on success.
+ * Returns `kr_error(EINVAL)` if the end index would exceed the
+ * buffer size. */
+int wire_buf_consume(struct wire_buf *wb, size_t length);
+
+/** Adds `length` to the start index of the valid data, marking `length` less
+ * bytes as valid.
+ *
+ * Returns 0 on success.
+ * Returns `kr_error(EINVAL)` if the start index would exceed
+ * the end index. */
+int wire_buf_trim(struct wire_buf *wb, size_t length);
+
+/** Moves the valid bytes of the buffer to the buffer's beginning. */
+int wire_buf_movestart(struct wire_buf *wb);
+
+/** Resets the valid bytes of the buffer to zero, as well as the error flag. */
+int wire_buf_reset(struct wire_buf *wb);
+
+/** Gets a pointer to the data marked as valid in the wire buffer. */
+static inline void *wire_buf_data(const struct wire_buf *wb)
+{
+ return &wb->buf[wb->start];
+}
+
+/** Gets the length of the data marked as valid in the wire buffer. */
+static inline size_t wire_buf_data_length(const struct wire_buf *wb)
+{
+ return wb->end - wb->start;
+}
+
+/** Gets a pointer to the free space after the valid data of the wire buffer. */
+static inline void *wire_buf_free_space(const struct wire_buf *wb)
+{
+ return &wb->buf[wb->end];
+}
+
+/** Gets the length of the free space after the valid data of the wire buffer. */
+static inline size_t wire_buf_free_space_length(const struct wire_buf *wb)
+{
+ return wb->size - wb->end;
+}
+
+
+/** Indicates how a session sends data in the `wrap` direction and receives
+ * data in the `unwrap` direction. */
+enum session2_transport_type {
+ SESSION2_TRANSPORT_NULL = 0,
+ SESSION2_TRANSPORT_IO,
+ SESSION2_TRANSPORT_PARENT,
+};
+
+/** A data unit for a single sequential data source. The data may be organized
+ * as a stream or a sequence of datagrams - this is up to the actual individual
+ * protocols used by the session, as defined by the `layers` member - see
+ * `struct protolayer_manager` and the types of its members for more info.
+ *
+ * A session processes data in two directions:
+ *
+ * - `_UNWRAP` deals with raw data received from the session's transport. It
+ * strips the ceremony of individual protocols from the buffers. The last
+ * (bottommost) layer is generally responsible for submitting the unwrapped
+ * data to be processed by an internal system, e.g. to be resolved as a DNS
+ * query.
+ *
+ * - `_WRAP` deals with data generated by an internal system. It adds the
+ * required protocol ceremony to it (e.g. encryption). The first (topmost)
+ * layer is responsible for preparing the data to be sent through the
+ * session's transport. */
+struct session2 {
+ /** Data for sending data out in the `wrap` direction and receiving new
+ * data in the `unwrap` direction. */
+ struct {
+ enum session2_transport_type type; /**< See `enum session2_transport_type` */
+ union {
+ /** For `_IO` type transport. Contains a libuv handle
+ * and session-related address storage. */
+ struct {
+ uv_handle_t *handle;
+ union kr_sockaddr peer;
+ union kr_sockaddr sockname;
+ } io;
+
+ /** For `_PARENT` type transport. */
+ struct session2 *parent;
+ };
+ } transport;
+
+ struct protolayer_manager *layers; /**< Protocol layers of this session. */
+ knot_mm_t pool;
+ uv_timer_t timer; /**< For session-wide timeout events. */
+ enum protolayer_event_type timer_event; /**< The event fired on timeout. */
+ trie_t *tasks; /**< List of tasks associated with given session. */
+ queue_t(struct qr_task *) waiting; /**< List of tasks waiting for
+ * sending to upstream. */
+
+ int uv_count; /**< Number of unclosed libUV handles owned by this
+ * session. */
+
+ /** Communication information. Typically written into by one of the
+ * first layers facilitating transport protocol processing.
+ * Zero-initialized by default. */
+ struct comm_info comm;
+
+ /** Managed buffer for data received by `io`. */
+ struct wire_buf wire_buf;
+
+ /** Time of last IO activity (if any occurs). Otherwise session
+ * creation time. */
+ uint64_t last_activity;
+
+ /** If true, the session's transport is towards an upstream server.
+ * Otherwise, it is towards a client. */
+ bool outgoing : 1;
+
+ /** If true, the session is at the end of its lifecycle and is about
+ * to close. */
+ bool closing : 1;
+
++ /** If true, the session has done something useful,
++ * e.g. it has produced a packet. */
++ bool was_useful : 1;
++
+ /** If true, encryption takes place in this session. Layers may use
+ * this to determine whether padding should be applied. A layer that
+ * provides security shall set this to `true` during session
+ * initialization. */
+ bool secure : 1;
+
+ /** If true, the session contains a stream-based protocol layer.
+ * Set during protocol layer initialization by the stream-based layer. */
+ bool stream : 1;
+
+ /** If true, the session contains a HTTP protocol layer.
+ * Set during protocol layer initialization by the HTTP layer. */
+ bool http : 1;
+
+ /** If true, a connection is established. Only applicable to sessions
+ * using connection-based protocols. One of the stream-based protocol
+ * layers is going to be the writer for this flag. */
+ bool connected : 1;
+
+ /** If true, session is being rate-limited. One of the protocol layers
+ * is going to be the writer for this flag. */
+ bool throttled : 1;
+};
+
+/** Allocates and initializes a new session with the specified protocol layer
+ * group, and the provided transport context.
+ *
+ * `layer_param` is a pointer to an array of size `layer_param_count`. The
+ * parameters are passed to the layer session initializers. The parameter array
+ * is only required to be valid before this function returns. It is up to the
+ * individual layer implementations to determine the lifetime of the data
+ * pointed to by the parameters. */
+struct session2 *session2_new(enum session2_transport_type transport_type,
+ enum protolayer_grp layer_grp,
+ struct protolayer_data_param *layer_param,
+ size_t layer_param_count,
+ bool outgoing);
+
+/** Allocates and initializes a new session with the specified protocol layer
+ * group, using a *libuv handle* as its transport. */
+static inline struct session2 *session2_new_io(uv_handle_t *handle,
+ enum protolayer_grp layer_grp,
+ struct protolayer_data_param *layer_param,
+ size_t layer_param_count,
+ bool outgoing)
+{
+ struct session2 *s = session2_new(SESSION2_TRANSPORT_IO, layer_grp,
+ layer_param, layer_param_count, outgoing);
+ s->transport.io.handle = handle;
+ handle->data = s;
+ s->uv_count++; /* Session owns the handle */
+ return s;
+}
+
+/** Allocates and initializes a new session with the specified protocol layer
+ * group, using a *parent session* as its transport. */
+static inline struct session2 *session2_new_child(struct session2 *parent,
+ enum protolayer_grp layer_grp,
+ struct protolayer_data_param *layer_param,
+ size_t layer_param_count,
+ bool outgoing)
+{
+ struct session2 *s = session2_new(SESSION2_TRANSPORT_PARENT, layer_grp,
+ layer_param, layer_param_count, outgoing);
+ s->transport.parent = parent;
+ return s;
+}
+
+/** Used when a libUV handle owned by the session is closed. Once all owned
+ * handles are closed, the session is freed. */
+void session2_unhandle(struct session2 *s);
+
+/** Start reading from the underlying transport. */
+int session2_start_read(struct session2 *session);
+
+/** Stop reading from the underlying transport. */
+int session2_stop_read(struct session2 *session);
+
+/** Gets the peer address from the specified session, iterating through the
+ * session hierarchy (child-to-parent) until an `_IO` session is found if
+ * needed.
+ *
+ * May return `NULL` if no peer is set. */
+struct sockaddr *session2_get_peer(struct session2 *s);
+
+/** Gets the sockname from the specified session, iterating through the
+ * session hierarchy (child-to-parent) until an `_IO` session is found if
+ * needed.
+ *
+ * May return `NULL` if no peer is set. */
+struct sockaddr *session2_get_sockname(struct session2 *s);
+
+/** Gets the libuv handle from the specified session, iterating through the
+ * session hierarchy (child-to-parent) until an `_IO` session is found if
+ * needed.
+ *
+ * May return `NULL` if no peer is set. */
+KR_EXPORT uv_handle_t *session2_get_handle(struct session2 *s);
+
+/** Start the session timer. On timeout, the specified `event` is sent in the
+ * `_UNWRAP` direction. Only a single timeout can be active at a time. */
+int session2_timer_start(struct session2 *s, enum protolayer_event_type event,
+ uint64_t timeout, uint64_t repeat);
+
+/** Restart the session timer without changing any of its parameters. */
+int session2_timer_restart(struct session2 *s);
+
+/** Stop the session timer. */
+int session2_timer_stop(struct session2 *s);
+
+int session2_tasklist_add(struct session2 *session, struct qr_task *task);
+int session2_tasklist_del(struct session2 *session, struct qr_task *task);
+struct qr_task *session2_tasklist_get_first(struct session2 *session);
+struct qr_task *session2_tasklist_del_first(struct session2 *session, bool deref);
+struct qr_task *session2_tasklist_find_msgid(const struct session2 *session, uint16_t msg_id);
+struct qr_task *session2_tasklist_del_msgid(const struct session2 *session, uint16_t msg_id);
+void session2_tasklist_finalize(struct session2 *session, int status);
+int session2_tasklist_finalize_expired(struct session2 *session);
+
+static inline size_t session2_tasklist_get_len(const struct session2 *session)
+{
+ return trie_weight(session->tasks);
+}
+
+static inline bool session2_tasklist_is_empty(const struct session2 *session)
+{
+ return session2_tasklist_get_len(session) == 0;
+}
+
+int session2_waitinglist_push(struct session2 *session, struct qr_task *task);
+struct qr_task *session2_waitinglist_get(const struct session2 *session);
+struct qr_task *session2_waitinglist_pop(struct session2 *session, bool deref);
+void session2_waitinglist_retry(struct session2 *session, bool increase_timeout_cnt);
+void session2_waitinglist_finalize(struct session2 *session, int status);
+
+static inline size_t session2_waitinglist_get_len(const struct session2 *session)
+{
+ return queue_len(session->waiting);
+}
+
+static inline bool session2_waitinglist_is_empty(const struct session2 *session)
+{
+ return session2_waitinglist_get_len(session) == 0;
+}
+
+static inline bool session2_is_empty(const struct session2 *session)
+{
+ return session2_tasklist_is_empty(session) &&
+ session2_waitinglist_is_empty(session);
+}
+
+/** Sends the specified `payload` to be processed in the `_UNWRAP` direction by
+ * the session's protocol layers.
+ *
+ * The `comm` parameter may contain a pointer to comm data, e.g. for UDP, that
+ * comm data shall contain a pointer to the sender's `struct sockaddr_*`. If
+ * `comm` is `NULL`, session-wide data shall be used.
+ *
+ * Note that the payload data may be modified by any of the layers, to avoid
+ * making copies. Once the payload is passed to this function, the content of
+ * the referenced data is undefined to the caller.
+ *
+ * Once all layers are processed, `cb` is called with `baton` passed as one
+ * of its parameters. `cb` may also be `NULL`. See `protolayer_finished_cb` for
+ * more info.
+ *
+ * Returns one of `enum protolayer_ret` or a negative number
+ * indicating an error. */
+int session2_unwrap(struct session2 *s, struct protolayer_payload payload,
+ const struct comm_info *comm, protolayer_finished_cb cb,
+ void *baton);
+
+/** Same as `session2_unwrap`, but looks up the specified `protocol` in the
+ * session's assigned protocol group and sends the `payload` to the layer that
+ * is next in the sequence in the `_UNWRAP` direction.
+ *
+ * Layers may use this to generate their own data to send in the sequence, e.g.
+ * for protocol-specific ceremony. */
+int session2_unwrap_after(struct session2 *s, enum protolayer_protocol protocol,
+ struct protolayer_payload payload,
+ const struct comm_info *comm,
+ protolayer_finished_cb cb, void *baton);
+
+/** Sends the specified `payload` to be processed in the `_WRAP` direction by
+ * the session's protocol layers. The `target` parameter may contain a pointer
+ * to some data specific to the bottommost layer of this session.
+ *
+ * Note that the payload data may be modified by any of the layers, to avoid
+ * making copies. Once the payload is passed to this function, the content of
+ * the referenced data is undefined to the caller.
+ *
+ * Once all layers are processed, `cb` is called with `baton` passed as one
+ * of its parameters. `cb` may also be `NULL`. See `protolayer_finished_cb` for
+ * more info.
+ *
+ * Returns one of `enum protolayer_ret` or a negative number
+ * indicating an error. */
+int session2_wrap(struct session2 *s, struct protolayer_payload payload,
+ const struct comm_info *comm, protolayer_finished_cb cb,
+ void *baton);
+
+/** Same as `session2_wrap`, but looks up the specified `protocol` in the
+ * session's assigned protocol group and sends the `payload` to the layer that
+ * is next in the sequence in the `_WRAP` direction.
+ *
+ * Layers may use this to generate their own data to send in the sequence, e.g.
+ * for protocol-specific ceremony. */
+int session2_wrap_after(struct session2 *s, enum protolayer_protocol protocol,
+ struct protolayer_payload payload,
+ const struct comm_info *comm,
+ protolayer_finished_cb cb, void *baton);
+
+/** Sends an event to be synchronously processed by the protocol layers of the
+ * specified session. The layers are first iterated through in the `_UNWRAP`
+ * direction, then bounced back in the `_WRAP` direction. */
+void session2_event(struct session2 *s, enum protolayer_event_type event, void *baton);
+
+/** Sends an event to be synchronously processed by the protocol layers of the
+ * specified session, starting from the specified `protocol` in the `_UNWRAP`
+ * direction. The layers are first iterated through in the `_UNWRAP` direction,
+ * then bounced back in the `_WRAP` direction.
+ *
+ * NOTE: The bounced iteration does not exclude any layers - the layer
+ * specified by `protocol` and those before it are only skipped in the
+ * `_UNWRAP` direction! */
+void session2_event_after(struct session2 *s, enum protolayer_protocol protocol,
+ enum protolayer_event_type event, void *baton);
+
+/** Performs initial setup of the specified `req`, using the session's protocol
+ * layers. Layers are processed in the `_UNWRAP` direction. */
+void session2_init_request(struct session2 *s, struct kr_request *req);
+
+/** Removes the specified request task from the session's tasklist. The session
+ * must be outgoing. If the session is UDP, a signal to close is also sent to it. */
+void session2_kill_ioreq(struct session2 *session, struct qr_task *task);
+
+/** Update `last_activity` to the current timestamp. */
+static inline void session2_touch(struct session2 *session)
+{
+ session->last_activity = kr_now();
+}
if (!session || !pkt)
return kr_error(EINVAL);
- const bool is_query = (knot_wire_get_qr(pkt->wire) == 0);
- uv_handle_t *handle = session_get_handle(session);
- if (!handle || !handle->loop->data)
- return kr_error(EINVAL);
-
+ const bool is_query = pkt->size > KNOT_WIRE_OFFSET_FLAGS1
+ && knot_wire_get_qr(pkt->wire) == 0;
- const bool is_outgoing = session_flags(session)->outgoing;
+ const bool is_outgoing = session->outgoing;
- int ret = knot_pkt_parse(pkt, 0);
- if (ret == KNOT_ETRAIL && is_outgoing && !kr_fails_assert(pkt->parsed < pkt->size))
- ret = KNOT_EOK; // we deal with this later, so that `selection` applies
+ int ret = 0;
+ if (is_query == is_outgoing)
+ ret = KNOT_ENOENT;
+
+ // For responses from upstream, try to find associated task and query.
+ // In case of errors, at least try to guess.
+ struct qr_task *task = NULL;
+ bool task_matched_id = false;
+ if (is_outgoing && pkt->size >= 2) {
+ const uint16_t id = knot_wire_get_id(pkt->wire);
- task = session_tasklist_del_msgid(session, id);
++ task = session2_tasklist_del_msgid(session, id);
+ task_matched_id = task != NULL;
+ if (task_matched_id) // Note receive time for RTT calculation
+ task->recv_time = kr_now();
+ if (!task_matched_id) {
+ ret = KNOT_ENOENT;
+ VERBOSE_MSG(NULL, "=> DNS message with mismatching ID %d\n",
+ (int)id);
+ }
+ }
- if (!task && is_outgoing && handle->type == UV_TCP) {
++ if (!task && is_outgoing && session->stream) {
+ // Source address of the reply got somewhat validated,
+ // so we try to at least guess which query, for error reporting.
- task = session_tasklist_get_first(session);
++ task = session2_tasklist_get_first(session);
+ }
+ struct kr_query *qry = NULL;
+ if (task)
+ qry = array_tail(task->ctx->req.rplan.pending);
+
+ // Parse the packet, unless it's useless anyway.
+ if (ret == 0) {
+ ret = knot_pkt_parse(pkt, 0);
+ if (ret == KNOT_ETRAIL && is_outgoing
+ && !kr_fails_assert(pkt->parsed < pkt->size)) {
+ // We deal with this later, so that RCODE takes priority.
+ ret = 0;
+ }
+ if (ret && kr_log_is_debug_qry(WORKER, qry)) {
+ VERBOSE_MSG(qry, "=> DNS message failed to parse, %s\n",
+ knot_strerror(ret));
+ }
+ }
- struct http_ctx *http_ctx = NULL;
-#if ENABLE_DOH2
- http_ctx = session_http_get_server_ctx(session);
-
/* Badly formed query when using DoH leads to a Bad Request */
- if (http_ctx && !is_outgoing && ret) {
- http_send_status(session, HTTP_STATUS_BAD_REQUEST);
- return kr_error(ret);
+ /* TODO: Do not necessarily tie it to HTTP - it should probably be a
+ * more generic flag */
+ if (session->http && !is_outgoing && ret) {
+ session2_event(session, PROTOLAYER_EVENT_MALFORMED, NULL);
+ return ret;
}
-#endif
-
- if (!is_outgoing && http_ctx && queue_len(http_ctx->streams) <= 0)
- return kr_error(ENOENT);
+ const struct sockaddr *addr = comm ? comm->src_addr : NULL;
+
/* Ignore badly formed queries. */
- if (ret && kr_log_is_debug(WORKER, NULL)) {
- VERBOSE_MSG(NULL, "=> incoming packet failed to parse, %s\n",
- knot_strerror(ret));
- }
- if (ret || is_query == is_outgoing) {
+ if (ret) {
+ if (is_outgoing && qry) // unusuable response from somewhat validated IP
+ qry->server_selection.error(qry, task->transport, KR_SELECTION_MALFORMED);
if (!is_outgoing)
the_worker->stats.dropped += 1;
+ if (task_matched_id) // notify task that answer won't be coming anymore
+ qr_task_step(task, addr, NULL);
return kr_error(EILSEQ);
}
/* Start new task on listening sockets,
* or resume if this is subrequest */
- struct qr_task *task = NULL;
- const struct sockaddr *addr = NULL;
if (!is_outgoing) { /* request from a client */
struct request_ctx *ctx =
- request_create(the_worker, session, comm, eth_from,
- eth_to, knot_wire_get_id(pkt->wire));
- if (http_ctx)
- queue_pop(http_ctx->streams);
+ request_create(session, comm, knot_wire_get_id(pkt->wire));
if (!ctx)
return kr_error(ENOMEM);
return kr_error(ENOMEM);
}
} else { /* response from upstream */
- const uint16_t id = knot_wire_get_id(pkt->wire);
- task = session2_tasklist_del_msgid(session, id);
if (task == NULL) {
- VERBOSE_MSG(NULL, "=> ignoring packet with mismatching ID %d\n",
- (int)id);
return kr_error(ENOENT);
}
- if (kr_fails_assert(!session_flags(session)->closing))
+ if (kr_fails_assert(!session->closing))
return kr_error(EINVAL);
- addr = (comm) ? comm->src_addr : NULL;
- /* Note receive time for RTT calculation */
- task->recv_time = kr_now();
}
- if (kr_fails_assert(!uv_is_closing(session_get_handle(session))))
+ if (kr_fails_assert(!session->closing))
return kr_error(EINVAL);
/* Packet was successfully parsed.
return task->pktbuf;
}
-struct request_ctx *worker_task_get_request(struct qr_task *task)
-{
- return task->ctx;
-}
-
+ struct kr_transport *worker_task_get_transport(struct qr_task *task)
+ {
+ return task->transport;
+ }
+
-struct session *worker_request_get_source_session(const struct kr_request *req)
+struct session2 *worker_request_get_source_session(const struct kr_request *req)
{
static_assert(offsetof(struct request_ctx, req) == 0,
"Bad struct request_ctx definition.");
the_worker = NULL;
}
-int worker_init(struct engine *engine, int worker_count)
+static inline knot_pkt_t *produce_packet(char *buf, size_t buf_len)
+{
+ return knot_pkt_new(buf, buf_len, &the_worker->pkt_pool);
+}
+
+static bool pl_dns_dgram_event_unwrap(enum protolayer_event_type event,
+ void **baton,
+ struct protolayer_manager *manager,
+ void *sess_data)
+{
+ if (event != PROTOLAYER_EVENT_GENERAL_TIMEOUT)
+ return true;
+
+ struct session2 *session = manager->session;
+ if (session2_tasklist_get_len(session) != 1 ||
+ !session2_waitinglist_is_empty(session))
+ return true;
+
+ session2_timer_stop(session);
+
+ struct qr_task *task = session2_tasklist_get_first(session);
+ if (!task)
+ return true;
+
+ if (task->leading && task->pending_count > 0) {
+ struct kr_query *qry = array_tail(task->ctx->req.rplan.pending);
+ qry->server_selection.error(qry, task->transport, KR_SELECTION_QUERY_TIMEOUT);
+ }
+
+ task->timeouts += 1;
+ the_worker->stats.timeout += 1;
+ qr_task_step(task, NULL, NULL);
+
+ return true;
+}
+
+static enum protolayer_iter_cb_result pl_dns_dgram_unwrap(
+ void *sess_data, void *iter_data, struct protolayer_iter_ctx *ctx)
+{
+ struct session2 *session = ctx->manager->session;
+
+ if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
+ int ret = kr_ok();
+ for (int i = 0; i < ctx->payload.iovec.cnt; i++) {
+ const struct iovec *iov = &ctx->payload.iovec.iov[i];
+ knot_pkt_t *pkt = produce_packet(
+ iov->iov_base, iov->iov_len);
+ if (!pkt) {
+ ret = KNOT_EMALF;
+ break;
+ }
+
+ ret = worker_submit(session, &ctx->comm, pkt);
+ if (ret)
+ break;
+ }
+
+ mp_flush(the_worker->pkt_pool.ctx);
+ return protolayer_break(ctx, ret);
+ } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) {
+ knot_pkt_t *pkt = produce_packet(
+ ctx->payload.buffer.buf,
+ ctx->payload.buffer.len);
+ if (!pkt)
+ return protolayer_break(ctx, KNOT_EMALF);
+
+ int ret = worker_submit(session, &ctx->comm, pkt);
+ mp_flush(the_worker->pkt_pool.ctx);
+ return protolayer_break(ctx, ret);
+ } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF) {
+ knot_pkt_t *pkt = produce_packet(
+ wire_buf_data(ctx->payload.wire_buf),
+ wire_buf_data_length(ctx->payload.wire_buf));
+ if (!pkt)
+ return protolayer_break(ctx, KNOT_EMALF);
+
+ int ret = worker_submit(session, &ctx->comm, pkt);
+ wire_buf_reset(ctx->payload.wire_buf);
+ mp_flush(the_worker->pkt_pool.ctx);
+ return protolayer_break(ctx, ret);
+ } else {
+ kr_assert(false && "Invalid payload");
+ return protolayer_break(ctx, kr_error(EINVAL));
+ }
+}
+
+struct pl_dns_stream_sess_data {
+ PROTOLAYER_DATA_HEADER();
+ bool single : 1; /**< True: Stream only allows a single packet */
+ bool produced : 1; /**< True: At least one packet has been produced */
+};
+
+struct pl_dns_stream_iter_data {
+ PROTOLAYER_DATA_HEADER();
+ struct {
+ knot_mm_t *pool;
+ void *mem;
+ } sent;
+};
+
+static int pl_dns_stream_sess_init(struct protolayer_manager *manager,
+ void *sess_data, void *param)
+{
+ /* _UNSIZED_STREAM and _MULTI_STREAM - don't forget to split if needed
+ * at some point */
+ manager->session->stream = true;
+ return kr_ok();
+}
+
+static int pl_dns_single_stream_sess_init(struct protolayer_manager *manager,
+ void *sess_data, void *param)
+{
+ manager->session->stream = true;
+ struct pl_dns_stream_sess_data *stream = sess_data;
+ stream->single = true;
+ return kr_ok();
+}
+
+static int pl_dns_stream_iter_deinit(struct protolayer_manager *manager,
+ struct protolayer_iter_ctx *ctx,
+ void *iter_data)
+{
+ struct pl_dns_stream_iter_data *stream = iter_data;
+ mm_free(stream->sent.pool, stream->sent.mem);
+ return kr_ok();
+}
+
+static bool pl_dns_stream_resolution_timeout(struct session2 *s)
+{
+ if (kr_fails_assert(!s->closing))
+ return true;
+
+ if (!session2_tasklist_is_empty(s)) {
+ int finalized = session2_tasklist_finalize_expired(s);
+ the_worker->stats.timeout += finalized;
+ /* session2_tasklist_finalize_expired() may call worker_task_finalize().
+ * If session is a source session and there were IO errors,
+ * worker_task_finalize() can finalize all tasks and close session. */
+ if (s->closing)
+ return true;
+ }
+
+ if (!session2_tasklist_is_empty(s)) {
+ session2_timer_stop(s);
+ session2_timer_start(s,
+ PROTOLAYER_EVENT_GENERAL_TIMEOUT,
+ KR_RESOLVE_TIME_LIMIT / 2,
+ KR_RESOLVE_TIME_LIMIT / 2);
+ } else {
+ /* Normally it should not happen,
+ * but better to check if there anything in this list. */
+ while (!session2_waitinglist_is_empty(s)) {
+ struct qr_task *t = session2_waitinglist_pop(s, false);
+ worker_task_finalize(t, KR_STATE_FAIL);
+ worker_task_unref(t);
+ the_worker->stats.timeout += 1;
+ if (s->closing)
+ return true;
+ }
+ uint64_t idle_in_timeout = the_network->tcp.in_idle_timeout;
+ uint64_t idle_time = kr_now() - s->last_activity;
+ if (idle_time < idle_in_timeout) {
+ idle_in_timeout -= idle_time;
+ session2_timer_stop(s);
+ session2_timer_start(s, PROTOLAYER_EVENT_GENERAL_TIMEOUT,
+ idle_in_timeout, idle_in_timeout);
+ } else {
+ struct sockaddr *peer = session2_get_peer(s);
+ char *peer_str = kr_straddr(peer);
+ kr_log_debug(IO, "=> closing connection to '%s'\n",
+ peer_str ? peer_str : "");
+ if (s->outgoing) {
+ worker_del_tcp_waiting(peer);
+ worker_del_tcp_connected(peer);
+ }
+ session2_event(s, PROTOLAYER_EVENT_CLOSE, NULL);
+ }
+ }
+
+ return true;
+}
+
+static bool pl_dns_stream_connected(struct session2 *session)
+{
+ if (session->connected)
+ return true;
+
+ session->connected = true;
+
+ struct sockaddr *peer = session2_get_peer(session);
+ if (session->outgoing && worker_del_tcp_waiting(peer) != 0) {
+ /* session isn't in list of waiting queries, *
+ * something gone wrong */
+ session2_waitinglist_finalize(session, KR_STATE_FAIL);
+ kr_assert(session2_tasklist_is_empty(session));
+ session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
+ return false;
+ }
+
+ worker_add_tcp_connected(peer, session);
+ return true;
+}
+
+static bool pl_dns_stream_connection_fail(struct session2 *session,
+ enum kr_selection_error sel_err)
+{
+ session2_timer_stop(session);
+
+ kr_assert(session2_tasklist_is_empty(session));
+
+ struct sockaddr *peer = session2_get_peer(session);
+ worker_del_tcp_waiting(peer);
+
+ struct qr_task *task = session2_waitinglist_get(session);
+ if (!task) {
+ /* Normally shouldn't happen. */
+ const char *peer_str = kr_straddr(peer);
+ VERBOSE_MSG(NULL, "=> connection to '%s' failed, empty waitinglist\n",
+ peer_str ? peer_str : "");
+ return true;
+ }
+
+ struct kr_query *qry = task_get_last_pending_query(task);
+ if (kr_log_is_debug_qry(WORKER, qry)) {
+ const char *peer_str = kr_straddr(peer);
+ bool timeout = sel_err == KR_SELECTION_TCP_CONNECT_TIMEOUT;
+ VERBOSE_MSG(qry, "=> connection to '%s' failed (%s)\n",
+ peer_str ? peer_str : "",
+ timeout ? "timeout" : "error");
+ }
+
+ if (qry)
+ qry->server_selection.error(qry, task->transport, sel_err);
+
+ the_worker->stats.timeout += session2_waitinglist_get_len(session);
+ session2_waitinglist_retry(session, true);
+ kr_assert(session2_tasklist_is_empty(session));
+ /* uv_cancel() doesn't support uv_connect_t request,
+ * so that we can't cancel it.
+ * There still exists possibility of successful connection
+ * for this request.
+ * So connection callback (on_connect()) must check
+ * if connection is in the list of waiting connection.
+ * If no, most likely this is timed out connection even if
+ * it was successful. */
+
+ return true;
+}
+
+static bool pl_dns_stream_disconnected(struct session2 *session)
+{
+ if (!session->connected)
+ return true;
+
+ struct sockaddr *peer = session2_get_peer(session);
+ worker_del_tcp_waiting(peer);
+ worker_del_tcp_connected(peer);
+ session->connected = false;
+
+ while (!session2_waitinglist_is_empty(session)) {
+ struct qr_task *task = session2_waitinglist_pop(session, false);
+ kr_assert(task->refs > 1);
+ session2_tasklist_del(session, task);
+ if (session->outgoing) {
+ if (task->ctx->req.options.FORWARD) {
+ /* We are in TCP_FORWARD mode.
+ * To prevent failing at kr_resolve_consume()
+ * qry.flags.TCP must be cleared.
+ * TODO - refactoring is needed. */
+ struct kr_request *req = &task->ctx->req;
+ struct kr_rplan *rplan = &req->rplan;
+ struct kr_query *qry = array_tail(rplan->pending);
+ qry->flags.TCP = false;
+ }
+ qr_task_step(task, NULL, NULL);
+ } else {
+ kr_assert(task->ctx->source.session == session);
+ task->ctx->source.session = NULL;
+ }
+ worker_task_unref(task);
+ }
+ while (!session2_tasklist_is_empty(session)) {
+ struct qr_task *task = session2_tasklist_del_first(session, false);
+ if (session->outgoing) {
+ if (task->ctx->req.options.FORWARD) {
+ struct kr_request *req = &task->ctx->req;
+ struct kr_rplan *rplan = &req->rplan;
+ struct kr_query *qry = array_tail(rplan->pending);
+ qry->flags.TCP = false;
+ }
+ qr_task_step(task, NULL, NULL);
+ } else {
+ kr_assert(task->ctx->source.session == session);
+ task->ctx->source.session = NULL;
+ }
+ worker_task_unref(task);
+ }
+
+ return true;
+}
+
+static bool pl_dns_stream_event_unwrap(enum protolayer_event_type event,
+ void **baton,
+ struct protolayer_manager *manager,
+ void *sess_data)
+{
+ struct session2 *session = manager->session;
+ if (session->closing)
+ return true;
+
+ if (event == PROTOLAYER_EVENT_GENERAL_TIMEOUT) {
+ return pl_dns_stream_resolution_timeout(manager->session);
+ } else if (event == PROTOLAYER_EVENT_CONNECT_TIMEOUT) {
+ return pl_dns_stream_connection_fail(manager->session,
+ KR_SELECTION_TCP_CONNECT_TIMEOUT);
+ } else if (event == PROTOLAYER_EVENT_CONNECT) {
+ return pl_dns_stream_connected(session);
+ } else if (event == PROTOLAYER_EVENT_DISCONNECT
+ || event == PROTOLAYER_EVENT_CLOSE
+ || event == PROTOLAYER_EVENT_FORCE_CLOSE) {
+ return pl_dns_stream_disconnected(session);
+ } else if (event == PROTOLAYER_EVENT_CONNECT_FAIL) {
+ enum kr_selection_error err = (*baton)
+ ? *(enum kr_selection_error *)baton
+ : KR_SELECTION_TCP_CONNECT_FAILED;
+ return pl_dns_stream_connection_fail(manager->session, err);
+ }
+
+ return true;
+}
+
+static knot_pkt_t *produce_stream_packet(struct wire_buf *wb)
{
- if (kr_fails_assert(engine && engine->L && the_worker == NULL))
+ uint16_t pkt_len = knot_wire_read_u16(wire_buf_data(wb));
+ if (wire_buf_data_length(wb) < pkt_len + sizeof(uint16_t))
+ return NULL;
+
+ wire_buf_trim(wb, sizeof(uint16_t));
+ knot_pkt_t *pkt = produce_packet(wire_buf_data(wb), pkt_len);
+ wire_buf_trim(wb, pkt_len);
+ return pkt;
+}
+
+static enum protolayer_iter_cb_result pl_dns_stream_unwrap(
+ void *sess_data, void *iter_data, struct protolayer_iter_ctx *ctx)
+{
+ if (kr_fails_assert(ctx->payload.type == PROTOLAYER_PAYLOAD_WIRE_BUF)) {
+ /* DNS stream only works with a wire buffer */
+ return protolayer_break(ctx, kr_error(EINVAL));
+ }
+
+ int status = kr_ok();
+ struct session2 *session = ctx->manager->session;
+ struct pl_dns_stream_sess_data *stream_sess = sess_data;
+ struct wire_buf *wb = ctx->payload.wire_buf;
+ const uint32_t max_iters = (wire_buf_data_length(wb) /
+ (KNOT_WIRE_HEADER_SIZE + KNOT_WIRE_QUESTION_MIN_SIZE)) + 1;
+ int iters = 0;
+
+ knot_pkt_t *pkt;
+ while ((pkt = produce_stream_packet(wb)) && iters < max_iters) {
++ session->was_useful = true;
+ if (stream_sess->single && stream_sess->produced) {
+ if (kr_log_is_debug(WORKER, NULL)) {
+ kr_log_debug(WORKER, "Unexpected extra data from %s\n",
+ kr_straddr(ctx->comm.src_addr));
+ }
+ worker_end_tcp(session);
+ status = KNOT_EMALF;
+ goto exit;
+ }
+
+ stream_sess->produced = true;
+ if (!pkt) {
+ status = KNOT_EMALF;
+ goto exit;
+ }
+
+ int ret = worker_submit(session, &ctx->comm, pkt);
+ wire_buf_movestart(wb);
+ if (ret == kr_ok()) {
+ iters += 1;
+ }
+ }
+
+
+
+ /* worker_submit() may cause the session to close (e.g. due to IO
+ * write error when the packet triggers an immediate answer). This is
+ * an error state, as well as any wirebuf error. */
+ if (session->closing)
+ status = kr_error(EIO);
+
+exit:
+ wire_buf_movestart(wb);
+ mp_flush(the_worker->pkt_pool.ctx);
+ return protolayer_break(ctx, status);
+}
+
+struct sized_iovs {
+ uint8_t nlen[2];
+ struct iovec iovs[];
+};
+
+static enum protolayer_iter_cb_result pl_dns_stream_wrap(
+ void *sess_data, void *iter_data, struct protolayer_iter_ctx *ctx)
+{
+ struct pl_dns_stream_iter_data *stream = iter_data;
+ struct session2 *s = ctx->manager->session;
+
+ if (kr_fails_assert(!stream->sent.mem))
+ return protolayer_break(ctx, kr_error(EINVAL));
+
+ if (ctx->payload.type == PROTOLAYER_PAYLOAD_BUFFER) {
+ if (kr_fails_assert(ctx->payload.buffer.len <= UINT16_MAX))
+ return protolayer_break(ctx, kr_error(EMSGSIZE));
+
+ const int iovcnt = 2;
+ struct sized_iovs *siov = mm_alloc(&s->pool,
+ sizeof(*siov) + iovcnt * sizeof(struct iovec));
+ kr_require(siov);
+ knot_wire_write_u16(siov->nlen, ctx->payload.buffer.len);
+ siov->iovs[0] = (struct iovec){
+ .iov_base = &siov->nlen,
+ .iov_len = sizeof(siov->nlen)
+ };
+ siov->iovs[1] = (struct iovec){
+ .iov_base = ctx->payload.buffer.buf,
+ .iov_len = ctx->payload.buffer.len
+ };
+
+ stream->sent.mem = siov;
+ stream->sent.pool = &s->pool;
+
+ ctx->payload = protolayer_iovec(siov->iovs, iovcnt);
+ return protolayer_continue(ctx);
+ } else if (ctx->payload.type == PROTOLAYER_PAYLOAD_IOVEC) {
+ const int iovcnt = 1 + ctx->payload.iovec.cnt;
+ struct sized_iovs *siov = mm_alloc(&s->pool,
+ sizeof(*siov) + iovcnt * sizeof(struct iovec));
+ kr_require(siov);
+
+ size_t total_len = 0;
+ for (int i = 0; i < ctx->payload.iovec.cnt; i++) {
+ const struct iovec *iov = &ctx->payload.iovec.iov[i];
+ total_len += iov->iov_len;
+ siov->iovs[i + 1] = *iov;
+ }
+
+ if (kr_fails_assert(total_len <= UINT16_MAX))
+ return protolayer_break(ctx, kr_error(EMSGSIZE));
+ knot_wire_write_u16(siov->nlen, total_len);
+ siov->iovs[0] = (struct iovec){
+ .iov_base = &siov->nlen,
+ .iov_len = sizeof(siov->nlen)
+ };
+
+ stream->sent.mem = siov;
+ stream->sent.pool = &s->pool;
+
+ ctx->payload = protolayer_iovec(siov->iovs, iovcnt);
+ return protolayer_continue(ctx);
+ } else {
+ kr_assert(false && "Invalid payload");
+ return protolayer_break(ctx, kr_error(EINVAL));
+ }
+}
+
+static void pl_dns_stream_request_init(struct protolayer_manager *manager,
+ struct kr_request *req,
+ void *sess_data)
+{
+ req->qsource.comm_flags.tcp = true;
+}
+
+int worker_init(void)
+{
+ if (kr_fails_assert(the_worker == NULL))
return kr_error(EINVAL);
- kr_bindings_register(engine->L);
+ kr_bindings_register(the_engine->L); // TODO move
+
+ /* DNS protocol layers */
+ protolayer_globals[PROTOLAYER_DNS_DGRAM] = (struct protolayer_globals){
+ .unwrap = pl_dns_dgram_unwrap,
+ .event_unwrap = pl_dns_dgram_event_unwrap
+ };
+ protolayer_globals[PROTOLAYER_DNS_UNSIZED_STREAM] = (struct protolayer_globals){
+ .sess_init = pl_dns_stream_sess_init,
+ .unwrap = pl_dns_dgram_unwrap,
+ .event_unwrap = pl_dns_stream_event_unwrap,
+ .request_init = pl_dns_stream_request_init
+ };
+ const struct protolayer_globals stream_common = {
+ .sess_size = sizeof(struct pl_dns_stream_sess_data),
+ .sess_init = NULL, /* replaced in specific layers below */
+ .iter_size = sizeof(struct pl_dns_stream_iter_data),
+ .iter_deinit = pl_dns_stream_iter_deinit,
+ .unwrap = pl_dns_stream_unwrap,
+ .wrap = pl_dns_stream_wrap,
+ .event_unwrap = pl_dns_stream_event_unwrap,
+ .request_init = pl_dns_stream_request_init
+ };
+ protolayer_globals[PROTOLAYER_DNS_MULTI_STREAM] = stream_common;
+ protolayer_globals[PROTOLAYER_DNS_MULTI_STREAM].sess_init = pl_dns_stream_sess_init;
+ protolayer_globals[PROTOLAYER_DNS_SINGLE_STREAM] = stream_common;
+ protolayer_globals[PROTOLAYER_DNS_SINGLE_STREAM].sess_init = pl_dns_single_stream_sess_init;
/* Create main worker. */
- struct worker_ctx *worker = &the_worker_value;
- memset(worker, 0, sizeof(*worker));
- worker->engine = engine;
+ the_worker = &the_worker_value;
+ memset(the_worker, 0, sizeof(*the_worker));
uv_loop_t *loop = uv_default_loop();
- worker->loop = loop;
+ the_worker->loop = loop;
- worker->count = worker_count;
+ static const int worker_count = 1;
+ the_worker->count = worker_count;
/* Register table for worker per-request variables */
- lua_newtable(engine->L);
- lua_setfield(engine->L, -2, "vars");
- lua_getfield(engine->L, -1, "vars");
- worker->vars_table_ref = luaL_ref(engine->L, LUA_REGISTRYINDEX);
- lua_pop(engine->L, 1);
+ struct lua_State *L = the_engine->L;
+ lua_newtable(L);
+ lua_setfield(L, -2, "vars");
+ lua_getfield(L, -1, "vars");
+ the_worker->vars_table_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+ lua_pop(L, 1);
- worker->tcp_pipeline_max = MAX_PIPELINED;
- worker->out_addr4.sin_family = AF_UNSPEC;
- worker->out_addr6.sin6_family = AF_UNSPEC;
+ the_worker->tcp_pipeline_max = MAX_PIPELINED;
+ the_worker->out_addr4.sin_family = AF_UNSPEC;
+ the_worker->out_addr6.sin6_family = AF_UNSPEC;
- array_init(worker->doh_qry_headers);
+ array_init(the_worker->doh_qry_headers);
- int ret = worker_reserve(worker);
+ int ret = worker_reserve();
if (ret) return ret;
- worker->next_request_uid = UINT16_MAX + 1;
+ the_worker->next_request_uid = UINT16_MAX + 1;
/* Set some worker.* fields in Lua */
- lua_getglobal(engine->L, "worker");
+ lua_getglobal(L, "worker");
pid_t pid = getpid();
auto_free char *pid_str = NULL;