]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: peers: Add a message for heartbeat.
authorFrédéric Lécaille <flecaille@haproxy.com>
Mon, 11 Feb 2019 16:49:39 +0000 (17:49 +0100)
committerWilly Tarreau <w@1wt.eu>
Fri, 1 Mar 2019 08:33:26 +0000 (09:33 +0100)
This patch implements peer heartbeat feature to prevent any haproxy peer
from reconnecting too often, consuming sockets for nothing.

To do so, we add PEER_MSG_CTRL_HEARTBEAT new message to PEER_MSG_CLASS_CONTROL peers
control class of messages. A ->heartbeat field is added to peer structs
to store the heatbeat timeout value which is handled by the same function as for ->reconnect
to control the session timeouts. A 2-bytes heartbeat message is sent every 3s when
no updates have to be sent. This way, the peer which receives such a message is sure
the remote peer is still alive. So, it resets the ->reconnect peer session
timeout to its initial value (5s). This prevents any reconnection to an
already connected alive peer.

include/types/peers.h
src/peers.c

index 5200d56b7a68e789328486ec087e63b84e725f55..6bc99c245b83468532d1ccb795416152dea58240 100644 (file)
@@ -62,6 +62,7 @@ struct peer {
        unsigned int flags;           /* peer session flags */
        unsigned int statuscode;      /* current/last session status code */
        unsigned int reconnect;       /* next connect timer */
+       unsigned int heartbeat;       /* next heartbeat timer */
        unsigned int confirm;         /* confirm message counter */
        struct appctx *appctx;        /* the appctx running it */
        struct shared_table *remote_table;
index 743bce88e8ccf774d8daa75799aa147643de46ba..016d41daa895f51c8a468f2fda78796429dde93a 100644 (file)
 #define        PEER_F_TEACH_COMPLETE           0x00000010 /* All that we know already taught to current peer, used only for a local peer */
 #define        PEER_F_LEARN_ASSIGN             0x00000100 /* Current peer was assigned for a lesson */
 #define        PEER_F_LEARN_NOTUP2DATE         0x00000200 /* Learn from peer finished but peer is not up to date */
+#define        PEER_F_HEARTBEAT                0x40000000 /* Heartbeat message to send. */
 #define        PEER_F_DWNGRD                   0x80000000 /* When this flag is enabled, we must downgrade the supported version announced during peer sessions. */
 
 #define        PEER_TEACH_RESET                ~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */
 #define        PEER_LEARN_RESET                ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE)
 
+#define PEER_HEARTBEAT_TIMEOUT      3000 /* 3 seconds */
+
 /*****************************/
 /* Sync message class        */
 /*****************************/
@@ -105,6 +108,7 @@ enum {
        PEER_MSG_CTRL_RESYNCFINISHED,
        PEER_MSG_CTRL_RESYNCPARTIAL,
        PEER_MSG_CTRL_RESYNCCONFIRM,
+       PEER_MSG_CTRL_HEARTBEAT,
 };
 
 /*****************************/
@@ -886,6 +890,22 @@ static inline int peer_send_resync_finishedmsg(struct appctx *appctx, struct pee
        return peer_send_msg(appctx, peer_prepare_control_msg, &p);
 }
 
+/*
+ * Send a heartbeat message.
+ * Return 0 if the message could not be built modifying the appctx st0 to PEER_SESS_ST_END value.
+ * Returns -1 if there was not enough room left to send the message,
+ * any other negative returned value must  be considered as an error with an appctx st0
+ * returned value equal to PEER_SESS_ST_END.
+ */
+static inline int peer_send_heartbeatmsg(struct appctx *appctx)
+{
+       struct peer_prep_params p = {
+               .control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_HEARTBEAT, },
+       };
+
+       return peer_send_msg(appctx, peer_prepare_control_msg, &p);
+}
+
 /*
  * Build a peer protocol error class message.
  * Returns the number of written bytes used to build the message if succeeded,
@@ -1605,6 +1625,9 @@ static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *pee
                        /* reset teaching flags to 0 */
                        peer->flags &= PEER_TEACH_RESET;
                }
+               else if (msg_head[1] == PEER_MSG_CTRL_HEARTBEAT) {
+                       peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
+               }
        }
        else if (msg_head[0] == PEER_MSG_CLASS_STICKTABLE) {
                if (msg_head[1] == PEER_MSG_STKT_DEFINE) {
@@ -2146,6 +2169,15 @@ switchstate:
                                goto switchstate;
 
 send_msgs:
+                               if (curpeer->flags & PEER_F_HEARTBEAT) {
+                                       curpeer->flags &= ~PEER_F_HEARTBEAT;
+                                       repl = peer_send_heartbeatmsg(appctx);
+                                       if (repl <= 0) {
+                                               if (repl == -1)
+                                                       goto out;
+                                               goto switchstate;
+                                       }
+                               }
                                /* we get here when a peer_recv_msg() returns 0 in reql */
                                repl = peer_send_msgs(appctx, curpeer);
                                if (repl <= 0) {
@@ -2264,6 +2296,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
        struct conn_stream *cs;
 
        peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
+       peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
        peer->statuscode = PEER_SESS_SC_CONNECTCODE;
        s = NULL;
 
@@ -2342,8 +2375,8 @@ out_free_cs:
 }
 
 /*
- * Task processing function to manage re-connect and peer session
- * tasks wakeup on local update.
+ * Task processing function to manage re-connect, peer session
+ * tasks wakeup on local update and heartbeat.
  */
 static struct task *process_peer_sync(struct task * task, void *context, unsigned short state)
 {
@@ -2429,14 +2462,25 @@ static struct task *process_peer_sync(struct task * task, void *context, unsigne
                                                appctx_wakeup(ps->appctx);
                                        }
                                        else {
+                                               int update_to_push = 0;
+
                                                /* Awake session if there is data to push */
                                                for (st = ps->tables; st ; st = st->next) {
                                                        if ((int)(st->last_pushed - st->table->localupdate) < 0) {
                                                                /* wake up the peer handler to push local updates */
+                                                               update_to_push = 1;
+                                                               ps->flags &= ~PEER_F_HEARTBEAT;
+                                                               ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
                                                                appctx_wakeup(ps->appctx);
                                                                break;
                                                        }
                                                }
+                                               if (!update_to_push && tick_is_expired(ps->heartbeat, now_ms)) {
+                                                       ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
+                                                       ps->flags |= PEER_F_HEARTBEAT;
+                                                       appctx_wakeup(ps->appctx);
+                                               }
+                                               task->expire = tick_first(task->expire, ps->heartbeat);
                                        }
                                        /* else do nothing */
                                } /* SUCCESSCODE */