]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Fix a data race in the AF_XDP/XSK dnsdist <-> backend code
authorRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 8 Jul 2024 12:43:21 +0000 (14:43 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 8 Jul 2024 13:29:22 +0000 (15:29 +0200)
The existing code was sharing the same XskWorker between the thread
handling incoming queries (possibly replying right away for
self-answered and cache hit responses) and the one handling responses
coming from a backend (without XSK), which is wrong since the internal
queues are single-producer (and single consumer, but a worker is only
assigned to a single XskRouter which is OK).
This commit introduces a new, separate worker for the threads handling
responses coming from a backend without XSK (it was already the case
for responses coming from a backend via XSK). The new worker is marked
"outgoing-only" to ensure we are not confused about what it can be used
for, which is only sending packets, not receiving any.

(cherry picked from commit 4d2d75ea171df2ac510b566a4386d13195de2314)

pdns/dnsdist-lua.cc
pdns/dnsdist.cc
pdns/dnsdist.hh
pdns/dnsdistdist/dnsdist-backend.cc
pdns/dnsdistdist/dnsdist-xsk.cc
pdns/xsk.cc
pdns/xsk.hh

index 73a8567b241aa09ab1d6b2f886751e1e0441590c..df1b0eb6df0dccbb2a05beaec8ddf6763b50639e 100644 (file)
@@ -809,10 +809,13 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
       std::shared_ptr<XskSocket> socket;
       parseXskVars(vars, socket);
       if (socket) {
-        udpCS->xskInfo = XskWorker::create();
-        udpCS->xskInfo->sharedEmptyFrameOffset = socket->sharedEmptyFrameOffset;
+        udpCS->xskInfo = XskWorker::create(XskWorker::Type::Bidirectional);
+        udpCS->xskInfo->setSharedFrames(socket->sharedEmptyFrameOffset);
         socket->addWorker(udpCS->xskInfo);
         socket->addWorkerRoute(udpCS->xskInfo, loc);
+        udpCS->xskInfoResponder = XskWorker::create(XskWorker::Type::OutgoingOnly);
+        udpCS->xskInfoResponder->setSharedFrames(socket->sharedEmptyFrameOffset);
+        socket->addWorker(udpCS->xskInfoResponder);
         vinfolog("Enabling XSK in %s mode for incoming UDP packets to %s", socket->getXDPMode(), loc.toStringWithPort());
       }
 #endif /* HAVE_XSK */
@@ -863,10 +866,13 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
       std::shared_ptr<XskSocket> socket;
       parseXskVars(vars, socket);
       if (socket) {
-        udpCS->xskInfo = XskWorker::create();
-        udpCS->xskInfo->sharedEmptyFrameOffset = socket->sharedEmptyFrameOffset;
+        udpCS->xskInfo = XskWorker::create(XskWorker::Type::Bidirectional);
+        udpCS->xskInfo->setSharedFrames(socket->sharedEmptyFrameOffset);
         socket->addWorker(udpCS->xskInfo);
         socket->addWorkerRoute(udpCS->xskInfo, loc);
+        udpCS->xskInfoResponder = XskWorker::create(XskWorker::Type::OutgoingOnly);
+        udpCS->xskInfoResponder->setSharedFrames(socket->sharedEmptyFrameOffset);
+        socket->addWorker(udpCS->xskInfoResponder);
         vinfolog("Enabling XSK in %s mode for incoming UDP packets to %s", socket->getXDPMode(), loc.toStringWithPort());
       }
 #endif /* HAVE_XSK */
index 4849a679ff809c30116b208827c987f0b3c44ca6..8c8ea24f93600aae10425ec72d4e0273f48d17ae 100644 (file)
@@ -861,9 +861,9 @@ void responderThread(std::shared_ptr<DownstreamState> dss)
           continue;
         }
 
-        if (processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids)) && ids->isXSK() && ids->cs->xskInfo) {
+        if (processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids)) && ids->isXSK() && ids->cs->xskInfoResponder) {
 #ifdef HAVE_XSK
-          auto& xskInfo = ids->cs->xskInfo;
+          auto& xskInfo = ids->cs->xskInfoResponder;
           auto xskPacket = xskInfo->getEmptyFrame();
           if (!xskPacket) {
             continue;
index 777b27aaf75ed46ac627042c55900baab8019fb1..4c2b743052832b543c3731af7493e62ea709d8ca 100644 (file)
@@ -516,6 +516,7 @@ struct ClientState
   std::shared_ptr<DOH3Frontend> doh3Frontend{nullptr};
   std::shared_ptr<BPFFilter> d_filter{nullptr};
   std::shared_ptr<XskWorker> xskInfo{nullptr};
+  std::shared_ptr<XskWorker> xskInfoResponder{nullptr};
   size_t d_maxInFlightQueriesPerConn{1};
   size_t d_tcpConcurrentConnectionsLimit{0};
   int udpFD{-1};
index 7f5603482f155214702d814fa2735fd3e954096b..0710d39d66df0c3a25c463d4d8b7afae7f047373 100644 (file)
@@ -905,10 +905,10 @@ void DownstreamState::registerXsk(std::vector<std::shared_ptr<XskSocket>>& xsks)
   d_config.sourceMACAddr = d_xskSockets.at(0)->getSourceMACAddress();
 
   for (auto& xsk : d_xskSockets) {
-    auto xskInfo = XskWorker::create();
+    auto xskInfo = XskWorker::create(XskWorker::Type::Bidirectional);
     d_xskInfos.push_back(xskInfo);
     xsk->addWorker(xskInfo);
-    xskInfo->sharedEmptyFrameOffset = xsk->sharedEmptyFrameOffset;
+    xskInfo->setSharedFrames(xsk->sharedEmptyFrameOffset);
   }
   reconnect(false);
 }
index 058e381908da48c79d69f9c9918eb4c7e282a909..7e83c510093e33a1c202c1508231a0d1478f8951 100644 (file)
@@ -48,11 +48,7 @@ void XskResponderThread(std::shared_ptr<DownstreamState> dss, std::shared_ptr<Xs
       if ((pollfds[0].revents & POLLIN) != 0) {
         needNotify = true;
         xskInfo->cleanSocketNotification();
-#if defined(__SANITIZE_THREAD__)
-        xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
-        xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
+        xskInfo->processIncomingFrames([&](XskPacket& packet) {
           if (packet.getDataLen() < sizeof(dnsheader)) {
             xskInfo->markAsFree(packet);
             return;
@@ -77,7 +73,7 @@ void XskResponderThread(std::shared_ptr<DownstreamState> dss, std::shared_ptr<Xs
           }
           if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) {
             xskInfo->markAsFree(packet);
-            infolog("XSK packet pushed to queue because processResponderPacket failed");
+            vinfolog("XSK packet dropped because processResponderPacket failed");
             return;
           }
           if (response.size() > packet.getCapacity()) {
@@ -171,11 +167,7 @@ void XskRouter(std::shared_ptr<XskSocket> xsk)
         if ((fds.at(fdIndex).revents & POLLIN) != 0) {
           ready--;
           const auto& info = xsk->getWorkerByDescriptor(fds.at(fdIndex).fd);
-#if defined(__SANITIZE_THREAD__)
-          info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
-          info->outgoingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
+          info->processOutgoingFrames([&](XskPacket& packet) {
             if ((packet.getFlags() & XskPacket::UPDATE) == 0) {
               xsk->markAsFree(packet);
               return;
@@ -207,18 +199,10 @@ void XskClientThread(ClientState* clientState)
   LocalHolders holders;
 
   for (;;) {
-#if defined(__SANITIZE_THREAD__)
-    while (xskInfo->incomingPacketsQueue.lock()->read_available() == 0U) {
-#else
-    while (xskInfo->incomingPacketsQueue.read_available() == 0U) {
-#endif
+    while (!xskInfo->hasIncomingFrames()) {
       xskInfo->waitForXskSocket();
     }
-#if defined(__SANITIZE_THREAD__)
-    xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
-    xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
+    xskInfo->processIncomingFrames([&](XskPacket& packet) {
       if (XskProcessQuery(*clientState, holders, packet)) {
         packet.updatePacket();
         xskInfo->pushToSendQueue(packet);
index c871d81ba06e93b8fec616f2e8634bed392534c8..7ea50a07c46f6232fbf750313eb3a30ca2ffd4df 100644 (file)
@@ -275,7 +275,16 @@ void XskSocket::removeDestinationAddress(const std::string& mapPath, const Combo
 
 void XskSocket::fillFq(uint32_t fillSize) noexcept
 {
-  {
+  if (uniqueEmptyFrameOffset.size() < fillSize) {
+    auto frames = sharedEmptyFrameOffset->lock();
+    const auto moveSize = std::min(static_cast<size_t>(fillSize), frames->size());
+    if (moveSize > 0) {
+      // NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions)
+      uniqueEmptyFrameOffset.insert(uniqueEmptyFrameOffset.end(), std::make_move_iterator(frames->end() - moveSize), std::make_move_iterator(frames->end()));
+      frames->resize(frames->size() - moveSize);
+    }
+  }
+  else if (uniqueEmptyFrameOffset.size() > (10 * fillSize)) {
     // if we have less than holdThreshold frames in the shared queue (which might be an issue
     // when the XskWorker needs empty frames), move frames from the unique container into the
     // shared one. This might not be optimal right now.
@@ -290,7 +299,9 @@ void XskSocket::fillFq(uint32_t fillSize) noexcept
     }
   }
 
-  if (uniqueEmptyFrameOffset.size() < fillSize) {
+  fillSize = std::min(fillSize, static_cast<uint32_t>(uniqueEmptyFrameOffset.size()));
+  if (fillSize == 0) {
+    auto frames = sharedEmptyFrameOffset->lock();
     return;
   }
 
@@ -393,11 +404,13 @@ std::vector<XskPacket> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCou
       }
     }
     catch (const std::exception& exp) {
-      std::cerr << "Exception while processing the XSK RX queue: " << exp.what() << std::endl;
+      ++failed;
+      ++processed;
       break;
     }
     catch (...) {
-      std::cerr << "Exception while processing the XSK RX queue" << std::endl;
+      ++failed;
+      ++processed;
       break;
     }
   }
@@ -844,29 +857,24 @@ void XskWorker::notify(int desc)
   }
 }
 
-XskWorker::XskWorker() :
-  workerWaker(createEventfd()), xskSocketWaker(createEventfd())
+XskWorker::XskWorker(XskWorker::Type type) :
+  d_type(type), workerWaker(createEventfd()), xskSocketWaker(createEventfd())
 {
 }
 
 void XskWorker::pushToProcessingQueue(XskPacket& packet)
 {
-#if defined(__SANITIZE_THREAD__)
-  if (!incomingPacketsQueue.lock()->push(packet)) {
-#else
-  if (!incomingPacketsQueue.push(packet)) {
-#endif
+  if (d_type == Type::OutgoingOnly) {
+    throw std::runtime_error("Trying to push an incoming packet into an outgoing-only XSK Worker");
+  }
+  if (!d_incomingPacketsQueue.push(packet)) {
     markAsFree(packet);
   }
 }
 
 void XskWorker::pushToSendQueue(XskPacket& packet)
 {
-#if defined(__SANITIZE_THREAD__)
-  if (!outgoingPacketsQueue.lock()->push(packet)) {
-#else
-  if (!outgoingPacketsQueue.push(packet)) {
-#endif
+  if (!d_outgoingPacketsQueue.push(packet)) {
     markAsFree(packet);
   }
 }
@@ -911,7 +919,7 @@ void XskPacket::rewrite() noexcept
     /* needed to get the correct checksum */
     setIPv4Header(ipHeader);
     setUDPHeader(udpHeader);
-    udpHeader.check = tcp_udp_v4_checksum(&ipHeader);
+    //udpHeader.check = tcp_udp_v4_checksum(&ipHeader);
     rewriteIpv4Header(&ipHeader, getFrameLen());
     setIPv4Header(ipHeader);
     setUDPHeader(udpHeader);
@@ -1119,15 +1127,15 @@ void XskWorker::notifyXskSocket() const
   notify(xskSocketWaker);
 }
 
-std::shared_ptr<XskWorker> XskWorker::create()
+std::shared_ptr<XskWorker> XskWorker::create(Type type)
 {
-  return std::make_shared<XskWorker>();
+  return std::make_shared<XskWorker>(type);
 }
 
 void XskSocket::addWorker(std::shared_ptr<XskWorker> worker)
 {
   const auto socketWaker = worker->xskSocketWaker.getHandle();
-  worker->umemBufBase = umem.bufBase;
+  worker->setUmemBufBase(umem.bufBase);
   d_workers.insert({socketWaker, std::move(worker)});
   fds.push_back(pollfd{
     .fd = socketWaker,
@@ -1145,9 +1153,19 @@ void XskSocket::removeWorkerRoute(const ComboAddress& dest)
   d_workerRoutes.lock()->erase(dest);
 }
 
+void XskWorker::setSharedFrames(std::shared_ptr<LockGuarded<vector<uint64_t>>>& frames)
+{
+  d_sharedEmptyFrameOffset = frames;
+}
+
+void XskWorker::setUmemBufBase(uint8_t* base)
+{
+  d_umemBufBase = base;
+}
+
 uint64_t XskWorker::frameOffset(const XskPacket& packet) const noexcept
 {
-  return packet.getFrameOffsetFrom(umemBufBase);
+  return packet.getFrameOffsetFrom(d_umemBufBase);
 }
 
 void XskWorker::notifyWorker() const
@@ -1155,6 +1173,29 @@ void XskWorker::notifyWorker() const
   notify(workerWaker);
 }
 
+bool XskWorker::hasIncomingFrames()
+{
+  if (d_type == Type::OutgoingOnly) {
+    throw std::runtime_error("Looking for incoming packets in an outgoing-only XSK Worker");
+  }
+
+  return d_incomingPacketsQueue.read_available() != 0U;
+}
+
+void XskWorker::processIncomingFrames(const std::function<void(XskPacket& packet)>& callback)
+{
+  if (d_type == Type::OutgoingOnly) {
+    throw std::runtime_error("Looking for incoming packets in an outgoing-only XSK Worker");
+  }
+
+  d_incomingPacketsQueue.consume_all(callback);
+}
+
+void XskWorker::processOutgoingFrames(const std::function<void(XskPacket& packet)>& callback)
+{
+  d_outgoingPacketsQueue.consume_all(callback);
+}
+
 void XskSocket::getMACFromIfName()
 {
   ifreq ifr{};
@@ -1215,14 +1256,14 @@ std::vector<pollfd> getPollFdsForWorker(XskWorker& info)
 
 std::optional<XskPacket> XskWorker::getEmptyFrame()
 {
-  auto frames = sharedEmptyFrameOffset->lock();
+  auto frames = d_sharedEmptyFrameOffset->lock();
   if (frames->empty()) {
     return std::nullopt;
   }
   auto offset = frames->back();
   frames->pop_back();
   // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
-  return XskPacket(offset + umemBufBase, 0, frameSize);
+  return XskPacket(offset + d_umemBufBase, 0, d_frameSize);
 }
 
 void XskWorker::markAsFree(const XskPacket& packet)
@@ -1232,7 +1273,7 @@ void XskWorker::markAsFree(const XskPacket& packet)
   checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free);
 #endif /* DEBUG_UMEM */
   {
-    auto frames = sharedEmptyFrameOffset->lock();
+    auto frames = d_sharedEmptyFrameOffset->lock();
     frames->push_back(offset);
   }
 }
index ca6e65ca04fd9c11e630b9cab496d53a938b7ba6..df312e399e45367587258ac3baba587651c8476b 100644 (file)
@@ -58,7 +58,7 @@ using MACAddr = std::array<uint8_t, 6>;
 // We allocate frames that are placed into the descriptors in the fill queue, allowing the kernel to put incoming packets into the frames and place descriptors into the rx queue.
 // Once we have read the descriptors from the rx queue we release them, but we own the frames.
 // After we are done with the frame, we place them into descriptors of either the fill queue (empty frames) or tx queues (packets to be sent).
-// Once the kernel is done, it places descriptors referencing these frames into the cq where we can recycle them (packets destined to the tx queue or empty frame to the fill queue queue).
+// Once the kernel is done, it places descriptors referencing these frames into the cq where we can recycle them (packets destined to the tx queue or empty frame to the fill queue).
 
 // XskSocket routes packets to multiple worker threads registered on XskSocket via XskSocket::addWorker based on the destination port number of the packet.
 // The kernel and the worker thread holding XskWorker will wake up the XskSocket through XskFd and the Eventfd corresponding to each worker thread, respectively.
@@ -269,45 +269,42 @@ public:
 };
 bool operator<(const XskPacket& lhs, const XskPacket& rhs) noexcept;
 
-/* g++ defines __SANITIZE_THREAD__
-   clang++ supports the nice __has_feature(thread_sanitizer),
-   let's merge them */
-#if defined(__has_feature)
-#if __has_feature(thread_sanitizer)
-#define __SANITIZE_THREAD__ 1
-#endif
-#endif
-
 // XskWorker obtains XskPackets of specific ports in the NIC from XskSocket through cq.
 // After finishing processing the packet, XskWorker puts the packet into sq so that XskSocket decides whether to send it through the network card according to XskPacket::flags.
 // XskWorker wakes up XskSocket via xskSocketWaker after putting the packets in sq.
 class XskWorker
 {
-#if defined(__SANITIZE_THREAD__)
-  using XskPacketRing = LockGuarded<boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>>;
-#else
-  using XskPacketRing = boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>;
-#endif
-
 public:
+  enum class Type : uint8_t { OutgoingOnly, Bidirectional};
+
+private:
+  using XskPacketRing = boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>;
   // queue of packets to be processed by this worker
-  XskPacketRing incomingPacketsQueue;
+  XskPacketRing d_incomingPacketsQueue;
   // queue of packets processed by this worker (to be sent, or discarded)
-  XskPacketRing outgoingPacketsQueue;
-
-  uint8_t* umemBufBase{nullptr};
+  XskPacketRing d_outgoingPacketsQueue;
   // list of frames that are shared with the XskRouter
-  std::shared_ptr<LockGuarded<vector<uint64_t>>> sharedEmptyFrameOffset;
-  const size_t frameSize{XskSocket::getFrameSize()};
+  std::shared_ptr<LockGuarded<vector<uint64_t>>> d_sharedEmptyFrameOffset;
+  uint8_t* d_umemBufBase{nullptr};
+  const size_t d_frameSize{XskSocket::getFrameSize()};
+  Type d_type;
+
+public:
   FDWrapper workerWaker;
   FDWrapper xskSocketWaker;
 
-  XskWorker();
   static int createEventfd();
   static void notify(int desc);
-  static std::shared_ptr<XskWorker> create();
+  static std::shared_ptr<XskWorker> create(Type);
+
+  XskWorker(Type);
+  void setSharedFrames(std::shared_ptr<LockGuarded<vector<uint64_t>>>& frames);
+  void setUmemBufBase(uint8_t* base);
   void pushToProcessingQueue(XskPacket& packet);
   void pushToSendQueue(XskPacket& packet);
+  bool hasIncomingFrames();
+  void processIncomingFrames(const std::function<void(XskPacket& packet)>& callback);
+  void processOutgoingFrames(const std::function<void(XskPacket& packet)>& callback);
   void markAsFree(const XskPacket& packet);
   // notify worker that at least one packet is available for processing
   void notifyWorker() const;