if (HA_ATOMIC_LOAD(&ts->ref_cnt))
return 0;
- if (ts->upd.node.leaf_p) {
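+ /* also take the update lock when the entry is still queued on a
+  * pending-updates list and not yet inserted into the update tree
+  */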
+ if (ts->upd.node.leaf_p || !MT_LIST_ISEMPTY(&ts->pend_updts)) {
updt_locked = 1;
HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
if (HA_ATOMIC_LOAD(&ts->ref_cnt))
goto out_unlock;
}
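+ /* unlink the session from its pending-updates list as well */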
+ MT_LIST_DELETE(&ts->pend_updts);
eb32_delete(&ts->exp);
eb32_delete(&ts->upd);
ebmb_delete(&ts->key);
ts->key.node.leaf_p = NULL;
ts->exp.node.leaf_p = NULL;
ts->upd.node.leaf_p = NULL;
+ MT_LIST_INIT(&ts->pend_updts);
ts->expire = tick_add(now_ms, MS_TO_TICKS(t->expire));
HA_RWLOCK_INIT(&ts->lock);
return ts;
* with that lock held, will grab a ref_cnt before releasing the
* lock. So we must take this lock as well and check the ref_cnt.
*/
- if (ts->upd.node.leaf_p) {
- if (!updt_locked) {
- updt_locked = 1;
- HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
- }
- /* now we're locked, new peers can't grab it anymore,
- * existing ones already have the ref_cnt.
- */
- if (HA_ATOMIC_LOAD(&ts->ref_cnt))
- continue;
+ if (!updt_locked) {
+ updt_locked = 1;
+ HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
}
+ /* now we're locked, new peers can't grab it anymore,
+ * existing ones already have the ref_cnt.
+ */
+ if (HA_ATOMIC_LOAD(&ts->ref_cnt))
+ continue;
/* session expired, trash it */
ebmb_delete(&ts->key);
+ MT_LIST_DELETE(&ts->pend_updts);
eb32_delete(&ts->upd);
__stksess_free(t, ts);
batched++;
*/
void stktable_touch_with_exp(struct stktable *t, struct stksess *ts, int local, int expire, int decrefcnt)
{
- struct eb32_node * eb;
- int use_wrlock = 0;
- int do_wakeup = 0;
+ int did_append = 0;
if (expire != HA_ATOMIC_LOAD(&ts->expire)) {
/* we'll need to set the expiration and to wake up the expiration timer. */
* scheduled for at least one peer.
*/
if (!ts->upd.node.leaf_p || _HA_ATOMIC_LOAD(&ts->seen)) {
- /* Time to upgrade the read lock to write lock */
- HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
- use_wrlock = 1;
-
- /* here we're write-locked */
-
- ts->seen = 0;
- ts->upd.key = ++t->update;
- t->localupdate = t->update;
- eb32_delete(&ts->upd);
- eb = eb32_insert(&t->updates, &ts->upd);
- if (eb != &ts->upd) {
- eb32_delete(eb);
- eb32_insert(&t->updates, &ts->upd);
- }
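+ /* Defer the tree insertion: flag the update as local and queue the
+  * entry on this thread group's pending list; the updt_task tasklet
+  * will perform the actual insert under the update lock.
+  */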
+ _HA_ATOMIC_STORE(&ts->updt_is_local, 1);
+ did_append = MT_LIST_TRY_APPEND(&t->pend_updts[tgid - 1], &ts->pend_updts);
}
- do_wakeup = 1;
}
else {
- /* Note: we land here when learning new entries from
- * remote peers. We hold one ref_cnt so the entry
- * cannot vanish under us, however if two peers create
- * the same key at the exact same time, we must be
- * careful not to perform two parallel inserts! Hence
- * we need to first check leaf_p to know if the entry
- * is new, then lock the tree and check the entry again
- * (since another thread could have created it in the
- * mean time).
- */
if (!ts->upd.node.leaf_p) {
- /* Time to upgrade the read lock to write lock if needed */
- HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
- use_wrlock = 1;
-
- /* here we're write-locked */
- if (!ts->upd.node.leaf_p) {
- ts->seen = 0;
- ts->upd.key= (++t->update)+(2147483648U);
- eb = eb32_insert(&t->updates, &ts->upd);
- if (eb != &ts->upd) {
- eb32_delete(eb);
- eb32_insert(&t->updates, &ts->upd);
- }
- }
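+ /* Entry learned from a remote peer: queue it as a non-local update,
+  * the tasklet will assign it a key in the upper half of the range.
+  */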
+ _HA_ATOMIC_STORE(&ts->updt_is_local, 0);
+ did_append = MT_LIST_TRY_APPEND(&t->pend_updts[tgid - 1], &ts->pend_updts);
}
}
- /* drop the lock now */
- if (use_wrlock)
- HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
}
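+ /* an entry was queued, make sure the tasklet will pick it up */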
+ if (did_append)
+ tasklet_wakeup(t->updt_task);
+
if (decrefcnt)
HA_ATOMIC_DEC(&ts->ref_cnt);
-
- if (do_wakeup)
- task_wakeup(t->sync_task, TASK_WOKEN_MSG);
}
/* Update the expiration timer for <ts> but do not touch its expiration node.
return ts;
}
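+/* This is the tasklet responsible for moving pending updates to the
+ * update tree. It drains the per-thread-group <pend_updts> lists in a
+ * round-robin fashion, at most STKTABLE_MAX_UPDATES_AT_ONCE entries per
+ * call, and inserts them into <updates> under the update write lock.
+ */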
+static struct task *stktable_add_pend_updates(struct task *t, void *ctx, unsigned int state)
+{
+ struct stktable *table = ctx;
+ struct eb32_node *eb;
+ int i, is_local, cur_tgid = tgid - 1, empty_tgid = 0;
+
+ HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &table->updt_lock);
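+ /* Pop entries from the thread groups' pending lists in round-robin,
+  * stopping after STKTABLE_MAX_UPDATES_AT_ONCE entries or once a full
+  * round found all lists empty.
+  */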
+ for (i = 0; i < STKTABLE_MAX_UPDATES_AT_ONCE; i++) {
+ struct stksess *stksess = MT_LIST_POP(&table->pend_updts[cur_tgid], typeof(stksess), pend_updts);
+
+ if (!stksess) {
+ empty_tgid++;
+ cur_tgid++;
+ if (cur_tgid == global.nbtgroups)
+ cur_tgid = 0;
+
+ if (empty_tgid == global.nbtgroups)
+ break;
+ continue;
+ }
+ cur_tgid++;
+ empty_tgid = 0;
+ if (cur_tgid == global.nbtgroups)
+ cur_tgid = 0;
+ is_local = stksess->updt_is_local;
+ stksess->seen = 0;
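+ /* local updates advance <localupdate> and may move an existing node;
+  * remote ones get a key in the upper half of the update range
+  */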
+ if (is_local) {
+ stksess->upd.key = ++table->update;
+ table->localupdate = table->update;
+ eb32_delete(&stksess->upd);
+ } else {
+ stksess->upd.key = (++table->update) + (2147483648U);
+ }
+ eb = eb32_insert(&table->updates, &stksess->upd);
+ if (eb != &stksess->upd) {
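+ /* This is not expected to happen: update keys are assigned
+  * sequentially under the write lock. Keep a recovery path for
+  * non-debug builds by replacing the conflicting node.
+  */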
+ BUG_ON(1);
+ eb32_delete(eb);
+ eb32_insert(&table->updates, &stksess->upd);
+ }
+ }
+
+ HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &table->updt_lock);
+
+ /* There's more to do, let's schedule another round */
+ if (empty_tgid < global.nbtgroups)
+ tasklet_wakeup(table->updt_task);
+
+ if (i > 0) {
+ /* We did at least one update, let's wake the sync task */
+ task_wakeup(table->sync_task, TASK_WOKEN_MSG);
+ }
+ return t;
+}
+
/* Look up an entry with the same key and store the submitted
* stksess if not found. This function locks the table either shared or
* exclusively, and the refcount of the entry is increased.
* with that lock held, will grab a ref_cnt before releasing the
* lock. So we must take this lock as well and check the ref_cnt.
*/
- if (ts->upd.node.leaf_p) {
- if (!updt_locked) {
- updt_locked = 1;
- HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
- }
- /* now we're locked, new peers can't grab it anymore,
- * existing ones already have the ref_cnt.
- */
- if (HA_ATOMIC_LOAD(&ts->ref_cnt))
- continue;
+ if (!updt_locked) {
+ updt_locked = 1;
+ HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
}
+ /* now we're locked, new peers can't grab it anymore,
+ * existing ones already have the ref_cnt.
+ */
+ if (HA_ATOMIC_LOAD(&ts->ref_cnt))
+ continue;
/* session expired, trash it */
ebmb_delete(&ts->key);
+ MT_LIST_DELETE(&ts->pend_updts);
eb32_delete(&ts->upd);
__stksess_free(t, ts);
}
{
int peers_retval = 0;
int shard;
+ int i;
t->hash_seed = XXH64(t->id, t->idlen, 0);
t->write_to.t = table;
}
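+ /* one pending-updates list per thread group, drained by the update tasklet */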
+ t->pend_updts = calloc(global.nbtgroups, sizeof(*t->pend_updts));
+ if (!t->pend_updts)
+ goto mem_error;
+ for (i = 0; i < global.nbtgroups; i++)
+ MT_LIST_INIT(&t->pend_updts[i]);
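+ /* the tasklet in charge of flushing pending updates into the tree */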
+ t->updt_task = tasklet_new();
+ if (!t->updt_task)
+ goto mem_error;
+ t->updt_task->context = t;
+ t->updt_task->process = stktable_add_pend_updates;
return 1;
mem_error:
if (!t)
return;
task_destroy(t->exp_task);
+ tasklet_free(t->updt_task);
+ ha_free(&t->pend_updts);
pool_destroy(t->pool);
}