#include "dnsdist-cache.hh"
#include "dnsdist-console.hh"
#include "dnsdist-ecs.hh"
+#include "dnsdist-healthchecks.hh"
#include "dnsdist-lua.hh"
#include "dnsdist-rings.hh"
#include "dnsdist-secpoll.hh"
#include "dolog.hh"
#include "dnsname.hh"
#include "dnsparser.hh"
-#include "dnswriter.hh"
#include "ednsoptions.hh"
#include "gettime.hh"
#include "lock.hh"
// Global runtime flags and statistics shared across dnsdist threads.
// NOTE(review): these are plain globals written from the Lua configuration
// thread and read by worker threads -- presumably set once at startup; verify.
bool g_verbose;
struct DNSDistStats g_stats;
-MetricDefinitionStorage g_metricDefinitions;
// Sizes the per-backend idStates table (see DownstreamState ctor below).
uint16_t g_maxOutstanding{std::numeric_limits<uint16_t>::max()};
// Removed by this change: health-check verbosity now lives in dnsdist-healthchecks.
-bool g_verboseHealthChecks{false};
// TTL applied when serving stale packet-cache entries -- 0 presumably disables; confirm.
uint32_t g_staleCacheEntriesTTL{0};
bool g_syslog{true};
bool g_allowEmptyResponse{false};
bool g_preserveTrailingData{false};
bool g_roundrobinFailOnNoServer{false};
// Linux capabilities to keep when privileges are dropped (see dropCapabilities() call).
+std::set<std::string> g_capabilitiesToRetain;
+
static void truncateTC(char* packet, uint16_t* len, size_t responseSize, unsigned int consumed)
try
{
return false;
}
- if (dr.packetCache && !dr.skipCache) {
+ if (dr.packetCache && !dr.skipCache && *responseLen <= s_maxPacketCacheEntrySize) {
if (!dr.useZeroScope) {
/* if the query was not suitable for zero-scope, for
example because it had an existing ECS entry so the hash is
try {
setThreadName("dnsdist/respond");
auto localRespRulactions = g_resprulactions.getLocal();
- char packet[4096 + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
+ char packet[s_maxPacketCacheEntrySize + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE];
static_assert(sizeof(packet) <= UINT16_MAX, "Packet size should fit in a uint16_t");
/* when the answer is encrypted in place, we need to get a copy
of the original header before encryption to fill the ring buffer */
du->response = std::string(response, responseLen);
if (send(du->rsock, &du, sizeof(du), 0) != sizeof(du)) {
/* at this point we have the only remaining pointer on this
- DOHUnit object since we did set ids->du to nullptr earlier */
- delete du;
+ DOHUnit object since we did set ids->du to nullptr earlier,
+ except if we got the response before the pointer could be
+ released by the frontend */
+ du->release();
}
#endif /* HAVE_DNS_OVER_HTTPS */
du = nullptr;
}
++g_stats.responses;
+ if (ids->cs) {
+ ++ids->cs->responses;
+ }
+ ++dss->responses;
double udiff = ids->sentTime.udiff();
vinfolog("Got answer from %s, relayed to %s%s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(),
fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
if (!IsAnyAddress(sourceAddr)) {
SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
+ if (!sourceItfName.empty()) {
+#ifdef SO_BINDTODEVICE
+ int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, sourceItfName.c_str(), sourceItfName.length());
+ if (res != 0) {
+ infolog("Error setting up the interface on backend socket '%s': %s", remote.toStringWithPort(), stringerror());
+ }
+#endif
+ }
+
SBind(fd, sourceAddr);
}
try {
}
}
-DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
+DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, const std::string& sourceItfName_, size_t numberOfSockets, bool connect=true): sourceItfName(sourceItfName_), remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
{
pthread_rwlock_init(&d_lock, nullptr);
id = getUniqueID();
fd = -1;
}
- if (!IsAnyAddress(remote)) {
+ if (connect && !IsAnyAddress(remote)) {
reconnect();
idStates.resize(g_maxOutstanding);
sw.start();
}
// Perturbation value mixed into qname hashes (see whashed()) so different
// dnsdist instances do not all pick the same backend for a given name.
uint32_t g_hashperturb;
// Bounded-load factor for consistent hashing: when > 0, a backend whose
// outstanding query count exceeds (average load * factor) is skipped.
+double g_consistentHashBalancingFactor = 0;
shared_ptr<DownstreamState> whashed(const NumberedServerVector& servers, const DNSQuestion* dq)
{
return valrandom(dq->qname->hash(g_hashperturb), servers, dq);
unsigned int min = std::numeric_limits<unsigned int>::max();
shared_ptr<DownstreamState> ret = nullptr, first = nullptr;
+ double targetLoad = std::numeric_limits<double>::max();
+ if (g_consistentHashBalancingFactor > 0) {
+ /* we start with one, representing the query we are currently handling */
+ double currentLoad = 1;
+ for (const auto& pair : servers) {
+ currentLoad += pair.second->outstanding;
+ }
+ targetLoad = (currentLoad / servers.size()) * g_consistentHashBalancingFactor;
+ }
+
for (const auto& d: servers) {
- if (d.second->isUp()) {
+ if (d.second->isUp() && d.second->outstanding <= targetLoad) {
// make sure hashes have been computed
if (d.second->hashes.empty()) {
d.second->hash();
case DNSAction::Action::Truncate:
dq.dh->tc = true;
dq.dh->qr = true;
+ dq.dh->ra = dq.dh->rd;
+ dq.dh->aa = false;
+ dq.dh->ad = false;
return true;
break;
case DNSAction::Action::HeaderModify:
vinfolog("Query from %s truncated because of dynamic block", dq.remote->toStringWithPort());
dq.dh->tc = true;
dq.dh->qr = true;
+ dq.dh->ra = dq.dh->rd;
+ dq.dh->aa = false;
+ dq.dh->ad = false;
return true;
}
else {
vinfolog("Query from %s for %s truncated because of dynamic block", dq.remote->toStringWithPort(), dq.qname->toLogString());
dq.dh->tc = true;
dq.dh->qr = true;
+ dq.dh->ra = dq.dh->rd;
+ dq.dh->aa = false;
+ dq.dh->ad = false;
return true;
}
else {
}
++g_stats.selfAnswered;
+ ++cs.responses;
return ProcessQueryResult::SendAnswer;
}
if (dq.useECS && ((selectedBackend && selectedBackend->useECS) || (!selectedBackend && serverPool->getECS()))) {
// we special case our cache in case a downstream explicitly gave us a universally valid response with a 0 scope
+ // we need ECS parsing (parseECS) to be true so we can be sure that the initial incoming query did not have an existing
+ // ECS option, which would make it unsuitable for the zero-scope feature.
if (dq.packetCache && !dq.skipCache && (!selectedBackend || !selectedBackend->disableZeroScope) && dq.packetCache->isECSParsingEnabled()) {
if (dq.packetCache->get(dq, dq.consumed, dq.dh->id, reinterpret_cast<char*>(dq.dh), &cachedResponseSize, &dq.cacheKeyNoECS, dq.subnet, dq.dnssecOK, allowExpired)) {
dq.len = cachedResponseSize;
{
struct MMReceiver
{
- char packet[4096];
+ char packet[s_maxPacketCacheEntrySize];
ComboAddress remote;
ComboAddress dest;
struct iovec iov;
else
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
{
- char packet[4096];
+ char packet[s_maxPacketCacheEntrySize];
/* the actual buffer is larger because:
- we may have to add EDNS and/or ECS
- we use it for self-generated responses (from rule or cache)
ComboAddress remote;
ComboAddress dest;
remote.sin4.sin_family = cs->local.sin4.sin_family;
- fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), packet, sizeof(packet), &remote);
+ fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), packet, s_udpIncomingBufferSize, &remote);
for(;;) {
ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
continue;
}
- processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), s_udpIncomingBufferSize, nullptr, nullptr, nullptr, nullptr);
+ processUDPQuery(*cs, holders, &msgh, remote, dest, packet, static_cast<uint16_t>(got), sizeof(packet), nullptr, nullptr, nullptr, nullptr);
}
}
}
/** Generate a random DNS transaction ID for outgoing queries.
 *
 * @return a value in [0, 65535], used to match responses to queries.
 */
uint16_t getRandomDNSID()
{
#ifdef HAVE_LIBSODIUM
  /* randombytes_uniform() returns an unbiased value in [0, 65536),
     unlike `randombytes_random() % 65536` which carries a (tiny) modulo
     bias whenever the generator range is not a multiple of 65536 */
  return randombytes_uniform(65536);
#else
  /* NOTE(review): random() is not a CSPRNG; predictable transaction IDs
     weaken spoofing resistance -- fallback only when libsodium is absent */
  return (random() % 65536);
#endif
}
// REMOVED by this change: the synchronous, blocking UDP health check below is
// superseded by the asynchronous queueHealthCheck()/handleQueuedHealthChecks()
// machinery from dnsdist-healthchecks.hh (included at the top of this file).
// Contract of the old implementation: builds a query for ds->checkName /
// checkType / checkClass (optionally rewritten by a Lua checkFunction), sends
// it over a fresh connected UDP socket, and returns true only if a matching,
// well-formed response arrives within ds->checkTimeout ms.
-static bool upCheck(const shared_ptr<DownstreamState>& ds)
-try
-{
- DNSName checkName = ds->checkName;
- uint16_t checkType = ds->checkType.getCode();
- uint16_t checkClass = ds->checkClass;
- dnsheader checkHeader;
- memset(&checkHeader, 0, sizeof(checkHeader));
-
- checkHeader.qdcount = htons(1);
- checkHeader.id = getRandomDNSID();
-
- checkHeader.rd = true;
- if (ds->setCD) {
- checkHeader.cd = true;
- }
-
// Optional Lua hook may rewrite the probe's qname/qtype/qclass and header.
- if (ds->checkFunction) {
- std::lock_guard<std::mutex> lock(g_luamutex);
- auto ret = ds->checkFunction(checkName, checkType, checkClass, &checkHeader);
- checkName = std::get<0>(ret);
- checkType = std::get<1>(ret);
- checkClass = std::get<2>(ret);
- }
-
- vector<uint8_t> packet;
- DNSPacketWriter dpw(packet, checkName, checkType, checkClass);
- dnsheader * requestHeader = dpw.getHeader();
- *requestHeader = checkHeader;
-
- Socket sock(ds->remote.sin4.sin_family, SOCK_DGRAM);
- sock.setNonBlocking();
- if (!IsAnyAddress(ds->sourceAddr)) {
- sock.setReuseAddr();
- sock.bind(ds->sourceAddr);
- }
- sock.connect(ds->remote);
- ssize_t sent = udpClientSendRequestToBackend(ds, sock.getHandle(), reinterpret_cast<char*>(&packet[0]), packet.size(), true);
- if (sent < 0) {
- int ret = errno;
- if (g_verboseHealthChecks)
- infolog("Error while sending a health check query to backend %s: %d", ds->getNameWithAddr(), ret);
- return false;
- }
-
// Blocks the caller for up to checkTimeout ms -- the reason this code was
// replaced by the event-driven (FDMultiplexer-based) health checks.
- int ret = waitForRWData(sock.getHandle(), true, /* ms to seconds */ ds->checkTimeout / 1000, /* remaining ms to us */ (ds->checkTimeout % 1000) * 1000);
- if(ret < 0 || !ret) { // error, timeout, both are down!
- if (ret < 0) {
- ret = errno;
- if (g_verboseHealthChecks)
- infolog("Error while waiting for the health check response from backend %s: %d", ds->getNameWithAddr(), ret);
- }
- else {
- if (g_verboseHealthChecks)
- infolog("Timeout while waiting for the health check response from backend %s", ds->getNameWithAddr());
- }
- return false;
- }
-
- string reply;
- ComboAddress from;
- sock.recvFrom(reply, from);
-
- /* we are using a connected socket but hey.. */
- if (from != ds->remote) {
- if (g_verboseHealthChecks)
- infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds->remote.toStringWithPort());
- return false;
- }
-
- const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
-
// Sanity checks on the response: size, matching ID, QR bit, rcode.
- if (reply.size() < sizeof(*responseHeader)) {
- if (g_verboseHealthChecks)
- infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds->getNameWithAddr(), sizeof(*responseHeader));
- return false;
- }
-
- if (responseHeader->id != requestHeader->id) {
- if (g_verboseHealthChecks)
- infolog("Invalid health check response id %d from backend %s, expecting %d", responseHeader->id, ds->getNameWithAddr(), requestHeader->id);
- return false;
- }
-
- if (!responseHeader->qr) {
- if (g_verboseHealthChecks)
- infolog("Invalid health check response from backend %s, expecting QR to be set", ds->getNameWithAddr());
- return false;
- }
-
- if (responseHeader->rcode == RCode::ServFail) {
- if (g_verboseHealthChecks)
- infolog("Backend %s responded to health check with ServFail", ds->getNameWithAddr());
- return false;
- }
-
// With mustResolve set, NXDomain/Refused also count as failures.
- if (ds->mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) {
- if (g_verboseHealthChecks)
- infolog("Backend %s responded to health check with %s while mustResolve is set", ds->getNameWithAddr(), responseHeader->rcode == RCode::NXDomain ? "NXDomain" : "Refused");
- return false;
- }
-
- uint16_t receivedType;
- uint16_t receivedClass;
- DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
-
- if (receivedName != checkName || receivedType != checkType || receivedClass != checkClass) {
- if (g_verboseHealthChecks)
- infolog("Backend %s responded to health check with an invalid qname (%s vs %s), qtype (%s vs %s) or qclass (%d vs %d)", ds->getNameWithAddr(), receivedName.toLogString(), checkName.toLogString(), QType(receivedType).getName(), QType(checkType).getName(), receivedClass, checkClass);
- return false;
- }
-
- return true;
-}
// Any exception (parsing, socket errors surfaced as throws) marks the backend down.
-catch(const std::exception& e)
-{
- if (g_verboseHealthChecks)
- infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
- return false;
-}
-catch(...)
-{
- if (g_verboseHealthChecks)
- infolog("Unknown exception while checking the health of backend %s", ds->getNameWithAddr());
- return false;
-}
-
// Upper bound on TCP client worker threads (see the hasReachedMaxThreads()
// check in the maintenance loop below).
uint64_t g_maxTCPClientThreads{10};
// Packet-cache cleaning parameters -- presumably the delay between cleaning
// passes (seconds) and the fraction of entries scanned per pass; confirm
// against the cache-maintenance code, which is outside this view.
std::atomic<uint16_t> g_cacheCleaningDelay{60};
std::atomic<uint16_t> g_cacheCleaningPercentage{100};
{
setThreadName("dnsdist/healthC");
- int interval = 1;
+ static const int interval = 1;
for(;;) {
sleep(interval);
- if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads())
+ if(g_tcpclientthreads->getQueuedCount() > 1 && !g_tcpclientthreads->hasReachedMaxThreads()) {
g_tcpclientthreads->addTCPClientThread();
+ }
+ auto mplexer = std::shared_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
auto states = g_dstates.getLocal(); // this points to the actual shared_ptrs!
for(auto& dss : *states) {
- if(++dss->lastCheck < dss->checkInterval)
+ if(++dss->lastCheck < dss->checkInterval) {
continue;
- dss->lastCheck = 0;
- if(dss->availability==DownstreamState::Availability::Auto) {
- bool newState=upCheck(dss);
- if (newState) {
- /* check succeeded */
- dss->currentCheckFailures = 0;
-
- if (!dss->upStatus) {
- /* we were marked as down */
- dss->consecutiveSuccessfulChecks++;
- if (dss->consecutiveSuccessfulChecks < dss->minRiseSuccesses) {
- /* if we need more than one successful check to rise
- and we didn't reach the threshold yet,
- let's stay down */
- newState = false;
- }
- }
- }
- else {
- /* check failed */
- dss->consecutiveSuccessfulChecks = 0;
-
- if (dss->upStatus) {
- /* we are currently up */
- dss->currentCheckFailures++;
- if (dss->currentCheckFailures < dss->maxCheckFailures) {
- /* we need more than one failure to be marked as down,
- and we did not reach the threshold yet, let's stay down */
- newState = true;
- }
- }
- }
-
- if(newState != dss->upStatus) {
- warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
-
- if (newState && !dss->connected) {
- newState = dss->reconnect();
+ }
- if (dss->connected && !dss->threadStarted.test_and_set()) {
- dss->tid = thread(responderThread, dss);
- }
- }
+ dss->lastCheck = 0;
- dss->upStatus = newState;
- dss->currentCheckFailures = 0;
- dss->consecutiveSuccessfulChecks = 0;
- if (g_snmpAgent && g_snmpTrapsEnabled) {
- g_snmpAgent->sendBackendStatusChangeTrap(dss);
- }
+ if (dss->availability == DownstreamState::Availability::Auto) {
+ if (!queueHealthCheck(mplexer, dss)) {
+ updateHealthCheckResult(dss, false);
}
}
dss->prev.queries.store(dss->queries.load());
dss->prev.reuseds.store(dss->reuseds.load());
- for(IDState& ids : dss->idStates) { // timeouts
+ for (IDState& ids : dss->idStates) { // timeouts
int64_t usageIndicator = ids.usageIndicator;
if(IDState::isInUse(usageIndicator) && ids.age++ > g_udpTimeout) {
/* We mark the state as unused as soon as possible
}
}
}
+
+ handleQueuedHealthChecks(mplexer);
}
}
g_policy.setState(leastOutstandingPol);
if(g_cmdLine.beClient || !g_cmdLine.command.empty()) {
- setupLua(true, g_cmdLine.config);
+ setupLua(true, false, g_cmdLine.config);
if (clientAddress != ComboAddress())
g_serverControl = clientAddress;
doClient(g_serverControl, g_cmdLine.command);
g_consoleACL.setState(consoleACL);
if (g_cmdLine.checkConfig) {
- setupLua(true, g_cmdLine.config);
+ setupLua(false, true, g_cmdLine.config);
// No exception was thrown
infolog("Configuration '%s' OK!", g_cmdLine.config);
_exit(EXIT_SUCCESS);
}
- auto todo=setupLua(false, g_cmdLine.config);
+ auto todo=setupLua(false, false, g_cmdLine.config);
auto localPools = g_pools.getCopy();
{
or as an unprivileged user with ambient
capabilities like CAP_NET_BIND_SERVICE.
*/
- dropCapabilities();
+ dropCapabilities(g_capabilitiesToRetain);
}
catch(const std::exception& e) {
warnlog("%s", e.what());
checkFileDescriptorsLimits(udpBindsCount, tcpBindsCount);
+ auto mplexer = std::shared_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal
- if(dss->availability==DownstreamState::Availability::Auto) {
- bool newState=upCheck(dss);
- warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
- dss->upStatus = newState;
+ if (dss->availability == DownstreamState::Availability::Auto) {
+ if (!queueHealthCheck(mplexer, dss, true)) {
+ dss->upStatus = false;
+ warnlog("Marking downstream %s as 'down'", dss->getNameWithAddr());
+ }
}
}
+ handleQueuedHealthChecks(mplexer, true);
for(auto& cs : g_frontends) {
if (cs->dohFrontend != nullptr) {