std::shared_ptr<Logr::Logger> g_slogudpout;
/* without reuseport, all listeners share the same sockets */
-deferredAdd_t g_deferredAdds;
+static deferredAdd_t g_deferredAdds;
+static deferredAdd_t g_deferredTCPAdds;
/* first we have the handler thread, t_id == 0 (some other
helper threads like SNMP might have t_id == 0 as well)
bool RecThreadInfo::s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
unsigned int RecThreadInfo::s_numDistributorThreads;
unsigned int RecThreadInfo::s_numWorkerThreads;
+unsigned int RecThreadInfo::s_numTCPWorkerThreads;
thread_local unsigned int RecThreadInfo::t_id;
static std::map<unsigned int, std::set<int>> parseCPUMap(Logr::log_t log)
int RecThreadInfo::runThreads(Logr::log_t log)
{
int ret = EXIT_SUCCESS;
- unsigned int currentThreadId = 1;
const auto cpusMap = parseCPUMap(log);
if (RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() == 1) {
log->info(Logr::Notice, "Operating with single distributor/worker thread"));
/* This thread handles the web server, carbon, statistics and the control channel */
- auto& handlerInfo = RecThreadInfo::info(0);
+ unsigned int currentThreadId = 0;
+ auto& handlerInfo = RecThreadInfo::info(currentThreadId);
handlerInfo.setHandler();
- handlerInfo.start(0, "web+stat", cpusMap, log);
- auto& taskInfo = RecThreadInfo::info(2);
- taskInfo.setTaskThread();
- taskInfo.start(2, "task", cpusMap, log);
+ handlerInfo.start(currentThreadId, "web+stat", cpusMap, log);
+ // We skip the single UDP worker thread 1, it's handled after the loop and taskthreads
+ currentThreadId = 2;
+ for (unsigned int thread = 0; thread < RecThreadInfo::numTCPWorkers(); thread++, currentThreadId++) {
+ auto& info = RecThreadInfo::info(currentThreadId);
+ info.setListener();
+ info.setTCPListener();
+ info.setWorker();
+ info.start(currentThreadId, "tcpworker", cpusMap, log);
+ }
+
+ for (unsigned int thread = 0; thread < RecThreadInfo::numTaskThreads(); thread++, currentThreadId++) {
+ auto& taskInfo = RecThreadInfo::info(currentThreadId);
+ taskInfo.setTaskThread();
+ taskInfo.start(currentThreadId, "task", cpusMap, log);
+ }
+
+ currentThreadId = 1;
auto& info = RecThreadInfo::info(currentThreadId);
info.setListener();
+ info.setTCPListener();
info.setWorker();
- RecThreadInfo::setThreadId(currentThreadId++);
+ RecThreadInfo::setThreadId(currentThreadId);
recursorThread();
- handlerInfo.thread.join();
- if (handlerInfo.exitCode != 0) {
- ret = handlerInfo.exitCode;
- }
- taskInfo.thread.join();
- if (taskInfo.exitCode != 0) {
- ret = taskInfo.exitCode;
+ for (unsigned int thread = 0; thread < RecThreadInfo::numRecursorThreads(); thread++) {
+ if (thread == 1) {
+ continue;
+ }
+ auto& tInfo = RecThreadInfo::info(thread);
+ tInfo.thread.join();
+ if (tInfo.exitCode != 0) {
+ ret = tInfo.exitCode;
+ }
}
}
else {
// Setup RecThreadInfo objects
- unsigned int tmp = currentThreadId;
+ unsigned int currentThreadId = 1;
if (RecThreadInfo::weDistributeQueries()) {
- for (unsigned int thread = 0; thread < RecThreadInfo::numDistributors(); ++thread) {
- RecThreadInfo::info(tmp++).setListener();
+ for (unsigned int thread = 0; thread < RecThreadInfo::numDistributors(); thread++, currentThreadId++) {
+ RecThreadInfo::info(currentThreadId).setListener();
}
}
- for (unsigned int thread = 0; thread < RecThreadInfo::numWorkers(); ++thread) {
- auto& info = RecThreadInfo::info(tmp++);
+ for (unsigned int thread = 0; thread < RecThreadInfo::numWorkers(); thread++, currentThreadId++) {
+ auto& info = RecThreadInfo::info(currentThreadId);
info.setListener(!RecThreadInfo::weDistributeQueries());
info.setWorker();
}
- for (unsigned int thread = 0; thread < RecThreadInfo::numTaskThreads(); ++thread) {
- auto& info = RecThreadInfo::info(tmp++);
+ for (unsigned int thread = 0; thread < RecThreadInfo::numTCPWorkers(); thread++, currentThreadId++) {
+ auto& info = RecThreadInfo::info(currentThreadId);
+ info.setListener();
+ info.setTCPListener();
+ info.setWorker();
+ }
+ for (unsigned int thread = 0; thread < RecThreadInfo::numTaskThreads(); thread++, currentThreadId++) {
+ auto& info = RecThreadInfo::info(currentThreadId);
info.setTaskThread();
}
// And now start the actual threads
+ currentThreadId = 1;
if (RecThreadInfo::weDistributeQueries()) {
SLOG(g_log << Logger::Warning << "Launching " << RecThreadInfo::numDistributors() << " distributor threads" << endl,
log->info(Logr::Notice, "Launching distributor threads", "count", Logging::Loggable(RecThreadInfo::numDistributors())));
- for (unsigned int thread = 0; thread < RecThreadInfo::numDistributors(); ++thread) {
+ for (unsigned int thread = 0; thread < RecThreadInfo::numDistributors(); thread++, currentThreadId++) {
auto& info = RecThreadInfo::info(currentThreadId);
- info.start(currentThreadId++, "distr", cpusMap, log);
+ info.start(currentThreadId, "distr", cpusMap, log);
}
}
SLOG(g_log << Logger::Warning << "Launching " << RecThreadInfo::numWorkers() << " worker threads" << endl,
log->info(Logr::Notice, "Launching worker threads", "count", Logging::Loggable(RecThreadInfo::numWorkers())));
- for (unsigned int thread = 0; thread < RecThreadInfo::numWorkers(); ++thread) {
+ for (unsigned int thread = 0; thread < RecThreadInfo::numWorkers(); thread++, currentThreadId++) {
+ auto& info = RecThreadInfo::info(currentThreadId);
+ info.start(currentThreadId, "worker", cpusMap, log);
+ }
+
+ SLOG(g_log << Logger::Warning << "Launching " << RecThreadInfo::numTCPWorkers() << " tcpworker threads" << endl,
+ log->info(Logr::Notice, "Launching tcpworker threads", "count", Logging::Loggable(RecThreadInfo::numTCPWorkers())));
+
+ for (unsigned int thread = 0; thread < RecThreadInfo::numTCPWorkers(); thread++, currentThreadId++) {
auto& info = RecThreadInfo::info(currentThreadId);
- info.start(currentThreadId++, "worker", cpusMap, log);
+ info.start(currentThreadId, "tcpworker", cpusMap, log);
}
- for (unsigned int thread = 0; thread < RecThreadInfo::numTaskThreads(); ++thread) {
+ for (unsigned int thread = 0; thread < RecThreadInfo::numTaskThreads(); thread++, currentThreadId++) {
auto& info = RecThreadInfo::info(currentThreadId);
- info.start(currentThreadId++, "task", cpusMap, log);
+ info.start(currentThreadId, "task", cpusMap, log);
}
/* This thread handles the web server, carbon, statistics and the control channel */
- auto& info = RecThreadInfo::info(0);
+ currentThreadId = 0;
+ auto& info = RecThreadInfo::info(currentThreadId);
info.setHandler();
- info.start(0, "web+stat", cpusMap, log);
+ info.start(currentThreadId, "web+stat", cpusMap, log);
for (auto& tInfo : RecThreadInfo::infos()) {
tInfo.thread.join();
size_t idx = 0;
for (const auto& threadInfo : RecThreadInfo::infos()) {
if (threadInfo.isWorker()) {
- SLOG(g_log << Logger::Notice << "stats: thread " << idx << " has been distributed " << threadInfo.numberOfDistributedQueries << " queries" << endl,
- log->info(Logr::Info, "Queries handled by thread", "thread", Logging::Loggable(idx), "count", Logging::Loggable(threadInfo.numberOfDistributedQueries)));
+ SLOG(g_log << Logger::Notice << "stats: thread " << idx << " has been distributed " << threadInfo.getNumberOfDistributedQueries() << " queries" << endl,
+ log->info(Logr::Info, "Queries handled by thread", "thread", Logging::Loggable(idx), "count", Logging::Loggable(threadInfo.getNumberOfDistributedQueries())));
++idx;
}
}
}
unsigned int thread = 0;
- for (const auto& threadInfo : RecThreadInfo::infos()) {
+ for (auto& threadInfo : RecThreadInfo::infos()) {
if (thread++ == RecThreadInfo::id()) {
func(); // don't write to ourselves!
continue;
ThreadMSG* tmsg = new ThreadMSG(); // NOLINT: manual ownership handling
tmsg->func = func;
tmsg->wantAnswer = true;
- if (write(threadInfo.pipes.writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // NOLINT: sizeof correct
+ if (write(threadInfo.getPipes().writeToThread, &tmsg, sizeof(tmsg)) != sizeof(tmsg)) { // NOLINT: sizeof correct
delete tmsg; // NOLINT: manual ownership handling
unixDie("write to thread pipe returned wrong size or error");
}
string* resp = nullptr;
- if (read(threadInfo.pipes.readFromThread, &resp, sizeof(resp)) != sizeof(resp)) { // NOLINT: sizeof correct
+ if (read(threadInfo.getPipes().readFromThread, &resp, sizeof(resp)) != sizeof(resp)) { // NOLINT: sizeof correct
unixDie("read from thread pipe returned wrong size or error");
}
unsigned int thread = 0;
T ret = T();
- for (const auto& threadInfo : RecThreadInfo::infos()) {
+ for (auto& threadInfo : RecThreadInfo::infos()) {
if (thread++ == RecThreadInfo::id()) {
continue;
}
- const auto& tps = threadInfo.pipes;
+ const auto& tps = threadInfo.getPipes();
ThreadMSG* tmsg = new ThreadMSG(); // NOLINT: manual ownership handling
tmsg->func = [func] { return voider<T>(func); };
tmsg->wantAnswer = true;
g_reusePort = ::arg().mustDo("reuseport");
#endif
- RecThreadInfo::infos().resize(RecThreadInfo::numHandlers() + RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers() + RecThreadInfo::numTaskThreads());
+ RecThreadInfo::infos().resize(RecThreadInfo::numRecursorThreads());
if (g_reusePort) {
+ unsigned int threadNum = 1;
if (RecThreadInfo::weDistributeQueries()) {
/* first thread is the handler, then distributors */
- for (unsigned int threadId = 1; threadId <= RecThreadInfo::numDistributors(); threadId++) {
- auto& info = RecThreadInfo::info(threadId);
- auto& deferredAdds = info.deferredAdds;
- auto& tcpSockets = info.tcpSockets;
+ for (unsigned int i = 0; i < RecThreadInfo::numDistributors(); i++, threadNum++) {
+ auto& info = RecThreadInfo::info(threadNum);
+ auto& deferredAdds = info.getDeferredAdds();
makeUDPServerSockets(deferredAdds, log);
- makeTCPServerSockets(deferredAdds, tcpSockets, log);
}
}
else {
/* first thread is the handler, there is no distributor here and workers are accepting queries */
- for (unsigned int threadId = 1; threadId <= RecThreadInfo::numWorkers(); threadId++) {
- auto& info = RecThreadInfo::info(threadId);
- auto& deferredAdds = info.deferredAdds;
- auto& tcpSockets = info.tcpSockets;
+ for (unsigned int i = 0; i < RecThreadInfo::numWorkers(); i++, threadNum++) {
+ auto& info = RecThreadInfo::info(threadNum);
+ auto& deferredAdds = info.getDeferredAdds();
makeUDPServerSockets(deferredAdds, log);
- makeTCPServerSockets(deferredAdds, tcpSockets, log);
}
}
+ threadNum = 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers();
+ for (unsigned int i = 0; i < RecThreadInfo::numTCPWorkers(); i++, threadNum++) {
+ auto& info = RecThreadInfo::info(threadNum);
+ auto& deferredAdds = info.getDeferredAdds();
+ auto& tcpSockets = info.getTCPSockets();
+ makeTCPServerSockets(deferredAdds, tcpSockets, log);
+ }
}
else {
std::set<int> tcpSockets;
/* we don't have reuseport so we can only open one socket per
listening addr:port and everyone will listen on it */
makeUDPServerSockets(g_deferredAdds, log);
- makeTCPServerSockets(g_deferredAdds, tcpSockets, log);
+ makeTCPServerSockets(g_deferredTCPAdds, tcpSockets, log);
- /* every listener (so distributor if g_weDistributeQueries, workers otherwise)
- needs to listen to the shared sockets */
- if (RecThreadInfo::weDistributeQueries()) {
- /* first thread is the handler, then distributors */
- for (unsigned int threadId = 1; threadId <= RecThreadInfo::numDistributors(); threadId++) {
- RecThreadInfo::info(threadId).tcpSockets = tcpSockets;
- }
- }
- else {
- /* first thread is the handler, there is no distributor here and workers are accepting queries */
- for (unsigned int threadId = 1; threadId <= RecThreadInfo::numWorkers(); threadId++) {
- RecThreadInfo::info(threadId).tcpSockets = tcpSockets;
- }
+ // TCP queries are handled by TCP workers
+ for (unsigned int i = 0; i < RecThreadInfo::numTCPWorkers(); i++) {
+ auto& info = RecThreadInfo::info(i + 1 + RecThreadInfo::numDistributors() + RecThreadInfo::numWorkers());
+ info.setTCPSockets(tcpSockets);
}
}
}
vector<string> parts;
stringtok(parts, ::arg()["dot-to-auth-names"], " ,");
#ifndef HAVE_DNS_OVER_TLS
- if (parts.size()) {
+ if (!parts.empty()) {
SLOG(g_log << Logger::Error << "dot-to-auth-names setting contains names, but Recursor was built without DNS over TLS support. Setting will be ignored." << endl,
log->info(Logr::Error, "dot-to-auth-names setting contains names, but Recursor was built without DNS over TLS support. Setting will be ignored"));
}
log->info(Logr::Warning, "Asked to run with 0 threads, raising to 1 instead"));
RecThreadInfo::setNumWorkerThreads(1);
}
+ RecThreadInfo::setNumTCPWorkerThreads(1); // XXX
+ if (RecThreadInfo::numTCPWorkers() < 1) {
+ SLOG(g_log << Logger::Warning << "Asked to run with 0 tcpthreads, raising to 1 instead" << endl,
+ log->info(Logr::Warning, "Asked to run with 0 tcpthreads, raising to 1 instead"));
+ RecThreadInfo::setNumTCPWorkerThreads(1);
+ }
g_maxMThreads = ::arg().asNum("max-mthreads");
}
}
if (tmsg->wantAnswer) {
- if (write(RecThreadInfo::self().pipes.writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) {
+ if (write(RecThreadInfo::self().getPipes().writeFromThread, &resp, sizeof(resp)) != sizeof(resp)) {
delete tmsg; // NOLINT: manual ownership handling
unixDie("write to thread pipe returned wrong size or error");
}
SLOG(g_log << Logger::Info << "Received rec_control command '" << msg << "' via controlsocket" << endl,
log->info(Logr::Info, "Received rec_control command via control socket", "command", Logging::Loggable(msg)));
- RecursorControlParser rcp;
RecursorControlParser::func_t* command = nullptr;
- auto answer = rcp.getAnswer(clientfd, msg, &command);
+ auto answer = RecursorControlParser::getAnswer(clientfd, msg, &command);
g_rcc.send(clientfd, answer);
command();
void runIfDue(struct timeval& now, const std::function<void()>& function)
{
if (last_run < now - period) {
- // cerr << RecThreadInfo::id() << ' ' << name << ' ' << now.tv_sec << '.' << now.tv_usec << " running" << endl;
function();
Utility::gettimeofday(&last_run);
now = last_run;
static void runTCPMaintenance(RecThreadInfo& threadInfo, bool& listenOnTCP, unsigned int maxTcpClients)
{
- if (threadInfo.isListener()) {
+ if (threadInfo.isTCPListener()) {
if (listenOnTCP) {
if (TCPConnection::getCurrentConnections() > maxTcpClients) { // shutdown, too many connections
- for (const auto fileDesc : threadInfo.tcpSockets) {
+ for (const auto fileDesc : threadInfo.getTCPSockets()) {
t_fdm->removeReadFD(fileDesc);
}
listenOnTCP = false;
}
else {
if (TCPConnection::getCurrentConnections() <= maxTcpClients) { // reenable
- for (const auto fileDesc : threadInfo.tcpSockets) {
+ for (const auto fileDesc : threadInfo.getTCPSockets()) {
t_fdm->addReadFD(fileDesc, handleNewTCPQuestion);
}
listenOnTCP = true;
t_bogusqueryring->set_capacity(ringsize);
}
g_multiTasker = std::make_unique<MT_t>(::arg().asNum("stack-size"), ::arg().asNum("stack-cache-size"));
- threadInfo.mt = g_multiTasker.get();
+ threadInfo.setMT(g_multiTasker.get());
/* start protobuf export threads if needed */
auto luaconfsLocal = g_luaconfs.getLocal();
std::unique_ptr<RecursorWebServer> rws;
- t_fdm->addReadFD(threadInfo.pipes.readToThread, handlePipeRequest);
+ t_fdm->addReadFD(threadInfo.getPipes().readToThread, handlePipeRequest);
if (threadInfo.isHandler()) {
if (::arg().mustDo("webserver")) {
log->info(Logr::Info, "Enabled multiplexer", "name", Logging::Loggable(t_fdm->getName())));
}
else {
- t_fdm->addReadFD(threadInfo.pipes.readQueriesToThread, handlePipeRequest);
+ t_fdm->addReadFD(threadInfo.getPipes().readQueriesToThread, handlePipeRequest);
if (threadInfo.isListener()) {
if (g_reusePort) {
/* then every listener has its own FDs */
- for (const auto& deferred : threadInfo.deferredAdds) {
+ for (const auto& deferred : threadInfo.getDeferredAdds()) {
t_fdm->addReadFD(deferred.first, deferred.second);
}
}
else {
/* otherwise all listeners are listening on the same ones */
- for (const auto& deferred : g_deferredAdds) {
+ for (const auto& deferred : threadInfo.isTCPListener() ? g_deferredTCPAdds : g_deferredAdds) {
t_fdm->addReadFD(deferred.first, deferred.second);
}
}
}
t_traceRegex = std::make_shared<Regex>(newRegex);
t_tracefd = file;
- return new string("ok\n");
+ return new string("ok\n"); // NOLINT(cppcoreguidelines-owning-memory): it's the API
}
catch (const PDNSException& ae) {
- return new string(ae.reason + "\n");
+ return new string(ae.reason + "\n"); // NOLINT(cppcoreguidelines-owning-memory): it's the API
}
}
/* without reuseport, all listeners share the same sockets */
typedef vector<pair<int, std::function<void(int, boost::any&)>>> deferredAdd_t;
-extern deferredAdd_t g_deferredAdds;
typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
extern thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
return s_threadInfos.at(t_id);
}
- static RecThreadInfo& info(unsigned int i)
+ static RecThreadInfo& info(unsigned int index)
{
- return s_threadInfos.at(i);
+ return s_threadInfos.at(index);
}
static vector<RecThreadInfo>& infos()
return s_threadInfos;
}
- bool isDistributor() const
+ [[nodiscard]] bool isDistributor() const
{
if (t_id == 0) {
return false;
return s_weDistributeQueries && listener;
}
- bool isHandler() const
+ [[nodiscard]] bool isHandler() const
{
if (t_id == 0) {
return true;
return handler;
}
- bool isWorker() const
+ [[nodiscard]] bool isWorker() const
{
return worker;
}
- bool isListener() const
+ [[nodiscard]] bool isListener() const
{
return listener;
}
+ [[nodiscard]] bool isTCPListener() const
+ {
+ return tcplistener;
+ }
- bool isTaskThread() const
+ [[nodiscard]] bool isTaskThread() const
{
return taskThread;
}
listener = flag;
}
+ void setTCPListener(bool flag = true)
+ {
+ tcplistener = flag;
+ }
+
void setTaskThread()
{
taskThread = true;
return t_id;
}
- static void setThreadId(unsigned int id)
+ static void setThreadId(unsigned int arg)
{
- t_id = id;
+ t_id = arg;
}
- std::string getName() const
+ [[nodiscard]] std::string getName() const
{
return name;
}
return s_numWorkerThreads;
}
+ static unsigned int numTCPWorkers()
+ {
+ return s_numTCPWorkerThreads;
+ }
+
static unsigned int numDistributors()
{
return s_numDistributorThreads;
s_numWorkerThreads = n;
}
+ static void setNumTCPWorkerThreads(unsigned int n)
+ {
+ s_numTCPWorkerThreads = n;
+ }
+
static void setNumDistributorThreads(unsigned int n)
{
s_numDistributorThreads = n;
static unsigned int numRecursorThreads()
{
- return numHandlers() + numDistributors() + numWorkers() + numTaskThreads();
+ return numHandlers() + numDistributors() + numWorkers() + numTCPWorkers() + numTaskThreads();
}
static int runThreads(Logr::log_t);
static void makeThreadPipes(Logr::log_t);
- void setExitCode(int e)
+ void setExitCode(int n)
{
- exitCode = e;
+ exitCode = n;
}
+ std::set<int>& getTCPSockets()
+ {
+ return tcpSockets;
+ }
+
+ void setTCPSockets(std::set<int>& socks)
+ {
+ tcpSockets = socks;
+ }
+
+ deferredAdd_t& getDeferredAdds()
+ {
+ return deferredAdds;
+ }
+
+ ThreadPipeSet& getPipes()
+ {
+ return pipes;
+ }
+
+ [[nodiscard]] uint64_t getNumberOfDistributedQueries() const
+ {
+ return numberOfDistributedQueries;
+ }
+
+ void incNumberOfDistributedQueries()
+ {
+ numberOfDistributedQueries++;
+ }
+
+ MT_t* getMT()
+ {
+ return mt;
+ }
+
+ void setMT(MT_t* theMT)
+ {
+ mt = theMT;
+ }
+
+private:
// FD corresponding to TCP sockets this thread is listening on.
// These FDs are also in deferredAdds when we have one socket per
// listener, and in g_deferredAdds instead.
MT_t* mt{nullptr};
uint64_t numberOfDistributedQueries{0};
-private:
- void start(unsigned int id, const string& name, const std::map<unsigned int, std::set<int>>& cpusMap, Logr::log_t);
+ void start(unsigned int theId, const string& name, const std::map<unsigned int, std::set<int>>& cpusMap, Logr::log_t);
std::string name;
std::thread thread;
bool handler{false};
// accept incoming queries (and distributes them to the workers if pdns-distributes-queries is set)
bool listener{false};
+ // accept incoming TCP queries (and distributes them to the workers if pdns-distributes-queries is set)
+ bool tcplistener{false};
// process queries
bool worker{false};
// run async tasks: from TaskQueue and ZoneToCache
static bool s_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
static unsigned int s_numDistributorThreads;
static unsigned int s_numWorkerThreads;
+ static unsigned int s_numTCPWorkerThreads;
};
struct ThreadMSG