]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Refactor the XSK code into a proper namespace
authorRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 28 Dec 2023 14:18:44 +0000 (15:18 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 23 Jan 2024 11:54:20 +0000 (12:54 +0100)
pdns/dnsdist.cc
pdns/xsk.cc
pdns/xsk.hh

index f3e84081436400d5935f2cfbbd431cd4e4b6d399..fb1c0e43797558d0b8e9b8d83e8763479b19a1c2 100644 (file)
@@ -776,27 +776,6 @@ static void handleResponseForUDPClient(InternalQueryState& ids, PacketBuffer& re
   }
 }
 
-#ifdef HAVE_XSK
-static void XskHealthCheck(std::shared_ptr<DownstreamState>& dss, std::unordered_map<uint16_t, std::shared_ptr<HealthCheckData>>& map, bool initial = false)
-{
-  auto& xskInfo = dss->xskInfo;
-  std::shared_ptr<HealthCheckData> data;
-  auto packet = getHealthCheckPacket(dss, nullptr, data);
-  data->d_initial = initial;
-  setHealthCheckTime(dss, data);
-  auto xskPacket = xskInfo->getEmptyFrame();
-  if (!xskPacket) {
-    return;
-  }
-  xskPacket->setAddr(dss->d_config.sourceAddr, dss->d_config.sourceMACAddr, dss->d_config.remote, dss->d_config.destMACAddr);
-  xskPacket->setPayload(packet);
-  xskPacket->rewrite();
-  xskInfo->pushToSendQueue(std::move(xskPacket));
-  const auto queryId = data->d_queryID;
-  map[queryId] = std::move(data);
-}
-#endif /* HAVE_XSK */
-
 static bool processResponderPacket(std::shared_ptr<DownstreamState>& dss, PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& localRespRuleActions, const std::vector<DNSDistResponseRuleAction>& cacheInsertedRespRuleActions, InternalQueryState&& ids)
 {
 
@@ -833,135 +812,267 @@ static bool processResponderPacket(std::shared_ptr<DownstreamState>& dss, Packet
   return true;
 }
 
-// listens on a dedicated socket, lobs answers from downstream servers to original requestors
+#ifdef HAVE_XSK
+namespace dnsdist::xsk
+{
+static void doHealthCheck(std::shared_ptr<DownstreamState>& dss, std::unordered_map<uint16_t, std::shared_ptr<HealthCheckData>>& map, bool initial = false)
+{
+  auto& xskInfo = dss->xskInfo;
+  std::shared_ptr<HealthCheckData> data;
+  auto packet = getHealthCheckPacket(dss, nullptr, data);
+  data->d_initial = initial;
+  setHealthCheckTime(dss, data);
+  auto xskPacket = xskInfo->getEmptyFrame();
+  if (!xskPacket) {
+    return;
+  }
+  xskPacket->setAddr(dss->d_config.sourceAddr, dss->d_config.sourceMACAddr, dss->d_config.remote, dss->d_config.destMACAddr);
+  xskPacket->setPayload(packet);
+  xskPacket->rewrite();
+  xskInfo->pushToSendQueue(std::move(xskPacket));
+  const auto queryId = data->d_queryID;
+  map[queryId] = std::move(data);
+}
+
 void responderThread(std::shared_ptr<DownstreamState> dss)
 {
+  if (dss->xskInfo == nullptr) {
+    throw std::runtime_error("Starting XSK responder thread for a backend without XSK!");
+  }
+
   try {
-  setThreadName("dnsdist/respond");
-  auto localRespRuleActions = g_respruleactions.getLocal();
-  auto localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
-#ifdef HAVE_XSK
-    if (dss->xskInfo) {
-      auto xskInfo = dss->xskInfo;
-      auto pollfds = getPollFdsForWorker(*xskInfo);
-      std::unordered_map<uint16_t, std::shared_ptr<HealthCheckData>> healthCheckMap;
-      XskHealthCheck(dss, healthCheckMap, true);
-      itimerspec tm;
-      tm.it_value.tv_sec = dss->d_config.checkTimeout / 1000;
-      tm.it_value.tv_nsec = (dss->d_config.checkTimeout % 1000) * 1000000;
-      tm.it_interval = tm.it_value;
-      auto res = timerfd_settime(pollfds[1].fd, 0, &tm, nullptr);
-      if (res) {
-        throw std::runtime_error("timerfd_settime failed:" + stringerror(errno));
-      }
-      const auto xskFd = xskInfo->workerWaker.getHandle();
-      while (!dss->isStopped()) {
-        poll(pollfds.data(), pollfds.size(), -1);
-        bool needNotify = false;
-        if (pollfds[0].revents & POLLIN) {
-          needNotify = true;
+    setThreadName("dnsdist/XskResp");
+    auto localRespRuleActions = g_respruleactions.getLocal();
+    auto localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
+    auto xskInfo = dss->xskInfo;
+    auto pollfds = getPollFdsForWorker(*xskInfo);
+    std::unordered_map<uint16_t, std::shared_ptr<HealthCheckData>> healthCheckMap;
+    dnsdist::xsk::doHealthCheck(dss, healthCheckMap, true);
+    itimerspec tm;
+    tm.it_value.tv_sec = dss->d_config.checkTimeout / 1000;
+    tm.it_value.tv_nsec = (dss->d_config.checkTimeout % 1000) * 1000000;
+    tm.it_interval = tm.it_value;
+    auto res = timerfd_settime(pollfds[1].fd, 0, &tm, nullptr);
+    if (res) {
+      throw std::runtime_error("timerfd_settime failed:" + stringerror(errno));
+    }
+    const auto xskFd = xskInfo->workerWaker.getHandle();
+    while (!dss->isStopped()) {
+      poll(pollfds.data(), pollfds.size(), -1);
+      bool needNotify = false;
+      if (pollfds[0].revents & POLLIN) {
+        needNotify = true;
 #if defined(__SANITIZE_THREAD__)
-          xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) {
+        xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) {
 #else
-          xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) {
+        xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) {
 #endif
-            auto packet = XskPacketPtr(packetRaw);
-            if (packet->getDataLen() < sizeof(dnsheader)) {
-              xskInfo->markAsFree(std::move(packet));
-              return;
-            }
-            const dnsheader_aligned dnsHeader(packet->getPayloadData());
-            const auto queryId = dnsHeader->id;
-            auto ids = dss->getState(queryId);
-            if (ids) {
-              if (xskFd != ids->backendFD || !ids->xskPacketHeader) {
-                dss->restoreState(queryId, std::move(*ids));
-                ids = std::nullopt;
-              }
-            }
-            if (!ids) {
-              // this has to go before we can refactor the duplicated response handling code
-              auto iter = healthCheckMap.find(queryId);
-              if (iter != healthCheckMap.end()) {
-                auto data = std::move(iter->second);
-                healthCheckMap.erase(iter);
-                packet->cloneIntoPacketBuffer(data->d_buffer);
-                data->d_ds->submitHealthCheckResult(data->d_initial, handleResponse(data));
-              }
-              xskInfo->markAsFree(std::move(packet));
-              return;
-            }
-            auto response = packet->clonePacketBuffer();
-            if (response.size() > packet->getCapacity()) {
-              /* fallback to sending the packet via normal socket */
-              ids->xskPacketHeader.reset();
-            }
-            if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) {
-              xskInfo->markAsFree(std::move(packet));
-              vinfolog("XSK packet pushed to queue because processResponderPacket failed");
-              return;
-            }
-            vinfolog("XSK packet - processResponderPacket OK");
-            if (response.size() > packet->getCapacity()) {
-              /* fallback to sending the packet via normal socket */
-              sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote);
-              vinfolog("XSK packet falling back because packet is too large");
-              xskInfo->markAsFree(std::move(packet));
-              return;
-            }
-            //vinfolog("XSK packet - set header");
-            packet->setHeader(*ids->xskPacketHeader);
-            //vinfolog("XSK packet - set payload");
-            if (!packet->setPayload(response)) {
-              vinfolog("Unable to set payload !");
-            }
-            if (ids->delayMsec > 0) {
-              vinfolog("XSK packet - adding delay");
-              packet->addDelay(ids->delayMsec);
-            }
-            //vinfolog("XSK packet - update packet");
-            packet->updatePacket();
-            //vinfolog("XSK packet pushed to send queue");
-            xskInfo->pushToSendQueue(std::move(packet));
-          });
-          xskInfo->cleanSocketNotification();
-        }
-        if (pollfds[1].revents & POLLIN) {
-          timeval now;
-          gettimeofday(&now, nullptr);
-          for (auto i = healthCheckMap.begin(); i != healthCheckMap.end();) {
-            auto& ttd = i->second->d_ttd;
-            if (ttd < now) {
-              dss->submitHealthCheckResult(i->second->d_initial, false);
-              i = healthCheckMap.erase(i);
-            }
-            else {
-              ++i;
+          auto packet = XskPacketPtr(packetRaw);
+          if (packet->getDataLen() < sizeof(dnsheader)) {
+            xskInfo->markAsFree(std::move(packet));
+            return;
+          }
+          const dnsheader_aligned dnsHeader(packet->getPayloadData());
+          const auto queryId = dnsHeader->id;
+          auto ids = dss->getState(queryId);
+          if (ids) {
+            if (xskFd != ids->backendFD || !ids->xskPacketHeader) {
+              dss->restoreState(queryId, std::move(*ids));
+              ids = std::nullopt;
             }
           }
-          needNotify = true;
-          dss->updateStatisticsInfo();
-          dss->handleUDPTimeouts();
-          if (dss->d_nextCheck <= 1) {
-            dss->d_nextCheck = dss->d_config.checkInterval;
-            if (dss->d_config.availability == DownstreamState::Availability::Auto) {
-              XskHealthCheck(dss, healthCheckMap);
+          if (!ids) {
+            // this has to go before we can refactor the duplicated response handling code
+            auto iter = healthCheckMap.find(queryId);
+            if (iter != healthCheckMap.end()) {
+              auto data = std::move(iter->second);
+              healthCheckMap.erase(iter);
+              packet->cloneIntoPacketBuffer(data->d_buffer);
+              data->d_ds->submitHealthCheckResult(data->d_initial, handleResponse(data));
             }
+            xskInfo->markAsFree(std::move(packet));
+            return;
+          }
+          auto response = packet->clonePacketBuffer();
+          if (response.size() > packet->getCapacity()) {
+            /* fallback to sending the packet via normal socket */
+            ids->xskPacketHeader.reset();
+          }
+          if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) {
+            xskInfo->markAsFree(std::move(packet));
+            vinfolog("XSK packet pushed to queue because processResponderPacket failed");
+            return;
+          }
+          vinfolog("XSK packet - processResponderPacket OK");
+          if (response.size() > packet->getCapacity()) {
+            /* fallback to sending the packet via normal socket */
+            sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote);
+            vinfolog("XSK packet falling back because packet is too large");
+            xskInfo->markAsFree(std::move(packet));
+            return;
+          }
+          //vinfolog("XSK packet - set header");
+          packet->setHeader(*ids->xskPacketHeader);
+          //vinfolog("XSK packet - set payload");
+          if (!packet->setPayload(response)) {
+            vinfolog("Unable to set payload !");
+          }
+          if (ids->delayMsec > 0) {
+            vinfolog("XSK packet - adding delay");
+            packet->addDelay(ids->delayMsec);
+          }
+          //vinfolog("XSK packet - update packet");
+          packet->updatePacket();
+          //vinfolog("XSK packet pushed to send queue");
+          xskInfo->pushToSendQueue(std::move(packet));
+        });
+        xskInfo->cleanSocketNotification();
+      }
+      if (pollfds[1].revents & POLLIN) {
+        timeval now;
+        gettimeofday(&now, nullptr);
+        for (auto i = healthCheckMap.begin(); i != healthCheckMap.end();) {
+          auto& ttd = i->second->d_ttd;
+          if (ttd < now) {
+            dss->submitHealthCheckResult(i->second->d_initial, false);
+            i = healthCheckMap.erase(i);
           }
           else {
-            --dss->d_nextCheck;
+            ++i;
+          }
+        }
+        needNotify = true;
+        dss->updateStatisticsInfo();
+        dss->handleUDPTimeouts();
+        if (dss->d_nextCheck <= 1) {
+          dss->d_nextCheck = dss->d_config.checkInterval;
+          if (dss->d_config.availability == DownstreamState::Availability::Auto) {
+            doHealthCheck(dss, healthCheckMap);
           }
+        }
+        else {
+          --dss->d_nextCheck;
+        }
+
+        uint64_t tmp;
+        res = read(pollfds[1].fd, &tmp, sizeof(tmp));
+      }
+      if (needNotify) {
+        xskInfo->notifyXskSocket();
+      }
+    }
+  }
+  catch (const std::exception& e) {
+    errlog("XSK responder thread died because of exception: %s", e.what());
+  }
+  catch (const PDNSException& e) {
+    errlog("XSK responder thread died because of PowerDNS exception: %s", e.reason);
+  }
+  catch (...) {
+    errlog("XSK responder thread died because of an exception: %s", "unknown");
+  }
+}
+
+static bool isXskQueryAcceptable(const XskPacket& packet, ClientState& cs, LocalHolders& holders, bool& expectProxyProtocol) noexcept
+{
+  const auto& from = packet.getFromAddr();
+  expectProxyProtocol = expectProxyProtocolFrom(from);
+  if (!holders.acl->match(from) && !expectProxyProtocol) {
+    vinfolog("Query from %s dropped because of ACL", from.toStringWithPort());
+    ++dnsdist::metrics::g_stats.aclDrops;
+    return false;
+  }
+  cs.queries++;
+  ++dnsdist::metrics::g_stats.queries;
 
-          uint64_t tmp;
-          res = read(pollfds[1].fd, &tmp, sizeof(tmp));
+  return true;
+}
+
+void XskRouter(std::shared_ptr<XskSocket> xsk)
+{
+  setThreadName("dnsdist/XskRouter");
+  uint32_t failed;
+  // packets to be submitted for sending
+  vector<XskPacketPtr> fillInTx;
+  const auto& fds = xsk->getDescriptors();
+  // list of workers that need to be notified
+  std::set<int> needNotify;
+  const auto& xskWakerIdx = xsk->getWorkers().get<0>();
+  const auto& destIdx = xsk->getWorkers().get<1>();
+  while (true) {
+    try {
+      auto ready = xsk->wait(-1);
+      // descriptor 0 gets incoming AF_XDP packets
+      if (fds.at(0).revents & POLLIN) {
+        auto packets = xsk->recv(64, &failed);
+        dnsdist::metrics::g_stats.nonCompliantQueries += failed;
+        for (auto &packet : packets) {
+          const auto dest = packet->getToAddr();
+          auto res = destIdx.find(dest);
+          if (res == destIdx.end()) {
+            xsk->markAsFree(std::move(packet));
+            continue;
+          }
+          res->worker->pushToProcessingQueue(std::move(packet));
+          needNotify.insert(res->workerWaker);
         }
-        if (needNotify) {
-          xskInfo->notifyXskSocket();
+        for (auto i : needNotify) {
+          uint64_t x = 1;
+          auto written = write(i, &x, sizeof(x));
+          if (written != sizeof(x)) {
+            // oh, well, the worker is clearly overloaded
+            // but there is nothing we can do about it,
+            // and hopefully the queue will be processed eventually
+          }
+        }
+        needNotify.clear();
+        ready--;
+      }
+      const auto backup = ready;
+      for (size_t fdIndex = 1; fdIndex < fds.size() && ready > 0; fdIndex++) {
+        if (fds.at(fdIndex).revents & POLLIN) {
+          ready--;
+          auto& info = xskWakerIdx.find(fds.at(fdIndex).fd)->worker;
+#if defined(__SANITIZE_THREAD__)
+          info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) {
+#else
+          info->outgoingPacketsQueue.consume_all([&](XskPacket* packetRaw) {
+#endif
+            auto packet = XskPacketPtr(packetRaw);
+            if (!(packet->getFlags() & XskPacket::UPDATE)) {
+              xsk->markAsFree(std::move(packet));
+              return;
+            }
+            if (packet->getFlags() & XskPacket::DELAY) {
+              xsk->pushDelayed(std::move(packet));
+              return;
+            }
+            fillInTx.push_back(std::move(packet));
+          });
+          info->cleanWorkerNotification();
         }
       }
+      xsk->pickUpReadyPacket(fillInTx);
+      xsk->recycle(4096);
+      xsk->fillFq();
+      xsk->send(fillInTx);
+      ready = backup;
     }
-    else {
+    catch (...) {
+      vinfolog("Exception in XSK router loop");
+    }
+  }
+}
+}
 #endif /* HAVE_XSK */
+
+// listens on a dedicated socket, lobs answers from downstream servers to original requestors
+void responderThread(std::shared_ptr<DownstreamState> dss)
+{
+  try {
+  setThreadName("dnsdist/respond");
+  auto localRespRuleActions = g_respruleactions.getLocal();
+  auto localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
   const size_t initialBufferSize = getInitialUDPPacketBufferSize(false);
   /* allocate one more byte so we can detect truncation */
   PacketBuffer response(initialBufferSize + 1);
@@ -1045,9 +1156,6 @@ void responderThread(std::shared_ptr<DownstreamState> dss)
       vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->d_config.remote.toStringWithPort(), queryId, e.what());
     }
   }
-#ifdef HAVE_XSK
-  }
-#endif /* HAVE_XSK */
 }
 catch (const std::exception& e) {
   errlog("UDP responder thread died because of exception: %s", e.what());
@@ -1467,23 +1575,6 @@ static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const s
   return true;
 }
 
-#ifdef HAVE_XSK
-static bool isXskQueryAcceptable(const XskPacket& packet, ClientState& cs, LocalHolders& holders, bool& expectProxyProtocol) noexcept
-{
-  const auto& from = packet.getFromAddr();
-  expectProxyProtocol = expectProxyProtocolFrom(from);
-  if (!holders.acl->match(from) && !expectProxyProtocol) {
-    vinfolog("Query from %s dropped because of ACL", from.toStringWithPort());
-    ++dnsdist::metrics::g_stats.aclDrops;
-    return false;
-  }
-  cs.queries++;
-  ++dnsdist::metrics::g_stats.queries;
-
-  return true;
-}
-#endif /* HAVE_XSK */
-
 bool checkDNSCryptQuery(const ClientState& cs, PacketBuffer& query, std::unique_ptr<DNSCryptQuery>& dnsCryptQuery, time_t now, bool tcp)
 {
   if (cs.dnscryptCtx) {
@@ -2054,6 +2145,8 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct
 }
 
 #ifdef HAVE_XSK
+namespace dnsdist::xsk
+{
 static bool ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet)
 {
   uint16_t queryId = 0;
@@ -2175,6 +2268,39 @@ static bool ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p
   }
   return false;
 }
+
+static void xskClientThread(ClientState* cs)
+{
+  setThreadName("dnsdist/xskClient");
+  auto xskInfo = cs->xskInfo;
+  LocalHolders holders;
+
+  for (;;) {
+#if defined(__SANITIZE_THREAD__)
+    while (!xskInfo->incomingPacketsQueue.lock()->read_available()) {
+#else
+    while (!xskInfo->incomingPacketsQueue.read_available()) {
+#endif
+      xskInfo->waitForXskSocket();
+    }
+#if defined(__SANITIZE_THREAD__)
+    xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) {
+#else
+    xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) {
+#endif
+      auto packet = XskPacketPtr(packetRaw);
+      if (ProcessXskQuery(*cs, holders, *packet)) {
+        packet->updatePacket();
+        xskInfo->pushToSendQueue(std::move(packet));
+      }
+      else {
+        xskInfo->markAsFree(std::move(packet));
+      }
+    });
+    xskInfo->notifyXskSocket();
+  }
+}
+}
 #endif /* HAVE_XSK */
 
 #ifndef DISABLE_RECVMMSG
@@ -2269,40 +2395,6 @@ static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holde
 #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
 #endif /* DISABLE_RECVMMSG */
 
-#ifdef HAVE_XSK
-static void xskClientThread(ClientState* cs)
-{
-  setThreadName("dnsdist/xskClient");
-  auto xskInfo = cs->xskInfo;
-  LocalHolders holders;
-
-  for (;;) {
-#if defined(__SANITIZE_THREAD__)
-    while (!xskInfo->incomingPacketsQueue.lock()->read_available()) {
-#else
-    while (!xskInfo->incomingPacketsQueue.read_available()) {
-#endif
-      xskInfo->waitForXskSocket();
-    }
-#if defined(__SANITIZE_THREAD__)
-    xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) {
-#else
-    xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) {
-#endif
-      auto packet = XskPacketPtr(packetRaw);
-      if (ProcessXskQuery(*cs, holders, *packet)) {
-        packet->updatePacket();
-        xskInfo->pushToSendQueue(std::move(packet));
-      }
-      else {
-        xskInfo->markAsFree(std::move(packet));
-      }
-    });
-    xskInfo->notifyXskSocket();
-  }
-}
-#endif /* HAVE_XSK */
-
 // listens to incoming queries, sends out to downstream servers, noting the intended return path
 static void udpClientThread(std::vector<ClientState*> states)
 {
@@ -3282,17 +3374,13 @@ static void initFrontends()
   }
 }
 
-#ifdef HAVE_XSK
-void XskRouter(std::shared_ptr<XskSocket> xsk);
-#endif /* HAVE_XSK */
-
 namespace dnsdist
 {
 static void startFrontends()
 {
 #ifdef HAVE_XSK
   for (auto& xskContext : g_xsk) {
-    std::thread xskThread(XskRouter, std::move(xskContext));
+    std::thread xskThread(dnsdist::xsk::XskRouter, std::move(xskContext));
     xskThread.detach();
   }
 #endif /* HAVE_XSK */
@@ -3302,7 +3390,7 @@ static void startFrontends()
   for (auto& clientState : g_frontends) {
 #ifdef HAVE_XSK
     if (clientState->xskInfo) {
-      std::thread xskCT(xskClientThread, clientState.get());
+      std::thread xskCT(dnsdist::xsk::xskClientThread, clientState.get());
       if (!clientState->cpus.empty()) {
         mapThreadToCPUList(xskCT.native_handle(), clientState->cpus);
       }
@@ -3671,81 +3759,3 @@ int main(int argc, char** argv)
 #endif
   }
 }
-
-#ifdef HAVE_XSK
-void XskRouter(std::shared_ptr<XskSocket> xsk)
-{
-  setThreadName("dnsdist/XskRouter");
-  uint32_t failed;
-  // packets to be submitted for sending
-  vector<XskPacketPtr> fillInTx;
-  const auto size = xsk->fds.size();
-  // list of workers that need to be notified
-  std::set<int> needNotify;
-  const auto& xskWakerIdx = xsk->workers.get<0>();
-  const auto& destIdx = xsk->workers.get<1>();
-  while (true) {
-    try {
-      auto ready = xsk->wait(-1);
-      // descriptor 0 gets incoming AF_XDP packets
-      if (xsk->fds[0].revents & POLLIN) {
-        auto packets = xsk->recv(64, &failed);
-        dnsdist::metrics::g_stats.nonCompliantQueries += failed;
-        for (auto &packet : packets) {
-          const auto dest = packet->getToAddr();
-          auto res = destIdx.find(dest);
-          if (res == destIdx.end()) {
-            xsk->markAsFree(std::move(packet));
-            continue;
-          }
-          res->worker->pushToProcessingQueue(std::move(packet));
-          needNotify.insert(res->workerWaker);
-        }
-        for (auto i : needNotify) {
-          uint64_t x = 1;
-          auto written = write(i, &x, sizeof(x));
-          if (written != sizeof(x)) {
-            // oh, well, the worker is clearly overloaded
-            // but there is nothing we can do about it,
-            // and hopefully the queue will be processed eventually
-          }
-        }
-        needNotify.clear();
-        ready--;
-      }
-      const auto backup = ready;
-      for (size_t i = 1; i < size && ready > 0; i++) {
-        if (xsk->fds[i].revents & POLLIN) {
-          ready--;
-          auto& info = xskWakerIdx.find(xsk->fds[i].fd)->worker;
-#if defined(__SANITIZE_THREAD__)
-          info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) {
-#else
-          info->outgoingPacketsQueue.consume_all([&](XskPacket* packetRaw) {
-#endif
-            auto packet = XskPacketPtr(packetRaw);
-            if (!(packet->getFlags() & XskPacket::UPDATE)) {
-              xsk->markAsFree(std::move(packet));
-              return;
-            }
-            if (packet->getFlags() & XskPacket::DELAY) {
-              xsk->waitForDelay.push(std::move(packet));
-              return;
-            }
-            fillInTx.push_back(std::move(packet));
-          });
-          info->cleanWorkerNotification();
-        }
-      }
-      xsk->pickUpReadyPacket(fillInTx);
-      xsk->recycle(4096);
-      xsk->fillFq();
-      xsk->send(fillInTx);
-      ready = backup;
-    }
-    catch (...) {
-      vinfolog("Exception in XSK router loop");
-    }
-  }
-}
-#endif /* HAVE_XSK */
index 2f438f9fd9c16d24d87348c87b0eae3c75426905..5b3c2142dfe0a67b265d8fcd844de32f21cd9b3c 100644 (file)
@@ -97,7 +97,7 @@ int XskSocket::firstTimeout()
   }
   timespec now;
   gettime(&now);
-  const auto& firstTime = waitForDelay.top()->sendTime;
+  const auto& firstTime = waitForDelay.top()->getSendTime();
   const auto res = timeDifference(now, firstTime);
   if (res <= 0) {
     return 0;
@@ -106,7 +106,7 @@ int XskSocket::firstTimeout()
 }
 
 XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queue_id, const std::string& xskMapPath, const std::string& poolName_) :
-  frameNum(frameNum_), queueId(queue_id), ifName(ifName_), poolName(poolName_), socket(nullptr, xsk_socket__delete), sharedEmptyFrameOffset(std::make_shared<LockGuarded<vector<uint64_t>>>())
+  frameNum(frameNum_), ifName(ifName_), poolName(poolName_), socket(nullptr, xsk_socket__delete), sharedEmptyFrameOffset(std::make_shared<LockGuarded<vector<uint64_t>>>())
 {
   if (!isPowOfTwo(frameNum_) || !isPowOfTwo(frameSize)
       || !isPowOfTwo(fqCapacity) || !isPowOfTwo(cqCapacity) || !isPowOfTwo(rxCapacity) || !isPowOfTwo(txCapacity)) {
@@ -219,7 +219,7 @@ int XskSocket::wait(int timeout)
 
 [[nodiscard]] uint64_t XskSocket::frameOffset(const XskPacket& packet) const noexcept
 {
-  return packet.frame - umem.bufBase;
+  return packet.getFrameOffsetFrom(umem.bufBase);
 }
 
 [[nodiscard]] int XskSocket::xskFd() const noexcept {
@@ -314,7 +314,7 @@ void XskSocket::pickUpReadyPacket(std::vector<XskPacketPtr>& packets)
 {
   timespec now;
   gettime(&now);
-  while (!waitForDelay.empty() && timeDifference(now, waitForDelay.top()->sendTime) <= 0) {
+  while (!waitForDelay.empty() && timeDifference(now, waitForDelay.top()->getSendTime()) <= 0) {
     auto& top = const_cast<XskPacketPtr&>(waitForDelay.top());
     packets.push_back(std::move(top));
     waitForDelay.pop();
@@ -676,7 +676,7 @@ void XskPacket::addDelay(const int relativeMilliseconds) noexcept
 
 bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept
 {
-  return s1->sendTime < s2->sendTime;
+  return s1->getSendTime() < s2->getSendTime();
 }
 
 const ComboAddress& XskPacket::getFromAddr() const noexcept
@@ -992,9 +992,9 @@ void XskSocket::addWorker(std::shared_ptr<XskWorker> s, const ComboAddress& dest
     .revents = 0});
 };
 
-uint64_t XskWorker::frameOffset(const XskPacket& s) const noexcept
+uint64_t XskWorker::frameOffset(const XskPacket& packet) const noexcept
 {
-  return s.frame - umemBufBase;
+  return packet.getFrameOffsetFrom(umemBufBase);
 }
 
 void XskWorker::notifyWorker() noexcept
index 551c23074c9465db143a3777a857bfd283f3c533..d8dc067563755ec9f0ee5e24af3045ab931c2f7e 100644 (file)
@@ -87,12 +87,12 @@ class XskSocket
     ~XskUmem();
     XskUmem() = default;
   };
-  boost::multi_index_container<
+  using WorkerContainer = boost::multi_index_container<
     XskRouteInfo,
     boost::multi_index::indexed_by<
       boost::multi_index::hashed_unique<boost::multi_index::member<XskRouteInfo, int, &XskRouteInfo::xskSocketWaker>>,
-      boost::multi_index::hashed_unique<boost::multi_index::member<XskRouteInfo, ComboAddress, &XskRouteInfo::dest>, ComboAddress::addressPortOnlyHash>>>
-    workers;
+      boost::multi_index::hashed_unique<boost::multi_index::member<XskRouteInfo, ComboAddress, &XskRouteInfo::dest>, ComboAddress::addressPortOnlyHash>>>;
+  WorkerContainer workers;
   // number of frames to keep in sharedEmptyFrameOffset
   static constexpr size_t holdThreshold = 256;
   // number of frames to insert into the fill queue
@@ -100,8 +100,6 @@ class XskSocket
   static constexpr size_t frameSize = 2048;
   // number of entries (frames) in the umem
   const size_t frameNum;
-  // ID of the network queue
-  const uint32_t queueId;
   // responses that have been delayed
   std::priority_queue<XskPacketPtr> waitForDelay;
   const std::string ifName;
@@ -123,7 +121,6 @@ class XskSocket
   xsk_ring_prod tx;
   std::unique_ptr<xsk_socket, void (*)(xsk_socket*)> socket;
   XskUmem umem;
-  bpf_object* prog;
 
   static constexpr uint32_t fqCapacity = XSK_RING_PROD__DEFAULT_NUM_DESCS * 4;
   static constexpr uint32_t cqCapacity = XSK_RING_CONS__DEFAULT_NUM_DESCS * 4;
@@ -132,18 +129,10 @@ class XskSocket
 
   constexpr static bool isPowOfTwo(uint32_t value) noexcept;
   [[nodiscard]] static int timeDifference(const timespec& t1, const timespec& t2) noexcept;
-  friend void XskRouter(std::shared_ptr<XskSocket> xsk);
 
   [[nodiscard]] uint64_t frameOffset(const XskPacket& packet) const noexcept;
   [[nodiscard]] int firstTimeout();
-  // pick ups available frames from uniqueEmptyFrameOffset
-  // insert entries from uniqueEmptyFrameOffset into fq
-  void fillFq(uint32_t fillSize = fillThreshold) noexcept;
-  // picks up entries that have been processed (sent) from cq and push them into uniqueEmptyFrameOffset
-  void recycle(size_t size) noexcept;
   void getMACFromIfName();
-  // look at delayed packets, and send the ones that are ready
-  void pickUpReadyPacket(std::vector<XskPacketPtr>& packets);
 
 public:
   static constexpr size_t getFrameSize()
@@ -164,6 +153,25 @@ public:
   void addWorker(std::shared_ptr<XskWorker> s, const ComboAddress& dest);
   [[nodiscard]] std::string getMetrics() const;
   void markAsFree(XskPacketPtr&& packet);
+  [[nodiscard]] WorkerContainer& getWorkers()
+  {
+    return workers;
+  }
+  [[nodiscard]] const std::vector<pollfd>& getDescriptors() const
+  {
+    return fds;
+  }
+  // pick ups available frames from uniqueEmptyFrameOffset
+  // insert entries from uniqueEmptyFrameOffset into fq
+  void fillFq(uint32_t fillSize = fillThreshold) noexcept;
+  // picks up entries that have been processed (sent) from cq and push them into uniqueEmptyFrameOffset
+  void recycle(size_t size) noexcept;
+  // look at delayed packets, and send the ones that are ready
+  void pickUpReadyPacket(std::vector<XskPacketPtr>& packets);
+  void pushDelayed(XskPacketPtr&& packet)
+  {
+    waitForDelay.push(std::move(packet));
+  }
 };
 
 struct iphdr;
@@ -216,14 +224,8 @@ private:
   void setIPv6Header(const ipv6hdr& ipv6Header) noexcept;
   [[nodiscard]] udphdr getUDPHeader() const noexcept;
   void setUDPHeader(const udphdr& udpHeader) noexcept;
-  // parse IP and UDP payloads
-  bool parse(bool fromSetHeader);
   void changeDirectAndUpdateChecksum() noexcept;
 
-  friend XskSocket;
-  friend XskWorker;
-  friend bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept;
-
   constexpr static uint8_t DefaultTTL = 64;
 
 public:
@@ -244,7 +246,17 @@ public:
   XskPacket(uint8_t* frame, size_t dataSize, size_t frameSize);
   void addDelay(int relativeMilliseconds) noexcept;
   void updatePacket() noexcept;
+  // parse IP and UDP payloads
+  bool parse(bool fromSetHeader);
   [[nodiscard]] uint32_t getFlags() const noexcept;
+  [[nodiscard]] timespec getSendTime() const noexcept
+  {
+    return sendTime;
+  }
+  [[nodiscard]] uint64_t getFrameOffsetFrom(const uint8_t* base) const noexcept
+  {
+    return frame - base;
+  }
 };
 bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept;
 
@@ -298,7 +310,7 @@ public:
   void waitForXskSocket() noexcept;
   void cleanWorkerNotification() noexcept;
   void cleanSocketNotification() noexcept;
-  [[nodiscard]] uint64_t frameOffset(const XskPacket& s) const noexcept;
+  [[nodiscard]] uint64_t frameOffset(const XskPacket& packet) const noexcept;
   // reap empty umem entry from sharedEmptyFrameOffset into uniqueEmptyFrameOffset
   void fillUniqueEmptyOffset();
   // look for an empty umem entry in uniqueEmptyFrameOffset