#if defined(USE_THREAD)
+ if (!(global.tune.options & GTUNE_LISTENER_MQ_ANY) || stopping)
+ goto local_accept;
+
+ /* we want to perform thread rebalancing if the listener is
+ * bound to more than one thread or if it's part of a shard
+ * with more than one listener.
+ */
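+ /* (a shard with several members typically results from the
+ * "shards" setting on the "bind" line; a plain bind leaves
+ * shard_info NULL and only the thread mask below matters)
+ */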
mask = l->rx.bind_thread & _HA_ATOMIC_LOAD(&tg->threads_enabled);
- if (atleast2(mask) && (global.tune.options & GTUNE_LISTENER_MQ_ANY) && !stopping) {
+ if (l->rx.shard_info || atleast2(mask)) {
struct accept_queue_ring *ring;
- unsigned int t, t0, t1, t2;
- int base = tg->base;
+ struct listener *new_li;
+ uint n0, n1, n2, r1, r2, t, t1, t2;
+ const struct tgroup_info *g1, *g2;
+ ulong m1, m2;
/* The principle is that we have two running indexes,
* each visiting in turn all threads bound to this
- * listener. The connection will be assigned to the one
- * with the least connections, and the other one will
- * be updated. This provides a good fairness on short
- * connections (round robin) and on long ones (conn
- * count), without ever missing any idle thread.
+ * listener's shard. The connection will be assigned to
+ * the one with the least connections, and the other
+ * one will be updated. This provides a good fairness
+ * on short connections (round robin) and on long ones
+ * (conn count), without ever missing any idle thread.
+ * Each thread number is encoded as MAX_THREADS_PER_GROUP
+ * times the receiver number plus its local thread
+ * number from 0 to MAX_THREADS_PER_GROUP - 1. The two
+ * indexes are stored as 16 bit numbers in the thr_idx
+ * variable.
+ *
+ * In the loop below we have this for each index:
+ * - n is the thread index
+ * - r is the receiver number
+ * - g is the receiver's thread group
+ * - t is the thread number in this receiver
+ * - m is the receiver's thread mask shifted by the thread number
*/
/* keep a copy for the final update. thr_idx is composite
- * and made of (t2<<16) + t1.
+ * and made of (n2<<16) + n1.
*/
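+ /* illustration (not taken from the code): with
+ * MAX_THREADS_PER_GROUP at 64, receiver 2 / local thread 5
+ * encodes as 2 * 64 + 5 = 133, and if the other index were
+ * e.g. 70, thr_idx would hold (70 << 16) + 133.
+ */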
- t0 = l->thr_idx;
- do {
- unsigned long m1, m2;
+ n0 = l->thr_idx;
+ while (1) {
int q1, q2;
- t2 = t1 = t0;
- t2 >>= 16;
- t1 &= 0xFFFF;
+ new_li = NULL;
+
+ n2 = n1 = n0;
+ n2 >>= 16;
+ n1 &= 0xFFFF;
/* t1 walks low to high bits ;
* t2 walks high to low.
*/
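+ /* e.g. (illustrative) with an enabled mask of 0x2d (threads
+ * 0,2,3,5), t1 visits 0,2,3,5,0,... while t2 visits
+ * 5,3,2,0,5,... so neither index ever skips an enabled thread.
+ */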
- m1 = mask >> t1;
- m2 = mask & (t2 ? nbits(t2 + 1) : ~0UL);
- if (unlikely(!(m1 & 1))) {
- m1 &= ~1UL;
- if (!m1) {
- m1 = mask;
- t1 = 0;
+ /* calculate r1/g1/t1 first */
+ r1 = n1 / MAX_THREADS_PER_GROUP;
+ t1 = n1 % MAX_THREADS_PER_GROUP;
+ while (1) {
+ if (l->rx.shard_info) {
+ /* multiple listeners, take the group into account */
+ if (r1 >= l->rx.shard_info->nbgroups)
+ r1 = 0;
+
+ g1 = &ha_tgroup_info[l->rx.shard_info->members[r1]->bind_tgroup - 1];
+ m1 = l->rx.shard_info->members[r1]->bind_thread;
+ } else {
+ /* single listener */
+ r1 = 0;
+ g1 = tg;
+ m1 = l->rx.bind_thread;
+ }
+ m1 &= _HA_ATOMIC_LOAD(&g1->threads_enabled);
+ m1 >>= t1;
+
+ /* find first existing thread */
+ if (unlikely(!(m1 & 1))) {
+ m1 &= ~1UL;
+ if (!m1) {
+ /* no more threads here, switch to
+ * first thread of next group.
+ */
+ t1 = 0;
+ if (l->rx.shard_info)
+ r1++;
+ /* loop again */
+ continue;
+ }
+ t1 += my_ffsl(m1) - 1;
}
- t1 += my_ffsl(m1) - 1;
+ /* done: r1 and t1 are OK */
+ break;
}
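+ /* note: my_ffsl() above is the 1-based find-first-set helper,
+ * e.g. my_ffsl(0x28) == 4, so t1 advances by 3 to land on the
+ * next enabled thread.
+ */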
+ /* now r2/g2/t2 */
+ r2 = n2 / MAX_THREADS_PER_GROUP;
+ t2 = n2 % MAX_THREADS_PER_GROUP;
+
/* if running in round-robin mode ("fair"), we don't need
* to go further.
*/
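+ /* (GTUNE_LISTENER_MQ_FAIR corresponds to the "fair" mode of
+ * "tune.listener.multi-queue": pure round-robin, so the load
+ * comparison below is skipped)
+ */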
if ((global.tune.options & GTUNE_LISTENER_MQ_ANY) == GTUNE_LISTENER_MQ_FAIR) {
- t = t1;
+ t = g1->base + t1;
+ if (l->rx.shard_info && t != tid)
+ new_li = l->rx.shard_info->members[r1]->owner;
goto updt_t1;
}
- if (unlikely(!(m2 & (1UL << t2)) || t1 == t2)) {
- /* highest bit not set */
- if (!m2)
- m2 = mask;
-
- t2 = my_flsl(m2) - 1;
+ while (1) {
+ if (l->rx.shard_info) {
+ /* multiple listeners, take the group into account */
+ if (r2 >= l->rx.shard_info->nbgroups)
+ r2 = l->rx.shard_info->nbgroups - 1;
+
+ g2 = &ha_tgroup_info[l->rx.shard_info->members[r2]->bind_tgroup - 1];
+ m2 = l->rx.shard_info->members[r2]->bind_thread;
+ } else {
+ /* single listener */
+ r2 = 0;
+ g2 = tg;
+ m2 = l->rx.bind_thread;
+ }
+ m2 &= _HA_ATOMIC_LOAD(&g2->threads_enabled);
+ m2 &= nbits(t2 + 1);
+
+ /* find previous existing thread */
+ if (unlikely(!(m2 & (1UL << t2)) || (g1 == g2 && t1 == t2))) {
+ /* highest bit not set or colliding threads, let's check
+ * if we still have other threads available below this
+ * one.
+ */
+ m2 &= ~(1UL << t2);
+ if (!m2) {
+ /* no more threads here, switch to
+ * last thread of previous group.
+ */
+ t2 = MAX_THREADS_PER_GROUP - 1;
+ if (l->rx.shard_info)
+ r2--;
+ /* loop again */
+ continue;
+ }
+ t2 = my_flsl(m2) - 1;
+ }
+ /* done: r2 and t2 are OK */
+ break;
}
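+ /* note: nbits(t2 + 1) above keeps only bits 0..t2, and my_flsl()
+ * is the 1-based find-last-set helper (e.g. my_flsl(0x0b) == 4),
+ * so t2 moves down to the highest remaining enabled thread.
+ */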
- /* now we have two distinct thread IDs belonging to the mask */
- q1 = accept_queue_ring_len(&accept_queue_rings[base + t1]);
- q2 = accept_queue_ring_len(&accept_queue_rings[base + t2]);
+ /* here we have (r1,g1,t1) that designate the first receiver, its
+ * thread group and local thread, and (r2,g2,t2) that designate
+ * the second receiver, its thread group and local thread.
+ */
+ q1 = accept_queue_ring_len(&accept_queue_rings[g1->base + t1]);
+ q2 = accept_queue_ring_len(&accept_queue_rings[g2->base + t2]);
+
+ /* add to this the currently active connections */
+ if (l->rx.shard_info) {
+ q1 += _HA_ATOMIC_LOAD(&((struct listener *)l->rx.shard_info->members[r1]->owner)->thr_conn[t1]);
+ q2 += _HA_ATOMIC_LOAD(&((struct listener *)l->rx.shard_info->members[r2]->owner)->thr_conn[t2]);
+ } else {
+ q1 += _HA_ATOMIC_LOAD(&l->thr_conn[t1]);
+ q2 += _HA_ATOMIC_LOAD(&l->thr_conn[t2]);
+ }
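+ /* e.g. (hypothetical numbers) t1 with 3 queued + 10 active gives
+ * q1 = 13 while t2 with 1 queued + 7 active gives q2 = 8, so the
+ * comparison below sends this connection to t2.
+ */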
/* we have 3 possibilities now :
* q1 < q2 : t1 is less loaded than t2, so we pick it
* and update t2 (since t1 might still be
* lower than another thread)
* q1 > q2 : t2 is less loaded than t1, so we pick it
* and update t1 (since t2 might still be
* lower than another thread)
* q1 = q2 : both are equally loaded, thus we pick t1
* and update t1 as it will become more loaded
* than t2.
*/
- q1 += l->thr_conn[t1];
- q2 += l->thr_conn[t2];
-
if (q1 - q2 < 0) {
- t = t1;
- t2 = t2 ? t2 - 1 : LONGBITS - 1;
+ t = g1->base + t1;
+
+ if (l->rx.shard_info)
+ new_li = l->rx.shard_info->members[r1]->owner;
+
+ t2--;
+ if (t2 >= MAX_THREADS_PER_GROUP) {
+ if (l->rx.shard_info)
+ r2--;
+ t2 = MAX_THREADS_PER_GROUP - 1;
+ }
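+ /* t2 is unsigned, so when it was 0 the decrement wraps and the
+ * >= MAX_THREADS_PER_GROUP test above moves the high-to-low
+ * index to the last thread of the previous group.
+ */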
}
else if (q1 - q2 > 0) {
- t = t2;
- t1++;
- if (t1 >= LONGBITS)
- t1 = 0;
+ t = g2->base + t2;
+
+ if (l->rx.shard_info)
+ new_li = l->rx.shard_info->members[r2]->owner;
+ goto updt_t1;
}
else {
- t = t1;
+ t = g1->base + t1;
+
+ if (l->rx.shard_info)
+ new_li = l->rx.shard_info->members[r1]->owner;
updt_t1:
t1++;
- if (t1 >= LONGBITS)
+ if (t1 >= MAX_THREADS_PER_GROUP) {
+ if (l->rx.shard_info)
+ r1++;
t1 = 0;
+ }
}
+ /* the target thread number is in <t> now */
+
/* new value for thr_idx */
- t1 += (t2 << 16);
- } while (unlikely(!_HA_ATOMIC_CAS(&l->thr_idx, &t0, t1)));
+ n1 = ((r1 & 63) * MAX_THREADS_PER_GROUP) + t1;
+ n2 = ((r2 & 63) * MAX_THREADS_PER_GROUP) + t2;
+ n1 += (n2 << 16);
+
+ /* try to update the index */
+ if (likely(_HA_ATOMIC_CAS(&l->thr_idx, &n0, n1)))
+ break;
+ } /* end of main while() loop */
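+ /* if the CAS failed, another thread accepted in parallel and n0
+ * was refreshed with the current thr_idx, so the walk simply
+ * restarts from the updated indexes.
+ */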
+
+ /* we may need to update the listener in the connection
+ * if we switched to another group.
+ */
+ if (new_li)
+ cli_conn->target = &new_li->obj_type;
+
+ /* here we have the target thread number in <t> and we hold a
+ * reservation in the target ring.
+ */
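+ /* (set_affinity is only implemented by protocols that keep
+ * per-thread state, QUIC for instance, and lets them refuse or
+ * prepare the migration)
+ */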
if (l->rx.proto && l->rx.proto->set_affinity) {
- if (l->rx.proto->set_affinity(cli_conn, base + t)) {
+ if (l->rx.proto->set_affinity(cli_conn, t)) {
/* Failed migration, stay on the same thread. */
goto local_accept;
}
}

/* We have selected the target thread <t> for this connection. We
 * use deferred accepts even for the local thread because tests
 * show that it's the best performing model, likely due to better
 * cache locality when processing this loop.
 */
- ring = &accept_queue_rings[base + t];
+ ring = &accept_queue_rings[t];
if (accept_queue_push_mp(ring, cli_conn)) {
- _HA_ATOMIC_INC(&activity[base + t].accq_pushed);
+ _HA_ATOMIC_INC(&activity[t].accq_pushed);
tasklet_wakeup(ring->tasklet);
continue;
}
/* If the ring is full we do a synchronous accept on
* the local thread here.
*/
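+ /* (accq_full is the per-thread counter reported by "show
+ * activity"; a steadily rising value indicates saturated rings)
+ */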
- _HA_ATOMIC_INC(&activity[base + t].accq_full);
+ _HA_ATOMIC_INC(&activity[t].accq_full);
}
#endif // USE_THREAD
local_accept:
+ /* restore the connection's listener in case we failed to migrate above */
+ cli_conn->target = &l->obj_type;
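+ /* (the reset above is needed because cli_conn->target may still
+ * point to another shard member if set_affinity() failed after
+ * the switch)
+ */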
_HA_ATOMIC_INC(&l->thr_conn[ti->ltid]);
ret = l->bind_conf->accept(cli_conn);
if (unlikely(ret <= 0)) {