struct stktable *stktables_list;
struct ceb_root *stktable_by_name = NULL;
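+/* Assumed shape of the per-bucket descriptor (the real definition lives in
+ * the types header, which this patch excerpt does not show): one expiration
+ * task shared by all tables of a bucket, a tree of tables ordered by next
+ * expiration date, a list of tables waiting to enter that tree, and a lock
+ * serializing against table removal. Roughly:
+ *
+ *	struct stk_per_bucket {
+ *		struct mt_list toadd_tables;  // tables queued for (re)insertion
+ *		struct eb_root tables;        // tables ordered by next expiration
+ *		struct task *exp_task;        // this bucket's expiration task
+ *		__decl_thread(HA_SPINLOCK_T lock);
+ *	};
+ */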
+static struct stk_per_bucket per_bucket[CONFIG_HAP_TBL_BUCKETS];
+
#define round_ptr_size(i) (((i) + (sizeof(void *) - 1)) &~ (sizeof(void *) - 1))
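+/* e.g. with 8-byte pointers: round_ptr_size(13) == 16, round_ptr_size(16) == 16 */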
/* This function inserts stktable <t> into the tree of known stick-tables.
{
int old_exp, new_exp;
int expire = ts->expire;
+ int bucket;
+ int len;
if (!t->expire)
return;
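+
+	/* String keys are hashed over their actual length while all other
+	 * key types use the table's fixed key size, so compute the same
+	 * length the hash function sees.
+	 */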
+ if (t->type == SMP_T_STR)
+ len = strlen((const char *)ts->key.key);
+ else
+ len = t->key_size;
+
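+	/* pick the bucket owning this key: its tree and its task will handle
+	 * all of this entry's expiration bookkeeping
+	 */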
+ bucket = stktable_calc_shard_num(t, ts->key.key, len);
-	/* set the task's expire to the newest expiration date. */
+	/* set the bucket's next expiration to the nearest expiration date. */
- old_exp = HA_ATOMIC_LOAD(&t->exp_task->expire);
+ old_exp = HA_ATOMIC_LOAD(&t->shards[bucket].next_exp);
new_exp = tick_first(expire, old_exp);
/* let's not go further if we're already up to date. We have
* to make sure the compared date doesn't change under us.
*/
if (new_exp == old_exp &&
- HA_ATOMIC_CAS(&t->exp_task->expire, &old_exp, new_exp))
+	    HA_ATOMIC_CAS(&t->shards[bucket].next_exp, &old_exp, new_exp))
		return;
HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &t->lock);
- while (!HA_ATOMIC_CAS(&t->exp_task->expire, &old_exp, new_exp)) {
+ while (!HA_ATOMIC_CAS(&t->shards[bucket].next_exp, &old_exp, new_exp)) {
if (new_exp == old_exp)
break;
__ha_cpu_relax();
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->lock);
/* the timer was advanced, only the task can update it */
- if (!tick_isset(old_exp) || tick_is_lt(new_exp, old_exp))
- task_wakeup(t->exp_task, TASK_WOKEN_OTHER);
+ if (!tick_isset(old_exp) || tick_is_lt(new_exp, old_exp)) {
+ int ret;
+
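+		/* MT_LIST_TRY_APPEND fails when the table is already queued
+		 * on this bucket's list, in which case a wakeup was already
+		 * sent and must not be repeated.
+		 */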
+ ret = MT_LIST_TRY_APPEND(&per_bucket[bucket].toadd_tables, &t->shards[bucket].in_bucket_toadd);
+ if (ret)
+ task_wakeup(per_bucket[bucket].exp_task, TASK_WOKEN_OTHER);
+ }
}
/* Returns a valid or initialized stksess for the specified stktable_key in the
- * Task processing function to trash expired sticky sessions. A pointer to the
- * task itself is returned since it never dies.
+ * Task processing function to trash the expired sticky sessions of one
+ * bucket, across all the stick-tables mapped to it. A pointer to the task
+ * itself is returned since it never dies.
*/
-struct task *process_table_expire(struct task *task, void *context, unsigned int state)
+struct task *process_tables_expire(struct task *task, void *context, unsigned int state)
{
- struct stktable *t = context;
+ struct stk_per_bucket *ps = context;
+ struct stktable *t;
struct stksess *ts;
- struct eb32_node *eb;
+ struct eb32_node *table_eb, *eb;
int updt_locked;
- int to_visit = STKTABLE_MAX_UPDATES_AT_ONCE;
- int looped;
- int exp_next;
+ int to_visit;
int task_exp;
- int shard, init_shard;
- int failed_once = 0;
- int purged = 0;
+ int shard;
task_exp = TICK_ETERNITY;
- /* start from a random shard number to avoid starvation in the last ones */
- shard = init_shard = statistical_prng_range(CONFIG_HAP_TBL_BUCKETS - 1);
- do {
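+	/* this task is permanently bound to a single bucket: derive its index */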
+ shard = (ps - &per_bucket[0]);
+
+ to_visit = STKTABLE_MAX_UPDATES_AT_ONCE;
+
+	/*
+	 * First, move all the tables queued for addition from the list
+	 * into the tree.
+	 */
+ while ((t = MT_LIST_POP(&ps->toadd_tables, struct stktable *, shards[shard].in_bucket_toadd)) != NULL) {
+ int next_exp = HA_ATOMIC_LOAD(&t->shards[shard].next_exp);
+
+		/*
+		 * Already queued in the tree with an earlier expiration
+		 * date: nothing to do.
+		 */
+ if (tick_isset(t->shards[shard].in_bucket.key) &&
+ tick_is_lt(t->shards[shard].in_bucket.key, next_exp))
+ continue;
+
+ eb32_delete(&t->shards[shard].in_bucket);
+ t->shards[shard].in_bucket.key = next_exp;
+ eb32_insert(&ps->tables, &t->shards[shard].in_bucket);
+ }
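+
+	/* now walk this bucket's tables in ascending next-expiration order */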
+ table_eb = eb32_first(&ps->tables);
+
+ while (table_eb) {
+ struct eb32_node *tmpnode;
+ unsigned int next_exp_table = TICK_ETERNITY;
+
+ t = eb32_entry(table_eb, struct stktable, shards[shard].in_bucket);
updt_locked = 0;
- looped = 0;
- if (HA_RWLOCK_TRYWRLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock) != 0) {
- if (purged || failed_once) {
- /* already purged or second failed lock, yield and come back later */
- to_visit = 0;
- break;
- }
- /* make sure we succeed at least once */
- failed_once = 1;
- HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock);
+ if (tick_is_lt(now_ms, table_eb->key)) {
+			/*
+			 * The earliest table expiration is in the future;
+			 * the tree is ordered by date, so all remaining
+			 * tables are too. Record the date and stop here.
+			 */
+			task_exp = tick_first(task_exp, table_eb->key);
+ break;
}
+ HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock);
eb = eb32_lookup_ge(&t->shards[shard].exps, now_ms - TIMER_LOOK_BACK);
-			 * half. Let's loop back to the beginning of the tree now if we
-			 * have not yet visited it.
+			 * half. Let's loop back to the beginning of the tree now.
			 */
-			if (looped)
-				break;
-			looped = 1;
eb = eb32_first(&t->shards[shard].exps);
if (likely(!eb))
break;
if (likely(tick_is_lt(now_ms, eb->key))) {
/* timer not expired yet, revisit it later */
- exp_next = eb->key;
- goto out_unlock;
+				task_exp = tick_first(task_exp, eb->key);
+ break;
}
/* Let's quit earlier if we currently hold the update lock */
MT_LIST_DELETE(&ts->pend_updts);
eb32_delete(&ts->upd);
__stksess_free(t, ts);
- purged++;
}
- /* We have found no task to expire in any tree */
- exp_next = TICK_ETERNITY;
-
- out_unlock:
if (updt_locked)
HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &t->updt_lock);
- task_exp = tick_first(task_exp, exp_next);
+		/*
+		 * Now look up the shard's earliest remaining expiration so
+		 * that we can reposition the table in the bucket tree.
+		 */
+		eb = eb32_lookup_ge(&t->shards[shard].exps, now_ms - TIMER_LOOK_BACK);
+		if (!eb)
+			eb = eb32_first(&t->shards[shard].exps);
+
+		next_exp_table = eb ? eb->key : TICK_ETERNITY;
+		task_exp = tick_first(task_exp, next_exp_table);
HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->shards[shard].sh_lock);
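+
+		/* fetch the next table now, as the repositioning below may
+		 * detach table_eb from the tree
+		 */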
+ tmpnode = eb32_next(table_eb);
- shard++;
- if (shard >= CONFIG_HAP_TBL_BUCKETS)
- shard = 0;
- } while (to_visit > 0 && shard != init_shard);
+ if (table_eb->key != next_exp_table) {
+ int old_exp;
+			/*
+			 * The table's next expiration date changed: it must
+			 * be repositioned in the tree.
+			 */
+ old_exp = HA_ATOMIC_LOAD(&t->shards[shard].next_exp);
+			if (!tick_is_lt(old_exp, table_eb->key)) {
+				/* a failed CAS means a concurrent requeue set
+				 * an even earlier date, which must be kept
+				 */
+				HA_ATOMIC_CAS(&t->shards[shard].next_exp, &old_exp, next_exp_table);
+			}
- if (to_visit <= 0) {
+ eb32_delete(table_eb);
+ table_eb->key = TICK_ETERNITY;
+			/*
+			 * If there are more entries, put the table back onto
+			 * the list; it will be reinserted into the tree the
+			 * next time the task runs.
+			 */
+ if (next_exp_table != TICK_ETERNITY)
+ MT_LIST_TRY_APPEND(&per_bucket[shard].toadd_tables, &t->shards[shard].in_bucket_toadd);
+ }
+ table_eb = tmpnode;
+ }
+
+ if (tick_is_le(task_exp, now_ms)) {
+ /*
+ * More to do, we should wake up immediately.
+ */
task_wakeup(task, TASK_WOKEN_OTHER);
} else {
- /* Reset the task's expiration. We do this under the lock so as not
- * to ruin a call to task_queue() in stktable_requeue_exp() if we
- * were to update with TICK_ETERNITY.
-		 */
+		/* Reset the task's expiration. */
- HA_RWLOCK_WRLOCK(STK_TABLE_LOCK, &t->lock);
task->expire = task_exp;
- HA_RWLOCK_WRUNLOCK(STK_TABLE_LOCK, &t->lock);
}
return task;
*/
int stktable_init(struct stktable *t, char **err_msg)
{
- static int operating_thread = 0;
int peers_retval = 0;
int shard;
int i;
t->shards[shard].keys = EB_ROOT_UNIQUE;
memset(&t->shards[shard].exps, 0, sizeof(t->shards[shard].exps));
HA_RWLOCK_INIT(&t->shards[shard].sh_lock);
+ MT_LIST_INIT(&t->shards[shard].in_bucket_toadd);
}
t->updates = EB_ROOT_UNIQUE;
t->pool = create_pool("sticktables", sizeof(struct stksess) + round_ptr_size(t->data_size) + t->key_size, MEM_F_SHARED);
- if ( t->expire ) {
- t->exp_task = task_new_on(operating_thread);
- if (!t->exp_task)
- goto mem_error;
- operating_thread = (operating_thread + 1) % global.nbthread;
-
- t->exp_task->process = process_table_expire;
- t->exp_task->context = (void *)t;
- }
if (t->peers.p && t->peers.p->peers_fe && !(t->peers.p->peers_fe->flags & (PR_FL_DISABLED|PR_FL_STOPPED))) {
peers_retval = peers_register_table(t->peers.p, t);
}
*/
void stktable_deinit(struct stktable *t)
{
+ int i;
+
if (!t)
return;
- task_destroy(t->exp_task);
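+	/* detach the table from every bucket's expiration tree and pending
+	 * list so the per-bucket tasks can no longer reference it
+	 */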
+ for (i = 0; i < CONFIG_HAP_TBL_BUCKETS; i++) {
+ HA_SPIN_LOCK(OTHER_LOCK, &per_bucket[i].lock);
+ eb32_delete(&t->shards[i].in_bucket);
+ MT_LIST_DELETE(&t->shards[i].in_bucket_toadd);
+ HA_SPIN_UNLOCK(OTHER_LOCK, &per_bucket[i].lock);
+ }
tasklet_free(t->updt_task);
ha_free(&t->pend_updts);
pool_destroy(t->pool);
static void stkt_late_init(void)
{
struct sample_fetch *f;
+ int i;
f = find_sample_fetch("src", strlen("src"));
if (f)
smp_fetch_src = f->process;
stkt_create_stk_ctr_pool();
+
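+	/* create one expiration task per bucket, spread round-robin across
+	 * threads; each task owns its bucket's table tree and add list
+	 */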
+ for (i = 0; i < CONFIG_HAP_TBL_BUCKETS; i++) {
+ MT_LIST_INIT(&per_bucket[i].toadd_tables);
+ per_bucket[i].tables = EB_ROOT;
+ per_bucket[i].exp_task = task_new_on(i % global.nbthread);
+ if (per_bucket[i].exp_task == NULL) {
+			ha_alert("Failed to allocate per-bucket expiration task!\n");
+ exit(1);
+ }
+ per_bucket[i].exp_task->process = process_tables_expire;
+ per_bucket[i].exp_task->context = &per_bucket[i];
+ HA_SPIN_INIT(&per_bucket[i].lock);
+ }
}
INITCALL0(STG_INIT_2, stkt_late_init);