*/
#include "dns.hh"
#include "dnsparser.hh"
+#include <stdexcept>
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "misc.hh"
#include "iputils.hh"
#include "lock.hh"
+#include "communicator.hh"
+#include "query-local-address.hh"
#include "logger.hh"
#include "ixfrdist-stats.hh"
#include "ixfrdist-web.hh"
#pragma GCC diagnostic ignored "-Wshadow"
#include <yaml-cpp/yaml.h>
#pragma GCC diagnostic pop
+#include "auth-packetcache.hh"
+#include "auth-querycache.hh"
+#include "auth-zonecache.hh"
/* BEGIN Needed because of deeper dependencies */
#include "arguments.hh"
#include "statbag.hh"
StatBag S;
+AuthPacketCache PC;
+AuthQueryCache QC;
+AuthZoneCache g_zoneCache;
ArgvMap &arg()
{
// Why a struct? This way we can add more options to a domain in the future
struct ixfrdistdomain_t {
set<ComboAddress> primaries; // A set so we can do multiple primary addresses in the future
+ std::set<ComboAddress> notify; // Set of addresses to forward NOTIFY to
uint32_t maxSOARefresh{0}; // Cap SOA refresh value to the given value in seconds
};
// Lazily implemented as a set
static LockGuarded<std::set<DNSName>> g_notifiesReceived;
+// Queue of outgoing NOTIFY
+static LockGuarded<NotificationQueue> g_notificationQueue;
+
// Condition variable for TCP handling
static std::condition_variable g_tcpHandlerCV;
static std::queue<pair<int, ComboAddress>> g_tcpRequestFDs;
// FIXME: also report zone size?
}
+static void sendNotification(int sock, const DNSName& domain, const ComboAddress& remote, uint16_t id)
+{
+ vector<string> meta;
+ vector<uint8_t> packet;
+ DNSPacketWriter pw(packet, domain, QType::SOA, 1, Opcode::Notify);
+ pw.getHeader()->id = id;
+ pw.getHeader()->aa = true;
+
+ if (sendto(sock, &packet[0], packet.size(), 0, (struct sockaddr*)(&remote), remote.getSocklen()) < 0) {
+ throw std::runtime_error("Unable to send notify to " + remote.toStringWithPort() + ": " + stringerror());
+ }
+}
+
+static void communicatorThread()
+{
+ setThreadName("ixfrdist/communicator");
+ auto sock4 = makeQuerySocket(pdns::getQueryLocalAddress(AF_INET, 0), true);
+ auto sock6 = makeQuerySocket(pdns::getQueryLocalAddress(AF_INET6, 0), true);
+
+ while (true) {
+ if (g_exiting) {
+ g_log << Logger::Notice << "Communicator thread stopped" << std::endl;
+ break;
+ }
+ {
+ std::set<int> fds = {sock4, sock6};
+ ComboAddress from;
+ char buffer[1500];
+ int sock;
+
+ // receive incoming notifications on the nonblocking socket and take them off the list
+ while (waitForMultiData(fds, 0, 0, &sock) > 0) {
+ Utility::socklen_t fromlen = sizeof(from);
+ const auto size = recvfrom(sock, buffer, sizeof(buffer), 0, (struct sockaddr*)&from, &fromlen);
+ if (size < 0) {
+ break;
+ }
+ DNSPacket p(true);
+
+ p.setRemote(&from);
+
+ if (p.parse(buffer, (size_t)size) < 0) {
+ g_log << Logger::Warning << "Unable to parse SOA notification answer from " << p.getRemote() << endl;
+ continue;
+ }
+
+ if (p.d.rcode) {
+ g_log << Logger::Warning << "Received unsuccessful notification report for '" << p.qdomain << "' from " << from.toStringWithPort() << ", error: " << RCode::to_s(p.d.rcode) << endl;
+ }
+
+ if (g_notificationQueue.lock()->removeIf(from, p.d.id, p.qdomain)) {
+ g_log << Logger::Notice << "Removed from notification list: '" << p.qdomain << "' to " << from.toStringWithPort() << " " << (p.d.rcode ? RCode::to_s(p.d.rcode) : "(was acknowledged)") << endl;
+ }
+ else {
+ g_log << Logger::Warning << "Received spurious notify answer for '" << p.qdomain << "' from " << from.toStringWithPort() << endl;
+ // d_nq.dump();
+ }
+ }
+ }
+ {
+ DNSName domain;
+ string ip;
+ uint16_t id = 0;
+ bool purged{false};
+
+ while (g_notificationQueue.lock()->getOne(domain, ip, &id, purged)) {
+ if (!purged) {
+ ComboAddress remote(ip, 53); // default to 53
+ sendNotification(remote.sin4.sin_family == AF_INET ? sock4 : sock6, domain, remote, id);
+ } else {
+ g_log << Logger::Warning << "Notification for " << domain << " to " << ip << " failed after retries" << std::endl;
+ }
+ }
+ }
+ sleep(1);
+ }
+}
+
static void updateThread(const string& workdir, const uint16_t& keep, const uint16_t& axfrTimeout, const uint16_t& soaRetry, const uint32_t axfrMaxRecords) { // NOLINT(readability-function-cognitive-complexity) 13400 https://github.com/PowerDNS/pdns/issues/13400 Habbie: ixfrdist: reduce complexity
setThreadName("ixfrdist/update");
std::map<DNSName, time_t> lastCheck;
if (primaryFound) {
g_notifiesReceived.lock()->insert(mdp.d_qname);
+
+ if (!found->second.notify.empty()) {
+ for (const auto& address : found->second.notify) {
+ g_log << Logger::Debug << logPrefix << "Queuing notification for " << mdp.d_qname << " to " << address.toStringWithPort() << std::endl;
+ g_notificationQueue.lock()->add(mdp.d_qname, address);
+ }
+ }
return ResponseType::EmptyNoError;
}
if (domain["max-soa-refresh"].IsDefined()) {
g_domainConfigs[domain["domain"].as<DNSName>()].maxSOARefresh = domain["max-soa-refresh"].as<uint32_t>();
}
+ if (domain["notify"].IsDefined()) {
+ auto& listset = g_domainConfigs[domain["domain"].as<DNSName>()].notify;
+ if (domain["notify"].IsScalar()) {
+ auto remote = domain["notify"].as<std::string>();
+ try {
+ listset.emplace(remote, 53);
+ }
+ catch (PDNSException& e) {
+ g_log << Logger::Error << "Unparseable IP in notify directive " << remote << ". Error: " << e.reason << endl;
+ }
+ } else if (domain["notify"].IsSequence()) {
+ for (const auto& entry: domain["notify"]) {
+ auto remote = entry.as<std::string>();
+ try {
+ listset.emplace(remote, 53);
+ }
+ catch (PDNSException& e) {
+ g_log << Logger::Error << "Unparseable IP in notify directive " << remote << ". Error: " << e.reason << endl;
+ }
+ }
+ }
+ }
g_stats.registerDomain(domain["domain"].as<DNSName>());
}
configuration->axfrTimeout,
configuration->failedSOARetry,
configuration->axfrMaxRecords);
+ std::thread communicator(communicatorThread);
vector<std::thread> tcpHandlers;
tcpHandlers.reserve(configuration->tcpInThreads);
g_log<<Logger::Debug<<"Waiting for all threads to stop"<<endl;
g_tcpHandlerCV.notify_all();
ut.join();
+ communicator.join();
for (auto &t : tcpHandlers) {
t.join();
}