]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
BUG/MAJOR: threads/queue: Fix thread-safety issues on the queues management
authorChristopher Faulet <cfaulet@haproxy.com>
Wed, 14 Mar 2018 15:18:06 +0000 (16:18 +0100)
committerWilly Tarreau <w@1wt.eu>
Mon, 19 Mar 2018 09:03:06 +0000 (10:03 +0100)
The management of the servers and the proxies queues was not thread-safe at
all. First, the accesses to <strm>->pend_pos were not protected. So it was
possible to release it on a thread (for instance because the stream is released)
and to use it in same time on another one (because we redispatch pending
connections for a server). Then, the accesses to stream's information (flags and
target) from anywhere is forbidden. To be safe, The stream's state must always
be updated in the context of process_stream.

So to fix these issues, the queue module has been refactored. A lock has been
added in the pendconn structure. And now, when we try to dequeue a pending
connection, we start by unlinking it from the server/proxy queue and we wake up
the stream. Then, it is the stream reponsibility to really dequeue it (or
release it). This way, we are sure that only the stream can create and release
its <pend_pos> field.

However, be careful. This new implementation should be thread-safe
(hopefully...). But it is not optimal and in some situations, it could be really
slower in multi-threaded mode than in single-threaded one. The problem is that,
when we try to dequeue pending connections, we process it from the older one to
the newer one independently to the thread's affinity. So we need to wait the
other threads' wakeup to really process them. If threads are blocked in the
poller, this will add a significant latency. This problem happens when maxconn
values are very low.

This patch must be backported in 1.8.

include/common/hathreads.h
include/proto/queue.h
include/types/queue.h
include/types/stream.h
src/proto_http.c
src/queue.c
src/stream.c

index 30009cc82486cafa6f6e110388882df027111c96..2620b7791f281d6f34eadc322ef2e3e1e4f47ae2 100644 (file)
@@ -292,6 +292,7 @@ enum lock_label {
        PIPES_LOCK,
        START_LOCK,
        TLSKEYS_REF_LOCK,
+       PENDCONN_LOCK,
        LOCK_LABELS
 };
 struct lock_stat {
@@ -409,6 +410,7 @@ static inline const char *lock_label(enum lock_label label)
        case PIPES_LOCK:           return "PIPES";
        case START_LOCK:           return "START";
        case TLSKEYS_REF_LOCK:     return "TLSKEYS_REF";
+       case PENDCONN_LOCK:        return "PENDCONN";
        case LOCK_LABELS:          break; /* keep compiler happy */
        };
        /* only way to come here is consecutive to an internal bug */
index f66d809f1b30f93c0fd556cc53d0ee35312aed7b..2d4773a09ac4fcdd153bcd7a631e166dec234e67 100644 (file)
@@ -38,6 +38,7 @@ extern struct pool_head *pool_head_pendconn;
 
 int init_pendconn();
 struct pendconn *pendconn_add(struct stream *strm);
+int pendconn_dequeue(struct stream *strm);
 void pendconn_free(struct pendconn *p);
 void process_srv_queue(struct server *s);
 unsigned int srv_dynamic_maxconn(const struct server *s);
index 4b3545140d8e49e683813863c77ea5569de7062c..42dbbd0478223de34496b33ffc25b8fba048bb8c 100644 (file)
 
 #include <common/config.h>
 #include <common/mini-clist.h>
+#include <common/hathreads.h>
 
 #include <types/server.h>
 
 struct stream;
 
 struct pendconn {
-       struct list list;               /* chaining ... */
-       struct stream *strm;            /* the stream waiting for a connection */
-       struct server *srv;             /* the server we are waiting for */
+       int            strm_flags; /* stream flags */
+       struct stream *strm;
+       struct proxy  *px;
+       struct server *srv;        /* the server we are waiting for, may be NULL */
+       struct list    list;       /* next pendconn */
+       __decl_hathreads(HA_SPINLOCK_T lock);
 };
 
 #endif /* _TYPES_QUEUE_H */
index 227b0ffbab705f5edc4f2ebaa0b557509d775a2b..0dbc79f446cb3e34062fa9c2607005e9ef601b7c 100644 (file)
@@ -124,7 +124,7 @@ struct stream {
        struct session *sess;           /* the session this stream is attached to */
 
        struct server *srv_conn;        /* stream already has a slot on a server and is not in queue */
-       struct pendconn *pend_pos;      /* if not NULL, points to the position in the pending queue */
+       struct pendconn *pend_pos;      /* if not NULL, points to the pending position in the pending queue */
 
        struct http_txn *txn;           /* current HTTP transaction being processed. Should become a list. */
 
index ae582b3deb85433eafee372e5e9beb1541cfdc82..80e001d694eb9139a9f772f17ef070777370908e 100644 (file)
@@ -8253,8 +8253,6 @@ void http_reset_txn(struct stream *s)
        s->store_count = 0;
        s->uniq_id = global.req_count++;
 
-       s->pend_pos = NULL;
-
        s->req.flags |= CF_READ_DONTWAIT; /* one read is usually enough */
 
        /* We must trim any excess data from the response buffer, because we
index 1dea7d53baf6fb49f8835247e65e8d8a1766827d..aa40fba729cd82334c83e41858c2484693ced28a 100644 (file)
@@ -24,8 +24,6 @@
 
 struct pool_head *pool_head_pendconn;
 
-static void __pendconn_free(struct pendconn *p);
-
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
 int init_pendconn()
 {
@@ -63,78 +61,99 @@ unsigned int srv_dynamic_maxconn(const struct server *s)
        return max;
 }
 
-
-/* Returns the first pending connection for server <s>, which may be NULL if
- * nothing is pending.
- */
-static inline struct pendconn *pendconn_from_srv(const struct server *s) {
-       if (!s->nbpend)
-               return NULL;
-       return LIST_ELEM(s->pendconns.n, struct pendconn *, list);
-}
-
-/* Returns the first pending connection for proxy <px>, which may be NULL if
- * nothing is pending.
+/* Remove the pendconn from the server/proxy queue. At this stage, the
+ * connection is not really dequeued. It will be done during the
+ * process_stream. This function must be called by function owning the locks on
+ * the pendconn _AND_ the server/proxy. It also decreases the pending count.
+ *
+ * The caller must own the lock on the pendconn _AND_ the queue containing the
+ * pendconn. The pendconn must still be queued.
  */
-static inline struct pendconn *pendconn_from_px(const struct proxy *px) {
-       if (!px->nbpend)
-               return NULL;
-
-       return LIST_ELEM(px->pendconns.n, struct pendconn *, list);
+static void pendconn_unlink(struct pendconn *p)
+{
+       if (p->srv)
+               p->srv->nbpend--;
+       else
+               p->px->nbpend--;
+       HA_ATOMIC_SUB(&p->px->totpend, 1);
+       LIST_DEL(&p->list);
+       LIST_INIT(&p->list);
 }
 
-
-/* Detaches the next pending connection from either a server or a proxy, and
- * returns its associated stream. If no pending connection is found, NULL is
- * returned. Note that neither <srv> nor <px> may be NULL.
- * Priority is given to the oldest request in the queue if both <srv> and <px>
- * have pending requests. This ensures that no request will be left unserved.
- * The <px> queue is not considered if the server (or a tracked server) is not
- * RUNNING, is disabled, or has a null weight (server going down). The <srv>
- * queue is still considered in this case, because if some connections remain
- * there, it means that some requests have been forced there after it was seen
- * down (eg: due to option persist).
- * The stream is immediately marked as "assigned", and both its <srv> and
- * <srv_conn> are set to <srv>,
+/* Process the next pending connection from either a server or a proxy, and
+ * returns 0 on success. If no pending connection is found, 1 is returned.
+ * Note that neither <srv> nor <px> may be NULL.  Priority is given to the
+ * oldest request in the queue if both <srv> and <px> have pending
+ * requests. This ensures that no request will be left unserved.  The <px> queue
+ * is not considered if the server (or a tracked server) is not RUNNING, is
+ * disabled, or has a null weight (server going down). The <srv> queue is still
+ * considered in this case, because if some connections remain there, it means
+ * that some requests have been forced there after it was seen down (eg: due to
+ * option persist).  The stream is immediately marked as "assigned", and both
+ * its <srv> and <srv_conn> are set to <srv>.
+ *
+ * This function must only be called if the server queue _AND_ the proxy queue
+ * are locked. Today it is only called by process_srv_queue.
  */
-static struct stream *pendconn_get_next_strm(struct server *srv, struct proxy *px)
+static int pendconn_process_next_strm(struct server *srv, struct proxy *px)
 {
-       struct pendconn *ps, *pp;
-       struct stream *strm;
-       struct server *rsrv;
+       struct pendconn *p = NULL;
+       struct server   *rsrv;
 
        rsrv = srv->track;
        if (!rsrv)
                rsrv = srv;
 
-       ps = pendconn_from_srv(srv);
-       pp = pendconn_from_px(px);
-       /* we want to get the definitive pendconn in <ps> */
-       if (!pp || !srv_currently_usable(rsrv)) {
-               if (!ps)
-                       return NULL;
-       } else {
-               /* pendconn exists in the proxy queue */
-               if (!ps || tv_islt(&pp->strm->logs.tv_request, &ps->strm->logs.tv_request))
-                       ps = pp;
+       if (srv->nbpend) {
+               list_for_each_entry(p, &srv->pendconns, list) {
+                       if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &p->lock))
+                               goto ps_found;
+               }
+               p = NULL;
        }
-       strm = ps->strm;
-       __pendconn_free(ps);
 
-       /* we want to note that the stream has now been assigned a server */
-       strm->flags |= SF_ASSIGNED;
-       strm->target = &srv->obj_type;
-       __stream_add_srv_conn(strm, srv);
+  ps_found:
+       if (srv_currently_usable(rsrv) && px->nbpend) {
+               struct pendconn *pp;
+
+               list_for_each_entry(pp, &px->pendconns, list) {
+                       /* If the server pendconn is older than the proxy one,
+                        * we process the server one. */
+                       if (p && !tv_islt(&pp->strm->logs.tv_request, &p->strm->logs.tv_request))
+                               goto pendconn_found;
+
+                       if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &pp->lock)) {
+                               /* Let's switch from the server pendconn to the
+                                * proxy pendconn. Don't forget to unlock the
+                                * server pendconn, if any. */
+                               if (p)
+                                       HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+                               p = pp;
+                               goto pendconn_found;
+                       }
+               }
+       }
+
+       if (!p)
+               return 1;
+
+  pendconn_found:
+       pendconn_unlink(p);
+       p->strm_flags |= SF_ASSIGNED;
+       p->srv = srv;
+
        HA_ATOMIC_ADD(&srv->served, 1);
        HA_ATOMIC_ADD(&srv->proxy->served, 1);
        if (px->lbprm.server_take_conn)
                px->lbprm.server_take_conn(srv);
+       __stream_add_srv_conn(p->strm, srv);
 
-       return strm;
+       task_wakeup(p->strm->task, TASK_WOKEN_RES);
+       HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+       return 0;
 }
 
-/*
- * Manages a server's connection queue. This function will try to dequeue as
+/* Manages a server's connection queue. This function will try to dequeue as
  * many pending streams as possible, and wake them up.
  */
 void process_srv_queue(struct server *s)
@@ -144,17 +163,10 @@ void process_srv_queue(struct server *s)
 
        HA_SPIN_LOCK(PROXY_LOCK,  &p->lock);
        HA_SPIN_LOCK(SERVER_LOCK, &s->lock);
-
-       /* First, check if we can handle some connections queued at the proxy. We
-        * will take as many as we can handle.
-        */
        maxconn = srv_dynamic_maxconn(s);
        while (s->served < maxconn) {
-               struct stream *strm = pendconn_get_next_strm(s, p);
-
-               if (strm == NULL)
+               if (pendconn_process_next_strm(s, p))
                        break;
-               task_wakeup(strm->task, TASK_WOKEN_RES);
        }
        HA_SPIN_UNLOCK(SERVER_LOCK, &s->lock);
        HA_SPIN_UNLOCK(PROXY_LOCK,  &p->lock);
@@ -165,39 +177,50 @@ void process_srv_queue(struct server *s)
  * are updated accordingly. Returns NULL if no memory is available, otherwise the
  * pendconn itself. If the stream was already marked as served, its flag is
  * cleared. It is illegal to call this function with a non-NULL strm->srv_conn.
+ *
+ * This function must be called by the stream itself, so in the context of
+ * process_stream.
  */
 struct pendconn *pendconn_add(struct stream *strm)
 {
        struct pendconn *p;
-       struct server *srv;
-       int count;
+       struct proxy    *px;
+       struct server   *srv;
 
        p = pool_alloc(pool_head_pendconn);
        if (!p)
                return NULL;
 
-       strm->pend_pos = p;
-       p->strm = strm;
        srv = objt_server(strm->target);
+       px  = strm->be;
+
+       p->srv        = NULL;
+       p->px         = px;
+       p->strm       = strm;
+       p->strm_flags = strm->flags;
+       HA_SPIN_INIT(&p->lock);
 
        if ((strm->flags & SF_ASSIGNED) && srv) {
                p->srv = srv;
                HA_SPIN_LOCK(SERVER_LOCK, &srv->lock);
+               srv->nbpend++;
+               strm->logs.srv_queue_size += srv->nbpend;
+               if (srv->nbpend > srv->counters.nbpend_max)
+                       srv->counters.nbpend_max = srv->nbpend;
                LIST_ADDQ(&srv->pendconns, &p->list);
                HA_SPIN_UNLOCK(SERVER_LOCK, &srv->lock);
-               count = HA_ATOMIC_ADD(&srv->nbpend, 1);
-               strm->logs.srv_queue_size += count;
-               HA_ATOMIC_UPDATE_MAX(&srv->counters.nbpend_max, count);
-       } else {
-               p->srv = NULL;
-               HA_SPIN_LOCK(PROXY_LOCK, &strm->be->lock);
-               LIST_ADDQ(&strm->be->pendconns, &p->list);
-               HA_SPIN_UNLOCK(PROXY_LOCK, &strm->be->lock);
-               count = HA_ATOMIC_ADD(&strm->be->nbpend, 1);
-               strm->logs.prx_queue_size += count;
-               HA_ATOMIC_UPDATE_MAX(&strm->be->be_counters.nbpend_max, count);
        }
-       HA_ATOMIC_ADD(&strm->be->totpend, 1);
+       else {
+               HA_SPIN_LOCK(PROXY_LOCK, &px->lock);
+               px->nbpend++;
+               strm->logs.prx_queue_size += px->nbpend;
+               if (px->nbpend > px->be_counters.nbpend_max)
+                       px->be_counters.nbpend_max = px->nbpend;
+               LIST_ADDQ(&px->pendconns, &p->list);
+               HA_SPIN_UNLOCK(PROXY_LOCK, &px->lock);
+       }
+       HA_ATOMIC_ADD(&px->totpend, 1);
+       strm->pend_pos = p;
        return p;
 }
 
@@ -206,26 +229,28 @@ struct pendconn *pendconn_add(struct stream *strm)
  */
 int pendconn_redistribute(struct server *s)
 {
-       struct pendconn *pc, *pc_bck;
+       struct pendconn *p, *pback;
        int xferred = 0;
 
+       /* The REDISP option was specified. We will ignore cookie and force to
+        * balance or use the dispatcher. */
+       if ((s->proxy->options & (PR_O_REDISP|PR_O_PERSIST)) != PR_O_REDISP)
+               return 0;
+
        HA_SPIN_LOCK(SERVER_LOCK, &s->lock);
-       list_for_each_entry_safe(pc, pc_bck, &s->pendconns, list) {
-               struct stream *strm = pc->strm;
+       list_for_each_entry_safe(p, pback, &s->pendconns, list) {
+               if (p->strm_flags & SF_FORCE_PRST)
+                       continue;
 
-               if ((strm->be->options & (PR_O_REDISP|PR_O_PERSIST)) == PR_O_REDISP &&
-                   !(strm->flags & SF_FORCE_PRST)) {
-                       /* The REDISP option was specified. We will ignore
-                        * cookie and force to balance or use the dispatcher.
-                        */
+               if (HA_SPIN_TRYLOCK(PENDCONN_LOCK, &p->lock))
+                       continue;
 
-                       /* it's left to the dispatcher to choose a server */
-                       strm->flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET);
+               /* it's left to the dispatcher to choose a server */
+               pendconn_unlink(p);
+               p->strm_flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET);
 
-                       __pendconn_free(pc);
-                       task_wakeup(strm->task, TASK_WOKEN_RES);
-                       xferred++;
-               }
+               task_wakeup(p->strm->task, TASK_WOKEN_RES);
+               HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
        }
        HA_SPIN_UNLOCK(SERVER_LOCK, &s->lock);
        return xferred;
@@ -238,65 +263,110 @@ int pendconn_redistribute(struct server *s)
  */
 int pendconn_grab_from_px(struct server *s)
 {
-       int xferred;
+       struct pendconn *p, *pback;
+       int maxconn, xferred = 0;
 
        if (!srv_currently_usable(s))
                return 0;
 
        HA_SPIN_LOCK(PROXY_LOCK, &s->proxy->lock);
-       for (xferred = 0; !s->maxconn || xferred < srv_dynamic_maxconn(s); xferred++) {
-               struct stream *strm;
-               struct pendconn *p;
-
-               p = pendconn_from_px(s->proxy);
-               if (!p)
+       maxconn = srv_dynamic_maxconn(s);
+       list_for_each_entry_safe(p, pback, &s->proxy->pendconns, list) {
+               if (s->maxconn && s->served + xferred >= maxconn)
                        break;
-               p->strm->target = &s->obj_type;
-               strm = p->strm;
-               __pendconn_free(p);
-               task_wakeup(strm->task, TASK_WOKEN_RES);
+
+               if (HA_SPIN_TRYLOCK(PENDCONN_LOCK, &p->lock))
+                       continue;
+
+               pendconn_unlink(p);
+               p->srv = s;
+
+               task_wakeup(p->strm->task, TASK_WOKEN_RES);
+               HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+               xferred++;
        }
        HA_SPIN_UNLOCK(PROXY_LOCK, &s->proxy->lock);
        return xferred;
 }
 
-/*
- * Detaches pending connection <p>, decreases the pending count, and frees
- * the pending connection. The connection might have been queued to a specific
- * server as well as to the proxy. The stream also gets marked unqueued.
+/* Try to dequeue pending connection attached to the stream <strm>. It must
+ * always exists here. If the pendconn is still linked to the server or the
+ * proxy queue, nothing is done and the function returns 1. Otherwise,
+ * <strm>->flags and <strm>->target are updated, the pendconn is released and 0
+ * is returned.
+ *
+ * This function must be called by the stream itself, so in the context of
+ * process_stream.
  */
-void pendconn_free(struct pendconn *p)
+int pendconn_dequeue(struct stream *strm)
 {
-       if (p->srv) {
-               HA_SPIN_LOCK(SERVER_LOCK, &p->srv->lock);
-               LIST_DEL(&p->list);
-               HA_SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock);
-               HA_ATOMIC_SUB(&p->srv->nbpend, 1);
+       struct pendconn *p;
+
+       if (unlikely(!strm->pend_pos)) {
+               /* unexpected case because it is called by the stream itself and
+                * only the stream can release a pendconn. So it is only
+                * possible if a pendconn is released by someone else or if the
+                * stream is supposed to be queued but without its associated
+                * pendconn. In both cases it is a bug! */
+               abort();
        }
-       else {
-               HA_SPIN_LOCK(SERVER_LOCK, &p->strm->be->lock);
-               LIST_DEL(&p->list);
-               HA_SPIN_UNLOCK(SERVER_LOCK, &p->strm->be->lock);
-               HA_ATOMIC_SUB(&p->strm->be->nbpend, 1);
+       p = strm->pend_pos;
+       HA_SPIN_LOCK(PENDCONN_LOCK, &p->lock);
+
+       /* the pendconn is still linked to the server/proxy queue, so unlock it
+        * and go away. */
+       if (!LIST_ISEMPTY(&p->list)) {
+               HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
+               return 1;
        }
-       p->strm->pend_pos = NULL;
-       HA_ATOMIC_SUB(&p->strm->be->totpend, 1);
+
+       /* the pendconn must be dequeued now */
+       if (p->srv)
+               strm->target = &p->srv->obj_type;
+
+       strm->flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET);
+       strm->flags |= p->strm_flags & (SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET);
+       strm->pend_pos = NULL;
+       HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
        pool_free(pool_head_pendconn, p);
+       return 0;
 }
 
-/* Lock-free version of pendconn_free. */
-static void __pendconn_free(struct pendconn *p)
+/* Release the pending connection <p>, and decreases the pending count if
+ * needed. The connection might have been queued to a specific server as well as
+ * to the proxy. The stream also gets marked unqueued. <p> must always be
+ * defined here. So it is the caller responsibility to check its existance.
+ *
+ * This function must be called by the stream itself, so in the context of
+ * process_stream.
+ */
+void pendconn_free(struct pendconn *p)
 {
+       struct stream *strm = p->strm;
+
+       HA_SPIN_LOCK(PENDCONN_LOCK, &p->lock);
+
+       /* The pendconn was already unlinked, just release it. */
+       if (LIST_ISEMPTY(&p->list))
+               goto release;
+
        if (p->srv) {
+               HA_SPIN_LOCK(SERVER_LOCK, &p->srv->lock);
+               p->srv->nbpend--;
                LIST_DEL(&p->list);
-               HA_ATOMIC_SUB(&p->srv->nbpend, 1);
+               HA_SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock);
        }
        else {
+               HA_SPIN_LOCK(PROXY_LOCK, &p->px->lock);
+               p->px->nbpend--;
                LIST_DEL(&p->list);
-               HA_ATOMIC_SUB(&p->strm->be->nbpend, 1);
+               HA_SPIN_UNLOCK(PROXY_LOCK, &p->px->lock);
        }
-       p->strm->pend_pos = NULL;
-       HA_ATOMIC_SUB(&p->strm->be->totpend, 1);
+       HA_ATOMIC_SUB(&p->px->totpend, 1);
+
+  release:
+       strm->pend_pos = NULL;
+       HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock);
        pool_free(pool_head_pendconn, p);
 }
 
index afa9f9933232c78e6a0582ee671b310e46c955a5..38d724006f7c31506a0b853bf24df3cdbe05069a 100644 (file)
@@ -929,7 +929,7 @@ static void sess_update_stream_int(struct stream *s)
        }
        else if (si->state == SI_ST_QUE) {
                /* connection request was queued, check for any update */
-               if (!s->pend_pos) {
+               if (!pendconn_dequeue(s)) {
                        /* The connection is not in the queue anymore. Either
                         * we have a server connection slot available and we
                         * go directly to the assigned state, or we need to