typedef struct isc__socket isc__socket_t;
typedef struct isc__socketmgr isc__socketmgr_t;
+typedef struct isc__socketthread isc__socketthread_t;
#define NEWCONNSOCK(ev) ((isc__socket_t *)(ev)->newsocket)
unsigned int references;
int fd;
int pf;
+ int threadid;
char name[16];
void * tag;
isc_socketmgr_t common;
isc_mem_t *mctx;
isc_mutex_t lock;
- isc_mutex_t *fdlock;
isc_stats_t *stats;
+ int nthreads;
+ isc__socketthread_t *threads;
+ unsigned int maxsocks;
+ /* Locked by manager lock. */
+ ISC_LIST(isc__socket_t) socklist;
+ int reserved; /* unlocked */
+ isc_condition_t shutdown_ok;
+ int maxudp;
+};
+
+struct isc__socketthread {
+ isc__socketmgr_t * manager;
+ int threadid;
+ isc_thread_t thread;
+ int pipe_fds[2];
+ isc_mutex_t *fdlock;
+ /* Locked by fdlock. */
+ isc__socket_t **fds;
+ int *fdstate;
#ifdef USE_KQUEUE
int kqueue_fd;
int nevents;
int epoll_fd;
int nevents;
struct epoll_event *events;
+ uint32_t *epoll_events;
#endif /* USE_EPOLL */
#ifdef USE_DEVPOLL
int devpoll_fd;
unsigned int calls;
int nevents;
struct pollfd *events;
+ pollinfo_t *fdpollinfo;
#endif /* USE_DEVPOLL */
#ifdef USE_SELECT
int fd_bufsize;
-#endif /* USE_SELECT */
- unsigned int maxsocks;
- int pipe_fds[2];
-
- /* Locked by fdlock. */
- isc__socket_t **fds;
- int *fdstate;
-#if defined(USE_EPOLL)
- uint32_t *epoll_events;
-#endif
-#ifdef USE_DEVPOLL
- pollinfo_t *fdpollinfo;
-#endif
-
- /* Locked by manager lock. */
- ISC_LIST(isc__socket_t) socklist;
-#ifdef USE_SELECT
fd_set *read_fds;
fd_set *read_fds_copy;
fd_set *write_fds;
fd_set *write_fds_copy;
int maxfd;
#endif /* USE_SELECT */
- int reserved; /* unlocked */
- isc_thread_t watcher;
- isc_condition_t shutdown_ok;
- int maxudp;
};
+
#define CLOSED 0 /* this one must be zero */
#define MANAGED 1
#define CLOSE_PENDING 2
struct msghdr *, struct iovec *, size_t *);
static void build_msghdr_recv(isc__socket_t *, char *, isc_socketevent_t *,
struct msghdr *, struct iovec *, size_t *);
-static bool process_ctlfd(isc__socketmgr_t *manager);
+static bool process_ctlfd(isc__socketthread_t *thread);
static void setdscp(isc__socket_t *sock, isc_dscp_t dscp);
#define SELECT_POKE_SHUTDOWN (-1)
"sockmgr %p: %s", sockmgr, msgbuf);
}
+/*%
+ * Log a message tagged with the socket manager pointer and the id of
+ * the socket thread that produced it.  Cheap no-op when the global log
+ * context would not emit at 'level'; the formatted message is truncated
+ * to the local 2048-byte buffer.
+ */
+static void
+thread_log(isc__socketthread_t *thread,
+ isc_logcategory_t *category, isc_logmodule_t *module, int level,
+ const char *fmt, ...) ISC_FORMAT_PRINTF(5, 6);
+static void
+thread_log(isc__socketthread_t *thread,
+ isc_logcategory_t *category, isc_logmodule_t *module, int level,
+ const char *fmt, ...)
+{
+ char msgbuf[2048];
+ va_list ap;
+
+ /* Skip the (potentially costly) formatting when nothing would log. */
+ if (! isc_log_wouldlog(isc_lctx, level))
+ return;
+
+ va_start(ap, fmt);
+ vsnprintf(msgbuf, sizeof(msgbuf), fmt, ap);
+ va_end(ap);
+
+ isc_log_write(isc_lctx, category, module, level,
+ "sockmgr %p thread %d: %s", thread->manager, thread->threadid, msgbuf);
+}
+
static void
socket_log(isc__socket_t *sock, const isc_sockaddr_t *address,
isc_logcategory_t *category, isc_logmodule_t *module, int level,
}
static inline isc_result_t
-watch_fd(isc__socketmgr_t *manager, int fd, int msg) {
+watch_fd(isc__socketthread_t *thread, int fd, int msg) {
isc_result_t result = ISC_R_SUCCESS;
#ifdef USE_KQUEUE
evchange.filter = EVFILT_WRITE;
evchange.flags = EV_ADD;
evchange.ident = fd;
- if (kevent(manager->kqueue_fd, &evchange, 1, NULL, 0, NULL) != 0)
+ if (kevent(thread->kqueue_fd, &evchange, 1, NULL, 0, NULL) != 0)
result = isc__errno2result(errno);
return (result);
int ret;
int op;
- oldevents = manager->epoll_events[fd];
+ oldevents = thread->epoll_events[fd];
if (msg == SELECT_POKE_READ)
- manager->epoll_events[fd] |= EPOLLIN;
+ thread->epoll_events[fd] |= EPOLLIN;
else
- manager->epoll_events[fd] |= EPOLLOUT;
+ thread->epoll_events[fd] |= EPOLLOUT;
- event.events = manager->epoll_events[fd];
+ event.events = thread->epoll_events[fd];
memset(&event.data, 0, sizeof(event.data));
event.data.fd = fd;
op = (oldevents == 0U) ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
- ret = epoll_ctl(manager->epoll_fd, op, fd, &event);
+ ret = epoll_ctl(thread->epoll_fd, op, fd, &event);
if (ret == -1) {
if (errno == EEXIST)
UNEXPECTED_ERROR(__FILE__, __LINE__,
return (result);
#elif defined(USE_DEVPOLL)
struct pollfd pfd;
+ INSIST(thread->threadid == 0);
int lockid = FDLOCK_ID(fd);
memset(&pfd, 0, sizeof(pfd));
pfd.events = POLLOUT;
pfd.fd = fd;
pfd.revents = 0;
- LOCK(&manager->fdlock[lockid]);
- if (write(manager->devpoll_fd, &pfd, sizeof(pfd)) == -1)
+ LOCK(&thread->fdlock[lockid]);
+ if (write(thread->devpoll_fd, &pfd, sizeof(pfd)) == -1)
result = isc__errno2result(errno);
else {
if (msg == SELECT_POKE_READ)
- manager->fdpollinfo[fd].want_read = 1;
+ thread->fdpollinfo[fd].want_read = 1;
else
- manager->fdpollinfo[fd].want_write = 1;
+ thread->fdpollinfo[fd].want_write = 1;
}
- UNLOCK(&manager->fdlock[lockid]);
+ UNLOCK(&thread->fdlock[lockid]);
return (result);
#elif defined(USE_SELECT)
- LOCK(&manager->lock);
+ LOCK(&thread->manager->lock);
if (msg == SELECT_POKE_READ)
- FD_SET(fd, manager->read_fds);
+ FD_SET(fd, thread->read_fds);
if (msg == SELECT_POKE_WRITE)
- FD_SET(fd, manager->write_fds);
- UNLOCK(&manager->lock);
+ FD_SET(fd, thread->write_fds);
+ UNLOCK(&thread->manager->lock);
return (result);
#endif
}
static inline isc_result_t
-unwatch_fd(isc__socketmgr_t *manager, int fd, int msg) {
+unwatch_fd(isc__socketthread_t *thread, int fd, int msg) {
isc_result_t result = ISC_R_SUCCESS;
#ifdef USE_KQUEUE
evchange.filter = EVFILT_WRITE;
evchange.flags = EV_DELETE;
evchange.ident = fd;
- if (kevent(manager->kqueue_fd, &evchange, 1, NULL, 0, NULL) != 0)
+ if (kevent(thread->kqueue_fd, &evchange, 1, NULL, 0, NULL) != 0)
result = isc__errno2result(errno);
return (result);
int ret;
int op;
- if (msg == SELECT_POKE_READ)
- manager->epoll_events[fd] &= ~(EPOLLIN);
- else
- manager->epoll_events[fd] &= ~(EPOLLOUT);
+ if (msg == SELECT_POKE_READ) {
+ thread->epoll_events[fd] &= ~(EPOLLIN);
+ } else {
+ thread->epoll_events[fd] &= ~(EPOLLOUT);
+ }
- event.events = manager->epoll_events[fd];
+ event.events = thread->epoll_events[fd];
memset(&event.data, 0, sizeof(event.data));
event.data.fd = fd;
op = (event.events == 0U) ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
- ret = epoll_ctl(manager->epoll_fd, op, fd, &event);
+ ret = epoll_ctl(thread->epoll_fd, op, fd, &event);
if (ret == -1 && errno != ENOENT) {
char strbuf[ISC_STRERRORSIZE];
strerror_r(errno, strbuf, sizeof(strbuf));
* only provides a way of canceling per FD, we may need to re-poll the
* socket for the other operation.
*/
- LOCK(&manager->fdlock[lockid]);
+ LOCK(&thread->fdlock[lockid]);
if (msg == SELECT_POKE_READ &&
- manager->fdpollinfo[fd].want_write == 1) {
+ thread->fdpollinfo[fd].want_write == 1) {
pfds[1].events = POLLOUT;
pfds[1].fd = fd;
writelen += sizeof(pfds[1]);
}
if (msg == SELECT_POKE_WRITE &&
- manager->fdpollinfo[fd].want_read == 1) {
+ thread->fdpollinfo[fd].want_read == 1) {
pfds[1].events = POLLIN;
pfds[1].fd = fd;
writelen += sizeof(pfds[1]);
}
- if (write(manager->devpoll_fd, pfds, writelen) == -1)
+ if (write(thread->devpoll_fd, pfds, writelen) == -1)
result = isc__errno2result(errno);
else {
if (msg == SELECT_POKE_READ)
- manager->fdpollinfo[fd].want_read = 0;
+ thread->fdpollinfo[fd].want_read = 0;
else
- manager->fdpollinfo[fd].want_write = 0;
+ thread->fdpollinfo[fd].want_write = 0;
}
- UNLOCK(&manager->fdlock[lockid]);
+ UNLOCK(&thread->fdlock[lockid]);
return (result);
#elif defined(USE_SELECT)
- LOCK(&manager->lock);
+ LOCK(&thread->manager->lock);
if (msg == SELECT_POKE_READ)
- FD_CLR(fd, manager->read_fds);
+ FD_CLR(fd, thread->read_fds);
else if (msg == SELECT_POKE_WRITE)
- FD_CLR(fd, manager->write_fds);
- UNLOCK(&manager->lock);
+ FD_CLR(fd, thread->write_fds);
+ UNLOCK(&thread->manager->lock);
return (result);
#endif
}
static void
-wakeup_socket(isc__socketmgr_t *manager, int fd, int msg) {
+wakeup_socket(isc__socketthread_t *thread, int fd, int msg) {
isc_result_t result;
int lockid = FDLOCK_ID(fd);
* or writes.
*/
- INSIST(fd >= 0 && fd < (int)manager->maxsocks);
+ INSIST(fd >= 0 && fd < (int)thread->manager->maxsocks);
if (msg == SELECT_POKE_CLOSE) {
/* No one should be updating fdstate, so no need to lock it */
- INSIST(manager->fdstate[fd] == CLOSE_PENDING);
- manager->fdstate[fd] = CLOSED;
- (void)unwatch_fd(manager, fd, SELECT_POKE_READ);
- (void)unwatch_fd(manager, fd, SELECT_POKE_WRITE);
+ INSIST(thread->fdstate[fd] == CLOSE_PENDING);
+ thread->fdstate[fd] = CLOSED;
+ (void)unwatch_fd(thread, fd, SELECT_POKE_READ);
+ (void)unwatch_fd(thread, fd, SELECT_POKE_WRITE);
(void)close(fd);
return;
}
- LOCK(&manager->fdlock[lockid]);
- if (manager->fdstate[fd] == CLOSE_PENDING) {
- UNLOCK(&manager->fdlock[lockid]);
+ LOCK(&thread->fdlock[lockid]);
+ if (thread->fdstate[fd] == CLOSE_PENDING) {
+ UNLOCK(&thread->fdlock[lockid]);
/*
* We accept (and ignore) any error from unwatch_fd() as we are
* fdlock; otherwise it could cause deadlock due to a lock order
* reversal.
*/
- (void)unwatch_fd(manager, fd, SELECT_POKE_READ);
- (void)unwatch_fd(manager, fd, SELECT_POKE_WRITE);
+ (void)unwatch_fd(thread, fd, SELECT_POKE_READ);
+ (void)unwatch_fd(thread, fd, SELECT_POKE_WRITE);
return;
}
- if (manager->fdstate[fd] != MANAGED) {
- UNLOCK(&manager->fdlock[lockid]);
+ if (thread->fdstate[fd] != MANAGED) {
+ UNLOCK(&thread->fdlock[lockid]);
return;
}
- UNLOCK(&manager->fdlock[lockid]);
+ UNLOCK(&thread->fdlock[lockid]);
/*
* Set requested bit.
*/
- result = watch_fd(manager, fd, msg);
+ result = watch_fd(thread, fd, msg);
if (result != ISC_R_SUCCESS) {
/*
* XXXJT: what should we do? Ignoring the failure of watching
* will not get partial writes.
*/
static void
-select_poke(isc__socketmgr_t *mgr, int fd, int msg) {
+select_poke(isc__socketmgr_t *mgr, int threadid, int fd, int msg) {
int cc;
int buf[2];
char strbuf[ISC_STRERRORSIZE];
buf[1] = msg;
do {
- cc = write(mgr->pipe_fds[1], buf, sizeof(buf));
+ cc = write(mgr->threads[threadid].pipe_fds[1], buf, sizeof(buf));
#ifdef ENOSR
/*
* Treat ENOSR as EAGAIN but loop slowly as it is
* Read a message on the internal fd.
*/
static void
-select_readmsg(isc__socketmgr_t *mgr, int *fd, int *msg) {
+select_readmsg(isc__socketthread_t *thread, int *fd, int *msg) {
int buf[2];
int cc;
char strbuf[ISC_STRERRORSIZE];
- cc = read(mgr->pipe_fds[0], buf, sizeof(buf));
+ cc = read(thread->pipe_fds[0], buf, sizeof(buf));
if (cc < 0) {
*msg = SELECT_POKE_NOTHING;
*fd = -1; /* Silence compiler. */
static void
socketclose(isc__socketmgr_t *manager, isc__socket_t *sock, int fd) {
int lockid = FDLOCK_ID(fd);
-
+ isc__socketthread_t *thread = &manager->threads[sock->threadid];
/*
* No one has this socket open, so the watcher doesn't have to be
* poked, and the socket doesn't have to be locked.
*/
- LOCK(&manager->fdlock[lockid]);
- manager->fds[fd] = NULL;
- manager->fdstate[fd] = CLOSE_PENDING;
- UNLOCK(&manager->fdlock[lockid]);
- select_poke(manager, fd, SELECT_POKE_CLOSE);
+ LOCK(&thread->fdlock[lockid]);
+ thread->fds[fd] = NULL;
+ thread->fdstate[fd] = CLOSE_PENDING;
+ UNLOCK(&thread->fdlock[lockid]);
+ select_poke(manager, sock->threadid, fd, SELECT_POKE_CLOSE);
inc_stats(manager->stats, sock->statsindex[STATID_CLOSE]);
if (sock->active == 1) {
*/
#ifdef USE_SELECT
LOCK(&manager->lock);
- if (manager->maxfd == fd) {
+ if (thread->maxfd == fd) {
int i;
- manager->maxfd = 0;
+ thread->maxfd = 0;
for (i = fd - 1; i >= 0; i--) {
lockid = FDLOCK_ID(i);
- LOCK(&manager->fdlock[lockid]);
- if (manager->fdstate[i] == MANAGED) {
- manager->maxfd = i;
- UNLOCK(&manager->fdlock[lockid]);
+ LOCK(&thread->fdlock[lockid]);
+ if (thread->fdstate[i] == MANAGED) {
+ thread->maxfd = i;
+ UNLOCK(&thread->fdlock[lockid]);
break;
}
- UNLOCK(&manager->fdlock[lockid]);
+ UNLOCK(&thread->fdlock[lockid]);
}
- if (manager->maxfd < manager->pipe_fds[0])
- manager->maxfd = manager->pipe_fds[0];
+ if (thread->maxfd < thread->pipe_fds[0])
+ thread->maxfd = thread->pipe_fds[0];
}
UNLOCK(&manager->lock);
{
isc__socket_t *sock = NULL;
isc__socketmgr_t *manager = (isc__socketmgr_t *)manager0;
+ isc__socketthread_t *thread;
isc_result_t result;
int lockid;
return (result);
}
+ sock->threadid = sock->fd % manager->nthreads; // TODO?
sock->references = 1;
+ thread = &manager->threads[sock->threadid];
*socketp = (isc_socket_t *)sock;
/*
*/
lockid = FDLOCK_ID(sock->fd);
- LOCK(&manager->fdlock[lockid]);
- manager->fds[sock->fd] = sock;
- manager->fdstate[sock->fd] = MANAGED;
+ LOCK(&thread->fdlock[lockid]);
+ thread->fds[sock->fd] = sock;
+ thread->fdstate[sock->fd] = MANAGED;
#if defined(USE_EPOLL)
- manager->epoll_events[sock->fd] = 0;
+ thread->epoll_events[sock->fd] = 0;
#endif
#ifdef USE_DEVPOLL
- INSIST(sock->manager->fdpollinfo[sock->fd].want_read == 0 &&
- sock->manager->fdpollinfo[sock->fd].want_write == 0);
+ INSIST(thread->fdpollinfo[sock->fd].want_read == 0 &&
+ thread->fdpollinfo[sock->fd].want_write == 0);
#endif
- UNLOCK(&manager->fdlock[lockid]);
+ UNLOCK(&thread->fdlock[lockid]);
LOCK(&manager->lock);
ISC_LIST_APPEND(manager->socklist, sock, link);
#ifdef USE_SELECT
- if (manager->maxfd < sock->fd)
- manager->maxfd = sock->fd;
+ if (thread->maxfd < sock->fd) {
+ thread->maxfd = sock->fd;
+ }
#endif
UNLOCK(&manager->lock);
isc_socket_open(isc_socket_t *sock0) {
isc_result_t result;
isc__socket_t *sock = (isc__socket_t *)sock0;
+ isc__socketthread_t *thread;
REQUIRE(VALID_SOCKET(sock));
* this socket.
*/
REQUIRE(sock->fd == -1);
+ REQUIRE(sock->threadid == -1);
result = opensocket(sock->manager, sock, NULL);
- if (result != ISC_R_SUCCESS)
+ if (result != ISC_R_SUCCESS) {
sock->fd = -1;
-
- if (result == ISC_R_SUCCESS) {
+ } else {
+ sock->threadid = sock->fd % sock->manager->nthreads; // TODO?
+ thread = &sock->manager->threads[sock->threadid];
int lockid = FDLOCK_ID(sock->fd);
- LOCK(&sock->manager->fdlock[lockid]);
- sock->manager->fds[sock->fd] = sock;
- sock->manager->fdstate[sock->fd] = MANAGED;
+ LOCK(&thread->fdlock[lockid]);
+ thread->fds[sock->fd] = sock;
+ thread->fdstate[sock->fd] = MANAGED;
#if defined(USE_EPOLL)
- sock->manager->epoll_events[sock->fd] = 0;
+ thread->epoll_events[sock->fd] = 0;
#endif
#ifdef USE_DEVPOLL
- INSIST(sock->manager->fdpollinfo[sock->fd].want_read == 0 &&
- sock->manager->fdpollinfo[sock->fd].want_write == 0);
+ INSIST(thread->fdpollinfo[sock->fd].want_read == 0 &&
+ thread->fdpollinfo[sock->fd].want_write == 0);
#endif
- UNLOCK(&sock->manager->fdlock[lockid]);
+ UNLOCK(&thread->fdlock[lockid]);
#ifdef USE_SELECT
LOCK(&sock->manager->lock);
- if (sock->manager->maxfd < sock->fd)
- sock->manager->maxfd = sock->fd;
+ if (thread->maxfd < sock->fd)
+ thread->maxfd = sock->fd;
UNLOCK(&sock->manager->lock);
#endif
}
static void
internal_accept(isc__socket_t *sock) {
isc__socketmgr_t *manager;
+ isc__socketthread_t *thread, *nthread;
isc_socket_newconnev_t *dev;
isc_task_t *task;
socklen_t addrlen;
manager = sock->manager;
INSIST(VALID_MANAGER(manager));
+ thread = &manager->threads[sock->threadid];
INSIST(sock->listener);
* Poke watcher if there are more pending accepts.
*/
if (!ISC_LIST_EMPTY(sock->accept_list))
- watch_fd(sock->manager, sock->fd, SELECT_POKE_ACCEPT);
+ watch_fd(thread, sock->fd,
+ SELECT_POKE_ACCEPT);
UNLOCK(&sock->lock);
int lockid = FDLOCK_ID(fd);
NEWCONNSOCK(dev)->fd = fd;
+ NEWCONNSOCK(dev)->threadid = fd % manager->nthreads; // TODO
NEWCONNSOCK(dev)->bound = 1;
NEWCONNSOCK(dev)->connected = 1;
+ nthread = &manager->threads[NEWCONNSOCK(dev)->threadid];
/*
* Use minimum mtu if possible.
NEWCONNSOCK(dev)->active = 1;
}
- LOCK(&manager->fdlock[lockid]);
- manager->fds[fd] = NEWCONNSOCK(dev);
- manager->fdstate[fd] = MANAGED;
+ LOCK(&nthread->fdlock[lockid]);
+ nthread->fds[fd] = NEWCONNSOCK(dev);
+ nthread->fdstate[fd] = MANAGED;
#if defined(USE_EPOLL)
- manager->epoll_events[fd] = 0;
+ nthread->epoll_events[fd] = 0;
#endif
- UNLOCK(&manager->fdlock[lockid]);
+ UNLOCK(&nthread->fdlock[lockid]);
LOCK(&manager->lock);
#ifdef USE_SELECT
- if (manager->maxfd < fd)
- manager->maxfd = fd;
+ if (nthread->maxfd < fd)
+ nthread->maxfd = fd;
#endif
socket_log(sock, &NEWCONNSOCK(dev)->peer_address, CREATION,
return;
soft_error:
- watch_fd(sock->manager, sock->fd, SELECT_POKE_ACCEPT);
+ watch_fd(thread, sock->fd, SELECT_POKE_ACCEPT);
UNLOCK(&sock->lock);
inc_stats(manager->stats, sock->statsindex[STATID_ACCEPTFAIL]);
poke:
if (!ISC_LIST_EMPTY(sock->recv_list))
- watch_fd(sock->manager, sock->fd, SELECT_POKE_READ);
+ watch_fd(&sock->manager->threads[sock->threadid], sock->fd,
+ SELECT_POKE_READ);
UNLOCK(&sock->lock);
}
poke:
if (!ISC_LIST_EMPTY(sock->send_list))
- watch_fd(sock->manager, sock->fd, SELECT_POKE_WRITE);
+ watch_fd(&sock->manager->threads[sock->threadid], sock->fd, SELECT_POKE_WRITE);
UNLOCK(&sock->lock);
}
* and unlocking twice if both reads and writes are possible.
*/
static void
-process_fd(isc__socketmgr_t *manager, int fd, bool readable,
+process_fd(isc__socketthread_t *thread, int fd, bool readable,
bool writeable)
{
isc__socket_t *sock;
/*
* If the socket is going to be closed, don't do more I/O.
*/
- LOCK(&manager->fdlock[lockid]);
- if (manager->fdstate[fd] == CLOSE_PENDING) {
- UNLOCK(&manager->fdlock[lockid]);
+ LOCK(&thread->fdlock[lockid]);
+ if (thread->fdstate[fd] == CLOSE_PENDING) {
+ UNLOCK(&thread->fdlock[lockid]);
- (void)unwatch_fd(manager, fd, SELECT_POKE_READ);
- (void)unwatch_fd(manager, fd, SELECT_POKE_WRITE);
+ (void)unwatch_fd(thread, fd, SELECT_POKE_READ);
+ (void)unwatch_fd(thread, fd, SELECT_POKE_WRITE);
return;
}
- sock = manager->fds[fd];
+ sock = thread->fds[fd];
+ if (sock == NULL) {
+ unwatch_read = readable;
+ unwatch_write = writeable;
+ goto unlock_fd;
+ }
+
LOCK(&sock->lock);
sock->references++;
UNLOCK(&sock->lock);
if (readable) {
- if (sock == NULL) {
- unwatch_read = true;
- goto check_write;
- }
if (!SOCK_DEAD(sock)) {
if (sock->listener)
internal_accept(sock);
}
unwatch_read = true;
}
-check_write:
+
if (writeable) {
if (sock == NULL) {
unwatch_write = true;
}
unlock_fd:
- UNLOCK(&manager->fdlock[lockid]);
+ UNLOCK(&thread->fdlock[lockid]);
if (unwatch_read)
- (void)unwatch_fd(manager, fd, SELECT_POKE_READ);
+ (void)unwatch_fd(thread, fd, SELECT_POKE_READ);
if (unwatch_write)
- (void)unwatch_fd(manager, fd, SELECT_POKE_WRITE);
- LOCK(&sock->lock);
- sock->references--;
- UNLOCK(&sock->lock);
+ (void)unwatch_fd(thread, fd, SELECT_POKE_WRITE);
+ if (sock != NULL) {
+ LOCK(&sock->lock);
+ sock->references--;
+ UNLOCK(&sock->lock);
+ }
}
#ifdef USE_KQUEUE
static bool
-process_fds(isc__socketmgr_t *manager, struct kevent *events, int nevents) {
+process_fds(isc__socketthread_t *thread, struct kevent *events,
+ int nevents)
+{
int i;
bool readable, writable;
bool done = false;
bool have_ctlevent = false;
+ INSIST(thread->threadid == 0);
- if (nevents == manager->nevents) {
+ if (nevents == thread->nevents) {
/*
* This is not an error, but something unexpected. If this
* happens, it may indicate the need for increasing
* ISC_SOCKET_MAXEVENTS.
*/
- manager_log(manager, ISC_LOGCATEGORY_GENERAL,
- ISC_LOGMODULE_SOCKET, ISC_LOG_INFO,
- "maximum number of FD events (%d) received",
- nevents);
+ thread_log(thread, ISC_LOGCATEGORY_GENERAL,
+ ISC_LOGMODULE_SOCKET, ISC_LOG_INFO,
+ "maximum number of FD events (%d) received",
+ nevents);
}
for (i = 0; i < nevents; i++) {
- REQUIRE(events[i].ident < manager->maxsocks);
- if (events[i].ident == (uintptr_t)manager->pipe_fds[0]) {
+ REQUIRE(events[i].ident < thread->manager->maxsocks);
+ if (events[i].ident == (uintptr_t)thread->pipe_fds[0]) {
have_ctlevent = true;
continue;
}
readable = (events[i].filter == EVFILT_READ);
writable = (events[i].filter == EVFILT_WRITE);
- process_fd(manager, events[i].ident, readable, writable);
+ process_fd(thread, events[i].ident, readable, writable);
}
if (have_ctlevent)
- done = process_ctlfd(manager);
+ done = process_ctlfd(thread);
return (done);
}
#elif defined(USE_EPOLL)
static bool
-process_fds(isc__socketmgr_t *manager, struct epoll_event *events, int nevents)
+process_fds(isc__socketthread_t *thread, struct epoll_event *events,
+ int nevents)
{
int i;
bool done = false;
bool have_ctlevent = false;
- if (nevents == manager->nevents) {
- manager_log(manager, ISC_LOGCATEGORY_GENERAL,
- ISC_LOGMODULE_SOCKET, ISC_LOG_INFO,
- "maximum number of FD events (%d) received",
- nevents);
+ if (nevents == thread->nevents) {
+ thread_log(thread, ISC_LOGCATEGORY_GENERAL,
+ ISC_LOGMODULE_SOCKET, ISC_LOG_INFO,
+ "maximum number of FD events (%d) received",
+ nevents);
}
for (i = 0; i < nevents; i++) {
- REQUIRE(events[i].data.fd < (int)manager->maxsocks);
- if (events[i].data.fd == manager->pipe_fds[0]) {
+ REQUIRE(events[i].data.fd < (int)thread->manager->maxsocks);
+ if (events[i].data.fd == thread->pipe_fds[0]) {
have_ctlevent = true;
continue;
}
* won't block because we use non-blocking sockets.
*/
int fd = events[i].data.fd;
- events[i].events |= manager->epoll_events[fd];
+ events[i].events |= thread->epoll_events[fd];
}
- process_fd(manager, events[i].data.fd,
+ process_fd(thread, events[i].data.fd,
(events[i].events & EPOLLIN) != 0,
(events[i].events & EPOLLOUT) != 0);
}
if (have_ctlevent)
- done = process_ctlfd(manager);
+ done = process_ctlfd(thread);
return (done);
}
#elif defined(USE_DEVPOLL)
static bool
-process_fds(isc__socketmgr_t *manager, struct pollfd *events, int nevents) {
+process_fds(isc__socketthread_t *thread, struct pollfd *events,
+ int nevents)
+{
int i;
bool done = false;
bool have_ctlevent = false;
+ INSIST(thread->threadid == 0);
- if (nevents == manager->nevents) {
- manager_log(manager, ISC_LOGCATEGORY_GENERAL,
- ISC_LOGMODULE_SOCKET, ISC_LOG_INFO,
- "maximum number of FD events (%d) received",
- nevents);
+ if (nevents == thread->nevents) {
+ thread_log(thread, ISC_LOGCATEGORY_GENERAL,
+ ISC_LOGMODULE_SOCKET, ISC_LOG_INFO,
+ "maximum number of FD events (%d) received",
+ nevents);
}
for (i = 0; i < nevents; i++) {
- REQUIRE(events[i].fd < (int)manager->maxsocks);
+ REQUIRE(events[i].fd < (int)thread->manager->maxsocks);
- if (events[i].fd == manager->pipe_fds[0]) {
+ if (events[i].fd == thread->pipe_fds[0]) {
have_ctlevent = true;
continue;
}
- process_fd(manager, events[i].fd,
+ process_fd(thread, events[i].fd,
(events[i].events & POLLIN) != 0,
(events[i].events & POLLOUT) != 0);
}
if (have_ctlevent)
- done = process_ctlfd(manager);
+ done = process_ctlfd(thread);
return (done);
}
#elif defined(USE_SELECT)
static void
-process_fds(isc__socketmgr_t *manager, int maxfd, fd_set *readfds,
+process_fds(isc__socketthread_t *thread, int maxfd, fd_set *readfds,
fd_set *writefds)
{
int i;
- REQUIRE(maxfd <= (int)manager->maxsocks);
+ REQUIRE(maxfd <= (int)thread->manager->maxsocks);
for (i = 0; i < maxfd; i++) {
- if (i == manager->pipe_fds[0] || i == manager->pipe_fds[1])
+ if (i == thread->pipe_fds[0] || i == thread->pipe_fds[1])
continue;
- process_fd(manager, i, FD_ISSET(i, readfds),
+ process_fd(thread, i, FD_ISSET(i, readfds),
FD_ISSET(i, writefds));
}
}
#endif
static bool
-process_ctlfd(isc__socketmgr_t *manager) {
+process_ctlfd(isc__socketthread_t *thread) {
int msg, fd;
for (;;) {
- select_readmsg(manager, &fd, &msg);
+ select_readmsg(thread, &fd, &msg);
- manager_log(manager, IOEVENT,
- isc_msgcat_get(isc_msgcat, ISC_MSGSET_SOCKET,
- ISC_MSG_WATCHERMSG,
- "watcher got message %d "
- "for socket %d"), msg, fd);
+ thread_log(thread, IOEVENT,
+ isc_msgcat_get(isc_msgcat, ISC_MSGSET_SOCKET,
+ ISC_MSG_WATCHERMSG,
+ "watcher got message %d "
+ "for socket %d"), msg, fd);
/*
* Nothing to read?
* and decide if we need to watch on it now
* or not.
*/
- wakeup_socket(manager, fd, msg);
+ wakeup_socket(thread, fd, msg);
}
return (false);
* This is the thread that will loop forever, always in a select or poll
* call.
*
- * When select returns something to do, track down what thread gets to do
- * this I/O and post the event to it.
+ * When select returns something to do, do whatever's necessary and post
+ * an event to the task that was requesting the action.
*/
static isc_threadresult_t
-watcher(void *uap) {
- isc__socketmgr_t *manager = uap;
+netthread(void *uap) {
+ isc__socketthread_t *thread = uap;
+ isc__socketmgr_t *manager = thread->manager;
+ (void)manager;
bool done;
int cc;
#ifdef USE_KQUEUE
#elif defined (USE_EPOLL)
const char *fnname = "epoll_wait()";
#elif defined(USE_DEVPOLL)
+ INSIST(thread->threadid == 0);
isc_result_t result;
const char *fnname = "ioctl(DP_POLL)";
struct dvpoll dvp;
/*
* Get the control fd here. This will never change.
*/
- ctlfd = manager->pipe_fds[0];
+ ctlfd = thread->pipe_fds[0];
#endif
done = false;
while (!done) {
do {
#ifdef USE_KQUEUE
- cc = kevent(manager->kqueue_fd, NULL, 0,
- manager->events, manager->nevents, NULL);
+ cc = kevent(thread->kqueue_fd, NULL, 0,
+ thread->events, thread->nevents, NULL);
#elif defined(USE_EPOLL)
- cc = epoll_wait(manager->epoll_fd, manager->events,
- manager->nevents, -1);
+ cc = epoll_wait(thread->epoll_fd,
+ thread->events,
+ thread->nevents, -1);
#elif defined(USE_DEVPOLL)
/*
* Re-probe every thousand calls.
manager->calls = 0;
}
for (pass = 0; pass < 2; pass++) {
- dvp.dp_fds = manager->events;
- dvp.dp_nfds = manager->nevents;
- if (dvp.dp_nfds >= manager->open_max)
- dvp.dp_nfds = manager->open_max - 1;
- dvp.dp_fds = thread->events;
+ dvp.dp_nfds = thread->nevents;
+ if (dvp.dp_nfds >= thread->open_max)
+ dvp.dp_nfds = thread->open_max - 1;
#ifndef ISC_SOCKET_USE_POLLWATCH
dvp.dp_timeout = -1;
#else
dvp.dp_timeout =
ISC_SOCKET_POLLWATCH_TIMEOUT;
#endif /* ISC_SOCKET_USE_POLLWATCH */
- cc = ioctl(manager->devpoll_fd, DP_POLL, &dvp);
+ cc = ioctl(thread->devpoll_fd, DP_POLL, &dvp);
if (cc == -1 && errno == EINVAL) {
/*
* {OPEN_MAX} may have dropped. Look
break;
}
#elif defined(USE_SELECT)
+ /*
+ * We will have only one thread anyway, we can lock
+ * manager lock and don't care
+ */
LOCK(&manager->lock);
- memmove(manager->read_fds_copy, manager->read_fds,
- manager->fd_bufsize);
- memmove(manager->write_fds_copy, manager->write_fds,
- manager->fd_bufsize);
- maxfd = manager->maxfd + 1;
+ memmove(thread->read_fds_copy, thread->read_fds,
+ thread->fd_bufsize);
+ memmove(thread->write_fds_copy, thread->write_fds,
+ thread->fd_bufsize);
+ maxfd = thread->maxfd + 1;
UNLOCK(&manager->lock);
- cc = select(maxfd, manager->read_fds_copy,
- manager->write_fds_copy, NULL, NULL);
+ cc = select(maxfd, thread->read_fds_copy,
+ thread->write_fds_copy, NULL, NULL);
#endif /* USE_KQUEUE */
if (cc < 0 && !SOFT_ERROR(errno)) {
* (and it can also be a false positive)
* so it would be just too noisy.
*/
- manager_log(manager,
- ISC_LOGCATEGORY_GENERAL,
- ISC_LOGMODULE_SOCKET,
- ISC_LOG_DEBUG(1),
- "unexpected POLL timeout");
+ thread_log(thread,
+ ISC_LOGCATEGORY_GENERAL,
+ ISC_LOGMODULE_SOCKET,
+ ISC_LOG_DEBUG(1),
+ "unexpected POLL timeout");
}
pollstate = poll_active;
}
} while (cc < 0);
#if defined(USE_KQUEUE) || defined (USE_EPOLL) || defined (USE_DEVPOLL)
- done = process_fds(manager, manager->events, cc);
+ done = process_fds(thread, thread->events, cc);
#elif defined(USE_SELECT)
- process_fds(manager, maxfd, manager->read_fds_copy,
- manager->write_fds_copy);
+ process_fds(thread, maxfd, thread->read_fds_copy,
+ thread->write_fds_copy);
/*
* Process reads on internal, control fd.
*/
- if (FD_ISSET(ctlfd, manager->read_fds_copy))
- done = process_ctlfd(manager);
+ if (FD_ISSET(ctlfd, thread->read_fds_copy))
+ done = process_ctlfd(thread);
#endif
}
- manager_log(manager, TRACE, "%s",
- isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
- ISC_MSG_EXITING, "watcher exiting"));
-
+ thread_log(thread, TRACE, "%s",
+ isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
+ ISC_MSG_EXITING, "watcher exiting"));
return ((isc_threadresult_t)0);
}
}
/*
- * Create a new socket manager.
+ * Setup socket thread, thread->manager and thread->threadid must be filled.
*/
static isc_result_t
-setup_watcher(isc_mem_t *mctx, isc__socketmgr_t *manager) {
- isc_result_t result;
-#if defined(USE_KQUEUE) || defined(USE_EPOLL) || defined(USE_DEVPOLL)
+setup_thread(isc__socketthread_t *thread) {
+ isc_result_t result = ISC_R_SUCCESS;
+ int i;
char strbuf[ISC_STRERRORSIZE];
-#endif
+ REQUIRE(thread != NULL);
+ REQUIRE(VALID_MANAGER(thread->manager));
+ REQUIRE(thread->threadid >= 0 && thread->threadid < thread->manager->nthreads);
+ thread->fds = isc_mem_get(thread->manager->mctx,
+ thread->manager->maxsocks * sizeof(isc__socket_t *));
+ if (thread->fds == NULL) {
+ result = ISC_R_NOMEMORY;
+ return (result); // TODO
+ }
+ memset(thread->fds, 0, thread->manager->maxsocks * sizeof(isc__socket_t *));
+
+ thread->fdstate = isc_mem_get(thread->manager->mctx, thread->manager->maxsocks * sizeof(int));
+ if (thread->fdstate == NULL) {
+ result = ISC_R_NOMEMORY;
+ return (result); // TODO
+ }
+ memset(thread->fdstate, 0, thread->manager->maxsocks * sizeof(int));
+
+ thread->fdlock = isc_mem_get(thread->manager->mctx, FDLOCK_COUNT * sizeof(isc_mutex_t));
+ if (thread->fdlock == NULL) {
+ result = ISC_R_NOMEMORY;
+ return (result);
+ }
+
+ for (i = 0; i < FDLOCK_COUNT; i++) {
+ result = isc_mutex_init(&thread->fdlock[i]);
+ if (result != ISC_R_SUCCESS) {
+ return (result);
+ }
+ }
+
+ if (pipe(thread->pipe_fds) != 0) {
+ strerror_r(errno, strbuf, sizeof(strbuf));
+ UNEXPECTED_ERROR(__FILE__, __LINE__,
+ "pipe() %s: %s",
+ isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
+ ISC_MSG_FAILED, "failed"),
+ strbuf);
+ return (ISC_R_UNEXPECTED);
+ }
+ RUNTIME_CHECK(make_nonblock(thread->pipe_fds[0]) == ISC_R_SUCCESS);
#ifdef USE_KQUEUE
- manager->nevents = ISC_SOCKET_MAXEVENTS;
- manager->events = isc_mem_get(mctx, sizeof(struct kevent) *
- manager->nevents);
- if (manager->events == NULL)
+ thread->nevents = ISC_SOCKET_MAXEVENTS;
+ thread->events = isc_mem_get(thread->manager->mctx, sizeof(struct kevent) *
+ thread->nevents);
+ if (thread->events == NULL)
return (ISC_R_NOMEMORY);
- manager->kqueue_fd = kqueue();
- if (manager->kqueue_fd == -1) {
+ thread->kqueue_fd = kqueue();
+ if (thread->kqueue_fd == -1) {
result = isc__errno2result(errno);
strerror_r(errno, strbuf, sizeof(strbuf));
UNEXPECTED_ERROR(__FILE__, __LINE__,
isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
ISC_MSG_FAILED, "failed"),
strbuf);
- isc_mem_put(mctx, manager->events,
- sizeof(struct kevent) * manager->nevents);
+ isc_mem_put(thread->manager->mctx, thread->events,
+ sizeof(struct kevent) * thread->nevents);
return (result);
}
- result = watch_fd(manager, manager->pipe_fds[0], SELECT_POKE_READ);
+ result = watch_fd(thread, thread->pipe_fds[0], SELECT_POKE_READ);
if (result != ISC_R_SUCCESS) {
- close(manager->kqueue_fd);
- isc_mem_put(mctx, manager->events,
- sizeof(struct kevent) * manager->nevents);
- return (result);
+ close(thread->kqueue_fd);
+ isc_mem_put(thread->manager->mctx, thread->events,
+ sizeof(struct kevent) * thread->nevents);
}
+ return (result);
+
#elif defined(USE_EPOLL)
- manager->nevents = ISC_SOCKET_MAXEVENTS;
- manager->events = isc_mem_get(mctx, sizeof(struct epoll_event) *
- manager->nevents);
- if (manager->events == NULL)
+ thread->nevents = ISC_SOCKET_MAXEVENTS;
+ thread->epoll_events = isc_mem_get(thread->manager->mctx,
+ (thread->manager->maxsocks *
+ sizeof(uint32_t)));
+ if (thread->epoll_events == NULL) {
+ return (ISC_R_NOMEMORY);
+ }
+ memset(thread->epoll_events, 0,
+ thread->manager->maxsocks * sizeof(uint32_t));
+
+ thread->events = isc_mem_get(thread->manager->mctx,
+ sizeof(struct epoll_event) *
+ thread->nevents);
+ if (thread->events == NULL) {
return (ISC_R_NOMEMORY);
- manager->epoll_fd = epoll_create(manager->nevents);
- if (manager->epoll_fd == -1) {
+ }
+
+ thread->epoll_fd = epoll_create(thread->nevents);
+ if (thread->epoll_fd == -1) {
result = isc__errno2result(errno);
strerror_r(errno, strbuf, sizeof(strbuf));
UNEXPECTED_ERROR(__FILE__, __LINE__,
"epoll_create %s: %s",
isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
ISC_MSG_FAILED, "failed"),
- strbuf);
- isc_mem_put(mctx, manager->events,
- sizeof(struct epoll_event) * manager->nevents);
- return (result);
- }
- result = watch_fd(manager, manager->pipe_fds[0], SELECT_POKE_READ);
- if (result != ISC_R_SUCCESS) {
- close(manager->epoll_fd);
- isc_mem_put(mctx, manager->events,
- sizeof(struct epoll_event) * manager->nevents);
+ strbuf);
return (result);
+
}
+ result = watch_fd(thread, thread->pipe_fds[0], SELECT_POKE_READ);
+ return (result);
+
#elif defined(USE_DEVPOLL)
- manager->nevents = ISC_SOCKET_MAXEVENTS;
+ thread->nevents = ISC_SOCKET_MAXEVENTS;
 result = isc_resource_getcurlimit(isc_resource_openfiles,
- &manager->open_max);
+ &thread->open_max);
 if (result != ISC_R_SUCCESS)
- manager->open_max = 64;
- manager->calls = 0;
- manager->events = isc_mem_get(mctx, sizeof(struct pollfd) *
- manager->nevents);
- if (manager->events == NULL)
+ thread->open_max = 64;
+ thread->calls = 0;
+ thread->events = isc_mem_get(thread->manager->mctx,
+ sizeof(struct pollfd) *
+ thread->nevents);
+ if (thread->events == NULL)
 return (ISC_R_NOMEMORY);
- sizeof(pollinfo_t) * manager->maxsocks);
+ sizeof(pollinfo_t) * thread->manager->maxsocks);
 return (result);
 }
- result = watch_fd(manager, manager->pipe_fds[0], SELECT_POKE_READ);
+ result = watch_fd(thread, thread->pipe_fds[0], SELECT_POKE_READ);
 if (result != ISC_R_SUCCESS) {
- close(manager->devpoll_fd);
- isc_mem_put(mctx, manager->events,
- sizeof(pollinfo_t) * manager->maxsocks);
+ close(thread->devpoll_fd);
+ isc_mem_put(thread->manager->mctx, thread->events,
+ sizeof(pollinfo_t) * thread->manager->maxsocks);
 return (result);
 }
+
+ return (ISC_R_SUCCESS);
#elif defined(USE_SELECT)
UNUSED(result);
* FD_SETSIZE, but we separate the cases to avoid possible portability
* issues regarding howmany() and the actual representation of fd_set.
*/
- manager->fd_bufsize = howmany(manager->maxsocks, NFDBITS) *
+ thread->fd_bufsize = howmany(manager->maxsocks, NFDBITS) *
sizeof(fd_mask);
#else
- manager->fd_bufsize = sizeof(fd_set);
+ thread->fd_bufsize = sizeof(fd_set);
#endif
- manager->read_fds = NULL;
- manager->read_fds_copy = NULL;
- manager->write_fds = NULL;
- manager->write_fds_copy = NULL;
-
- manager->read_fds = isc_mem_get(mctx, manager->fd_bufsize);
- if (manager->read_fds != NULL)
- manager->read_fds_copy = isc_mem_get(mctx, manager->fd_bufsize);
- if (manager->read_fds_copy != NULL)
- manager->write_fds = isc_mem_get(mctx, manager->fd_bufsize);
- if (manager->write_fds != NULL) {
- manager->write_fds_copy = isc_mem_get(mctx,
- manager->fd_bufsize);
- }
- if (manager->write_fds_copy == NULL) {
- if (manager->write_fds != NULL) {
- isc_mem_put(mctx, manager->write_fds,
- manager->fd_bufsize);
+ thread->read_fds = NULL;
+ thread->read_fds_copy = NULL;
+ thread->write_fds = NULL;
+ thread->write_fds_copy = NULL;
+
+ thread->read_fds = isc_mem_get(thread->manager->mctx, thread->fd_bufsize);
+ if (thread->read_fds != NULL)
+ thread->read_fds_copy = isc_mem_get(thread->manager->mctx,
+ thread->fd_bufsize);
+ if (thread->read_fds_copy != NULL)
+ thread->write_fds = isc_mem_get(thread->manager->mctx,
+ thread->fd_bufsize);
+ if (thread->write_fds != NULL) {
+ thread->write_fds_copy = isc_mem_get(thread->manager->mctx,
+ thread->fd_bufsize);
+ }
+ if (thread->write_fds_copy == NULL) {
+ if (thread->write_fds != NULL) {
+ isc_mem_put(thread->manager->mctx, thread->write_fds,
+ thread->fd_bufsize);
}
- if (manager->read_fds_copy != NULL) {
- isc_mem_put(mctx, manager->read_fds_copy,
- manager->fd_bufsize);
+ if (thread->read_fds_copy != NULL) {
+ isc_mem_put(thread->manager->mctx,
+ thread->read_fds_copy,
+ thread->fd_bufsize);
}
- if (manager->read_fds != NULL) {
- isc_mem_put(mctx, manager->read_fds,
- manager->fd_bufsize);
+ if (thread->read_fds != NULL) {
+ isc_mem_put(thread->manager->mctx, thread->read_fds,
+ thread->fd_bufsize);
}
return (ISC_R_NOMEMORY);
}
- memset(manager->read_fds, 0, manager->fd_bufsize);
- memset(manager->write_fds, 0, manager->fd_bufsize);
+ memset(thread->read_fds, 0, thread->fd_bufsize);
+ memset(thread->write_fds, 0, thread->fd_bufsize);
- (void)watch_fd(manager, manager->pipe_fds[0], SELECT_POKE_READ);
- manager->maxfd = manager->pipe_fds[0];
-#endif /* USE_KQUEUE */
+ (void)watch_fd(thread, thread->pipe_fds[0], SELECT_POKE_READ);
+ thread->maxfd = thread->pipe_fds[0];
return (ISC_R_SUCCESS);
+#endif /* USE_KQUEUE */
}
static void
-cleanup_watcher(isc_mem_t *mctx, isc__socketmgr_t *manager) {
+cleanup_thread(isc_mem_t *mctx, isc__socketthread_t *thread) {
 isc_result_t result;
+ int i;
- result = unwatch_fd(manager, manager->pipe_fds[0], SELECT_POKE_READ);
+ result = unwatch_fd(thread, thread->pipe_fds[0], SELECT_POKE_READ);
 if (result != ISC_R_SUCCESS) {
 UNEXPECTED_ERROR(__FILE__, __LINE__,
 "epoll_ctl(DEL) %s",
 isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
 ISC_MSG_FAILED, "failed"));
 }
-
#ifdef USE_KQUEUE
- close(manager->kqueue_fd);
- isc_mem_put(mctx, manager->events,
- sizeof(struct kevent) * manager->nevents);
+ close(thread->kqueue_fd);
+ isc_mem_put(mctx, thread->events,
+ sizeof(struct kevent) * thread->nevents);
#elif defined(USE_EPOLL)
- close(manager->epoll_fd);
- isc_mem_put(mctx, manager->events,
- sizeof(struct epoll_event) * manager->nevents);
+ close(thread->epoll_fd);
+ isc_mem_put(mctx, thread->events,
+ sizeof(struct epoll_event) * thread->nevents);
#elif defined(USE_DEVPOLL)
- close(manager->devpoll_fd);
- isc_mem_put(mctx, manager->events,
- sizeof(struct pollfd) * manager->nevents);
- isc_mem_put(mctx, manager->fdpollinfo,
- sizeof(pollinfo_t) * manager->maxsocks);
+ close(thread->devpoll_fd);
+ isc_mem_put(mctx, thread->events,
+ sizeof(struct pollfd) * thread->nevents);
+ isc_mem_put(mctx, thread->fdpollinfo,
+ sizeof(pollinfo_t) * thread->manager->maxsocks);
#elif defined(USE_SELECT)
- if (manager->read_fds != NULL)
- isc_mem_put(mctx, manager->read_fds, manager->fd_bufsize);
- if (manager->read_fds_copy != NULL)
- isc_mem_put(mctx, manager->read_fds_copy, manager->fd_bufsize);
- if (manager->write_fds != NULL)
- isc_mem_put(mctx, manager->write_fds, manager->fd_bufsize);
- if (manager->write_fds_copy != NULL)
- isc_mem_put(mctx, manager->write_fds_copy, manager->fd_bufsize);
+ if (thread->read_fds != NULL)
+ isc_mem_put(mctx, thread->read_fds, thread->fd_bufsize);
+ if (thread->read_fds_copy != NULL)
+ isc_mem_put(mctx, thread->read_fds_copy, thread->fd_bufsize);
+ if (thread->write_fds != NULL)
+ isc_mem_put(mctx, thread->write_fds, thread->fd_bufsize);
+ if (thread->write_fds_copy != NULL)
+ isc_mem_put(mctx, thread->write_fds_copy, thread->fd_bufsize);
#endif /* USE_KQUEUE */
+ for (i = 0; i < (int)thread->manager->maxsocks; i++)
+ if (thread->fdstate[i] == CLOSE_PENDING) /* no need to lock */
+ (void)close(i);
+
+#if defined(USE_EPOLL)
+ isc_mem_put(mctx, thread->epoll_events,
+ thread->manager->maxsocks * sizeof(uint32_t));
+#endif
+ isc_mem_put(mctx, thread->fds,
+ thread->manager->maxsocks * sizeof(isc__socket_t *));
+ isc_mem_put(mctx, thread->fdstate,
+ thread->manager->maxsocks * sizeof(int));
+
+ if (thread->fdlock != NULL) {
+ for (i = 0; i < FDLOCK_COUNT; i++)
+ DESTROYLOCK(&thread->fdlock[i]);
+ isc_mem_put(mctx, thread->fdlock,
+ FDLOCK_COUNT * sizeof(isc_mutex_t));
+ }
}
isc_result_t
isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp) {
- return (isc_socketmgr_create2(mctx, managerp, 0));
+ return (isc_socketmgr_create2(mctx, managerp, 0, 1));
}
isc_result_t
isc_socketmgr_create2(isc_mem_t *mctx, isc_socketmgr_t **managerp,
- unsigned int maxsocks)
+ unsigned int maxsocks, int nthreads)
{
int i;
isc__socketmgr_t *manager;
- char strbuf[ISC_STRERRORSIZE];
isc_result_t result;
REQUIRE(managerp != NULL && *managerp == NULL);
maxsocks = ISC_SOCKET_MAXSOCKETS;
manager = isc_mem_get(mctx, sizeof(*manager));
- if (manager == NULL)
+ if (manager == NULL) {
return (ISC_R_NOMEMORY);
+ }
/* zero-clear so that necessary cleanup on failure will be easy */
memset(manager, 0, sizeof(*manager));
manager->maxsocks = maxsocks;
manager->reserved = 0;
manager->maxudp = 0;
- manager->fds = isc_mem_get(mctx,
- manager->maxsocks * sizeof(isc__socket_t *));
- if (manager->fds == NULL) {
- result = ISC_R_NOMEMORY;
- goto free_manager;
- }
- manager->fdstate = isc_mem_get(mctx, manager->maxsocks * sizeof(int));
- if (manager->fdstate == NULL) {
- result = ISC_R_NOMEMORY;
- goto free_manager;
- }
-#if defined(USE_EPOLL)
- manager->epoll_events = isc_mem_get(mctx, (manager->maxsocks *
- sizeof(uint32_t)));
- if (manager->epoll_events == NULL) {
- result = ISC_R_NOMEMORY;
- goto free_manager;
- }
- memset(manager->epoll_events, 0, manager->maxsocks * sizeof(uint32_t));
-#endif
+ manager->nthreads = nthreads;
manager->stats = NULL;
manager->common.magic = ISCAPI_SOCKETMGR_MAGIC;
manager->common.impmagic = SOCKET_MANAGER_MAGIC;
manager->mctx = NULL;
- memset(manager->fds, 0, manager->maxsocks * sizeof(isc_socket_t *));
ISC_LIST_INIT(manager->socklist);
result = isc_mutex_init(&manager->lock);
- if (result != ISC_R_SUCCESS)
- goto free_manager;
- manager->fdlock = isc_mem_get(mctx, FDLOCK_COUNT * sizeof(isc_mutex_t));
- if (manager->fdlock == NULL) {
- result = ISC_R_NOMEMORY;
- goto cleanup_lock;
- }
- for (i = 0; i < FDLOCK_COUNT; i++) {
- result = isc_mutex_init(&manager->fdlock[i]);
- if (result != ISC_R_SUCCESS) {
- while (--i >= 0)
- DESTROYLOCK(&manager->fdlock[i]);
- isc_mem_put(mctx, manager->fdlock,
- FDLOCK_COUNT * sizeof(isc_mutex_t));
- manager->fdlock = NULL;
- goto cleanup_lock;
- }
+ if (result != ISC_R_SUCCESS) {
+ /* don't leak the manager allocated above */
+ isc_mem_put(mctx, manager, sizeof(*manager));
+ return (result);
 }
-
 if (isc_condition_init(&manager->shutdown_ok) != ISC_R_SUCCESS) {
 UNEXPECTED_ERROR(__FILE__, __LINE__,
 "isc_condition_init() %s",
 isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
 ISC_MSG_FAILED, "failed"));
- result = ISC_R_UNEXPECTED;
- goto cleanup_lock;
- }
-
- /*
- * Create the special fds that will be used to wake up the
- * select/poll loop when something internal needs to be done.
- */
- if (pipe(manager->pipe_fds) != 0) {
- strerror_r(errno, strbuf, sizeof(strbuf));
- UNEXPECTED_ERROR(__FILE__, __LINE__,
- "pipe() %s: %s",
- isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
- ISC_MSG_FAILED, "failed"),
- strbuf);
- result = ISC_R_UNEXPECTED;
- goto cleanup_condition;
+ DESTROYLOCK(&manager->lock);
+ isc_mem_put(mctx, manager, sizeof(*manager));
+ return (ISC_R_UNEXPECTED);
 }
- RUNTIME_CHECK(make_nonblock(manager->pipe_fds[0]) == ISC_R_SUCCESS);
-
- /*
- * Set up initial state for the select loop
- */
- result = setup_watcher(mctx, manager);
- if (result != ISC_R_SUCCESS)
- goto cleanup;
-
- memset(manager->fdstate, 0, manager->maxsocks * sizeof(int));
/*
* Start up the select/poll thread.
*/
- if (isc_thread_create(watcher, manager, &manager->watcher) !=
- ISC_R_SUCCESS) {
- UNEXPECTED_ERROR(__FILE__, __LINE__,
- "isc_thread_create() %s",
- isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
- ISC_MSG_FAILED, "failed"));
- cleanup_watcher(mctx, manager);
- result = ISC_R_UNEXPECTED;
- goto cleanup;
- }
- isc_thread_setname(manager->watcher, "isc-socket");
+ manager->threads = isc_mem_get(mctx, manager->nthreads *
+ sizeof(isc__socketthread_t));
+ RUNTIME_CHECK(manager->threads != NULL);
 isc_mem_attach(mctx, &manager->mctx);
+ for (i = 0; i < manager->nthreads; i++) {
+ manager->threads[i].manager = manager;
+ manager->threads[i].threadid = i;
+ result = setup_thread(&manager->threads[i]);
+ RUNTIME_CHECK(result == ISC_R_SUCCESS);
+ if (isc_thread_create(netthread, &manager->threads[i],
+ &manager->threads[i].thread) !=
+ ISC_R_SUCCESS) {
+ UNEXPECTED_ERROR(__FILE__, __LINE__,
+ "isc_thread_create() %s",
+ isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
+ ISC_MSG_FAILED, "failed"));
+ return (ISC_R_UNEXPECTED);
+ }
+ char tname[32];
+ snprintf(tname, sizeof(tname), "isc-socket-%d", i);
+ isc_thread_setname(manager->threads[i].thread, tname);
+ }
+
+
*managerp = (isc_socketmgr_t *)manager;
return (ISC_R_SUCCESS);
-cleanup:
- (void)close(manager->pipe_fds[0]);
- (void)close(manager->pipe_fds[1]);
-
-cleanup_condition:
- (void)isc_condition_destroy(&manager->shutdown_ok);
-
-
-cleanup_lock:
- if (manager->fdlock != NULL) {
- for (i = 0; i < FDLOCK_COUNT; i++)
- DESTROYLOCK(&manager->fdlock[i]);
- }
- DESTROYLOCK(&manager->lock);
-
-free_manager:
- if (manager->fdlock != NULL) {
- isc_mem_put(mctx, manager->fdlock,
- FDLOCK_COUNT * sizeof(isc_mutex_t));
- }
-#if defined(USE_EPOLL)
- if (manager->epoll_events != NULL) {
- isc_mem_put(mctx, manager->epoll_events,
- manager->maxsocks * sizeof(uint32_t));
- }
-#endif
- if (manager->fdstate != NULL) {
- isc_mem_put(mctx, manager->fdstate,
- manager->maxsocks * sizeof(int));
- }
- if (manager->fds != NULL) {
- isc_mem_put(mctx, manager->fds,
- manager->maxsocks * sizeof(isc_socket_t *));
- }
- isc_mem_put(mctx, manager, sizeof(*manager));
-
- return (result);
}
isc_result_t
* half of the pipe, which will send EOF to the read half.
* This is currently a no-op in the non-threaded case.
*/
- select_poke(manager, 0, SELECT_POKE_SHUTDOWN);
+ for (i = 0; i < manager->nthreads; i++) {
+ select_poke(manager, i, 0, SELECT_POKE_SHUTDOWN);
+ }
 /*
 * Wait for thread to exit.
 */
- if (isc_thread_join(manager->watcher, NULL) != ISC_R_SUCCESS)
- UNEXPECTED_ERROR(__FILE__, __LINE__,
- "isc_thread_join() %s",
- isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
- ISC_MSG_FAILED, "failed"));
-
+ for (i = 0; i < manager->nthreads; i++) {
+ if (isc_thread_join(manager->threads[i].thread,
+ NULL) != ISC_R_SUCCESS)
+ UNEXPECTED_ERROR(__FILE__, __LINE__,
+ "isc_thread_join() %s",
+ isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
+ ISC_MSG_FAILED, "failed"));
+ cleanup_thread(manager->mctx, &manager->threads[i]);
+ }
 /*
 * Clean up.
 */
- cleanup_watcher(manager->mctx, manager);
-
- (void)close(manager->pipe_fds[0]);
- (void)close(manager->pipe_fds[1]);
+ isc_mem_put(manager->mctx, manager->threads,
+ manager->nthreads * sizeof(isc__socketthread_t));
(void)isc_condition_destroy(&manager->shutdown_ok);
- for (i = 0; i < (int)manager->maxsocks; i++)
- if (manager->fdstate[i] == CLOSE_PENDING) /* no need to lock */
- (void)close(i);
-
-#if defined(USE_EPOLL)
- isc_mem_put(manager->mctx, manager->epoll_events,
- manager->maxsocks * sizeof(uint32_t));
-#endif
- isc_mem_put(manager->mctx, manager->fds,
- manager->maxsocks * sizeof(isc__socket_t *));
- isc_mem_put(manager->mctx, manager->fdstate,
- manager->maxsocks * sizeof(int));
if (manager->stats != NULL)
isc_stats_detach(&manager->stats);
-
- if (manager->fdlock != NULL) {
- for (i = 0; i < FDLOCK_COUNT; i++)
- DESTROYLOCK(&manager->fdlock[i]);
- isc_mem_put(manager->mctx, manager->fdlock,
- FDLOCK_COUNT * sizeof(isc_mutex_t));
- }
DESTROYLOCK(&manager->lock);
manager->common.magic = 0;
manager->common.impmagic = 0;
* watched, poke the watcher to start paying attention to it.
*/
if (ISC_LIST_EMPTY(sock->recv_list))
- select_poke(sock->manager, sock->fd, SELECT_POKE_READ);
+ select_poke(sock->manager, sock->threadid, sock->fd,
+ SELECT_POKE_READ);
ISC_LIST_ENQUEUE(sock->recv_list, dev, ev_link);
socket_log(sock, NULL, EVENT, NULL, 0, 0,
* paying attention to it.
*/
if (ISC_LIST_EMPTY(sock->send_list))
- select_poke(sock->manager, sock->fd,
+ select_poke(sock->manager, sock->threadid,
+ sock->fd,
SELECT_POKE_WRITE);
ISC_LIST_ENQUEUE(sock->send_list, dev, ev_link);
ISC_LIST_ENQUEUE(sock->accept_list, dev, ev_link);
if (do_poke)
- select_poke(manager, sock->fd, SELECT_POKE_ACCEPT);
+ select_poke(manager, sock->threadid, sock->fd,
+ SELECT_POKE_ACCEPT);
UNLOCK(&sock->lock);
return (ISC_R_SUCCESS);
* bit of time waking it up now or later won't matter all that much.
*/
if (ISC_LIST_EMPTY(sock->connect_list) && !sock->connecting)
- select_poke(manager, sock->fd, SELECT_POKE_CONNECT);
+ select_poke(manager, sock->threadid, sock->fd,
+ SELECT_POKE_CONNECT);
sock->connecting = 1;
*/
if (SOFT_ERROR(errno) || errno == EINPROGRESS) {
sock->connecting = 1;
- watch_fd(sock->manager, sock->fd,
- SELECT_POKE_CONNECT);
+ watch_fd(&sock->manager->threads[sock->threadid], sock->fd,
+ SELECT_POKE_CONNECT);
UNLOCK(&sock->lock);
return;