From: Remi Gacogne Date: Tue, 26 Sep 2023 10:35:50 +0000 (+0200) Subject: dnsdist: Minor clean ups in the XSK code X-Git-Tag: dnsdist-1.9.0-rc1^2~37 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4dd6b4b46314a0b9e7eab9a9b74104456f71fde3;p=thirdparty%2Fpdns.git dnsdist: Minor clean ups in the XSK code --- diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index 82ea4fc849..ae1d00aa1f 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -780,11 +780,11 @@ static void XskHealthCheck(std::shared_ptr& dss, std::unordered data->d_initial = initial; setHealthCheckTime(dss, data); auto* frame = xskInfo->getEmptyframe(); - auto *xskPacket = new XskPacket(frame, 0, xskInfo->frameSize); + auto xskPacket = std::make_unique(frame, 0, xskInfo->frameSize); xskPacket->setAddr(dss->d_config.sourceAddr, dss->d_config.sourceMACAddr, dss->d_config.remote, dss->d_config.destMACAddr); xskPacket->setPayload(packet); xskPacket->rewrite(); - xskInfo->sq.push(xskPacket); + xskInfo->pushToSendQueue(std::move(xskPacket)); const auto queryId = data->d_queryID; map[queryId] = std::move(data); } @@ -853,9 +853,10 @@ void responderThread(std::shared_ptr dss) bool needNotify = false; if (pollfds[0].revents & POLLIN) { needNotify = true; - xskInfo->cq.consume_all([&](XskPacket* packet) { + xskInfo->cq.consume_all([&](XskPacket* packetRaw) { + auto packet = XskPacketPtr(packetRaw); if (packet->dataLen() < sizeof(dnsheader)) { - xskInfo->sq.push(packet); + xskInfo->pushToSendQueue(std::move(packet)); return; } const auto* dh = reinterpret_cast(packet->payloadData()); @@ -876,7 +877,7 @@ void responderThread(std::shared_ptr dss) packet->cloneIntoPacketBuffer(data->d_buffer); data->d_ds->submitHealthCheckResult(data->d_initial, handleResponse(data)); } - xskInfo->sq.push(packet); + xskInfo->pushToSendQueue(std::move(packet)); return; } auto response = packet->clonePacketBuffer(); @@ -885,13 +886,13 @@ void responderThread(std::shared_ptr dss) ids->xskPacketHeader.reset(); } if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) { - xskInfo->sq.push(packet); + xskInfo->pushToSendQueue(std::move(packet)); return; } if (response.size() > packet->capacity()) { /* fallback to sending the packet via normal socket */ sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote); - xskInfo->sq.push(packet); + xskInfo->pushToSendQueue(std::move(packet)); return; } packet->setHeader(*ids->xskPacketHeader); @@ -900,7 +901,7 @@ void responderThread(std::shared_ptr dss) packet->addDelay(ids->delayMsec); } packet->updatePacket(); - xskInfo->sq.push(packet); + xskInfo->pushToSendQueue(std::move(packet)); }); xskInfo->cleanSocketNotification(); } @@ -1006,7 +1007,7 @@ void responderThread(std::shared_ptr dss) xskPacket->setHeader(*ids->xskPacketHeader); xskPacket->setPayload(response); xskPacket->updatePacket(); - xskInfo->sq.push(xskPacket.release()); + xskInfo->pushToSendQueue(std::move(xskPacket)); xskInfo->notifyXskSocket(); #endif /* HAVE_XSK */ } @@ -2248,10 +2249,11 @@ static void xskClientThread(ClientState* cs) while (!xskInfo->cq.read_available()) { xskInfo->waitForXskSocket(); } - xskInfo->cq.consume_all([&](XskPacket* packet) { + xskInfo->cq.consume_all([&](XskPacket* packetRaw) { + auto packet = XskPacketPtr(packetRaw); ProcessXskQuery(*cs, holders, *packet); packet->updatePacket(); - xskInfo->sq.push(packet); + xskInfo->pushToSendQueue(std::move(packet)); }); xskInfo->notifyXskSocket(); } @@ -3652,7 +3654,7 @@ void XskRouter(std::shared_ptr xsk) xsk->uniqueEmptyFrameOffset.push_back(xsk->frameOffset(*packet)); continue; } - res->worker->cq.push(packet.release()); + res->worker->pushToProcessingQueue(std::move(packet)); needNotify.insert(res->workerWaker); } for (auto i : needNotify) { @@ -3672,17 +3674,18 @@ void XskRouter(std::shared_ptr xsk) if (xsk->fds[i].revents & POLLIN) { ready--; auto& info = xskWakerIdx.find(xsk->fds[i].fd)->worker; - info->sq.consume_all([&](XskPacket* x) { - if (!(x->getFlags() & XskPacket::UPDATE)) { - xsk->uniqueEmptyFrameOffset.push_back(xsk->frameOffset(*x)); + info->sq.consume_all([&](XskPacket* packetRaw) { + auto packet = XskPacketPtr(packetRaw); + if (!(packet->getFlags() & XskPacket::UPDATE)) { + xsk->uniqueEmptyFrameOffset.push_back(xsk->frameOffset(*packet)); + packet.release(); return; } - auto ptr = std::unique_ptr(x); - if (x->getFlags() & XskPacket::DELAY) { - xsk->waitForDelay.push(std::move(ptr)); + if (packet->getFlags() & XskPacket::DELAY) { + xsk->waitForDelay.push(std::move(packet)); return; } - fillInTx.push_back(std::move(ptr)); + fillInTx.push_back(std::move(packet)); }); info->cleanWorkerNotification(); } diff --git a/pdns/xsk.cc b/pdns/xsk.cc index c57f97f367..4f055d626c 100644 --- a/pdns/xsk.cc +++ b/pdns/xsk.cc @@ -28,7 +28,6 @@ #include "xsk.hh" #include -#include #include #include #include @@ -65,6 +64,7 @@ constexpr bool XskSocket::isPowOfTwo(uint32_t value) noexcept { return value != 0 && (value & (value - 1)) == 0; } + int XskSocket::firstTimeout() { if (waitForDelay.empty()) { @@ -243,14 +243,14 @@ void XskSocket::recycle(size_t size) noexcept xsk_ring_cons__release(&cq, completeSize); } -void XskSocket::XskUmem::umemInit(size_t memSize, xsk_ring_cons* cq, xsk_ring_prod* fq, xsk_umem_config* config) +void XskSocket::XskUmem::umemInit(size_t memSize, xsk_ring_cons* completionQueue, xsk_ring_prod* fillQueue, xsk_umem_config* config) { size = memSize; bufBase = static_cast(mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0)); if (bufBase == MAP_FAILED) { throw std::runtime_error("mmap failed"); } - auto ret = xsk_umem__create(&umem, bufBase, size, fq, cq, config); + auto ret = xsk_umem__create(&umem, bufBase, size, fillQueue, completionQueue, config); if (ret != 0) { munmap(bufBase, size); throw std::runtime_error("Error creating a umem of size" + std::to_string(size) + stringerror(ret)); @@ -326,7 +326,7 @@ bool XskPacket::parse() if (l4Protocol == IPPROTO_UDP) { // check udp.check == ipv4Checksum() is not needed! // We check it in BPF program - auto* udp = reinterpret_cast(l4Header); + const auto* udp = reinterpret_cast(l4Header); payload = l4Header + sizeof(udphdr); // Because of XskPacket::setHeader // payload = payloadEnd should be allow @@ -341,7 +341,7 @@ bool XskPacket::parse() if (l4Protocol == IPPROTO_TCP) { // check tcp.check == ipv4Checksum() is not needed! // We check it in BPF program - auto* tcp = reinterpret_cast(l4Header); + const auto* tcp = reinterpret_cast(l4Header); if (tcp->doff != static_cast(sizeof(tcphdr) >> 2)) { // tcp is not supported now! return false; @@ -514,6 +514,23 @@ XskWorker::XskWorker() : workerWaker(createEventfd()), xskSocketWaker(createEventfd()) { } + +void XskWorker::pushToProcessingQueue(XskPacketPtr&& packet) +{ + auto raw = packet.release(); + if (!cq.push(raw)) { + delete raw; + } +} + +void XskWorker::pushToSendQueue(XskPacketPtr&& packet) +{ + auto raw = packet.release(); + if (!sq.push(raw)) { + delete raw; + } +} + void* XskPacket::payloadData() { return reinterpret_cast(payload); @@ -670,7 +687,7 @@ void XskPacket::rewrite() noexcept }; }; struct ipv4_pseudo_header_t pseudo_header; - assert(sizeof(pseudo_header) == 12); + static_assert(sizeof(pseudo_header) == 12, "IPv4 pseudo-header size is incorrect"); /* Fill in the pseudo-header. */ pseudo_header.fields.src_ip = src_ip; @@ -701,7 +718,7 @@ void XskPacket::rewrite() noexcept }; }; struct ipv6_pseudo_header_t pseudo_header; - assert(sizeof(pseudo_header) == 40); + static_assert(sizeof(pseudo_header) == 40, "IPv6 pseudo-header size is incorrect"); /* Fill in the pseudo-header. */ pseudo_header.fields.src_ip = *src_ip; @@ -711,13 +728,14 @@ void XskPacket::rewrite() noexcept pseudo_header.fields.next_header = protocol; return ip_checksum_partial(&pseudo_header, sizeof(pseudo_header), 0); } -void XskPacket::setHeader(const PacketBuffer& buf) noexcept +void XskPacket::setHeader(const PacketBuffer& buf) { memcpy(frame, buf.data(), buf.size()); payloadEnd = frame + buf.size(); flags = 0; - const auto res = parse(); - assert(res); + if (!parse()) { + throw std::runtime_error("Error setting the XSK frame header"); + } } std::unique_ptr XskPacket::cloneHeadertoPacketBuffer() const { diff --git a/pdns/xsk.hh b/pdns/xsk.hh index b4e10c6eee..6715571091 100644 --- a/pdns/xsk.hh +++ b/pdns/xsk.hh @@ -208,7 +208,7 @@ public: void setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC, bool tcp = false) noexcept; bool setPayload(const PacketBuffer& buf); void rewrite() noexcept; - void setHeader(const PacketBuffer& buf) noexcept; + void setHeader(const PacketBuffer& buf); XskPacket() = default; XskPacket(void* frame, size_t dataSize, size_t frameSize); void addDelay(int relativeMilliseconds) noexcept; @@ -225,13 +225,14 @@ class XskWorker using XskPacketRing = boost::lockfree::spsc_queue>; public: - uint8_t* umemBufBase; - std::shared_ptr>> sharedEmptyFrameOffset; - vector uniqueEmptyFrameOffset; // queue of packets to be processed by this worker XskPacketRing cq; // queue of packets processed by this worker (to be sent, or discarded) XskPacketRing sq; + + uint8_t* umemBufBase; + std::shared_ptr>> sharedEmptyFrameOffset; + vector uniqueEmptyFrameOffset; std::string poolName; size_t frameSize; FDWrapper workerWaker; @@ -241,6 +242,8 @@ public: static int createEventfd(); static void notify(int fd); static std::shared_ptr create(); + void pushToProcessingQueue(XskPacketPtr&& packet); + void pushToSendQueue(XskPacketPtr&& packet); // notify worker that at least one packet is available for processing void notifyWorker() noexcept; // notify the router that packets are ready to be sent