From: Grigorii Demidov Date: Wed, 19 Sep 2018 12:39:39 +0000 (+0200) Subject: daemon/session: minor refactoring around session flags to reduce number of api functions X-Git-Tag: v3.1.0~10^2~30 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2beb1c059edb24cec4de06ceba6045d6644603ff;p=thirdparty%2Fknot-resolver.git daemon/session: minor refactoring around session flags to reduce number of api functions --- diff --git a/daemon/io.c b/daemon/io.c index aedaa9dd9..09719f946 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -59,12 +59,12 @@ static void handle_getbuf(uv_handle_t* handle, size_t suggested_size, uv_buf_t* * guaranteed to be unchanged only for the duration of * udp_read() and tcp_read(). */ - struct session *session = handle->data; - if (!session_has_tls(session)) { - buf->base = (char *) session_wirebuf_get_free_start(session); - buf->len = session_wirebuf_get_free_size(session); + struct session *s = handle->data; + if (!session_flags(s)->has_tls) { + buf->base = (char *) session_wirebuf_get_free_start(s); + buf->len = session_wirebuf_get_free_size(s); } else { - struct tls_common_ctx *ctx = session_tls_get_common_ctx(session); + struct tls_common_ctx *ctx = session_tls_get_common_ctx(s); buf->base = (char *) ctx->recv_buf; buf->len = sizeof(ctx->recv_buf); } @@ -76,7 +76,7 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, uv_loop_t *loop = handle->loop; struct worker_ctx *worker = loop->data; struct session *s = handle->data; - if (session_is_closing(s)) { + if (session_flags(s)->closing) { return; } if (nread <= 0) { @@ -89,7 +89,7 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, return; } struct sockaddr *peer = session_get_peer(s); - if (session_is_outgoing(s)) { + if (session_flags(s)->outgoing) { assert(peer->sa_family != AF_UNSPEC); if (kr_sockaddr_cmp(peer, addr) != 0) { return; @@ -97,7 +97,8 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, } else { memcpy(peer, addr, kr_sockaddr_len(addr)); } - ssize_t consumed = session_wirebuf_consume(s, (const uint8_t *)buf->base, nread); + ssize_t consumed = session_wirebuf_consume(s, (const uint8_t *)buf->base, + nread); assert(consumed == nread); session_wirebuf_process(s); session_wirebuf_discard(s); @@ -110,7 +111,7 @@ static int udp_bind_finalize(uv_handle_t *handle) /* Handle is already created, just create context. */ struct session *s = session_new(); assert(s); - session_set_outgoing(s, false); + session_flags(s)->outgoing = false; session_set_handle(s, handle); return io_start_read(handle); } @@ -145,10 +146,10 @@ static void tcp_timeout_trigger(uv_timer_t *timer) { struct session *s = timer->data; - assert(session_is_outgoing(s) == false); + assert(!session_flags(s)->outgoing); if (!session_tasklist_is_empty(s)) { uv_timer_again(timer); - } else if (!session_is_closing(s)) { + } else if (!session_flags(s)->closing) { uv_timer_stop(timer); session_close(s); } @@ -162,7 +163,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) assert(s && session_get_handle(s) == (uv_handle_t *)handle && handle->type == UV_TCP); - if (session_is_closing(s)) { + if (session_flags(s)->closing) { return; } @@ -190,7 +191,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) ssize_t consumed = 0; const uint8_t *data = (const uint8_t *)buf->base; ssize_t data_len = nread; - if (session_has_tls(s)) { + if (session_flags(s)->has_tls) { /* buf->base points to start of the tls receive buffer. Decode data free space in session wire buffer. */ consumed = tls_process_input_data(s, (const uint8_t *)buf->base, nread); @@ -209,7 +210,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) /* Exceeded per-connection quota for outstanding requests * stop reading from stream and close after last message is processed. */ uv_timer_t *t = session_get_timer(s); - if (!session_is_outgoing(s) && !uv_is_closing((uv_handle_t *)t)) { + if (!session_flags(s)->outgoing && !uv_is_closing((uv_handle_t *)t)) { uv_timer_stop(t); if (session_tasklist_is_empty(s)) { session_close(s); @@ -220,7 +221,7 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) } /* Connection spawned at least one request, reset its deadline for next query. * https://tools.ietf.org/html/rfc7766#section-6.2.3 */ - } else if (ret > 0 && !session_is_closing(s)) { + } else if (ret > 0 && !session_flags(s)->closing) { session_timer_restart(s); } session_wirebuf_compress(s); @@ -254,7 +255,7 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls) /* struct session was allocated \ borrowed from memory pool. */ struct session *session = client->data; - assert(session_is_outgoing(session) == false); + assert(session_flags(session)->outgoing == false); if (uv_accept(master, client) != 0) { /* close session, close underlying uv handles and @@ -267,7 +268,7 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls) * It will re-check every half of a request time limit if the connection * is idle and should be terminated, this is an educated guess. */ struct session *s = client->data; - assert(session_is_outgoing(s) == false); + assert(session_flags(s)->outgoing == false); struct sockaddr *peer = session_get_peer(s); int peer_len = sizeof(union inaddr); @@ -282,7 +283,7 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls) uint64_t idle_in_timeout = net->tcp.in_idle_timeout; uint64_t timeout = KR_CONN_RTT_MAX / 2; - session_set_has_tls(s, tls); + session_flags(s)->has_tls = tls; if (tls) { timeout += TLS_MAX_HANDSHAKE_TIME; struct tls_ctx_t *ctx = session_tls_get_server_ctx(s); diff --git a/daemon/session.c b/daemon/session.c index 73e6d0cf7..e8109fd89 100644 --- a/daemon/session.c +++ b/daemon/session.c @@ -12,16 +12,6 @@ /** List of tasks. */ typedef array_t(struct qr_task *) session_tasklist_t; -struct session_flags { - bool outgoing : 1; /**< True: to upstream; false: from a client. */ - bool throttled : 1; /**< True: data reading from peer is temporarily stopped. */ - bool has_tls : 1; /**< True: given session uses TLS. */ - bool connected : 1; /**< True: TCP connection is established. */ - bool closing : 1; /**< True: session close sequence is in progress. */ - bool wirebuf_error : 1; /**< True: last operation with wirebuf ended up with an error. */ -}; - - /* Per-session (TCP or UDP) persistent structure, * that exists between remote counterpart and a local socket. */ @@ -225,44 +215,9 @@ struct qr_task* session_tasklist_find(const struct session *session, uint16_t ms return ret; } -bool session_is_outgoing(const struct session *session) -{ - return session->sflags.outgoing; -} - -void session_set_outgoing(struct session *session, bool outgoing) -{ - session->sflags.outgoing = outgoing; -} - -bool session_is_closing(const struct session *session) -{ - return session->sflags.closing; -} - -void session_set_closing(struct session *session, bool closing) -{ - session->sflags.closing = closing; -} - -bool session_is_connected(const struct session *session) -{ - return session->sflags.connected; -} - -void session_set_connected(struct session *session, bool connected) -{ - session->sflags.connected = connected; -} - -bool session_is_throttled(const struct session *session) -{ - return session->sflags.throttled; -} - -void session_set_throttled(struct session *session, bool throttled) +struct session_flags *session_flags(struct session *session) { - session->sflags.throttled = throttled; + return &session->sflags; } struct sockaddr *session_get_peer(struct session *session) diff --git a/daemon/session.h b/daemon/session.h index aef3332e2..a22bb4ff6 100644 --- a/daemon/session.h +++ b/daemon/session.h @@ -24,6 +24,15 @@ struct qr_task; struct worker_ctx; struct session; +struct session_flags { + bool outgoing : 1; /**< True: to upstream; false: from a client. */ + bool throttled : 1; /**< True: data reading from peer is temporarily stopped. */ + bool has_tls : 1; /**< True: given session uses TLS. */ + bool connected : 1; /**< True: TCP connection is established. */ + bool closing : 1; /**< True: session close sequence is in progress. */ + bool wirebuf_error : 1; /**< True: last operation with wirebuf ended up with an error. */ +}; + /* Allocate new session. */ struct session *session_new(void); /* Clear and free given session. */ @@ -76,20 +85,8 @@ void session_tasklist_finalize(struct session *session, int status); bool session_is_empty(const struct session *session); /** Finalize all tasks. */ void session_tasks_finalize(struct session *session, int status); - -/** Operations with flags */ -bool session_is_outgoing(const struct session *session); -void session_set_outgoing(struct session *session, bool outgoing); -bool session_is_closing(const struct session *session); -void session_set_closing(struct session *session, bool closing); -bool session_is_connected(const struct session *session); -void session_set_connected(struct session *session, bool connected); -bool session_is_throttled(const struct session *session); -void session_set_throttled(struct session *session, bool throttled); -bool session_has_tls(const struct session *session); -void session_set_has_tls(struct session *session, bool has_tls); -bool session_wirebuf_error(struct session *session); - +/** Get pointer to session flags */ +struct session_flags *session_flags(struct session *session); /** Get peer address. */ struct sockaddr *session_get_peer(struct session *session); /** Get pointer to server-side tls-related data. */ diff --git a/daemon/tls.c b/daemon/tls.c index fa2e894b5..f73e7d74b 100644 --- a/daemon/tls.c +++ b/daemon/tls.c @@ -392,7 +392,7 @@ int tls_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_cb struct tls_common_ctx *tls_ctx = session_tls_get_common_ctx(s); assert (tls_ctx); - assert (session_is_outgoing(s) == tls_ctx->client_side); + assert (session_flags(s)->outgoing == tls_ctx->client_side); const uint16_t pkt_size = htons(pkt->size); const char *logstring = tls_ctx->client_side ? client_logstring : server_logstring; @@ -1129,7 +1129,7 @@ int tls_client_connect_start(struct tls_client_ctx_t *client_ctx, return kr_error(EINVAL); } - assert(session_is_outgoing(session) && session_get_handle(session)->type == UV_TCP); + assert(session_flags(session)->outgoing && session_get_handle(session)->type == UV_TCP); struct tls_common_ctx *ctx = &client_ctx->c; diff --git a/daemon/worker.c b/daemon/worker.c index 318f3e5d7..12ee439ad 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -236,7 +236,7 @@ static uv_handle_t *ioreq_spawn(struct qr_task *task, int socktype, sa_family_t /* Set current handle as a subrequest type. */ struct session *session = handle->data; if (ret == 0) { - session_set_outgoing(session, true); + session_flags(session)->outgoing = true; ret = session_tasklist_add(session, task); } if (ret < 0) { @@ -348,7 +348,7 @@ static struct request_ctx *request_create(struct worker_ctx *worker, array_init(ctx->tasks); struct session *s = handle ? handle->data : NULL; if (s) { - assert(session_is_outgoing(s) == false); + assert(session_flags(s)->outgoing == false); } ctx->source.session = s; @@ -547,13 +547,13 @@ static void qr_task_free(struct qr_task *task) /* Process source session. */ if (s && session_tasklist_get_len(s) < worker->tcp_pipeline_max/2 && - !session_is_closing(s) && !session_is_throttled(s)) { + !session_flags(s)->closing && !session_flags(s)->throttled) { uv_handle_t *handle = session_get_handle(s); /* Start reading again if the session is throttled and * the number of outgoing requests is below watermark. */ if (handle) { io_start_read(handle); - session_set_throttled(s, false); + session_flags(s)->throttled = false; } } @@ -569,7 +569,7 @@ static void qr_task_free(struct qr_task *task) /*@ Register new qr_task within session. */ static int qr_task_register(struct qr_task *task, struct session *session) { - assert(!session_is_outgoing(session) && session_get_handle(session)->type == UV_TCP); + assert(!session_flags(session)->outgoing && session_get_handle(session)->type == UV_TCP); session_tasklist_add(session, task); @@ -583,9 +583,9 @@ static int qr_task_register(struct qr_task *task, struct session *session) * when resuming reading. This is NYI. */ if (session_tasklist_get_len(session) >= task->ctx->worker->tcp_pipeline_max) { uv_handle_t *handle = session_get_handle(session); - if (handle && !session_is_throttled(session) && !session_is_closing(session)) { + if (handle && !session_flags(session)->throttled && !session_flags(session)->closing) { io_stop_read(handle); - session_set_throttled(session, true); + session_flags(session)->throttled = true; } } @@ -603,7 +603,7 @@ static void qr_task_complete(struct qr_task *task) struct session *s = ctx->source.session; if (s) { - assert(!session_is_outgoing(s) && session_waitinglist_is_empty(s)); + assert(!session_flags(s)->outgoing && session_waitinglist_is_empty(s)); session_tasklist_del(s, task); } @@ -622,14 +622,14 @@ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status } struct session* s = handle->data; assert(s); - if (!session_is_outgoing(s) || session_waitinglist_is_empty(s)) { + if (!session_flags(s)->outgoing || session_waitinglist_is_empty(s)) { return status; } } if (handle) { struct session* s = handle->data; - bool outgoing = session_is_outgoing(s); + bool outgoing = session_flags(s)->outgoing; if (!outgoing) { struct session* source_s = task->ctx->source.session; if (source_s) { @@ -638,7 +638,7 @@ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status } if (handle->type == UV_TCP && outgoing && !session_waitinglist_is_empty(s)) { session_waitinglist_del(s, task); - if (session_is_closing(s)) { + if (session_flags(s)->closing) { return status; } /* Finalize the task, if any errors. @@ -667,7 +667,7 @@ static int qr_task_on_send(struct qr_task *task, uv_handle_t *handle, int status } } } - if (!session_is_closing(s)) { + if (!session_flags(s)->closing) { io_start_read(handle); /* Start reading new query */ } } @@ -740,8 +740,8 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, /* Send using given protocol */ struct session *session = handle->data; - assert(!session_is_closing(session)); - if (session_has_tls(session)) { + assert(!session_flags(session)->closing); + if (session_flags(session)->has_tls) { uv_write_t *write_req = (uv_write_t *)ioreq; write_req->data = task; ret = tls_write(write_req, handle, pkt, &on_task_write); @@ -779,8 +779,8 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, } /* Update statistics */ - if (session_is_outgoing(session) && addr) { - if (session_has_tls(session)) + if (session_flags(session)->outgoing && addr) { + if (session_flags(session)->has_tls) worker->stats.tls += 1; else if (handle->type == UV_UDP) worker->stats.udp += 1; @@ -809,7 +809,7 @@ static int session_next_waiting_send(struct session *session) static int session_tls_hs_cb(struct session *session, int status) { - assert(session_is_outgoing(session)); + assert(session_flags(session)->outgoing); uv_handle_t *handle = session_get_handle(session); uv_loop_t *loop = handle->loop; struct worker_ctx *worker = loop->data; @@ -890,16 +890,16 @@ static void on_connect(uv_connect_t *req, int status) struct session *session = handle->data; struct sockaddr *peer = session_get_peer(session); - assert(session_is_outgoing(session)); + assert(session_flags(session)->outgoing); if (status == UV_ECANCELED) { worker_del_tcp_waiting(worker, peer); - assert(session_is_empty(session) && session_is_closing(session)); + assert(session_is_empty(session) && session_flags(session)->closing); iorequest_release(worker, req); return; } - if (session_is_closing(session)) { + if (session_flags(session)->closing) { worker_del_tcp_waiting(worker, peer); assert(session_is_empty(session)); iorequest_release(worker, req); @@ -918,7 +918,7 @@ static void on_connect(uv_connect_t *req, int status) return; } - if (!session_has_tls(session)) { + if (!session_flags(session)->has_tls) { /* if there is a TLS, session still waiting for handshake, * otherwise remove it from waiting list */ if (worker_del_tcp_waiting(worker, peer) != 0) { @@ -941,10 +941,10 @@ static void on_connect(uv_connect_t *req, int status) VERBOSE_MSG(qry, "=> connected to '%s'\n", peer_str); } - session_set_connected(session, true); + session_flags(session)->connected = true; int ret = kr_ok(); - if (session_has_tls(session)) { + if (session_flags(session)->has_tls) { struct tls_client_ctx_t *tls_ctx = session_tls_get_client_ctx(session); ret = tls_client_connect_start(tls_ctx, session, session_tls_hs_cb); if (ret == kr_error(EAGAIN)) { @@ -1010,11 +1010,11 @@ static void on_tcp_watchdog_timeout(uv_timer_t *timer) struct worker_ctx *worker = timer->loop->data; struct sockaddr *peer = session_get_peer(session); - assert(session_is_outgoing(session)); + assert(session_flags(session)->outgoing); uv_timer_stop(timer); - if (session_has_tls(session)) { + if (session_flags(session)->has_tls) { worker_del_tcp_waiting(worker, peer); } @@ -1074,7 +1074,7 @@ static uv_handle_t *retransmit(struct qr_task *task) struct sockaddr *addr = (struct sockaddr *)choice; struct session *session = ret->data; struct sockaddr *peer = session_get_peer(session); - assert (peer->sa_family == AF_UNSPEC && session_is_outgoing(session)); + assert (peer->sa_family == AF_UNSPEC && session_flags(session)->outgoing); memcpy(peer, addr, kr_sockaddr_len(addr)); if (qr_task_send(task, ret, (struct sockaddr *)choice, task->pktbuf) == 0) { @@ -1193,7 +1193,7 @@ static int qr_task_finalize(struct qr_task *task, int state) /* Send back answer */ struct session *source_session = ctx->source.session; uv_handle_t *handle = session_get_handle(source_session); - assert(!session_is_closing(source_session)); + assert(!session_flags(source_session)->closing); assert(handle && handle->data == ctx->source.session); assert(ctx->source.addr.ip.sa_family != AF_UNSPEC); int res = qr_task_send(task, handle, @@ -1243,7 +1243,7 @@ static int qr_task_step(struct qr_task *task, task->addrlist = NULL; task->addrlist_count = 0; task->addrlist_turn = 0; - req->has_tls = (ctx->source.session && session_has_tls(ctx->source.session)); + req->has_tls = (ctx->source.session && session_flags(ctx->source.session)->has_tls); if (worker->too_many_open) { struct kr_rplan *rplan = &req->rplan; @@ -1327,8 +1327,8 @@ static int qr_task_step(struct qr_task *task, } struct session* session = NULL; if ((session = worker_find_tcp_waiting(ctx->worker, addr)) != NULL) { - assert(session_is_outgoing(session)); - if (session_is_closing(session)) { + assert(session_flags(session)->outgoing); + if (session_flags(session)->closing) { subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); } @@ -1352,8 +1352,8 @@ static int qr_task_step(struct qr_task *task, task->pending_count += 1; } else if ((session = worker_find_tcp_connected(ctx->worker, addr)) != NULL) { /* Connection has been already established */ - assert(session_is_outgoing(session)); - if (session_is_closing(session)) { + assert(session_flags(session)->outgoing); + if (session_flags(session)->closing) { session_tasklist_del(session, task); subreq_finalize(task, packet_source, packet); return qr_task_finalize(task, KR_STATE_FAIL); @@ -1456,7 +1456,7 @@ static int qr_task_step(struct qr_task *task, } tls_client_ctx_set_session(tls_ctx, session); session_tls_set_client_ctx(session, tls_ctx); - session_set_has_tls(session, true); + session_flags(session)->has_tls = true; } conn->data = session; @@ -1541,7 +1541,7 @@ int worker_submit(struct session *session, knot_pkt_t *query) * or resume if this is subrequest */ struct qr_task *task = NULL; struct sockaddr *addr = NULL; - if (!session_is_outgoing(session)) { /* request from a client */ + if (!session_flags(session)->outgoing) { /* request from a client */ /* Ignore badly formed queries. */ if (!query || ret != 0 || knot_wire_get_qr(query->wire)) { if (query) worker->stats.dropped += 1; @@ -1578,7 +1578,7 @@ int worker_submit(struct session *session, knot_pkt_t *query) if (task == NULL) { return kr_error(ENOENT); } - assert(!session_is_closing(session)); + assert(!session_flags(session)->closing); addr = session_get_peer(session); } assert(uv_is_closing(session_get_handle(session)) == false); @@ -1681,7 +1681,7 @@ int worker_end_tcp(struct session *session) struct worker_ctx *worker = handle->loop->data; struct sockaddr *peer = session_get_peer(session); worker_del_tcp_connected(worker, peer); - session_set_connected(session, false); + session_flags(session)->connected = false; struct tls_client_ctx_t *tls_client_ctx = session_tls_get_client_ctx(session); if (tls_client_ctx) { @@ -1701,7 +1701,7 @@ int worker_end_tcp(struct session *session) session_waitinglist_del_index(session, 0); assert(task->refs > 1); session_tasklist_del(session, task); - if (session_is_outgoing(session)) { + if (session_flags(session)->outgoing) { if (task->ctx->req.options.FORWARD) { /* We are in TCP_FORWARD mode. * To prevent failing at kr_resolve_consume() @@ -1721,7 +1721,7 @@ int worker_end_tcp(struct session *session) while (!session_tasklist_is_empty(session)) { struct qr_task *task = session_tasklist_get_first(session); session_tasklist_del_index(session, 0); - if (session_is_outgoing(session)) { + if (session_flags(session)->outgoing) { if (task->ctx->req.options.FORWARD) { struct kr_request *req = &task->ctx->req; struct kr_rplan *rplan = &req->rplan;