return;
}
- if (session->buffering != NULL) {
+ if (!session->outgoing && session->buffering != NULL) {
qr_task_complete(session->buffering);
- session->buffering = NULL;
}
+ session->buffering = NULL;
uv_handle_t *handle = session->handle;
io_stop_read(handle);
struct worker_ctx *worker = ctx->worker;
/* Process source session. */
- if (source_session) {
- /* Walk the session task list and remove itself. */
- assert(source_session->outgoing == false);
- session_del_tasks(source_session, task);
+ if (source_session &&
+ source_session->tasks.len < worker->tcp_pipeline_max/2 &&
+ !source_session->closing && source_session->throttled) {
+ uv_handle_t *handle = source_session->handle;
/* Start reading again if the session is throttled and
* the number of outgoing requests is below watermark. */
- uv_handle_t *handle = source_session->handle;
- if (handle && source_session->tasks.len < worker->tcp_pipeline_max/2) {
- if (!uv_is_closing(handle) && source_session->throttled) {
- assert(source_session->closing == false);
- io_start_read(handle);
- source_session->throttled = false;
- }
+ if (handle) {
+ io_start_read(handle);
+ source_session->throttled = false;
}
}
* when resuming reading. This is NYI. */
if (session->tasks.len >= task->ctx->worker->tcp_pipeline_max) {
uv_handle_t *handle = session->handle;
- if (handle && !session->throttled && !uv_is_closing(handle)) {
+ if (handle && !session->throttled && !session->closing) {
io_stop_read(handle);
session->throttled = true;
}
if (task->on_complete) {
task->on_complete(worker, &ctx->req, task->baton);
}
+ struct session *source_session = ctx->source.session;
+ if (source_session) {
+ assert(source_session->outgoing == false &&
+ source_session->waiting.len == 0);
+ session_del_tasks(source_session, task);
+ }
/* Release primary reference to task. */
request_del_tasks(ctx, task);
}
/* This is called when we send subrequest / answer */
static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status)
{
- assert(handle == NULL || uv_is_closing(handle) == false);
if (task->finished) {
assert(task->leading == false);
qr_task_complete(task);
return status;
}
struct session* session = handle->data;
+ assert(session);
if (!session->outgoing ||
session->waiting.len == 0) {
return status;
}
}
-
- if (status == 0 && handle) {
+ if (handle) {
struct session* session = handle->data;
+ if (!session->outgoing && task->ctx->source.session) {
+ assert (task->ctx->source.session->handle == handle);
+ }
if (handle->type == UV_TCP && session->outgoing &&
session->waiting.len > 0) {
session_del_waiting(session, task);
+ if (session->closing) {
+ return status;
+ }
+ if (status != 0) {
+ /* Add to the end for retry */
+ session_add_waiting(session, task);
+ }
if (session->waiting.len > 0) {
struct qr_task *t = session->waiting.at[0];
int ret = qr_task_send(t, (uv_handle_t *)handle,
&session->peer.ip, t->pktbuf);
- if (ret != kr_ok()) {
+ if (ret == kr_ok()) {
+ uv_timer_t *timer = &session->timeout;
+ uv_timer_stop(timer);
+ session->timeout.data = session;
+ timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
+ } else {
uv_timer_t *timer = &session->timeout;
uv_timer_stop(timer);
while (session->waiting.len > 0) {
- struct qr_task *task = session->waiting.at[0];
+ struct qr_task *t = session->waiting.at[0];
if (session->outgoing) {
- qr_task_finalize(task, KR_STATE_FAIL);
+ qr_task_finalize(t, KR_STATE_FAIL);
} else {
- assert(task->ctx->source.session == session);
- task->ctx->source.session = NULL;
+ assert(t->ctx->source.session == session);
+ t->ctx->source.session = NULL;
}
array_del(session->waiting, 0);
- qr_task_unref(task);
- session_del_tasks(session, task);
+ session_del_tasks(session, t);
+ qr_task_unref(t);
}
while (session->tasks.len > 0) {
- struct qr_task *task = session->tasks.at[0];
+ struct qr_task *t = session->tasks.at[0];
if (session->outgoing) {
- qr_task_finalize(task, KR_STATE_FAIL);
+ qr_task_finalize(t, KR_STATE_FAIL);
} else {
- assert(task->ctx->source.session == session);
- task->ctx->source.session = NULL;
+ assert(t->ctx->source.session == session);
+ t->ctx->source.session = NULL;
}
- session_del_tasks(session, task);
+ session_del_tasks(session, t);
}
session_close(session);
return status;
}
}
}
- if (!uv_is_closing(handle)) {
+ if (!session->closing) {
io_start_read(handle); /* Start reading new query */
}
}
struct worker_ctx *worker = loop->data;
assert(worker == get_worker());
struct qr_task *task = req->data;
- if (qr_valid_handle(task, handle)) {
- qr_task_on_send(task, handle, status);
- }
+ qr_task_on_send(task, handle, status);
qr_task_unref(task);
iorequest_release(worker, req);
}
struct worker_ctx *worker = loop->data;
assert(worker == get_worker());
struct qr_task *task = req->data;
- if (qr_valid_handle(task, handle)) {
- qr_task_on_send(task, handle, status);
- }
+ qr_task_on_send(task, handle, status);
qr_task_unref(task);
iorequest_release(worker, req);
}
if (session->waiting.len > 0) {
struct qr_task *task = session->waiting.at[0];
ret = qr_task_send(task, session->handle, &peer->ip, task->pktbuf);
- if (ret == kr_ok()) {
- session->timeout.data = session;
- timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
- }
}
+ session->timeout.data = session;
+ timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
return ret;
}
uv_timer_stop(timer);
struct worker_ctx *worker = get_worker();
- worker_del_tcp_connected(worker, &session->peer.ip);
+ if (session->outgoing) {
+ worker_del_tcp_connected(worker, &session->peer.ip);
- while (session->waiting.len > 0) {
- struct qr_task *task = session->waiting.at[0];
- task->timeouts += 1;
- worker->stats.timeout += 1;
- array_del(session->waiting, 0);
- qr_task_unref(task);
- session_del_tasks(session, task);
- qr_task_finalize(task, KR_STATE_FAIL);
+ while (session->waiting.len > 0) {
+ struct qr_task *task = session->waiting.at[0];
+ task->timeouts += 1;
+ worker->stats.timeout += 1;
+ array_del(session->waiting, 0);
+ session_del_tasks(session, task);
+ qr_task_finalize(task, KR_STATE_FAIL);
+ qr_task_unref(task);
+ }
}
while (session->tasks.len > 0) {
worker->stats.timeout += 1;
assert(task->refs > 1);
array_del(session->tasks, 0);
- qr_task_unref(task);
qr_task_finalize(task, KR_STATE_FAIL);
+ qr_task_unref(task);
}
session_close(session);
static void on_session_idle_timeout(uv_timer_t *timer)
{
struct session *s = timer->data;
- assert(s && s->outgoing);
+ assert(s);
uv_timer_stop(timer);
if (s->closing) {
return;
uv_handle_t *ret = NULL;
if (task && task->addrlist && task->addrlist_count > 0) {
struct sockaddr_in6 *choice = &((struct sockaddr_in6 *)task->addrlist)[task->addrlist_turn];
+ if (!choice) {
+ return ret;
+ }
ret = ioreq_spawn(task, SOCK_DGRAM, choice->sin6_family);
if (ret &&
qr_task_send(task, ret, (struct sockaddr *)choice,
qry->secret = leader_qry->secret;
leader_qry->secret = 0; /* Next will be already decoded */
}
+ struct session *follower_source_session = follower->ctx->source.session;
qr_task_step(follower, packet_source, pkt);
qr_task_unref(follower);
}
if (ctx->source.session != NULL) {
uv_handle_t *handle = ctx->source.session->handle;
assert(ctx->source.session->closing == false);
- assert(handle->data == ctx->source.session);
+ assert(handle && handle->data == ctx->source.session);
assert(ctx->source.addr.ip.sa_family != AF_UNSPEC);
(void) qr_task_send(task, handle,
(struct sockaddr *)&ctx->source.addr,
*/
subreq_lead(task);
struct session *session = handle->data;
+ assert(session->handle->type == UV_UDP);
ret = timer_start(session, on_retransmit, timeout, 0);
/* Start next step with timeout, fatal if can't start a timer. */
if (ret != 0) {
session_del_waiting(session, task);
worker_del_tcp_waiting(ctx->worker, addr);
iorequest_release(ctx->worker, conn);
+ subreq_finalize(task, packet_source, packet);
return qr_task_step(task, NULL, NULL);
}
tls_client_ctx_set_params(tls_ctx, entry);
if (uv_tcp_connect(conn, (uv_tcp_t *)client,
addr , on_connect) != 0) {
+ uv_timer_stop(&session->timeout);
session_del_tasks(session, task);
session_del_waiting(session, task);
worker_del_tcp_waiting(ctx->worker, addr);
iorequest_release(ctx->worker, conn);
+ subreq_finalize(task, packet_source, packet);
return qr_task_step(task, NULL, NULL);
}
}
}
/* Connection error or forced disconnect */
struct session *session = handle->data;
- assert(session);
+ assert(session && session->handle == (uv_handle_t *)handle && handle->type == UV_TCP);
if (session->closing) {
return kr_ok();
}
addr_str, sizeof(addr_str));
VERBOSE_MSG(NULL, "=> connection to '%s' closed by peer\n", addr_str);
}
-
uv_timer_t *timer = &session->timeout;
uv_timer_stop(timer);
struct sockaddr *peer = &session->peer.ip;
TLS_HS_NOT_STARTED);
}
+ if (session->outgoing && session->buffering) {
+ session->buffering = NULL;
+ }
+
+ assert(session->tasks.len >= session->waiting.len);
while (session->waiting.len > 0) {
struct qr_task *task = session->waiting.at[0];
array_del(session->waiting, 0);
session_del_tasks(session, task);
}
session_close(session);
- return kr_error(ECONNRESET);
+ return kr_ok();
}
if (session->outgoing) {
uv_timer_stop(&session->timeout);
timer_start(session, on_tcp_watchdog_timeout, MAX_TCP_INACTIVITY, 0);
- if (session->bytes_to_skip) {
- session->buffering = NULL;
- ssize_t min_len = MIN(session->bytes_to_skip, len);
- len -= min_len;
- msg += min_len;
- session->bytes_to_skip -= min_len;
- if (len < 0 || session->bytes_to_skip < 0) {
- /* Something gone wrong.
- * Better kill the connection */
- assert(false);
- return kr_error(EILSEQ);
- }
- if (len == 0) {
- return kr_ok();
- }
- assert(session->bytes_to_skip == 0);
+ }
+
+ if (session->bytes_to_skip) {
+ assert(session->buffering == NULL);
+ ssize_t min_len = MIN(session->bytes_to_skip, len);
+ len -= min_len;
+ msg += min_len;
+ session->bytes_to_skip -= min_len;
+ if (len < 0 || session->bytes_to_skip < 0) {
+ /* Something gone wrong.
+ * Better kill the connection */
+ assert(false);
+ return kr_error(EILSEQ);
}
+ if (len == 0) {
+ return kr_ok();
+ }
+ assert(session->bytes_to_skip == 0);
}
int submitted = 0;
task->ctx->source.session = NULL;
}
array_del(session->waiting, 0);
- qr_task_unref(task);
session_del_tasks(session, task);
+ qr_task_unref(task);
}
while (session->tasks.len > 0) {
struct qr_task *task = session->tasks.at[0];
}
session_close(session);
- return kr_error(EILSEQ);
+ return kr_ok();
}
/* get task */
* that we will get multiple matches sooner or later (!) */
if (task) {
knot_pkt_clear(task->pktbuf);
+ assert(task->leading == false);
} else {
- session->buffering = NULL;
session->bytes_to_skip = msg_size - 2;
ssize_t min_len = MIN(session->bytes_to_skip, len);
len -= min_len;
return kr_error(EILSEQ);
}
if (len == 0) {
- return kr_ok();
+ return submitted;
}
assert(session->bytes_to_skip == 0);
int ret = worker_process_tcp(worker, handle, msg, len);
- submitted += ret;
+ if (ret < 0) {
+ submitted = ret;
+ } else {
+ submitted += ret;
+ }
return submitted;
}
}
knot_wire_set_id(pkt_buf->wire, msg_id);
pkt_buf->size = 2;
task->bytes_remaining = msg_size - 2;
+ assert(session->buffering == NULL);
session->buffering = task;
}
/* At this point session must have either created new task
if (pkt_buf->size + to_read > pkt_buf->max_size) {
// TODO reallocate pkt_buf
pkt_buf->size = 0;
+ len -= to_read;
+ msg += to_read;
session->bytes_to_skip = task->bytes_remaining - to_read;
task->bytes_remaining = 0;
- session->buffering = NULL;
+ if (session->buffering) {
+ if (!session->outgoing) {
+ qr_task_complete(session->buffering);
+ }
+ session->buffering = NULL;
+ }
+ if (len > 0) {
+ int ret = worker_process_tcp(worker, handle, msg, len);
+ if (ret < 0) {
+ submitted = ret;
+ } else {
+ submitted += ret;
+ }
+ }
return submitted;
}
/* Buffer message and check if it's complete */
memcpy(pkt_buf->wire + pkt_buf->size, msg, to_read);
pkt_buf->size += to_read;
task->bytes_remaining -= to_read;
+ len -= to_read;
+ msg += to_read;
if (task->bytes_remaining == 0) {
/* Message was assembled, clear temporary. */
session->buffering = NULL;
const struct sockaddr *addr = session->outgoing ? &session->peer.ip : NULL;
ret = qr_task_step(task, addr, pkt_buf);
}
- if (len - to_read > 0) {
+ if (len > 0) {
/* TODO: this is simple via iteration; recursion doesn't really help */
- ret = worker_process_tcp(worker, handle, msg + to_read, len - to_read);
+ ret = worker_process_tcp(worker, handle, msg, len);
if (ret < 0) {
assert(false);
return ret;
submitted += ret;
}
}
+ assert(submitted >= 0);
return submitted;
}