//! The qthread receives questions over the internet via the Nameserver class, and hands them to the Distributor for further processing
static void qthread(unsigned int num)
-try {
- setThreadName("pdns/receiver");
-
- s_distributors[num] = DNSDistributor::Create(::arg().asNum("distributor-threads", 1));
- DNSDistributor* distributor = s_distributors[num]; // the big dispatcher!
- DNSPacket question(true);
- DNSPacket cached(false);
-
- AtomicCounter& numreceived = *S.getPointer("udp-queries");
- AtomicCounter& numreceiveddo = *S.getPointer("udp-do-queries");
- AtomicCounter& numreceivedcookie = *S.getPointer("udp-cookie-queries");
-
- AtomicCounter& numreceived4 = *S.getPointer("udp4-queries");
-
- AtomicCounter& numreceived6 = *S.getPointer("udp6-queries");
- AtomicCounter& overloadDrops = *S.getPointer("overload-drops");
-
- int diff, start;
- bool logDNSQueries = ::arg().mustDo("log-dns-queries");
- shared_ptr<UDPNameserver> NS;
- std::string buffer;
- ComboAddress accountremote;
-
- // If we have SO_REUSEPORT then create a new port for all receiver threads
- // other than the first one.
- if (s_udpNameserver->canReusePort()) {
- NS = s_udpReceivers[num];
- if (NS == nullptr) {
+{
+ try {
+ setThreadName("pdns/receiver");
+
+ s_distributors[num] = DNSDistributor::Create(::arg().asNum("distributor-threads", 1));
+ DNSDistributor* distributor = s_distributors[num]; // the big dispatcher!
+ DNSPacket question(true);
+ DNSPacket cached(false);
+
+ AtomicCounter& numreceived = *S.getPointer("udp-queries");
+ AtomicCounter& numreceiveddo = *S.getPointer("udp-do-queries");
+ AtomicCounter& numreceivedcookie = *S.getPointer("udp-cookie-queries");
+
+ AtomicCounter& numreceived4 = *S.getPointer("udp4-queries");
+
+ AtomicCounter& numreceived6 = *S.getPointer("udp6-queries");
+ AtomicCounter& overloadDrops = *S.getPointer("overload-drops");
+
+ int diff, start;
+ bool logDNSQueries = ::arg().mustDo("log-dns-queries");
+ shared_ptr<UDPNameserver> NS;
+ std::string buffer;
+ ComboAddress accountremote;
+
+ // If we have SO_REUSEPORT then create a new port for all receiver threads
+ // other than the first one.
+ if (s_udpNameserver->canReusePort()) {
+ NS = s_udpReceivers[num];
+ if (NS == nullptr) {
+ NS = s_udpNameserver;
+ }
+ }
+ else {
NS = s_udpNameserver;
}
- }
- else {
- NS = s_udpNameserver;
- }
-
- for (;;) {
- try {
- if (g_proxyProtocolACL.empty()) {
- buffer.resize(DNSPacket::s_udpTruncationThreshold);
- }
- else {
- buffer.resize(DNSPacket::s_udpTruncationThreshold + g_proxyProtocolMaximumSize);
- }
- if (!NS->receive(question, buffer)) { // receive a packet inline
- continue; // packet was broken, try again
- }
+ for (;;) {
+ try {
+ if (g_proxyProtocolACL.empty()) {
+ buffer.resize(DNSPacket::s_udpTruncationThreshold);
+ }
+ else {
+ buffer.resize(DNSPacket::s_udpTruncationThreshold + g_proxyProtocolMaximumSize);
+ }
- diff = question.d_dt.udiffNoReset();
- receive_latency = 0.999 * receive_latency + 0.001 * std::max(diff, 0);
+ if (!NS->receive(question, buffer)) { // receive a packet inline
+ continue; // packet was broken, try again
+ }
- numreceived++;
+ diff = question.d_dt.udiffNoReset();
+ receive_latency = 0.999 * receive_latency + 0.001 * std::max(diff, 0);
- accountremote = question.d_remote;
- if (question.d_inner_remote)
- accountremote = *question.d_inner_remote;
+ numreceived++;
- if (accountremote.sin4.sin_family == AF_INET)
- numreceived4++;
- else
- numreceived6++;
+ accountremote = question.d_remote;
+ if (question.d_inner_remote) {
+ accountremote = *question.d_inner_remote;
+ }
- if (question.d_dnssecOk)
- numreceiveddo++;
+ if (accountremote.sin4.sin_family == AF_INET) {
+ numreceived4++;
+ }
+ else {
+ numreceived6++;
+ }
- if (question.hasEDNSCookie())
- numreceivedcookie++;
+ if (question.d_dnssecOk) {
+ numreceiveddo++;
+ }
- if (question.d.qr)
- continue;
+ if (question.hasEDNSCookie()) {
+ numreceivedcookie++;
+ }
- S.ringAccount("queries", question.qdomain, question.qtype);
- S.ringAccount("remotes", question.getInnerRemote());
- if (logDNSQueries) {
- g_log << Logger::Notice << "Remote " << question.getRemoteString() << " wants '" << question.qdomain << "|" << question.qtype << "', do = " << question.d_dnssecOk << ", bufsize = " << question.getMaxReplyLen();
- if (question.d_ednsRawPacketSizeLimit > 0 && question.getMaxReplyLen() != (unsigned int)question.d_ednsRawPacketSizeLimit)
- g_log << " (" << question.d_ednsRawPacketSizeLimit << ")";
- }
+ if (question.d.qr) {
+ continue;
+ }
- if (PC.enabled() && (question.d.opcode != Opcode::Notify && question.d.opcode != Opcode::Update) && question.couldBeCached()) {
- start = diff;
- std::string view{};
- if (g_views) {
- Netmask netmask(accountremote);
- view = g_zoneCache.getViewFromNetwork(&netmask);
+ S.ringAccount("queries", question.qdomain, question.qtype);
+ S.ringAccount("remotes", question.getInnerRemote());
+ if (logDNSQueries) {
+ g_log << Logger::Notice << "Remote " << question.getRemoteString() << " wants '" << question.qdomain << "|" << question.qtype << "', do = " << question.d_dnssecOk << ", bufsize = " << question.getMaxReplyLen();
+ if (question.d_ednsRawPacketSizeLimit > 0 && question.getMaxReplyLen() != (unsigned int)question.d_ednsRawPacketSizeLimit)
+ g_log << " (" << question.d_ednsRawPacketSizeLimit << ")";
}
- bool haveSomething = PC.get(question, cached, view); // does the PacketCache recognize this question?
- if (haveSomething) {
- if (logDNSQueries)
- g_log << ": packetcache HIT" << endl;
- cached.setRemote(&question.d_remote); // inlined
- cached.d_inner_remote = question.d_inner_remote;
- cached.setSocket(question.getSocket()); // inlined
- cached.d_anyLocal = question.d_anyLocal;
- cached.setMaxReplyLen(question.getMaxReplyLen());
- cached.d.rd = question.d.rd; // copy in recursion desired bit
- cached.d.id = question.d.id;
- cached.commitD(); // commit d to the packet inlined
+ if (PC.enabled() && (question.d.opcode != Opcode::Notify && question.d.opcode != Opcode::Update) && question.couldBeCached()) {
+ start = diff;
+ std::string view{};
+ if (g_views) {
+ Netmask netmask(accountremote);
+ view = g_zoneCache.getViewFromNetwork(&netmask);
+ }
+ bool haveSomething = PC.get(question, cached, view); // does the PacketCache recognize this question?
+ if (haveSomething) {
+ if (logDNSQueries) {
+ g_log << ": packetcache HIT" << endl;
+ }
+ cached.setRemote(&question.d_remote); // inlined
+ cached.d_inner_remote = question.d_inner_remote;
+ cached.setSocket(question.getSocket()); // inlined
+ cached.d_anyLocal = question.d_anyLocal;
+ cached.setMaxReplyLen(question.getMaxReplyLen());
+ cached.d.rd = question.d.rd; // copy in recursion desired bit
+ cached.d.id = question.d.id;
+ cached.commitD(); // commit d to the packet inlined
+
+ diff = question.d_dt.udiffNoReset();
+ cache_latency = 0.999 * cache_latency + 0.001 * std::max(diff - start, 0);
+ start = diff;
+
+ NS->send(cached); // answer it then inlined
+
+ diff = question.d_dt.udiff();
+ update_latencies(start, diff);
+ continue;
+ }
diff = question.d_dt.udiffNoReset();
cache_latency = 0.999 * cache_latency + 0.001 * std::max(diff - start, 0);
- start = diff;
-
- NS->send(cached); // answer it then inlined
+ }
- diff = question.d_dt.udiff();
- update_latencies(start, diff);
+ if (distributor->isOverloaded()) {
+ if (logDNSQueries) {
+ g_log << ": Dropped query, backends are overloaded" << endl;
+ }
+ overloadDrops++;
continue;
}
- diff = question.d_dt.udiffNoReset();
- cache_latency = 0.999 * cache_latency + 0.001 * std::max(diff - start, 0);
- }
- if (distributor->isOverloaded()) {
- if (logDNSQueries)
- g_log << ": Dropped query, backends are overloaded" << endl;
- overloadDrops++;
- continue;
- }
+ if (logDNSQueries) {
+ if (PC.enabled()) {
+ g_log << ": packetcache MISS" << endl;
+ }
+ else {
+ g_log << endl;
+ }
+ }
- if (logDNSQueries) {
- if (PC.enabled()) {
- g_log << ": packetcache MISS" << endl;
+ try {
+ distributor->question(question, &sendout); // otherwise, give to the distributor
}
- else {
- g_log << endl;
+ catch (DistributorFatal& df) { // when this happens, we have leaked loads of memory. Bailing out time.
+ _exit(1);
}
}
-
- try {
- distributor->question(question, &sendout); // otherwise, give to the distributor
- }
- catch (DistributorFatal& df) { // when this happens, we have leaked loads of memory. Bailing out time.
- _exit(1);
+ catch (const std::exception& e) {
+ g_log << Logger::Error << "Caught unhandled exception in question thread: " << e.what() << endl;
}
}
- catch (const std::exception& e) {
- g_log << Logger::Error << "Caught unhandled exception in question thread: " << e.what() << endl;
- }
}
-}
-catch (PDNSException& pe) {
- g_log << Logger::Error << "Fatal error in question thread: " << pe.reason << endl;
- _exit(1);
+ catch (PDNSException& pe) {
+ g_log << Logger::Error << "Fatal error in question thread: " << pe.reason << endl;
+ _exit(1);
+ }
}
static void dummyThread()