From: Christopher Faulet Date: Tue, 22 Jul 2025 17:01:10 +0000 (+0200) Subject: MEDIUM: peers: Update the peer applet to use its own buffers X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=9ad435e302c565be74fa00c340ca460c3f1b4c01;p=thirdparty%2Fhaproxy.git MEDIUM: peers: Update the peer applet to use its own buffers Thanks to this patch, the peer applet is now using its own buffers. .rcv_buf and .snd_buf callback functions are now defined to use the default raw functions. The applet API is now used and any dependencies on the stream-connectors and the channels were removed. --- diff --git a/src/peers.c b/src/peers.c index e79d9d9d3..88e5cf50e 100644 --- a/src/peers.c +++ b/src/peers.c @@ -1153,12 +1153,18 @@ static int peer_get_version(const char *str, */ static inline int peer_getline(struct appctx *appctx) { - struct stconn *sc = appctx_sc(appctx); int n; - n = co_getline(sc_oc(sc), trash.area, trash.size); - if (!n) + if (applet_get_inbuf(appctx) == NULL || !applet_input_data(appctx)) { + applet_need_more_data(appctx); return 0; + } + + n = applet_getline(appctx, trash.area, trash.size); + if (!n) { + applet_need_more_data(appctx); + return 0; + } if (n < 0 || trash.area[n - 1] != '\n') { appctx->st0 = PEER_SESS_ST_END; @@ -1170,8 +1176,7 @@ static inline int peer_getline(struct appctx *appctx) else trash.area[n - 1] = 0; - co_skip(sc_oc(sc), n); - + applet_skip_input(appctx, n); return n; } @@ -2404,10 +2409,9 @@ static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t ms uint32_t *msg_len, int *totl) { int reql; - struct stconn *sc = appctx_sc(appctx); char *cur; - reql = co_getblk(sc_oc(sc), msg_head, 2 * sizeof(char), *totl); + reql = applet_getblk(appctx, msg_head, 2 * sizeof(char), *totl); if (reql <= 0) /* closed or EOL not found */ goto incomplete; @@ -2421,11 +2425,11 @@ static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t ms /* Read and Decode message length */ msg_head += *totl; msg_head_sz -= *totl; - reql = co_data(sc_oc(sc)) - *totl; + reql = applet_input_data(appctx) - *totl; if (reql > msg_head_sz) reql = msg_head_sz; - reql = co_getblk(sc_oc(sc), msg_head, reql, *totl); + reql = applet_getblk(appctx, msg_head, reql, *totl); if (reql <= 0) /* closed */ goto incomplete; @@ -2451,7 +2455,7 @@ static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t ms return -1; } - reql = co_getblk(sc_oc(sc), trash.area, *msg_len, *totl); + reql = applet_getblk(appctx, trash.area, *msg_len, *totl); if (reql <= 0) /* closed */ goto incomplete; *totl += reql; @@ -2460,7 +2464,7 @@ static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t ms return 1; incomplete: - if (reql < 0 || (sc->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED))) { + if (reql < 0 || se_fl_test(appctx->sedesc, SE_FL_SHW)) { /* there was an error or the message was truncated */ appctx->st0 = PEER_SESS_ST_END; return -1; @@ -2792,8 +2796,7 @@ static inline int peer_getline_last(struct appctx *appctx, struct peer **curpeer char *p; int reql; struct peer *peer; - struct stream *s = appctx_strm(appctx); - struct peers *peers = strm_fe(s)->parent; + struct peers *peers = strm_fe(appctx_strm(appctx))->parent; reql = peer_getline(appctx); if (!reql) @@ -2897,9 +2900,6 @@ static inline void init_connected_peer(struct peer *peer, struct peers *peers) */ static void peer_io_handler(struct appctx *appctx) { - struct stconn *sc = appctx_sc(appctx); - struct stream *s = __sc_strm(sc); - struct peers *curpeers = strm_fe(s)->parent; struct peer *curpeer = NULL; int reql = 0; int repl = 0; @@ -2907,14 +2907,14 @@ static void peer_io_handler(struct appctx *appctx) int prev_state; int msg_done = 0; - if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) { - co_skip(sc_oc(sc), co_data(sc_oc(sc))); + if (unlikely(applet_fl_test(appctx, APPCTX_FL_EOS|APPCTX_FL_ERROR))) { + applet_reset_input(appctx); goto out; } - /* Check if the input buffer is available. */ - if (sc_ib(sc)->size == 0) { - sc_need_room(sc, 0); + /* Check if the out buffer is available. */ + if (!applet_get_outbuf(appctx)) { + applet_have_more_data(appctx); goto out; } @@ -3015,7 +3015,7 @@ switchstate: curpeer->statuscode = PEER_SESS_SC_SUCCESSCODE; curpeer->last_hdshk = now_ms; - init_connected_peer(curpeer, curpeers); + init_connected_peer(curpeer, curpeer->peers); /* switch to waiting message state */ _HA_ATOMIC_INC(&connected_peers); @@ -3054,9 +3054,7 @@ switchstate: goto switchstate; } } - - if (sc_ic(sc)->flags & CF_WROTE_DATA) - curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE; + curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE; reql = peer_getline(appctx); if (!reql) @@ -3070,11 +3068,11 @@ switchstate: curpeer->last_hdshk = now_ms; /* Awake main task */ - task_wakeup(curpeers->sync_task, TASK_WOKEN_MSG); + task_wakeup(curpeer->peers->sync_task, TASK_WOKEN_MSG); /* If status code is success */ if (curpeer->statuscode == PEER_SESS_SC_SUCCESSCODE) { - init_connected_peer(curpeer, curpeers); + init_connected_peer(curpeer, curpeer->peers); } else { if (curpeer->statuscode == PEER_SESS_SC_ERRVERSION) @@ -3141,7 +3139,7 @@ switchstate: curpeer->flags |= PEER_F_ALIVE; /* skip consumed message */ - co_skip(sc_oc(sc), totl); + applet_skip_input(appctx, totl); /* make sure we don't process too many at once */ if (msg_done >= peers_max_updates_at_once) @@ -3154,7 +3152,7 @@ switchstate: send_msgs: if (curpeer->flags & PEER_F_HEARTBEAT) { curpeer->flags &= ~PEER_F_HEARTBEAT; - repl = peer_send_heartbeatmsg(appctx, curpeer, curpeers); + repl = peer_send_heartbeatmsg(appctx, curpeer, curpeer->peers); if (repl <= 0) { if (repl == -1) goto out; @@ -3163,7 +3161,7 @@ send_msgs: curpeer->tx_hbt++; } /* we get here when a peer_recv_msg() returns 0 in reql */ - repl = peer_send_msgs(appctx, curpeer, curpeers); + repl = peer_send_msgs(appctx, curpeer, curpeer->peers); if (repl <= 0) { if (repl == -1) goto out; @@ -3214,14 +3212,14 @@ send_msgs: HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock); curpeer = NULL; } - se_fl_set(appctx->sedesc, SE_FL_EOS|SE_FL_EOI); - co_skip(sc_oc(sc), co_data(sc_oc(sc))); + applet_set_eos(appctx); + applet_reset_input(appctx); goto out; } } } out: - sc_opposite(sc)->flags |= SC_FL_RCV_ONCE; + /* sc_opposite(sc)->flags |= SC_FL_RCV_ONCE; */ if (curpeer) HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock); @@ -3232,6 +3230,8 @@ static struct applet peer_applet = { .obj_type = OBJ_TYPE_APPLET, .name = "", /* used for logging */ .fct = peer_io_handler, + .rcv_buf = appctx_raw_rcv_buf, + .snd_buf = appctx_raw_snd_buf, .init = peer_session_init, .release = peer_session_release, }; diff --git a/src/sink.c b/src/sink.c index a98d806c4..ce948ff00 100644 --- a/src/sink.c +++ b/src/sink.c @@ -499,7 +499,6 @@ soft_close: * soft_close will result in the port staying in TIME_WAIT state: * don't abuse from soft_close! */ - applet_set_eoi(appctx); applet_set_eos(appctx); /* if required, hard_close could be achieve by using SE_FL_EOS|SE_FL_ERROR