]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Add support for sharding the packet cache
authorRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 11 Jul 2017 14:30:58 +0000 (16:30 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 5 Sep 2017 14:44:54 +0000 (16:44 +0200)
Also make the cache insert lock optionally deferrable.

m4/pdns_check_network_libs.m4
pdns/dnsdist-cache.cc
pdns/dnsdist-cache.hh
pdns/dnsdist-lua2.cc
pdns/dnsdist.cc
pdns/dnsdist.hh
pdns/gettime.cc
pdns/iputils.cc
pdns/iputils.hh
pdns/misc.hh

index 953d58eaaaa27260ab6760b6ae4e233d3134cc89..19e8d5899a4a6d9bfe9598caaaf30c82242c9e92 100644 (file)
@@ -4,4 +4,3 @@ AC_DEFUN([PDNS_CHECK_NETWORK_LIBS],[
   AC_SEARCH_LIBS([socket], [socket])
   AC_SEARCH_LIBS([gethostent], [nsl])
 ])
-
index 305dcc677e663561b52d21647c526f9d254e47f3..da8d15052fc6072d7fb12eddf18105a4314e318d 100644 (file)
 #include "dnsparser.hh"
 #include "dnsdist-cache.hh"
 
-DNSDistPacketCache::DNSDistPacketCache(size_t maxEntries, uint32_t maxTTL, uint32_t minTTL, uint32_t tempFailureTTL, uint32_t staleTTL, bool dontAge): d_maxEntries(maxEntries), d_maxTTL(maxTTL), d_tempFailureTTL(tempFailureTTL), d_minTTL(minTTL), d_staleTTL(staleTTL), d_dontAge(dontAge)
+DNSDistPacketCache::DNSDistPacketCache(size_t maxEntries, uint32_t maxTTL, uint32_t minTTL, uint32_t tempFailureTTL, uint32_t staleTTL, bool dontAge, uint32_t shards, bool deferrableInsertLock): d_maxEntries(maxEntries), d_shardCount(shards), d_maxTTL(maxTTL), d_tempFailureTTL(tempFailureTTL), d_minTTL(minTTL), d_staleTTL(staleTTL), d_dontAge(dontAge), d_deferrableInsertLock(deferrableInsertLock)
 {
-  pthread_rwlock_init(&d_lock, 0);
+  d_shards.resize(d_shardCount);
+
   /* we reserve maxEntries + 1 to avoid rehashing from occurring
      when we get to maxEntries, as it means a load factor of 1 */
-  d_map.reserve(maxEntries + 1);
+  for (auto& shard : d_shards) {
+    shard.setSize((maxEntries / d_shardCount) + 1);
+  }
 }
 
 DNSDistPacketCache::~DNSDistPacketCache()
 {
   try {
-    WriteLock l(&d_lock);
+    vector<std::unique_ptr<WriteLock>> locks;
+    for (uint32_t shardIndex = 0; shardIndex < d_shardCount; shardIndex++) {
+      locks.push_back(std::unique_ptr<WriteLock>(new WriteLock(&d_shards.at(shardIndex).d_lock)));
+    }
   }
-  catch(const PDNSException& pe) {
+  catch(...) {
   }
 }
 
@@ -48,6 +54,41 @@ bool DNSDistPacketCache::cachedValueMatches(const CacheValue& cachedValue, const
   return true;
 }
 
+void DNSDistPacketCache::insertLocked(CacheShard& shard, uint32_t key, const DNSName& qname, uint16_t qtype, uint16_t qclass, bool tcp, CacheValue& newValue, time_t now, time_t newValidity)
+{
+  auto& map = shard.d_map;
+  /* check again now that we hold the lock to prevent a race */
+  if (map.size() >= (d_maxEntries / d_shardCount)) {
+    return;
+  }
+
+  std::unordered_map<uint32_t,CacheValue>::iterator it;
+  bool result;
+  tie(it, result) = map.insert({key, newValue});
+
+  if (result) {
+    shard.d_entriesCount++;
+    return;
+  }
+
+  /* in case of collision, don't override the existing entry
+     except if it has expired */
+  CacheValue& value = it->second;
+  bool wasExpired = value.validity <= now;
+
+  if (!wasExpired && !cachedValueMatches(value, qname, qtype, qclass, tcp)) {
+    d_insertCollisions++;
+    return;
+  }
+
+  /* if the existing entry had a longer TTD, keep it */
+  if (newValidity <= value.validity) {
+    return;
+  }
+
+  value = newValue;
+}
+
 void DNSDistPacketCache::insert(uint32_t key, const DNSName& qname, uint16_t qtype, uint16_t qclass, const char* response, uint16_t responseLen, bool tcp, uint8_t rcode)
 {
   if (responseLen < sizeof(dnsheader))
@@ -79,20 +120,13 @@ void DNSDistPacketCache::insert(uint32_t key, const DNSName& qname, uint16_t qty
     }
   }
 
-  {
-    TryReadLock r(&d_lock);
-    if (!r.gotIt()) {
-      d_deferredInserts++;
-      return;
-    }
-    if (d_map.size() >= d_maxEntries) {
-      return;
-    }
+  uint32_t shardIndex = getShardIndex(key);
+
+  if (d_shards.at(shardIndex).d_entriesCount >= (d_maxEntries / d_shardCount)) {
+    return;
   }
 
   const time_t now = time(NULL);
-  std::unordered_map<uint32_t,CacheValue>::iterator it;
-  bool result;
   time_t newValidity = now + minTTL;
   CacheValue newValue;
   newValue.qname = qname;
@@ -104,36 +138,21 @@ void DNSDistPacketCache::insert(uint32_t key, const DNSName& qname, uint16_t qty
   newValue.tcp = tcp;
   newValue.value = std::string(response, responseLen);
 
-  {
-    TryWriteLock w(&d_lock);
+  auto& shard = d_shards.at(shardIndex);
+
+  if (d_deferrableInsertLock) {
+    TryWriteLock w(&shard.d_lock);
 
     if (!w.gotIt()) {
       d_deferredInserts++;
       return;
     }
+    insertLocked(shard, key, qname, qtype, qclass, tcp, newValue, now, newValidity)    ;
+  }
+  else {
+    WriteLock w(&shard.d_lock);
 
-    tie(it, result) = d_map.insert({key, newValue});
-
-    if (result) {
-      return;
-    }
-
-    /* in case of collision, don't override the existing entry
-       except if it has expired */
-    CacheValue& value = it->second;
-    bool wasExpired = value.validity <= now;
-
-    if (!wasExpired && !cachedValueMatches(value, qname, qtype, qclass, tcp)) {
-      d_insertCollisions++;
-      return;
-    }
-
-    /* if the existing entry had a longer TTD, keep it */
-    if (newValidity <= value.validity) {
-      return;
-    }
-
-    value = newValue;
+    insertLocked(shard, key, qname, qtype, qclass, tcp, newValue, now, newValidity)    ;
   }
 }
 
@@ -143,18 +162,21 @@ bool DNSDistPacketCache::get(const DNSQuestion& dq, uint16_t consumed, uint16_t
   if (keyOut)
     *keyOut = key;
 
+  uint32_t shardIndex = getShardIndex(key);
   time_t now = time(NULL);
   time_t age;
   bool stale = false;
+  auto& shard = d_shards.at(shardIndex);
+  auto& map = shard.d_map;
   {
-    TryReadLock r(&d_lock);
+    TryReadLock r(&shard.d_lock);
     if (!r.gotIt()) {
       d_deferredLookups++;
       return false;
     }
 
-    std::unordered_map<uint32_t,CacheValue>::const_iterator it = d_map.find(key);
-    if (it == d_map.end()) {
+    std::unordered_map<uint32_t,CacheValue>::const_iterator it = map.find(key);
+    if (it == map.end()) {
       d_misses++;
       return false;
     }
@@ -223,60 +245,104 @@ bool DNSDistPacketCache::get(const DNSQuestion& dq, uint16_t consumed, uint16_t
 void DNSDistPacketCache::purgeExpired(size_t upTo)
 {
   time_t now = time(NULL);
-  WriteLock w(&d_lock);
-  if (upTo >= d_map.size()) {
+  uint64_t size = getSize();
+
+  if (upTo >= size) {
     return;
   }
 
-  size_t toRemove = d_map.size() - upTo;
-  for(auto it = d_map.begin(); toRemove > 0 && it != d_map.end(); ) {
-    const CacheValue& value = it->second;
+  size_t toRemove = size - upTo;
 
-    if (value.validity < now) {
-        it = d_map.erase(it);
+  size_t scannedMaps = 0;
+
+  do {
+    uint32_t shardIndex = (d_expungeIndex++ % d_shardCount);
+    WriteLock w(&d_shards.at(shardIndex).d_lock);
+    auto& map = d_shards[shardIndex].d_map;
+
+    for(auto it = map.begin(); toRemove > 0 && it != map.end(); ) {
+      const CacheValue& value = it->second;
+
+      if (value.validity < now) {
+        it = map.erase(it);
         --toRemove;
-    } else {
-      ++it;
+        d_shards[shardIndex].d_entriesCount--;
+      } else {
+        ++it;
+      }
     }
+
+    scannedMaps++;
   }
+  while (toRemove > 0 && scannedMaps < d_shardCount);
 }
 
 /* Remove all entries, keeping only upTo
    entries in the cache */
 void DNSDistPacketCache::expunge(size_t upTo)
 {
-  WriteLock w(&d_lock);
+  const uint64_t size = getSize();
 
-  if (upTo >= d_map.size()) {
+  if (upTo >= size) {
     return;
   }
 
-  size_t toRemove = d_map.size() - upTo;
-  auto beginIt = d_map.begin();
-  auto endIt = beginIt;
-  std::advance(endIt, toRemove);
-  d_map.erase(beginIt, endIt);
+  size_t toRemove = size - upTo;
+  size_t removed = 0;
+
+  for (uint32_t shardIndex = 0; shardIndex < d_shardCount; shardIndex++) {
+    WriteLock w(&d_shards.at(shardIndex).d_lock);
+    auto& map = d_shards[shardIndex].d_map;
+    auto beginIt = map.begin();
+    auto endIt = beginIt;
+    size_t removeFromThisShard = (toRemove - removed) / (d_shardCount - shardIndex);
+    if (map.size() >= removeFromThisShard) {
+      std::advance(endIt, removeFromThisShard);
+      map.erase(beginIt, endIt);
+      d_shards[shardIndex].d_entriesCount -= removeFromThisShard;
+      removed += removeFromThisShard;
+    }
+    else {
+      removed += map.size();
+      map.clear();
+      d_shards[shardIndex].d_entriesCount = 0;
+    }
+  }
 }
 
 void DNSDistPacketCache::expungeByName(const DNSName& name, uint16_t qtype, bool suffixMatch)
 {
-  WriteLock w(&d_lock);
-
-  for(auto it = d_map.begin(); it != d_map.end(); ) {
-    const CacheValue& value = it->second;
-
-    if ((value.qname == name || (suffixMatch && value.qname.isPartOf(name))) && (qtype == QType::ANY || qtype == value.qtype)) {
-      it = d_map.erase(it);
-    } else {
-      ++it;
+  for (uint32_t shardIndex = 0; shardIndex < d_shardCount; shardIndex++) {
+    WriteLock w(&d_shards.at(shardIndex).d_lock);
+    auto& map = d_shards[shardIndex].d_map;
+
+    for(auto it = map.begin(); it != map.end(); ) {
+      const CacheValue& value = it->second;
+
+      if ((value.qname == name || (suffixMatch && value.qname.isPartOf(name))) && (qtype == QType::ANY || qtype == value.qtype)) {
+        it = map.erase(it);
+        d_shards[shardIndex].d_entriesCount--;
+      } else {
+        ++it;
+      }
     }
   }
 }
 
 bool DNSDistPacketCache::isFull()
 {
-    ReadLock r(&d_lock);
-    return (d_map.size() >= d_maxEntries);
+    return (getSize() >= d_maxEntries);
+}
+
+uint64_t DNSDistPacketCache::getSize()
+{
+  uint64_t count = 0;
+
+  for (uint32_t shardIndex = 0; shardIndex < d_shardCount; shardIndex++) {
+    count += d_shards.at(shardIndex).d_entriesCount;
+  }
+
+  return count;
 }
 
 uint32_t DNSDistPacketCache::getMinTTL(const char* packet, uint16_t length)
@@ -303,14 +369,17 @@ uint32_t DNSDistPacketCache::getKey(const DNSName& qname, uint16_t consumed, con
   return result;
 }
 
+uint32_t DNSDistPacketCache::getShardIndex(uint32_t key) const
+{
+  return key % d_shardCount;
+}
+
 string DNSDistPacketCache::toString()
 {
-  ReadLock r(&d_lock);
-  return std::to_string(d_map.size()) + "/" + std::to_string(d_maxEntries);
+  return std::to_string(getSize()) + "/" + std::to_string(d_maxEntries);
 }
 
 uint64_t DNSDistPacketCache::getEntriesCount()
 {
-  ReadLock r(&d_lock);
-  return d_map.size();
+  return getSize();
 }
index 4fcd2dabbcd5195d13c336770e4f4a1af7ebad60..923f6e006f2514e7332bc20e61655118ae15eebc 100644 (file)
@@ -30,7 +30,7 @@ struct DNSQuestion;
 class DNSDistPacketCache : boost::noncopyable
 {
 public:
-  DNSDistPacketCache(size_t maxEntries, uint32_t maxTTL=86400, uint32_t minTTL=0, uint32_t tempFailureTTL=60, uint32_t staleTTL=60, bool dontAge=false);
+  DNSDistPacketCache(size_t maxEntries, uint32_t maxTTL=86400, uint32_t minTTL=0, uint32_t tempFailureTTL=60, uint32_t staleTTL=60, bool dontAge=false, uint32_t shards=1, bool deferrableInsertLock=true);
   ~DNSDistPacketCache();
 
   void insert(uint32_t key, const DNSName& qname, uint16_t qtype, uint16_t qclass, const char* response, uint16_t responseLen, bool tcp, uint8_t rcode);
@@ -40,7 +40,7 @@ public:
   void expungeByName(const DNSName& name, uint16_t qtype=QType::ANY, bool suffixMatch=false);
   bool isFull();
   string toString();
-  uint64_t getSize() const { return d_map.size(); }
+  uint64_t getSize();
   uint64_t getHits() const { return d_hits; }
   uint64_t getMisses() const { return d_misses; }
   uint64_t getDeferredLookups() const { return d_deferredLookups; }
@@ -68,11 +68,35 @@ private:
     bool tcp{false};
   };
 
+  class CacheShard
+  {
+  public:
+    CacheShard(): d_entriesCount(0)
+    {
+      pthread_rwlock_init(&d_lock, 0);
+    }
+    CacheShard(const CacheShard& old): d_entriesCount(0)
+    {
+      pthread_rwlock_init(&d_lock, 0);
+    }
+
+    void setSize(size_t maxSize)
+    {
+      d_map.reserve(maxSize);
+    }
+
+    std::unordered_map<uint32_t,CacheValue> d_map;
+    pthread_rwlock_t d_lock;
+    std::atomic<uint64_t> d_entriesCount;
+  };
+
   static uint32_t getKey(const DNSName& qname, uint16_t consumed, const unsigned char* packet, uint16_t packetLen, bool tcp);
   static bool cachedValueMatches(const CacheValue& cachedValue, const DNSName& qname, uint16_t qtype, uint16_t qclass, bool tcp);
+  uint32_t getShardIndex(uint32_t key) const;
+  void insertLocked(CacheShard& shard, uint32_t key, const DNSName& qname, uint16_t qtype, uint16_t qclass, bool tcp, CacheValue& newValue, time_t now, time_t newValidity);
+
+  std::vector<CacheShard> d_shards;
 
-  pthread_rwlock_t d_lock;
-  std::unordered_map<uint32_t,CacheValue> d_map;
   std::atomic<uint64_t> d_deferredLookups{0};
   std::atomic<uint64_t> d_deferredInserts{0};
   std::atomic<uint64_t> d_hits{0};
@@ -80,10 +104,14 @@ private:
   std::atomic<uint64_t> d_insertCollisions{0};
   std::atomic<uint64_t> d_lookupCollisions{0};
   std::atomic<uint64_t> d_ttlTooShorts{0};
+
   size_t d_maxEntries;
+  uint32_t d_expungeIndex{0};
+  uint32_t d_shardCount;
   uint32_t d_maxTTL;
   uint32_t d_tempFailureTTL;
   uint32_t d_minTTL;
   uint32_t d_staleTTL;
   bool d_dontAge;
+  bool d_deferrableInsertLock;
 };
index 2d27ca6365a5218358241a8319413b36c712809b..26eeaebbaf7af0aae43ef8a6483a81283e7fbc4a 100644 (file)
@@ -762,8 +762,8 @@ void moreLua(bool client)
         }
     });
 
-    g_lua.writeFunction("newPacketCache", [client](size_t maxEntries, boost::optional<uint32_t> maxTTL, boost::optional<uint32_t> minTTL, boost::optional<uint32_t> tempFailTTL, boost::optional<uint32_t> staleTTL, boost::optional<bool> dontAge) {
-        return std::make_shared<DNSDistPacketCache>(maxEntries, maxTTL ? *maxTTL : 86400, minTTL ? *minTTL : 0, tempFailTTL ? *tempFailTTL : 60, staleTTL ? *staleTTL : 60, dontAge ? *dontAge : false);
+    g_lua.writeFunction("newPacketCache", [client](size_t maxEntries, boost::optional<uint32_t> maxTTL, boost::optional<uint32_t> minTTL, boost::optional<uint32_t> tempFailTTL, boost::optional<uint32_t> staleTTL, boost::optional<bool> dontAge, boost::optional<size_t> numberOfShards, boost::optional<bool> deferrableInsertLock) {
+        return std::make_shared<DNSDistPacketCache>(maxEntries, maxTTL ? *maxTTL : 86400, minTTL ? *minTTL : 0, tempFailTTL ? *tempFailTTL : 60, staleTTL ? *staleTTL : 60, dontAge ? *dontAge : false, numberOfShards ? *numberOfShards : 1, deferrableInsertLock ? *deferrableInsertLock : true);
       });
     g_lua.registerFunction("toString", &DNSDistPacketCache::toString);
     g_lua.registerFunction("isFull", &DNSDistPacketCache::isFull);
index 725d07b2eaaecc192a204c4c1f36b37a319ea4e1..fa3293df1782d316758c3da6cbe58ad004190975 100644 (file)
@@ -1077,11 +1077,9 @@ static ssize_t udpClientSendRequestToBackend(DownstreamState* ss, const int sd,
 static void* udpClientThread(ClientState* cs)
 try
 {
-  ComboAddress remote;
-  remote.sin4.sin_family = cs->local.sin4.sin_family;
-  char packet[1500];
   string largerQuery;
   uint16_t qtype, qclass;
+  uint16_t queryId;
 #ifdef HAVE_PROTOBUF
   boost::uuids::random_generator uuidGenerator;
 #endif
@@ -1094,21 +1092,52 @@ try
   auto localDynNMGBlock = g_dynblockNMG.getLocal();
   auto localDynSMTBlock = g_dynblockSMT.getLocal();
   auto localPools = g_pools.getLocal();
-  struct msghdr msgh;
-  struct iovec iov;
-  uint16_t queryId = 0;
-  /* used by HarvestDestinationAddress */
-  char cbuf[256];
-  remote.sin6.sin6_family=cs->local.sin6.sin6_family;
-  fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), packet, sizeof(packet), &remote);
+
+  static const size_t vectSize = 50;
+  struct
+  {
+    char packet[4096];
+    /* used by HarvestDestinationAddress */
+    char cbuf[256];
+    ComboAddress remote;
+    ComboAddress dest;
+    struct iovec iov;
+  }
+  data[vectSize];
+  struct mmsghdr msgVec[vectSize];
+  struct mmsghdr outMsgVec[vectSize];
+
+  for (size_t idx = 0; idx < vectSize; idx++) {
+    data[idx].remote.sin4.sin_family = cs->local.sin4.sin_family;
+
+    fillMSGHdr(&msgVec[idx].msg_hdr, &data[idx].iov, data[idx].cbuf, sizeof(data[idx].cbuf), data[idx].packet, sizeof(data[idx].packet), &data[idx].remote);
+  }
 
   for(;;) {
-    try {
 #ifdef HAVE_DNSCRYPT
       std::shared_ptr<DnsCryptQuery> dnsCryptQuery = 0;
 #endif
-      char* query = packet;
-      ssize_t ret = recvmsg(cs->udpFD, &msgh, 0);
+      for (size_t idx = 0; idx < vectSize; idx++) {
+        data[idx].iov.iov_base = data[idx].packet;
+        data[idx].iov.iov_len = sizeof(data[idx].packet);
+      }
+      int msgsGot = recvmmsg(cs->udpFD, msgVec, vectSize, MSG_WAITFORONE | MSG_TRUNC, nullptr);
+
+      if (msgsGot <= 0) {
+        vinfolog("recvmmsg() failed with: %s", strerror(errno));
+        continue;
+      }
+      //vinfolog("Got %d messages", msgsGot);
+      unsigned int msgsToSend = 0;
+
+    for (int msgIdx = 0; msgIdx < msgsGot; msgIdx++) {
+      const struct msghdr* msgh = &msgVec[msgIdx].msg_hdr;
+      unsigned int ret = msgVec[msgIdx].msg_len;
+      const ComboAddress& remote = data[msgIdx].remote;
+
+    try {
+      char* query = data[msgIdx].packet;
+
       queryId = 0;
 
       if(!acl->match(remote)) {
@@ -1125,7 +1154,7 @@ try
        continue;
       }
 
-      if (msgh.msg_flags & MSG_TRUNC) {
+      if (msgh->msg_flags & MSG_TRUNC) {
         /* message was too large for our buffer */
         vinfolog("Dropping message too large for our buffer");
         g_stats.nonCompliantQueries++;
@@ -1133,13 +1162,13 @@ try
       }
 
       uint16_t len = (uint16_t) ret;
-      ComboAddress dest;
-      if (HarvestDestinationAddress(&msgh, &dest)) {
+
+      if (HarvestDestinationAddress(msgh, &data[msgIdx].dest)) {
         /* we don't get the port, only the address */
-        dest.sin4.sin_port = cs->local.sin4.sin_port;
+        data[msgIdx].dest.sin4.sin_port = cs->local.sin4.sin_port;
       }
       else {
-        dest.sin4.sin_family = 0;
+        data[msgIdx].dest.sin4.sin_family = 0;
       }
 
 #ifdef HAVE_DNSCRYPT
@@ -1152,7 +1181,7 @@ try
 
         if (!decrypted) {
           if (response.size() > 0) {
-            sendUDPResponse(cs->udpFD, reinterpret_cast<char*>(response.data()), (uint16_t) response.size(), 0, dest, remote);
+            sendUDPResponse(cs->udpFD, reinterpret_cast<char*>(response.data()), (uint16_t) response.size(), 0, data[msgIdx].dest, remote);
           }
           continue;
         }
@@ -1181,7 +1210,7 @@ try
       const uint16_t origFlags = *flags;
       unsigned int consumed = 0;
       DNSName qname(query, len, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
-      DNSQuestion dq(&qname, qtype, qclass, dest.sin4.sin_family != 0 ? &dest : &cs->local, &remote, dh, sizeof(packet), len, false);
+      DNSQuestion dq(&qname, qtype, qclass, data[msgIdx].dest.sin4.sin_family != 0 ? &data[msgIdx].dest : &cs->local, &remote, dh, sizeof(data[msgIdx].packet), len, false);
 #ifdef HAVE_PROTOBUF
       dq.uniqueId = uuidGenerator();
 #endif
@@ -1214,7 +1243,18 @@ try
             continue;
           }
 #endif
-          sendUDPResponse(cs->udpFD, response, responseLen, delayMsec, dest, remote);
+          outMsgVec[msgsToSend].msg_len = 0;
+          fillMSGHdr(&outMsgVec[msgsToSend].msg_hdr, &data[msgIdx].iov, nullptr, 0, response, responseLen, &data[msgIdx].remote);
+
+          if (data[msgIdx].dest.sin4.sin_family == 0) {
+            outMsgVec[msgsToSend].msg_hdr.msg_control = nullptr;
+          }
+          else {
+            addCMsgSrcAddr(&outMsgVec[msgsToSend].msg_hdr, &data[msgIdx].cbuf, &data[msgIdx].dest, 0);
+          }
+
+          msgsToSend++;
+          //sendUDPResponse(cs->udpFD, response, responseLen, delayMsec, dest, remote);
         }
 
         continue;
@@ -1241,11 +1281,11 @@ try
 
       uint32_t cacheKey = 0;
       if (packetCache && !dq.skipCache) {
-        char cachedResponse[4096];
-        uint16_t cachedResponseSize = sizeof cachedResponse;
+//        char cachedResponse[4096];
+        uint16_t cachedResponseSize = dq.size;
         uint32_t allowExpired = ss ? 0 : g_staleCacheEntriesTTL;
-        if (packetCache->get(dq, consumed, dh->id, cachedResponse, &cachedResponseSize, &cacheKey, allowExpired)) {
-          DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.local, dq.remote, (dnsheader*) cachedResponse, sizeof cachedResponse, cachedResponseSize, false, &realTime);
+        if (packetCache->get(dq, consumed, dh->id, query, &cachedResponseSize, &cacheKey, allowExpired)) {
+          DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.local, dq.remote, (dnsheader*) query, dq.size, cachedResponseSize, false, &realTime);
 #ifdef HAVE_PROTOBUF
           dr.uniqueId = dq.uniqueId;
 #endif
@@ -1255,11 +1295,21 @@ try
 
           if (!cs->muted) {
 #ifdef HAVE_DNSCRYPT
-            if (!encryptResponse(cachedResponse, &cachedResponseSize, sizeof cachedResponse, false, dnsCryptQuery, nullptr, nullptr)) {
+            if (!encryptResponse(query, &cachedResponseSize, dq.size, false, dnsCryptQuery, nullptr, nullptr)) {
               continue;
             }
 #endif
-            sendUDPResponse(cs->udpFD, cachedResponse, cachedResponseSize, delayMsec, dest, remote);
+            //sendUDPResponse(cs->udpFD, cachedResponse, cachedResponseSize, delayMsec, dest, remote);
+            outMsgVec[msgsToSend].msg_len = 0;
+            fillMSGHdr(&outMsgVec[msgsToSend].msg_hdr, &data[msgIdx].iov, nullptr, 0, query, cachedResponseSize, &data[msgIdx].remote);
+            if (data[msgIdx].dest.sin4.sin_family == 0) {
+              outMsgVec[msgsToSend].msg_hdr.msg_control = nullptr;
+            }
+            else {
+              addCMsgSrcAddr(&outMsgVec[msgsToSend].msg_hdr, &data[msgIdx].cbuf, &data[msgIdx].dest, 0);
+            }
+
+            msgsToSend++;
           }
 
           g_stats.cacheHits++;
@@ -1286,9 +1336,19 @@ try
             continue;
           }
 #endif
-          sendUDPResponse(cs->udpFD, response, responseLen, 0, dest, remote);
+          outMsgVec[msgsToSend].msg_len = 0;
+          fillMSGHdr(&outMsgVec[msgsToSend].msg_hdr, &data[msgIdx].iov, nullptr, 0, response, responseLen, &data[msgIdx].remote);
+          if (data[msgIdx].dest.sin4.sin_family == 0) {
+            outMsgVec[msgsToSend].msg_hdr.msg_control = nullptr;
+          }
+          else {
+            addCMsgSrcAddr(&outMsgVec[msgsToSend].msg_hdr, &data[msgIdx].cbuf, &data[msgIdx].dest, 0);
+          }
+
+          msgsToSend++;
+//          sendUDPResponse(cs->udpFD, response, responseLen, 0, dest, remote);
         }
-        vinfolog("Dropped query for %s|%s from %s, no policy applied", dq.qname->toString(), QType(dq.qtype).getName(), remote.toStringWithPort());
+        vinfolog("%s query for %s|%s from %s, no policy applied", g_servFailOnNoPolicy ? "Dropped" : "ServFailed", dq.qname->toString(), QType(dq.qtype).getName(), remote.toStringWithPort());
         continue;
 
       }
@@ -1328,8 +1388,8 @@ try
          We need to keep track of which one it is since we may
          want to use the real but not the listening addr to reply.
       */
-      if (dest.sin4.sin_family != 0) {
-        ids->origDest = dest;
+      if (data[msgIdx].dest.sin4.sin_family != 0) {
+        ids->origDest = data[msgIdx].dest;
         ids->destHarvested = true;
       }
       else {
@@ -1360,18 +1420,26 @@ try
 
       vinfolog("Got query for %s|%s from %s, relayed to %s", ids->qname.toString(), QType(ids->qtype).getName(), remote.toStringWithPort(), ss->getName());
     }
-    catch(std::exception& e){
+    catch(const std::exception& e){
       vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
     }
+    }
+    if (msgsToSend > 0) {
+      int sent = sendmmsg(cs->udpFD, outMsgVec, msgsToSend, 0);
+      if (sent < 0 || static_cast<unsigned int>(sent) != msgsToSend) {
+        vinfolog("Error sending responses with sendmmsg (%d on %u): %s", sent, msgsToSend, strerror(errno));
+      }
+      //vinfolog("Sent %d responses", sent);
+    }
   }
   return 0;
 }
-catch(std::exception &e)
+catch(const std::exception &e)
 {
   errlog("UDP client thread died because of exception: %s", e.what());
   return 0;
 }
-catch(PDNSException &e)
+catch(const PDNSException &e)
 {
   errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
   return 0;
index d1c7891333b77a3690617a8eefbaf45b4348c832..01f9b7da20ea0b702a4a8c818168cab4bb8cc373 100644 (file)
@@ -150,7 +150,7 @@ struct DNSQuestion
 
 struct DNSResponse : DNSQuestion
 {
-  DNSResponse(const DNSName* name, uint16_t type, uint16_t class_, const ComboAddress* lc, const ComboAddress* rem, struct dnsheader* header, size_t bufferSize, uint16_t queryLen, bool isTcp, const struct timespec* queryTime_): DNSQuestion(name, type, class_, lc, rem, header, bufferSize, queryLen, isTcp), queryTime(queryTime_) { }
+  DNSResponse(const DNSName* name, uint16_t type, uint16_t class_, const ComboAddress* lc, const ComboAddress* rem, struct dnsheader* header, size_t bufferSize, uint16_t responseLen, bool isTcp, const struct timespec* queryTime_): DNSQuestion(name, type, class_, lc, rem, header, bufferSize, responseLen, isTcp), queryTime(queryTime_) { }
 
   const struct timespec* queryTime;
 };
index 09ed5e678163c1b5c74a146cbe319e9a10acbf85..b6d95a4ed29f865de4bc89c04ab3cdcbcc965fd5 100644 (file)
@@ -31,7 +31,7 @@
 
 int gettime(struct timespec *tp, bool needRealTime)
 {
-       return clock_gettime(needRealTime ? CLOCK_REALTIME : CLOCK_MONOTONIC_RAW, tp);
+       return clock_gettime(needRealTime ? CLOCK_REALTIME : CLOCK_MONOTONIC, tp);
 }
 
 #else
index 53308e5d6201a7a92d3db2dce4dc1784a20bd4d6..6549431d12c683a2485825116567eace148ad4b0 100644 (file)
@@ -143,11 +143,11 @@ bool HarvestTimestamp(struct msghdr* msgh, struct timeval* tv)
 #endif
   return false;
 }
-bool HarvestDestinationAddress(struct msghdr* msgh, ComboAddress* destination)
+bool HarvestDestinationAddress(const struct msghdr* msgh, ComboAddress* destination)
 {
   memset(destination, 0, sizeof(*destination));
-  struct cmsghdr *cmsg;
-  for (cmsg = CMSG_FIRSTHDR(msgh); cmsg != NULL; cmsg = CMSG_NXTHDR(msgh,cmsg)) {
+  const struct cmsghdr* cmsg;
+  for (cmsg = CMSG_FIRSTHDR(msgh); cmsg != NULL; cmsg = CMSG_NXTHDR(const_cast<struct msghdr*>(msgh), const_cast<struct cmsghdr*>(cmsg))) {
 #if defined(IP_PKTINFO)
      if ((cmsg->cmsg_level == IPPROTO_IP) && (cmsg->cmsg_type == IP_PKTINFO)) {
         struct in_pktinfo *i = (struct in_pktinfo *) CMSG_DATA(cmsg);
index cb2553c5009a397a28bfcf87d52d03433d85462f..11941befe7e541fdd32c150715be62a5dfadd36d 100644 (file)
@@ -927,7 +927,7 @@ int SSetsockopt(int sockfd, int level, int opname, int value);
   #define GEN_IP_PKTINFO IP_RECVDSTADDR 
 #endif
 bool IsAnyAddress(const ComboAddress& addr);
-bool HarvestDestinationAddress(struct msghdr* msgh, ComboAddress* destination);
+bool HarvestDestinationAddress(const struct msghdr* msgh, ComboAddress* destination);
 bool HarvestTimestamp(struct msghdr* msgh, struct timeval* tv);
 void fillMSGHdr(struct msghdr* msgh, struct iovec* iov, char* cbuf, size_t cbufsize, char* data, size_t datalen, ComboAddress* addr);
 ssize_t sendfromto(int sock, const char* data, size_t len, int flags, const ComboAddress& from, const ComboAddress& to);
index ba7964d0470e344b6489005b6852261f0f68bec3..f9db931a7cf0ae1260eb8d6f03f18b6ca8a3b737 100644 (file)
@@ -232,8 +232,9 @@ inline int DTime::udiffNoReset()
 inline const string toLower(const string &upper)
 {
   string reply(upper);
+  const size_t length = reply.length();
   char c;
-  for(unsigned int i = 0; i < reply.length(); i++) {
+  for(unsigned int i = 0; i < length; ++i) {
     c = dns_tolower(upper[i]);
     if( c != upper[i])
       reply[i] = c;