]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Better handling of multiple carbon servers
authorRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 16 Jan 2023 17:20:28 +0000 (18:20 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 16 Jan 2023 17:20:28 +0000 (18:20 +0100)
.not-formatted
pdns/dnsdist-carbon.cc
pdns/dnsdist-lua.cc
pdns/dnsdist.cc
pdns/dnsdistdist/dnsdist-carbon.hh

index e0055fbfd2e86aad480317d7d8676b1c3e4c7a27..51a437ae3c833257afe31df9205f0298b191981e 100644 (file)
@@ -48,7 +48,6 @@
 ./pdns/dnsdemog.cc
 ./pdns/dnsdist-cache.cc
 ./pdns/dnsdist-cache.hh
-./pdns/dnsdist-carbon.cc
 ./pdns/dnsdist-console.cc
 ./pdns/dnsdist-console.hh
 ./pdns/dnsdist-dynblocks.hh
index ef7a8560823864364425a4c1d58f095c1a2d8231..6df6d9982b3ee990c8c0bcdb1bbf1dac0388c6c2 100644 (file)
 #include "sstuff.hh"
 #include "threadname.hh"
 
-GlobalStateHolder<vector<CarbonConfig> > g_carbon;
+namespace dnsdist
+{
+
+LockGuarded<Carbon::Config> Carbon::s_config;
 
-void carbonDumpThread()
+static bool doOneCarbonExport(const Carbon::Endpoint& endpoint)
 {
-  try
-  {
-    setThreadName("dnsdist/carbon");
-    auto localCarbon = g_carbon.getLocal();
-    for(int numloops=0;;++numloops) {
-      if(localCarbon->empty()) {
-        sleep(1);
-        continue;
+  const auto& server = endpoint.server;
+  const std::string& namespace_name = endpoint.namespace_name;
+  const std::string& hostname = endpoint.ourname;
+  const std::string& instance_name = endpoint.instance_name;
+
+  try {
+    Socket s(server.sin4.sin_family, SOCK_STREAM);
+    s.setNonBlocking();
+    s.connect(server); // we do the connect so the attempt happens while we gather stats
+    ostringstream str;
+
+    const time_t now = time(nullptr);
+
+    for (const auto& e : g_stats.entries) {
+      str << namespace_name << "." << hostname << "." << instance_name << "." << e.first << ' ';
+      if (const auto& val = boost::get<pdns::stat_t*>(&e.second)) {
+        str << (*val)->load();
+      }
+      else if (const auto& adval = boost::get<pdns::stat_t_trait<double>*>(&e.second)) {
+        str << (*adval)->load();
+      }
+      else if (const auto& dval = boost::get<double*>(&e.second)) {
+        str << **dval;
       }
-      /* this is wrong, we use the interval of the first server
-         for every single one of them */
-      if(numloops) {
-        const unsigned int interval = localCarbon->at(0).interval;
-        sleep(interval);
+      else if (const auto& func = boost::get<DNSDistStats::statfunction_t>(&e.second)) {
+        str << (*func)(e.first);
       }
+      str << ' ' << now << "\r\n";
+    }
 
-      for (const auto& conf : *localCarbon) {
-        const auto& server = conf.server;
-        const std::string& namespace_name = conf.namespace_name;
-        std::string hostname = conf.ourname;
-        if (hostname.empty()) {
-          try {
-            hostname = getCarbonHostName();
-          }
-          catch(const std::exception& e) {
-            throw std::runtime_error(std::string("The 'ourname' setting in 'carbonServer()' has not been set and we are unable to determine the system's hostname: ") + e.what());
-          }
-        }
-        const std::string& instance_name = conf.instance_name;
+    auto states = g_dstates.getLocal();
+    for (const auto& state : *states) {
+      string serverName = state->getName().empty() ? state->d_config.remote.toStringWithPort() : state->getName();
+      boost::replace_all(serverName, ".", "_");
+      const string base = namespace_name + "." + hostname + "." + instance_name + ".servers." + serverName + ".";
+      str << base << "queries" << ' ' << state->queries.load() << " " << now << "\r\n";
+      str << base << "responses" << ' ' << state->responses.load() << " " << now << "\r\n";
+      str << base << "drops" << ' ' << state->reuseds.load() << " " << now << "\r\n";
+      str << base << "latency" << ' ' << (state->d_config.availability != DownstreamState::Availability::Down ? state->latencyUsec / 1000.0 : 0) << " " << now << "\r\n";
+      str << base << "senderrors" << ' ' << state->sendErrors.load() << " " << now << "\r\n";
+      str << base << "outstanding" << ' ' << state->outstanding.load() << " " << now << "\r\n";
+      str << base << "tcpdiedsendingquery" << ' ' << state->tcpDiedSendingQuery.load() << " " << now << "\r\n";
+      str << base << "tcpdiedreaddingresponse" << ' ' << state->tcpDiedReadingResponse.load() << " " << now << "\r\n";
+      str << base << "tcpgaveup" << ' ' << state->tcpGaveUp.load() << " " << now << "\r\n";
+      str << base << "tcpreadimeouts" << ' ' << state->tcpReadTimeouts.load() << " " << now << "\r\n";
+      str << base << "tcpwritetimeouts" << ' ' << state->tcpWriteTimeouts.load() << " " << now << "\r\n";
+      str << base << "tcpconnecttimeouts" << ' ' << state->tcpConnectTimeouts.load() << " " << now << "\r\n";
+      str << base << "tcpcurrentconnections" << ' ' << state->tcpCurrentConnections.load() << " " << now << "\r\n";
+      str << base << "tcpmaxconcurrentconnections" << ' ' << state->tcpMaxConcurrentConnections.load() << " " << now << "\r\n";
+      str << base << "tcpnewconnections" << ' ' << state->tcpNewConnections.load() << " " << now << "\r\n";
+      str << base << "tcpreusedconnections" << ' ' << state->tcpReusedConnections.load() << " " << now << "\r\n";
+      str << base << "tlsresumptions" << ' ' << state->tlsResumptions.load() << " " << now << "\r\n";
+      str << base << "tcpavgqueriesperconnection" << ' ' << state->tcpAvgQueriesPerConnection.load() << " " << now << "\r\n";
+      str << base << "tcpavgconnectionduration" << ' ' << state->tcpAvgConnectionDuration.load() << " " << now << "\r\n";
+      str << base << "tcptoomanyconcurrentconnections" << ' ' << state->tcpTooManyConcurrentConnections.load() << " " << now << "\r\n";
+    }
 
-        try {
-          Socket s(server.sin4.sin_family, SOCK_STREAM);
-          s.setNonBlocking();
-          s.connect(server);  // we do the connect so the attempt happens while we gather stats
-          ostringstream str;
-          time_t now=time(0);
-          for(const auto& e : g_stats.entries) {
-            str<<namespace_name<<"."<<hostname<<"."<<instance_name<<"."<<e.first<<' ';
-            if (const auto& val = boost::get<pdns::stat_t*>(&e.second)) {
-              str<<(*val)->load();
-            }
-            else if(const auto& adval = boost::get<pdns::stat_t_trait<double>*>(&e.second)) {
-              str<<(*adval)->load();
-            }
-            else if (const auto& dval = boost::get<double*>(&e.second)) {
-              str<<**dval;
-            }
-            else if (const auto& func = boost::get<DNSDistStats::statfunction_t>(&e.second)) {
-              str<<(*func)(e.first);
-            }
-            str<<' '<<now<<"\r\n";
-          }
-          auto states = g_dstates.getLocal();
-          for(const auto& state : *states) {
-            string serverName = state->getName().empty() ? state->d_config.remote.toStringWithPort() : state->getName();
-            boost::replace_all(serverName, ".", "_");
-            const string base = namespace_name + "." + hostname + "." + instance_name + ".servers." + serverName + ".";
-            str<<base<<"queries" << ' ' << state->queries.load() << " " << now << "\r\n";
-            str<<base<<"responses" << ' ' << state->responses.load() << " " << now << "\r\n";
-            str<<base<<"drops" << ' ' << state->reuseds.load() << " " << now << "\r\n";
-            str<<base<<"latency" << ' ' << (state->d_config.availability != DownstreamState::Availability::Down ? state->latencyUsec/1000.0 : 0) << " " << now << "\r\n";
-            str<<base<<"senderrors" << ' ' << state->sendErrors.load() << " " << now << "\r\n";
-            str<<base<<"outstanding" << ' ' << state->outstanding.load() << " " << now << "\r\n";
-            str<<base<<"tcpdiedsendingquery" << ' '<< state->tcpDiedSendingQuery.load() << " " << now << "\r\n";
-            str<<base<<"tcpdiedreaddingresponse" << ' '<< state->tcpDiedReadingResponse.load() << " " << now << "\r\n";
-            str<<base<<"tcpgaveup" << ' '<< state->tcpGaveUp.load() << " " << now << "\r\n";
-            str<<base<<"tcpreadimeouts" << ' '<< state->tcpReadTimeouts.load() << " " << now << "\r\n";
-            str<<base<<"tcpwritetimeouts" << ' '<< state->tcpWriteTimeouts.load() << " " << now << "\r\n";
-            str<<base<<"tcpconnecttimeouts" << ' '<< state->tcpConnectTimeouts.load() << " " << now << "\r\n";
-            str<<base<<"tcpcurrentconnections" << ' '<< state->tcpCurrentConnections.load() << " " << now << "\r\n";
-            str<<base<<"tcpmaxconcurrentconnections" << ' '<< state->tcpMaxConcurrentConnections.load() << " " << now << "\r\n";
-            str<<base<<"tcpnewconnections" << ' '<< state->tcpNewConnections.load() << " " << now << "\r\n";
-            str<<base<<"tcpreusedconnections" << ' '<< state->tcpReusedConnections.load() << " " << now << "\r\n";
-            str<<base<<"tlsresumptions" << ' '<< state->tlsResumptions.load() << " " << now << "\r\n";
-            str<<base<<"tcpavgqueriesperconnection" << ' '<< state->tcpAvgQueriesPerConnection.load() << " " << now << "\r\n";
-            str<<base<<"tcpavgconnectionduration" << ' '<< state->tcpAvgConnectionDuration.load() << " " << now << "\r\n";
-            str<<base<<"tcptoomanyconcurrentconnections" << ' '<< state->tcpTooManyConcurrentConnections.load() << " " << now << "\r\n";
-          }
+    std::map<std::string, uint64_t> frontendDuplicates;
+    for (const auto& front : g_frontends) {
+      if (front->udpFD == -1 && front->tcpFD == -1) {
+        continue;
+      }
 
-          std::map<std::string,uint64_t> frontendDuplicates;
-          for(const auto& front : g_frontends) {
-            if (front->udpFD == -1 && front->tcpFD == -1)
-              continue;
+      string frontName = front->local.toStringWithPort() + (front->udpFD >= 0 ? "_udp" : "_tcp");
+      boost::replace_all(frontName, ".", "_");
+      auto dupPair = frontendDuplicates.insert({frontName, 1});
+      if (!dupPair.second) {
+        frontName = frontName + "_" + std::to_string(dupPair.first->second);
+        ++(dupPair.first->second);
+      }
 
-            string frontName = front->local.toStringWithPort() + (front->udpFD >= 0 ? "_udp" : "_tcp");
-            boost::replace_all(frontName, ".", "_");
-            auto dupPair = frontendDuplicates.insert({frontName, 1});
-            if (!dupPair.second) {
-              frontName = frontName + "_" + std::to_string(dupPair.first->second);
-              ++(dupPair.first->second);
-            }
+      const string base = namespace_name + "." + hostname + "." + instance_name + ".frontends." + frontName + ".";
+      str << base << "queries" << ' ' << front->queries.load() << " " << now << "\r\n";
+      str << base << "responses" << ' ' << front->responses.load() << " " << now << "\r\n";
+      str << base << "tcpdiedreadingquery" << ' ' << front->tcpDiedReadingQuery.load() << " " << now << "\r\n";
+      str << base << "tcpdiedsendingresponse" << ' ' << front->tcpDiedSendingResponse.load() << " " << now << "\r\n";
+      str << base << "tcpgaveup" << ' ' << front->tcpGaveUp.load() << " " << now << "\r\n";
+      str << base << "tcpclientimeouts" << ' ' << front->tcpClientTimeouts.load() << " " << now << "\r\n";
+      str << base << "tcpdownstreamtimeouts" << ' ' << front->tcpDownstreamTimeouts.load() << " " << now << "\r\n";
+      str << base << "tcpcurrentconnections" << ' ' << front->tcpCurrentConnections.load() << " " << now << "\r\n";
+      str << base << "tcpmaxconcurrentconnections" << ' ' << front->tcpMaxConcurrentConnections.load() << " " << now << "\r\n";
+      str << base << "tcpavgqueriesperconnection" << ' ' << front->tcpAvgQueriesPerConnection.load() << " " << now << "\r\n";
+      str << base << "tcpavgconnectionduration" << ' ' << front->tcpAvgConnectionDuration.load() << " " << now << "\r\n";
+      str << base << "tls10-queries" << ' ' << front->tls10queries.load() << " " << now << "\r\n";
+      str << base << "tls11-queries" << ' ' << front->tls11queries.load() << " " << now << "\r\n";
+      str << base << "tls12-queries" << ' ' << front->tls12queries.load() << " " << now << "\r\n";
+      str << base << "tls13-queries" << ' ' << front->tls13queries.load() << " " << now << "\r\n";
+      str << base << "tls-unknown-queries" << ' ' << front->tlsUnknownqueries.load() << " " << now << "\r\n";
+      str << base << "tlsnewsessions" << ' ' << front->tlsNewSessions.load() << " " << now << "\r\n";
+      str << base << "tlsresumptions" << ' ' << front->tlsResumptions.load() << " " << now << "\r\n";
+      str << base << "tlsunknownticketkeys" << ' ' << front->tlsUnknownTicketKey.load() << " " << now << "\r\n";
+      str << base << "tlsinactiveticketkeys" << ' ' << front->tlsInactiveTicketKey.load() << " " << now << "\r\n";
 
-            const string base = namespace_name + "." + hostname + "." + instance_name + ".frontends." + frontName + ".";
-            str<<base<<"queries" << ' ' << front->queries.load() << " " << now << "\r\n";
-            str<<base<<"responses" << ' ' << front->responses.load() << " " << now << "\r\n";
-            str<<base<<"tcpdiedreadingquery" << ' '<< front->tcpDiedReadingQuery.load() << " " << now << "\r\n";
-            str<<base<<"tcpdiedsendingresponse" << ' '<< front->tcpDiedSendingResponse.load() << " " << now << "\r\n";
-            str<<base<<"tcpgaveup" << ' '<< front->tcpGaveUp.load() << " " << now << "\r\n";
-            str<<base<<"tcpclientimeouts" << ' '<< front->tcpClientTimeouts.load() << " " << now << "\r\n";
-            str<<base<<"tcpdownstreamtimeouts" << ' '<< front->tcpDownstreamTimeouts.load() << " " << now << "\r\n";
-            str<<base<<"tcpcurrentconnections" << ' '<< front->tcpCurrentConnections.load() << " " << now << "\r\n";
-            str<<base<<"tcpmaxconcurrentconnections" << ' '<< front->tcpMaxConcurrentConnections.load() << " " << now << "\r\n";
-            str<<base<<"tcpavgqueriesperconnection" << ' '<< front->tcpAvgQueriesPerConnection.load() << " " << now << "\r\n";
-            str<<base<<"tcpavgconnectionduration" << ' '<< front->tcpAvgConnectionDuration.load() << " " << now << "\r\n";
-            str<<base<<"tls10-queries" << ' ' << front->tls10queries.load() << " " << now << "\r\n";
-            str<<base<<"tls11-queries" << ' ' << front->tls11queries.load() << " " << now << "\r\n";
-            str<<base<<"tls12-queries" << ' ' << front->tls12queries.load() << " " << now << "\r\n";
-            str<<base<<"tls13-queries" << ' ' << front->tls13queries.load() << " " << now << "\r\n";
-            str<<base<<"tls-unknown-queries" << ' ' << front->tlsUnknownqueries.load() << " " << now << "\r\n";
-            str<<base<<"tlsnewsessions" << ' ' << front->tlsNewSessions.load() << " " << now << "\r\n";
-            str<<base<<"tlsresumptions" << ' ' << front->tlsResumptions.load() << " " << now << "\r\n";
-            str<<base<<"tlsunknownticketkeys" << ' ' << front->tlsUnknownTicketKey.load() << " " << now << "\r\n";
-            str<<base<<"tlsinactiveticketkeys" << ' ' << front->tlsInactiveTicketKey.load() << " " << now << "\r\n";
-            const TLSErrorCounters* errorCounters = nullptr;
-            if (front->tlsFrontend != nullptr) {
-              errorCounters = &front->tlsFrontend->d_tlsCounters;
-            }
-            else if (front->dohFrontend != nullptr) {
-              errorCounters = &front->dohFrontend->d_tlsCounters;
-            }
-            if (errorCounters != nullptr) {
-              str<<base<<"tlsdhkeytoosmall" << ' ' << errorCounters->d_dhKeyTooSmall << " " << now << "\r\n";
-              str<<base<<"tlsinappropriatefallback" << ' ' << errorCounters->d_inappropriateFallBack << " " << now << "\r\n";
-              str<<base<<"tlsnosharedcipher" << ' ' << errorCounters->d_noSharedCipher << " " << now << "\r\n";
-              str<<base<<"tlsunknownciphertype" << ' ' << errorCounters->d_unknownCipherType << " " << now << "\r\n";
-              str<<base<<"tlsunknownkeyexchangetype" << ' ' << errorCounters->d_unknownKeyExchangeType << " " << now << "\r\n";
-              str<<base<<"tlsunknownprotocol" << ' ' << errorCounters->d_unknownProtocol << " " << now << "\r\n";
-              str<<base<<"tlsunsupportedec" << ' ' << errorCounters->d_unsupportedEC << " " << now << "\r\n";
-              str<<base<<"tlsunsupportedprotocol" << ' ' << errorCounters->d_unsupportedProtocol << " " << now << "\r\n";
-            }
-          }
+      const TLSErrorCounters* errorCounters = nullptr;
+      if (front->tlsFrontend != nullptr) {
+        errorCounters = &front->tlsFrontend->d_tlsCounters;
+      }
+      else if (front->dohFrontend != nullptr) {
+        errorCounters = &front->dohFrontend->d_tlsCounters;
+      }
+      if (errorCounters != nullptr) {
+        str << base << "tlsdhkeytoosmall" << ' ' << errorCounters->d_dhKeyTooSmall << " " << now << "\r\n";
+        str << base << "tlsinappropriatefallback" << ' ' << errorCounters->d_inappropriateFallBack << " " << now << "\r\n";
+        str << base << "tlsnosharedcipher" << ' ' << errorCounters->d_noSharedCipher << " " << now << "\r\n";
+        str << base << "tlsunknownciphertype" << ' ' << errorCounters->d_unknownCipherType << " " << now << "\r\n";
+        str << base << "tlsunknownkeyexchangetype" << ' ' << errorCounters->d_unknownKeyExchangeType << " " << now << "\r\n";
+        str << base << "tlsunknownprotocol" << ' ' << errorCounters->d_unknownProtocol << " " << now << "\r\n";
+        str << base << "tlsunsupportedec" << ' ' << errorCounters->d_unsupportedEC << " " << now << "\r\n";
+        str << base << "tlsunsupportedprotocol" << ' ' << errorCounters->d_unsupportedProtocol << " " << now << "\r\n";
+      }
+    }
 
-          auto localPools = g_pools.getLocal();
-          for (const auto& entry : *localPools) {
-            string poolName = entry.first;
-            boost::replace_all(poolName, ".", "_");
-            if (poolName.empty()) {
-              poolName = "_default_";
-            }
-            const string base = namespace_name + "." + hostname + "." + instance_name + ".pools." + poolName + ".";
-            const std::shared_ptr<ServerPool> pool = entry.second;
-            str<<base<<"servers" << " " << pool->countServers(false) << " " << now << "\r\n";
-            str<<base<<"servers-up" << " " << pool->countServers(true) << " " << now << "\r\n";
-            if (pool->packetCache != nullptr) {
-              const auto& cache = pool->packetCache;
-              str<<base<<"cache-size" << " " << cache->getMaxEntries() << " " << now << "\r\n";
-              str<<base<<"cache-entries" << " " << cache->getEntriesCount() << " " << now << "\r\n";
-              str<<base<<"cache-hits" << " " << cache->getHits() << " " << now << "\r\n";
-              str<<base<<"cache-misses" << " " << cache->getMisses() << " " << now << "\r\n";
-              str<<base<<"cache-deferred-inserts" << " " << cache->getDeferredInserts() << " " << now << "\r\n";
-              str<<base<<"cache-deferred-lookups" << " " << cache->getDeferredLookups() << " " << now << "\r\n";
-              str<<base<<"cache-lookup-collisions" << " " << cache->getLookupCollisions() << " " << now << "\r\n";
-              str<<base<<"cache-insert-collisions" << " " << cache->getInsertCollisions() << " " << now << "\r\n";
-              str<<base<<"cache-ttl-too-shorts" << " " << cache->getTTLTooShorts() << " " << now << "\r\n";
-              str<<base<<"cache-cleanup-count" << " " << cache->getCleanupCount() << " " << now << "\r\n";
-            }
-          }
+    auto localPools = g_pools.getLocal();
+    for (const auto& entry : *localPools) {
+      string poolName = entry.first;
+      boost::replace_all(poolName, ".", "_");
+      if (poolName.empty()) {
+        poolName = "_default_";
+      }
+      const string base = namespace_name + "." + hostname + "." + instance_name + ".pools." + poolName + ".";
+      const std::shared_ptr<ServerPool> pool = entry.second;
+      str << base << "servers"
+          << " " << pool->countServers(false) << " " << now << "\r\n";
+      str << base << "servers-up"
+          << " " << pool->countServers(true) << " " << now << "\r\n";
+      if (pool->packetCache != nullptr) {
+        const auto& cache = pool->packetCache;
+        str << base << "cache-size"
+            << " " << cache->getMaxEntries() << " " << now << "\r\n";
+        str << base << "cache-entries"
+            << " " << cache->getEntriesCount() << " " << now << "\r\n";
+        str << base << "cache-hits"
+            << " " << cache->getHits() << " " << now << "\r\n";
+        str << base << "cache-misses"
+            << " " << cache->getMisses() << " " << now << "\r\n";
+        str << base << "cache-deferred-inserts"
+            << " " << cache->getDeferredInserts() << " " << now << "\r\n";
+        str << base << "cache-deferred-lookups"
+            << " " << cache->getDeferredLookups() << " " << now << "\r\n";
+        str << base << "cache-lookup-collisions"
+            << " " << cache->getLookupCollisions() << " " << now << "\r\n";
+        str << base << "cache-insert-collisions"
+            << " " << cache->getInsertCollisions() << " " << now << "\r\n";
+        str << base << "cache-ttl-too-shorts"
+            << " " << cache->getTTLTooShorts() << " " << now << "\r\n";
+        str << base << "cache-cleanup-count"
+            << " " << cache->getCleanupCount() << " " << now << "\r\n";
+      }
+    }
 
 #ifdef HAVE_DNS_OVER_HTTPS
-          {
-            std::map<std::string,uint64_t> dohFrontendDuplicates;
-            const string base = "dnsdist." + hostname + ".main.doh.";
-            for(const auto& doh : g_dohlocals) {
-              string name = doh->d_local.toStringWithPort();
-              boost::replace_all(name, ".", "_");
-              boost::replace_all(name, ":", "_");
-              boost::replace_all(name, "[", "_");
-              boost::replace_all(name, "]", "_");
+    {
+      std::map<std::string, uint64_t> dohFrontendDuplicates;
+      const string base = "dnsdist." + hostname + ".main.doh.";
+      for (const auto& doh : g_dohlocals) {
+        string name = doh->d_local.toStringWithPort();
+        boost::replace_all(name, ".", "_");
+        boost::replace_all(name, ":", "_");
+        boost::replace_all(name, "[", "_");
+        boost::replace_all(name, "]", "_");
 
-              auto dupPair = dohFrontendDuplicates.insert({name, 1});
-              if (!dupPair.second) {
-                name = name + "_" + std::to_string(dupPair.first->second);
-                ++(dupPair.first->second);
-              }
+        auto dupPair = dohFrontendDuplicates.insert({name, 1});
+        if (!dupPair.second) {
+          name = name + "_" + std::to_string(dupPair.first->second);
+          ++(dupPair.first->second);
+        }
 
-              vector<pair<const char*, const pdns::stat_t&>> v{
-                {"http-connects", doh->d_httpconnects},
-                {"http1-queries", doh->d_http1Stats.d_nbQueries},
-                {"http2-queries", doh->d_http2Stats.d_nbQueries},
-                {"http1-200-responses", doh->d_http1Stats.d_nb200Responses},
-                {"http2-200-responses", doh->d_http2Stats.d_nb200Responses},
-                {"http1-400-responses", doh->d_http1Stats.d_nb400Responses},
-                {"http2-400-responses", doh->d_http2Stats.d_nb400Responses},
-                {"http1-403-responses", doh->d_http1Stats.d_nb403Responses},
-                {"http2-403-responses", doh->d_http2Stats.d_nb403Responses},
-                {"http1-500-responses", doh->d_http1Stats.d_nb500Responses},
-                {"http2-500-responses", doh->d_http2Stats.d_nb500Responses},
-                {"http1-502-responses", doh->d_http1Stats.d_nb502Responses},
-                {"http2-502-responses", doh->d_http2Stats.d_nb502Responses},
-                {"http1-other-responses", doh->d_http1Stats.d_nbOtherResponses},
-                {"http2-other-responses", doh->d_http2Stats.d_nbOtherResponses},
-                {"get-queries", doh->d_getqueries},
-                {"post-queries", doh->d_postqueries},
-                {"bad-requests", doh->d_badrequests},
-                {"error-responses", doh->d_errorresponses},
-                {"redirect-responses", doh->d_redirectresponses},
-                {"valid-responses", doh->d_validresponses}
-              };
+        vector<pair<const char*, const pdns::stat_t&>> v{
+          {"http-connects", doh->d_httpconnects},
+          {"http1-queries", doh->d_http1Stats.d_nbQueries},
+          {"http2-queries", doh->d_http2Stats.d_nbQueries},
+          {"http1-200-responses", doh->d_http1Stats.d_nb200Responses},
+          {"http2-200-responses", doh->d_http2Stats.d_nb200Responses},
+          {"http1-400-responses", doh->d_http1Stats.d_nb400Responses},
+          {"http2-400-responses", doh->d_http2Stats.d_nb400Responses},
+          {"http1-403-responses", doh->d_http1Stats.d_nb403Responses},
+          {"http2-403-responses", doh->d_http2Stats.d_nb403Responses},
+          {"http1-500-responses", doh->d_http1Stats.d_nb500Responses},
+          {"http2-500-responses", doh->d_http2Stats.d_nb500Responses},
+          {"http1-502-responses", doh->d_http1Stats.d_nb502Responses},
+          {"http2-502-responses", doh->d_http2Stats.d_nb502Responses},
+          {"http1-other-responses", doh->d_http1Stats.d_nbOtherResponses},
+          {"http2-other-responses", doh->d_http2Stats.d_nbOtherResponses},
+          {"get-queries", doh->d_getqueries},
+          {"post-queries", doh->d_postqueries},
+          {"bad-requests", doh->d_badrequests},
+          {"error-responses", doh->d_errorresponses},
+          {"redirect-responses", doh->d_redirectresponses},
+          {"valid-responses", doh->d_validresponses}};
 
-              for(const auto& item : v) {
-                str<<base<<name<<"."<<item.first << " " << item.second << " " << now <<"\r\n";
-              }
-            }
-          }
+        for (const auto& item : v) {
+          str << base << name << "." << item.first << " " << item.second << " " << now << "\r\n";
+        }
+      }
+    }
 #endif /* HAVE_DNS_OVER_HTTPS */
 
-          {
-            std::string qname;
-            auto records = g_qcount.records.write_lock();
-            for (const auto &record : *records) {
-              qname = record.first;
-              boost::replace_all(qname, ".", "_");
-              str<<"dnsdist.querycount." << qname << ".queries " << record.second << " " << now << "\r\n";
-            }
-            records->clear();
-          }
+    {
+      std::string qname;
+      auto records = g_qcount.records.write_lock();
+      for (const auto& record : *records) {
+        qname = record.first;
+        boost::replace_all(qname, ".", "_");
+        str << "dnsdist.querycount." << qname << ".queries " << record.second << " " << now << "\r\n";
+      }
+      records->clear();
+    }
 
-          const string msg = str.str();
+    const string msg = str.str();
 
-          int ret = waitForRWData(s.getHandle(), false, 1 , 0);
-          if(ret <= 0 ) {
-            vinfolog("Unable to write data to carbon server on %s: %s", server.toStringWithPort(), (ret<0 ? stringerror() : "Timeout"));
-            continue;
-          }
-          s.setBlocking();
-          writen2(s.getHandle(), msg.c_str(), msg.size());
+    int ret = waitForRWData(s.getHandle(), false, 1, 0);
+    if (ret <= 0) {
+      vinfolog("Unable to write data to carbon server on %s: %s", server.toStringWithPort(), (ret < 0 ? stringerror() : "Timeout"));
+      return false;
+    }
+    s.setBlocking();
+    writen2(s.getHandle(), msg.c_str(), msg.size());
+  }
+  catch (const std::exception& e) {
+    warnlog("Problem sending carbon data to %s: %s", server.toStringWithPort(), e.what());
+    return false;
+  }
+
+  return true;
+}
+
+static void carbonHandler(Carbon::Endpoint endpoint)
+{
+  setThreadName("dnsdist/carbon");
+  const auto intervalUSec = endpoint.interval * 1000 * 1000;
+
+  try {
+    size_t consecutiveFailures = 0;
+    do {
+      DTime dt;
+      dt.set();
+      if (doOneCarbonExport(endpoint)) {
+        const auto elapsedUSec = dt.udiff();
+        if (elapsedUSec < 0 || static_cast<unsigned int>(elapsedUSec) <= intervalUSec) {
+          useconds_t toSleepUSec = intervalUSec - elapsedUSec;
+          usleep(toSleepUSec);
+        }
+        else {
+          vinfolog("Carbon export for %s took longer (%s usec) than the configured interval (%d usec)", endpoint.server.toStringWithPort(), elapsedUSec, intervalUSec);
         }
-        catch(const std::exception& e) {
-          warnlog("Problem sending carbon data: %s", e.what());
+        consecutiveFailures = 0;
+      }
+      else {
+        /* maximum interval between two attempts is 10 minutes */
+        const time_t maxBackOff = 10 * 60;
+        time_t backOff = 1;
+        double backOffTmp = std::pow(2.0, consecutiveFailures);
+        if (backOffTmp != HUGE_VAL && static_cast<uint64_t>(backOffTmp) <= static_cast<uint64_t>(std::numeric_limits<time_t>::max())) {
+          backOff = static_cast<time_t>(backOffTmp);
+          if (backOff > maxBackOff) {
+            backOff = maxBackOff;
+          }
         }
+        consecutiveFailures++;
+        vinfolog("Run for %s - %s failed, next attempt in %d", endpoint.server.toStringWithPort(), endpoint.ourname, backOff);
+        sleep(backOff);
       }
+    } while (true);
+  }
+  catch (const PDNSException& e) {
+    errlog("Carbon thread for %s died, PDNSException: %s", endpoint.server.toStringWithPort(), e.reason);
+  }
+  catch (...) {
+    errlog("Carbon thread for %s died", endpoint.server.toStringWithPort());
+  }
+}
+
+bool Carbon::addEndpoint(Carbon::Endpoint&& endpoint)
+{
+  if (endpoint.ourname.empty()) {
+    try {
+      endpoint.ourname = getCarbonHostName();
+    }
+    catch (const std::exception& e) {
+      throw std::runtime_error(std::string("The 'ourname' setting in 'carbonServer()' has not been set and we are unable to determine the system's hostname: ") + e.what());
     }
   }
-  catch(const std::exception& e)
-  {
-    errlog("Carbon thread died: %s", e.what());
+
+  auto config = s_config.lock();
+  if (config->d_running) {
+    // we already started the threads, let's just spawn a new one
+    std::thread newHandler(carbonHandler, endpoint);
+    newHandler.detach();
   }
-  catch(const PDNSException& e)
-  {
-    errlog("Carbon thread died, PDNSException: %s", e.reason);
+  else {
+    config->d_endpoints.push_back(std::move(endpoint));
   }
-  catch(...)
-  {
-    errlog("Carbon thread died");
+  return true;
+}
+
+void Carbon::run()
+{
+  auto config = s_config.lock();
+  if (config->d_running) {
+    throw std::runtime_error("The carbon threads are already running");
   }
+  for (const auto& endpoint : config->d_endpoints) {
+    std::thread newHandler(carbonHandler, endpoint);
+    newHandler.detach();
+  }
+  config->d_running = true;
+}
+
 }
 #endif /* DISABLE_CARBON */
 
index e5b31692f980f067a8270cdae7d3b4a64aee9a2d..c7d65cbb4d5cfe537990f3d5223b16ca3cdb3e67 100644 (file)
@@ -1091,13 +1091,12 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
 #ifndef DISABLE_CARBON
   luaCtx.writeFunction("carbonServer", [](const std::string& address, boost::optional<string> ourName, boost::optional<uint64_t> interval, boost::optional<string> namespace_name, boost::optional<string> instance_name) {
     setLuaSideEffect();
-    auto ours = g_carbon.getCopy();
-    ours.push_back({ComboAddress(address, 2003),
-                    (namespace_name && !namespace_name->empty()) ? *namespace_name : "dnsdist",
-                    ourName ? *ourName : "",
-                    (instance_name && !instance_name->empty()) ? *instance_name : "main",
-                    (interval && *interval < std::numeric_limits<unsigned int>::max()) ? static_cast<unsigned int>(*interval) : 30});
-    g_carbon.setState(ours);
+    dnsdist::Carbon::Endpoint endpoint{ComboAddress(address, 2003),
+                                       (namespace_name && !namespace_name->empty()) ? *namespace_name : "dnsdist",
+                                       ourName ? *ourName : "",
+                                       (instance_name && !instance_name->empty()) ? *instance_name : "main",
+                                       (interval && *interval < std::numeric_limits<unsigned int>::max()) ? static_cast<unsigned int>(*interval) : 30};
+    dnsdist::Carbon::addEndpoint(std::move(endpoint));
   });
 #endif /* DISABLE_CARBON */
 
index a1a6061eae23d099463408928836636a63f7439c..0795b07d961c2133a0950c102c24217af5f2408f 100644 (file)
@@ -2852,8 +2852,7 @@ int main(int argc, char** argv)
     dnsdist::ServiceDiscovery::run();
 
 #ifndef DISABLE_CARBON
-    thread carbonthread(carbonDumpThread);
-    carbonthread.detach();
+    dnsdist::Carbon::run();
 #endif /* DISABLE_CARBON */
 
     thread stattid(maintThread);
index 5b3fbbce0b98ed5c3327125f56e842085e8588b4..0fb04d4f03352d346e4dd38cbb4ff7c4e697bd21 100644 (file)
 #include "config.h"
 
 #ifndef DISABLE_CARBON
-#include "sholder.hh"
+
+#include <thread>
 #include "iputils.hh"
+#include "lock.hh"
 
-struct CarbonConfig
+namespace dnsdist
+{
+class Carbon
 {
-  ComboAddress server;
-  std::string namespace_name;
-  std::string ourname;
-  std::string instance_name;
-  unsigned int interval;
+public:
+  struct Endpoint
+  {
+    ComboAddress server;
+    std::string namespace_name;
+    std::string ourname;
+    std::string instance_name;
+    unsigned int interval;
+  };
+
+  static bool addEndpoint(Endpoint&& endpoint);
+  static void run();
+
+private:
+  struct Config
+  {
+    std::vector<Endpoint> d_endpoints;
+    bool d_running{false};
+  };
+
+  static LockGuarded<Config> s_config;
 };
 
-extern GlobalStateHolder<std::vector<CarbonConfig>> g_carbon;
-void carbonDumpThread();
+}
+
 #endif /* DISABLE_CARBON */