static void sync_peer_learn_state(struct peers *peers, struct peer *peer)
{
- struct peer *ps;
unsigned int flags = 0;
if (peer->learnstate != PEER_LR_ST_FINISHED)
}
else {
/* Full resync */
+ struct peer *rem_peer;
int commit_a_finish = 1;
if (peer->srv->shard) {
flags |= PEERS_F_DBG_RESYNC_REMOTEPARTIAL;
peer->flags |= PEER_F_LEARN_NOTUP2DATE;
- for (ps = peers->remote; ps; ps = ps->next) {
- if (ps->srv->shard && ps != peer) {
- HA_SPIN_LOCK(PEER_LOCK, &ps->lock);
- if (ps->srv->shard == peer->srv->shard) {
+ for (rem_peer = peers->remote; rem_peer; rem_peer = rem_peer->next) {
+ if (rem_peer->srv->shard && rem_peer != peer) {
+ HA_SPIN_LOCK(PEER_LOCK, &rem_peer->lock);
+ if (rem_peer->srv->shard == peer->srv->shard) {
/* flag all peers from same shard
* notup2date to disable request
* of a resync frm them
*/
- ps->flags |= PEER_F_LEARN_NOTUP2DATE;
+ rem_peer->flags |= PEER_F_LEARN_NOTUP2DATE;
}
- else if (!(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
+ else if (!(rem_peer->flags & PEER_F_LEARN_NOTUP2DATE)) {
/* it remains some other shards not requested
* we don't commit a resync finish to request
* the other shards
*/
commit_a_finish = 0;
}
- HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock);
+ HA_SPIN_UNLOCK(PEER_LOCK, &rem_peer->lock);
}
}
/* Process the sync task for a running process. It is called from process_peer_sync() only */
static void __process_running_peer_sync(struct task *task, struct peers *peers, unsigned int state)
{
- struct peer *ps;
+ struct peer *peer;
struct shared_table *st;
/* resync timeout set to TICK_ETERNITY means we just start
}
/* For each session */
- for (ps = peers->remote; ps; ps = ps->next) {
- HA_SPIN_LOCK(PEER_LOCK, &ps->lock);
+ for (peer = peers->remote; peer; peer = peer->next) {
+ HA_SPIN_LOCK(PEER_LOCK, &peer->lock);
- sync_peer_learn_state(peers, ps);
- sync_peer_app_state(peers, ps);
+ sync_peer_learn_state(peers, peer);
+ sync_peer_app_state(peers, peer);
/* Peer changes, if any, were now ack by the sync task. Unblock
* the peer (any wakeup should already be performed, no need to
* do it here)
*/
- ps->flags &= ~PEER_F_WAIT_SYNCTASK_ACK;
+ peer->flags &= ~PEER_F_WAIT_SYNCTASK_ACK;
/* For each remote peers */
- if (!ps->local) {
- if (!ps->appctx) {
+ if (!peer->local) {
+ if (!peer->appctx) {
/* no active peer connection */
- if (ps->statuscode == 0 ||
- ((ps->statuscode == PEER_SESS_SC_CONNECTCODE ||
- ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
- ps->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
- tick_is_expired(ps->reconnect, now_ms))) {
+ if (peer->statuscode == 0 ||
+ ((peer->statuscode == PEER_SESS_SC_CONNECTCODE ||
+ peer->statuscode == PEER_SESS_SC_SUCCESSCODE ||
+ peer->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
+ tick_is_expired(peer->reconnect, now_ms))) {
/* connection never tried
* or previous peer connection established with success
* or previous peer connection failed while connecting
* and reconnection timer is expired */
/* retry a connect */
- ps->appctx = peer_session_create(peers, ps);
+ peer->appctx = peer_session_create(peers, peer);
}
- else if (!tick_is_expired(ps->reconnect, now_ms)) {
+ else if (!tick_is_expired(peer->reconnect, now_ms)) {
/* If previous session failed during connection
* but reconnection timer is not expired */
/* reschedule task for reconnect */
- task->expire = tick_first(task->expire, ps->reconnect);
+ task->expire = tick_first(task->expire, peer->reconnect);
}
/* else do nothing */
- } /* !ps->appctx */
- else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE) {
+ } /* !peer->appctx */
+ else if (peer->statuscode == PEER_SESS_SC_SUCCESSCODE) {
/* current peer connection is active and established */
if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
!(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
- !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
+ !(peer->flags & PEER_F_LEARN_NOTUP2DATE)) {
/* Resync from a remote is needed
* and no peer was assigned for lesson
* and current peer may be up2date */
/* assign peer for the lesson */
- ps->learnstate = PEER_LR_ST_ASSIGNED;
+ peer->learnstate = PEER_LR_ST_ASSIGNED;
HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_REMOTEASSIGN);
/* wake up peer handler to handle a request of resync */
- appctx_wakeup(ps->appctx);
+ appctx_wakeup(peer->appctx);
}
else {
int update_to_push = 0;
/* Awake session if there is data to push */
- for (st = ps->tables; st ; st = st->next) {
+ for (st = peer->tables; st ; st = st->next) {
if (st->last_pushed != st->table->localupdate) {
/* wake up the peer handler to push local updates */
update_to_push = 1;
/* There is no need to send a heartbeat message
* when some updates must be pushed. The remote
- * peer will consider <ps> peer as alive when it will
+ * peer will consider <peer> peer as alive when it will
* receive these updates.
*/
- ps->flags &= ~PEER_F_HEARTBEAT;
+ peer->flags &= ~PEER_F_HEARTBEAT;
/* Re-schedule another one later. */
- ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
+ peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
/* Refresh reconnect if necessary */
- if (tick_is_expired(ps->reconnect, now_ms))
- ps->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
+ if (tick_is_expired(peer->reconnect, now_ms))
+ peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
/* We are going to send updates, let's ensure we will
* come back to send heartbeat messages or to reconnect.
*/
- task->expire = tick_first(ps->reconnect, ps->heartbeat);
- appctx_wakeup(ps->appctx);
+ task->expire = tick_first(peer->reconnect, peer->heartbeat);
+ appctx_wakeup(peer->appctx);
break;
}
}
* and do not send heartbeat message either.
*/
if (!update_to_push) {
- if (tick_is_expired(ps->reconnect, now_ms)) {
- if (ps->flags & PEER_F_ALIVE) {
+ if (tick_is_expired(peer->reconnect, now_ms)) {
+ if (peer->flags & PEER_F_ALIVE) {
/* This peer was alive during a 'reconnect' period.
* Flag it as not alive again for the next period.
*/
- ps->flags &= ~PEER_F_ALIVE;
- ps->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
+ peer->flags &= ~PEER_F_ALIVE;
+ peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
}
else {
- ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
- ps->heartbeat = TICK_ETERNITY;
- peer_session_forceshutdown(ps);
- sync_peer_app_state(peers, ps);
- ps->no_hbt++;
+ peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
+ peer->heartbeat = TICK_ETERNITY;
+ peer_session_forceshutdown(peer);
+ sync_peer_app_state(peers, peer);
+ peer->no_hbt++;
}
}
- else if (tick_is_expired(ps->heartbeat, now_ms)) {
- ps->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
- ps->flags |= PEER_F_HEARTBEAT;
- appctx_wakeup(ps->appctx);
+ else if (tick_is_expired(peer->heartbeat, now_ms)) {
+ peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
+ peer->flags |= PEER_F_HEARTBEAT;
+ appctx_wakeup(peer->appctx);
}
- task->expire = tick_first(ps->reconnect, ps->heartbeat);
+ task->expire = tick_first(peer->reconnect, peer->heartbeat);
}
}
/* else do nothing */
} /* SUCCESSCODE */
- } /* !ps->peer->local */
+ } /* !peer->peer->local */
- HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock);
+ HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
} /* for */
/* Resync from remotes expired: consider resync is finished */
/* Process the sync task for a stopping process. It is called from process_peer_sync() only */
static void __process_stopping_peer_sync(struct task *task, struct peers *peers, unsigned int state)
{
- struct peer *ps;
+ struct peer *peer;
struct shared_table *st;
static int dont_stop = 0;
/* For each peer */
- for (ps = peers->remote; ps; ps = ps->next) {
- HA_SPIN_LOCK(PEER_LOCK, &ps->lock);
+ for (peer = peers->remote; peer; peer = peer->next) {
+ HA_SPIN_LOCK(PEER_LOCK, &peer->lock);
- sync_peer_learn_state(peers, ps);
- sync_peer_app_state(peers, ps);
+ sync_peer_learn_state(peers, peer);
+ sync_peer_app_state(peers, peer);
/* Peer changes, if any, were now ack by the sync task. Unblock
* the peer (any wakeup should already be performed, no need to
* do it here)
*/
- ps->flags &= ~PEER_F_WAIT_SYNCTASK_ACK;
+ peer->flags &= ~PEER_F_WAIT_SYNCTASK_ACK;
if ((state & TASK_WOKEN_SIGNAL) && !dont_stop) {
/* we're killing a connection, we must apply a random delay before
* retrying otherwise the other end will do the same and we can loop
* for a while.
*/
- ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
- if (ps->appctx) {
- peer_session_forceshutdown(ps);
- sync_peer_app_state(peers, ps);
+ peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
+ if (peer->appctx) {
+ peer_session_forceshutdown(peer);
+ sync_peer_app_state(peers, peer);
}
}
- HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock);
+ HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
}
/* We've just received the signal */
}
}
- ps = peers->local;
- HA_SPIN_LOCK(PEER_LOCK, &ps->lock);
- if (ps->flags & PEER_F_LOCAL_TEACH_COMPLETE) {
+ peer = peers->local;
+ HA_SPIN_LOCK(PEER_LOCK, &peer->lock);
+ if (peer->flags & PEER_F_LOCAL_TEACH_COMPLETE) {
if (dont_stop) {
/* resync of new process was complete, current process can die now */
_HA_ATOMIC_DEC(&jobs);
dont_stop = 0;
- for (st = ps->tables; st ; st = st->next)
+ for (st = peer->tables; st ; st = st->next)
HA_ATOMIC_DEC(&st->table->refcnt);
}
}
- else if (!ps->appctx) {
+ else if (!peer->appctx) {
/* Re-arm resync timeout if necessary */
if (!tick_isset(peers->resync_timeout))
peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
/* If there's no active peer connection */
if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED &&
!tick_is_expired(peers->resync_timeout, now_ms) &&
- (ps->statuscode == 0 ||
- ps->statuscode == PEER_SESS_SC_SUCCESSCODE ||
- ps->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
- ps->statuscode == PEER_SESS_SC_TRYAGAIN)) {
+ (peer->statuscode == 0 ||
+ peer->statuscode == PEER_SESS_SC_SUCCESSCODE ||
+ peer->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
+ peer->statuscode == PEER_SESS_SC_TRYAGAIN)) {
/* The resync is finished for the local peer and
* the resync timeout is not expired and
* connection never tried
* or previous tcp connect succeeded but init state incomplete
* or during previous connect, peer replies a try again statuscode */
- if (!tick_is_expired(ps->reconnect, now_ms)) {
+ if (!tick_is_expired(peer->reconnect, now_ms)) {
/* reconnection timer is not expired. reschedule task for reconnect */
- task->expire = tick_first(task->expire, ps->reconnect);
+ task->expire = tick_first(task->expire, peer->reconnect);
}
else {
/* connect to the local peer if we must push a local sync */
if (dont_stop) {
- peer_session_create(peers, ps);
+ peer_session_create(peers, peer);
}
}
}
/* unable to resync new process, current process can die now */
_HA_ATOMIC_DEC(&jobs);
dont_stop = 0;
- for (st = ps->tables; st ; st = st->next)
+ for (st = peer->tables; st ; st = st->next)
HA_ATOMIC_DEC(&st->table->refcnt);
}
}
}
- else if (ps->statuscode == PEER_SESS_SC_SUCCESSCODE ) {
+ else if (peer->statuscode == PEER_SESS_SC_SUCCESSCODE ) {
/* Reset resync timeout during a resync */
peers->resync_timeout = TICK_ETERNITY;
/* current peer connection is active and established
* wake up all peer handlers to push remaining local updates */
- for (st = ps->tables; st ; st = st->next) {
+ for (st = peer->tables; st ; st = st->next) {
if (st->last_pushed != st->table->localupdate) {
- appctx_wakeup(ps->appctx);
+ appctx_wakeup(peer->appctx);
break;
}
}
}
- HA_SPIN_UNLOCK(PEER_LOCK, &ps->lock);
+ HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
}
/*