/* Labels for the known lock classes, used to index per-class lock
 * statistics. LOCK_LABELS counts the entries and must remain last.
 * (Unresolved diff markers removed.)
 */
enum lock_label {
	THREAD_SYNC_LOCK = 0,   /* inter-thread synchronization lock */
	FDTAB_LOCK,             /* global lock protecting fdtab/maxfd */
	FDCACHE_LOCK,           /* global lock protecting the fd cache */
	FD_LOCK,                /* per-FD state lock */
	POLL_LOCK,              /* poller's private structures lock */
	POOL_LOCK,              /* memory pool lock */
	LOCK_LABELS             /* number of label entries, keep last */
};
static inline void show_lock_stats()
{
- const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "POOL"};
+ const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
+ "POOL" };
int lbl;
for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
#include <unistd.h>
#include <common/config.h>
+
#include <types/fd.h>
/* public variables */
+
extern unsigned int *fd_cache; // FD events cache
-extern unsigned int *fd_updt; // FD updates list
extern int fd_cache_num; // number of events in the cache
-extern int fd_nbupdt; // number of updates in the list
+
+extern THREAD_LOCAL int *fd_updt; // FD updates list
+extern THREAD_LOCAL int fd_nbupdt; // number of updates in the list
+
+#ifdef USE_THREAD
+HA_SPINLOCK_T fdtab_lock; /* global lock to protect fdtab array */
+HA_RWLOCK_T fdcache_lock; /* global lock to protect fd_cache array */
+HA_SPINLOCK_T poll_lock; /* global lock to protect poll info */
+#endif
/* Deletes an FD from the fdsets, and recomputes the maxfd limit.
* The file descriptor is also closed.
*/
/* Allocates a cache entry for fd <fd> unless it already has one.
 * The FD cache is protected by fdcache_lock taken for writing,
 * so the test and the insertion are atomic w.r.t. other threads.
 */
static inline void fd_alloc_cache_entry(const int fd)
{
	RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
	if (fdtab[fd].cache)
		goto end; /* already in the cache, nothing to do */
	fd_cache_num++;
	fdtab[fd].cache = fd_cache_num;
	fd_cache[fd_cache_num - 1] = fd;
 end:
	RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
}
/* Removes entry used by fd <fd> from the FD cache and replaces it with the
{
unsigned int pos;
+ RWLOCK_WRLOCK(FDCACHE_LOCK, &fdcache_lock);
pos = fdtab[fd].cache;
if (!pos)
- return;
+ goto end;
fdtab[fd].cache = 0;
fd_cache_num--;
if (likely(pos <= fd_cache_num)) {
fd_cache[pos - 1] = fd;
fdtab[fd].cache = pos;
}
+ end:
+ RWLOCK_WRUNLOCK(FDCACHE_LOCK, &fdcache_lock);
}
/* Computes the new polled status based on the active and ready statuses, for
/* Disable processing of recv events on fd <fd>. Takes the per-FD lock
 * so the state change and the cache update are seen atomically.
 */
static inline void fd_stop_recv(int fd)
{
	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
	if (fd_recv_active(fd)) {
		fdtab[fd].state &= ~FD_EV_ACTIVE_R;
		fd_update_cache(fd); /* need an update entry to change the state */
	}
	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Disable processing of send events on fd <fd>. Takes the per-FD lock
 * so the state change and the cache update are seen atomically.
 */
static inline void fd_stop_send(int fd)
{
	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
	if (fd_send_active(fd)) {
		fdtab[fd].state &= ~FD_EV_ACTIVE_W;
		fd_update_cache(fd); /* need an update entry to change the state */
	}
	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Disable processing of events on fd <fd> for both directions. Takes the
 * per-FD lock so the state change and the cache update are atomic.
 */
static inline void fd_stop_both(int fd)
{
	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
	if (fd_active(fd)) {
		fdtab[fd].state &= ~FD_EV_ACTIVE_RW;
		fd_update_cache(fd); /* need an update entry to change the state */
	}
	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Report that FD <fd> cannot receive anymore without polling (EAGAIN
 * detected). Clears the read-ready bit under the per-FD lock.
 */
static inline void fd_cant_recv(const int fd)
{
	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
	if (fd_recv_ready(fd)) {
		fdtab[fd].state &= ~FD_EV_READY_R;
		fd_update_cache(fd); /* need an update entry to change the state */
	}
	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Report that FD <fd> can receive again without polling. Sets the
 * read-ready bit under the per-FD lock.
 */
static inline void fd_may_recv(const int fd)
{
	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
	if (!fd_recv_ready(fd)) {
		fdtab[fd].state |= FD_EV_READY_R;
		fd_update_cache(fd); /* need an update entry to change the state */
	}
	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Disable readiness when polled. This is useful to interrupt reading when
 * it is suspected that the end of data might have been reached; only acts
 * when the FD is both polled and ready for reading.
 */
static inline void fd_done_recv(const int fd)
{
	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
	if (fd_recv_polled(fd) && fd_recv_ready(fd)) {
		fdtab[fd].state &= ~FD_EV_READY_R;
		fd_update_cache(fd); /* need an update entry to change the state */
	}
	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Report that FD <fd> cannot send anymore without polling (EAGAIN
 * detected). Clears the write-ready bit under the per-FD lock.
 */
static inline void fd_cant_send(const int fd)
{
	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
	if (fd_send_ready(fd)) {
		fdtab[fd].state &= ~FD_EV_READY_W;
		fd_update_cache(fd); /* need an update entry to change the state */
	}
	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Report that FD <fd> can send again without polling. Sets the
 * write-ready bit under the per-FD lock.
 */
static inline void fd_may_send(const int fd)
{
	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
	if (!fd_send_ready(fd)) {
		fdtab[fd].state |= FD_EV_READY_W;
		fd_update_cache(fd); /* need an update entry to change the state */
	}
	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Prepare FD <fd> to try to receive: marks it active for reading under
 * the per-FD lock.
 */
static inline void fd_want_recv(int fd)
{
	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
	if (!fd_recv_active(fd)) {
		fdtab[fd].state |= FD_EV_ACTIVE_R;
		fd_update_cache(fd); /* need an update entry to change the state */
	}
	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Prepare FD <fd> to try to send: marks it active for writing under
 * the per-FD lock.
 */
static inline void fd_want_send(int fd)
{
	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
	if (!fd_send_active(fd)) {
		fdtab[fd].state |= FD_EV_ACTIVE_W;
		fd_update_cache(fd); /* need an update entry to change the state */
	}
	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
}
/* Update events seen for FD <fd> and its state if needed. This should be called
* by the poller to set FD_POLL_* flags. */
static inline void fd_update_events(int fd, int evts)
{
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
fdtab[fd].ev &= FD_POLL_STICKY;
fdtab[fd].ev |= evts;
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
if (fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR))
fd_may_recv(fd);
/* Prepares <fd> for being polled: resets its flags and event state under
 * the per-FD lock, then updates maxfd under the global fdtab lock.
 */
static inline void fd_insert(int fd)
{
	SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
	fdtab[fd].ev = 0;
	fdtab[fd].new = 1;
	fdtab[fd].updated = 0;
	fdtab[fd].linger_risk = 0;
	fdtab[fd].cloned = 0;
	fdtab[fd].cache = 0;
	SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);

	/* maxfd is shared between threads, protect it separately */
	SPIN_LOCK(FDTAB_LOCK, &fdtab_lock);
	if (fd + 1 > maxfd)
		maxfd = fd + 1;
	SPIN_UNLOCK(FDTAB_LOCK, &fdtab_lock);
}
#define _TYPES_FD_H
#include <common/config.h>
+#include <common/hathreads.h>
#include <types/port_range.h>
/* Direction for each FD event update */
struct fdtab {
void (*iocb)(int fd); /* I/O handler */
void *owner; /* the connection or listener associated with this fd, NULL if closed */
+#ifdef USE_THREAD
+ HA_SPINLOCK_T lock;
+#endif
unsigned int cache; /* position+1 in the FD cache. 0=not in cache. */
unsigned char state; /* FD state for read and write directions (2*3 bits) */
unsigned char ev; /* event seen in return of poll() : FD_POLL_* */
/* private data */
-static struct epoll_event *epoll_events;
+static THREAD_LOCAL struct epoll_event *epoll_events = NULL;
static int epoll_fd;
/* This structure may be used for any purpose. Warning! do not use it in
* recursive functions !
*/
-static struct epoll_event ev;
+static THREAD_LOCAL struct epoll_event ev;
#ifndef EPOLLRDHUP
/* EPOLLRDHUP was defined late in libc, and it appeared in kernel 2.6.17 */
*/
REGPRM1 static void __fd_clo(int fd)
{
- if (unlikely(fdtab[fd].cloned)) {
+ if (unlikely(fdtab[fd].cloned))
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &ev);
- }
}
/*
/* first, scan the update list to find polling changes */
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
- fdtab[fd].updated = 0;
- fdtab[fd].new = 0;
if (!fdtab[fd].owner)
continue;
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+ fdtab[fd].updated = 0;
+ fdtab[fd].new = 0;
+
eo = fdtab[fd].state;
en = fd_compute_new_polled_status(eo);
+ fdtab[fd].state = en;
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
if ((eo ^ en) & FD_EV_POLLED_RW) {
/* poll status changed */
- fdtab[fd].state = en;
if ((en & FD_EV_POLLED_RW) == 0) {
/* fd removed from poll list */
ev.events |= EPOLLOUT;
ev.data.fd = fd;
+
epoll_ctl(epoll_fd, opcode, fd, &ev);
}
}
/* always remap RDHUP to HUP as they're used similarly */
if (e & EPOLLRDHUP) {
- cur_poller.flags |= HAP_POLL_F_RDHUP;
+ HA_ATOMIC_OR(&cur_poller.flags, HAP_POLL_F_RDHUP);
n |= FD_POLL_HUP;
}
fd_update_events(fd, n);
/* the caller will take care of cached events */
}
+static int init_epoll_per_thread()
+{
+ epoll_events = calloc(1, sizeof(struct epoll_event) * global.tune.maxpollevents);
+ if (epoll_events == NULL)
+ return 0;
+ return 1;
+}
+
+static void deinit_epoll_per_thread()
+{
+ free(epoll_events);
+}
+
/*
* Initialization of the epoll() poller.
* Returns 0 in case of failure, non-zero in case of success. If it fails, it
if (epoll_fd < 0)
goto fail_fd;
- epoll_events = (struct epoll_event*)
- calloc(1, sizeof(struct epoll_event) * global.tune.maxpollevents);
-
- if (epoll_events == NULL)
+ if (global.nbthread > 1) {
+ hap_register_per_thread_init(init_epoll_per_thread);
+ hap_register_per_thread_deinit(deinit_epoll_per_thread);
+ }
+ else if (!init_epoll_per_thread())
goto fail_ee;
return 1;
/* private data */
static int kqueue_fd;
-static struct kevent *kev = NULL;
+static THREAD_LOCAL struct kevent *kev = NULL;
/*
* kqueue() poller
/* first, scan the update list to find changes */
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
- fdtab[fd].updated = 0;
- fdtab[fd].new = 0;
if (!fdtab[fd].owner)
continue;
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+ fdtab[fd].updated = 0;
+ fdtab[fd].new = 0;
+
eo = fdtab[fd].state;
en = fd_compute_new_polled_status(eo);
+ fdtab[fd].state = en;
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
if ((eo ^ en) & FD_EV_POLLED_RW) {
/* poll status changed */
- fdtab[fd].state = en;
-
if ((eo ^ en) & FD_EV_POLLED_R) {
/* read poll status changed */
if (en & FD_EV_POLLED_R) {
}
}
+
+static int init_kqueue_per_thread()
+{
+ /* we can have up to two events per fd (*/
+ kev = calloc(1, sizeof(struct kevent) * 2 * global.maxsock);
+ if (kev == NULL)
+ return 0;
+ return 1;
+}
+
+static void deinit_kqueue_per_thread()
+{
+ free(kev);
+}
+
/*
* Initialization of the kqueue() poller.
* Returns 0 in case of failure, non-zero in case of success. If it fails, it
if (kqueue_fd < 0)
goto fail_fd;
- /* we can have up to two events per fd (*/
- kev = calloc(1, sizeof(struct kevent) * 2 * global.maxsock);
- if (kev == NULL)
+ if (global.nbthread > 1) {
+ hap_register_per_thread_init(init_kqueue_per_thread);
+ hap_register_per_thread_deinit(deinit_kqueue_per_thread);
+ }
+ else if (!init_kqueue_per_thread())
goto fail_kev;
-
+
return 1;
fail_kev:
static unsigned int *fd_evts[2];
/* private data */
-static struct pollfd *poll_events = NULL;
-
+static THREAD_LOCAL int nbfd = 0;
+static THREAD_LOCAL struct pollfd *poll_events = NULL;
static inline void hap_fd_set(int fd, unsigned int *evts)
{
/* Immediately removes fd <fd> from both poll() event sets. The sets are
 * shared between threads, hence the global poll lock.
 */
REGPRM1 static void __fd_clo(int fd)
{
	SPIN_LOCK(POLL_LOCK, &poll_lock);
	hap_fd_clr(fd, fd_evts[DIR_RD]);
	hap_fd_clr(fd, fd_evts[DIR_WR]);
	SPIN_UNLOCK(POLL_LOCK, &poll_lock);
}
/*
REGPRM2 static void _do_poll(struct poller *p, int exp)
{
int status;
- int fd, nbfd;
+ int fd;
int wait_time;
int updt_idx, en, eo;
int fds, count;
/* first, scan the update list to find changes */
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
- fdtab[fd].updated = 0;
- fdtab[fd].new = 0;
if (!fdtab[fd].owner)
continue;
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+ fdtab[fd].updated = 0;
+ fdtab[fd].new = 0;
+
eo = fdtab[fd].state;
en = fd_compute_new_polled_status(eo);
+ fdtab[fd].state = en;
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
if ((eo ^ en) & FD_EV_POLLED_RW) {
	/* poll status changed, update the lists.
	 * BUGFIX: the read branch wrongly cleared DIR_WR instead of
	 * setting DIR_RD, and the DIR_WR clear branch was missing,
	 * so fds newly polled for reading were silently dropped from
	 * the write set and never added to the read set.
	 */
	SPIN_LOCK(POLL_LOCK, &poll_lock);
	if ((eo & ~en) & FD_EV_POLLED_R)
		hap_fd_clr(fd, fd_evts[DIR_RD]);
	else if ((en & ~eo) & FD_EV_POLLED_R)
		hap_fd_set(fd, fd_evts[DIR_RD]);

	if ((eo & ~en) & FD_EV_POLLED_W)
		hap_fd_clr(fd, fd_evts[DIR_WR]);
	else if ((en & ~eo) & FD_EV_POLLED_W)
		hap_fd_set(fd, fd_evts[DIR_WR]);
	SPIN_UNLOCK(POLL_LOCK, &poll_lock);
}
}
fd_nbupdt = 0;
for (fds = 0; (fds * 8*sizeof(**fd_evts)) < maxfd; fds++) {
rn = fd_evts[DIR_RD][fds];
wn = fd_evts[DIR_WR][fds];
-
+
if (!(rn|wn))
continue;
poll_events[nbfd].events = (sr ? (POLLIN | POLLRDHUP) : 0) | (sw ? POLLOUT : 0);
nbfd++;
}
- }
+ }
}
-
+
/* now let's wait for events */
if (!exp)
wait_time = MAX_DELAY_MS;
unsigned int n;
int e = poll_events[count].revents;
fd = poll_events[count].fd;
-
+
if (!(e & ( POLLOUT | POLLIN | POLLERR | POLLHUP | POLLRDHUP )))
continue;
/* always remap RDHUP to HUP as they're used similarly */
if (e & POLLRDHUP) {
- cur_poller.flags |= HAP_POLL_F_RDHUP;
+ HA_ATOMIC_OR(&cur_poller.flags, HAP_POLL_F_RDHUP);
n |= FD_POLL_HUP;
}
-
fd_update_events(fd, n);
}
}
+
+static int init_poll_per_thread()
+{
+ poll_events = calloc(1, sizeof(struct pollfd) * global.maxsock);
+ if (poll_events == NULL)
+ return 0;
+ return 1;
+}
+
+static void deinit_poll_per_thread()
+{
+ free(poll_events);
+}
+
/*
* Initialization of the poll() poller.
* Returns 0 in case of failure, non-zero in case of success. If it fails, it
p->private = NULL;
fd_evts_bytes = (global.maxsock + sizeof(**fd_evts) - 1) / sizeof(**fd_evts) * sizeof(**fd_evts);
- poll_events = calloc(1, sizeof(struct pollfd) * global.maxsock);
-
- if (poll_events == NULL)
+ if (global.nbthread > 1) {
+ hap_register_per_thread_init(init_poll_per_thread);
+ hap_register_per_thread_deinit(deinit_poll_per_thread);
+ }
+ else if (!init_poll_per_thread())
goto fail_pe;
-
+
if ((fd_evts[DIR_RD] = calloc(1, fd_evts_bytes)) == NULL)
goto fail_srevt;
-
if ((fd_evts[DIR_WR] = calloc(1, fd_evts_bytes)) == NULL)
goto fail_swevt;
#include <proto/fd.h>
+/* private data */
static fd_set *fd_evts[2];
-static fd_set *tmp_evts[2];
+static THREAD_LOCAL fd_set *tmp_evts[2];
/* Immediately remove the entry upon close() */
/* Immediately removes fd <fd> from both select() fd sets. The sets are
 * shared between threads, hence the global poll lock.
 */
REGPRM1 static void __fd_clo(int fd)
{
	SPIN_LOCK(POLL_LOCK, &poll_lock);
	FD_CLR(fd, fd_evts[DIR_RD]);
	FD_CLR(fd, fd_evts[DIR_WR]);
	SPIN_UNLOCK(POLL_LOCK, &poll_lock);
}
/*
int fd, i;
struct timeval delta;
int delta_ms;
- int readnotnull, writenotnull;
int fds;
int updt_idx, en, eo;
char count;
-
+ int readnotnull, writenotnull;
+
/* first, scan the update list to find changes */
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
fd = fd_updt[updt_idx];
- fdtab[fd].updated = 0;
- fdtab[fd].new = 0;
if (!fdtab[fd].owner)
continue;
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
+ fdtab[fd].updated = 0;
+ fdtab[fd].new = 0;
+
eo = fdtab[fd].state;
en = fd_compute_new_polled_status(eo);
+ fdtab[fd].state = en;
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
if ((eo ^ en) & FD_EV_POLLED_RW) {
	/* poll status changed, update the lists.
	 * BUGFIX: the read branch wrongly did FD_CLR on DIR_WR instead
	 * of FD_SET on DIR_RD, and the DIR_WR clear branch was missing,
	 * so fds newly polled for reading were dropped from the write
	 * set and never added to the read set.
	 */
	SPIN_LOCK(POLL_LOCK, &poll_lock);
	if ((eo & ~en) & FD_EV_POLLED_R)
		FD_CLR(fd, fd_evts[DIR_RD]);
	else if ((en & ~eo) & FD_EV_POLLED_R)
		FD_SET(fd, fd_evts[DIR_RD]);

	if ((eo & ~en) & FD_EV_POLLED_W)
		FD_CLR(fd, fd_evts[DIR_WR]);
	else if ((en & ~eo) & FD_EV_POLLED_W)
		FD_SET(fd, fd_evts[DIR_WR]);
	SPIN_UNLOCK(POLL_LOCK, &poll_lock);
}
}
fd_nbupdt = 0;
+ /* let's restore fdset state */
+ readnotnull = 0; writenotnull = 0;
+ for (i = 0; i < (maxfd + FD_SETSIZE - 1)/(8*sizeof(int)); i++) {
+ readnotnull |= (*(((int*)tmp_evts[DIR_RD])+i) = *(((int*)fd_evts[DIR_RD])+i)) != 0;
+ writenotnull |= (*(((int*)tmp_evts[DIR_WR])+i) = *(((int*)fd_evts[DIR_WR])+i)) != 0;
+ }
+
+#if 0
+ /* just a verification code, needs to be removed for performance */
+ for (i=0; i<maxfd; i++) {
+ if (FD_ISSET(i, tmp_evts[DIR_RD]) != FD_ISSET(i, fd_evts[DIR_RD]))
+ abort();
+ if (FD_ISSET(i, tmp_evts[DIR_WR]) != FD_ISSET(i, fd_evts[DIR_WR]))
+ abort();
+ }
+#endif
+
delta_ms = 0;
delta.tv_sec = 0;
delta.tv_usec = 0;
delta.tv_usec = (delta_ms % 1000) * 1000;
}
- /* let's restore fdset state */
-
- readnotnull = 0; writenotnull = 0;
- for (i = 0; i < (maxfd + FD_SETSIZE - 1)/(8*sizeof(int)); i++) {
- readnotnull |= (*(((int*)tmp_evts[DIR_RD])+i) = *(((int*)fd_evts[DIR_RD])+i)) != 0;
- writenotnull |= (*(((int*)tmp_evts[DIR_WR])+i) = *(((int*)fd_evts[DIR_WR])+i)) != 0;
- }
-
- // /* just a verification code, needs to be removed for performance */
- // for (i=0; i<maxfd; i++) {
- // if (FD_ISSET(i, tmp_evts[DIR_RD]) != FD_ISSET(i, fd_evts[DIR_RD]))
- // abort();
- // if (FD_ISSET(i, tmp_evts[DIR_WR]) != FD_ISSET(i, fd_evts[DIR_WR]))
- // abort();
- //
- // }
-
gettimeofday(&before_poll, NULL);
status = select(maxfd,
readnotnull ? tmp_evts[DIR_RD] : NULL,
writenotnull ? tmp_evts[DIR_WR] : NULL,
NULL,
&delta);
-
+
tv_update_date(delta_ms, status);
measure_idle();
}
}
+static int init_select_per_thread()
+{
+ int fd_set_bytes;
+
+ fd_set_bytes = sizeof(fd_set) * (global.maxsock + FD_SETSIZE - 1) / FD_SETSIZE;
+ if ((tmp_evts[DIR_RD] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
+ goto fail;
+ if ((tmp_evts[DIR_WR] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
+ goto fail;
+ return 1;
+ fail:
+ free(tmp_evts[DIR_RD]);
+ free(tmp_evts[DIR_WR]);
+ return 0;
+}
+
+static void deinit_select_per_thread()
+{
+ free(tmp_evts[DIR_WR]);
+ free(tmp_evts[DIR_RD]);
+}
+
/*
* Initialization of the select() poller.
* Returns 0 in case of failure, non-zero in case of success. If it fails, it
*/
REGPRM1 static int _do_init(struct poller *p)
{
- __label__ fail_swevt, fail_srevt, fail_wevt, fail_revt;
+ __label__ fail_swevt, fail_srevt, fail_revt;
int fd_set_bytes;
p->private = NULL;
goto fail_revt;
fd_set_bytes = sizeof(fd_set) * (global.maxsock + FD_SETSIZE - 1) / FD_SETSIZE;
-
- if ((tmp_evts[DIR_RD] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
+ if (global.nbthread > 1) {
+ hap_register_per_thread_init(init_select_per_thread);
+ hap_register_per_thread_deinit(deinit_select_per_thread);
+ }
+ else if (!init_select_per_thread())
goto fail_revt;
-
- if ((tmp_evts[DIR_WR] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
- goto fail_wevt;
if ((fd_evts[DIR_RD] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
goto fail_srevt;
-
if ((fd_evts[DIR_WR] = (fd_set *)calloc(1, fd_set_bytes)) == NULL)
goto fail_swevt;
free(fd_evts[DIR_RD]);
fail_srevt:
free(tmp_evts[DIR_WR]);
- fail_wevt:
free(tmp_evts[DIR_RD]);
fail_revt:
p->pref = 0;
#include <types/global.h>
#include <proto/fd.h>
+#include <proto/log.h>
#include <proto/port_range.h>
struct fdtab *fdtab = NULL; /* array of all the file descriptors */
int nbpollers = 0;
unsigned int *fd_cache = NULL; // FD events cache
-unsigned int *fd_updt = NULL; // FD updates list
int fd_cache_num = 0; // number of events in the cache
-int fd_nbupdt = 0; // number of updates in the list
+
+THREAD_LOCAL int *fd_updt = NULL; // FD updates list
+THREAD_LOCAL int fd_nbupdt = 0; // number of updates in the list
+
+#ifdef USE_THREAD
+HA_SPINLOCK_T fdtab_lock; /* global lock to protect fdtab array */
+HA_RWLOCK_T fdcache_lock; /* global lock to protect fd_cache array */
+HA_SPINLOCK_T poll_lock; /* global lock to protect poll info */
+#endif
/* Deletes an FD from the fdsets, and recomputes the maxfd limit.
* The file descriptor is also closed.
*/
static void fd_dodelete(int fd, int do_close)
{
+ SPIN_LOCK(FD_LOCK, &fdtab[fd].lock);
if (fdtab[fd].linger_risk) {
/* this is generally set when connecting to servers */
setsockopt(fd, SOL_SOCKET, SO_LINGER,
fdtab[fd].new = 0;
if (do_close)
close(fd);
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ SPIN_LOCK(FDTAB_LOCK, &fdtab_lock);
while ((maxfd-1 >= 0) && !fdtab[maxfd-1].owner)
maxfd--;
+ SPIN_UNLOCK(FDTAB_LOCK, &fdtab_lock);
}
/* Deletes an FD from the fdsets, and recomputes the maxfd limit.
{
int fd, entry, e;
+ if (!fd_cache_num)
+ return;
+
+ RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
for (entry = 0; entry < fd_cache_num; ) {
fd = fd_cache[entry];
- e = fdtab[fd].state;
+ if (SPIN_TRYLOCK(FD_LOCK, &fdtab[fd].lock))
+ goto next;
+
+ RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
+
+ e = fdtab[fd].state;
fdtab[fd].ev &= FD_POLL_STICKY;
if ((e & (FD_EV_READY_R | FD_EV_ACTIVE_R)) == (FD_EV_READY_R | FD_EV_ACTIVE_R))
if ((e & (FD_EV_READY_W | FD_EV_ACTIVE_W)) == (FD_EV_READY_W | FD_EV_ACTIVE_W))
fdtab[fd].ev |= FD_POLL_OUT;
- if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev)
+ if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev) {
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
fdtab[fd].iocb(fd);
- else
+ }
+ else {
fd_release_cache_entry(fd);
+ SPIN_UNLOCK(FD_LOCK, &fdtab[fd].lock);
+ }
+ RWLOCK_RDLOCK(FDCACHE_LOCK, &fdcache_lock);
/* If the fd was removed from the cache, it has been
* replaced by the next one that we don't want to skip !
*/
if (entry < fd_cache_num && fd_cache[entry] != fd)
continue;
+ next:
entry++;
}
+ RWLOCK_RDUNLOCK(FDCACHE_LOCK, &fdcache_lock);
}
/* disable the specified poller */
pollers[p].pref = 0;
}
+/* Initialize the pollers per thread */
+static int init_pollers_per_thread()
+{
+ if ((fd_updt = calloc(global.maxsock, sizeof(*fd_updt))) == NULL)
+ return 0;
+ return 1;
+}
+
+/* Deinitialize the pollers per thread */
+static void deinit_pollers_per_thread()
+{
+ free(fd_updt);
+ fd_updt = NULL;
+}
+
/*
* Initialize the pollers till the best one is found.
* If none works, returns 0, otherwise 1.
if ((fd_cache = calloc(global.maxsock, sizeof(*fd_cache))) == NULL)
goto fail_cache;
- if ((fd_updt = calloc(global.maxsock, sizeof(*fd_updt))) == NULL)
+ if (global.nbthread > 1) {
+ hap_register_per_thread_init(init_pollers_per_thread);
+ hap_register_per_thread_deinit(deinit_pollers_per_thread);
+ }
+ else if (!init_pollers_per_thread())
goto fail_updt;
+ for (p = 0; p < global.maxsock; p++)
+ SPIN_INIT(&fdtab[p].lock);
+
+ //memset(fd_cache, -1, global.maxsock);
+
+ SPIN_INIT(&fdtab_lock);
+ RWLOCK_INIT(&fdcache_lock);
+ SPIN_INIT(&poll_lock);
do {
bp = NULL;
for (p = 0; p < nbpollers; p++)
struct poller *bp;
int p;
+ for (p = 0; p < global.maxsock; p++)
+ SPIN_DESTROY(&fdtab[p].lock);
+
for (p = 0; p < nbpollers; p++) {
bp = &pollers[p];
if (bp && bp->pref)
bp->term(bp);
}
+
free(fd_updt); fd_updt = NULL;
free(fd_cache); fd_cache = NULL;
free(fdinfo); fdinfo = NULL;
free(fdtab); fdtab = NULL;
+
+ SPIN_DESTROY(&fdtab_lock);
+ RWLOCK_DESTROY(&fdcache_lock);
+ SPIN_DESTROY(&poll_lock);
}
/*
deinit_pollers();
} /* end deinit() */
/* I/O handler for the master pipe in worker processes. Any readable byte
 * or a closed pipe means the master process is gone, so the worker cleans
 * up and exits. On EAGAIN the fd is simply re-armed for reading.
 */
void mworker_pipe_handler(int fd)
{
	char c;

	while (read(fd, &c, 1) == -1) {
		if (errno == EINTR)
			continue;
		if (errno == EAGAIN) {
			/* nothing to read yet, wait for the next event */
			fd_cant_recv(fd);
			return;
		}
		break;
	}

	/* the master wrote to the pipe or closed it: terminate this worker.
	 * (dropped the unreachable return after exit())
	 */
	deinit();
	exit(EXIT_FAILURE);
}
+
+void mworker_pipe_register(int pipefd[2])
+{
+ close(mworker_pipe[1]); /* close the write end of the master pipe in the children */
+ fcntl(mworker_pipe[0], F_SETFL, O_NONBLOCK);
+ fdtab[mworker_pipe[0]].owner = mworker_pipe;
+ fdtab[mworker_pipe[0]].iocb = mworker_pipe_handler;
+ fd_insert(mworker_pipe[0]);
+ fd_want_recv(mworker_pipe[0]);
+}
static void sync_poll_loop()
{
}
}
+ if (global.mode & MODE_MWORKER)
+ mworker_pipe_register(mworker_pipe);
+
+ protocol_enable_all();
THREAD_SYNC_ENABLE();
run_poll_loop();
return t;
}
-void mworker_pipe_handler(int fd)
-{
- char c;
-
- while (read(fd, &c, 1) == -1) {
- if (errno == EINTR)
- continue;
- if (errno == EAGAIN) {
- fd_cant_recv(fd);
- return;
- }
- break;
- }
-
- deinit();
- exit(EXIT_FAILURE);
- return;
-}
-
-void mworker_pipe_register(int pipefd[2])
-{
- close(mworker_pipe[1]); /* close the write end of the master pipe in the children */
-
- fcntl(mworker_pipe[0], F_SETFL, O_NONBLOCK);
- fdtab[mworker_pipe[0]].owner = mworker_pipe;
- fdtab[mworker_pipe[0]].iocb = mworker_pipe_handler;
- fd_insert(mworker_pipe[0]);
- fd_want_recv(mworker_pipe[0]);
- }
-
-
int main(int argc, char **argv)
{
int err, retry;
}
global.mode &= ~MODE_STARTING;
-
- if (global.mode & MODE_MWORKER)
- mworker_pipe_register(mworker_pipe);
-
- protocol_enable_all();
/*
* That's it : the central polling loop. Run until we stop.
*/
}
else {
tid = 0;
+
+ if (global.mode & MODE_MWORKER)
+ mworker_pipe_register(mworker_pipe);
+
+ protocol_enable_all();
+
run_poll_loop();
}