// NOLINTNEXTLINE(readability-function-cognitive-complexity): this function declares Lua bindings, even with a good refactoring it will likely blow up the threshold
static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
{
- typedef LuaAssociativeTable<boost::variant<bool, std::string, LuaArray<std::string>, std::shared_ptr<XskSocket>, DownstreamState::checkfunc_t>> newserver_t;
+ using newserver_t = LuaAssociativeTable<boost::variant<bool, std::string, LuaArray<std::string>, LuaArray<std::shared_ptr<XskSocket>>, DownstreamState::checkfunc_t>>;
luaCtx.writeFunction("inClientStartup", [client]() {
return client && !g_configurationDone;
});
// create but don't connect the socket in client or check-config modes
auto ret = std::make_shared<DownstreamState>(std::move(config), std::move(tlsCtx), !(client || configCheck));
#ifdef HAVE_XSK
- std::shared_ptr<XskSocket> xskSocket;
- if (getOptionalValue<std::shared_ptr<XskSocket>>(vars, "xskSocket", xskSocket) > 0) {
+ LuaArray<std::shared_ptr<XskSocket>> luaXskSockets;
+ if (getOptionalValue<LuaArray<std::shared_ptr<XskSocket>>>(vars, "xskSockets", luaXskSockets) > 0 && !luaXskSockets.empty()) {
if (g_configurationDone) {
throw std::runtime_error("Adding a server with xsk at runtime is not supported");
}
- ret->registerXsk(xskSocket);
+ std::vector<std::shared_ptr<XskSocket>> xskSockets;
+ for (auto& socket : luaXskSockets) {
+ xskSockets.push_back(socket.second);
+ }
+ ret->registerXsk(xskSockets);
std::string mac;
if (getOptionalValue<std::string>(vars, "MACAddr", mac) > 0) {
auto* addr = &ret->d_config.destMACAddr[0];
}
memcpy(ret->d_config.destMACAddr.data(), mac.data(), ret->d_config.destMACAddr.size());
}
- infolog("Added downstream server %s via XSK in %s mode", ret->d_config.remote.toStringWithPort(), xskSocket->getXDPMode());
+ infolog("Added downstream server %s via XSK in %s mode", ret->d_config.remote.toStringWithPort(), xskSockets.at(0)->getXDPMode());
}
else if (!(client || configCheck)) {
infolog("Added downstream server %s", ret->d_config.remote.toStringWithPort());
}
#else /* HAVE_XSK */
- if (!(client || configCheck)) {
- infolog("Added downstream server %s", ret->d_config.remote.toStringWithPort());
- }
+ if (!(client || configCheck)) {
+ infolog("Added downstream server %s", ret->d_config.remote.toStringWithPort());
+ }
#endif /* HAVE_XSK */
if (autoUpgrade && ret->getProtocol() != dnsdist::Protocol::DoT && ret->getProtocol() != dnsdist::Protocol::DoH) {
dnsdist::ServiceDiscovery::addUpgradeableServer(ret, upgradeInterval, upgradePool, upgradeDoHKey, keepAfterUpgrade);
}
catch (const std::exception& e) {
if (remote && response.size() > 0 && static_cast<size_t>(response.size()) > sizeof(dnsheader)) {
- vinfolog("Backend %s sent us a response with id %d that did not parse: %s", remote->d_config.remote.toStringWithPort(), ntohs(dh->id), e.what());
+ infolog("Backend %s sent us a response with id %d that did not parse: %s", remote->d_config.remote.toStringWithPort(), ntohs(dh->id), e.what());
}
++dnsdist::metrics::g_stats.nonCompliantResponses;
if (remote) {
continue;
}
xskPacket->setHeader(ids->xskPacketHeader);
- xskPacket->setPayload(response);
+ if (!xskPacket->setPayload(response)) {
+ }
xskPacket->updatePacket();
xskInfo->pushToSendQueue(*xskPacket);
xskInfo->notifyXskSocket();
return false;
}
-#ifdef HAVE_XSK
- if (!ss->xskInfo) {
+ if (ss->d_xskInfos.empty()) {
assignOutgoingUDPQueryToBackend(ss, dh->id, dq, query, true);
return false;
}
else {
- int fd = ss->xskInfo->workerWaker;
- ids.backendFD = fd;
+ const auto& xskInfo = ss->pickWorkerForSending();
+ ids.backendFD = xskInfo->workerWaker;
assignOutgoingUDPQueryToBackend(ss, dh->id, dq, query, false);
auto sourceAddr = ss->pickSourceAddressForSending();
packet.setAddr(sourceAddr, ss->d_config.sourceMACAddr, ss->d_config.remote, ss->d_config.destMACAddr);
packet.rewrite();
return true;
}
-#else /* HAVE_XSK */
- assignOutgoingUDPQueryToBackend(ss, dh->id, dq, query, true);
- return false;
-#endif /* HAVE_XSK */
}
catch (const std::exception& e) {
vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
else if (clientState.dnscryptCtx != nullptr) {
infolog("Listening on %s for DNSCrypt", addr.toStringWithPort());
}
-#ifdef HAVE_XSK
- else if (clientState.xskInfo != nullptr) {
- infolog("Listening on %s (XSK-enabled)", addr.toStringWithPort());
- }
-#endif
else {
infolog("Listening on %s", addr.toStringWithPort());
}
} else if (clientState.doh3Frontend != nullptr) {
infolog("Listening on %s for DoH3", addr.toStringWithPort());
}
+#ifdef HAVE_XSK
+ else if (clientState.xskInfo != nullptr) {
+ infolog("Listening on %s (XSK-enabled)", addr.toStringWithPort());
+ }
+#endif
}
}
StopWatch sw;
QPSLimiter qps;
#ifdef HAVE_XSK
- std::shared_ptr<XskWorker> xskInfo{nullptr};
- std::shared_ptr<XskSocket> d_xskSocket{nullptr};
+ std::vector<std::shared_ptr<XskWorker>> d_xskInfos;
+ std::vector<std::shared_ptr<XskSocket>> d_xskSockets;
#endif
std::atomic<uint64_t> idOffset{0};
size_t socketsOffset{0};
std::optional<InternalQueryState> getState(uint16_t id);
#ifdef HAVE_XSK
- void registerXsk(std::shared_ptr<XskSocket>& xsk);
+ void registerXsk(std::vector<std::shared_ptr<XskSocket>>& xsks);
[[nodiscard]] ComboAddress pickSourceAddressForSending();
+ [[nodiscard]] const std::shared_ptr<XskWorker>& pickWorkerForSending();
#endif /* HAVE_XSK */
dnsdist::Protocol getProtocol() const
addresses->push_back(local);
}
dnsdist::xsk::addDestinationAddress(local);
- d_xskSocket->addWorkerRoute(xskInfo, local);
+ for (size_t idx = 0; idx < d_xskSockets.size(); idx++) {
+ d_xskSockets.at(idx)->addWorkerRoute(d_xskInfos.at(idx), local);
+ }
}
void DownstreamState::removeXSKDestination(int fd)
}
dnsdist::xsk::removeDestinationAddress(local);
- d_xskSocket->removeWorkerRoute(local);
+ for (auto& xskSocket : d_xskSockets) {
+ xskSocket->removeWorkerRoute(local);
+ }
}
#endif /* HAVE_XSK */
connected = false;
#ifdef HAVE_XSK
- if (xskInfo != nullptr) {
+ if (!d_xskInfos.empty()) {
auto addresses = d_socketSourceAddresses.write_lock();
addresses->clear();
}
(*mplexer.lock())->removeReadFD(fd);
}
#ifdef HAVE_XSK
- if (xskInfo != nullptr) {
+ if (d_xskInfos.empty()) {
removeXSKDestination(fd);
}
#endif /* HAVE_XSK */
(*mplexer.lock())->addReadFD(fd, [](int, boost::any) {});
}
#ifdef HAVE_XSK
- if (xskInfo != nullptr) {
+ if (!d_xskInfos.empty()) {
addXSKDestination(fd);
}
#endif /* HAVE_XSK */
/* if at least one (re-)connection failed, close all sockets */
if (!connected) {
#ifdef HAVE_XSK
- if (xskInfo != nullptr) {
+ if (!d_xskInfos.empty()) {
auto addresses = d_socketSourceAddresses.write_lock();
addresses->clear();
}
for (auto& fd : sockets) {
if (fd != -1) {
#ifdef HAVE_XSK
- if (xskInfo != nullptr) {
+ if (!d_xskInfos.empty()) {
removeXSKDestination(fd);
}
#endif /* HAVE_XSK */
{
if (connected && !threadStarted.test_and_set()) {
#ifdef HAVE_XSK
- if (xskInfo != nullptr) {
- auto xskResponderThread = std::thread(dnsdist::xsk::XskResponderThread, shared_from_this());
+ for (auto& xskInfo : d_xskInfos) {
+ auto xskResponderThread = std::thread(dnsdist::xsk::XskResponderThread, shared_from_this(), xskInfo);
if (!d_config.d_cpus.empty()) {
mapThreadToCPUList(xskResponderThread.native_handle(), d_config.d_cpus);
}
return (*addresses)[idx % numberOfAddresses];
}
-void DownstreamState::registerXsk(std::shared_ptr<XskSocket>& xsk)
+[[nodiscard]] const std::shared_ptr<XskWorker>& DownstreamState::pickWorkerForSending()
+{
+ auto numberOfWorkers = d_xskInfos.size();
+ if (numberOfWorkers == 0) {
+ throw std::runtime_error("No XSK worker available for sending XSK data to backend " + getNameWithAddr());
+ }
+ size_t idx = dnsdist::getRandomValue(numberOfWorkers);
+ return d_xskInfos[idx % numberOfWorkers];
+}
+
+void DownstreamState::registerXsk(std::vector<std::shared_ptr<XskSocket>>& xsks)
{
- d_xskSocket = xsk;
+ d_xskSockets = xsks;
if (d_config.sourceAddr.sin4.sin_family == 0 || (IsAnyAddress(d_config.sourceAddr))) {
- const auto& ifName = xsk->getInterfaceName();
+ const auto& ifName = xsks.at(0)->getInterfaceName();
auto addresses = getListOfAddressesOfNetworkInterface(ifName);
if (addresses.empty()) {
throw std::runtime_error("Unable to get source address from interface " + ifName);
}
d_config.sourceAddr = addresses.at(0);
}
- xskInfo = XskWorker::create();
- xsk->addWorker(xskInfo);
+ d_config.sourceMACAddr = d_xskSockets.at(0)->getSourceMACAddress();
+
+ for (auto& xsk : d_xskSockets) {
+ auto xskInfo = XskWorker::create();
+ d_xskInfos.push_back(xskInfo);
+ xsk->addWorker(xskInfo);
+ xskInfo->sharedEmptyFrameOffset = xsk->sharedEmptyFrameOffset;
+ }
reconnect(false);
- d_config.sourceMACAddr = xsk->getSourceMACAddress();
- xskInfo->sharedEmptyFrameOffset = xsk->sharedEmptyFrameOffset;
}
#endif /* HAVE_XSK */
{
std::vector<std::shared_ptr<XskSocket>> g_xsk;
-void XskResponderThread(std::shared_ptr<DownstreamState> dss)
+void XskResponderThread(std::shared_ptr<DownstreamState> dss, std::shared_ptr<XskWorker> xskInfo)
{
- if (dss->xskInfo == nullptr) {
- throw std::runtime_error("Starting XSK responder thread for a backend without XSK!");
- }
-
try {
setThreadName("dnsdist/XskResp");
auto localRespRuleActions = g_respruleactions.getLocal();
auto localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
- auto xskInfo = dss->xskInfo;
auto pollfds = getPollFdsForWorker(*xskInfo);
const auto xskFd = xskInfo->workerWaker.getHandle();
while (!dss->isStopped()) {
const auto queryId = dnsHeader->id;
auto ids = dss->getState(queryId);
if (ids) {
- if (xskFd != ids->backendFD || !ids->isXSK()) {
+ if (!ids->isXSK()) {
+ // if (xskFd != ids->backendFD || !ids->isXSK()) {
dss->restoreState(queryId, std::move(*ids));
ids = std::nullopt;
}
}
if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) {
xskInfo->markAsFree(packet);
- vinfolog("XSK packet pushed to queue because processResponderPacket failed");
+ infolog("XSK packet pushed to queue because processResponderPacket failed");
return;
}
if (response.size() > packet.getCapacity()) {
/* fallback to sending the packet via normal socket */
sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote);
- vinfolog("XSK packet falling back because packet is too large");
+ infolog("XSK packet falling back because packet is too large");
xskInfo->markAsFree(packet);
return;
}
packet.setHeader(ids->xskPacketHeader);
if (!packet.setPayload(response)) {
- vinfolog("Unable to set XSK payload !");
+ infolog("Unable to set XSK payload !");
}
if (ids->delayMsec > 0) {
packet.addDelay(ids->delayMsec);
namespace dnsdist::xsk
{
-void XskResponderThread(std::shared_ptr<DownstreamState> dss);
+void XskResponderThread(std::shared_ptr<DownstreamState> dss, std::shared_ptr<XskWorker> xskInfo);
bool XskIsQueryAcceptable(const XskPacket& packet, ClientState& clientState, LocalHolders& holders, bool& expectProxyProtocol);
bool XskProcessQuery(ClientState& clientState, LocalHolders& holders, XskPacket& packet);
void XskRouter(std::shared_ptr<XskSocket> xsk);