git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Refactor XSK support between dnsdist and the backends
author Remi Gacogne <remi.gacogne@powerdns.com>
Fri, 19 Jan 2024 16:22:57 +0000 (17:22 +0100)
committer Remi Gacogne <remi.gacogne@powerdns.com>
Tue, 23 Jan 2024 11:54:30 +0000 (12:54 +0100)
pdns/dnsdist-lua.cc
pdns/dnsdist.cc
pdns/dnsdist.hh
pdns/dnsdistdist/dnsdist-backend.cc
pdns/dnsdistdist/dnsdist-xsk.cc
pdns/dnsdistdist/dnsdist-xsk.hh

index 8d931a4c1fe3ebd4da9fa6e130284c85090aa4a6..e22a83cd68d61751796573355506ec0e49474de4 100644 (file)
@@ -310,7 +310,7 @@ static bool checkConfigurationTime(const std::string& name)
 // NOLINTNEXTLINE(readability-function-cognitive-complexity): this function declares Lua bindings, even with a good refactoring it will likely blow up the threshold
 static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
 {
-  typedef LuaAssociativeTable<boost::variant<bool, std::string, LuaArray<std::string>, std::shared_ptr<XskSocket>, DownstreamState::checkfunc_t>> newserver_t;
+  using newserver_t = LuaAssociativeTable<boost::variant<bool, std::string, LuaArray<std::string>, LuaArray<std::shared_ptr<XskSocket>>, DownstreamState::checkfunc_t>>;
   luaCtx.writeFunction("inClientStartup", [client]() {
     return client && !g_configurationDone;
   });
@@ -631,12 +631,16 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
                          // create but don't connect the socket in client or check-config modes
                          auto ret = std::make_shared<DownstreamState>(std::move(config), std::move(tlsCtx), !(client || configCheck));
 #ifdef HAVE_XSK
-                         std::shared_ptr<XskSocket> xskSocket;
-                         if (getOptionalValue<std::shared_ptr<XskSocket>>(vars, "xskSocket", xskSocket) > 0) {
+                         LuaArray<std::shared_ptr<XskSocket>> luaXskSockets;
+                         if (getOptionalValue<LuaArray<std::shared_ptr<XskSocket>>>(vars, "xskSockets", luaXskSockets) > 0 && !luaXskSockets.empty()) {
                            if (g_configurationDone) {
                              throw std::runtime_error("Adding a server with xsk at runtime is not supported");
                            }
-                           ret->registerXsk(xskSocket);
+                           std::vector<std::shared_ptr<XskSocket>> xskSockets;
+                           for (auto& socket : luaXskSockets) {
+                             xskSockets.push_back(socket.second);
+                           }
+                           ret->registerXsk(xskSockets);
                            std::string mac;
                            if (getOptionalValue<std::string>(vars, "MACAddr", mac) > 0) {
                              auto* addr = &ret->d_config.destMACAddr[0];
@@ -649,15 +653,15 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
                              }
                              memcpy(ret->d_config.destMACAddr.data(), mac.data(), ret->d_config.destMACAddr.size());
                            }
-                           infolog("Added downstream server %s via XSK in %s mode", ret->d_config.remote.toStringWithPort(), xskSocket->getXDPMode());
+                           infolog("Added downstream server %s via XSK in %s mode", ret->d_config.remote.toStringWithPort(), xskSockets.at(0)->getXDPMode());
                          }
                          else if (!(client || configCheck)) {
                            infolog("Added downstream server %s", ret->d_config.remote.toStringWithPort());
                          }
 #else /* HAVE_XSK */
-                         if (!(client || configCheck)) {
-                           infolog("Added downstream server %s", ret->d_config.remote.toStringWithPort());
-                         }
+      if (!(client || configCheck)) {
+        infolog("Added downstream server %s", ret->d_config.remote.toStringWithPort());
+      }
 #endif /* HAVE_XSK */
                          if (autoUpgrade && ret->getProtocol() != dnsdist::Protocol::DoT && ret->getProtocol() != dnsdist::Protocol::DoH) {
                            dnsdist::ServiceDiscovery::addUpgradeableServer(ret, upgradeInterval, upgradePool, upgradeDoHKey, keepAfterUpgrade);
index 6f4bca6c6804d18cf6ea69813d7c25454bf6c11b..32521e7fbe2965cc99f6e03b504ca198bfe96450 100644 (file)
@@ -370,7 +370,7 @@ bool responseContentMatches(const PacketBuffer& response, const DNSName& qname,
   }
   catch (const std::exception& e) {
     if (remote && response.size() > 0 && static_cast<size_t>(response.size()) > sizeof(dnsheader)) {
-      vinfolog("Backend %s sent us a response with id %d that did not parse: %s", remote->d_config.remote.toStringWithPort(), ntohs(dh->id), e.what());
+      infolog("Backend %s sent us a response with id %d that did not parse: %s", remote->d_config.remote.toStringWithPort(), ntohs(dh->id), e.what());
     }
     ++dnsdist::metrics::g_stats.nonCompliantResponses;
     if (remote) {
@@ -881,7 +881,8 @@ void responderThread(std::shared_ptr<DownstreamState> dss)
             continue;
           }
           xskPacket->setHeader(ids->xskPacketHeader);
-          xskPacket->setPayload(response);
+          if (!xskPacket->setPayload(response)) {
+         }
           xskPacket->updatePacket();
           xskInfo->pushToSendQueue(*xskPacket);
           xskInfo->notifyXskSocket();
@@ -1983,14 +1984,13 @@ bool XskProcessQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet)
       return false;
     }
 
-#ifdef HAVE_XSK
-    if (!ss->xskInfo) {
+    if (ss->d_xskInfos.empty()) {
       assignOutgoingUDPQueryToBackend(ss, dh->id, dq, query, true);
       return false;
     }
     else {
-      int fd = ss->xskInfo->workerWaker;
-      ids.backendFD = fd;
+      const auto& xskInfo = ss->pickWorkerForSending();
+      ids.backendFD = xskInfo->workerWaker;
       assignOutgoingUDPQueryToBackend(ss, dh->id, dq, query, false);
       auto sourceAddr = ss->pickSourceAddressForSending();
       packet.setAddr(sourceAddr, ss->d_config.sourceMACAddr, ss->d_config.remote, ss->d_config.destMACAddr);
@@ -1998,10 +1998,6 @@ bool XskProcessQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet)
       packet.rewrite();
       return true;
     }
-#else /* HAVE_XSK */
-    assignOutgoingUDPQueryToBackend(ss, dh->id, dq, query, true);
-    return false;
-#endif /* HAVE_XSK */
   }
   catch (const std::exception& e) {
     vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
@@ -2624,11 +2620,6 @@ static void setupLocalSocket(ClientState& clientState, const ComboAddress& addr,
     else if (clientState.dnscryptCtx != nullptr) {
       infolog("Listening on %s for DNSCrypt", addr.toStringWithPort());
     }
-#ifdef HAVE_XSK
-    else if (clientState.xskInfo != nullptr) {
-      infolog("Listening on %s (XSK-enabled)", addr.toStringWithPort());
-    }
-#endif
     else {
       infolog("Listening on %s", addr.toStringWithPort());
     }
@@ -2638,6 +2629,11 @@ static void setupLocalSocket(ClientState& clientState, const ComboAddress& addr,
     } else if (clientState.doh3Frontend != nullptr) {
       infolog("Listening on %s for DoH3", addr.toStringWithPort());
     }
+#ifdef HAVE_XSK
+    else if (clientState.xskInfo != nullptr) {
+      infolog("Listening on %s (XSK-enabled)", addr.toStringWithPort());
+    }
+#endif
   }
 }
 
index 32fdf9561f438cb3ff9a5a67f57b9dfafd6d453d..52019d56e28512d90156ed225dea754b98cd54f0 100644 (file)
@@ -825,8 +825,8 @@ public:
   StopWatch sw;
   QPSLimiter qps;
 #ifdef HAVE_XSK
-  std::shared_ptr<XskWorker> xskInfo{nullptr};
-  std::shared_ptr<XskSocket> d_xskSocket{nullptr};
+  std::vector<std::shared_ptr<XskWorker>> d_xskInfos;
+  std::vector<std::shared_ptr<XskSocket>> d_xskSockets;
 #endif
   std::atomic<uint64_t> idOffset{0};
   size_t socketsOffset{0};
@@ -993,8 +993,9 @@ public:
   std::optional<InternalQueryState> getState(uint16_t id);
 
 #ifdef HAVE_XSK
-  void registerXsk(std::shared_ptr<XskSocket>& xsk);
+  void registerXsk(std::vector<std::shared_ptr<XskSocket>>& xsks);
   [[nodiscard]] ComboAddress pickSourceAddressForSending();
+  [[nodiscard]] const std::shared_ptr<XskWorker>& pickWorkerForSending();
 #endif /* HAVE_XSK */
 
   dnsdist::Protocol getProtocol() const
index 0138c6e3987a1751fd9cfe6ef14ded9fb9c67a06..966b3ff9c20a46f83900760ea6ef1ea8d5d4e130 100644 (file)
@@ -55,7 +55,9 @@ void DownstreamState::addXSKDestination(int fd)
     addresses->push_back(local);
   }
   dnsdist::xsk::addDestinationAddress(local);
-  d_xskSocket->addWorkerRoute(xskInfo, local);
+  for (size_t idx = 0; idx < d_xskSockets.size(); idx++) {
+    d_xskSockets.at(idx)->addWorkerRoute(d_xskInfos.at(idx), local);
+  }
 }
 
 void DownstreamState::removeXSKDestination(int fd)
@@ -67,7 +69,9 @@ void DownstreamState::removeXSKDestination(int fd)
   }
 
   dnsdist::xsk::removeDestinationAddress(local);
-  d_xskSocket->removeWorkerRoute(local);
+  for (auto& xskSocket : d_xskSockets) {
+    xskSocket->removeWorkerRoute(local);
+  }
 }
 #endif /* HAVE_XSK */
 
@@ -85,7 +89,7 @@ bool DownstreamState::reconnect(bool initialAttempt)
 
   connected = false;
 #ifdef HAVE_XSK
-  if (xskInfo != nullptr) {
+  if (!d_xskInfos.empty()) {
     auto addresses = d_socketSourceAddresses.write_lock();
     addresses->clear();
   }
@@ -97,7 +101,7 @@ bool DownstreamState::reconnect(bool initialAttempt)
         (*mplexer.lock())->removeReadFD(fd);
       }
 #ifdef HAVE_XSK
-      if (xskInfo != nullptr) {
+      if (d_xskInfos.empty()) {
         removeXSKDestination(fd);
       }
 #endif /* HAVE_XSK */
@@ -132,7 +136,7 @@ bool DownstreamState::reconnect(bool initialAttempt)
         (*mplexer.lock())->addReadFD(fd, [](int, boost::any) {});
       }
 #ifdef HAVE_XSK
-      if (xskInfo != nullptr) {
+      if (!d_xskInfos.empty()) {
         addXSKDestination(fd);
       }
 #endif /* HAVE_XSK */
@@ -150,7 +154,7 @@ bool DownstreamState::reconnect(bool initialAttempt)
   /* if at least one (re-)connection failed, close all sockets */
   if (!connected) {
 #ifdef HAVE_XSK
-    if (xskInfo != nullptr) {
+    if (!d_xskInfos.empty()) {
       auto addresses = d_socketSourceAddresses.write_lock();
       addresses->clear();
     }
@@ -158,7 +162,7 @@ bool DownstreamState::reconnect(bool initialAttempt)
     for (auto& fd : sockets) {
       if (fd != -1) {
 #ifdef HAVE_XSK
-        if (xskInfo != nullptr) {
+        if (!d_xskInfos.empty()) {
           removeXSKDestination(fd);
         }
 #endif /* HAVE_XSK */
@@ -329,8 +333,8 @@ void DownstreamState::start()
 {
   if (connected && !threadStarted.test_and_set()) {
 #ifdef HAVE_XSK
-    if (xskInfo != nullptr) {
-      auto xskResponderThread = std::thread(dnsdist::xsk::XskResponderThread, shared_from_this());
+    for (auto& xskInfo : d_xskInfos) {
+      auto xskResponderThread = std::thread(dnsdist::xsk::XskResponderThread, shared_from_this(), xskInfo);
       if (!d_config.d_cpus.empty()) {
         mapThreadToCPUList(xskResponderThread.native_handle(), d_config.d_cpus);
       }
@@ -881,12 +885,22 @@ void DownstreamState::submitHealthCheckResult(bool initial, bool newResult)
   return (*addresses)[idx % numberOfAddresses];
 }
 
-void DownstreamState::registerXsk(std::shared_ptr<XskSocket>& xsk)
+[[nodiscard]] const std::shared_ptr<XskWorker>& DownstreamState::pickWorkerForSending()
+{
+  auto numberOfWorkers = d_xskInfos.size();
+  if (numberOfWorkers == 0) {
+    throw std::runtime_error("No XSK worker available for sending XSK data to backend " + getNameWithAddr());
+  }
+  size_t idx = dnsdist::getRandomValue(numberOfWorkers);
+  return d_xskInfos[idx % numberOfWorkers];
+}
+
+void DownstreamState::registerXsk(std::vector<std::shared_ptr<XskSocket>>& xsks)
 {
-  d_xskSocket = xsk;
+  d_xskSockets = xsks;
 
   if (d_config.sourceAddr.sin4.sin_family == 0 || (IsAnyAddress(d_config.sourceAddr))) {
-    const auto& ifName = xsk->getInterfaceName();
+    const auto& ifName = xsks.at(0)->getInterfaceName();
     auto addresses = getListOfAddressesOfNetworkInterface(ifName);
     if (addresses.empty()) {
       throw std::runtime_error("Unable to get source address from interface " + ifName);
@@ -897,11 +911,15 @@ void DownstreamState::registerXsk(std::shared_ptr<XskSocket>& xsk)
     }
     d_config.sourceAddr = addresses.at(0);
   }
-  xskInfo = XskWorker::create();
-  xsk->addWorker(xskInfo);
+  d_config.sourceMACAddr = d_xskSockets.at(0)->getSourceMACAddress();
+
+  for (auto& xsk : d_xskSockets) {
+    auto xskInfo = XskWorker::create();
+    d_xskInfos.push_back(xskInfo);
+    xsk->addWorker(xskInfo);
+    xskInfo->sharedEmptyFrameOffset = xsk->sharedEmptyFrameOffset;
+  }
   reconnect(false);
-  d_config.sourceMACAddr = xsk->getSourceMACAddress();
-  xskInfo->sharedEmptyFrameOffset = xsk->sharedEmptyFrameOffset;
 }
 #endif /* HAVE_XSK */
 
index 81480a99a179d1d8a1e733da22fba9f4c31454c3..2996683c95de9989fd697c3698ca424c4e2556fa 100644 (file)
@@ -35,17 +35,12 @@ namespace dnsdist::xsk
 {
 std::vector<std::shared_ptr<XskSocket>> g_xsk;
 
-void XskResponderThread(std::shared_ptr<DownstreamState> dss)
+void XskResponderThread(std::shared_ptr<DownstreamState> dss, std::shared_ptr<XskWorker> xskInfo)
 {
-  if (dss->xskInfo == nullptr) {
-    throw std::runtime_error("Starting XSK responder thread for a backend without XSK!");
-  }
-
   try {
     setThreadName("dnsdist/XskResp");
     auto localRespRuleActions = g_respruleactions.getLocal();
     auto localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
-    auto xskInfo = dss->xskInfo;
     auto pollfds = getPollFdsForWorker(*xskInfo);
     const auto xskFd = xskInfo->workerWaker.getHandle();
     while (!dss->isStopped()) {
@@ -66,7 +61,8 @@ void XskResponderThread(std::shared_ptr<DownstreamState> dss)
           const auto queryId = dnsHeader->id;
           auto ids = dss->getState(queryId);
           if (ids) {
-            if (xskFd != ids->backendFD || !ids->isXSK()) {
+            if (!ids->isXSK()) {
+              // if (xskFd != ids->backendFD || !ids->isXSK()) {
               dss->restoreState(queryId, std::move(*ids));
               ids = std::nullopt;
             }
@@ -82,19 +78,19 @@ void XskResponderThread(std::shared_ptr<DownstreamState> dss)
           }
           if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) {
             xskInfo->markAsFree(packet);
-            vinfolog("XSK packet pushed to queue because processResponderPacket failed");
+            infolog("XSK packet pushed to queue because processResponderPacket failed");
             return;
           }
           if (response.size() > packet.getCapacity()) {
             /* fallback to sending the packet via normal socket */
             sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote);
-            vinfolog("XSK packet falling back because packet is too large");
+            infolog("XSK packet falling back because packet is too large");
             xskInfo->markAsFree(packet);
             return;
           }
           packet.setHeader(ids->xskPacketHeader);
           if (!packet.setPayload(response)) {
-            vinfolog("Unable to set XSK payload !");
+            infolog("Unable to set XSK payload !");
           }
           if (ids->delayMsec > 0) {
             packet.addDelay(ids->delayMsec);
index 6a862d75b4f7774bd1ea1ff1d0abcf62469a6fdc..f677b78604dc3d72691a5f5f5d636628cc97cb50 100644 (file)
@@ -32,7 +32,7 @@ class XskWorker;
 
 namespace dnsdist::xsk
 {
-void XskResponderThread(std::shared_ptr<DownstreamState> dss);
+void XskResponderThread(std::shared_ptr<DownstreamState> dss, std::shared_ptr<XskWorker> xskInfo);
 bool XskIsQueryAcceptable(const XskPacket& packet, ClientState& clientState, LocalHolders& holders, bool& expectProxyProtocol);
 bool XskProcessQuery(ClientState& clientState, LocalHolders& holders, XskPacket& packet);
 void XskRouter(std::shared_ptr<XskSocket> xsk);