]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: queue: Handle the race condition between queue and dequeue differently
authorOlivier Houchard <ohouchard@haproxy.com>
Thu, 5 Dec 2024 14:30:06 +0000 (15:30 +0100)
committerOlivier Houchard <cognet@ci0.org>
Tue, 24 Dec 2024 13:10:06 +0000 (14:10 +0100)
There is a small race condition, where a server would check if there is
something left in the proxy queue, and adding something to the proxy
queue. If the server checks just before the stream is added to the queue,
and it no longer has any stream to deal with, then nothing will take
care of the stream, that may stay in the queue forever.
This was worked around with commit 5541d4995d, by checking for that exact
condition after adding the stream to the queue, and trying again to get
a server assigned if it is detected.
That fix lead to multiple infinite loops, that got fixed, but it is not
unlikely that it could happen again. So let's fix the initial problem
differently : a single server may mark itself as ready, and it removes
itself once used. The principle is that when we discover that the just
queued stream is alone with no active request anywhere ot dequeue it,
instead of rebalancing it, it will be assigned to that current "ready"
server that is available to handle it. The extra cost of the atomic ops
is negligible since the situation is super rare.

include/haproxy/proxy-t.h
src/backend.c
src/queue.c
src/server.c

index 97215c64e93b20cff423cd34ff7ff59f42ab8941..4517e1d4e3c436689358d56a3c270aa045ecb4ef 100644 (file)
@@ -317,6 +317,7 @@ struct proxy {
        int srv_act, srv_bck;                   /* # of servers eligible for LB (UP|!checked) AND (enabled+weight!=0) */
        int served;                             /* # of active sessions currently being served */
        int  cookie_len;                        /* strlen(cookie_name), computed only once */
+       struct server *ready_srv;               /* a server being ready to serve requests */
        char *cookie_domain;                    /* domain used to insert the cookie */
        char *cookie_name;                      /* name of the cookie to look for */
        char *cookie_attrs;                     /* list of attributes to add to the cookie */
index 55aeacd085128730e53eaa1d299c39c398d2a3b6..daa1c2a1d84c98c611888dfc48ff6f2d0e81c2da 100644 (file)
@@ -967,7 +967,6 @@ int assign_server_and_queue(struct stream *s)
        struct server *srv;
        int err;
 
- balance_again:
        if (s->pend_pos)
                return SRV_STATUS_INTERNAL;
 
@@ -1088,13 +1087,63 @@ int assign_server_and_queue(struct stream *s)
                         * ended and no new one will be able to dequeue that one.
                         * This is more visible with maxconn 1 where it can
                         * happen 1/1000 times, though the vast majority are
-                        * correctly recovered from. Since it's so rare and we
-                        * have no server assigned, the best solution in this
-                        * case is to detect the condition, dequeue our request
-                        * and balance it again.
+                        * correctly recovered from.
+                        * To work around that, when a server is getting idle,
+                        * it will set the ready_srv field of the proxy.
+                        * Here, if ready_srv is non-NULL, we get that server,
+                        * and we attempt to switch its served from 0 to 1.
+                        * If it works, then we can just run, otherwise,
+                        * it means another stream will be running, and will
+                        * dequeue us eventually, so we can just do nothing.
                         */
-                       if (unlikely(pendconn_must_try_again(p)))
-                               goto balance_again;
+                       if (unlikely(s->be->ready_srv != NULL)) {
+                               struct server *newserv;
+
+                               newserv = HA_ATOMIC_XCHG(&s->be->ready_srv, NULL);
+                               if (newserv != NULL) {
+                                       int got_slot = 0;
+
+                                       while (_HA_ATOMIC_LOAD(&newserv->served) == 0) {
+                                               int served = 0;
+
+                                               if (_HA_ATOMIC_CAS(&newserv->served, &served, 1)) {
+                                                       got_slot = 1;
+                                                       break;
+                                               }
+                                       }
+                                       if (!got_slot) {
+                                               /*
+                                                * Somebody else can now
+                                                * wake up us, stop now.
+                                                */
+                                               return SRV_STATUS_QUEUED;
+                                       }
+
+                                       HA_SPIN_LOCK(QUEUE_LOCK, &p->queue->lock);
+                                       if (!p->node.node.leaf_p) {
+                                               /*
+                                                * Okay we've been queued and
+                                                * unqueued already, just leave
+                                                */
+                                               _HA_ATOMIC_DEC(&newserv->served);
+                                               return SRV_STATUS_QUEUED;
+                                       }
+                                       eb32_delete(&p->node);
+                                       HA_SPIN_UNLOCK(QUEUE_LOCK, &p->queue->lock);
+
+                                       _HA_ATOMIC_DEC(&p->queue->length);
+                                       _HA_ATOMIC_INC(&p->queue->idx);
+                                       _HA_ATOMIC_DEC(&s->be->totpend);
+
+                                       pool_free(pool_head_pendconn, p);
+
+                                       s->flags |= SF_ASSIGNED;
+                                       s->target = &newserv->obj_type;
+                                       s->pend_pos = NULL;
+                                       sess_change_server(s, newserv);
+                                       return SRV_STATUS_OK;
+                               }
+                       }
 
                        return SRV_STATUS_QUEUED;
                }
index 234b01a3b51a81095d8886eb7a0b3c84f35eac26..b4c239e980f5c31a937f4a0268dd4e670e317f98 100644 (file)
@@ -437,6 +437,24 @@ int process_srv_queue(struct server *s)
                if (p->lbprm.server_take_conn)
                        p->lbprm.server_take_conn(s);
        }
+       if (s->served == 0 && p->served == 0 && !HA_ATOMIC_LOAD(&p->ready_srv)) {
+               /*
+                * If there is no task running on the server, and the proxy,
+                * let it known that we are ready, there is a small race
+                * condition if a task was being added just before we checked
+                * the proxy queue. It will look for that server, and use it
+                * if nothing is currently running, as there would be nobody
+                * to wake it up.
+                */
+               _HA_ATOMIC_STORE(&p->ready_srv, s);
+               /*
+                * Maybe a stream was added to the queue just after we
+                * checked, but before we set ready_srv so it would not see it,
+                * just in case try to run one more stream.
+                */
+               if (pendconn_process_next_strm(s, p, px_ok))
+                       done++;
+       }
        return done;
 }
 
index 32b5a4c33976ec3c7b9eb6e7baeb701372d70a55..79b56126fc1dc2a7c579c601bd9381d379251601 100644 (file)
@@ -6834,6 +6834,14 @@ static void srv_update_status(struct server *s, int type, int cause)
                        s->counters.down_trans++;
                        _srv_event_hdl_publish(EVENT_HDL_SUB_SERVER_DOWN, cb_data.common, s);
                }
+
+               /*
+                * If the server is no longer running, let's not pretend
+                * it can handle requests.
+                */
+               if (s->cur_state != SRV_ST_RUNNING && s->proxy->ready_srv == s)
+                       HA_ATOMIC_STORE(&s->proxy->ready_srv, NULL);
+
                s->counters.last_change = ns_to_sec(now_ns);
 
                /* publish the state change */