From ea247879c44e957d91f4c22595810f4bef491be7 Mon Sep 17 00:00:00 2001 From: Remi Gacogne Date: Thu, 28 Dec 2023 15:18:44 +0100 Subject: [PATCH] dnsdist: Refactor the XSK code into a proper namespace --- pdns/dnsdist.cc | 550 ++++++++++++++++++++++++------------------------ pdns/xsk.cc | 14 +- pdns/xsk.hh | 54 +++-- 3 files changed, 320 insertions(+), 298 deletions(-) diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index f3e8408143..fb1c0e4379 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -776,27 +776,6 @@ static void handleResponseForUDPClient(InternalQueryState& ids, PacketBuffer& re } } -#ifdef HAVE_XSK -static void XskHealthCheck(std::shared_ptr& dss, std::unordered_map>& map, bool initial = false) -{ - auto& xskInfo = dss->xskInfo; - std::shared_ptr data; - auto packet = getHealthCheckPacket(dss, nullptr, data); - data->d_initial = initial; - setHealthCheckTime(dss, data); - auto xskPacket = xskInfo->getEmptyFrame(); - if (!xskPacket) { - return; - } - xskPacket->setAddr(dss->d_config.sourceAddr, dss->d_config.sourceMACAddr, dss->d_config.remote, dss->d_config.destMACAddr); - xskPacket->setPayload(packet); - xskPacket->rewrite(); - xskInfo->pushToSendQueue(std::move(xskPacket)); - const auto queryId = data->d_queryID; - map[queryId] = std::move(data); -} -#endif /* HAVE_XSK */ - static bool processResponderPacket(std::shared_ptr& dss, PacketBuffer& response, const std::vector& localRespRuleActions, const std::vector& cacheInsertedRespRuleActions, InternalQueryState&& ids) { @@ -833,135 +812,267 @@ static bool processResponderPacket(std::shared_ptr& dss, Packet return true; } -// listens on a dedicated socket, lobs answers from downstream servers to original requestors +#ifdef HAVE_XSK +namespace dnsdist::xsk +{ +static void doHealthCheck(std::shared_ptr& dss, std::unordered_map>& map, bool initial = false) +{ + auto& xskInfo = dss->xskInfo; + std::shared_ptr data; + auto packet = getHealthCheckPacket(dss, nullptr, data); + data->d_initial = initial; + setHealthCheckTime(dss, data); + auto xskPacket = xskInfo->getEmptyFrame(); + if (!xskPacket) { + return; + } + xskPacket->setAddr(dss->d_config.sourceAddr, dss->d_config.sourceMACAddr, dss->d_config.remote, dss->d_config.destMACAddr); + xskPacket->setPayload(packet); + xskPacket->rewrite(); + xskInfo->pushToSendQueue(std::move(xskPacket)); + const auto queryId = data->d_queryID; + map[queryId] = std::move(data); +} + void responderThread(std::shared_ptr dss) { + if (dss->xskInfo == nullptr) { + throw std::runtime_error("Starting XSK responder thread for a backend without XSK!"); + } + try { - setThreadName("dnsdist/respond"); - auto localRespRuleActions = g_respruleactions.getLocal(); - auto localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal(); -#ifdef HAVE_XSK - if (dss->xskInfo) { - auto xskInfo = dss->xskInfo; - auto pollfds = getPollFdsForWorker(*xskInfo); - std::unordered_map> healthCheckMap; - XskHealthCheck(dss, healthCheckMap, true); - itimerspec tm; - tm.it_value.tv_sec = dss->d_config.checkTimeout / 1000; - tm.it_value.tv_nsec = (dss->d_config.checkTimeout % 1000) * 1000000; - tm.it_interval = tm.it_value; - auto res = timerfd_settime(pollfds[1].fd, 0, &tm, nullptr); - if (res) { - throw std::runtime_error("timerfd_settime failed:" + stringerror(errno)); - } - const auto xskFd = xskInfo->workerWaker.getHandle(); - while (!dss->isStopped()) { - poll(pollfds.data(), pollfds.size(), -1); - bool needNotify = false; - if (pollfds[0].revents & POLLIN) { - needNotify = true; + 
setThreadName("dnsdist/XskResp"); + auto localRespRuleActions = g_respruleactions.getLocal(); + auto localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal(); + auto xskInfo = dss->xskInfo; + auto pollfds = getPollFdsForWorker(*xskInfo); + std::unordered_map> healthCheckMap; + dnsdist::xsk::doHealthCheck(dss, healthCheckMap, true); + itimerspec tm; + tm.it_value.tv_sec = dss->d_config.checkTimeout / 1000; + tm.it_value.tv_nsec = (dss->d_config.checkTimeout % 1000) * 1000000; + tm.it_interval = tm.it_value; + auto res = timerfd_settime(pollfds[1].fd, 0, &tm, nullptr); + if (res) { + throw std::runtime_error("timerfd_settime failed:" + stringerror(errno)); + } + const auto xskFd = xskInfo->workerWaker.getHandle(); + while (!dss->isStopped()) { + poll(pollfds.data(), pollfds.size(), -1); + bool needNotify = false; + if (pollfds[0].revents & POLLIN) { + needNotify = true; #if defined(__SANITIZE_THREAD__) - xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) { + xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) { #else - xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) { + xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) { #endif - auto packet = XskPacketPtr(packetRaw); - if (packet->getDataLen() < sizeof(dnsheader)) { - xskInfo->markAsFree(std::move(packet)); - return; - } - const dnsheader_aligned dnsHeader(packet->getPayloadData()); - const auto queryId = dnsHeader->id; - auto ids = dss->getState(queryId); - if (ids) { - if (xskFd != ids->backendFD || !ids->xskPacketHeader) { - dss->restoreState(queryId, std::move(*ids)); - ids = std::nullopt; - } - } - if (!ids) { - // this has to go before we can refactor the duplicated response handling code - auto iter = healthCheckMap.find(queryId); - if (iter != healthCheckMap.end()) { - auto data = std::move(iter->second); - healthCheckMap.erase(iter); - packet->cloneIntoPacketBuffer(data->d_buffer); - data->d_ds->submitHealthCheckResult(data->d_initial, handleResponse(data)); - } - xskInfo->markAsFree(std::move(packet)); - return; - } - auto response = packet->clonePacketBuffer(); - if (response.size() > packet->getCapacity()) { - /* fallback to sending the packet via normal socket */ - ids->xskPacketHeader.reset(); - } - if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) { - xskInfo->markAsFree(std::move(packet)); - vinfolog("XSK packet pushed to queue because processResponderPacket failed"); - return; - } - vinfolog("XSK packet - processResponderPacket OK"); - if (response.size() > packet->getCapacity()) { - /* fallback to sending the packet via normal socket */ - sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote); - vinfolog("XSK packet falling back because packet is too large"); - xskInfo->markAsFree(std::move(packet)); - return; - } - //vinfolog("XSK packet - set header"); - packet->setHeader(*ids->xskPacketHeader); - //vinfolog("XSK packet - set payload"); - if (!packet->setPayload(response)) { - vinfolog("Unable to set payload !"); - } - if (ids->delayMsec > 0) { - vinfolog("XSK packet - adding delay"); - packet->addDelay(ids->delayMsec); - } - //vinfolog("XSK packet - update packet"); - packet->updatePacket(); - //vinfolog("XSK packet pushed to send queue"); - xskInfo->pushToSendQueue(std::move(packet)); - }); - xskInfo->cleanSocketNotification(); - } - if (pollfds[1].revents & POLLIN) { - timeval now; - gettimeofday(&now, 
nullptr); - for (auto i = healthCheckMap.begin(); i != healthCheckMap.end();) { - auto& ttd = i->second->d_ttd; - if (ttd < now) { - dss->submitHealthCheckResult(i->second->d_initial, false); - i = healthCheckMap.erase(i); - } - else { - ++i; + auto packet = XskPacketPtr(packetRaw); + if (packet->getDataLen() < sizeof(dnsheader)) { + xskInfo->markAsFree(std::move(packet)); + return; + } + const dnsheader_aligned dnsHeader(packet->getPayloadData()); + const auto queryId = dnsHeader->id; + auto ids = dss->getState(queryId); + if (ids) { + if (xskFd != ids->backendFD || !ids->xskPacketHeader) { + dss->restoreState(queryId, std::move(*ids)); + ids = std::nullopt; } } - needNotify = true; - dss->updateStatisticsInfo(); - dss->handleUDPTimeouts(); - if (dss->d_nextCheck <= 1) { - dss->d_nextCheck = dss->d_config.checkInterval; - if (dss->d_config.availability == DownstreamState::Availability::Auto) { - XskHealthCheck(dss, healthCheckMap); + if (!ids) { + // this has to go before we can refactor the duplicated response handling code + auto iter = healthCheckMap.find(queryId); + if (iter != healthCheckMap.end()) { + auto data = std::move(iter->second); + healthCheckMap.erase(iter); + packet->cloneIntoPacketBuffer(data->d_buffer); + data->d_ds->submitHealthCheckResult(data->d_initial, handleResponse(data)); } + xskInfo->markAsFree(std::move(packet)); + return; + } + auto response = packet->clonePacketBuffer(); + if (response.size() > packet->getCapacity()) { + /* fallback to sending the packet via normal socket */ + ids->xskPacketHeader.reset(); + } + if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) { + xskInfo->markAsFree(std::move(packet)); + vinfolog("XSK packet pushed to queue because processResponderPacket failed"); + return; + } + vinfolog("XSK packet - processResponderPacket OK"); + if (response.size() > packet->getCapacity()) { + /* fallback to sending the packet via normal socket */ + sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote); + vinfolog("XSK packet falling back because packet is too large"); + xskInfo->markAsFree(std::move(packet)); + return; + } + //vinfolog("XSK packet - set header"); + packet->setHeader(*ids->xskPacketHeader); + //vinfolog("XSK packet - set payload"); + if (!packet->setPayload(response)) { + vinfolog("Unable to set payload !"); + } + if (ids->delayMsec > 0) { + vinfolog("XSK packet - adding delay"); + packet->addDelay(ids->delayMsec); + } + //vinfolog("XSK packet - update packet"); + packet->updatePacket(); + //vinfolog("XSK packet pushed to send queue"); + xskInfo->pushToSendQueue(std::move(packet)); + }); + xskInfo->cleanSocketNotification(); + } + if (pollfds[1].revents & POLLIN) { + timeval now; + gettimeofday(&now, nullptr); + for (auto i = healthCheckMap.begin(); i != healthCheckMap.end();) { + auto& ttd = i->second->d_ttd; + if (ttd < now) { + dss->submitHealthCheckResult(i->second->d_initial, false); + i = healthCheckMap.erase(i); } else { - --dss->d_nextCheck; + ++i; + } + } + needNotify = true; + dss->updateStatisticsInfo(); + dss->handleUDPTimeouts(); + if (dss->d_nextCheck <= 1) { + dss->d_nextCheck = dss->d_config.checkInterval; + if (dss->d_config.availability == DownstreamState::Availability::Auto) { + doHealthCheck(dss, healthCheckMap); } + } + else { + --dss->d_nextCheck; + } + + uint64_t tmp; + res = read(pollfds[1].fd, &tmp, sizeof(tmp)); + } + if (needNotify) { + xskInfo->notifyXskSocket(); + } + } + } + catch (const std::exception& 
e) { + errlog("XSK responder thread died because of exception: %s", e.what()); + } + catch (const PDNSException& e) { + errlog("XSK responder thread died because of PowerDNS exception: %s", e.reason); + } + catch (...) { + errlog("XSK responder thread died because of an exception: %s", "unknown"); + } +} + +static bool isXskQueryAcceptable(const XskPacket& packet, ClientState& cs, LocalHolders& holders, bool& expectProxyProtocol) noexcept +{ + const auto& from = packet.getFromAddr(); + expectProxyProtocol = expectProxyProtocolFrom(from); + if (!holders.acl->match(from) && !expectProxyProtocol) { + vinfolog("Query from %s dropped because of ACL", from.toStringWithPort()); + ++dnsdist::metrics::g_stats.aclDrops; + return false; + } + cs.queries++; + ++dnsdist::metrics::g_stats.queries; - uint64_t tmp; - res = read(pollfds[1].fd, &tmp, sizeof(tmp)); + return true; +} + +void XskRouter(std::shared_ptr xsk) +{ + setThreadName("dnsdist/XskRouter"); + uint32_t failed; + // packets to be submitted for sending + vector fillInTx; + const auto& fds = xsk->getDescriptors(); + // list of workers that need to be notified + std::set needNotify; + const auto& xskWakerIdx = xsk->getWorkers().get<0>(); + const auto& destIdx = xsk->getWorkers().get<1>(); + while (true) { + try { + auto ready = xsk->wait(-1); + // descriptor 0 gets incoming AF_XDP packets + if (fds.at(0).revents & POLLIN) { + auto packets = xsk->recv(64, &failed); + dnsdist::metrics::g_stats.nonCompliantQueries += failed; + for (auto &packet : packets) { + const auto dest = packet->getToAddr(); + auto res = destIdx.find(dest); + if (res == destIdx.end()) { + xsk->markAsFree(std::move(packet)); + continue; + } + res->worker->pushToProcessingQueue(std::move(packet)); + needNotify.insert(res->workerWaker); } - if (needNotify) { - xskInfo->notifyXskSocket(); + for (auto i : needNotify) { + uint64_t x = 1; + auto written = write(i, &x, sizeof(x)); + if (written != sizeof(x)) { + // oh, well, the worker is clearly overloaded + // but there is nothing we can do about it, + // and hopefully the queue will be processed eventually + } + } + needNotify.clear(); + ready--; + } + const auto backup = ready; + for (size_t fdIndex = 1; fdIndex < fds.size() && ready > 0; fdIndex++) { + if (fds.at(fdIndex).revents & POLLIN) { + ready--; + auto& info = xskWakerIdx.find(fds.at(fdIndex).fd)->worker; +#if defined(__SANITIZE_THREAD__) + info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) { +#else + info->outgoingPacketsQueue.consume_all([&](XskPacket* packetRaw) { +#endif + auto packet = XskPacketPtr(packetRaw); + if (!(packet->getFlags() & XskPacket::UPDATE)) { + xsk->markAsFree(std::move(packet)); + return; + } + if (packet->getFlags() & XskPacket::DELAY) { + xsk->pushDelayed(std::move(packet)); + return; + } + fillInTx.push_back(std::move(packet)); + }); + info->cleanWorkerNotification(); } } + xsk->pickUpReadyPacket(fillInTx); + xsk->recycle(4096); + xsk->fillFq(); + xsk->send(fillInTx); + ready = backup; } - else { + catch (...) 
{ + vinfolog("Exception in XSK router loop"); + } + } +} +} #endif /* HAVE_XSK */ + +// listens on a dedicated socket, lobs answers from downstream servers to original requestors +void responderThread(std::shared_ptr dss) +{ + try { + setThreadName("dnsdist/respond"); + auto localRespRuleActions = g_respruleactions.getLocal(); + auto localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal(); const size_t initialBufferSize = getInitialUDPPacketBufferSize(false); /* allocate one more byte so we can detect truncation */ PacketBuffer response(initialBufferSize + 1); @@ -1045,9 +1156,6 @@ void responderThread(std::shared_ptr dss) vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->d_config.remote.toStringWithPort(), queryId, e.what()); } } -#ifdef HAVE_XSK - } -#endif /* HAVE_XSK */ } catch (const std::exception& e) { errlog("UDP responder thread died because of exception: %s", e.what()); @@ -1467,23 +1575,6 @@ static bool isUDPQueryAcceptable(ClientState& cs, LocalHolders& holders, const s return true; } -#ifdef HAVE_XSK -static bool isXskQueryAcceptable(const XskPacket& packet, ClientState& cs, LocalHolders& holders, bool& expectProxyProtocol) noexcept -{ - const auto& from = packet.getFromAddr(); - expectProxyProtocol = expectProxyProtocolFrom(from); - if (!holders.acl->match(from) && !expectProxyProtocol) { - vinfolog("Query from %s dropped because of ACL", from.toStringWithPort()); - ++dnsdist::metrics::g_stats.aclDrops; - return false; - } - cs.queries++; - ++dnsdist::metrics::g_stats.queries; - - return true; -} -#endif /* HAVE_XSK */ - bool checkDNSCryptQuery(const ClientState& cs, PacketBuffer& query, std::unique_ptr& dnsCryptQuery, time_t now, bool tcp) { if (cs.dnscryptCtx) { @@ -2054,6 +2145,8 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct } #ifdef HAVE_XSK +namespace dnsdist::xsk +{ static bool ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet) { uint16_t queryId = 0; @@ -2175,6 +2268,39 @@ static bool ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p } return false; } + +static void xskClientThread(ClientState* cs) +{ + setThreadName("dnsdist/xskClient"); + auto xskInfo = cs->xskInfo; + LocalHolders holders; + + for (;;) { +#if defined(__SANITIZE_THREAD__) + while (!xskInfo->incomingPacketsQueue.lock()->read_available()) { +#else + while (!xskInfo->incomingPacketsQueue.read_available()) { +#endif + xskInfo->waitForXskSocket(); + } +#if defined(__SANITIZE_THREAD__) + xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) { +#else + xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) { +#endif + auto packet = XskPacketPtr(packetRaw); + if (ProcessXskQuery(*cs, holders, *packet)) { + packet->updatePacket(); + xskInfo->pushToSendQueue(std::move(packet)); + } + else { + xskInfo->markAsFree(std::move(packet)); + } + }); + xskInfo->notifyXskSocket(); + } +} +} #endif /* HAVE_XSK */ #ifndef DISABLE_RECVMMSG @@ -2269,40 +2395,6 @@ static void MultipleMessagesUDPClientThread(ClientState* cs, LocalHolders& holde #endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */ #endif /* DISABLE_RECVMMSG */ -#ifdef HAVE_XSK -static void xskClientThread(ClientState* cs) -{ - setThreadName("dnsdist/xskClient"); - auto xskInfo = cs->xskInfo; - LocalHolders holders; - - for (;;) { -#if defined(__SANITIZE_THREAD__) - while (!xskInfo->incomingPacketsQueue.lock()->read_available()) { -#else - 
while (!xskInfo->incomingPacketsQueue.read_available()) { -#endif - xskInfo->waitForXskSocket(); - } -#if defined(__SANITIZE_THREAD__) - xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) { -#else - xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) { -#endif - auto packet = XskPacketPtr(packetRaw); - if (ProcessXskQuery(*cs, holders, *packet)) { - packet->updatePacket(); - xskInfo->pushToSendQueue(std::move(packet)); - } - else { - xskInfo->markAsFree(std::move(packet)); - } - }); - xskInfo->notifyXskSocket(); - } -} -#endif /* HAVE_XSK */ - // listens to incoming queries, sends out to downstream servers, noting the intended return path static void udpClientThread(std::vector states) { @@ -3282,17 +3374,13 @@ static void initFrontends() } } -#ifdef HAVE_XSK -void XskRouter(std::shared_ptr xsk); -#endif /* HAVE_XSK */ - namespace dnsdist { static void startFrontends() { #ifdef HAVE_XSK for (auto& xskContext : g_xsk) { - std::thread xskThread(XskRouter, std::move(xskContext)); + std::thread xskThread(dnsdist::xsk::XskRouter, std::move(xskContext)); xskThread.detach(); } #endif /* HAVE_XSK */ @@ -3302,7 +3390,7 @@ static void startFrontends() for (auto& clientState : g_frontends) { #ifdef HAVE_XSK if (clientState->xskInfo) { - std::thread xskCT(xskClientThread, clientState.get()); + std::thread xskCT(dnsdist::xsk::xskClientThread, clientState.get()); if (!clientState->cpus.empty()) { mapThreadToCPUList(xskCT.native_handle(), clientState->cpus); } @@ -3671,81 +3759,3 @@ int main(int argc, char** argv) #endif } } - -#ifdef HAVE_XSK -void XskRouter(std::shared_ptr xsk) -{ - setThreadName("dnsdist/XskRouter"); - uint32_t failed; - // packets to be submitted for sending - vector fillInTx; - const auto size = xsk->fds.size(); - // list of workers that need to be notified - std::set needNotify; - const auto& xskWakerIdx = xsk->workers.get<0>(); - const auto& destIdx = xsk->workers.get<1>(); - while (true) { - try { - auto ready = xsk->wait(-1); - // descriptor 0 gets incoming AF_XDP packets - if (xsk->fds[0].revents & POLLIN) { - auto packets = xsk->recv(64, &failed); - dnsdist::metrics::g_stats.nonCompliantQueries += failed; - for (auto &packet : packets) { - const auto dest = packet->getToAddr(); - auto res = destIdx.find(dest); - if (res == destIdx.end()) { - xsk->markAsFree(std::move(packet)); - continue; - } - res->worker->pushToProcessingQueue(std::move(packet)); - needNotify.insert(res->workerWaker); - } - for (auto i : needNotify) { - uint64_t x = 1; - auto written = write(i, &x, sizeof(x)); - if (written != sizeof(x)) { - // oh, well, the worker is clearly overloaded - // but there is nothing we can do about it, - // and hopefully the queue will be processed eventually - } - } - needNotify.clear(); - ready--; - } - const auto backup = ready; - for (size_t i = 1; i < size && ready > 0; i++) { - if (xsk->fds[i].revents & POLLIN) { - ready--; - auto& info = xskWakerIdx.find(xsk->fds[i].fd)->worker; -#if defined(__SANITIZE_THREAD__) - info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) { -#else - info->outgoingPacketsQueue.consume_all([&](XskPacket* packetRaw) { -#endif - auto packet = XskPacketPtr(packetRaw); - if (!(packet->getFlags() & XskPacket::UPDATE)) { - xsk->markAsFree(std::move(packet)); - return; - } - if (packet->getFlags() & XskPacket::DELAY) { - xsk->waitForDelay.push(std::move(packet)); - return; - } - fillInTx.push_back(std::move(packet)); - }); - info->cleanWorkerNotification(); - } - } - 
xsk->pickUpReadyPacket(fillInTx); - xsk->recycle(4096); - xsk->fillFq(); - xsk->send(fillInTx); - ready = backup; - } - catch (...) { - vinfolog("Exception in XSK router loop"); - } - } -} -#endif /* HAVE_XSK */ diff --git a/pdns/xsk.cc b/pdns/xsk.cc index 2f438f9fd9..5b3c2142df 100644 --- a/pdns/xsk.cc +++ b/pdns/xsk.cc @@ -97,7 +97,7 @@ int XskSocket::firstTimeout() } timespec now; gettime(&now); - const auto& firstTime = waitForDelay.top()->sendTime; + const auto& firstTime = waitForDelay.top()->getSendTime(); const auto res = timeDifference(now, firstTime); if (res <= 0) { return 0; @@ -106,7 +106,7 @@ int XskSocket::firstTimeout() } XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queue_id, const std::string& xskMapPath, const std::string& poolName_) : - frameNum(frameNum_), queueId(queue_id), ifName(ifName_), poolName(poolName_), socket(nullptr, xsk_socket__delete), sharedEmptyFrameOffset(std::make_shared>>()) + frameNum(frameNum_), ifName(ifName_), poolName(poolName_), socket(nullptr, xsk_socket__delete), sharedEmptyFrameOffset(std::make_shared>>()) { if (!isPowOfTwo(frameNum_) || !isPowOfTwo(frameSize) || !isPowOfTwo(fqCapacity) || !isPowOfTwo(cqCapacity) || !isPowOfTwo(rxCapacity) || !isPowOfTwo(txCapacity)) { @@ -219,7 +219,7 @@ int XskSocket::wait(int timeout) [[nodiscard]] uint64_t XskSocket::frameOffset(const XskPacket& packet) const noexcept { - return packet.frame - umem.bufBase; + return packet.getFrameOffsetFrom(umem.bufBase); } [[nodiscard]] int XskSocket::xskFd() const noexcept { @@ -314,7 +314,7 @@ void XskSocket::pickUpReadyPacket(std::vector& packets) { timespec now; gettime(&now); - while (!waitForDelay.empty() && timeDifference(now, waitForDelay.top()->sendTime) <= 0) { + while (!waitForDelay.empty() && timeDifference(now, waitForDelay.top()->getSendTime()) <= 0) { auto& top = const_cast(waitForDelay.top()); packets.push_back(std::move(top)); waitForDelay.pop(); @@ -676,7 +676,7 @@ void XskPacket::addDelay(const int relativeMilliseconds) noexcept bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept { - return s1->sendTime < s2->sendTime; + return s1->getSendTime() < s2->getSendTime(); } const ComboAddress& XskPacket::getFromAddr() const noexcept @@ -992,9 +992,9 @@ void XskSocket::addWorker(std::shared_ptr s, const ComboAddress& dest .revents = 0}); }; -uint64_t XskWorker::frameOffset(const XskPacket& s) const noexcept +uint64_t XskWorker::frameOffset(const XskPacket& packet) const noexcept { - return s.frame - umemBufBase; + return packet.getFrameOffsetFrom(umemBufBase); } void XskWorker::notifyWorker() noexcept diff --git a/pdns/xsk.hh b/pdns/xsk.hh index 551c23074c..d8dc067563 100644 --- a/pdns/xsk.hh +++ b/pdns/xsk.hh @@ -87,12 +87,12 @@ class XskSocket ~XskUmem(); XskUmem() = default; }; - boost::multi_index_container< + using WorkerContainer = boost::multi_index_container< XskRouteInfo, boost::multi_index::indexed_by< boost::multi_index::hashed_unique>, - boost::multi_index::hashed_unique, ComboAddress::addressPortOnlyHash>>> - workers; + boost::multi_index::hashed_unique, ComboAddress::addressPortOnlyHash>>>; + WorkerContainer workers; // number of frames to keep in sharedEmptyFrameOffset static constexpr size_t holdThreshold = 256; // number of frames to insert into the fill queue @@ -100,8 +100,6 @@ class XskSocket static constexpr size_t frameSize = 2048; // number of entries (frames) in the umem const size_t frameNum; - // ID of the network queue - const uint32_t queueId; // responses that have been delayed 
  std::priority_queue<XskPacketPtr> waitForDelay;
   const std::string ifName;
@@ -123,7 +121,6 @@ class XskSocket
   xsk_ring_prod tx;
   std::unique_ptr socket;
   XskUmem umem;
-  bpf_object* prog;
 
   static constexpr uint32_t fqCapacity = XSK_RING_PROD__DEFAULT_NUM_DESCS * 4;
   static constexpr uint32_t cqCapacity = XSK_RING_CONS__DEFAULT_NUM_DESCS * 4;
@@ -132,18 +129,10 @@ class XskSocket
   constexpr static bool isPowOfTwo(uint32_t value) noexcept;
   [[nodiscard]] static int timeDifference(const timespec& t1, const timespec& t2) noexcept;
 
-  friend void XskRouter(std::shared_ptr<XskSocket> xsk);
 
   [[nodiscard]] uint64_t frameOffset(const XskPacket& packet) const noexcept;
   [[nodiscard]] int firstTimeout();
-  // pick ups available frames from uniqueEmptyFrameOffset
-  // insert entries from uniqueEmptyFrameOffset into fq
-  void fillFq(uint32_t fillSize = fillThreshold) noexcept;
-  // picks up entries that have been processed (sent) from cq and push them into uniqueEmptyFrameOffset
-  void recycle(size_t size) noexcept;
   void getMACFromIfName();
-  // look at delayed packets, and send the ones that are ready
-  void pickUpReadyPacket(std::vector<XskPacketPtr>& packets);
 
 public:
   static constexpr size_t getFrameSize()
@@ -164,6 +153,25 @@ public:
   void addWorker(std::shared_ptr<XskWorker> s, const ComboAddress& dest);
   [[nodiscard]] std::string getMetrics() const;
   void markAsFree(XskPacketPtr&& packet);
+  [[nodiscard]] WorkerContainer& getWorkers()
+  {
+    return workers;
+  }
+  [[nodiscard]] const std::vector<pollfd>& getDescriptors() const
+  {
+    return fds;
+  }
+  // pick ups available frames from uniqueEmptyFrameOffset
+  // insert entries from uniqueEmptyFrameOffset into fq
+  void fillFq(uint32_t fillSize = fillThreshold) noexcept;
+  // picks up entries that have been processed (sent) from cq and push them into uniqueEmptyFrameOffset
+  void recycle(size_t size) noexcept;
+  // look at delayed packets, and send the ones that are ready
+  void pickUpReadyPacket(std::vector<XskPacketPtr>& packets);
+  void pushDelayed(XskPacketPtr&& packet)
+  {
+    waitForDelay.push(std::move(packet));
+  }
 };
 
 struct iphdr;
@@ -216,14 +224,8 @@ private:
   void setIPv6Header(const ipv6hdr& ipv6Header) noexcept;
   [[nodiscard]] udphdr getUDPHeader() const noexcept;
   void setUDPHeader(const udphdr& udpHeader) noexcept;
-  // parse IP and UDP payloads
-  bool parse(bool fromSetHeader);
   void changeDirectAndUpdateChecksum() noexcept;
 
-  friend XskSocket;
-  friend XskWorker;
-  friend bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept;
-
   constexpr static uint8_t DefaultTTL = 64;
 
 public:
@@ -244,7 +246,17 @@ public:
   XskPacket(uint8_t* frame, size_t dataSize, size_t frameSize);
   void addDelay(int relativeMilliseconds) noexcept;
   void updatePacket() noexcept;
+  // parse IP and UDP payloads
+  bool parse(bool fromSetHeader);
   [[nodiscard]] uint32_t getFlags() const noexcept;
+  [[nodiscard]] timespec getSendTime() const noexcept
+  {
+    return sendTime;
+  }
+  [[nodiscard]] uint64_t getFrameOffsetFrom(const uint8_t* base) const noexcept
+  {
+    return frame - base;
+  }
 };
 
 bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept;
@@ -298,7 +310,7 @@ public:
   void waitForXskSocket() noexcept;
   void cleanWorkerNotification() noexcept;
   void cleanSocketNotification() noexcept;
-  [[nodiscard]] uint64_t frameOffset(const XskPacket& s) const noexcept;
+  [[nodiscard]] uint64_t frameOffset(const XskPacket& packet) const noexcept;
   // reap empty umem entry from sharedEmptyFrameOffset into uniqueEmptyFrameOffset
   void fillUniqueEmptyOffset();
   // look for an empty umem entry in uniqueEmptyFrameOffset
-- 
2.47.2
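
A minimal sketch (not part of the patch) of how the relocated functions are consumed once the namespace is in place: the XSK entry points now live in dnsdist::xsk, and XskRouter drives the socket through the newly public XskSocket accessors (getWorkers(), getDescriptors(), pushDelayed(), fillFq(), recycle(), pickUpReadyPacket()) instead of the removed friend declaration. The startXskThreads() helper and its parameters are hypothetical names used only for this example; the real wiring lives in dnsdist::startFrontends() and the backend start-up path touched by the hunks above.

#ifdef HAVE_XSK
/* Sketch only: assumes it sits inside dnsdist.cc below the dnsdist::xsk
   definitions introduced by this patch, so the static functions are visible
   and the usual dnsdist types (XskSocket, ClientState, DownstreamState)
   and <thread>/<memory> are already available. */
static void startXskThreads(std::shared_ptr<XskSocket> xskContext,
                            ClientState* clientState,
                            std::shared_ptr<DownstreamState> backend)
{
  /* one router thread per AF_XDP socket; XskRouter now uses the public
     getDescriptors()/getWorkers()/fillFq()/recycle()/pickUpReadyPacket()/
     pushDelayed() accessors instead of being a friend of XskSocket */
  std::thread xskThread(dnsdist::xsk::XskRouter, std::move(xskContext));
  xskThread.detach();

  /* one client thread per frontend bound to an XSK-enabled queue */
  std::thread xskCT(dnsdist::xsk::xskClientThread, clientState);
  xskCT.detach();

  /* one responder thread per backend carrying an xskInfo worker; the
     refactored function now throws if the backend has no XSK state */
  std::thread responder(dnsdist::xsk::responderThread, std::move(backend));
  responder.detach();
}
#endif /* HAVE_XSK */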