#define PEER_F_TEACH_COMPLETE 0x00000010 /* All that we know already taught to current peer, used only for a local peer */
#define PEER_F_LEARN_ASSIGN 0x00000100 /* Current peer was assigned for a lesson */
#define PEER_F_LEARN_NOTUP2DATE 0x00000200 /* Learn from peer finished but peer is not up to date */
+#define PEER_F_HEARTBEAT 0x40000000 /* Heartbeat message to send. */
#define PEER_F_DWNGRD 0x80000000 /* When this flag is enabled, we must downgrade the supported version announced during peer sessions. */
#define PEER_TEACH_RESET ~(PEER_F_TEACH_PROCESS|PEER_F_TEACH_FINISHED) /* PEER_F_TEACH_COMPLETE should never be reset */
#define PEER_LEARN_RESET ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_NOTUP2DATE)
+#define PEER_HEARTBEAT_TIMEOUT 3000 /* 3 seconds */
+
/*****************************/
/* Sync message class */
/*****************************/
PEER_MSG_CTRL_RESYNCFINISHED,
PEER_MSG_CTRL_RESYNCPARTIAL,
PEER_MSG_CTRL_RESYNCCONFIRM,
+ PEER_MSG_CTRL_HEARTBEAT,
};
/*****************************/
return peer_send_msg(appctx, peer_prepare_control_msg, &p);
}
+/*
+ * Send a heartbeat message.
+ * Return 0 if the message could not be built modifying the appctx st0 to PEER_SESS_ST_END value.
+ * Returns -1 if there was not enough room left to send the message,
+ * any other negative returned value must be considered as an error with an appctx st0
+ * returned value equal to PEER_SESS_ST_END.
+ */
+static inline int peer_send_heartbeatmsg(struct appctx *appctx)
+{
+ struct peer_prep_params p = {
+ .control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_HEARTBEAT, },
+ };
+
+ return peer_send_msg(appctx, peer_prepare_control_msg, &p);
+}
+
/*
* Build a peer protocol error class message.
* Returns the number of written bytes used to build the message if succeeded,
/* reset teaching flags to 0 */
peer->flags &= PEER_TEACH_RESET;
}
+ else if (msg_head[1] == PEER_MSG_CTRL_HEARTBEAT) {
+ peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
+ }
}
else if (msg_head[0] == PEER_MSG_CLASS_STICKTABLE) {
if (msg_head[1] == PEER_MSG_STKT_DEFINE) {
goto switchstate;
send_msgs:
+ if (curpeer->flags & PEER_F_HEARTBEAT) {
+ curpeer->flags &= ~PEER_F_HEARTBEAT;
+ repl = peer_send_heartbeatmsg(appctx);
+ if (repl <= 0) {
+ if (repl == -1)
+ goto out;
+ goto switchstate;
+ }
+ }
/* we get here when a peer_recv_msg() returns 0 in reql */
repl = peer_send_msgs(appctx, curpeer);
if (repl <= 0) {
struct conn_stream *cs;
peer->reconnect = tick_add(now_ms, MS_TO_TICKS(5000));
+ peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
peer->statuscode = PEER_SESS_SC_CONNECTCODE;
s = NULL;
}
/*
- * Task processing function to manage re-connect and peer session
- * tasks wakeup on local update.
+ * Task processing function to manage re-connect, peer session
+ * tasks wakeup on local update and heartbeat.
*/
static struct task *process_peer_sync(struct task * task, void *context, unsigned short state)
{
appctx_wakeup(ps->appctx);
}
else {
+ int update_to_push = 0;
+
/* Awake session if there is data to push */
for (st = ps->tables; st ; st = st->next) {
if ((int)(st->last_pushed - st->table->localupdate) < 0) {
/* wake up the peer handler to push local updates */
+ update_to_push = 1;
+ ps->flags &= ~PEER_F_HEARTBEAT;
+ ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
appctx_wakeup(ps->appctx);
break;
}
}
+ if (!update_to_push && tick_is_expired(ps->heartbeat, now_ms)) {
+ ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
+ ps->flags |= PEER_F_HEARTBEAT;
+ appctx_wakeup(ps->appctx);
+ }
+ task->expire = tick_first(task->expire, ps->heartbeat);
}
/* else do nothing */
} /* SUCCESSCODE */