#endif
}
-static void checkOrFixFDS(Logr::log_t log)
-{
- unsigned int availFDs = getFilenumLimit();
- unsigned int wantFDs = g_maxMThreads * (RecThreadInfo::numUDPWorkers() + RecThreadInfo::numTCPWorkers()) + 25; // even healthier margin than before
- wantFDs += (RecThreadInfo::numUDPWorkers() + RecThreadInfo::numTCPWorkers()) * TCPOutConnectionManager::s_maxIdlePerThread;
+static void checkOrFixFDS(unsigned int listeningSockets, Logr::log_t log)
+{
+ const auto availFDs = getFilenumLimit();
+ // Posix threads
+ const auto threads = RecThreadInfo::numRecursorThreads();
+ // We do not count the handler and task threads, they do not spawn many mthreads at once
+ const auto workers = RecThreadInfo::numUDPWorkers() + RecThreadInfo::numTCPWorkers();
+
+ // Static part: the FDs from the start, pipes, controlsocket, web socket, listen sockets
+ unsigned int staticPart = 25; // general allowance, including control socket, web, snmp
+ // Handler thread gets one pipe, the others all of them
+ staticPart += 2 + (threads - 1) * (sizeof(RecThreadInfo::ThreadPipeSet) / sizeof(int)); // number of fd's in ThreadPipeSet
+ // listen sockets
+ staticPart += listeningSockets;
+ // Another fd per thread for poll/kqueue
+ staticPart += threads;
+ // Incoming TCP, connections are shared by threads and are kept open for a while
+ staticPart += g_maxTCPClients;
+
+ // Dynamic parts per worker
+ // Each mthread uses one fd for either outgoing UDP or outgoing TCP (but not simultaneously)
+ unsigned int perWorker = g_maxMThreads;
+ // plus each worker thread can have a number of idle outgoing TCP connections
+ perWorker += TCPOutConnectionManager::s_maxIdlePerThread;
+
+ auto wantFDs = staticPart + workers * perWorker;
if (wantFDs > availFDs) {
unsigned int hardlimit = getFilenumLimit(true);
+ if (staticPart >= hardlimit) {
+ log->info(Logr::Critical, "Number of available filedescriptors is lower than the minimum needed",
+ "hardlimit", Logging::Loggable(hardlimit), "minimum", Logging::Loggable(staticPart));
+ _exit(1);
+ }
if (hardlimit >= wantFDs) {
setFilenumLimit(wantFDs);
- SLOG(g_log << Logger::Warning << "Raised soft limit on number of filedescriptors to " << wantFDs << " to match max-mthreads and threads settings" << endl,
- log->info(Logr::Warning, "Raised soft limit on number of filedescriptors to match max-mthreads and threads settings", "limit", Logging::Loggable(wantFDs)));
+ log->info(Logr::Warning, "Raised soft limit on number of filedescriptors to match max-mthreads and threads settings", "limit", Logging::Loggable(wantFDs));
}
else {
- auto newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / (RecThreadInfo::numUDPWorkers() + RecThreadInfo::numTCPWorkers());
- SLOG(g_log << Logger::Warning << "Insufficient number of filedescriptors available for max-mthreads*threads setting! (" << hardlimit << " < " << wantFDs << "), reducing max-mthreads to " << newval << endl,
- log->info(Logr::Warning, "Insufficient number of filedescriptors available for max-mthreads*threads setting! Reducing max-mthreads", "hardlimit", Logging::Loggable(hardlimit), "want", Logging::Loggable(wantFDs), "max-mthreads", Logging::Loggable(newval)));
+ auto newval = (hardlimit - staticPart) / workers;
+ log->info(Logr::Warning, "Insufficient number of filedescriptors available for max-mthreads*threads setting! Reducing max-mthreads", "hardlimit", Logging::Loggable(hardlimit), "want", Logging::Loggable(wantFDs), "max-mthreads", Logging::Loggable(newval));
g_maxMThreads = newval;
setFilenumLimit(hardlimit);
}
return 0;
}
-static void initDistribution(Logr::log_t log)
+static unsigned int initDistribution(Logr::log_t log)
{
+ unsigned int count = 0;
g_balancingFactor = ::arg().asDouble("distribution-load-factor");
if (g_balancingFactor != 0.0 && g_balancingFactor < 1.0) {
g_balancingFactor = 0.0;
for (unsigned int i = 0; i < RecThreadInfo::numDistributors(); i++, threadNum++) {
auto& info = RecThreadInfo::info(threadNum);
auto& deferredAdds = info.getDeferredAdds();
- makeUDPServerSockets(deferredAdds, log);
+ count += makeUDPServerSockets(deferredAdds, log);
}
}
else {
for (unsigned int i = 0; i < RecThreadInfo::numUDPWorkers(); i++, threadNum++) {
auto& info = RecThreadInfo::info(threadNum);
auto& deferredAdds = info.getDeferredAdds();
- makeUDPServerSockets(deferredAdds, log);
+ count += makeUDPServerSockets(deferredAdds, log);
}
}
threadNum = 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numUDPWorkers();
auto& info = RecThreadInfo::info(threadNum);
auto& deferredAdds = info.getDeferredAdds();
auto& tcpSockets = info.getTCPSockets();
- makeTCPServerSockets(deferredAdds, tcpSockets, log);
+ count += makeTCPServerSockets(deferredAdds, tcpSockets, log);
}
}
else {
std::set<int> tcpSockets;
/* we don't have reuseport so we can only open one socket per
listening addr:port and everyone will listen on it */
- makeUDPServerSockets(s_deferredUDPadds, log);
- makeTCPServerSockets(s_deferredTCPadds, tcpSockets, log);
+ count += makeUDPServerSockets(s_deferredUDPadds, log);
+ count += makeTCPServerSockets(s_deferredTCPadds, tcpSockets, log);
// TCP queries are handled by TCP workers
for (unsigned int i = 0; i < RecThreadInfo::numTCPWorkers(); i++) {
info.setTCPSockets(tcpSockets);
}
}
+ return count;
}
static int initForks(Logr::log_t log)
initSuffixMatchNodes(log);
initCarbon();
- initDistribution(log);
+ auto listeningSockets = initDistribution(log);
#ifdef NOD_ENABLED
// Setup newly observed domain globals
auto forks = initForks(log);
- checkOrFixFDS(log);
+ g_tcpTimeout = ::arg().asNum("client-tcp-timeout");
+ g_maxTCPClients = ::arg().asNum("max-tcp-clients");
+ g_maxTCPPerClient = ::arg().asNum("max-tcp-per-client");
+ g_tcpMaxQueriesPerConn = ::arg().asNum("max-tcp-queries-per-connection");
+ g_maxUDPQueriesPerRound = ::arg().asNum("max-udp-queries-per-round");
+
+ g_useKernelTimestamp = ::arg().mustDo("protobuf-use-kernel-timestamp");
+ g_maxChainLength = ::arg().asNum("max-chain-length");
+
+ checkOrFixFDS(listeningSockets, log);
checkOrFixLinuxMapCountLimits(log);
#ifdef HAVE_LIBSODIUM
RecThreadInfo::makeThreadPipes(log);
- g_tcpTimeout = ::arg().asNum("client-tcp-timeout");
- g_maxTCPClients = ::arg().asNum("max-tcp-clients");
- g_maxTCPPerClient = ::arg().asNum("max-tcp-per-client");
- g_tcpMaxQueriesPerConn = ::arg().asNum("max-tcp-queries-per-connection");
- g_maxUDPQueriesPerRound = ::arg().asNum("max-udp-queries-per-round");
-
- g_useKernelTimestamp = ::arg().mustDo("protobuf-use-kernel-timestamp");
- g_maxChainLength = ::arg().asNum("max-chain-length");
-
disableStats(StatComponent::API, ::arg()["stats-api-blacklist"]);
disableStats(StatComponent::Carbon, ::arg()["stats-carbon-blacklist"]);
disableStats(StatComponent::RecControl, ::arg()["stats-rec-control-blacklist"]);