From: Christopher Faulet Date: Wed, 22 Oct 2025 15:53:24 +0000 (+0200) Subject: MEDIUM: peer: Improve management of reconnect timer and heartbeat messages X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=7bc7d06ce5a520c26ba17b4be232cbf9ee3e1ed4;p=thirdparty%2Fhaproxy.git MEDIUM: peer: Improve management of reconnect timer and heartbeat messages heartbeat messages are sent to keep a connection active. However, a heartbeat messages was sent periodically, even if some other messages were sent during this period. It is not an issue but it is useless. heartbeat messages should only be sent if nothing was sent to the peer since a moment. So, in this patch, the heartbeat timer is rearmed each time a message is sent. On the receiver side, the reconnect timer was only rearmed when a heartbeat message was received instead of rearming it for any messages. Again, it is not an issue because the inactivity is managed with PEER_F_ALIVE flag. This flag is removed when the reconnect timer timed out but it is reinserted when something is received. But a periodic wakeup may be uselessly performed. So, in this patch, the reconnect timer is rearmed each time a message is received. Finally, to have waiting to long to reconnect when a peer session is closed, the reconnect timeout is rearmed with a low value when the peer applet is released. --- diff --git a/src/peers.c b/src/peers.c index 9ccdf55db..9afcd99f7 100644 --- a/src/peers.c +++ b/src/peers.c @@ -1104,6 +1104,7 @@ void __peer_session_deinit(struct peer *peer) peer->flags |= PEER_F_WAIT_SYNCTASK_ACK; peer->appstate = PEER_APP_ST_STOPPING; TRACE_STATE("peer session stopping", PEERS_EV_SESS_END, peer->appctx, peer); + peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000)); task_wakeup(peers->sync_task, TASK_WOKEN_MSG); } @@ -1246,6 +1247,7 @@ static inline int peer_send_msg(struct appctx *appctx, int (*peer_prepare_msg)(char *, size_t, struct peer_prep_params *), struct peer_prep_params *params) { + struct peer *peer = appctx->svcctx; int ret, msglen; TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_TX_MSG, appctx); @@ -1257,6 +1259,7 @@ static inline int peer_send_msg(struct appctx *appctx, return 0; } + /* message to buffer */ ret = applet_putblk(appctx, trash.area, msglen); if (ret <= 0) { @@ -1265,6 +1268,11 @@ static inline int peer_send_msg(struct appctx *appctx, appctx->st0 = PEER_SESS_ST_END; } } + else if (peer) { + /* A message was sent, rearm the heartbeat timer */ + peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); + peer->flags &= ~PEER_F_HEARTBEAT; + } TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_TX_MSG, appctx); return ret; @@ -2502,7 +2510,6 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee } else if (msg_head[1] == PEER_MSG_CTRL_HEARTBEAT) { TRACE_PROTO("Heartbeat message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer); - peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); peer->rx_hbt++; } } @@ -3091,6 +3098,7 @@ switchstate: if (!peer_treat_awaited_msg(appctx, curpeer, msg_head, &msg_cur, msg_end, msg_len, totl)) goto switchstate; + curpeer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); curpeer->flags |= PEER_F_ALIVE; /* skip consumed message */ @@ -3105,7 +3113,15 @@ switchstate: goto switchstate; send_msgs: - if (curpeer->flags & PEER_F_HEARTBEAT) { + /* we get here when a peer_recv_msg() returns 0 in reql */ + repl = peer_send_msgs(appctx, curpeer, curpeer->peers); + if (repl <= 0) { + if (repl == -1) + goto out; + goto switchstate; + } + + if (tick_is_expired(curpeer->heartbeat, now_ms) || (curpeer->flags & PEER_F_HEARTBEAT)) { curpeer->flags &= ~PEER_F_HEARTBEAT; repl = peer_send_heartbeatmsg(appctx, curpeer, curpeer->peers); if (repl <= 0) { @@ -3115,13 +3131,6 @@ send_msgs: } curpeer->tx_hbt++; } - /* we get here when a peer_recv_msg() returns 0 in reql */ - repl = peer_send_msgs(appctx, curpeer, curpeer->peers); - if (repl <= 0) { - if (repl == -1) - goto out; - goto switchstate; - } /* noting more to do */ goto out; @@ -3501,64 +3510,43 @@ static void __process_running_peer_sync(struct task *task, struct peers *peers, appctx_wakeup(peer->appctx); } else { - int update_to_push = 0; - - /* Awake session if there is data to push */ - for (st = peer->tables; st ; st = st->next) { - if (st->last_update != st->table->last_update) { - update_to_push = 1; - - /* wake up the peer handler to push local updates */ - /* There is no need to send a heartbeat message - * when some updates must be pushed. The remote - * peer will consider peer as alive when it will - * receive these updates. - */ - peer->flags &= ~PEER_F_HEARTBEAT; - /* Re-schedule another one later. */ - peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); - /* Refresh reconnect if necessary */ - if (tick_is_expired(peer->reconnect, now_ms)) - peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); - /* We are going to send updates, let's ensure we will - * come back to send heartbeat messages or to reconnect. - */ - TRACE_DEVEL("wakeup peer session to send update", PEERS_EV_SESS_WAKE, NULL, peer); - task->expire = tick_first(peer->reconnect, peer->heartbeat); - appctx_wakeup(peer->appctx); - break; - } + if (tick_is_expired(peer->reconnect, now_ms)) { + if (peer->flags & PEER_F_ALIVE) { + /* This peer was alive during a 'reconnect' period. + * Flag it as not alive again for the next period. + */ + peer->flags &= ~PEER_F_ALIVE; + TRACE_STATE("unresponsive peer session detected", PEERS_EV_SESS_SHUT, NULL, peer); + peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); + } + else { + peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000)); + peer->heartbeat = TICK_ETERNITY; + TRACE_STATE("dead peer session, force shutdown", PEERS_EV_SESS_SHUT, NULL, peer); + peer_session_forceshutdown(peer); + sync_peer_app_state(peers, peer); + peer->no_hbt++; + } } - /* When there are updates to send we do not reconnect - * and do not send heartbeat message either. - */ - if (!update_to_push) { - if (tick_is_expired(peer->reconnect, now_ms)) { - if (peer->flags & PEER_F_ALIVE) { - /* This peer was alive during a 'reconnect' period. - * Flag it as not alive again for the next period. - */ - peer->flags &= ~PEER_F_ALIVE; - TRACE_STATE("unresponsive peer session detected", PEERS_EV_SESS_SHUT, NULL, peer); - peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); - } - else { - peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000)); - peer->heartbeat = TICK_ETERNITY; - TRACE_STATE("dead peer session, force shutdown", PEERS_EV_SESS_SHUT, NULL, peer); - peer_session_forceshutdown(peer); - sync_peer_app_state(peers, peer); - peer->no_hbt++; - } - } - else if (tick_is_expired(peer->heartbeat, now_ms)) { + + if (peer->appctx) { + if (tick_is_expired(peer->heartbeat, now_ms)) { peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT)); peer->flags |= PEER_F_HEARTBEAT; TRACE_DEVEL("wakeup peer session to send heartbeat message", PEERS_EV_SESS_WAKE, NULL, peer); appctx_wakeup(peer->appctx); } - task->expire = tick_first(peer->reconnect, peer->heartbeat); + else { + for (st = peer->tables; st ; st = st->next) { + if (st->last_update != st->table->last_update) { + appctx_wakeup(peer->appctx); + break; + } + } + } } + + task->expire = tick_first(peer->reconnect, peer->heartbeat); } /* else do nothing */ } /* SUCCESSCODE */