From: Remi Gacogne Date: Mon, 16 Jan 2023 17:20:28 +0000 (+0100) Subject: dnsdist: Better handling of multiple carbon servers X-Git-Tag: dnsdist-1.8.0-rc1~88^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5552745fa8c74ade21ab9b7b4e9afbaa86f21409;p=thirdparty%2Fpdns.git dnsdist: Better handling of multiple carbon servers --- diff --git a/.not-formatted b/.not-formatted index e0055fbfd2..51a437ae3c 100644 --- a/.not-formatted +++ b/.not-formatted @@ -48,7 +48,6 @@ ./pdns/dnsdemog.cc ./pdns/dnsdist-cache.cc ./pdns/dnsdist-cache.hh -./pdns/dnsdist-carbon.cc ./pdns/dnsdist-console.cc ./pdns/dnsdist-console.hh ./pdns/dnsdist-dynblocks.hh diff --git a/pdns/dnsdist-carbon.cc b/pdns/dnsdist-carbon.cc index ef7a856082..6df6d9982b 100644 --- a/pdns/dnsdist-carbon.cc +++ b/pdns/dnsdist-carbon.cc @@ -31,255 +31,321 @@ #include "sstuff.hh" #include "threadname.hh" -GlobalStateHolder > g_carbon; +namespace dnsdist +{ + +LockGuarded Carbon::s_config; -void carbonDumpThread() +static bool doOneCarbonExport(const Carbon::Endpoint& endpoint) { - try - { - setThreadName("dnsdist/carbon"); - auto localCarbon = g_carbon.getLocal(); - for(int numloops=0;;++numloops) { - if(localCarbon->empty()) { - sleep(1); - continue; + const auto& server = endpoint.server; + const std::string& namespace_name = endpoint.namespace_name; + const std::string& hostname = endpoint.ourname; + const std::string& instance_name = endpoint.instance_name; + + try { + Socket s(server.sin4.sin_family, SOCK_STREAM); + s.setNonBlocking(); + s.connect(server); // we do the connect so the attempt happens while we gather stats + ostringstream str; + + const time_t now = time(nullptr); + + for (const auto& e : g_stats.entries) { + str << namespace_name << "." << hostname << "." << instance_name << "." << e.first << ' '; + if (const auto& val = boost::get(&e.second)) { + str << (*val)->load(); + } + else if (const auto& adval = boost::get*>(&e.second)) { + str << (*adval)->load(); + } + else if (const auto& dval = boost::get(&e.second)) { + str << **dval; } - /* this is wrong, we use the interval of the first server - for every single one of them */ - if(numloops) { - const unsigned int interval = localCarbon->at(0).interval; - sleep(interval); + else if (const auto& func = boost::get(&e.second)) { + str << (*func)(e.first); } + str << ' ' << now << "\r\n"; + } - for (const auto& conf : *localCarbon) { - const auto& server = conf.server; - const std::string& namespace_name = conf.namespace_name; - std::string hostname = conf.ourname; - if (hostname.empty()) { - try { - hostname = getCarbonHostName(); - } - catch(const std::exception& e) { - throw std::runtime_error(std::string("The 'ourname' setting in 'carbonServer()' has not been set and we are unable to determine the system's hostname: ") + e.what()); - } - } - const std::string& instance_name = conf.instance_name; + auto states = g_dstates.getLocal(); + for (const auto& state : *states) { + string serverName = state->getName().empty() ? state->d_config.remote.toStringWithPort() : state->getName(); + boost::replace_all(serverName, ".", "_"); + const string base = namespace_name + "." + hostname + "." + instance_name + ".servers." + serverName + "."; + str << base << "queries" << ' ' << state->queries.load() << " " << now << "\r\n"; + str << base << "responses" << ' ' << state->responses.load() << " " << now << "\r\n"; + str << base << "drops" << ' ' << state->reuseds.load() << " " << now << "\r\n"; + str << base << "latency" << ' ' << (state->d_config.availability != DownstreamState::Availability::Down ? state->latencyUsec / 1000.0 : 0) << " " << now << "\r\n"; + str << base << "senderrors" << ' ' << state->sendErrors.load() << " " << now << "\r\n"; + str << base << "outstanding" << ' ' << state->outstanding.load() << " " << now << "\r\n"; + str << base << "tcpdiedsendingquery" << ' ' << state->tcpDiedSendingQuery.load() << " " << now << "\r\n"; + str << base << "tcpdiedreaddingresponse" << ' ' << state->tcpDiedReadingResponse.load() << " " << now << "\r\n"; + str << base << "tcpgaveup" << ' ' << state->tcpGaveUp.load() << " " << now << "\r\n"; + str << base << "tcpreadimeouts" << ' ' << state->tcpReadTimeouts.load() << " " << now << "\r\n"; + str << base << "tcpwritetimeouts" << ' ' << state->tcpWriteTimeouts.load() << " " << now << "\r\n"; + str << base << "tcpconnecttimeouts" << ' ' << state->tcpConnectTimeouts.load() << " " << now << "\r\n"; + str << base << "tcpcurrentconnections" << ' ' << state->tcpCurrentConnections.load() << " " << now << "\r\n"; + str << base << "tcpmaxconcurrentconnections" << ' ' << state->tcpMaxConcurrentConnections.load() << " " << now << "\r\n"; + str << base << "tcpnewconnections" << ' ' << state->tcpNewConnections.load() << " " << now << "\r\n"; + str << base << "tcpreusedconnections" << ' ' << state->tcpReusedConnections.load() << " " << now << "\r\n"; + str << base << "tlsresumptions" << ' ' << state->tlsResumptions.load() << " " << now << "\r\n"; + str << base << "tcpavgqueriesperconnection" << ' ' << state->tcpAvgQueriesPerConnection.load() << " " << now << "\r\n"; + str << base << "tcpavgconnectionduration" << ' ' << state->tcpAvgConnectionDuration.load() << " " << now << "\r\n"; + str << base << "tcptoomanyconcurrentconnections" << ' ' << state->tcpTooManyConcurrentConnections.load() << " " << now << "\r\n"; + } - try { - Socket s(server.sin4.sin_family, SOCK_STREAM); - s.setNonBlocking(); - s.connect(server); // we do the connect so the attempt happens while we gather stats - ostringstream str; - time_t now=time(0); - for(const auto& e : g_stats.entries) { - str<(&e.second)) { - str<<(*val)->load(); - } - else if(const auto& adval = boost::get*>(&e.second)) { - str<<(*adval)->load(); - } - else if (const auto& dval = boost::get(&e.second)) { - str<<**dval; - } - else if (const auto& func = boost::get(&e.second)) { - str<<(*func)(e.first); - } - str<<' '<getName().empty() ? state->d_config.remote.toStringWithPort() : state->getName(); - boost::replace_all(serverName, ".", "_"); - const string base = namespace_name + "." + hostname + "." + instance_name + ".servers." + serverName + "."; - str<queries.load() << " " << now << "\r\n"; - str<responses.load() << " " << now << "\r\n"; - str<reuseds.load() << " " << now << "\r\n"; - str<d_config.availability != DownstreamState::Availability::Down ? state->latencyUsec/1000.0 : 0) << " " << now << "\r\n"; - str<sendErrors.load() << " " << now << "\r\n"; - str<outstanding.load() << " " << now << "\r\n"; - str<tcpDiedSendingQuery.load() << " " << now << "\r\n"; - str<tcpDiedReadingResponse.load() << " " << now << "\r\n"; - str<tcpGaveUp.load() << " " << now << "\r\n"; - str<tcpReadTimeouts.load() << " " << now << "\r\n"; - str<tcpWriteTimeouts.load() << " " << now << "\r\n"; - str<tcpConnectTimeouts.load() << " " << now << "\r\n"; - str<tcpCurrentConnections.load() << " " << now << "\r\n"; - str<tcpMaxConcurrentConnections.load() << " " << now << "\r\n"; - str<tcpNewConnections.load() << " " << now << "\r\n"; - str<tcpReusedConnections.load() << " " << now << "\r\n"; - str<tlsResumptions.load() << " " << now << "\r\n"; - str<tcpAvgQueriesPerConnection.load() << " " << now << "\r\n"; - str<tcpAvgConnectionDuration.load() << " " << now << "\r\n"; - str<tcpTooManyConcurrentConnections.load() << " " << now << "\r\n"; - } + std::map frontendDuplicates; + for (const auto& front : g_frontends) { + if (front->udpFD == -1 && front->tcpFD == -1) { + continue; + } - std::map frontendDuplicates; - for(const auto& front : g_frontends) { - if (front->udpFD == -1 && front->tcpFD == -1) - continue; + string frontName = front->local.toStringWithPort() + (front->udpFD >= 0 ? "_udp" : "_tcp"); + boost::replace_all(frontName, ".", "_"); + auto dupPair = frontendDuplicates.insert({frontName, 1}); + if (!dupPair.second) { + frontName = frontName + "_" + std::to_string(dupPair.first->second); + ++(dupPair.first->second); + } - string frontName = front->local.toStringWithPort() + (front->udpFD >= 0 ? "_udp" : "_tcp"); - boost::replace_all(frontName, ".", "_"); - auto dupPair = frontendDuplicates.insert({frontName, 1}); - if (!dupPair.second) { - frontName = frontName + "_" + std::to_string(dupPair.first->second); - ++(dupPair.first->second); - } + const string base = namespace_name + "." + hostname + "." + instance_name + ".frontends." + frontName + "."; + str << base << "queries" << ' ' << front->queries.load() << " " << now << "\r\n"; + str << base << "responses" << ' ' << front->responses.load() << " " << now << "\r\n"; + str << base << "tcpdiedreadingquery" << ' ' << front->tcpDiedReadingQuery.load() << " " << now << "\r\n"; + str << base << "tcpdiedsendingresponse" << ' ' << front->tcpDiedSendingResponse.load() << " " << now << "\r\n"; + str << base << "tcpgaveup" << ' ' << front->tcpGaveUp.load() << " " << now << "\r\n"; + str << base << "tcpclientimeouts" << ' ' << front->tcpClientTimeouts.load() << " " << now << "\r\n"; + str << base << "tcpdownstreamtimeouts" << ' ' << front->tcpDownstreamTimeouts.load() << " " << now << "\r\n"; + str << base << "tcpcurrentconnections" << ' ' << front->tcpCurrentConnections.load() << " " << now << "\r\n"; + str << base << "tcpmaxconcurrentconnections" << ' ' << front->tcpMaxConcurrentConnections.load() << " " << now << "\r\n"; + str << base << "tcpavgqueriesperconnection" << ' ' << front->tcpAvgQueriesPerConnection.load() << " " << now << "\r\n"; + str << base << "tcpavgconnectionduration" << ' ' << front->tcpAvgConnectionDuration.load() << " " << now << "\r\n"; + str << base << "tls10-queries" << ' ' << front->tls10queries.load() << " " << now << "\r\n"; + str << base << "tls11-queries" << ' ' << front->tls11queries.load() << " " << now << "\r\n"; + str << base << "tls12-queries" << ' ' << front->tls12queries.load() << " " << now << "\r\n"; + str << base << "tls13-queries" << ' ' << front->tls13queries.load() << " " << now << "\r\n"; + str << base << "tls-unknown-queries" << ' ' << front->tlsUnknownqueries.load() << " " << now << "\r\n"; + str << base << "tlsnewsessions" << ' ' << front->tlsNewSessions.load() << " " << now << "\r\n"; + str << base << "tlsresumptions" << ' ' << front->tlsResumptions.load() << " " << now << "\r\n"; + str << base << "tlsunknownticketkeys" << ' ' << front->tlsUnknownTicketKey.load() << " " << now << "\r\n"; + str << base << "tlsinactiveticketkeys" << ' ' << front->tlsInactiveTicketKey.load() << " " << now << "\r\n"; - const string base = namespace_name + "." + hostname + "." + instance_name + ".frontends." + frontName + "."; - str<queries.load() << " " << now << "\r\n"; - str<responses.load() << " " << now << "\r\n"; - str<tcpDiedReadingQuery.load() << " " << now << "\r\n"; - str<tcpDiedSendingResponse.load() << " " << now << "\r\n"; - str<tcpGaveUp.load() << " " << now << "\r\n"; - str<tcpClientTimeouts.load() << " " << now << "\r\n"; - str<tcpDownstreamTimeouts.load() << " " << now << "\r\n"; - str<tcpCurrentConnections.load() << " " << now << "\r\n"; - str<tcpMaxConcurrentConnections.load() << " " << now << "\r\n"; - str<tcpAvgQueriesPerConnection.load() << " " << now << "\r\n"; - str<tcpAvgConnectionDuration.load() << " " << now << "\r\n"; - str<tls10queries.load() << " " << now << "\r\n"; - str<tls11queries.load() << " " << now << "\r\n"; - str<tls12queries.load() << " " << now << "\r\n"; - str<tls13queries.load() << " " << now << "\r\n"; - str<tlsUnknownqueries.load() << " " << now << "\r\n"; - str<tlsNewSessions.load() << " " << now << "\r\n"; - str<tlsResumptions.load() << " " << now << "\r\n"; - str<tlsUnknownTicketKey.load() << " " << now << "\r\n"; - str<tlsInactiveTicketKey.load() << " " << now << "\r\n"; - const TLSErrorCounters* errorCounters = nullptr; - if (front->tlsFrontend != nullptr) { - errorCounters = &front->tlsFrontend->d_tlsCounters; - } - else if (front->dohFrontend != nullptr) { - errorCounters = &front->dohFrontend->d_tlsCounters; - } - if (errorCounters != nullptr) { - str<d_dhKeyTooSmall << " " << now << "\r\n"; - str<d_inappropriateFallBack << " " << now << "\r\n"; - str<d_noSharedCipher << " " << now << "\r\n"; - str<d_unknownCipherType << " " << now << "\r\n"; - str<d_unknownKeyExchangeType << " " << now << "\r\n"; - str<d_unknownProtocol << " " << now << "\r\n"; - str<d_unsupportedEC << " " << now << "\r\n"; - str<d_unsupportedProtocol << " " << now << "\r\n"; - } - } + const TLSErrorCounters* errorCounters = nullptr; + if (front->tlsFrontend != nullptr) { + errorCounters = &front->tlsFrontend->d_tlsCounters; + } + else if (front->dohFrontend != nullptr) { + errorCounters = &front->dohFrontend->d_tlsCounters; + } + if (errorCounters != nullptr) { + str << base << "tlsdhkeytoosmall" << ' ' << errorCounters->d_dhKeyTooSmall << " " << now << "\r\n"; + str << base << "tlsinappropriatefallback" << ' ' << errorCounters->d_inappropriateFallBack << " " << now << "\r\n"; + str << base << "tlsnosharedcipher" << ' ' << errorCounters->d_noSharedCipher << " " << now << "\r\n"; + str << base << "tlsunknownciphertype" << ' ' << errorCounters->d_unknownCipherType << " " << now << "\r\n"; + str << base << "tlsunknownkeyexchangetype" << ' ' << errorCounters->d_unknownKeyExchangeType << " " << now << "\r\n"; + str << base << "tlsunknownprotocol" << ' ' << errorCounters->d_unknownProtocol << " " << now << "\r\n"; + str << base << "tlsunsupportedec" << ' ' << errorCounters->d_unsupportedEC << " " << now << "\r\n"; + str << base << "tlsunsupportedprotocol" << ' ' << errorCounters->d_unsupportedProtocol << " " << now << "\r\n"; + } + } - auto localPools = g_pools.getLocal(); - for (const auto& entry : *localPools) { - string poolName = entry.first; - boost::replace_all(poolName, ".", "_"); - if (poolName.empty()) { - poolName = "_default_"; - } - const string base = namespace_name + "." + hostname + "." + instance_name + ".pools." + poolName + "."; - const std::shared_ptr pool = entry.second; - str<countServers(false) << " " << now << "\r\n"; - str<countServers(true) << " " << now << "\r\n"; - if (pool->packetCache != nullptr) { - const auto& cache = pool->packetCache; - str<getMaxEntries() << " " << now << "\r\n"; - str<getEntriesCount() << " " << now << "\r\n"; - str<getHits() << " " << now << "\r\n"; - str<getMisses() << " " << now << "\r\n"; - str<getDeferredInserts() << " " << now << "\r\n"; - str<getDeferredLookups() << " " << now << "\r\n"; - str<getLookupCollisions() << " " << now << "\r\n"; - str<getInsertCollisions() << " " << now << "\r\n"; - str<getTTLTooShorts() << " " << now << "\r\n"; - str<getCleanupCount() << " " << now << "\r\n"; - } - } + auto localPools = g_pools.getLocal(); + for (const auto& entry : *localPools) { + string poolName = entry.first; + boost::replace_all(poolName, ".", "_"); + if (poolName.empty()) { + poolName = "_default_"; + } + const string base = namespace_name + "." + hostname + "." + instance_name + ".pools." + poolName + "."; + const std::shared_ptr pool = entry.second; + str << base << "servers" + << " " << pool->countServers(false) << " " << now << "\r\n"; + str << base << "servers-up" + << " " << pool->countServers(true) << " " << now << "\r\n"; + if (pool->packetCache != nullptr) { + const auto& cache = pool->packetCache; + str << base << "cache-size" + << " " << cache->getMaxEntries() << " " << now << "\r\n"; + str << base << "cache-entries" + << " " << cache->getEntriesCount() << " " << now << "\r\n"; + str << base << "cache-hits" + << " " << cache->getHits() << " " << now << "\r\n"; + str << base << "cache-misses" + << " " << cache->getMisses() << " " << now << "\r\n"; + str << base << "cache-deferred-inserts" + << " " << cache->getDeferredInserts() << " " << now << "\r\n"; + str << base << "cache-deferred-lookups" + << " " << cache->getDeferredLookups() << " " << now << "\r\n"; + str << base << "cache-lookup-collisions" + << " " << cache->getLookupCollisions() << " " << now << "\r\n"; + str << base << "cache-insert-collisions" + << " " << cache->getInsertCollisions() << " " << now << "\r\n"; + str << base << "cache-ttl-too-shorts" + << " " << cache->getTTLTooShorts() << " " << now << "\r\n"; + str << base << "cache-cleanup-count" + << " " << cache->getCleanupCount() << " " << now << "\r\n"; + } + } #ifdef HAVE_DNS_OVER_HTTPS - { - std::map dohFrontendDuplicates; - const string base = "dnsdist." + hostname + ".main.doh."; - for(const auto& doh : g_dohlocals) { - string name = doh->d_local.toStringWithPort(); - boost::replace_all(name, ".", "_"); - boost::replace_all(name, ":", "_"); - boost::replace_all(name, "[", "_"); - boost::replace_all(name, "]", "_"); + { + std::map dohFrontendDuplicates; + const string base = "dnsdist." + hostname + ".main.doh."; + for (const auto& doh : g_dohlocals) { + string name = doh->d_local.toStringWithPort(); + boost::replace_all(name, ".", "_"); + boost::replace_all(name, ":", "_"); + boost::replace_all(name, "[", "_"); + boost::replace_all(name, "]", "_"); - auto dupPair = dohFrontendDuplicates.insert({name, 1}); - if (!dupPair.second) { - name = name + "_" + std::to_string(dupPair.first->second); - ++(dupPair.first->second); - } + auto dupPair = dohFrontendDuplicates.insert({name, 1}); + if (!dupPair.second) { + name = name + "_" + std::to_string(dupPair.first->second); + ++(dupPair.first->second); + } - vector> v{ - {"http-connects", doh->d_httpconnects}, - {"http1-queries", doh->d_http1Stats.d_nbQueries}, - {"http2-queries", doh->d_http2Stats.d_nbQueries}, - {"http1-200-responses", doh->d_http1Stats.d_nb200Responses}, - {"http2-200-responses", doh->d_http2Stats.d_nb200Responses}, - {"http1-400-responses", doh->d_http1Stats.d_nb400Responses}, - {"http2-400-responses", doh->d_http2Stats.d_nb400Responses}, - {"http1-403-responses", doh->d_http1Stats.d_nb403Responses}, - {"http2-403-responses", doh->d_http2Stats.d_nb403Responses}, - {"http1-500-responses", doh->d_http1Stats.d_nb500Responses}, - {"http2-500-responses", doh->d_http2Stats.d_nb500Responses}, - {"http1-502-responses", doh->d_http1Stats.d_nb502Responses}, - {"http2-502-responses", doh->d_http2Stats.d_nb502Responses}, - {"http1-other-responses", doh->d_http1Stats.d_nbOtherResponses}, - {"http2-other-responses", doh->d_http2Stats.d_nbOtherResponses}, - {"get-queries", doh->d_getqueries}, - {"post-queries", doh->d_postqueries}, - {"bad-requests", doh->d_badrequests}, - {"error-responses", doh->d_errorresponses}, - {"redirect-responses", doh->d_redirectresponses}, - {"valid-responses", doh->d_validresponses} - }; + vector> v{ + {"http-connects", doh->d_httpconnects}, + {"http1-queries", doh->d_http1Stats.d_nbQueries}, + {"http2-queries", doh->d_http2Stats.d_nbQueries}, + {"http1-200-responses", doh->d_http1Stats.d_nb200Responses}, + {"http2-200-responses", doh->d_http2Stats.d_nb200Responses}, + {"http1-400-responses", doh->d_http1Stats.d_nb400Responses}, + {"http2-400-responses", doh->d_http2Stats.d_nb400Responses}, + {"http1-403-responses", doh->d_http1Stats.d_nb403Responses}, + {"http2-403-responses", doh->d_http2Stats.d_nb403Responses}, + {"http1-500-responses", doh->d_http1Stats.d_nb500Responses}, + {"http2-500-responses", doh->d_http2Stats.d_nb500Responses}, + {"http1-502-responses", doh->d_http1Stats.d_nb502Responses}, + {"http2-502-responses", doh->d_http2Stats.d_nb502Responses}, + {"http1-other-responses", doh->d_http1Stats.d_nbOtherResponses}, + {"http2-other-responses", doh->d_http2Stats.d_nbOtherResponses}, + {"get-queries", doh->d_getqueries}, + {"post-queries", doh->d_postqueries}, + {"bad-requests", doh->d_badrequests}, + {"error-responses", doh->d_errorresponses}, + {"redirect-responses", doh->d_redirectresponses}, + {"valid-responses", doh->d_validresponses}}; - for(const auto& item : v) { - str<clear(); - } + { + std::string qname; + auto records = g_qcount.records.write_lock(); + for (const auto& record : *records) { + qname = record.first; + boost::replace_all(qname, ".", "_"); + str << "dnsdist.querycount." << qname << ".queries " << record.second << " " << now << "\r\n"; + } + records->clear(); + } - const string msg = str.str(); + const string msg = str.str(); - int ret = waitForRWData(s.getHandle(), false, 1 , 0); - if(ret <= 0 ) { - vinfolog("Unable to write data to carbon server on %s: %s", server.toStringWithPort(), (ret<0 ? stringerror() : "Timeout")); - continue; - } - s.setBlocking(); - writen2(s.getHandle(), msg.c_str(), msg.size()); + int ret = waitForRWData(s.getHandle(), false, 1, 0); + if (ret <= 0) { + vinfolog("Unable to write data to carbon server on %s: %s", server.toStringWithPort(), (ret < 0 ? stringerror() : "Timeout")); + return false; + } + s.setBlocking(); + writen2(s.getHandle(), msg.c_str(), msg.size()); + } + catch (const std::exception& e) { + warnlog("Problem sending carbon data to %s: %s", server.toStringWithPort(), e.what()); + return false; + } + + return true; +} + +static void carbonHandler(Carbon::Endpoint endpoint) +{ + setThreadName("dnsdist/carbon"); + const auto intervalUSec = endpoint.interval * 1000 * 1000; + + try { + size_t consecutiveFailures = 0; + do { + DTime dt; + dt.set(); + if (doOneCarbonExport(endpoint)) { + const auto elapsedUSec = dt.udiff(); + if (elapsedUSec < 0 || static_cast(elapsedUSec) <= intervalUSec) { + useconds_t toSleepUSec = intervalUSec - elapsedUSec; + usleep(toSleepUSec); + } + else { + vinfolog("Carbon export for %s took longer (%s usec) than the configured interval (%d usec)", endpoint.server.toStringWithPort(), elapsedUSec, intervalUSec); } - catch(const std::exception& e) { - warnlog("Problem sending carbon data: %s", e.what()); + consecutiveFailures = 0; + } + else { + /* maximum interval between two attempts is 10 minutes */ + const time_t maxBackOff = 10 * 60; + time_t backOff = 1; + double backOffTmp = std::pow(2.0, consecutiveFailures); + if (backOffTmp != HUGE_VAL && static_cast(backOffTmp) <= static_cast(std::numeric_limits::max())) { + backOff = static_cast(backOffTmp); + if (backOff > maxBackOff) { + backOff = maxBackOff; + } } + consecutiveFailures++; + vinfolog("Run for %s - %s failed, next attempt in %d", endpoint.server.toStringWithPort(), endpoint.ourname, backOff); + sleep(backOff); } + } while (true); + } + catch (const PDNSException& e) { + errlog("Carbon thread for %s died, PDNSException: %s", endpoint.server.toStringWithPort(), e.reason); + } + catch (...) { + errlog("Carbon thread for %s died", endpoint.server.toStringWithPort()); + } +} + +bool Carbon::addEndpoint(Carbon::Endpoint&& endpoint) +{ + if (endpoint.ourname.empty()) { + try { + endpoint.ourname = getCarbonHostName(); + } + catch (const std::exception& e) { + throw std::runtime_error(std::string("The 'ourname' setting in 'carbonServer()' has not been set and we are unable to determine the system's hostname: ") + e.what()); } } - catch(const std::exception& e) - { - errlog("Carbon thread died: %s", e.what()); + + auto config = s_config.lock(); + if (config->d_running) { + // we already started the threads, let's just spawn a new one + std::thread newHandler(carbonHandler, endpoint); + newHandler.detach(); } - catch(const PDNSException& e) - { - errlog("Carbon thread died, PDNSException: %s", e.reason); + else { + config->d_endpoints.push_back(std::move(endpoint)); } - catch(...) - { - errlog("Carbon thread died"); + return true; +} + +void Carbon::run() +{ + auto config = s_config.lock(); + if (config->d_running) { + throw std::runtime_error("The carbon threads are already running"); } + for (const auto& endpoint : config->d_endpoints) { + std::thread newHandler(carbonHandler, endpoint); + newHandler.detach(); + } + config->d_running = true; +} + } #endif /* DISABLE_CARBON */ diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc index e5b31692f9..c7d65cbb4d 100644 --- a/pdns/dnsdist-lua.cc +++ b/pdns/dnsdist-lua.cc @@ -1091,13 +1091,12 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck) #ifndef DISABLE_CARBON luaCtx.writeFunction("carbonServer", [](const std::string& address, boost::optional ourName, boost::optional interval, boost::optional namespace_name, boost::optional instance_name) { setLuaSideEffect(); - auto ours = g_carbon.getCopy(); - ours.push_back({ComboAddress(address, 2003), - (namespace_name && !namespace_name->empty()) ? *namespace_name : "dnsdist", - ourName ? *ourName : "", - (instance_name && !instance_name->empty()) ? *instance_name : "main", - (interval && *interval < std::numeric_limits::max()) ? static_cast(*interval) : 30}); - g_carbon.setState(ours); + dnsdist::Carbon::Endpoint endpoint{ComboAddress(address, 2003), + (namespace_name && !namespace_name->empty()) ? *namespace_name : "dnsdist", + ourName ? *ourName : "", + (instance_name && !instance_name->empty()) ? *instance_name : "main", + (interval && *interval < std::numeric_limits::max()) ? static_cast(*interval) : 30}; + dnsdist::Carbon::addEndpoint(std::move(endpoint)); }); #endif /* DISABLE_CARBON */ diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index a1a6061eae..0795b07d96 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -2852,8 +2852,7 @@ int main(int argc, char** argv) dnsdist::ServiceDiscovery::run(); #ifndef DISABLE_CARBON - thread carbonthread(carbonDumpThread); - carbonthread.detach(); + dnsdist::Carbon::run(); #endif /* DISABLE_CARBON */ thread stattid(maintThread); diff --git a/pdns/dnsdistdist/dnsdist-carbon.hh b/pdns/dnsdistdist/dnsdist-carbon.hh index 5b3fbbce0b..0fb04d4f03 100644 --- a/pdns/dnsdistdist/dnsdist-carbon.hh +++ b/pdns/dnsdistdist/dnsdist-carbon.hh @@ -24,18 +24,38 @@ #include "config.h" #ifndef DISABLE_CARBON -#include "sholder.hh" + +#include #include "iputils.hh" +#include "lock.hh" -struct CarbonConfig +namespace dnsdist +{ +class Carbon { - ComboAddress server; - std::string namespace_name; - std::string ourname; - std::string instance_name; - unsigned int interval; +public: + struct Endpoint + { + ComboAddress server; + std::string namespace_name; + std::string ourname; + std::string instance_name; + unsigned int interval; + }; + + static bool addEndpoint(Endpoint&& endpoint); + static void run(); + +private: + struct Config + { + std::vector d_endpoints; + bool d_running{false}; + }; + + static LockGuarded s_config; }; -extern GlobalStateHolder> g_carbon; -void carbonDumpThread(); +} + #endif /* DISABLE_CARBON */