]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: threads/xref: Convert xref function to a thread safe model
authorThierry FOURNIER <thierry.fournier@ozon.io>
Fri, 1 Sep 2017 12:17:32 +0000 (14:17 +0200)
committerWilly Tarreau <w@1wt.eu>
Tue, 31 Oct 2017 12:58:32 +0000 (13:58 +0100)
Ensure that the unlink is done safely between thread and that
the peer struct will not destroy between the usage of the peer.

include/common/xref.h
src/hlua.c

index b020280ef0b13052e3cd706003a9eca976c4c5cf..6dfa7b62758dfaebe12d25f66aaa858dc873a060 100644 (file)
@@ -1,6 +1,8 @@
 #ifndef __XREF_H__
 #define __XREF_H__
 
+#include <common/hathreads.h>
+
 /* xref is used to create relation between two elements.
  * Once an element is released, it breaks the relation. If the
  * relation is already broken, it frees the xref struct.
@@ -13,25 +15,64 @@ struct xref {
        struct xref *peer;
 };
 
+#define XREF_BUSY ((struct xref *)1)
+
 static inline void xref_create(struct xref *xref_a, struct xref *xref_b)
 {
        xref_a->peer = xref_b;
        xref_b->peer = xref_a;
 }
 
-static inline struct xref *xref_get_peer(struct xref *xref)
+static inline struct xref *xref_get_peer_and_lock(struct xref *xref)
 {
-       if (!xref->peer)
-               return NULL;
-       return xref->peer;
+       struct xref *local;
+       struct xref *remote;
+
+       while (1) {
+
+               /* Get the local pointer to the peer. */
+               local = HA_ATOMIC_XCHG(&xref->peer, XREF_BUSY);
+
+               /* If the local pointer is NULL, the peer no longer exists. */
+               if (local == NULL) {
+                       xref->peer = NULL;
+                       return NULL;
+               }
+
+               /* If the local pointeru is BUSY, the peer try to acquire the
+                * lock. We retry the process.
+                */
+               if (local == XREF_BUSY)
+                       continue;
+
+               /* We are locked, the peer cant disapear, try to acquire
+                * the pper's lock. Note that remote can't be NULL.
+                */
+               remote = HA_ATOMIC_XCHG(&local->peer, XREF_BUSY);
+
+               /* The remote lock is BUSY, We retry the process. */
+               if (remote == XREF_BUSY) {
+                       xref->peer = local;
+                       continue;
+               }
+
+               /* We have the lock, we return the value of the xref. */
+               return local;
+       }
 }
 
-static inline void xref_disconnect(struct xref *xref)
+static inline void xref_unlock(struct xref *xref, struct xref *peer)
 {
-       if (!xref->peer)
-               return;
+       /* Release the peer. */
+       peer->peer = xref;
 
-       xref->peer->peer = NULL;
+       /* Release myself. */
+       xref->peer = peer;
+}
+
+static inline void xref_disconnect(struct xref *xref, struct xref *peer)
+{
+       peer->peer = NULL;
        xref->peer = NULL;
 }
 
index 1d14e54863ea91e5142f3ad2b6b52e35cb97da9b..6edb08a83b0195411eab3033d7c38796d9c72b2f 100644 (file)
@@ -1579,8 +1579,12 @@ static void hlua_socket_handler(struct appctx *appctx)
  */
 static void hlua_socket_release(struct appctx *appctx)
 {
+       struct xref *peer;
+
        /* Remove my link in the original object. */
-       xref_disconnect(&appctx->ctx.hlua_cosocket.xref);
+       peer = xref_get_peer_and_lock(&appctx->ctx.hlua_cosocket.xref);
+       if (peer)
+               xref_disconnect(&appctx->ctx.hlua_cosocket.xref, peer);
 
        /* Wake all the task waiting for me. */
        notification_wake(&appctx->ctx.hlua_cosocket.wake_on_read);
@@ -1602,11 +1606,9 @@ __LJMP static int hlua_socket_gc(lua_State *L)
        MAY_LJMP(check_args(L, 1, "__gc"));
 
        socket = MAY_LJMP(hlua_checksocket(L, 1));
-       peer = xref_get_peer(&socket->xref);
-       if (!peer) {
-               xref_disconnect(&socket->xref);
+       peer = xref_get_peer_and_lock(&socket->xref);
+       if (!peer)
                return 0;
-       }
        appctx = container_of(peer, struct appctx, ctx.hlua_cosocket.xref);
 
        /* Set the flag which destroy the session. */
@@ -1614,7 +1616,7 @@ __LJMP static int hlua_socket_gc(lua_State *L)
        appctx_wakeup(appctx);
 
        /* Remove all reference between the Lua stack and the coroutine stream. */
-       xref_disconnect(&socket->xref);
+       xref_disconnect(&socket->xref, peer);
        return 0;
 }
 
@@ -1637,11 +1639,9 @@ __LJMP static int hlua_socket_close(lua_State *L)
        if (socket->tid != tid)
                WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
 
-       peer = xref_get_peer(&socket->xref);
-       if (!peer) {
-               xref_disconnect(&socket->xref);
+       peer = xref_get_peer_and_lock(&socket->xref);
+       if (!peer)
                return 0;
-       }
        appctx = container_of(peer, struct appctx, ctx.hlua_cosocket.xref);
 
        /* Set the flag which destroy the session. */
@@ -1649,7 +1649,7 @@ __LJMP static int hlua_socket_close(lua_State *L)
        appctx_wakeup(appctx);
 
        /* Remove all reference between the Lua stack and the coroutine stream. */
-       xref_disconnect(&socket->xref);
+       xref_disconnect(&socket->xref, peer);
        return 0;
 }
 
@@ -1694,11 +1694,9 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua
                WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
 
        /* check for connection break. If some data where read, return it. */
-       peer = xref_get_peer(&socket->xref);
-       if (!peer) {
-               xref_disconnect(&socket->xref);
-               goto connection_closed;
-       }
+       peer = xref_get_peer_and_lock(&socket->xref);
+       if (!peer)
+               goto no_peer;
        appctx = container_of(peer, struct appctx, ctx.hlua_cosocket.xref);
        si = appctx->owner;
        s = si_strm(si);
@@ -1784,10 +1782,15 @@ __LJMP static int hlua_socket_receive_yield(struct lua_State *L, int status, lua
 
        /* Return result. */
        luaL_pushresult(&socket->b);
+       xref_unlock(&socket->xref, peer);
        return 1;
 
 connection_closed:
 
+       xref_unlock(&socket->xref, peer);
+
+no_peer:
+
        /* If the buffer containds data. */
        if (socket->b.n > 0) {
                luaL_pushresult(&socket->b);
@@ -1800,8 +1803,11 @@ connection_closed:
 connection_empty:
 
        appctx = objt_appctx(s->si[0].end);
-       if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_read, hlua->task))
+       if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_read, hlua->task)) {
+               xref_unlock(&socket->xref, peer);
                WILL_LJMP(luaL_error(L, "out of memory"));
+       }
+       xref_unlock(&socket->xref, peer);
        WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_receive_yield, TICK_ETERNITY, 0));
        return 0;
 }
@@ -1915,9 +1921,8 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
                WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
 
        /* check for connection break. If some data where read, return it. */
-       peer = xref_get_peer(&socket->xref);
+       peer = xref_get_peer_and_lock(&socket->xref);
        if (!peer) {
-               xref_disconnect(&socket->xref);
                lua_pushinteger(L, -1);
                return 1;
        }
@@ -1927,6 +1932,7 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
 
        /* Check for connection close. */
        if (channel_output_closed(&s->req)) {
+               xref_unlock(&socket->xref, peer);
                lua_pushinteger(L, -1);
                return 1;
        }
@@ -1936,8 +1942,10 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
        send_len = buf_len - sent;
 
        /* All the data are sent. */
-       if (sent >= buf_len)
+       if (sent >= buf_len) {
+               xref_unlock(&socket->xref, peer);
                return 1; /* Implicitly return the length sent. */
+       }
 
        /* Check if the buffer is avalaible because HAProxy doesn't allocate
         * the request buffer if its not required.
@@ -1952,8 +1960,10 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
        len = buffer_total_space(s->req.buf);
        if (len <= 0) {
                appctx = objt_appctx(s->si[0].end);
-               if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task))
+               if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task)) {
+                       xref_unlock(&socket->xref, peer);
                        WILL_LJMP(luaL_error(L, "out of memory"));
+               }
                goto hlua_socket_write_yield_return;
        }
 
@@ -1974,6 +1984,7 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
                MAY_LJMP(hlua_socket_close(L));
                lua_pop(L, 1);
                lua_pushinteger(L, -1);
+               xref_unlock(&socket->xref, peer);
                return 1;
        }
 
@@ -1989,10 +2000,13 @@ static int hlua_socket_write_yield(struct lua_State *L,int status, lua_KContext
        lua_pushinteger(L, sent + len);
 
        /* All the data buffer is sent ? */
-       if (sent + len >= buf_len)
+       if (sent + len >= buf_len) {
+               xref_unlock(&socket->xref, peer);
                return 1;
+       }
 
 hlua_socket_write_yield_return:
+       xref_unlock(&socket->xref, peer);
        WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_write_yield, TICK_ETERNITY, 0));
        return 0;
 }
@@ -2131,6 +2145,7 @@ __LJMP static int hlua_socket_getpeername(struct lua_State *L)
        struct appctx *appctx;
        struct stream_interface *si;
        struct stream *s;
+       int ret;
 
        MAY_LJMP(check_args(L, 1, "getpeername"));
 
@@ -2143,9 +2158,8 @@ __LJMP static int hlua_socket_getpeername(struct lua_State *L)
                WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
 
        /* check for connection break. If some data where read, return it. */
-       peer = xref_get_peer(&socket->xref);
+       peer = xref_get_peer_and_lock(&socket->xref);
        if (!peer) {
-               xref_disconnect(&socket->xref);
                lua_pushnil(L);
                return 1;
        }
@@ -2155,17 +2169,21 @@ __LJMP static int hlua_socket_getpeername(struct lua_State *L)
 
        conn = objt_conn(s->si[1].end);
        if (!conn) {
+               xref_unlock(&socket->xref, peer);
                lua_pushnil(L);
                return 1;
        }
 
        conn_get_to_addr(conn);
        if (!(conn->flags & CO_FL_ADDR_TO_SET)) {
+               xref_unlock(&socket->xref, peer);
                lua_pushnil(L);
                return 1;
        }
 
-       return MAY_LJMP(hlua_socket_info(L, &conn->addr.to));
+       ret = MAY_LJMP(hlua_socket_info(L, &conn->addr.to));
+       xref_unlock(&socket->xref, peer);
+       return ret;
 }
 
 /* Returns information about my connection side. */
@@ -2177,6 +2195,7 @@ static int hlua_socket_getsockname(struct lua_State *L)
        struct xref *peer;
        struct stream_interface *si;
        struct stream *s;
+       int ret;
 
        MAY_LJMP(check_args(L, 1, "getsockname"));
 
@@ -2189,9 +2208,8 @@ static int hlua_socket_getsockname(struct lua_State *L)
                WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
 
        /* check for connection break. If some data where read, return it. */
-       peer = xref_get_peer(&socket->xref);
+       peer = xref_get_peer_and_lock(&socket->xref);
        if (!peer) {
-               xref_disconnect(&socket->xref);
                lua_pushnil(L);
                return 1;
        }
@@ -2201,17 +2219,21 @@ static int hlua_socket_getsockname(struct lua_State *L)
 
        conn = objt_conn(s->si[1].end);
        if (!conn) {
+               xref_unlock(&socket->xref, peer);
                lua_pushnil(L);
                return 1;
        }
 
        conn_get_from_addr(conn);
        if (!(conn->flags & CO_FL_ADDR_FROM_SET)) {
+               xref_unlock(&socket->xref, peer);
                lua_pushnil(L);
                return 1;
        }
 
-       return hlua_socket_info(L, &conn->addr.from);
+       ret = hlua_socket_info(L, &conn->addr.from);
+       xref_unlock(&socket->xref, peer);
+       return ret;
 }
 
 /* This struct define the applet. */
@@ -2238,9 +2260,8 @@ __LJMP static int hlua_socket_connect_yield(struct lua_State *L, int status, lua
                WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
 
        /* check for connection break. If some data where read, return it. */
-       peer = xref_get_peer(&socket->xref);
+       peer = xref_get_peer_and_lock(&socket->xref);
        if (!peer) {
-               xref_disconnect(&socket->xref);
                lua_pushnil(L);
                lua_pushstring(L, "Can't connect");
                return 2;
@@ -2252,11 +2273,14 @@ __LJMP static int hlua_socket_connect_yield(struct lua_State *L, int status, lua
        /* Check if we run on the same thread than the xreator thread.
         * We cannot access to the socket if the thread is different.
         */
-       if (socket->tid != tid)
+       if (socket->tid != tid) {
+               xref_unlock(&socket->xref, peer);
                WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
+       }
 
        /* Check for connection close. */
        if (!hlua || channel_output_closed(&s->req)) {
+               xref_unlock(&socket->xref, peer);
                lua_pushnil(L);
                lua_pushstring(L, "Can't connect");
                return 2;
@@ -2266,12 +2290,16 @@ __LJMP static int hlua_socket_connect_yield(struct lua_State *L, int status, lua
 
        /* Check for connection established. */
        if (appctx->ctx.hlua_cosocket.connected) {
+               xref_unlock(&socket->xref, peer);
                lua_pushinteger(L, 1);
                return 1;
        }
 
-       if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task))
+       if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task)) {
+               xref_unlock(&socket->xref, peer);
                WILL_LJMP(luaL_error(L, "out of memory error"));
+       }
+       xref_unlock(&socket->xref, peer);
        WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_connect_yield, TICK_ETERNITY, 0));
        return 0;
 }
@@ -2308,9 +2336,8 @@ __LJMP static int hlua_socket_connect(struct lua_State *L)
                port = MAY_LJMP(luaL_checkinteger(L, 3));
 
        /* check for connection break. If some data where read, return it. */
-       peer = xref_get_peer(&socket->xref);
+       peer = xref_get_peer_and_lock(&socket->xref);
        if (!peer) {
-               xref_disconnect(&socket->xref);
                lua_pushnil(L);
                return 1;
        }
@@ -2320,29 +2347,39 @@ __LJMP static int hlua_socket_connect(struct lua_State *L)
 
        /* Initialise connection. */
        conn = si_alloc_conn(&s->si[1]);
-       if (!conn)
+       if (!conn) {
+               xref_unlock(&socket->xref, peer);
                WILL_LJMP(luaL_error(L, "connect: internal error"));
+       }
 
        /* needed for the connection not to be closed */
        conn->target = s->target;
 
        /* Parse ip address. */
        addr = str2sa_range(ip, NULL, &low, &high, NULL, NULL, NULL, 0);
-       if (!addr)
+       if (!addr) {
+               xref_unlock(&socket->xref, peer);
                WILL_LJMP(luaL_error(L, "connect: cannot parse destination address '%s'", ip));
-       if (low != high)
+       }
+       if (low != high) {
+               xref_unlock(&socket->xref, peer);
                WILL_LJMP(luaL_error(L, "connect: port ranges not supported : address '%s'", ip));
+       }
        memcpy(&conn->addr.to, addr, sizeof(struct sockaddr_storage));
 
        /* Set port. */
        if (low == 0) {
                if (conn->addr.to.ss_family == AF_INET) {
-                       if (port == -1)
+                       if (port == -1) {
+                               xref_unlock(&socket->xref, peer);
                                WILL_LJMP(luaL_error(L, "connect: port missing"));
+                       }
                        ((struct sockaddr_in *)&conn->addr.to)->sin_port = htons(port);
                } else if (conn->addr.to.ss_family == AF_INET6) {
-                       if (port == -1)
+                       if (port == -1) {
+                               xref_unlock(&socket->xref, peer);
                                WILL_LJMP(luaL_error(L, "connect: port missing"));
+                       }
                        ((struct sockaddr_in6 *)&conn->addr.to)->sin6_port = htons(port);
                }
        }
@@ -2359,8 +2396,11 @@ __LJMP static int hlua_socket_connect(struct lua_State *L)
 
        hlua->flags |= HLUA_MUST_GC;
 
-       if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task))
+       if (!notification_new(&hlua->com, &appctx->ctx.hlua_cosocket.wake_on_write, hlua->task)) {
+               xref_unlock(&socket->xref, peer);
                WILL_LJMP(luaL_error(L, "out of memory"));
+       }
+       xref_unlock(&socket->xref, peer);
        WILL_LJMP(hlua_yieldk(L, 0, 0, hlua_socket_connect_yield, TICK_ETERNITY, 0));
 
        return 0;
@@ -2379,9 +2419,8 @@ __LJMP static int hlua_socket_connect_ssl(struct lua_State *L)
        socket  = MAY_LJMP(hlua_checksocket(L, 1));
 
        /* check for connection break. If some data where read, return it. */
-       peer = xref_get_peer(&socket->xref);
+       peer = xref_get_peer_and_lock(&socket->xref);
        if (!peer) {
-               xref_disconnect(&socket->xref);
                lua_pushnil(L);
                return 1;
        }
@@ -2390,6 +2429,7 @@ __LJMP static int hlua_socket_connect_ssl(struct lua_State *L)
        s = si_strm(si);
 
        s->target = &socket_ssl.obj_type;
+       xref_unlock(&socket->xref, peer);
        return MAY_LJMP(hlua_socket_connect(L));
 }
 #endif
@@ -2420,9 +2460,8 @@ __LJMP static int hlua_socket_settimeout(struct lua_State *L)
                WILL_LJMP(luaL_error(L, "connect: cannot use socket on other thread"));
 
        /* check for connection break. If some data where read, return it. */
-       peer = xref_get_peer(&socket->xref);
+       peer = xref_get_peer_and_lock(&socket->xref);
        if (!peer) {
-               xref_disconnect(&socket->xref);
                hlua_pusherror(L, "socket: not yet initialised, you can't set timeouts.");
                WILL_LJMP(lua_error(L));
                return 0;
@@ -2435,6 +2474,7 @@ __LJMP static int hlua_socket_settimeout(struct lua_State *L)
        s->req.wto = tmout;
        s->res.rto = tmout;
        s->res.wto = tmout;
+       xref_unlock(&socket->xref, peer);
 
        return 0;
 }