]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Fix a data race in the AF_XDP/XSK dnsdist <-> backend code
authorRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 8 Jul 2024 12:43:21 +0000 (14:43 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 8 Jul 2024 12:55:52 +0000 (14:55 +0200)
The existing code was sharing the same XskWorker between the thread
handling incoming queries (possibly replying right away for
self-answered and cache hit responses) and the one handling responses
coming from a backend (without XSK), which is wrong since the internal
queues are single-producer (and single consumer, but a worker is only
assigned to a single XskRouter which is OK).
This commit introduces a new, separate worker for the threads handling
responses coming from a backend without XSK (it was already the case
for responses coming from a backend via XSK). The new worker is marked
"outgoing-only" to ensure we are not confused about what it can be used
for, which is only sending packets, not receiving any.

pdns/dnsdistdist/dnsdist-backend.cc
pdns/dnsdistdist/dnsdist-lua.cc
pdns/dnsdistdist/dnsdist-xsk.cc
pdns/dnsdistdist/dnsdist.cc
pdns/dnsdistdist/dnsdist.hh
pdns/xsk.cc
pdns/xsk.hh

index 54c4fc76e7bae039904822a8e0b952fd3559ccf0..495242f4934d9b87b3d599dcbd5a583d45877e95 100644 (file)
@@ -905,10 +905,10 @@ void DownstreamState::registerXsk(std::vector<std::shared_ptr<XskSocket>>& xsks)
   d_config.sourceMACAddr = d_xskSockets.at(0)->getSourceMACAddress();
 
   for (auto& xsk : d_xskSockets) {
-    auto xskInfo = XskWorker::create();
+    auto xskInfo = XskWorker::create(XskWorker::Type::Bidirectional);
     d_xskInfos.push_back(xskInfo);
     xsk->addWorker(xskInfo);
-    xskInfo->sharedEmptyFrameOffset = xsk->sharedEmptyFrameOffset;
+    xskInfo->setSharedFrames(xsk->sharedEmptyFrameOffset);
   }
   reconnect(false);
 }
index c526a93cc2d6d0855b479b36b270b7bc4df8a803..ed127a735b8e9a3d8a7e1933bb39f7d04a0d5601 100644 (file)
@@ -816,10 +816,13 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
       std::shared_ptr<XskSocket> socket;
       parseXskVars(vars, socket);
       if (socket) {
-        udpCS->xskInfo = XskWorker::create();
-        udpCS->xskInfo->sharedEmptyFrameOffset = socket->sharedEmptyFrameOffset;
+        udpCS->xskInfo = XskWorker::create(XskWorker::Type::Bidirectional);
+        udpCS->xskInfo->setSharedFrames(socket->sharedEmptyFrameOffset);
         socket->addWorker(udpCS->xskInfo);
         socket->addWorkerRoute(udpCS->xskInfo, loc);
+        udpCS->xskInfoResponder = XskWorker::create(XskWorker::Type::OutgoingOnly);
+        udpCS->xskInfoResponder->setSharedFrames(socket->sharedEmptyFrameOffset);
+        socket->addWorker(udpCS->xskInfoResponder);
         vinfolog("Enabling XSK in %s mode for incoming UDP packets to %s", socket->getXDPMode(), loc.toStringWithPort());
       }
 #endif /* HAVE_XSK */
@@ -871,10 +874,13 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
       std::shared_ptr<XskSocket> socket;
       parseXskVars(vars, socket);
       if (socket) {
-        udpCS->xskInfo = XskWorker::create();
-        udpCS->xskInfo->sharedEmptyFrameOffset = socket->sharedEmptyFrameOffset;
+        udpCS->xskInfo = XskWorker::create(XskWorker::Type::Bidirectional);
+        udpCS->xskInfo->setSharedFrames(socket->sharedEmptyFrameOffset);
         socket->addWorker(udpCS->xskInfo);
         socket->addWorkerRoute(udpCS->xskInfo, loc);
+        udpCS->xskInfoResponder = XskWorker::create(XskWorker::Type::OutgoingOnly);
+        udpCS->xskInfoResponder->setSharedFrames(socket->sharedEmptyFrameOffset);
+        socket->addWorker(udpCS->xskInfoResponder);
         vinfolog("Enabling XSK in %s mode for incoming UDP packets to %s", socket->getXDPMode(), loc.toStringWithPort());
       }
 #endif /* HAVE_XSK */
index 8ef59833a19689a106ed6f6757cb380963cc9219..f411fa171a45ca919fc520ba2f32b28284129272 100644 (file)
@@ -48,11 +48,7 @@ void XskResponderThread(std::shared_ptr<DownstreamState> dss, std::shared_ptr<Xs
       if ((pollfds[0].revents & POLLIN) != 0) {
         needNotify = true;
         xskInfo->cleanSocketNotification();
-#if defined(__SANITIZE_THREAD__)
-        xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
-        xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
+        xskInfo->processIncomingFrames([&](XskPacket& packet) {
           if (packet.getDataLen() < sizeof(dnsheader)) {
             xskInfo->markAsFree(packet);
             return;
@@ -77,7 +73,7 @@ void XskResponderThread(std::shared_ptr<DownstreamState> dss, std::shared_ptr<Xs
           }
           if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) {
             xskInfo->markAsFree(packet);
-            infolog("XSK packet pushed to queue because processResponderPacket failed");
+            vinfolog("XSK packet dropped because processResponderPacket failed");
             return;
           }
           if (response.size() > packet.getCapacity()) {
@@ -171,11 +167,7 @@ void XskRouter(std::shared_ptr<XskSocket> xsk)
         if ((fds.at(fdIndex).revents & POLLIN) != 0) {
           ready--;
           const auto& info = xsk->getWorkerByDescriptor(fds.at(fdIndex).fd);
-#if defined(__SANITIZE_THREAD__)
-          info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
-          info->outgoingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
+          info->processOutgoingFrames([&](XskPacket& packet) {
             if ((packet.getFlags() & XskPacket::UPDATE) == 0) {
               xsk->markAsFree(packet);
               return;
@@ -207,18 +199,10 @@ void XskClientThread(ClientState* clientState)
   LocalHolders holders;
 
   for (;;) {
-#if defined(__SANITIZE_THREAD__)
-    while (xskInfo->incomingPacketsQueue.lock()->read_available() == 0U) {
-#else
-    while (xskInfo->incomingPacketsQueue.read_available() == 0U) {
-#endif
+    while (!xskInfo->hasIncomingFrames()) {
       xskInfo->waitForXskSocket();
     }
-#if defined(__SANITIZE_THREAD__)
-    xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
-    xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
+    xskInfo->processIncomingFrames([&](XskPacket& packet) {
       if (XskProcessQuery(*clientState, holders, packet)) {
         packet.updatePacket();
         xskInfo->pushToSendQueue(packet);
index 4fd2dc499b034c56b9495dca28fbf547bf21da41..0336904f5390f0930e4472193773cb4b19b7582d 100644 (file)
@@ -852,9 +852,9 @@ void responderThread(std::shared_ptr<DownstreamState> dss)
             continue;
           }
 
-          if (processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids)) && ids->isXSK() && ids->cs->xskInfo) {
+          if (processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids)) && ids->isXSK() && ids->cs->xskInfoResponder) {
 #ifdef HAVE_XSK
-            auto& xskInfo = ids->cs->xskInfo;
+            auto& xskInfo = ids->cs->xskInfoResponder;
             auto xskPacket = xskInfo->getEmptyFrame();
             if (!xskPacket) {
               continue;
index cd1ef1880059a8b6fb855bbc008bb302ec20be9b..d3c02c4995a83b750a7fe58077797a9408cf605d 100644 (file)
@@ -565,6 +565,7 @@ struct ClientState
   std::shared_ptr<DOH3Frontend> doh3Frontend{nullptr};
   std::shared_ptr<BPFFilter> d_filter{nullptr};
   std::shared_ptr<XskWorker> xskInfo{nullptr};
+  std::shared_ptr<XskWorker> xskInfoResponder{nullptr};
   size_t d_maxInFlightQueriesPerConn{1};
   size_t d_tcpConcurrentConnectionsLimit{0};
   int udpFD{-1};
index c871d81ba06e93b8fec616f2e8634bed392534c8..7ea50a07c46f6232fbf750313eb3a30ca2ffd4df 100644 (file)
@@ -275,7 +275,16 @@ void XskSocket::removeDestinationAddress(const std::string& mapPath, const Combo
 
 void XskSocket::fillFq(uint32_t fillSize) noexcept
 {
-  {
+  if (uniqueEmptyFrameOffset.size() < fillSize) {
+    auto frames = sharedEmptyFrameOffset->lock();
+    const auto moveSize = std::min(static_cast<size_t>(fillSize), frames->size());
+    if (moveSize > 0) {
+      // NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions)
+      uniqueEmptyFrameOffset.insert(uniqueEmptyFrameOffset.end(), std::make_move_iterator(frames->end() - moveSize), std::make_move_iterator(frames->end()));
+      frames->resize(frames->size() - moveSize);
+    }
+  }
+  else if (uniqueEmptyFrameOffset.size() > (10 * fillSize)) {
     // if we have less than holdThreshold frames in the shared queue (which might be an issue
     // when the XskWorker needs empty frames), move frames from the unique container into the
     // shared one. This might not be optimal right now.
@@ -290,7 +299,9 @@ void XskSocket::fillFq(uint32_t fillSize) noexcept
     }
   }
 
-  if (uniqueEmptyFrameOffset.size() < fillSize) {
+  fillSize = std::min(fillSize, static_cast<uint32_t>(uniqueEmptyFrameOffset.size()));
+  if (fillSize == 0) {
+    auto frames = sharedEmptyFrameOffset->lock();
     return;
   }
 
@@ -393,11 +404,13 @@ std::vector<XskPacket> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCou
       }
     }
     catch (const std::exception& exp) {
-      std::cerr << "Exception while processing the XSK RX queue: " << exp.what() << std::endl;
+      ++failed;
+      ++processed;
       break;
     }
     catch (...) {
-      std::cerr << "Exception while processing the XSK RX queue" << std::endl;
+      ++failed;
+      ++processed;
       break;
     }
   }
@@ -844,29 +857,24 @@ void XskWorker::notify(int desc)
   }
 }
 
-XskWorker::XskWorker() :
-  workerWaker(createEventfd()), xskSocketWaker(createEventfd())
+XskWorker::XskWorker(XskWorker::Type type) :
+  d_type(type), workerWaker(createEventfd()), xskSocketWaker(createEventfd())
 {
 }
 
 void XskWorker::pushToProcessingQueue(XskPacket& packet)
 {
-#if defined(__SANITIZE_THREAD__)
-  if (!incomingPacketsQueue.lock()->push(packet)) {
-#else
-  if (!incomingPacketsQueue.push(packet)) {
-#endif
+  if (d_type == Type::OutgoingOnly) {
+    throw std::runtime_error("Trying to push an incoming packet into an outgoing-only XSK Worker");
+  }
+  if (!d_incomingPacketsQueue.push(packet)) {
     markAsFree(packet);
   }
 }
 
 void XskWorker::pushToSendQueue(XskPacket& packet)
 {
-#if defined(__SANITIZE_THREAD__)
-  if (!outgoingPacketsQueue.lock()->push(packet)) {
-#else
-  if (!outgoingPacketsQueue.push(packet)) {
-#endif
+  if (!d_outgoingPacketsQueue.push(packet)) {
     markAsFree(packet);
   }
 }
@@ -911,7 +919,7 @@ void XskPacket::rewrite() noexcept
     /* needed to get the correct checksum */
     setIPv4Header(ipHeader);
     setUDPHeader(udpHeader);
-    udpHeader.check = tcp_udp_v4_checksum(&ipHeader);
+    //udpHeader.check = tcp_udp_v4_checksum(&ipHeader);
     rewriteIpv4Header(&ipHeader, getFrameLen());
     setIPv4Header(ipHeader);
     setUDPHeader(udpHeader);
@@ -1119,15 +1127,15 @@ void XskWorker::notifyXskSocket() const
   notify(xskSocketWaker);
 }
 
-std::shared_ptr<XskWorker> XskWorker::create()
+std::shared_ptr<XskWorker> XskWorker::create(Type type)
 {
-  return std::make_shared<XskWorker>();
+  return std::make_shared<XskWorker>(type);
 }
 
 void XskSocket::addWorker(std::shared_ptr<XskWorker> worker)
 {
   const auto socketWaker = worker->xskSocketWaker.getHandle();
-  worker->umemBufBase = umem.bufBase;
+  worker->setUmemBufBase(umem.bufBase);
   d_workers.insert({socketWaker, std::move(worker)});
   fds.push_back(pollfd{
     .fd = socketWaker,
@@ -1145,9 +1153,19 @@ void XskSocket::removeWorkerRoute(const ComboAddress& dest)
   d_workerRoutes.lock()->erase(dest);
 }
 
+void XskWorker::setSharedFrames(std::shared_ptr<LockGuarded<vector<uint64_t>>>& frames)
+{
+  d_sharedEmptyFrameOffset = frames;
+}
+
+void XskWorker::setUmemBufBase(uint8_t* base)
+{
+  d_umemBufBase = base;
+}
+
 uint64_t XskWorker::frameOffset(const XskPacket& packet) const noexcept
 {
-  return packet.getFrameOffsetFrom(umemBufBase);
+  return packet.getFrameOffsetFrom(d_umemBufBase);
 }
 
 void XskWorker::notifyWorker() const
@@ -1155,6 +1173,29 @@ void XskWorker::notifyWorker() const
   notify(workerWaker);
 }
 
+bool XskWorker::hasIncomingFrames()
+{
+  if (d_type == Type::OutgoingOnly) {
+    throw std::runtime_error("Looking for incoming packets in an outgoing-only XSK Worker");
+  }
+
+  return d_incomingPacketsQueue.read_available() != 0U;
+}
+
+void XskWorker::processIncomingFrames(const std::function<void(XskPacket& packet)>& callback)
+{
+  if (d_type == Type::OutgoingOnly) {
+    throw std::runtime_error("Looking for incoming packets in an outgoing-only XSK Worker");
+  }
+
+  d_incomingPacketsQueue.consume_all(callback);
+}
+
+void XskWorker::processOutgoingFrames(const std::function<void(XskPacket& packet)>& callback)
+{
+  d_outgoingPacketsQueue.consume_all(callback);
+}
+
 void XskSocket::getMACFromIfName()
 {
   ifreq ifr{};
@@ -1215,14 +1256,14 @@ std::vector<pollfd> getPollFdsForWorker(XskWorker& info)
 
 std::optional<XskPacket> XskWorker::getEmptyFrame()
 {
-  auto frames = sharedEmptyFrameOffset->lock();
+  auto frames = d_sharedEmptyFrameOffset->lock();
   if (frames->empty()) {
     return std::nullopt;
   }
   auto offset = frames->back();
   frames->pop_back();
   // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
-  return XskPacket(offset + umemBufBase, 0, frameSize);
+  return XskPacket(offset + d_umemBufBase, 0, d_frameSize);
 }
 
 void XskWorker::markAsFree(const XskPacket& packet)
@@ -1232,7 +1273,7 @@ void XskWorker::markAsFree(const XskPacket& packet)
   checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free);
 #endif /* DEBUG_UMEM */
   {
-    auto frames = sharedEmptyFrameOffset->lock();
+    auto frames = d_sharedEmptyFrameOffset->lock();
     frames->push_back(offset);
   }
 }
index ca6e65ca04fd9c11e630b9cab496d53a938b7ba6..df312e399e45367587258ac3baba587651c8476b 100644 (file)
@@ -58,7 +58,7 @@ using MACAddr = std::array<uint8_t, 6>;
 // We allocate frames that are placed into the descriptors in the fill queue, allowing the kernel to put incoming packets into the frames and place descriptors into the rx queue.
 // Once we have read the descriptors from the rx queue we release them, but we own the frames.
 // After we are done with the frame, we place them into descriptors of either the fill queue (empty frames) or tx queues (packets to be sent).
-// Once the kernel is done, it places descriptors referencing these frames into the cq where we can recycle them (packets destined to the tx queue or empty frame to the fill queue queue).
+// Once the kernel is done, it places descriptors referencing these frames into the cq where we can recycle them (packets destined to the tx queue or empty frame to the fill queue).
 
 // XskSocket routes packets to multiple worker threads registered on XskSocket via XskSocket::addWorker based on the destination port number of the packet.
 // The kernel and the worker thread holding XskWorker will wake up the XskSocket through XskFd and the Eventfd corresponding to each worker thread, respectively.
@@ -269,45 +269,42 @@ public:
 };
 bool operator<(const XskPacket& lhs, const XskPacket& rhs) noexcept;
 
-/* g++ defines __SANITIZE_THREAD__
-   clang++ supports the nice __has_feature(thread_sanitizer),
-   let's merge them */
-#if defined(__has_feature)
-#if __has_feature(thread_sanitizer)
-#define __SANITIZE_THREAD__ 1
-#endif
-#endif
-
 // XskWorker obtains XskPackets of specific ports in the NIC from XskSocket through cq.
 // After finishing processing the packet, XskWorker puts the packet into sq so that XskSocket decides whether to send it through the network card according to XskPacket::flags.
 // XskWorker wakes up XskSocket via xskSocketWaker after putting the packets in sq.
 class XskWorker
 {
-#if defined(__SANITIZE_THREAD__)
-  using XskPacketRing = LockGuarded<boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>>;
-#else
-  using XskPacketRing = boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>;
-#endif
-
 public:
+  enum class Type : uint8_t { OutgoingOnly, Bidirectional};
+
+private:
+  using XskPacketRing = boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>;
   // queue of packets to be processed by this worker
-  XskPacketRing incomingPacketsQueue;
+  XskPacketRing d_incomingPacketsQueue;
   // queue of packets processed by this worker (to be sent, or discarded)
-  XskPacketRing outgoingPacketsQueue;
-
-  uint8_t* umemBufBase{nullptr};
+  XskPacketRing d_outgoingPacketsQueue;
   // list of frames that are shared with the XskRouter
-  std::shared_ptr<LockGuarded<vector<uint64_t>>> sharedEmptyFrameOffset;
-  const size_t frameSize{XskSocket::getFrameSize()};
+  std::shared_ptr<LockGuarded<vector<uint64_t>>> d_sharedEmptyFrameOffset;
+  uint8_t* d_umemBufBase{nullptr};
+  const size_t d_frameSize{XskSocket::getFrameSize()};
+  Type d_type;
+
+public:
   FDWrapper workerWaker;
   FDWrapper xskSocketWaker;
 
-  XskWorker();
   static int createEventfd();
   static void notify(int desc);
-  static std::shared_ptr<XskWorker> create();
+  static std::shared_ptr<XskWorker> create(Type);
+
+  XskWorker(Type);
+  void setSharedFrames(std::shared_ptr<LockGuarded<vector<uint64_t>>>& frames);
+  void setUmemBufBase(uint8_t* base);
   void pushToProcessingQueue(XskPacket& packet);
   void pushToSendQueue(XskPacket& packet);
+  bool hasIncomingFrames();
+  void processIncomingFrames(const std::function<void(XskPacket& packet)>& callback);
+  void processOutgoingFrames(const std::function<void(XskPacket& packet)>& callback);
   void markAsFree(const XskPacket& packet);
   // notify worker that at least one packet is available for processing
   void notifyWorker() const;