int ret = session_wirebuf_process(s);
if (ret < 0) {
+ /* An error has occurred, close the session. */
worker_end_tcp(s);
- /* Exceeded per-connection quota for outstanding requests
- * stop reading from stream and close after last message is processed. */
- uv_timer_t *t = session_get_timer(s);
- if (!session_flags(s)->outgoing && !uv_is_closing((uv_handle_t *)t)) {
- uv_timer_stop(t);
- if (session_tasklist_is_empty(s)) {
- session_close(s);
- } else { /* If there are tasks running, defer until they finish. */
- uv_timer_start(t, tcp_timeout_trigger,
- MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
- }
- }
- /* Connection spawned at least one request, reset its deadline for next query.
- * https://tools.ietf.org/html/rfc7766#section-6.2.3 */
} else if (ret > 0 && !session_flags(s)->closing) {
+ /* Connection spawned at least one request
+ * or
+ * valid answer has been received from upstream.
+ * Reset deadline for next query.
+ * https://tools.ietf.org/html/rfc7766#section-6.2.3
+ */
session_timer_restart(s);
}
session_wirebuf_compress(s);
session_tls_set_server_ctx(s, ctx);
}
}
- uv_timer_t *t = session_get_timer(s);
- uv_timer_start(t, tcp_timeout_trigger, timeout, idle_in_timeout);
+ session_timer_start(s, tcp_timeout_trigger, timeout, idle_in_timeout);
io_start_read((uv_handle_t *)client);
}
return ret;
}
struct session *s = session_new(handle);
- assert(s);
- uv_timer_t *t = session_get_timer(s);
- t->data = s;
- return uv_timer_init(loop, t);
+ if (s == NULL) {
+ ret = -1;
+ }
+ return ret;
}
void io_deinit(uv_handle_t *handle)
session->wire_buf = worker->wire_buf;
session->wire_buf_size = sizeof(worker->wire_buf);
}
-
+
+ uv_timer_init(handle->loop, &session->timeout);
+
session->handle = handle;
handle->data = session;
- return session;
-}
+ session->timeout.data = session;
-uv_timer_t *session_get_timer(struct session *session)
-{
- return &session->timeout;
+ return session;
}
size_t session_tasklist_get_len(const struct session *session)
return NULL;
}
msg_size = knot_wire_read_u16(msg_start);
+ if (msg_size >= session->wire_buf_size) {
+ session->sflags.wirebuf_error = true;
+ return NULL;
+ }
if (msg_size + 2 > wirebuf_msg_data_size) {
- session->sflags.wirebuf_error = false;
return NULL;
}
msg_start += 2;
}
session->sflags.wirebuf_error = false;
+ wirebuf_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
if (wirebuf_data_size == 0) {
session_wirebuf_discard(session);
} else if (wirebuf_data_size < KNOT_WIRE_HEADER_SIZE) {
return ret;
}
struct worker_ctx *worker = session_get_handle(session)->loop->data;
+ size_t wirebuf_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
+ uint32_t max_iterations = (wirebuf_data_size / (KNOT_WIRE_HEADER_SIZE + KNOT_WIRE_QUESTION_MIN_SIZE)) + 1;
knot_pkt_t *query = NULL;
- while (((query = session_produce_packet(session, &worker->pkt_pool)) != NULL) && (ret < 100)) {
+ while (((query = session_produce_packet(session, &worker->pkt_pool)) != NULL) &&
+ (ret < max_iterations)) {
assert (!session_wirebuf_error(session));
- worker_submit(session, query);
+ int res = worker_submit(session, query);
+ if (res != kr_error(EILSEQ)) {
+ /* Packet has been successfully parsed. */
+ ret += 1;
+ }
if (session_discard_packet(session, query) < 0) {
+ /* Packet data isn't stored in memory as expected.
+ something went wrong, normally should not happen. */
break;
}
- ret += 1;
}
if (session_wirebuf_error(session)) {
ret = -1;
/** Get pointer to underlying libuv handle for IO operations. */
uv_handle_t *session_get_handle(struct session *session);
-/** Get pointer to session timer handle. */
-uv_timer_t *session_get_timer(struct session *session);
/** Start session timer. */
int session_timer_start(struct session *session, uv_timer_cb cb,
uint64_t timeout, uint64_t repeat);
assert(session_tasklist_is_empty(session));
session_close(session);
} else {
- uv_timer_t *t = session_get_timer(session);
- uv_timer_stop(t);
- t->data = session;
+ session_timer_stop(session);
session_timer_start(session, on_tcp_watchdog_timeout,
MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
}
return;
}
- uv_timer_t *t = session_get_timer(session);
- uv_timer_stop(t);
+ session_timer_stop(session);
if (status != 0) {
worker_del_tcp_waiting(worker, peer);
struct sockaddr *addr = NULL;
if (!session_flags(session)->outgoing) { /* request from a client */
/* Ignore badly formed queries. */
- if (!query || ret != 0 || knot_wire_get_qr(query->wire)) {
+ if (!query || (ret != kr_ok() && ret != kr_error(EMSGSIZE)) ||
+ knot_wire_get_qr(query->wire)) {
if (query) worker->stats.dropped += 1;
return kr_error(EILSEQ);
}