]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/session: minor refactoring around session flags to reduce number of api functions
authorGrigorii Demidov <grigorii.demidov@nic.cz>
Wed, 19 Sep 2018 12:39:39 +0000 (14:39 +0200)
committerVladimír Čunát <vladimir.cunat@nic.cz>
Fri, 12 Oct 2018 15:36:43 +0000 (17:36 +0200)
daemon/io.c
daemon/session.c
daemon/session.h
daemon/tls.c
daemon/worker.c

index aedaa9dd950f388bdea1e2bbf3e02efc0349a9f6..09719f946d9634f49a9fe8431936977fb2c70229 100644 (file)
@@ -59,12 +59,12 @@ static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t*
         * guaranteed to be unchanged only for the duration of
         * udp_read() and tcp_read().
         */
-       struct session *session = handle->data;
-       if (!session_has_tls(session)) {
-               buf->base = (char *) session_wirebuf_get_free_start(session);
-               buf->len = session_wirebuf_get_free_size(session);
+       struct session *s = handle->data;
+       if (!session_flags(s)->has_tls) {
+               buf->base = (char *) session_wirebuf_get_free_start(s);
+               buf->len = session_wirebuf_get_free_size(s);
        } else {
-               struct tls_common_ctx *ctx = session_tls_get_common_ctx(session);
+               struct tls_common_ctx *ctx = session_tls_get_common_ctx(s);
                buf->base = (char *) ctx->recv_buf;
                buf->len = sizeof(ctx->recv_buf);
        }
@@ -76,7 +76,7 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
        uv_loop_t *loop = handle->loop;
        struct worker_ctx *worker = loop->data;
        struct session *s = handle->data;
-       if (session_is_closing(s)) {
+       if (session_flags(s)->closing) {
                return;
        }
        if (nread <= 0) {
@@ -89,7 +89,7 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
                return;
        }
        struct sockaddr *peer = session_get_peer(s);
-       if (session_is_outgoing(s)) {
+       if (session_flags(s)->outgoing) {
                assert(peer->sa_family != AF_UNSPEC);
                if (kr_sockaddr_cmp(peer, addr) != 0) {
                        return;
@@ -97,7 +97,8 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
        } else {
                memcpy(peer, addr, kr_sockaddr_len(addr));
        }
-       ssize_t consumed = session_wirebuf_consume(s, (const uint8_t *)buf->base, nread);
+       ssize_t consumed = session_wirebuf_consume(s, (const uint8_t *)buf->base,
+                                                  nread);
        assert(consumed == nread);
        session_wirebuf_process(s);
        session_wirebuf_discard(s);
@@ -110,7 +111,7 @@ static int udp_bind_finalize(uv_handle_t *handle)
        /* Handle is already created, just create context. */
        struct session *s = session_new();
        assert(s);
-       session_set_outgoing(s, false);
+       session_flags(s)->outgoing = false;
        session_set_handle(s, handle);
        return io_start_read(handle);
 }
@@ -145,10 +146,10 @@ static void tcp_timeout_trigger(uv_timer_t *timer)
 {
        struct session *s = timer->data;
 
-       assert(session_is_outgoing(s) == false);
+       assert(!session_flags(s)->outgoing);
        if (!session_tasklist_is_empty(s)) {
                uv_timer_again(timer);
-       } else if (!session_is_closing(s)) {
+       } else if (!session_flags(s)->closing) {
                uv_timer_stop(timer);
                session_close(s);
        }
@@ -162,7 +163,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
        assert(s && session_get_handle(s) == (uv_handle_t *)handle &&
               handle->type == UV_TCP); 
 
-       if (session_is_closing(s)) {
+       if (session_flags(s)->closing) {
                return;
        }
 
@@ -190,7 +191,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
        ssize_t consumed = 0;
        const uint8_t *data = (const uint8_t *)buf->base;
        ssize_t data_len = nread;
-       if (session_has_tls(s)) {
+       if (session_flags(s)->has_tls) {
                /* buf->base points to start of the tls receive buffer.
                   Decode data free space in session wire buffer. */
                consumed = tls_process_input_data(s, (const uint8_t *)buf->base, nread);
@@ -209,7 +210,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
                /* Exceeded per-connection quota for outstanding requests
                 * stop reading from stream and close after last message is processed. */
                uv_timer_t *t = session_get_timer(s);
-               if (!session_is_outgoing(s) && !uv_is_closing((uv_handle_t *)t)) {
+               if (!session_flags(s)->outgoing && !uv_is_closing((uv_handle_t *)t)) {
                        uv_timer_stop(t);
                        if (session_tasklist_is_empty(s)) {
                                session_close(s);
@@ -220,7 +221,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
                }
        /* Connection spawned at least one request, reset its deadline for next query.
         * https://tools.ietf.org/html/rfc7766#section-6.2.3 */
-       } else if (ret > 0 && !session_is_closing(s)) {
+       } else if (ret > 0 && !session_flags(s)->closing) {
                session_timer_restart(s);
        }
        session_wirebuf_compress(s);
@@ -254,7 +255,7 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls)
 
        /* struct session was allocated \ borrowed from memory pool. */
        struct session *session = client->data;
-       assert(session_is_outgoing(session) == false);
+       assert(session_flags(session)->outgoing == false);
 
        if (uv_accept(master, client) != 0) {
                /* close session, close underlying uv handles and
@@ -267,7 +268,7 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls)
         * It will re-check every half of a request time limit if the connection
         * is idle and should be terminated, this is an educated guess. */
        struct session *s = client->data;
-       assert(session_is_outgoing(s) == false);
+       assert(session_flags(s)->outgoing == false);
 
        struct sockaddr *peer = session_get_peer(s);
        int peer_len = sizeof(union inaddr);
@@ -282,7 +283,7 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls)
        uint64_t idle_in_timeout = net->tcp.in_idle_timeout;
 
        uint64_t timeout = KR_CONN_RTT_MAX / 2;
-       session_set_has_tls(s, tls);
+       session_flags(s)->has_tls = tls;
        if (tls) {
                timeout += TLS_MAX_HANDSHAKE_TIME;
                struct tls_ctx_t *ctx = session_tls_get_server_ctx(s);
index 73e6d0cf7b28818585709391c8518292910cd052..e8109fd89368f0e2dd157dc6da857e06c1a7ba75 100644 (file)
 /** List of tasks. */
 typedef array_t(struct qr_task *) session_tasklist_t;
 
-struct session_flags {
-       bool outgoing : 1;      /**< True: to upstream; false: from a client. */
-       bool throttled : 1;     /**< True: data reading from peer is temporarily stopped. */
-       bool has_tls : 1;       /**< True: given session uses TLS. */
-       bool connected : 1;     /**< True: TCP connection is established. */
-       bool closing : 1;       /**< True: session close sequence is in progress. */
-       bool wirebuf_error : 1; /**< True: last operation with wirebuf ended up with an error. */
-};
-
-
 /* Per-session (TCP or UDP) persistent structure,
  * that exists between remote counterpart and a local socket.
  */
@@ -225,44 +215,9 @@ struct qr_task* session_tasklist_find(const struct session *session, uint16_t ms
        return ret;
 }
 
-bool session_is_outgoing(const struct session *session)
-{
-       return session->sflags.outgoing;
-}
-
-void session_set_outgoing(struct session *session, bool outgoing)
-{
-       session->sflags.outgoing = outgoing;
-}
-
-bool session_is_closing(const struct session *session)
-{
-       return session->sflags.closing;
-}
-
-void session_set_closing(struct session *session, bool closing)
-{
-       session->sflags.closing = closing;
-}
-
-bool session_is_connected(const struct session *session)
-{
-       return session->sflags.connected;
-}
-
-void session_set_connected(struct session *session, bool connected)
-{
-       session->sflags.connected = connected;
-}
-
-bool session_is_throttled(const struct session *session)
-{
-       return session->sflags.throttled;
-}
-
-void session_set_throttled(struct session *session, bool throttled)
+struct session_flags *session_flags(struct session *session)
 {
-       session->sflags.throttled = throttled;
+       return &session->sflags;
 }
 
 struct sockaddr *session_get_peer(struct session *session)
index aef3332e2e28e5de24607e8be19f20a790359566..a22bb4ff69abe2b42c3c8e39b025a64160edaaf1 100644 (file)
@@ -24,6 +24,15 @@ struct qr_task;
 struct worker_ctx;
 struct session;
 
+struct session_flags {
+       bool outgoing : 1;      /**< True: to upstream; false: from a client. */
+       bool throttled : 1;     /**< True: data reading from peer is temporarily stopped. */
+       bool has_tls : 1;       /**< True: given session uses TLS. */
+       bool connected : 1;     /**< True: TCP connection is established. */
+       bool closing : 1;       /**< True: session close sequence is in progress. */
+       bool wirebuf_error : 1; /**< True: last operation with wirebuf ended up with an error. */
+};
+
 /* Allocate new session. */
 struct session *session_new(void);
 /* Clear and free given session. */
@@ -76,20 +85,8 @@ void session_tasklist_finalize(struct session *session, int status);
 bool session_is_empty(const struct session *session);
 /** Finalize all tasks. */
 void session_tasks_finalize(struct session *session, int status);
-
-/** Operations with flags */
-bool session_is_outgoing(const struct session *session);
-void session_set_outgoing(struct session *session, bool outgoing);
-bool session_is_closing(const struct session *session);
-void session_set_closing(struct session *session, bool closing);
-bool session_is_connected(const struct session *session);
-void session_set_connected(struct session *session, bool connected);
-bool session_is_throttled(const struct session *session);
-void session_set_throttled(struct session *session, bool throttled);
-bool session_has_tls(const struct session *session);
-void session_set_has_tls(struct session *session, bool has_tls);
-bool session_wirebuf_error(struct session *session);
-
+/** Get pointer to session flags */
+struct session_flags *session_flags(struct session *session);
 /** Get peer address. */
 struct sockaddr *session_get_peer(struct session *session);
 /** Get pointer to server-side tls-related data. */
index fa2e894b571477fa90bc95187b8dd8342f75bb12..f73e7d74b51cb058e80c9abe963e3295ada9d01f 100644 (file)
@@ -392,7 +392,7 @@ int tls_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_cb
        struct tls_common_ctx *tls_ctx = session_tls_get_common_ctx(s);
 
        assert (tls_ctx);
-       assert (session_is_outgoing(s) == tls_ctx->client_side);
+       assert (session_flags(s)->outgoing == tls_ctx->client_side);
 
        const uint16_t pkt_size = htons(pkt->size);
        const char *logstring = tls_ctx->client_side ? client_logstring : server_logstring;
@@ -1129,7 +1129,7 @@ int tls_client_connect_start(struct tls_client_ctx_t *client_ctx,
                return kr_error(EINVAL);
        }
 
-       assert(session_is_outgoing(session) && session_get_handle(session)->type == UV_TCP);
+       assert(session_flags(session)->outgoing && session_get_handle(session)->type == UV_TCP);
 
        struct tls_common_ctx *ctx = &client_ctx->c;
 
index 318f3e5d7c87b5a375b7de10f50f5bbc5d0bf229..12ee439add5ff4aad47fc15676aaced3446405a4 100644 (file)
@@ -236,7 +236,7 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t
        /* Set current handle as a subrequest type. */
        struct session *session = handle->data;
        if (ret == 0) {
-               session_set_outgoing(session, true);
+               session_flags(session)->outgoing = true;
                ret = session_tasklist_add(session, task);
        }
        if (ret < 0) {
@@ -348,7 +348,7 @@ static struct request_ctx *request_create(struct worker_ctx *worker,
        array_init(ctx->tasks);
        struct session *s = handle ? handle->data : NULL;
        if (s) {
-               assert(session_is_outgoing(s) == false);
+               assert(session_flags(s)->outgoing == false);
        }
        ctx->source.session = s;
 
@@ -547,13 +547,13 @@ static void qr_task_free(struct qr_task *task)
 
        /* Process source session. */
        if (s && session_tasklist_get_len(s) < worker->tcp_pipeline_max/2 &&
-           !session_is_closing(s) && !session_is_throttled(s)) {
+           !session_flags(s)->closing && !session_flags(s)->throttled) {
                uv_handle_t *handle = session_get_handle(s);
                /* Start reading again if the session is throttled and
                 * the number of outgoing requests is below watermark. */
                if (handle) {
                        io_start_read(handle);
-                       session_set_throttled(s, false);
+                       session_flags(s)->throttled = false;
                }
        }
 
@@ -569,7 +569,7 @@ static void qr_task_free(struct qr_task *task)
 /*@ Register new qr_task within session. */
 static int qr_task_register(struct qr_task *task, struct session *session)
 {
-       assert(!session_is_outgoing(session) && session_get_handle(session)->type == UV_TCP);
+       assert(!session_flags(session)->outgoing && session_get_handle(session)->type == UV_TCP);
 
        session_tasklist_add(session, task);
 
@@ -583,9 +583,9 @@ static int qr_task_register(struct qr_task *task, struct session *session)
         * when resuming reading. This is NYI.  */
        if (session_tasklist_get_len(session) >= task->ctx->worker->tcp_pipeline_max) {
                uv_handle_t *handle = session_get_handle(session);
-               if (handle && !session_is_throttled(session) && !session_is_closing(session)) {
+               if (handle && !session_flags(session)->throttled && !session_flags(session)->closing) {
                        io_stop_read(handle);
-                       session_set_throttled(session, true);
+                       session_flags(session)->throttled = true;
                }
        }
 
@@ -603,7 +603,7 @@ static void qr_task_complete(struct qr_task *task)
 
        struct session *s = ctx->source.session;
        if (s) {
-               assert(!session_is_outgoing(s) && session_waitinglist_is_empty(s));
+               assert(!session_flags(s)->outgoing && session_waitinglist_is_empty(s));
                session_tasklist_del(s, task);
        }
 
@@ -622,14 +622,14 @@ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status
                }
                struct session* s = handle->data;
                assert(s);
-               if (!session_is_outgoing(s) || session_waitinglist_is_empty(s)) {
+               if (!session_flags(s)->outgoing || session_waitinglist_is_empty(s)) {
                        return status;
                }
        }
 
        if (handle) {
                struct session* s = handle->data;
-               bool outgoing = session_is_outgoing(s);
+               bool outgoing = session_flags(s)->outgoing;
                if (!outgoing) {
                        struct session* source_s = task->ctx->source.session;
                        if (source_s) {
@@ -638,7 +638,7 @@ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status
                }
                if (handle->type == UV_TCP && outgoing && !session_waitinglist_is_empty(s)) {
                        session_waitinglist_del(s, task);
-                       if (session_is_closing(s)) {
+                       if (session_flags(s)->closing) {
                                return status;
                        }
                        /* Finalize the task, if any errors.
@@ -667,7 +667,7 @@ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status
                                }
                        }
                }
-               if (!session_is_closing(s)) {
+               if (!session_flags(s)->closing) {
                        io_start_read(handle); /* Start reading new query */
                }
        }
@@ -740,8 +740,8 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
 
        /* Send using given protocol */
        struct session *session = handle->data;
-       assert(!session_is_closing(session));
-       if (session_has_tls(session)) {
+       assert(!session_flags(session)->closing);
+       if (session_flags(session)->has_tls) {
                uv_write_t *write_req = (uv_write_t *)ioreq;
                write_req->data = task;
                ret = tls_write(write_req, handle, pkt, &on_task_write);
@@ -779,8 +779,8 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle,
        }
 
        /* Update statistics */
-       if (session_is_outgoing(session) && addr) {
-               if (session_has_tls(session))
+       if (session_flags(session)->outgoing && addr) {
+               if (session_flags(session)->has_tls)
                        worker->stats.tls += 1;
                else if (handle->type == UV_UDP)
                        worker->stats.udp += 1;
@@ -809,7 +809,7 @@ static int session_next_waiting_send(struct session *session)
 
 static int session_tls_hs_cb(struct session *session, int status)
 {
-       assert(session_is_outgoing(session));
+       assert(session_flags(session)->outgoing);
        uv_handle_t *handle = session_get_handle(session);
        uv_loop_t *loop = handle->loop;
        struct worker_ctx *worker = loop->data;
@@ -890,16 +890,16 @@ static void on_connect(uv_connect_t *req, int status)
        struct session *session = handle->data;
        struct sockaddr *peer = session_get_peer(session);
 
-       assert(session_is_outgoing(session));
+       assert(session_flags(session)->outgoing);
 
        if (status == UV_ECANCELED) {
                worker_del_tcp_waiting(worker, peer);
-               assert(session_is_empty(session) && session_is_closing(session));
+               assert(session_is_empty(session) && session_flags(session)->closing);
                iorequest_release(worker, req);
                return;
        }
 
-       if (session_is_closing(session)) {
+       if (session_flags(session)->closing) {
                worker_del_tcp_waiting(worker, peer);
                assert(session_is_empty(session));
                iorequest_release(worker, req);
@@ -918,7 +918,7 @@ static void on_connect(uv_connect_t *req, int status)
                return;
        }
 
-       if (!session_has_tls(session)) {
+       if (!session_flags(session)->has_tls) {
                /* if there is a TLS, session still waiting for handshake,
                 * otherwise remove it from waiting list */
                if (worker_del_tcp_waiting(worker, peer) != 0) {
@@ -941,10 +941,10 @@ static void on_connect(uv_connect_t *req, int status)
                VERBOSE_MSG(qry, "=> connected to '%s'\n", peer_str);
        }
 
-       session_set_connected(session, true);
+       session_flags(session)->connected = true;
 
        int ret = kr_ok();
-       if (session_has_tls(session)) {
+       if (session_flags(session)->has_tls) {
                struct tls_client_ctx_t *tls_ctx = session_tls_get_client_ctx(session);
                ret = tls_client_connect_start(tls_ctx, session, session_tls_hs_cb);
                if (ret == kr_error(EAGAIN)) {
@@ -1010,11 +1010,11 @@ static void on_tcp_watchdog_timeout(uv_timer_t *timer)
        struct worker_ctx *worker =  timer->loop->data;
        struct sockaddr *peer = session_get_peer(session);
 
-       assert(session_is_outgoing(session));
+       assert(session_flags(session)->outgoing);
 
        uv_timer_stop(timer);
 
-       if (session_has_tls(session)) {
+       if (session_flags(session)->has_tls) {
                worker_del_tcp_waiting(worker, peer);
        }
 
@@ -1074,7 +1074,7 @@ static uv_handle_t *retransmit(struct qr_task *task)
                struct sockaddr *addr = (struct sockaddr *)choice;
                struct session *session = ret->data;
                struct sockaddr *peer = session_get_peer(session);
-               assert (peer->sa_family == AF_UNSPEC && session_is_outgoing(session));
+               assert (peer->sa_family == AF_UNSPEC && session_flags(session)->outgoing);
                memcpy(peer, addr, kr_sockaddr_len(addr));
                if (qr_task_send(task, ret, (struct sockaddr *)choice,
                                 task->pktbuf) == 0) {
@@ -1193,7 +1193,7 @@ static int qr_task_finalize(struct qr_task *task, int state)
        /* Send back answer */
        struct session *source_session = ctx->source.session;
        uv_handle_t *handle = session_get_handle(source_session);
-       assert(!session_is_closing(source_session));
+       assert(!session_flags(source_session)->closing);
        assert(handle && handle->data == ctx->source.session);
        assert(ctx->source.addr.ip.sa_family != AF_UNSPEC);
        int res = qr_task_send(task, handle,
@@ -1243,7 +1243,7 @@ static int qr_task_step(struct qr_task *task,
        task->addrlist = NULL;
        task->addrlist_count = 0;
        task->addrlist_turn = 0;
-       req->has_tls = (ctx->source.session && session_has_tls(ctx->source.session));
+       req->has_tls = (ctx->source.session && session_flags(ctx->source.session)->has_tls);
 
        if (worker->too_many_open) {
                struct kr_rplan *rplan = &req->rplan;
@@ -1327,8 +1327,8 @@ static int qr_task_step(struct qr_task *task,
                }
                struct session* session = NULL;
                if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) {
-                       assert(session_is_outgoing(session));
-                       if (session_is_closing(session)) {
+                       assert(session_flags(session)->outgoing);
+                       if (session_flags(session)->closing) {
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
                        }
@@ -1352,8 +1352,8 @@ static int qr_task_step(struct qr_task *task,
                        task->pending_count += 1;
                } else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) {
                        /* Connection has been already established */
-                       assert(session_is_outgoing(session));
-                       if (session_is_closing(session)) {
+                       assert(session_flags(session)->outgoing);
+                       if (session_flags(session)->closing) {
                                session_tasklist_del(session, task);
                                subreq_finalize(task, packet_source, packet);
                                return qr_task_finalize(task, KR_STATE_FAIL);
@@ -1456,7 +1456,7 @@ static int qr_task_step(struct qr_task *task,
                                }
                                tls_client_ctx_set_session(tls_ctx, session);
                                session_tls_set_client_ctx(session, tls_ctx);
-                               session_set_has_tls(session, true);
+                               session_flags(session)->has_tls = true;
                        }
 
                        conn->data = session;
@@ -1541,7 +1541,7 @@ int worker_submit(struct session *session, knot_pkt_t *query)
         * or resume if this is subrequest */
        struct qr_task *task = NULL;
        struct sockaddr *addr = NULL;
-       if (!session_is_outgoing(session)) { /* request from a client */
+       if (!session_flags(session)->outgoing) { /* request from a client */
                /* Ignore badly formed queries. */
                if (!query || ret != 0 || knot_wire_get_qr(query->wire)) {
                        if (query) worker->stats.dropped += 1;
@@ -1578,7 +1578,7 @@ int worker_submit(struct session *session, knot_pkt_t *query)
                if (task == NULL) {
                        return kr_error(ENOENT);
                }
-               assert(!session_is_closing(session));
+               assert(!session_flags(session)->closing);
                addr = session_get_peer(session);
        }
        assert(uv_is_closing(session_get_handle(session)) == false);
@@ -1681,7 +1681,7 @@ int worker_end_tcp(struct session *session)
        struct worker_ctx *worker = handle->loop->data;
        struct sockaddr *peer = session_get_peer(session);
        worker_del_tcp_connected(worker, peer);
-       session_set_connected(session, false);
+       session_flags(session)->connected = false;
 
        struct tls_client_ctx_t *tls_client_ctx = session_tls_get_client_ctx(session);
        if (tls_client_ctx) {
@@ -1701,7 +1701,7 @@ int worker_end_tcp(struct session *session)
                session_waitinglist_del_index(session, 0);
                assert(task->refs > 1);
                session_tasklist_del(session, task);
-               if (session_is_outgoing(session)) {
+               if (session_flags(session)->outgoing) {
                        if (task->ctx->req.options.FORWARD) {
                                /* We are in TCP_FORWARD mode.
                                 * To prevent failing at kr_resolve_consume()
@@ -1721,7 +1721,7 @@ int worker_end_tcp(struct session *session)
        while (!session_tasklist_is_empty(session)) {
                struct qr_task *task = session_tasklist_get_first(session);
                session_tasklist_del_index(session, 0);
-               if (session_is_outgoing(session)) {
+               if (session_flags(session)->outgoing) {
                        if (task->ctx->req.options.FORWARD) {
                                struct kr_request *req = &task->ctx->req;
                                struct kr_rplan *rplan = &req->rplan;