unsigned int localupdate;
unsigned int commitupdate;/* used to identify the latest local updates
pending for sync */
- unsigned int syncing; /* number of sync tasks watching this table now */
+ unsigned int refcnt; /* number of local peer over all peers sections
+ attached to this table */
union {
struct peers *p; /* sync peers */
char *name;
/* add DO NOT STOP flag if not present */
_HA_ATOMIC_INC(&jobs);
peers->flags |= PEERS_F_DONOTSTOP;
- ps = peers->local;
- for (st = ps->tables; st ; st = st->next)
- st->table->syncing++;
/* disconnect all connected peers to process a local sync
* this must be done only the first time we are switching
_HA_ATOMIC_DEC(&jobs);
peers->flags &= ~PEERS_F_DONOTSTOP;
for (st = ps->tables; st ; st = st->next)
- st->table->syncing--;
+ HA_ATOMIC_DEC(&st->table->refcnt);
}
}
else if (!ps->appctx) {
_HA_ATOMIC_DEC(&jobs);
peers->flags &= ~PEERS_F_DONOTSTOP;
for (st = ps->tables; st ; st = st->next)
- st->table->syncing--;
+ HA_ATOMIC_DEC(&st->table->refcnt);
}
}
}
id = curpeer->tables->local_id;
st->local_id = id + 1;
+ /* If peer is local we inc table
+ * refcnt to protect against flush
+ * until this process pushed all
+ * table content to the new one
+ */
+ if (curpeer->local)
+ HA_ATOMIC_INC(&st->table->refcnt);
curpeer->tables = st;
}
st->last_acked, st->last_pushed, st->last_get,
st->teaching_origin, st->update);
chunk_appendf(&trash, "\n table:%p id=%s update=%u localupdate=%u"
- " commitupdate=%u syncing=%u",
- t, t->id, t->update, t->localupdate, t->commitupdate, t->syncing);
+ " commitupdate=%u refcnt=%u",
+ t, t->id, t->update, t->localupdate, t->commitupdate, t->refcnt);
if (flags & PEERS_SHOW_F_DICT) {
chunk_appendf(&trash, "\n TX dictionary cache:");
count = 0;
* However we protect tables that are being synced to peers.
*/
if (unlikely(stopping && p->disabled && p->table && p->table->current)) {
- if (!p->table->syncing) {
+
+ if (!p->table->refcnt) {
+ /* !table->refcnt means there
+ * is no more pending full resync
+ * to push to a new process and
+ * we are free to flush the table.
+ */
stktable_trash_oldest(p->table, p->table->current);
pool_gc(NULL);
}