struct stktable *table; /* stick table to sync */
int local_id;
int remote_id;
- int flags;
unsigned int update_id;
- struct stksess *last;
- struct stksess *end;
+ unsigned int bucket;
+ struct {
+ struct stksess *last;
+ struct stksess *end;
+ int flags;
+ } buckets[CONFIG_HAP_TBL_BUCKETS];
+
uint64_t remote_data;
unsigned int remote_data_nbelem[STKTABLE_DATA_TYPES];
unsigned int last_update; /* the counter of the last local update sent */
};
#endif /* _HAPROXY_PEERS_T_H */
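Note for reviewers: the `last`/`end` pair that this hunk replicates per bucket is a pair of marker entries threaded through a bucket's `updates` list; `last` is a cursor re-linked just after each entry sent to the peer, and `end` is appended on a full resync to bound the snapshot. A minimal, single-threaded sketch of that cursor idiom (hypothetical types, not HAProxy's own):

#include <stdio.h>

struct node { struct node *prev, *next; int is_marker, id; };

static void list_init(struct node *h) { h->prev = h->next = h; }

static void insert_after(struct node *pos, struct node *n)
{
    n->prev = pos; n->next = pos->next;
    pos->next->prev = n; pos->next = n;
}

static void unlink_node(struct node *n)
{
    n->prev->next = n->next; n->next->prev = n->prev;
    n->prev = n->next = n;
}

int main(void)
{
    struct node head, last = { .is_marker = 1 };
    struct node e1 = { .id = 1 }, e2 = { .id = 2 };

    list_init(&head);
    insert_after(&head, &last);   /* cursor parked at the head: nothing sent yet */
    insert_after(&last, &e1);
    insert_after(&e1, &e2);

    /* teach loop: emit the entry following the cursor, then advance it */
    for (struct node *n = last.next; n != &head; n = last.next) {
        printf("send update %d\n", n->id);
        unlink_node(&last);
        insert_after(n, &last);   /* cursor now records what was sent */
    }
    return 0;
}

Keeping one cursor pair per bucket is what lets each bucket be taught, and locked, independently of the others.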
-
struct eb32_node in_bucket; /* Each bucket maintains a tree, ordered by expiration date, this does not require sh_lock as only one task will ever modify it */
struct mt_list in_bucket_toadd; /* To add to the bucket tree */
+ struct list updates; /* head of sticky updates sequence list, uses updt_lock */
+ struct mt_list *pend_updts; /* updates waiting to be appended to the updates list, one list per thread-group */
+ /* this lock is heavily used and must be on its own cache line */
+ __decl_thread(HA_RWLOCK_T updt_lock); /* lock protecting the updates part */
__decl_thread(HA_RWLOCK_T sh_lock); /* for the trees above */
int next_exp; /* Next expiration for this table */
} buckets[CONFIG_HAP_TBL_BUCKETS];
unsigned int current; /* number of sticky sessions currently in table */
THREAD_ALIGN(64);
- unsigned int last_update; /* a counter representing the update inserted in the list (will wrap) */
- struct list updates; /* head of sticky updates sequence list, uses updt_lock */
- struct mt_list *pend_updts; /* list of updates to be added to the update sequence tree, one per thread-group */
+ unsigned int last_update; /* a counter representing the update inserted in the list (will wrap) */
struct tasklet *updt_task;/* tasklet responsible for pushing the pending updates into the tree */
- THREAD_ALIGN(64);
- /* this lock is heavily used and must be on its own cache line */
- __decl_thread(HA_RWLOCK_T updt_lock); /* lock protecting the updates part */
-
/* rarely used config stuff below (should not interfere with updt_lock) */
struct proxy *proxies_list; /* The list of proxies which reference this stick-table. */
struct {
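The comment on `updt_lock` encodes a false-sharing concern: the lock is write-hot while its neighbours are read-mostly, hence the dedicated cache line (the struct already uses THREAD_ALIGN(64) for the same purpose). An illustrative C11 equivalent of the layout idea, not HAProxy's macros:

#include <stdalign.h>
#include <stddef.h>
#include <stdio.h>
#include <pthread.h>

struct bucket_layout_sketch {
    void *updates_head;               /* read-mostly */
    unsigned int next_exp;            /* read-mostly */
    /* write-hot lock isolated on its own 64-byte line so taking it does
     * not invalidate the cache line holding the fields above */
    alignas(64) pthread_rwlock_t updt_lock;
};

int main(void)
{
    /* the offset is a multiple of 64: the lock starts a fresh cache line */
    printf("lock offset: %zu\n", offsetof(struct bucket_layout_sketch, updt_lock));
    return 0;
}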
: "<NEVER>"));
if (st)
- chunk_appendf(&trace_buf, "st=(.id=%s, .fl=0x%08x) ", st->table->id, st->flags);
+ chunk_appendf(&trace_buf, "st=(.id=%s) ", st->table->id);
if (src->verbosity == PEERS_VERB_MINIMAL)
return;
{
int ret, new_pushed, use_timed;
int updates_sent = 0;
- int locked = 0;
+ uint bucket;
TRACE_ENTER(PEERS_EV_SESS_IO, appctx, p, st);
p->last_local_table = st;
}
- if (st->flags & SHTABLE_F_RESET_SYNCHED) {
- if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock) != 0) {
- /* just don't engage here if there is any contention */
- applet_have_more_data(appctx);
- ret = -1;
- goto out;
- }
- locked = 1;
- LIST_DEL_INIT(&st->last->upd);
- LIST_INSERT(&st->table->updates, &st->last->upd);
- if (p->flags & PEER_F_TEACH_PROCESS) {
- LIST_DEL_INIT(&st->end->upd);
- LIST_APPEND(&st->table->updates, &st->end->upd);
- }
- st->flags &= ~SHTABLE_F_RESET_SYNCHED;
- }
-
/* We force new pushed to 1 to force identifier in update message */
new_pushed = 1;
- while (1) {
- struct stksess *ts;
+ bucket = st->bucket;
+ do {
+ int locked = 0;
- if (locked == 0 && HA_RWLOCK_TRYWRLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock) != 0) {
- /* just don't engage here if there is any contention */
- applet_have_more_data(appctx);
- ret = -1;
- break;
+ if (st->buckets[bucket].flags & SHTABLE_F_RESET_SYNCHED) {
+ if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_UPDT_LOCK, &st->table->buckets[bucket].updt_lock) != 0) {
+ /* just don't engage here if there is any contention */
+ applet_have_more_data(appctx);
+ ret = -1;
+ goto out;
+ }
+ locked = 1;
+ LIST_DEL_INIT(&st->buckets[bucket].last->upd);
+ LIST_INSERT(&st->table->buckets[bucket].updates, &st->buckets[bucket].last->upd);
+ if (p->flags & PEER_F_TEACH_PROCESS) {
+ LIST_DEL_INIT(&st->buckets[bucket].end->upd);
+ LIST_APPEND(&st->table->buckets[bucket].updates, &st->buckets[bucket].end->upd);
+ }
+ st->buckets[bucket].flags &= ~SHTABLE_F_RESET_SYNCHED;
}
- locked = 1;
- BUG_ON(!LIST_INLIST(&st->last->upd));
+ while (1) {
+ struct stksess *ts;
- ts = LIST_NEXT(&st->last->upd, typeof(ts), upd);
+ if (locked == 0 && HA_RWLOCK_TRYWRLOCK(STK_TABLE_UPDT_LOCK, &st->table->buckets[bucket].updt_lock) != 0) {
+ /* just don't engage here if there is any contention */
+ applet_have_more_data(appctx);
+ ret = -1;
+ break;
+ }
+ locked = 1;
- if (&ts->upd == &st->table->updates)
- break;
- if (ts == st->end) {
- LIST_DEL_INIT(&st->end->upd);
- break;
- }
+ BUG_ON(!LIST_INLIST(&st->buckets[bucket].last->upd));
- LIST_DEL_INIT(&st->last->upd);
- LIST_INSERT(&ts->upd, &st->last->upd);
+ ts = LIST_NEXT(&st->buckets[bucket].last->upd, typeof(ts), upd);
- if ((!(p->flags & PEER_F_TEACH_PROCESS) && ts->updt_type != STKSESS_UPDT_LOCAL) ||
- (ts->updt_type != STKSESS_UPDT_LOCAL && ts->updt_type != STKSESS_UPDT_REMOTE) ||
- (p->srv->shard && ts->shard != p->srv->shard) ||
- ts->updt_type == STKSESS_UPDT_MARKER) {
- // TODO: INC updates_sent ?
- continue;
- }
+ if (&ts->upd == &st->table->buckets[bucket].updates)
+ break;
+ if (ts == st->buckets[bucket].end) {
+ LIST_DEL_INIT(&st->buckets[bucket].end->upd);
+ break;
+ }
- HA_ATOMIC_INC(&ts->ref_cnt);
+ LIST_DEL_INIT(&st->buckets[bucket].last->upd);
+ LIST_INSERT(&ts->upd, &st->buckets[bucket].last->upd);
- HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
- locked = 0;
+ if ((!(p->flags & PEER_F_TEACH_PROCESS) && ts->updt_type != STKSESS_UPDT_LOCAL) ||
+ (ts->updt_type != STKSESS_UPDT_LOCAL && ts->updt_type != STKSESS_UPDT_REMOTE) ||
+ (p->srv->shard && ts->shard != p->srv->shard) ||
+ ts->updt_type == STKSESS_UPDT_MARKER) {
+ // TODO: INC updates_sent ?
+ continue;
+ }
- if (!_HA_ATOMIC_LOAD(&ts->seen))
- _HA_ATOMIC_STORE(&ts->seen, 1);
+ HA_ATOMIC_INC(&ts->ref_cnt);
- ret = peer_send_updatemsg(st, appctx, ts, st->update_id, new_pushed, use_timed);
+ HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->buckets[bucket].updt_lock);
+ locked = 0;
- HA_ATOMIC_DEC(&ts->ref_cnt);
- if (ret <= 0)
- break;
+ if (!_HA_ATOMIC_LOAD(&ts->seen))
+ _HA_ATOMIC_STORE(&ts->seen, 1);
+
+ ret = peer_send_updatemsg(st, appctx, ts, st->update_id, new_pushed, use_timed);
- p->last.table = st;
- p->last.id = st->update_id;
- st->update_id++;
- p->flags &= ~PEER_F_SYNCHED;
+ HA_ATOMIC_DEC(&ts->ref_cnt);
+ if (ret <= 0)
+ break;
- /* identifier may not needed in next update message */
- new_pushed = 0;
+ p->last.table = st;
+ p->last.id = st->update_id;
+ st->update_id++;
+ p->flags &= ~PEER_F_SYNCHED;
- updates_sent++;
- if (updates_sent >= peers_max_updates_at_once) {
- applet_have_more_data(appctx);
- ret = -1;
- break;
+ /* identifier may not be needed in next update message */
+ new_pushed = 0;
+
+ updates_sent++;
+ if (updates_sent >= peers_max_updates_at_once) {
+ applet_have_more_data(appctx);
+ ret = -1;
+ break;
+ }
}
- }
- if (locked) {
+ if (locked)
+ HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->buckets[bucket].updt_lock);
+
+ if (ret <= 0)
+ break;
+
+ bucket++;
+ if (bucket >= CONFIG_HAP_TBL_BUCKETS)
+ bucket = 0;
+ } while (bucket != st->bucket);
+
+ out:
+ if (ret > 0) {
+ TRACE_PROTO("peer up-to-date", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_UPDATE, appctx, NULL, st);
st->last_update = st->table->last_update;
- HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
+ st->bucket = statistical_prng_range(CONFIG_HAP_TBL_BUCKETS);
}
- out:
TRACE_LEAVE(PEERS_EV_SESS_IO, appctx, p, st);
return ret;
}
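The rewritten loop walks the buckets as a ring starting at `st->bucket`, and re-randomizes that starting point once the peer is fully up to date, so concurrent peers do not all contend on the same bucket's `updt_lock` first. The walk in isolation, as a compilable sketch with illustrative constants:

#include <stdio.h>

#define NB_BUCKETS 16

/* pretend to teach one bucket; 0 means "stalled, stop here" */
static int teach_bucket(unsigned int b) { printf("bucket %u\n", b); return 1; }

int main(void)
{
    unsigned int start = 5;   /* HAProxy draws this from its PRNG */
    unsigned int bucket = start;

    do {
        if (!teach_bucket(bucket))
            break;            /* stalled: give up for this wakeup */
        if (++bucket >= NB_BUCKETS)
            bucket = 0;       /* wrap around the ring */
    } while (bucket != start);
    return 0;
}

On a stall the function returns with `st->bucket` unchanged, so the next applet wakeup rescans from the same starting bucket; buckets that were already drained cost little to revisit since their `last` marker is parked at the tail of the list.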
TRACE_PROTO("Resync request message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
/* prepare tables for a global push */
- for (st = peer->tables; st; st = st->next)
- st->flags |= SHTABLE_F_RESET_SYNCHED;
+ for (st = peer->tables; st; st = st->next) {
+ int i;
+
+ st->bucket = statistical_prng_range(CONFIG_HAP_TBL_BUCKETS);
+ for (i = 0; i < CONFIG_HAP_TBL_BUCKETS; i++)
+ st->buckets[i].flags |= SHTABLE_F_RESET_SYNCHED;
+ }
/* reset teaching flags to 0 */
peer->flags &= ~(PEER_F_SYNCHED|PEER_TEACH_FLAGS);
for (st = peer->tables; st ; st = st->next) {
st->last_get = st->last_acked = 0;
- if (!(peer->flags & PEER_F_SYNCHED) || (peer->flags & PEER_F_TEACH_PROCESS))
- st->flags |= SHTABLE_F_RESET_SYNCHED;
+ if (!(peer->flags & PEER_F_SYNCHED) || (peer->flags & PEER_F_TEACH_PROCESS)) {
+ int i;
+
+ st->bucket = statistical_prng_range(CONFIG_HAP_TBL_BUCKETS);
+ for (i = 0; i < CONFIG_HAP_TBL_BUCKETS; i++)
+ st->buckets[i].flags |= SHTABLE_F_RESET_SYNCHED;
+ }
}
/* Awake main task to ack the new peer state */
int retval = 0;
for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) {
+ int i;
+
st = calloc(1,sizeof(*st));
if (!st) {
retval = 1;
id = curpeer->tables->local_id;
st->local_id = id + 1;
- st->last = calloc(1, sizeof(*st->last));
- st->last->updt_type = STKSESS_UPDT_MARKER;
- LIST_INIT(&st->last->upd);
- LIST_APPEND(&table->updates, &st->last->upd);
+ st->bucket = statistical_prng_range(CONFIG_HAP_TBL_BUCKETS);
+ for (i = 0; i < CONFIG_HAP_TBL_BUCKETS; i++) {
+ st->buckets[i].last = calloc(1, sizeof(*st->buckets[i].last));
+ st->buckets[i].last->updt_type = STKSESS_UPDT_MARKER;
+ LIST_INIT(&st->buckets[i].last->upd);
+ LIST_APPEND(&table->buckets[i].updates, &st->buckets[i].last->upd);
- st->end = calloc(1, sizeof(*st->end));
- st->end->updt_type = STKSESS_UPDT_MARKER;
- LIST_INIT(&st->end->upd);
+ st->buckets[i].end = calloc(1, sizeof(*st->buckets[i].end));
+ st->buckets[i].end->updt_type = STKSESS_UPDT_MARKER;
+ LIST_INIT(&st->buckets[i].end->upd);
+ }
/* If peer is local we inc table
* refcnt to protect against flush
dcache = peer->dcache;
chunk_appendf(&trash, "\n %p local_id=%d remote_id=%d "
- "flags=0x%x remote_data=0x%llx",
+ "remote_data=0x%llx",
st, st->local_id, st->remote_id,
- st->flags, (unsigned long long)st->remote_data);
+ (unsigned long long)st->remote_data);
chunk_appendf(&trash, "\n last_acked=%u last_get=%u",
st->last_acked, st->last_get);
chunk_appendf(&trash, "\n table:%p id=%s refcnt=%u",
*/
int __stksess_kill(struct stktable *t, struct stksess *ts)
{
+ uint bucket = 0;
int updt_locked = 0;
if (HA_ATOMIC_LOAD(&ts->ref_cnt))
/* make sure we're no longer in the updates list */
MT_LIST_DELETE(&ts->pend_updts);
-
/* ... and that we didn't leave the update list */
if (LIST_INLIST(&ts->upd)) {
+ bucket = stktable_calc_bucket_num(t, ts->key.key,
+ ((t->type == SMP_T_STR) ? strlen((const char *)ts->key.key) : t->key_size));
updt_locked = 1;
- HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
+ HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->buckets[bucket].updt_lock);
if (HA_ATOMIC_LOAD(&ts->ref_cnt))
goto out_unlock;
}
out_unlock:
if (updt_locked)
- HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
+ HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->buckets[bucket].updt_lock);
return 1;
}
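__stksess_kill() now has to derive the bucket from the key because the update lock became per bucket. stktable_calc_bucket_num() itself is not part of this excerpt; its contract is only a deterministic key-to-bucket mapping over [0, CONFIG_HAP_TBL_BUCKETS). A hypothetical stand-in to illustrate that contract (the real function's hash differs):

#include <stddef.h>
#include <stdio.h>

#define NB_BUCKETS 16

static unsigned int calc_bucket_sketch(const void *key, size_t len)
{
    const unsigned char *p = key;
    unsigned int h = 2166136261u;       /* FNV-1a, purely illustrative */

    while (len--)
        h = (h ^ *p++) * 16777619u;
    return h % NB_BUCKETS;              /* same key always lands in the same bucket */
}

int main(void)
{
    printf("%u\n", calc_bucket_sketch("127.0.0.1", 9));
    return 0;
}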
*/
if (!updt_locked) {
updt_locked = 1;
- HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
+ HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->buckets[bucket].updt_lock);
}
/* now we're locked, new peers can't grab it anymore,
* existing ones already have the ref_cnt.
}
if (updt_locked)
- HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
+ HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->buckets[bucket].updt_lock);
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->buckets[bucket].sh_lock);
/* If sync is enabled */
if (t->sync_task) {
+ uint bucket = stktable_calc_bucket_num(t, ts->key.key,
+ ((t->type == SMP_T_STR) ? strlen((const char *)ts->key.key) : t->key_size));
+
if (local) {
/* Check if this entry is not in the tree or not
* scheduled for at least one peer.
*/
if (!LIST_INLIST(&ts->upd) || _HA_ATOMIC_LOAD(&ts->seen)) {
_HA_ATOMIC_STORE(&ts->updt_type, STKSESS_UPDT_LOCAL);
- did_append = MT_LIST_TRY_APPEND(&t->pend_updts[tgid - 1], &ts->pend_updts);
+ did_append = MT_LIST_TRY_APPEND(&t->buckets[bucket].pend_updts[tgid - 1], &ts->pend_updts);
}
}
else {
if (!LIST_INLIST(&ts->upd)) {
_HA_ATOMIC_STORE(&ts->updt_type, STKSESS_UPDT_REMOTE);
- did_append = MT_LIST_TRY_APPEND(&t->pend_updts[tgid - 1], &ts->pend_updts);
+ did_append = MT_LIST_TRY_APPEND(&t->buckets[bucket].pend_updts[tgid - 1], &ts->pend_updts);
}
}
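MT_LIST_TRY_APPEND() only enqueues the session if it is not already on a pending list, so repeated touches of the same entry coalesce into a single pending update; indexing by `tgid - 1` gives every thread group a private producer list per bucket, so producers only contend within their own group. A single-threaded stand-in for that try-append contract (the real macro is a lock-free MT-list operation):

#include <stdbool.h>
#include <stdio.h>

struct snode { struct snode *prev, *next; };

static void node_init(struct snode *n) { n->prev = n->next = n; }

/* append <n> to <head> only if it is not queued anywhere yet */
static bool try_append(struct snode *head, struct snode *n)
{
    if (n->next != n)
        return false;                 /* already pending: coalesce */
    n->prev = head->prev; n->next = head;
    head->prev->next = n; head->prev = n;
    return true;
}

int main(void)
{
    struct snode pend, ts;

    node_init(&pend);
    node_init(&ts);
    printf("%d\n", try_append(&pend, &ts));  /* 1: queued */
    printf("%d\n", try_append(&pend, &ts));  /* 0: second touch coalesced */
    return 0;
}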
struct task *stktable_add_pend_updates(struct task *t, void *ctx, unsigned int state)
{
struct stktable *table = ctx;
- int i = 0, cur_tgid = tgid - 1, empty_tgid = 0;
+ unsigned int bucket;
+ int i = 0, n = 0;
+
+ for (bucket = 0; bucket < CONFIG_HAP_TBL_BUCKETS; bucket++) {
+ int cur_tgid = tgid - 1, empty_tgid = 0;
+
+ /* we really don't want to wait on this one */
+ if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_UPDT_LOCK, &table->buckets[bucket].updt_lock) != 0)
+ continue;
+
+ while (1) {
+ struct stksess *stksess = NULL;
+
+ if (i >= STKTABLE_MAX_UPDATES_AT_ONCE)
+ break;
+ i++;
+
+ stksess = MT_LIST_POP_LOCKED(&table->buckets[bucket].pend_updts[cur_tgid], typeof(stksess), pend_updts);
+ if (!stksess) {
+ empty_tgid++;
+ cur_tgid++;
+ if (cur_tgid == global.nbtgroups)
+ cur_tgid = 0;
- /* we really don't want to wait on this one */
- if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_LOCK, &table->updt_lock) != 0)
- goto leave;
+ if (empty_tgid == global.nbtgroups)
+ break;
- for (i = 0; i < STKTABLE_MAX_UPDATES_AT_ONCE; i++) {
- struct stksess *stksess = MT_LIST_POP_LOCKED(&table->pend_updts[cur_tgid], typeof(stksess), pend_updts);
+ continue;
+ }
- if (!stksess) {
- empty_tgid++;
cur_tgid++;
+ empty_tgid = 0;
if (cur_tgid == global.nbtgroups)
cur_tgid = 0;
-
- if (empty_tgid == global.nbtgroups)
- break;
- continue;
+ stksess->seen = 0;
+ if (stksess->updt_type == STKSESS_UPDT_LOCAL)
+ table->last_update++;
+ LIST_DEL_INIT(&stksess->upd);
+ LIST_APPEND(&table->buckets[bucket].updates, &stksess->upd);
+ n++;
+ /*
+ * Now that we're done inserting the stksess, unlock it.
+ * It is kept locked here to prevent a race condition
+ * where stksess_kill() could free() it after we removed
+ * it from the pending list, but before we inserted it into the updates list
+ */
+ MT_LIST_INIT(&stksess->pend_updts);
}
- cur_tgid++;
- empty_tgid = 0;
- if (cur_tgid == global.nbtgroups)
- cur_tgid = 0;
- stksess->seen = 0;
- if (stksess->updt_type == STKSESS_UPDT_LOCAL)
- table->last_update++;
- LIST_DEL_INIT(&stksess->upd);
- LIST_APPEND(&table->updates, &stksess->upd);
- /*
- * Now that we're done inserting the stksess, unlock it.
- * It is kept locked here to prevent a race condition
- * when stksess_kill() could free() it after we removed
- * it from the list, but before we inserted it into the tree
- */
- MT_LIST_INIT(&stksess->pend_updts);
- }
+ HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &table->buckets[bucket].updt_lock);
- HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &table->updt_lock);
+ /* There's more to do, let's schedule another pass */
+ if (empty_tgid < global.nbtgroups)
+ tasklet_wakeup(table->updt_task);
-leave:
- /* There's more to do, let's schedule another session */
- if (empty_tgid < global.nbtgroups)
- tasklet_wakeup(table->updt_task);
+ if (i >= STKTABLE_MAX_UPDATES_AT_ONCE)
+ break;
+ }
- if (i > 0) {
+ if (n > 0) {
/* We did at least one update, let's wake the sync task */
task_wakeup(table->sync_task, TASK_WOKEN_MSG);
}
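One run of the tasklet is bounded to STKTABLE_MAX_UPDATES_AT_ONCE pops spread across all buckets and thread groups, and the tasklet re-arms itself when a group may still hold work instead of looping, which bounds the latency it imposes on other tasks. Its control flow reduced to a compilable sketch of a single bucket (the constants and the empty-queue stand-in are illustrative):

#include <stdio.h>

#define MAX_AT_ONCE 8
#define NB_GROUPS   4

/* pretend pop: returns 1 if an update was dequeued from group <g> */
static int pop_one(int g) { (void)g; return 0; }

int main(void)
{
    int budget = 0, cur_tgid = 0, empty_tgid = 0;

    while (budget < MAX_AT_ONCE && empty_tgid < NB_GROUPS) {
        budget++;
        if (pop_one(cur_tgid))
            empty_tgid = 0;        /* found work: emptiness streak resets */
        else
            empty_tgid++;          /* one more consecutive empty group */
        cur_tgid = (cur_tgid + 1) % NB_GROUPS;
    }
    if (empty_tgid < NB_GROUPS)
        puts("tasklet_wakeup(): budget exhausted with work left");
    return 0;
}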
*/
if (!updt_locked) {
updt_locked = 1;
- HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
+ HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->buckets[bucket].updt_lock);
}
/* now we're locked, new peers can't grab it anymore,
* existing ones already have the ref_cnt.
}
if (updt_locked)
- HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
+ HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->buckets[bucket].updt_lock);
/*
* Now find the first element, so that we can reposition
memset(&t->buckets[bucket].exps, 0, sizeof(t->buckets[bucket].exps));
HA_RWLOCK_INIT(&t->buckets[bucket].sh_lock);
MT_LIST_INIT(&t->buckets[bucket].in_bucket_toadd);
+ LIST_INIT(&t->buckets[bucket].updates);
+ HA_RWLOCK_INIT(&t->buckets[bucket].updt_lock);
}
- LIST_INIT(&t->updates);
-
t->pool = create_pool("sticktables", sizeof(struct stksess) + round_ptr_size(t->data_size) + t->key_size, MEM_F_SHARED);
if (t->peers.p && t->peers.p->peers_fe && !(t->peers.p->peers_fe->flags & (PR_FL_DISABLED|PR_FL_STOPPED))) {
t->write_to.t = table;
}
- t->pend_updts = calloc(global.nbtgroups, sizeof(*t->pend_updts));
- if (!t->pend_updts)
- goto mem_error;
- for (i = 0; i < global.nbtgroups; i++)
- MT_LIST_INIT(&t->pend_updts[i]);
+
+ for (bucket = 0; bucket < CONFIG_HAP_TBL_BUCKETS; bucket++) {
+ t->buckets[bucket].pend_updts = calloc(global.nbtgroups, sizeof(*t->buckets[bucket].pend_updts));
+ if (!t->buckets[bucket].pend_updts)
+ goto mem_error;
+ for (i = 0; i < global.nbtgroups; i++)
+ MT_LIST_INIT(&t->buckets[bucket].pend_updts[i]);
+ }
t->updt_task = tasklet_new();
- t->last_update = 0;
if (!t->updt_task)
goto mem_error;
+ t->last_update = 0;
t->updt_task->context = t;
t->updt_task->process = stktable_add_pend_updates;
return 1;
eb32_delete(&t->buckets[i].in_bucket);
MT_LIST_DELETE(&t->buckets[i].in_bucket_toadd);
HA_SPIN_UNLOCK(OTHER_LOCK, &per_bucket[i].lock);
+
+ ha_free(&t->buckets[i].pend_updts);
}
tasklet_free(t->updt_task);
- ha_free(&t->pend_updts);
pool_destroy(t->pool);
}