int next_conn = 0;
int next_feconn = 0;
int next_actconn = 0;
+ int queued = 0; // <0 if RQ threshold exceeded
int expire;
int ret;
* worst case. If we fail due to system limits or temporary resource
* shortage, we try again 100ms later in the worst case.
*/
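+ /* "queued" counts connections accepted by this call so far, so the
+ * run queue threshold check below accounts for the work we have just
+ * added ourselves in addition to tasks_run_queue.
+ */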
- for (; max_accept; next_conn = next_feconn = next_actconn = 0, max_accept--) {
+ for (; max_accept; next_conn = next_feconn = next_actconn = 0, max_accept--, queued++) {
unsigned int count;
int status;
__decl_thread(unsigned long mask);
}
if (!(l->options & LI_O_UNLIMITED)) {
+ if (tasks_run_queue + queued >= global.tune.runqueue_depth) {
+ /* load too high already */
+ next_actconn = 0;
+ goto limit_rqueue;
+ }
+
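+ /* Account for one more active connection, unless the global
+ * maxconn limit has already been reached.
+ */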
do {
count = actconn;
if (unlikely(count >= global.maxconn)) {
_HA_ATOMIC_SUB(&actconn, 1);
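+ /* Resume the listener if it was full and has room again, or if it was
+ * limited and the run queue threshold was not exceeded (queued >= 0),
+ * the frontend and global connection limits allow it, and the global
+ * queue task's wakeup is either unset or already due.
+ */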
if ((l->state == LI_FULL && (!l->maxconn || l->nbconn < l->maxconn)) ||
- (l->state == LI_LIMITED &&
+ (l->state == LI_LIMITED && queued >= 0 &&
((!p || p->feconn < p->maxconn) && (actconn < global.maxconn) &&
(!tick_isset(global_listener_queue_task->expire) ||
tick_is_expired(global_listener_queue_task->expire, now_ms))))) {
/* at least one thread has to do this when quitting */
resume_listener(l);
- /* Dequeues all of the listeners waiting for a resource */
+ /* Dequeues all of the listeners waiting for a resource in case
+ * we've just aborted an operation that made others stop.
+ */
dequeue_all_listeners();
if (p && !MT_LIST_ISEMPTY(&p->listener_queue) &&
if (p->task && tick_isset(expire))
task_schedule(p->task, expire);
goto end;
+
+ limit_rqueue:
+ /* The run queue is too high. We're temporarily limiting this listener
+ * and scheduling the global task to be woken up right away, so that the
+ * listener is picked from the wait queue again once the run queue is OK.
+ * If the load is just a short spike, there will be no delay. If the load
+ * is sustained, the global task will wait a little bit longer.
+ */
+ queued = -1;
+ limit_listener(l, &global_listener_queue);
+ task_schedule(global_listener_queue_task, now_ms);
+ goto end;
+
}
/* Notify the listener that a connection initiated from it was released. This
void listener_release(struct listener *l)
{
struct proxy *fe = l->bind_conf->frontend;
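+ /* "last" is set when this release drops the active connection count to zero */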
+ int last = 0;
if (!(l->options & LI_O_UNLIMITED))
- _HA_ATOMIC_SUB(&actconn, 1);
+ last = _HA_ATOMIC_SUB(&actconn, 1) == 0;
if (fe)
_HA_ATOMIC_SUB(&fe->feconn, 1);
_HA_ATOMIC_SUB(&l->nbconn, 1);
if (l->state == LI_FULL || l->state == LI_LIMITED)
resume_listener(l);
- /* Dequeues all of the listeners waiting for a resource */
- dequeue_all_listeners();
-
if (fe && !MT_LIST_ISEMPTY(&fe->listener_queue) &&
(!fe->fe_sps_lim || freq_ctr_remain(&fe->fe_sess_per_sec, fe->fe_sps_lim, 0) > 0))
dequeue_proxy_listeners(fe);
+
+ /* Dequeue all of the listeners waiting for a resource, or requeue
+ * the current listener into the global queue if the load remains too
+ * high. In this case we make sure to reconsider it in no more than one
+ * millisecond, which also ensures that the global task will let all
+ * current tasks flush before executing.
+ */
+ if (tasks_run_queue < global.tune.runqueue_depth || last) {
+ dequeue_all_listeners();
+ } else {
+ if (l->state != LI_LIMITED)
+ limit_listener(l, &global_listener_queue);
+ task_schedule(global_listener_queue_task, tick_add(now_ms, 1));
+ }
}
/* Initializes the listener queues. Returns 0 on success, otherwise ERR_* flags */
if (unlikely(actconn >= global.maxconn))
goto out;
+ /* If the run queue is still crowded, we woke up too early; let's wait
+ * a bit more (1ms delay) so that we don't wake up more than 1k times
+ * per second.
+ */
+ if (tasks_run_queue >= global.tune.runqueue_depth) {
+ t->expire = tick_add(now_ms, 1);
+ goto wait_more;
+ }
+
/* We should periodically try to enable listeners waiting for a global
* resource here, because it is possible, though very unlikely, that
* they have been blocked by a temporary lack of global resource such
out:
t->expire = TICK_ETERNITY;
+ wait_more:
task_queue(t);
return t;
}
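Taken together, the hunks above form a simple feedback loop: listener_accept() stops accepting once tasks_run_queue (plus the connections just accepted) reaches global.tune.runqueue_depth and parks the listener in global_listener_queue; listener_release() and the global queue task then re-check the run queue at most once per millisecond and resume accepting once it has drained (or once the last active connection is gone). The following is a minimal standalone sketch of that pattern, not HAProxy code: the names backlog, BACKLOG_LIMIT, defer_accept() and recheck(), as well as the simulated workload, are illustrative assumptions.

#include <stdio.h>
#include <unistd.h>

/* Illustrative stand-ins for tasks_run_queue and global.tune.runqueue_depth. */
static int backlog;                /* work accepted but not yet processed */
#define BACKLOG_LIMIT 8            /* hypothetical runqueue_depth */
static int paused;                 /* 1 while the "listener" is parked */

/* Mirrors limit_listener() + task_schedule(): stop accepting, re-check later. */
static void defer_accept(void)
{
	paused = 1;
	printf("backlog=%d >= %d: deferring accepts\n", backlog, BACKLOG_LIMIT);
}

/* Mirrors the global queue task: resume only once the backlog has drained. */
static void recheck(void)
{
	if (paused && backlog < BACKLOG_LIMIT) {
		paused = 0;
		printf("backlog=%d: resuming accepts\n", backlog);
	}
}

int main(void)
{
	for (int tick = 0; tick < 20; tick++) {
		backlog -= (backlog > 2) ? 2 : backlog; /* the scheduler drains work */

		if (paused)
			recheck();                      /* the 1ms retry in the patch */
		else if (backlog >= BACKLOG_LIMIT)
			defer_accept();
		else if (tick < 10) {                   /* incoming connections for a while */
			backlog += 3;                   /* each accept queues more work */
			printf("tick %d: accepted (backlog=%d)\n", tick, backlog);
		}
		usleep(1000);                           /* ~1ms per tick */
	}
	return 0;
}

Running the sketch shows a few accepted ticks, a single deferral once the backlog crosses the threshold, and a resume once it drains, which is the behaviour the patch aims for at accept time.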