unsigned int RecThreadInfo::s_numWorkerThreads;
thread_local unsigned int RecThreadInfo::t_id;
+static std::map<unsigned int, std::set<int>> parseCPUMap()
+{
+ std::map<unsigned int, std::set<int>> result;
+
+ const std::string value = ::arg()["cpu-map"];
+
+ if (!value.empty() && !isSettingThreadCPUAffinitySupported()) {
+ g_log << Logger::Warning << "CPU mapping requested but not supported, skipping" << endl;
+ return result;
+ }
+
+ std::vector<std::string> parts;
+
+ stringtok(parts, value, " \t");
+
+ for (const auto& part : parts) {
+ if (part.find('=') == string::npos)
+ continue;
+
+ try {
+ auto headers = splitField(part, '=');
+ boost::trim(headers.first);
+ boost::trim(headers.second);
+
+ unsigned int threadId = pdns_stou(headers.first);
+ std::vector<std::string> cpus;
+
+ stringtok(cpus, headers.second, ",");
+
+ for (const auto& cpu : cpus) {
+ int cpuId = std::stoi(cpu);
+
+ result[threadId].insert(cpuId);
+ }
+ }
+ catch (const std::exception& e) {
+ g_log << Logger::Error << "Error parsing cpu-map entry '" << part << "': " << e.what() << endl;
+ }
+ }
+
+ return result;
+}
+
+static void setCPUMap(const std::map<unsigned int, std::set<int>>& cpusMap, unsigned int n, pthread_t tid)
+{
+ const auto& cpuMapping = cpusMap.find(n);
+ if (cpuMapping == cpusMap.cend()) {
+ return;
+ }
+ int rc = mapThreadToCPUList(tid, cpuMapping->second);
+ if (rc == 0) {
+ g_log << Logger::Info << "CPU affinity for thread " << n << " has been set to CPU map:";
+ for (const auto cpu : cpuMapping->second) {
+ g_log << Logger::Info << " " << cpu;
+ }
+ g_log << Logger::Info << endl;
+ }
+ else {
+ g_log << Logger::Warning << "Error setting CPU affinity for thread " << n << " to CPU map:";
+ for (const auto cpu : cpuMapping->second) {
+ g_log << Logger::Info << " " << cpu;
+ }
+ g_log << Logger::Info << ' ' << strerror(rc) << endl;
+ }
+}
+
+void RecThreadInfo::start(unsigned int id, const string& name, const std::map<unsigned int, std::set<int>>& cpusMap)
+{
+ thread = std::thread([id, name] {
+ t_id = id;
+ const string threadPrefix = "rec/";
+ setThreadName(threadPrefix + name);
+ recursorThread();
+ });
+ setCPUMap(cpusMap, id, thread.native_handle());
+}
+
+int RecThreadInfo::runThreads()
+{
+ int ret = EXIT_SUCCESS;
+ unsigned int currentThreadId = 1;
+ const auto cpusMap = parseCPUMap();
+
+ if (RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() == 1) {
+ g_log << Logger::Warning << "Operating unthreaded" << endl;
+
+#ifdef HAVE_SYSTEMD
+ sd_notify(0, "READY=1");
+#endif
+
+ /* This thread handles the web server, carbon, statistics and the control channel */
+ auto& handlerInfo = RecThreadInfo::info(0);
+ handlerInfo.setHandler();
+ handlerInfo.start(0, "web+stat", cpusMap);
+ auto& taskInfo = RecThreadInfo::info(2);
+ taskInfo.setTaskThread();
+ taskInfo.start(2, "tasks", cpusMap);
+
+ auto& info = RecThreadInfo::info(currentThreadId);
+ info.setListener();
+ info.setWorker();
+ info.setThreadId(currentThreadId++);
+ recursorThread();
+
+ handlerInfo.thread.join();
+ if (handlerInfo.exitCode != 0) {
+ ret = handlerInfo.exitCode;
+ }
+ taskInfo.thread.join();
+ if (taskInfo.exitCode != 0) {
+ ret = taskInfo.exitCode;
+ }
+ }
+ else {
+ // Setup RecThreadInfo objects
+ unsigned int tmp = currentThreadId;
+ if (RecThreadInfo::weDistributeQueries()) {
+ for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) {
+ RecThreadInfo::info(tmp++).setListener();
+ }
+ }
+ for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) {
+ auto& info = RecThreadInfo::info(tmp++);
+ info.setListener(!RecThreadInfo::weDistributeQueries());
+ info.setWorker();
+ }
+ for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) {
+ auto& info = RecThreadInfo::info(tmp++);
+ info.setTaskThread();
+ }
+
+ // And now start the actual threads
+ if (RecThreadInfo::weDistributeQueries()) {
+ g_log << Logger::Warning << "Launching " << RecThreadInfo::numDistributors() << " distributor threads" << endl;
+ for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) {
+ auto& info = RecThreadInfo::info(currentThreadId);
+ info.start(currentThreadId++, "distr", cpusMap);
+ }
+ }
+
+ g_log << Logger::Warning << "Launching " << RecThreadInfo::numWorkers() << " worker threads" << endl;
+
+ for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) {
+ auto& info = RecThreadInfo::info(currentThreadId);
+ info.start(currentThreadId++, "worker", cpusMap);
+ }
+
+ for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) {
+ auto& info = RecThreadInfo::info(currentThreadId);
+ info.start(currentThreadId++, "taskThread", cpusMap);
+ }
+
+#ifdef HAVE_SYSTEMD
+ sd_notify(0, "READY=1");
+#endif
+
+ /* This thread handles the web server, carbon, statistics and the control channel */
+ auto& info = RecThreadInfo::info(0);
+ info.setHandler();
+ info.start(0, "web+stat", cpusMap);
+
+ for (auto& ti : RecThreadInfo::infos()) {
+ ti.thread.join();
+ if (ti.exitCode != 0) {
+ ret = ti.exitCode;
+ }
+ }
+ }
+ return ret;
+}
+
+void RecThreadInfo::makeThreadPipes()
+{
+ auto pipeBufferSize = ::arg().asNum("distribution-pipe-buffer-size");
+ if (pipeBufferSize > 0) {
+ g_log << Logger::Info << "Resizing the buffer of the distribution pipe to " << pipeBufferSize << endl;
+ }
+
+ /* thread 0 is the handler / SNMP, worker threads start at 1 */
+ for (unsigned int n = 0; n < numRecursorThreads(); ++n) {
+ auto& threadInfo = info(n);
+
+ int fd[2];
+ if (pipe(fd) < 0)
+ unixDie("Creating pipe for inter-thread communications");
+
+ threadInfo.pipes.readToThread = fd[0];
+ threadInfo.pipes.writeToThread = fd[1];
+
+ // handler thread only gets first pipe, not the others
+ if (n == 0) {
+ continue;
+ }
+
+ if (pipe(fd) < 0)
+ unixDie("Creating pipe for inter-thread communications");
+
+ threadInfo.pipes.readFromThread = fd[0];
+ threadInfo.pipes.writeFromThread = fd[1];
+
+ if (pipe(fd) < 0)
+ unixDie("Creating pipe for inter-thread communications");
+
+ threadInfo.pipes.readQueriesToThread = fd[0];
+ threadInfo.pipes.writeQueriesToThread = fd[1];
+
+ if (pipeBufferSize > 0) {
+ if (!setPipeBufferSize(threadInfo.pipes.writeQueriesToThread, pipeBufferSize)) {
+ int err = errno;
+ g_log << Logger::Warning << "Error resizing the buffer of the distribution pipe for thread " << n << " to " << pipeBufferSize << ": " << strerror(err) << endl;
+ auto existingSize = getPipeBufferSize(threadInfo.pipes.writeQueriesToThread);
+ if (existingSize > 0) {
+ g_log << Logger::Warning << "The current size of the distribution pipe's buffer for thread " << n << " is " << existingSize << endl;
+ }
+ }
+ }
+
+ if (!setNonBlocking(threadInfo.pipes.writeQueriesToThread)) {
+ unixDie("Making pipe for inter-thread communications non-blocking");
+ }
+ }
+}
+
ArgvMap& arg()
{
static ArgvMap theArg;
}
}
-static std::map<unsigned int, std::set<int>> parseCPUMap()
-{
- std::map<unsigned int, std::set<int>> result;
-
- const std::string value = ::arg()["cpu-map"];
-
- if (!value.empty() && !isSettingThreadCPUAffinitySupported()) {
- g_log << Logger::Warning << "CPU mapping requested but not supported, skipping" << endl;
- return result;
- }
-
- std::vector<std::string> parts;
-
- stringtok(parts, value, " \t");
-
- for (const auto& part : parts) {
- if (part.find('=') == string::npos)
- continue;
-
- try {
- auto headers = splitField(part, '=');
- boost::trim(headers.first);
- boost::trim(headers.second);
-
- unsigned int threadId = pdns_stou(headers.first);
- std::vector<std::string> cpus;
-
- stringtok(cpus, headers.second, ",");
-
- for (const auto& cpu : cpus) {
- int cpuId = std::stoi(cpu);
-
- result[threadId].insert(cpuId);
- }
- }
- catch (const std::exception& e) {
- g_log << Logger::Error << "Error parsing cpu-map entry '" << part << "': " << e.what() << endl;
- }
- }
-
- return result;
-}
-
-static void setCPUMap(const std::map<unsigned int, std::set<int>>& cpusMap, unsigned int n, pthread_t tid)
-{
- const auto& cpuMapping = cpusMap.find(n);
- if (cpuMapping != cpusMap.cend()) {
- int rc = mapThreadToCPUList(tid, cpuMapping->second);
- if (rc == 0) {
- g_log << Logger::Info << "CPU affinity for worker " << n << " has been set to CPU map:";
- for (const auto cpu : cpuMapping->second) {
- g_log << Logger::Info << " " << cpu;
- }
- g_log << Logger::Info << endl;
- }
- else {
- g_log << Logger::Warning << "Error setting CPU affinity for worker " << n << " to CPU map:";
- for (const auto cpu : cpuMapping->second) {
- g_log << Logger::Info << " " << cpu;
- }
- g_log << Logger::Info << strerror(rc) << endl;
- }
- }
-}
-
static void checkSocketDir(void)
{
struct stat st;
g_log << u << buf.str() << endl;
}
-static void makeThreadPipes()
-{
- auto pipeBufferSize = ::arg().asNum("distribution-pipe-buffer-size");
- if (pipeBufferSize > 0) {
- g_log << Logger::Info << "Resizing the buffer of the distribution pipe to " << pipeBufferSize << endl;
- }
-
- /* thread 0 is the handler / SNMP, worker threads start at 1 */
- for (unsigned int n = 0; n < RecThreadInfo::numRecursorThreads(); ++n) {
- auto& threadInfo = RecThreadInfo::info(n);
-
- int fd[2];
- if (pipe(fd) < 0)
- unixDie("Creating pipe for inter-thread communications");
-
- threadInfo.pipes.readToThread = fd[0];
- threadInfo.pipes.writeToThread = fd[1];
-
- // handler thread only gets first pipe, not the others
- if (n == 0) {
- continue;
- }
-
- if (pipe(fd) < 0)
- unixDie("Creating pipe for inter-thread communications");
-
- threadInfo.pipes.readFromThread = fd[0];
- threadInfo.pipes.writeFromThread = fd[1];
-
- if (pipe(fd) < 0)
- unixDie("Creating pipe for inter-thread communications");
-
- threadInfo.pipes.readQueriesToThread = fd[0];
- threadInfo.pipes.writeQueriesToThread = fd[1];
-
- if (pipeBufferSize > 0) {
- if (!setPipeBufferSize(threadInfo.pipes.writeQueriesToThread, pipeBufferSize)) {
- int err = errno;
- g_log << Logger::Warning << "Error resizing the buffer of the distribution pipe for thread " << n << " to " << pipeBufferSize << ": " << strerror(err) << endl;
- auto existingSize = getPipeBufferSize(threadInfo.pipes.writeQueriesToThread);
- if (existingSize > 0) {
- g_log << Logger::Warning << "The current size of the distribution pipe's buffer for thread " << n << " is " << existingSize << endl;
- }
- }
- }
-
- if (!setNonBlocking(threadInfo.pipes.writeQueriesToThread)) {
- unixDie("Making pipe for inter-thread communications non-blocking");
- }
- }
-}
static int ratePercentage(uint64_t nom, uint64_t denom)
{
static int serviceMain(int argc, char* argv[])
{
- int ret = EXIT_SUCCESS;
-
g_slogStructured = ::arg().mustDo("structured-logging");
g_log.setName(g_programname);
startLuaConfigDelayedThreads(delayedLuaThreads, g_luaconfs.getCopy().generation);
delayedLuaThreads.rpzPrimaryThreads.clear(); // no longer needed
- makeThreadPipes();
+ RecThreadInfo::makeThreadPipes();
g_tcpTimeout = ::arg().asNum("client-tcp-timeout");
g_maxTCPPerClient = ::arg().asNum("max-tcp-per-client");
g_avoidUdpSourcePorts.insert(port);
}
- unsigned int currentThreadId = 1;
- const auto cpusMap = parseCPUMap();
-
- if (RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() == 1) {
- g_log << Logger::Warning << "Operating unthreaded" << endl;
-#ifdef HAVE_SYSTEMD
- sd_notify(0, "READY=1");
-#endif
-
- /* This thread handles the web server, carbon, statistics and the control channel */
- auto& handlerInfo = RecThreadInfo::info(0);
- handlerInfo.setHandler();
- handlerInfo.start(0, "web+stat");
- auto& taskInfo = RecThreadInfo::info(2);
- taskInfo.setTaskThread();
- taskInfo.start(2, "tasks");
-
- setCPUMap(cpusMap, currentThreadId, pthread_self());
-
- auto& info = RecThreadInfo::info(currentThreadId);
- info.setListener();
- info.setWorker();
- info.setThreadId(currentThreadId++);
- recursorThread();
-
- handlerInfo.thread.join();
- if (handlerInfo.exitCode != 0) {
- ret = handlerInfo.exitCode;
- }
- taskInfo.thread.join();
- if (taskInfo.exitCode != 0) {
- ret = taskInfo.exitCode;
- }
- }
- else {
- // Setup RecThreadInfo objects
- unsigned int tmp = currentThreadId;
- if (RecThreadInfo::weDistributeQueries()) {
- for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) {
- RecThreadInfo::info(tmp++).setListener();
- }
- }
- for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) {
- auto& info = RecThreadInfo::info(tmp++);
- info.setListener(!RecThreadInfo::weDistributeQueries());
- info.setWorker();
- }
- for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) {
- auto& info = RecThreadInfo::info(tmp++);
- info.setTaskThread();
- }
-
- if (RecThreadInfo::weDistributeQueries()) {
- g_log << Logger::Warning << "Launching " << RecThreadInfo::numDistributors() << " distributor threads" << endl;
- for (unsigned int n = 0; n < RecThreadInfo::numDistributors(); ++n) {
- auto& info = RecThreadInfo::info(currentThreadId);
- info.start(currentThreadId++, "distr");
- setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one?
- }
- }
-
- g_log << Logger::Warning << "Launching " << RecThreadInfo::numWorkers() << " worker threads" << endl;
-
- for (unsigned int n = 0; n < RecThreadInfo::numWorkers(); ++n) {
- auto& info = RecThreadInfo::info(currentThreadId);
- info.start(currentThreadId++, "worker");
- setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one?
- }
-
- for (unsigned int n = 0; n < RecThreadInfo::numTaskThreads(); ++n) {
- auto& info = RecThreadInfo::info(currentThreadId);
- info.start(currentThreadId++, "taskThread");
- setCPUMap(cpusMap, currentThreadId, info.thread.native_handle()); // XXX off by one?
- }
-
-#ifdef HAVE_SYSTEMD
- sd_notify(0, "READY=1");
-#endif
-
- /* This thread handles the web server, carbon, statistics and the control channel */
- auto& info = RecThreadInfo::info(0);
- info.setHandler();
- info.start(0, "web+stat");
-
- for (auto& ti : RecThreadInfo::infos()) {
- ti.thread.join();
- if (ti.exitCode != 0) {
- ret = ti.exitCode;
- }
- }
- }
-
- return ret;
+ return RecThreadInfo::runThreads();
}
static void handlePipeRequest(int fd, FDMultiplexer::funcparam_t& var)
if (threadInfo.isHandler()) {
if (!primeHints()) {
- threadInfo.exitCode = EXIT_FAILURE;
+ threadInfo.setExitCode(EXIT_FAILURE);
RecursorControlChannel::stop = 1;
g_log << Logger::Critical << "Priming cache failed, stopping" << endl;
return nullptr;