From: Remi Gacogne Date: Fri, 19 Jan 2024 16:22:57 +0000 (+0100) Subject: dnsdist: Refactor XSK support between dnsdist and the backends X-Git-Tag: dnsdist-1.9.0-rc1^2~15 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=db96b412c8563fd2a5d50670b868bd33a7a54bb6;p=thirdparty%2Fpdns.git dnsdist: Refactor XSK support between dnsdist and the backends --- diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc index 8d931a4c1f..e22a83cd68 100644 --- a/pdns/dnsdist-lua.cc +++ b/pdns/dnsdist-lua.cc @@ -310,7 +310,7 @@ static bool checkConfigurationTime(const std::string& name) // NOLINTNEXTLINE(readability-function-cognitive-complexity): this function declares Lua bindings, even with a good refactoring it will likely blow up the threshold static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck) { - typedef LuaAssociativeTable, std::shared_ptr, DownstreamState::checkfunc_t>> newserver_t; + using newserver_t = LuaAssociativeTable, LuaArray>, DownstreamState::checkfunc_t>>; luaCtx.writeFunction("inClientStartup", [client]() { return client && !g_configurationDone; }); @@ -631,12 +631,16 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck) // create but don't connect the socket in client or check-config modes auto ret = std::make_shared(std::move(config), std::move(tlsCtx), !(client || configCheck)); #ifdef HAVE_XSK - std::shared_ptr xskSocket; - if (getOptionalValue>(vars, "xskSocket", xskSocket) > 0) { + LuaArray> luaXskSockets; + if (getOptionalValue>>(vars, "xskSockets", luaXskSockets) > 0 && !luaXskSockets.empty()) { if (g_configurationDone) { throw std::runtime_error("Adding a server with xsk at runtime is not supported"); } - ret->registerXsk(xskSocket); + std::vector> xskSockets; + for (auto& socket : luaXskSockets) { + xskSockets.push_back(socket.second); + } + ret->registerXsk(xskSockets); std::string mac; if (getOptionalValue(vars, "MACAddr", mac) > 0) { auto* addr = 
&ret->d_config.destMACAddr[0]; @@ -649,15 +653,15 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck) } memcpy(ret->d_config.destMACAddr.data(), mac.data(), ret->d_config.destMACAddr.size()); } - infolog("Added downstream server %s via XSK in %s mode", ret->d_config.remote.toStringWithPort(), xskSocket->getXDPMode()); + infolog("Added downstream server %s via XSK in %s mode", ret->d_config.remote.toStringWithPort(), xskSockets.at(0)->getXDPMode()); } else if (!(client || configCheck)) { infolog("Added downstream server %s", ret->d_config.remote.toStringWithPort()); } #else /* HAVE_XSK */ - if (!(client || configCheck)) { - infolog("Added downstream server %s", ret->d_config.remote.toStringWithPort()); - } + if (!(client || configCheck)) { + infolog("Added downstream server %s", ret->d_config.remote.toStringWithPort()); + } #endif /* HAVE_XSK */ if (autoUpgrade && ret->getProtocol() != dnsdist::Protocol::DoT && ret->getProtocol() != dnsdist::Protocol::DoH) { dnsdist::ServiceDiscovery::addUpgradeableServer(ret, upgradeInterval, upgradePool, upgradeDoHKey, keepAfterUpgrade); diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index 6f4bca6c68..32521e7fbe 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -370,7 +370,7 @@ bool responseContentMatches(const PacketBuffer& response, const DNSName& qname, } catch (const std::exception& e) { if (remote && response.size() > 0 && static_cast(response.size()) > sizeof(dnsheader)) { - vinfolog("Backend %s sent us a response with id %d that did not parse: %s", remote->d_config.remote.toStringWithPort(), ntohs(dh->id), e.what()); + infolog("Backend %s sent us a response with id %d that did not parse: %s", remote->d_config.remote.toStringWithPort(), ntohs(dh->id), e.what()); } ++dnsdist::metrics::g_stats.nonCompliantResponses; if (remote) { @@ -881,7 +881,8 @@ void responderThread(std::shared_ptr dss) continue; } xskPacket->setHeader(ids->xskPacketHeader); - xskPacket->setPayload(response); + if 
(!xskPacket->setPayload(response)) { + } xskPacket->updatePacket(); xskInfo->pushToSendQueue(*xskPacket); xskInfo->notifyXskSocket(); @@ -1983,14 +1984,13 @@ bool XskProcessQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet) return false; } -#ifdef HAVE_XSK - if (!ss->xskInfo) { + if (ss->d_xskInfos.empty()) { assignOutgoingUDPQueryToBackend(ss, dh->id, dq, query, true); return false; } else { - int fd = ss->xskInfo->workerWaker; - ids.backendFD = fd; + const auto& xskInfo = ss->pickWorkerForSending(); + ids.backendFD = xskInfo->workerWaker; assignOutgoingUDPQueryToBackend(ss, dh->id, dq, query, false); auto sourceAddr = ss->pickSourceAddressForSending(); packet.setAddr(sourceAddr, ss->d_config.sourceMACAddr, ss->d_config.remote, ss->d_config.destMACAddr); @@ -1998,10 +1998,6 @@ bool XskProcessQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet) packet.rewrite(); return true; } -#else /* HAVE_XSK */ - assignOutgoingUDPQueryToBackend(ss, dh->id, dq, query, true); - return false; -#endif /* HAVE_XSK */ } catch (const std::exception& e) { vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what()); @@ -2624,11 +2620,6 @@ static void setupLocalSocket(ClientState& clientState, const ComboAddress& addr, else if (clientState.dnscryptCtx != nullptr) { infolog("Listening on %s for DNSCrypt", addr.toStringWithPort()); } -#ifdef HAVE_XSK - else if (clientState.xskInfo != nullptr) { - infolog("Listening on %s (XSK-enabled)", addr.toStringWithPort()); - } -#endif else { infolog("Listening on %s", addr.toStringWithPort()); } @@ -2638,6 +2629,11 @@ static void setupLocalSocket(ClientState& clientState, const ComboAddress& addr, } else if (clientState.doh3Frontend != nullptr) { infolog("Listening on %s for DoH3", addr.toStringWithPort()); } +#ifdef HAVE_XSK + else if (clientState.xskInfo != nullptr) { + infolog("Listening on %s (XSK-enabled)", addr.toStringWithPort()); + } +#endif } 
} diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index 32fdf9561f..52019d56e2 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -825,8 +825,8 @@ public: StopWatch sw; QPSLimiter qps; #ifdef HAVE_XSK - std::shared_ptr xskInfo{nullptr}; - std::shared_ptr d_xskSocket{nullptr}; + std::vector> d_xskInfos; + std::vector> d_xskSockets; #endif std::atomic idOffset{0}; size_t socketsOffset{0}; @@ -993,8 +993,9 @@ public: std::optional getState(uint16_t id); #ifdef HAVE_XSK - void registerXsk(std::shared_ptr& xsk); + void registerXsk(std::vector>& xsks); [[nodiscard]] ComboAddress pickSourceAddressForSending(); + [[nodiscard]] const std::shared_ptr& pickWorkerForSending(); #endif /* HAVE_XSK */ dnsdist::Protocol getProtocol() const diff --git a/pdns/dnsdistdist/dnsdist-backend.cc b/pdns/dnsdistdist/dnsdist-backend.cc index 0138c6e398..966b3ff9c2 100644 --- a/pdns/dnsdistdist/dnsdist-backend.cc +++ b/pdns/dnsdistdist/dnsdist-backend.cc @@ -55,7 +55,9 @@ void DownstreamState::addXSKDestination(int fd) addresses->push_back(local); } dnsdist::xsk::addDestinationAddress(local); - d_xskSocket->addWorkerRoute(xskInfo, local); + for (size_t idx = 0; idx < d_xskSockets.size(); idx++) { + d_xskSockets.at(idx)->addWorkerRoute(d_xskInfos.at(idx), local); + } } void DownstreamState::removeXSKDestination(int fd) @@ -67,7 +69,9 @@ void DownstreamState::removeXSKDestination(int fd) } dnsdist::xsk::removeDestinationAddress(local); - d_xskSocket->removeWorkerRoute(local); + for (auto& xskSocket : d_xskSockets) { + xskSocket->removeWorkerRoute(local); + } } #endif /* HAVE_XSK */ @@ -85,7 +89,7 @@ bool DownstreamState::reconnect(bool initialAttempt) connected = false; #ifdef HAVE_XSK - if (xskInfo != nullptr) { + if (!d_xskInfos.empty()) { auto addresses = d_socketSourceAddresses.write_lock(); addresses->clear(); } @@ -97,7 +101,7 @@ bool DownstreamState::reconnect(bool initialAttempt) (*mplexer.lock())->removeReadFD(fd); } #ifdef HAVE_XSK - if (xskInfo != nullptr) { + if 
(!d_xskInfos.empty()) { /* only backends with XSK workers have a route to remove */ removeXSKDestination(fd); } #endif /* HAVE_XSK */ @@ -132,7 +136,7 @@ bool DownstreamState::reconnect(bool initialAttempt) (*mplexer.lock())->addReadFD(fd, [](int, boost::any) {}); } #ifdef HAVE_XSK - if (xskInfo != nullptr) { + if (!d_xskInfos.empty()) { addXSKDestination(fd); } #endif /* HAVE_XSK */ @@ -150,7 +154,7 @@ bool DownstreamState::reconnect(bool initialAttempt) /* if at least one (re-)connection failed, close all sockets */ if (!connected) { #ifdef HAVE_XSK - if (xskInfo != nullptr) { + if (!d_xskInfos.empty()) { auto addresses = d_socketSourceAddresses.write_lock(); addresses->clear(); } @@ -158,7 +162,7 @@ bool DownstreamState::reconnect(bool initialAttempt) for (auto& fd : sockets) { if (fd != -1) { #ifdef HAVE_XSK - if (xskInfo != nullptr) { + if (!d_xskInfos.empty()) { removeXSKDestination(fd); } #endif /* HAVE_XSK */ @@ -329,8 +333,8 @@ void DownstreamState::start() { if (connected && !threadStarted.test_and_set()) { #ifdef HAVE_XSK - if (xskInfo != nullptr) { - auto xskResponderThread = std::thread(dnsdist::xsk::XskResponderThread, shared_from_this()); + for (auto& xskInfo : d_xskInfos) { + auto xskResponderThread = std::thread(dnsdist::xsk::XskResponderThread, shared_from_this(), xskInfo); if (!d_config.d_cpus.empty()) { mapThreadToCPUList(xskResponderThread.native_handle(), d_config.d_cpus); } @@ -881,12 +885,22 @@ void DownstreamState::submitHealthCheckResult(bool initial, bool newResult) return (*addresses)[idx % numberOfAddresses]; } -void DownstreamState::registerXsk(std::shared_ptr& xsk) +[[nodiscard]] const std::shared_ptr& DownstreamState::pickWorkerForSending() +{ + auto numberOfWorkers = d_xskInfos.size(); + if (numberOfWorkers == 0) { + throw std::runtime_error("No XSK worker available for sending XSK data to backend " + getNameWithAddr()); + } + size_t idx = dnsdist::getRandomValue(numberOfWorkers); + return d_xskInfos[idx % numberOfWorkers]; +} + +void DownstreamState::registerXsk(std::vector>& xsks) { - 
d_xskSocket = xsk; + d_xskSockets = xsks; if (d_config.sourceAddr.sin4.sin_family == 0 || (IsAnyAddress(d_config.sourceAddr))) { - const auto& ifName = xsk->getInterfaceName(); + const auto& ifName = xsks.at(0)->getInterfaceName(); auto addresses = getListOfAddressesOfNetworkInterface(ifName); if (addresses.empty()) { throw std::runtime_error("Unable to get source address from interface " + ifName); @@ -897,11 +911,15 @@ void DownstreamState::registerXsk(std::shared_ptr& xsk) } d_config.sourceAddr = addresses.at(0); } - xskInfo = XskWorker::create(); - xsk->addWorker(xskInfo); + d_config.sourceMACAddr = d_xskSockets.at(0)->getSourceMACAddress(); + + for (auto& xsk : d_xskSockets) { + auto xskInfo = XskWorker::create(); + d_xskInfos.push_back(xskInfo); + xsk->addWorker(xskInfo); + xskInfo->sharedEmptyFrameOffset = xsk->sharedEmptyFrameOffset; + } reconnect(false); - d_config.sourceMACAddr = xsk->getSourceMACAddress(); - xskInfo->sharedEmptyFrameOffset = xsk->sharedEmptyFrameOffset; } #endif /* HAVE_XSK */ diff --git a/pdns/dnsdistdist/dnsdist-xsk.cc b/pdns/dnsdistdist/dnsdist-xsk.cc index 81480a99a1..2996683c95 100644 --- a/pdns/dnsdistdist/dnsdist-xsk.cc +++ b/pdns/dnsdistdist/dnsdist-xsk.cc @@ -35,17 +35,12 @@ namespace dnsdist::xsk { std::vector> g_xsk; -void XskResponderThread(std::shared_ptr dss) +void XskResponderThread(std::shared_ptr dss, std::shared_ptr xskInfo) { - if (dss->xskInfo == nullptr) { - throw std::runtime_error("Starting XSK responder thread for a backend without XSK!"); - } - try { setThreadName("dnsdist/XskResp"); auto localRespRuleActions = g_respruleactions.getLocal(); auto localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal(); - auto xskInfo = dss->xskInfo; auto pollfds = getPollFdsForWorker(*xskInfo); const auto xskFd = xskInfo->workerWaker.getHandle(); while (!dss->isStopped()) { @@ -66,7 +61,8 @@ void XskResponderThread(std::shared_ptr dss) const auto queryId = dnsHeader->id; auto ids = dss->getState(queryId); if 
(ids) { - if (xskFd != ids->backendFD || !ids->isXSK()) { + if (!ids->isXSK()) { + // if (xskFd != ids->backendFD || !ids->isXSK()) { dss->restoreState(queryId, std::move(*ids)); ids = std::nullopt; } @@ -82,19 +78,19 @@ void XskResponderThread(std::shared_ptr dss) } if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) { xskInfo->markAsFree(packet); - vinfolog("XSK packet pushed to queue because processResponderPacket failed"); + infolog("XSK packet pushed to queue because processResponderPacket failed"); return; } if (response.size() > packet.getCapacity()) { /* fallback to sending the packet via normal socket */ sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote); - vinfolog("XSK packet falling back because packet is too large"); + infolog("XSK packet falling back because packet is too large"); xskInfo->markAsFree(packet); return; } packet.setHeader(ids->xskPacketHeader); if (!packet.setPayload(response)) { - vinfolog("Unable to set XSK payload !"); + infolog("Unable to set XSK payload !"); } if (ids->delayMsec > 0) { packet.addDelay(ids->delayMsec); diff --git a/pdns/dnsdistdist/dnsdist-xsk.hh b/pdns/dnsdistdist/dnsdist-xsk.hh index 6a862d75b4..f677b78604 100644 --- a/pdns/dnsdistdist/dnsdist-xsk.hh +++ b/pdns/dnsdistdist/dnsdist-xsk.hh @@ -32,7 +32,7 @@ class XskWorker; namespace dnsdist::xsk { -void XskResponderThread(std::shared_ptr dss); +void XskResponderThread(std::shared_ptr dss, std::shared_ptr xskInfo); bool XskIsQueryAcceptable(const XskPacket& packet, ClientState& clientState, LocalHolders& holders, bool& expectProxyProtocol); bool XskProcessQuery(ClientState& clientState, LocalHolders& holders, XskPacket& packet); void XskRouter(std::shared_ptr xsk);