git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Use `sendSizeAndMsgWithTimeout()` for TCP, Fast Open toward backends (ref: 4985/head)
author Remi Gacogne <remi.gacogne@powerdns.com>
Sun, 5 Feb 2017 11:15:37 +0000 (12:15 +0100)
committer Remi Gacogne <remi.gacogne@powerdns.com>
Mon, 6 Feb 2017 15:23:25 +0000 (16:23 +0100)
pdns/README-dnsdist.md
pdns/dnsdist-lua.cc
pdns/dnsdist-tcp.cc
pdns/dnsdist.hh
regression-tests.dnsdist/dnsdisttests.py

index c41f94542d114aeedaf0bf14352c4be90692c4f1..2db7f0f6cd33d06e7c9e467a37488e43ce72eee8 100644 (file)
@@ -1308,7 +1308,7 @@ Here are all functions:
     * `setVerboseHealthChecks(bool)`: set whether health check errors will be logged
  * Server related:
     * `newServer("ip:port")`: instantiate a new downstream server with default settings
-    * `newServer({address="ip:port", qps=1000, order=1, weight=10, pool="abuse", retries=5, tcpConnectTimeout=5, tcpSendTimeout=30, tcpRecvTimeout=30, checkName="a.root-servers.net.", checkType="A", setCD=false, maxCheckFailures=1, mustResolve=false, useClientSubnet=true, source="address|interface name|address@interface"})`:
+    * `newServer({address="ip:port", qps=1000, order=1, weight=10, pool="abuse", retries=5, tcpConnectTimeout=5, tcpSendTimeout=30, tcpRecvTimeout=30, tcpFastOpen=false, checkName="a.root-servers.net.", checkType="A", setCD=false, maxCheckFailures=1, mustResolve=false, useClientSubnet=true, source="address|interface name|address@interface"})`:
 instantiate a server with additional parameters
     * `showServers()`: output all servers
     * `getServer(n)`: returns server with index n 
index b6687597d1b2c3c2bd0ac6f1fc3038074bedf490..9ecc47e5e477e29f3d6a7e4cca6a3a99d9e5697a 100644 (file)
@@ -412,6 +412,10 @@ vector<std::function<void(void)>> setupLua(bool client, const std::string& confi
                          ret->tcpRecvTimeout=std::stoi(boost::get<string>(vars["tcpRecvTimeout"]));
                        }
 
+                       if(vars.count("tcpFastOpen")) {
+                         ret->tcpFastOpen=boost::get<bool>(vars["tcpFastOpen"]);
+                       }
+
                        if(vars.count("name")) {
                          ret->name=boost::get<string>(vars["name"]);
                        }
index 4c4a8994cc50fe3c8e6e82c00423304d8cc3e44d..3d2bfef7a04d700793798e8f847f929f5722935e 100644 (file)
@@ -61,7 +61,9 @@ static int setupTCPDownstream(shared_ptr<DownstreamState> ds, uint16_t& downstre
         SBind(sock, ds->sourceAddr);
       }
       setNonBlocking(sock);
-      SConnectWithTimeout(sock, ds->remote, ds->tcpConnectTimeout);
+      if (!ds->tcpFastOpen) {
+        SConnectWithTimeout(sock, ds->remote, ds->tcpConnectTimeout);
+      }
       return sock;
     }
     catch(const std::runtime_error& e) {
@@ -175,38 +177,9 @@ catch(...) {
   return false;
 }
 
-static bool putNonBlockingMsgLen(int fd, uint16_t len, int timeout)
-try
-{
-  uint16_t raw = htons(len);
-  size_t ret = writen2WithTimeout(fd, &raw, sizeof raw, timeout);
-  return ret == sizeof raw;
-}
-catch(...) {
-  return false;
-}
-
-static bool sendNonBlockingMsgLen(int fd, uint16_t len, int timeout, ComboAddress& dest, ComboAddress& local, unsigned int localItf)
-try
-{
-  if (localItf == 0)
-    return putNonBlockingMsgLen(fd, len, timeout);
-
-  uint16_t raw = htons(len);
-  ssize_t ret = sendMsgWithTimeout(fd, (char*) &raw, sizeof raw, timeout, dest, local, localItf);
-  return ret == sizeof raw;
-}
-catch(...) {
-  return false;
-}
-
 static bool sendResponseToClient(int fd, const char* response, uint16_t responseLen)
 {
-  if (!putNonBlockingMsgLen(fd, responseLen, g_tcpSendTimeout))
-    return false;
-
-  writen2WithTimeout(fd, response, responseLen, g_tcpSendTimeout);
-  return true;
+  return sendSizeAndMsgWithTimeout(fd, responseLen, response, g_tcpSendTimeout, nullptr, nullptr, 0, 0, 0);
 }
 
 static bool maxConnectionDurationReached(unsigned int maxConnectionDuration, time_t start, unsigned int& remainingTime)
@@ -456,12 +429,15 @@ void* tcpClientThread(int pipefd)
 
        int dsock = -1;
        uint16_t downstreamFailures=0;
+       bool freshConn = true;
        if(sockets.count(ds->remote) == 0) {
          dsock=setupTCPDownstream(ds, downstreamFailures);
          sockets[ds->remote]=dsock;
        }
-       else
+       else {
          dsock=sockets[ds->remote];
+         freshConn = false;
+        }
 
         ds->queries++;
         ds->outstanding++;
@@ -481,24 +457,14 @@ void* tcpClientThread(int pipefd)
           break;
         }
 
-        if(!sendNonBlockingMsgLen(dsock, dq.len, ds->tcpSendTimeout, ds->remote, ds->sourceAddr, ds->sourceItf)) {
-         vinfolog("Downstream connection to %s died on us, getting a new one!", ds->getName());
-          close(dsock);
-          dsock=-1;
-          sockets.erase(ds->remote);
-          downstreamFailures++;
-          dsock=setupTCPDownstream(ds, downstreamFailures);
-          sockets[ds->remote]=dsock;
-          goto retry;
-        }
-
         try {
-          if (ds->sourceItf == 0) {
-            writen2WithTimeout(dsock, query, dq.len, ds->tcpSendTimeout);
-          }
-          else {
-            sendMsgWithTimeout(dsock, query, dq.len, ds->tcpSendTimeout, ds->remote, ds->sourceAddr, ds->sourceItf);
+          int socketFlags = 0;
+#ifdef MSG_FASTOPEN
+          if (ds->tcpFastOpen && freshConn) {
+            socketFlags |= MSG_FASTOPEN;
           }
+#endif /* MSG_FASTOPEN */
+          sendSizeAndMsgWithTimeout(dsock, dq.len, query, ds->tcpSendTimeout, &ds->remote, &ds->sourceAddr, ds->sourceItf, 0, socketFlags);
         }
         catch(const runtime_error& e) {
           vinfolog("Downstream connection to %s died on us, getting a new one!", ds->getName());
@@ -508,6 +474,7 @@ void* tcpClientThread(int pipefd)
           downstreamFailures++;
           dsock=setupTCPDownstream(ds, downstreamFailures);
           sockets[ds->remote]=dsock;
+          freshConn=true;
           goto retry;
         }
 
@@ -527,6 +494,7 @@ void* tcpClientThread(int pipefd)
           downstreamFailures++;
           dsock=setupTCPDownstream(ds, downstreamFailures);
           sockets[ds->remote]=dsock;
+          freshConn=true;
           if(xfrStarted) {
             goto drop;
           }
@@ -585,16 +553,22 @@ void* tcpClientThread(int pipefd)
           break;
         }
 
-        if (isXFR && dh->rcode == 0 && dh->ancount != 0) {
-          if (xfrStarted == false) {
-            xfrStarted = true;
-            if (getRecordsOfTypeCount(response, responseLen, 1, QType::SOA) == 1) {
+        if (isXFR) {
+          if (dh->rcode == 0 && dh->ancount != 0) {
+            if (xfrStarted == false) {
+              xfrStarted = true;
+              if (getRecordsOfTypeCount(response, responseLen, 1, QType::SOA) == 1) {
+                goto getpacket;
+              }
+            }
+            else if (getRecordsOfTypeCount(response, responseLen, 1, QType::SOA) == 0) {
               goto getpacket;
             }
           }
-          else if (getRecordsOfTypeCount(response, responseLen, 1, QType::SOA) == 0) {
-            goto getpacket;
-          }
+          /* Don't reuse the TCP connection after an {A,I}XFR */
+          close(dsock);
+          dsock=-1;
+          sockets.erase(ds->remote);
         }
 
         g_stats.responses++;
index db256d85cf2db32f2854043977856db53c25fbfc..74de1df9238eeb6c04dca24719301d64088c00d0 100644 (file)
@@ -484,6 +484,7 @@ struct DownstreamState
   bool useECS{false};
   bool setCD{false};
   std::atomic<bool> connected{false};
+  bool tcpFastOpen{false};
   bool isUp() const
   {
     if(availability == Availability::Down)
index 08f5467da430fc2a707923ac1989730f0e0ddb8a..048479d68f5f3eebe480ca6770572d903aa9a587 100644 (file)
@@ -200,8 +200,13 @@ class DNSDistTest(unittest.TestCase):
                 response = copy.copy(response)
                 response.id = request.id
                 wire = response.to_wire()
-                conn.send(struct.pack("!H", len(wire)))
-                conn.send(wire)
+                try:
+                    conn.send(struct.pack("!H", len(wire)))
+                    conn.send(wire)
+                except socket.error as e:
+                    # some of the tests are going to close
+                    # the connection on us, just deal with it
+                    break
 
             conn.close()