]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: listener: implement multi-queue accept for threads
authorWilly Tarreau <w@1wt.eu>
Sun, 27 Jan 2019 14:37:19 +0000 (15:37 +0100)
committerWilly Tarreau <w@1wt.eu>
Wed, 27 Feb 2019 13:27:07 +0000 (14:27 +0100)
There is one point where we can migrate a connection to another thread
without taking risk, it's when we accept it : the new FD is not yet in
the fd cache and no task was created yet. It's still possible to assign
it a different thread than the one which accepted the connection. The
only requirement for this is to have one accept queue per thread and
their respective processing tasks that have to be woken up each time
an entry is added to the queue.

This is a multiple-producer, single-consumer model. Entries are added
at the queue's tail and the processing task is woken up. The consumer
picks entries at the head and processes them in order. The accept queue
contains the fd, the source address, and the listener. Each entry of
the accept queue was rounded up to 64 bytes (one cache line) to avoid
cache aliasing because tests have shown that otherwise performance
suffers a lot (5%). A test has shown that it's important to have at
least 256 entries for the rings, as at 128 it's still possible to fill
them often at high loads on small thread counts.

The processing task does almost nothing except calling the listener's
accept() function and updating the global session and SSL rate counters
just like listener_accept() does on synchronous calls.

At this point the accept queue is implemented but not used.

include/proto/listener.h
include/types/listener.h
src/listener.c

index 3a6762ccbd73f66699a4368af5e80ebfcb43ff55..8ec41af115e1a1830358b877acd5654db0c9ec86 100644 (file)
@@ -169,6 +169,9 @@ static inline const char *listener_state_str(const struct listener *l)
 }
 
 extern struct xfer_sock_list *xfer_sock_list;
+
+extern struct accept_queue_ring accept_queue_rings[MAX_THREADS] __attribute__((aligned(64)));
+
 #endif /* _PROTO_LISTENER_H */
 
 /*
index 8e9db71dc35e4d43179dd987ff3fd11268fce295..876d3c3722c4fbeaccbdb54a724f500735a4179c 100644 (file)
@@ -272,6 +272,32 @@ struct xfer_sock_list {
        struct sockaddr_storage addr;
 };
 
+/* This is used to create the accept queue, optimized to be 64 bytes long. */
+struct accept_queue_entry {
+       struct listener *listener;          // 8 bytes
+       int fd __attribute__((aligned(8))); // 4 bytes
+       int addr_len;                       // 4 bytes
+
+       union {
+               sa_family_t family;         // 2 bytes
+               struct sockaddr_in in;      // 16 bytes
+               struct sockaddr_in6 in6;    // 28 bytes
+       } addr; // this is normally 28 bytes
+       /* 20-bytes hole here */
+       char pad0[0] __attribute((aligned(64)));
+};
+
+/* The per-thread accept queue ring, must be a power of two minus 1 */
+#define ACCEPT_QUEUE_SIZE ((1<<8) - 1)
+
+struct accept_queue_ring {
+       unsigned int head;
+       unsigned int tail;
+       struct task *task;  /* task of the thread owning this ring */
+       struct accept_queue_entry entry[ACCEPT_QUEUE_SIZE] __attribute((aligned(64)));
+};
+
+
 #endif /* _TYPES_LISTENER_H */
 
 /*
index fb527ab16d60de9866cef68b275fbfe9d5de7a86..9e8c87818dc0d545266e2af281ddf7348003a6f1 100644 (file)
@@ -52,6 +52,169 @@ static struct bind_kw_list bind_keywords = {
 
 struct xfer_sock_list *xfer_sock_list = NULL;
 
+#if defined(USE_THREAD)
+
+struct accept_queue_ring accept_queue_rings[MAX_THREADS] __attribute__((aligned(64))) = { };
+
+/* dequeue and process a pending connection from the local accept queue (single
+ * consumer). Returns the accepted fd or -1 if none was found. The listener is
+ * placed into *li. The address is copied into *addr for no more than *addr_len
+ * bytes, and the address length is returned into *addr_len.
+ */
+int accept_queue_pop_sc(struct accept_queue_ring *ring, struct listener **li, void *addr, int *addr_len)
+{
+       struct accept_queue_entry *e;
+       unsigned int pos, next;
+       struct listener *ptr;
+       int len;
+       int fd;
+
+       pos = ring->head;
+
+       if (pos == ring->tail)
+               return -1;
+
+       next = pos + 1;
+       if (next >= ACCEPT_QUEUE_SIZE)
+               next = 0;
+
+       e = &ring->entry[pos];
+
+       /* wait for the producer to update the listener's pointer */
+       while (1) {
+               ptr = e->listener;
+               __ha_barrier_load();
+               if (ptr)
+                       break;
+               pl_cpu_relax();
+       }
+
+       fd = e->fd;
+       len = e->addr_len;
+       if (len > *addr_len)
+               len = *addr_len;
+
+       if (likely(len > 0))
+               memcpy(addr, &e->addr, len);
+
+       /* release the entry */
+       e->listener = NULL;
+
+       __ha_barrier_store();
+       ring->head = next;
+
+       *addr_len = len;
+       *li = ptr;
+
+       return fd;
+}
+
+
+/* tries to push a new accepted connection <fd> into ring <ring> for listener
+ * <li>, from address <addr> whose length is <addr_len>. Returns non-zero if it
+ * succeeds, or zero if the ring is full. Supports multiple producers.
+ */
+int accept_queue_push_mp(struct accept_queue_ring *ring, int fd,
+                         struct listener *li, const void *addr, int addr_len)
+{
+       struct accept_queue_entry *e;
+       unsigned int pos, next;
+
+       pos = ring->tail;
+       do {
+               next = pos + 1;
+               if (next >= ACCEPT_QUEUE_SIZE)
+                       next = 0;
+               if (next == ring->head)
+                       return 0; // ring full
+       } while (unlikely(!HA_ATOMIC_CAS(&ring->tail, &pos, next)));
+
+
+       e = &ring->entry[pos];
+
+       if (addr_len > sizeof(e->addr))
+               addr_len = sizeof(e->addr);
+
+       if (addr_len)
+               memcpy(&e->addr, addr, addr_len);
+
+       e->addr_len = addr_len;
+       e->fd = fd;
+
+       __ha_barrier_store();
+       /* now commit the change */
+
+       e->listener = li;
+       return 1;
+}
+
+/* proceed with accepting new connections */
+static struct task *accept_queue_process(struct task *t, void *context, unsigned short state)
+{
+       struct accept_queue_ring *ring = context;
+       struct listener *li;
+       struct sockaddr_storage addr;
+       int max_accept = global.tune.maxaccept ? global.tune.maxaccept : 64;
+       int addr_len;
+       int ret;
+       int fd;
+
+       while (max_accept--) {
+               addr_len = sizeof(addr);
+               fd = accept_queue_pop_sc(ring, &li, &addr, &addr_len);
+               if (fd < 0)
+                       break;
+
+               HA_ATOMIC_ADD(&li->thr_conn[tid], 1);
+               ret = li->accept(li, fd, &addr);
+               if (ret <= 0) {
+                       /* connection was terminated by the application */
+                       continue;
+               }
+
+               /* increase the per-process number of cumulated sessions, this
+                * may only be done once l->accept() has accepted the connection.
+                */
+               if (!(li->options & LI_O_UNLIMITED)) {
+                       HA_ATOMIC_UPDATE_MAX(&global.sps_max,
+                                            update_freq_ctr(&global.sess_per_sec, 1));
+                       if (li->bind_conf && li->bind_conf->is_ssl) {
+                               HA_ATOMIC_UPDATE_MAX(&global.ssl_max,
+                                                    update_freq_ctr(&global.ssl_per_sec, 1));
+                       }
+               }
+       }
+
+       /* ran out of budget ? Let's come here ASAP */
+       if (max_accept < 0)
+               task_wakeup(t, TASK_WOKEN_IO);
+
+       return t;
+}
+
+/* Initializes the accept-queues. Returns 0 on success, otherwise ERR_* flags */
+static int accept_queue_init()
+{
+       struct task *t;
+       int i;
+
+       for (i = 0; i < global.nbthread; i++) {
+               t = task_new(1UL << i);
+               if (!t) {
+                       ha_alert("Out of memory while initializing accept queue for thread %d\n", i);
+                       return ERR_FATAL|ERR_ABORT;
+               }
+               t->process = accept_queue_process;
+               t->context = &accept_queue_rings[i];
+               accept_queue_rings[i].task = t;
+       }
+       return 0;
+}
+
+REGISTER_CONFIG_POSTPARSER("multi-threaded accept queue", accept_queue_init);
+
+#endif // USE_THREAD
+
 /* This function adds the specified listener's file descriptor to the polling
  * lists if it is in the LI_LISTEN state. The listener enters LI_READY or
  * LI_FULL state depending on its number of connections. In deamon mode, we