/* Re-init current table pointers to force announcement on re-connect */
peer->remote_table = peer->last_local_table = peer->stop_local_table = NULL;
peer->appctx = NULL;
- if (peer->flags & PEER_F_LEARN_ASSIGN) {
- /* unassign current peer for learning */
- peer->flags &= ~(PEER_F_LEARN_ASSIGN);
- peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
- if (peer->local)
- peers->flags |= PEERS_F_RESYNC_LOCALABORT;
- else
- peers->flags |= PEERS_F_RESYNC_REMOTEABORT;
- /* reschedule a resync */
- peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
- }
- /* reset teaching and learning flags to 0 */
- peer->flags &= PEER_TEACH_RESET;
- peer->flags &= PEER_LEARN_RESET;
+ /* Mark peer as released */
+ peer->flags &= PEER_STATE_RESET;
+ peer->flags |= PEER_F_ST_RELEASED;
+
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
}
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) {
TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
NULL, &msg_head[1], peers->local->id, peer->id);
- if (peer->flags & PEER_F_LEARN_ASSIGN) {
- int commit_a_finish = 1;
-
- peer->flags &= ~PEER_F_LEARN_ASSIGN;
- peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
- if (peer->srv->shard) {
- struct peer *ps;
-
- peers->flags |= PEERS_F_RESYNC_REMOTEPARTIAL;
- peer->flags |= PEER_F_LEARN_NOTUP2DATE;
- for (ps = peers->remote; ps; ps = ps->next) {
- if (ps->srv->shard == peer->srv->shard) {
- /* flag all peers from same shard
- * notup2date to disable request
- * of a resync frm them
- */
- ps->flags |= PEER_F_LEARN_NOTUP2DATE;
- }
- else if (ps->srv->shard && !(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
- /* it remains some other shards not requested
- * we don't commit a resync finish to request
- * the other shards
- */
- commit_a_finish = 0;
- }
- }
-
- if (!commit_a_finish) {
- /* it remains some shard to request, we schedule a new request
- */
- peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
- task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
- }
- }
-
- if (commit_a_finish) {
- peers->flags |= (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE);
- if (peer->local)
- peers->flags |= PEERS_F_RESYNC_LOCALFINISHED;
- else
- peers->flags |= PEERS_F_RESYNC_REMOTEFINISHED;
- }
+ if (peer->flags & PEER_F_LEARN_PROCESS) {
+ peer->flags &= ~PEER_F_LEARN_PROCESS;
+ peer->flags |= PEER_F_LEARN_FINISHED;
+ task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
}
peer->confirm++;
}
else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) {
TRACE_PROTO("received control message", PEERS_EV_CTRLMSG,
NULL, &msg_head[1], peers->local->id, peer->id);
- if (peer->flags & PEER_F_LEARN_ASSIGN) {
- peer->flags &= ~PEER_F_LEARN_ASSIGN;
- peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
-
- if (peer->local)
- peers->flags |= PEERS_F_RESYNC_LOCALPARTIAL;
- else
- peers->flags |= PEERS_F_RESYNC_REMOTEPARTIAL;
- peer->flags |= PEER_F_LEARN_NOTUP2DATE;
- peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+ if (peer->flags & PEER_F_LEARN_PROCESS) {
+ peer->flags &= ~PEER_F_LEARN_PROCESS;
+ peer->flags |= (PEER_F_LEARN_FINISHED|PEER_F_LEARN_NOTUP2DATE);
task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
}
peer->confirm++;
int repl;
/* Need to request a resync */
- if ((peer->flags & PEER_F_LEARN_ASSIGN) &&
- (peers->flags & PEERS_F_RESYNC_ASSIGN) &&
- !(peers->flags & PEERS_F_RESYNC_PROCESS)) {
-
+ if ((peer->flags & (PEER_F_LEARN_ASSIGN|PEER_F_LEARN_PROCESS|PEER_F_LEARN_FINISHED)) == PEER_F_LEARN_ASSIGN) {
repl = peer_send_resync_reqmsg(appctx, peer, peers);
if (repl <= 0)
return repl;
- peers->flags |= PEERS_F_RESYNC_PROCESS;
+ peer->flags |= PEER_F_LEARN_PROCESS;
}
/* Nothing to read, now we start to write */
/* Init confirm counter */
peer->confirm = 0;
+ peer->flags &= PEER_STATE_RESET;
+ peer->flags |= PEER_F_ST_ACCEPTED;
+
/* Init cursors */
for (st = peer->tables; st ; st = st->next) {
uint commitid, updateid;
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock);
}
-
- /* reset teaching and learning flags to 0 */
- peer->flags &= PEER_TEACH_RESET;
- peer->flags &= PEER_LEARN_RESET;
-
- /* if current peer is local */
- if (peer->local) {
- /* if current host need resyncfrom local and no process assigned */
- if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL &&
- !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
- /* assign local peer for a lesson, consider lesson already requested */
- peer->flags |= PEER_F_LEARN_ASSIGN;
- peers->flags |= (PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
- peers->flags |= PEERS_F_RESYNC_LOCALASSIGN;
- }
-
- }
- else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE &&
- !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
- /* assign peer for a lesson */
- peer->flags |= PEER_F_LEARN_ASSIGN;
- peers->flags |= PEERS_F_RESYNC_ASSIGN;
- peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN;
- }
}
/*
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &st->table->updt_lock);
}
+ /* Awake main task */
+ task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
+
/* Init confirm counter */
peer->confirm = 0;
- /* reset teaching and learning flags to 0 */
- peer->flags &= PEER_TEACH_RESET;
- peer->flags &= PEER_LEARN_RESET;
-
- /* If current peer is local */
- if (peer->local) {
- /* flag to start to teach lesson */
- peer->flags |= PEER_F_TEACH_PROCESS;
- }
- else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE &&
- !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
- /* If peer is remote and resync from remote is needed,
- and no peer currently assigned */
-
- /* assign peer for a lesson */
- peer->flags |= PEER_F_LEARN_ASSIGN;
- peers->flags |= PEERS_F_RESYNC_ASSIGN;
- peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN;
- }
+ peer->flags &= PEER_STATE_RESET;
+ peer->flags |= PEER_F_ST_CONNECTED;
}
/*
*/
curpeer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
peer_session_forceshutdown(curpeer);
+
+ /* old peer connection was replaced by a new one. */
+ curpeer->flags &= PEER_STATE_RESET;
+ curpeer->flags |= PEER_F_ST_RENEWED;
+
curpeer->heartbeat = TICK_ETERNITY;
curpeer->coll++;
}
static void __process_peer_learn_status(struct peers *peers, struct peer *peer)
{
+ struct peer *ps;
+
+ if (peer->flags & PEER_F_LEARN_PROCESS)
+ peers->flags |= PEERS_F_RESYNC_PROCESS;
+
+ if (!(peer->flags & PEER_F_LEARN_FINISHED))
+ return;
+
+ if (peer->flags & PEER_F_LEARN_NOTUP2DATE) {
+ /* Partial resync */
+ peers->flags |= (peer->local ? PEERS_F_RESYNC_LOCALPARTIAL : PEERS_F_RESYNC_REMOTEPARTIAL);
+ peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+ }
+ else {
+ /* Full resync */
+ int commit_a_finish = 1;
+
+ if (peer->srv->shard) {
+ peers->flags |= PEERS_F_RESYNC_REMOTEPARTIAL;
+ peer->flags |= PEER_F_LEARN_NOTUP2DATE;
+ for (ps = peers->remote; ps; ps = ps->next) {
+ if (ps->srv->shard && ps != peer) {
+ if (ps->srv->shard == peer->srv->shard) {
+ /* flag all peers from same shard
+ * notup2date to disable request
+ * of a resync frm them
+ */
+ ps->flags |= PEER_F_LEARN_NOTUP2DATE;
+ }
+ else if (!(ps->flags & PEER_F_LEARN_NOTUP2DATE)) {
+ /* it remains some other shards not requested
+ * we don't commit a resync finish to request
+ * the other shards
+ */
+ commit_a_finish = 0;
+ }
+ }
+ }
+
+ if (!commit_a_finish) {
+ /* it remains some shard to request, we schedule a new request */
+ peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
+ }
+ }
+
+ if (commit_a_finish) {
+ peers->flags |= (PEERS_F_RESYNC_LOCAL|PEERS_F_RESYNC_REMOTE);
+ peers->flags |= (peer->local ? PEERS_F_RESYNC_LOCALFINISHED : PEERS_F_RESYNC_REMOTEFINISHED);
+ }
+ }
+ peer->flags &= ~(PEER_F_LEARN_ASSIGN|PEER_F_LEARN_PROCESS|PEER_F_LEARN_FINISHED);
+ peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
}
static void __process_peer_state(struct peers *peers, struct peer *peer)
{
if (peer->flags & PEER_F_RESYNC_REQUESTED)
peers->flags |= PEERS_F_RESYNC_REQUESTED;
+
+ /* Check peer state. Order is important */
+ if (peer->flags & (PEER_F_ST_RELEASED|PEER_F_ST_RENEWED)) {
+ if (peer->flags & PEER_F_LEARN_ASSIGN) {
+ /* unassign current peer for learning */
+ peers->flags &= ~(PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
+ peers->flags |= (peer->local ? PEERS_F_RESYNC_LOCALABORT : PEERS_F_RESYNC_REMOTEABORT);
+ /* reschedule a resync */
+ peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
+ }
+ peer->flags &= PEER_TEACH_RESET;
+ peer->flags &= PEER_LEARN_RESET;
+ }
+ if (peer->flags & (PEER_F_ST_ACCEPTED|PEER_F_ST_RENEWED)) {
+ peer->flags &= PEER_TEACH_RESET;
+ peer->flags &= PEER_LEARN_RESET;
+
+ /* if current peer is local */
+ if (peer->local) {
+ /* if current host need resyncfrom local and no process assigned */
+ if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL &&
+ !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
+ /* assign local peer for a lesson, consider lesson already requested */
+ peer->flags |= (PEER_F_LEARN_ASSIGN|PEERS_F_RESYNC_PROCESS);
+ peers->flags |= (PEERS_F_RESYNC_ASSIGN|PEERS_F_RESYNC_PROCESS);
+ peers->flags |= PEERS_F_RESYNC_LOCALASSIGN;
+ }
+ }
+ else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE &&
+ !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
+ /* assign peer for a lesson */
+ peer->flags |= PEER_F_LEARN_ASSIGN;
+ peers->flags |= PEERS_F_RESYNC_ASSIGN;
+ peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN;
+ }
+ }
+ if (peer->flags & PEER_F_ST_CONNECTED) {
+ peer->flags &= PEER_TEACH_RESET;
+ peer->flags &= PEER_LEARN_RESET;
+
+ /* If current peer is local */
+ if (peer->local) {
+ /* flag to start to teach lesson */
+ peer->flags |= PEER_F_TEACH_PROCESS;
+ }
+ else if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE &&
+ !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
+ /* If peer is remote and resync from remote is needed,
+ and no peer currently assigned */
+
+ /* assign peer for a lesson */
+ peer->flags |= PEER_F_LEARN_ASSIGN;
+ peers->flags |= PEERS_F_RESYNC_ASSIGN;
+ peers->flags |= PEERS_F_RESYNC_REMOTEASSIGN;
+ }
+ }
+ peer->flags &= PEER_STATE_RESET;
}
static void __process_running_peer_sync(struct task *task, struct peers *peers, unsigned int state)