]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Minor clean ups in the XSK code
authorRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 26 Sep 2023 10:35:50 +0000 (12:35 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 23 Jan 2024 11:54:18 +0000 (12:54 +0100)
pdns/dnsdist.cc
pdns/xsk.cc
pdns/xsk.hh

index 82ea4fc8494f028abb69fbdad9ab5a9c81086c22..ae1d00aa1ff6bd1428b2010beaf5ec5aa006380a 100644 (file)
@@ -780,11 +780,11 @@ static void XskHealthCheck(std::shared_ptr<DownstreamState>& dss, std::unordered
   data->d_initial = initial;
   setHealthCheckTime(dss, data);
   auto* frame = xskInfo->getEmptyframe();
-  auto *xskPacket = new XskPacket(frame, 0, xskInfo->frameSize);
+  auto xskPacket = std::make_unique<XskPacket>(frame, 0, xskInfo->frameSize);
   xskPacket->setAddr(dss->d_config.sourceAddr, dss->d_config.sourceMACAddr, dss->d_config.remote, dss->d_config.destMACAddr);
   xskPacket->setPayload(packet);
   xskPacket->rewrite();
-  xskInfo->sq.push(xskPacket);
+  xskInfo->pushToSendQueue(std::move(xskPacket));
   const auto queryId = data->d_queryID;
   map[queryId] = std::move(data);
 }
@@ -853,9 +853,10 @@ void responderThread(std::shared_ptr<DownstreamState> dss)
         bool needNotify = false;
         if (pollfds[0].revents & POLLIN) {
           needNotify = true;
-          xskInfo->cq.consume_all([&](XskPacket* packet) {
+          xskInfo->cq.consume_all([&](XskPacket* packetRaw) {
+            auto packet = XskPacketPtr(packetRaw);
             if (packet->dataLen() < sizeof(dnsheader)) {
-              xskInfo->sq.push(packet);
+              xskInfo->pushToSendQueue(std::move(packet));
               return;
             }
             const auto* dh = reinterpret_cast<const struct dnsheader*>(packet->payloadData());
@@ -876,7 +877,7 @@ void responderThread(std::shared_ptr<DownstreamState> dss)
                 packet->cloneIntoPacketBuffer(data->d_buffer);
                 data->d_ds->submitHealthCheckResult(data->d_initial, handleResponse(data));
               }
-              xskInfo->sq.push(packet);
+              xskInfo->pushToSendQueue(std::move(packet));
               return;
             }
             auto response = packet->clonePacketBuffer();
@@ -885,13 +886,13 @@ void responderThread(std::shared_ptr<DownstreamState> dss)
               ids->xskPacketHeader.reset();
             }
             if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) {
-              xskInfo->sq.push(packet);
+              xskInfo->pushToSendQueue(std::move(packet));
               return;
             }
             if (response.size() > packet->capacity()) {
               /* fallback to sending the packet via normal socket */
               sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote);
-              xskInfo->sq.push(packet);
+              xskInfo->pushToSendQueue(std::move(packet));
               return;
             }
             packet->setHeader(*ids->xskPacketHeader);
@@ -900,7 +901,7 @@ void responderThread(std::shared_ptr<DownstreamState> dss)
               packet->addDelay(ids->delayMsec);
             }
             packet->updatePacket();
-            xskInfo->sq.push(packet);
+            xskInfo->pushToSendQueue(std::move(packet));
           });
           xskInfo->cleanSocketNotification();
         }
@@ -1006,7 +1007,7 @@ void responderThread(std::shared_ptr<DownstreamState> dss)
           xskPacket->setHeader(*ids->xskPacketHeader);
           xskPacket->setPayload(response);
           xskPacket->updatePacket();
-          xskInfo->sq.push(xskPacket.release());
+          xskInfo->pushToSendQueue(std::move(xskPacket));
           xskInfo->notifyXskSocket();
 #endif /* HAVE_XSK */
         }
@@ -2248,10 +2249,11 @@ static void xskClientThread(ClientState* cs)
     while (!xskInfo->cq.read_available()) {
       xskInfo->waitForXskSocket();
     }
-    xskInfo->cq.consume_all([&](XskPacket* packet) {
+    xskInfo->cq.consume_all([&](XskPacket* packetRaw) {
+      auto packet = XskPacketPtr(packetRaw);
       ProcessXskQuery(*cs, holders, *packet);
       packet->updatePacket();
-      xskInfo->sq.push(packet);
+      xskInfo->pushToSendQueue(std::move(packet));
     });
     xskInfo->notifyXskSocket();
   }
@@ -3652,7 +3654,7 @@ void XskRouter(std::shared_ptr<XskSocket> xsk)
           xsk->uniqueEmptyFrameOffset.push_back(xsk->frameOffset(*packet));
           continue;
         }
-        res->worker->cq.push(packet.release());
+        res->worker->pushToProcessingQueue(std::move(packet));
         needNotify.insert(res->workerWaker);
       }
       for (auto i : needNotify) {
@@ -3672,17 +3674,18 @@ void XskRouter(std::shared_ptr<XskSocket> xsk)
       if (xsk->fds[i].revents & POLLIN) {
         ready--;
         auto& info = xskWakerIdx.find(xsk->fds[i].fd)->worker;
-        info->sq.consume_all([&](XskPacket* x) {
-          if (!(x->getFlags() & XskPacket::UPDATE)) {
-            xsk->uniqueEmptyFrameOffset.push_back(xsk->frameOffset(*x));
+        info->sq.consume_all([&](XskPacket* packetRaw) {
+          auto packet = XskPacketPtr(packetRaw);
+          if (!(packet->getFlags() & XskPacket::UPDATE)) {
+            xsk->uniqueEmptyFrameOffset.push_back(xsk->frameOffset(*packet));
+            packet.release();
             return;
           }
-          auto ptr = std::unique_ptr<XskPacket>(x);
-          if (x->getFlags() & XskPacket::DELAY) {
-            xsk->waitForDelay.push(std::move(ptr));
+          if (packet->getFlags() & XskPacket::DELAY) {
+            xsk->waitForDelay.push(std::move(packet));
             return;
           }
-          fillInTx.push_back(std::move(ptr));
+          fillInTx.push_back(std::move(packet));
         });
         info->cleanWorkerNotification();
       }
index c57f97f3679e5f2238a714652f6e67d34f9aee92..4f055d626c4dcca0d2fd25b21047fc75c6e8d3e3 100644 (file)
@@ -28,7 +28,6 @@
 #include "xsk.hh"
 
 #include <algorithm>
-#include <cassert>
 #include <cstdint>
 #include <cstring>
 #include <fcntl.h>
@@ -65,6 +64,7 @@ constexpr bool XskSocket::isPowOfTwo(uint32_t value) noexcept
 {
   return value != 0 && (value & (value - 1)) == 0;
 }
+
 int XskSocket::firstTimeout()
 {
   if (waitForDelay.empty()) {
@@ -243,14 +243,14 @@ void XskSocket::recycle(size_t size) noexcept
   xsk_ring_cons__release(&cq, completeSize);
 }
 
-void XskSocket::XskUmem::umemInit(size_t memSize, xsk_ring_cons* cq, xsk_ring_prod* fq, xsk_umem_config* config)
+void XskSocket::XskUmem::umemInit(size_t memSize, xsk_ring_cons* completionQueue, xsk_ring_prod* fillQueue, xsk_umem_config* config)
 {
   size = memSize;
   bufBase = static_cast<uint8_t*>(mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0));
   if (bufBase == MAP_FAILED) {
     throw std::runtime_error("mmap failed");
   }
-  auto ret = xsk_umem__create(&umem, bufBase, size, fq, cq, config);
+  auto ret = xsk_umem__create(&umem, bufBase, size, fillQueue, completionQueue, config);
   if (ret != 0) {
     munmap(bufBase, size);
     throw std::runtime_error("Error creating a umem of size" + std::to_string(size) + stringerror(ret));
@@ -326,7 +326,7 @@ bool XskPacket::parse()
   if (l4Protocol == IPPROTO_UDP) {
     // check udp.check == ipv4Checksum() is not needed!
     // We check it in BPF program
-    auto* udp = reinterpret_cast<udphdr*>(l4Header);
+    const auto* udp = reinterpret_cast<const udphdr*>(l4Header);
     payload = l4Header + sizeof(udphdr);
     // Because of XskPacket::setHeader
     // payload = payloadEnd should be allow
@@ -341,7 +341,7 @@ bool XskPacket::parse()
   if (l4Protocol == IPPROTO_TCP) {
     // check tcp.check == ipv4Checksum() is not needed!
     // We check it in BPF program
-    auto* tcp = reinterpret_cast<tcphdr*>(l4Header);
+    const auto* tcp = reinterpret_cast<const tcphdr*>(l4Header);
     if (tcp->doff != static_cast<uint32_t>(sizeof(tcphdr) >> 2)) {
       // tcp is not supported now!
       return false;
@@ -514,6 +514,23 @@ XskWorker::XskWorker() :
   workerWaker(createEventfd()), xskSocketWaker(createEventfd())
 {
 }
+
+void XskWorker::pushToProcessingQueue(XskPacketPtr&& packet)
+{
+  auto raw = packet.release();
+  if (!cq.push(raw)) {
+    delete raw;
+  }
+}
+
+void XskWorker::pushToSendQueue(XskPacketPtr&& packet)
+{
+  auto raw = packet.release();
+  if (!sq.push(raw)) {
+    delete raw;
+  }
+}
+
 void* XskPacket::payloadData()
 {
   return reinterpret_cast<void*>(payload);
@@ -670,7 +687,7 @@ void XskPacket::rewrite() noexcept
     };
   };
   struct ipv4_pseudo_header_t pseudo_header;
-  assert(sizeof(pseudo_header) == 12);
+  static_assert(sizeof(pseudo_header) == 12, "IPv4 pseudo-header size is incorrect");
 
   /* Fill in the pseudo-header. */
   pseudo_header.fields.src_ip = src_ip;
@@ -701,7 +718,7 @@ void XskPacket::rewrite() noexcept
     };
   };
   struct ipv6_pseudo_header_t pseudo_header;
-  assert(sizeof(pseudo_header) == 40);
+  static_assert(sizeof(pseudo_header) == 40, "IPv6 pseudo-header size is incorrect");
 
   /* Fill in the pseudo-header. */
   pseudo_header.fields.src_ip = *src_ip;
@@ -711,13 +728,14 @@ void XskPacket::rewrite() noexcept
   pseudo_header.fields.next_header = protocol;
   return ip_checksum_partial(&pseudo_header, sizeof(pseudo_header), 0);
 }
-void XskPacket::setHeader(const PacketBuffer& buf) noexcept
+void XskPacket::setHeader(const PacketBuffer& buf)
 {
   memcpy(frame, buf.data(), buf.size());
   payloadEnd = frame + buf.size();
   flags = 0;
-  const auto res = parse();
-  assert(res);
+  if (!parse()) {
+    throw std::runtime_error("Error setting the XSK frame header");
+  }
 }
 std::unique_ptr<PacketBuffer> XskPacket::cloneHeadertoPacketBuffer() const
 {
index b4e10c6eeeffa014633dd1e0813363df80bc5580..6715571091125566cb304e421cac4c7824b0b0b7 100644 (file)
@@ -208,7 +208,7 @@ public:
   void setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC, bool tcp = false) noexcept;
   bool setPayload(const PacketBuffer& buf);
   void rewrite() noexcept;
-  void setHeader(const PacketBuffer& buf) noexcept;
+  void setHeader(const PacketBuffer& buf);
   XskPacket() = default;
   XskPacket(void* frame, size_t dataSize, size_t frameSize);
   void addDelay(int relativeMilliseconds) noexcept;
@@ -225,13 +225,14 @@ class XskWorker
   using XskPacketRing = boost::lockfree::spsc_queue<XskPacket*, boost::lockfree::capacity<512>>;
 
 public:
-  uint8_t* umemBufBase;
-  std::shared_ptr<LockGuarded<vector<uint64_t>>> sharedEmptyFrameOffset;
-  vector<uint64_t> uniqueEmptyFrameOffset;
   // queue of packets to be processed by this worker
   XskPacketRing cq;
   // queue of packets processed by this worker (to be sent, or discarded)
   XskPacketRing sq;
+
+  uint8_t* umemBufBase;
+  std::shared_ptr<LockGuarded<vector<uint64_t>>> sharedEmptyFrameOffset;
+  vector<uint64_t> uniqueEmptyFrameOffset;
   std::string poolName;
   size_t frameSize;
   FDWrapper workerWaker;
@@ -241,6 +242,8 @@ public:
   static int createEventfd();
   static void notify(int fd);
   static std::shared_ptr<XskWorker> create();
+  void pushToProcessingQueue(XskPacketPtr&& packet);
+  void pushToSendQueue(XskPacketPtr&& packet);
   // notify worker that at least one packet is available for processing
   void notifyWorker() noexcept;
   // notify the router that packets are ready to be sent