]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: fd: add the tgid to the fd and pass it to fd_insert()
authorWilly Tarreau <w@1wt.eu>
Tue, 5 Jul 2022 03:16:13 +0000 (05:16 +0200)
committerWilly Tarreau <w@1wt.eu>
Fri, 15 Jul 2022 17:58:06 +0000 (19:58 +0200)
The file descriptors will need to know the thread group ID in addition
to the mask. This extends fd_insert() to take the tgid, and will store
it into the FD.

In the FD, the tgid is stored as a combination of tgid on the lower 16
bits and a refcount on the higher 16 bits. This allows to know when it's
really possible to trust the tgid and the running mask. If a refcount is
higher than 1 it indeed indicates another thread else might be in the
process of updating these values.

Since a closed FD must necessarily have a zero refcount, a test was
added to fd_insert() to make sure that it is the case.

include/haproxy/fd-t.h
include/haproxy/fd.h
src/dns.c
src/fd.c
src/proto_sockpair.c
src/sock.c
src/sock_inet.c
src/sock_unix.c
src/ssl_sock.c

index de91f17767dad1c9cff1459c50a3c0d19816c58d..cecf797fe39e7187d82e08fb8a48ad4fcc3f44ba 100644 (file)
@@ -153,6 +153,11 @@ struct fdlist {
 
 /* info about one given fd. Note: only align on cache lines when using threads;
  * 32-bit small archs can put everything in 32-bytes when threads are disabled.
+ * refc_tgid is an atomic 32-bit composite value made of 16 higher bits
+ * containing a refcount on tgid and the running_mask, and 16 lower bits
+ * containing a thread group ID. The tgid may only be changed when refc is zero
+ * and running may only be checked/changed when refc is held and shows the
+ * reader is alone. An FD with tgid zero belongs to nobody.
  */
 struct fdtab {
        unsigned long running_mask;          /* mask of thread IDs currently using the fd */
@@ -162,6 +167,7 @@ struct fdtab {
        void (*iocb)(int fd);                /* I/O handler */
        void *owner;                         /* the connection or listener associated with this fd, NULL if closed */
        unsigned int state;                  /* FD state for read and write directions (FD_EV_*) + FD_POLL_* */
+       unsigned int refc_tgid;              /* refcounted tgid, updated atomically */
 #ifdef DEBUG_FD
        unsigned int event_count;            /* number of events reported */
 #endif
index b9f3803b49d238220f1f1f531034704a53e18c17..ac1a41a4867e753e09d80b97ff7736ec89437b26 100644 (file)
@@ -314,6 +314,12 @@ static inline void fd_want_send(int fd)
        updt_fd_polling(fd);
 }
 
+/* returns the tgid from an fd (masks the refcount) */
+static forceinline int fd_tgid(int fd)
+{
+       return _HA_ATOMIC_LOAD(&fdtab[fd].refc_tgid) & 0xFFFF;
+}
+
 /* remove tid_bit from the fd's running mask and returns the bits that remain
  * after the atomic operation.
  */
@@ -322,10 +328,10 @@ static inline long fd_clr_running(int fd)
        return _HA_ATOMIC_AND_FETCH(&fdtab[fd].running_mask, ~tid_bit);
 }
 
-/* Prepares <fd> for being polled on all permitted threads (these will then be
- * refined to only cover running ones).
+/* Prepares <fd> for being polled on all permitted threads of this group ID
+ * (these will then be refined to only cover running ones).
 */
-static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned long thread_mask)
+static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), int tgid, unsigned long thread_mask)
 {
        extern void sock_conn_iocb(int);
 
@@ -335,6 +341,7 @@ static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned
        BUG_ON(fd < 0 || fd >= global.maxsock);
        BUG_ON(fdtab[fd].owner != NULL);
        BUG_ON(fdtab[fd].state != 0);
+       BUG_ON(fdtab[fd].refc_tgid != 0);
 
        thread_mask &= all_threads_mask;
        BUG_ON(thread_mask == 0);
@@ -342,6 +349,7 @@ static inline void fd_insert(int fd, void *owner, void (*iocb)(int fd), unsigned
        fdtab[fd].owner = owner;
        fdtab[fd].iocb = iocb;
        fdtab[fd].state = 0;
+       fdtab[fd].refc_tgid = tgid;
 #ifdef DEBUG_FD
        fdtab[fd].event_count = 0;
 #endif
index 75949b59bf80ec7412323cb9fff7390701066629..a77e71fdfb5316c486bdaad42e508e132380f183 100644 (file)
--- a/src/dns.c
+++ b/src/dns.c
@@ -74,7 +74,7 @@ static int dns_connect_nameserver(struct dns_nameserver *ns)
 
        /* Add the fd in the fd list and update its parameters */
        dgram->t.sock.fd = fd;
-       fd_insert(fd, dgram, dgram_fd_handler, all_threads_mask);
+       fd_insert(fd, dgram, dgram_fd_handler, tgid, all_threads_mask);
        fd_want_recv(fd);
        return 0;
 }
index 9757d0ad7d303ecd433e805a93f521b39d8db36a..f93c11719656124e7d9ac08184cf8fed5a2792f5 100644 (file)
--- a/src/fd.c
+++ b/src/fd.c
@@ -329,6 +329,7 @@ void _fd_delete_orphan(int fd)
        polled_mask[fd].poll_recv = polled_mask[fd].poll_send = 0;
 
        fdtab[fd].state = 0;
+       fdtab[fd].refc_tgid = 0;
 
 #ifdef DEBUG_FD
        fdtab[fd].event_count = 0;
@@ -824,8 +825,8 @@ static int init_pollers_per_thread()
        poller_rd_pipe = mypipe[0];
        poller_wr_pipe[tid] = mypipe[1];
        fd_set_nonblock(poller_rd_pipe);
-       fd_insert(poller_rd_pipe, poller_pipe_io_handler, poller_pipe_io_handler, tid_bit);
-       fd_insert(poller_wr_pipe[tid], poller_pipe_io_handler, poller_pipe_io_handler, tid_bit);
+       fd_insert(poller_rd_pipe, poller_pipe_io_handler, poller_pipe_io_handler, tgid, tid_bit);
+       fd_insert(poller_wr_pipe[tid], poller_pipe_io_handler, poller_pipe_io_handler, tgid, tid_bit);
        fd_want_recv(poller_rd_pipe);
        fd_stop_both(poller_wr_pipe[tid]);
        return 1;
index 0798283b93a071f35d07416a1bfdfb0202df1a5f..282a094bf36da596cfbd8052f8508a5876f91e68 100644 (file)
@@ -157,7 +157,7 @@ int sockpair_bind_receiver(struct receiver *rx, char **errmsg)
 
        rx->flags |= RX_F_BOUND;
 
-       fd_insert(rx->fd, rx->owner, rx->iocb, rx->bind_thread);
+       fd_insert(rx->fd, rx->owner, rx->iocb, rx->bind_tgroup, rx->bind_thread);
        return err;
 
  bind_return:
index 602e9c55bd04eae456fafc0d0570103374623162..43e30df1dba86f83cc8dbd1f0af5485f7ced1f7d 100644 (file)
@@ -703,7 +703,7 @@ void sock_accept_iocb(int fd)
 void sock_conn_ctrl_init(struct connection *conn)
 {
        BUG_ON(conn->flags & CO_FL_FDLESS);
-       fd_insert(conn->handle.fd, conn, sock_conn_iocb, tid_bit);
+       fd_insert(conn->handle.fd, conn, sock_conn_iocb, tgid, tid_bit);
 }
 
 /* This completes the release of connection <conn> by removing its FD from the
index 9e1451ad9b745db6dfac3fbbd74124a42a5aa541..d517e4c4237492c57a7e5de21824b1c5bfb3cb45 100644 (file)
@@ -390,7 +390,7 @@ int sock_inet_bind_receiver(struct receiver *rx, char **errmsg)
        rx->fd = fd;
        rx->flags |= RX_F_BOUND;
 
-       fd_insert(fd, rx->owner, rx->iocb, rx->bind_thread);
+       fd_insert(fd, rx->owner, rx->iocb, rx->bind_tgroup, rx->bind_thread);
 
        /* for now, all regularly bound TCP listeners are exportable */
        if (!(rx->flags & RX_F_INHERITED))
index 026d86c57faac33f403854ca643270090520f5dc..05d9be7a4b26364c5fbcdef5320eecdf84a439f9 100644 (file)
@@ -284,7 +284,7 @@ int sock_unix_bind_receiver(struct receiver *rx, char **errmsg)
        rx->fd = fd;
        rx->flags |= RX_F_BOUND;
 
-       fd_insert(fd, rx->owner, rx->iocb, rx->bind_thread);
+       fd_insert(fd, rx->owner, rx->iocb, rx->bind_tgroup, rx->bind_thread);
 
        /* for now, all regularly bound TCP listeners are exportable */
        if (!(rx->flags & RX_F_INHERITED))
index 3c89aaa8d2107775fa9a33b67a34736c2a1af4a3..00f024023e7efd99cde8eeba025ad1c68af7135e 100644 (file)
@@ -824,7 +824,7 @@ static inline void ssl_async_process_fds(struct ssl_sock_ctx *ctx)
 
        /* We add new fds to the fdtab */
        for (i=0 ; i < num_add_fds ; i++) {
-               fd_insert(add_fd[i], ctx, ssl_async_fd_handler, tid_bit);
+               fd_insert(add_fd[i], ctx, ssl_async_fd_handler, tgid, tid_bit);
        }
 
        num_add_fds = 0;