]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Cleanup of the XSK code, fixing alignment issues
authorRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 28 Dec 2023 10:53:49 +0000 (11:53 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 23 Jan 2024 11:54:19 +0000 (12:54 +0100)
Also add UMEM checks for debugging.

contrib/xdp.py
pdns/dnsdist-lua.cc
pdns/dnsdist.cc
pdns/dnsdist.hh
pdns/xsk.cc
pdns/xsk.hh

index bd96ddb1ce58756fb552551a5bfa1c458ec012fe..1b9187007ff22db4ea1c4efed5eec154eae6066c 100644 (file)
@@ -14,7 +14,7 @@ DROP_ACTION = 1
 TC_ACTION = 2
 
 # The interface on wich the filter will be attached 
-DEV = "eth0"
+DEV = "eth1"
 
 # The list of blocked IPv4, IPv6 and QNames
 # IP format : (IPAddress, Action)
index 3338ceea3120ed64d37d06fc3bb4ee54d1ad64e2..c4ee51962812c14d6a753f10a8ac8bc766076140 100644 (file)
@@ -783,9 +783,7 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
       if (socket) {
         udpCS->xskInfo = XskWorker::create();
         udpCS->xskInfo->sharedEmptyFrameOffset = socket->sharedEmptyFrameOffset;
-        socket->addWorker(udpCS->xskInfo, loc, false);
-        // tcpCS->xskInfo=XskWorker::create();
-        // TODO: socket->addWorker(tcpCS->xskInfo, loc, true);
+        socket->addWorker(udpCS->xskInfo, loc);
       }
 #endif /* HAVE_XSK */
       g_frontends.push_back(std::move(udpCS));
@@ -837,9 +835,7 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
       if (socket) {
         udpCS->xskInfo = XskWorker::create();
         udpCS->xskInfo->sharedEmptyFrameOffset = socket->sharedEmptyFrameOffset;
-        socket->addWorker(udpCS->xskInfo, loc, false);
-        // TODO tcpCS->xskInfo=XskWorker::create();
-        // TODO socket->addWorker(tcpCS->xskInfo, loc, true);
+        socket->addWorker(udpCS->xskInfo, loc);
       }
 #endif /* HAVE_XSK */
       g_frontends.push_back(std::move(udpCS));
index ae1d00aa1ff6bd1428b2010beaf5ec5aa006380a..f3e84081436400d5935f2cfbbd431cd4e4b6d399 100644 (file)
@@ -761,7 +761,12 @@ static void handleResponseForUDPClient(InternalQueryState& ids, PacketBuffer& re
       vinfolog("Got answer from %s, relayed to %s (UDP), took %f us", ds->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), udiff);
     }
     else {
-      vinfolog("Got answer from %s, NOT relayed to %s (UDP) since that frontend is muted, took %f us", ds->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), udiff);
+      if (!ids.xskPacketHeader) {
+        vinfolog("Got answer from %s, NOT relayed to %s (UDP) since that frontend is muted, took %f us", ds->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), udiff);
+      }
+      else {
+        vinfolog("Got answer from %s, relayed to %s (UDP via XSK), took %f us", ds->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), udiff);
+      }
     }
 
     handleResponseSent(ids, udiff, dr.ids.origRemote, ds->d_config.remote, response.size(), cleartextDH, ds->getProtocol(), true);
@@ -779,8 +784,10 @@ static void XskHealthCheck(std::shared_ptr<DownstreamState>& dss, std::unordered
   auto packet = getHealthCheckPacket(dss, nullptr, data);
   data->d_initial = initial;
   setHealthCheckTime(dss, data);
-  auto* frame = xskInfo->getEmptyframe();
-  auto xskPacket = std::make_unique<XskPacket>(frame, 0, xskInfo->frameSize);
+  auto xskPacket = xskInfo->getEmptyFrame();
+  if (!xskPacket) {
+    return;
+  }
   xskPacket->setAddr(dss->d_config.sourceAddr, dss->d_config.sourceMACAddr, dss->d_config.remote, dss->d_config.destMACAddr);
   xskPacket->setPayload(packet);
   xskPacket->rewrite();
@@ -853,14 +860,18 @@ void responderThread(std::shared_ptr<DownstreamState> dss)
         bool needNotify = false;
         if (pollfds[0].revents & POLLIN) {
           needNotify = true;
-          xskInfo->cq.consume_all([&](XskPacket* packetRaw) {
+#if defined(__SANITIZE_THREAD__)
+          xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) {
+#else
+          xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) {
+#endif
             auto packet = XskPacketPtr(packetRaw);
-            if (packet->dataLen() < sizeof(dnsheader)) {
-              xskInfo->pushToSendQueue(std::move(packet));
+            if (packet->getDataLen() < sizeof(dnsheader)) {
+              xskInfo->markAsFree(std::move(packet));
               return;
             }
-            const auto* dh = reinterpret_cast<const struct dnsheader*>(packet->payloadData());
-            const auto queryId = dh->id;
+            const dnsheader_aligned dnsHeader(packet->getPayloadData());
+            const auto queryId = dnsHeader->id;
             auto ids = dss->getState(queryId);
             if (ids) {
               if (xskFd != ids->backendFD || !ids->xskPacketHeader) {
@@ -877,30 +888,40 @@ void responderThread(std::shared_ptr<DownstreamState> dss)
                 packet->cloneIntoPacketBuffer(data->d_buffer);
                 data->d_ds->submitHealthCheckResult(data->d_initial, handleResponse(data));
               }
-              xskInfo->pushToSendQueue(std::move(packet));
+              xskInfo->markAsFree(std::move(packet));
               return;
             }
             auto response = packet->clonePacketBuffer();
-            if (response.size() > packet->capacity()) {
+            if (response.size() > packet->getCapacity()) {
               /* fallback to sending the packet via normal socket */
               ids->xskPacketHeader.reset();
             }
             if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) {
-              xskInfo->pushToSendQueue(std::move(packet));
+              xskInfo->markAsFree(std::move(packet));
+              vinfolog("XSK packet pushed to queue because processResponderPacket failed");
               return;
             }
-            if (response.size() > packet->capacity()) {
+            vinfolog("XSK packet - processResponderPacket OK");
+            if (response.size() > packet->getCapacity()) {
               /* fallback to sending the packet via normal socket */
               sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote);
-              xskInfo->pushToSendQueue(std::move(packet));
+              vinfolog("XSK packet falling back because packet is too large");
+              xskInfo->markAsFree(std::move(packet));
               return;
             }
+            //vinfolog("XSK packet - set header");
             packet->setHeader(*ids->xskPacketHeader);
-            packet->setPayload(response);
+            //vinfolog("XSK packet - set payload");
+            if (!packet->setPayload(response)) {
+              vinfolog("Unable to set payload !");
+            }
             if (ids->delayMsec > 0) {
+              vinfolog("XSK packet - adding delay");
               packet->addDelay(ids->delayMsec);
             }
+            //vinfolog("XSK packet - update packet");
             packet->updatePacket();
+            //vinfolog("XSK packet pushed to send queue");
             xskInfo->pushToSendQueue(std::move(packet));
           });
           xskInfo->cleanSocketNotification();
@@ -1001,12 +1022,19 @@ void responderThread(std::shared_ptr<DownstreamState> dss)
 
         if (processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids)) && ids->xskPacketHeader && ids->cs->xskInfo) {
 #ifdef HAVE_XSK
+          //vinfolog("processResponderPacket OK");
           auto& xskInfo = ids->cs->xskInfo;
-          auto* frame = xskInfo->getEmptyframe();
-          auto xskPacket = std::make_unique<XskPacket>(frame, 0, xskInfo->frameSize);
+          auto xskPacket = xskInfo->getEmptyFrame();
+          if (!xskPacket) {
+            continue;
+          }
+          //vinfolog("XSK setHeader");
           xskPacket->setHeader(*ids->xskPacketHeader);
+          //vinfolog("XSK payload");
           xskPacket->setPayload(response);
+          //vinfolog("XSK update packet");
           xskPacket->updatePacket();
+          //vinfolog("XSK pushed to send queue");
           xskInfo->pushToSendQueue(std::move(xskPacket));
           xskInfo->notifyXskSocket();
 #endif /* HAVE_XSK */
@@ -2026,7 +2054,7 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct
 }
 
 #ifdef HAVE_XSK
-static void ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet)
+static bool ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet)
 {
   uint16_t queryId = 0;
   const auto& remote = packet.getFromAddr();
@@ -2043,13 +2071,13 @@ static void ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p
   try {
     bool expectProxyProtocol = false;
     if (!isXskQueryAcceptable(packet, cs, holders, expectProxyProtocol)) {
-      return;
+      return false;
     }
 
     auto query = packet.clonePacketBuffer();
     std::vector<ProxyProtocolValue> proxyProtocolValues;
     if (expectProxyProtocol && !handleProxyProtocol(remote, false, *holders.acl, query, ids.origRemote, ids.origDest, proxyProtocolValues)) {
-      return;
+      return false;
     }
 
     ids.queryRealTime.start();
@@ -2057,7 +2085,7 @@ static void ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p
     auto dnsCryptResponse = checkDNSCryptQuery(cs, query, ids.dnsCryptQuery, ids.queryRealTime.d_start.tv_sec, false);
     if (dnsCryptResponse) {
       packet.setPayload(query);
-      return;
+      return true;
     }
 
     {
@@ -2066,7 +2094,7 @@ static void ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p
       queryId = ntohs(dnsHeader->id);
 
       if (!checkQueryHeaders(dnsHeader.get(), cs)) {
-        return;
+        return false;
       }
 
       if (dnsHeader->qdcount == 0) {
@@ -2076,7 +2104,7 @@ static void ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p
           return true;
         });
         packet.setPayload(query);
-        return;
+        return true;
       }
     }
 
@@ -2095,7 +2123,7 @@ static void ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p
     auto result = processQuery(dq, holders, ss);
 
     if (result == ProcessQueryResult::Drop) {
-      return;
+      return false;
     }
 
     if (result == ProcessQueryResult::SendAnswer) {
@@ -2103,11 +2131,11 @@ static void ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p
       if (dq.ids.delayMsec > 0) {
         packet.addDelay(dq.ids.delayMsec);
       }
-      return;
+      return true;
     }
 
     if (result != ProcessQueryResult::PassToBackend || ss == nullptr) {
-      return;
+      return false;
     }
 
     // the buffer might have been invalidated by now (resized)
@@ -2125,11 +2153,12 @@ static void ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p
       cpq->query.d_proxyProtocolPayload = std::move(proxyProtocolPayload);
 
       ss->passCrossProtocolQuery(std::move(cpq));
-      return;
+      return false;
     }
 
     if (!ss->xskInfo) {
       assignOutgoingUDPQueryToBackend(ss, dh->id, dq, query, true);
+      return false;
     }
     else {
       int fd = ss->xskInfo->workerWaker;
@@ -2138,11 +2167,13 @@ static void ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p
       packet.setAddr(ss->d_config.sourceAddr,ss->d_config.sourceMACAddr, ss->d_config.remote,ss->d_config.destMACAddr);
       packet.setPayload(query);
       packet.rewrite();
+      return true;
     }
   }
   catch (const std::exception& e) {
     vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
   }
+  return false;
 }
 #endif /* HAVE_XSK */
 
@@ -2246,14 +2277,26 @@ static void xskClientThread(ClientState* cs)
   LocalHolders holders;
 
   for (;;) {
-    while (!xskInfo->cq.read_available()) {
+#if defined(__SANITIZE_THREAD__)
+    while (!xskInfo->incomingPacketsQueue.lock()->read_available()) {
+#else
+    while (!xskInfo->incomingPacketsQueue.read_available()) {
+#endif
       xskInfo->waitForXskSocket();
     }
-    xskInfo->cq.consume_all([&](XskPacket* packetRaw) {
+#if defined(__SANITIZE_THREAD__)
+    xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) {
+#else
+    xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) {
+#endif
       auto packet = XskPacketPtr(packetRaw);
-      ProcessXskQuery(*cs, holders, *packet);
-      packet->updatePacket();
-      xskInfo->pushToSendQueue(std::move(packet));
+      if (ProcessXskQuery(*cs, holders, *packet)) {
+        packet->updatePacket();
+        xskInfo->pushToSendQueue(std::move(packet));
+      }
+      else {
+        xskInfo->markAsFree(std::move(packet));
+      }
     });
     xskInfo->notifyXskSocket();
   }
@@ -3642,59 +3685,67 @@ void XskRouter(std::shared_ptr<XskSocket> xsk)
   const auto& xskWakerIdx = xsk->workers.get<0>();
   const auto& destIdx = xsk->workers.get<1>();
   while (true) {
-    auto ready = xsk->wait(-1);
-    // descriptor 0 gets incoming AF_XDP packets
-    if (xsk->fds[0].revents & POLLIN) {
-      auto packets = xsk->recv(64, &failed);
-      dnsdist::metrics::g_stats.nonCompliantQueries += failed;
-      for (auto &packet : packets) {
-        const auto dest = packet->getToAddr();
-        auto res = destIdx.find(dest);
-        if (res == destIdx.end()) {
-          xsk->uniqueEmptyFrameOffset.push_back(xsk->frameOffset(*packet));
-          continue;
+    try {
+      auto ready = xsk->wait(-1);
+      // descriptor 0 gets incoming AF_XDP packets
+      if (xsk->fds[0].revents & POLLIN) {
+        auto packets = xsk->recv(64, &failed);
+        dnsdist::metrics::g_stats.nonCompliantQueries += failed;
+        for (auto &packet : packets) {
+          const auto dest = packet->getToAddr();
+          auto res = destIdx.find(dest);
+          if (res == destIdx.end()) {
+            xsk->markAsFree(std::move(packet));
+            continue;
+          }
+          res->worker->pushToProcessingQueue(std::move(packet));
+          needNotify.insert(res->workerWaker);
         }
-        res->worker->pushToProcessingQueue(std::move(packet));
-        needNotify.insert(res->workerWaker);
-      }
-      for (auto i : needNotify) {
-        uint64_t x = 1;
-        auto written = write(i, &x, sizeof(x));
-        if (written != sizeof(x)) {
-          // oh, well, the worker is clearly overloaded
-          // but there is nothing we can do about it,
-          // and hopefully the queue will be processed eventually
+        for (auto i : needNotify) {
+          uint64_t x = 1;
+          auto written = write(i, &x, sizeof(x));
+          if (written != sizeof(x)) {
+            // oh, well, the worker is clearly overloaded
+            // but there is nothing we can do about it,
+            // and hopefully the queue will be processed eventually
+          }
         }
-      }
-      needNotify.clear();
-      ready--;
-    }
-    const auto backup = ready;
-    for (size_t i = 1; i < size && ready > 0; i++) {
-      if (xsk->fds[i].revents & POLLIN) {
+        needNotify.clear();
         ready--;
-        auto& info = xskWakerIdx.find(xsk->fds[i].fd)->worker;
-        info->sq.consume_all([&](XskPacket* packetRaw) {
-          auto packet = XskPacketPtr(packetRaw);
-          if (!(packet->getFlags() & XskPacket::UPDATE)) {
-            xsk->uniqueEmptyFrameOffset.push_back(xsk->frameOffset(*packet));
-            packet.release();
-            return;
-          }
-          if (packet->getFlags() & XskPacket::DELAY) {
-            xsk->waitForDelay.push(std::move(packet));
-            return;
-          }
-          fillInTx.push_back(std::move(packet));
-        });
-        info->cleanWorkerNotification();
       }
+      const auto backup = ready;
+      for (size_t i = 1; i < size && ready > 0; i++) {
+        if (xsk->fds[i].revents & POLLIN) {
+          ready--;
+          auto& info = xskWakerIdx.find(xsk->fds[i].fd)->worker;
+#if defined(__SANITIZE_THREAD__)
+          info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) {
+#else
+          info->outgoingPacketsQueue.consume_all([&](XskPacket* packetRaw) {
+#endif
+            auto packet = XskPacketPtr(packetRaw);
+            if (!(packet->getFlags() & XskPacket::UPDATE)) {
+              xsk->markAsFree(std::move(packet));
+              return;
+            }
+            if (packet->getFlags() & XskPacket::DELAY) {
+              xsk->waitForDelay.push(std::move(packet));
+              return;
+            }
+            fillInTx.push_back(std::move(packet));
+          });
+          info->cleanWorkerNotification();
+        }
+      }
+      xsk->pickUpReadyPacket(fillInTx);
+      xsk->recycle(4096);
+      xsk->fillFq();
+      xsk->send(fillInTx);
+      ready = backup;
+    }
+    catch (...) {
+      vinfolog("Exception in XSK router loop");
     }
-    xsk->pickUpReadyPacket(fillInTx);
-    xsk->recycle(64);
-    xsk->fillFq();
-    xsk->send(fillInTx);
-    ready = backup;
   }
 }
 #endif /* HAVE_XSK */
index 9b5d33e38c5958b01dcc1f094a5e21317a93d3da..0081c492a625066c872c96aac3adafbe3a484211 100644 (file)
@@ -985,7 +985,7 @@ public:
     if (d_config.sourceAddr.sin4.sin_family == 0) {
       throw runtime_error("invalid source addr");
     }
-    xsk->addWorker(xskInfo, d_config.sourceAddr, getProtocol() != dnsdist::Protocol::DoUDP);
+    xsk->addWorker(xskInfo, d_config.sourceAddr);
     d_config.sourceMACAddr = xsk->source;
     xskInfo->sharedEmptyFrameOffset = xsk->sharedEmptyFrameOffset;
   }
index f1fa45fbf18fe8621457805161244d7911eff201..2f438f9fd9c16d24d87348c87b0eae3c75426905 100644 (file)
 
 #ifdef HAVE_XSK
 
-#include "gettime.hh"
-#include "xsk.hh"
-
 #include <algorithm>
 #include <cstdint>
 #include <cstring>
 #include <fcntl.h>
 #include <iterator>
 #include <linux/bpf.h>
-#include <linux/if_ether.h>
 #include <linux/if_link.h>
 #include <linux/if_xdp.h>
 #include <linux/ip.h>
 #include <linux/ipv6.h>
 #include <linux/tcp.h>
-#include <linux/udp.h>
 #include <net/if.h>
 #include <net/if_arp.h>
 #include <netinet/in.h>
@@ -60,6 +55,36 @@ extern "C"
 #include <xdp/libxdp.h>
 }
 
+#include "gettime.hh"
+#include "xsk.hh"
+
+#define DEBUG_UMEM 0
+#ifdef DEBUG_UMEM
+namespace {
+struct UmemEntryStatus
+{
+  enum class Status: uint8_t { Free, FillQueue, Received, TXQueue };
+  Status status{Status::Free};
+};
+
+LockGuarded<std::unordered_map<uint64_t, UmemEntryStatus>> s_umems;
+
+void checkUmemIntegrity(const char* function, int line, uint64_t offset, const std::set<UmemEntryStatus::Status>& validStatuses, UmemEntryStatus::Status newStatus)
+{
+  auto umems = s_umems.lock();
+  if (validStatuses.count(umems->at(offset).status) == 0) {
+    std::cerr << "UMEM integrity check failed at " << function << ": " << line << ": status is " << static_cast<int>(umems->at(offset).status) << ", expected: ";
+    for (const auto status : validStatuses) {
+      std::cerr << static_cast<int>(status) << " ";
+    }
+    std::cerr << std::endl;
+    abort();
+  }
+  (*umems)[offset].status = newStatus;
+}
+}
+#endif /* DEBUG_UMEM */
+
 constexpr bool XskSocket::isPowOfTwo(uint32_t value) noexcept
 {
   return value != 0 && (value & (value - 1)) == 0;
@@ -79,6 +104,7 @@ int XskSocket::firstTimeout()
   }
   return res;
 }
+
 XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queue_id, const std::string& xskMapPath, const std::string& poolName_) :
   frameNum(frameNum_), queueId(queue_id), ifName(ifName_), poolName(poolName_), socket(nullptr, xsk_socket__delete), sharedEmptyFrameOffset(std::make_shared<LockGuarded<vector<uint64_t>>>())
 {
@@ -92,6 +118,7 @@ XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queu
   memset(&fq, 0, sizeof(fq));
   memset(&tx, 0, sizeof(tx));
   memset(&rx, 0, sizeof(rx));
+
   xsk_umem_config umemCfg;
   umemCfg.fill_size = fqCapacity;
   umemCfg.comp_size = cqCapacity;
@@ -99,6 +126,7 @@ XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queu
   umemCfg.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM;
   umemCfg.flags = 0;
   umem.umemInit(frameNum_ * frameSize, &cq, &fq, &umemCfg);
+
   {
     xsk_socket_config socketCfg;
     socketCfg.rx_size = rxCapacity;
@@ -109,106 +137,165 @@ XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queu
     xsk_socket* tmp = nullptr;
     auto ret = xsk_socket__create(&tmp, ifName.c_str(), queue_id, umem.umem, &rx, &tx, &socketCfg);
     if (ret != 0) {
-      throw std::runtime_error("Error creating a xsk socket of if_name" + ifName + stringerror(ret));
+      throw std::runtime_error("Error creating a xsk socket of if_name " + ifName + ": " + stringerror(ret));
     }
-    socket = std::unique_ptr<xsk_socket, void (*)(xsk_socket*)>(tmp, xsk_socket__delete);
+    socket = std::unique_ptr<xsk_socket, decltype(&xsk_socket__delete)>(tmp, xsk_socket__delete);
   }
-  for (uint64_t i = 0; i < frameNum; i++) {
-    uniqueEmptyFrameOffset.push_back(i * frameSize + XDP_PACKET_HEADROOM);
+
+  uniqueEmptyFrameOffset.reserve(frameNum);
+  {
+    for (uint64_t i = 0; i < frameNum; i++) {
+      //uniqueEmptyFrameOffset.push_back(i * frameSize);
+      uniqueEmptyFrameOffset.push_back(i * frameSize + XDP_PACKET_HEADROOM);
+#ifdef DEBUG_UMEM
+      {
+        auto umems = s_umems.lock();
+        (*umems)[i * frameSize + XDP_PACKET_HEADROOM] = UmemEntryStatus();
+      }
+#endif /* DEBUG_UMEM */
+    }
   }
+
   fillFq(fqCapacity);
+
   const auto xskfd = xskFd();
   fds.push_back(pollfd{
     .fd = xskfd,
     .events = POLLIN,
     .revents = 0});
+
   const auto xskMapFd = FDWrapper(bpf_obj_get(xskMapPath.c_str()));
+
   if (xskMapFd.getHandle() < 0) {
     throw std::runtime_error("Error getting BPF map from path '" + xskMapPath + "'");
   }
+
   auto ret = bpf_map_update_elem(xskMapFd.getHandle(), &queue_id, &xskfd, 0);
   if (ret) {
     throw std::runtime_error("Error inserting into xsk_map '" + xskMapPath + "': " + std::to_string(ret));
   }
 }
+
 void XskSocket::fillFq(uint32_t fillSize) noexcept
 {
   {
+#warning why are we collecting frames from unique into shared here, even though we need unique ones?
     auto frames = sharedEmptyFrameOffset->lock();
     if (frames->size() < holdThreshold) {
       const auto moveSize = std::min(holdThreshold - frames->size(), uniqueEmptyFrameOffset.size());
       if (moveSize > 0) {
         frames->insert(frames->end(), std::make_move_iterator(uniqueEmptyFrameOffset.end() - moveSize), std::make_move_iterator(uniqueEmptyFrameOffset.end()));
+        uniqueEmptyFrameOffset.resize(uniqueEmptyFrameOffset.size() - moveSize);
       }
     }
   }
+
   if (uniqueEmptyFrameOffset.size() < fillSize) {
     return;
   }
-  uint32_t idx;
-  if (xsk_ring_prod__reserve(&fq, fillSize, &idx) != fillSize) {
+
+  uint32_t idx{0};
+  auto toFill = xsk_ring_prod__reserve(&fq, fillSize, &idx);
+  if (toFill == 0) {
     return;
   }
   uint32_t processed = 0;
-  for (; processed < fillSize; processed++) {
+  for (; processed < toFill; processed++) {
     *xsk_ring_prod__fill_addr(&fq, idx++) = uniqueEmptyFrameOffset.back();
+#ifdef DEBUG_UMEM
+    checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, uniqueEmptyFrameOffset.back(), {UmemEntryStatus::Status::Free}, UmemEntryStatus::Status::FillQueue);
+#endif /* DEBUG_UMEM */
     uniqueEmptyFrameOffset.pop_back();
   }
+
   xsk_ring_prod__submit(&fq, processed);
 }
+
 int XskSocket::wait(int timeout)
 {
-  return poll(fds.data(), fds.size(), static_cast<int>(std::min(static_cast<uint32_t>(timeout), static_cast<uint32_t>(firstTimeout()))));
+  auto waitAtMost = std::min(timeout, firstTimeout());
+  return poll(fds.data(), fds.size(), waitAtMost);
 }
+
 [[nodiscard]] uint64_t XskSocket::frameOffset(const XskPacket& packet) const noexcept
 {
-  return reinterpret_cast<uint64_t>(packet.frame) - reinterpret_cast<uint64_t>(umem.bufBase);
+  return packet.frame - umem.bufBase;
 }
 
-int XskSocket::xskFd() const noexcept { return xsk_socket__fd(socket.get()); }
+[[nodiscard]] int XskSocket::xskFd() const noexcept {
+  return xsk_socket__fd(socket.get());
+}
 
 void XskSocket::send(std::vector<XskPacketPtr>& packets)
 {
-  const auto packetSize = packets.size();
-  if (packetSize == 0) {
-    return;
-  }
-  uint32_t idx{0};
-  if (xsk_ring_prod__reserve(&tx, packetSize, &idx) != packets.size()) {
-    return;
-  }
+  while (packets.size() > 0) {
+    auto packetSize = packets.size();
+    if (packetSize > std::numeric_limits<uint32_t>::max()) {
+      packetSize = std::numeric_limits<uint32_t>::max();
+    }
+    size_t toSend = std::min(static_cast<uint32_t>(packetSize), txCapacity);
+    uint32_t idx{0};
+    auto toFill = xsk_ring_prod__reserve(&tx, toSend, &idx);
+    if (toFill == 0) {
+      return;
+    }
 
-  for (const auto& packet : packets) {
-    *xsk_ring_prod__tx_desc(&tx, idx++) = {
-      .addr = frameOffset(*packet),
-      .len = packet->FrameLen(),
-      .options = 0};
+    size_t queued = 0;
+    for (const auto& packet : packets) {
+      if (queued == toFill) {
+        break;
+      }
+      *xsk_ring_prod__tx_desc(&tx, idx++) = {
+        .addr = frameOffset(*packet),
+        .len = packet->getFrameLen(),
+        .options = 0};
+#ifdef DEBUG_UMEM
+      checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(*packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::Received}, UmemEntryStatus::Status::TXQueue);
+#endif /* DEBUG_UMEM */
+      queued++;
+    }
+    xsk_ring_prod__submit(&tx, toFill);
+    packets.erase(packets.begin(), packets.begin() + toFill);
   }
-  xsk_ring_prod__submit(&tx, packetSize);
-  packets.clear();
 }
+
 std::vector<XskPacketPtr> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCount)
 {
-  uint32_t idx;
+  uint32_t idx{0};
   std::vector<XskPacketPtr> res;
   // how many descriptors to packets have been filled
   const auto recvSize = xsk_ring_cons__peek(&rx, recvSizeMax, &idx);
-  if (recvSize <= 0) {
+  if (recvSize == 0) {
     return res;
   }
 
   const auto baseAddr = reinterpret_cast<uint64_t>(umem.bufBase);
   uint32_t failed = 0;
   uint32_t processed = 0;
+  res.reserve(recvSize);
   for (; processed < recvSize; processed++) {
-    const auto* desc = xsk_ring_cons__rx_desc(&rx, idx++);
-    auto ptr = std::make_unique<XskPacket>(reinterpret_cast<void*>(desc->addr + baseAddr), desc->len, frameSize);
-    if (!ptr->parse()) {
-      ++failed;
-      uniqueEmptyFrameOffset.push_back(frameOffset(*ptr));
+    try {
+      const auto* desc = xsk_ring_cons__rx_desc(&rx, idx++);
+      auto ptr = std::make_unique<XskPacket>(reinterpret_cast<uint8_t*>(desc->addr + baseAddr), desc->len, frameSize);
+#ifdef DEBUG_UMEM
+      checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(*ptr), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::FillQueue}, UmemEntryStatus::Status::Received);
+#endif /* DEBUG_UMEM */
+
+      if (!ptr->parse(false)) {
+        ++failed;
+        markAsFree(std::move(ptr));
+      }
+      else {
+        res.push_back(std::move(ptr));
+      }
+    }
+    catch (const std::exception& exp) {
+      std::cerr << "Exception while processing the XSK RX queue: " << exp.what() << std::endl;
+      break;
     }
-    else {
-      res.push_back(std::move(ptr));
+    catch (...) {
+      std::cerr << "Exception while processing the XSK RX queue" << std::endl;
+      break;
     }
   }
 
@@ -222,6 +309,7 @@ std::vector<XskPacketPtr> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failed
 
   return res;
 }
+
 void XskSocket::pickUpReadyPacket(std::vector<XskPacketPtr>& packets)
 {
   timespec now;
@@ -232,17 +320,23 @@ void XskSocket::pickUpReadyPacket(std::vector<XskPacketPtr>& packets)
     waitForDelay.pop();
   }
 }
+
 void XskSocket::recycle(size_t size) noexcept
 {
-  uint32_t idx;
+  uint32_t idx{0};
   const auto completeSize = xsk_ring_cons__peek(&cq, size, &idx);
-  if (completeSize <= 0) {
+  if (completeSize == 0) {
     return;
   }
-  for (uint32_t processed = 0; processed < completeSize; ++processed) {
+  uniqueEmptyFrameOffset.reserve(uniqueEmptyFrameOffset.size() + completeSize);
+  uint32_t processed = 0;
+  for (; processed < completeSize; ++processed) {
     uniqueEmptyFrameOffset.push_back(*xsk_ring_cons__comp_addr(&cq, idx++));
+#ifdef DEBUG_UMEM
+    checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, uniqueEmptyFrameOffset.back(), {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free);
+#endif /* DEBUG_UMEM */
   }
-  xsk_ring_cons__release(&cq, completeSize);
+  xsk_ring_cons__release(&cq, processed);
 }
 
 void XskSocket::XskUmem::umemInit(size_t memSize, xsk_ring_cons* completionQueue, xsk_ring_prod* fillQueue, xsk_umem_config* config)
@@ -255,7 +349,7 @@ void XskSocket::XskUmem::umemInit(size_t memSize, xsk_ring_cons* completionQueue
   auto ret = xsk_umem__create(&umem, bufBase, size, fillQueue, completionQueue, config);
   if (ret != 0) {
     munmap(bufBase, size);
-    throw std::runtime_error("Error creating a umem of size" + std::to_string(size) + stringerror(ret));
+    throw std::runtime_error("Error creating a umem of size " + std::to_string(size) + ": " + stringerror(ret));
   }
 }
 
@@ -277,10 +371,21 @@ std::string XskSocket::getMetrics() const
   ret << "TX invalid descs: " << std::to_string(stats.tx_invalid_descs) << std::endl;
   ret << "RX ring full: " << std::to_string(stats.rx_ring_full) << std::endl;
   ret << "RX fill ring empty descs: " << std::to_string(stats.rx_fill_ring_empty_descs) << std::endl;
-  ret << "RX ring empty descs: " << std::to_string(stats.tx_ring_empty_descs) << std::endl;
+  ret << "TX ring empty descs: " << std::to_string(stats.tx_ring_empty_descs) << std::endl;
   return ret.str();
 }
 
+void XskSocket::markAsFree(XskPacketPtr&& packet)
+{
+  auto offset = frameOffset(*packet);
+#ifdef DEBUG_UMEM
+  checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free);
+#endif /* DEBUG_UMEM */
+
+  uniqueEmptyFrameOffset.push_back(offset);
+  packet.release();
+}
+
 XskSocket::XskUmem::~XskUmem()
 {
   if (umem) {
@@ -291,198 +396,276 @@ XskSocket::XskUmem::~XskUmem()
   }
 }
 
-bool XskPacket::parse()
+[[nodiscard]] size_t XskPacket::getL4HeaderOffset() const noexcept
+{
+  return sizeof(ethhdr) + (v6 ? (sizeof(ipv6hdr)) : sizeof(iphdr));
+}
+
+[[nodiscard]] size_t XskPacket::getDataOffset() const noexcept
+{
+  return getL4HeaderOffset() + sizeof(udphdr);
+}
+
+[[nodiscard]] size_t XskPacket::getDataSize() const noexcept
+{
+  return frameLength - getDataOffset();
+}
+
+[[nodiscard]] ethhdr XskPacket::getEthernetHeader() const noexcept
+{
+  ethhdr ethHeader{};
+  assert(frameLength >= sizeof(ethHeader));
+  memcpy(&ethHeader, frame, sizeof(ethHeader));
+  return ethHeader;
+}
+
+void XskPacket::setEthernetHeader(const ethhdr& ethHeader) noexcept
 {
-  // payloadEnd must bigger than payload + sizeof(ethhdr) + sizoef(iphdr) + sizeof(udphdr)
-  auto* eth = reinterpret_cast<ethhdr*>(frame);
-  uint8_t l4Protocol;
-  if (eth->h_proto == htons(ETH_P_IP)) {
-    auto* ip = reinterpret_cast<iphdr*>(eth + 1);
-    if (ip->ihl != static_cast<uint8_t>(sizeof(iphdr) >> 2)) {
-      // ip->ihl*4 != sizeof(iphdr)
+  assert(frameLength >= sizeof(ethHeader));
+  memcpy(frame, &ethHeader, sizeof(ethHeader));
+}
+
+[[nodiscard]] iphdr XskPacket::getIPv4Header() const noexcept
+{
+  iphdr ipv4Header{};
+  assert(frameLength >= (sizeof(ethhdr) + sizeof(ipv4Header)));
+  assert(!v6);
+  memcpy(&ipv4Header, frame + sizeof(ethhdr), sizeof(ipv4Header));
+  return ipv4Header;
+}
+
+void XskPacket::setIPv4Header(const iphdr& ipv4Header) noexcept
+{
+  assert(frameLength >= (sizeof(ethhdr) + sizeof(iphdr)));
+  assert(!v6);
+  memcpy(frame + sizeof(ethhdr), &ipv4Header, sizeof(ipv4Header));
+}
+
+[[nodiscard]] ipv6hdr XskPacket::getIPv6Header() const noexcept
+{
+  ipv6hdr ipv6Header{};
+  assert(frameLength >= (sizeof(ethhdr) + sizeof(ipv6Header)));
+  assert(v6);
+  memcpy(&ipv6Header, frame + sizeof(ethhdr), sizeof(ipv6Header));
+  return ipv6Header;
+}
+
+void XskPacket::setIPv6Header(const ipv6hdr& ipv6Header) noexcept
+{
+  assert(frameLength >= (sizeof(ethhdr) + sizeof(ipv6Header)));
+  assert(v6);
+  memcpy(frame + sizeof(ethhdr), &ipv6Header, sizeof(ipv6Header));
+}
+
+[[nodiscard]] udphdr XskPacket::getUDPHeader() const noexcept
+{
+  udphdr udpHeader{};
+  assert(frameLength >= (sizeof(ethhdr) + (v6 ? sizeof(ipv6hdr) : sizeof(iphdr)) + sizeof(udpHeader)));
+  memcpy(&udpHeader, frame + getL4HeaderOffset(), sizeof(udpHeader));
+  return udpHeader;
+}
+
+void XskPacket::setUDPHeader(const udphdr& udpHeader) noexcept
+{
+  assert(frameLength >= (sizeof(ethhdr) + (v6 ? sizeof(ipv6hdr) : sizeof(iphdr)) + sizeof(udpHeader)));
+  memcpy(frame + getL4HeaderOffset(), &udpHeader, sizeof(udpHeader));
+}
+
+bool XskPacket::parse(bool fromSetHeader)
+{
+  if (frameLength <= sizeof(ethhdr)) {
+    return false;
+  }
+
+  auto ethHeader = getEthernetHeader();
+  uint8_t l4Protocol{0};
+  if (ethHeader.h_proto == htons(ETH_P_IP)) {
+    if (frameLength < (sizeof(ethhdr) + sizeof(iphdr) + sizeof(udphdr))) {
+      return false;
+    }
+    v6 = false;
+    auto ipHeader = getIPv4Header();
+    if (ipHeader.ihl != (static_cast<uint8_t>(sizeof(iphdr) / 4))) {
       // ip options is not supported now!
       return false;
     }
     // check ip.check == ipv4Checksum() is not needed!
     // We check it in BPF program
-    from = makeComboAddressFromRaw(4, reinterpret_cast<const char*>(&ip->saddr), sizeof(ip->saddr));
-    to = makeComboAddressFromRaw(4, reinterpret_cast<const char*>(&ip->daddr), sizeof(ip->daddr));
-    l4Protocol = ip->protocol;
-    l4Header = reinterpret_cast<uint8_t*>(ip + 1);
-    payloadEnd = std::min(reinterpret_cast<uint8_t*>(ip) + ntohs(ip->tot_len), payloadEnd);
-  }
-  else if (eth->h_proto == htons(ETH_P_IPV6)) {
-    auto* ipv6 = reinterpret_cast<ipv6hdr*>(eth + 1);
-    l4Header = reinterpret_cast<uint8_t*>(ipv6 + 1);
-    if (l4Header >= payloadEnd) {
+    // we don't, actually.
+    from = makeComboAddressFromRaw(4, reinterpret_cast<const char*>(&ipHeader.saddr), sizeof(ipHeader.saddr));
+    to = makeComboAddressFromRaw(4, reinterpret_cast<const char*>(&ipHeader.daddr), sizeof(ipHeader.daddr));
+    l4Protocol = ipHeader.protocol;
+    if (!fromSetHeader && (frameLength - sizeof(ethhdr)) != ntohs(ipHeader.tot_len)) {
+      // too small, or too large (trailing data), go away
+      return false;
+    }
+  }
+  else if (ethHeader.h_proto == htons(ETH_P_IPV6)) {
+    if (frameLength < (sizeof(ethhdr) + sizeof(ipv6hdr) + sizeof(udphdr))) {
+      return false;
+    }
+    v6 = true;
+    auto ipHeader = getIPv6Header();
+    from = makeComboAddressFromRaw(6, reinterpret_cast<const char*>(&ipHeader.saddr), sizeof(ipHeader.saddr));
+    to = makeComboAddressFromRaw(6, reinterpret_cast<const char*>(&ipHeader.daddr), sizeof(ipHeader.daddr));
+    l4Protocol = ipHeader.nexthdr;
+    if (!fromSetHeader && (frameLength - (sizeof(ethhdr) + sizeof(ipv6hdr))) != ntohs(ipHeader.payload_len)) {
       return false;
     }
-    from = makeComboAddressFromRaw(6, reinterpret_cast<const char*>(&ipv6->saddr), sizeof(ipv6->saddr));
-    to = makeComboAddressFromRaw(6, reinterpret_cast<const char*>(&ipv6->daddr), sizeof(ipv6->daddr));
-    l4Protocol = ipv6->nexthdr;
-    payloadEnd = std::min(l4Header + ntohs(ipv6->payload_len), payloadEnd);
   }
   else {
     return false;
   }
-  if (l4Protocol == IPPROTO_UDP) {
-    // check udp.check == ipv4Checksum() is not needed!
-    // We check it in BPF program
-    const auto* udp = reinterpret_cast<const udphdr*>(l4Header);
-    payload = l4Header + sizeof(udphdr);
-    // Because of XskPacket::setHeader
-    // payload = payloadEnd should be allow
-    if (payload > payloadEnd) {
-      return false;
-    }
-    payloadEnd = std::min(l4Header + ntohs(udp->len), payloadEnd);
-    from.setPort(ntohs(udp->source));
-    to.setPort(ntohs(udp->dest));
-    return true;
+
+  if (l4Protocol != IPPROTO_UDP) {
+    return false;
   }
-  if (l4Protocol == IPPROTO_TCP) {
-    // check tcp.check == ipv4Checksum() is not needed!
-    // We check it in BPF program
-    const auto* tcp = reinterpret_cast<const tcphdr*>(l4Header);
-    if (tcp->doff != static_cast<uint32_t>(sizeof(tcphdr) >> 2)) {
-      // tcp is not supported now!
+
+  // check udp.check == ipv4Checksum() is not needed!
+  // We check it in BPF program
+  // we don't, actually.
+  auto udpHeader = getUDPHeader();
+  if (!fromSetHeader) {
+    // Because of XskPacket::setHeader
+    if (getDataOffset() > frameLength) {
       return false;
     }
-    payload = l4Header + sizeof(tcphdr);
-    //
-    if (payload > payloadEnd) {
+
+    if (getDataSize() + sizeof(udphdr) != ntohs(udpHeader.len)) {
       return false;
     }
-    from.setPort(ntohs(tcp->source));
-    to.setPort(ntohs(tcp->dest));
-    flags |= TCP;
-    return true;
   }
-  // ipv6 extension header is not supported now!
-  return false;
+
+  from.setPort(ntohs(udpHeader.source));
+  to.setPort(ntohs(udpHeader.dest));
+  return true;
 }
 
-uint32_t XskPacket::dataLen() const noexcept
+uint32_t XskPacket::getDataLen() const noexcept
 {
-  return payloadEnd - payload;
+  return getDataSize();
 }
-uint32_t XskPacket::FrameLen() const noexcept
+
+uint32_t XskPacket::getFrameLen() const noexcept
 {
-  return payloadEnd - frame;
+  return frameLength;
 }
-size_t XskPacket::capacity() const noexcept
+
+size_t XskPacket::getCapacity() const noexcept
 {
-  return frameEnd - payloadEnd;
+  return frameSize;
 }
 
 void XskPacket::changeDirectAndUpdateChecksum() noexcept
 {
-  auto* eth = reinterpret_cast<ethhdr*>(frame);
+  auto ethHeader = getEthernetHeader();
   {
     uint8_t tmp[ETH_ALEN];
-    static_assert(sizeof(tmp) == sizeof(eth->h_dest), "Size Error");
-    static_assert(sizeof(tmp) == sizeof(eth->h_source), "Size Error");
-    memcpy(tmp, eth->h_dest, sizeof(tmp));
-    memcpy(eth->h_dest, eth->h_source, sizeof(tmp));
-    memcpy(eth->h_source, tmp, sizeof(tmp));
+    static_assert(sizeof(tmp) == sizeof(ethHeader.h_dest), "Size Error");
+    static_assert(sizeof(tmp) == sizeof(ethHeader.h_source), "Size Error");
+    memcpy(tmp, ethHeader.h_dest, sizeof(tmp));
+    memcpy(ethHeader.h_dest, ethHeader.h_source, sizeof(tmp));
+    memcpy(ethHeader.h_source, tmp, sizeof(tmp));
   }
-  if (eth->h_proto == htons(ETH_P_IPV6)) {
+  if (ethHeader.h_proto == htons(ETH_P_IPV6)) {
     // IPV6
-    auto* ipv6 = reinterpret_cast<ipv6hdr*>(eth + 1);
-    std::swap(ipv6->daddr, ipv6->saddr);
-    if (ipv6->nexthdr == IPPROTO_UDP) {
-      // UDP
-      auto* udp = reinterpret_cast<udphdr*>(ipv6 + 1);
-      std::swap(udp->dest, udp->source);
-      udp->len = htons(payloadEnd - reinterpret_cast<uint8_t*>(udp));
-      udp->check = 0;
-      udp->check = tcp_udp_v6_checksum();
-    }
-    else {
-      // TCP
-      auto* tcp = reinterpret_cast<tcphdr*>(ipv6 + 1);
-      std::swap(tcp->dest, tcp->source);
-      // TODO
-    }
-    rewriteIpv6Header(ipv6);
+    auto ipv6 = getIPv6Header();
+    std::swap(ipv6.daddr, ipv6.saddr);
+    assert(ipv6.nexthdr == IPPROTO_UDP);
+
+    auto udp = getUDPHeader();
+    std::swap(udp.dest, udp.source);
+    udp.len = htons(getDataSize() + sizeof(udp));
+    udp.check = 0;
+    /* needed to get the correct checksum */
+    setIPv6Header(ipv6);
+    setUDPHeader(udp);
+    udp.check = tcp_udp_v6_checksum(&ipv6);
+    rewriteIpv6Header(&ipv6, getFrameLen());
+    setIPv6Header(ipv6);
+    setUDPHeader(udp);
   }
   else {
     // IPV4
-    auto* ipv4 = reinterpret_cast<iphdr*>(eth + 1);
-    std::swap(ipv4->daddr, ipv4->saddr);
-    if (ipv4->protocol == IPPROTO_UDP) {
-      // UDP
-      auto* udp = reinterpret_cast<udphdr*>(ipv4 + 1);
-      std::swap(udp->dest, udp->source);
-      udp->len = htons(payloadEnd - reinterpret_cast<uint8_t*>(udp));
-      udp->check = 0;
-      udp->check = tcp_udp_v4_checksum();
-    }
-    else {
-      // TCP
-      auto* tcp = reinterpret_cast<tcphdr*>(ipv4 + 1);
-      std::swap(tcp->dest, tcp->source);
-      // TODO
-    }
-    rewriteIpv4Header(ipv4);
+    auto ipv4 = getIPv4Header();
+    std::swap(ipv4.daddr, ipv4.saddr);
+    assert(ipv4.protocol == IPPROTO_UDP);
+
+    auto udp = getUDPHeader();
+    std::swap(udp.dest, udp.source);
+    udp.len = htons(getDataSize() + sizeof(udp));
+    udp.check = 0;
+    /* needed to get the correct checksum */
+    setIPv4Header(ipv4);
+    setUDPHeader(udp);
+    udp.check = tcp_udp_v4_checksum(&ipv4);
+    rewriteIpv4Header(&ipv4, getFrameLen());
+    setIPv4Header(ipv4);
+    setUDPHeader(udp);
   }
+  setEthernetHeader(ethHeader);
 }
-void XskPacket::rewriteIpv4Header(void* ipv4header) noexcept
+
+void XskPacket::rewriteIpv4Header(struct iphdr* ipv4header, size_t frameLen) noexcept
 {
-  auto* ipv4 = static_cast<iphdr*>(ipv4header);
-  ipv4->version = 4;
-  ipv4->ihl = sizeof(iphdr) / 4;
-  ipv4->tos = 0;
-  ipv4->tot_len = htons(payloadEnd - reinterpret_cast<uint8_t*>(ipv4));
-  ipv4->id = 0;
-  ipv4->frag_off = 0;
-  ipv4->ttl = DefaultTTL;
-  ipv4->check = 0;
-  ipv4->check = ipv4Checksum();
+  ipv4header->version = 4;
+  ipv4header->ihl = sizeof(iphdr) / 4;
+  ipv4header->tos = 0;
+  ipv4header->tot_len = htons(frameLen - sizeof(ethhdr));
+  ipv4header->id = 0;
+  ipv4header->frag_off = 0;
+  ipv4header->ttl = DefaultTTL;
+  ipv4header->check = 0;
+  ipv4header->check = ipv4Checksum(ipv4header);
 }
-void XskPacket::rewriteIpv6Header(void* ipv6header) noexcept
+
+void XskPacket::rewriteIpv6Header(struct ipv6hdr* ipv6header, size_t frameLen) noexcept
 {
-  auto* ipv6 = static_cast<ipv6hdr*>(ipv6header);
-  ipv6->version = 6;
-  ipv6->priority = 0;
-  ipv6->payload_len = htons(payloadEnd - reinterpret_cast<uint8_t*>(ipv6 + 1));
-  ipv6->hop_limit = DefaultTTL;
-  memset(&ipv6->flow_lbl, 0, sizeof(ipv6->flow_lbl));
+  ipv6header->version = 6;
+  ipv6header->priority = 0;
+  ipv6header->payload_len = htons(frameLen - sizeof(ethhdr) - sizeof(ipv6hdr));
+  ipv6header->hop_limit = DefaultTTL;
+  memset(&ipv6header->flow_lbl, 0, sizeof(ipv6header->flow_lbl));
 }
 
 bool XskPacket::isIPV6() const noexcept
 {
-  const auto* eth = reinterpret_cast<ethhdr*>(frame);
-  return eth->h_proto == htons(ETH_P_IPV6);
+  return v6;
 }
-XskPacket::XskPacket(void* frame_, size_t dataSize, size_t frameSize) :
-  frame(static_cast<uint8_t*>(frame_)), payloadEnd(static_cast<uint8_t*>(frame) + dataSize), frameEnd(static_cast<uint8_t*>(frame) + frameSize - XDP_PACKET_HEADROOM)
+
+XskPacket::XskPacket(uint8_t* frame_, size_t dataSize, size_t frameSize) :
+  frame(frame_), frameLength(dataSize), frameSize(frameSize - XDP_PACKET_HEADROOM)
 {
 }
+
 PacketBuffer XskPacket::clonePacketBuffer() const
 {
-  const auto size = dataLen();
+  const auto size = getDataSize();
   PacketBuffer tmp(size);
-  memcpy(tmp.data(), payload, size);
+  memcpy(tmp.data(), frame + getDataOffset(), size);
   return tmp;
 }
+
 void XskPacket::cloneIntoPacketBuffer(PacketBuffer& buffer) const
 {
-  const auto size = dataLen();
+  const auto size = getDataSize();
   buffer.resize(size);
-  memcpy(buffer.data(), payload, size);
+  memcpy(buffer.data(), frame + getDataOffset(), size);
 }
+
 bool XskPacket::setPayload(const PacketBuffer& buf)
 {
   const auto bufSize = buf.size();
-  if (bufSize == 0 || bufSize > capacity()) {
+  const auto currentCapacity = getCapacity();
+  if (bufSize == 0 || bufSize > currentCapacity) {
     return false;
   }
   flags |= UPDATE;
-  memcpy(payload, buf.data(), bufSize);
-  payloadEnd = payload + bufSize;
+  memcpy(frame + getDataOffset(), buf.data(), bufSize);
+  frameLength = getDataOffset() + bufSize;
   return true;
 }
+
 void XskPacket::addDelay(const int relativeMilliseconds) noexcept
 {
   gettime(&sendTime);
@@ -490,18 +673,22 @@ void XskPacket::addDelay(const int relativeMilliseconds) noexcept
   sendTime.tv_sec += sendTime.tv_nsec / 1000000000L;
   sendTime.tv_nsec %= 1000000000L;
 }
+
 bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept
 {
   return s1->sendTime < s2->sendTime;
 }
+
 const ComboAddress& XskPacket::getFromAddr() const noexcept
 {
   return from;
 }
+
 const ComboAddress& XskPacket::getToAddr() const noexcept
 {
   return to;
 }
+
 void XskWorker::notify(int fd)
 {
   uint64_t value = 1;
@@ -512,6 +699,7 @@ void XskWorker::notify(int fd)
     throw runtime_error("Unable Wake Up XskSocket Failed");
   }
 }
+
 XskWorker::XskWorker() :
   workerWaker(createEventfd()), xskSocketWaker(createEventfd())
 {
@@ -520,154 +708,158 @@ XskWorker::XskWorker() :
 void XskWorker::pushToProcessingQueue(XskPacketPtr&& packet)
 {
   auto raw = packet.release();
-  if (!cq.push(raw)) {
-    delete raw;
+#if defined(__SANITIZE_THREAD__)
+  if (!incomingPacketsQueue.lock()->push(std::move(raw))) {
+#else
+  if (!incomingPacketsQueue.push(std::move(raw))) {
+#endif
+    markAsFree(XskPacketPtr(raw));
   }
 }
 
 void XskWorker::pushToSendQueue(XskPacketPtr&& packet)
 {
   auto raw = packet.release();
-  if (!sq.push(raw)) {
-    delete raw;
+#if defined(__SANITIZE_THREAD__)
+  if (!outgoingPacketsQueue.lock()->push(raw)) {
+#else
+  if (!outgoingPacketsQueue.push(raw)) {
+#endif
+    markAsFree(XskPacketPtr(raw));
   }
 }
 
-void* XskPacket::payloadData()
-{
-  return reinterpret_cast<void*>(payload);
-}
-const void* XskPacket::payloadData() const
+const void* XskPacket::getPayloadData() const
 {
-  return reinterpret_cast<const void*>(payload);
+  return frame + getDataOffset();
 }
-void XskPacket::setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC, bool tcp) noexcept
+
+void XskPacket::setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC) noexcept
 {
-  auto* eth = reinterpret_cast<ethhdr*>(frame);
-  memcpy(eth->h_dest, &toMAC[0], sizeof(MACAddr));
-  memcpy(eth->h_source, &fromMAC[0], sizeof(MACAddr));
+  auto ethHeader = getEthernetHeader();
+  memcpy(ethHeader.h_dest, &toMAC[0], sizeof(MACAddr));
+  memcpy(ethHeader.h_source, &fromMAC[0], sizeof(MACAddr));
+  setEthernetHeader(ethHeader);
   to = to_;
   from = from_;
-  l4Header = frame + sizeof(ethhdr) + (to.isIPv4() ? sizeof(iphdr) : sizeof(ipv6hdr));
-  if (tcp) {
-    flags = TCP;
-    payload = l4Header + sizeof(tcphdr);
-  }
-  else {
-    flags = 0;
-    payload = l4Header + sizeof(udphdr);
-  }
+  v6 = !to.isIPv4();
+  flags = 0;
 }
+
 void XskPacket::rewrite() noexcept
 {
   flags |= REWRITE;
-  auto* eth = reinterpret_cast<ethhdr*>(frame);
-  if (to.isIPv4()) {
-    eth->h_proto = htons(ETH_P_IP);
-    auto* ipv4 = reinterpret_cast<iphdr*>(eth + 1);
-
-    ipv4->daddr = to.sin4.sin_addr.s_addr;
-    ipv4->saddr = from.sin4.sin_addr.s_addr;
-    if (flags & XskPacket::TCP) {
-      auto* tcp = reinterpret_cast<tcphdr*>(ipv4 + 1);
-      ipv4->protocol = IPPROTO_TCP;
-      tcp->source = from.sin4.sin_port;
-      tcp->dest = to.sin4.sin_port;
-      // TODO
-    }
-    else {
-      auto* udp = reinterpret_cast<udphdr*>(ipv4 + 1);
-      ipv4->protocol = IPPROTO_UDP;
-      udp->source = from.sin4.sin_port;
-      udp->dest = to.sin4.sin_port;
-      udp->len = htons(payloadEnd - reinterpret_cast<uint8_t*>(udp));
-      udp->check = 0;
-      udp->check = tcp_udp_v4_checksum();
-    }
-    rewriteIpv4Header(ipv4);
+  auto ethHeader = getEthernetHeader();
+  if (!v6) {
+    ethHeader.h_proto = htons(ETH_P_IP);
+
+    auto ipHeader = getIPv4Header();
+    ipHeader.daddr = to.sin4.sin_addr.s_addr;
+    ipHeader.saddr = from.sin4.sin_addr.s_addr;
+
+    auto udpHeader = getUDPHeader();
+    ipHeader.protocol = IPPROTO_UDP;
+    udpHeader.source = from.sin4.sin_port;
+    udpHeader.dest = to.sin4.sin_port;
+    udpHeader.len = htons(getDataSize());
+    udpHeader.check = 0;
+    /* needed to get the correct checksum */
+    setIPv4Header(ipHeader);
+    setUDPHeader(udpHeader);
+    udpHeader.check = tcp_udp_v4_checksum(&ipHeader);
+    rewriteIpv4Header(&ipHeader, getFrameLen());
+    setIPv4Header(ipHeader);
+    setUDPHeader(udpHeader);
   }
   else {
-    auto* ipv6 = reinterpret_cast<ipv6hdr*>(eth + 1);
-    memcpy(&ipv6->daddr, &to.sin6.sin6_addr, sizeof(ipv6->daddr));
-    memcpy(&ipv6->saddr, &from.sin6.sin6_addr, sizeof(ipv6->saddr));
-    if (flags & XskPacket::TCP) {
-      auto* tcp = reinterpret_cast<tcphdr*>(ipv6 + 1);
-      ipv6->nexthdr = IPPROTO_TCP;
-      tcp->source = from.sin6.sin6_port;
-      tcp->dest = to.sin6.sin6_port;
-      // TODO
-    }
-    else {
-      auto* udp = reinterpret_cast<udphdr*>(ipv6 + 1);
-      ipv6->nexthdr = IPPROTO_UDP;
-      udp->source = from.sin6.sin6_port;
-      udp->dest = to.sin6.sin6_port;
-      udp->len = htons(payloadEnd - reinterpret_cast<uint8_t*>(udp));
-      udp->check = 0;
-      udp->check = tcp_udp_v6_checksum();
-    }
+    ethHeader.h_proto = htons(ETH_P_IPV6);
+
+    auto ipHeader = getIPv6Header();
+    memcpy(&ipHeader.daddr, &to.sin6.sin6_addr, sizeof(ipHeader.daddr));
+    memcpy(&ipHeader.saddr, &from.sin6.sin6_addr, sizeof(ipHeader.saddr));
+
+    auto udpHeader = getUDPHeader();
+    ipHeader.nexthdr = IPPROTO_UDP;
+    udpHeader.source = from.sin6.sin6_port;
+    udpHeader.dest = to.sin6.sin6_port;
+    udpHeader.len = htons(getDataSize());
+    udpHeader.check = 0;
+    /* needed to get the correct checksum */
+    setIPv6Header(ipHeader);
+    setUDPHeader(udpHeader);
+    udpHeader.check = tcp_udp_v6_checksum(&ipHeader);
+    setIPv6Header(ipHeader);
+    setUDPHeader(udpHeader);
   }
+
+  setEthernetHeader(ethHeader);
 }
 
-[[nodiscard]] __be16 XskPacket::ipv4Checksum() const noexcept
+[[nodiscard]] __be16 XskPacket::ipv4Checksum(const struct iphdr* ip) noexcept
 {
-  auto* ip = reinterpret_cast<iphdr*>(frame + sizeof(ethhdr));
-  return ip_checksum_fold(ip_checksum_partial(ip, sizeof(iphdr), 0));
+  auto partial = ip_checksum_partial(ip, sizeof(iphdr), 0);
+  return ip_checksum_fold(partial);
 }
-[[nodiscard]] __be16 XskPacket::tcp_udp_v4_checksum() const noexcept
+
+[[nodiscard]] __be16 XskPacket::tcp_udp_v4_checksum(const struct iphdr* ip) const noexcept
 {
-  const auto* ip = reinterpret_cast<iphdr*>(frame + sizeof(ethhdr));
   // ip options is not supported !!!
-  const auto l4Length = static_cast<uint16_t>(payloadEnd - l4Header);
+  const auto l4Length = static_cast<uint16_t>(getDataSize() + sizeof(udphdr));
   auto sum = tcp_udp_v4_header_checksum_partial(ip->saddr, ip->daddr, ip->protocol, l4Length);
-  sum = ip_checksum_partial(l4Header, l4Length, sum);
+  sum = ip_checksum_partial(frame + getL4HeaderOffset(), l4Length, sum);
   return ip_checksum_fold(sum);
 }
-[[nodiscard]] __be16 XskPacket::tcp_udp_v6_checksum() const noexcept
+
+[[nodiscard]] __be16 XskPacket::tcp_udp_v6_checksum(const struct ipv6hdr* ipv6) const noexcept
 {
-  const auto* ipv6 = reinterpret_cast<ipv6hdr*>(frame + sizeof(ethhdr));
-  const auto l4Length = static_cast<uint16_t>(payloadEnd - l4Header);
+  const auto l4Length = static_cast<uint16_t>(getDataSize() + sizeof(udphdr));
   uint64_t sum = tcp_udp_v6_header_checksum_partial(&ipv6->saddr, &ipv6->daddr, ipv6->nexthdr, l4Length);
-  sum = ip_checksum_partial(l4Header, l4Length, sum);
+  sum = ip_checksum_partial(frame + getL4HeaderOffset(), l4Length, sum);
   return ip_checksum_fold(sum);
 }
 
-#ifndef __packed
-#define __packed __attribute__((packed))
-#endif
-[[nodiscard]] uint64_t XskPacket::ip_checksum_partial(const void* p, size_t len, uint64_t sum) noexcept
+[[nodiscard]] uint64_t XskPacket::ip_checksum_partial(const void* ptr, const size_t len, uint64_t sum) noexcept
 {
-  /* Main loop: 32 bits at a time.
-   * We take advantage of intel's ability to do unaligned memory
-   * accesses with minimal additional cost. Other architectures
-   * probably want to be more careful here.
-   */
-  const uint32_t* p32 = (const uint32_t*)(p);
-  for (; len >= sizeof(*p32); len -= sizeof(*p32))
-    sum += *p32++;
+  size_t position{0};
+  /* Main loop: 32 bits at a time */
+  for (position = 0; position < len; position += sizeof(uint32_t)) {
+    uint32_t value{};
+    memcpy(&value, reinterpret_cast<const uint8_t*>(ptr) + position, sizeof(value));
+    sum += value;
+  }
 
   /* Handle un-32bit-aligned trailing bytes */
-  const uint16_t* p16 = (const uint16_t*)(p32);
-  if (len >= 2) {
-    sum += *p16++;
-    len -= sizeof(*p16);
+  if ((len - position) >= 2) {
+    uint16_t value{};
+    memcpy(&value, reinterpret_cast<const uint8_t*>(ptr) + position, sizeof(value));
+    sum += value;
+    position += sizeof(value);
   }
-  if (len > 0) {
-    const uint8_t* p8 = (const uint8_t*)(p16);
+
+  if ((len - position) > 0) {
+    const auto* p8 = static_cast<const uint8_t*>(ptr) + position;
     sum += ntohs(*p8 << 8); /* RFC says pad last byte */
   }
 
   return sum;
 }
+
 [[nodiscard]] __be16 XskPacket::ip_checksum_fold(uint64_t sum) noexcept
 {
-  while (sum & ~0xffffffffULL)
+  while (sum & ~0xffffffffULL) {
     sum = (sum >> 32) + (sum & 0xffffffffULL);
-  while (sum & 0xffff0000ULL)
+  }
+  while (sum & 0xffff0000ULL) {
     sum = (sum >> 16) + (sum & 0xffffULL);
+  }
 
-  return ~sum;
+  return static_cast<__be16>(~sum);
 }
+
+#ifndef __packed
+#define __packed __attribute__((packed))
+#endif
+
 [[nodiscard]] uint64_t XskPacket::tcp_udp_v4_header_checksum_partial(__be32 src_ip, __be32 dst_ip, uint8_t protocol, uint16_t len) noexcept
 {
   struct header
@@ -699,6 +891,7 @@ void XskPacket::rewrite() noexcept
   pseudo_header.fields.length = htons(len);
   return ip_checksum_partial(&pseudo_header, sizeof(pseudo_header), 0);
 }
+
 [[nodiscard]] uint64_t XskPacket::tcp_udp_v6_header_checksum_partial(const struct in6_addr* src_ip, const struct in6_addr* dst_ip, uint8_t protocol, uint32_t len) noexcept
 {
   struct header
@@ -730,22 +923,25 @@ void XskPacket::rewrite() noexcept
   pseudo_header.fields.next_header = protocol;
   return ip_checksum_partial(&pseudo_header, sizeof(pseudo_header), 0);
 }
+
 void XskPacket::setHeader(const PacketBuffer& buf)
 {
   memcpy(frame, buf.data(), buf.size());
-  payloadEnd = frame + buf.size();
+  frameLength = buf.size();
   flags = 0;
-  if (!parse()) {
+  if (!parse(true)) {
     throw std::runtime_error("Error setting the XSK frame header");
   }
 }
+
 std::unique_ptr<PacketBuffer> XskPacket::cloneHeadertoPacketBuffer() const
 {
-  const auto size = payload - frame;
+  const auto size = getFrameLen() - getDataSize();
   auto tmp = std::make_unique<PacketBuffer>(size);
   memcpy(tmp->data(), frame, size);
   return tmp;
 }
+
 int XskWorker::createEventfd()
 {
   auto fd = ::eventfd(0, EFD_CLOEXEC);
@@ -754,10 +950,12 @@ int XskWorker::createEventfd()
   }
   return fd;
 }
+
 void XskWorker::waitForXskSocket() noexcept
 {
   uint64_t x = read(workerWaker, &x, sizeof(x));
 }
+
 void XskWorker::notifyXskSocket() noexcept
 {
   notify(xskSocketWaker);
@@ -767,7 +965,8 @@ std::shared_ptr<XskWorker> XskWorker::create()
 {
   return std::make_shared<XskWorker>();
 }
-void XskSocket::addWorker(std::shared_ptr<XskWorker> s, const ComboAddress& dest, bool isTCP)
+
+void XskSocket::addWorker(std::shared_ptr<XskWorker> s, const ComboAddress& dest)
 {
   extern std::atomic<bool> g_configurationDone;
   if (g_configurationDone) {
@@ -792,14 +991,17 @@ void XskSocket::addWorker(std::shared_ptr<XskWorker> s, const ComboAddress& dest
     .events = POLLIN,
     .revents = 0});
 };
+
 uint64_t XskWorker::frameOffset(const XskPacket& s) const noexcept
 {
   return s.frame - umemBufBase;
 }
+
 void XskWorker::notifyWorker() noexcept
 {
   notify(workerWaker);
 }
+
 void XskSocket::getMACFromIfName()
 {
   ifreq ifr{};
@@ -819,19 +1021,23 @@ void XskSocket::getMACFromIfName()
   static_assert(sizeof(ifr.ifr_hwaddr.sa_data) >= std::tuple_size<decltype(source)>{}, "The size of an ARPHRD_ETHER MAC address is smaller than expected");
   memcpy(source.data(), ifr.ifr_hwaddr.sa_data, source.size());
 }
+
 [[nodiscard]] int XskSocket::timeDifference(const timespec& t1, const timespec& t2) noexcept
 {
   const auto res = t1.tv_sec * 1000 + t1.tv_nsec / 1000000L - (t2.tv_sec * 1000 + t2.tv_nsec / 1000000L);
   return static_cast<int>(res);
 }
+
 void XskWorker::cleanWorkerNotification() noexcept
 {
   uint64_t x = read(xskSocketWaker, &x, sizeof(x));
 }
+
 void XskWorker::cleanSocketNotification() noexcept
 {
   uint64_t x = read(workerWaker, &x, sizeof(x));
 }
+
 std::vector<pollfd> getPollFdsForWorker(XskWorker& info)
 {
   std::vector<pollfd> fds;
@@ -851,33 +1057,49 @@ std::vector<pollfd> getPollFdsForWorker(XskWorker& info)
   });
   return fds;
 }
+
 void XskWorker::fillUniqueEmptyOffset()
 {
   auto frames = sharedEmptyFrameOffset->lock();
   const auto moveSize = std::min(static_cast<size_t>(32), frames->size());
   if (moveSize > 0) {
     uniqueEmptyFrameOffset.insert(uniqueEmptyFrameOffset.end(), std::make_move_iterator(frames->end() - moveSize), std::make_move_iterator(frames->end()));
+    frames->resize(frames->size() - moveSize);
   }
 }
-void* XskWorker::getEmptyframe()
+
+XskPacketPtr XskWorker::getEmptyFrame()
 {
   if (!uniqueEmptyFrameOffset.empty()) {
     auto offset = uniqueEmptyFrameOffset.back();
     uniqueEmptyFrameOffset.pop_back();
-    return offset + umemBufBase;
+    return std::make_unique<XskPacket>(offset + umemBufBase, 0, frameSize);
   }
   fillUniqueEmptyOffset();
   if (!uniqueEmptyFrameOffset.empty()) {
     auto offset = uniqueEmptyFrameOffset.back();
     uniqueEmptyFrameOffset.pop_back();
-    return offset + umemBufBase;
+    return std::make_unique<XskPacket>(offset + umemBufBase, 0, frameSize);
   }
   return nullptr;
 }
+
+void XskWorker::markAsFree(XskPacketPtr&& packet)
+{
+
+  auto offset = frameOffset(*packet);
+#ifdef DEBUG_UMEM
+  checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free);
+#endif /* DEBUG_UMEM */
+  uniqueEmptyFrameOffset.push_back(offset);
+  packet.release();
+}
+
 uint32_t XskPacket::getFlags() const noexcept
 {
   return flags;
 }
+
 void XskPacket::updatePacket() noexcept
 {
   if (!(flags & UPDATE)) {
index 6715571091125566cb304e421cac4c7824b0b0b7..551c23074c9465db143a3777a857bfd283f3c533 100644 (file)
 #include <boost/multi_index_container.hpp>
 #include <boost/multi_index/member.hpp>
 #include <memory>
+#include <poll.h>
 #include <queue>
 #include <stdexcept>
 #include <string>
-#include <sys/poll.h>
-#include <sys/types.h>
+//#include <sys/types.h>
 #include <unistd.h>
 #include <unordered_map>
 #include <vector>
+#include <linux/if_ether.h>
 #include <linux/types.h>
+#include <linux/udp.h>
 
 #include <xdp/xsk.h>
 
@@ -59,8 +61,14 @@ using XskPacketPtr = std::unique_ptr<XskPacket>;
 
 // We use an XskSocket to manage an AF_XDP Socket corresponding to a NIC queue.
 // The XDP program running in the kernel redirects the data to the XskSocket in userspace.
+// We allocate frames that are placed into the descriptors in the fill queue, allowing the kernel to put incoming packets into the frames and place descriptors into the rx queue.
+// Once we have read the descriptors from the rx queue we release them, but we own the frames.
+// After we are done with the frame, we place them into descriptors of either the fill queue (empty frames) or tx queues (packets to be sent).
+// Once the kernel is done, it places descriptors referencing these frames into the cq where we can recycle them (packets destined to the tx queue or empty frame to the fill queue queue).
+
 // XskSocket routes packets to multiple worker threads registered on XskSocket via XskSocket::addWorker based on the destination port number of the packet.
 // The kernel and the worker thread holding XskWorker will wake up the XskSocket through XskFd and the Eventfd corresponding to each worker thread, respectively.
+
 class XskSocket
 {
   struct XskRouteInfo
@@ -85,7 +93,9 @@ class XskSocket
       boost::multi_index::hashed_unique<boost::multi_index::member<XskRouteInfo, int, &XskRouteInfo::xskSocketWaker>>,
       boost::multi_index::hashed_unique<boost::multi_index::member<XskRouteInfo, ComboAddress, &XskRouteInfo::dest>, ComboAddress::addressPortOnlyHash>>>
     workers;
+  // number of frames to keep in sharedEmptyFrameOffset
   static constexpr size_t holdThreshold = 256;
+  // number of frames to insert into the fill queue
   static constexpr size_t fillThreshold = 128;
   static constexpr size_t frameSize = 2048;
   // number of entries (frames) in the umem
@@ -98,7 +108,10 @@ class XskSocket
   const std::string poolName;
   // AF_XDP socket then worker waker sockets
   vector<pollfd> fds;
-  // list of (indexes of) umem entries that can be reused
+  // list of frames, aka (indexes of) umem entries that can be reused to fill fq,
+  // collected from packets that we could not route (unknown destination),
+  // could not parse, were dropped during processing (!UPDATE), or
+  // simply recycled from cq after being processed by the kernel
   vector<uint64_t> uniqueEmptyFrameOffset;
   // completion ring: queue where sent packets are stored by the kernel
   xsk_ring_cons cq;
@@ -122,119 +135,152 @@ class XskSocket
   friend void XskRouter(std::shared_ptr<XskSocket> xsk);
 
   [[nodiscard]] uint64_t frameOffset(const XskPacket& packet) const noexcept;
-  int firstTimeout();
-  // pick ups as many available frames as possible from uniqueEmptyFrameOffset
-  // and put them into sharedEmptyFrameOffset
-  // then insert them into fq
+  [[nodiscard]] int firstTimeout();
+  // pick ups available frames from uniqueEmptyFrameOffset
+  // insert entries from uniqueEmptyFrameOffset into fq
   void fillFq(uint32_t fillSize = fillThreshold) noexcept;
-  // picks up entries that have been processed (sent) and push them into uniqueEmptyFrameOffset
+  // picks up entries that have been processed (sent) from cq and push them into uniqueEmptyFrameOffset
   void recycle(size_t size) noexcept;
   void getMACFromIfName();
   // look at delayed packets, and send the ones that are ready
   void pickUpReadyPacket(std::vector<XskPacketPtr>& packets);
 
 public:
+  static constexpr size_t getFrameSize()
+  {
+    return frameSize;
+  }
   // list of free umem entries that can be reused
   std::shared_ptr<LockGuarded<vector<uint64_t>>> sharedEmptyFrameOffset;
   XskSocket(size_t frameNum, const std::string& ifName, uint32_t queue_id, const std::string& xskMapPath, const std::string& poolName_);
   MACAddr source;
   [[nodiscard]] int xskFd() const noexcept;
   // wait until one event has occurred
-  int wait(int timeout);
+  [[nodiscard]] int wait(int timeout);
   // add as many packets as possible to the rx queue for sending */
   void send(std::vector<XskPacketPtr>& packets);
   // look at incoming packets in rx, return them if parsing succeeeded
-  std::vector<XskPacketPtr> recv(uint32_t recvSizeMax, uint32_t* failedCount);
-  void addWorker(std::shared_ptr<XskWorker> s, const ComboAddress& dest, bool isTCP);
-  std::string getMetrics() const;
+  [[nodiscard]] std::vector<XskPacketPtr> recv(uint32_t recvSizeMax, uint32_t* failedCount);
+  void addWorker(std::shared_ptr<XskWorker> s, const ComboAddress& dest);
+  [[nodiscard]] std::string getMetrics() const;
+  void markAsFree(XskPacketPtr&& packet);
 };
+
+struct iphdr;
+struct ipv6hdr;
+
 class XskPacket
 {
 public:
   enum Flags : uint32_t
   {
-    TCP = 1 << 0,
-    UPDATE = 1 << 1,
-    DELAY = 1 << 3,
-    REWRITE = 1 << 4
+    UPDATE = 1 << 0,
+    DELAY = 1 << 1,
+    REWRITE = 1 << 2
   };
 
 private:
   ComboAddress from;
   ComboAddress to;
   timespec sendTime;
-  uint8_t* frame;
-  uint8_t* l4Header;
-  uint8_t* payload;
-  uint8_t* payloadEnd;
-  uint8_t* frameEnd;
+  uint8_t* frame{nullptr};
+  size_t frameLength{0};
+  size_t frameSize{0};
   uint32_t flags{0};
+  bool v6{false};
+
+  // You must set ipHeader.check = 0 before calling this method
+  [[nodiscard]] static __be16 ipv4Checksum(const struct iphdr*) noexcept;
+  [[nodiscard]] static uint64_t ip_checksum_partial(const void* p, size_t len, uint64_t sum) noexcept;
+  [[nodiscard]] static __be16 ip_checksum_fold(uint64_t sum) noexcept;
+  [[nodiscard]] static uint64_t tcp_udp_v4_header_checksum_partial(__be32 src_ip, __be32 dst_ip, uint8_t protocol, uint16_t len) noexcept;
+  [[nodiscard]] static uint64_t tcp_udp_v6_header_checksum_partial(const struct in6_addr* src_ip, const struct in6_addr* dst_ip, uint8_t protocol, uint32_t len) noexcept;
+  static void rewriteIpv4Header(struct iphdr* ipv4header, size_t frameLen) noexcept;
+  static void rewriteIpv6Header(struct ipv6hdr* ipv6header, size_t frameLen) noexcept;
+
+  // You must set l4Header.check = 0 before calling this method
+  // ip options is not supported
+  [[nodiscard]] __be16 tcp_udp_v4_checksum(const struct iphdr*) const noexcept;
+  // You must set l4Header.check = 0 before calling this method
+  [[nodiscard]] __be16 tcp_udp_v6_checksum(const struct ipv6hdr*) const noexcept;
+    /* offset of the L4 (udphdr) header (after ethhdr and iphdr/ipv6hdr) */
+  [[nodiscard]] size_t getL4HeaderOffset() const noexcept;
+  /* offset of the data after the UDP header */
+  [[nodiscard]] size_t getDataOffset() const noexcept;
+  [[nodiscard]] size_t getDataSize() const noexcept;
+  [[nodiscard]] ethhdr getEthernetHeader() const noexcept;
+  void setEthernetHeader(const ethhdr& ethHeader) noexcept;
+  [[nodiscard]] iphdr getIPv4Header() const noexcept;
+  void setIPv4Header(const iphdr& ipv4Header) noexcept;
+  [[nodiscard]] ipv6hdr getIPv6Header() const noexcept;
+  void setIPv6Header(const ipv6hdr& ipv6Header) noexcept;
+  [[nodiscard]] udphdr getUDPHeader() const noexcept;
+  void setUDPHeader(const udphdr& udpHeader) noexcept;
+  // parse IP and UDP payloads
+  bool parse(bool fromSetHeader);
+  void changeDirectAndUpdateChecksum() noexcept;
 
   friend XskSocket;
   friend XskWorker;
   friend bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept;
 
   constexpr static uint8_t DefaultTTL = 64;
-  // parse IP and UDP/TCP payloads
-  bool parse();
-  void changeDirectAndUpdateChecksum() noexcept;
-
-  // You must set ipHeader.check = 0 before call this method
-  [[nodiscard]] __be16 ipv4Checksum() const noexcept;
-  // You must set l4Header.check = 0 before call this method
-  // ip options is not supported
-  [[nodiscard]] __be16 tcp_udp_v4_checksum() const noexcept;
-  // You must set l4Header.check = 0 before call this method
-  [[nodiscard]] __be16 tcp_udp_v6_checksum() const noexcept;
-  [[nodiscard]] static uint64_t ip_checksum_partial(const void* p, size_t len, uint64_t sum) noexcept;
-  [[nodiscard]] static __be16 ip_checksum_fold(uint64_t sum) noexcept;
-  [[nodiscard]] static uint64_t tcp_udp_v4_header_checksum_partial(__be32 src_ip, __be32 dst_ip, uint8_t protocol, uint16_t len) noexcept;
-  [[nodiscard]] static uint64_t tcp_udp_v6_header_checksum_partial(const struct in6_addr* src_ip, const struct in6_addr* dst_ip, uint8_t protocol, uint32_t len) noexcept;
-  void rewriteIpv4Header(void* ipv4header) noexcept;
-  void rewriteIpv6Header(void* ipv6header) noexcept;
 
 public:
   [[nodiscard]] const ComboAddress& getFromAddr() const noexcept;
   [[nodiscard]] const ComboAddress& getToAddr() const noexcept;
-  [[nodiscard]] const void* payloadData() const;
+  [[nodiscard]] const void* getPayloadData() const;
   [[nodiscard]] bool isIPV6() const noexcept;
-  [[nodiscard]] size_t capacity() const noexcept;
-  [[nodiscard]] uint32_t dataLen() const noexcept;
-  [[nodiscard]] uint32_t FrameLen() const noexcept;
+  [[nodiscard]] size_t getCapacity() const noexcept;
+  [[nodiscard]] uint32_t getDataLen() const noexcept;
+  [[nodiscard]] uint32_t getFrameLen() const noexcept;
   [[nodiscard]] PacketBuffer clonePacketBuffer() const;
   void cloneIntoPacketBuffer(PacketBuffer& buffer) const;
   [[nodiscard]] std::unique_ptr<PacketBuffer> cloneHeadertoPacketBuffer() const;
-  [[nodiscard]] void* payloadData();
-  void setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC, bool tcp = false) noexcept;
+  void setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC) noexcept;
   bool setPayload(const PacketBuffer& buf);
   void rewrite() noexcept;
   void setHeader(const PacketBuffer& buf);
-  XskPacket() = default;
-  XskPacket(void* frame, size_t dataSize, size_t frameSize);
+  XskPacket(uint8_t* frame, size_t dataSize, size_t frameSize);
   void addDelay(int relativeMilliseconds) noexcept;
   void updatePacket() noexcept;
   [[nodiscard]] uint32_t getFlags() const noexcept;
 };
 bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept;
 
+/* g++ defines __SANITIZE_THREAD__
+   clang++ supports the nice __has_feature(thread_sanitizer),
+   let's merge them */
+#if defined(__has_feature)
+#if __has_feature(thread_sanitizer)
+#define __SANITIZE_THREAD__ 1
+#endif
+#endif
+
 // XskWorker obtains XskPackets of specific ports in the NIC from XskSocket through cq.
 // After finishing processing the packet, XskWorker puts the packet into sq so that XskSocket decides whether to send it through the network card according to XskPacket::flags.
 // XskWorker wakes up XskSocket via xskSocketWaker after putting the packets in sq.
 class XskWorker
 {
-  using XskPacketRing = boost::lockfree::spsc_queue<XskPacket*, boost::lockfree::capacity<512>>;
+#if defined(__SANITIZE_THREAD__)
+  using XskPacketRing = LockGuarded<boost::lockfree::spsc_queue<XskPacket*, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS*2>>>;
+#else
+  using XskPacketRing = boost::lockfree::spsc_queue<XskPacket*, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS*2>>;
+#endif
 
 public:
   // queue of packets to be processed by this worker
-  XskPacketRing cq;
+  XskPacketRing incomingPacketsQueue;
   // queue of packets processed by this worker (to be sent, or discarded)
-  XskPacketRing sq;
+  XskPacketRing outgoingPacketsQueue;
 
   uint8_t* umemBufBase;
+  // list of frames that are shared with the XskRouter
   std::shared_ptr<LockGuarded<vector<uint64_t>>> sharedEmptyFrameOffset;
+  // list of frames that we own, used to generate new packets (health-check)
   vector<uint64_t> uniqueEmptyFrameOffset;
   std::string poolName;
-  size_t frameSize;
+  const size_t frameSize{XskSocket::getFrameSize()};
   FDWrapper workerWaker;
   FDWrapper xskSocketWaker;
 
@@ -244,6 +290,7 @@ public:
   static std::shared_ptr<XskWorker> create();
   void pushToProcessingQueue(XskPacketPtr&& packet);
   void pushToSendQueue(XskPacketPtr&& packet);
+  void markAsFree(XskPacketPtr&& packet);
   // notify worker that at least one packet is available for processing
   void notifyWorker() noexcept;
   // notify the router that packets are ready to be sent
@@ -252,11 +299,11 @@ public:
   void cleanWorkerNotification() noexcept;
   void cleanSocketNotification() noexcept;
   [[nodiscard]] uint64_t frameOffset(const XskPacket& s) const noexcept;
-  // reap empty umeme entry from sharedEmptyFrameOffset into uniqueEmptyFrameOffset
+  // reap empty umem entry from sharedEmptyFrameOffset into uniqueEmptyFrameOffset
   void fillUniqueEmptyOffset();
   // look for an empty umem entry in uniqueEmptyFrameOffset
   // then sharedEmptyFrameOffset if needed
-  void* getEmptyframe();
+  XskPacketPtr getEmptyFrame();
 };
 std::vector<pollfd> getPollFdsForWorker(XskWorker& info);
 #else