#define HTTP_MAX_HEADER_IN_SIZE 1024
+/* Initial max frame size: https://tools.ietf.org/html/rfc7540#section-6.5.2 */
+#define HTTP_MAX_FRAME_SIZE 16384
+
#define HTTP_FRAME_HDLEN 9
#define HTTP_FRAME_PADLEN 1
http->current_method = HTTP_METHOD_NONE;
http->uri_path = NULL;
http->status = HTTP_STATUS_OK;
- wire_buf_init(&http->wire_buf, KNOT_WIRE_MAX_PKTSIZE);
+ wire_buf_init(&http->wire_buf, manager->wire_buf.size);
ret = nghttp2_session_server_new(&http->h2, callbacks, http);
if (ret < 0)
{
protolayer_globals[PROTOLAYER_HTTP] = (struct protolayer_globals) {
.sess_size = sizeof(struct pl_http_sess_data),
- .sess_init = pl_http_sess_init,
.sess_deinit = pl_http_sess_deinit,
+ .wire_buf_overhead = HTTP_MAX_FRAME_SIZE,
+ .sess_init = pl_http_sess_init,
.unwrap = pl_http_unwrap,
.wrap = pl_http_wrap,
.event_unwrap = pl_http_event_unwrap,
static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
- /* UDP sessions use worker buffer for wire data,
- * TCP sessions use session buffer for wire data
- * (see session_set_handle()).
- * TLS sessions use buffer from TLS context.
- * The content of the worker buffer is
- * guaranteed to be unchanged only for the duration of
- * udp_read() and tcp_read().
- */
struct session2 *s = handle->data;
- buf->base = wire_buf_free_space(&s->wire_buf);
- buf->len = wire_buf_free_space_length(&s->wire_buf);
+ buf->base = wire_buf_free_space(&s->layers->wire_buf);
+ buf->len = wire_buf_free_space_length(&s->layers->wire_buf);
}
static void udp_on_unwrapped(int status, struct session2 *session,
const struct comm_info *comm, void *baton)
{
- wire_buf_reset(&session->wire_buf);
+ wire_buf_reset(&session->layers->wire_buf);
}
void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
}
}
- int ret = wire_buf_consume(&s->wire_buf, nread);
+ int ret = wire_buf_consume(&s->layers->wire_buf, nread);
if (ret) {
- wire_buf_reset(&s->wire_buf);
+ wire_buf_reset(&s->layers->wire_buf);
return;
}
.comm_addr = comm_addr,
.src_addr = comm_addr
};
- session2_unwrap(s, protolayer_wire_buf(&s->wire_buf), &in_comm,
+ session2_unwrap(s, protolayer_wire_buf(&s->layers->wire_buf), &in_comm,
udp_on_unwrapped, NULL);
}
return;
}
- int ret = wire_buf_consume(&s->wire_buf, nread);
+ if (kr_fails_assert(buf->base == wire_buf_free_space(&s->layers->wire_buf))) {
+ return;
+ }
+
+ int ret = wire_buf_consume(&s->layers->wire_buf, nread);
if (ret) {
- wire_buf_reset(&s->wire_buf);
+ wire_buf_reset(&s->layers->wire_buf);
return;
}
- session2_unwrap(s, protolayer_wire_buf(&s->wire_buf), NULL, NULL, NULL);
+ session2_unwrap(s, protolayer_wire_buf(&s->layers->wire_buf), NULL, NULL, NULL);
}
static void _tcp_accept(uv_stream_t *master, int status, enum protolayer_grp grp)
#define VERBOSE_LOG(session, fmt, ...) do {\
if (kr_log_is_debug(PROTOLAYER, NULL)) {\
const char *sess_dir = (session)->outgoing ? "out" : "in";\
- kr_log_debug(PROTOLAYER, "(%s) " fmt, sess_dir, __VA_ARGS__);\
+ kr_log_debug(PROTOLAYER, "[%08X] (%s) " fmt, \
+ (session)->log_id, sess_dir, __VA_ARGS__);\
}\
} while (0);\
+static uint32_t next_log_id = 1;
struct protolayer_globals protolayer_globals[PROTOLAYER_PROTOCOL_COUNT] = {{0}};
if (kr_fails_assert(num_layers))
return NULL;
+ size_t wire_buf_length = 0;
ssize_t offsets[2 * num_layers];
manager_size += sizeof(offsets);
size_t total_sess_data_size = 0;
size_t total_iter_data_size = 0;
for (size_t i = 0; i < num_layers; i++) {
- sess_offsets[i] = protolayer_globals[protocols[i]].sess_size
- ? total_sess_data_size : -1;
- total_sess_data_size += ALIGN_TO(protolayer_globals[protocols[i]].sess_size,
- CPU_STRUCT_ALIGN);
-
- iter_offsets[i] = protolayer_globals[protocols[i]].iter_size
- ? total_iter_data_size : -1;
- total_iter_data_size += ALIGN_TO(protolayer_globals[protocols[i]].iter_size,
- CPU_STRUCT_ALIGN);
+ const struct protolayer_globals *g = &protolayer_globals[protocols[i]];
+
+ sess_offsets[i] = g->sess_size ? total_sess_data_size : -1;
+ total_sess_data_size += ALIGN_TO(g->sess_size, CPU_STRUCT_ALIGN);
+
+ iter_offsets[i] = g->iter_size ? total_iter_data_size : -1;
+ total_iter_data_size += ALIGN_TO(g->iter_size, CPU_STRUCT_ALIGN);
+
+ wire_buf_length += g->wire_buf_overhead;
}
manager_size += total_sess_data_size;
cb_ctx_size += total_iter_data_size;
m->cb_ctx_size = cb_ctx_size;
memcpy(m->data, offsets, sizeof(offsets));
+ int ret = wire_buf_init(&m->wire_buf, wire_buf_length);
+ kr_require(!ret);
+
/* Initialize the layer's session data */
for (size_t i = 0; i < num_layers; i++) {
struct protolayer_globals *globals = &protolayer_globals[protocols[i]];
}
}
+ wire_buf_deinit(&m->wire_buf);
free(m);
}
.transport = {
.type = transport_type,
},
+ .log_id = next_log_id++,
.outgoing = outgoing,
.tasks = trie_create(NULL),
};
mm_ctx_mempool(&s->pool, CPU_PAGE_SIZE);
queue_init(s->waiting);
- int ret = wire_buf_init(&s->wire_buf, KNOT_WIRE_MAX_PKTSIZE);
- kr_require(!ret);
-
- ret = uv_timer_init(uv_default_loop(), &s->timer);
+ int ret = uv_timer_init(uv_default_loop(), &s->timer);
kr_require(!ret);
s->timer.data = s;
s->uv_count++; /* Session owns the timer */
static void session2_free(struct session2 *s)
{
protolayer_manager_free(s->layers);
- wire_buf_deinit(&s->wire_buf);
mm_ctx_delete(&s->pool);
trie_free(s->tasks);
queue_deinit(s->waiting);
struct session2;
struct protolayer_iter_ctx;
+
/** Type of MAC addresses. */
typedef uint8_t ethaddr_t[6];
bool xdp:1;
};
+
+/** A buffer, with indices marking the chunk containing valid data.
+ *
+ * May be initialized in two possible ways:
+ * - via `wire_buf_init`
+ * - to zero, then reserved via `wire_buf_reserve`. */
+struct wire_buf {
+ char *buf; /**< Buffer memory. */
+ size_t size; /**< Current size of the buffer memory. */
+ size_t start; /**< Index at which the valid data of the buffer starts (inclusive). */
+ size_t end; /**< Index at which the valid data of the buffer ends (exclusive). */
+};
+
+/** Initializes the wire buffer with the specified `initial_size` and allocates
+ * the underlying memory. */
+int wire_buf_init(struct wire_buf *wb, size_t initial_size);
+
+/** De-allocates the wire buffer's underlying memory (the struct itself is left
+ * intact). */
+void wire_buf_deinit(struct wire_buf *wb);
+
+/** Ensures that the wire buffer's size is at least `size`. `*wb` must be
+ * initialized, either to zero or via `wire_buf_init`. */
+int wire_buf_reserve(struct wire_buf *wb, size_t size);
+
+/** Adds `length` to the end index of the valid data, marking `length` more
+ * bytes as valid.
+ *
+ * Returns 0 on success.
+ * Returns `kr_error(EINVAL)` if the end index would exceed the
+ * buffer size. */
+int wire_buf_consume(struct wire_buf *wb, size_t length);
+
+/** Adds `length` to the start index of the valid data, marking `length` less
+ * bytes as valid.
+ *
+ * Returns 0 on success.
+ * Returns `kr_error(EINVAL)` if the start index would exceed
+ * the end index. */
+int wire_buf_trim(struct wire_buf *wb, size_t length);
+
+/** Moves the valid bytes of the buffer to the buffer's beginning. */
+int wire_buf_movestart(struct wire_buf *wb);
+
+/** Resets the valid bytes of the buffer to zero, as well as the error flag. */
+int wire_buf_reset(struct wire_buf *wb);
+
+/** Gets a pointer to the data marked as valid in the wire buffer. */
+static inline void *wire_buf_data(const struct wire_buf *wb)
+{
+ return &wb->buf[wb->start];
+}
+
+/** Gets the length of the data marked as valid in the wire buffer. */
+static inline size_t wire_buf_data_length(const struct wire_buf *wb)
+{
+ return wb->end - wb->start;
+}
+
+/** Gets a pointer to the free space after the valid data of the wire buffer. */
+static inline void *wire_buf_free_space(const struct wire_buf *wb)
+{
+ return &wb->buf[wb->end];
+}
+
+/** Gets the length of the free space after the valid data of the wire buffer. */
+static inline size_t wire_buf_free_space_length(const struct wire_buf *wb)
+{
+ return wb->size - wb->end;
+}
+
+
+
/** Protocol layer types map - an enumeration of individual protocol layer
* implementations
*
* interpreted. */
struct protolayer_manager {
enum protolayer_grp grp;
+ struct wire_buf wire_buf;
struct session2 *session;
size_t num_layers;
size_t cb_ctx_size; /**< Size of a single callback context, including
* no iteration struct is used by the layer, the value may be zero. */
size_t iter_size;
+ /** Number of bytes that this layer adds onto the session's wire
+ * buffer. All overheads in a group are summed together to form the
+ * resulting wire buffer length. */
+ size_t wire_buf_overhead;
+
/** Called during session creation to initialize
* layer-specific session data. The data is always provided
* zero-initialized to this function. */
}
-/** A buffer, with indices marking the chunk containing valid data.
- *
- * May be initialized in two possible ways:
- * - via `wire_buf_init`
- * - to zero, then reserved via `wire_buf_reserve`. */
-struct wire_buf {
- char *buf; /**< Buffer memory. */
- size_t size; /**< Current size of the buffer memory. */
- size_t start; /**< Index at which the valid data of the buffer starts (inclusive). */
- size_t end; /**< Index at which the valid data of the buffer ends (exclusive). */
-};
-
-/** Initializes the wire buffer with the specified `initial_size` and allocates
- * the underlying memory. */
-int wire_buf_init(struct wire_buf *wb, size_t initial_size);
-
-/** De-allocates the wire buffer's underlying memory (the struct itself is left
- * intact). */
-void wire_buf_deinit(struct wire_buf *wb);
-
-/** Ensures that the wire buffer's size is at least `size`. `*wb` must be
- * initialized, either to zero or via `wire_buf_init`. */
-int wire_buf_reserve(struct wire_buf *wb, size_t size);
-
-/** Adds `length` to the end index of the valid data, marking `length` more
- * bytes as valid.
- *
- * Returns 0 on success.
- * Returns `kr_error(EINVAL)` if the end index would exceed the
- * buffer size. */
-int wire_buf_consume(struct wire_buf *wb, size_t length);
-
-/** Adds `length` to the start index of the valid data, marking `length` less
- * bytes as valid.
- *
- * Returns 0 on success.
- * Returns `kr_error(EINVAL)` if the start index would exceed
- * the end index. */
-int wire_buf_trim(struct wire_buf *wb, size_t length);
-
-/** Moves the valid bytes of the buffer to the buffer's beginning. */
-int wire_buf_movestart(struct wire_buf *wb);
-
-/** Resets the valid bytes of the buffer to zero, as well as the error flag. */
-int wire_buf_reset(struct wire_buf *wb);
-
-/** Gets a pointer to the data marked as valid in the wire buffer. */
-static inline void *wire_buf_data(const struct wire_buf *wb)
-{
- return &wb->buf[wb->start];
-}
-
-/** Gets the length of the data marked as valid in the wire buffer. */
-static inline size_t wire_buf_data_length(const struct wire_buf *wb)
-{
- return wb->end - wb->start;
-}
-
-/** Gets a pointer to the free space after the valid data of the wire buffer. */
-static inline void *wire_buf_free_space(const struct wire_buf *wb)
-{
- return &wb->buf[wb->end];
-}
-
-/** Gets the length of the free space after the valid data of the wire buffer. */
-static inline size_t wire_buf_free_space_length(const struct wire_buf *wb)
-{
- return wb->size - wb->end;
-}
-
-
/** Indicates how a session sends data in the `wrap` direction and receives
* data in the `unwrap` direction. */
enum session2_transport_type {
queue_t(struct qr_task *) waiting; /**< List of tasks waiting for
* sending to upstream. */
+ uint32_t log_id; /**< Session ID for logging. */
+
int uv_count; /**< Number of unclosed libUV handles owned by this
* session. */
* Zero-initialized by default. */
struct comm_info comm;
- /** Managed buffer for data received by `io`. */
- struct wire_buf wire_buf;
-
/** Time of last IO activity (if any occurs). Otherwise session
* creation time. */
uint64_t last_activity;
#define EPHEMERAL_CERT_EXPIRATION_SECONDS_RENEW_BEFORE (60*60*24*7)
#define GNUTLS_PIN_MIN_VERSION 0x030400
#define UNWRAP_BUF_SIZE 16384
+#define TLS_CHUNK_SIZE (16 * 1024)
#define VERBOSE_MSG(cl_side, ...)\
if (cl_side) \
{
protolayer_globals[PROTOLAYER_TLS] = (struct protolayer_globals){
.sess_size = sizeof(struct pl_tls_sess_data),
- .sess_init = pl_tls_sess_init,
.sess_deinit = pl_tls_sess_deinit,
+ .wire_buf_overhead = TLS_CHUNK_SIZE,
+ .sess_init = pl_tls_sess_init,
.unwrap = pl_tls_unwrap,
.wrap = pl_tls_wrap,
.event_unwrap = pl_tls_event_unwrap,
struct qr_task *task = baton;
qr_task_on_send(task, session, status);
qr_task_unref(task);
- wire_buf_reset(&session->wire_buf);
}
static int qr_task_send(struct qr_task *task, struct session2 *session,
the_worker = NULL;
}
-static inline knot_pkt_t *produce_packet(char *buf, size_t buf_len)
+static inline knot_pkt_t *produce_packet(uint8_t *buf, size_t buf_len)
{
return knot_pkt_new(buf, buf_len, &the_worker->pkt_pool);
}
static enum protolayer_event_cb_result pl_dns_stream_connected(
struct session2 *session)
{
- if (session->connected)
+ if (kr_fails_assert(!session->connected))
return PROTOLAYER_EVENT_PROPAGATE;
session->connected = true;
return PROTOLAYER_EVENT_PROPAGATE;
}
-static knot_pkt_t *produce_stream_packet(struct session2 *session,
+static knot_pkt_t *stream_produce_packet(struct session2 *session,
struct wire_buf *wb,
bool *out_err)
{
return NULL;
}
- uint16_t pkt_len = knot_wire_read_u16(wire_buf_data(wb));
- if (pkt_len == 0) {
+ uint16_t msg_len = knot_wire_read_u16(wire_buf_data(wb));
+ if (msg_len == 0) {
*out_err = true;
return NULL;
}
- if (pkt_len >= wb->size) {
+ if (msg_len >= wb->size) {
*out_err = true;
return NULL;
}
- if (wire_buf_data_length(wb) < pkt_len + sizeof(uint16_t)) {
+ if (wire_buf_data_length(wb) < msg_len + sizeof(uint16_t)) {
return NULL;
}
+ uint8_t *wire = (uint8_t *)wire_buf_data(wb) + sizeof(uint16_t);
+
session->was_useful = true;
- wire_buf_trim(wb, sizeof(uint16_t));
- knot_pkt_t *pkt = produce_packet(wire_buf_data(wb), pkt_len);
+ knot_pkt_t *pkt = produce_packet(wire, msg_len);
*out_err = (pkt == NULL);
- wire_buf_trim(wb, pkt_len);
return pkt;
}
+static int stream_discard_packet(struct session2 *session,
+ struct wire_buf *wb,
+ const knot_pkt_t *pkt,
+ bool *out_err)
+{
+ *out_err = true;
+
+ if (kr_fails_assert(wire_buf_data_length(wb) >= sizeof(uint16_t))) {
+ wire_buf_reset(wb);
+ return kr_error(EINVAL);
+ }
+
+ size_t msg_size = knot_wire_read_u16(wire_buf_data(wb));
+ uint8_t *wire = (uint8_t *)wire_buf_data(wb) + sizeof(uint16_t);
+ if (kr_fails_assert(msg_size + sizeof(uint16_t) <= wire_buf_data_length(wb))) {
+ /* TCP message length field is greater then
+ * number of bytes in buffer, must not happen. */
+ wire_buf_reset(wb);
+ return kr_error(EINVAL);
+ }
+
+ if (kr_fails_assert(wire == pkt->wire)) {
+ /* packet wirebuf must be located at the beginning
+ * of the session wirebuf, must not happen. */
+ wire_buf_reset(wb);
+ return kr_error(EINVAL);
+ }
+
+ if (kr_fails_assert(msg_size >= pkt->size)) {
+ wire_buf_reset(wb);
+ return kr_error(EINVAL);
+ }
+
+ wire_buf_trim(wb, msg_size + sizeof(uint16_t));
+ *out_err = false;
+
+ if (wire_buf_data_length(wb) == 0) {
+ wire_buf_reset(wb);
+ } else if (wire_buf_data_length(wb) < KNOT_WIRE_HEADER_SIZE) {
+ wire_buf_movestart(wb);
+ }
+
+ return kr_ok();
+}
+
static enum protolayer_iter_cb_result pl_dns_stream_unwrap(
void *sess_data, void *iter_data, struct protolayer_iter_ctx *ctx)
{
bool pkt_error = false;
knot_pkt_t *pkt = NULL;
- while ((pkt = produce_stream_packet(session, wb, &pkt_error)) && iters < max_iters) {
+ while ((pkt = stream_produce_packet(session, wb, &pkt_error)) && iters < max_iters) {
if (kr_fails_assert(!pkt_error)) {
status = kr_error(EINVAL);
goto exit;
}
stream_sess->produced = true;
-
int ret = worker_submit(session, &ctx->comm, pkt);
- wire_buf_movestart(wb);
/* Errors from worker_submit() are intentionally *not* handled
* in order to ensure the entire wire buffer is processed. */
if (ret == kr_ok()) {
iters += 1;
}
+ if (stream_discard_packet(session, wb, pkt, &pkt_error) < 0) {
+ /* Packet data isn't stored in memory as expected.
+ * something went wrong, normally should not happen. */
+ break;
+ }
}
/* worker_submit() may cause the session to close (e.g. due to IO
/* DNS protocol layers */
protolayer_globals[PROTOLAYER_DNS_DGRAM] = (struct protolayer_globals){
+ .wire_buf_overhead = KNOT_WIRE_MAX_PKTSIZE,
.unwrap = pl_dns_dgram_unwrap,
.event_unwrap = pl_dns_dgram_event_unwrap
};
protolayer_globals[PROTOLAYER_DNS_UNSIZED_STREAM] = (struct protolayer_globals){
+ .wire_buf_overhead = KNOT_WIRE_MAX_PKTSIZE,
.sess_init = pl_dns_stream_sess_init,
.unwrap = pl_dns_dgram_unwrap,
.event_unwrap = pl_dns_stream_event_unwrap,
};
const struct protolayer_globals stream_common = {
.sess_size = sizeof(struct pl_dns_stream_sess_data),
- .sess_init = NULL, /* replaced in specific layers below */
.iter_size = sizeof(struct pl_dns_stream_iter_data),
+ .wire_buf_overhead = KNOT_WIRE_MAX_PKTSIZE,
+ .sess_init = NULL, /* replaced in specific layers below */
.iter_deinit = pl_dns_stream_iter_deinit,
.unwrap = pl_dns_stream_unwrap,
.wrap = pl_dns_stream_wrap,