]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: threads/applet: Handle multithreading for applets
authorEmeric Brun <ebrun@haproxy.com>
Mon, 19 Jun 2017 10:38:55 +0000 (12:38 +0200)
committerWilly Tarreau <w@1wt.eu>
Tue, 31 Oct 2017 12:58:31 +0000 (13:58 +0100)
A global lock has been added to protect accesses to the list of active
applets. A process mask has also been added on each applet. Like for FDs and
tasks, it is used to know which threads are allowed to process an
applet. Because applets are, most of time, linked to a session, it should be
sticky on the same thread. But in all cases, it is the responsibility of the
applet handler to lock what have to be protected in the applet context.

include/common/hathreads.h
include/proto/applet.h
include/proto/stream_interface.h
src/applet.c
src/flt_spoe.c
src/hlua.c
src/peers.c

index e997ea3a9e5a4ea0dac86b01f0abb99a11ff38a6..8b32cf6071d54c68e62ae143e1233520d80efa6d 100644 (file)
@@ -154,6 +154,7 @@ enum lock_label {
        SIGNALS_LOCK,
        STK_TABLE_LOCK,
        STK_SESS_LOCK,
+       APPLETS_LOCK,
        LOCK_LABELS
 };
 struct lock_stat {
@@ -239,7 +240,8 @@ static inline void show_lock_stats()
        const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
                                           "TASK_RQ", "TASK_WQ", "POOL",
                                           "LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
-                                          "UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS" };
+                                          "UPDATED_SERVERS", "LBPRM", "SIGNALS", "STK_TABLE", "STK_SESS",
+                                          "APPLETS" };
        int lbl;
 
        for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
index 3cf8578c1e40e9614912468c59375c7526ace9ce..d9f0ce2dff8d9f5fbbbe5784baa59868c9801c4f 100644 (file)
@@ -31,7 +31,9 @@
 
 extern unsigned int nb_applets;
 extern unsigned int applets_active_queue;
-
+#ifdef USE_THREAD
+extern HA_SPINLOCK_T applet_active_lock;
+#endif
 extern struct list applet_active_queue;
 
 void applet_run_active();
@@ -44,10 +46,11 @@ static int inline appctx_res_wakeup(struct appctx *appctx);
  * minimum acceptable initialization for an appctx. This means only the
  * 3 integer states st0, st1, st2 are zeroed.
  */
-static inline void appctx_init(struct appctx *appctx)
+static inline void appctx_init(struct appctx *appctx, unsigned long thread_mask)
 {
        appctx->st0 = appctx->st1 = appctx->st2 = 0;
        appctx->io_release = NULL;
+       appctx->process_mask = thread_mask;
        appctx->state = APPLET_SLEEPING;
 }
 
@@ -56,7 +59,7 @@ static inline void appctx_init(struct appctx *appctx)
  * pool_free2(connection) or appctx_free(), since it's allocated from the
  * connection pool. <applet> is assigned as the applet, but it can be NULL.
  */
-static inline struct appctx *appctx_new(struct applet *applet)
+static inline struct appctx *appctx_new(struct applet *applet, unsigned long thread_mask)
 {
        struct appctx *appctx;
 
@@ -64,12 +67,12 @@ static inline struct appctx *appctx_new(struct applet *applet)
        if (likely(appctx != NULL)) {
                appctx->obj_type = OBJ_TYPE_APPCTX;
                appctx->applet = applet;
-               appctx_init(appctx);
+               appctx_init(appctx, thread_mask);
                LIST_INIT(&appctx->runq);
                LIST_INIT(&appctx->buffer_wait.list);
                appctx->buffer_wait.target = appctx;
                appctx->buffer_wait.wakeup_cb = (int (*)(void *))appctx_res_wakeup;
-               nb_applets++;
+               HA_ATOMIC_ADD(&nb_applets, 1);
        }
        return appctx;
 }
@@ -83,20 +86,25 @@ static inline void __appctx_free(struct appctx *appctx)
                LIST_DEL(&appctx->runq);
                applets_active_queue--;
        }
+
        if (!LIST_ISEMPTY(&appctx->buffer_wait.list)) {
                LIST_DEL(&appctx->buffer_wait.list);
                LIST_INIT(&appctx->buffer_wait.list);
        }
+
        pool_free2(pool2_connection, appctx);
-       nb_applets--;
+       HA_ATOMIC_SUB(&nb_applets, 1);
 }
 static inline void appctx_free(struct appctx *appctx)
 {
+       SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
        if (appctx->state & APPLET_RUNNING) {
                appctx->state |= APPLET_WANT_DIE;
+               SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
                return;
        }
        __appctx_free(appctx);
+       SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
 }
 
 /* wakes up an applet when conditions have changed */
@@ -110,11 +118,14 @@ static inline void __appctx_wakeup(struct appctx *appctx)
 
 static inline void appctx_wakeup(struct appctx *appctx)
 {
+       SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
        if (appctx->state & APPLET_RUNNING) {
                appctx->state |= APPLET_WOKEN_UP;
+               SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
                return;
        }
        __appctx_wakeup(appctx);
+       SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
 }
 
 /* Callback used to wake up an applet when a buffer is available. The applet
@@ -124,18 +135,23 @@ static inline void appctx_wakeup(struct appctx *appctx)
  * requested */
 static inline int appctx_res_wakeup(struct appctx *appctx)
 {
+       SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
        if (appctx->state & APPLET_RUNNING) {
                if (appctx->state & APPLET_WOKEN_UP) {
+                       SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
                        return 0;
                }
                appctx->state |= APPLET_WOKEN_UP;
+               SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
                return 1;
        }
 
        if (!LIST_ISEMPTY(&appctx->runq)) {
+               SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
                return 0;
        }
        __appctx_wakeup(appctx);
+       SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
        return 1;
 }
 
index 70a323425de295c74051d927eece2206285e71fe..c6578ef07c29fde244bd23016c2ab38293a692fb 100644 (file)
@@ -304,7 +304,7 @@ static inline struct appctx *si_alloc_appctx(struct stream_interface *si, struct
        struct appctx *appctx;
 
        si_release_endpoint(si);
-       appctx = appctx_new(applet);
+       appctx = appctx_new(applet, tid_bit);
        if (appctx)
                si_attach_appctx(si, appctx);
 
index 4e70d8c029b26519ae1d11d1b1dc0974cdbe2761..b0783e6a639b2f3419eaa3224d99d62ffaa80a47 100644 (file)
 unsigned int nb_applets = 0;
 unsigned int applets_active_queue = 0;
 
+#ifdef USE_THREAD
+HA_SPINLOCK_T applet_active_lock;        /* spin lock related to applet active queue */
+#endif
+
 struct list applet_active_queue = LIST_HEAD_INIT(applet_active_queue);
 
 void applet_run_active()
@@ -34,16 +38,22 @@ void applet_run_active()
        if (!applets_active_queue)
                return;
 
+       SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
+
        curr = LIST_NEXT(&applet_active_queue, typeof(curr), runq);
        while (&curr->runq != &applet_active_queue) {
                next = LIST_NEXT(&curr->runq, typeof(next), runq);
-               LIST_DEL(&curr->runq);
-               curr->state = APPLET_RUNNING;
-               LIST_ADDQ(&applet_cur_queue, &curr->runq);
-               applets_active_queue--;
+               if (curr->process_mask & (1UL << tid)) {
+                       LIST_DEL(&curr->runq);
+                       curr->state = APPLET_RUNNING;
+                       LIST_ADDQ(&applet_cur_queue, &curr->runq);
+                       applets_active_queue--;
+               }
                curr = next;
        }
 
+       SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
+
        /* The list is only scanned from the head. This guarantees that if any
         * applet removes another one, there is no side effect while walking
         * through the list.
@@ -74,6 +84,7 @@ void applet_run_active()
                        /* curr was left in the list, move it back to the active list */
                        LIST_DEL(&curr->runq);
                        LIST_INIT(&curr->runq);
+                       SPIN_LOCK(APPLETS_LOCK, &applet_active_lock);
                        if (curr->state & APPLET_WANT_DIE) {
                                curr->state = APPLET_SLEEPING;
                                __appctx_free(curr);
@@ -87,6 +98,13 @@ void applet_run_active()
                                        curr->state = APPLET_SLEEPING;
                                }
                        }
+                       SPIN_UNLOCK(APPLETS_LOCK, &applet_active_lock);
                }
        }
 }
+
+__attribute__((constructor))
+static void __applet_init(void)
+{
+       SPIN_INIT(&applet_active_lock);
+}
index 9543f8fe244becc1160aaa349a128eeea2a40787..aa3f37a103a30de4c9dfcf967a5b5157d3532468 100644 (file)
@@ -1930,7 +1930,7 @@ spoe_create_appctx(struct spoe_config *conf)
        struct session     *sess;
        struct stream      *strm;
 
-       if ((appctx = appctx_new(&spoe_applet)) == NULL)
+       if ((appctx = appctx_new(&spoe_applet, tid_bit)) == NULL)
                goto out_error;
 
        appctx->ctx.spoe.ptr = pool_alloc_dirty(pool2_spoe_appctx);
index 36b1b3ff4df4044b0473186d212b3f0c3d8f5899..d5d07de3472043faa4fc8d411b478e82ba6816d5 100644 (file)
@@ -2376,7 +2376,7 @@ __LJMP static int hlua_socket_new(lua_State *L)
        lua_setmetatable(L, -2);
 
        /* Create the applet context */
-       appctx = appctx_new(&update_applet);
+       appctx = appctx_new(&update_applet, MAX_THREADS_MASK);
        if (!appctx) {
                hlua_pusherror(L, "socket: out of memory");
                goto out_fail_conf;
index 25f1ba32b7c864353276c2d6158a9f671f8353ec..ef332eba233e422a5b6a77aa216d39071b367654 100644 (file)
@@ -1839,7 +1839,7 @@ static struct appctx *peer_session_create(struct peers *peers, struct peer *peer
        peer->statuscode = PEER_SESS_SC_CONNECTCODE;
        s = NULL;
 
-       appctx = appctx_new(&peer_applet);
+       appctx = appctx_new(&peer_applet, tid_bit);
        if (!appctx)
                goto out_close;