From: Remi Gacogne Date: Mon, 15 Jan 2024 14:14:29 +0000 (+0100) Subject: dnsdist: Clean up and reorganize XSK code X-Git-Tag: dnsdist-1.9.0-rc1^2~31 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2a5e9760ee8474ddfdfd5f2708a576972d2d3421;p=thirdparty%2Fpdns.git dnsdist: Clean up and reorganize XSK code --- diff --git a/pdns/dnsdist-lua-bindings.cc b/pdns/dnsdist-lua-bindings.cc index 45ee564cf6..d0e602e22d 100644 --- a/pdns/dnsdist-lua-bindings.cc +++ b/pdns/dnsdist-lua-bindings.cc @@ -26,6 +26,7 @@ #include "dnsdist-lua.hh" #include "dnsdist-resolver.hh" #include "dnsdist-svc.hh" +#include "dnsdist-xsk.hh" #include "dolog.hh" #include "xsk.hh" @@ -754,9 +755,8 @@ void setupLuaBindings(LuaContext& luaCtx, bool client, bool configCheck) else { throw std::runtime_error("xskMapPath field is required!"); } - extern std::vector> g_xsk; auto socket = std::make_shared(frameNums, ifName, queue_id, path); - g_xsk.push_back(socket); + dnsdist::xsk::g_xsk.push_back(socket); return socket; }); luaCtx.registerFunction::*)()const>("getMetrics", [](const std::shared_ptr& xsk) { diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index cd71ac6b5c..81ad234bfe 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -33,10 +33,6 @@ #include #include -#ifdef HAVE_XSK -#include -#endif /* HAVE_XSK */ - #ifdef HAVE_LIBEDIT #if defined (__OpenBSD__) || defined(__NetBSD__) // If this is not undeffed, __attribute__ wil be redefined by /usr/include/readline/rlstdc.h @@ -73,6 +69,7 @@ #include "dnsdist-tcp.hh" #include "dnsdist-web.hh" #include "dnsdist-xpf.hh" +#include "dnsdist-xsk.hh" #include "base64.hh" #include "capabilities.hh" @@ -88,6 +85,7 @@ #include "misc.hh" #include "sstuff.hh" #include "threadname.hh" +#include "xsk.hh" /* Known sins: @@ -116,7 +114,6 @@ std::vector> g_dohlocals; std::vector> g_doqlocals; std::vector> g_doh3locals; std::vector> g_dnsCryptLocals; -std::vector> g_xsk; shared_ptr g_defaultBPFFilter{nullptr}; std::vector > g_dynBPFFilters; @@ -775,7 +772,7 @@ static void handleResponseForUDPClient(InternalQueryState& ids, PacketBuffer& re } } -static bool processResponderPacket(std::shared_ptr& dss, PacketBuffer& response, const std::vector& localRespRuleActions, const std::vector& cacheInsertedRespRuleActions, InternalQueryState&& ids) +bool processResponderPacket(std::shared_ptr& dss, PacketBuffer& response, const std::vector& localRespRuleActions, const std::vector& cacheInsertedRespRuleActions, InternalQueryState&& ids) { const dnsheader_aligned dh(response.data()); @@ -811,185 +808,6 @@ static bool processResponderPacket(std::shared_ptr& dss, Packet return true; } -#ifdef HAVE_XSK -namespace dnsdist::xsk -{ -void responderThread(std::shared_ptr dss) -{ - if (dss->xskInfo == nullptr) { - throw std::runtime_error("Starting XSK responder thread for a backend without XSK!"); - } - - try { - setThreadName("dnsdist/XskResp"); - auto localRespRuleActions = g_respruleactions.getLocal(); - auto localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal(); - auto xskInfo = dss->xskInfo; - auto pollfds = getPollFdsForWorker(*xskInfo); - const auto xskFd = xskInfo->workerWaker.getHandle(); - while (!dss->isStopped()) { - poll(pollfds.data(), pollfds.size(), -1); - bool needNotify = false; - if (pollfds[0].revents & POLLIN) { - needNotify = true; -#if defined(__SANITIZE_THREAD__) - xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) { -#else - xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) { -#endif - if (packet.getDataLen() < sizeof(dnsheader)) { - xskInfo->markAsFree(std::move(packet)); - return; - } - const dnsheader_aligned dnsHeader(packet.getPayloadData()); - const auto queryId = dnsHeader->id; - auto ids = dss->getState(queryId); - if (ids) { - if (xskFd != ids->backendFD || !ids->isXSK()) { - dss->restoreState(queryId, std::move(*ids)); - ids = std::nullopt; - } - } - if (!ids) { - xskInfo->markAsFree(std::move(packet)); - return; - } - auto response = packet.clonePacketBuffer(); - if (response.size() > packet.getCapacity()) { - /* fallback to sending the packet via normal socket */ - ids->xskPacketHeader.clear(); - } - if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) { - xskInfo->markAsFree(std::move(packet)); - vinfolog("XSK packet pushed to queue because processResponderPacket failed"); - return; - } - if (response.size() > packet.getCapacity()) { - /* fallback to sending the packet via normal socket */ - sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote); - vinfolog("XSK packet falling back because packet is too large"); - xskInfo->markAsFree(std::move(packet)); - return; - } - packet.setHeader(ids->xskPacketHeader); - if (!packet.setPayload(response)) { - vinfolog("Unable to set payload !"); - } - if (ids->delayMsec > 0) { - vinfolog("XSK packet - adding delay"); - packet.addDelay(ids->delayMsec); - } - packet.updatePacket(); - xskInfo->pushToSendQueue(std::move(packet)); - }); - xskInfo->cleanSocketNotification(); - } - if (needNotify) { - xskInfo->notifyXskSocket(); - } - } - } - catch (const std::exception& e) { - errlog("XSK responder thread died because of exception: %s", e.what()); - } - catch (const PDNSException& e) { - errlog("XSK responder thread died because of PowerDNS exception: %s", e.reason); - } - catch (...) { - errlog("XSK responder thread died because of an exception: %s", "unknown"); - } -} - -static bool isXskQueryAcceptable(const XskPacket& packet, ClientState& cs, LocalHolders& holders, bool& expectProxyProtocol) noexcept -{ - const auto& from = packet.getFromAddr(); - expectProxyProtocol = expectProxyProtocolFrom(from); - if (!holders.acl->match(from) && !expectProxyProtocol) { - vinfolog("Query from %s dropped because of ACL", from.toStringWithPort()); - ++dnsdist::metrics::g_stats.aclDrops; - return false; - } - cs.queries++; - ++dnsdist::metrics::g_stats.queries; - - return true; -} - -static void XskRouter(std::shared_ptr xsk) -{ - setThreadName("dnsdist/XskRouter"); - uint32_t failed; - // packets to be submitted for sending - vector fillInTx; - const auto& fds = xsk->getDescriptors(); - // list of workers that need to be notified - std::set needNotify; - while (true) { - try { - auto ready = xsk->wait(-1); - // descriptor 0 gets incoming AF_XDP packets - if (fds.at(0).revents & POLLIN) { - auto packets = xsk->recv(64, &failed); - dnsdist::metrics::g_stats.nonCompliantQueries += failed; - for (auto &packet : packets) { - const auto dest = packet.getToAddr(); - auto worker = xsk->getWorkerByDestination(dest); - if (!worker) { - xsk->markAsFree(std::move(packet)); - continue; - } - worker->pushToProcessingQueue(std::move(packet)); - needNotify.insert(worker->workerWaker.getHandle()); - } - for (auto i : needNotify) { - uint64_t x = 1; - auto written = write(i, &x, sizeof(x)); - if (written != sizeof(x)) { - // oh, well, the worker is clearly overloaded - // but there is nothing we can do about it, - // and hopefully the queue will be processed eventually - } - } - needNotify.clear(); - ready--; - } - const auto backup = ready; - for (size_t fdIndex = 1; fdIndex < fds.size() && ready > 0; fdIndex++) { - if (fds.at(fdIndex).revents & POLLIN) { - ready--; - auto& info = xsk->getWorkerByDescriptor(fds.at(fdIndex).fd); -#if defined(__SANITIZE_THREAD__) - info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket& packet) { -#else - info->outgoingPacketsQueue.consume_all([&](XskPacket& packet) { -#endif - if (!(packet.getFlags() & XskPacket::UPDATE)) { - xsk->markAsFree(std::move(packet)); - return; - } - if (packet.getFlags() & XskPacket::DELAY) { - xsk->pushDelayed(std::move(packet)); - return; - } - fillInTx.push_back(std::move(packet)); - }); - info->cleanWorkerNotification(); - } - } - xsk->pickUpReadyPacket(fillInTx); - xsk->recycle(4096); - xsk->fillFq(); - xsk->send(fillInTx); - ready = backup; - } - catch (...) { - vinfolog("Exception in XSK router loop"); - } - } -} -} -#endif /* HAVE_XSK */ - // listens on a dedicated socket, lobs answers from downstream servers to original requestors void responderThread(std::shared_ptr dss) { @@ -1065,7 +883,7 @@ void responderThread(std::shared_ptr dss) xskPacket->setHeader(ids->xskPacketHeader); xskPacket->setPayload(response); xskPacket->updatePacket(); - xskInfo->pushToSendQueue(std::move(*xskPacket)); + xskInfo->pushToSendQueue(*xskPacket); xskInfo->notifyXskSocket(); #endif /* HAVE_XSK */ } @@ -2061,7 +1879,7 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct #ifdef HAVE_XSK namespace dnsdist::xsk { -static bool ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet) +bool XskProcessQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet) { uint16_t queryId = 0; const auto& remote = packet.getFromAddr(); @@ -2077,7 +1895,7 @@ static bool ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p try { bool expectProxyProtocol = false; - if (!isXskQueryAcceptable(packet, cs, holders, expectProxyProtocol)) { + if (!XskIsQueryAcceptable(packet, cs, holders, expectProxyProtocol)) { return false; } @@ -2191,36 +2009,6 @@ static bool ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p return false; } -static void xskClientThread(ClientState* cs) -{ - setThreadName("dnsdist/xskClient"); - auto xskInfo = cs->xskInfo; - LocalHolders holders; - - for (;;) { -#if defined(__SANITIZE_THREAD__) - while (!xskInfo->incomingPacketsQueue.lock()->read_available()) { -#else - while (!xskInfo->incomingPacketsQueue.read_available()) { -#endif - xskInfo->waitForXskSocket(); - } -#if defined(__SANITIZE_THREAD__) - xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) { -#else - xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) { -#endif - if (ProcessXskQuery(*cs, holders, packet)) { - packet.updatePacket(); - xskInfo->pushToSendQueue(std::move(packet)); - } - else { - xskInfo->markAsFree(std::move(packet)); - } - }); - xskInfo->notifyXskSocket(); - } -} } #endif /* HAVE_XSK */ @@ -3295,7 +3083,7 @@ namespace dnsdist static void startFrontends() { #ifdef HAVE_XSK - for (auto& xskContext : g_xsk) { + for (auto& xskContext : dnsdist::xsk::g_xsk) { std::thread xskThread(dnsdist::xsk::XskRouter, std::move(xskContext)); xskThread.detach(); } @@ -3306,9 +3094,9 @@ static void startFrontends() for (auto& clientState : g_frontends) { #ifdef HAVE_XSK if (clientState->xskInfo) { - XskSocket::addDestinationAddress(clientState->local); + dnsdist::xsk::addDestinationAddress(clientState->local); - std::thread xskCT(dnsdist::xsk::xskClientThread, clientState.get()); + std::thread xskCT(dnsdist::xsk::XskClientThread, clientState.get()); if (!clientState->cpus.empty()) { mapThreadToCPUList(xskCT.native_handle(), clientState->cpus); } @@ -3415,7 +3203,14 @@ int main(int argc, char** argv) g_hashperturb = dnsdist::getRandomValue(0xffffffff); #ifdef HAVE_XSK - XskSocket::clearDestinationAddresses(); +#warning FIXME: we need to provide a way to clear the map from Lua, as well as a way to change the map path + try { + dnsdist::xsk::clearDestinationAddresses(); + } + catch (const std::exception& exp) { + /* silently handle failures: at this point we don't even know if XSK is enabled, + and we might not have the correct map (not the default one). */ + } #endif /* HAVE_XSK */ ComboAddress clientAddress = ComboAddress(); diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index 4cec0d0304..254e64af61 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -56,7 +56,6 @@ #include "uuid-utils.hh" #include "proxy-protocol.hh" #include "stat_t.hh" -#include "xsk.hh" uint64_t uptimeOfProcess(const std::string& str); @@ -472,6 +471,10 @@ struct QueryCount { extern QueryCount g_qcount; +class XskPacket; +class XskSocket; +class XskWorker; + struct ClientState { ClientState(const ComboAddress& local_, bool isTCP_, bool doReusePort, int fastOpenQueue, const std::string& itfName, const std::set& cpus_, bool enableProxyProtocol): cpus(cpus_), interface(itfName), local(local_), fastOpenQueueSize(fastOpenQueue), tcp(isTCP_), reuseport(doReusePort), d_enableProxyProtocol(enableProxyProtocol) @@ -704,8 +707,10 @@ struct DownstreamState: public std::enable_shared_from_this std::string d_dohPath; std::string name; std::string nameWithAddr; - MACAddr sourceMACAddr; - MACAddr destMACAddr; +#ifdef HAVE_XSK + std::array sourceMACAddr; + std::array destMACAddr; +#endif /* HAVE_XSK */ size_t d_numberOfSockets{1}; size_t d_maxInFlightQueriesPerConn{1}; size_t d_tcpConcurrentConnectionsLimit{0}; @@ -1189,6 +1194,7 @@ ProcessQueryResult processQueryAfterRules(DNSQuestion& dq, LocalHolders& holders bool processResponse(PacketBuffer& response, const std::vector& respRuleActions, const std::vector& insertedRespRuleActions, DNSResponse& dr, bool muted); bool processRulesResult(const DNSAction::Action& action, DNSQuestion& dq, std::string& ruleresult, bool& drop); bool processResponseAfterRules(PacketBuffer& response, const std::vector& cacheInsertedRespRuleActions, DNSResponse& dr, bool muted); +bool processResponderPacket(std::shared_ptr& dss, PacketBuffer& response, const std::vector& localRespRuleActions, const std::vector& cacheInsertedRespRuleActions, InternalQueryState&& ids); bool assignOutgoingUDPQueryToBackend(std::shared_ptr& ds, uint16_t queryID, DNSQuestion& dq, PacketBuffer& query, bool actuallySend = true); @@ -1196,10 +1202,3 @@ ssize_t udpClientSendRequestToBackend(const std::shared_ptr& ss bool sendUDPResponse(int origFD, const PacketBuffer& response, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote); void handleResponseSent(const DNSName& qname, const QType& qtype, double udiff, const ComboAddress& client, const ComboAddress& backend, unsigned int size, const dnsheader& cleartextDH, dnsdist::Protocol outgoingProtocol, dnsdist::Protocol incomingProtocol, bool fromBackend); void handleResponseSent(const InternalQueryState& ids, double udiff, const ComboAddress& client, const ComboAddress& backend, unsigned int size, const dnsheader& cleartextDH, dnsdist::Protocol outgoingProtocol, bool fromBackend); - -#ifdef HAVE_XSK -namespace dnsdist::xsk -{ -void responderThread(std::shared_ptr dss); -} -#endif /* HAVE_XSK */ diff --git a/pdns/dnsdistdist/Makefile.am b/pdns/dnsdistdist/Makefile.am index c95629daac..1a1e3080c1 100644 --- a/pdns/dnsdistdist/Makefile.am +++ b/pdns/dnsdistdist/Makefile.am @@ -207,6 +207,7 @@ dnsdist_SOURCES = \ dnsdist-tcp.cc dnsdist-tcp.hh \ dnsdist-web.cc dnsdist-web.hh \ dnsdist-xpf.cc dnsdist-xpf.hh \ + dnsdist-xsk.cc dnsdist-xsk.hh \ dnsdist.cc dnsdist.hh \ dnslabeltext.cc \ dnsname.cc dnsname.hh \ @@ -305,6 +306,7 @@ testrunner_SOURCES = \ dnsdist-tcp-downstream.cc \ dnsdist-tcp.cc dnsdist-tcp.hh \ dnsdist-xpf.cc dnsdist-xpf.hh \ + dnsdist-xsk.cc dnsdist-xsk.hh \ dnsdist.hh \ dnslabeltext.cc \ dnsname.cc dnsname.hh \ diff --git a/pdns/dnsdistdist/dnsdist-backend.cc b/pdns/dnsdistdist/dnsdist-backend.cc index 02cfea5c7d..49b47b8cdc 100644 --- a/pdns/dnsdistdist/dnsdist-backend.cc +++ b/pdns/dnsdistdist/dnsdist-backend.cc @@ -27,6 +27,7 @@ #include "dnsdist-random.hh" #include "dnsdist-rings.hh" #include "dnsdist-tcp.hh" +#include "dnsdist-xsk.hh" #include "dolog.hh" #include "xsk.hh" @@ -53,7 +54,7 @@ void DownstreamState::addXSKDestination(int fd) auto addresses = d_socketSourceAddresses.write_lock(); addresses->push_back(local); } - XskSocket::addDestinationAddress(local); + dnsdist::xsk::addDestinationAddress(local); d_xskSocket->addWorkerRoute(xskInfo, local); } @@ -65,7 +66,7 @@ void DownstreamState::removeXSKDestination(int fd) return; } - XskSocket::removeDestinationAddress(local); + dnsdist::xsk::removeDestinationAddress(local); d_xskSocket->removeWorkerRoute(local); } #endif /* HAVE_XSK */ @@ -329,7 +330,7 @@ void DownstreamState::start() if (connected && !threadStarted.test_and_set()) { #ifdef HAVE_XSK if (xskInfo != nullptr) { - tid = std::thread(dnsdist::xsk::responderThread, shared_from_this()); + tid = std::thread(dnsdist::xsk::XskResponderThread, shared_from_this()); } else { tid = std::thread(responderThread, shared_from_this()); diff --git a/pdns/dnsdistdist/dnsdist-xsk.cc b/pdns/dnsdistdist/dnsdist-xsk.cc new file mode 100644 index 0000000000..4d1bec33cd --- /dev/null +++ b/pdns/dnsdistdist/dnsdist-xsk.cc @@ -0,0 +1,266 @@ +/* + * This file is part of PowerDNS or dnsdist. + * Copyright -- PowerDNS.COM B.V. and its contributors + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In addition, for the avoidance of any doubt, permission is granted to + * link this program with OpenSSL and to (re)distribute the binaries + * produced as the result of such linking. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#include "dnsdist.hh" +#include "dnsdist-xsk.hh" + +#ifdef HAVE_XSK +#include + +#include "dnsdist-metrics.hh" +#include "dnsdist-proxy-protocol.hh" +#include "threadname.hh" +#include "xsk.hh" + +namespace dnsdist::xsk +{ +std::vector> g_xsk; + +void XskResponderThread(std::shared_ptr dss) +{ + if (dss->xskInfo == nullptr) { + throw std::runtime_error("Starting XSK responder thread for a backend without XSK!"); + } + + try { + setThreadName("dnsdist/XskResp"); + auto localRespRuleActions = g_respruleactions.getLocal(); + auto localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal(); + auto xskInfo = dss->xskInfo; + auto pollfds = getPollFdsForWorker(*xskInfo); + const auto xskFd = xskInfo->workerWaker.getHandle(); + while (!dss->isStopped()) { + poll(pollfds.data(), pollfds.size(), -1); + bool needNotify = false; + if ((pollfds[0].revents & POLLIN) != 0) { + needNotify = true; +#if defined(__SANITIZE_THREAD__) + xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) { +#else + xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) { +#endif + if (packet.getDataLen() < sizeof(dnsheader)) { + xskInfo->markAsFree(packet); + return; + } + const dnsheader_aligned dnsHeader(packet.getPayloadData()); + const auto queryId = dnsHeader->id; + auto ids = dss->getState(queryId); + if (ids) { + if (xskFd != ids->backendFD || !ids->isXSK()) { + dss->restoreState(queryId, std::move(*ids)); + ids = std::nullopt; + } + } + if (!ids) { + xskInfo->markAsFree(packet); + return; + } + auto response = packet.clonePacketBuffer(); + if (response.size() > packet.getCapacity()) { + /* fallback to sending the packet via normal socket */ + ids->xskPacketHeader.clear(); + } + if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) { + xskInfo->markAsFree(packet); + vinfolog("XSK packet pushed to queue because processResponderPacket failed"); + return; + } + if (response.size() > packet.getCapacity()) { + /* fallback to sending the packet via normal socket */ + sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote); + vinfolog("XSK packet falling back because packet is too large"); + xskInfo->markAsFree(packet); + return; + } + packet.setHeader(ids->xskPacketHeader); + if (!packet.setPayload(response)) { + vinfolog("Unable to set XSK payload !"); + } + if (ids->delayMsec > 0) { + packet.addDelay(ids->delayMsec); + } + packet.updatePacket(); + xskInfo->pushToSendQueue(packet); + }); + xskInfo->cleanSocketNotification(); + } + if (needNotify) { + xskInfo->notifyXskSocket(); + } + } + } + catch (const std::exception& e) { + errlog("XSK responder thread died because of exception: %s", e.what()); + } + catch (const PDNSException& e) { + errlog("XSK responder thread died because of PowerDNS exception: %s", e.reason); + } + catch (...) { + errlog("XSK responder thread died because of an exception: %s", "unknown"); + } +} + +bool XskIsQueryAcceptable(const XskPacket& packet, ClientState& clientState, LocalHolders& holders, bool& expectProxyProtocol) +{ + const auto& from = packet.getFromAddr(); + expectProxyProtocol = expectProxyProtocolFrom(from); + if (!holders.acl->match(from) && !expectProxyProtocol) { + vinfolog("Query from %s dropped because of ACL", from.toStringWithPort()); + ++dnsdist::metrics::g_stats.aclDrops; + return false; + } + clientState.queries++; + ++dnsdist::metrics::g_stats.queries; + + return true; +} + +void XskRouter(std::shared_ptr xsk) +{ + setThreadName("dnsdist/XskRouter"); + uint32_t failed = 0; + // packets to be submitted for sending + vector fillInTx; + const auto& fds = xsk->getDescriptors(); + // list of workers that need to be notified + std::set needNotify; + while (true) { + try { + auto ready = xsk->wait(-1); + // descriptor 0 gets incoming AF_XDP packets + if ((fds.at(0).revents & POLLIN) != 0) { + auto packets = xsk->recv(64, &failed); + dnsdist::metrics::g_stats.nonCompliantQueries += failed; + for (auto &packet : packets) { + const auto dest = packet.getToAddr(); + auto worker = xsk->getWorkerByDestination(dest); + if (!worker) { + xsk->markAsFree(packet); + continue; + } + worker->pushToProcessingQueue(packet); + needNotify.insert(worker->workerWaker.getHandle()); + } + for (auto socket : needNotify) { + uint64_t value = 1; + auto written = write(socket, &value, sizeof(value)); + if (written != sizeof(value)) { + // oh, well, the worker is clearly overloaded + // but there is nothing we can do about it, + // and hopefully the queue will be processed eventually + } + } + needNotify.clear(); + ready--; + } + const auto backup = ready; + for (size_t fdIndex = 1; fdIndex < fds.size() && ready > 0; fdIndex++) { + if ((fds.at(fdIndex).revents & POLLIN) != 0) { + ready--; + const auto& info = xsk->getWorkerByDescriptor(fds.at(fdIndex).fd); +#if defined(__SANITIZE_THREAD__) + info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket& packet) { +#else + info->outgoingPacketsQueue.consume_all([&](XskPacket& packet) { +#endif + if ((packet.getFlags() & XskPacket::UPDATE) == 0) { + xsk->markAsFree(packet); + return; + } + if ((packet.getFlags() & XskPacket::DELAY) != 0) { + xsk->pushDelayed(packet); + return; + } + fillInTx.push_back(packet); + }); + info->cleanWorkerNotification(); + } + } + xsk->pickUpReadyPacket(fillInTx); + xsk->recycle(4096); + xsk->fillFq(); + xsk->send(fillInTx); + ready = backup; + } + catch (...) { + vinfolog("Exception in XSK router loop"); + } + } +} + +void XskClientThread(ClientState* clientState) +{ + setThreadName("dnsdist/xskClient"); + auto xskInfo = clientState->xskInfo; + LocalHolders holders; + + for (;;) { +#if defined(__SANITIZE_THREAD__) + while (xskInfo->incomingPacketsQueue.lock()->read_available() == 0U) { +#else + while (xskInfo->incomingPacketsQueue.read_available() == 0U) { +#endif + xskInfo->waitForXskSocket(); + } +#if defined(__SANITIZE_THREAD__) + xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) { +#else + xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) { +#endif + if (XskProcessQuery(*clientState, holders, packet)) { + packet.updatePacket(); + xskInfo->pushToSendQueue(packet); + } + else { + xskInfo->markAsFree(packet); + } + }); + xskInfo->notifyXskSocket(); + } +} + +static std::string getDestinationMap(bool isV6) { + return !isV6 ? "/sys/fs/bpf/dnsdist/xsk-destinations-v4" : "/sys/fs/bpf/dnsdist/xsk-destinations-v6"; +} + +void addDestinationAddress(const ComboAddress& addr) +{ + auto map = getDestinationMap(addr.isIPv6()); + XskSocket::addDestinationAddress(map, addr); +} + +void removeDestinationAddress(const ComboAddress& addr) +{ + auto map = getDestinationMap(addr.isIPv6()); + XskSocket::removeDestinationAddress(map, addr); +} + +void clearDestinationAddresses() +{ + auto map = getDestinationMap(false); + XskSocket::clearDestinationMap(map, false); + map = getDestinationMap(true); + XskSocket::clearDestinationMap(map, true); +} + +} +#endif /* HAVE_XSK */ diff --git a/pdns/dnsdistdist/dnsdist-xsk.hh b/pdns/dnsdistdist/dnsdist-xsk.hh new file mode 100644 index 0000000000..6a862d75b4 --- /dev/null +++ b/pdns/dnsdistdist/dnsdist-xsk.hh @@ -0,0 +1,46 @@ +/* + * This file is part of PowerDNS or dnsdist. + * Copyright -- PowerDNS.COM B.V. and its contributors + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In addition, for the avoidance of any doubt, permission is granted to + * link this program with OpenSSL and to (re)distribute the binaries + * produced as the result of such linking. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ +#pragma once + +#include "config.h" + +#ifdef HAVE_XSK +class XskPacket; +class XskSocket; +class XskWorker; + +#include + +namespace dnsdist::xsk +{ +void XskResponderThread(std::shared_ptr dss); +bool XskIsQueryAcceptable(const XskPacket& packet, ClientState& clientState, LocalHolders& holders, bool& expectProxyProtocol); +bool XskProcessQuery(ClientState& clientState, LocalHolders& holders, XskPacket& packet); +void XskRouter(std::shared_ptr xsk); +void XskClientThread(ClientState* clientState); +void addDestinationAddress(const ComboAddress& addr); +void removeDestinationAddress(const ComboAddress& addr); +void clearDestinationAddresses(); + +extern std::vector> g_xsk; +} +#endif /* HAVE_XSK */ diff --git a/pdns/dnsdistdist/test-dnsdistlbpolicies_cc.cc b/pdns/dnsdistdist/test-dnsdistlbpolicies_cc.cc index e05c0b56b4..bcb73b2652 100644 --- a/pdns/dnsdistdist/test-dnsdistlbpolicies_cc.cc +++ b/pdns/dnsdistdist/test-dnsdistlbpolicies_cc.cc @@ -74,13 +74,6 @@ void responderThread(std::shared_ptr dss) { } -namespace dnsdist::xsk -{ -void responderThread(std::shared_ptr dss) -{ -} -} - string g_outputBuffer; std::atomic g_configurationDone{false}; diff --git a/pdns/test-dnsdist_cc.cc b/pdns/test-dnsdist_cc.cc index c29fdabba5..d3e0d29fc0 100644 --- a/pdns/test-dnsdist_cc.cc +++ b/pdns/test-dnsdist_cc.cc @@ -33,6 +33,7 @@ #include "dnsdist-internal-queries.hh" #include "dnsdist-tcp.hh" #include "dnsdist-xpf.hh" +#include "dnsdist-xsk.hh" #include "dolog.hh" #include "dnsname.hh" @@ -73,8 +74,18 @@ bool DNSDistSNMPAgent::sendBackendStatusChangeTrap(DownstreamState const&) { return false; } +namespace dnsdist::xsk +{ +bool XskProcessQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet) +{ + return false; +} +} -std::vector> g_xsk; +bool processResponderPacket(std::shared_ptr& dss, PacketBuffer& response, const std::vector& localRespRuleActions, const std::vector& cacheInsertedRespRuleActions, InternalQueryState&& ids) +{ + return false; +} BOOST_AUTO_TEST_SUITE(test_dnsdist_cc) diff --git a/pdns/xsk.cc b/pdns/xsk.cc index 15d9f14ec5..b656757a2d 100644 --- a/pdns/xsk.cc +++ b/pdns/xsk.cc @@ -94,7 +94,7 @@ int XskSocket::firstTimeout() if (waitForDelay.empty()) { return -1; } - timespec now; + timespec now{}; gettime(&now); const auto& firstTime = waitForDelay.top().getSendTime(); const auto res = timeDifference(now, firstTime); @@ -104,8 +104,8 @@ int XskSocket::firstTimeout() return res; } -XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queue_id, const std::string& xskMapPath) : - frameNum(frameNum_), ifName(ifName_), socket(nullptr, xsk_socket__delete), sharedEmptyFrameOffset(std::make_shared>>()) +XskSocket::XskSocket(size_t frameNum_, std::string ifName_, uint32_t queue_id, const std::string& xskMapPath) : + frameNum(frameNum_), ifName(std::move(ifName_)), socket(nullptr, xsk_socket__delete), sharedEmptyFrameOffset(std::make_shared>>()) { if (!isPowOfTwo(frameNum_) || !isPowOfTwo(frameSize) || !isPowOfTwo(fqCapacity) || !isPowOfTwo(cqCapacity) || !isPowOfTwo(rxCapacity) || !isPowOfTwo(txCapacity)) { @@ -118,7 +118,7 @@ XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queu memset(&tx, 0, sizeof(tx)); memset(&rx, 0, sizeof(rx)); - xsk_umem_config umemCfg; + xsk_umem_config umemCfg{}; umemCfg.fill_size = fqCapacity; umemCfg.comp_size = cqCapacity; umemCfg.frame_size = frameSize; @@ -127,7 +127,7 @@ XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queu umem.umemInit(frameNum_ * frameSize, &cq, &fq, &umemCfg); { - xsk_socket_config socketCfg; + xsk_socket_config socketCfg{}; socketCfg.rx_size = rxCapacity; socketCfg.tx_size = txCapacity; socketCfg.bind_flags = XDP_USE_NEED_WAKEUP; @@ -170,13 +170,12 @@ XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queu } auto ret = bpf_map_update_elem(xskMapFd.getHandle(), &queue_id, &xskfd, 0); - if (ret) { + if (ret != 0) { throw std::runtime_error("Error inserting into xsk_map '" + xskMapPath + "': " + std::to_string(ret)); } } // see xdp.h in contrib/ - struct IPv4AndPort { uint32_t addr; @@ -188,16 +187,19 @@ struct IPv6AndPort uint16_t port; }; -static void clearDestinationMap(bool v6) +static FDWrapper getDestinationMap(const std::string& mapPath) { - const std::string mapPath = !v6 ? "/sys/fs/bpf/dnsdist/xsk-destinations-v4" : "/sys/fs/bpf/dnsdist/xsk-destinations-v6"; - - const auto destMapFd = FDWrapper(bpf_obj_get(mapPath.c_str())); + auto destMapFd = FDWrapper(bpf_obj_get(mapPath.c_str())); if (destMapFd.getHandle() < 0) { throw std::runtime_error("Error getting the XSK destination addresses map path '" + mapPath + "'"); } + return destMapFd; +} - if (!v6) { +void XskSocket::clearDestinationMap(const std::string& mapPath, bool isV6) +{ + auto destMapFd = getDestinationMap(mapPath); + if (!isV6) { IPv4AndPort prevKey{}; IPv4AndPort key{}; while (bpf_map_get_next_key(destMapFd.getHandle(), &prevKey, &key) == 0) { @@ -215,33 +217,16 @@ static void clearDestinationMap(bool v6) } } -void XskSocket::clearDestinationAddresses() +void XskSocket::addDestinationAddress(const std::string& mapPath, const ComboAddress& destination) { - clearDestinationMap(false); - clearDestinationMap(true); -} - -void XskSocket::addDestinationAddress(const ComboAddress& destination) -{ - // see xdp.h in contrib/ - - const std::string mapPath = destination.isIPv4() ? "/sys/fs/bpf/dnsdist/xsk-destinations-v4" : "/sys/fs/bpf/dnsdist/xsk-destinations-v6"; - //if (!s_destinationAddressesMap) { - // throw std::runtime_error("The path of the XSK (AF_XDP) destination addresses map has not been set! Please consider using setXSKDestinationAddressesMapPath()."); - //} - - const auto destMapFd = FDWrapper(bpf_obj_get(mapPath.c_str())); - if (destMapFd.getHandle() < 0) { - throw std::runtime_error("Error getting the XSK destination addresses map path '" + mapPath + "'"); - } - + auto destMapFd = getDestinationMap(mapPath); bool value = true; if (destination.isIPv4()) { IPv4AndPort key{}; key.addr = destination.sin4.sin_addr.s_addr; key.port = destination.sin4.sin_port; auto ret = bpf_map_update_elem(destMapFd.getHandle(), &key, &value, 0); - if (ret) { + if (ret != 0) { throw std::runtime_error("Error inserting into xsk_map '" + mapPath + "': " + std::to_string(ret)); } } @@ -250,24 +235,15 @@ void XskSocket::addDestinationAddress(const ComboAddress& destination) key.addr = destination.sin6.sin6_addr; key.port = destination.sin6.sin6_port; auto ret = bpf_map_update_elem(destMapFd.getHandle(), &key, &value, 0); - if (ret) { + if (ret != 0) { throw std::runtime_error("Error inserting into XSK destination addresses map '" + mapPath + "': " + std::to_string(ret)); } } } -void XskSocket::removeDestinationAddress(const ComboAddress& destination) +void XskSocket::removeDestinationAddress(const std::string& mapPath, const ComboAddress& destination) { - const std::string mapPath = destination.isIPv4() ? "/sys/fs/bpf/dnsdist/xsk-destinations-v4" : "/sys/fs/bpf/dnsdist/xsk-destinations-v6"; - //if (!s_destinationAddressesMap) { - // throw std::runtime_error("The path of the XSK (AF_XDP) destination addresses map has not been set! Please consider using setXSKDestinationAddressesMapPath()."); - //} - - const auto destMapFd = FDWrapper(bpf_obj_get(mapPath.c_str())); - if (destMapFd.getHandle() < 0) { - throw std::runtime_error("Error getting the XSK destination addresses map path '" + mapPath + "'"); - } - + auto destMapFd = getDestinationMap(mapPath); if (destination.isIPv4()) { IPv4AndPort key{}; key.addr = destination.sin4.sin_addr.s_addr; @@ -334,7 +310,7 @@ int XskSocket::wait(int timeout) void XskSocket::send(std::vector& packets) { - while (packets.size() > 0) { + while (!packets.empty()) { auto packetSize = packets.size(); if (packetSize > std::numeric_limits::max()) { packetSize = std::numeric_limits::max(); @@ -375,6 +351,7 @@ std::vector XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCou return res; } + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) const auto baseAddr = reinterpret_cast(umem.bufBase); uint32_t failed = 0; uint32_t processed = 0; @@ -382,6 +359,7 @@ std::vector XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCou for (; processed < recvSize; processed++) { try { const auto* desc = xsk_ring_cons__rx_desc(&rx, idx++); + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) XskPacket packet = XskPacket(reinterpret_cast(desc->addr + baseAddr), desc->len, frameSize); #ifdef DEBUG_UMEM checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::FillQueue}, UmemEntryStatus::Status::Received); @@ -389,10 +367,10 @@ std::vector XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCou if (!packet.parse(false)) { ++failed; - markAsFree(std::move(packet)); + markAsFree(packet); } else { - res.push_back(std::move(packet)); + res.push_back(packet); } } catch (const std::exception& exp) { @@ -409,7 +387,7 @@ std::vector XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCou // which will only be made available again when pushed into the fill // queue xsk_ring_cons__release(&rx, processed); - if (failedCount) { + if (failedCount != nullptr) { *failedCount = failed; } @@ -418,11 +396,12 @@ std::vector XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCou void XskSocket::pickUpReadyPacket(std::vector& packets) { - timespec now; + timespec now{}; gettime(&now); while (!waitForDelay.empty() && timeDifference(now, waitForDelay.top().getSendTime()) <= 0) { + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast) auto& top = const_cast(waitForDelay.top()); - packets.push_back(std::move(top)); + packets.push_back(top); waitForDelay.pop(); } } @@ -461,10 +440,10 @@ void XskSocket::XskUmem::umemInit(size_t memSize, xsk_ring_cons* completionQueue std::string XskSocket::getMetrics() const { - struct xdp_statistics stats; + xdp_statistics stats{}; socklen_t optlen = sizeof(stats); int err = getsockopt(xskFd(), SOL_XDP, XDP_STATISTICS, &stats, &optlen); - if (err) { + if (err != 0) { return ""; } if (optlen != sizeof(struct xdp_statistics)) { @@ -481,7 +460,7 @@ std::string XskSocket::getMetrics() const return ret.str(); } -void XskSocket::markAsFree(XskPacket&& packet) +void XskSocket::markAsFree(const XskPacket& packet) { auto offset = frameOffset(packet); #ifdef DEBUG_UMEM @@ -493,10 +472,10 @@ void XskSocket::markAsFree(XskPacket&& packet) XskSocket::XskUmem::~XskUmem() { - if (umem) { + if (umem != nullptr) { xsk_umem__delete(umem); } - if (bufBase) { + if (bufBase != nullptr) { munmap(bufBase, size); } } @@ -536,39 +515,49 @@ void XskPacket::setEthernetHeader(const ethhdr& ethHeader) noexcept [[nodiscard]] iphdr XskPacket::getIPv4Header() const noexcept { iphdr ipv4Header{}; + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) assert(frameLength >= (sizeof(ethhdr) + sizeof(ipv4Header))); assert(!v6); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) memcpy(&ipv4Header, frame + sizeof(ethhdr), sizeof(ipv4Header)); return ipv4Header; } void XskPacket::setIPv4Header(const iphdr& ipv4Header) noexcept { + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) assert(frameLength >= (sizeof(ethhdr) + sizeof(iphdr))); assert(!v6); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) memcpy(frame + sizeof(ethhdr), &ipv4Header, sizeof(ipv4Header)); } [[nodiscard]] ipv6hdr XskPacket::getIPv6Header() const noexcept { ipv6hdr ipv6Header{}; + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) assert(frameLength >= (sizeof(ethhdr) + sizeof(ipv6Header))); assert(v6); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) memcpy(&ipv6Header, frame + sizeof(ethhdr), sizeof(ipv6Header)); return ipv6Header; } void XskPacket::setIPv6Header(const ipv6hdr& ipv6Header) noexcept { + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) assert(frameLength >= (sizeof(ethhdr) + sizeof(ipv6Header))); assert(v6); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) memcpy(frame + sizeof(ethhdr), &ipv6Header, sizeof(ipv6Header)); } [[nodiscard]] udphdr XskPacket::getUDPHeader() const noexcept { udphdr udpHeader{}; + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) assert(frameLength >= (sizeof(ethhdr) + (v6 ? sizeof(ipv6hdr) : sizeof(iphdr)) + sizeof(udpHeader))); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) memcpy(&udpHeader, frame + getL4HeaderOffset(), sizeof(udpHeader)); return udpHeader; } @@ -600,7 +589,9 @@ bool XskPacket::parse(bool fromSetHeader) // check ip.check == ipv4Checksum() is not needed! // We check it in BPF program // we don't, actually. + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) from = makeComboAddressFromRaw(4, reinterpret_cast(&ipHeader.saddr), sizeof(ipHeader.saddr)); + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) to = makeComboAddressFromRaw(4, reinterpret_cast(&ipHeader.daddr), sizeof(ipHeader.daddr)); l4Protocol = ipHeader.protocol; if (!fromSetHeader && (frameLength - sizeof(ethhdr)) != ntohs(ipHeader.tot_len)) { @@ -614,7 +605,9 @@ bool XskPacket::parse(bool fromSetHeader) } v6 = true; auto ipHeader = getIPv6Header(); + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) from = makeComboAddressFromRaw(6, reinterpret_cast(&ipHeader.saddr), sizeof(ipHeader.saddr)); + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) to = makeComboAddressFromRaw(6, reinterpret_cast(&ipHeader.daddr), sizeof(ipHeader.daddr)); l4Protocol = ipHeader.nexthdr; if (!fromSetHeader && (frameLength - (sizeof(ethhdr) + sizeof(ipv6hdr))) != ntohs(ipHeader.payload_len)) { @@ -668,12 +661,12 @@ void XskPacket::changeDirectAndUpdateChecksum() noexcept { auto ethHeader = getEthernetHeader(); { - uint8_t tmp[ETH_ALEN]; - static_assert(sizeof(tmp) == sizeof(ethHeader.h_dest), "Size Error"); - static_assert(sizeof(tmp) == sizeof(ethHeader.h_source), "Size Error"); - memcpy(tmp, ethHeader.h_dest, sizeof(tmp)); - memcpy(ethHeader.h_dest, ethHeader.h_source, sizeof(tmp)); - memcpy(ethHeader.h_source, tmp, sizeof(tmp)); + std::array tmp; + static_assert(tmp.size() == sizeof(ethHeader.h_dest), "Size Error"); + static_assert(tmp.size() == sizeof(ethHeader.h_source), "Size Error"); + memcpy(tmp.data(), ethHeader.h_dest, tmp.size()); + memcpy(ethHeader.h_dest, ethHeader.h_source, tmp.size()); + memcpy(ethHeader.h_source, tmp.data(), tmp.size()); } if (ethHeader.h_proto == htons(ETH_P_IPV6)) { // IPV6 @@ -750,6 +743,7 @@ PacketBuffer XskPacket::clonePacketBuffer() const { const auto size = getDataSize(); PacketBuffer tmp(size); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) memcpy(tmp.data(), frame + getDataOffset(), size); return tmp; } @@ -758,6 +752,7 @@ void XskPacket::cloneIntoPacketBuffer(PacketBuffer& buffer) const { const auto size = getDataSize(); buffer.resize(size); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) memcpy(buffer.data(), frame + getDataOffset(), size); } @@ -769,7 +764,9 @@ bool XskPacket::setPayload(const PacketBuffer& buf) return false; } flags |= UPDATE; + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) memcpy(frame + getDataOffset(), buf.data(), bufSize); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) frameLength = getDataOffset() + bufSize; return true; } @@ -777,14 +774,14 @@ bool XskPacket::setPayload(const PacketBuffer& buf) void XskPacket::addDelay(const int relativeMilliseconds) noexcept { gettime(&sendTime); - sendTime.tv_nsec += static_cast(relativeMilliseconds) * 1000000L; + sendTime.tv_nsec += static_cast(relativeMilliseconds) * 1000000L; sendTime.tv_sec += sendTime.tv_nsec / 1000000000L; sendTime.tv_nsec %= 1000000000L; } -bool operator<(const XskPacket& s1, const XskPacket& s2) noexcept +bool operator<(const XskPacket& lhs, const XskPacket& rhs) noexcept { - return s1.getSendTime() < s2.getSendTime(); + return lhs.getSendTime() < rhs.getSendTime(); } const ComboAddress& XskPacket::getFromAddr() const noexcept @@ -797,11 +794,11 @@ const ComboAddress& XskPacket::getToAddr() const noexcept return to; } -void XskWorker::notify(int fd) +void XskWorker::notify(int desc) { uint64_t value = 1; ssize_t res = 0; - while ((res = write(fd, &value, sizeof(value))) == EINTR) { + while ((res = write(desc, &value, sizeof(value))) == EINTR) { } if (res != sizeof(value)) { throw runtime_error("Unable Wake Up XskSocket Failed"); @@ -813,38 +810,39 @@ XskWorker::XskWorker() : { } -void XskWorker::pushToProcessingQueue(XskPacket&& packet) +void XskWorker::pushToProcessingQueue(XskPacket& packet) { #if defined(__SANITIZE_THREAD__) - if (!incomingPacketsQueue.lock()->push(std::move(packet))) { + if (!incomingPacketsQueue.lock()->push(packet)) { #else - if (!incomingPacketsQueue.push(std::move(packet))) { + if (!incomingPacketsQueue.push(packet)) { #endif - markAsFree(std::move(packet)); + markAsFree(packet); } } -void XskWorker::pushToSendQueue(XskPacket&& packet) +void XskWorker::pushToSendQueue(XskPacket& packet) { #if defined(__SANITIZE_THREAD__) - if (!outgoingPacketsQueue.lock()->push(std::move(packet))) { + if (!outgoingPacketsQueue.lock()->push(packet)) { #else - if (!outgoingPacketsQueue.push(std::move(packet))) { + if (!outgoingPacketsQueue.push(packet)) { #endif - markAsFree(std::move(packet)); + markAsFree(packet); } } const void* XskPacket::getPayloadData() const { + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) return frame + getDataOffset(); } void XskPacket::setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC) noexcept { auto ethHeader = getEthernetHeader(); - memcpy(ethHeader.h_dest, &toMAC[0], sizeof(MACAddr)); - memcpy(ethHeader.h_source, &fromMAC[0], sizeof(MACAddr)); + memcpy(ethHeader.h_dest, toMAC.data(), toMAC.size()); + memcpy(ethHeader.h_source, fromMAC.data(), fromMAC.size()); setEthernetHeader(ethHeader); to = to_; from = from_; @@ -901,17 +899,18 @@ void XskPacket::rewrite() noexcept setEthernetHeader(ethHeader); } -[[nodiscard]] __be16 XskPacket::ipv4Checksum(const struct iphdr* ip) noexcept +[[nodiscard]] __be16 XskPacket::ipv4Checksum(const struct iphdr* ipHeader) noexcept { - auto partial = ip_checksum_partial(ip, sizeof(iphdr), 0); + auto partial = ip_checksum_partial(ipHeader, sizeof(iphdr), 0); return ip_checksum_fold(partial); } -[[nodiscard]] __be16 XskPacket::tcp_udp_v4_checksum(const struct iphdr* ip) const noexcept +[[nodiscard]] __be16 XskPacket::tcp_udp_v4_checksum(const struct iphdr* ipHeader) const noexcept { // ip options is not supported !!! const auto l4Length = static_cast(getDataSize() + sizeof(udphdr)); - auto sum = tcp_udp_v4_header_checksum_partial(ip->saddr, ip->daddr, ip->protocol, l4Length); + auto sum = tcp_udp_v4_header_checksum_partial(ipHeader->saddr, ipHeader->daddr, ipHeader->protocol, l4Length); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) sum = ip_checksum_partial(frame + getL4HeaderOffset(), l4Length, sum); return ip_checksum_fold(sum); } @@ -920,6 +919,7 @@ void XskPacket::rewrite() noexcept { const auto l4Length = static_cast(getDataSize() + sizeof(udphdr)); uint64_t sum = tcp_udp_v6_header_checksum_partial(&ipv6->saddr, &ipv6->daddr, ipv6->nexthdr, l4Length); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) sum = ip_checksum_partial(frame + getL4HeaderOffset(), l4Length, sum); return ip_checksum_fold(sum); } @@ -930,21 +930,24 @@ void XskPacket::rewrite() noexcept /* Main loop: 32 bits at a time */ for (position = 0; position < len; position += sizeof(uint32_t)) { uint32_t value{}; - memcpy(&value, reinterpret_cast(ptr) + position, sizeof(value)); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + memcpy(&value, static_cast(ptr) + position, sizeof(value)); sum += value; } /* Handle un-32bit-aligned trailing bytes */ if ((len - position) >= 2) { uint16_t value{}; - memcpy(&value, reinterpret_cast(ptr) + position, sizeof(value)); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + memcpy(&value, static_cast(ptr) + position, sizeof(value)); sum += value; position += sizeof(value); } if ((len - position) > 0) { - const auto* p8 = static_cast(ptr) + position; - sum += ntohs(*p8 << 8); /* RFC says pad last byte */ + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + const auto* ptr8 = static_cast(ptr) + position; + sum += ntohs(*ptr8 << 8); /* RFC says pad last byte */ } return sum; @@ -952,10 +955,10 @@ void XskPacket::rewrite() noexcept [[nodiscard]] __be16 XskPacket::ip_checksum_fold(uint64_t sum) noexcept { - while (sum & ~0xffffffffULL) { + while ((sum & ~0xffffffffULL) != 0U) { sum = (sum >> 32) + (sum & 0xffffffffULL); } - while (sum & 0xffff0000ULL) { + while ((sum & 0xffff0000ULL) != 0U) { sum = (sum >> 16) + (sum & 0xffffULL); } @@ -963,9 +966,12 @@ void XskPacket::rewrite() noexcept } #ifndef __packed -#define __packed __attribute__((packed)) +#define packed_attribute __attribute__((packed)) +#else +#define packed_attribute __packed #endif +// NOLINTNEXTLINE(bugprone-easily-swappable-parameters) [[nodiscard]] uint64_t XskPacket::tcp_udp_v4_header_checksum_partial(__be32 src_ip, __be32 dst_ip, uint8_t protocol, uint16_t len) noexcept { struct header @@ -982,7 +988,8 @@ void XskPacket::rewrite() noexcept /* We use a union here to avoid aliasing issues with gcc -O2 */ union { - header __packed fields; + header packed_attribute fields; + // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays) uint32_t words[3]; }; }; @@ -1005,6 +1012,7 @@ void XskPacket::rewrite() noexcept struct in6_addr src_ip; struct in6_addr dst_ip; __be32 length; + // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays) __uint8_t mbz[3]; __uint8_t next_header; }; @@ -1014,7 +1022,8 @@ void XskPacket::rewrite() noexcept /* We use a union here to avoid aliasing issues with gcc -O2 */ union { - header __packed fields; + header packed_attribute fields; + // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays) uint32_t words[10]; }; }; @@ -1025,6 +1034,7 @@ void XskPacket::rewrite() noexcept pseudo_header.fields.src_ip = *src_ip; pseudo_header.fields.dst_ip = *dst_ip; pseudo_header.fields.length = htonl(len); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay) memset(pseudo_header.fields.mbz, 0, sizeof(pseudo_header.fields.mbz)); pseudo_header.fields.next_header = protocol; return ip_checksum_partial(&pseudo_header, sizeof(pseudo_header), 0); @@ -1051,19 +1061,19 @@ PacketBuffer XskPacket::cloneHeadertoPacketBuffer() const int XskWorker::createEventfd() { - auto fd = ::eventfd(0, EFD_CLOEXEC); - if (fd < 0) { + auto desc = ::eventfd(0, EFD_CLOEXEC); + if (desc < 0) { throw runtime_error("Unable create eventfd"); } - return fd; + return desc; } -void XskWorker::waitForXskSocket() noexcept +void XskWorker::waitForXskSocket() const noexcept { uint64_t x = read(workerWaker, &x, sizeof(x)); } -void XskWorker::notifyXskSocket() noexcept +void XskWorker::notifyXskSocket() const { notify(xskSocketWaker); } @@ -1107,8 +1117,8 @@ void XskWorker::notifyWorker() noexcept void XskSocket::getMACFromIfName() { ifreq ifr{}; - auto fd = FDWrapper(::socket(AF_INET, SOCK_DGRAM, 0)); - if (fd < 0) { + auto desc = FDWrapper(::socket(AF_INET, SOCK_DGRAM, 0)); + if (desc < 0) { throw std::runtime_error("Error creating a socket to get the MAC address of interface " + ifName); } @@ -1117,27 +1127,27 @@ void XskSocket::getMACFromIfName() } strncpy(ifr.ifr_name, ifName.c_str(), ifName.length() + 1); - if (ioctl(fd.getHandle(), SIOCGIFHWADDR, &ifr) < 0 || ifr.ifr_hwaddr.sa_family != ARPHRD_ETHER) { + if (ioctl(desc.getHandle(), SIOCGIFHWADDR, &ifr) < 0 || ifr.ifr_hwaddr.sa_family != ARPHRD_ETHER) { throw std::runtime_error("Error getting MAC address for interface " + ifName); } static_assert(sizeof(ifr.ifr_hwaddr.sa_data) >= std::tuple_size{}, "The size of an ARPHRD_ETHER MAC address is smaller than expected"); memcpy(source.data(), ifr.ifr_hwaddr.sa_data, source.size()); } -[[nodiscard]] int XskSocket::timeDifference(const timespec& t1, const timespec& t2) noexcept +[[nodiscard]] int XskSocket::timeDifference(const timespec& lhs, const timespec& rhs) noexcept { - const auto res = t1.tv_sec * 1000 + t1.tv_nsec / 1000000L - (t2.tv_sec * 1000 + t2.tv_nsec / 1000000L); + const auto res = lhs.tv_sec * 1000 + lhs.tv_nsec / 1000000L - (rhs.tv_sec * 1000 + rhs.tv_nsec / 1000000L); return static_cast(res); } -void XskWorker::cleanWorkerNotification() noexcept +void XskWorker::cleanWorkerNotification() const noexcept { - uint64_t x = read(xskSocketWaker, &x, sizeof(x)); + uint64_t value = read(xskSocketWaker, &value, sizeof(value)); } -void XskWorker::cleanSocketNotification() noexcept +void XskWorker::cleanSocketNotification() const noexcept { - uint64_t x = read(workerWaker, &x, sizeof(x)); + uint64_t value = read(workerWaker, &value, sizeof(value)); } std::vector getPollFdsForWorker(XskWorker& info) @@ -1175,18 +1185,20 @@ std::optional XskWorker::getEmptyFrame() if (!uniqueEmptyFrameOffset.empty()) { auto offset = uniqueEmptyFrameOffset.back(); uniqueEmptyFrameOffset.pop_back(); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) return XskPacket(offset + umemBufBase, 0, frameSize); } fillUniqueEmptyOffset(); if (!uniqueEmptyFrameOffset.empty()) { auto offset = uniqueEmptyFrameOffset.back(); uniqueEmptyFrameOffset.pop_back(); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) return XskPacket(offset + umemBufBase, 0, frameSize); } return std::nullopt; } -void XskWorker::markAsFree(XskPacket&& packet) +void XskWorker::markAsFree(const XskPacket& packet) { auto offset = frameOffset(packet); #ifdef DEBUG_UMEM @@ -1202,10 +1214,10 @@ uint32_t XskPacket::getFlags() const noexcept void XskPacket::updatePacket() noexcept { - if (!(flags & UPDATE)) { + if ((flags & UPDATE) == 0U) { return; } - if (!(flags & REWRITE)) { + if ((flags & REWRITE) == 0U) { changeDirectAndUpdateChecksum(); } } diff --git a/pdns/xsk.hh b/pdns/xsk.hh index 702855a473..a2cc6f036b 100644 --- a/pdns/xsk.hh +++ b/pdns/xsk.hh @@ -54,7 +54,7 @@ class XskPacket; class XskWorker; class XskSocket; -using MACAddr = std::array; +using MACAddr = std::array; #ifdef HAVE_XSK using XskPacketPtr = std::unique_ptr; @@ -76,7 +76,7 @@ class XskSocket xsk_umem* umem{nullptr}; uint8_t* bufBase{nullptr}; size_t size{0}; - void umemInit(size_t memSize, xsk_ring_cons* cq, xsk_ring_prod* fq, xsk_umem_config* config); + void umemInit(size_t memSize, xsk_ring_cons* completionQueue, xsk_ring_prod* fillQueue, xsk_umem_config* config); ~XskUmem(); XskUmem() = default; }; @@ -94,7 +94,7 @@ class XskSocket const size_t frameNum; // responses that have been delayed std::priority_queue waitForDelay; - MACAddr source; + MACAddr source{}; const std::string ifName; // AF_XDP socket then worker waker sockets vector fds; @@ -104,13 +104,13 @@ class XskSocket // simply recycled from cq after being processed by the kernel vector uniqueEmptyFrameOffset; // completion ring: queue where sent packets are stored by the kernel - xsk_ring_cons cq; + xsk_ring_cons cq{}; // rx ring: queue where the incoming packets are stored, read by XskRouter - xsk_ring_cons rx; + xsk_ring_cons rx{}; // fill ring: queue where umem entries available to be filled (put into rx) are stored - xsk_ring_prod fq; + xsk_ring_prod fq{}; // tx ring: queue where outgoing packets are stored - xsk_ring_prod tx; + xsk_ring_prod tx{}; std::unique_ptr socket; XskUmem umem; @@ -127,16 +127,16 @@ class XskSocket void getMACFromIfName(); public: - static void clearDestinationAddresses(); - static void addDestinationAddress(const ComboAddress& destination); - static void removeDestinationAddress(const ComboAddress& destination); + static void clearDestinationMap(const std::string& mapPath, bool isV6); + static void addDestinationAddress(const std::string& mapPath, const ComboAddress& destination); + static void removeDestinationAddress(const std::string& mapPath, const ComboAddress& destination); static constexpr size_t getFrameSize() { return frameSize; } // list of free umem entries that can be reused std::shared_ptr>> sharedEmptyFrameOffset; - XskSocket(size_t frameNum, const std::string& ifName, uint32_t queue_id, const std::string& xskMapPath); + XskSocket(size_t frameNum, std::string ifName, uint32_t queue_id, const std::string& xskMapPath); [[nodiscard]] int xskFd() const noexcept; // wait until one event has occurred [[nodiscard]] int wait(int timeout); @@ -144,11 +144,11 @@ public: void send(std::vector& packets); // look at incoming packets in rx, return them if parsing succeeeded [[nodiscard]] std::vector recv(uint32_t recvSizeMax, uint32_t* failedCount); - void addWorker(std::shared_ptr s); + void addWorker(std::shared_ptr worker); void addWorkerRoute(const std::shared_ptr& worker, const ComboAddress& dest); void removeWorkerRoute(const ComboAddress& dest); [[nodiscard]] std::string getMetrics() const; - void markAsFree(XskPacket&& packet); + void markAsFree(const XskPacket& packet); [[nodiscard]] const std::shared_ptr& getWorkerByDescriptor(int desc) const { return d_workers.at(desc); @@ -181,9 +181,9 @@ public: void recycle(size_t size) noexcept; // look at delayed packets, and send the ones that are ready void pickUpReadyPacket(std::vector& packets); - void pushDelayed(XskPacket&& packet) + void pushDelayed(XskPacket& packet) { - waitForDelay.push(std::move(packet)); + waitForDelay.push(packet); } }; @@ -203,7 +203,7 @@ public: private: ComboAddress from; ComboAddress to; - timespec sendTime; + timespec sendTime{}; uint8_t* frame{nullptr}; size_t frameLength{0}; size_t frameSize{0}; @@ -312,16 +312,16 @@ public: static int createEventfd(); static void notify(int fd); static std::shared_ptr create(); - void pushToProcessingQueue(XskPacket&& packet); - void pushToSendQueue(XskPacket&& packet); - void markAsFree(XskPacket&& packet); + void pushToProcessingQueue(XskPacket& packet); + void pushToSendQueue(XskPacket& packet); + void markAsFree(const XskPacket& packet); // notify worker that at least one packet is available for processing void notifyWorker() noexcept; // notify the router that packets are ready to be sent - void notifyXskSocket() noexcept; - void waitForXskSocket() noexcept; - void cleanWorkerNotification() noexcept; - void cleanSocketNotification() noexcept; + void notifyXskSocket() const; + void waitForXskSocket() const noexcept; + void cleanWorkerNotification() const noexcept; + void cleanSocketNotification() const noexcept; [[nodiscard]] uint64_t frameOffset(const XskPacket& packet) const noexcept; // reap empty umem entry from sharedEmptyFrameOffset into uniqueEmptyFrameOffset void fillUniqueEmptyOffset();