{
struct listener *l = fdtab[fd].owner;
struct proxy *p;
+ __decl_hathreads(unsigned long mask);
int max_accept;
int next_conn = 0;
int next_feconn = 0;
int next_actconn = 0;
#if defined(USE_THREAD)
- count = l->bind_conf->thr_count;
- if (count > 1 && (global.tune.options & GTUNE_LISTENER_MQ)) {
+ mask = thread_mask(l->bind_conf->bind_thread);
+ if (atleast2(mask) && (global.tune.options & GTUNE_LISTENER_MQ)) {
struct accept_queue_ring *ring;
- int t1, t2, q1, q2;
+ unsigned int t, t0, t1, t2;
- /* pick a first thread ID using a round robin index,
- * and a second thread ID using a random. The
- * connection will be assigned to the one with the
- * least connections. This provides fairness on short
+ /* The principle is that we have two running indexes,
+ * each visiting in turn all threads bound to this
+ * listener. The connection will be assigned to the one
+ * with the least connections, and the other one will
+ * be updated. This provides good fairness on short
* connections (round robin) and on long ones (conn
- * count).
+ * count), without ever missing any idle thread.
*/
- t1 = l->bind_conf->thr_idx;
+
+ /* keep a copy for the final update. thr_idx is composite
+ * and made of (t2<<16) + t1.
+ */
+ t0 = l->bind_conf->thr_idx;
do {
- t2 = t1 + 1;
- if (t2 >= count)
- t2 = 0;
- } while (!HA_ATOMIC_CAS(&l->bind_conf->thr_idx, &t1, t2));
+ unsigned long m1, m2;
+ int q1, q2;
+
+ t2 = t1 = t0;
+ t2 >>= 16;
+ t1 &= 0xFFFF;
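+
+ /* e.g. a (hypothetical) thr_idx snapshot t0 = 0x00060002 decomposes
+ * into t2 = 6 and t1 = 2 here.
+ */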
+
+ /* t1 walks low to high bits;
+ * t2 walks high to low.
+ */
+ m1 = mask >> t1;
+ m2 = mask & (t2 ? nbits(t2 + 1) : ~0UL);
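+
+ /* Illustration with hypothetical values: mask = 0xf0 (threads 4..7),
+ * t1 = 5 and t2 = 6 give m1 = 0x07 (bit 0 = thread 5) and m2 = 0x70
+ * (threads 4..6), assuming nbits(x) returns a word with the x lowest
+ * bits set.
+ */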
+
+ if (unlikely((signed long)m2 >= 0)) {
+ /* highest bit not set */
+ if (!m2)
+ m2 = mask;
- t2 = (random() >> 8) % (count - 1); // 0..thr_count-2
- t2 += t1 + 1; // necessarily different from t1
+ t2 = my_flsl(m2) - 1;
+ }
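+
+ /* when the branch above is taken, t2 is re-anchored on the highest
+ * set bit of m2, i.e. the nearest bound thread at or below its
+ * previous position, wrapping to the top of the mask once the lower
+ * bits are exhausted (my_flsl() returns the 1-based index of the
+ * highest set bit).
+ */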
- if (t2 >= count)
- t2 -= count;
+ if (unlikely(!(m1 & 1) || t1 == t2)) {
+ m1 &= ~1UL;
+ if (!m1) {
+ m1 = mask;
+ t1 = 0;
+ }
+ t1 += my_ffsl(m1) - 1;
+ }
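+
+ /* when taken, this branch similarly moves t1 up to the next bound
+ * thread above its previous position (bit 0 of m1, i.e. t1 itself,
+ * is cleared first), or wraps to the lowest bound thread once no
+ * higher bit remains (my_ffsl() returns the 1-based index of the
+ * lowest set bit).
+ */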
- t1 = bind_map_thread_id(l->bind_conf, t1);
- t2 = bind_map_thread_id(l->bind_conf, t2);
+ /* now we have two distinct thread IDs belonging to the mask */
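+ /* the ring occupancy below (tail - head modulo ACCEPT_QUEUE_SIZE)
+ * counts the connections already queued to each thread's accept
+ * ring and not yet picked up there.
+ */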
+ q1 = accept_queue_rings[t1].tail - accept_queue_rings[t1].head + ACCEPT_QUEUE_SIZE;
+ if (q1 >= ACCEPT_QUEUE_SIZE)
+ q1 -= ACCEPT_QUEUE_SIZE;
+
+ q2 = accept_queue_rings[t2].tail - accept_queue_rings[t2].head + ACCEPT_QUEUE_SIZE;
+ if (q2 >= ACCEPT_QUEUE_SIZE)
+ q2 -= ACCEPT_QUEUE_SIZE;
+
+ /* we have 3 possibilities now:
+ * q1 < q2 : t1 is less loaded than t2, so we pick it
+ * and update t2 (since t1 might still be
+ * lower than another thread)
+ * q1 > q2 : t2 is less loaded than t1, so we pick it
+ * and update t1 (since t2 might still be
+ * lower than another thread)
+ * q1 = q2 : both are equally loaded, thus we pick t1
+ * and update t1 as it will become more loaded
+ * than t2.
+ */
- q1 = accept_queue_rings[t1].tail - accept_queue_rings[t1].head + ACCEPT_QUEUE_SIZE;
- if (q1 >= ACCEPT_QUEUE_SIZE)
- q1 -= ACCEPT_QUEUE_SIZE;
+ q1 += l->thr_conn[t1];
+ q2 += l->thr_conn[t2];
- q2 = accept_queue_rings[t2].tail - accept_queue_rings[t2].head + ACCEPT_QUEUE_SIZE;
- if (q2 >= ACCEPT_QUEUE_SIZE)
- q2 -= ACCEPT_QUEUE_SIZE;
+ if (q1 - q2 < 0) {
+ t = t1;
+ t2 = t2 ? t2 - 1 : LONGBITS - 1;
+ }
+ else if (q1 - q2 > 0) {
+ t = t2;
+ t1++;
+ if (t1 >= LONGBITS)
+ t1 = 0;
+ }
+ else {
+ t = t1;
+ t1++;
+ if (t1 >= LONGBITS)
+ t1 = 0;
+ }
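+
+ /* note that t1 and t2 wrap on the word size (LONGBITS), not on the
+ * number of bound threads; the mask filtering at the top of the
+ * loop brings them back onto bound threads on the next round.
+ */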
- /* make t1 the lowest loaded thread */
- if (q1 >= ACCEPT_QUEUE_SIZE || l->thr_conn[t1] + q1 > l->thr_conn[t2] + q2)
- t1 = t2;
+ /* new value for thr_idx */
+ t1 += (t2 << 16);
+ } while (unlikely(!HA_ATOMIC_CAS(&l->bind_conf->thr_idx, &t0, t1)));
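+
+ /* if another thread raced us and changed thr_idx, the CAS fails and
+ * the loop retries the selection with a refreshed t0 (updated with
+ * the current value, per the usual compare-and-swap semantics).
+ */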
- /* We use deferred accepts even if it's the local thread because
- * tests show that it's the best performing model, likely due to
- * better cache locality when processing this loop.
+ /* We successfully selected the best thread "t" for this
+ * connection. We use deferred accepts even if it's the
+ * local thread because tests show that it's the best
+ * performing model, likely due to better cache locality
+ * when processing this loop.
*/
- ring = &accept_queue_rings[t1];
+ ring = &accept_queue_rings[t];
if (accept_queue_push_mp(ring, cfd, l, &addr, laddr)) {
- HA_ATOMIC_ADD(&activity[t1].accq_pushed, 1);
+ HA_ATOMIC_ADD(&activity[t].accq_pushed, 1);
task_wakeup(ring->task, TASK_WOKEN_IO);
continue;
}
/* If the ring is full we do a synchronous accept on
* the local thread here.
*/
- HA_ATOMIC_ADD(&activity[t1].accq_full, 1);
+ HA_ATOMIC_ADD(&activity[t].accq_full, 1);
}
#endif // USE_THREAD