]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Clean up and reorganize XSK code
authorRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 15 Jan 2024 14:14:29 +0000 (15:14 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 23 Jan 2024 11:54:21 +0000 (12:54 +0100)
pdns/dnsdist-lua-bindings.cc
pdns/dnsdist.cc
pdns/dnsdist.hh
pdns/dnsdistdist/Makefile.am
pdns/dnsdistdist/dnsdist-backend.cc
pdns/dnsdistdist/dnsdist-xsk.cc [new file with mode: 0644]
pdns/dnsdistdist/dnsdist-xsk.hh [new file with mode: 0644]
pdns/dnsdistdist/test-dnsdistlbpolicies_cc.cc
pdns/test-dnsdist_cc.cc
pdns/xsk.cc
pdns/xsk.hh

index 45ee564cf64afd580e5db95e256a02debe4985e6..d0e602e22dfcdbe46f53eec2b8f057ebdeb90147 100644 (file)
@@ -26,6 +26,7 @@
 #include "dnsdist-lua.hh"
 #include "dnsdist-resolver.hh"
 #include "dnsdist-svc.hh"
+#include "dnsdist-xsk.hh"
 
 #include "dolog.hh"
 #include "xsk.hh"
@@ -754,9 +755,8 @@ void setupLuaBindings(LuaContext& luaCtx, bool client, bool configCheck)
     else {
       throw std::runtime_error("xskMapPath field is required!");
     }
-    extern std::vector<std::shared_ptr<XskSocket>> g_xsk;
     auto socket = std::make_shared<XskSocket>(frameNums, ifName, queue_id, path);
-    g_xsk.push_back(socket);
+    dnsdist::xsk::g_xsk.push_back(socket);
     return socket;
   });
   luaCtx.registerFunction<std::string(std::shared_ptr<XskSocket>::*)()const>("getMetrics", [](const std::shared_ptr<XskSocket>& xsk) {
index cd71ac6b5c3d76ff8bc09d74790b0289896820b8..81ad234bfe2713e00bb31195a9feb4feb344705f 100644 (file)
 #include <sys/resource.h>
 #include <unistd.h>
 
-#ifdef HAVE_XSK
-#include <sys/poll.h>
-#endif /* HAVE_XSK */
-
 #ifdef HAVE_LIBEDIT
 #if defined (__OpenBSD__) || defined(__NetBSD__)
 // If this is not undeffed, __attribute__ wil be redefined by /usr/include/readline/rlstdc.h
@@ -73,6 +69,7 @@
 #include "dnsdist-tcp.hh"
 #include "dnsdist-web.hh"
 #include "dnsdist-xpf.hh"
+#include "dnsdist-xsk.hh"
 
 #include "base64.hh"
 #include "capabilities.hh"
@@ -88,6 +85,7 @@
 #include "misc.hh"
 #include "sstuff.hh"
 #include "threadname.hh"
+#include "xsk.hh"
 
 /* Known sins:
 
@@ -116,7 +114,6 @@ std::vector<std::shared_ptr<DOHFrontend>> g_dohlocals;
 std::vector<std::shared_ptr<DOQFrontend>> g_doqlocals;
 std::vector<std::shared_ptr<DOH3Frontend>> g_doh3locals;
 std::vector<std::shared_ptr<DNSCryptContext>> g_dnsCryptLocals;
-std::vector<std::shared_ptr<XskSocket>> g_xsk;
 
 shared_ptr<BPFFilter> g_defaultBPFFilter{nullptr};
 std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
@@ -775,7 +772,7 @@ static void handleResponseForUDPClient(InternalQueryState& ids, PacketBuffer& re
   }
 }
 
-static bool processResponderPacket(std::shared_ptr<DownstreamState>& dss, PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& localRespRuleActions, const std::vector<DNSDistResponseRuleAction>& cacheInsertedRespRuleActions, InternalQueryState&& ids)
+bool processResponderPacket(std::shared_ptr<DownstreamState>& dss, PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& localRespRuleActions, const std::vector<DNSDistResponseRuleAction>& cacheInsertedRespRuleActions, InternalQueryState&& ids)
 {
 
   const dnsheader_aligned dh(response.data());
@@ -811,185 +808,6 @@ static bool processResponderPacket(std::shared_ptr<DownstreamState>& dss, Packet
   return true;
 }
 
-#ifdef HAVE_XSK
-namespace dnsdist::xsk
-{
-void responderThread(std::shared_ptr<DownstreamState> dss)
-{
-  if (dss->xskInfo == nullptr) {
-    throw std::runtime_error("Starting XSK responder thread for a backend without XSK!");
-  }
-
-  try {
-    setThreadName("dnsdist/XskResp");
-    auto localRespRuleActions = g_respruleactions.getLocal();
-    auto localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
-    auto xskInfo = dss->xskInfo;
-    auto pollfds = getPollFdsForWorker(*xskInfo);
-    const auto xskFd = xskInfo->workerWaker.getHandle();
-    while (!dss->isStopped()) {
-      poll(pollfds.data(), pollfds.size(), -1);
-      bool needNotify = false;
-      if (pollfds[0].revents & POLLIN) {
-        needNotify = true;
-#if defined(__SANITIZE_THREAD__)
-        xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
-        xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
-          if (packet.getDataLen() < sizeof(dnsheader)) {
-            xskInfo->markAsFree(std::move(packet));
-            return;
-          }
-          const dnsheader_aligned dnsHeader(packet.getPayloadData());
-          const auto queryId = dnsHeader->id;
-          auto ids = dss->getState(queryId);
-          if (ids) {
-            if (xskFd != ids->backendFD || !ids->isXSK()) {
-              dss->restoreState(queryId, std::move(*ids));
-              ids = std::nullopt;
-            }
-          }
-          if (!ids) {
-            xskInfo->markAsFree(std::move(packet));
-            return;
-          }
-          auto response = packet.clonePacketBuffer();
-          if (response.size() > packet.getCapacity()) {
-            /* fallback to sending the packet via normal socket */
-            ids->xskPacketHeader.clear();
-          }
-          if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) {
-            xskInfo->markAsFree(std::move(packet));
-            vinfolog("XSK packet pushed to queue because processResponderPacket failed");
-            return;
-          }
-          if (response.size() > packet.getCapacity()) {
-            /* fallback to sending the packet via normal socket */
-            sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote);
-            vinfolog("XSK packet falling back because packet is too large");
-            xskInfo->markAsFree(std::move(packet));
-            return;
-          }
-          packet.setHeader(ids->xskPacketHeader);
-          if (!packet.setPayload(response)) {
-            vinfolog("Unable to set payload !");
-          }
-          if (ids->delayMsec > 0) {
-            vinfolog("XSK packet - adding delay");
-            packet.addDelay(ids->delayMsec);
-          }
-          packet.updatePacket();
-          xskInfo->pushToSendQueue(std::move(packet));
-        });
-        xskInfo->cleanSocketNotification();
-      }
-      if (needNotify) {
-        xskInfo->notifyXskSocket();
-      }
-    }
-  }
-  catch (const std::exception& e) {
-    errlog("XSK responder thread died because of exception: %s", e.what());
-  }
-  catch (const PDNSException& e) {
-    errlog("XSK responder thread died because of PowerDNS exception: %s", e.reason);
-  }
-  catch (...) {
-    errlog("XSK responder thread died because of an exception: %s", "unknown");
-  }
-}
-
-static bool isXskQueryAcceptable(const XskPacket& packet, ClientState& cs, LocalHolders& holders, bool& expectProxyProtocol) noexcept
-{
-  const auto& from = packet.getFromAddr();
-  expectProxyProtocol = expectProxyProtocolFrom(from);
-  if (!holders.acl->match(from) && !expectProxyProtocol) {
-    vinfolog("Query from %s dropped because of ACL", from.toStringWithPort());
-    ++dnsdist::metrics::g_stats.aclDrops;
-    return false;
-  }
-  cs.queries++;
-  ++dnsdist::metrics::g_stats.queries;
-
-  return true;
-}
-
-static void XskRouter(std::shared_ptr<XskSocket> xsk)
-{
-  setThreadName("dnsdist/XskRouter");
-  uint32_t failed;
-  // packets to be submitted for sending
-  vector<XskPacket> fillInTx;
-  const auto& fds = xsk->getDescriptors();
-  // list of workers that need to be notified
-  std::set<int> needNotify;
-  while (true) {
-    try {
-      auto ready = xsk->wait(-1);
-      // descriptor 0 gets incoming AF_XDP packets
-      if (fds.at(0).revents & POLLIN) {
-        auto packets = xsk->recv(64, &failed);
-        dnsdist::metrics::g_stats.nonCompliantQueries += failed;
-        for (auto &packet : packets) {
-          const auto dest = packet.getToAddr();
-          auto worker = xsk->getWorkerByDestination(dest);
-          if (!worker) {
-            xsk->markAsFree(std::move(packet));
-            continue;
-          }
-          worker->pushToProcessingQueue(std::move(packet));
-          needNotify.insert(worker->workerWaker.getHandle());
-        }
-        for (auto i : needNotify) {
-          uint64_t x = 1;
-          auto written = write(i, &x, sizeof(x));
-          if (written != sizeof(x)) {
-            // oh, well, the worker is clearly overloaded
-            // but there is nothing we can do about it,
-            // and hopefully the queue will be processed eventually
-          }
-        }
-        needNotify.clear();
-        ready--;
-      }
-      const auto backup = ready;
-      for (size_t fdIndex = 1; fdIndex < fds.size() && ready > 0; fdIndex++) {
-        if (fds.at(fdIndex).revents & POLLIN) {
-          ready--;
-          auto& info = xsk->getWorkerByDescriptor(fds.at(fdIndex).fd);
-#if defined(__SANITIZE_THREAD__)
-          info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
-          info->outgoingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
-            if (!(packet.getFlags() & XskPacket::UPDATE)) {
-              xsk->markAsFree(std::move(packet));
-              return;
-            }
-            if (packet.getFlags() & XskPacket::DELAY) {
-              xsk->pushDelayed(std::move(packet));
-              return;
-            }
-            fillInTx.push_back(std::move(packet));
-          });
-          info->cleanWorkerNotification();
-        }
-      }
-      xsk->pickUpReadyPacket(fillInTx);
-      xsk->recycle(4096);
-      xsk->fillFq();
-      xsk->send(fillInTx);
-      ready = backup;
-    }
-    catch (...) {
-      vinfolog("Exception in XSK router loop");
-    }
-  }
-}
-}
-#endif /* HAVE_XSK */
-
 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
 void responderThread(std::shared_ptr<DownstreamState> dss)
 {
@@ -1065,7 +883,7 @@ void responderThread(std::shared_ptr<DownstreamState> dss)
           xskPacket->setHeader(ids->xskPacketHeader);
           xskPacket->setPayload(response);
           xskPacket->updatePacket();
-          xskInfo->pushToSendQueue(std::move(*xskPacket));
+          xskInfo->pushToSendQueue(*xskPacket);
           xskInfo->notifyXskSocket();
 #endif /* HAVE_XSK */
         }
@@ -2061,7 +1879,7 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct
 #ifdef HAVE_XSK
 namespace dnsdist::xsk
 {
-static bool ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet)
+bool XskProcessQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet)
 {
   uint16_t queryId = 0;
   const auto& remote = packet.getFromAddr();
@@ -2077,7 +1895,7 @@ static bool ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p
 
   try {
     bool expectProxyProtocol = false;
-    if (!isXskQueryAcceptable(packet, cs, holders, expectProxyProtocol)) {
+    if (!XskIsQueryAcceptable(packet, cs, holders, expectProxyProtocol)) {
       return false;
     }
 
@@ -2191,36 +2009,6 @@ static bool ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p
   return false;
 }
 
-static void xskClientThread(ClientState* cs)
-{
-  setThreadName("dnsdist/xskClient");
-  auto xskInfo = cs->xskInfo;
-  LocalHolders holders;
-
-  for (;;) {
-#if defined(__SANITIZE_THREAD__)
-    while (!xskInfo->incomingPacketsQueue.lock()->read_available()) {
-#else
-    while (!xskInfo->incomingPacketsQueue.read_available()) {
-#endif
-      xskInfo->waitForXskSocket();
-    }
-#if defined(__SANITIZE_THREAD__)
-    xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
-    xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
-      if (ProcessXskQuery(*cs, holders, packet)) {
-        packet.updatePacket();
-        xskInfo->pushToSendQueue(std::move(packet));
-      }
-      else {
-        xskInfo->markAsFree(std::move(packet));
-      }
-    });
-    xskInfo->notifyXskSocket();
-  }
-}
 }
 #endif /* HAVE_XSK */
 
@@ -3295,7 +3083,7 @@ namespace dnsdist
 static void startFrontends()
 {
 #ifdef HAVE_XSK
-  for (auto& xskContext : g_xsk) {
+  for (auto& xskContext : dnsdist::xsk::g_xsk) {
     std::thread xskThread(dnsdist::xsk::XskRouter, std::move(xskContext));
     xskThread.detach();
   }
@@ -3306,9 +3094,9 @@ static void startFrontends()
   for (auto& clientState : g_frontends) {
 #ifdef HAVE_XSK
     if (clientState->xskInfo) {
-      XskSocket::addDestinationAddress(clientState->local);
+      dnsdist::xsk::addDestinationAddress(clientState->local);
 
-      std::thread xskCT(dnsdist::xsk::xskClientThread, clientState.get());
+      std::thread xskCT(dnsdist::xsk::XskClientThread, clientState.get());
       if (!clientState->cpus.empty()) {
         mapThreadToCPUList(xskCT.native_handle(), clientState->cpus);
       }
@@ -3415,7 +3203,14 @@ int main(int argc, char** argv)
     g_hashperturb = dnsdist::getRandomValue(0xffffffff);
 
 #ifdef HAVE_XSK
-    XskSocket::clearDestinationAddresses();
+#warning FIXME: we need to provide a way to clear the map from Lua, as well as a way to change the map path
+    try {
+      dnsdist::xsk::clearDestinationAddresses();
+    }
+    catch (const std::exception& exp) {
+      /* silently handle failures: at this point we don't even know if XSK is enabled,
+         and we might not have the correct map (not the default one). */
+    }
 #endif /* HAVE_XSK */
 
     ComboAddress clientAddress = ComboAddress();
index 4cec0d03043cda85abc2922aa9aac9f85b99f008..254e64af61e4d3d82b2ac14a7e111d7c0fb752c7 100644 (file)
@@ -56,7 +56,6 @@
 #include "uuid-utils.hh"
 #include "proxy-protocol.hh"
 #include "stat_t.hh"
-#include "xsk.hh"
 
 uint64_t uptimeOfProcess(const std::string& str);
 
@@ -472,6 +471,10 @@ struct QueryCount {
 
 extern QueryCount g_qcount;
 
+class XskPacket;
+class XskSocket;
+class XskWorker;
+
 struct ClientState
 {
   ClientState(const ComboAddress& local_, bool isTCP_, bool doReusePort, int fastOpenQueue, const std::string& itfName, const std::set<int>& cpus_, bool enableProxyProtocol): cpus(cpus_), interface(itfName), local(local_), fastOpenQueueSize(fastOpenQueue), tcp(isTCP_), reuseport(doReusePort), d_enableProxyProtocol(enableProxyProtocol)
@@ -704,8 +707,10 @@ struct DownstreamState: public std::enable_shared_from_this<DownstreamState>
     std::string d_dohPath;
     std::string name;
     std::string nameWithAddr;
-    MACAddr sourceMACAddr;
-    MACAddr destMACAddr;
+#ifdef HAVE_XSK
+    std::array<uint8_t, 6> sourceMACAddr;
+    std::array<uint8_t, 6> destMACAddr;
+#endif /* HAVE_XSK */
     size_t d_numberOfSockets{1};
     size_t d_maxInFlightQueriesPerConn{1};
     size_t d_tcpConcurrentConnectionsLimit{0};
@@ -1189,6 +1194,7 @@ ProcessQueryResult processQueryAfterRules(DNSQuestion& dq, LocalHolders& holders
 bool processResponse(PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& respRuleActions, const std::vector<DNSDistResponseRuleAction>& insertedRespRuleActions, DNSResponse& dr, bool muted);
 bool processRulesResult(const DNSAction::Action& action, DNSQuestion& dq, std::string& ruleresult, bool& drop);
 bool processResponseAfterRules(PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& cacheInsertedRespRuleActions, DNSResponse& dr, bool muted);
+bool processResponderPacket(std::shared_ptr<DownstreamState>& dss, PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& localRespRuleActions, const std::vector<DNSDistResponseRuleAction>& cacheInsertedRespRuleActions, InternalQueryState&& ids);
 
 bool assignOutgoingUDPQueryToBackend(std::shared_ptr<DownstreamState>& ds, uint16_t queryID, DNSQuestion& dq, PacketBuffer& query, bool actuallySend = true);
 
@@ -1196,10 +1202,3 @@ ssize_t udpClientSendRequestToBackend(const std::shared_ptr<DownstreamState>& ss
 bool sendUDPResponse(int origFD, const PacketBuffer& response, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote);
 void handleResponseSent(const DNSName& qname, const QType& qtype, double udiff, const ComboAddress& client, const ComboAddress& backend, unsigned int size, const dnsheader& cleartextDH, dnsdist::Protocol outgoingProtocol, dnsdist::Protocol incomingProtocol, bool fromBackend);
 void handleResponseSent(const InternalQueryState& ids, double udiff, const ComboAddress& client, const ComboAddress& backend, unsigned int size, const dnsheader& cleartextDH, dnsdist::Protocol outgoingProtocol, bool fromBackend);
-
-#ifdef HAVE_XSK
-namespace dnsdist::xsk
-{
-void responderThread(std::shared_ptr<DownstreamState> dss);
-}
-#endif /* HAVE_XSK */
index c95629daac9797c3727630ca1be79c29600620ed..1a1e3080c1c3b32f82958098449c6e02197679d7 100644 (file)
@@ -207,6 +207,7 @@ dnsdist_SOURCES = \
        dnsdist-tcp.cc dnsdist-tcp.hh \
        dnsdist-web.cc dnsdist-web.hh \
        dnsdist-xpf.cc dnsdist-xpf.hh \
+       dnsdist-xsk.cc dnsdist-xsk.hh \
        dnsdist.cc dnsdist.hh \
        dnslabeltext.cc \
        dnsname.cc dnsname.hh \
@@ -305,6 +306,7 @@ testrunner_SOURCES = \
        dnsdist-tcp-downstream.cc \
        dnsdist-tcp.cc dnsdist-tcp.hh \
        dnsdist-xpf.cc dnsdist-xpf.hh \
+       dnsdist-xsk.cc dnsdist-xsk.hh \
        dnsdist.hh \
        dnslabeltext.cc \
        dnsname.cc dnsname.hh \
index 02cfea5c7d3036e8851c19bd5c8b6e509a64ec04..49b47b8cdcca5acb07c36d05ed139f8d18459c0d 100644 (file)
@@ -27,6 +27,7 @@
 #include "dnsdist-random.hh"
 #include "dnsdist-rings.hh"
 #include "dnsdist-tcp.hh"
+#include "dnsdist-xsk.hh"
 #include "dolog.hh"
 #include "xsk.hh"
 
@@ -53,7 +54,7 @@ void DownstreamState::addXSKDestination(int fd)
     auto addresses = d_socketSourceAddresses.write_lock();
     addresses->push_back(local);
   }
-  XskSocket::addDestinationAddress(local);
+  dnsdist::xsk::addDestinationAddress(local);
   d_xskSocket->addWorkerRoute(xskInfo, local);
 }
 
@@ -65,7 +66,7 @@ void DownstreamState::removeXSKDestination(int fd)
     return;
   }
 
-  XskSocket::removeDestinationAddress(local);
+  dnsdist::xsk::removeDestinationAddress(local);
   d_xskSocket->removeWorkerRoute(local);
 }
 #endif /* HAVE_XSK */
@@ -329,7 +330,7 @@ void DownstreamState::start()
   if (connected && !threadStarted.test_and_set()) {
 #ifdef HAVE_XSK
     if (xskInfo != nullptr) {
-      tid = std::thread(dnsdist::xsk::responderThread, shared_from_this());
+      tid = std::thread(dnsdist::xsk::XskResponderThread, shared_from_this());
     }
     else {
       tid = std::thread(responderThread, shared_from_this());
diff --git a/pdns/dnsdistdist/dnsdist-xsk.cc b/pdns/dnsdistdist/dnsdist-xsk.cc
new file mode 100644 (file)
index 0000000..4d1bec3
--- /dev/null
@@ -0,0 +1,266 @@
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#include "dnsdist.hh"
+#include "dnsdist-xsk.hh"
+
+#ifdef HAVE_XSK
+#include <sys/poll.h>
+
+#include "dnsdist-metrics.hh"
+#include "dnsdist-proxy-protocol.hh"
+#include "threadname.hh"
+#include "xsk.hh"
+
+namespace dnsdist::xsk
+{
+std::vector<std::shared_ptr<XskSocket>> g_xsk;
+
+void XskResponderThread(std::shared_ptr<DownstreamState> dss)
+{
+  if (dss->xskInfo == nullptr) {
+    throw std::runtime_error("Starting XSK responder thread for a backend without XSK!");
+  }
+
+  try {
+    setThreadName("dnsdist/XskResp");
+    auto localRespRuleActions = g_respruleactions.getLocal();
+    auto localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
+    auto xskInfo = dss->xskInfo;
+    auto pollfds = getPollFdsForWorker(*xskInfo);
+    const auto xskFd = xskInfo->workerWaker.getHandle();
+    while (!dss->isStopped()) {
+      poll(pollfds.data(), pollfds.size(), -1);
+      bool needNotify = false;
+      if ((pollfds[0].revents & POLLIN) != 0) {
+        needNotify = true;
+#if defined(__SANITIZE_THREAD__)
+        xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
+#else
+        xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
+#endif
+          if (packet.getDataLen() < sizeof(dnsheader)) {
+            xskInfo->markAsFree(packet);
+            return;
+          }
+          const dnsheader_aligned dnsHeader(packet.getPayloadData());
+          const auto queryId = dnsHeader->id;
+          auto ids = dss->getState(queryId);
+          if (ids) {
+            if (xskFd != ids->backendFD || !ids->isXSK()) {
+              dss->restoreState(queryId, std::move(*ids));
+              ids = std::nullopt;
+            }
+          }
+          if (!ids) {
+            xskInfo->markAsFree(packet);
+            return;
+          }
+          auto response = packet.clonePacketBuffer();
+          if (response.size() > packet.getCapacity()) {
+            /* fallback to sending the packet via normal socket */
+            ids->xskPacketHeader.clear();
+          }
+          if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) {
+            xskInfo->markAsFree(packet);
+            vinfolog("XSK packet pushed to queue because processResponderPacket failed");
+            return;
+          }
+          if (response.size() > packet.getCapacity()) {
+            /* fallback to sending the packet via normal socket */
+            sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote);
+            vinfolog("XSK packet falling back because packet is too large");
+            xskInfo->markAsFree(packet);
+            return;
+          }
+          packet.setHeader(ids->xskPacketHeader);
+          if (!packet.setPayload(response)) {
+            vinfolog("Unable to set XSK payload !");
+          }
+          if (ids->delayMsec > 0) {
+            packet.addDelay(ids->delayMsec);
+          }
+          packet.updatePacket();
+          xskInfo->pushToSendQueue(packet);
+        });
+        xskInfo->cleanSocketNotification();
+      }
+      if (needNotify) {
+        xskInfo->notifyXskSocket();
+      }
+    }
+  }
+  catch (const std::exception& e) {
+    errlog("XSK responder thread died because of exception: %s", e.what());
+  }
+  catch (const PDNSException& e) {
+    errlog("XSK responder thread died because of PowerDNS exception: %s", e.reason);
+  }
+  catch (...) {
+    errlog("XSK responder thread died because of an exception: %s", "unknown");
+  }
+}
+
+bool XskIsQueryAcceptable(const XskPacket& packet, ClientState& clientState, LocalHolders& holders, bool& expectProxyProtocol)
+{
+  const auto& from = packet.getFromAddr();
+  expectProxyProtocol = expectProxyProtocolFrom(from);
+  if (!holders.acl->match(from) && !expectProxyProtocol) {
+    vinfolog("Query from %s dropped because of ACL", from.toStringWithPort());
+    ++dnsdist::metrics::g_stats.aclDrops;
+    return false;
+  }
+  clientState.queries++;
+  ++dnsdist::metrics::g_stats.queries;
+
+  return true;
+}
+
+void XskRouter(std::shared_ptr<XskSocket> xsk)
+{
+  setThreadName("dnsdist/XskRouter");
+  uint32_t failed = 0;
+  // packets to be submitted for sending
+  vector<XskPacket> fillInTx;
+  const auto& fds = xsk->getDescriptors();
+  // list of workers that need to be notified
+  std::set<int> needNotify;
+  while (true) {
+    try {
+      auto ready = xsk->wait(-1);
+      // descriptor 0 gets incoming AF_XDP packets
+      if ((fds.at(0).revents & POLLIN) != 0) {
+        auto packets = xsk->recv(64, &failed);
+        dnsdist::metrics::g_stats.nonCompliantQueries += failed;
+        for (auto &packet : packets) {
+          const auto dest = packet.getToAddr();
+          auto worker = xsk->getWorkerByDestination(dest);
+          if (!worker) {
+            xsk->markAsFree(packet);
+            continue;
+          }
+          worker->pushToProcessingQueue(packet);
+          needNotify.insert(worker->workerWaker.getHandle());
+        }
+        for (auto socket : needNotify) {
+          uint64_t value = 1;
+          auto written = write(socket, &value, sizeof(value));
+          if (written != sizeof(value)) {
+            // oh, well, the worker is clearly overloaded
+            // but there is nothing we can do about it,
+            // and hopefully the queue will be processed eventually
+          }
+        }
+        needNotify.clear();
+        ready--;
+      }
+      const auto backup = ready;
+      for (size_t fdIndex = 1; fdIndex < fds.size() && ready > 0; fdIndex++) {
+        if ((fds.at(fdIndex).revents & POLLIN) != 0) {
+          ready--;
+          const auto& info = xsk->getWorkerByDescriptor(fds.at(fdIndex).fd);
+#if defined(__SANITIZE_THREAD__)
+          info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
+#else
+          info->outgoingPacketsQueue.consume_all([&](XskPacket& packet) {
+#endif
+            if ((packet.getFlags() & XskPacket::UPDATE) == 0) {
+              xsk->markAsFree(packet);
+              return;
+            }
+            if ((packet.getFlags() & XskPacket::DELAY) != 0) {
+              xsk->pushDelayed(packet);
+              return;
+            }
+            fillInTx.push_back(packet);
+          });
+          info->cleanWorkerNotification();
+        }
+      }
+      xsk->pickUpReadyPacket(fillInTx);
+      xsk->recycle(4096);
+      xsk->fillFq();
+      xsk->send(fillInTx);
+      ready = backup;
+    }
+    catch (...) {
+      vinfolog("Exception in XSK router loop");
+    }
+  }
+}
+
+void XskClientThread(ClientState* clientState)
+{
+  setThreadName("dnsdist/xskClient");
+  auto xskInfo = clientState->xskInfo;
+  LocalHolders holders;
+
+  for (;;) {
+#if defined(__SANITIZE_THREAD__)
+    while (xskInfo->incomingPacketsQueue.lock()->read_available() == 0U) {
+#else
+    while (xskInfo->incomingPacketsQueue.read_available() == 0U) {
+#endif
+      xskInfo->waitForXskSocket();
+    }
+#if defined(__SANITIZE_THREAD__)
+    xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
+#else
+    xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
+#endif
+      if (XskProcessQuery(*clientState, holders, packet)) {
+        packet.updatePacket();
+        xskInfo->pushToSendQueue(packet);
+      }
+      else {
+        xskInfo->markAsFree(packet);
+      }
+    });
+    xskInfo->notifyXskSocket();
+  }
+}
+
+static std::string getDestinationMap(bool isV6) {
+ return !isV6 ? "/sys/fs/bpf/dnsdist/xsk-destinations-v4" : "/sys/fs/bpf/dnsdist/xsk-destinations-v6";
+}
+
+void addDestinationAddress(const ComboAddress& addr)
+{
+  auto map = getDestinationMap(addr.isIPv6());
+  XskSocket::addDestinationAddress(map, addr);
+}
+
+void removeDestinationAddress(const ComboAddress& addr)
+{
+  auto map = getDestinationMap(addr.isIPv6());
+  XskSocket::removeDestinationAddress(map, addr);
+}
+
+void clearDestinationAddresses()
+{
+  auto map = getDestinationMap(false);
+  XskSocket::clearDestinationMap(map, false);
+  map = getDestinationMap(true);
+  XskSocket::clearDestinationMap(map, true);
+}
+
+}
+#endif /* HAVE_XSK */
diff --git a/pdns/dnsdistdist/dnsdist-xsk.hh b/pdns/dnsdistdist/dnsdist-xsk.hh
new file mode 100644 (file)
index 0000000..6a862d7
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#pragma once
+
+#include "config.h"
+
+#ifdef HAVE_XSK
+class XskPacket;
+class XskSocket;
+class XskWorker;
+
+#include <memory>
+
+namespace dnsdist::xsk
+{
+void XskResponderThread(std::shared_ptr<DownstreamState> dss);
+bool XskIsQueryAcceptable(const XskPacket& packet, ClientState& clientState, LocalHolders& holders, bool& expectProxyProtocol);
+bool XskProcessQuery(ClientState& clientState, LocalHolders& holders, XskPacket& packet);
+void XskRouter(std::shared_ptr<XskSocket> xsk);
+void XskClientThread(ClientState* clientState);
+void addDestinationAddress(const ComboAddress& addr);
+void removeDestinationAddress(const ComboAddress& addr);
+void clearDestinationAddresses();
+
+extern std::vector<std::shared_ptr<XskSocket>> g_xsk;
+}
+#endif /* HAVE_XSK */
index e05c0b56b4af626e38c926317ff6e0681cbb13a8..bcb73b26529cd858245f22dd9aae0fd6e933e0c7 100644 (file)
@@ -74,13 +74,6 @@ void responderThread(std::shared_ptr<DownstreamState> dss)
 {
 }
 
-namespace dnsdist::xsk
-{
-void responderThread(std::shared_ptr<DownstreamState> dss)
-{
-}
-}
-
 string g_outputBuffer;
 std::atomic<bool> g_configurationDone{false};
 
index c29fdabba5e9c61618e8ee0fcea6b2f330587f4e..d3e0d29fc0b951e37728b86a44957646a1f12196 100644 (file)
@@ -33,6 +33,7 @@
 #include "dnsdist-internal-queries.hh"
 #include "dnsdist-tcp.hh"
 #include "dnsdist-xpf.hh"
+#include "dnsdist-xsk.hh"
 
 #include "dolog.hh"
 #include "dnsname.hh"
@@ -73,8 +74,18 @@ bool DNSDistSNMPAgent::sendBackendStatusChangeTrap(DownstreamState const&)
 {
   return false;
 }
+namespace dnsdist::xsk
+{
+bool XskProcessQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet)
+{
+  return false;
+}
+}
 
-std::vector<std::shared_ptr<XskSocket>> g_xsk;
+bool processResponderPacket(std::shared_ptr<DownstreamState>& dss, PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& localRespRuleActions, const std::vector<DNSDistResponseRuleAction>& cacheInsertedRespRuleActions, InternalQueryState&& ids)
+{
+  return false;
+}
 
 BOOST_AUTO_TEST_SUITE(test_dnsdist_cc)
 
index 15d9f14ec5f40a0f6a6948d9a3166b9ff33b85cb..b656757a2d22e679347a7b23bb736293b70aa01c 100644 (file)
@@ -94,7 +94,7 @@ int XskSocket::firstTimeout()
   if (waitForDelay.empty()) {
     return -1;
   }
-  timespec now;
+  timespec now{};
   gettime(&now);
   const auto& firstTime = waitForDelay.top().getSendTime();
   const auto res = timeDifference(now, firstTime);
@@ -104,8 +104,8 @@ int XskSocket::firstTimeout()
   return res;
 }
 
-XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queue_id, const std::string& xskMapPath) :
-  frameNum(frameNum_), ifName(ifName_), socket(nullptr, xsk_socket__delete), sharedEmptyFrameOffset(std::make_shared<LockGuarded<vector<uint64_t>>>())
+XskSocket::XskSocket(size_t frameNum_, std::string ifName_, uint32_t queue_id, const std::string& xskMapPath) :
+  frameNum(frameNum_), ifName(std::move(ifName_)), socket(nullptr, xsk_socket__delete), sharedEmptyFrameOffset(std::make_shared<LockGuarded<vector<uint64_t>>>())
 {
   if (!isPowOfTwo(frameNum_) || !isPowOfTwo(frameSize)
       || !isPowOfTwo(fqCapacity) || !isPowOfTwo(cqCapacity) || !isPowOfTwo(rxCapacity) || !isPowOfTwo(txCapacity)) {
@@ -118,7 +118,7 @@ XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queu
   memset(&tx, 0, sizeof(tx));
   memset(&rx, 0, sizeof(rx));
 
-  xsk_umem_config umemCfg;
+  xsk_umem_config umemCfg{};
   umemCfg.fill_size = fqCapacity;
   umemCfg.comp_size = cqCapacity;
   umemCfg.frame_size = frameSize;
@@ -127,7 +127,7 @@ XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queu
   umem.umemInit(frameNum_ * frameSize, &cq, &fq, &umemCfg);
 
   {
-    xsk_socket_config socketCfg;
+    xsk_socket_config socketCfg{};
     socketCfg.rx_size = rxCapacity;
     socketCfg.tx_size = txCapacity;
     socketCfg.bind_flags = XDP_USE_NEED_WAKEUP;
@@ -170,13 +170,12 @@ XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queu
   }
 
   auto ret = bpf_map_update_elem(xskMapFd.getHandle(), &queue_id, &xskfd, 0);
-  if (ret) {
+  if (ret != 0) {
     throw std::runtime_error("Error inserting into xsk_map '" + xskMapPath + "': " + std::to_string(ret));
   }
 }
 
 // see xdp.h in contrib/
-
 struct IPv4AndPort
 {
   uint32_t addr;
@@ -188,16 +187,19 @@ struct IPv6AndPort
   uint16_t port;
 };
 
-static void clearDestinationMap(bool v6)
+static FDWrapper getDestinationMap(const std::string& mapPath)
 {
-  const std::string mapPath = !v6 ? "/sys/fs/bpf/dnsdist/xsk-destinations-v4" : "/sys/fs/bpf/dnsdist/xsk-destinations-v6";
-
-  const auto destMapFd = FDWrapper(bpf_obj_get(mapPath.c_str()));
+  auto destMapFd = FDWrapper(bpf_obj_get(mapPath.c_str()));
   if (destMapFd.getHandle() < 0) {
     throw std::runtime_error("Error getting the XSK destination addresses map path '" + mapPath + "'");
   }
+  return destMapFd;
+}
 
-  if (!v6) {
+void XskSocket::clearDestinationMap(const std::string& mapPath, bool isV6)
+{
+  auto destMapFd = getDestinationMap(mapPath);
+  if (!isV6) {
     IPv4AndPort prevKey{};
     IPv4AndPort key{};
     while (bpf_map_get_next_key(destMapFd.getHandle(), &prevKey, &key) == 0) {
@@ -215,33 +217,16 @@ static void clearDestinationMap(bool v6)
   }
 }
 
-void XskSocket::clearDestinationAddresses()
+void XskSocket::addDestinationAddress(const std::string& mapPath, const ComboAddress& destination)
 {
-  clearDestinationMap(false);
-  clearDestinationMap(true);
-}
-
-void XskSocket::addDestinationAddress(const ComboAddress& destination)
-{
-  // see xdp.h in contrib/
-
-  const std::string mapPath = destination.isIPv4() ? "/sys/fs/bpf/dnsdist/xsk-destinations-v4" : "/sys/fs/bpf/dnsdist/xsk-destinations-v6";
-  //if (!s_destinationAddressesMap) {
-  //  throw std::runtime_error("The path of the XSK (AF_XDP) destination addresses map has not been set! Please consider using setXSKDestinationAddressesMapPath().");
-  //}
-
-  const auto destMapFd = FDWrapper(bpf_obj_get(mapPath.c_str()));
-  if (destMapFd.getHandle() < 0) {
-    throw std::runtime_error("Error getting the XSK destination addresses map path '" + mapPath + "'");
-  }
-
+  auto destMapFd = getDestinationMap(mapPath);
   bool value = true;
   if (destination.isIPv4()) {
     IPv4AndPort key{};
     key.addr = destination.sin4.sin_addr.s_addr;
     key.port = destination.sin4.sin_port;
     auto ret = bpf_map_update_elem(destMapFd.getHandle(), &key, &value, 0);
-    if (ret) {
+    if (ret != 0) {
       throw std::runtime_error("Error inserting into xsk_map '" + mapPath + "': " + std::to_string(ret));
     }
   }
@@ -250,24 +235,15 @@ void XskSocket::addDestinationAddress(const ComboAddress& destination)
     key.addr = destination.sin6.sin6_addr;
     key.port = destination.sin6.sin6_port;
     auto ret = bpf_map_update_elem(destMapFd.getHandle(), &key, &value, 0);
-    if (ret) {
+    if (ret != 0) {
       throw std::runtime_error("Error inserting into XSK destination addresses map '" + mapPath + "': " + std::to_string(ret));
     }
   }
 }
 
-void XskSocket::removeDestinationAddress(const ComboAddress& destination)
+void XskSocket::removeDestinationAddress(const std::string& mapPath, const ComboAddress& destination)
 {
-  const std::string mapPath = destination.isIPv4() ? "/sys/fs/bpf/dnsdist/xsk-destinations-v4" : "/sys/fs/bpf/dnsdist/xsk-destinations-v6";
-  //if (!s_destinationAddressesMap) {
-  //  throw std::runtime_error("The path of the XSK (AF_XDP) destination addresses map has not been set! Please consider using setXSKDestinationAddressesMapPath().");
-  //}
-
-  const auto destMapFd = FDWrapper(bpf_obj_get(mapPath.c_str()));
-  if (destMapFd.getHandle() < 0) {
-    throw std::runtime_error("Error getting the XSK destination addresses map path '" + mapPath + "'");
-  }
-
+  auto destMapFd = getDestinationMap(mapPath);
   if (destination.isIPv4()) {
     IPv4AndPort key{};
     key.addr = destination.sin4.sin_addr.s_addr;
@@ -334,7 +310,7 @@ int XskSocket::wait(int timeout)
 
 void XskSocket::send(std::vector<XskPacket>& packets)
 {
-  while (packets.size() > 0) {
+  while (!packets.empty()) {
     auto packetSize = packets.size();
     if (packetSize > std::numeric_limits<uint32_t>::max()) {
       packetSize = std::numeric_limits<uint32_t>::max();
@@ -375,6 +351,7 @@ std::vector<XskPacket> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCou
     return res;
   }
 
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
   const auto baseAddr = reinterpret_cast<uint64_t>(umem.bufBase);
   uint32_t failed = 0;
   uint32_t processed = 0;
@@ -382,6 +359,7 @@ std::vector<XskPacket> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCou
   for (; processed < recvSize; processed++) {
     try {
       const auto* desc = xsk_ring_cons__rx_desc(&rx, idx++);
+      // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
       XskPacket packet = XskPacket(reinterpret_cast<uint8_t*>(desc->addr + baseAddr), desc->len, frameSize);
 #ifdef DEBUG_UMEM
       checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::FillQueue}, UmemEntryStatus::Status::Received);
@@ -389,10 +367,10 @@ std::vector<XskPacket> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCou
 
       if (!packet.parse(false)) {
         ++failed;
-        markAsFree(std::move(packet));
+        markAsFree(packet);
       }
       else {
-        res.push_back(std::move(packet));
+        res.push_back(packet);
       }
     }
     catch (const std::exception& exp) {
@@ -409,7 +387,7 @@ std::vector<XskPacket> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCou
   // which will only be made available again when pushed into the fill
   // queue
   xsk_ring_cons__release(&rx, processed);
-  if (failedCount) {
+  if (failedCount != nullptr) {
     *failedCount = failed;
   }
 
@@ -418,11 +396,12 @@ std::vector<XskPacket> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCou
 
 void XskSocket::pickUpReadyPacket(std::vector<XskPacket>& packets)
 {
-  timespec now;
+  timespec now{};
   gettime(&now);
   while (!waitForDelay.empty() && timeDifference(now, waitForDelay.top().getSendTime()) <= 0) {
+    // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast)
     auto& top = const_cast<XskPacket&>(waitForDelay.top());
-    packets.push_back(std::move(top));
+    packets.push_back(top);
     waitForDelay.pop();
   }
 }
@@ -461,10 +440,10 @@ void XskSocket::XskUmem::umemInit(size_t memSize, xsk_ring_cons* completionQueue
 
 std::string XskSocket::getMetrics() const
 {
-  struct xdp_statistics stats;
+  xdp_statistics stats{};
   socklen_t optlen = sizeof(stats);
   int err = getsockopt(xskFd(), SOL_XDP, XDP_STATISTICS, &stats, &optlen);
-  if (err) {
+  if (err != 0) {
     return "";
   }
   if (optlen != sizeof(struct xdp_statistics)) {
@@ -481,7 +460,7 @@ std::string XskSocket::getMetrics() const
   return ret.str();
 }
 
-void XskSocket::markAsFree(XskPacket&& packet)
+void XskSocket::markAsFree(const XskPacket& packet)
 {
   auto offset = frameOffset(packet);
 #ifdef DEBUG_UMEM
@@ -493,10 +472,10 @@ void XskSocket::markAsFree(XskPacket&& packet)
 
 XskSocket::XskUmem::~XskUmem()
 {
-  if (umem) {
+  if (umem != nullptr) {
     xsk_umem__delete(umem);
   }
-  if (bufBase) {
+  if (bufBase != nullptr) {
     munmap(bufBase, size);
   }
 }
@@ -536,39 +515,49 @@ void XskPacket::setEthernetHeader(const ethhdr& ethHeader) noexcept
 [[nodiscard]] iphdr XskPacket::getIPv4Header() const noexcept
 {
   iphdr ipv4Header{};
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
   assert(frameLength >= (sizeof(ethhdr) + sizeof(ipv4Header)));
   assert(!v6);
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
   memcpy(&ipv4Header, frame + sizeof(ethhdr), sizeof(ipv4Header));
   return ipv4Header;
 }
 
 void XskPacket::setIPv4Header(const iphdr& ipv4Header) noexcept
 {
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
   assert(frameLength >= (sizeof(ethhdr) + sizeof(iphdr)));
   assert(!v6);
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
   memcpy(frame + sizeof(ethhdr), &ipv4Header, sizeof(ipv4Header));
 }
 
 [[nodiscard]] ipv6hdr XskPacket::getIPv6Header() const noexcept
 {
   ipv6hdr ipv6Header{};
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
   assert(frameLength >= (sizeof(ethhdr) + sizeof(ipv6Header)));
   assert(v6);
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
   memcpy(&ipv6Header, frame + sizeof(ethhdr), sizeof(ipv6Header));
   return ipv6Header;
 }
 
 void XskPacket::setIPv6Header(const ipv6hdr& ipv6Header) noexcept
 {
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
   assert(frameLength >= (sizeof(ethhdr) + sizeof(ipv6Header)));
   assert(v6);
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
   memcpy(frame + sizeof(ethhdr), &ipv6Header, sizeof(ipv6Header));
 }
 
 [[nodiscard]] udphdr XskPacket::getUDPHeader() const noexcept
 {
   udphdr udpHeader{};
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
   assert(frameLength >= (sizeof(ethhdr) + (v6 ? sizeof(ipv6hdr) : sizeof(iphdr)) + sizeof(udpHeader)));
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
   memcpy(&udpHeader, frame + getL4HeaderOffset(), sizeof(udpHeader));
   return udpHeader;
 }
@@ -600,7 +589,9 @@ bool XskPacket::parse(bool fromSetHeader)
     // check ip.check == ipv4Checksum() is not needed!
     // We check it in BPF program
     // we don't, actually.
+    // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
     from = makeComboAddressFromRaw(4, reinterpret_cast<const char*>(&ipHeader.saddr), sizeof(ipHeader.saddr));
+    // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
     to = makeComboAddressFromRaw(4, reinterpret_cast<const char*>(&ipHeader.daddr), sizeof(ipHeader.daddr));
     l4Protocol = ipHeader.protocol;
     if (!fromSetHeader && (frameLength - sizeof(ethhdr)) != ntohs(ipHeader.tot_len)) {
@@ -614,7 +605,9 @@ bool XskPacket::parse(bool fromSetHeader)
     }
     v6 = true;
     auto ipHeader = getIPv6Header();
+    // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
     from = makeComboAddressFromRaw(6, reinterpret_cast<const char*>(&ipHeader.saddr), sizeof(ipHeader.saddr));
+    // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
     to = makeComboAddressFromRaw(6, reinterpret_cast<const char*>(&ipHeader.daddr), sizeof(ipHeader.daddr));
     l4Protocol = ipHeader.nexthdr;
     if (!fromSetHeader && (frameLength - (sizeof(ethhdr) + sizeof(ipv6hdr))) != ntohs(ipHeader.payload_len)) {
@@ -668,12 +661,12 @@ void XskPacket::changeDirectAndUpdateChecksum() noexcept
 {
   auto ethHeader = getEthernetHeader();
   {
-    uint8_t tmp[ETH_ALEN];
-    static_assert(sizeof(tmp) == sizeof(ethHeader.h_dest), "Size Error");
-    static_assert(sizeof(tmp) == sizeof(ethHeader.h_source), "Size Error");
-    memcpy(tmp, ethHeader.h_dest, sizeof(tmp));
-    memcpy(ethHeader.h_dest, ethHeader.h_source, sizeof(tmp));
-    memcpy(ethHeader.h_source, tmp, sizeof(tmp));
+    std::array<uint8_t, ETH_ALEN> tmp;
+    static_assert(tmp.size() == sizeof(ethHeader.h_dest), "Size Error");
+    static_assert(tmp.size() == sizeof(ethHeader.h_source), "Size Error");
+    memcpy(tmp.data(), ethHeader.h_dest, tmp.size());
+    memcpy(ethHeader.h_dest, ethHeader.h_source, tmp.size());
+    memcpy(ethHeader.h_source, tmp.data(), tmp.size());
   }
   if (ethHeader.h_proto == htons(ETH_P_IPV6)) {
     // IPV6
@@ -750,6 +743,7 @@ PacketBuffer XskPacket::clonePacketBuffer() const
 {
   const auto size = getDataSize();
   PacketBuffer tmp(size);
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
   memcpy(tmp.data(), frame + getDataOffset(), size);
   return tmp;
 }
@@ -758,6 +752,7 @@ void XskPacket::cloneIntoPacketBuffer(PacketBuffer& buffer) const
 {
   const auto size = getDataSize();
   buffer.resize(size);
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
   memcpy(buffer.data(), frame + getDataOffset(), size);
 }
 
@@ -769,7 +764,9 @@ bool XskPacket::setPayload(const PacketBuffer& buf)
     return false;
   }
   flags |= UPDATE;
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
   memcpy(frame + getDataOffset(), buf.data(), bufSize);
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
   frameLength = getDataOffset() + bufSize;
   return true;
 }
@@ -777,14 +774,14 @@ bool XskPacket::setPayload(const PacketBuffer& buf)
 void XskPacket::addDelay(const int relativeMilliseconds) noexcept
 {
   gettime(&sendTime);
-  sendTime.tv_nsec += static_cast<uint64_t>(relativeMilliseconds) * 1000000L;
+  sendTime.tv_nsec += static_cast<int64_t>(relativeMilliseconds) * 1000000L;
   sendTime.tv_sec += sendTime.tv_nsec / 1000000000L;
   sendTime.tv_nsec %= 1000000000L;
 }
 
-bool operator<(const XskPacket& s1, const XskPacket& s2) noexcept
+bool operator<(const XskPacket& lhs, const XskPacket& rhs) noexcept
 {
-  return s1.getSendTime() < s2.getSendTime();
+  return lhs.getSendTime() < rhs.getSendTime();
 }
 
 const ComboAddress& XskPacket::getFromAddr() const noexcept
@@ -797,11 +794,11 @@ const ComboAddress& XskPacket::getToAddr() const noexcept
   return to;
 }
 
-void XskWorker::notify(int fd)
+void XskWorker::notify(int desc)
 {
   uint64_t value = 1;
   ssize_t res = 0;
-  while ((res = write(fd, &value, sizeof(value))) == EINTR) {
+  while ((res = write(desc, &value, sizeof(value))) == EINTR) {
   }
   if (res != sizeof(value)) {
     throw runtime_error("Unable Wake Up XskSocket Failed");
@@ -813,38 +810,39 @@ XskWorker::XskWorker() :
 {
 }
 
-void XskWorker::pushToProcessingQueue(XskPacket&& packet)
+void XskWorker::pushToProcessingQueue(XskPacket& packet)
 {
 #if defined(__SANITIZE_THREAD__)
-  if (!incomingPacketsQueue.lock()->push(std::move(packet))) {
+  if (!incomingPacketsQueue.lock()->push(packet)) {
 #else
-  if (!incomingPacketsQueue.push(std::move(packet))) {
+  if (!incomingPacketsQueue.push(packet)) {
 #endif
-    markAsFree(std::move(packet));
+    markAsFree(packet);
   }
 }
 
-void XskWorker::pushToSendQueue(XskPacket&& packet)
+void XskWorker::pushToSendQueue(XskPacket& packet)
 {
 #if defined(__SANITIZE_THREAD__)
-  if (!outgoingPacketsQueue.lock()->push(std::move(packet))) {
+  if (!outgoingPacketsQueue.lock()->push(packet)) {
 #else
-  if (!outgoingPacketsQueue.push(std::move(packet))) {
+  if (!outgoingPacketsQueue.push(packet)) {
 #endif
-    markAsFree(std::move(packet));
+    markAsFree(packet);
   }
 }
 
 const void* XskPacket::getPayloadData() const
 {
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
   return frame + getDataOffset();
 }
 
 void XskPacket::setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC) noexcept
 {
   auto ethHeader = getEthernetHeader();
-  memcpy(ethHeader.h_dest, &toMAC[0], sizeof(MACAddr));
-  memcpy(ethHeader.h_source, &fromMAC[0], sizeof(MACAddr));
+  memcpy(ethHeader.h_dest, toMAC.data(), toMAC.size());
+  memcpy(ethHeader.h_source, fromMAC.data(), fromMAC.size());
   setEthernetHeader(ethHeader);
   to = to_;
   from = from_;
@@ -901,17 +899,18 @@ void XskPacket::rewrite() noexcept
   setEthernetHeader(ethHeader);
 }
 
-[[nodiscard]] __be16 XskPacket::ipv4Checksum(const struct iphdr* ip) noexcept
+[[nodiscard]] __be16 XskPacket::ipv4Checksum(const struct iphdr* ipHeader) noexcept
 {
-  auto partial = ip_checksum_partial(ip, sizeof(iphdr), 0);
+  auto partial = ip_checksum_partial(ipHeader, sizeof(iphdr), 0);
   return ip_checksum_fold(partial);
 }
 
-[[nodiscard]] __be16 XskPacket::tcp_udp_v4_checksum(const struct iphdr* ip) const noexcept
+[[nodiscard]] __be16 XskPacket::tcp_udp_v4_checksum(const struct iphdr* ipHeader) const noexcept
 {
   // ip options is not supported !!!
   const auto l4Length = static_cast<uint16_t>(getDataSize() + sizeof(udphdr));
-  auto sum = tcp_udp_v4_header_checksum_partial(ip->saddr, ip->daddr, ip->protocol, l4Length);
+  auto sum = tcp_udp_v4_header_checksum_partial(ipHeader->saddr, ipHeader->daddr, ipHeader->protocol, l4Length);
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
   sum = ip_checksum_partial(frame + getL4HeaderOffset(), l4Length, sum);
   return ip_checksum_fold(sum);
 }
@@ -920,6 +919,7 @@ void XskPacket::rewrite() noexcept
 {
   const auto l4Length = static_cast<uint16_t>(getDataSize() + sizeof(udphdr));
   uint64_t sum = tcp_udp_v6_header_checksum_partial(&ipv6->saddr, &ipv6->daddr, ipv6->nexthdr, l4Length);
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
   sum = ip_checksum_partial(frame + getL4HeaderOffset(), l4Length, sum);
   return ip_checksum_fold(sum);
 }
@@ -930,21 +930,24 @@ void XskPacket::rewrite() noexcept
   /* Main loop: 32 bits at a time */
   for (position = 0; position < len; position += sizeof(uint32_t)) {
     uint32_t value{};
-    memcpy(&value, reinterpret_cast<const uint8_t*>(ptr) + position, sizeof(value));
+    // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+    memcpy(&value, static_cast<const uint8_t*>(ptr) + position, sizeof(value));
     sum += value;
   }
 
   /* Handle un-32bit-aligned trailing bytes */
   if ((len - position) >= 2) {
     uint16_t value{};
-    memcpy(&value, reinterpret_cast<const uint8_t*>(ptr) + position, sizeof(value));
+    // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+    memcpy(&value, static_cast<const uint8_t*>(ptr) + position, sizeof(value));
     sum += value;
     position += sizeof(value);
   }
 
   if ((len - position) > 0) {
-    const auto* p8 = static_cast<const uint8_t*>(ptr) + position;
-    sum += ntohs(*p8 << 8); /* RFC says pad last byte */
+    // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+    const auto* ptr8 = static_cast<const uint8_t*>(ptr) + position;
+    sum += ntohs(*ptr8 << 8); /* RFC says pad last byte */
   }
 
   return sum;
@@ -952,10 +955,10 @@ void XskPacket::rewrite() noexcept
 
 [[nodiscard]] __be16 XskPacket::ip_checksum_fold(uint64_t sum) noexcept
 {
-  while (sum & ~0xffffffffULL) {
+  while ((sum & ~0xffffffffULL) != 0U) {
     sum = (sum >> 32) + (sum & 0xffffffffULL);
   }
-  while (sum & 0xffff0000ULL) {
+  while ((sum & 0xffff0000ULL) != 0U) {
     sum = (sum >> 16) + (sum & 0xffffULL);
   }
 
@@ -963,9 +966,12 @@ void XskPacket::rewrite() noexcept
 }
 
 #ifndef __packed
-#define __packed __attribute__((packed))
+#define packed_attribute __attribute__((packed))
+#else
+#define packed_attribute __packed
 #endif
 
+// NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
 [[nodiscard]] uint64_t XskPacket::tcp_udp_v4_header_checksum_partial(__be32 src_ip, __be32 dst_ip, uint8_t protocol, uint16_t len) noexcept
 {
   struct header
@@ -982,7 +988,8 @@ void XskPacket::rewrite() noexcept
     /* We use a union here to avoid aliasing issues with gcc -O2 */
     union
     {
-      header __packed fields;
+      header packed_attribute fields;
+      // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays)
       uint32_t words[3];
     };
   };
@@ -1005,6 +1012,7 @@ void XskPacket::rewrite() noexcept
     struct in6_addr src_ip;
     struct in6_addr dst_ip;
     __be32 length;
+    // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays)
     __uint8_t mbz[3];
     __uint8_t next_header;
   };
@@ -1014,7 +1022,8 @@ void XskPacket::rewrite() noexcept
     /* We use a union here to avoid aliasing issues with gcc -O2 */
     union
     {
-      header __packed fields;
+      header packed_attribute fields;
+      // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays)
       uint32_t words[10];
     };
   };
@@ -1025,6 +1034,7 @@ void XskPacket::rewrite() noexcept
   pseudo_header.fields.src_ip = *src_ip;
   pseudo_header.fields.dst_ip = *dst_ip;
   pseudo_header.fields.length = htonl(len);
+  // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
   memset(pseudo_header.fields.mbz, 0, sizeof(pseudo_header.fields.mbz));
   pseudo_header.fields.next_header = protocol;
   return ip_checksum_partial(&pseudo_header, sizeof(pseudo_header), 0);
@@ -1051,19 +1061,19 @@ PacketBuffer XskPacket::cloneHeadertoPacketBuffer() const
 
 int XskWorker::createEventfd()
 {
-  auto fd = ::eventfd(0, EFD_CLOEXEC);
-  if (fd < 0) {
+  auto desc = ::eventfd(0, EFD_CLOEXEC);
+  if (desc < 0) {
     throw runtime_error("Unable create eventfd");
   }
-  return fd;
+  return desc;
 }
 
-void XskWorker::waitForXskSocket() noexcept
+void XskWorker::waitForXskSocket() const noexcept
 {
   uint64_t x = read(workerWaker, &x, sizeof(x));
 }
 
-void XskWorker::notifyXskSocket() noexcept
+void XskWorker::notifyXskSocket() const
 {
   notify(xskSocketWaker);
 }
@@ -1107,8 +1117,8 @@ void XskWorker::notifyWorker() noexcept
 void XskSocket::getMACFromIfName()
 {
   ifreq ifr{};
-  auto fd = FDWrapper(::socket(AF_INET, SOCK_DGRAM, 0));
-  if (fd < 0) {
+  auto desc = FDWrapper(::socket(AF_INET, SOCK_DGRAM, 0));
+  if (desc < 0) {
     throw std::runtime_error("Error creating a socket to get the MAC address of interface " + ifName);
   }
 
@@ -1117,27 +1127,27 @@ void XskSocket::getMACFromIfName()
   }
 
   strncpy(ifr.ifr_name, ifName.c_str(), ifName.length() + 1);
-  if (ioctl(fd.getHandle(), SIOCGIFHWADDR, &ifr) < 0 || ifr.ifr_hwaddr.sa_family != ARPHRD_ETHER) {
+  if (ioctl(desc.getHandle(), SIOCGIFHWADDR, &ifr) < 0 || ifr.ifr_hwaddr.sa_family != ARPHRD_ETHER) {
     throw std::runtime_error("Error getting MAC address for interface " + ifName);
   }
   static_assert(sizeof(ifr.ifr_hwaddr.sa_data) >= std::tuple_size<decltype(source)>{}, "The size of an ARPHRD_ETHER MAC address is smaller than expected");
   memcpy(source.data(), ifr.ifr_hwaddr.sa_data, source.size());
 }
 
-[[nodiscard]] int XskSocket::timeDifference(const timespec& t1, const timespec& t2) noexcept
+[[nodiscard]] int XskSocket::timeDifference(const timespec& lhs, const timespec& rhs) noexcept
 {
-  const auto res = t1.tv_sec * 1000 + t1.tv_nsec / 1000000L - (t2.tv_sec * 1000 + t2.tv_nsec / 1000000L);
+  const auto res = lhs.tv_sec * 1000 + lhs.tv_nsec / 1000000L - (rhs.tv_sec * 1000 + rhs.tv_nsec / 1000000L);
   return static_cast<int>(res);
 }
 
-void XskWorker::cleanWorkerNotification() noexcept
+void XskWorker::cleanWorkerNotification() const noexcept
 {
-  uint64_t x = read(xskSocketWaker, &x, sizeof(x));
+  uint64_t value = read(xskSocketWaker, &value, sizeof(value));
 }
 
-void XskWorker::cleanSocketNotification() noexcept
+void XskWorker::cleanSocketNotification() const noexcept
 {
-  uint64_t x = read(workerWaker, &x, sizeof(x));
+  uint64_t value = read(workerWaker, &value, sizeof(value));
 }
 
 std::vector<pollfd> getPollFdsForWorker(XskWorker& info)
@@ -1175,18 +1185,20 @@ std::optional<XskPacket> XskWorker::getEmptyFrame()
   if (!uniqueEmptyFrameOffset.empty()) {
     auto offset = uniqueEmptyFrameOffset.back();
     uniqueEmptyFrameOffset.pop_back();
+    // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     return XskPacket(offset + umemBufBase, 0, frameSize);
   }
   fillUniqueEmptyOffset();
   if (!uniqueEmptyFrameOffset.empty()) {
     auto offset = uniqueEmptyFrameOffset.back();
     uniqueEmptyFrameOffset.pop_back();
+    // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
     return XskPacket(offset + umemBufBase, 0, frameSize);
   }
   return std::nullopt;
 }
 
-void XskWorker::markAsFree(XskPacket&& packet)
+void XskWorker::markAsFree(const XskPacket& packet)
 {
   auto offset = frameOffset(packet);
 #ifdef DEBUG_UMEM
@@ -1202,10 +1214,10 @@ uint32_t XskPacket::getFlags() const noexcept
 
 void XskPacket::updatePacket() noexcept
 {
-  if (!(flags & UPDATE)) {
+  if ((flags & UPDATE) == 0U) {
     return;
   }
-  if (!(flags & REWRITE)) {
+  if ((flags & REWRITE) == 0U) {
     changeDirectAndUpdateChecksum();
   }
 }
index 702855a4739f32e8b38829a012a6e5102f1db339..a2cc6f036b71c79614564cc066e41db3af3ccaba 100644 (file)
@@ -54,7 +54,7 @@ class XskPacket;
 class XskWorker;
 class XskSocket;
 
-using MACAddr = std::array<uint8_t,6>;
+using MACAddr = std::array<uint8_t, 6>;
 
 #ifdef HAVE_XSK
 using XskPacketPtr = std::unique_ptr<XskPacket>;
@@ -76,7 +76,7 @@ class XskSocket
     xsk_umem* umem{nullptr};
     uint8_t* bufBase{nullptr};
     size_t size{0};
-    void umemInit(size_t memSize, xsk_ring_cons* cq, xsk_ring_prod* fq, xsk_umem_config* config);
+    void umemInit(size_t memSize, xsk_ring_cons* completionQueue, xsk_ring_prod* fillQueue, xsk_umem_config* config);
     ~XskUmem();
     XskUmem() = default;
   };
@@ -94,7 +94,7 @@ class XskSocket
   const size_t frameNum;
   // responses that have been delayed
   std::priority_queue<XskPacket> waitForDelay;
-  MACAddr source;
+  MACAddr source{};
   const std::string ifName;
   // AF_XDP socket then worker waker sockets
   vector<pollfd> fds;
@@ -104,13 +104,13 @@ class XskSocket
   // simply recycled from cq after being processed by the kernel
   vector<uint64_t> uniqueEmptyFrameOffset;
   // completion ring: queue where sent packets are stored by the kernel
-  xsk_ring_cons cq;
+  xsk_ring_cons cq{};
   // rx ring: queue where the incoming packets are stored, read by XskRouter
-  xsk_ring_cons rx;
+  xsk_ring_cons rx{};
   // fill ring: queue where umem entries available to be filled (put into rx) are stored
-  xsk_ring_prod fq;
+  xsk_ring_prod fq{};
   // tx ring: queue where outgoing packets are stored
-  xsk_ring_prod tx;
+  xsk_ring_prod tx{};
   std::unique_ptr<xsk_socket, void (*)(xsk_socket*)> socket;
   XskUmem umem;
 
@@ -127,16 +127,16 @@ class XskSocket
   void getMACFromIfName();
 
 public:
-  static void clearDestinationAddresses();
-  static void addDestinationAddress(const ComboAddress& destination);
-  static void removeDestinationAddress(const ComboAddress& destination);
+  static void clearDestinationMap(const std::string& mapPath, bool isV6);
+  static void addDestinationAddress(const std::string& mapPath, const ComboAddress& destination);
+  static void removeDestinationAddress(const std::string& mapPath, const ComboAddress& destination);
   static constexpr size_t getFrameSize()
   {
     return frameSize;
   }
   // list of free umem entries that can be reused
   std::shared_ptr<LockGuarded<vector<uint64_t>>> sharedEmptyFrameOffset;
-  XskSocket(size_t frameNum, const std::string& ifName, uint32_t queue_id, const std::string& xskMapPath);
+  XskSocket(size_t frameNum, std::string ifName, uint32_t queue_id, const std::string& xskMapPath);
   [[nodiscard]] int xskFd() const noexcept;
   // wait until one event has occurred
   [[nodiscard]] int wait(int timeout);
@@ -144,11 +144,11 @@ public:
   void send(std::vector<XskPacket>& packets);
   // look at incoming packets in rx, return them if parsing succeeeded
   [[nodiscard]] std::vector<XskPacket> recv(uint32_t recvSizeMax, uint32_t* failedCount);
-  void addWorker(std::shared_ptr<XskWorker> s);
+  void addWorker(std::shared_ptr<XskWorker> worker);
   void addWorkerRoute(const std::shared_ptr<XskWorker>& worker, const ComboAddress& dest);
   void removeWorkerRoute(const ComboAddress& dest);
   [[nodiscard]] std::string getMetrics() const;
-  void markAsFree(XskPacket&& packet);
+  void markAsFree(const XskPacket& packet);
   [[nodiscard]] const std::shared_ptr<XskWorker>& getWorkerByDescriptor(int desc) const
   {
     return d_workers.at(desc);
@@ -181,9 +181,9 @@ public:
   void recycle(size_t size) noexcept;
   // look at delayed packets, and send the ones that are ready
   void pickUpReadyPacket(std::vector<XskPacket>& packets);
-  void pushDelayed(XskPacket&& packet)
+  void pushDelayed(XskPacket& packet)
   {
-    waitForDelay.push(std::move(packet));
+    waitForDelay.push(packet);
   }
 };
 
@@ -203,7 +203,7 @@ public:
 private:
   ComboAddress from;
   ComboAddress to;
-  timespec sendTime;
+  timespec sendTime{};
   uint8_t* frame{nullptr};
   size_t frameLength{0};
   size_t frameSize{0};
@@ -312,16 +312,16 @@ public:
   static int createEventfd();
   static void notify(int fd);
   static std::shared_ptr<XskWorker> create();
-  void pushToProcessingQueue(XskPacket&& packet);
-  void pushToSendQueue(XskPacket&& packet);
-  void markAsFree(XskPacket&& packet);
+  void pushToProcessingQueue(XskPacket& packet);
+  void pushToSendQueue(XskPacket& packet);
+  void markAsFree(const XskPacket& packet);
   // notify worker that at least one packet is available for processing
   void notifyWorker() noexcept;
   // notify the router that packets are ready to be sent
-  void notifyXskSocket() noexcept;
-  void waitForXskSocket() noexcept;
-  void cleanWorkerNotification() noexcept;
-  void cleanSocketNotification() noexcept;
+  void notifyXskSocket() const;
+  void waitForXskSocket() const noexcept;
+  void cleanWorkerNotification() const noexcept;
+  void cleanSocketNotification() const noexcept;
   [[nodiscard]] uint64_t frameOffset(const XskPacket& packet) const noexcept;
   // reap empty umem entry from sharedEmptyFrameOffset into uniqueEmptyFrameOffset
   void fillUniqueEmptyOffset();