/* Marks a ready listener as full so that the stream code tries to re-enable
* it upon next close() using resume_listener().
- *
- * Note: this function is only called from listener_accept so <l> is already
- * locked.
*/
static void listener_full(struct listener *l)
{
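+ /* take the listener's lock here: callers are no longer expected to hold it */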
+ HA_SPIN_LOCK(LISTENER_LOCK, &l->lock);
if (l->state >= LI_READY) {
if (l->state == LI_LIMITED) {
HA_SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
LIST_DEL(&l->wait_queue);
HA_SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
}
-
- fd_stop_recv(l->fd);
- l->state = LI_FULL;
+ if (l->state != LI_FULL) {
+ fd_stop_recv(l->fd);
+ l->state = LI_FULL;
+ }
}
+ HA_SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
}
/* Marks a ready listener as limited so that we only try to re-enable it when
* resources are free again. It will be queued into the specified queue.
- *
- * Note: this function is only called from listener_accept so <l> is already
- * locked.
*/
static void limit_listener(struct listener *l, struct list *list)
{
+ HA_SPIN_LOCK(LISTENER_LOCK, &l->lock);
if (l->state == LI_READY) {
HA_SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
LIST_ADDQ(list, &l->wait_queue);
fd_stop_recv(l->fd);
l->state = LI_LIMITED;
}
+ HA_SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
}
/* Delete a listener from its protocol's list of listeners. The listener's
 * state is automatically updated from LI_ASSIGNED back to LI_INIT, the
 * protocol's listener count is decremented, and so are the global <jobs>
 * and <listeners> counters.
 */
void delete_listener(struct listener *listener)
{
- if (listener->state != LI_ASSIGNED)
- return;
-
HA_SPIN_LOCK(LISTENER_LOCK, &listener->lock);
- listener->state = LI_INIT;
- LIST_DEL(&listener->proto_list);
- listener->proto->nb_listeners--;
- HA_ATOMIC_SUB(&jobs, 1);
- HA_ATOMIC_SUB(&listeners, 1);
+ if (listener->state == LI_ASSIGNED) {
+ listener->state = LI_INIT;
+ LIST_DEL(&listener->proto_list);
+ listener->proto->nb_listeners--;
+ HA_ATOMIC_SUB(&jobs, 1);
+ HA_ATOMIC_SUB(&listeners, 1);
+ }
HA_SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
}
struct listener *l = fdtab[fd].owner;
struct proxy *p;
int max_accept;
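+ /* next_conn: non-zero while this thread holds a connection slot reserved
+ * below and not yet handed over to l->accept()
+ */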
+ int next_conn = 0;
int expire;
int cfd;
int ret;
return;
p = l->bind_conf->frontend;
max_accept = l->maxaccept ? l->maxaccept : 1;
- if (HA_SPIN_TRYLOCK(LISTENER_LOCK, &l->lock))
- return;
if (!(l->options & LI_O_UNLIMITED) && global.sps_lim) {
int max = freq_ctr_remain(&global.sess_per_sec, global.sps_lim, 0);
* worst case. If we fail due to system limits or temporary resource
* shortage, we try again 100ms later in the worst case.
*/
- while (l->nbconn < l->maxconn && max_accept--) {
+ for (; max_accept-- > 0; next_conn = 0) {
struct sockaddr_storage addr;
socklen_t laddr = sizeof(addr);
unsigned int count;
+ /* atomically reserve a connection slot without exceeding the listener's maxconn */
+ do {
+ count = l->nbconn;
+ if (count >= l->maxconn) {
+ /* the listener was marked full or another
+ * thread is going to do it.
+ */
+ next_conn = 0;
+ goto end;
+ }
+ next_conn = count + 1;
+ } while (!HA_ATOMIC_CAS(&l->nbconn, &count, next_conn));
+
+ if (next_conn == l->maxconn) {
+ /* we filled it, mark it full */
+ listener_full(l);
+ }
+
if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) {
limit_listener(l, &global_listener_queue);
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
goto transient_error;
case EINTR:
case ECONNABORTED:
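+ /* the accept failed without producing a connection: release the slot reserved above */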
+ HA_ATOMIC_SUB(&l->nbconn, 1);
continue;
case ENFILE:
if (p)
if (unlikely(master == 1))
fcntl(cfd, F_SETFD, FD_CLOEXEC);
+ /* The connection was accepted; it must be counted as such */
+ if (l->counters)
+ HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, next_conn);
+
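+ /* increase the per-process number of cumulated connections */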
+ if (!(l->options & LI_O_UNLIMITED)) {
+ count = update_freq_ctr(&global.conn_per_sec, 1);
+ HA_ATOMIC_UPDATE_MAX(&global.cps_max, count);
+ HA_ATOMIC_ADD(&actconn, 1);
+ }
+
if (unlikely(cfd >= global.maxsock)) {
send_log(p, LOG_EMERG,
"Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n",
p->id);
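+ /* the connection is refused and closed below, so undo the actconn increment done above */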
+ if (!(l->options & LI_O_UNLIMITED))
+ HA_ATOMIC_SUB(&actconn, 1);
close(cfd);
limit_listener(l, &global_listener_queue);
task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
goto end;
}
- /* increase the per-process number of cumulated connections */
- if (!(l->options & LI_O_UNLIMITED)) {
- count = update_freq_ctr(&global.conn_per_sec, 1);
- HA_ATOMIC_UPDATE_MAX(&global.cps_max, count);
- HA_ATOMIC_ADD(&actconn, 1);
- }
-
- count = HA_ATOMIC_ADD(&l->nbconn, 1);
- if (l->counters)
- HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, count);
+ /* past this point, l->accept() will automatically decrement
+ * l->nbconn and actconn once done. Setting next_conn to zero allows
+ * the error paths not to roll back nbconn; it's more convenient
+ * than duplicating all exit labels.
+ */
+ next_conn = 0;
ret = l->accept(l, cfd, &addr);
if (unlikely(ret <= 0)) {
goto transient_error;
}
- /* increase the per-process number of cumulated connections */
+ /* increase the per-process number of cumulated sessions; this
+ * may only be done once l->accept() has accepted the connection.
+ */
if (!(l->options & LI_O_UNLIMITED)) {
count = update_freq_ctr(&global.sess_per_sec, 1);
HA_ATOMIC_UPDATE_MAX(&global.sps_max, count);
}
#endif
- } /* end of while (max_accept--) */
+ } /* end of for (max_accept--) */
/* we've exhausted max_accept, so there is no need to poll again */
stop:
limit_listener(l, &global_listener_queue);
task_schedule(global_listener_queue_task, tick_first(expire, global_listener_queue_task->expire));
end:
- if (l->nbconn >= l->maxconn)
- listener_full(l);
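+ /* a connection slot was reserved in the loop above but never consumed; release it */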
+ if (next_conn)
+ HA_ATOMIC_SUB(&l->nbconn, 1);
- HA_SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
+ if (l->nbconn < l->maxconn && l->state == LI_FULL) {
+ /* at least one thread has to do this when quitting */
+ resume_listener(l);
+
+ /* Dequeues all of the listeners waiting for a resource */
+ if (!LIST_ISEMPTY(&global_listener_queue))
+ dequeue_all_listeners(&global_listener_queue);
+
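+ /* also dequeue this proxy's own listeners if its session rate limit still leaves room */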
+ if (!LIST_ISEMPTY(&p->listener_queue) &&
+ (!p->fe_sps_lim || freq_ctr_remain(&p->fe_sess_per_sec, p->fe_sps_lim, 0) > 0))
+ dequeue_all_listeners(&p->listener_queue);
+ }
}
/* Notify the listener that a connection initiated from it was released. This