]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Add an option to use several source ports toward a backend
authorRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 1 Mar 2018 11:19:29 +0000 (11:19 +0000)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 6 Mar 2018 16:02:27 +0000 (17:02 +0100)
This is very useful if the backend is distributing queries based
only on (source IP, source port, destination IP, destination port),
which is for example the case of PowerDNS Recursor with several
threads, reuseport set and pdns-distribute-queries not set.

pdns/dnsdist-console.cc
pdns/dnsdist-lua.cc
pdns/dnsdist.cc
pdns/dnsdist.hh
pdns/dnsdistdist/docs/reference/config.rst

index 7d9d0209eedbb9c934338d71942db6f3a9d2d92a..d83b6c1e2191a675faa9460291ecf49fd0ee6759 100644 (file)
@@ -349,7 +349,7 @@ const std::vector<ConsoleKeyword> g_consoleKeywords{
   { "newQPSLimiter", true, "rate, burst", "configure a QPS limiter with that rate and that burst capacity" },
   { "newRemoteLogger", true, "address:port [, timeout=2, maxQueuedEntries=100, reconnectWaitTime=1]", "create a Remote Logger object, to use with `RemoteLogAction()` and `RemoteLogResponseAction()`" },
   { "newRuleAction", true, "DNS rule, DNS action [, {uuid=\"UUID\"}]", "return a pair of DNS Rule and DNS Action, to be used with `setRules()`" },
-  { "newServer", true, "{address=\"ip:port\", qps=1000, order=1, weight=10, pool=\"abuse\", retries=5, tcpConnectTimeout=5, tcpSendTimeout=30, tcpRecvTimeout=30, checkName=\"a.root-servers.net.\", checkType=\"A\", maxCheckFailures=1, mustResolve=false, useClientSubnet=true, source=\"address|interface name|address@interface\"}", "instantiate a server" },
+  { "newServer", true, "{address=\"ip:port\", qps=1000, order=1, weight=10, pool=\"abuse\", retries=5, tcpConnectTimeout=5, tcpSendTimeout=30, tcpRecvTimeout=30, checkName=\"a.root-servers.net.\", checkType=\"A\", maxCheckFailures=1, mustResolve=false, useClientSubnet=true, source=\"address|interface name|address@interface\", sockets=1}", "instantiate a server" },
   { "newServerPolicy", true, "name, function", "create a policy object from a Lua function" },
   { "newSuffixMatchNode", true, "", "returns a new SuffixMatchNode" },
   { "NoRecurseAction", true, "", "strip RD bit from the question, let it go through" },
index d8a8c916f2e1e16637922fcba8aa2960452ef3f6..a73b6d1b1090b43700cec17e008ab4e2f1b9ca8b 100644 (file)
@@ -117,6 +117,7 @@ void setupLuaConfig(bool client)
                        }
                        ComboAddress sourceAddr;
                        unsigned int sourceItf = 0;
+                        size_t numberOfSockets = 1;
                         std::set<int> cpus;
                        if(auto addressStr = boost::get<string>(&pvars)) {
                          std::shared_ptr<DownstreamState> ret;
@@ -216,6 +217,10 @@ void setupLuaConfig(bool client)
                          }
                        }
 
+                       if(vars.count("sockets")) {
+                         numberOfSockets=std::stoi(boost::get<string>(vars["sockets"]));
+                       }
+
                        std::shared_ptr<DownstreamState> ret;
                        try {
                          ComboAddress address(boost::get<string>(vars["address"]), 53);
@@ -224,7 +229,7 @@ void setupLuaConfig(bool client)
                            errlog("Error creating new server: %s is not a valid address for a downstream server", boost::get<string>(vars["address"]));
                            return ret;
                          }
-                         ret=std::make_shared<DownstreamState>(address, sourceAddr, sourceItf);
+                         ret=std::make_shared<DownstreamState>(address, sourceAddr, sourceItf, numberOfSockets);
                        }
                        catch(const PDNSException& e) {
                          g_outputBuffer="Error creating new server: "+string(e.reason);
index e525ba9c054c47dc1a0d7ef8ed3fd7c31f506e28..5061e6336057c517ef2b9a0a36b220d5bbaf7c1d 100644 (file)
@@ -383,6 +383,34 @@ static bool sendUDPResponse(int origFD, char* response, uint16_t responseLen, in
   return true;
 }
 
+
+static int pickBackendFD(DownstreamState* state)
+{
+  return state->fds[state->fdOffset++ % state->fds.size()];
+}
+
+static int selectBackendFD(const std::shared_ptr<DownstreamState>& state)
+{
+  if (state->fds.size() == 1) {
+    return state->fds[0];
+  }
+
+  std::set<int> fds;
+  for (auto fd : state->fds) {
+    if (fd >= 0) {
+      fds.insert(fd);
+    }
+  }
+
+  int selected = -1;
+  int res = waitForMultiData(fds, -1, -1, &selected);
+  if (res != 1) {
+    throw std::runtime_error("Error selecting a socket for a backend " + state->remote.toStringWithPort() + ": " + strerror(errno));
+  }
+
+  return selected;
+}
+
 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
 void* responderThread(std::shared_ptr<DownstreamState> dss)
 try {
@@ -403,7 +431,8 @@ try {
     dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
     bool outstandingDecreased = false;
     try {
-      ssize_t got = recv(dss->fd, packet, sizeof(packet), 0);
+      int fd = selectBackendFD(dss);
+      ssize_t got = recv(fd, packet, sizeof(packet), 0);
       char * response = packet;
       size_t responseSize = sizeof(packet);
 
@@ -541,30 +570,37 @@ catch(...)
 void DownstreamState::reconnect()
 {
   connected = false;
-  if (fd != -1) {
-    /* shutdown() is needed to wake up recv() in the responderThread */
-    shutdown(fd, SHUT_RDWR);
-    close(fd);
-    fd = -1;
-  }
-  if (!IsAnyAddress(remote)) {
-    fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
-    if (!IsAnyAddress(sourceAddr)) {
-      SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
-      SBind(fd, sourceAddr);
-    }
-    try {
-      SConnect(fd, remote);
-      connected = true;
-    }
-    catch(const std::runtime_error& error) {
-      infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
+  for (auto& fd : fds) {
+    if (fd != -1) {
+      /* shutdown() is needed to wake up recv() in the responderThread */
+      shutdown(fd, SHUT_RDWR);
+      close(fd);
+      fd = -1;
+    }
+    if (!IsAnyAddress(remote)) {
+      fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0);
+      if (!IsAnyAddress(sourceAddr)) {
+        SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1);
+        SBind(fd, sourceAddr);
+      }
+      try {
+        SConnect(fd, remote);
+        connected = true;
+      }
+      catch(const std::runtime_error& error) {
+        infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what());
+      }
     }
   }
 }
 
-DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
+DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
 {
+  fds.resize(numberOfSockets);
+  for (auto& fd : fds) {
+    fd = -1;
+  }
+
   if (!IsAnyAddress(remote)) {
     reconnect();
     idStates.resize(g_maxOutstanding);
@@ -1462,7 +1498,8 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct
 
     dh->id = idOffset;
 
-    ssize_t ret = udpClientSendRequestToBackend(ss, ss->fd, query, dq.len);
+    int fd = pickBackendFD(ss);
+    ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len);
 
     if(ret < 0) {
       ss->sendErrors++;
@@ -1791,15 +1828,19 @@ void* healthChecksThread()
           warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
 
           if (newState && !dss->connected) {
-            try {
-              SConnect(dss->fd, dss->remote);
-              dss->connected = true;
-              dss->tid = thread(responderThread, dss);
+            for (auto& fd : dss->fds) {
+              try {
+                SConnect(fd, dss->remote);
+                dss->connected = true;
+              }
+              catch(const std::runtime_error& error) {
+                infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what());
+                newState = false;
+                dss->connected = false;
+              }
             }
-            catch(const std::runtime_error& error) {
-              infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what());
-              newState = false;
-              dss->connected = false;
+            if (dss->connected) {
+              dss->tid = thread(responderThread, dss);
             }
           }
 
index cfc92475ff9018312e73a59fcb8f41b6953adf24..131b280a95acc8119ce53bc5ed4f018cc84e0ea4 100644 (file)
@@ -523,15 +523,19 @@ extern std::shared_ptr<TCPClientCollection> g_tcpclientthreads;
 
 struct DownstreamState
 {
-  DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf);
-  DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0) {}
+  DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf, size_t numberOfSockets);
+  DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0, 1) {}
   ~DownstreamState()
   {
-    if (fd >= 0)
-      close(fd);
+    for (auto& fd : fds) {
+      if (fd >= 0) {
+        close(fd);
+        fd = -1;
+      }
+    }
   }
 
-  int fd{-1};
+  std::vector<int> fds;
   std::thread tid;
   ComboAddress remote;
   QPSLimiter qps;
@@ -551,6 +555,7 @@ struct DownstreamState
     std::atomic<uint64_t> queries{0};
   } prev;
   string name;
+  size_t fdOffset{0};
   double queryLoad{0.0};
   double dropRate{0.0};
   double latencyUsec{0.0};
index a89d466346c8013b83865114dfb52828d23b6f81..b136c5ceaa9fb66c106f20d7c0ca518388fc8a03 100644 (file)
@@ -268,8 +268,9 @@ Servers
                              --   "address", e.g. "192.0.2.2"
                              --   "interface name", e.g. "eth0"
                              --   "address@interface", e.g. "192.0.2.2@eth0"
-      addXPF=NUM             -- Add the client's IP address and port to the query, along with the original destination address and port,
+      addXPF=NUM,            -- Add the client's IP address and port to the query, along with the original destination address and port,
                              -- using the experimental XPF record from `draft-bellis-dnsop-xpf <https://datatracker.ietf.org/doc/draft-bellis-dnsop-xpf/>`_ and the specified option code. Default is disabled (0)
+      sockets=NUM            -- Number of sockets (and thus source ports) used toward the backend server, defaults to a single one
     })
 
   :param str server_string: A simple IP:PORT string.