#include "dnsdist-lua.hh"
#include "dnsdist-resolver.hh"
#include "dnsdist-svc.hh"
+#include "dnsdist-xsk.hh"
#include "dolog.hh"
#include "xsk.hh"
else {
throw std::runtime_error("xskMapPath field is required!");
}
- extern std::vector<std::shared_ptr<XskSocket>> g_xsk;
auto socket = std::make_shared<XskSocket>(frameNums, ifName, queue_id, path);
- g_xsk.push_back(socket);
+ dnsdist::xsk::g_xsk.push_back(socket);
return socket;
});
luaCtx.registerFunction<std::string(std::shared_ptr<XskSocket>::*)()const>("getMetrics", [](const std::shared_ptr<XskSocket>& xsk) {
#include <sys/resource.h>
#include <unistd.h>
-#ifdef HAVE_XSK
-#include <sys/poll.h>
-#endif /* HAVE_XSK */
-
#ifdef HAVE_LIBEDIT
#if defined (__OpenBSD__) || defined(__NetBSD__)
// If this is not undeffed, __attribute__ wil be redefined by /usr/include/readline/rlstdc.h
#include "dnsdist-tcp.hh"
#include "dnsdist-web.hh"
#include "dnsdist-xpf.hh"
+#include "dnsdist-xsk.hh"
#include "base64.hh"
#include "capabilities.hh"
#include "misc.hh"
#include "sstuff.hh"
#include "threadname.hh"
+#include "xsk.hh"
/* Known sins:
std::vector<std::shared_ptr<DOQFrontend>> g_doqlocals;
std::vector<std::shared_ptr<DOH3Frontend>> g_doh3locals;
std::vector<std::shared_ptr<DNSCryptContext>> g_dnsCryptLocals;
-std::vector<std::shared_ptr<XskSocket>> g_xsk;
shared_ptr<BPFFilter> g_defaultBPFFilter{nullptr};
std::vector<std::shared_ptr<DynBPFFilter> > g_dynBPFFilters;
}
}
-static bool processResponderPacket(std::shared_ptr<DownstreamState>& dss, PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& localRespRuleActions, const std::vector<DNSDistResponseRuleAction>& cacheInsertedRespRuleActions, InternalQueryState&& ids)
+bool processResponderPacket(std::shared_ptr<DownstreamState>& dss, PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& localRespRuleActions, const std::vector<DNSDistResponseRuleAction>& cacheInsertedRespRuleActions, InternalQueryState&& ids)
{
const dnsheader_aligned dh(response.data());
return true;
}
-#ifdef HAVE_XSK
-namespace dnsdist::xsk
-{
-void responderThread(std::shared_ptr<DownstreamState> dss)
-{
- if (dss->xskInfo == nullptr) {
- throw std::runtime_error("Starting XSK responder thread for a backend without XSK!");
- }
-
- try {
- setThreadName("dnsdist/XskResp");
- auto localRespRuleActions = g_respruleactions.getLocal();
- auto localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
- auto xskInfo = dss->xskInfo;
- auto pollfds = getPollFdsForWorker(*xskInfo);
- const auto xskFd = xskInfo->workerWaker.getHandle();
- while (!dss->isStopped()) {
- poll(pollfds.data(), pollfds.size(), -1);
- bool needNotify = false;
- if (pollfds[0].revents & POLLIN) {
- needNotify = true;
-#if defined(__SANITIZE_THREAD__)
- xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
- xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
- if (packet.getDataLen() < sizeof(dnsheader)) {
- xskInfo->markAsFree(std::move(packet));
- return;
- }
- const dnsheader_aligned dnsHeader(packet.getPayloadData());
- const auto queryId = dnsHeader->id;
- auto ids = dss->getState(queryId);
- if (ids) {
- if (xskFd != ids->backendFD || !ids->isXSK()) {
- dss->restoreState(queryId, std::move(*ids));
- ids = std::nullopt;
- }
- }
- if (!ids) {
- xskInfo->markAsFree(std::move(packet));
- return;
- }
- auto response = packet.clonePacketBuffer();
- if (response.size() > packet.getCapacity()) {
- /* fallback to sending the packet via normal socket */
- ids->xskPacketHeader.clear();
- }
- if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) {
- xskInfo->markAsFree(std::move(packet));
- vinfolog("XSK packet pushed to queue because processResponderPacket failed");
- return;
- }
- if (response.size() > packet.getCapacity()) {
- /* fallback to sending the packet via normal socket */
- sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote);
- vinfolog("XSK packet falling back because packet is too large");
- xskInfo->markAsFree(std::move(packet));
- return;
- }
- packet.setHeader(ids->xskPacketHeader);
- if (!packet.setPayload(response)) {
- vinfolog("Unable to set payload !");
- }
- if (ids->delayMsec > 0) {
- vinfolog("XSK packet - adding delay");
- packet.addDelay(ids->delayMsec);
- }
- packet.updatePacket();
- xskInfo->pushToSendQueue(std::move(packet));
- });
- xskInfo->cleanSocketNotification();
- }
- if (needNotify) {
- xskInfo->notifyXskSocket();
- }
- }
- }
- catch (const std::exception& e) {
- errlog("XSK responder thread died because of exception: %s", e.what());
- }
- catch (const PDNSException& e) {
- errlog("XSK responder thread died because of PowerDNS exception: %s", e.reason);
- }
- catch (...) {
- errlog("XSK responder thread died because of an exception: %s", "unknown");
- }
-}
-
-static bool isXskQueryAcceptable(const XskPacket& packet, ClientState& cs, LocalHolders& holders, bool& expectProxyProtocol) noexcept
-{
- const auto& from = packet.getFromAddr();
- expectProxyProtocol = expectProxyProtocolFrom(from);
- if (!holders.acl->match(from) && !expectProxyProtocol) {
- vinfolog("Query from %s dropped because of ACL", from.toStringWithPort());
- ++dnsdist::metrics::g_stats.aclDrops;
- return false;
- }
- cs.queries++;
- ++dnsdist::metrics::g_stats.queries;
-
- return true;
-}
-
-static void XskRouter(std::shared_ptr<XskSocket> xsk)
-{
- setThreadName("dnsdist/XskRouter");
- uint32_t failed;
- // packets to be submitted for sending
- vector<XskPacket> fillInTx;
- const auto& fds = xsk->getDescriptors();
- // list of workers that need to be notified
- std::set<int> needNotify;
- while (true) {
- try {
- auto ready = xsk->wait(-1);
- // descriptor 0 gets incoming AF_XDP packets
- if (fds.at(0).revents & POLLIN) {
- auto packets = xsk->recv(64, &failed);
- dnsdist::metrics::g_stats.nonCompliantQueries += failed;
- for (auto &packet : packets) {
- const auto dest = packet.getToAddr();
- auto worker = xsk->getWorkerByDestination(dest);
- if (!worker) {
- xsk->markAsFree(std::move(packet));
- continue;
- }
- worker->pushToProcessingQueue(std::move(packet));
- needNotify.insert(worker->workerWaker.getHandle());
- }
- for (auto i : needNotify) {
- uint64_t x = 1;
- auto written = write(i, &x, sizeof(x));
- if (written != sizeof(x)) {
- // oh, well, the worker is clearly overloaded
- // but there is nothing we can do about it,
- // and hopefully the queue will be processed eventually
- }
- }
- needNotify.clear();
- ready--;
- }
- const auto backup = ready;
- for (size_t fdIndex = 1; fdIndex < fds.size() && ready > 0; fdIndex++) {
- if (fds.at(fdIndex).revents & POLLIN) {
- ready--;
- auto& info = xsk->getWorkerByDescriptor(fds.at(fdIndex).fd);
-#if defined(__SANITIZE_THREAD__)
- info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
- info->outgoingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
- if (!(packet.getFlags() & XskPacket::UPDATE)) {
- xsk->markAsFree(std::move(packet));
- return;
- }
- if (packet.getFlags() & XskPacket::DELAY) {
- xsk->pushDelayed(std::move(packet));
- return;
- }
- fillInTx.push_back(std::move(packet));
- });
- info->cleanWorkerNotification();
- }
- }
- xsk->pickUpReadyPacket(fillInTx);
- xsk->recycle(4096);
- xsk->fillFq();
- xsk->send(fillInTx);
- ready = backup;
- }
- catch (...) {
- vinfolog("Exception in XSK router loop");
- }
- }
-}
-}
-#endif /* HAVE_XSK */
-
// listens on a dedicated socket, lobs answers from downstream servers to original requestors
void responderThread(std::shared_ptr<DownstreamState> dss)
{
xskPacket->setHeader(ids->xskPacketHeader);
xskPacket->setPayload(response);
xskPacket->updatePacket();
- xskInfo->pushToSendQueue(std::move(*xskPacket));
+ xskInfo->pushToSendQueue(*xskPacket);
xskInfo->notifyXskSocket();
#endif /* HAVE_XSK */
}
#ifdef HAVE_XSK
namespace dnsdist::xsk
{
-static bool ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet)
+bool XskProcessQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet)
{
uint16_t queryId = 0;
const auto& remote = packet.getFromAddr();
try {
bool expectProxyProtocol = false;
- if (!isXskQueryAcceptable(packet, cs, holders, expectProxyProtocol)) {
+ if (!XskIsQueryAcceptable(packet, cs, holders, expectProxyProtocol)) {
return false;
}
return false;
}
-static void xskClientThread(ClientState* cs)
-{
- setThreadName("dnsdist/xskClient");
- auto xskInfo = cs->xskInfo;
- LocalHolders holders;
-
- for (;;) {
-#if defined(__SANITIZE_THREAD__)
- while (!xskInfo->incomingPacketsQueue.lock()->read_available()) {
-#else
- while (!xskInfo->incomingPacketsQueue.read_available()) {
-#endif
- xskInfo->waitForXskSocket();
- }
-#if defined(__SANITIZE_THREAD__)
- xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
- xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
- if (ProcessXskQuery(*cs, holders, packet)) {
- packet.updatePacket();
- xskInfo->pushToSendQueue(std::move(packet));
- }
- else {
- xskInfo->markAsFree(std::move(packet));
- }
- });
- xskInfo->notifyXskSocket();
- }
-}
}
#endif /* HAVE_XSK */
static void startFrontends()
{
#ifdef HAVE_XSK
- for (auto& xskContext : g_xsk) {
+ for (auto& xskContext : dnsdist::xsk::g_xsk) {
std::thread xskThread(dnsdist::xsk::XskRouter, std::move(xskContext));
xskThread.detach();
}
for (auto& clientState : g_frontends) {
#ifdef HAVE_XSK
if (clientState->xskInfo) {
- XskSocket::addDestinationAddress(clientState->local);
+ dnsdist::xsk::addDestinationAddress(clientState->local);
- std::thread xskCT(dnsdist::xsk::xskClientThread, clientState.get());
+ std::thread xskCT(dnsdist::xsk::XskClientThread, clientState.get());
if (!clientState->cpus.empty()) {
mapThreadToCPUList(xskCT.native_handle(), clientState->cpus);
}
g_hashperturb = dnsdist::getRandomValue(0xffffffff);
#ifdef HAVE_XSK
- XskSocket::clearDestinationAddresses();
+#warning FIXME: we need to provide a way to clear the map from Lua, as well as a way to change the map path
+ try {
+ dnsdist::xsk::clearDestinationAddresses();
+ }
+ catch (const std::exception& exp) {
+ /* silently handle failures: at this point we don't even know if XSK is enabled,
+ and we might not have the correct map (not the default one). */
+ }
#endif /* HAVE_XSK */
ComboAddress clientAddress = ComboAddress();
#include "uuid-utils.hh"
#include "proxy-protocol.hh"
#include "stat_t.hh"
-#include "xsk.hh"
uint64_t uptimeOfProcess(const std::string& str);
extern QueryCount g_qcount;
+class XskPacket;
+class XskSocket;
+class XskWorker;
+
struct ClientState
{
ClientState(const ComboAddress& local_, bool isTCP_, bool doReusePort, int fastOpenQueue, const std::string& itfName, const std::set<int>& cpus_, bool enableProxyProtocol): cpus(cpus_), interface(itfName), local(local_), fastOpenQueueSize(fastOpenQueue), tcp(isTCP_), reuseport(doReusePort), d_enableProxyProtocol(enableProxyProtocol)
std::string d_dohPath;
std::string name;
std::string nameWithAddr;
- MACAddr sourceMACAddr;
- MACAddr destMACAddr;
+#ifdef HAVE_XSK
+ std::array<uint8_t, 6> sourceMACAddr;
+ std::array<uint8_t, 6> destMACAddr;
+#endif /* HAVE_XSK */
size_t d_numberOfSockets{1};
size_t d_maxInFlightQueriesPerConn{1};
size_t d_tcpConcurrentConnectionsLimit{0};
bool processResponse(PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& respRuleActions, const std::vector<DNSDistResponseRuleAction>& insertedRespRuleActions, DNSResponse& dr, bool muted);
bool processRulesResult(const DNSAction::Action& action, DNSQuestion& dq, std::string& ruleresult, bool& drop);
bool processResponseAfterRules(PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& cacheInsertedRespRuleActions, DNSResponse& dr, bool muted);
+bool processResponderPacket(std::shared_ptr<DownstreamState>& dss, PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& localRespRuleActions, const std::vector<DNSDistResponseRuleAction>& cacheInsertedRespRuleActions, InternalQueryState&& ids);
bool assignOutgoingUDPQueryToBackend(std::shared_ptr<DownstreamState>& ds, uint16_t queryID, DNSQuestion& dq, PacketBuffer& query, bool actuallySend = true);
bool sendUDPResponse(int origFD, const PacketBuffer& response, const int delayMsec, const ComboAddress& origDest, const ComboAddress& origRemote);
void handleResponseSent(const DNSName& qname, const QType& qtype, double udiff, const ComboAddress& client, const ComboAddress& backend, unsigned int size, const dnsheader& cleartextDH, dnsdist::Protocol outgoingProtocol, dnsdist::Protocol incomingProtocol, bool fromBackend);
void handleResponseSent(const InternalQueryState& ids, double udiff, const ComboAddress& client, const ComboAddress& backend, unsigned int size, const dnsheader& cleartextDH, dnsdist::Protocol outgoingProtocol, bool fromBackend);
-
-#ifdef HAVE_XSK
-namespace dnsdist::xsk
-{
-void responderThread(std::shared_ptr<DownstreamState> dss);
-}
-#endif /* HAVE_XSK */
dnsdist-tcp.cc dnsdist-tcp.hh \
dnsdist-web.cc dnsdist-web.hh \
dnsdist-xpf.cc dnsdist-xpf.hh \
+ dnsdist-xsk.cc dnsdist-xsk.hh \
dnsdist.cc dnsdist.hh \
dnslabeltext.cc \
dnsname.cc dnsname.hh \
dnsdist-tcp-downstream.cc \
dnsdist-tcp.cc dnsdist-tcp.hh \
dnsdist-xpf.cc dnsdist-xpf.hh \
+ dnsdist-xsk.cc dnsdist-xsk.hh \
dnsdist.hh \
dnslabeltext.cc \
dnsname.cc dnsname.hh \
#include "dnsdist-random.hh"
#include "dnsdist-rings.hh"
#include "dnsdist-tcp.hh"
+#include "dnsdist-xsk.hh"
#include "dolog.hh"
#include "xsk.hh"
auto addresses = d_socketSourceAddresses.write_lock();
addresses->push_back(local);
}
- XskSocket::addDestinationAddress(local);
+ dnsdist::xsk::addDestinationAddress(local);
d_xskSocket->addWorkerRoute(xskInfo, local);
}
return;
}
- XskSocket::removeDestinationAddress(local);
+ dnsdist::xsk::removeDestinationAddress(local);
d_xskSocket->removeWorkerRoute(local);
}
#endif /* HAVE_XSK */
if (connected && !threadStarted.test_and_set()) {
#ifdef HAVE_XSK
if (xskInfo != nullptr) {
- tid = std::thread(dnsdist::xsk::responderThread, shared_from_this());
+ tid = std::thread(dnsdist::xsk::XskResponderThread, shared_from_this());
}
else {
tid = std::thread(responderThread, shared_from_this());
--- /dev/null
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#include "dnsdist.hh"
+#include "dnsdist-xsk.hh"
+
+#ifdef HAVE_XSK
+#include <sys/poll.h>
+
+#include "dnsdist-metrics.hh"
+#include "dnsdist-proxy-protocol.hh"
+#include "threadname.hh"
+#include "xsk.hh"
+
+namespace dnsdist::xsk
+{
+std::vector<std::shared_ptr<XskSocket>> g_xsk;
+
+void XskResponderThread(std::shared_ptr<DownstreamState> dss)
+{
+ if (dss->xskInfo == nullptr) {
+ throw std::runtime_error("Starting XSK responder thread for a backend without XSK!");
+ }
+
+ try {
+ setThreadName("dnsdist/XskResp");
+ auto localRespRuleActions = g_respruleactions.getLocal();
+ auto localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
+ auto xskInfo = dss->xskInfo;
+ auto pollfds = getPollFdsForWorker(*xskInfo);
+ const auto xskFd = xskInfo->workerWaker.getHandle();
+ while (!dss->isStopped()) {
+ poll(pollfds.data(), pollfds.size(), -1);
+ bool needNotify = false;
+ if ((pollfds[0].revents & POLLIN) != 0) {
+ needNotify = true;
+#if defined(__SANITIZE_THREAD__)
+ xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
+#else
+ xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
+#endif
+ if (packet.getDataLen() < sizeof(dnsheader)) {
+ xskInfo->markAsFree(packet);
+ return;
+ }
+ const dnsheader_aligned dnsHeader(packet.getPayloadData());
+ const auto queryId = dnsHeader->id;
+ auto ids = dss->getState(queryId);
+ if (ids) {
+ if (xskFd != ids->backendFD || !ids->isXSK()) {
+ dss->restoreState(queryId, std::move(*ids));
+ ids = std::nullopt;
+ }
+ }
+ if (!ids) {
+ xskInfo->markAsFree(packet);
+ return;
+ }
+ auto response = packet.clonePacketBuffer();
+ if (response.size() > packet.getCapacity()) {
+ /* fallback to sending the packet via normal socket */
+ ids->xskPacketHeader.clear();
+ }
+ if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) {
+ xskInfo->markAsFree(packet);
+ vinfolog("XSK packet pushed to queue because processResponderPacket failed");
+ return;
+ }
+ if (response.size() > packet.getCapacity()) {
+ /* fallback to sending the packet via normal socket */
+ sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote);
+ vinfolog("XSK packet falling back because packet is too large");
+ xskInfo->markAsFree(packet);
+ return;
+ }
+ packet.setHeader(ids->xskPacketHeader);
+ if (!packet.setPayload(response)) {
+ vinfolog("Unable to set XSK payload !");
+ }
+ if (ids->delayMsec > 0) {
+ packet.addDelay(ids->delayMsec);
+ }
+ packet.updatePacket();
+ xskInfo->pushToSendQueue(packet);
+ });
+ xskInfo->cleanSocketNotification();
+ }
+ if (needNotify) {
+ xskInfo->notifyXskSocket();
+ }
+ }
+ }
+ catch (const std::exception& e) {
+ errlog("XSK responder thread died because of exception: %s", e.what());
+ }
+ catch (const PDNSException& e) {
+ errlog("XSK responder thread died because of PowerDNS exception: %s", e.reason);
+ }
+ catch (...) {
+ errlog("XSK responder thread died because of an exception: %s", "unknown");
+ }
+}
+
+bool XskIsQueryAcceptable(const XskPacket& packet, ClientState& clientState, LocalHolders& holders, bool& expectProxyProtocol)
+{
+ const auto& from = packet.getFromAddr();
+ expectProxyProtocol = expectProxyProtocolFrom(from);
+ if (!holders.acl->match(from) && !expectProxyProtocol) {
+ vinfolog("Query from %s dropped because of ACL", from.toStringWithPort());
+ ++dnsdist::metrics::g_stats.aclDrops;
+ return false;
+ }
+ clientState.queries++;
+ ++dnsdist::metrics::g_stats.queries;
+
+ return true;
+}
+
+void XskRouter(std::shared_ptr<XskSocket> xsk)
+{
+ setThreadName("dnsdist/XskRouter");
+ uint32_t failed = 0;
+ // packets to be submitted for sending
+ vector<XskPacket> fillInTx;
+ const auto& fds = xsk->getDescriptors();
+ // list of workers that need to be notified
+ std::set<int> needNotify;
+ while (true) {
+ try {
+ auto ready = xsk->wait(-1);
+ // descriptor 0 gets incoming AF_XDP packets
+ if ((fds.at(0).revents & POLLIN) != 0) {
+ auto packets = xsk->recv(64, &failed);
+ dnsdist::metrics::g_stats.nonCompliantQueries += failed;
+ for (auto &packet : packets) {
+ const auto dest = packet.getToAddr();
+ auto worker = xsk->getWorkerByDestination(dest);
+ if (!worker) {
+ xsk->markAsFree(packet);
+ continue;
+ }
+ worker->pushToProcessingQueue(packet);
+ needNotify.insert(worker->workerWaker.getHandle());
+ }
+ for (auto socket : needNotify) {
+ uint64_t value = 1;
+ auto written = write(socket, &value, sizeof(value));
+ if (written != sizeof(value)) {
+ // oh, well, the worker is clearly overloaded
+ // but there is nothing we can do about it,
+ // and hopefully the queue will be processed eventually
+ }
+ }
+ needNotify.clear();
+ ready--;
+ }
+ const auto backup = ready;
+ for (size_t fdIndex = 1; fdIndex < fds.size() && ready > 0; fdIndex++) {
+ if ((fds.at(fdIndex).revents & POLLIN) != 0) {
+ ready--;
+ const auto& info = xsk->getWorkerByDescriptor(fds.at(fdIndex).fd);
+#if defined(__SANITIZE_THREAD__)
+ info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
+#else
+ info->outgoingPacketsQueue.consume_all([&](XskPacket& packet) {
+#endif
+ if ((packet.getFlags() & XskPacket::UPDATE) == 0) {
+ xsk->markAsFree(packet);
+ return;
+ }
+ if ((packet.getFlags() & XskPacket::DELAY) != 0) {
+ xsk->pushDelayed(packet);
+ return;
+ }
+ fillInTx.push_back(packet);
+ });
+ info->cleanWorkerNotification();
+ }
+ }
+ xsk->pickUpReadyPacket(fillInTx);
+ xsk->recycle(4096);
+ xsk->fillFq();
+ xsk->send(fillInTx);
+ ready = backup;
+ }
+ catch (...) {
+ vinfolog("Exception in XSK router loop");
+ }
+ }
+}
+
+void XskClientThread(ClientState* clientState)
+{
+ setThreadName("dnsdist/xskClient");
+ auto xskInfo = clientState->xskInfo;
+ LocalHolders holders;
+
+ for (;;) {
+#if defined(__SANITIZE_THREAD__)
+ while (xskInfo->incomingPacketsQueue.lock()->read_available() == 0U) {
+#else
+ while (xskInfo->incomingPacketsQueue.read_available() == 0U) {
+#endif
+ xskInfo->waitForXskSocket();
+ }
+#if defined(__SANITIZE_THREAD__)
+ xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
+#else
+ xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
+#endif
+ if (XskProcessQuery(*clientState, holders, packet)) {
+ packet.updatePacket();
+ xskInfo->pushToSendQueue(packet);
+ }
+ else {
+ xskInfo->markAsFree(packet);
+ }
+ });
+ xskInfo->notifyXskSocket();
+ }
+}
+
+static std::string getDestinationMap(bool isV6) {
+ return !isV6 ? "/sys/fs/bpf/dnsdist/xsk-destinations-v4" : "/sys/fs/bpf/dnsdist/xsk-destinations-v6";
+}
+
+void addDestinationAddress(const ComboAddress& addr)
+{
+ auto map = getDestinationMap(addr.isIPv6());
+ XskSocket::addDestinationAddress(map, addr);
+}
+
+void removeDestinationAddress(const ComboAddress& addr)
+{
+ auto map = getDestinationMap(addr.isIPv6());
+ XskSocket::removeDestinationAddress(map, addr);
+}
+
+void clearDestinationAddresses()
+{
+ auto map = getDestinationMap(false);
+ XskSocket::clearDestinationMap(map, false);
+ map = getDestinationMap(true);
+ XskSocket::clearDestinationMap(map, true);
+}
+
+}
+#endif /* HAVE_XSK */
--- /dev/null
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#pragma once
+
+#include "config.h"
+
+#ifdef HAVE_XSK
+class XskPacket;
+class XskSocket;
+class XskWorker;
+
+#include <memory>
+
+namespace dnsdist::xsk
+{
+void XskResponderThread(std::shared_ptr<DownstreamState> dss);
+bool XskIsQueryAcceptable(const XskPacket& packet, ClientState& clientState, LocalHolders& holders, bool& expectProxyProtocol);
+bool XskProcessQuery(ClientState& clientState, LocalHolders& holders, XskPacket& packet);
+void XskRouter(std::shared_ptr<XskSocket> xsk);
+void XskClientThread(ClientState* clientState);
+void addDestinationAddress(const ComboAddress& addr);
+void removeDestinationAddress(const ComboAddress& addr);
+void clearDestinationAddresses();
+
+extern std::vector<std::shared_ptr<XskSocket>> g_xsk;
+}
+#endif /* HAVE_XSK */
{
}
-namespace dnsdist::xsk
-{
-void responderThread(std::shared_ptr<DownstreamState> dss)
-{
-}
-}
-
string g_outputBuffer;
std::atomic<bool> g_configurationDone{false};
#include "dnsdist-internal-queries.hh"
#include "dnsdist-tcp.hh"
#include "dnsdist-xpf.hh"
+#include "dnsdist-xsk.hh"
#include "dolog.hh"
#include "dnsname.hh"
{
return false;
}
+namespace dnsdist::xsk
+{
+bool XskProcessQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet)
+{
+ return false;
+}
+}
-std::vector<std::shared_ptr<XskSocket>> g_xsk;
+bool processResponderPacket(std::shared_ptr<DownstreamState>& dss, PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& localRespRuleActions, const std::vector<DNSDistResponseRuleAction>& cacheInsertedRespRuleActions, InternalQueryState&& ids)
+{
+ return false;
+}
BOOST_AUTO_TEST_SUITE(test_dnsdist_cc)
if (waitForDelay.empty()) {
return -1;
}
- timespec now;
+ timespec now{};
gettime(&now);
const auto& firstTime = waitForDelay.top().getSendTime();
const auto res = timeDifference(now, firstTime);
return res;
}
-XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queue_id, const std::string& xskMapPath) :
- frameNum(frameNum_), ifName(ifName_), socket(nullptr, xsk_socket__delete), sharedEmptyFrameOffset(std::make_shared<LockGuarded<vector<uint64_t>>>())
+XskSocket::XskSocket(size_t frameNum_, std::string ifName_, uint32_t queue_id, const std::string& xskMapPath) :
+ frameNum(frameNum_), ifName(std::move(ifName_)), socket(nullptr, xsk_socket__delete), sharedEmptyFrameOffset(std::make_shared<LockGuarded<vector<uint64_t>>>())
{
if (!isPowOfTwo(frameNum_) || !isPowOfTwo(frameSize)
|| !isPowOfTwo(fqCapacity) || !isPowOfTwo(cqCapacity) || !isPowOfTwo(rxCapacity) || !isPowOfTwo(txCapacity)) {
memset(&tx, 0, sizeof(tx));
memset(&rx, 0, sizeof(rx));
- xsk_umem_config umemCfg;
+ xsk_umem_config umemCfg{};
umemCfg.fill_size = fqCapacity;
umemCfg.comp_size = cqCapacity;
umemCfg.frame_size = frameSize;
umem.umemInit(frameNum_ * frameSize, &cq, &fq, &umemCfg);
{
- xsk_socket_config socketCfg;
+ xsk_socket_config socketCfg{};
socketCfg.rx_size = rxCapacity;
socketCfg.tx_size = txCapacity;
socketCfg.bind_flags = XDP_USE_NEED_WAKEUP;
}
auto ret = bpf_map_update_elem(xskMapFd.getHandle(), &queue_id, &xskfd, 0);
- if (ret) {
+ if (ret != 0) {
throw std::runtime_error("Error inserting into xsk_map '" + xskMapPath + "': " + std::to_string(ret));
}
}
// see xdp.h in contrib/
-
struct IPv4AndPort
{
uint32_t addr;
uint16_t port;
};
-static void clearDestinationMap(bool v6)
+static FDWrapper getDestinationMap(const std::string& mapPath)
{
- const std::string mapPath = !v6 ? "/sys/fs/bpf/dnsdist/xsk-destinations-v4" : "/sys/fs/bpf/dnsdist/xsk-destinations-v6";
-
- const auto destMapFd = FDWrapper(bpf_obj_get(mapPath.c_str()));
+ auto destMapFd = FDWrapper(bpf_obj_get(mapPath.c_str()));
if (destMapFd.getHandle() < 0) {
throw std::runtime_error("Error getting the XSK destination addresses map path '" + mapPath + "'");
}
+ return destMapFd;
+}
- if (!v6) {
+void XskSocket::clearDestinationMap(const std::string& mapPath, bool isV6)
+{
+ auto destMapFd = getDestinationMap(mapPath);
+ if (!isV6) {
IPv4AndPort prevKey{};
IPv4AndPort key{};
while (bpf_map_get_next_key(destMapFd.getHandle(), &prevKey, &key) == 0) {
}
}
-void XskSocket::clearDestinationAddresses()
+void XskSocket::addDestinationAddress(const std::string& mapPath, const ComboAddress& destination)
{
- clearDestinationMap(false);
- clearDestinationMap(true);
-}
-
-void XskSocket::addDestinationAddress(const ComboAddress& destination)
-{
- // see xdp.h in contrib/
-
- const std::string mapPath = destination.isIPv4() ? "/sys/fs/bpf/dnsdist/xsk-destinations-v4" : "/sys/fs/bpf/dnsdist/xsk-destinations-v6";
- //if (!s_destinationAddressesMap) {
- // throw std::runtime_error("The path of the XSK (AF_XDP) destination addresses map has not been set! Please consider using setXSKDestinationAddressesMapPath().");
- //}
-
- const auto destMapFd = FDWrapper(bpf_obj_get(mapPath.c_str()));
- if (destMapFd.getHandle() < 0) {
- throw std::runtime_error("Error getting the XSK destination addresses map path '" + mapPath + "'");
- }
-
+ auto destMapFd = getDestinationMap(mapPath);
bool value = true;
if (destination.isIPv4()) {
IPv4AndPort key{};
key.addr = destination.sin4.sin_addr.s_addr;
key.port = destination.sin4.sin_port;
auto ret = bpf_map_update_elem(destMapFd.getHandle(), &key, &value, 0);
- if (ret) {
+ if (ret != 0) {
throw std::runtime_error("Error inserting into xsk_map '" + mapPath + "': " + std::to_string(ret));
}
}
key.addr = destination.sin6.sin6_addr;
key.port = destination.sin6.sin6_port;
auto ret = bpf_map_update_elem(destMapFd.getHandle(), &key, &value, 0);
- if (ret) {
+ if (ret != 0) {
throw std::runtime_error("Error inserting into XSK destination addresses map '" + mapPath + "': " + std::to_string(ret));
}
}
}
-void XskSocket::removeDestinationAddress(const ComboAddress& destination)
+void XskSocket::removeDestinationAddress(const std::string& mapPath, const ComboAddress& destination)
{
- const std::string mapPath = destination.isIPv4() ? "/sys/fs/bpf/dnsdist/xsk-destinations-v4" : "/sys/fs/bpf/dnsdist/xsk-destinations-v6";
- //if (!s_destinationAddressesMap) {
- // throw std::runtime_error("The path of the XSK (AF_XDP) destination addresses map has not been set! Please consider using setXSKDestinationAddressesMapPath().");
- //}
-
- const auto destMapFd = FDWrapper(bpf_obj_get(mapPath.c_str()));
- if (destMapFd.getHandle() < 0) {
- throw std::runtime_error("Error getting the XSK destination addresses map path '" + mapPath + "'");
- }
-
+ auto destMapFd = getDestinationMap(mapPath);
if (destination.isIPv4()) {
IPv4AndPort key{};
key.addr = destination.sin4.sin_addr.s_addr;
void XskSocket::send(std::vector<XskPacket>& packets)
{
- while (packets.size() > 0) {
+ while (!packets.empty()) {
auto packetSize = packets.size();
if (packetSize > std::numeric_limits<uint32_t>::max()) {
packetSize = std::numeric_limits<uint32_t>::max();
return res;
}
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
const auto baseAddr = reinterpret_cast<uint64_t>(umem.bufBase);
uint32_t failed = 0;
uint32_t processed = 0;
for (; processed < recvSize; processed++) {
try {
const auto* desc = xsk_ring_cons__rx_desc(&rx, idx++);
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
XskPacket packet = XskPacket(reinterpret_cast<uint8_t*>(desc->addr + baseAddr), desc->len, frameSize);
#ifdef DEBUG_UMEM
checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::FillQueue}, UmemEntryStatus::Status::Received);
if (!packet.parse(false)) {
++failed;
- markAsFree(std::move(packet));
+ markAsFree(packet);
}
else {
- res.push_back(std::move(packet));
+ res.push_back(packet);
}
}
catch (const std::exception& exp) {
// which will only be made available again when pushed into the fill
// queue
xsk_ring_cons__release(&rx, processed);
- if (failedCount) {
+ if (failedCount != nullptr) {
*failedCount = failed;
}
void XskSocket::pickUpReadyPacket(std::vector<XskPacket>& packets)
{
- timespec now;
+ timespec now{};
gettime(&now);
while (!waitForDelay.empty() && timeDifference(now, waitForDelay.top().getSendTime()) <= 0) {
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast)
auto& top = const_cast<XskPacket&>(waitForDelay.top());
- packets.push_back(std::move(top));
+ packets.push_back(top);
waitForDelay.pop();
}
}
std::string XskSocket::getMetrics() const
{
- struct xdp_statistics stats;
+ xdp_statistics stats{};
socklen_t optlen = sizeof(stats);
int err = getsockopt(xskFd(), SOL_XDP, XDP_STATISTICS, &stats, &optlen);
- if (err) {
+ if (err != 0) {
return "";
}
if (optlen != sizeof(struct xdp_statistics)) {
return ret.str();
}
-void XskSocket::markAsFree(XskPacket&& packet)
+void XskSocket::markAsFree(const XskPacket& packet)
{
auto offset = frameOffset(packet);
#ifdef DEBUG_UMEM
XskSocket::XskUmem::~XskUmem()
{
- if (umem) {
+ if (umem != nullptr) {
xsk_umem__delete(umem);
}
- if (bufBase) {
+ if (bufBase != nullptr) {
munmap(bufBase, size);
}
}
[[nodiscard]] iphdr XskPacket::getIPv4Header() const noexcept
{
iphdr ipv4Header{};
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
assert(frameLength >= (sizeof(ethhdr) + sizeof(ipv4Header)));
assert(!v6);
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
memcpy(&ipv4Header, frame + sizeof(ethhdr), sizeof(ipv4Header));
return ipv4Header;
}
void XskPacket::setIPv4Header(const iphdr& ipv4Header) noexcept
{
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
assert(frameLength >= (sizeof(ethhdr) + sizeof(iphdr)));
assert(!v6);
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
memcpy(frame + sizeof(ethhdr), &ipv4Header, sizeof(ipv4Header));
}
[[nodiscard]] ipv6hdr XskPacket::getIPv6Header() const noexcept
{
ipv6hdr ipv6Header{};
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
assert(frameLength >= (sizeof(ethhdr) + sizeof(ipv6Header)));
assert(v6);
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
memcpy(&ipv6Header, frame + sizeof(ethhdr), sizeof(ipv6Header));
return ipv6Header;
}
void XskPacket::setIPv6Header(const ipv6hdr& ipv6Header) noexcept
{
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
assert(frameLength >= (sizeof(ethhdr) + sizeof(ipv6Header)));
assert(v6);
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
memcpy(frame + sizeof(ethhdr), &ipv6Header, sizeof(ipv6Header));
}
[[nodiscard]] udphdr XskPacket::getUDPHeader() const noexcept
{
udphdr udpHeader{};
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
assert(frameLength >= (sizeof(ethhdr) + (v6 ? sizeof(ipv6hdr) : sizeof(iphdr)) + sizeof(udpHeader)));
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
memcpy(&udpHeader, frame + getL4HeaderOffset(), sizeof(udpHeader));
return udpHeader;
}
// check ip.check == ipv4Checksum() is not needed!
// We check it in BPF program
// we don't, actually.
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
from = makeComboAddressFromRaw(4, reinterpret_cast<const char*>(&ipHeader.saddr), sizeof(ipHeader.saddr));
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
to = makeComboAddressFromRaw(4, reinterpret_cast<const char*>(&ipHeader.daddr), sizeof(ipHeader.daddr));
l4Protocol = ipHeader.protocol;
if (!fromSetHeader && (frameLength - sizeof(ethhdr)) != ntohs(ipHeader.tot_len)) {
}
v6 = true;
auto ipHeader = getIPv6Header();
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
from = makeComboAddressFromRaw(6, reinterpret_cast<const char*>(&ipHeader.saddr), sizeof(ipHeader.saddr));
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
to = makeComboAddressFromRaw(6, reinterpret_cast<const char*>(&ipHeader.daddr), sizeof(ipHeader.daddr));
l4Protocol = ipHeader.nexthdr;
if (!fromSetHeader && (frameLength - (sizeof(ethhdr) + sizeof(ipv6hdr))) != ntohs(ipHeader.payload_len)) {
{
auto ethHeader = getEthernetHeader();
{
- uint8_t tmp[ETH_ALEN];
- static_assert(sizeof(tmp) == sizeof(ethHeader.h_dest), "Size Error");
- static_assert(sizeof(tmp) == sizeof(ethHeader.h_source), "Size Error");
- memcpy(tmp, ethHeader.h_dest, sizeof(tmp));
- memcpy(ethHeader.h_dest, ethHeader.h_source, sizeof(tmp));
- memcpy(ethHeader.h_source, tmp, sizeof(tmp));
+ std::array<uint8_t, ETH_ALEN> tmp;
+ static_assert(tmp.size() == sizeof(ethHeader.h_dest), "Size Error");
+ static_assert(tmp.size() == sizeof(ethHeader.h_source), "Size Error");
+ memcpy(tmp.data(), ethHeader.h_dest, tmp.size());
+ memcpy(ethHeader.h_dest, ethHeader.h_source, tmp.size());
+ memcpy(ethHeader.h_source, tmp.data(), tmp.size());
}
if (ethHeader.h_proto == htons(ETH_P_IPV6)) {
// IPV6
{
const auto size = getDataSize();
PacketBuffer tmp(size);
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
memcpy(tmp.data(), frame + getDataOffset(), size);
return tmp;
}
{
const auto size = getDataSize();
buffer.resize(size);
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
memcpy(buffer.data(), frame + getDataOffset(), size);
}
return false;
}
flags |= UPDATE;
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
memcpy(frame + getDataOffset(), buf.data(), bufSize);
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
frameLength = getDataOffset() + bufSize;
return true;
}
void XskPacket::addDelay(const int relativeMilliseconds) noexcept
{
gettime(&sendTime);
- sendTime.tv_nsec += static_cast<uint64_t>(relativeMilliseconds) * 1000000L;
+ sendTime.tv_nsec += static_cast<int64_t>(relativeMilliseconds) * 1000000L;
sendTime.tv_sec += sendTime.tv_nsec / 1000000000L;
sendTime.tv_nsec %= 1000000000L;
}
-bool operator<(const XskPacket& s1, const XskPacket& s2) noexcept
+bool operator<(const XskPacket& lhs, const XskPacket& rhs) noexcept
{
- return s1.getSendTime() < s2.getSendTime();
+ return lhs.getSendTime() < rhs.getSendTime();
}
const ComboAddress& XskPacket::getFromAddr() const noexcept
return to;
}
-void XskWorker::notify(int fd)
+void XskWorker::notify(int desc)
{
uint64_t value = 1;
ssize_t res = 0;
- while ((res = write(fd, &value, sizeof(value))) == EINTR) {
+ while ((res = write(desc, &value, sizeof(value))) == EINTR) {
}
if (res != sizeof(value)) {
throw runtime_error("Unable Wake Up XskSocket Failed");
{
}
-void XskWorker::pushToProcessingQueue(XskPacket&& packet)
+void XskWorker::pushToProcessingQueue(XskPacket& packet)
{
#if defined(__SANITIZE_THREAD__)
- if (!incomingPacketsQueue.lock()->push(std::move(packet))) {
+ if (!incomingPacketsQueue.lock()->push(packet)) {
#else
- if (!incomingPacketsQueue.push(std::move(packet))) {
+ if (!incomingPacketsQueue.push(packet)) {
#endif
- markAsFree(std::move(packet));
+ markAsFree(packet);
}
}
-void XskWorker::pushToSendQueue(XskPacket&& packet)
+void XskWorker::pushToSendQueue(XskPacket& packet)
{
#if defined(__SANITIZE_THREAD__)
- if (!outgoingPacketsQueue.lock()->push(std::move(packet))) {
+ if (!outgoingPacketsQueue.lock()->push(packet)) {
#else
- if (!outgoingPacketsQueue.push(std::move(packet))) {
+ if (!outgoingPacketsQueue.push(packet)) {
#endif
- markAsFree(std::move(packet));
+ markAsFree(packet);
}
}
const void* XskPacket::getPayloadData() const
{
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
return frame + getDataOffset();
}
void XskPacket::setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC) noexcept
{
auto ethHeader = getEthernetHeader();
- memcpy(ethHeader.h_dest, &toMAC[0], sizeof(MACAddr));
- memcpy(ethHeader.h_source, &fromMAC[0], sizeof(MACAddr));
+ memcpy(ethHeader.h_dest, toMAC.data(), toMAC.size());
+ memcpy(ethHeader.h_source, fromMAC.data(), fromMAC.size());
setEthernetHeader(ethHeader);
to = to_;
from = from_;
setEthernetHeader(ethHeader);
}
-[[nodiscard]] __be16 XskPacket::ipv4Checksum(const struct iphdr* ip) noexcept
+[[nodiscard]] __be16 XskPacket::ipv4Checksum(const struct iphdr* ipHeader) noexcept
{
- auto partial = ip_checksum_partial(ip, sizeof(iphdr), 0);
+ auto partial = ip_checksum_partial(ipHeader, sizeof(iphdr), 0);
return ip_checksum_fold(partial);
}
-[[nodiscard]] __be16 XskPacket::tcp_udp_v4_checksum(const struct iphdr* ip) const noexcept
+[[nodiscard]] __be16 XskPacket::tcp_udp_v4_checksum(const struct iphdr* ipHeader) const noexcept
{
// ip options is not supported !!!
const auto l4Length = static_cast<uint16_t>(getDataSize() + sizeof(udphdr));
- auto sum = tcp_udp_v4_header_checksum_partial(ip->saddr, ip->daddr, ip->protocol, l4Length);
+ auto sum = tcp_udp_v4_header_checksum_partial(ipHeader->saddr, ipHeader->daddr, ipHeader->protocol, l4Length);
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
sum = ip_checksum_partial(frame + getL4HeaderOffset(), l4Length, sum);
return ip_checksum_fold(sum);
}
{
const auto l4Length = static_cast<uint16_t>(getDataSize() + sizeof(udphdr));
uint64_t sum = tcp_udp_v6_header_checksum_partial(&ipv6->saddr, &ipv6->daddr, ipv6->nexthdr, l4Length);
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
sum = ip_checksum_partial(frame + getL4HeaderOffset(), l4Length, sum);
return ip_checksum_fold(sum);
}
/* Main loop: 32 bits at a time */
for (position = 0; position < len; position += sizeof(uint32_t)) {
uint32_t value{};
- memcpy(&value, reinterpret_cast<const uint8_t*>(ptr) + position, sizeof(value));
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+ memcpy(&value, static_cast<const uint8_t*>(ptr) + position, sizeof(value));
sum += value;
}
/* Handle un-32bit-aligned trailing bytes */
if ((len - position) >= 2) {
uint16_t value{};
- memcpy(&value, reinterpret_cast<const uint8_t*>(ptr) + position, sizeof(value));
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+ memcpy(&value, static_cast<const uint8_t*>(ptr) + position, sizeof(value));
sum += value;
position += sizeof(value);
}
if ((len - position) > 0) {
- const auto* p8 = static_cast<const uint8_t*>(ptr) + position;
- sum += ntohs(*p8 << 8); /* RFC says pad last byte */
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+ const auto* ptr8 = static_cast<const uint8_t*>(ptr) + position;
+ sum += ntohs(*ptr8 << 8); /* RFC says pad last byte */
}
return sum;
[[nodiscard]] __be16 XskPacket::ip_checksum_fold(uint64_t sum) noexcept
{
- while (sum & ~0xffffffffULL) {
+ while ((sum & ~0xffffffffULL) != 0U) {
sum = (sum >> 32) + (sum & 0xffffffffULL);
}
- while (sum & 0xffff0000ULL) {
+ while ((sum & 0xffff0000ULL) != 0U) {
sum = (sum >> 16) + (sum & 0xffffULL);
}
}
#ifndef __packed
-#define __packed __attribute__((packed))
+#define packed_attribute __attribute__((packed))
+#else
+#define packed_attribute __packed
#endif
+// NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
[[nodiscard]] uint64_t XskPacket::tcp_udp_v4_header_checksum_partial(__be32 src_ip, __be32 dst_ip, uint8_t protocol, uint16_t len) noexcept
{
struct header
/* We use a union here to avoid aliasing issues with gcc -O2 */
union
{
- header __packed fields;
+ header packed_attribute fields;
+ // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays)
uint32_t words[3];
};
};
struct in6_addr src_ip;
struct in6_addr dst_ip;
__be32 length;
+ // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays)
__uint8_t mbz[3];
__uint8_t next_header;
};
/* We use a union here to avoid aliasing issues with gcc -O2 */
union
{
- header __packed fields;
+ header packed_attribute fields;
+ // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays)
uint32_t words[10];
};
};
pseudo_header.fields.src_ip = *src_ip;
pseudo_header.fields.dst_ip = *dst_ip;
pseudo_header.fields.length = htonl(len);
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
memset(pseudo_header.fields.mbz, 0, sizeof(pseudo_header.fields.mbz));
pseudo_header.fields.next_header = protocol;
return ip_checksum_partial(&pseudo_header, sizeof(pseudo_header), 0);
int XskWorker::createEventfd()
{
- auto fd = ::eventfd(0, EFD_CLOEXEC);
- if (fd < 0) {
+ auto desc = ::eventfd(0, EFD_CLOEXEC);
+ if (desc < 0) {
throw runtime_error("Unable create eventfd");
}
- return fd;
+ return desc;
}
-void XskWorker::waitForXskSocket() noexcept
+void XskWorker::waitForXskSocket() const noexcept
{
uint64_t x = read(workerWaker, &x, sizeof(x));
}
-void XskWorker::notifyXskSocket() noexcept
+void XskWorker::notifyXskSocket() const
{
notify(xskSocketWaker);
}
void XskSocket::getMACFromIfName()
{
ifreq ifr{};
- auto fd = FDWrapper(::socket(AF_INET, SOCK_DGRAM, 0));
- if (fd < 0) {
+ auto desc = FDWrapper(::socket(AF_INET, SOCK_DGRAM, 0));
+ if (desc < 0) {
throw std::runtime_error("Error creating a socket to get the MAC address of interface " + ifName);
}
}
strncpy(ifr.ifr_name, ifName.c_str(), ifName.length() + 1);
- if (ioctl(fd.getHandle(), SIOCGIFHWADDR, &ifr) < 0 || ifr.ifr_hwaddr.sa_family != ARPHRD_ETHER) {
+ if (ioctl(desc.getHandle(), SIOCGIFHWADDR, &ifr) < 0 || ifr.ifr_hwaddr.sa_family != ARPHRD_ETHER) {
throw std::runtime_error("Error getting MAC address for interface " + ifName);
}
static_assert(sizeof(ifr.ifr_hwaddr.sa_data) >= std::tuple_size<decltype(source)>{}, "The size of an ARPHRD_ETHER MAC address is smaller than expected");
memcpy(source.data(), ifr.ifr_hwaddr.sa_data, source.size());
}
-[[nodiscard]] int XskSocket::timeDifference(const timespec& t1, const timespec& t2) noexcept
+[[nodiscard]] int XskSocket::timeDifference(const timespec& lhs, const timespec& rhs) noexcept
{
- const auto res = t1.tv_sec * 1000 + t1.tv_nsec / 1000000L - (t2.tv_sec * 1000 + t2.tv_nsec / 1000000L);
+ const auto res = lhs.tv_sec * 1000 + lhs.tv_nsec / 1000000L - (rhs.tv_sec * 1000 + rhs.tv_nsec / 1000000L);
return static_cast<int>(res);
}
-void XskWorker::cleanWorkerNotification() noexcept
+void XskWorker::cleanWorkerNotification() const noexcept
{
- uint64_t x = read(xskSocketWaker, &x, sizeof(x));
+ uint64_t value = read(xskSocketWaker, &value, sizeof(value));
}
-void XskWorker::cleanSocketNotification() noexcept
+void XskWorker::cleanSocketNotification() const noexcept
{
- uint64_t x = read(workerWaker, &x, sizeof(x));
+ uint64_t value = read(workerWaker, &value, sizeof(value));
}
std::vector<pollfd> getPollFdsForWorker(XskWorker& info)
if (!uniqueEmptyFrameOffset.empty()) {
auto offset = uniqueEmptyFrameOffset.back();
uniqueEmptyFrameOffset.pop_back();
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
return XskPacket(offset + umemBufBase, 0, frameSize);
}
fillUniqueEmptyOffset();
if (!uniqueEmptyFrameOffset.empty()) {
auto offset = uniqueEmptyFrameOffset.back();
uniqueEmptyFrameOffset.pop_back();
+ // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
return XskPacket(offset + umemBufBase, 0, frameSize);
}
return std::nullopt;
}
-void XskWorker::markAsFree(XskPacket&& packet)
+void XskWorker::markAsFree(const XskPacket& packet)
{
auto offset = frameOffset(packet);
#ifdef DEBUG_UMEM
void XskPacket::updatePacket() noexcept
{
- if (!(flags & UPDATE)) {
+ if ((flags & UPDATE) == 0U) {
return;
}
- if (!(flags & REWRITE)) {
+ if ((flags & REWRITE) == 0U) {
changeDirectAndUpdateChecksum();
}
}
class XskWorker;
class XskSocket;
-using MACAddr = std::array<uint8_t,6>;
+using MACAddr = std::array<uint8_t, 6>;
#ifdef HAVE_XSK
using XskPacketPtr = std::unique_ptr<XskPacket>;
xsk_umem* umem{nullptr};
uint8_t* bufBase{nullptr};
size_t size{0};
- void umemInit(size_t memSize, xsk_ring_cons* cq, xsk_ring_prod* fq, xsk_umem_config* config);
+ void umemInit(size_t memSize, xsk_ring_cons* completionQueue, xsk_ring_prod* fillQueue, xsk_umem_config* config);
~XskUmem();
XskUmem() = default;
};
const size_t frameNum;
// responses that have been delayed
std::priority_queue<XskPacket> waitForDelay;
- MACAddr source;
+ MACAddr source{};
const std::string ifName;
// AF_XDP socket then worker waker sockets
vector<pollfd> fds;
// simply recycled from cq after being processed by the kernel
vector<uint64_t> uniqueEmptyFrameOffset;
// completion ring: queue where sent packets are stored by the kernel
- xsk_ring_cons cq;
+ xsk_ring_cons cq{};
// rx ring: queue where the incoming packets are stored, read by XskRouter
- xsk_ring_cons rx;
+ xsk_ring_cons rx{};
// fill ring: queue where umem entries available to be filled (put into rx) are stored
- xsk_ring_prod fq;
+ xsk_ring_prod fq{};
// tx ring: queue where outgoing packets are stored
- xsk_ring_prod tx;
+ xsk_ring_prod tx{};
std::unique_ptr<xsk_socket, void (*)(xsk_socket*)> socket;
XskUmem umem;
void getMACFromIfName();
public:
- static void clearDestinationAddresses();
- static void addDestinationAddress(const ComboAddress& destination);
- static void removeDestinationAddress(const ComboAddress& destination);
+ static void clearDestinationMap(const std::string& mapPath, bool isV6);
+ static void addDestinationAddress(const std::string& mapPath, const ComboAddress& destination);
+ static void removeDestinationAddress(const std::string& mapPath, const ComboAddress& destination);
static constexpr size_t getFrameSize()
{
return frameSize;
}
// list of free umem entries that can be reused
std::shared_ptr<LockGuarded<vector<uint64_t>>> sharedEmptyFrameOffset;
- XskSocket(size_t frameNum, const std::string& ifName, uint32_t queue_id, const std::string& xskMapPath);
+ XskSocket(size_t frameNum, std::string ifName, uint32_t queue_id, const std::string& xskMapPath);
[[nodiscard]] int xskFd() const noexcept;
// wait until one event has occurred
[[nodiscard]] int wait(int timeout);
void send(std::vector<XskPacket>& packets);
// look at incoming packets in rx, return them if parsing succeeeded
[[nodiscard]] std::vector<XskPacket> recv(uint32_t recvSizeMax, uint32_t* failedCount);
- void addWorker(std::shared_ptr<XskWorker> s);
+ void addWorker(std::shared_ptr<XskWorker> worker);
void addWorkerRoute(const std::shared_ptr<XskWorker>& worker, const ComboAddress& dest);
void removeWorkerRoute(const ComboAddress& dest);
[[nodiscard]] std::string getMetrics() const;
- void markAsFree(XskPacket&& packet);
+ void markAsFree(const XskPacket& packet);
[[nodiscard]] const std::shared_ptr<XskWorker>& getWorkerByDescriptor(int desc) const
{
return d_workers.at(desc);
void recycle(size_t size) noexcept;
// look at delayed packets, and send the ones that are ready
void pickUpReadyPacket(std::vector<XskPacket>& packets);
- void pushDelayed(XskPacket&& packet)
+ void pushDelayed(XskPacket& packet)
{
- waitForDelay.push(std::move(packet));
+ waitForDelay.push(packet);
}
};
private:
ComboAddress from;
ComboAddress to;
- timespec sendTime;
+ timespec sendTime{};
uint8_t* frame{nullptr};
size_t frameLength{0};
size_t frameSize{0};
static int createEventfd();
static void notify(int fd);
static std::shared_ptr<XskWorker> create();
- void pushToProcessingQueue(XskPacket&& packet);
- void pushToSendQueue(XskPacket&& packet);
- void markAsFree(XskPacket&& packet);
+ void pushToProcessingQueue(XskPacket& packet);
+ void pushToSendQueue(XskPacket& packet);
+ void markAsFree(const XskPacket& packet);
// notify worker that at least one packet is available for processing
void notifyWorker() noexcept;
// notify the router that packets are ready to be sent
- void notifyXskSocket() noexcept;
- void waitForXskSocket() noexcept;
- void cleanWorkerNotification() noexcept;
- void cleanSocketNotification() noexcept;
+ void notifyXskSocket() const;
+ void waitForXskSocket() const noexcept;
+ void cleanWorkerNotification() const noexcept;
+ void cleanSocketNotification() const noexcept;
[[nodiscard]] uint64_t frameOffset(const XskPacket& packet) const noexcept;
// reap empty umem entry from sharedEmptyFrameOffset into uniqueEmptyFrameOffset
void fillUniqueEmptyOffset();