};
/* FD corresponding to TCP sockets this thread is listening
- on if reuseport is set. Otherwise g_tcpSockets is used instead.
+ on.
These FDs are also in deferredAdds when we have one
socket per listener, and in g_deferredAdds instead. */
std::set<int> tcpSockets;
static std::vector<RecThreadInfo> s_threadInfos;
/* without reuseport, all listeners share the same sockets */
static deferredAdd_t g_deferredAdds;
-static std::set<int> g_tcpSockets;
typedef vector<int> tcpListenSockets_t;
typedef map<int, ComboAddress> listenSocketsAddresses_t; // is shared across all threads right now
}
}
else {
+ std::set<int> tcpSockets;
/* we don't have reuseport so we can only open one socket per
listening addr:port and everyone will listen on it */
makeUDPServerSockets(g_deferredAdds);
- makeTCPServerSockets(g_deferredAdds, g_tcpSockets);
+ makeTCPServerSockets(g_deferredAdds, tcpSockets);
+
+ /* every listener (so distributor if g_weDistributeQueries, workers otherwise)
+ needs to listen to the shared sockets */
+ if (g_weDistributeQueries) {
+ /* first thread is the handler, then distributors */
+ for (unsigned int threadId = 1; threadId <= g_numDistributorThreads; threadId++) {
+ s_threadInfos.at(threadId).tcpSockets = tcpSockets;
+ }
+ }
+ else {
+ /* first thread is the handler, there is no distributor here and workers are accepting queries */
+ for (unsigned int threadId = 1; threadId <= g_numWorkerThreads; threadId++) {
+ s_threadInfos.at(threadId).tcpSockets = tcpSockets;
+ }
+ }
}
#ifdef NOD_ENABLED
if(threadInfo.isListener) {
if(listenOnTCP) {
- if(TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
-
- if (g_reusePort) {
- for(const auto fd : threadInfo.tcpSockets) {
- t_fdm->removeReadFD(fd);
- }
+ if(TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
+ for(const auto fd : threadInfo.tcpSockets) {
+ t_fdm->removeReadFD(fd);
}
- else {
- for(const auto fd : g_tcpSockets) {
- t_fdm->removeReadFD(fd);
- }
- }
-
- listenOnTCP=false;
- }
+ listenOnTCP=false;
+ }
}
else {
- if(TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
- if (g_reusePort) {
- for(const auto fd : threadInfo.tcpSockets) {
- t_fdm->addReadFD(fd, handleNewTCPQuestion);
- }
+ if(TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
+ for(const auto fd : threadInfo.tcpSockets) {
+ t_fdm->addReadFD(fd, handleNewTCPQuestion);
}
- else {
- for(const auto fd : g_tcpSockets) {
- t_fdm->addReadFD(fd, handleNewTCPQuestion);
- }
- }
-
- listenOnTCP=true;
- }
+ listenOnTCP=true;
+ }
}
}
}