From: Remi Gacogne Date: Thu, 28 Dec 2023 15:20:40 +0000 (+0100) Subject: dnsdist: Get rid of memory allocations in the XSK hot path X-Git-Tag: dnsdist-1.9.0-rc1^2~33 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=ae61f00ab206e812791838ddd9fe798e146e6ac4;p=thirdparty%2Fpdns.git dnsdist: Get rid of memory allocations in the XSK hot path --- diff --git a/pdns/dnsdist-idstate.hh b/pdns/dnsdist-idstate.hh index 49248a08c7..f83f9b71f5 100644 --- a/pdns/dnsdist-idstate.hh +++ b/pdns/dnsdist-idstate.hh @@ -120,6 +120,15 @@ struct InternalQueryState InternalQueryState(const InternalQueryState& orig) = delete; InternalQueryState& operator=(const InternalQueryState& orig) = delete; + bool isXSK() const noexcept + { +#ifdef HAVE_XSK + return !xskPacketHeader.empty(); +#else + return false; +#endif /* HAVE_XSK */ + } + boost::optional subnet{boost::none}; // 40 ComboAddress origRemote; // 28 ComboAddress origDest; // 28 @@ -129,12 +138,14 @@ struct InternalQueryState std::string poolName; // 24 StopWatch queryRealTime{true}; // 24 std::shared_ptr packetCache{nullptr}; // 16 +#ifdef HAVE_XSK + PacketBuffer xskPacketHeader; // 8 +#endif /* HAVE_XSK */ std::unique_ptr dnsCryptQuery{nullptr}; // 8 std::unique_ptr qTag{nullptr}; // 8 std::unique_ptr d_packet{nullptr}; // Initial packet, so we can restart the query from the response path if needed // 8 std::unique_ptr d_protoBufData{nullptr}; std::unique_ptr d_extendedError{nullptr}; - std::unique_ptr xskPacketHeader; // 8 boost::optional tempFailureTTL{boost::none}; // 8 ClientState* cs{nullptr}; // 8 std::unique_ptr du; // 8 diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index fb1c0e4379..a4dcd3f1bd 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -750,7 +750,7 @@ static void handleResponseForUDPClient(InternalQueryState& ids, PacketBuffer& re } bool muted = true; - if (ids.cs && !ids.cs->muted && !ids.xskPacketHeader) { + if (ids.cs && !ids.cs->muted && !ids.isXSK()) { sendUDPResponse(ids.cs->udpFD, response, dr.ids.delayMsec, ids.hopLocal, ids.hopRemote); muted = false; } @@ -761,7 +761,7 @@ static void handleResponseForUDPClient(InternalQueryState& ids, PacketBuffer& re vinfolog("Got answer from %s, relayed to %s (UDP), took %f us", ds->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), udiff); } else { - if (!ids.xskPacketHeader) { + if (!ids.isXSK()) { vinfolog("Got answer from %s, NOT relayed to %s (UDP) since that frontend is muted, took %f us", ds->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), udiff); } else { @@ -829,7 +829,7 @@ static void doHealthCheck(std::shared_ptr& dss, std::unordered_ xskPacket->setAddr(dss->d_config.sourceAddr, dss->d_config.sourceMACAddr, dss->d_config.remote, dss->d_config.destMACAddr); xskPacket->setPayload(packet); xskPacket->rewrite(); - xskInfo->pushToSendQueue(std::move(xskPacket)); + xskInfo->pushToSendQueue(std::move(*xskPacket)); const auto queryId = data->d_queryID; map[queryId] = std::move(data); } @@ -863,20 +863,19 @@ void responderThread(std::shared_ptr dss) if (pollfds[0].revents & POLLIN) { needNotify = true; #if defined(__SANITIZE_THREAD__) - xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) { + xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) { #else - xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) { + xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) { #endif - auto packet = XskPacketPtr(packetRaw); - if (packet->getDataLen() < sizeof(dnsheader)) { + if (packet.getDataLen() < sizeof(dnsheader)) { xskInfo->markAsFree(std::move(packet)); return; } - const dnsheader_aligned dnsHeader(packet->getPayloadData()); + const dnsheader_aligned dnsHeader(packet.getPayloadData()); const auto queryId = dnsHeader->id; auto ids = dss->getState(queryId); if (ids) { - if (xskFd != ids->backendFD || !ids->xskPacketHeader) { + if (xskFd != ids->backendFD || !ids->isXSK()) { dss->restoreState(queryId, std::move(*ids)); ids = std::nullopt; } @@ -887,16 +886,16 @@ void responderThread(std::shared_ptr dss) if (iter != healthCheckMap.end()) { auto data = std::move(iter->second); healthCheckMap.erase(iter); - packet->cloneIntoPacketBuffer(data->d_buffer); + packet.cloneIntoPacketBuffer(data->d_buffer); data->d_ds->submitHealthCheckResult(data->d_initial, handleResponse(data)); } xskInfo->markAsFree(std::move(packet)); return; } - auto response = packet->clonePacketBuffer(); - if (response.size() > packet->getCapacity()) { + auto response = packet.clonePacketBuffer(); + if (response.size() > packet.getCapacity()) { /* fallback to sending the packet via normal socket */ - ids->xskPacketHeader.reset(); + ids->xskPacketHeader.clear(); } if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) { xskInfo->markAsFree(std::move(packet)); @@ -904,7 +903,7 @@ void responderThread(std::shared_ptr dss) return; } vinfolog("XSK packet - processResponderPacket OK"); - if (response.size() > packet->getCapacity()) { + if (response.size() > packet.getCapacity()) { /* fallback to sending the packet via normal socket */ sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote); vinfolog("XSK packet falling back because packet is too large"); @@ -912,17 +911,17 @@ void responderThread(std::shared_ptr dss) return; } //vinfolog("XSK packet - set header"); - packet->setHeader(*ids->xskPacketHeader); + packet.setHeader(ids->xskPacketHeader); //vinfolog("XSK packet - set payload"); - if (!packet->setPayload(response)) { + if (!packet.setPayload(response)) { vinfolog("Unable to set payload !"); } if (ids->delayMsec > 0) { vinfolog("XSK packet - adding delay"); - packet->addDelay(ids->delayMsec); + packet.addDelay(ids->delayMsec); } //vinfolog("XSK packet - update packet"); - packet->updatePacket(); + packet.updatePacket(); //vinfolog("XSK packet pushed to send queue"); xskInfo->pushToSendQueue(std::move(packet)); }); @@ -993,7 +992,7 @@ void XskRouter(std::shared_ptr xsk) setThreadName("dnsdist/XskRouter"); uint32_t failed; // packets to be submitted for sending - vector fillInTx; + vector fillInTx; const auto& fds = xsk->getDescriptors(); // list of workers that need to be notified std::set needNotify; @@ -1007,7 +1006,7 @@ void XskRouter(std::shared_ptr xsk) auto packets = xsk->recv(64, &failed); dnsdist::metrics::g_stats.nonCompliantQueries += failed; for (auto &packet : packets) { - const auto dest = packet->getToAddr(); + const auto dest = packet.getToAddr(); auto res = destIdx.find(dest); if (res == destIdx.end()) { xsk->markAsFree(std::move(packet)); @@ -1034,16 +1033,15 @@ void XskRouter(std::shared_ptr xsk) ready--; auto& info = xskWakerIdx.find(fds.at(fdIndex).fd)->worker; #if defined(__SANITIZE_THREAD__) - info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) { + info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket& packet) { #else - info->outgoingPacketsQueue.consume_all([&](XskPacket* packetRaw) { + info->outgoingPacketsQueue.consume_all([&](XskPacket& packet) { #endif - auto packet = XskPacketPtr(packetRaw); - if (!(packet->getFlags() & XskPacket::UPDATE)) { + if (!(packet.getFlags() & XskPacket::UPDATE)) { xsk->markAsFree(std::move(packet)); return; } - if (packet->getFlags() & XskPacket::DELAY) { + if (packet.getFlags() & XskPacket::DELAY) { xsk->pushDelayed(std::move(packet)); return; } @@ -1131,7 +1129,7 @@ void responderThread(std::shared_ptr dss) continue; } - if (processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids)) && ids->xskPacketHeader && ids->cs->xskInfo) { + if (processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids)) && ids->isXSK() && ids->cs->xskInfo) { #ifdef HAVE_XSK //vinfolog("processResponderPacket OK"); auto& xskInfo = ids->cs->xskInfo; @@ -1140,13 +1138,13 @@ void responderThread(std::shared_ptr dss) continue; } //vinfolog("XSK setHeader"); - xskPacket->setHeader(*ids->xskPacketHeader); + xskPacket->setHeader(ids->xskPacketHeader); //vinfolog("XSK payload"); xskPacket->setPayload(response); //vinfolog("XSK update packet"); xskPacket->updatePacket(); //vinfolog("XSK pushed to send queue"); - xskInfo->pushToSendQueue(std::move(xskPacket)); + xskInfo->pushToSendQueue(std::move(*xskPacket)); xskInfo->notifyXskSocket(); #endif /* HAVE_XSK */ } @@ -2284,13 +2282,12 @@ static void xskClientThread(ClientState* cs) xskInfo->waitForXskSocket(); } #if defined(__SANITIZE_THREAD__) - xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) { + xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) { #else - xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) { + xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) { #endif - auto packet = XskPacketPtr(packetRaw); - if (ProcessXskQuery(*cs, holders, *packet)) { - packet->updatePacket(); + if (ProcessXskQuery(*cs, holders, packet)) { + packet.updatePacket(); xskInfo->pushToSendQueue(std::move(packet)); } else { diff --git a/pdns/xsk.cc b/pdns/xsk.cc index 5b3c2142df..371587da52 100644 --- a/pdns/xsk.cc +++ b/pdns/xsk.cc @@ -58,7 +58,6 @@ extern "C" #include "gettime.hh" #include "xsk.hh" -#define DEBUG_UMEM 0 #ifdef DEBUG_UMEM namespace { struct UmemEntryStatus @@ -97,7 +96,7 @@ int XskSocket::firstTimeout() } timespec now; gettime(&now); - const auto& firstTime = waitForDelay.top()->getSendTime(); + const auto& firstTime = waitForDelay.top().getSendTime(); const auto res = timeDifference(now, firstTime); if (res <= 0) { return 0; @@ -226,7 +225,7 @@ int XskSocket::wait(int timeout) return xsk_socket__fd(socket.get()); } -void XskSocket::send(std::vector& packets) +void XskSocket::send(std::vector& packets) { while (packets.size() > 0) { auto packetSize = packets.size(); @@ -246,11 +245,11 @@ void XskSocket::send(std::vector& packets) break; } *xsk_ring_prod__tx_desc(&tx, idx++) = { - .addr = frameOffset(*packet), - .len = packet->getFrameLen(), + .addr = frameOffset(packet), + .len = packet.getFrameLen(), .options = 0}; #ifdef DEBUG_UMEM - checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(*packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::Received}, UmemEntryStatus::Status::TXQueue); + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::Received}, UmemEntryStatus::Status::TXQueue); #endif /* DEBUG_UMEM */ queued++; } @@ -259,10 +258,10 @@ void XskSocket::send(std::vector& packets) } } -std::vector XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCount) +std::vector XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCount) { uint32_t idx{0}; - std::vector res; + std::vector res; // how many descriptors to packets have been filled const auto recvSize = xsk_ring_cons__peek(&rx, recvSizeMax, &idx); if (recvSize == 0) { @@ -276,17 +275,17 @@ std::vector XskSocket::recv(uint32_t recvSizeMax, uint32_t* failed for (; processed < recvSize; processed++) { try { const auto* desc = xsk_ring_cons__rx_desc(&rx, idx++); - auto ptr = std::make_unique(reinterpret_cast(desc->addr + baseAddr), desc->len, frameSize); + XskPacket packet = XskPacket(reinterpret_cast(desc->addr + baseAddr), desc->len, frameSize); #ifdef DEBUG_UMEM - checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(*ptr), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::FillQueue}, UmemEntryStatus::Status::Received); + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::FillQueue}, UmemEntryStatus::Status::Received); #endif /* DEBUG_UMEM */ - if (!ptr->parse(false)) { + if (!packet.parse(false)) { ++failed; - markAsFree(std::move(ptr)); + markAsFree(std::move(packet)); } else { - res.push_back(std::move(ptr)); + res.push_back(std::move(packet)); } } catch (const std::exception& exp) { @@ -310,12 +309,12 @@ std::vector XskSocket::recv(uint32_t recvSizeMax, uint32_t* failed return res; } -void XskSocket::pickUpReadyPacket(std::vector& packets) +void XskSocket::pickUpReadyPacket(std::vector& packets) { timespec now; gettime(&now); - while (!waitForDelay.empty() && timeDifference(now, waitForDelay.top()->getSendTime()) <= 0) { - auto& top = const_cast(waitForDelay.top()); + while (!waitForDelay.empty() && timeDifference(now, waitForDelay.top().getSendTime()) <= 0) { + auto& top = const_cast(waitForDelay.top()); packets.push_back(std::move(top)); waitForDelay.pop(); } @@ -375,15 +374,14 @@ std::string XskSocket::getMetrics() const return ret.str(); } -void XskSocket::markAsFree(XskPacketPtr&& packet) +void XskSocket::markAsFree(XskPacket&& packet) { - auto offset = frameOffset(*packet); + auto offset = frameOffset(packet); #ifdef DEBUG_UMEM checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free); #endif /* DEBUG_UMEM */ uniqueEmptyFrameOffset.push_back(offset); - packet.release(); } XskSocket::XskUmem::~XskUmem() @@ -674,9 +672,9 @@ void XskPacket::addDelay(const int relativeMilliseconds) noexcept sendTime.tv_nsec %= 1000000000L; } -bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept +bool operator<(const XskPacket& s1, const XskPacket& s2) noexcept { - return s1->getSendTime() < s2->getSendTime(); + return s1.getSendTime() < s2.getSendTime(); } const ComboAddress& XskPacket::getFromAddr() const noexcept @@ -705,27 +703,25 @@ XskWorker::XskWorker() : { } -void XskWorker::pushToProcessingQueue(XskPacketPtr&& packet) +void XskWorker::pushToProcessingQueue(XskPacket&& packet) { - auto raw = packet.release(); #if defined(__SANITIZE_THREAD__) - if (!incomingPacketsQueue.lock()->push(std::move(raw))) { + if (!incomingPacketsQueue.lock()->push(std::move(packet))) { #else - if (!incomingPacketsQueue.push(std::move(raw))) { + if (!incomingPacketsQueue.push(std::move(packet))) { #endif - markAsFree(XskPacketPtr(raw)); + markAsFree(std::move(packet)); } } -void XskWorker::pushToSendQueue(XskPacketPtr&& packet) +void XskWorker::pushToSendQueue(XskPacket&& packet) { - auto raw = packet.release(); #if defined(__SANITIZE_THREAD__) - if (!outgoingPacketsQueue.lock()->push(raw)) { + if (!outgoingPacketsQueue.lock()->push(std::move(packet))) { #else - if (!outgoingPacketsQueue.push(raw)) { + if (!outgoingPacketsQueue.push(std::move(packet))) { #endif - markAsFree(XskPacketPtr(raw)); + markAsFree(std::move(packet)); } } @@ -924,21 +920,22 @@ void XskPacket::rewrite() noexcept return ip_checksum_partial(&pseudo_header, sizeof(pseudo_header), 0); } -void XskPacket::setHeader(const PacketBuffer& buf) +void XskPacket::setHeader(PacketBuffer& buf) { memcpy(frame, buf.data(), buf.size()); frameLength = buf.size(); + buf.clear(); flags = 0; if (!parse(true)) { throw std::runtime_error("Error setting the XSK frame header"); } } -std::unique_ptr XskPacket::cloneHeadertoPacketBuffer() const +PacketBuffer XskPacket::cloneHeadertoPacketBuffer() const { const auto size = getFrameLen() - getDataSize(); - auto tmp = std::make_unique(size); - memcpy(tmp->data(), frame, size); + PacketBuffer tmp(size); + memcpy(tmp.data(), frame, size); return tmp; } @@ -1068,31 +1065,29 @@ void XskWorker::fillUniqueEmptyOffset() } } -XskPacketPtr XskWorker::getEmptyFrame() +std::optional XskWorker::getEmptyFrame() { if (!uniqueEmptyFrameOffset.empty()) { auto offset = uniqueEmptyFrameOffset.back(); uniqueEmptyFrameOffset.pop_back(); - return std::make_unique(offset + umemBufBase, 0, frameSize); + return XskPacket(offset + umemBufBase, 0, frameSize); } fillUniqueEmptyOffset(); if (!uniqueEmptyFrameOffset.empty()) { auto offset = uniqueEmptyFrameOffset.back(); uniqueEmptyFrameOffset.pop_back(); - return std::make_unique(offset + umemBufBase, 0, frameSize); + return XskPacket(offset + umemBufBase, 0, frameSize); } - return nullptr; + return std::nullopt; } -void XskWorker::markAsFree(XskPacketPtr&& packet) +void XskWorker::markAsFree(XskPacket&& packet) { - - auto offset = frameOffset(*packet); + auto offset = frameOffset(packet); #ifdef DEBUG_UMEM checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free); #endif /* DEBUG_UMEM */ uniqueEmptyFrameOffset.push_back(offset); - packet.release(); } uint32_t XskPacket::getFlags() const noexcept diff --git a/pdns/xsk.hh b/pdns/xsk.hh index d8dc067563..dc9f285751 100644 --- a/pdns/xsk.hh +++ b/pdns/xsk.hh @@ -101,7 +101,7 @@ class XskSocket // number of entries (frames) in the umem const size_t frameNum; // responses that have been delayed - std::priority_queue waitForDelay; + std::priority_queue waitForDelay; const std::string ifName; const std::string poolName; // AF_XDP socket then worker waker sockets @@ -147,12 +147,12 @@ public: // wait until one event has occurred [[nodiscard]] int wait(int timeout); // add as many packets as possible to the rx queue for sending */ - void send(std::vector& packets); + void send(std::vector& packets); // look at incoming packets in rx, return them if parsing succeeeded - [[nodiscard]] std::vector recv(uint32_t recvSizeMax, uint32_t* failedCount); + [[nodiscard]] std::vector recv(uint32_t recvSizeMax, uint32_t* failedCount); void addWorker(std::shared_ptr s, const ComboAddress& dest); [[nodiscard]] std::string getMetrics() const; - void markAsFree(XskPacketPtr&& packet); + void markAsFree(XskPacket&& packet); [[nodiscard]] WorkerContainer& getWorkers() { return workers; @@ -167,8 +167,8 @@ public: // picks up entries that have been processed (sent) from cq and push them into uniqueEmptyFrameOffset void recycle(size_t size) noexcept; // look at delayed packets, and send the ones that are ready - void pickUpReadyPacket(std::vector& packets); - void pushDelayed(XskPacketPtr&& packet) + void pickUpReadyPacket(std::vector& packets); + void pushDelayed(XskPacket&& packet) { waitForDelay.push(std::move(packet)); } @@ -238,11 +238,11 @@ public: [[nodiscard]] uint32_t getFrameLen() const noexcept; [[nodiscard]] PacketBuffer clonePacketBuffer() const; void cloneIntoPacketBuffer(PacketBuffer& buffer) const; - [[nodiscard]] std::unique_ptr cloneHeadertoPacketBuffer() const; + [[nodiscard]] PacketBuffer cloneHeadertoPacketBuffer() const; void setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC) noexcept; bool setPayload(const PacketBuffer& buf); void rewrite() noexcept; - void setHeader(const PacketBuffer& buf); + void setHeader(PacketBuffer& buf); XskPacket(uint8_t* frame, size_t dataSize, size_t frameSize); void addDelay(int relativeMilliseconds) noexcept; void updatePacket() noexcept; @@ -258,7 +258,7 @@ public: return frame - base; } }; -bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept; +bool operator<(const XskPacket& s1, const XskPacket& s2) noexcept; /* g++ defines __SANITIZE_THREAD__ clang++ supports the nice __has_feature(thread_sanitizer), @@ -275,9 +275,9 @@ bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept; class XskWorker { #if defined(__SANITIZE_THREAD__) - using XskPacketRing = LockGuarded>>; + using XskPacketRing = LockGuarded>>; #else - using XskPacketRing = boost::lockfree::spsc_queue>; + using XskPacketRing = boost::lockfree::spsc_queue>; #endif public: @@ -300,9 +300,9 @@ public: static int createEventfd(); static void notify(int fd); static std::shared_ptr create(); - void pushToProcessingQueue(XskPacketPtr&& packet); - void pushToSendQueue(XskPacketPtr&& packet); - void markAsFree(XskPacketPtr&& packet); + void pushToProcessingQueue(XskPacket&& packet); + void pushToSendQueue(XskPacket&& packet); + void markAsFree(XskPacket&& packet); // notify worker that at least one packet is available for processing void notifyWorker() noexcept; // notify the router that packets are ready to be sent @@ -315,7 +315,7 @@ public: void fillUniqueEmptyOffset(); // look for an empty umem entry in uniqueEmptyFrameOffset // then sharedEmptyFrameOffset if needed - XskPacketPtr getEmptyFrame(); + std::optional getEmptyFrame(); }; std::vector getPollFdsForWorker(XskWorker& info); #else