if (!g_quiet || tracedQuery) {
if (!g_slogStructured) {
- g_log << Logger::Warning << RecThreadInfo::id() << " [" << g_multiTasker->getTid() << "/" << g_multiTasker->numProcesses() << "] " << (comboWriter->d_tcp ? "TCP " : "") << "question for '" << comboWriter->d_mdp.d_qname << "|"
+ g_log << Logger::Warning << RecThreadInfo::thread_local_id() << " [" << g_multiTasker->getTid() << "/" << g_multiTasker->numProcesses() << "] " << (comboWriter->d_tcp ? "TCP " : "") << "question for '" << comboWriter->d_mdp.d_qname << "|"
<< QType(comboWriter->d_mdp.d_qtype) << "' from " << comboWriter->getRemote();
if (!comboWriter->d_ednssubnet.getSource().empty()) {
g_log << " (ecs " << comboWriter->d_ednssubnet.getSource().toString() << ")";
pbMessage.setDeviceName(dnsQuestion.deviceName);
pbMessage.setToPort(comboWriter->d_destination.getPort());
pbMessage.addPolicyTags(comboWriter->d_gettagPolicyTags);
- pbMessage.setWorkerId(RecThreadInfo::id());
+ pbMessage.setWorkerId(RecThreadInfo::thread_local_id());
pbMessage.setPacketCacheHit(false);
pbMessage.setOutgoingQueries(resolver.d_outqueries);
for (const auto& metaValue : dnsQuestion.meta) {
uint64_t spentUsec = uSec(resolver.getNow() - comboWriter->d_now);
if (!g_quiet) {
if (!g_slogStructured) {
- g_log << Logger::Error << RecThreadInfo::id() << " [" << g_multiTasker->getTid() << "/" << g_multiTasker->numProcesses() << "] answer to " << (comboWriter->d_mdp.d_header.rd ? "" : "non-rd ") << "question '" << comboWriter->d_mdp.d_qname << "|" << DNSRecordContent::NumberToType(comboWriter->d_mdp.d_qtype);
+ g_log << Logger::Error << RecThreadInfo::thread_local_id() << " [" << g_multiTasker->getTid() << "/" << g_multiTasker->numProcesses() << "] answer to " << (comboWriter->d_mdp.d_header.rd ? "" : "non-rd ") << "question '" << comboWriter->d_mdp.d_qname << "|" << DNSRecordContent::NumberToType(comboWriter->d_mdp.d_qtype);
g_log << "': " << ntohs(packetWriter.getHeader()->ancount) << " answers, " << ntohs(packetWriter.getHeader()->arcount) << " additional, took " << resolver.d_outqueries << " packets, " << resolver.d_totUsec / 1000.0 << " netw ms, " << static_cast<double>(spentUsec) / 1000.0 << " tot ms, " << resolver.d_throttledqueries << " throttled, " << resolver.d_timeouts << " timeouts, " << resolver.d_tcpoutqueries << "/" << resolver.d_dotoutqueries << " tcp/dot connections, rcode=" << res;
if (!shouldNotValidate && resolver.isDNSSECValidationRequested()) {
static deferredAdd_t s_deferredUDPadds;
static deferredAdd_t s_deferredTCPadds;
-/* first we have the handler thread, t_id == 0 (some other
- helper threads like SNMP might have t_id == 0 as well)
+/* first we have the handler thread, t_id == 0 (thread not created as a RecursorThread have t_id = NOT_INITED)
then the distributor threads if any
and finally the workers */
std::vector<RecThreadInfo> RecThreadInfo::s_threadInfos;
unsigned int RecThreadInfo::s_numDistributorThreads;
unsigned int RecThreadInfo::s_numUDPWorkerThreads;
unsigned int RecThreadInfo::s_numTCPWorkerThreads;
-thread_local unsigned int RecThreadInfo::t_id;
+thread_local unsigned int RecThreadInfo::t_id{RecThreadInfo::TID_NOT_INITED};
pdns::RateLimitedLog g_rateLimitedLogger;
serveRustWeb();
}
for (auto& tInfo : RecThreadInfo::infos()) {
- if (tInfo.getName() == "web+stat") { // XXX testing for isHandler() does not work as expected!
+ // who handles the handler? the caller!
+ if (tInfo.isHandler()) {
continue;
}
tInfo.thread.join();
msg.setRequestorId(requestorId);
msg.setDeviceId(deviceId);
msg.setDeviceName(deviceName);
- msg.setWorkerId(RecThreadInfo::id());
+ msg.setWorkerId(RecThreadInfo::thread_local_id());
// For queries, packetCacheHit and outgoingQueries are not relevant
if (!policyTags.empty()) {
pbMessage.setDeviceId(deviceId);
pbMessage.setDeviceName(deviceName);
pbMessage.setToPort(destination.getPort());
- pbMessage.setWorkerId(RecThreadInfo::id());
+ pbMessage.setWorkerId(RecThreadInfo::thread_local_id());
// this method is only used for PC cache hits
pbMessage.setPacketCacheHit(true);
// we do not set outgoingQueries, it is not relevant for PC cache hits
}
// Thread id filled in by backend, since the SL code does not know about RecursorThreads
// We use the Recursor thread, other threads get id 0. May need to revisit.
- appendKeyAndVal("TID", std::to_string(RecThreadInfo::id()));
+ appendKeyAndVal("TID", std::to_string(RecThreadInfo::thread_local_id()));
vector<iovec> iov;
iov.reserve(strings.size());
{"level", std::to_string(entry.level)},
// Thread id filled in by backend, since the SL code does not know about RecursorThreads
// We use the Recursor thread, other threads get id 0. May need to revisit.
- {"tid", std::to_string(RecThreadInfo::id())},
+ {"tid", std::to_string(RecThreadInfo::thread_local_id())},
{"ts", Logging::toTimestampStringMilli(entry.d_timestamp, timebuf)},
};
}
// Thread id filled in by backend, since the SL code does not know about RecursorThreads
// We use the Recursor thread, other threads get id 0. May need to revisit.
- buf << " tid=" << std::quoted(std::to_string(RecThreadInfo::id()));
+ buf << " tid=" << std::quoted(std::to_string(RecThreadInfo::thread_local_id()));
std::array<char, 64> timebuf{};
buf << " ts=" << std::quoted(Logging::toTimestampStringMilli(entry.d_timestamp, timebuf));
for (auto const& value : entry.values) {
void broadcastFunction(const pipefunc_t& func)
{
- /* This function might be called by the worker with t_id 0 during startup
+ /* This function might be called by the worker with t_id not inited during startup
for the initialization of ACLs and domain maps. After that it should only
be called by the handler. */
- if (RecThreadInfo::infos().empty() && RecThreadInfo::id() == 0) {
+ if (RecThreadInfo::infos().empty() && !RecThreadInfo::is_thread_inited()) {
/* the handler and distributors will call themselves below, but
during startup we get called while g_threadInfos has not been
populated yet to update the ACL or domain maps, so we need to
unsigned int thread = 0;
for (const auto& threadInfo : RecThreadInfo::infos()) {
- if (thread++ == RecThreadInfo::id()) {
+ if (thread++ == RecThreadInfo::thread_local_id()) {
func(); // don't write to ourselves!
continue;
}
template <class T>
T broadcastAccFunction(const std::function<T*()>& func)
{
- if (!RecThreadInfo::self().isHandler()) {
- SLOG(g_log << Logger::Error << "broadcastAccFunction has been called by a worker (" << RecThreadInfo::id() << ")" << endl,
- g_slog->withName("runtime")->info(Logr::Critical, "broadcastAccFunction has been called by a worker")); // tid will be added
+ if (RecThreadInfo::thread_local_id() != 0) {
+ g_slog->withName("runtime")->info(Logr::Critical, "broadcastAccFunction has been called by a worker"); // tid will be added
_exit(1);
}
unsigned int thread = 0;
T ret = T();
for (const auto& threadInfo : RecThreadInfo::infos()) {
- if (thread++ == RecThreadInfo::id()) {
+ if (thread++ == RecThreadInfo::thread_local_id()) {
continue;
}
g_reusePort = ::arg().mustDo("reuseport");
#endif
- RecThreadInfo::infos().resize(RecThreadInfo::numRecursorThreads());
+ RecThreadInfo::resize(RecThreadInfo::numRecursorThreads());
if (g_reusePort) {
unsigned int threadNum = 1;
t_pdl->loadFile(fname);
}
catch (std::runtime_error& ex) {
- string msg = std::to_string(RecThreadInfo::id()) + " Retaining current script, could not read '" + fname + "': " + ex.what();
+ string msg = std::to_string(RecThreadInfo::thread_local_id()) + " Retaining current script, could not read '" + fname + "': " + ex.what();
SLOG(g_log << Logger::Error << msg << endl,
log->error(Logr::Error, ex.what(), "Retaining current script, could not read new script"));
return new RecursorControlChannel::Answer{1, msg + "\n"};
}
}
catch (std::exception& e) {
- SLOG(g_log << Logger::Error << RecThreadInfo::id() << " Retaining current script, error from '" << fname << "': " << e.what() << endl,
+ SLOG(g_log << Logger::Error << RecThreadInfo::thread_local_id() << " Retaining current script, error from '" << fname << "': " << e.what() << endl,
log->error(Logr::Error, e.what(), "Retaining current script, error in new script"));
return new RecursorControlChannel::Answer{1, string("retaining current script, error from '" + fname + "': " + e.what() + "\n")};
}
// For communicating with our threads effectively readonly after
// startup.
-// First we have the handler thread, t_id == 0 (some other helper
-// threads like SNMP might have t_id == 0 as well) then the
+// First we have the handler thread, t_id == 0 then the
// distributor threads if any and finally the workers
struct RecThreadInfo
{
public:
static RecThreadInfo& self()
{
- return s_threadInfos.at(t_id);
+ auto& info = s_threadInfos.at(t_id);
+ assert(info.d_myid == t_id); // internal consistency check
+ return info;
}
static RecThreadInfo& info(unsigned int index)
{
- return s_threadInfos.at(index);
+ auto& info = s_threadInfos.at(index);
+ assert(info.d_myid == index);
+ return info;
}
static vector<RecThreadInfo>& infos()
[[nodiscard]] bool isDistributor() const
{
- if (t_id == 0) {
- return false;
- }
return s_weDistributeQueries && listener;
}
[[nodiscard]] bool isHandler() const
{
- if (t_id == 0) {
- return true;
- }
return handler;
}
taskThread = true;
}
- static unsigned int id()
+ static unsigned int thread_local_id()
{
+ if (t_id == TID_NOT_INITED) {
+ return 0; // backward compatibility
+ }
return t_id;
}
+ static bool is_thread_inited()
+ {
+ return t_id != TID_NOT_INITED;
+ }
+
+ [[nodiscard]] unsigned int id() const
+ {
+ return d_myid;
+ }
+
static void setThreadId(unsigned int arg)
{
t_id = arg;
info(0).thread.join();
}
+ static void resize(size_t size)
+ {
+ s_threadInfos.resize(size);
+ for (unsigned int i = 0; i < size; i++) {
+ s_threadInfos.at(i).d_myid = i;
+ }
+ }
+ static constexpr unsigned int TID_NOT_INITED = std::numeric_limits<unsigned int>::max();
+
private:
// FD corresponding to TCP sockets this thread is listening on.
// These FDs are also in deferredAdds when we have one socket per
std::string name;
std::thread thread;
int exitCode{0};
+ unsigned int d_myid{TID_NOT_INITED}; // should always equal to the thread_local tid;
// handle the web server, carbon, statistics and the control channel
bool handler{false};