]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - pdns/ixfrdist.cc
Merge remote-tracking branch 'origin/master' into ixfrdist-limit-size
[thirdparty/pdns.git] / pdns / ixfrdist.cc
index 0c516fdce96bb7bbb7a5d6afff8e5c36a45c8224..8174a1c40d8ef303310b82afd87894a8970d20cc 100644 (file)
@@ -68,7 +68,7 @@ struct convert<ComboAddress> {
       return false;
     }
     try {
-      rhs = ComboAddress(node.as<string>());
+      rhs = ComboAddress(node.as<string>(), 53);
       return true;
     } catch(const runtime_error &e) {
       return false;
@@ -99,6 +99,19 @@ struct convert<DNSName> {
 };
 } // namespace YAML
 
+struct ixfrdiff_t {
+  shared_ptr<SOARecordContent> oldSOA;
+  shared_ptr<SOARecordContent> newSOA;
+  vector<DNSRecord> removals;
+  vector<DNSRecord> additions;
+};
+
+struct ixfrinfo_t {
+  shared_ptr<SOARecordContent> soa; // The SOA of the latestAXFR
+  records_t latestAXFR;             // The most recent AXFR
+  vector<std::shared_ptr<ixfrdiff_t>> ixfrDiffs;
+};
+
 // Why a struct? This way we can add more options to a domain in the future
 struct ixfrdistdomain_t {
   set<ComboAddress> masters; // A set so we can do multiple master addresses in the future
@@ -108,7 +121,7 @@ struct ixfrdistdomain_t {
 map<DNSName, ixfrdistdomain_t> g_domainConfigs;
 
 // Map domains and their data
-std::map<DNSName, ixfrinfo_t> g_soas;
+std::map<DNSName, std::shared_ptr<ixfrinfo_t>> g_soas;
 std::mutex g_soas_mutex;
 
 // Condition variable for TCP handling
@@ -128,7 +141,7 @@ void handleSignal(int signum) {
     g_log<<Logger::Notice<<", this is the second time we were asked to stop, forcefully exiting"<<endl;
     exit(EXIT_FAILURE);
   }
-  g_log<<Logger::Notice<<", stopping"<<endl;
+  g_log<<Logger::Notice<<", stopping, this may take a few second due to in-progress transfers and cleanup. Send this signal again to forcefully stop"<<endl;
   g_exiting = true;
 }
 
@@ -193,19 +206,32 @@ static shared_ptr<SOARecordContent> getSOAFromRecords(const records_t& records)
   throw PDNSException("No SOA in supplied records");
 }
 
-static void makeIXFRDiff(const records_t& from, const records_t& to, ixfrdiff_t& diff, const shared_ptr<SOARecordContent>& fromSOA = nullptr, const shared_ptr<SOARecordContent>& toSOA = nullptr) {
-  set_difference(from.cbegin(), from.cend(), to.cbegin(), to.cend(), back_inserter(diff.removals), from.value_comp());
-  set_difference(to.cbegin(), to.cend(), from.cbegin(), from.cend(), back_inserter(diff.additions), from.value_comp());
-  diff.oldSOA = fromSOA;
+static void makeIXFRDiff(const records_t& from, const records_t& to, std::shared_ptr<ixfrdiff_t>& diff, const shared_ptr<SOARecordContent>& fromSOA = nullptr, const shared_ptr<SOARecordContent>& toSOA = nullptr) {
+  set_difference(from.cbegin(), from.cend(), to.cbegin(), to.cend(), back_inserter(diff->removals), from.value_comp());
+  set_difference(to.cbegin(), to.cend(), from.cbegin(), from.cend(), back_inserter(diff->additions), from.value_comp());
+  diff->oldSOA = fromSOA;
   if (fromSOA == nullptr) {
-    getSOAFromRecords(from);
+    diff->oldSOA = getSOAFromRecords(from);
   }
-  diff.newSOA = toSOA;
+  diff->newSOA = toSOA;
   if (toSOA == nullptr) {
-    getSOAFromRecords(to);
+    diff->newSOA = getSOAFromRecords(to);
   }
 }
 
+/* you can _never_ alter the content of the resulting shared pointer */
+static std::shared_ptr<ixfrinfo_t> getCurrentZoneInfo(const DNSName& domain)
+{
+  std::lock_guard<std::mutex> guard(g_soas_mutex);
+  return g_soas[domain];
+}
+
+static void updateCurrentZoneInfo(const DNSName& domain, std::shared_ptr<ixfrinfo_t>& newInfo)
+{
+  std::lock_guard<std::mutex> guard(g_soas_mutex);
+  g_soas[domain] = newInfo;
+}
+
 void updateThread(const string& workdir, const uint16_t& keep, const uint16_t& axfrTimeout, const uint32_t axfrMaxRecords) {
   std::map<DNSName, time_t> lastCheck;
 
@@ -225,9 +251,10 @@ void updateThread(const string& workdir, const uint16_t& keep, const uint16_t& a
         if (soa != nullptr) {
           loadZoneFromDisk(records, fname, domain);
         }
-        std::lock_guard<std::mutex> guard(g_soas_mutex);
-        g_soas[domain].latestAXFR = records;
-        g_soas[domain].soa = soa;
+        auto zoneInfo = std::make_shared<ixfrinfo_t>();
+        zoneInfo->latestAXFR = std::move(records);
+        zoneInfo->soa = soa;
+        updateCurrentZoneInfo(domain, zoneInfo);
       }
       if (soa != nullptr) {
         g_log<<Logger::Notice<<"Loaded zone "<<domain<<" with serial "<<soa->d_st.serial<<endl;
@@ -254,29 +281,34 @@ void updateThread(const string& workdir, const uint16_t& keep, const uint16_t& a
     }
     time_t now = time(nullptr);
     for (const auto &domainConfig : g_domainConfigs) {
+
+      if (g_exiting) {
+        break;
+      }
+
       DNSName domain = domainConfig.first;
       shared_ptr<SOARecordContent> current_soa;
-      {
-        std::lock_guard<std::mutex> guard(g_soas_mutex);
-        if (g_soas.find(domain) != g_soas.end()) {
-          current_soa = g_soas[domain].soa;
-        }
+      const auto& zoneInfo = getCurrentZoneInfo(domain);
+      if (zoneInfo != nullptr) {
+        current_soa = zoneInfo->soa;
       }
-      if ((current_soa != nullptr && now - lastCheck[domain] < current_soa->d_st.refresh) || // Only check if we have waited `refresh` seconds
-          (current_soa == nullptr && now - lastCheck[domain] < 30))  {                       // Or if we could not get an update at all still, every 30 seconds
+
+      auto& zoneLastCheck = lastCheck[domain];
+      if ((current_soa != nullptr && now - zoneLastCheck < current_soa->d_st.refresh) || // Only check if we have waited `refresh` seconds
+          (current_soa == nullptr && now - zoneLastCheck < 30))  {                       // Or if we could not get an update at all still, every 30 seconds
         continue;
       }
 
       // TODO Keep track of 'down' masters
-      set<ComboAddress>::const_iterator it(g_domainConfigs[domain].masters.begin());
-      std::advance(it, random() % g_domainConfigs[domain].masters.size());
+      set<ComboAddress>::const_iterator it(domainConfig.second.masters.begin());
+      std::advance(it, random() % domainConfig.second.masters.size());
       ComboAddress master = *it;
 
       string dir = workdir + "/" + domain.toString();
       g_log<<Logger::Info<<"Attempting to retrieve SOA Serial update for '"<<domain<<"' from '"<<master.toStringWithPort()<<"'"<<endl;
       shared_ptr<SOARecordContent> sr;
       try {
-        lastCheck[domain] = now;
+        zoneLastCheck = now;
         auto newSerial = getSerialFromMaster(master, domain, sr); // TODO TSIG
         if(current_soa != nullptr) {
           g_log<<Logger::Info<<"Got SOA Serial for "<<domain<<" from "<<master.toStringWithPort()<<": "<< newSerial<<", had Serial: "<<current_soa->d_st.serial;
@@ -287,7 +319,7 @@ void updateThread(const string& workdir, const uint16_t& keep, const uint16_t& a
           g_log<<Logger::Info<<", will update."<<endl;
         }
       } catch (runtime_error &e) {
-        g_log<<Logger::Warning<<"Unable to get SOA serial update for '"<<domain<<"': "<<e.what()<<endl;
+        g_log<<Logger::Warning<<"Unable to get SOA serial update for '"<<domain<<"' from master "<<master.toStringWithPort()<<": "<<e.what()<<endl;
         continue;
       }
       // Now get the full zone!
@@ -297,12 +329,12 @@ void updateThread(const string& workdir, const uint16_t& keep, const uint16_t& a
 
       // The *new* SOA
       shared_ptr<SOARecordContent> soa;
+      records_t records;
       try {
         AXFRRetriever axfr(master, domain, tt, &local);
         uint32_t nrecords=0;
         Resolver::res_t nop;
         vector<DNSRecord> chunk;
-        records_t records;
         time_t t_start = time(nullptr);
         time_t axfr_now = time(nullptr);
         while(axfr.getChunk(nop, &chunk, (axfr_now - t_start + axfrTimeout))) {
@@ -329,27 +361,46 @@ void updateThread(const string& workdir, const uint16_t& keep, const uint16_t& a
           continue;
         }
         g_log<<Logger::Notice<<"Retrieved all zone data for "<<domain<<". Received "<<nrecords<<" records."<<endl;
+      } catch (PDNSException &e) {
+        g_log<<Logger::Warning<<"Could not retrieve AXFR for '"<<domain<<"': "<<e.reason<<endl;
+        continue;
+      } catch (runtime_error &e) {
+        g_log<<Logger::Warning<<"Could not retrieve AXFR for zone '"<<domain<<"': "<<e.what()<<endl;
+        continue;
+      }
+
+      try {
+
         writeZoneToDisk(records, domain, dir);
         g_log<<Logger::Notice<<"Wrote zonedata for "<<domain<<" with serial "<<soa->d_st.serial<<" to "<<dir<<endl;
-        {
-          std::lock_guard<std::mutex> guard(g_soas_mutex);
-          ixfrdiff_t diff;
-          if (!g_soas[domain].latestAXFR.empty()) {
-            makeIXFRDiff(g_soas[domain].latestAXFR, records, diff, g_soas[domain].soa, soa);
-            g_soas[domain].ixfrDiffs.push_back(diff);
-          }
-          // Clean up the diffs
-          while (g_soas[domain].ixfrDiffs.size() > keep) {
-            g_soas[domain].ixfrDiffs.erase(g_soas[domain].ixfrDiffs.begin());
-          }
-          g_soas[domain].latestAXFR = records;
-          g_soas[domain].soa = soa;
+
+        const auto oldZoneInfo = getCurrentZoneInfo(domain);
+        auto zoneInfo = std::make_shared<ixfrinfo_t>();
+
+        if (oldZoneInfo && !oldZoneInfo->latestAXFR.empty()) {
+          auto diff = std::make_shared<ixfrdiff_t>();
+          zoneInfo->ixfrDiffs = oldZoneInfo->ixfrDiffs;
+          g_log<<Logger::Debug<<"Calculating diff for "<<domain<<endl;
+          makeIXFRDiff(oldZoneInfo->latestAXFR, records, diff, oldZoneInfo->soa, soa);
+          g_log<<Logger::Debug<<"Calculated diff for "<<domain<<", we had "<<diff->removals.size()<<" removals and "<<diff->additions.size()<<" additions"<<endl;
+          zoneInfo->ixfrDiffs.push_back(std::move(diff));
         }
+
+        // Clean up the diffs
+        while (zoneInfo->ixfrDiffs.size() > keep) {
+          zoneInfo->ixfrDiffs.erase(zoneInfo->ixfrDiffs.begin());
+        }
+
+        g_log<<Logger::Debug<<"Zone "<<domain<<" previously contained "<<(oldZoneInfo ? oldZoneInfo->latestAXFR.size() : 0)<<" entries, "<<records.size()<<" now"<<endl;
+        zoneInfo->latestAXFR = std::move(records);
+        zoneInfo->soa = soa;
+        updateCurrentZoneInfo(domain, zoneInfo);
       } catch (PDNSException &e) {
-        g_log<<Logger::Warning<<"Could not retrieve AXFR for '"<<domain<<"': "<<e.reason<<endl;
+        g_log<<Logger::Warning<<"Could not save zone '"<<domain<<"' to disk: "<<e.reason<<endl;
       } catch (runtime_error &e) {
         g_log<<Logger::Warning<<"Could not save zone '"<<domain<<"' to disk: "<<e.what()<<endl;
       }
+
       // Now clean up the directory
       cleanUpDomain(domain, keep, workdir);
     } /* for (const auto &domain : domains) */
@@ -371,12 +422,12 @@ bool checkQuery(const MOADNSParser& mdp, const ComboAddress& saddr, const bool u
   }
 
   {
-    std::lock_guard<std::mutex> guard(g_soas_mutex);
     if (g_domainConfigs.find(mdp.d_qname) == g_domainConfigs.end()) {
       info_msg.push_back("Domain name '" + mdp.d_qname.toLogString() + "' is not configured for distribution");
     }
 
-    if (g_soas.find(mdp.d_qname) == g_soas.end()) {
+    const auto zoneInfo = getCurrentZoneInfo(mdp.d_qname);
+    if (zoneInfo == nullptr) {
       info_msg.push_back("Domain has not been transferred yet");
     }
   }
@@ -404,16 +455,19 @@ bool checkQuery(const MOADNSParser& mdp, const ComboAddress& saddr, const bool u
  * query. QNAME is read from mdp.
  */
 bool makeSOAPacket(const MOADNSParser& mdp, vector<uint8_t>& packet) {
+
+  auto zoneInfo = getCurrentZoneInfo(mdp.d_qname);
+  if (zoneInfo == nullptr) {
+    return false;
+  }
+
   DNSPacketWriter pw(packet, mdp.d_qname, mdp.d_qtype);
   pw.getHeader()->id = mdp.d_header.id;
   pw.getHeader()->rd = mdp.d_header.rd;
   pw.getHeader()->qr = 1;
 
   pw.startRecord(mdp.d_qname, QType::SOA);
-  {
-    std::lock_guard<std::mutex> guard(g_soas_mutex);
-    g_soas[mdp.d_qname].soa->toPacket(pw);
-  }
+  zoneInfo->soa->toPacket(pw);
   pw.commit();
 
   return true;
@@ -434,17 +488,22 @@ vector<uint8_t> getSOAPacket(const MOADNSParser& mdp, const shared_ptr<SOARecord
 }
 
 bool makeAXFRPackets(const MOADNSParser& mdp, vector<vector<uint8_t>>& packets) {
-  shared_ptr<SOARecordContent> soa;
-  records_t records;
-  {
-    // Make copies of what we have
-    std::lock_guard<std::mutex> guard(g_soas_mutex);
-    soa = g_soas[mdp.d_qname].soa;
-    records = g_soas[mdp.d_qname].latestAXFR;
+  /* we get a shared pointer of the zone info that we can't modify, ever.
+     A newer one may arise in the meantime, but this one will stay valid
+     until we release it.
+  */
+  auto zoneInfo = getCurrentZoneInfo(mdp.d_qname);
+  if (zoneInfo == nullptr) {
+    return false;
   }
 
+  shared_ptr<SOARecordContent> soa = zoneInfo->soa;
+  const records_t& records = zoneInfo->latestAXFR;
+  packets.reserve(packets.size() + /* SOAs */ 2 + records.size());
+
   // Initial SOA
-  packets.push_back(getSOAPacket(mdp, soa));
+  const auto soaPacket = getSOAPacket(mdp, soa);
+  packets.push_back(soaPacket);
 
   for (auto const &record : records) {
     if (record.d_type == QType::SOA) {
@@ -462,16 +521,18 @@ bool makeAXFRPackets(const MOADNSParser& mdp, vector<vector<uint8_t>>& packets)
   }
 
   // Final SOA
-  packets.push_back(getSOAPacket(mdp, soa));
+  packets.push_back(soaPacket);
 
   return true;
 }
 
 void makeXFRPacketsFromDNSRecords(const MOADNSParser& mdp, const vector<DNSRecord>& records, vector<vector<uint8_t>>& packets) {
+
   for(const auto& r : records) {
     if (r.d_type == QType::SOA) {
       continue;
     }
+
     vector<uint8_t> packet;
     DNSPacketWriter pw(packet, mdp.d_qname, mdp.d_qtype);
     pw.getHeader()->id = mdp.d_header.id;
@@ -488,16 +549,20 @@ void makeXFRPacketsFromDNSRecords(const MOADNSParser& mdp, const vector<DNSRecor
  * creates a SOA or AXFR packet when required by the RFC.
  */
 bool makeIXFRPackets(const MOADNSParser& mdp, const shared_ptr<SOARecordContent>& clientSOA, vector<vector<uint8_t>>& packets) {
-  // Get the new SOA only once, so it will not change under our noses from the
-  // updateThread.
-  vector<ixfrdiff_t> toSend;
-  uint32_t ourLatestSerial;
-  {
-    std::lock_guard<std::mutex> guard(g_soas_mutex);
-    ourLatestSerial = g_soas[mdp.d_qname].soa->d_st.serial;
+  vector<std::shared_ptr<ixfrdiff_t>> toSend;
+
+  /* we get a shared pointer of the zone info that we can't modify, ever.
+     A newer one may arise in the meantime, but this one will stay valid
+     until we release it.
+  */
+  auto zoneInfo = getCurrentZoneInfo(mdp.d_qname);
+  if (zoneInfo == nullptr) {
+    return false;
   }
 
-  if (rfc1982LessThan(ourLatestSerial, clientSOA->d_st.serial) || ourLatestSerial == clientSOA->d_st.serial){
+  uint32_t ourLatestSerial = zoneInfo->soa->d_st.serial;
+
+  if (rfc1982LessThan(ourLatestSerial, clientSOA->d_st.serial) || ourLatestSerial == clientSOA->d_st.serial) {
     /* RFC 1995 Section 2
      *    If an IXFR query with the same or newer version number than that of
      *    the server is received, it is replied to with a single SOA record of
@@ -511,21 +576,18 @@ bool makeIXFRPackets(const MOADNSParser& mdp, const shared_ptr<SOARecordContent>
     return ret;
   }
 
-  {
-    // as we use push_back in the updater, we know the vector is sorted as oldest first
-    bool shouldAdd = false;
-    // Get all relevant IXFR differences
-    std::lock_guard<std::mutex> guard(g_soas_mutex);
-    for (const auto& diff : g_soas[mdp.d_qname].ixfrDiffs) {
-      if (shouldAdd) {
-        toSend.push_back(diff);
-        continue;
-      }
-      if (diff.oldSOA->d_st.serial == clientSOA->d_st.serial) {
-        toSend.push_back(diff);
-        // Add all consecutive diffs
-        shouldAdd = true;
-      }
+  // as we use push_back in the updater, we know the vector is sorted as oldest first
+  bool shouldAdd = false;
+  // Get all relevant IXFR differences
+  for (const auto& diff : zoneInfo->ixfrDiffs) {
+    if (shouldAdd) {
+      toSend.push_back(diff);
+      continue;
+    }
+    if (diff->oldSOA->d_st.serial == clientSOA->d_st.serial) {
+      toSend.push_back(diff);
+      // Add all consecutive diffs
+      shouldAdd = true;
     }
   }
 
@@ -543,12 +605,14 @@ bool makeIXFRPackets(const MOADNSParser& mdp, const shared_ptr<SOARecordContent>
      * ... added records ...
      * SOA new_serial
      */
-    packets.push_back(getSOAPacket(mdp, diff.newSOA));
-    packets.push_back(getSOAPacket(mdp, diff.oldSOA));
-    makeXFRPacketsFromDNSRecords(mdp, diff.removals, packets);
-    packets.push_back(getSOAPacket(mdp, diff.newSOA));
-    makeXFRPacketsFromDNSRecords(mdp, diff.additions, packets);
-    packets.push_back(getSOAPacket(mdp, diff.newSOA));
+    packets.reserve(packets.size() + /* SOAs */ 4 + diff->removals.size() + diff->additions.size());
+
+    packets.push_back(getSOAPacket(mdp, diff->newSOA));
+    packets.push_back(getSOAPacket(mdp, diff->oldSOA));
+    makeXFRPacketsFromDNSRecords(mdp, diff->removals, packets);
+    packets.push_back(getSOAPacket(mdp, diff->newSOA));
+    makeXFRPacketsFromDNSRecords(mdp, diff->additions, packets);
+    packets.push_back(getSOAPacket(mdp, diff->newSOA));
   }
 
   return true;
@@ -800,14 +864,14 @@ bool parseAndCheckConfig(const string& configpath, YAML::Node& config) {
       g_log<<Logger::Error<<"Unable to read 'axfr-timeout' value: "<<e.what()<<endl;
     }
   } else {
-    config["axfr-timeout"] = 10;
+    config["axfr-timeout"] = 20;
   }
 
   if (config["tcp-in-threads"]) {
     try {
       config["tcp-in-threads"].as<uint16_t>();
     } catch (const runtime_error &e) {
-      g_log<<Logger::Error<<"Unable to read 'tcp-in-thread' value: "<<e.what()<<endl;
+      g_log<<Logger::Error<<"Unable to read 'tcp-in-threads' value: "<<e.what()<<endl;
     }
   } else {
     config["tcp-in-threads"] = 10;
@@ -1064,7 +1128,6 @@ int main(int argc, char** argv) {
   // It all starts here
   signal(SIGTERM, handleSignal);
   signal(SIGINT, handleSignal);
-  signal(SIGSTOP, handleSignal);
   signal(SIGPIPE, SIG_IGN);
 
   // Init the things we need
@@ -1089,7 +1152,7 @@ int main(int argc, char** argv) {
     gettimeofday(&now, 0);
     fdm->run(&now);
     if (g_exiting) {
-      g_log<<Logger::Notice<<"Shutting down!"<<endl;
+      g_log<<Logger::Debug<<"Closing listening sockets"<<endl;
       for (const int& fd : allSockets) {
         try {
           closesocket(fd);
@@ -1100,6 +1163,7 @@ int main(int argc, char** argv) {
       break;
     }
   }
+  g_log<<Logger::Debug<<"Waiting for al threads to stop"<<endl;
   g_tcpHandlerCV.notify_all();
   ut.join();
   for (auto &t : tcpHandlers) {