return NULL;
}
-/*
- * Task processing function to manage re-connect, peer session
- * tasks wakeup on local update and heartbeat. Let's keep it exported so that it
- * resolves in stack traces and "show tasks".
- */
-struct task *process_peer_sync(struct task * task, void *context, unsigned int state)
+static void __process_running_peer_sync(struct task *task, struct peers *peers, unsigned int state)
{
- struct peers *peers = context;
struct peer *ps;
struct shared_table *st;
- task->expire = TICK_ETERNITY;
-
/* Acquire lock for all peers of the section */
for (ps = peers->remote; ps; ps = ps->next)
HA_SPIN_LOCK(PEER_LOCK, &ps->lock);
- if (!stopping) {
- /* Normal case (not soft stop)*/
-
- /* resync timeout set to TICK_ETERNITY means we just start
- * a new process and timer was not initialized.
- * We must arm this timer to switch to a request to a remote
- * node if incoming connection from old local process never
- * comes.
- */
- if (peers->resync_timeout == TICK_ETERNITY)
- peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+ /* resync timeout set to TICK_ETERNITY means we just start
+ * a new process and timer was not initialized.
+ * We must arm this timer to switch to a request to a remote
+ * node if incoming connection from old local process never
+ * comes.
+ */
+ if (peers->resync_timeout == TICK_ETERNITY)
+ peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
- if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL) &&
- (!nb_oldpids || tick_is_expired(peers->resync_timeout, now_ms)) &&
- !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
- /* Resync from local peer needed
- no peer was assigned for the lesson
- and no old local peer found
- or resync timeout expire */
+ if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL) &&
+ (!nb_oldpids || tick_is_expired(peers->resync_timeout, now_ms)) &&
+ !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
+ /* Resync from local peer needed
+ no peer was assigned for the lesson
+ and no old local peer found
+ or resync timeout expire */
- /* flag no more resync from local, to try resync from remotes */
- peers->flags |= PEERS_F_RESYNC_LOCAL;
- peers->flags |= PEERS_F_RESYNC_LOCALTIMEOUT;
+ /* flag no more resync from local, to try resync from remotes */
+ peers->flags |= PEERS_F_RESYNC_LOCAL;
+ peers->flags |= PEERS_F_RESYNC_LOCALTIMEOUT;
- /* reschedule a resync */
- peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
- }
+ /* reschedule a resync */
+ peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+ }
- /* For each session */
- for (ps = peers->remote; ps; ps = ps->next) {
- /* For each remote peers */
- if (!ps->local) {
- if (!ps->appctx) {
- /* no active peer connection */
- if (ps->statuscode == 0 ||
- ((ps->statuscode == PEER_SESS_SC_CONNECTCODE ||
- ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
- ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
- tick_is_expired(ps->reconnect, now_ms))) {
- /* connection never tried
- * or previous peer connection established with success
- * or previous peer connection failed while connecting
- * and reconnection timer is expired */
-
- /* retry a connect */
- ps->appctx = peer_session_create(peers, ps);
- }
- else if (!tick_is_expired(ps->reconnect, now_ms)) {
- /* If previous session failed during connection
- * but reconnection timer is not expired */
+ /* For each session */
+ for (ps = peers->remote; ps; ps = ps->next) {
+ /* For each remote peers */
+ if (!ps->local) {
+ if (!ps->appctx) {
+ /* no active peer connection */
+ if (ps->statuscode == 0 ||
+ ((ps->statuscode == PEER_SESS_SC_CONNECTCODE ||
+ ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
+ ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
+ tick_is_expired(ps->reconnect, now_ms))) {
+ /* connection never tried
+ * or previous peer connection established with success
+ * or previous peer connection failed while connecting
+ * and reconnection timer is expired */
+
+ /* retry a connect */
+ ps->appctx = peer_session_create(peers, ps);
+ }
+ else if (!tick_is_expired(ps->reconnect, now_ms)) {
+ /* If previous session failed during connection
+ * but reconnection timer is not expired */
- /* reschedule task for reconnect */
- task->expire = tick_first(task->expire, ps->reconnect);
- }
- /* else do nothing */
- } /* !ps->appctx */
- else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
- /* current peer connection is active and established */
- if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
- !(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
- !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
- /* Resync from a remote is needed
- * and no peer was assigned for lesson
- * and current peer may be up2date */
-
- /* assign peer for the lesson */
- ps->flags |= PEER_F_LEARN_ASSIGN;
- peers->flags |= PEERS_F_RESYNC_ASSIGN;
- peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN;
-
- /* wake up peer handler to handle a request of resync */
- appctx_wakeup(ps->appctx);
+ /* reschedule task for reconnect */
+ task->expire = tick_first(task->expire, ps->reconnect);
+ }
+ /* else do nothing */
+ } /* !ps->appctx */
+ else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
+ /* current peer connection is active and established */
+ if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
+ !(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
+ !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
+ /* Resync from a remote is needed
+ * and no peer was assigned for lesson
+ * and current peer may be up2date */
+
+ /* assign peer for the lesson */
+ ps->flags |= PEER_F_LEARN_ASSIGN;
+ peers->flags |= PEERS_F_RESYNC_ASSIGN;
+ peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN;
+
+ /* wake up peer handler to handle a request of resync */
+ appctx_wakeup(ps->appctx);
+ }
+ else {
+ int update_to_push = 0;
+
+ /* Awake session if there is data to push */
+ for (st = ps->tables; st ; st = st->next) {
+ if (st->last_pushed != st->table->localupdate) {
+ /* wake up the peer handler to push local updates */
+ update_to_push = 1;
+ /* There is no need to send a heartbeat message
+ * when some updates must be pushed. The remote
+ * peer will consider <ps> peer as alive when it will
+ * receive these updates.
+ */
+ ps->flags &= ~PEER_F_HEARTBEAT;
+ /* Re-schedule another one later. */
+ ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
+ /* Refresh reconnect if necessary */
+ if (tick_is_expired(ps->reconnect, now_ms))
+ ps->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
+ /* We are going to send updates, let's ensure we will
+ * come back to send heartbeat messages or to reconnect.
+ */
+ task->expire = tick_first(ps->reconnect, ps->heartbeat);
+ appctx_wakeup(ps->appctx);
+ break;
+ }
}
- else {
- int update_to_push = 0;
-
- /* Awake session if there is data to push */
- for (st = ps->tables; st ; st = st->next) {
- if (st->last_pushed != st->table->localupdate) {
- /* wake up the peer handler to push local updates */
- update_to_push = 1;
- /* There is no need to send a heartbeat message
- * when some updates must be pushed. The remote
- * peer will consider <ps> peer as alive when it will
- * receive these updates.
- */
- ps->flags &= ~PEER_F_HEARTBEAT;
- /* Re-schedule another one later. */
- ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
- /* Refresh reconnect if necessary */
- if (tick_is_expired(ps->reconnect, now_ms))
- ps->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
- /* We are going to send updates, let's ensure we will
- * come back to send heartbeat messages or to reconnect.
+ /* When there are updates to send we do not reconnect
+ * and do not send heartbeat message either.
+ */
+ if (!update_to_push) {
+ if (tick_is_expired(ps->reconnect, now_ms)) {
+ if (ps->flags & PEER_F_ALIVE) {
+ /* This peer was alive during a 'reconnect' period.
+ * Flag it as not alive again for the next period.
*/
- task->expire = tick_first(ps->reconnect, ps->heartbeat);
- appctx_wakeup(ps->appctx);
- break;
+ ps->flags &= ~PEER_F_ALIVE;
+ ps->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
}
- }
- /* When there are updates to send we do not reconnect
- * and do not send heartbeat message either.
- */
- if (!update_to_push) {
- if (tick_is_expired(ps->reconnect, now_ms)) {
- if (ps->flags & PEER_F_ALIVE) {
- /* This peer was alive during a 'reconnect' period.
- * Flag it as not alive again for the next period.
- */
- ps->flags &= ~PEER_F_ALIVE;
- ps->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
- }
- else {
- ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
- ps->heartbeat = TICK_ETERNITY;
- peer_session_forceshutdown(ps);
- ps->no_hbt++;
- }
+ else {
+ ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
+ ps->heartbeat = TICK_ETERNITY;
+ peer_session_forceshutdown(ps);
+ ps->no_hbt++;
}
- else if (tick_is_expired(ps->heartbeat, now_ms)) {
- ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
- ps->flags |= PEER_F_HEARTBEAT;
- appctx_wakeup(ps->appctx);
- }
- task->expire = tick_first(ps->reconnect, ps->heartbeat);
}
+ else if (tick_is_expired(ps->heartbeat, now_ms)) {
+ ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
+ ps->flags |= PEER_F_HEARTBEAT;
+ appctx_wakeup(ps->appctx);
+ }
+ task->expire = tick_first(ps->reconnect, ps->heartbeat);
}
- /* else do nothing */
- } /* SUCCESSCODE */
- } /* !ps->peer->local */
- } /* for */
-
- /* Resync from remotes expired: consider resync is finished */
- if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
- !(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
- tick_is_expired(peers->resync_timeout, now_ms)) {
- /* Resync from remote peer needed
- * no peer was assigned for the lesson
- * and resync timeout expire */
-
- /* flag no more resync from remote, consider resync is finished */
- peers->flags |= PEERS_F_RESYNC_REMOTE;
- peers->flags |= PEERS_F_RESYNC_REMOTETIMEOUT;
- }
+ }
+ /* else do nothing */
+ } /* SUCCESSCODE */
+ } /* !ps->peer->local */
+ } /* for */
+
+ /* Resync from remotes expired: consider resync is finished */
+ if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
+ !(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
+ tick_is_expired(peers->resync_timeout, now_ms)) {
+ /* Resync from remote peer needed
+ * no peer was assigned for the lesson
+ * and resync timeout expire */
+
+ /* flag no more resync from remote, consider resync is finished */
+ peers->flags |= PEERS_F_RESYNC_REMOTE;
+ peers->flags |= PEERS_F_RESYNC_REMOTETIMEOUT;
+ }
- if ((peers->flags & PEERS_RESYNC_STATEMASK) != PEERS_RESYNC_FINISHED) {
- /* Resync not finished*/
- /* reschedule task to resync timeout if not expired, to ended resync if needed */
- if (!tick_is_expired(peers->resync_timeout, now_ms))
- task->expire = tick_first(task->expire, peers->resync_timeout);
- }
- } /* !stopping */
- else {
- /* soft stop case */
- if (state & TASK_WOKEN_SIGNAL) {
- /* We've just received the signal */
- if (!(peers->flags & PEERS_F_DONOTSTOP)) {
- /* add DO NOT STOP flag if not present */
- _HA_ATOMIC_INC(&jobs);
- peers->flags |= PEERS_F_DONOTSTOP;
-
- /* disconnect all connected peers to process a local sync
- * this must be done only the first time we are switching
- * in stopping state
+ if ((peers->flags & PEERS_RESYNC_STATEMASK) != PEERS_RESYNC_FINISHED) {
+ /* Resync not finished*/
+ /* reschedule task to resync timeout if not expired, to ended resync if needed */
+ if (!tick_is_expired(peers->resync_timeout, now_ms))
+ task->expire = tick_first(task->expire, peers->resync_timeout);
+ }
+
+ /* Release lock for all peers of the section */
+ for (ps = peers->remote; ps; ps = ps->next)
+ HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock);
+}
+
+static void __process_stopping_peer_sync(struct task *task, struct peers *peers, unsigned int state)
+{
+ struct peer *ps;
+ struct shared_table *st;
+
+ /* Acquire lock for all peers of the section */
+ for (ps = peers->remote; ps; ps = ps->next)
+ HA_SPIN_LOCK(PEER_LOCK, &ps->lock);
+
+ if (state & TASK_WOKEN_SIGNAL) {
+ /* We've just received the signal */
+ if (!(peers->flags & PEERS_F_DONOTSTOP)) {
+ /* add DO NOT STOP flag if not present */
+ _HA_ATOMIC_INC(&jobs);
+ peers->flags |= PEERS_F_DONOTSTOP;
+
+ /* disconnect all connected peers to process a local sync
+ * this must be done only the first time we are switching
+ * in stopping state
+ */
+ for (ps = peers->remote; ps; ps = ps->next) {
+ /* we're killing a connection, we must apply a random delay before
+ * retrying otherwise the other end will do the same and we can loop
+ * for a while.
*/
- for (ps = peers->remote; ps; ps = ps->next) {
- /* we're killing a connection, we must apply a random delay before
- * retrying otherwise the other end will do the same and we can loop
- * for a while.
- */
- ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
- if (ps->appctx) {
- peer_session_forceshutdown(ps);
- }
+ ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
+ if (ps->appctx) {
+ peer_session_forceshutdown(ps);
}
-
- /* Set resync timeout for the local peer and request a immediate reconnect */
- peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
- peers->local->reconnect = now_ms;
}
+
+ /* Set resync timeout for the local peer and request a immediate reconnect */
+ peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+ peers->local->reconnect = now_ms;
}
+ }
- ps = peers->local;
- if (ps->flags & PEER_F_TEACH_COMPLETE) {
- if (peers->flags & PEERS_F_DONOTSTOP) {
- /* resync of new process was complete, current process can die now */
- _HA_ATOMIC_DEC(&jobs);
- peers->flags &= ~PEERS_F_DONOTSTOP;
- for (st = ps->tables; st ; st = st->next)
- HA_ATOMIC_DEC(&st->table->refcnt);
- }
+ ps = peers->local;
+ if (ps->flags & PEER_F_TEACH_COMPLETE) {
+ if (peers->flags & PEERS_F_DONOTSTOP) {
+ /* resync of new process was complete, current process can die now */
+ _HA_ATOMIC_DEC(&jobs);
+ peers->flags &= ~PEERS_F_DONOTSTOP;
+ for (st = ps->tables; st ; st = st->next)
+ HA_ATOMIC_DEC(&st->table->refcnt);
}
- else if (!ps->appctx) {
- /* Re-arm resync timeout if necessary */
- if (!tick_isset(peers->resync_timeout))
- peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+ }
+ else if (!ps->appctx) {
+ /* Re-arm resync timeout if necessary */
+ if (!tick_isset(peers->resync_timeout))
+ peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
- /* If there's no active peer connection */
- if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED &&
- !tick_is_expired(peers->resync_timeout, now_ms) &&
- (ps->statuscode == 0 ||
- ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
- ps->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
- ps->statuscode == PEER_SESS_SC_TRYAGAIN)) {
- /* The resync is finished for the local peer and
- * the resync timeout is not expired and
- * connection never tried
- * or previous peer connection was successfully established
- * or previous tcp connect succeeded but init state incomplete
- * or during previous connect, peer replies a try again statuscode */
-
- if (!tick_is_expired(ps->reconnect, now_ms)) {
- /* reconnection timer is not expired. reschedule task for reconnect */
- task->expire = tick_first(task->expire, ps->reconnect);
- }
- else {
- /* connect to the local peer if we must push a local sync */
- if (peers->flags & PEERS_F_DONOTSTOP) {
- peer_session_create(peers, ps);
- }
- }
+ /* If there's no active peer connection */
+ if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED &&
+ !tick_is_expired(peers->resync_timeout, now_ms) &&
+ (ps->statuscode == 0 ||
+ ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
+ ps->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
+ ps->statuscode == PEER_SESS_SC_TRYAGAIN)) {
+ /* The resync is finished for the local peer and
+ * the resync timeout is not expired and
+ * connection never tried
+ * or previous peer connection was successfully established
+ * or previous tcp connect succeeded but init state incomplete
+ * or during previous connect, peer replies a try again statuscode */
+
+ if (!tick_is_expired(ps->reconnect, now_ms)) {
+ /* reconnection timer is not expired. reschedule task for reconnect */
+ task->expire = tick_first(task->expire, ps->reconnect);
}
- else {
- /* Other error cases */
+ else {
+ /* connect to the local peer if we must push a local sync */
if (peers->flags & PEERS_F_DONOTSTOP) {
- /* unable to resync new process, current process can die now */
- _HA_ATOMIC_DEC(&jobs);
- peers->flags &= ~PEERS_F_DONOTSTOP;
- for (st = ps->tables; st ; st = st->next)
- HA_ATOMIC_DEC(&st->table->refcnt);
+ peer_session_create(peers, ps);
}
}
}
- else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE ) {
- /* Reset resync timeout during a resync */
- peers->resync_timeout = TICK_ETERNITY;
-
- /* current peer connection is active and established
- * wake up all peer handlers to push remaining local updates */
- for (st = ps->tables; st ; st = st->next) {
- if (st->last_pushed != st->table->localupdate) {
- appctx_wakeup(ps->appctx);
- break;
- }
+ else {
+ /* Other error cases */
+ if (peers->flags & PEERS_F_DONOTSTOP) {
+ /* unable to resync new process, current process can die now */
+ _HA_ATOMIC_DEC(&jobs);
+ peers->flags &= ~PEERS_F_DONOTSTOP;
+ for (st = ps->tables; st ; st = st->next)
+ HA_ATOMIC_DEC(&st->table->refcnt);
}
}
- } /* stopping */
+ }
+ else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE ) {
+ /* Reset resync timeout during a resync */
+ peers->resync_timeout = TICK_ETERNITY;
+
+ /* current peer connection is active and established
+ * wake up all peer handlers to push remaining local updates */
+ for (st = ps->tables; st ; st = st->next) {
+ if (st->last_pushed != st->table->localupdate) {
+ appctx_wakeup(ps->appctx);
+ break;
+ }
+ }
+ }
/* Release lock for all peers of the section */
for (ps = peers->remote; ps; ps = ps->next)
HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock);
+}
+
+/*
+ * Task processing function to manage re-connect, peer session
+ * tasks wakeup on local update and heartbeat. Let's keep it exported so that it
+ * resolves in stack traces and "show tasks".
+ */
+struct task *process_peer_sync(struct task * task, void *context, unsigned int state)
+{
+ struct peers *peers = context;
+
+ task->expire = TICK_ETERNITY;
+
+ if (!stopping) {
+ /* Normal case (not soft stop)*/
+ __process_running_peer_sync(task, peers, state);
+
+ }
+ else {
+ /* soft stop case */
+ __process_stopping_peer_sync(task, peers, state);
+ } /* stopping */
+
/* Wakeup for re-connect */
return task;
}