]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: check: Use a persistent conn-stream for health-checks
authorChristopher Faulet <cfaulet@haproxy.com>
Thu, 6 Jan 2022 07:46:56 +0000 (08:46 +0100)
committerChristopher Faulet <cfaulet@haproxy.com>
Thu, 24 Feb 2022 10:00:03 +0000 (11:00 +0100)
In the same way a stream always has valid conn-streams, when a health-check
is created, a conn-stream is now created and the health-check is attached to
it, as an app. This simplifies the connect part a bit when a health-check is
running.

src/check.c
src/conn_stream.c
src/tcpcheck.c

index cbdf620a9bd4b3abdbd668e9f64c0c652c60cce1..e4f2589f759aa2357963014be2ae9c02973f772a 100644 (file)
@@ -1094,7 +1094,6 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
 {
        struct check *check = context;
        struct proxy *proxy = check->proxy;
-       struct conn_stream *cs;
        struct connection *conn;
        int rv;
        int expired = tick_is_expired(t->expire, now_ms);
@@ -1137,8 +1136,7 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
                expired = 0;
        }
 
-       cs = check->cs;
-       conn = cs_conn(cs);
+       conn = cs_conn(check->cs);
 
        /* there was a test running.
         * First, let's check whether there was an uncaught error,
@@ -1148,17 +1146,15 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
                /* Here the connection must be defined. Otherwise the
                 * error would have already been detected
                 */
-               if ((conn && ((conn->flags & CO_FL_ERROR) || (cs->flags & CS_FL_ERROR))) || expired) {
+               if ((conn && ((conn->flags & CO_FL_ERROR) || (check->cs->flags & CS_FL_ERROR))) || expired) {
                        TRACE_ERROR("report connection error", CHK_EV_TASK_WAKE|CHK_EV_HCHK_END|CHK_EV_HCHK_ERR, check);
                        chk_report_conn_err(check, 0, expired);
                }
                else {
                        if (check->state & CHK_ST_CLOSE_CONN) {
                                TRACE_DEVEL("closing current connection", CHK_EV_TASK_WAKE|CHK_EV_HCHK_RUN, check);
-                               cs_destroy(cs);
-                               cs = NULL;
+                               cs_detach_endp(check->cs);
                                conn = NULL;
-                               check->cs = NULL;
                                check->state &= ~CHK_ST_CLOSE_CONN;
                                tcpcheck_main(check);
                        }
@@ -1173,8 +1169,7 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
        TRACE_STATE("health-check complete or aborted", CHK_EV_TASK_WAKE|CHK_EV_HCHK_END, check);
 
        check->current_step = NULL;
-       cs = check->cs;
-       conn = cs_conn(cs);
+       conn = cs_conn(check->cs);
 
        if (conn && conn->xprt) {
                /* The check was aborted and the connection was not yet closed.
@@ -1182,21 +1177,18 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
                 * as a failed response coupled with "observe layer7" caused the
                 * server state to be suddenly changed.
                 */
-               cs_drain_and_close(cs);
+               cs_drain_and_close(check->cs);
        }
 
-       if (cs) {
-               if (conn && check->wait_list.events)
-                       conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
-               /* We may have been scheduled to run, and the
-                * I/O handler expects to have a cs, so remove
-                * the tasklet
-                */
-               tasklet_remove_from_tasklet_list(check->wait_list.tasklet);
-               cs_destroy(cs);
-               cs = check->cs = NULL;
-               conn = NULL;
-       }
+       /* TODO: must be handled by cs_detach_endp */
+       if (conn && check->wait_list.events)
+               conn->mux->unsubscribe(check->cs, check->wait_list.events, &check->wait_list);
+       /* We may have been scheduled to run, and the
+        * I/O handler expects to have a cs, so remove
+        * the tasklet
+        */
+       tasklet_remove_from_tasklet_list(check->wait_list.tasklet);
+       cs_detach_endp(check->cs);
 
        if (check->sess != NULL) {
                vars_prune(&check->vars, check->sess, NULL);
@@ -1356,11 +1348,8 @@ void free_check(struct check *check)
        check_release_buf(check, &check->bi);
        check_release_buf(check, &check->bo);
        if (check->cs) {
-               struct connection *conn = cs_conn(check->cs);
-
-               if (conn)
-                       conn_free(conn);
-               cs_free(check->cs);
+               cs_detach_endp(check->cs);
+               cs_detach_app(check->cs);
                check->cs = NULL;
        }
 }
@@ -1399,15 +1388,18 @@ int start_check_task(struct check *check, int mininter,
        /* task for the check. Process-based checks exclusively run on thread 1. */
        if (check->type == PR_O2_EXT_CHK)
                t = task_new_on(0);
-       else
+       else {
+               check->cs = cs_new();
+               if (!check->cs)
+                       goto fail_alloc_cs;
+               if (cs_attach_app(check->cs, &check->obj_type) < 0)
+                       goto fail_attach_cs;
                t = task_new_anywhere();
-
-       if (!t) {
-               ha_alert("Starting [%s:%s] check: out of memory.\n",
-                        check->server->proxy->id, check->server->id);
-               return 0;
        }
 
+       if (!t)
+               goto fail_alloc_task;
+
        check->task = t;
        t->process = process_chk;
        t->context = check;
@@ -1424,6 +1416,14 @@ int start_check_task(struct check *check, int mininter,
        task_queue(t);
 
        return 1;
+
+  fail_alloc_task:
+  fail_attach_cs:
+       cs_free(check->cs);
+  fail_alloc_cs:
+       ha_alert("Starting [%s:%s] check: out of memory.\n",
+                check->server->proxy->id, check->server->id);
+       return 0;
 }
 
 /*
index f0b06c65bc783589e298622a2c1483a1470eff82..df4181c15892048233501a5cecb0aee39b9a4191 100644 (file)
@@ -112,6 +112,7 @@ void cs_detach_endp(struct conn_stream *cs)
 
        if ((conn = cs_conn(cs))) {
                if (conn->mux) {
+                       /* TODO: handle unsubscribe for healthchecks too */
                        if (cs->si && cs->si->wait_event.events != 0)
                                conn->mux->unsubscribe(cs, cs->si->wait_event.events, &cs->si->wait_event);
                        conn->mux->detach(cs);
index 0aab10fed21a8d70ea6d80336cbf9bdf3d969e54..1b8199b1ce7ce25cd00a8a1f08133742137a0c25 100644 (file)
@@ -1057,8 +1057,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
        struct proxy *proxy = check->proxy;
        struct server *s = check->server;
        struct task *t = check->task;
-       struct conn_stream *cs = check->cs;
-       struct connection *conn = cs_conn(cs);
+       struct connection *conn = cs_conn(check->cs);
        struct protocol *proto;
        struct xprt_ops *xprt;
        struct tcpcheck_rule *next;
@@ -1074,7 +1073,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
                        /* We are still waiting for the connection establishment */
                        if (next && next->action == TCPCHK_ACT_SEND) {
                                if (!(check->wait_list.events & SUB_RETRY_SEND))
-                                       conn->mux->subscribe(cs, SUB_RETRY_SEND, &check->wait_list);
+                                       conn->mux->subscribe(check->cs, SUB_RETRY_SEND, &check->wait_list);
                                ret = TCPCHK_EVAL_WAIT;
                                TRACE_DEVEL("not connected yet", CHK_EV_TCPCHK_CONN, check);
                        }
@@ -1092,22 +1091,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
 
        /* No connection, prepare a new one */
        conn = conn_new((s ? &s->obj_type : &proxy->obj_type));
-       if (conn)
-               cs = cs_new();
-       if (!conn || !cs) {
-               chunk_printf(&trash, "TCPCHK error allocating connection at step %d",
-                            tcpcheck_get_step_id(check, rule));
-               if (rule->comment)
-                       chunk_appendf(&trash, " comment: '%s'", rule->comment);
-               set_server_check_status(check, HCHK_STATUS_SOCKERR, trash.area);
-               ret = TCPCHK_EVAL_STOP;
-               TRACE_ERROR("conn-stream allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check);
-               if (conn)
-                       conn_free(conn);
-               goto out;
-       }
-       cs_attach_endp(cs, &conn->obj_type, conn);
-       if (cs_attach_app(cs, &check->obj_type) < 0) {
+       if (!conn) {
                chunk_printf(&trash, "TCPCHK error allocating connection at step %d",
                             tcpcheck_get_step_id(check, rule));
                if (rule->comment)
@@ -1115,12 +1099,10 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
                set_server_check_status(check, HCHK_STATUS_SOCKERR, trash.area);
                ret = TCPCHK_EVAL_STOP;
                TRACE_ERROR("conn-stream allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check);
-               cs_destroy(cs);
                goto out;
        }
+       cs_attach_endp(check->cs, &conn->obj_type, conn);
        tasklet_set_tid(check->wait_list.tasklet, tid);
-
-       check->cs = cs;
        conn_set_owner(conn, check->sess, NULL);
 
        /* Maybe there were an older connection we were waiting on */
@@ -1214,7 +1196,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
                goto fail_check;
 
        conn_set_private(conn);
-       conn->ctx = cs;
+       conn->ctx = check->cs;
 
 #ifdef USE_OPENSSL
        if (connect->sni)
@@ -1263,7 +1245,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
 
                        mux_ops = conn_get_best_mux(conn, IST_NULL, PROTO_SIDE_BE, mode);
                }
-               if (mux_ops && conn_install_mux(conn, mux_ops, cs, proxy, check->sess) < 0) {
+               if (mux_ops && conn_install_mux(conn, mux_ops, check->cs, proxy, check->sess) < 0) {
                        TRACE_ERROR("failed to install mux", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check);
                        status = SF_ERR_INTERNAL;
                        goto fail_check;
@@ -1310,9 +1292,9 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
        if (conn->flags & CO_FL_WAIT_XPRT) {
                if (conn->mux) {
                        if (next && next->action == TCPCHK_ACT_SEND)
-                               conn->mux->subscribe(cs, SUB_RETRY_SEND, &check->wait_list);
+                               conn->mux->subscribe(check->cs, SUB_RETRY_SEND, &check->wait_list);
                        else
-                               conn->mux->subscribe(cs, SUB_RETRY_RECV, &check->wait_list);
+                               conn->mux->subscribe(check->cs, SUB_RETRY_RECV, &check->wait_list);
                }
                ret = TCPCHK_EVAL_WAIT;
                TRACE_DEVEL("not connected yet", CHK_EV_TCPCHK_CONN, check);
@@ -2157,7 +2139,7 @@ int tcpcheck_main(struct check *check)
         */
 
        /* 1- check for connection error, if any */
-       if ((conn && conn->flags & CO_FL_ERROR) || (cs && cs->flags & CS_FL_ERROR))
+       if ((conn && conn->flags & CO_FL_ERROR) || (cs->flags & CS_FL_ERROR))
                goto out_end_tcpcheck;
 
        /* 2- check if a rule must be resume. It happens if check->current_step
@@ -2202,7 +2184,7 @@ int tcpcheck_main(struct check *check)
                switch (rule->action) {
                case TCPCHK_ACT_CONNECT:
                        /* Not the first connection, release it first */
-                       if (cs && check->current_step != rule) {
+                       if (cs_conn(cs) && check->current_step != rule) {
                                check->state |= CHK_ST_CLOSE_CONN;
                                retcode = -1;
                        }
@@ -2219,11 +2201,10 @@ int tcpcheck_main(struct check *check)
                        TRACE_PROTO("eval connect rule", CHK_EV_TCPCHK_EVAL|CHK_EV_TCPCHK_CONN, check);
                        eval_ret = tcpcheck_eval_connect(check, rule);
 
-                       /* Refresh conn-stream and connection */
-                       cs = check->cs;
+                       /* Refresh connection */
                        conn = cs_conn(cs);
                        last_read = 0;
-                       must_read = ((cs && IS_HTX_CS(cs)) ? htx_is_empty(htxbuf(&check->bi)) : !b_data(&check->bi));
+                       must_read = (IS_HTX_CS(cs) ? htx_is_empty(htxbuf(&check->bi)) : !b_data(&check->bi));
                        break;
                case TCPCHK_ACT_SEND:
                        check->current_step = rule;
@@ -2321,7 +2302,7 @@ int tcpcheck_main(struct check *check)
        TRACE_PROTO("tcp-check passed", CHK_EV_TCPCHK_EVAL, check);
 
   out_end_tcpcheck:
-       if ((conn && conn->flags & CO_FL_ERROR) || (cs && cs->flags & CS_FL_ERROR)) {
+       if ((conn && conn->flags & CO_FL_ERROR) || (cs->flags & CS_FL_ERROR)) {
                TRACE_ERROR("report connection error", CHK_EV_TCPCHK_EVAL|CHK_EV_TCPCHK_ERR, check);
                chk_report_conn_err(check, errno, 0);
        }