]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: peers: Update the peer applet to use its own buffers
authorChristopher Faulet <cfaulet@haproxy.com>
Tue, 22 Jul 2025 17:01:10 +0000 (19:01 +0200)
committerChristopher Faulet <cfaulet@haproxy.com>
Thu, 24 Jul 2025 10:06:49 +0000 (12:06 +0200)
Thanks to this patch, the peer applet is now using its own buffers. .rcv_buf
and .snd_buf callback functions are now defined to use the default raw
functions. The applet API is now used and any dependencies on the
stream-connectors and the channels were removed.

src/peers.c
src/sink.c

index e79d9d9d33481ea34ccf983940fd94577bacfa93..88e5cf50ef0f1d377ef4d155eb7173f22b334f6e 100644 (file)
@@ -1153,12 +1153,18 @@ static int peer_get_version(const char *str,
  */
 static inline int peer_getline(struct appctx  *appctx)
 {
-       struct stconn *sc = appctx_sc(appctx);
        int n;
 
-       n = co_getline(sc_oc(sc), trash.area, trash.size);
-       if (!n)
+       if (applet_get_inbuf(appctx) == NULL || !applet_input_data(appctx)) {
+               applet_need_more_data(appctx);
                return 0;
+       }
+
+       n = applet_getline(appctx, trash.area, trash.size);
+       if (!n) {
+               applet_need_more_data(appctx);
+               return 0;
+       }
 
        if (n < 0 || trash.area[n - 1] != '\n') {
                appctx->st0 = PEER_SESS_ST_END;
@@ -1170,8 +1176,7 @@ static inline int peer_getline(struct appctx  *appctx)
        else
                trash.area[n - 1] = 0;
 
-       co_skip(sc_oc(sc), n);
-
+       applet_skip_input(appctx, n);
        return n;
 }
 
@@ -2404,10 +2409,9 @@ static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t ms
                                 uint32_t *msg_len, int *totl)
 {
        int reql;
-       struct stconn *sc = appctx_sc(appctx);
        char *cur;
 
-       reql = co_getblk(sc_oc(sc), msg_head, 2 * sizeof(char), *totl);
+       reql = applet_getblk(appctx, msg_head, 2 * sizeof(char), *totl);
        if (reql <= 0) /* closed or EOL not found */
                goto incomplete;
 
@@ -2421,11 +2425,11 @@ static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t ms
        /* Read and Decode message length */
        msg_head    += *totl;
        msg_head_sz -= *totl;
-       reql = co_data(sc_oc(sc)) - *totl;
+       reql = applet_input_data(appctx) - *totl;
        if (reql > msg_head_sz)
                reql = msg_head_sz;
 
-       reql = co_getblk(sc_oc(sc), msg_head, reql, *totl);
+       reql = applet_getblk(appctx, msg_head, reql, *totl);
        if (reql <= 0) /* closed */
                goto incomplete;
 
@@ -2451,7 +2455,7 @@ static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t ms
                        return -1;
                }
 
-               reql = co_getblk(sc_oc(sc), trash.area, *msg_len, *totl);
+               reql = applet_getblk(appctx, trash.area, *msg_len, *totl);
                if (reql <= 0) /* closed */
                        goto incomplete;
                *totl += reql;
@@ -2460,7 +2464,7 @@ static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t ms
        return 1;
 
  incomplete:
-       if (reql < 0 || (sc->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED))) {
+       if (reql < 0 || se_fl_test(appctx->sedesc, SE_FL_SHW)) {
                /* there was an error or the message was truncated */
                appctx->st0 = PEER_SESS_ST_END;
                return -1;
@@ -2792,8 +2796,7 @@ static inline int peer_getline_last(struct appctx *appctx, struct peer **curpeer
        char *p;
        int reql;
        struct peer *peer;
-       struct stream *s = appctx_strm(appctx);
-       struct peers *peers = strm_fe(s)->parent;
+       struct peers *peers = strm_fe(appctx_strm(appctx))->parent;
 
        reql = peer_getline(appctx);
        if (!reql)
@@ -2897,9 +2900,6 @@ static inline void init_connected_peer(struct peer *peer, struct peers *peers)
  */
 static void peer_io_handler(struct appctx *appctx)
 {
-       struct stconn *sc = appctx_sc(appctx);
-       struct stream *s = __sc_strm(sc);
-       struct peers *curpeers = strm_fe(s)->parent;
        struct peer *curpeer = NULL;
        int reql = 0;
        int repl = 0;
@@ -2907,14 +2907,14 @@ static void peer_io_handler(struct appctx *appctx)
        int prev_state;
        int msg_done = 0;
 
-       if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) {
-               co_skip(sc_oc(sc), co_data(sc_oc(sc)));
+       if (unlikely(applet_fl_test(appctx, APPCTX_FL_EOS|APPCTX_FL_ERROR))) {
+               applet_reset_input(appctx);
                goto out;
        }
 
-       /* Check if the input buffer is available. */
-       if (sc_ib(sc)->size == 0) {
-               sc_need_room(sc, 0);
+       /* Check if the out buffer is available. */
+       if (!applet_get_outbuf(appctx)) {
+               applet_have_more_data(appctx);
                goto out;
        }
 
@@ -3015,7 +3015,7 @@ switchstate:
                                curpeer->statuscode = PEER_SESS_SC_SUCCESSCODE;
                                curpeer->last_hdshk = now_ms;
 
-                               init_connected_peer(curpeer, curpeers);
+                               init_connected_peer(curpeer, curpeer->peers);
 
                                /* switch to waiting message state */
                                _HA_ATOMIC_INC(&connected_peers);
@@ -3054,9 +3054,7 @@ switchstate:
                                                goto switchstate;
                                        }
                                }
-
-                               if (sc_ic(sc)->flags & CF_WROTE_DATA)
-                                       curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE;
+                               curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE;
 
                                reql = peer_getline(appctx);
                                if (!reql)
@@ -3070,11 +3068,11 @@ switchstate:
                                curpeer->last_hdshk = now_ms;
 
                                /* Awake main task */
-                               task_wakeup(curpeers->sync_task, TASK_WOKEN_MSG);
+                               task_wakeup(curpeer->peers->sync_task, TASK_WOKEN_MSG);
 
                                /* If status code is success */
                                if (curpeer->statuscode == PEER_SESS_SC_SUCCESSCODE) {
-                                       init_connected_peer(curpeer, curpeers);
+                                       init_connected_peer(curpeer, curpeer->peers);
                                }
                                else {
                                        if (curpeer->statuscode == PEER_SESS_SC_ERRVERSION)
@@ -3141,7 +3139,7 @@ switchstate:
                                curpeer->flags |= PEER_F_ALIVE;
 
                                /* skip consumed message */
-                               co_skip(sc_oc(sc), totl);
+                               applet_skip_input(appctx, totl);
 
                                /* make sure we don't process too many at once */
                                if (msg_done >= peers_max_updates_at_once)
@@ -3154,7 +3152,7 @@ switchstate:
 send_msgs:
                                if (curpeer->flags & PEER_F_HEARTBEAT) {
                                        curpeer->flags &= ~PEER_F_HEARTBEAT;
-                                       repl = peer_send_heartbeatmsg(appctx, curpeer, curpeers);
+                                       repl = peer_send_heartbeatmsg(appctx, curpeer, curpeer->peers);
                                        if (repl <= 0) {
                                                if (repl == -1)
                                                        goto out;
@@ -3163,7 +3161,7 @@ send_msgs:
                                        curpeer->tx_hbt++;
                                }
                                /* we get here when a peer_recv_msg() returns 0 in reql */
-                               repl = peer_send_msgs(appctx, curpeer, curpeers);
+                               repl = peer_send_msgs(appctx, curpeer, curpeer->peers);
                                if (repl <= 0) {
                                        if (repl == -1)
                                                goto out;
@@ -3214,14 +3212,14 @@ send_msgs:
                                        HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
                                        curpeer = NULL;
                                }
-                               se_fl_set(appctx->sedesc, SE_FL_EOS|SE_FL_EOI);
-                               co_skip(sc_oc(sc), co_data(sc_oc(sc)));
+                               applet_set_eos(appctx);
+                               applet_reset_input(appctx);
                                goto out;
                        }
                }
        }
 out:
-       sc_opposite(sc)->flags |= SC_FL_RCV_ONCE;
+       /* sc_opposite(sc)->flags |= SC_FL_RCV_ONCE; */
 
        if (curpeer)
                HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
@@ -3232,6 +3230,8 @@ static struct applet peer_applet = {
        .obj_type = OBJ_TYPE_APPLET,
        .name = "<PEER>", /* used for logging */
        .fct = peer_io_handler,
+       .rcv_buf = appctx_raw_rcv_buf,
+       .snd_buf = appctx_raw_snd_buf,
        .init = peer_session_init,
        .release = peer_session_release,
 };
index a98d806c45b2d8ef437462fe8de598a24a67a7a3..ce948ff009596ed055294d22f6b76e5309a41ca3 100644 (file)
@@ -499,7 +499,6 @@ soft_close:
         * soft_close will result in the port staying in TIME_WAIT state:
         * don't abuse from soft_close!
         */
-       applet_set_eoi(appctx);
        applet_set_eos(appctx);
 
        /* if required, hard_close could be achieve by using SE_FL_EOS|SE_FL_ERROR