struct xprt_ops;
struct proxy;
struct fe_counters;
+struct connection;
/* listener state */
enum li_state {
int maxconn; /* maximum connections allowed on this listener */
unsigned int backlog; /* if set, listen backlog */
int maxaccept; /* if set, max number of connections accepted at once (-1 when disabled) */
- int (*accept)(struct listener *l, int fd, struct sockaddr_storage *addr); /* upper layer's accept() */
+ int (*accept)(struct connection *conn); /* upper layer's accept() */
enum obj_type *default_target; /* default target to use for accepted sessions or NULL */
/* cache line boundary */
struct mt_list wait_queue; /* link element to make the listener wait for something (LI_LIMITED) */
struct bind_kw kw[VAR_ARRAY];
};
-/* This is used to create the accept queue, optimized to be 64 bytes long. */
-struct accept_queue_entry {
- struct listener *listener; // 8 bytes
- int fd __attribute__((aligned(8))); // 4 bytes
- int addr_len; // 4 bytes
-
- union {
- sa_family_t family; // 2 bytes
- struct sockaddr_in in; // 16 bytes
- struct sockaddr_in6 in6; // 28 bytes
- } addr; // this is normally 28 bytes
- /* 20-bytes hole here */
- char pad0[0] __attribute((aligned(64)));
-};
-
/* The per-thread accept queue ring, must be a power of two minus 1 */
-#define ACCEPT_QUEUE_SIZE ((1<<8) - 1)
+#define ACCEPT_QUEUE_SIZE ((1<<10) - 1)
struct accept_queue_ring {
unsigned int head;
unsigned int tail;
struct tasklet *tasklet; /* tasklet of the thread owning this ring */
- struct accept_queue_entry entry[ACCEPT_QUEUE_SIZE] __attribute((aligned(64)));
+ struct connection *entry[ACCEPT_QUEUE_SIZE] __attribute((aligned(64)));
};
struct accept_queue_ring accept_queue_rings[MAX_THREADS] __attribute__((aligned(64))) = { };
/* dequeue and process a pending connection from the local accept queue (single
- * consumer). Returns the accepted fd or -1 if none was found. The listener is
- * placed into *li. The address is copied into *addr for no more than *addr_len
- * bytes, and the address length is returned into *addr_len.
+ * consumer). Returns the accepted connection or NULL if none was found.
*/
-int accept_queue_pop_sc(struct accept_queue_ring *ring, struct listener **li, void *addr, int *addr_len)
+struct connection *accept_queue_pop_sc(struct accept_queue_ring *ring)
{
- struct accept_queue_entry *e;
unsigned int pos, next;
- struct listener *ptr;
- int len;
- int fd;
+ struct connection *ptr;
+ struct connection **e;
pos = ring->head;
if (pos == ring->tail)
- return -1;
+ return NULL;
next = pos + 1;
if (next >= ACCEPT_QUEUE_SIZE)
/* wait for the producer to update the listener's pointer */
while (1) {
- ptr = e->listener;
+ ptr = *e;
__ha_barrier_load();
if (ptr)
break;
pl_cpu_relax();
}
- fd = e->fd;
- len = e->addr_len;
- if (len > *addr_len)
- len = *addr_len;
-
- if (likely(len > 0))
- memcpy(addr, &e->addr, len);
-
/* release the entry */
- e->listener = NULL;
+ *e = NULL;
__ha_barrier_store();
ring->head = next;
-
- *addr_len = len;
- *li = ptr;
-
- return fd;
+ return ptr;
}
-/* tries to push a new accepted connection <fd> into ring <ring> for listener
- * <li>, from address <addr> whose length is <addr_len>. Returns non-zero if it
- * succeeds, or zero if the ring is full. Supports multiple producers.
+/* tries to push a new accepted connection <conn> into ring <ring>. Returns
+ * non-zero if it succeeds, or zero if the ring is full. Supports multiple
+ * producers.
*/
-int accept_queue_push_mp(struct accept_queue_ring *ring, int fd,
- struct listener *li, const void *addr, int addr_len)
+int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn)
{
- struct accept_queue_entry *e;
unsigned int pos, next;
pos = ring->tail;
return 0; // ring full
} while (unlikely(!_HA_ATOMIC_CAS(&ring->tail, &pos, next)));
-
- e = &ring->entry[pos];
-
- if (addr_len > sizeof(e->addr))
- addr_len = sizeof(e->addr);
-
- if (addr_len)
- memcpy(&e->addr, addr, addr_len);
-
- e->addr_len = addr_len;
- e->fd = fd;
-
+ ring->entry[pos] = conn;
__ha_barrier_store();
- /* now commit the change */
-
- e->listener = li;
return 1;
}
static struct task *accept_queue_process(struct task *t, void *context, unsigned short state)
{
struct accept_queue_ring *ring = context;
+ struct connection *conn;
struct listener *li;
- struct sockaddr_storage addr;
unsigned int max_accept;
- int addr_len;
int ret;
- int fd;
/* if global.tune.maxaccept is -1, then max_accept is UINT_MAX. It
* is not really illimited, but it is probably enough.
*/
max_accept = global.tune.maxaccept ? global.tune.maxaccept : 64;
for (; max_accept; max_accept--) {
- addr_len = sizeof(addr);
- fd = accept_queue_pop_sc(ring, &li, &addr, &addr_len);
- if (fd < 0)
+ conn = accept_queue_pop_sc(ring);
+ if (!conn)
break;
+ li = __objt_listener(conn->target);
_HA_ATOMIC_ADD(&li->thr_conn[tid], 1);
- ret = li->accept(li, fd, &addr);
+ ret = li->accept(conn);
if (ret <= 0) {
/* connection was terminated by the application */
continue;
void listener_accept(int fd)
{
struct listener *l = fdtab[fd].owner;
+ struct connection *cli_conn;
struct proxy *p;
unsigned int max_accept;
int next_conn = 0;
if (unlikely(master == 1))
fcntl(cfd, F_SETFD, FD_CLOEXEC);
+ /* we'll have to at least allocate a connection, assign the listener
+ * to conn->target, set the source address, and set the fd.
+ */
+ cli_conn = conn_new(&l->obj_type);
+ if (cli_conn) {
+ cli_conn->handle.fd = cfd;
+ cli_conn->flags |= CO_FL_ADDR_FROM_SET;
+ if (!sockaddr_alloc(&cli_conn->src, &addr, laddr)) {
+ conn_free(cli_conn);
+ cli_conn = NULL;
+ }
+ }
+
+ if (!cli_conn) {
+ /* no more memory, give up! */
+ close(cfd);
+ continue;
+ }
+
/* The connection was accepted, it must be counted as such */
if (l->counters)
HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, next_conn);
send_log(p, LOG_EMERG,
"Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n",
p->id);
+ conn_free(cli_conn);
close(cfd);
expire = tick_add(now_ms, 1000); /* try again in 1 second */
goto limit_global;
next_feconn = 0;
next_actconn = 0;
+
#if defined(USE_THREAD)
mask = thread_mask(l->rx.settings->bind_thread) & all_threads_mask;
if (atleast2(mask) && (global.tune.options & GTUNE_LISTENER_MQ) && !stopping) {
* when processing this loop.
*/
ring = &accept_queue_rings[t];
- if (accept_queue_push_mp(ring, cfd, l, &addr, laddr)) {
+ if (accept_queue_push_mp(ring, cli_conn)) {
_HA_ATOMIC_ADD(&activity[t].accq_pushed, 1);
tasklet_wakeup(ring->tasklet);
continue;
#endif // USE_THREAD
_HA_ATOMIC_ADD(&l->thr_conn[tid], 1);
- ret = l->accept(l, cfd, &addr);
+ ret = l->accept(cli_conn);
if (unlikely(ret <= 0)) {
/* The connection was closed by stream_accept(). Either
* we just have to ignore it (ret == 0) or it's a critical
/* This function is called from the protocol layer accept() in order to
* instantiate a new session on behalf of a given listener and frontend. It
* returns a positive value upon success, 0 if the connection can be ignored,
- * or a negative value upon critical failure. The accepted file descriptor is
+ * or a negative value upon critical failure. The accepted connection is
* closed if we return <= 0. If no handshake is needed, it immediately tries
- * to instantiate a new stream. The created connection's owner points to the
- * new session until the upper layers are created.
+ * to instantiate a new stream. The connection must already have been filled
+ * with the incoming connection handle (a fd), a target (the listener) and a
+ * source address.
*/
-int session_accept_fd(struct listener *l, int cfd, struct sockaddr_storage *addr)
+int session_accept_fd(struct connection *cli_conn)
{
- struct connection *cli_conn;
+ struct listener *l = __objt_listener(cli_conn->target);
struct proxy *p = l->bind_conf->frontend;
+ int cfd = cli_conn->handle.fd;
struct session *sess;
int ret;
-
ret = -1; /* assume unrecoverable error by default */
- if (unlikely((cli_conn = conn_new(&l->obj_type)) == NULL))
- goto out_close;
-
- if (!sockaddr_alloc(&cli_conn->src, addr, sizeof(*addr)))
- goto out_free_conn;
-
- cli_conn->handle.fd = cfd;
- cli_conn->flags |= CO_FL_ADDR_FROM_SET;
cli_conn->proxy_netns = l->rx.settings->netns;
conn_prepare(cli_conn, l->rx.proto, l->bind_conf->xprt);
conn_stop_tracking(cli_conn);
conn_xprt_close(cli_conn);
conn_free(cli_conn);
- out_close:
listener_release(l);
if (ret < 0 && l->bind_conf->xprt == xprt_get(XPRT_RAW) &&
p->mode == PR_MODE_HTTP && l->bind_conf->mux_proto == NULL) {