From: Remi Gacogne
Date: Mon, 8 Jul 2024 12:43:21 +0000 (+0200)
Subject: dnsdist: Fix a data race in the AF_XDP/XSK dnsdist <-> backend code
X-Git-Tag: rec-5.2.0-alpha1~179^2~5
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4d2d75ea171df2ac510b566a4386d13195de2314;p=thirdparty%2Fpdns.git

dnsdist: Fix a data race in the AF_XDP/XSK dnsdist <-> backend code

The existing code shared the same XskWorker between the thread handling
incoming queries (possibly replying right away for self-answered and
cache-hit responses) and the thread handling responses coming from a
backend (without XSK). This is wrong because the worker's internal
queues are single-producer, single-consumer: two threads must never
push into the same queue. (The single-consumer requirement was already
met, since a worker is only ever assigned to one XskRouter.)

This commit introduces a new, separate worker for the threads handling
responses coming from a backend without XSK (responses coming from a
backend via XSK already had their own worker). The new worker is marked
"outgoing-only" to make it clear that it can only be used for sending
packets, never for receiving any.
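To illustrate the invariant at stake, here is a minimal standalone
sketch (not dnsdist code; the ring and thread names are made up for
illustration) of the single-producer rule for
boost::lockfree::spsc_queue, using one ring per producing thread just
as the fix does:

    // Minimal sketch, assuming only Boost.Lockfree is available.
    // An spsc_queue is only safe with a single producer thread and a
    // single consumer thread per queue; giving each producer its own
    // queue, as this commit does with the new outgoing-only worker,
    // restores that invariant.
    #include <boost/lockfree/spsc_queue.hpp>
    #include <cstdint>
    #include <thread>

    using PacketRing = boost::lockfree::spsc_queue<uint64_t, boost::lockfree::capacity<1024>>;

    int main()
    {
      PacketRing clientRing;    // pushed to only by the client thread
      PacketRing responderRing; // pushed to only by the responder thread

      std::thread client([&] {
        for (uint64_t i = 0; i < 1000; ++i) {
          while (!clientRing.push(i)) { /* ring full, retry */ }
        }
      });
      std::thread responder([&] {
        for (uint64_t i = 0; i < 1000; ++i) {
          while (!responderRing.push(i)) { /* ring full, retry */ }
        }
      });

      // A single consumer may safely drain both rings.
      uint64_t seen = 0;
      while (seen < 2000) {
        clientRing.consume_all([&](uint64_t) { ++seen; });
        responderRing.consume_all([&](uint64_t) { ++seen; });
      }

      client.join();
      responder.join();
      return 0;
    }

With a single shared ring instead, the two push() calls would race on
the ring's write index.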
---

diff --git a/pdns/dnsdistdist/dnsdist-backend.cc b/pdns/dnsdistdist/dnsdist-backend.cc
index 54c4fc76e7..495242f493 100644
--- a/pdns/dnsdistdist/dnsdist-backend.cc
+++ b/pdns/dnsdistdist/dnsdist-backend.cc
@@ -905,10 +905,10 @@ void DownstreamState::registerXsk(std::vector<std::shared_ptr<XskSocket>>& xsks)
   d_config.sourceMACAddr = d_xskSockets.at(0)->getSourceMACAddress();
 
   for (auto& xsk : d_xskSockets) {
-    auto xskInfo = XskWorker::create();
+    auto xskInfo = XskWorker::create(XskWorker::Type::Bidirectional);
     d_xskInfos.push_back(xskInfo);
     xsk->addWorker(xskInfo);
-    xskInfo->sharedEmptyFrameOffset = xsk->sharedEmptyFrameOffset;
+    xskInfo->setSharedFrames(xsk->sharedEmptyFrameOffset);
   }
   reconnect(false);
 }
diff --git a/pdns/dnsdistdist/dnsdist-lua.cc b/pdns/dnsdistdist/dnsdist-lua.cc
index c526a93cc2..ed127a735b 100644
--- a/pdns/dnsdistdist/dnsdist-lua.cc
+++ b/pdns/dnsdistdist/dnsdist-lua.cc
@@ -816,10 +816,13 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
       std::shared_ptr<XskSocket> socket;
       parseXskVars(vars, socket);
       if (socket) {
-        udpCS->xskInfo = XskWorker::create();
-        udpCS->xskInfo->sharedEmptyFrameOffset = socket->sharedEmptyFrameOffset;
+        udpCS->xskInfo = XskWorker::create(XskWorker::Type::Bidirectional);
+        udpCS->xskInfo->setSharedFrames(socket->sharedEmptyFrameOffset);
         socket->addWorker(udpCS->xskInfo);
         socket->addWorkerRoute(udpCS->xskInfo, loc);
+        udpCS->xskInfoResponder = XskWorker::create(XskWorker::Type::OutgoingOnly);
+        udpCS->xskInfoResponder->setSharedFrames(socket->sharedEmptyFrameOffset);
+        socket->addWorker(udpCS->xskInfoResponder);
         vinfolog("Enabling XSK in %s mode for incoming UDP packets to %s", socket->getXDPMode(), loc.toStringWithPort());
       }
 #endif /* HAVE_XSK */
@@ -871,10 +874,13 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
       std::shared_ptr<XskSocket> socket;
       parseXskVars(vars, socket);
       if (socket) {
-        udpCS->xskInfo = XskWorker::create();
-        udpCS->xskInfo->sharedEmptyFrameOffset = socket->sharedEmptyFrameOffset;
+        udpCS->xskInfo = XskWorker::create(XskWorker::Type::Bidirectional);
+        udpCS->xskInfo->setSharedFrames(socket->sharedEmptyFrameOffset);
         socket->addWorker(udpCS->xskInfo);
         socket->addWorkerRoute(udpCS->xskInfo, loc);
+        udpCS->xskInfoResponder = XskWorker::create(XskWorker::Type::OutgoingOnly);
+        udpCS->xskInfoResponder->setSharedFrames(socket->sharedEmptyFrameOffset);
+        socket->addWorker(udpCS->xskInfoResponder);
         vinfolog("Enabling XSK in %s mode for incoming UDP packets to %s", socket->getXDPMode(), loc.toStringWithPort());
       }
 #endif /* HAVE_XSK */
diff --git a/pdns/dnsdistdist/dnsdist-xsk.cc b/pdns/dnsdistdist/dnsdist-xsk.cc
index 8ef59833a1..f411fa171a 100644
--- a/pdns/dnsdistdist/dnsdist-xsk.cc
+++ b/pdns/dnsdistdist/dnsdist-xsk.cc
@@ -48,11 +48,7 @@ void XskResponderThread(std::shared_ptr<DownstreamState> dss, std::shared_ptr<XskWorker> xskInfo)
       xskInfo->cleanSocketNotification();
-#if defined(__SANITIZE_THREAD__)
-      xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
-      xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
+      xskInfo->processIncomingFrames([&](XskPacket& packet) {
         if (packet.getDataLen() < sizeof(dnsheader)) {
           xskInfo->markAsFree(packet);
           return;
         }
@@ -77,7 +73,7 @@ void XskResponderThread(std::shared_ptr<DownstreamState> dss, std::shared_ptr<XskWorker> xskInfo)
         xskInfo->markAsFree(packet);
-        infolog("XSK packet pushed to queue because processResponderPacket failed");
+        vinfolog("XSK packet dropped because processResponderPacket failed");
         return;
       }
       if (response.size() > packet.getCapacity()) {
@@ -171,11 +167,7 @@ void XskRouter(std::shared_ptr<XskSocket> xsk)
       if ((fds.at(fdIndex).revents & POLLIN) != 0) {
         ready--;
         const auto& info = xsk->getWorkerByDescriptor(fds.at(fdIndex).fd);
-#if defined(__SANITIZE_THREAD__)
-        info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
-        info->outgoingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
+        info->processOutgoingFrames([&](XskPacket& packet) {
          if ((packet.getFlags() & XskPacket::UPDATE) == 0) {
            xsk->markAsFree(packet);
            return;
          }
@@ -207,18 +199,10 @@ void XskClientThread(ClientState* clientState)
   LocalHolders holders;
 
   for (;;) {
-#if defined(__SANITIZE_THREAD__)
-    while (xskInfo->incomingPacketsQueue.lock()->read_available() == 0U) {
-#else
-    while (xskInfo->incomingPacketsQueue.read_available() == 0U) {
-#endif
+    while (!xskInfo->hasIncomingFrames()) {
       xskInfo->waitForXskSocket();
     }
-#if defined(__SANITIZE_THREAD__)
-    xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
-    xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
+    xskInfo->processIncomingFrames([&](XskPacket& packet) {
      if (XskProcessQuery(*clientState, holders, packet)) {
        packet.updatePacket();
        xskInfo->pushToSendQueue(packet);
diff --git a/pdns/dnsdistdist/dnsdist.cc b/pdns/dnsdistdist/dnsdist.cc
index 4fd2dc499b..0336904f53 100644
--- a/pdns/dnsdistdist/dnsdist.cc
+++ b/pdns/dnsdistdist/dnsdist.cc
@@ -852,9 +852,9 @@ void responderThread(std::shared_ptr<DownstreamState> dss)
         continue;
       }
 
-      if (processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids)) && ids->isXSK() && ids->cs->xskInfo) {
+      if (processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids)) && ids->isXSK() && ids->cs->xskInfoResponder) {
 #ifdef HAVE_XSK
-        auto& xskInfo = ids->cs->xskInfo;
+        auto& xskInfo = ids->cs->xskInfoResponder;
         auto xskPacket = xskInfo->getEmptyFrame();
         if (!xskPacket) {
           continue;
diff --git a/pdns/dnsdistdist/dnsdist.hh b/pdns/dnsdistdist/dnsdist.hh
index cd1ef18800..d3c02c4995 100644
--- a/pdns/dnsdistdist/dnsdist.hh
+++ b/pdns/dnsdistdist/dnsdist.hh
@@ -565,6 +565,7 @@ struct ClientState
   std::shared_ptr<DOH3Frontend> doh3Frontend{nullptr};
   std::shared_ptr<BPFFilter> d_filter{nullptr};
   std::shared_ptr<XskWorker> xskInfo{nullptr};
+  std::shared_ptr<XskWorker> xskInfoResponder{nullptr};
   size_t d_maxInFlightQueriesPerConn{1};
   size_t d_tcpConcurrentConnectionsLimit{0};
   int udpFD{-1};
diff --git a/pdns/xsk.cc b/pdns/xsk.cc
index c871d81ba0..7ea50a07c4 100644
--- a/pdns/xsk.cc
+++ b/pdns/xsk.cc
@@ -275,7 +275,16 @@ void XskSocket::removeDestinationAddress(const std::string& mapPath, const ComboAddress& dest)
 
 void XskSocket::fillFq(uint32_t fillSize) noexcept
 {
-  {
+  if (uniqueEmptyFrameOffset.size() < fillSize) {
+    auto frames = sharedEmptyFrameOffset->lock();
+    const auto moveSize = std::min(static_cast<size_t>(fillSize), frames->size());
+    if (moveSize > 0) {
+      // NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions)
+      uniqueEmptyFrameOffset.insert(uniqueEmptyFrameOffset.end(), std::make_move_iterator(frames->end() - moveSize), std::make_move_iterator(frames->end()));
+      frames->resize(frames->size() - moveSize);
+    }
+  }
+  else if (uniqueEmptyFrameOffset.size() > (10 * fillSize)) {
     // if we have less than holdThreshold frames in the shared queue (which might be an issue
     // when the XskWorker needs empty frames), move frames from the unique container into the
     // shared one. This might not be optimal right now.
@@ -290,7 +299,9 @@ void XskSocket::fillFq(uint32_t fillSize) noexcept
     }
   }
 
-  if (uniqueEmptyFrameOffset.size() < fillSize) {
+  fillSize = std::min(fillSize, static_cast<uint32_t>(uniqueEmptyFrameOffset.size()));
+  if (fillSize == 0) {
+    auto frames = sharedEmptyFrameOffset->lock();
     return;
   }
@@ -393,11 +404,13 @@ std::vector<XskPacket> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCount)
       }
     }
     catch (const std::exception& exp) {
-      std::cerr << "Exception while processing the XSK RX queue: " << exp.what() << std::endl;
+      ++failed;
+      ++processed;
       break;
     }
     catch (...) {
-      std::cerr << "Exception while processing the XSK RX queue" << std::endl;
+      ++failed;
+      ++processed;
       break;
     }
   }
@@ -844,29 +857,24 @@ void XskWorker::notify(int desc)
   }
 }
 
-XskWorker::XskWorker() :
-  workerWaker(createEventfd()), xskSocketWaker(createEventfd())
+XskWorker::XskWorker(XskWorker::Type type) :
+  d_type(type), workerWaker(createEventfd()), xskSocketWaker(createEventfd())
 {
 }
 
 void XskWorker::pushToProcessingQueue(XskPacket& packet)
 {
-#if defined(__SANITIZE_THREAD__)
-  if (!incomingPacketsQueue.lock()->push(packet)) {
-#else
-  if (!incomingPacketsQueue.push(packet)) {
-#endif
+  if (d_type == Type::OutgoingOnly) {
+    throw std::runtime_error("Trying to push an incoming packet into an outgoing-only XSK Worker");
+  }
+  if (!d_incomingPacketsQueue.push(packet)) {
     markAsFree(packet);
   }
 }
 
 void XskWorker::pushToSendQueue(XskPacket& packet)
 {
-#if defined(__SANITIZE_THREAD__)
-  if (!outgoingPacketsQueue.lock()->push(packet)) {
-#else
-  if (!outgoingPacketsQueue.push(packet)) {
-#endif
+  if (!d_outgoingPacketsQueue.push(packet)) {
     markAsFree(packet);
   }
 }
@@ -911,7 +919,7 @@ void XskPacket::rewrite() noexcept
     /* needed to get the correct checksum */
     setIPv4Header(ipHeader);
     setUDPHeader(udpHeader);
-    udpHeader.check = tcp_udp_v4_checksum(&ipHeader);
+    //udpHeader.check = tcp_udp_v4_checksum(&ipHeader);
     rewriteIpv4Header(&ipHeader, getFrameLen());
     setIPv4Header(ipHeader);
     setUDPHeader(udpHeader);
@@ -1119,15 +1127,15 @@ void XskWorker::notifyXskSocket() const
   notify(xskSocketWaker);
 }
 
-std::shared_ptr<XskWorker> XskWorker::create()
+std::shared_ptr<XskWorker> XskWorker::create(Type type)
 {
-  return std::make_shared<XskWorker>();
+  return std::make_shared<XskWorker>(type);
 }
 
 void XskSocket::addWorker(std::shared_ptr<XskWorker> worker)
 {
   const auto socketWaker = worker->xskSocketWaker.getHandle();
-  worker->umemBufBase = umem.bufBase;
+  worker->setUmemBufBase(umem.bufBase);
   d_workers.insert({socketWaker, std::move(worker)});
   fds.push_back(pollfd{
     .fd = socketWaker,
@@ -1145,9 +1153,19 @@ void XskSocket::removeWorkerRoute(const ComboAddress& dest)
   d_workerRoutes.lock()->erase(dest);
 }
 
+void XskWorker::setSharedFrames(std::shared_ptr<LockGuarded<std::vector<uint64_t>>>& frames)
+{
+  d_sharedEmptyFrameOffset = frames;
+}
+
+void XskWorker::setUmemBufBase(uint8_t* base)
+{
+  d_umemBufBase = base;
+}
+
 uint64_t XskWorker::frameOffset(const XskPacket& packet) const noexcept
 {
-  return packet.getFrameOffsetFrom(umemBufBase);
+  return packet.getFrameOffsetFrom(d_umemBufBase);
 }
 
 void XskWorker::notifyWorker() const
@@ -1155,6 +1173,29 @@ void XskWorker::notifyWorker() const
   notify(workerWaker);
 }
 
+bool XskWorker::hasIncomingFrames()
+{
+  if (d_type == Type::OutgoingOnly) {
+    throw std::runtime_error("Looking for incoming packets in an outgoing-only XSK Worker");
+  }
+
+  return d_incomingPacketsQueue.read_available() != 0U;
+}
+
+void XskWorker::processIncomingFrames(const std::function<void(XskPacket& packet)>& callback)
+{
+  if (d_type == Type::OutgoingOnly) {
+    throw std::runtime_error("Looking for incoming packets in an outgoing-only XSK Worker");
+  }
+
+  d_incomingPacketsQueue.consume_all(callback);
+}
+
+void XskWorker::processOutgoingFrames(const std::function<void(XskPacket& packet)>& callback)
+{
+  d_outgoingPacketsQueue.consume_all(callback);
+}
+
 void XskSocket::getMACFromIfName()
 {
   ifreq ifr{};
@@ -1215,14 +1256,14 @@ std::vector<pollfd> getPollFdsForWorker(XskWorker& info)
 
 std::optional<XskPacket> XskWorker::getEmptyFrame()
 {
-  auto frames = sharedEmptyFrameOffset->lock();
+  auto frames = d_sharedEmptyFrameOffset->lock();
   if (frames->empty()) {
     return std::nullopt;
   }
   auto offset = frames->back();
   frames->pop_back();
   // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
-  return XskPacket(offset + umemBufBase, 0, frameSize);
+  return XskPacket(offset + d_umemBufBase, 0, d_frameSize);
 }
 
 void XskWorker::markAsFree(const XskPacket& packet)
@@ -1232,7 +1273,7 @@ void XskWorker::markAsFree(const XskPacket& packet)
   checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free);
 #endif /* DEBUG_UMEM */
   {
-    auto frames = sharedEmptyFrameOffset->lock();
+    auto frames = d_sharedEmptyFrameOffset->lock();
     frames->push_back(offset);
   }
 }
diff --git a/pdns/xsk.hh b/pdns/xsk.hh
index ca6e65ca04..df312e399e 100644
--- a/pdns/xsk.hh
+++ b/pdns/xsk.hh
@@ -58,7 +58,7 @@ using MACAddr = std::array<uint8_t, 6>;
 
 // We allocate frames that are placed into the descriptors in the fill queue, allowing the kernel to put incoming packets into the frames and place descriptors into the rx queue.
 // Once we have read the descriptors from the rx queue we release them, but we own the frames.
 // After we are done with the frame, we place them into descriptors of either the fill queue (empty frames) or tx queues (packets to be sent).
-// Once the kernel is done, it places descriptors referencing these frames into the cq where we can recycle them (packets destined to the tx queue or empty frame to the fill queue queue).
+// Once the kernel is done, it places descriptors referencing these frames into the cq where we can recycle them (packets destined to the tx queue or empty frame to the fill queue).
 // XskSocket routes packets to multiple worker threads registered on XskSocket via XskSocket::addWorker based on the destination port number of the packet.
 // The kernel and the worker thread holding XskWorker will wake up the XskSocket through XskFd and the Eventfd corresponding to each worker thread, respectively.
@@ -269,45 +269,42 @@ public:
 };
 bool operator<(const XskPacket& lhs, const XskPacket& rhs) noexcept;
 
-/* g++ defines __SANITIZE_THREAD__
-   clang++ supports the nice __has_feature(thread_sanitizer),
-   let's merge them */
-#if defined(__has_feature)
-#if __has_feature(thread_sanitizer)
-#define __SANITIZE_THREAD__ 1
-#endif
-#endif
-
 // XskWorker obtains XskPackets of specific ports in the NIC from XskSocket through cq.
 // After finishing processing the packet, XskWorker puts the packet into sq so that XskSocket decides whether to send it through the network card according to XskPacket::flags.
 // XskWorker wakes up XskSocket via xskSocketWaker after putting the packets in sq.
 class XskWorker
 {
-#if defined(__SANITIZE_THREAD__)
-  using XskPacketRing = LockGuarded<boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>>;
-#else
-  using XskPacketRing = boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>;
-#endif
-
 public:
+  enum class Type : uint8_t { OutgoingOnly, Bidirectional };
+
+private:
+  using XskPacketRing = boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>;
   // queue of packets to be processed by this worker
-  XskPacketRing incomingPacketsQueue;
+  XskPacketRing d_incomingPacketsQueue;
   // queue of packets processed by this worker (to be sent, or discarded)
-  XskPacketRing outgoingPacketsQueue;
-
-  uint8_t* umemBufBase{nullptr};
+  XskPacketRing d_outgoingPacketsQueue;
   // list of frames that are shared with the XskRouter
-  std::shared_ptr<LockGuarded<std::vector<uint64_t>>> sharedEmptyFrameOffset;
-  const size_t frameSize{XskSocket::getFrameSize()};
+  std::shared_ptr<LockGuarded<std::vector<uint64_t>>> d_sharedEmptyFrameOffset;
+  uint8_t* d_umemBufBase{nullptr};
+  const size_t d_frameSize{XskSocket::getFrameSize()};
+  Type d_type;
+
+public:
   FDWrapper workerWaker;
   FDWrapper xskSocketWaker;
 
-  XskWorker();
   static int createEventfd();
   static void notify(int desc);
-  static std::shared_ptr<XskWorker> create();
+  static std::shared_ptr<XskWorker> create(Type);
+
+  XskWorker(Type);
+  void setSharedFrames(std::shared_ptr<LockGuarded<std::vector<uint64_t>>>& frames);
+  void setUmemBufBase(uint8_t* base);
   void pushToProcessingQueue(XskPacket& packet);
   void pushToSendQueue(XskPacket& packet);
+  bool hasIncomingFrames();
+  void processIncomingFrames(const std::function<void(XskPacket& packet)>& callback);
+  void processOutgoingFrames(const std::function<void(XskPacket& packet)>& callback);
   void markAsFree(const XskPacket& packet);
   // notify worker that at least one packet is available for processing
   void notifyWorker() const;