static unsigned int selectWorker(unsigned int hash)
{
- assert(RecThreadInfo::numWorkers() != 0); // NOLINT: assert implementation
+ assert(RecThreadInfo::numUDPWorkers() != 0); // NOLINT: assert implementation
if (g_balancingFactor == 0) {
- return RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + (hash % RecThreadInfo::numWorkers());
+ return RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + (hash % RecThreadInfo::numUDPWorkers());
}
/* we start with one, representing the query we are currently handling */
double currentLoad = 1;
- std::vector<unsigned int> load(RecThreadInfo::numWorkers());
- for (size_t idx = 0; idx < RecThreadInfo::numWorkers(); idx++) {
+ std::vector<unsigned int> load(RecThreadInfo::numUDPWorkers());
+ for (size_t idx = 0; idx < RecThreadInfo::numUDPWorkers(); idx++) {
load[idx] = getWorkerLoad(idx);
currentLoad += load[idx];
}
- double targetLoad = (currentLoad / RecThreadInfo::numWorkers()) * g_balancingFactor;
+ double targetLoad = (currentLoad / RecThreadInfo::numUDPWorkers()) * g_balancingFactor;
- unsigned int worker = hash % RecThreadInfo::numWorkers();
+ unsigned int worker = hash % RecThreadInfo::numUDPWorkers();
/* at least one server has to be at or below the average load */
if (load[worker] > targetLoad) {
++t_Counters.at(rec::Counter::rebalancedQueries);
do {
- worker = (worker + 1) % RecThreadInfo::numWorkers();
+ worker = (worker + 1) % RecThreadInfo::numUDPWorkers();
} while (load[worker] > targetLoad);
}
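// Illustrative, self-contained sketch (not part of the patch): the same rebalancing idea as above,
// with plain values. With loads {7, 1, 1} and a balancing factor of 1.2, current is 1 + 7 + 1 + 1 = 10,
// target is (10 / 3) * 1.2 = 4; a hash landing on worker 0 (load 7 > 4) is moved to worker 1 (load 1 <= 4).
// Because current includes the query being handled, the average exceeds the smallest load, so with a
// factor of at least 1 some worker is always at or below target and the loop terminates.
#include <cstddef>
#include <vector>
static size_t pickWorkerSketch(const std::vector<unsigned int>& loads, double factor, size_t hash)
{
double current = 1; // the query we are currently handling
for (auto load : loads) {
current += load;
}
double target = (current / static_cast<double>(loads.size())) * factor;
size_t worker = hash % loads.size();
while (loads[worker] > target) {
worker = (worker + 1) % loads.size();
}
return worker;
}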
/* if this function failed but did not raise an exception, it means that the pipe
was full, let's try another one */
unsigned int newTarget = 0;
do {
- newTarget = RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + dns_random(RecThreadInfo::numWorkers());
+ newTarget = RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + dns_random(RecThreadInfo::numUDPWorkers());
} while (newTarget == target);
if (!trySendingQueryToWorker(newTarget, tmsg)) {
std::shared_ptr<Logr::Logger> g_slogudpout;
/* without reuseport, all listeners share the same sockets */
-static deferredAdd_t g_deferredAdds;
-static deferredAdd_t g_deferredTCPAdds;
+static deferredAdd_t s_deferredUDPadds;
+static deferredAdd_t s_deferredTCPadds;
/* first we have the handler thread, t_id == 0 (some other
helper threads like SNMP might have t_id == 0 as well)
bool RecThreadInfo::s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
unsigned int RecThreadInfo::s_numDistributorThreads;
-unsigned int RecThreadInfo::s_numWorkerThreads;
+unsigned int RecThreadInfo::s_numUDPWorkerThreads;
unsigned int RecThreadInfo::s_numTCPWorkerThreads;
thread_local unsigned int RecThreadInfo::t_id;
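// Illustrative sketch (not part of the patch) of the fixed thread-id layout the offsets in this change
// rely on: the handler is thread 0, followed by the distributors (if any), the UDP workers, the TCP
// workers and finally the task thread(s). The counts below are made-up example values, not defaults.
namespace layout_example {
constexpr unsigned int handlers = 1; // numHandlers()
constexpr unsigned int distributors = 1; // numDistributors()
constexpr unsigned int udpWorkers = 4; // numUDPWorkers()
constexpr unsigned int firstUDPWorkerId = handlers + distributors; // the offset added to (hash % numUDPWorkers()) in selectWorker()
constexpr unsigned int firstTCPWorkerId = handlers + distributors + udpWorkers; // matches 1 + numDistributors() + numUDPWorkers()
static_assert(firstUDPWorkerId == 2 && firstTCPWorkerId == 6, "example layout");
}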
int ret = EXIT_SUCCESS;
const auto cpusMap = parseCPUMap(log);
- if (RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() == 1) {
+ if (RecThreadInfo::numDistributors() + RecThreadInfo::numUDPWorkers() == 1) {
SLOG(g_log << Logger::Warning << "Operating with single distributor/worker thread" << endl,
log->info(Logr::Notice, "Operating with single distributor/worker thread"));
currentThreadId = 2;
for (unsigned int thread = 0; thread < RecThreadInfo::numTCPWorkers(); thread++, currentThreadId++) {
auto& info = RecThreadInfo::info(currentThreadId);
- info.setListener();
info.setTCPListener();
info.setWorker();
info.start(currentThreadId, "tcpworker", cpusMap, log);
RecThreadInfo::info(currentThreadId).setListener();
}
}
- for (unsigned int thread = 0; thread < RecThreadInfo::numWorkers(); thread++, currentThreadId++) {
+ for (unsigned int thread = 0; thread < RecThreadInfo::numUDPWorkers(); thread++, currentThreadId++) {
auto& info = RecThreadInfo::info(currentThreadId);
info.setListener(!RecThreadInfo::weDistributeQueries());
info.setWorker();
}
for (unsigned int thread = 0; thread < RecThreadInfo::numTCPWorkers(); thread++, currentThreadId++) {
auto& info = RecThreadInfo::info(currentThreadId);
- info.setListener();
info.setTCPListener();
info.setWorker();
}
info.start(currentThreadId, "distr", cpusMap, log);
}
}
- SLOG(g_log << Logger::Warning << "Launching " << RecThreadInfo::numWorkers() << " worker threads" << endl,
- log->info(Logr::Notice, "Launching worker threads", "count", Logging::Loggable(RecThreadInfo::numWorkers())));
+ SLOG(g_log << Logger::Warning << "Launching " << RecThreadInfo::numUDPWorkers() << " worker threads" << endl,
+ log->info(Logr::Notice, "Launching worker threads", "count", Logging::Loggable(RecThreadInfo::numUDPWorkers())));
- for (unsigned int thread = 0; thread < RecThreadInfo::numWorkers(); thread++, currentThreadId++) {
+ for (unsigned int thread = 0; thread < RecThreadInfo::numUDPWorkers(); thread++, currentThreadId++) {
auto& info = RecThreadInfo::info(currentThreadId);
info.start(currentThreadId, "worker", cpusMap, log);
}
static void checkOrFixFDS(Logr::log_t log)
{
unsigned int availFDs = getFilenumLimit();
- unsigned int wantFDs = g_maxMThreads * (RecThreadInfo::numWorkers() + RecThreadInfo::numTCPWorkers()) + 25; // even healthier margin than before
- wantFDs += (RecThreadInfo::numWorkers() + RecThreadInfo::numTCPWorkers()) * TCPOutConnectionManager::s_maxIdlePerThread;
+ unsigned int wantFDs = g_maxMThreads * (RecThreadInfo::numUDPWorkers() + RecThreadInfo::numTCPWorkers()) + 25; // even healthier margin than before
+ wantFDs += (RecThreadInfo::numUDPWorkers() + RecThreadInfo::numTCPWorkers()) * TCPOutConnectionManager::s_maxIdlePerThread;
if (wantFDs > availFDs) {
unsigned int hardlimit = getFilenumLimit(true);
if (hardlimit >= wantFDs) {
setFilenumLimit(wantFDs);
SLOG(g_log << Logger::Warning << "Raised soft limit on number of filedescriptors to " << wantFDs << " to match max-mthreads and threads settings" << endl,
log->info(Logr::Warning, "Raised soft limit on number of filedescriptors to match max-mthreads and threads settings", "limit", Logging::Loggable(wantFDs)));
}
else {
- auto newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / (RecThreadInfo::numWorkers() + RecThreadInfo::numTCPWorkers());
+ auto newval = (hardlimit - 25 - TCPOutConnectionManager::s_maxIdlePerThread) / (RecThreadInfo::numUDPWorkers() + RecThreadInfo::numTCPWorkers());
SLOG(g_log << Logger::Warning << "Insufficient number of filedescriptors available for max-mthreads*threads setting! (" << hardlimit << " < " << wantFDs << "), reducing max-mthreads to " << newval << endl,
log->info(Logr::Warning, "Insufficient number of filedescriptors available for max-mthreads*threads setting! Reducing max-mthreads", "hardlimit", Logging::Loggable(hardlimit), "want", Logging::Loggable(wantFDs), "max-mthreads", Logging::Loggable(newval)));
g_maxMThreads = newval;
}
}
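// Worked example (not part of the patch) of the descriptor budget above, with made-up settings:
// max-mthreads = 2048, 4 UDP workers, 1 TCP worker and s_maxIdlePerThread = 10 gives
// wantFDs = 2048 * (4 + 1) + 25 + (4 + 1) * 10 = 10240 + 25 + 50 = 10315,
// so the soft limit must reach 10315 or max-mthreads is scaled down from the hard limit instead.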
unsigned int thread = 0;
- for (auto& threadInfo : RecThreadInfo::infos()) {
+ for (const auto& threadInfo : RecThreadInfo::infos()) {
if (thread++ == RecThreadInfo::id()) {
func(); // don't write to ourselves!
continue;
unsigned int thread = 0;
T ret = T();
- for (auto& threadInfo : RecThreadInfo::infos()) {
+ for (const auto& threadInfo : RecThreadInfo::infos()) {
if (thread++ == RecThreadInfo::id()) {
continue;
}
}
else {
/* first thread is the handler, there is no distributor here and workers are accepting queries */
- for (unsigned int i = 0; i < RecThreadInfo::numWorkers(); i++, threadNum++) {
+ for (unsigned int i = 0; i < RecThreadInfo::numUDPWorkers(); i++, threadNum++) {
auto& info = RecThreadInfo::info(threadNum);
auto& deferredAdds = info.getDeferredAdds();
makeUDPServerSockets(deferredAdds, log);
}
}
- threadNum = 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers();
+ threadNum = 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numUDPWorkers();
for (unsigned int i = 0; i < RecThreadInfo::numTCPWorkers(); i++, threadNum++) {
auto& info = RecThreadInfo::info(threadNum);
auto& deferredAdds = info.getDeferredAdds();
std::set<int> tcpSockets;
/* we don't have reuseport so we can only open one socket per
listening addr:port and everyone will listen on it */
- makeUDPServerSockets(g_deferredAdds, log);
- makeTCPServerSockets(g_deferredTCPAdds, tcpSockets, log);
+ makeUDPServerSockets(s_deferredUDPadds, log);
+ makeTCPServerSockets(s_deferredTCPadds, tcpSockets, log);
// TCP queries are handled by TCP workers
for (unsigned int i = 0; i < RecThreadInfo::numTCPWorkers(); i++) {
- auto& info = RecThreadInfo::info(i + 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers());
+ auto& info = RecThreadInfo::info(i + 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numUDPWorkers());
info.setTCPSockets(tcpSockets);
}
}
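// Note (illustrative, not part of the patch): without SO_REUSEPORT the listening sockets are created
// once, into the shared s_deferredUDPadds / s_deferredTCPadds sets and the tcpSockets set handed to
// every TCP worker above; each listener thread later registers those same file descriptors with its
// own multiplexer (see the loop over s_deferredTCPadds / s_deferredUDPadds further down).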
g_paddingOutgoing = ::arg().mustDo("edns-padding-out");
RecThreadInfo::setNumDistributorThreads(::arg().asNum("distributor-threads"));
- RecThreadInfo::setNumWorkerThreads(::arg().asNum("threads"));
- if (RecThreadInfo::numWorkers() < 1) {
+ RecThreadInfo::setNumUDPWorkerThreads(::arg().asNum("threads"));
+ if (RecThreadInfo::numUDPWorkers() < 1) {
SLOG(g_log << Logger::Warning << "Asked to run with 0 threads, raising to 1 instead" << endl,
log->info(Logr::Warning, "Asked to run with 0 threads, raising to 1 instead"));
- RecThreadInfo::setNumWorkerThreads(1);
+ RecThreadInfo::setNumUDPWorkerThreads(1);
}
RecThreadInfo::setNumTCPWorkerThreads(1); // XXX
if (RecThreadInfo::numTCPWorkers() < 1) {
}
}
- unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::numWorkers();
+ unsigned int ringsize = ::arg().asNum("stats-ringbuffer-entries") / RecThreadInfo::numUDPWorkers();
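// Note (illustrative, not part of the patch): the rings are thread-local, so the configured
// stats-ringbuffer-entries value is split across the UDP worker threads; e.g. 10000 entries with
// 4 UDP workers gives a ringsize of 2500 per thread.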
if (ringsize != 0) {
t_remotes = std::make_unique<addrringbuf_t>();
if (RecThreadInfo::weDistributeQueries()) {
}
else {
/* otherwise all listeners are listening on the same ones */
- for (const auto& deferred : threadInfo.isTCPListener() ? g_deferredTCPAdds : g_deferredAdds) {
+ for (const auto& deferred : threadInfo.isTCPListener() ? s_deferredTCPadds : s_deferredUDPadds) {
t_fdm->addReadFD(deferred.first, deferred.second);
}
}
return worker;
}
+ // UDP or TCP listener?
[[nodiscard]] bool isListener() const
{
return listener;
}
+
+ // A TCP-only listener?
[[nodiscard]] bool isTCPListener() const
{
return tcplistener;
void setTCPListener(bool flag = true)
{
+ setListener(flag);
tcplistener = flag;
}
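// Minimal usage sketch (not part of the patch, hypothetical local object): setTCPListener() now also
// sets the generic listener flag, which is why the explicit setListener() calls were dropped for the
// TCP worker threads:
//
//   RecThreadInfo info;
//   info.setTCPListener();
//   // info.isListener() and info.isTCPListener() are now both true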
return 1;
}
- static unsigned int numWorkers()
+ static unsigned int numUDPWorkers()
{
- return s_numWorkerThreads;
+ return s_numUDPWorkerThreads;
}
static unsigned int numTCPWorkers()
s_weDistributeQueries = flag;
}
- static void setNumWorkerThreads(unsigned int n)
+ static void setNumUDPWorkerThreads(unsigned int n)
{
- s_numWorkerThreads = n;
+ s_numUDPWorkerThreads = n;
}
static void setNumTCPWorkerThreads(unsigned int n)
static unsigned int numRecursorThreads()
{
- return numHandlers() + numDistributors() + numWorkers() + numTCPWorkers() + numTaskThreads();
+ return numHandlers() + numDistributors() + numUDPWorkers() + numTCPWorkers() + numTaskThreads();
}
static int runThreads(Logr::log_t);
return deferredAdds;
}
- ThreadPipeSet& getPipes()
+ const ThreadPipeSet& getPipes() const
{
return pipes;
}
MT_t* mt{nullptr};
uint64_t numberOfDistributedQueries{0};
- void start(unsigned int theId, const string& name, const std::map<unsigned int, std::set<int>>& cpusMap, Logr::log_t);
+ void start(unsigned int tid, const string& tname, const std::map<unsigned int, std::set<int>>& cpusMap, Logr::log_t);
std::string name;
std::thread thread;
static std::vector<RecThreadInfo> s_threadInfos;
static bool s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
static unsigned int s_numDistributorThreads;
- static unsigned int s_numWorkerThreads;
+ static unsigned int s_numUDPWorkerThreads;
static unsigned int s_numTCPWorkerThreads;
};