struct sockaddr_storage addr;
};
+/* This is used to create the accept queue, optimized to be 64 bytes long. */
+struct accept_queue_entry {
+ struct listener *listener; // 8 bytes
+ int fd __attribute__((aligned(8))); // 4 bytes
+ int addr_len; // 4 bytes
+
+ union {
+ sa_family_t family; // 2 bytes
+ struct sockaddr_in in; // 16 bytes
+ struct sockaddr_in6 in6; // 28 bytes
+ } addr; // this is normally 28 bytes
+ /* 20-bytes hole here */
+ char pad0[0] __attribute((aligned(64)));
+};
+
+/* The per-thread accept queue ring, must be a power of two minus 1 */
+#define ACCEPT_QUEUE_SIZE ((1<<8) - 1)
+
+struct accept_queue_ring {
+ unsigned int head;
+ unsigned int tail;
+ struct task *task; /* task of the thread owning this ring */
+ struct accept_queue_entry entry[ACCEPT_QUEUE_SIZE] __attribute((aligned(64)));
+};
+
+
#endif /* _TYPES_LISTENER_H */
/*
struct xfer_sock_list *xfer_sock_list = NULL;
+#if defined(USE_THREAD)
+
+struct accept_queue_ring accept_queue_rings[MAX_THREADS] __attribute__((aligned(64))) = { };
+
+/* dequeue and process a pending connection from the local accept queue (single
+ * consumer). Returns the accepted fd or -1 if none was found. The listener is
+ * placed into *li. The address is copied into *addr for no more than *addr_len
+ * bytes, and the address length is returned into *addr_len.
+ */
+int accept_queue_pop_sc(struct accept_queue_ring *ring, struct listener **li, void *addr, int *addr_len)
+{
+ struct accept_queue_entry *e;
+ unsigned int pos, next;
+ struct listener *ptr;
+ int len;
+ int fd;
+
+ pos = ring->head;
+
+ if (pos == ring->tail)
+ return -1;
+
+ next = pos + 1;
+ if (next >= ACCEPT_QUEUE_SIZE)
+ next = 0;
+
+ e = &ring->entry[pos];
+
+ /* wait for the producer to update the listener's pointer */
+ while (1) {
+ ptr = e->listener;
+ __ha_barrier_load();
+ if (ptr)
+ break;
+ pl_cpu_relax();
+ }
+
+ fd = e->fd;
+ len = e->addr_len;
+ if (len > *addr_len)
+ len = *addr_len;
+
+ if (likely(len > 0))
+ memcpy(addr, &e->addr, len);
+
+ /* release the entry */
+ e->listener = NULL;
+
+ __ha_barrier_store();
+ ring->head = next;
+
+ *addr_len = len;
+ *li = ptr;
+
+ return fd;
+}
+
+
+/* tries to push a new accepted connection <fd> into ring <ring> for listener
+ * <li>, from address <addr> whose length is <addr_len>. Returns non-zero if it
+ * succeeds, or zero if the ring is full. Supports multiple producers.
+ */
+int accept_queue_push_mp(struct accept_queue_ring *ring, int fd,
+ struct listener *li, const void *addr, int addr_len)
+{
+ struct accept_queue_entry *e;
+ unsigned int pos, next;
+
+ pos = ring->tail;
+ do {
+ next = pos + 1;
+ if (next >= ACCEPT_QUEUE_SIZE)
+ next = 0;
+ if (next == ring->head)
+ return 0; // ring full
+ } while (unlikely(!HA_ATOMIC_CAS(&ring->tail, &pos, next)));
+
+
+ e = &ring->entry[pos];
+
+ if (addr_len > sizeof(e->addr))
+ addr_len = sizeof(e->addr);
+
+ if (addr_len)
+ memcpy(&e->addr, addr, addr_len);
+
+ e->addr_len = addr_len;
+ e->fd = fd;
+
+ __ha_barrier_store();
+ /* now commit the change */
+
+ e->listener = li;
+ return 1;
+}
+
+/* proceed with accepting new connections */
+static struct task *accept_queue_process(struct task *t, void *context, unsigned short state)
+{
+ struct accept_queue_ring *ring = context;
+ struct listener *li;
+ struct sockaddr_storage addr;
+ int max_accept = global.tune.maxaccept ? global.tune.maxaccept : 64;
+ int addr_len;
+ int ret;
+ int fd;
+
+ while (max_accept--) {
+ addr_len = sizeof(addr);
+ fd = accept_queue_pop_sc(ring, &li, &addr, &addr_len);
+ if (fd < 0)
+ break;
+
+ HA_ATOMIC_ADD(&li->thr_conn[tid], 1);
+ ret = li->accept(li, fd, &addr);
+ if (ret <= 0) {
+ /* connection was terminated by the application */
+ continue;
+ }
+
+ /* increase the per-process number of cumulated sessions, this
+ * may only be done once l->accept() has accepted the connection.
+ */
+ if (!(li->options & LI_O_UNLIMITED)) {
+ HA_ATOMIC_UPDATE_MAX(&global.sps_max,
+ update_freq_ctr(&global.sess_per_sec, 1));
+ if (li->bind_conf && li->bind_conf->is_ssl) {
+ HA_ATOMIC_UPDATE_MAX(&global.ssl_max,
+ update_freq_ctr(&global.ssl_per_sec, 1));
+ }
+ }
+ }
+
+ /* ran out of budget ? Let's come here ASAP */
+ if (max_accept < 0)
+ task_wakeup(t, TASK_WOKEN_IO);
+
+ return t;
+}
+
+/* Initializes the accept-queues. Returns 0 on success, otherwise ERR_* flags */
+static int accept_queue_init()
+{
+ struct task *t;
+ int i;
+
+ for (i = 0; i < global.nbthread; i++) {
+ t = task_new(1UL << i);
+ if (!t) {
+ ha_alert("Out of memory while initializing accept queue for thread %d\n", i);
+ return ERR_FATAL|ERR_ABORT;
+ }
+ t->process = accept_queue_process;
+ t->context = &accept_queue_rings[i];
+ accept_queue_rings[i].task = t;
+ }
+ return 0;
+}
+
+REGISTER_CONFIG_POSTPARSER("multi-threaded accept queue", accept_queue_init);
+
+#endif // USE_THREAD
+
/* This function adds the specified listener's file descriptor to the polling
* lists if it is in the LI_LISTEN state. The listener enters LI_READY or
* LI_FULL state depending on its number of connections. In deamon mode, we