]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: check: Use the CS to handle subscriptions for read/write events
authorChristopher Faulet <cfaulet@haproxy.com>
Wed, 18 May 2022 13:57:15 +0000 (15:57 +0200)
committerChristopher Faulet <cfaulet@haproxy.com>
Thu, 19 May 2022 08:12:38 +0000 (10:12 +0200)
Instead of using the health-check to subscribe to read/write events, we now
rely on the conn-stream. Indeed, on the server side, the conn-stream's
endpoint is a multiplexer. Thus it seems appropriate to handle subscriptions
for read/write events the same way than for the streams. Of course, the I/O
callback function is not the same. We use srv_chk_io_cb() instead of
cs_conn_io_cb().

include/haproxy/check-t.h
src/check.c
src/conn_stream.c
src/tcpcheck.c

index c7a88a34444f2c6da398c8e93f6641840ec16476..066737751d87b2ce35560f35478204cd013fe901 100644 (file)
@@ -176,7 +176,6 @@ struct check {
        char **envp;                            /* the environment to use if running a process-based check */
        struct pid_list *curpid;                /* entry in pid_list used for current process-based test, or -1 if not in test */
        struct sockaddr_storage addr;           /* the address to check */
-       struct wait_event wait_list;            /* Waiting for I/O events */
        char *sni;                              /* Server name */
        char *alpn_str;                         /* ALPN to use for checks */
        int alpn_len;                           /* ALPN string length */
index f80c66e7476fd51e86454693b0a7052646423b9d..c8891666c94ca52ccc4e380a2bf73970e65d3ebd 100644 (file)
@@ -1070,8 +1070,7 @@ static int wake_srv_chk(struct conn_stream *cs)
 /* This function checks if any I/O is wanted, and if so, attempts to do so */
 struct task *srv_chk_io_cb(struct task *t, void *ctx, unsigned int state)
 {
-       struct check *check = ctx;
-       struct conn_stream *cs = check->cs;
+       struct conn_stream *cs = ctx;
 
        wake_srv_chk(cs);
        return NULL;
@@ -1188,13 +1187,6 @@ struct task *process_chk_conn(struct task *t, void *context, unsigned int state)
        }
 
        if (cs) {
-               if (conn && check->wait_list.events)
-                       conn->mux->unsubscribe(cs, check->wait_list.events, &check->wait_list);
-               /* We may have been scheduled to run, and the
-                * I/O handler expects to have a cs, so remove
-                * the tasklet
-                */
-               tasklet_remove_from_tasklet_list(check->wait_list.tasklet);
                cs_destroy(cs);
                cs = check->cs = NULL;
                conn = NULL;
@@ -1280,16 +1272,18 @@ int check_buf_available(void *target)
 {
        struct check *check = target;
 
+       BUG_ON(!check->cs);
+
        if ((check->state & CHK_ST_IN_ALLOC) && b_alloc(&check->bi)) {
                TRACE_STATE("unblocking check, input buffer allocated", CHK_EV_TCPCHK_EXP|CHK_EV_RX_BLK, check);
                check->state &= ~CHK_ST_IN_ALLOC;
-               tasklet_wakeup(check->wait_list.tasklet);
+               tasklet_wakeup(check->cs->wait_event.tasklet);
                return 1;
        }
        if ((check->state & CHK_ST_OUT_ALLOC) && b_alloc(&check->bo)) {
                TRACE_STATE("unblocking check, output buffer allocated", CHK_EV_TCPCHK_SND|CHK_EV_TX_BLK, check);
                check->state &= ~CHK_ST_OUT_ALLOC;
-               tasklet_wakeup(check->wait_list.tasklet);
+               tasklet_wakeup(check->cs->wait_event.tasklet);
                return 1;
        }
 
@@ -1331,13 +1325,6 @@ const char *init_check(struct check *check, int type)
        check->bi = BUF_NULL;
        check->bo = BUF_NULL;
        LIST_INIT(&check->buf_wait.list);
-
-       check->wait_list.tasklet = tasklet_new();
-       if (!check->wait_list.tasklet)
-               return "out of memory while allocating check tasklet";
-       check->wait_list.events = 0;
-       check->wait_list.tasklet->process = srv_chk_io_cb;
-       check->wait_list.tasklet->context = check;
        return NULL;
 }
 
@@ -1357,8 +1344,6 @@ void free_check(struct check *check)
        }
 
        task_destroy(check->task);
-       if (check->wait_list.tasklet)
-               tasklet_free(check->wait_list.tasklet);
 
        check_release_buf(check, &check->bi);
        check_release_buf(check, &check->bo);
index 7fc3a493552fb3d7c890f9bfd5b068fef572fbf4..df916cc9b20d4e9202d1de26405ce686b4faf427 100644 (file)
@@ -266,8 +266,18 @@ int cs_attach_mux(struct conn_stream *cs, void *target, void *ctx)
                cs->ops = &cs_app_conn_ops;
                cs->data_cb = &cs_data_conn_cb;
        }
-       else if (cs_check(cs))
+       else if (cs_check(cs)) {
+               if (!cs->wait_event.tasklet) {
+                       cs->wait_event.tasklet = tasklet_new();
+                       if (!cs->wait_event.tasklet)
+                               return -1;
+                       cs->wait_event.tasklet->process = srv_chk_io_cb;
+                       cs->wait_event.tasklet->context = cs;
+                       cs->wait_event.events = 0;
+               }
+
                cs->data_cb = &check_conn_cb;
+       }
        return 0;
 }
 
@@ -340,7 +350,6 @@ static void cs_detach_endp(struct conn_stream **csp)
                struct cs_endpoint *endp = cs->endp;
 
                if (conn->mux) {
-                       /* TODO: handle unsubscribe for healthchecks too */
                        if (cs->wait_event.events != 0)
                                conn->mux->unsubscribe(cs, cs->wait_event.events, &cs->wait_event);
                        endp->flags |= CS_EP_ORPHAN;
index 4a4e071d3940143a5bccd1dc12918b9e40232deb..3b1d75a18f0aaa00b218a2ff525b7853541f2b69 100644 (file)
@@ -1071,8 +1071,8 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
                if (conn->flags & CO_FL_WAIT_XPRT) {
                        /* We are still waiting for the connection establishment */
                        if (next && next->action == TCPCHK_ACT_SEND) {
-                               if (!(check->wait_list.events & SUB_RETRY_SEND))
-                                       conn->mux->subscribe(check->cs, SUB_RETRY_SEND, &check->wait_list);
+                               if (!(check->cs->wait_event.events & SUB_RETRY_SEND))
+                                       conn->mux->subscribe(check->cs, SUB_RETRY_SEND, &check->cs->wait_event);
                                ret = TCPCHK_EVAL_WAIT;
                                TRACE_DEVEL("not connected yet", CHK_EV_TCPCHK_CONN, check);
                        }
@@ -1108,12 +1108,8 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
                goto fail_check;
        }
        conn->ctx = check->cs;
-       tasklet_set_tid(check->wait_list.tasklet, tid);
        conn_set_owner(conn, check->sess, NULL);
 
-       /* Maybe there were an older connection we were waiting on */
-       check->wait_list.events = 0;
-
        /* no client address */
        if (!sockaddr_alloc(&conn->dst, NULL, 0)) {
                TRACE_ERROR("sockaddr allocation error", CHK_EV_TCPCHK_CONN|CHK_EV_TCPCHK_ERR, check);
@@ -1298,9 +1294,9 @@ enum tcpcheck_eval_ret tcpcheck_eval_connect(struct check *check, struct tcpchec
        if (conn->flags & CO_FL_WAIT_XPRT) {
                if (conn->mux) {
                        if (next && next->action == TCPCHK_ACT_SEND)
-                               conn->mux->subscribe(check->cs, SUB_RETRY_SEND, &check->wait_list);
+                               conn->mux->subscribe(check->cs, SUB_RETRY_SEND, &check->cs->wait_event);
                        else
-                               conn->mux->subscribe(check->cs, SUB_RETRY_RECV, &check->wait_list);
+                               conn->mux->subscribe(check->cs, SUB_RETRY_RECV, &check->cs->wait_event);
                }
                ret = TCPCHK_EVAL_WAIT;
                TRACE_DEVEL("not connected yet", CHK_EV_TCPCHK_CONN, check);
@@ -1495,7 +1491,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_send(struct check *check, struct tcpcheck_r
                }
        }
        if ((IS_HTX_CONN(conn) && !htx_is_empty(htxbuf(&check->bo))) || (!IS_HTX_CONN(conn) && b_data(&check->bo))) {
-               conn->mux->subscribe(cs, SUB_RETRY_SEND, &check->wait_list);
+               conn->mux->subscribe(cs, SUB_RETRY_SEND, &cs->wait_event);
                ret = TCPCHK_EVAL_WAIT;
                TRACE_DEVEL("data not fully sent, wait", CHK_EV_TCPCHK_SND|CHK_EV_TX_DATA, check);
                goto out;
@@ -1547,7 +1543,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_recv(struct check *check, struct tcpcheck_r
 
        TRACE_ENTER(CHK_EV_RX_DATA, check);
 
-       if (check->wait_list.events & SUB_RETRY_RECV) {
+       if (cs->wait_event.events & SUB_RETRY_RECV) {
                TRACE_DEVEL("waiting for response", CHK_EV_RX_DATA, check);
                goto wait_more_data;
        }
@@ -1600,7 +1596,7 @@ enum tcpcheck_eval_ret tcpcheck_eval_recv(struct check *check, struct tcpcheck_r
                        goto out;
                }
                if (!(cs->endp->flags & (CS_EP_WANT_ROOM|CS_EP_ERROR|CS_EP_EOS))) {
-                       conn->mux->subscribe(cs, SUB_RETRY_RECV, &check->wait_list);
+                       conn->mux->subscribe(cs, SUB_RETRY_RECV, &cs->wait_event);
                        TRACE_DEVEL("waiting for response", CHK_EV_RX_DATA, check);
                        goto wait_more_data;
                }
@@ -2237,8 +2233,8 @@ int tcpcheck_main(struct check *check)
 
                        if (eval_ret == TCPCHK_EVAL_WAIT) {
                                check->current_step = rule->expect.head;
-                               if (!(check->wait_list.events & SUB_RETRY_RECV))
-                                       conn->mux->subscribe(cs, SUB_RETRY_RECV, &check->wait_list);
+                               if (!(cs->wait_event.events & SUB_RETRY_RECV))
+                                       conn->mux->subscribe(cs, SUB_RETRY_RECV, &cs->wait_event);
                        }
                        break;
                case TCPCHK_ACT_ACTION_KW: