uint8_t *wire_buf; /**< Buffer for DNS message. */
ssize_t wire_buf_size; /**< Buffer size. */
- ssize_t wire_buf_idx; /**< The number of bytes in wire_buf filled so far. */
+ ssize_t wire_buf_start_idx; /**< Data start offset in wire_buf. */
+ ssize_t wire_buf_end_idx; /**< Data end offset in wire_buf. */
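+ /* Invariant: 0 <= wire_buf_start_idx <= wire_buf_end_idx <= wire_buf_size;
+ * unprocessed data lives in wire_buf[wire_buf_start_idx .. wire_buf_end_idx). */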
};
static void on_session_close(uv_handle_t *handle)
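+/* Account for len bytes that the caller has just written at
+ * session_wirebuf_get_free_start(); advances the end index. */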
ssize_t session_wirebuf_consume(struct session *session, const uint8_t *data, ssize_t len)
{
- if (data != &session->wire_buf[session->wire_buf_idx]) {
+ if (data != &session->wire_buf[session->wire_buf_end_idx]) {
/* shouldn't happen */
return kr_error(EINVAL);
}
- if (session->wire_buf_idx + len > session->wire_buf_size) {
+ if (session->wire_buf_end_idx + len > session->wire_buf_size) {
/* shouldn't happen */
return kr_error(EINVAL);
}
- session->wire_buf_idx += len;
+ session->wire_buf_end_idx += len;
return len;
}
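+/* Typical fill-then-consume sequence (sketch; fd and the recv() call
+ * are illustrative, not part of this API):
+ *
+ *   uint8_t *dst = session_wirebuf_get_free_start(session);
+ *   ssize_t n = recv(fd, dst, session_wirebuf_get_free_size(session), 0);
+ *   if (n > 0)
+ *           session_wirebuf_consume(session, dst, n);
+ */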
knot_pkt_t *session_produce_packet(struct session *session, knot_mm_t *mm)
{
session->sflags.wirebuf_error = false;
- if (session->wire_buf_idx == 0) {
+ if (session->wire_buf_end_idx == 0) {
+ return NULL;
+ }
+
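+ /* All buffered data has been consumed; rewind the indices. */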
+ if (session->wire_buf_start_idx == session->wire_buf_end_idx) {
+ session->wire_buf_start_idx = 0;
+ session->wire_buf_end_idx = 0;
return NULL;
}
+ if (session->wire_buf_start_idx > session->wire_buf_end_idx) {
+ session->sflags.wirebuf_error = true;
+ session->wire_buf_start_idx = 0;
+ session->wire_buf_end_idx = 0;
+ return NULL;
+ }
+
const uv_handle_t *handle = session->handle;
- uint8_t *msg_start = session->wire_buf;
- uint16_t msg_size = session->wire_buf_idx;
+ uint8_t *msg_start = &session->wire_buf[session->wire_buf_start_idx];
+ ssize_t wirebuf_msg_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
+ uint16_t msg_size = wirebuf_msg_data_size;
if (!handle) {
session->sflags.wirebuf_error = true;
return NULL;
} else if (handle->type == UV_TCP) {
- if (session->wire_buf_idx < 2) {
+ if (msg_size < 2) {
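+ /* The 2-byte TCP length prefix has not been fully received yet. */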
return NULL;
}
- msg_size = knot_wire_read_u16(session->wire_buf);
- if (msg_size + 2 > session->wire_buf_idx) {
+ msg_size = knot_wire_read_u16(msg_start);
+ if (msg_size + 2 > wirebuf_msg_data_size) {
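+ /* Message body is still incomplete; not an error, wait for more data. */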
session->sflags.wirebuf_error = false;
return NULL;
}
int session_discard_packet(struct session *session, const knot_pkt_t *pkt)
{
uv_handle_t *handle = session->handle;
- uint8_t *wirebuf_data_start = session->wire_buf;
- size_t wirebuf_msg_data_size = session->wire_buf_idx;
- uint8_t *wirebuf_msg_start = session->wire_buf;
- size_t wirebuf_msg_size = session->wire_buf_idx;
+ /* Pointer to data start in wire_buf */
+ uint8_t *wirebuf_data_start = &session->wire_buf[session->wire_buf_start_idx];
+ /* Number of data bytes in wire_buf */
+ size_t wirebuf_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
+ /* Pointer to message start in wire_buf */
+ uint8_t *wirebuf_msg_start = wirebuf_data_start;
+ /* Number of message bytes in wire_buf.
+ * For UDP this is the same as wirebuf_data_size. */
+ size_t wirebuf_msg_size = wirebuf_data_size;
+ /* Wire data from parsed packet. */
uint8_t *pkt_msg_start = pkt->wire;
+ /* Number of bytes in packet wire buffer. */
size_t pkt_msg_size = pkt->size;
- if (pkt->tsig_rr) {
+ if (knot_pkt_has_tsig(pkt)) {
pkt_msg_size += pkt->tsig_wire.len;
}
session->sflags.wirebuf_error = true;
+
if (!handle) {
return kr_error(EINVAL);
- } else if (handle->type == UV_UDP) {
- /* Fast check for UDP */
- if (wirebuf_msg_start != pkt_msg_start) {
+ } else if (handle->type == UV_TCP) {
+ /* wire_buf contains TCP DNS message. */
+ if (wirebuf_data_size < 2) {
+ /* The TCP message length field is not fully in the buffer, must not happen. */
+ assert(0);
+ session->wire_buf_start_idx = 0;
+ session->wire_buf_end_idx = 0;
+ return kr_error(EINVAL);
+ }
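+ /* Read and skip the 2-byte message length prefix (RFC 1035, 4.2.2). */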
+ wirebuf_msg_size = knot_wire_read_u16(wirebuf_msg_start);
+ wirebuf_msg_start += 2;
+ if (wirebuf_msg_size + 2 > wirebuf_data_size) {
+ /* TCP message length field is greater than the
+ * number of bytes in the buffer, must not happen. */
+ assert(0);
+ session->wire_buf_start_idx = 0;
+ session->wire_buf_end_idx = 0;
return kr_error(EINVAL);
}
- session->wire_buf_idx = 0;
- session->sflags.wirebuf_error = false;
- return kr_ok();
- }
-
- if (session->wire_buf_idx < 2) {
- return kr_error(EINVAL);
}
- wirebuf_msg_size = knot_wire_read_u16(wirebuf_data_start);
- wirebuf_msg_start += 2;
- wirebuf_msg_data_size = wirebuf_msg_size + 2;
if (wirebuf_msg_start != pkt_msg_start) {
+ /* The packet's wire data must be located at the start of
+ * the message in the session wirebuf, must not happen. */
+ assert(0);
+ session->wire_buf_start_idx = 0;
+ session->wire_buf_end_idx = 0;
return kr_error(EINVAL);
}
-
- if (wirebuf_msg_size != pkt_msg_size) {
+ if (wirebuf_msg_size < pkt_msg_size) {
+ /* Message length field is less than the packet size,
+ * must not happen. */
+ assert(0);
+ session->wire_buf_start_idx = 0;
+ session->wire_buf_end_idx = 0;
return kr_error(EINVAL);
}
- if (wirebuf_msg_data_size > session->wire_buf_idx) {
- return kr_error(EINVAL);
+ if (handle->type == UV_TCP) {
+ session->wire_buf_start_idx += wirebuf_msg_size + 2;
+ } else {
+ session->wire_buf_start_idx += pkt_msg_size;
}
+ session->sflags.wirebuf_error = false;
- uint16_t wirebuf_data_amount = session->wire_buf_idx - wirebuf_msg_data_size;
- if (wirebuf_data_amount) {
- if (wirebuf_msg_data_size < wirebuf_data_amount) {
- memmove(wirebuf_data_start, &wirebuf_data_start[wirebuf_msg_data_size],
- wirebuf_data_amount);
- } else {
- memcpy(wirebuf_data_start, &wirebuf_data_start[wirebuf_msg_data_size],
- wirebuf_data_amount);
- }
+ /* Recompute the remaining size; the start index has advanced. */
+ wirebuf_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
+ if (wirebuf_data_size == 0) {
+ session_wirebuf_discard(session);
+ } else if (wirebuf_data_size < KNOT_WIRE_HEADER_SIZE) {
+ session_wirebuf_compress(session);
}
- session->wire_buf_idx = wirebuf_data_amount;
- session->sflags.wirebuf_error = false;
-
return kr_ok();
}
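+/* Drop all buffered data and reset the wire buffer to the empty state. */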
+void session_wirebuf_discard(struct session *session)
+{
+ session->wire_buf_start_idx = 0;
+ session->wire_buf_end_idx = 0;
+}
+
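+/* Move buffered data to the front of the wire buffer, so that a
+ * partially received message can keep growing in place. */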
+void session_wirebuf_compress(struct session *session)
+{
+ if (session->wire_buf_start_idx == 0) {
+ return;
+ }
+ uint8_t *wirebuf_data_start = &session->wire_buf[session->wire_buf_start_idx];
+ size_t wirebuf_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
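+ /* Source and destination overlap only when the data is longer than
+ * the gap at the front of the buffer; memmove is needed just there. */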
+ if (session->wire_buf_start_idx < wirebuf_data_size) {
+ memmove(session->wire_buf, wirebuf_data_start, wirebuf_data_size);
+ } else {
+ memcpy(session->wire_buf, wirebuf_data_start, wirebuf_data_size);
+ }
+ session->wire_buf_start_idx = 0;
+ session->wire_buf_end_idx = wirebuf_data_size;
+}
+
bool session_wirebuf_error(struct session *session)
{
return session->sflags.wirebuf_error;
size_t session_wirebuf_get_len(struct session *session)
{
- return session->wire_buf_idx;
+ return session->wire_buf_end_idx;
}
size_t session_wirebuf_get_size(struct session *session)
uint8_t *session_wirebuf_get_free_start(struct session *session)
{
- return &session->wire_buf[session->wire_buf_idx];
+ return &session->wire_buf[session->wire_buf_end_idx];
}
size_t session_wirebuf_get_free_size(struct session *session)
{
- return session->wire_buf_size - session->wire_buf_idx;
+ return session->wire_buf_size - session->wire_buf_end_idx;
}
void session_poison(struct session *session)
int session_wirebuf_process(struct session *session)
{
int ret = 0;
- if (session->wire_buf_idx == 0) {
+ if (session->wire_buf_start_idx == session->wire_buf_end_idx) {
return ret;
}
struct worker_ctx *worker = session_get_handle(session)->loop->data;
}
ret += 1;
}
- assert(ret < 100);
if (session_wirebuf_error(session)) {
ret = -1;
}
worker->stats.concurrent -= 1;
}
+/** Register new qr_task within session. */
+static int qr_task_register(struct qr_task *task, struct session *session)
+{
+ assert(!session_is_outgoing(session) && session_get_handle(session)->type == UV_TCP);
+
+ session_tasklist_add(session, task);
+
+ struct request_ctx *ctx = task->ctx;
+ assert(ctx && (ctx->source.session == NULL || ctx->source.session == session));
+ ctx->source.session = session;
+ /* Soft-limit on parallel queries. There is no "slow down" RCODE
+ * that we could use to signal the client, but we can stop reading
+ * and in effect shrink the TCP window size. To get more precise
+ * throttling, we would need to copy the remainder of the unread
+ * buffer and reassemble it when resuming reading. This is NYI. */
+ if (session_tasklist_get_len(session) >= task->ctx->worker->tcp_pipeline_max) {
+ uv_handle_t *handle = session_get_handle(session);
+ if (handle && !session_is_throttled(session) && !session_is_closing(session)) {
+ io_stop_read(handle);
+ session_set_throttled(session, true);
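+ /* Reading is expected to be resumed elsewhere once the pipeline
+ * drains; that code path is not part of this diff. */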
+ }
+ }
+
+ return 0;
+}
+
static void qr_task_complete(struct qr_task *task)
{
struct request_ctx *ctx = task->ctx;
}
/* Update statistics */
- if (ctx->source.session &&
- handle != session_get_handle(ctx->source.session) &&
- addr) {
+ if (session_is_outgoing(session) && addr) {
if (session_has_tls(session))
worker->stats.tls += 1;
else if (handle->type == UV_UDP)
else if (addr->sa_family == AF_INET)
worker->stats.ipv4 += 1;
}
-
return ret;
}
request_free(ctx);
return kr_error(ENOMEM);
}
+
+ if (handle->type == UV_TCP && qr_task_register(task, session)) {
+ return kr_error(ENOMEM);
+ }
} else if (query) { /* response from upstream */
if ((ret != kr_ok() && ret != kr_error(EMSGSIZE)) ||
!knot_wire_get_qr(query->wire)) {
assert(task->ctx->source.session == session);
task->ctx->source.session = NULL;
}
- qr_task_unref(task);
}
while (!session_tasklist_is_empty(session)) {
struct qr_task *task = session_tasklist_get_first(session);