From: Witold Krecicki Date: Wed, 3 Oct 2018 20:46:23 +0000 (+0200) Subject: Use multiple network event loop threads with separate data structures. X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=dfab03fb411374030549a250000f7fe1cd135b45;p=thirdparty%2Fbind9.git Use multiple network event loop threads with separate data structures. --- diff --git a/bin/named/main.c b/bin/named/main.c index 1ff7e139e3e..96a5201914d 100644 --- a/bin/named/main.c +++ b/bin/named/main.c @@ -824,7 +824,7 @@ create_managers(void) { } result = isc_socketmgr_create2(named_g_mctx, &named_g_socketmgr, - maxsocks); + maxsocks, named_g_udpdisp); if (result != ISC_R_SUCCESS) { UNEXPECTED_ERROR(__FILE__, __LINE__, "isc_socketmgr_create() failed: %s", diff --git a/lib/isc/include/isc/socket.h b/lib/isc/include/isc/socket.h index 7eea7298557..d9738af4f64 100644 --- a/lib/isc/include/isc/socket.h +++ b/lib/isc/include/isc/socket.h @@ -813,7 +813,7 @@ isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp); isc_result_t isc_socketmgr_create2(isc_mem_t *mctx, isc_socketmgr_t **managerp, - unsigned int maxsocks); + unsigned int maxsocks, int nthreads); /*%< * Create a socket manager. If "maxsocks" is non-zero, it specifies the * maximum number of sockets that the created manager should handle. 
diff --git a/lib/isc/unix/socket.c b/lib/isc/unix/socket.c index f13a5086fff..400d87244d1 100644 --- a/lib/isc/unix/socket.c +++ b/lib/isc/unix/socket.c @@ -324,6 +324,7 @@ typedef isc_event_t intev_t; typedef struct isc__socket isc__socket_t; typedef struct isc__socketmgr isc__socketmgr_t; +typedef struct isc__socketthread isc__socketthread_t; #define NEWCONNSOCK(ev) ((isc__socket_t *)(ev)->newsocket) @@ -340,6 +341,7 @@ struct isc__socket { unsigned int references; int fd; int pf; + int threadid; char name[16]; void * tag; @@ -373,8 +375,26 @@ struct isc__socketmgr { isc_socketmgr_t common; isc_mem_t *mctx; isc_mutex_t lock; - isc_mutex_t *fdlock; isc_stats_t *stats; + int nthreads; + isc__socketthread_t *threads; + unsigned int maxsocks; + /* Locked by manager lock. */ + ISC_LIST(isc__socket_t) socklist; + int reserved; /* unlocked */ + isc_condition_t shutdown_ok; + int maxudp; +}; + +struct isc__socketthread { + isc__socketmgr_t * manager; + int threadid; + isc_thread_t thread; + int pipe_fds[2]; + isc_mutex_t *fdlock; + /* Locked by fdlock. */ + isc__socket_t **fds; + int *fdstate; #ifdef USE_KQUEUE int kqueue_fd; int nevents; @@ -384,6 +404,7 @@ struct isc__socketmgr { int epoll_fd; int nevents; struct epoll_event *events; + uint32_t *epoll_events; #endif /* USE_EPOLL */ #ifdef USE_DEVPOLL int devpoll_fd; @@ -391,38 +412,19 @@ struct isc__socketmgr { unsigned int calls; int nevents; struct pollfd *events; + pollinfo_t *fdpollinfo; #endif /* USE_DEVPOLL */ #ifdef USE_SELECT int fd_bufsize; -#endif /* USE_SELECT */ - unsigned int maxsocks; - int pipe_fds[2]; - - /* Locked by fdlock. */ - isc__socket_t **fds; - int *fdstate; -#if defined(USE_EPOLL) - uint32_t *epoll_events; -#endif -#ifdef USE_DEVPOLL - pollinfo_t *fdpollinfo; -#endif - - /* Locked by manager lock. 
*/ - ISC_LIST(isc__socket_t) socklist; -#ifdef USE_SELECT fd_set *read_fds; fd_set *read_fds_copy; fd_set *write_fds; fd_set *write_fds_copy; int maxfd; #endif /* USE_SELECT */ - int reserved; /* unlocked */ - isc_thread_t watcher; - isc_condition_t shutdown_ok; - int maxudp; }; + #define CLOSED 0 /* this one must be zero */ #define MANAGED 1 #define CLOSE_PENDING 2 @@ -457,7 +459,7 @@ static void build_msghdr_send(isc__socket_t *, char *, isc_socketevent_t *, struct msghdr *, struct iovec *, size_t *); static void build_msghdr_recv(isc__socket_t *, char *, isc_socketevent_t *, struct msghdr *, struct iovec *, size_t *); -static bool process_ctlfd(isc__socketmgr_t *manager); +static bool process_ctlfd(isc__socketthread_t *thread); static void setdscp(isc__socket_t *sock, isc_dscp_t dscp); #define SELECT_POKE_SHUTDOWN (-1) @@ -588,6 +590,29 @@ manager_log(isc__socketmgr_t *sockmgr, "sockmgr %p: %s", sockmgr, msgbuf); } +static void +thread_log(isc__socketthread_t *thread, + isc_logcategory_t *category, isc_logmodule_t *module, int level, + const char *fmt, ...) ISC_FORMAT_PRINTF(5, 6); +static void +thread_log(isc__socketthread_t *thread, + isc_logcategory_t *category, isc_logmodule_t *module, int level, + const char *fmt, ...) +{ + char msgbuf[2048]; + va_list ap; + + if (! 
isc_log_wouldlog(isc_lctx, level)) + return; + + va_start(ap, fmt); + vsnprintf(msgbuf, sizeof(msgbuf), fmt, ap); + va_end(ap); + + isc_log_write(isc_lctx, category, module, level, + "sockmgr %p thread %d: %s", thread->manager, thread->threadid, msgbuf); +} + static void socket_log(isc__socket_t *sock, const isc_sockaddr_t *address, isc_logcategory_t *category, isc_logmodule_t *module, int level, @@ -645,7 +670,7 @@ dec_stats(isc_stats_t *stats, isc_statscounter_t counterid) { } static inline isc_result_t -watch_fd(isc__socketmgr_t *manager, int fd, int msg) { +watch_fd(isc__socketthread_t *thread, int fd, int msg) { isc_result_t result = ISC_R_SUCCESS; #ifdef USE_KQUEUE @@ -658,7 +683,7 @@ watch_fd(isc__socketmgr_t *manager, int fd, int msg) { evchange.filter = EVFILT_WRITE; evchange.flags = EV_ADD; evchange.ident = fd; - if (kevent(manager->kqueue_fd, &evchange, 1, NULL, 0, NULL) != 0) + if (kevent(thread->kqueue_fd, &evchange, 1, NULL, 0, NULL) != 0) result = isc__errno2result(errno); return (result); @@ -668,18 +693,18 @@ watch_fd(isc__socketmgr_t *manager, int fd, int msg) { int ret; int op; - oldevents = manager->epoll_events[fd]; + oldevents = thread->epoll_events[fd]; if (msg == SELECT_POKE_READ) - manager->epoll_events[fd] |= EPOLLIN; + thread->epoll_events[fd] |= EPOLLIN; else - manager->epoll_events[fd] |= EPOLLOUT; + thread->epoll_events[fd] |= EPOLLOUT; - event.events = manager->epoll_events[fd]; + event.events = thread->epoll_events[fd]; memset(&event.data, 0, sizeof(event.data)); event.data.fd = fd; op = (oldevents == 0U) ? 
EPOLL_CTL_ADD : EPOLL_CTL_MOD; - ret = epoll_ctl(manager->epoll_fd, op, fd, &event); + ret = epoll_ctl(thread->epoll_fd, op, fd, &event); if (ret == -1) { if (errno == EEXIST) UNEXPECTED_ERROR(__FILE__, __LINE__, @@ -691,6 +716,7 @@ watch_fd(isc__socketmgr_t *manager, int fd, int msg) { return (result); #elif defined(USE_DEVPOLL) struct pollfd pfd; + INSIST(thread->threadid == 0); int lockid = FDLOCK_ID(fd); memset(&pfd, 0, sizeof(pfd)); @@ -700,32 +726,32 @@ watch_fd(isc__socketmgr_t *manager, int fd, int msg) { pfd.events = POLLOUT; pfd.fd = fd; pfd.revents = 0; - LOCK(&manager->fdlock[lockid]); - if (write(manager->devpoll_fd, &pfd, sizeof(pfd)) == -1) + LOCK(&thread->fdlock[lockid]); + if (write(thread->devpoll_fd, &pfd, sizeof(pfd)) == -1) result = isc__errno2result(errno); else { if (msg == SELECT_POKE_READ) - manager->fdpollinfo[fd].want_read = 1; + thread->fdpollinfo[fd].want_read = 1; else - manager->fdpollinfo[fd].want_write = 1; + thread->fdpollinfo[fd].want_write = 1; } - UNLOCK(&manager->fdlock[lockid]); + UNLOCK(&thread->fdlock[lockid]); return (result); #elif defined(USE_SELECT) - LOCK(&manager->lock); + LOCK(&thread->manager->lock); if (msg == SELECT_POKE_READ) - FD_SET(fd, manager->read_fds); + FD_SET(fd, thread->read_fds); if (msg == SELECT_POKE_WRITE) - FD_SET(fd, manager->write_fds); - UNLOCK(&manager->lock); + FD_SET(fd, thread->write_fds); + UNLOCK(&thread->manager->lock); return (result); #endif } static inline isc_result_t -unwatch_fd(isc__socketmgr_t *manager, int fd, int msg) { +unwatch_fd(isc__socketthread_t *thread, int fd, int msg) { isc_result_t result = ISC_R_SUCCESS; #ifdef USE_KQUEUE @@ -738,7 +764,7 @@ unwatch_fd(isc__socketmgr_t *manager, int fd, int msg) { evchange.filter = EVFILT_WRITE; evchange.flags = EV_DELETE; evchange.ident = fd; - if (kevent(manager->kqueue_fd, &evchange, 1, NULL, 0, NULL) != 0) + if (kevent(thread->kqueue_fd, &evchange, 1, NULL, 0, NULL) != 0) result = isc__errno2result(errno); return (result); @@ -747,17 +773,18
@@ unwatch_fd(isc__socketmgr_t *manager, int fd, int msg) { int ret; int op; - if (msg == SELECT_POKE_READ) - manager->epoll_events[fd] &= ~(EPOLLIN); - else - manager->epoll_events[fd] &= ~(EPOLLOUT); + if (msg == SELECT_POKE_READ) { + thread->epoll_events[fd] &= ~(EPOLLIN); + } else { + thread->epoll_events[fd] &= ~(EPOLLOUT); + } - event.events = manager->epoll_events[fd]; + event.events = thread->epoll_events[fd]; memset(&event.data, 0, sizeof(event.data)); event.data.fd = fd; op = (event.events == 0U) ? EPOLL_CTL_DEL : EPOLL_CTL_MOD; - ret = epoll_ctl(manager->epoll_fd, op, fd, &event); + ret = epoll_ctl(thread->epoll_fd, op, fd, &event); if (ret == -1 && errno != ENOENT) { char strbuf[ISC_STRERRORSIZE]; strerror_r(errno, strbuf, sizeof(strbuf)); @@ -780,45 +807,45 @@ unwatch_fd(isc__socketmgr_t *manager, int fd, int msg) { * only provides a way of canceling per FD, we may need to re-poll the * socket for the other operation. */ - LOCK(&manager->fdlock[lockid]); + LOCK(&thread->fdlock[lockid]); if (msg == SELECT_POKE_READ && - manager->fdpollinfo[fd].want_write == 1) { + thread->fdpollinfo[fd].want_write == 1) { pfds[1].events = POLLOUT; pfds[1].fd = fd; writelen += sizeof(pfds[1]); } if (msg == SELECT_POKE_WRITE && - manager->fdpollinfo[fd].want_read == 1) { + thread->fdpollinfo[fd].want_read == 1) { pfds[1].events = POLLIN; pfds[1].fd = fd; writelen += sizeof(pfds[1]); } - if (write(manager->devpoll_fd, pfds, writelen) == -1) + if (write(thread->devpoll_fd, pfds, writelen) == -1) result = isc__errno2result(errno); else { if (msg == SELECT_POKE_READ) - manager->fdpollinfo[fd].want_read = 0; + thread->fdpollinfo[fd].want_read = 0; else - manager->fdpollinfo[fd].want_write = 0; + thread->fdpollinfo[fd].want_write = 0; } - UNLOCK(&manager->fdlock[lockid]); + UNLOCK(&thread->fdlock[lockid]); return (result); #elif defined(USE_SELECT) - LOCK(&manager->lock); + LOCK(&thread->manager->lock); if (msg == SELECT_POKE_READ) - FD_CLR(fd, manager->read_fds); + FD_CLR(fd, 
thread->read_fds); else if (msg == SELECT_POKE_WRITE) - FD_CLR(fd, manager->write_fds); - UNLOCK(&manager->lock); + FD_CLR(fd, thread->write_fds); + UNLOCK(&thread->manager->lock); return (result); #endif } static void -wakeup_socket(isc__socketmgr_t *manager, int fd, int msg) { +wakeup_socket(isc__socketthread_t *thread, int fd, int msg) { isc_result_t result; int lockid = FDLOCK_ID(fd); @@ -828,21 +855,21 @@ wakeup_socket(isc__socketmgr_t *manager, int fd, int msg) { * or writes. */ - INSIST(fd >= 0 && fd < (int)manager->maxsocks); + INSIST(fd >= 0 && fd < (int)thread->manager->maxsocks); if (msg == SELECT_POKE_CLOSE) { /* No one should be updating fdstate, so no need to lock it */ - INSIST(manager->fdstate[fd] == CLOSE_PENDING); - manager->fdstate[fd] = CLOSED; - (void)unwatch_fd(manager, fd, SELECT_POKE_READ); - (void)unwatch_fd(manager, fd, SELECT_POKE_WRITE); + INSIST(thread->fdstate[fd] == CLOSE_PENDING); + thread->fdstate[fd] = CLOSED; + (void)unwatch_fd(thread, fd, SELECT_POKE_READ); + (void)unwatch_fd(thread, fd, SELECT_POKE_WRITE); (void)close(fd); return; } - LOCK(&manager->fdlock[lockid]); - if (manager->fdstate[fd] == CLOSE_PENDING) { - UNLOCK(&manager->fdlock[lockid]); + LOCK(&thread->fdlock[lockid]); + if (thread->fdstate[fd] == CLOSE_PENDING) { + UNLOCK(&thread->fdlock[lockid]); /* * We accept (and ignore) any error from unwatch_fd() as we are @@ -852,20 +879,20 @@ wakeup_socket(isc__socketmgr_t *manager, int fd, int msg) { * fdlock; otherwise it could cause deadlock due to a lock order * reversal. 
*/ - (void)unwatch_fd(manager, fd, SELECT_POKE_READ); - (void)unwatch_fd(manager, fd, SELECT_POKE_WRITE); + (void)unwatch_fd(thread, fd, SELECT_POKE_READ); + (void)unwatch_fd(thread, fd, SELECT_POKE_WRITE); return; } - if (manager->fdstate[fd] != MANAGED) { - UNLOCK(&manager->fdlock[lockid]); + if (thread->fdstate[fd] != MANAGED) { + UNLOCK(&thread->fdlock[lockid]); return; } - UNLOCK(&manager->fdlock[lockid]); + UNLOCK(&thread->fdlock[lockid]); /* * Set requested bit. */ - result = watch_fd(manager, fd, msg); + result = watch_fd(thread, fd, msg); if (result != ISC_R_SUCCESS) { /* * XXXJT: what should we do? Ignoring the failure of watching @@ -885,7 +912,7 @@ wakeup_socket(isc__socketmgr_t *manager, int fd, int msg) { * will not get partial writes. */ static void -select_poke(isc__socketmgr_t *mgr, int fd, int msg) { +select_poke(isc__socketmgr_t *mgr, int threadid, int fd, int msg) { int cc; int buf[2]; char strbuf[ISC_STRERRORSIZE]; @@ -894,7 +921,7 @@ select_poke(isc__socketmgr_t *mgr, int fd, int msg) { buf[1] = msg; do { - cc = write(mgr->pipe_fds[1], buf, sizeof(buf)); + cc = write(mgr->threads[threadid].pipe_fds[1], buf, sizeof(buf)); #ifdef ENOSR /* * Treat ENOSR as EAGAIN but loop slowly as it is @@ -924,12 +951,12 @@ select_poke(isc__socketmgr_t *mgr, int fd, int msg) { * Read a message on the internal fd. */ static void -select_readmsg(isc__socketmgr_t *mgr, int *fd, int *msg) { +select_readmsg(isc__socketthread_t *thread, int *fd, int *msg) { int buf[2]; int cc; char strbuf[ISC_STRERRORSIZE]; - cc = read(mgr->pipe_fds[0], buf, sizeof(buf)); + cc = read(thread->pipe_fds[0], buf, sizeof(buf)); if (cc < 0) { *msg = SELECT_POKE_NOTHING; *fd = -1; /* Silence compiler. 
*/ @@ -1752,16 +1779,16 @@ doio_send(isc__socket_t *sock, isc_socketevent_t *dev) { static void socketclose(isc__socketmgr_t *manager, isc__socket_t *sock, int fd) { int lockid = FDLOCK_ID(fd); - + isc__socketthread_t *thread = &manager->threads[sock->threadid]; /* * No one has this socket open, so the watcher doesn't have to be * poked, and the socket doesn't have to be locked. */ - LOCK(&manager->fdlock[lockid]); - manager->fds[fd] = NULL; - manager->fdstate[fd] = CLOSE_PENDING; - UNLOCK(&manager->fdlock[lockid]); - select_poke(manager, fd, SELECT_POKE_CLOSE); + LOCK(&thread->fdlock[lockid]); + thread->fds[fd] = NULL; + thread->fdstate[fd] = CLOSE_PENDING; + UNLOCK(&thread->fdlock[lockid]); + select_poke(manager, sock->threadid, fd, SELECT_POKE_CLOSE); inc_stats(manager->stats, sock->statsindex[STATID_CLOSE]); if (sock->active == 1) { @@ -1775,23 +1802,23 @@ socketclose(isc__socketmgr_t *manager, isc__socket_t *sock, int fd) { */ #ifdef USE_SELECT LOCK(&manager->lock); - if (manager->maxfd == fd) { + if (thread->maxfd == fd) { int i; - manager->maxfd = 0; + thread->maxfd = 0; for (i = fd - 1; i >= 0; i--) { lockid = FDLOCK_ID(i); - LOCK(&manager->fdlock[lockid]); - if (manager->fdstate[i] == MANAGED) { - manager->maxfd = i; - UNLOCK(&manager->fdlock[lockid]); + LOCK(&thread->fdlock[lockid]); + if (thread->fdstate[i] == MANAGED) { + thread->maxfd = i; + UNLOCK(&thread->fdlock[lockid]); break; } - UNLOCK(&manager->fdlock[lockid]); + UNLOCK(&thread->fdlock[lockid]); } - if (manager->maxfd < manager->pipe_fds[0]) - manager->maxfd = manager->pipe_fds[0]; + if (thread->maxfd < thread->pipe_fds[0]) + thread->maxfd = thread->pipe_fds[0]; } UNLOCK(&manager->lock); @@ -2506,6 +2533,7 @@ socket_create(isc_socketmgr_t *manager0, int pf, isc_sockettype_t type, { isc__socket_t *sock = NULL; isc__socketmgr_t *manager = (isc__socketmgr_t *)manager0; + isc__socketthread_t *thread; isc_result_t result; int lockid; @@ -2545,7 +2573,9 @@ socket_create(isc_socketmgr_t *manager0, int 
pf, isc_sockettype_t type, return (result); } + sock->threadid = sock->fd % manager->nthreads; // TODO? sock->references = 1; + thread = &manager->threads[sock->threadid]; *socketp = (isc_socket_t *)sock; /* @@ -2554,23 +2584,24 @@ socket_create(isc_socketmgr_t *manager0, int pf, isc_sockettype_t type, */ lockid = FDLOCK_ID(sock->fd); - LOCK(&manager->fdlock[lockid]); - manager->fds[sock->fd] = sock; - manager->fdstate[sock->fd] = MANAGED; + LOCK(&thread->fdlock[lockid]); + thread->fds[sock->fd] = sock; + thread->fdstate[sock->fd] = MANAGED; #if defined(USE_EPOLL) - manager->epoll_events[sock->fd] = 0; + thread->epoll_events[sock->fd] = 0; #endif #ifdef USE_DEVPOLL - INSIST(sock->manager->fdpollinfo[sock->fd].want_read == 0 && - sock->manager->fdpollinfo[sock->fd].want_write == 0); + INSIST(thread->fdpollinfo[sock->fd].want_read == 0 && + thread->fdpollinfo[sock->fd].want_write == 0); #endif - UNLOCK(&manager->fdlock[lockid]); + UNLOCK(&thread->fdlock[lockid]); LOCK(&manager->lock); ISC_LIST_APPEND(manager->socklist, sock, link); #ifdef USE_SELECT - if (manager->maxfd < sock->fd) - manager->maxfd = sock->fd; + if (thread->maxfd < sock->fd) { + thread->maxfd = sock->fd; + } #endif UNLOCK(&manager->lock); @@ -2613,6 +2644,7 @@ isc_result_t isc_socket_open(isc_socket_t *sock0) { isc_result_t result; isc__socket_t *sock = (isc__socket_t *)sock0; + isc__socketthread_t *thread; REQUIRE(VALID_SOCKET(sock)); @@ -2624,30 +2656,32 @@ isc_socket_open(isc_socket_t *sock0) { * this socket. */ REQUIRE(sock->fd == -1); + REQUIRE(sock->threadid == -1); result = opensocket(sock->manager, sock, NULL); - if (result != ISC_R_SUCCESS) + if (result != ISC_R_SUCCESS) { sock->fd = -1; - - if (result == ISC_R_SUCCESS) { + } else { + sock->threadid = sock->fd % sock->manager->nthreads; // TODO? 
+ thread = &sock->manager->threads[sock->threadid]; int lockid = FDLOCK_ID(sock->fd); - LOCK(&sock->manager->fdlock[lockid]); - sock->manager->fds[sock->fd] = sock; - sock->manager->fdstate[sock->fd] = MANAGED; + LOCK(&thread->fdlock[lockid]); + thread->fds[sock->fd] = sock; + thread->fdstate[sock->fd] = MANAGED; #if defined(USE_EPOLL) - sock->manager->epoll_events[sock->fd] = 0; + thread->epoll_events[sock->fd] = 0; #endif #ifdef USE_DEVPOLL - INSIST(sock->manager->fdpollinfo[sock->fd].want_read == 0 && - sock->manager->fdpollinfo[sock->fd].want_write == 0); + INSIST(thread->fdpollinfo[sock->fd].want_read == 0 && + thread->fdpollinfo[sock->fd].want_write == 0); #endif - UNLOCK(&sock->manager->fdlock[lockid]); + UNLOCK(&thread->fdlock[lockid]); #ifdef USE_SELECT LOCK(&sock->manager->lock); - if (sock->manager->maxfd < sock->fd) - sock->manager->maxfd = sock->fd; + if (thread->maxfd < sock->fd) + thread->maxfd = sock->fd; UNLOCK(&sock->manager->lock); #endif } @@ -2824,6 +2858,7 @@ send_connectdone_event(isc__socket_t *sock, isc_socket_connev_t **dev) { static void internal_accept(isc__socket_t *sock) { isc__socketmgr_t *manager; + isc__socketthread_t *thread, *nthread; isc_socket_newconnev_t *dev; isc_task_t *task; socklen_t addrlen; @@ -2841,6 +2876,7 @@ internal_accept(isc__socket_t *sock) { manager = sock->manager; INSIST(VALID_MANAGER(manager)); + thread = &manager->threads[sock->threadid]; INSIST(sock->listener); @@ -2976,7 +3012,8 @@ internal_accept(isc__socket_t *sock) { * Poke watcher if there are more pending accepts. 
*/ if (!ISC_LIST_EMPTY(sock->accept_list)) - watch_fd(sock->manager, sock->fd, SELECT_POKE_ACCEPT); + watch_fd(thread, sock->fd, + SELECT_POKE_ACCEPT); UNLOCK(&sock->lock); @@ -2995,8 +3032,10 @@ internal_accept(isc__socket_t *sock) { int lockid = FDLOCK_ID(fd); NEWCONNSOCK(dev)->fd = fd; + NEWCONNSOCK(dev)->threadid = fd % manager->nthreads; // TODO NEWCONNSOCK(dev)->bound = 1; NEWCONNSOCK(dev)->connected = 1; + nthread = &manager->threads[NEWCONNSOCK(dev)->threadid]; /* * Use minimum mtu if possible. @@ -3020,19 +3059,19 @@ internal_accept(isc__socket_t *sock) { NEWCONNSOCK(dev)->active = 1; } - LOCK(&manager->fdlock[lockid]); - manager->fds[fd] = NEWCONNSOCK(dev); - manager->fdstate[fd] = MANAGED; + LOCK(&nthread->fdlock[lockid]); + nthread->fds[fd] = NEWCONNSOCK(dev); + nthread->fdstate[fd] = MANAGED; #if defined(USE_EPOLL) - manager->epoll_events[fd] = 0; + nthread->epoll_events[fd] = 0; #endif - UNLOCK(&manager->fdlock[lockid]); + UNLOCK(&nthread->fdlock[lockid]); LOCK(&manager->lock); #ifdef USE_SELECT - if (manager->maxfd < fd) - manager->maxfd = fd; + if (nthread->maxfd < fd) + nthread->maxfd = fd; #endif socket_log(sock, &NEWCONNSOCK(dev)->peer_address, CREATION, @@ -3062,7 +3101,7 @@ internal_accept(isc__socket_t *sock) { return; soft_error: - watch_fd(sock->manager, sock->fd, SELECT_POKE_ACCEPT); + watch_fd(thread, sock->fd, SELECT_POKE_ACCEPT); UNLOCK(&sock->lock); inc_stats(manager->stats, sock->statsindex[STATID_ACCEPTFAIL]); @@ -3119,7 +3158,8 @@ internal_recv(isc__socket_t *sock) { poke: if (!ISC_LIST_EMPTY(sock->recv_list)) - watch_fd(sock->manager, sock->fd, SELECT_POKE_READ); + watch_fd(&sock->manager->threads[sock->threadid], sock->fd, + SELECT_POKE_READ); UNLOCK(&sock->lock); } @@ -3160,7 +3200,7 @@ internal_send(isc__socket_t *sock) { poke: if (!ISC_LIST_EMPTY(sock->send_list)) - watch_fd(sock->manager, sock->fd, SELECT_POKE_WRITE); + watch_fd(&sock->manager->threads[sock->threadid], sock->fd, SELECT_POKE_WRITE); UNLOCK(&sock->lock); } @@
-3170,7 +3210,7 @@ internal_send(isc__socket_t *sock) { * and unlocking twice if both reads and writes are possible. */ static void -process_fd(isc__socketmgr_t *manager, int fd, bool readable, +process_fd(isc__socketthread_t *thread, int fd, bool readable, bool writeable) { isc__socket_t *sock; @@ -3180,25 +3220,27 @@ process_fd(isc__socketmgr_t *manager, int fd, bool readable, /* * If the socket is going to be closed, don't do more I/O. */ - LOCK(&manager->fdlock[lockid]); - if (manager->fdstate[fd] == CLOSE_PENDING) { - UNLOCK(&manager->fdlock[lockid]); + LOCK(&thread->fdlock[lockid]); + if (thread->fdstate[fd] == CLOSE_PENDING) { + UNLOCK(&thread->fdlock[lockid]); - (void)unwatch_fd(manager, fd, SELECT_POKE_READ); - (void)unwatch_fd(manager, fd, SELECT_POKE_WRITE); + (void)unwatch_fd(thread, fd, SELECT_POKE_READ); + (void)unwatch_fd(thread, fd, SELECT_POKE_WRITE); return; } - sock = manager->fds[fd]; + sock = thread->fds[fd]; + if (sock == NULL) { + unwatch_read = readable; + unwatch_write = writeable; + goto unlock_fd; + } + LOCK(&sock->lock); sock->references++; UNLOCK(&sock->lock); if (readable) { - if (sock == NULL) { - unwatch_read = true; - goto check_write; - } if (!SOCK_DEAD(sock)) { if (sock->listener) internal_accept(sock); @@ -3207,7 +3249,7 @@ process_fd(isc__socketmgr_t *manager, int fd, bool readable, } unwatch_read = true; } -check_write: + if (writeable) { if (sock == NULL) { unwatch_write = true; @@ -3223,70 +3265,76 @@ check_write: } unlock_fd: - UNLOCK(&manager->fdlock[lockid]); + UNLOCK(&thread->fdlock[lockid]); if (unwatch_read) - (void)unwatch_fd(manager, fd, SELECT_POKE_READ); + (void)unwatch_fd(thread, fd, SELECT_POKE_READ); if (unwatch_write) - (void)unwatch_fd(manager, fd, SELECT_POKE_WRITE); - LOCK(&sock->lock); - sock->references--; - UNLOCK(&sock->lock); + (void)unwatch_fd(thread, fd, SELECT_POKE_WRITE); + if (sock != NULL) { + LOCK(&sock->lock); + sock->references--; + UNLOCK(&sock->lock); + } } #ifdef USE_KQUEUE static bool 
-process_fds(isc__socketmgr_t *manager, struct kevent *events, int nevents) { +process_fds(isc__socketthread_t *thread, struct kevent *events, + int nevents) +{ int i; bool readable, writable; bool done = false; bool have_ctlevent = false; + INSIST(thread->threadid == 0); - if (nevents == manager->nevents) { + if (nevents == thread->nevents) { /* * This is not an error, but something unexpected. If this * happens, it may indicate the need for increasing * ISC_SOCKET_MAXEVENTS. */ - manager_log(manager, ISC_LOGCATEGORY_GENERAL, - ISC_LOGMODULE_SOCKET, ISC_LOG_INFO, - "maximum number of FD events (%d) received", - nevents); + thread_log(thread, ISC_LOGCATEGORY_GENERAL, + ISC_LOGMODULE_SOCKET, ISC_LOG_INFO, + "maximum number of FD events (%d) received", + nevents); } for (i = 0; i < nevents; i++) { - REQUIRE(events[i].ident < manager->maxsocks); - if (events[i].ident == (uintptr_t)manager->pipe_fds[0]) { + REQUIRE(events[i].ident < thread->manager->maxsocks); + if (events[i].ident == (uintptr_t)thread->pipe_fds[0]) { have_ctlevent = true; continue; } readable = (events[i].filter == EVFILT_READ); writable = (events[i].filter == EVFILT_WRITE); - process_fd(manager, events[i].ident, readable, writable); + process_fd(thread, events[i].ident, readable, writable); } if (have_ctlevent) - done = process_ctlfd(manager); + done = process_ctlfd(thread); return (done); } #elif defined(USE_EPOLL) static bool -process_fds(isc__socketmgr_t *manager, struct epoll_event *events, int nevents) +process_fds(isc__socketthread_t *thread, struct epoll_event *events, + int nevents) { int i; bool done = false; bool have_ctlevent = false; - if (nevents == manager->nevents) { - manager_log(manager, ISC_LOGCATEGORY_GENERAL, - ISC_LOGMODULE_SOCKET, ISC_LOG_INFO, - "maximum number of FD events (%d) received", - nevents); + if (nevents == thread->nevents) { + thread_log(thread, ISC_LOGCATEGORY_GENERAL, + ISC_LOGMODULE_SOCKET, ISC_LOG_INFO, + "maximum number of FD events (%d) received", + nevents); 
} for (i = 0; i < nevents; i++) { - REQUIRE(events[i].data.fd < (int)manager->maxsocks); - if (events[i].data.fd == manager->pipe_fds[0]) { + REQUIRE(events[i].data.fd < (int)thread->manager->maxsocks); + if (events[i].data.fd == thread->pipe_fds[0]) { have_ctlevent = true; continue; } @@ -3300,78 +3348,81 @@ process_fds(isc__socketmgr_t *manager, struct epoll_event *events, int nevents) * won't block because we use non-blocking sockets. */ int fd = events[i].data.fd; - events[i].events |= manager->epoll_events[fd]; + events[i].events |= thread->epoll_events[fd]; } - process_fd(manager, events[i].data.fd, + process_fd(thread, events[i].data.fd, (events[i].events & EPOLLIN) != 0, (events[i].events & EPOLLOUT) != 0); } if (have_ctlevent) - done = process_ctlfd(manager); + done = process_ctlfd(thread); return (done); } #elif defined(USE_DEVPOLL) static bool -process_fds(isc__socketmgr_t *manager, struct pollfd *events, int nevents) { +process_fds(isc__socketthread_t *thread, struct pollfd *events, + int nevents) +{ int i; bool done = false; bool have_ctlevent = false; + INSIST(thread->threadid == 0); - if (nevents == manager->nevents) { - manager_log(manager, ISC_LOGCATEGORY_GENERAL, - ISC_LOGMODULE_SOCKET, ISC_LOG_INFO, - "maximum number of FD events (%d) received", - nevents); + if (nevents == thread->nevents) { + thread_log(thread, ISC_LOGCATEGORY_GENERAL, + ISC_LOGMODULE_SOCKET, ISC_LOG_INFO, + "maximum number of FD events (%d) received", + nevents); } for (i = 0; i < nevents; i++) { REQUIRE(events[i].fd < (int)manager->maxsocks); - if (events[i].fd == manager->pipe_fds[0]) { + if (events[i].fd == thread->pipe_fds[0]) { have_ctlevent = true; continue; } - process_fd(manager, events[i].fd, + process_fd(thread, events[i].fd, (events[i].events & POLLIN) != 0, (events[i].events & POLLOUT) != 0); } if (have_ctlevent) - done = process_ctlfd(manager); + done = process_ctlfd(thread); return (done); } #elif defined(USE_SELECT) static void -process_fds(isc__socketmgr_t *manager,
int maxfd, fd_set *readfds, +process_fds(isc__socketthread_t *thread, int maxfd, fd_set *readfds, fd_set *writefds) { int i; - REQUIRE(maxfd <= (int)manager->maxsocks); + REQUIRE(maxfd <= (int)thread->manager->maxsocks); for (i = 0; i < maxfd; i++) { - if (i == manager->pipe_fds[0] || i == manager->pipe_fds[1]) + if (i == thread->pipe_fds[0] || i == thread->pipe_fds[1]) continue; - process_fd(manager, i, FD_ISSET(i, readfds), + process_fd(thread, i, FD_ISSET(i, readfds), FD_ISSET(i, writefds)); } } #endif static bool -process_ctlfd(isc__socketmgr_t *manager) { +process_ctlfd(isc__socketthread_t *thread) { int msg, fd; for (;;) { - select_readmsg(manager, &fd, &msg); + select_readmsg(thread, &fd, &msg); - manager_log(manager, IOEVENT, - isc_msgcat_get(isc_msgcat, ISC_MSGSET_SOCKET, - ISC_MSG_WATCHERMSG, - "watcher got message %d " - "for socket %d"), msg, fd); + thread_log(thread, IOEVENT, + isc_msgcat_get(isc_msgcat, ISC_MSGSET_SOCKET, + ISC_MSG_WATCHERMSG, + "watcher got message %d " + "for socket %d"), msg, fd); /* * Nothing to read? @@ -3394,7 +3445,7 @@ process_ctlfd(isc__socketmgr_t *manager) { * and decide if we need to watch on it now * or not. */ - wakeup_socket(manager, fd, msg); + wakeup_socket(thread, fd, msg); } return (false); @@ -3404,12 +3455,14 @@ process_ctlfd(isc__socketmgr_t *manager) { * This is the thread that will loop forever, always in a select or poll * call. * - * When select returns something to do, track down what thread gets to do - * this I/O and post the event to it. + * When select returns something to do, do whatever's necessary and post + * an event to the task that was requesting the action. 
*/ static isc_threadresult_t -watcher(void *uap) { - isc__socketmgr_t *manager = uap; +netthread(void *uap) { + isc__socketthread_t *thread = uap; + isc__socketmgr_t *manager = thread->manager; + (void)manager; bool done; int cc; #ifdef USE_KQUEUE @@ -3417,6 +3470,7 @@ watcher(void *uap) { #elif defined (USE_EPOLL) const char *fnname = "epoll_wait()"; #elif defined(USE_DEVPOLL) + INSIST(threadid == 0); isc_result_t result; const char *fnname = "ioctl(DP_POLL)"; struct dvpoll dvp; @@ -3435,17 +3489,18 @@ watcher(void *uap) { /* * Get the control fd here. This will never change. */ - ctlfd = manager->pipe_fds[0]; + ctlfd = thread->pipe_fds[0]; #endif done = false; while (!done) { do { #ifdef USE_KQUEUE - cc = kevent(manager->kqueue_fd, NULL, 0, - manager->events, manager->nevents, NULL); + cc = kevent(thread->kqueue_fd, NULL, 0, + thread->events, thread->nevents, NULL); #elif defined(USE_EPOLL) - cc = epoll_wait(manager->epoll_fd, manager->events, - manager->nevents, -1); + cc = epoll_wait(thread->epoll_fd, + thread->events, + thread->nevents, -1); #elif defined(USE_DEVPOLL) /* * Re-probe every thousand calls. @@ -3459,10 +3514,10 @@ watcher(void *uap) { manager->calls = 0; } for (pass = 0; pass < 2; pass++) { - dvp.dp_fds = manager->events; - dvp.dp_nfds = manager->nevents; - if (dvp.dp_nfds >= manager->open_max) - dvp.dp_nfds = manager->open_max - 1; + dvp.dp_fds = tgread->events; + dvp.dp_nfds = thread->nevents; + if (dvp.dp_nfds >= thread->open_max) + dvp.dp_nfds = thread->open_max - 1; #ifndef ISC_SOCKET_USE_POLLWATCH dvp.dp_timeout = -1; #else @@ -3472,7 +3527,7 @@ watcher(void *uap) { dvp.dp_timeout = ISC_SOCKET_POLLWATCH_TIMEOUT; #endif /* ISC_SOCKET_USE_POLLWATCH */ - cc = ioctl(manager->devpoll_fd, DP_POLL, &dvp); + cc = ioctl(thread->devpoll_fd, DP_POLL, &dvp); if (cc == -1 && errno == EINVAL) { /* * {OPEN_MAX} may have dropped. 
Look @@ -3487,16 +3542,20 @@ watcher(void *uap) { break; } #elif defined(USE_SELECT) + /* + * We will have only one thread anyway, we can lock + * manager lock and don't care + */ LOCK(&manager->lock); - memmove(manager->read_fds_copy, manager->read_fds, - manager->fd_bufsize); - memmove(manager->write_fds_copy, manager->write_fds, - manager->fd_bufsize); - maxfd = manager->maxfd + 1; + memmove(thread->read_fds_copy, thread->read_fds, + thread->fd_bufsize); + memmove(thread->write_fds_copy, thread->write_fds, + thread->fd_bufsize); + maxfd = thread->maxfd + 1; UNLOCK(&manager->lock); - cc = select(maxfd, manager->read_fds_copy, - manager->write_fds_copy, NULL, NULL); + cc = select(maxfd, thread->read_fds_copy, + thread->write_fds_copy, NULL, NULL); #endif /* USE_KQUEUE */ if (cc < 0 && !SOFT_ERROR(errno)) { @@ -3525,11 +3584,11 @@ watcher(void *uap) { * (and it can also be a false positive) * so it would be just too noisy. */ - manager_log(manager, - ISC_LOGCATEGORY_GENERAL, - ISC_LOGMODULE_SOCKET, - ISC_LOG_DEBUG(1), - "unexpected POLL timeout"); + thread_log(thread, + ISC_LOGCATEGORY_GENERAL, + ISC_LOGMODULE_SOCKET, + ISC_LOG_DEBUG(1), + "unexpected POLL timeout"); } pollstate = poll_active; } @@ -3537,23 +3596,22 @@ watcher(void *uap) { } while (cc < 0); #if defined(USE_KQUEUE) || defined (USE_EPOLL) || defined (USE_DEVPOLL) - done = process_fds(manager, manager->events, cc); + done = process_fds(thread, thread->events, cc); #elif defined(USE_SELECT) - process_fds(manager, maxfd, manager->read_fds_copy, - manager->write_fds_copy); + process_fds(thread, maxfd, thread->read_fds_copy, + thread->write_fds_copy); /* * Process reads on internal, control fd. 
*/ - if (FD_ISSET(ctlfd, manager->read_fds_copy)) - done = process_ctlfd(manager); + if (FD_ISSET(ctlfd, thread->read_fds_copy)) + done = process_ctlfd(thread); #endif } - manager_log(manager, TRACE, "%s", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_EXITING, "watcher exiting")); - + thread_log(thread, TRACE, "%s", + isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, + ISC_MSG_EXITING, "watcher exiting")); return ((isc_threadresult_t)0); } @@ -3576,24 +3634,64 @@ isc_socketmgr_maxudp(isc_socketmgr_t *manager0, int maxudp) { } /* - * Create a new socket manager. + * Setup socket thread, thread->manager and thread->threadid must be filled. */ static isc_result_t -setup_watcher(isc_mem_t *mctx, isc__socketmgr_t *manager) { - isc_result_t result; -#if defined(USE_KQUEUE) || defined(USE_EPOLL) || defined(USE_DEVPOLL) +setup_thread(isc__socketthread_t *thread) { + isc_result_t result = ISC_R_SUCCESS; + int i; char strbuf[ISC_STRERRORSIZE]; -#endif + REQUIRE(thread != NULL); + REQUIRE(VALID_MANAGER(thread->manager)); + REQUIRE(thread->threadid >= 0 && thread->threadid < thread->manager->nthreads); + thread->fds = isc_mem_get(thread->manager->mctx, + thread->manager->maxsocks * sizeof(isc__socket_t *)); + if (thread->fds == NULL) { + result = ISC_R_NOMEMORY; + return (result); // TODO + } + memset(thread->fds, 0, thread->manager->maxsocks * sizeof(isc_socket_t *)); + + thread->fdstate = isc_mem_get(thread->manager->mctx, thread->manager->maxsocks * sizeof(int)); + if (thread ->fdstate == NULL) { + result = ISC_R_NOMEMORY; + return (result); // TODO + } + memset(thread->fdstate, 0, thread->manager->maxsocks * sizeof(int)); + + thread->fdlock = isc_mem_get(thread->manager->mctx, FDLOCK_COUNT * sizeof(isc_mutex_t)); + if (thread->fdlock == NULL) { + result = ISC_R_NOMEMORY; + return (result); + } + + for (i = 0; i < FDLOCK_COUNT; i++) { + result = isc_mutex_init(&thread->fdlock[i]); + if (result != ISC_R_SUCCESS) { + return (result); + } + } + + if 
(pipe(thread->pipe_fds) != 0) { + strerror_r(errno, strbuf, sizeof(strbuf)); + UNEXPECTED_ERROR(__FILE__, __LINE__, + "pipe() %s: %s", + isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, + ISC_MSG_FAILED, "failed"), + strbuf); + return (ISC_R_UNEXPECTED); + } + RUNTIME_CHECK(make_nonblock(thread->pipe_fds[0]) == ISC_R_SUCCESS); #ifdef USE_KQUEUE - manager->nevents = ISC_SOCKET_MAXEVENTS; - manager->events = isc_mem_get(mctx, sizeof(struct kevent) * - manager->nevents); - if (manager->events == NULL) + thread->nevents = ISC_SOCKET_MAXEVENTS; + thread->events = isc_mem_get(thread->manager->mctx, sizeof(struct kevent) * + thread->nevents); + if (thread->events == NULL) return (ISC_R_NOMEMORY); - manager->kqueue_fd = kqueue(); - if (manager->kqueue_fd == -1) { + thread->kqueue_fd = kqueue(); + if (thread->kqueue_fd == -1) { result = isc__errno2result(errno); strerror_r(errno, strbuf, sizeof(strbuf)); UNEXPECTED_ERROR(__FILE__, __LINE__, @@ -3601,52 +3699,61 @@ setup_watcher(isc_mem_t *mctx, isc__socketmgr_t *manager) { isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed"), strbuf); - isc_mem_put(mctx, manager->events, - sizeof(struct kevent) * manager->nevents); + isc_mem_put(thread->manager->mctx, thread->events, + sizeof(struct kevent) * thread->nevents); return (result); } - result = watch_fd(manager, manager->pipe_fds[0], SELECT_POKE_READ); + result = watch_fd(thread, thread->pipe_fds[0], SELECT_POKE_READ); if (result != ISC_R_SUCCESS) { - close(manager->kqueue_fd); - isc_mem_put(mctx, manager->events, - sizeof(struct kevent) * manager->nevents); - return (result); + close(thread->kqueue_fd); + isc_mem_put(thread->manager->mctx, thread->events, + sizeof(struct kevent) * thread->nevents); } + return (result); + #elif defined(USE_EPOLL) - manager->nevents = ISC_SOCKET_MAXEVENTS; - manager->events = isc_mem_get(mctx, sizeof(struct epoll_event) * - manager->nevents); - if (manager->events == NULL) + thread->nevents = ISC_SOCKET_MAXEVENTS; + 
thread->epoll_events = isc_mem_get(thread->manager->mctx, + (thread->manager->maxsocks * + sizeof(uint32_t))); + if (thread->epoll_events == NULL) { + return (ISC_R_NOMEMORY); + } + memset(thread->epoll_events, 0, + thread->manager->maxsocks * sizeof(uint32_t)); + + thread->events = isc_mem_get(thread->manager->mctx, + sizeof(struct epoll_event) * + thread->nevents); + if (thread->events == NULL) { return (ISC_R_NOMEMORY); - manager->epoll_fd = epoll_create(manager->nevents); - if (manager->epoll_fd == -1) { + } + + thread->epoll_fd = epoll_create(thread->nevents); + if (thread->epoll_fd == -1) { result = isc__errno2result(errno); strerror_r(errno, strbuf, sizeof(strbuf)); UNEXPECTED_ERROR(__FILE__, __LINE__, "epoll_create %s: %s", isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed"), - strbuf); - isc_mem_put(mctx, manager->events, - sizeof(struct epoll_event) * manager->nevents); - return (result); - } - result = watch_fd(manager, manager->pipe_fds[0], SELECT_POKE_READ); - if (result != ISC_R_SUCCESS) { - close(manager->epoll_fd); - isc_mem_put(mctx, manager->events, - sizeof(struct epoll_event) * manager->nevents); + strbuf); return (result); + } + result = watch_fd(thread, thread->pipe_fds[0], SELECT_POKE_READ); + return (result); + #elif defined(USE_DEVPOLL) - manager->nevents = ISC_SOCKET_MAXEVENTS; + thread->nevents = ISC_SOCKET_MAXEVENTS; result = isc_resource_getcurlimit(isc_resource_openfiles, &manager->open_max); if (result != ISC_R_SUCCESS) manager->open_max = 64; manager->calls = 0; - manager->events = isc_mem_get(mctx, sizeof(struct pollfd) * + manager->events = isc_mem_get(thread->manager->mctx, + sizeof(struct pollfd) * manager->nevents); if (manager->events == NULL) return (ISC_R_NOMEMORY); @@ -3677,7 +3784,7 @@ setup_watcher(isc_mem_t *mctx, isc__socketmgr_t *manager) { sizeof(pollinfo_t) * manager->maxsocks); return (result); } - result = watch_fd(manager, manager->pipe_fds[0], SELECT_POKE_READ); + result = watch_fd(manager, 0, 
manager->pipe_fds[0], SELECT_POKE_READ); if (result != ISC_R_SUCCESS) { close(manager->devpoll_fd); isc_mem_put(mctx, manager->events, @@ -3686,6 +3793,8 @@ setup_watcher(isc_mem_t *mctx, isc__socketmgr_t *manager) { sizeof(pollinfo_t) * manager->maxsocks); return (result); } + + return (ISC_R_SUCCESS); #elif defined(USE_SELECT) UNUSED(result); @@ -3695,101 +3804,125 @@ setup_watcher(isc_mem_t *mctx, isc__socketmgr_t *manager) { * FD_SETSIZE, but we separate the cases to avoid possible portability * issues regarding howmany() and the actual representation of fd_set. */ - manager->fd_bufsize = howmany(manager->maxsocks, NFDBITS) * + thread->fd_bufsize = howmany(manager->maxsocks, NFDBITS) * sizeof(fd_mask); #else - manager->fd_bufsize = sizeof(fd_set); + thread->fd_bufsize = sizeof(fd_set); #endif - manager->read_fds = NULL; - manager->read_fds_copy = NULL; - manager->write_fds = NULL; - manager->write_fds_copy = NULL; - - manager->read_fds = isc_mem_get(mctx, manager->fd_bufsize); - if (manager->read_fds != NULL) - manager->read_fds_copy = isc_mem_get(mctx, manager->fd_bufsize); - if (manager->read_fds_copy != NULL) - manager->write_fds = isc_mem_get(mctx, manager->fd_bufsize); - if (manager->write_fds != NULL) { - manager->write_fds_copy = isc_mem_get(mctx, - manager->fd_bufsize); - } - if (manager->write_fds_copy == NULL) { - if (manager->write_fds != NULL) { - isc_mem_put(mctx, manager->write_fds, - manager->fd_bufsize); + thread->read_fds = NULL; + thread->read_fds_copy = NULL; + thread->write_fds = NULL; + thread->write_fds_copy = NULL; + + thread->read_fds = isc_mem_get(thread->manager->mctx, thread->fd_bufsize); + if (thread->read_fds != NULL) + thread->read_fds_copy = isc_mem_get(thread->manager->mctx, + thread->fd_bufsize); + if (thread->read_fds_copy != NULL) + thread->write_fds = isc_mem_get(thread->manager->mctx, + thread->fd_bufsize); + if (thread->write_fds != NULL) { + thread->write_fds_copy = isc_mem_get(thread->manager->mctx, + thread->fd_bufsize); 
+ } + if (thread->write_fds_copy == NULL) { + if (thread->write_fds != NULL) { + isc_mem_put(thread->manager->mctx, thread->write_fds, + thread->fd_bufsize); } - if (manager->read_fds_copy != NULL) { - isc_mem_put(mctx, manager->read_fds_copy, - manager->fd_bufsize); + if (thread->read_fds_copy != NULL) { + isc_mem_put(thread->manager->mctx, + thread->read_fds_copy, + thread->fd_bufsize); } - if (manager->read_fds != NULL) { - isc_mem_put(mctx, manager->read_fds, - manager->fd_bufsize); + if (thread->read_fds != NULL) { + isc_mem_put(thread->manager->mctx, thread->read_fds, + thread->fd_bufsize); } return (ISC_R_NOMEMORY); } - memset(manager->read_fds, 0, manager->fd_bufsize); - memset(manager->write_fds, 0, manager->fd_bufsize); + memset(thread->read_fds, 0, thread->fd_bufsize); + memset(thread->write_fds, 0, thread->fd_bufsize); - (void)watch_fd(manager, manager->pipe_fds[0], SELECT_POKE_READ); - manager->maxfd = manager->pipe_fds[0]; -#endif /* USE_KQUEUE */ + (void)watch_fd(thread, thread->pipe_fds[0], SELECT_POKE_READ); + thread->maxfd = thread->pipe_fds[0]; return (ISC_R_SUCCESS); +#endif /* USE_KQUEUE */ } static void -cleanup_watcher(isc_mem_t *mctx, isc__socketmgr_t *manager) { +cleanup_thread(isc_mem_t *mctx, isc__socketthread_t *thread) { isc_result_t result; + int i; - result = unwatch_fd(manager, manager->pipe_fds[0], SELECT_POKE_READ); + result = unwatch_fd(thread, thread->pipe_fds[0], SELECT_POKE_READ); if (result != ISC_R_SUCCESS) { UNEXPECTED_ERROR(__FILE__, __LINE__, "epoll_ctl(DEL) %s", isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed")); } - #ifdef USE_KQUEUE - close(manager->kqueue_fd); - isc_mem_put(mctx, manager->events, - sizeof(struct kevent) * manager->nevents); + close(thread->kqueue_fd); + isc_mem_put(mctx, thread->events, + sizeof(struct kevent) * thread->nevents); #elif defined(USE_EPOLL) - close(manager->epoll_fd); - isc_mem_put(mctx, manager->events, - sizeof(struct epoll_event) * manager->nevents); + 
close(thread->epoll_fd); + + isc_mem_put(mctx, thread->events, + sizeof(struct epoll_event) * thread->nevents); #elif defined(USE_DEVPOLL) - close(manager->devpoll_fd); - isc_mem_put(mctx, manager->events, - sizeof(struct pollfd) * manager->nevents); - isc_mem_put(mctx, manager->fdpollinfo, - sizeof(pollinfo_t) * manager->maxsocks); + close(thread->devpoll_fd); + isc_mem_put(mctx, thread->events, + sizeof(struct pollfd) * thread->nevents); + isc_mem_put(mctx, thread->fdpollinfo, + sizeof(pollinfo_t) * thread->maxsocks); #elif defined(USE_SELECT) - if (manager->read_fds != NULL) - isc_mem_put(mctx, manager->read_fds, manager->fd_bufsize); - if (manager->read_fds_copy != NULL) - isc_mem_put(mctx, manager->read_fds_copy, manager->fd_bufsize); - if (manager->write_fds != NULL) - isc_mem_put(mctx, manager->write_fds, manager->fd_bufsize); - if (manager->write_fds_copy != NULL) - isc_mem_put(mctx, manager->write_fds_copy, manager->fd_bufsize); + if (thread->read_fds != NULL) + isc_mem_put(mctx, thread->read_fds, thread->fd_bufsize); + if (thread->read_fds_copy != NULL) + isc_mem_put(mctx, thread->read_fds_copy, thread->fd_bufsize); + if (thread->write_fds != NULL) + isc_mem_put(mctx, thread->write_fds, thread->fd_bufsize); + if (thread->write_fds_copy != NULL) + isc_mem_put(mctx, thread->write_fds_copy, thread->fd_bufsize); #endif /* USE_KQUEUE */ + for (i = 0; i < (int)thread->manager->maxsocks; i++) + if (thread->fdstate[i] == CLOSE_PENDING) /* no need to lock */ + (void)close(i); + +#if defined(USE_EPOLL) + isc_mem_put(thread->manager->mctx, thread->epoll_events, + thread->manager->maxsocks * sizeof(uint32_t)); +#endif + isc_mem_put(thread->manager->mctx, thread->fds, + thread->manager->maxsocks * sizeof(isc__socket_t *)); + isc_mem_put(thread->manager->mctx, thread->fdstate, + thread->manager->maxsocks * sizeof(int)); + + + if (thread->fdlock != NULL) { + for (i = 0; i < FDLOCK_COUNT; i++) + DESTROYLOCK(&thread->fdlock[i]); + isc_mem_put(thread->manager->mctx, 
thread->fdlock, + FDLOCK_COUNT * sizeof(isc_mutex_t)); + } + } isc_result_t isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp) { - return (isc_socketmgr_create2(mctx, managerp, 0)); + return (isc_socketmgr_create2(mctx, managerp, 0, 1)); } isc_result_t isc_socketmgr_create2(isc_mem_t *mctx, isc_socketmgr_t **managerp, - unsigned int maxsocks) + unsigned int maxsocks, int nthreads) { int i; isc__socketmgr_t *manager; - char strbuf[ISC_STRERRORSIZE]; isc_result_t result; REQUIRE(managerp != NULL && *managerp == NULL); @@ -3798,153 +3931,63 @@ isc_socketmgr_create2(isc_mem_t *mctx, isc_socketmgr_t **managerp, maxsocks = ISC_SOCKET_MAXSOCKETS; manager = isc_mem_get(mctx, sizeof(*manager)); - if (manager == NULL) + if (manager == NULL) { return (ISC_R_NOMEMORY); + } /* zero-clear so that necessary cleanup on failure will be easy */ memset(manager, 0, sizeof(*manager)); manager->maxsocks = maxsocks; manager->reserved = 0; manager->maxudp = 0; - manager->fds = isc_mem_get(mctx, - manager->maxsocks * sizeof(isc__socket_t *)); - if (manager->fds == NULL) { - result = ISC_R_NOMEMORY; - goto free_manager; - } - manager->fdstate = isc_mem_get(mctx, manager->maxsocks * sizeof(int)); - if (manager->fdstate == NULL) { - result = ISC_R_NOMEMORY; - goto free_manager; - } -#if defined(USE_EPOLL) - manager->epoll_events = isc_mem_get(mctx, (manager->maxsocks * - sizeof(uint32_t))); - if (manager->epoll_events == NULL) { - result = ISC_R_NOMEMORY; - goto free_manager; - } - memset(manager->epoll_events, 0, manager->maxsocks * sizeof(uint32_t)); -#endif + manager->nthreads = nthreads; manager->stats = NULL; manager->common.magic = ISCAPI_SOCKETMGR_MAGIC; manager->common.impmagic = SOCKET_MANAGER_MAGIC; manager->mctx = NULL; - memset(manager->fds, 0, manager->maxsocks * sizeof(isc_socket_t *)); ISC_LIST_INIT(manager->socklist); result = isc_mutex_init(&manager->lock); - if (result != ISC_R_SUCCESS) - goto free_manager; - manager->fdlock = isc_mem_get(mctx, FDLOCK_COUNT * 
sizeof(isc_mutex_t)); - if (manager->fdlock == NULL) { - result = ISC_R_NOMEMORY; - goto cleanup_lock; - } - for (i = 0; i < FDLOCK_COUNT; i++) { - result = isc_mutex_init(&manager->fdlock[i]); - if (result != ISC_R_SUCCESS) { - while (--i >= 0) - DESTROYLOCK(&manager->fdlock[i]); - isc_mem_put(mctx, manager->fdlock, - FDLOCK_COUNT * sizeof(isc_mutex_t)); - manager->fdlock = NULL; - goto cleanup_lock; - } + if (result != ISC_R_SUCCESS) { + return (result); } - if (isc_condition_init(&manager->shutdown_ok) != ISC_R_SUCCESS) { UNEXPECTED_ERROR(__FILE__, __LINE__, "isc_condition_init() %s", isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed")); - result = ISC_R_UNEXPECTED; - goto cleanup_lock; - } - - /* - * Create the special fds that will be used to wake up the - * select/poll loop when something internal needs to be done. - */ - if (pipe(manager->pipe_fds) != 0) { - strerror_r(errno, strbuf, sizeof(strbuf)); - UNEXPECTED_ERROR(__FILE__, __LINE__, - "pipe() %s: %s", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_FAILED, "failed"), - strbuf); - result = ISC_R_UNEXPECTED; - goto cleanup_condition; + return (ISC_R_UNEXPECTED); } - RUNTIME_CHECK(make_nonblock(manager->pipe_fds[0]) == ISC_R_SUCCESS); - - /* - * Set up initial state for the select loop - */ - result = setup_watcher(mctx, manager); - if (result != ISC_R_SUCCESS) - goto cleanup; - - memset(manager->fdstate, 0, manager->maxsocks * sizeof(int)); /* * Start up the select/poll thread. 
*/ - if (isc_thread_create(watcher, manager, &manager->watcher) != - ISC_R_SUCCESS) { - UNEXPECTED_ERROR(__FILE__, __LINE__, - "isc_thread_create() %s", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_FAILED, "failed")); - cleanup_watcher(mctx, manager); - result = ISC_R_UNEXPECTED; - goto cleanup; - } - isc_thread_setname(manager->watcher, "isc-socket"); + manager->threads = isc_mem_get(mctx, sizeof(isc__socketthread_t) * manager->nthreads); + RUNTIME_CHECK(manager->threads != NULL); isc_mem_attach(mctx, &manager->mctx); + for (i=0; i < manager->nthreads; i++) { + manager->threads[i].manager = manager; + manager->threads[i].threadid = i; + setup_thread(&manager->threads[i]); + if (isc_thread_create(netthread, &manager->threads[i], &manager->threads[i].thread) != + ISC_R_SUCCESS) { + UNEXPECTED_ERROR(__FILE__, __LINE__, + "isc_thread_create() %s", + isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, + ISC_MSG_FAILED, "failed")); + return (ISC_R_UNEXPECTED); + } + char tname[1024]; + sprintf(tname, "isc-socket-%d", i); + isc_thread_setname(manager->threads[i].thread, tname); + } + *managerp = (isc_socketmgr_t *)manager; return (ISC_R_SUCCESS); -cleanup: - (void)close(manager->pipe_fds[0]); - (void)close(manager->pipe_fds[1]); - -cleanup_condition: - (void)isc_condition_destroy(&manager->shutdown_ok); - - -cleanup_lock: - if (manager->fdlock != NULL) { - for (i = 0; i < FDLOCK_COUNT; i++) - DESTROYLOCK(&manager->fdlock[i]); - } - DESTROYLOCK(&manager->lock); - -free_manager: - if (manager->fdlock != NULL) { - isc_mem_put(mctx, manager->fdlock, - FDLOCK_COUNT * sizeof(isc_mutex_t)); - } -#if defined(USE_EPOLL) - if (manager->epoll_events != NULL) { - isc_mem_put(mctx, manager->epoll_events, - manager->maxsocks * sizeof(uint32_t)); - } -#endif - if (manager->fdstate != NULL) { - isc_mem_put(mctx, manager->fdstate, - manager->maxsocks * sizeof(int)); - } - if (manager->fds != NULL) { - isc_mem_put(mctx, manager->fds, - manager->maxsocks * sizeof(isc_socket_t *)); 
- } - isc_mem_put(mctx, manager, sizeof(*manager)); - - return (result); } isc_result_t @@ -4004,48 +4047,30 @@ isc_socketmgr_destroy(isc_socketmgr_t **managerp) { * half of the pipe, which will send EOF to the read half. * This is currently a no-op in the non-threaded case. */ - select_poke(manager, 0, SELECT_POKE_SHUTDOWN); + for (i = 0; i < manager->nthreads; i++) { + select_poke(manager, i, 0, SELECT_POKE_SHUTDOWN); + } /* * Wait for thread to exit. */ - if (isc_thread_join(manager->watcher, NULL) != ISC_R_SUCCESS) - UNEXPECTED_ERROR(__FILE__, __LINE__, - "isc_thread_join() %s", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_FAILED, "failed")); - + for (i = 0; i < manager->nthreads; i++) { + if (isc_thread_join(manager->threads[i].thread, NULL) != ISC_R_SUCCESS) + UNEXPECTED_ERROR(__FILE__, __LINE__, + "isc_thread_join() %s", + isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, + ISC_MSG_FAILED, "failed")); + cleanup_thread(manager->mctx, &manager->threads[i]); + } /* * Clean up. 
*/ - cleanup_watcher(manager->mctx, manager); - - (void)close(manager->pipe_fds[0]); - (void)close(manager->pipe_fds[1]); + isc_mem_put(manager->mctx, manager->threads, sizeof(isc__socketthread_t) * manager->nthreads); (void)isc_condition_destroy(&manager->shutdown_ok); - for (i = 0; i < (int)manager->maxsocks; i++) - if (manager->fdstate[i] == CLOSE_PENDING) /* no need to lock */ - (void)close(i); - -#if defined(USE_EPOLL) - isc_mem_put(manager->mctx, manager->epoll_events, - manager->maxsocks * sizeof(uint32_t)); -#endif - isc_mem_put(manager->mctx, manager->fds, - manager->maxsocks * sizeof(isc__socket_t *)); - isc_mem_put(manager->mctx, manager->fdstate, - manager->maxsocks * sizeof(int)); if (manager->stats != NULL) isc_stats_detach(&manager->stats); - - if (manager->fdlock != NULL) { - for (i = 0; i < FDLOCK_COUNT; i++) - DESTROYLOCK(&manager->fdlock[i]); - isc_mem_put(manager->mctx, manager->fdlock, - FDLOCK_COUNT * sizeof(isc_mutex_t)); - } DESTROYLOCK(&manager->lock); manager->common.magic = 0; manager->common.impmagic = 0; @@ -4102,7 +4127,8 @@ socket_recv(isc__socket_t *sock, isc_socketevent_t *dev, isc_task_t *task, * watched, poke the watcher to start paying attention to it. */ if (ISC_LIST_EMPTY(sock->recv_list)) - select_poke(sock->manager, sock->fd, SELECT_POKE_READ); + select_poke(sock->manager, sock->threadid, sock->fd, + SELECT_POKE_READ); ISC_LIST_ENQUEUE(sock->recv_list, dev, ev_link); socket_log(sock, NULL, EVENT, NULL, 0, 0, @@ -4249,7 +4275,8 @@ socket_send(isc__socket_t *sock, isc_socketevent_t *dev, isc_task_t *task, * paying attention to it. 
*/ if (ISC_LIST_EMPTY(sock->send_list)) - select_poke(sock->manager, sock->fd, + select_poke(sock->manager, sock->threadid, + sock->fd, SELECT_POKE_WRITE); ISC_LIST_ENQUEUE(sock->send_list, dev, ev_link); @@ -4811,7 +4838,8 @@ isc_socket_accept(isc_socket_t *sock0, ISC_LIST_ENQUEUE(sock->accept_list, dev, ev_link); if (do_poke) - select_poke(manager, sock->fd, SELECT_POKE_ACCEPT); + select_poke(manager, sock->threadid, sock->fd, + SELECT_POKE_ACCEPT); UNLOCK(&sock->lock); return (ISC_R_SUCCESS); @@ -4966,7 +4994,8 @@ isc_socket_connect(isc_socket_t *sock0, const isc_sockaddr_t *addr, * bit of time waking it up now or later won't matter all that much. */ if (ISC_LIST_EMPTY(sock->connect_list) && !sock->connecting) - select_poke(manager, sock->fd, SELECT_POKE_CONNECT); + select_poke(manager, sock->threadid, sock->fd, + SELECT_POKE_CONNECT); sock->connecting = 1; @@ -5023,8 +5052,8 @@ internal_connect(isc__socket_t *sock) { */ if (SOFT_ERROR(errno) || errno == EINPROGRESS) { sock->connecting = 1; - watch_fd(sock->manager, sock->fd, - SELECT_POKE_CONNECT); + watch_fd(&sock->manager->threads[sock->threadid], sock->fd, + SELECT_POKE_CONNECT); UNLOCK(&sock->lock); return;