 * This function returns 1 if a pendconn was dequeued, otherwise 0.
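+ * <tgrp> is the 1-based ID of the thread group whose queues are
+ * considered, hence the per_tgrp[tgrp - 1] indexing below.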
*/
-static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int px_ok)
+static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int px_ok, int tgrp)
{
struct pendconn *p = NULL;
struct pendconn *pp = NULL;
int got_it = 0;
p = NULL;
- if (srv->queue.length)
- p = pendconn_first(&srv->queue.head);
+ if (srv->per_tgrp[tgrp - 1].queue.length)
+ p = pendconn_first(&srv->per_tgrp[tgrp - 1].queue.head);
pp = NULL;
- if (px_ok && px->queue.length) {
+ if (px_ok && px->per_tgrp[tgrp - 1].queue.length) {
/* the lock only remains held as long as the pp is
* in the proxy's queue.
*/
- HA_SPIN_LOCK(QUEUE_LOCK, &px->queue.lock);
- pp = pendconn_first(&px->queue.head);
+ HA_SPIN_LOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock);
+ pp = pendconn_first(&px->per_tgrp[tgrp - 1].queue.head);
if (!pp)
- HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
+ HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock);
}
 if (!p && !pp)
 	return 0;

 /* No more slot available, give up */
if (!got_it) {
if (pp)
- HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
+ HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock);
return 0;
}
/* now the element won't go, we can release the proxy */
__pendconn_unlink_prx(pp);
- HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
+ HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock);
pp->strm_flags |= SF_ASSIGNED;
pp->target = srv;
task_wakeup(pp->strm->task, TASK_WOKEN_RES);
HA_SPIN_UNLOCK(QUEUE_LOCK, &pp->del_lock);
- _HA_ATOMIC_DEC(&px->queue.length);
- _HA_ATOMIC_INC(&px->queue.idx);
+ _HA_ATOMIC_DEC(&px->per_tgrp[tgrp - 1].queue.length);
+ _HA_ATOMIC_INC(&px->per_tgrp[tgrp - 1].queue.idx);
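+ /* queueslength remains the aggregate of all per-group queue lengths */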
_HA_ATOMIC_DEC(&px->queueslength);
return 1;
use_p:
/* we don't need the px queue lock anymore, we have the server's lock */
if (pp)
- HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
+ HA_SPIN_UNLOCK(QUEUE_LOCK, &px->per_tgrp[tgrp - 1].queue.lock);
p->strm_flags |= SF_ASSIGNED;
p->target = srv;
task_wakeup(p->strm->task, TASK_WOKEN_RES);
__pendconn_unlink_srv(p);
- _HA_ATOMIC_DEC(&srv->queue.length);
- _HA_ATOMIC_INC(&srv->queue.idx);
+ _HA_ATOMIC_DEC(&srv->per_tgrp[tgrp - 1].queue.length);
+ _HA_ATOMIC_INC(&srv->per_tgrp[tgrp - 1].queue.idx);
_HA_ATOMIC_DEC(&srv->queueslength);
return 1;
}
{
struct server *ref = s->track ? s->track : s;
struct proxy *p = s->proxy;
+ uint64_t non_empty_tgids = all_tgroups_mask;
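+ /* one bit per thread group whose queue may still hold entries;
+ * bits are cleared as queues turn out to be empty or busy.
+ */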
int maxconn;
- int stop = 0;
int done = 0;
int px_ok;
+ int cur_tgrp;
 /* if the server is not usable, or is a backup that must not be used
 * to dequeue backend requests.
* and would occasionally leave entries in the queue that are never
* dequeued. Nobody else uses the dequeuing flag so when seeing it
* non-null, we're certain that another thread is waiting on it.
+ *
+ * We'll dequeue MAX_SELF_USE_QUEUE items from the queue corresponding
+ * to our thread group, then we'll get one from a different one, to
+ * be sure those actually get processed too.
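+ *
+ * For example, assuming MAX_SELF_USE_QUEUE is 9, a thread dequeues
+ * up to 9 entries from its own group's queue, then one from the
+ * next non-empty group, and starts over.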
*/
- while (!stop && (done < global.tune.maxpollevents || !s->served) &&
+ while (non_empty_tgids != 0
+ && (done < global.tune.maxpollevents || !s->served) &&
s->served < (maxconn = srv_dynamic_maxconn(s))) {
- if (HA_ATOMIC_XCHG(&s->dequeuing, 1))
- break;
+ int self_served;
+ int to_dequeue;
+
+ /*
+ * self_served contains the number of times we dequeued items
+ * from our own thread-group queue.
+ */
+ self_served = _HA_ATOMIC_LOAD(&s->per_tgrp[tgid - 1].self_served) % (MAX_SELF_USE_QUEUE + 1);
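+ /*
+ * self_served is a free-running counter; the modulo reduces it
+ * to our position in the current cycle of MAX_SELF_USE_QUEUE
+ * self dequeues plus one dequeue from another group.
+ */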
+ if ((self_served == MAX_SELF_USE_QUEUE && non_empty_tgids != (1UL << (tgid - 1))) ||
+ !(non_empty_tgids & (1UL << (tgid - 1)))) {
+ int old_served, new_served;
- HA_SPIN_LOCK(QUEUE_LOCK, &s->queue.lock);
- while (s->served < maxconn) {
+ /*
+ * We want to dequeue from another queue. The last
+ * one we used is stored in last_other_tgrp_served.
+ */
+ old_served = _HA_ATOMIC_LOAD(&s->per_tgrp[tgid - 1].last_other_tgrp_served);
+ do {
+ new_served = old_served + 1;
+
+ /*
+ * Find the next tgrp to dequeue from.
+ * If we're here then we know there is
+ * at least one tgrp that is not the current
+ * tgrp that we can dequeue from, so that
+ * loop will end eventually.
+ */
+ while (new_served == tgid ||
+ new_served == global.nbtgroups + 1 ||
+ !(non_empty_tgids & (1UL << (new_served - 1)))) {
+ if (new_served == global.nbtgroups + 1)
+ new_served = 1;
+ else
+ new_served++;
+ }
+ } while (!_HA_ATOMIC_CAS(&s->per_tgrp[tgid - 1].last_other_tgrp_served, &old_served, new_served));
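+
+ /*
+ * We have committed to that group; take a single entry
+ * from its queue before reconsidering our own.
+ */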
+ cur_tgrp = new_served;
+ to_dequeue = 1;
+ } else {
+ cur_tgrp = tgid;
+ if (self_served == MAX_SELF_USE_QUEUE)
+ self_served = 0;
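+ /* spend the rest of our self-use budget on our own queue */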
+ to_dequeue = MAX_SELF_USE_QUEUE - self_served;
+ }
+ if (HA_ATOMIC_XCHG(&s->per_tgrp[cur_tgrp - 1].dequeuing, 1)) {
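+ /*
+ * Another thread is already dequeuing this queue; let
+ * it do the work and stop considering this group.
+ */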
+ non_empty_tgids &= ~(1UL << (cur_tgrp - 1));
+ continue;
+ }
+
+ HA_SPIN_LOCK(QUEUE_LOCK, &s->per_tgrp[cur_tgrp - 1].queue.lock);
+ while (to_dequeue > 0 && s->served < maxconn) {
/*
* pendconn_process_next_strm() will increment
* the served field only if it is < maxconn.
*/
- stop = !pendconn_process_next_strm(s, p, px_ok);
- if (stop)
+ if (!pendconn_process_next_strm(s, p, px_ok, cur_tgrp)) {
+ non_empty_tgids &= ~(1UL << (cur_tgrp - 1));
break;
+ }
+ to_dequeue--;
+ if (cur_tgrp == tgid)
+ _HA_ATOMIC_INC(&s->per_tgrp[tgid - 1].self_served);
done++;
if (done >= global.tune.maxpollevents)
break;
}
- HA_ATOMIC_STORE(&s->dequeuing, 0);
- HA_SPIN_UNLOCK(QUEUE_LOCK, &s->queue.lock);
+ HA_ATOMIC_STORE(&s->per_tgrp[cur_tgrp - 1].dequeuing, 0);
+ HA_SPIN_UNLOCK(QUEUE_LOCK, &s->per_tgrp[cur_tgrp - 1].queue.lock);
}
if (done) {
p->lbprm.server_take_conn(s);
}
if (s->served == 0 && p->served == 0 && !HA_ATOMIC_LOAD(&p->ready_srv)) {
+ int i;
+
/*
 * If there is no task running on the server nor on the proxy,
 * let it be known that we are ready. There is a small race if a
 * task was queued just after we checked, but before we set
 * ready_srv, so it would not see it; just in case, try to run
 * one more stream.
 */
- if (pendconn_process_next_strm(s, p, px_ok)) {
- _HA_ATOMIC_SUB(&p->totpend, 1);
- _HA_ATOMIC_ADD(&p->served, 1);
- done++;
+ for (i = 0; i < global.nbtgroups; i++) {
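+ /* one successful dequeue is enough to close the race window */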
+ if (pendconn_process_next_strm(s, p, px_ok, i + 1)) {
+ _HA_ATOMIC_SUB(&p->totpend, 1);
+ _HA_ATOMIC_ADD(&p->served, 1);
+ done++;
+ break;
+ }
}
}
return done;
srv = NULL;
if (srv) {
- q = &srv->queue;
+ q = &srv->per_tgrp[tgid - 1].queue;
max_ptr = &srv->counters.nbpend_max;
queueslength = &srv->queueslength;
}
else {
- q = &px->queue;
+ q = &px->per_tgrp[tgid - 1].queue;
max_ptr = &px->be_counters.nbpend_max;
queueslength = &px->queueslength;
}
struct proxy *px = s->proxy;
int px_xferred = 0;
int xferred = 0;
+ int i;
/* The REDISP option was specified. We will ignore cookie and force to
* balance or use the dispatcher.
(s->proxy->options & (PR_O_REDISP|PR_O_PERSIST)) != PR_O_REDISP)
goto skip_srv_queue;
- HA_SPIN_LOCK(QUEUE_LOCK, &s->queue.lock);
- for (node = eb32_first(&s->queue.head); node; node = nodeb) {
- nodeb = eb32_next(node);
+ for (i = 0; i < global.nbtgroups; i++) {
+ struct queue *queue = &s->per_tgrp[i].queue;
+ int local_xferred = 0;
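+
+ /*
+ * Transfers are counted per queue so that the queue's length
+ * can be adjusted once, after its lock is released.
+ */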
- p = eb32_entry(node, struct pendconn, node);
- if (p->strm_flags & SF_FORCE_PRST)
- continue;
+ HA_SPIN_LOCK(QUEUE_LOCK, &queue->lock);
+ for (node = eb32_first(&queue->head); node; node = nodeb) {
+ nodeb = eb32_next(node);
- /* it's left to the dispatcher to choose a server */
- __pendconn_unlink_srv(p);
- if (!(s->proxy->options & PR_O_REDISP))
- p->strm_flags &= ~(SF_DIRECT | SF_ASSIGNED);
+ p = eb32_entry(node, struct pendconn, node);
+ if (p->strm_flags & SF_FORCE_PRST)
+ continue;
- task_wakeup(p->strm->task, TASK_WOKEN_RES);
- xferred++;
+ /* it's left to the dispatcher to choose a server */
+ __pendconn_unlink_srv(p);
+ if (!(s->proxy->options & PR_O_REDISP))
+ p->strm_flags &= ~(SF_DIRECT | SF_ASSIGNED);
+
+ task_wakeup(p->strm->task, TASK_WOKEN_RES);
+ local_xferred++;
+ }
+ HA_SPIN_UNLOCK(QUEUE_LOCK, &queue->lock);
+ xferred += local_xferred;
+ if (local_xferred)
+ _HA_ATOMIC_SUB(&queue->length, local_xferred);
}
- HA_SPIN_UNLOCK(QUEUE_LOCK, &s->queue.lock);
if (xferred) {
- _HA_ATOMIC_SUB(&s->queue.length, xferred);
_HA_ATOMIC_SUB(&s->queueslength, xferred);
_HA_ATOMIC_SUB(&s->proxy->totpend, xferred);
}
if (px->lbprm.tot_wact || px->lbprm.tot_wbck)
goto done;
- HA_SPIN_LOCK(QUEUE_LOCK, &px->queue.lock);
- for (node = eb32_first(&px->queue.head); node; node = nodeb) {
- nodeb = eb32_next(node);
- p = eb32_entry(node, struct pendconn, node);
+ for (i = 0; i < global.nbtgroups; i++) {
+ struct queue *queue = &px->per_tgrp[i].queue;
+ int local_xferred = 0;
- /* force-persist streams may occasionally appear in the
- * proxy's queue, and we certainly don't want them here!
- */
- p->strm_flags &= ~SF_FORCE_PRST;
- __pendconn_unlink_prx(p);
+ HA_SPIN_LOCK(QUEUE_LOCK, &queue->lock);
+ for (node = eb32_first(&queue->head); node; node = nodeb) {
+ nodeb = eb32_next(node);
+ p = eb32_entry(node, struct pendconn, node);
+
+ /* force-persist streams may occasionally appear in the
+ * proxy's queue, and we certainly don't want them here!
+ */
+ p->strm_flags &= ~SF_FORCE_PRST;
+ __pendconn_unlink_prx(p);
- task_wakeup(p->strm->task, TASK_WOKEN_RES);
- px_xferred++;
+ task_wakeup(p->strm->task, TASK_WOKEN_RES);
+ local_xferred++;
+ }
+ HA_SPIN_UNLOCK(QUEUE_LOCK, &queue->lock);
+ if (local_xferred)
+ _HA_ATOMIC_SUB(&queue->length, local_xferred);
+ px_xferred += local_xferred;
}
- HA_SPIN_UNLOCK(QUEUE_LOCK, &px->queue.lock);
if (px_xferred) {
- _HA_ATOMIC_SUB(&px->queue.length, px_xferred);
_HA_ATOMIC_SUB(&px->queueslength, px_xferred);
_HA_ATOMIC_SUB(&px->totpend, px_xferred);
}