]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: threads/peers: Make peers thread safe
authorEmeric Brun <ebrun@haproxy.com>
Mon, 19 Jun 2017 15:46:37 +0000 (17:46 +0200)
committerWilly Tarreau <w@1wt.eu>
Tue, 31 Oct 2017 12:58:31 +0000 (13:58 +0100)
A lock is used to protect accesses to a peer structure.

A the lock is taken in the applet handler when the peer is identified
and released living the applet handler.

In the scheduling task for peers section, the lock is taken for every
listed peer and released at the end of the process task function.

The peer 'force shutdown' function was also re-worked.

include/common/hathreads.h
include/types/applet.h
include/types/peers.h
src/cfgparse.c
src/peers.c

index 8b32cf6071d54c68e62ae143e1233520d80efa6d..3a77bd175738f8a80118a9080bdc6dd56448c7d0 100644 (file)
@@ -155,6 +155,7 @@ enum lock_label {
        STK_TABLE_LOCK,
        STK_SESS_LOCK,
        APPLETS_LOCK,
+       PEER_LOCK,
        LOCK_LABELS
 };
 struct lock_stat {
@@ -241,7 +242,7 @@ static inline void show_lock_stats()
                                           "TASK_RQ", "TASK_WQ", "POOL",
                                           "LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
                                           "UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS",
-                                          "APPLETS" };
+                                          "APPLETS", "PEER" };
        int lbl;
 
        for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
index e96b703c8ca145815902b91fc7523e46b8164f2c..b56c5631c4fcfb03315a4cc79c43605d06621db9 100644 (file)
@@ -71,7 +71,7 @@ struct appctx {
 
        union {
                struct {
-                       void *ptr;              /* multi-purpose pointer for peers */
+                       void *ptr;              /* current peer or NULL, do not use for something else */
                } peers;                        /* used by the peers applet */
                struct {
                        int connected;
index a77a0942b4ee3c17e89be83ee43460808136d04e..2fc7435c1ca7d3896066bfe12a83efa5db6f5fa8 100644 (file)
@@ -67,6 +67,9 @@ struct peer {
        struct shared_table *remote_table;
        struct shared_table *last_local_table;
        struct shared_table *tables;
+#ifdef USE_THREAD
+       HA_SPINLOCK_T lock;      /* lock used to handle this peer section */
+#endif
        struct peer *next;        /* next peer in the list */
 };
 
index dd49009530ff8ca4c12d5f5d6128dd42c1dd51f6..ca2d5d79dbdf8b506d02e258583fb9525a7ce8f8 100644 (file)
@@ -2039,6 +2039,7 @@ int cfg_parse_peers(const char *file, int linenum, char **args, int kwm)
                newpeer->proto = proto;
                newpeer->xprt  = xprt_get(XPRT_RAW);
                newpeer->sock_init_arg = NULL;
+               SPIN_INIT(&newpeer->lock);
 
                if (strcmp(newpeer->id, localpeer) == 0) {
                        /* Current is local peer, it define a frontend */
index ef332eba233e422a5b6a77aa216d39071b367654..2ca08fe7c5079c339aff1a3ada0f0e44320b5bb4 100644 (file)
@@ -509,6 +509,7 @@ static void peer_session_release(struct appctx *appctx)
 
        /* peer session identified */
        if (peer) {
+               SPIN_LOCK(PEER_LOCK, &peer->lock);
                if (peer->appctx == appctx) {
                        /* Re-init current table pointers to force announcement on re-connect */
                        peer->remote_table = peer->last_local_table = NULL;
@@ -525,6 +526,7 @@ static void peer_session_release(struct appctx *appctx)
                        peer->flags &= PEER_TEACH_RESET;
                        peer->flags &= PEER_LEARN_RESET;
                }
+               SPIN_UNLOCK(PEER_LOCK, &peer->lock);
                task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
        }
 }
@@ -566,6 +568,7 @@ static void peer_io_handler(struct appctx *appctx)
        struct stream_interface *si = appctx->owner;
        struct stream *s = si_strm(si);
        struct peers *curpeers = strm_fe(s)->parent;
+       struct peer *curpeer = NULL;
        int reql = 0;
        int repl = 0;
        size_t proto_len = strlen(PEER_SESSION_PROTO_NAME);
@@ -646,7 +649,6 @@ switchstate:
                                appctx->st0 = PEER_SESS_ST_GETPEER;
                                /* fall through */
                        case PEER_SESS_ST_GETPEER: {
-                               struct peer *curpeer;
                                char *p;
                                reql = co_getline(si_oc(si), trash.str, trash.size);
                                if (reql <= 0) { /* closed or EOL not found */
@@ -689,6 +691,7 @@ switchstate:
                                        goto switchstate;
                                }
 
+                               SPIN_LOCK(PEER_LOCK, &curpeer->lock);
                                if (curpeer->appctx && curpeer->appctx != appctx) {
                                        if (curpeer->local) {
                                                /* Local connection, reply a retry */
@@ -696,6 +699,12 @@ switchstate:
                                                appctx->st1 = PEER_SESS_SC_TRYAGAIN;
                                                goto switchstate;
                                        }
+
+                                       /* we're killing a connection, we must apply a random delay before
+                                        * retrying otherwise the other end will do the same and we can loop
+                                        * for a while.
+                                        */
+                                       curpeer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + random() % 2000));
                                        peer_session_forceshutdown(curpeer->appctx);
                                }
                                if (maj_ver != (unsigned int)-1 && min_ver != (unsigned int)-1) {
@@ -712,9 +721,16 @@ switchstate:
                                /* fall through */
                        }
                        case PEER_SESS_ST_SENDSUCCESS: {
-                               struct peer *curpeer = appctx->ctx.peers.ptr;
                                struct shared_table *st;
 
+                               if (!curpeer) {
+                                       curpeer = appctx->ctx.peers.ptr;
+                                       SPIN_LOCK(PEER_LOCK, &curpeer->lock);
+                                       if (curpeer->appctx != appctx) {
+                                               appctx->st0 = PEER_SESS_ST_END;
+                                               goto switchstate;
+                                       }
+                               }
                                repl = snprintf(trash.str, trash.size, "%d\n", PEER_SESS_SC_SUCCESSCODE);
                                repl = ci_putblk(si_ic(si), trash.str, repl);
                                if (repl <= 0) {
@@ -767,7 +783,15 @@ switchstate:
                                goto switchstate;
                        }
                        case PEER_SESS_ST_CONNECT: {
-                               struct peer *curpeer = appctx->ctx.peers.ptr;
+
+                               if (!curpeer) {
+                                       curpeer = appctx->ctx.peers.ptr;
+                                       SPIN_LOCK(PEER_LOCK, &curpeer->lock);
+                                       if (curpeer->appctx != appctx) {
+                                               appctx->st0 = PEER_SESS_ST_END;
+                                               goto switchstate;
+                                       }
+                               }
 
                                /* Send headers */
                                repl = snprintf(trash.str, trash.size,
@@ -797,9 +821,17 @@ switchstate:
                                /* fall through */
                        }
                        case PEER_SESS_ST_GETSTATUS: {
-                               struct peer *curpeer = appctx->ctx.peers.ptr;
                                struct shared_table *st;
 
+                               if (!curpeer) {
+                                       curpeer = appctx->ctx.peers.ptr;
+                                       SPIN_LOCK(PEER_LOCK, &curpeer->lock);
+                                       if (curpeer->appctx != appctx) {
+                                               appctx->st0 = PEER_SESS_ST_END;
+                                               goto switchstate;
+                                       }
+                               }
+
                                if (si_ic(si)->flags & CF_WRITE_PARTIAL)
                                        curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE;
 
@@ -871,7 +903,6 @@ switchstate:
                                /* fall through */
                        }
                        case PEER_SESS_ST_WAITMSG: {
-                               struct peer *curpeer = appctx->ctx.peers.ptr;
                                struct stksess *ts, *newts = NULL;
                                uint32_t msg_len = 0;
                                char *msg_cur = trash.str;
@@ -879,6 +910,15 @@ switchstate:
                                unsigned char msg_head[7];
                                int totl = 0;
 
+                               if (!curpeer) {
+                                       curpeer = appctx->ctx.peers.ptr;
+                                       SPIN_LOCK(PEER_LOCK, &curpeer->lock);
+                                       if (curpeer->appctx != appctx) {
+                                               appctx->st0 = PEER_SESS_ST_END;
+                                               goto switchstate;
+                                       }
+                               }
+
                                reql = co_getblk(si_oc(si), (char *)msg_head, 2*sizeof(unsigned char), totl);
                                if (reql <= 0) /* closed or EOL not found */
                                        goto incomplete;
@@ -1417,6 +1457,7 @@ incomplete:
                                                }
 
                                                if (!(curpeer->flags & PEER_F_TEACH_PROCESS)) {
+                                                       SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
                                                        if (!(curpeer->flags & PEER_F_LEARN_ASSIGN) &&
                                                            ((int)(st->last_pushed - st->table->localupdate) < 0)) {
                                                                struct eb32_node *eb;
@@ -1447,7 +1488,6 @@ incomplete:
 
                                                                /* We force new pushed to 1 to force identifier in update message */
                                                                new_pushed = 1;
-                                                               SPIN_LOCK(STK_TABLE_LOCK, &st->table->lock);
                                                                while (1) {
                                                                        uint32_t msglen;
                                                                        struct stksess *ts;
@@ -1505,8 +1545,8 @@ incomplete:
                                                                        /* identifier may not needed in next update message */
                                                                        new_pushed = 0;
                                                                }
-                                                               SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
                                                        }
+                                                       SPIN_UNLOCK(STK_TABLE_LOCK, &st->table->lock);
                                                }
                                                else {
                                                        if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
@@ -1756,6 +1796,10 @@ incomplete:
                                /* fall through */
                        }
                        case PEER_SESS_ST_END: {
+                               if (curpeer) {
+                                       SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
+                                       curpeer = NULL;
+                               }
                                si_shutw(si);
                                si_shutr(si);
                                si_ic(si)->flags |= CF_READ_NULL;
@@ -1765,6 +1809,9 @@ incomplete:
        }
 out:
        si_oc(si)->flags |= CF_READ_DONTWAIT;
+
+       if (curpeer)
+               SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
        return;
 full:
        si_applet_cant_put(si);
@@ -1783,8 +1830,6 @@ static struct applet peer_applet = {
  */
 static void peer_session_forceshutdown(struct appctx *appctx)
 {
-       struct peer *ps;
-
        /* Note that the peer sessions which have just been created
         * (->st0 == PEER_SESS_ST_CONNECT) must not
         * be shutdown, if not, the TCP session will never be closed
@@ -1797,16 +1842,7 @@ static void peer_session_forceshutdown(struct appctx *appctx)
        if (appctx->applet != &peer_applet)
                return;
 
-       ps = appctx->ctx.peers.ptr;
-       /* we're killing a connection, we must apply a random delay before
-        * retrying otherwise the other end will do the same and we can loop
-        * for a while.
-        */
-       if (ps)
-               ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + random() % 2000));
-
        appctx->st0 = PEER_SESS_ST_END;
-       appctx->ctx.peers.ptr = NULL;
        appctx_wakeup(appctx);
 }
 
@@ -1922,6 +1958,10 @@ static struct task *process_peer_sync(struct task * task)
                return NULL;
        }
 
+       /* Acquire lock for all peers of the section */
+       for (ps = peers->remote; ps; ps = ps->next)
+               SPIN_LOCK(PEER_LOCK, &ps->lock);
+
        if (!stopping) {
                /* Normal case (not soft stop)*/
 
@@ -2033,6 +2073,11 @@ static struct task *process_peer_sync(struct task * task)
 
                        /* disconnect all connected peers */
                        for (ps = peers->remote; ps; ps = ps->next) {
+                               /* we're killing a connection, we must apply a random delay before
+                                * retrying otherwise the other end will do the same and we can loop
+                                * for a while.
+                                */
+                               ps->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + random() % 2000));
                                if (ps->appctx) {
                                        peer_session_forceshutdown(ps->appctx);
                                        ps->appctx = NULL;
@@ -2086,6 +2131,11 @@ static struct task *process_peer_sync(struct task * task)
                        }
                }
        } /* stopping */
+
+       /* Release lock for all peers of the section */
+       for (ps = peers->remote; ps; ps = ps->next)
+               SPIN_UNLOCK(PEER_LOCK, &ps->lock);
+
        /* Wakeup for re-connect */
        return task;
 }