* guaranteed to be unchanged only for the duration of
* udp_read() and tcp_read().
*/
- struct session *session = handle->data;
- if (!session_has_tls(session)) {
- buf->base = (char *) session_wirebuf_get_free_start(session);
- buf->len = session_wirebuf_get_free_size(session);
+ struct session *s = handle->data;
+ if (!session_flags(s)->has_tls) {
+ buf->base = (char *) session_wirebuf_get_free_start(s);
+ buf->len = session_wirebuf_get_free_size(s);
} else {
- struct tls_common_ctx *ctx = session_tls_get_common_ctx(session);
+ struct tls_common_ctx *ctx = session_tls_get_common_ctx(s);
buf->base = (char *) ctx->recv_buf;
buf->len = sizeof(ctx->recv_buf);
}
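/* For orientation: the migration in this diff replaces per-flag accessors
 * (session_is_* / session_set_*) with a single session_flags() accessor that
 * returns a mutable flags struct. A minimal sketch, with the fields inferred
 * from the usages below (the authoritative definition lives in session.h):
 *
 *   struct session_flags {
 *       bool outgoing  : 1;  // session to an upstream, not from a client
 *       bool throttled : 1;  // reading from the peer is paused
 *       bool has_tls   : 1;  // session carries TLS
 *       bool connected : 1;  // TCP connection is established
 *       bool closing   : 1;  // close sequence is in progress
 *   };
 *   struct session_flags *session_flags(const struct session *session);
 */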
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
struct session *s = handle->data;
- if (session_is_closing(s)) {
+ if (session_flags(s)->closing) {
return;
}
if (nread <= 0) {
return;
}
struct sockaddr *peer = session_get_peer(s);
- if (session_is_outgoing(s)) {
+ if (session_flags(s)->outgoing) {
	assert(peer->sa_family != AF_UNSPEC);
	if (kr_sockaddr_cmp(peer, addr) != 0) {
		return;
	}
} else {
	memcpy(peer, addr, kr_sockaddr_len(addr));
}
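	/* Copy the datagram into the session wire buffer, let the session layer
	 * parse and process any complete messages, then drop whatever remains
	 * unparsed (a datagram never carries a partial message). */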
- ssize_t consumed = session_wirebuf_consume(s, (const uint8_t *)buf->base, nread);
+ ssize_t consumed = session_wirebuf_consume(s, (const uint8_t *)buf->base,
+ nread);
assert(consumed == nread);
session_wirebuf_process(s);
session_wirebuf_discard(s);
/* Handle is already created; just create the context. */
struct session *s = session_new();
assert(s);
- session_set_outgoing(s, false);
+ session_flags(s)->outgoing = false;
session_set_handle(s, handle);
return io_start_read(handle);
}
{
struct session *s = timer->data;
- assert(session_is_outgoing(s) == false);
+ assert(!session_flags(s)->outgoing);
if (!session_tasklist_is_empty(s)) {
uv_timer_again(timer);
- } else if (!session_is_closing(s)) {
+ } else if (!session_flags(s)->closing) {
uv_timer_stop(timer);
session_close(s);
}
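/* Note on uv_timer_again() above: it restarts the timer using its repeat
 * value, so the timer must have been started with a nonzero repeat (with
 * repeat == 0 it merely stops; if never started it returns UV_EINVAL).
 * A sketch of how such an idle timer is presumably armed (callback name
 * hypothetical):
 *
 *   uv_timer_start(timer, on_session_idle, timeout, timeout);
 */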
assert(s && session_get_handle(s) == (uv_handle_t *)handle &&
handle->type == UV_TCP);
- if (session_is_closing(s)) {
+ if (session_flags(s)->closing) {
return;
}
ssize_t consumed = 0;
const uint8_t *data = (const uint8_t *)buf->base;
ssize_t data_len = nread;
- if (session_has_tls(s)) {
+ if (session_flags(s)->has_tls) {
	/* buf->base points to the start of the TLS receive buffer.
	 * Decode the data into the free space of the session wire buffer. */
consumed = tls_process_input_data(s, (const uint8_t *)buf->base, nread);
	/* Exceeded the per-connection quota for outstanding requests:
	 * stop reading from the stream and close after the last message is processed. */
uv_timer_t *t = session_get_timer(s);
- if (!session_is_outgoing(s) && !uv_is_closing((uv_handle_t *)t)) {
+ if (!session_flags(s)->outgoing && !uv_is_closing((uv_handle_t *)t)) {
uv_timer_stop(t);
if (session_tasklist_is_empty(s)) {
session_close(s);
}
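	/* If tasks are still in flight the close is deferred; the session is
	 * torn down after the last outstanding message has been processed. */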
	/* The connection spawned at least one request; reset its deadline for the
	 * next query. https://tools.ietf.org/html/rfc7766#section-6.2.3 */
- } else if (ret > 0 && !session_is_closing(s)) {
+ } else if (ret > 0 && !session_flags(s)->closing) {
session_timer_restart(s);
}
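	/* Presumably keeps any trailing partial message: compress shifts the
	 * unconsumed tail to the start of the wire buffer for the next read. */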
session_wirebuf_compress(s);
/* struct session was allocated / borrowed from a memory pool. */
struct session *session = client->data;
- assert(session_is_outgoing(session) == false);
+ assert(!session_flags(session)->outgoing);
if (uv_accept(master, client) != 0) {
	/* Close the session, close the underlying uv handles and
	 * return the memory borrowed from the pool. */
/* Every half of the request time limit it will re-check whether the
 * connection is idle and should be terminated; this is an educated guess. */
struct session *s = client->data;
- assert(session_is_outgoing(s) == false);
+ assert(!session_flags(s)->outgoing);
struct sockaddr *peer = session_get_peer(s);
int peer_len = sizeof(union inaddr);
uint64_t idle_in_timeout = net->tcp.in_idle_timeout;
uint64_t timeout = KR_CONN_RTT_MAX / 2;
- session_set_has_tls(s, tls);
+ session_flags(s)->has_tls = tls;
if (tls) {
timeout += TLS_MAX_HANDSHAKE_TIME;
struct tls_ctx_t *ctx = session_tls_get_server_ctx(s);
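	/* Presumed follow-up, sketched with libuv's stock API (the trigger
	 * callback name is hypothetical): arm the deadline timer with the
	 * computed timeout and re-check at the idle interval:
	 *
	 *   uv_timer_t *t = session_get_timer(s);
	 *   uv_timer_start(t, tcp_timeout_trigger, timeout, idle_in_timeout);
	 */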
/* Mark the session on this handle as outgoing, i.e. carrying a subrequest. */
struct session *session = handle->data;
if (ret == 0) {
- session_set_outgoing(session, true);
+ session_flags(session)->outgoing = true;
ret = session_tasklist_add(session, task);
}
if (ret < 0) {
array_init(ctx->tasks);
struct session *s = handle ? handle->data : NULL;
if (s) {
- assert(session_is_outgoing(s) == false);
+ assert(!session_flags(s)->outgoing);
}
ctx->source.session = s;
/* Process source session. */
if (s && session_tasklist_get_len(s) < worker->tcp_pipeline_max / 2 &&
- !session_is_closing(s) && !session_is_throttled(s)) {
+ !session_flags(s)->closing && !session_flags(s)->throttled) {
uv_handle_t *handle = session_get_handle(s);
	/* Start reading again if the session is throttled and
	 * the number of outgoing requests is below the watermark. */
if (handle) {
io_start_read(handle);
- session_set_throttled(s, false);
+ session_flags(s)->throttled = false;
}
}
/*@ Register new qr_task within session. */
static int qr_task_register(struct qr_task *task, struct session *session)
{
- assert(!session_is_outgoing(session) && session_get_handle(session)->type == UV_TCP);
+ assert(!session_flags(session)->outgoing &&
+        session_get_handle(session)->type == UV_TCP);
session_tasklist_add(session, task);
* when resuming reading. This is NYI. */
if (session_tasklist_get_len(session) >= task->ctx->worker->tcp_pipeline_max) {
uv_handle_t *handle = session_get_handle(session);
- if (handle && !session_is_throttled(session) && !session_is_closing(session)) {
+ if (handle && !session_flags(session)->throttled &&
+     !session_flags(session)->closing) {
io_stop_read(handle);
- session_set_throttled(session, true);
+ session_flags(session)->throttled = true;
}
}
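/* Together with the resume path above, this forms a hysteresis: reading from
 * a connection stops once its task list reaches tcp_pipeline_max and resumes
 * only after it drains below tcp_pipeline_max / 2, so a busy client cannot
 * flap between the throttled and unthrottled states on every answer. */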
struct session *s = ctx->source.session;
if (s) {
- assert(!session_is_outgoing(s) && session_waitinglist_is_empty(s));
+ assert(!session_flags(s)->outgoing && session_waitinglist_is_empty(s));
session_tasklist_del(s, task);
}
}
struct session *s = handle->data;
assert(s);
- if (!session_is_outgoing(s) || session_waitinglist_is_empty(s)) {
+ if (!session_flags(s)->outgoing || session_waitinglist_is_empty(s)) {
return status;
}
}
if (handle) {
struct session *s = handle->data;
- bool outgoing = session_is_outgoing(s);
+ bool outgoing = session_flags(s)->outgoing;
if (!outgoing) {
struct session *source_s = task->ctx->source.session;
if (source_s) {
}
if (handle->type == UV_TCP && outgoing && !session_waitinglist_is_empty(s)) {
session_waitinglist_del(s, task);
- if (session_is_closing(s)) {
+ if (session_flags(s)->closing) {
return status;
}
/* Finalize the task, if any errors.
}
}
}
- if (!session_is_closing(s)) {
+ if (!session_flags(s)->closing) {
io_start_read(handle); /* Start reading new query */
}
}
/* Send using given protocol */
struct session *session = handle->data;
- assert(!session_is_closing(session));
- if (session_has_tls(session)) {
+ assert(!session_flags(session)->closing);
+ if (session_flags(session)->has_tls) {
uv_write_t *write_req = (uv_write_t *)ioreq;
write_req->data = task;
ret = tls_write(write_req, handle, pkt, &on_task_write);
}
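/* The non-TLS branch presumably ends in plain uv_write() / uv_udp_send()
 * depending on the handle type; tls_write() encrypts the packet into the
 * stream before handing it to libuv. */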
/* Update statistics */
- if (session_is_outgoing(session) && addr) {
- if (session_has_tls(session))
+ if (session_flags(session)->outgoing && addr) {
+ if (session_flags(session)->has_tls)
worker->stats.tls += 1;
else if (handle->type == UV_UDP)
worker->stats.udp += 1;
static int session_tls_hs_cb(struct session *session, int status)
{
- assert(session_is_outgoing(session));
+ assert(session_flags(session)->outgoing);
uv_handle_t *handle = session_get_handle(session);
uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
struct session *session = handle->data;
struct sockaddr *peer = session_get_peer(session);
- assert(session_is_outgoing(session));
+ assert(session_flags(session)->outgoing);
if (status == UV_ECANCELED) {
worker_del_tcp_waiting(worker, peer);
- assert(session_is_empty(session) && session_is_closing(session));
+ assert(session_is_empty(session) && session_flags(session)->closing);
iorequest_release(worker, req);
return;
}
- if (session_is_closing(session)) {
+ if (session_flags(session)->closing) {
worker_del_tcp_waiting(worker, peer);
assert(session_is_empty(session));
iorequest_release(worker, req);
return;
}
- if (!session_has_tls(session)) {
+ if (!session_flags(session)->has_tls) {
	/* If TLS is in use, the session is still waiting for the handshake;
	 * otherwise remove it from the waiting list. */
if (worker_del_tcp_waiting(worker, peer) != 0) {
VERBOSE_MSG(qry, "=> connected to '%s'\n", peer_str);
}
- session_set_connected(session, true);
+ session_flags(session)->connected = true;
int ret = kr_ok();
- if (session_has_tls(session)) {
+ if (session_flags(session)->has_tls) {
struct tls_client_ctx_t *tls_ctx = session_tls_get_client_ctx(session);
ret = tls_client_connect_start(tls_ctx, session, session_tls_hs_cb);
if (ret == kr_error(EAGAIN)) {
struct worker_ctx *worker = timer->loop->data;
struct sockaddr *peer = session_get_peer(session);
- assert(session_is_outgoing(session));
+ assert(session_flags(session)->outgoing);
uv_timer_stop(timer);
- if (session_has_tls(session)) {
+ if (session_flags(session)->has_tls) {
worker_del_tcp_waiting(worker, peer);
}
struct sockaddr *addr = (struct sockaddr *)choice;
struct session *session = ret->data;
struct sockaddr *peer = session_get_peer(session);
- assert (peer->sa_family == AF_UNSPEC && session_is_outgoing(session));
+ assert(peer->sa_family == AF_UNSPEC && session_flags(session)->outgoing);
memcpy(peer, addr, kr_sockaddr_len(addr));
if (qr_task_send(task, ret, (struct sockaddr *)choice,
task->pktbuf) == 0) {
/* Send back answer */
struct session *source_session = ctx->source.session;
uv_handle_t *handle = session_get_handle(source_session);
- assert(!session_is_closing(source_session));
+ assert(!session_flags(source_session)->closing);
assert(handle && handle->data == ctx->source.session);
assert(ctx->source.addr.ip.sa_family != AF_UNSPEC);
int res = qr_task_send(task, handle,
task->addrlist = NULL;
task->addrlist_count = 0;
task->addrlist_turn = 0;
- req->has_tls = (ctx->source.session && session_has_tls(ctx->source.session));
+ req->has_tls = (ctx->source.session &&
+                 session_flags(ctx->source.session)->has_tls);
if (worker->too_many_open) {
struct kr_rplan *rplan = &req->rplan;
}
struct session *session = NULL;
if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) {
- assert(session_is_outgoing(session));
- if (session_is_closing(session)) {
+ assert(session_flags(session)->outgoing);
+ if (session_flags(session)->closing) {
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
task->pending_count += 1;
} else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) {
/* Connection has already been established */
- assert(session_is_outgoing(session));
- if (session_is_closing(session)) {
+ assert(session_flags(session)->outgoing);
+ if (session_flags(session)->closing) {
session_tasklist_del(session, task);
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
tls_client_ctx_set_session(tls_ctx, session);
session_tls_set_client_ctx(session, tls_ctx);
- session_set_has_tls(session, true);
+ session_flags(session)->has_tls = true;
}
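	/* The client TLS context and the session now point at each other, and
	 * has_tls routes this session through the TLS receive buffer in the
	 * allocation/read paths above. */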
conn->data = session;
* or resume if this is a subrequest */
struct qr_task *task = NULL;
struct sockaddr *addr = NULL;
- if (!session_is_outgoing(session)) { /* request from a client */
+ if (!session_flags(session)->outgoing) { /* request from a client */
/* Ignore badly formed queries. */
if (!query || ret != 0 || knot_wire_get_qr(query->wire)) {
if (query) worker->stats.dropped += 1;
if (task == NULL) {
return kr_error(ENOENT);
}
- assert(!session_is_closing(session));
+ assert(!session_flags(session)->closing);
addr = session_get_peer(session);
}
assert(uv_is_closing(session_get_handle(session)) == false);
struct worker_ctx *worker = handle->loop->data;
struct sockaddr *peer = session_get_peer(session);
worker_del_tcp_connected(worker, peer);
- session_set_connected(session, false);
+ session_flags(session)->connected = false;
struct tls_client_ctx_t *tls_client_ctx = session_tls_get_client_ctx(session);
if (tls_client_ctx) {
session_waitinglist_del_index(session, 0);
assert(task->refs > 1);
session_tasklist_del(session, task);
- if (session_is_outgoing(session)) {
+ if (session_flags(session)->outgoing) {
if (task->ctx->req.options.FORWARD) {
/* We are in TCP_FORWARD mode.
* To prevent failing at kr_resolve_consume()
while (!session_tasklist_is_empty(session)) {
struct qr_task *task = session_tasklist_get_first(session);
session_tasklist_del_index(session, 0);
- if (session_is_outgoing(session)) {
+ if (session_flags(session)->outgoing) {
if (task->ctx->req.options.FORWARD) {
struct kr_request *req = &task->ctx->req;
struct kr_rplan *rplan = &req->rplan;