peer->flags |= PEER_F_WAIT_SYNCTASK_ACK;
peer->appstate = PEER_APP_ST_STOPPING;
TRACE_STATE("peer session stopping", PEERS_EV_SESS_END, peer->appctx, peer);
+ peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
}
int (*peer_prepare_msg)(char *, size_t, struct peer_prep_params *),
struct peer_prep_params *params)
{
+ struct peer *peer = appctx->svcctx;
int ret, msglen;
TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_TX_MSG, appctx);
return 0;
}
+
/* message to buffer */
ret = applet_putblk(appctx, trash.area, msglen);
if (ret <= 0) {
appctx->st0 = PEER_SESS_ST_END;
}
}
+ else if (peer) {
+ /* A message was sent, rearm the heartbeat timer */
+ peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
+ peer->flags &= ~PEER_F_HEARTBEAT;
+ }
TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_TX_MSG, appctx);
return ret;
}
else if (msg_head[1] == PEER_MSG_CTRL_HEARTBEAT) {
TRACE_PROTO("Heartbeat message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
- peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
peer->rx_hbt++;
}
}
if (!peer_treat_awaited_msg(appctx, curpeer, msg_head, &msg_cur, msg_end, msg_len, totl))
goto switchstate;
+ curpeer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
curpeer->flags |= PEER_F_ALIVE;
/* skip consumed message */
goto switchstate;
send_msgs:
- if (curpeer->flags & PEER_F_HEARTBEAT) {
+ /* we get here when a peer_recv_msg() returns 0 in reql */
+ repl = peer_send_msgs(appctx, curpeer, curpeer->peers);
+ if (repl <= 0) {
+ if (repl == -1)
+ goto out;
+ goto switchstate;
+ }
+
+ if (tick_is_expired(curpeer->heartbeat, now_ms) || (curpeer->flags & PEER_F_HEARTBEAT)) {
curpeer->flags &= ~PEER_F_HEARTBEAT;
repl = peer_send_heartbeatmsg(appctx, curpeer, curpeer->peers);
if (repl <= 0) {
}
curpeer->tx_hbt++;
}
- /* we get here when a peer_recv_msg() returns 0 in reql */
- repl = peer_send_msgs(appctx, curpeer, curpeer->peers);
- if (repl <= 0) {
- if (repl == -1)
- goto out;
- goto switchstate;
- }
/* noting more to do */
goto out;
appctx_wakeup(peer->appctx);
}
else {
- int update_to_push = 0;
-
- /* Awake session if there is data to push */
- for (st = peer->tables; st ; st = st->next) {
- if (st->last_update != st->table->last_update) {
- update_to_push = 1;
-
- /* wake up the peer handler to push local updates */
- /* There is no need to send a heartbeat message
- * when some updates must be pushed. The remote
- * peer will consider <peer> peer as alive when it will
- * receive these updates.
- */
- peer->flags &= ~PEER_F_HEARTBEAT;
- /* Re-schedule another one later. */
- peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
- /* Refresh reconnect if necessary */
- if (tick_is_expired(peer->reconnect, now_ms))
- peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
- /* We are going to send updates, let's ensure we will
- * come back to send heartbeat messages or to reconnect.
- */
- TRACE_DEVEL("wakeup peer session to send update", PEERS_EV_SESS_WAKE, NULL, peer);
- task->expire = tick_first(peer->reconnect, peer->heartbeat);
- appctx_wakeup(peer->appctx);
- break;
- }
+ if (tick_is_expired(peer->reconnect, now_ms)) {
+ if (peer->flags & PEER_F_ALIVE) {
+ /* This peer was alive during a 'reconnect' period.
+ * Flag it as not alive again for the next period.
+ */
+ peer->flags &= ~PEER_F_ALIVE;
+ TRACE_STATE("unresponsive peer session detected", PEERS_EV_SESS_SHUT, NULL, peer);
+ peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
+ }
+ else {
+ peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
+ peer->heartbeat = TICK_ETERNITY;
+ TRACE_STATE("dead peer session, force shutdown", PEERS_EV_SESS_SHUT, NULL, peer);
+ peer_session_forceshutdown(peer);
+ sync_peer_app_state(peers, peer);
+ peer->no_hbt++;
+ }
}
- /* When there are updates to send we do not reconnect
- * and do not send heartbeat message either.
- */
- if (!update_to_push) {
- if (tick_is_expired(peer->reconnect, now_ms)) {
- if (peer->flags & PEER_F_ALIVE) {
- /* This peer was alive during a 'reconnect' period.
- * Flag it as not alive again for the next period.
- */
- peer->flags &= ~PEER_F_ALIVE;
- TRACE_STATE("unresponsive peer session detected", PEERS_EV_SESS_SHUT, NULL, peer);
- peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
- }
- else {
- peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
- peer->heartbeat = TICK_ETERNITY;
- TRACE_STATE("dead peer session, force shutdown", PEERS_EV_SESS_SHUT, NULL, peer);
- peer_session_forceshutdown(peer);
- sync_peer_app_state(peers, peer);
- peer->no_hbt++;
- }
- }
- else if (tick_is_expired(peer->heartbeat, now_ms)) {
+
+ if (peer->appctx) {
+ if (tick_is_expired(peer->heartbeat, now_ms)) {
peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
peer->flags |= PEER_F_HEARTBEAT;
TRACE_DEVEL("wakeup peer session to send heartbeat message", PEERS_EV_SESS_WAKE, NULL, peer);
appctx_wakeup(peer->appctx);
}
- task->expire = tick_first(peer->reconnect, peer->heartbeat);
+ else {
+ for (st = peer->tables; st ; st = st->next) {
+ if (st->last_update != st->table->last_update) {
+ appctx_wakeup(peer->appctx);
+ break;
+ }
+ }
+ }
}
+
+ task->expire = tick_first(peer->reconnect, peer->heartbeat);
}
/* else do nothing */
} /* SUCCESSCODE */