int res = 0;
if (session->outgoing && session->peer.ip.sa_family != AF_UNSPEC &&
- session->tasks.len == 0 && session->waiting.len == 0 &&
- session->connected && !session->closing) {
+ session->tasks.len == 0 && session->waiting.len == 0 && !session->closing) {
assert(session->peer.ip.sa_family == AF_INET ||
session->peer.ip.sa_family == AF_INET6);
- /* This is outbound TCP connection which can be reused.
- * Close it after timeout */
- uv_timer_t *timer = &session->timeout;
- timer->data = session;
- uv_timer_stop(timer);
- res = uv_timer_start(timer, on_session_idle_timeout,
- KR_CONN_RTT_MAX, 0);
+ res = 1;
+ if (session->connected) {
+ /* This is outbound TCP connection which can be reused.
+ * Close it after timeout */
+ uv_timer_t *timer = &session->timeout;
+ timer->data = session;
+ uv_timer_stop(timer);
+ res = uv_timer_start(timer, on_session_idle_timeout,
+ KR_CONN_RTT_MAX, 0);
+ }
}
if (res != 0) {
struct qr_task *task = session->waiting.at[0];
session_del_tasks(session, task);
array_del(session->waiting, 0);
- /* TODO fixme
- * Daemon should not have direct access to rplan */
- struct request_ctx *ctx = task->ctx;
- assert(ctx);
- struct kr_request *req = &ctx->req;
- struct kr_rplan *rplan = &req->rplan;
- struct kr_query *qry = array_tail(rplan->pending);
- /* Prevent from KR_STATE_FAIL in kr_resolve_consume() */
- qry->flags.TCP = false;
- qr_task_step(task, task->addrlist, NULL);
+ assert(task->refs > 1);
qr_task_unref(task);
+ qr_task_step(task, NULL, NULL);
}
assert(session->tasks.len == 0);
iorequest_release(worker, req);
return;
}
+ WITH_VERBOSE {
+ char addr_str[INET6_ADDRSTRLEN];
+ inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip),
+ addr_str, sizeof(addr_str));
+ VERBOSE_MSG(NULL, "=> connected to '%s'\n", addr_str);
+ }
+
session->connected = true;
session->handle = (uv_handle_t *)handle;
assert (session->waiting.len == session->tasks.len);
+ union inaddr *peer = &session->peer;
+ worker_del_tcp_waiting(worker, &peer->ip);
+
+ WITH_VERBOSE {
+ char addr_str[INET6_ADDRSTRLEN];
+ inet_ntop(peer->ip.sa_family, kr_inaddr(&peer->ip), addr_str, sizeof(addr_str));
+ VERBOSE_MSG(NULL, "=> connection to '%s' failed\n", addr_str);
+ }
+
while (session->waiting.len > 0) {
struct qr_task *task = session->waiting.at[0];
struct request_ctx *ctx = task->ctx;
+ assert(ctx);
task->timeouts += 1;
worker->stats.timeout += 1;
session_del_tasks(session, task);
array_del(session->waiting, 0);
+ assert(task->refs > 1);
qr_task_unref(task);
- qr_task_finalize(task, KR_STATE_FAIL);
+ qr_task_step(task, NULL, NULL);
}
assert (session->tasks.len == 0);
return qr_task_finalize(task, KR_STATE_FAIL);
}
+ WITH_VERBOSE {
+ char addr_str[INET6_ADDRSTRLEN];
+ inet_ntop(session->peer.ip.sa_family, kr_inaddr(&session->peer.ip), addr_str, sizeof(addr_str));
+ VERBOSE_MSG(NULL, "=> connecting to: '%s'\n", addr_str);
+ }
+
if (uv_tcp_connect(conn, (uv_tcp_t *)client,
addr , on_connect) != 0) {
session_del_tasks(session, task);
}
assert(session->closing == false);
}
- fflush(stdout);
assert(uv_is_closing(session->handle) == false);
/* Consume input and produce next message */
struct qr_task *task = session->waiting.at[0];
array_del(session->waiting, 0);
assert(task->refs > 1);
- qr_task_unref(task);
+ session_del_tasks(session, task);
if (session->outgoing) {
- /* TODO fixme
- * Daemon should not have direct access to rplan */
- struct request_ctx *ctx = task->ctx;
- assert(ctx);
- struct kr_request *req = &ctx->req;
- struct kr_rplan *rplan = &req->rplan;
- struct kr_query *qry = array_tail(rplan->pending);
- /* Prevent from KR_STATE_FAIL in kr_resolve_consume() */
- qry->flags.TCP = false;
- qr_task_step(task, task->addrlist, NULL);
+ qr_task_step(task, NULL, NULL);
} else {
assert(task->ctx->source.session == session);
task->ctx->source.session = NULL;
}
- session_del_tasks(session, task);
+ qr_task_unref(task);
}
while (session->tasks.len > 0) {
struct qr_task *task = session->tasks.at[0];
if (session->outgoing) {
- /* TODO fixme
- * Daemon should not have direct access to rplan */
- struct request_ctx *ctx = task->ctx;
- assert(ctx);
- struct kr_request *req = &ctx->req;
- struct kr_rplan *rplan = &req->rplan;
- struct kr_query *qry = array_tail(rplan->pending);
- /* Prevent from KR_STATE_FAIL in kr_resolve_consume() */
- qry->flags.TCP = false;
- qr_task_step(task, task->addrlist, NULL);
+ qr_task_step(task, NULL, NULL);
} else {
assert(task->ctx->source.session == session);
task->ctx->source.session = NULL;