}
}
-#ifdef HAVE_XSK
-static void XskHealthCheck(std::shared_ptr<DownstreamState>& dss, std::unordered_map<uint16_t, std::shared_ptr<HealthCheckData>>& map, bool initial = false)
-{
- auto& xskInfo = dss->xskInfo;
- std::shared_ptr<HealthCheckData> data;
- auto packet = getHealthCheckPacket(dss, nullptr, data);
- data->d_initial = initial;
- setHealthCheckTime(dss, data);
- auto xskPacket = xskInfo->getEmptyFrame();
- if (!xskPacket) {
- return;
- }
- xskPacket->setAddr(dss->d_config.sourceAddr, dss->d_config.sourceMACAddr, dss->d_config.remote, dss->d_config.destMACAddr);
- xskPacket->setPayload(packet);
- xskPacket->rewrite();
- xskInfo->pushToSendQueue(std::move(xskPacket));
- const auto queryId = data->d_queryID;
- map[queryId] = std::move(data);
-}
-#endif /* HAVE_XSK */
-
static bool processResponderPacket(std::shared_ptr<DownstreamState>& dss, PacketBuffer& response, const std::vector<DNSDistResponseRuleAction>& localRespRuleActions, const std::vector<DNSDistResponseRuleAction>& cacheInsertedRespRuleActions, InternalQueryState&& ids)
{
return true;
}
-// listens on a dedicated socket, lobs answers from downstream servers to original requestors
+#ifdef HAVE_XSK
+namespace dnsdist::xsk
+{
+static void doHealthCheck(std::shared_ptr<DownstreamState>& dss, std::unordered_map<uint16_t, std::shared_ptr<HealthCheckData>>& map, bool initial = false)
+{
+ auto& xskInfo = dss->xskInfo;
+ std::shared_ptr<HealthCheckData> data;
+ auto packet = getHealthCheckPacket(dss, nullptr, data);
+ data->d_initial = initial;
+ setHealthCheckTime(dss, data);
+ auto xskPacket = xskInfo->getEmptyFrame();
+ if (!xskPacket) {
+ return;
+ }
+ xskPacket->setAddr(dss->d_config.sourceAddr, dss->d_config.sourceMACAddr, dss->d_config.remote, dss->d_config.destMACAddr);
+ xskPacket->setPayload(packet);
+ xskPacket->rewrite();
+ xskInfo->pushToSendQueue(std::move(xskPacket));
+ const auto queryId = data->d_queryID;
+ map[queryId] = std::move(data);
+}
+
void responderThread(std::shared_ptr<DownstreamState> dss)
{
+ if (dss->xskInfo == nullptr) {
+ throw std::runtime_error("Starting XSK responder thread for a backend without XSK!");
+ }
+
try {
- setThreadName("dnsdist/respond");
- auto localRespRuleActions = g_respruleactions.getLocal();
- auto localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
-#ifdef HAVE_XSK
- if (dss->xskInfo) {
- auto xskInfo = dss->xskInfo;
- auto pollfds = getPollFdsForWorker(*xskInfo);
- std::unordered_map<uint16_t, std::shared_ptr<HealthCheckData>> healthCheckMap;
- XskHealthCheck(dss, healthCheckMap, true);
- itimerspec tm;
- tm.it_value.tv_sec = dss->d_config.checkTimeout / 1000;
- tm.it_value.tv_nsec = (dss->d_config.checkTimeout % 1000) * 1000000;
- tm.it_interval = tm.it_value;
- auto res = timerfd_settime(pollfds[1].fd, 0, &tm, nullptr);
- if (res) {
- throw std::runtime_error("timerfd_settime failed:" + stringerror(errno));
- }
- const auto xskFd = xskInfo->workerWaker.getHandle();
- while (!dss->isStopped()) {
- poll(pollfds.data(), pollfds.size(), -1);
- bool needNotify = false;
- if (pollfds[0].revents & POLLIN) {
- needNotify = true;
+ setThreadName("dnsdist/XskResp");
+ auto localRespRuleActions = g_respruleactions.getLocal();
+ auto localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
+ auto xskInfo = dss->xskInfo;
+ auto pollfds = getPollFdsForWorker(*xskInfo);
+ std::unordered_map<uint16_t, std::shared_ptr<HealthCheckData>> healthCheckMap;
+ dnsdist::xsk::doHealthCheck(dss, healthCheckMap, true);
+ itimerspec tm;
+ tm.it_value.tv_sec = dss->d_config.checkTimeout / 1000;
+ tm.it_value.tv_nsec = (dss->d_config.checkTimeout % 1000) * 1000000;
+ tm.it_interval = tm.it_value;
+ auto res = timerfd_settime(pollfds[1].fd, 0, &tm, nullptr);
+ if (res) {
+ throw std::runtime_error("timerfd_settime failed:" + stringerror(errno));
+ }
+ const auto xskFd = xskInfo->workerWaker.getHandle();
+ while (!dss->isStopped()) {
+ poll(pollfds.data(), pollfds.size(), -1);
+ bool needNotify = false;
+ if (pollfds[0].revents & POLLIN) {
+ needNotify = true;
#if defined(__SANITIZE_THREAD__)
- xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) {
+ xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) {
#else
- xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) {
+ xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) {
#endif
- auto packet = XskPacketPtr(packetRaw);
- if (packet->getDataLen() < sizeof(dnsheader)) {
- xskInfo->markAsFree(std::move(packet));
- return;
- }
- const dnsheader_aligned dnsHeader(packet->getPayloadData());
- const auto queryId = dnsHeader->id;
- auto ids = dss->getState(queryId);
- if (ids) {
- if (xskFd != ids->backendFD || !ids->xskPacketHeader) {
- dss->restoreState(queryId, std::move(*ids));
- ids = std::nullopt;
- }
- }
- if (!ids) {
- // this has to go before we can refactor the duplicated response handling code
- auto iter = healthCheckMap.find(queryId);
- if (iter != healthCheckMap.end()) {
- auto data = std::move(iter->second);
- healthCheckMap.erase(iter);
- packet->cloneIntoPacketBuffer(data->d_buffer);
- data->d_ds->submitHealthCheckResult(data->d_initial, handleResponse(data));
- }
- xskInfo->markAsFree(std::move(packet));
- return;
- }
- auto response = packet->clonePacketBuffer();
- if (response.size() > packet->getCapacity()) {
- /* fallback to sending the packet via normal socket */
- ids->xskPacketHeader.reset();
- }
- if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) {
- xskInfo->markAsFree(std::move(packet));
- vinfolog("XSK packet pushed to queue because processResponderPacket failed");
- return;
- }
- vinfolog("XSK packet - processResponderPacket OK");
- if (response.size() > packet->getCapacity()) {
- /* fallback to sending the packet via normal socket */
- sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote);
- vinfolog("XSK packet falling back because packet is too large");
- xskInfo->markAsFree(std::move(packet));
- return;
- }
- //vinfolog("XSK packet - set header");
- packet->setHeader(*ids->xskPacketHeader);
- //vinfolog("XSK packet - set payload");
- if (!packet->setPayload(response)) {
- vinfolog("Unable to set payload !");
- }
- if (ids->delayMsec > 0) {
- vinfolog("XSK packet - adding delay");
- packet->addDelay(ids->delayMsec);
- }
- //vinfolog("XSK packet - update packet");
- packet->updatePacket();
- //vinfolog("XSK packet pushed to send queue");
- xskInfo->pushToSendQueue(std::move(packet));
- });
- xskInfo->cleanSocketNotification();
- }
- if (pollfds[1].revents & POLLIN) {
- timeval now;
- gettimeofday(&now, nullptr);
- for (auto i = healthCheckMap.begin(); i != healthCheckMap.end();) {
- auto& ttd = i->second->d_ttd;
- if (ttd < now) {
- dss->submitHealthCheckResult(i->second->d_initial, false);
- i = healthCheckMap.erase(i);
- }
- else {
- ++i;
+ auto packet = XskPacketPtr(packetRaw);
+ if (packet->getDataLen() < sizeof(dnsheader)) {
+ xskInfo->markAsFree(std::move(packet));
+ return;
+ }
+ const dnsheader_aligned dnsHeader(packet->getPayloadData());
+ const auto queryId = dnsHeader->id;
+ auto ids = dss->getState(queryId);
+ if (ids) {
+ if (xskFd != ids->backendFD || !ids->xskPacketHeader) {
+ dss->restoreState(queryId, std::move(*ids));
+ ids = std::nullopt;
}
}
- needNotify = true;
- dss->updateStatisticsInfo();
- dss->handleUDPTimeouts();
- if (dss->d_nextCheck <= 1) {
- dss->d_nextCheck = dss->d_config.checkInterval;
- if (dss->d_config.availability == DownstreamState::Availability::Auto) {
- XskHealthCheck(dss, healthCheckMap);
+ if (!ids) {
+ // this has to go before we can refactor the duplicated response handling code
+ auto iter = healthCheckMap.find(queryId);
+ if (iter != healthCheckMap.end()) {
+ auto data = std::move(iter->second);
+ healthCheckMap.erase(iter);
+ packet->cloneIntoPacketBuffer(data->d_buffer);
+ data->d_ds->submitHealthCheckResult(data->d_initial, handleResponse(data));
}
+ xskInfo->markAsFree(std::move(packet));
+ return;
+ }
+ auto response = packet->clonePacketBuffer();
+ if (response.size() > packet->getCapacity()) {
+ /* fallback to sending the packet via normal socket */
+ ids->xskPacketHeader.reset();
+ }
+ if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) {
+ xskInfo->markAsFree(std::move(packet));
+ vinfolog("XSK packet pushed to queue because processResponderPacket failed");
+ return;
+ }
+ vinfolog("XSK packet - processResponderPacket OK");
+ if (response.size() > packet->getCapacity()) {
+ /* fallback to sending the packet via normal socket */
+ sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote);
+ vinfolog("XSK packet falling back because packet is too large");
+ xskInfo->markAsFree(std::move(packet));
+ return;
+ }
+ //vinfolog("XSK packet - set header");
+ packet->setHeader(*ids->xskPacketHeader);
+ //vinfolog("XSK packet - set payload");
+ if (!packet->setPayload(response)) {
+ vinfolog("Unable to set payload !");
+ }
+ if (ids->delayMsec > 0) {
+ vinfolog("XSK packet - adding delay");
+ packet->addDelay(ids->delayMsec);
+ }
+ //vinfolog("XSK packet - update packet");
+ packet->updatePacket();
+ //vinfolog("XSK packet pushed to send queue");
+ xskInfo->pushToSendQueue(std::move(packet));
+ });
+ xskInfo->cleanSocketNotification();
+ }
+ if (pollfds[1].revents & POLLIN) {
+ timeval now;
+ gettimeofday(&now, nullptr);
+ for (auto i = healthCheckMap.begin(); i != healthCheckMap.end();) {
+ auto& ttd = i->second->d_ttd;
+ if (ttd < now) {
+ dss->submitHealthCheckResult(i->second->d_initial, false);
+ i = healthCheckMap.erase(i);
}
else {
- --dss->d_nextCheck;
+ ++i;
+ }
+ }
+ needNotify = true;
+ dss->updateStatisticsInfo();
+ dss->handleUDPTimeouts();
+ if (dss->d_nextCheck <= 1) {
+ dss->d_nextCheck = dss->d_config.checkInterval;
+ if (dss->d_config.availability == DownstreamState::Availability::Auto) {
+ doHealthCheck(dss, healthCheckMap);
}
+ }
+ else {
+ --dss->d_nextCheck;
+ }
+
+ uint64_t tmp;
+ res = read(pollfds[1].fd, &tmp, sizeof(tmp));
+ }
+ if (needNotify) {
+ xskInfo->notifyXskSocket();
+ }
+ }
+ }
+ catch (const std::exception& e) {
+ errlog("XSK responder thread died because of exception: %s", e.what());
+ }
+ catch (const PDNSException& e) {
+ errlog("XSK responder thread died because of PowerDNS exception: %s", e.reason);
+ }
+ catch (...) {
+ errlog("XSK responder thread died because of an exception: %s", "unknown");
+ }
+}
+
+static bool isXskQueryAcceptable(const XskPacket& packet, ClientState& cs, LocalHolders& holders, bool& expectProxyProtocol) noexcept
+{
+ const auto& from = packet.getFromAddr();
+ expectProxyProtocol = expectProxyProtocolFrom(from);
+ if (!holders.acl->match(from) && !expectProxyProtocol) {
+ vinfolog("Query from %s dropped because of ACL", from.toStringWithPort());
+ ++dnsdist::metrics::g_stats.aclDrops;
+ return false;
+ }
+ cs.queries++;
+ ++dnsdist::metrics::g_stats.queries;
- uint64_t tmp;
- res = read(pollfds[1].fd, &tmp, sizeof(tmp));
+ return true;
+}
+
+void XskRouter(std::shared_ptr<XskSocket> xsk)
+{
+ setThreadName("dnsdist/XskRouter");
+ uint32_t failed;
+ // packets to be submitted for sending
+ vector<XskPacketPtr> fillInTx;
+ const auto& fds = xsk->getDescriptors();
+ // list of workers that need to be notified
+ std::set<int> needNotify;
+ const auto& xskWakerIdx = xsk->getWorkers().get<0>();
+ const auto& destIdx = xsk->getWorkers().get<1>();
+ while (true) {
+ try {
+ auto ready = xsk->wait(-1);
+ // descriptor 0 gets incoming AF_XDP packets
+ if (fds.at(0).revents & POLLIN) {
+ auto packets = xsk->recv(64, &failed);
+ dnsdist::metrics::g_stats.nonCompliantQueries += failed;
+ for (auto &packet : packets) {
+ const auto dest = packet->getToAddr();
+ auto res = destIdx.find(dest);
+ if (res == destIdx.end()) {
+ xsk->markAsFree(std::move(packet));
+ continue;
+ }
+ res->worker->pushToProcessingQueue(std::move(packet));
+ needNotify.insert(res->workerWaker);
}
- if (needNotify) {
- xskInfo->notifyXskSocket();
+ for (auto i : needNotify) {
+ uint64_t x = 1;
+ auto written = write(i, &x, sizeof(x));
+ if (written != sizeof(x)) {
+ // oh, well, the worker is clearly overloaded
+ // but there is nothing we can do about it,
+ // and hopefully the queue will be processed eventually
+ }
+ }
+ needNotify.clear();
+ ready--;
+ }
+ const auto backup = ready;
+ for (size_t fdIndex = 1; fdIndex < fds.size() && ready > 0; fdIndex++) {
+ if (fds.at(fdIndex).revents & POLLIN) {
+ ready--;
+ auto& info = xskWakerIdx.find(fds.at(fdIndex).fd)->worker;
+#if defined(__SANITIZE_THREAD__)
+ info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) {
+#else
+ info->outgoingPacketsQueue.consume_all([&](XskPacket* packetRaw) {
+#endif
+ auto packet = XskPacketPtr(packetRaw);
+ if (!(packet->getFlags() & XskPacket::UPDATE)) {
+ xsk->markAsFree(std::move(packet));
+ return;
+ }
+ if (packet->getFlags() & XskPacket::DELAY) {
+ xsk->pushDelayed(std::move(packet));
+ return;
+ }
+ fillInTx.push_back(std::move(packet));
+ });
+ info->cleanWorkerNotification();
}
}
+ xsk->pickUpReadyPacket(fillInTx);
+ xsk->recycle(4096);
+ xsk->fillFq();
+ xsk->send(fillInTx);
+ ready = backup;
}
- else {
+ catch (...) {
+ vinfolog("Exception in XSK router loop");
+ }
+ }
+}
+}
#endif /* HAVE_XSK */
+
+// listens on a dedicated socket, lobs answers from downstream servers to original requestors
+void responderThread(std::shared_ptr<DownstreamState> dss)
+{
+ try {
+ setThreadName("dnsdist/respond");
+ auto localRespRuleActions = g_respruleactions.getLocal();
+ auto localCacheInsertedRespRuleActions = g_cacheInsertedRespRuleActions.getLocal();
const size_t initialBufferSize = getInitialUDPPacketBufferSize(false);
/* allocate one more byte so we can detect truncation */
PacketBuffer response(initialBufferSize + 1);
vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->d_config.remote.toStringWithPort(), queryId, e.what());
}
}
-#ifdef HAVE_XSK
- }
-#endif /* HAVE_XSK */
}
catch (const std::exception& e) {
errlog("UDP responder thread died because of exception: %s", e.what());
return true;
}
-#ifdef HAVE_XSK
-static bool isXskQueryAcceptable(const XskPacket& packet, ClientState& cs, LocalHolders& holders, bool& expectProxyProtocol) noexcept
-{
- const auto& from = packet.getFromAddr();
- expectProxyProtocol = expectProxyProtocolFrom(from);
- if (!holders.acl->match(from) && !expectProxyProtocol) {
- vinfolog("Query from %s dropped because of ACL", from.toStringWithPort());
- ++dnsdist::metrics::g_stats.aclDrops;
- return false;
- }
- cs.queries++;
- ++dnsdist::metrics::g_stats.queries;
-
- return true;
-}
-#endif /* HAVE_XSK */
-
bool checkDNSCryptQuery(const ClientState& cs, PacketBuffer& query, std::unique_ptr<DNSCryptQuery>& dnsCryptQuery, time_t now, bool tcp)
{
if (cs.dnscryptCtx) {
}
#ifdef HAVE_XSK
+namespace dnsdist::xsk
+{
static bool ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet)
{
uint16_t queryId = 0;
}
return false;
}
+
+static void xskClientThread(ClientState* cs)
+{
+ setThreadName("dnsdist/xskClient");
+ auto xskInfo = cs->xskInfo;
+ LocalHolders holders;
+
+ for (;;) {
+#if defined(__SANITIZE_THREAD__)
+ while (!xskInfo->incomingPacketsQueue.lock()->read_available()) {
+#else
+ while (!xskInfo->incomingPacketsQueue.read_available()) {
+#endif
+ xskInfo->waitForXskSocket();
+ }
+#if defined(__SANITIZE_THREAD__)
+ xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) {
+#else
+ xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) {
+#endif
+ auto packet = XskPacketPtr(packetRaw);
+ if (ProcessXskQuery(*cs, holders, *packet)) {
+ packet->updatePacket();
+ xskInfo->pushToSendQueue(std::move(packet));
+ }
+ else {
+ xskInfo->markAsFree(std::move(packet));
+ }
+ });
+ xskInfo->notifyXskSocket();
+ }
+}
+}
#endif /* HAVE_XSK */
#ifndef DISABLE_RECVMMSG
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
#endif /* DISABLE_RECVMMSG */
-#ifdef HAVE_XSK
-static void xskClientThread(ClientState* cs)
-{
- setThreadName("dnsdist/xskClient");
- auto xskInfo = cs->xskInfo;
- LocalHolders holders;
-
- for (;;) {
-#if defined(__SANITIZE_THREAD__)
- while (!xskInfo->incomingPacketsQueue.lock()->read_available()) {
-#else
- while (!xskInfo->incomingPacketsQueue.read_available()) {
-#endif
- xskInfo->waitForXskSocket();
- }
-#if defined(__SANITIZE_THREAD__)
- xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) {
-#else
- xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) {
-#endif
- auto packet = XskPacketPtr(packetRaw);
- if (ProcessXskQuery(*cs, holders, *packet)) {
- packet->updatePacket();
- xskInfo->pushToSendQueue(std::move(packet));
- }
- else {
- xskInfo->markAsFree(std::move(packet));
- }
- });
- xskInfo->notifyXskSocket();
- }
-}
-#endif /* HAVE_XSK */
-
// listens to incoming queries, sends out to downstream servers, noting the intended return path
static void udpClientThread(std::vector<ClientState*> states)
{
}
}
-#ifdef HAVE_XSK
-void XskRouter(std::shared_ptr<XskSocket> xsk);
-#endif /* HAVE_XSK */
-
namespace dnsdist
{
static void startFrontends()
{
#ifdef HAVE_XSK
for (auto& xskContext : g_xsk) {
- std::thread xskThread(XskRouter, std::move(xskContext));
+ std::thread xskThread(dnsdist::xsk::XskRouter, std::move(xskContext));
xskThread.detach();
}
#endif /* HAVE_XSK */
for (auto& clientState : g_frontends) {
#ifdef HAVE_XSK
if (clientState->xskInfo) {
- std::thread xskCT(xskClientThread, clientState.get());
+ std::thread xskCT(dnsdist::xsk::xskClientThread, clientState.get());
if (!clientState->cpus.empty()) {
mapThreadToCPUList(xskCT.native_handle(), clientState->cpus);
}
#endif
}
}
-
-#ifdef HAVE_XSK
-void XskRouter(std::shared_ptr<XskSocket> xsk)
-{
- setThreadName("dnsdist/XskRouter");
- uint32_t failed;
- // packets to be submitted for sending
- vector<XskPacketPtr> fillInTx;
- const auto size = xsk->fds.size();
- // list of workers that need to be notified
- std::set<int> needNotify;
- const auto& xskWakerIdx = xsk->workers.get<0>();
- const auto& destIdx = xsk->workers.get<1>();
- while (true) {
- try {
- auto ready = xsk->wait(-1);
- // descriptor 0 gets incoming AF_XDP packets
- if (xsk->fds[0].revents & POLLIN) {
- auto packets = xsk->recv(64, &failed);
- dnsdist::metrics::g_stats.nonCompliantQueries += failed;
- for (auto &packet : packets) {
- const auto dest = packet->getToAddr();
- auto res = destIdx.find(dest);
- if (res == destIdx.end()) {
- xsk->markAsFree(std::move(packet));
- continue;
- }
- res->worker->pushToProcessingQueue(std::move(packet));
- needNotify.insert(res->workerWaker);
- }
- for (auto i : needNotify) {
- uint64_t x = 1;
- auto written = write(i, &x, sizeof(x));
- if (written != sizeof(x)) {
- // oh, well, the worker is clearly overloaded
- // but there is nothing we can do about it,
- // and hopefully the queue will be processed eventually
- }
- }
- needNotify.clear();
- ready--;
- }
- const auto backup = ready;
- for (size_t i = 1; i < size && ready > 0; i++) {
- if (xsk->fds[i].revents & POLLIN) {
- ready--;
- auto& info = xskWakerIdx.find(xsk->fds[i].fd)->worker;
-#if defined(__SANITIZE_THREAD__)
- info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) {
-#else
- info->outgoingPacketsQueue.consume_all([&](XskPacket* packetRaw) {
-#endif
- auto packet = XskPacketPtr(packetRaw);
- if (!(packet->getFlags() & XskPacket::UPDATE)) {
- xsk->markAsFree(std::move(packet));
- return;
- }
- if (packet->getFlags() & XskPacket::DELAY) {
- xsk->waitForDelay.push(std::move(packet));
- return;
- }
- fillInTx.push_back(std::move(packet));
- });
- info->cleanWorkerNotification();
- }
- }
- xsk->pickUpReadyPacket(fillInTx);
- xsk->recycle(4096);
- xsk->fillFq();
- xsk->send(fillInTx);
- ready = backup;
- }
- catch (...) {
- vinfolog("Exception in XSK router loop");
- }
- }
-}
-#endif /* HAVE_XSK */
~XskUmem();
XskUmem() = default;
};
- boost::multi_index_container<
+ using WorkerContainer = boost::multi_index_container<
XskRouteInfo,
boost::multi_index::indexed_by<
boost::multi_index::hashed_unique<boost::multi_index::member<XskRouteInfo, int, &XskRouteInfo::xskSocketWaker>>,
- boost::multi_index::hashed_unique<boost::multi_index::member<XskRouteInfo, ComboAddress, &XskRouteInfo::dest>, ComboAddress::addressPortOnlyHash>>>
- workers;
+ boost::multi_index::hashed_unique<boost::multi_index::member<XskRouteInfo, ComboAddress, &XskRouteInfo::dest>, ComboAddress::addressPortOnlyHash>>>;
+ WorkerContainer workers;
// number of frames to keep in sharedEmptyFrameOffset
static constexpr size_t holdThreshold = 256;
// number of frames to insert into the fill queue
static constexpr size_t frameSize = 2048;
// number of entries (frames) in the umem
const size_t frameNum;
- // ID of the network queue
- const uint32_t queueId;
// responses that have been delayed
std::priority_queue<XskPacketPtr> waitForDelay;
const std::string ifName;
xsk_ring_prod tx;
std::unique_ptr<xsk_socket, void (*)(xsk_socket*)> socket;
XskUmem umem;
- bpf_object* prog;
static constexpr uint32_t fqCapacity = XSK_RING_PROD__DEFAULT_NUM_DESCS * 4;
static constexpr uint32_t cqCapacity = XSK_RING_CONS__DEFAULT_NUM_DESCS * 4;
constexpr static bool isPowOfTwo(uint32_t value) noexcept;
[[nodiscard]] static int timeDifference(const timespec& t1, const timespec& t2) noexcept;
- friend void XskRouter(std::shared_ptr<XskSocket> xsk);
[[nodiscard]] uint64_t frameOffset(const XskPacket& packet) const noexcept;
[[nodiscard]] int firstTimeout();
- // pick ups available frames from uniqueEmptyFrameOffset
- // insert entries from uniqueEmptyFrameOffset into fq
- void fillFq(uint32_t fillSize = fillThreshold) noexcept;
- // picks up entries that have been processed (sent) from cq and push them into uniqueEmptyFrameOffset
- void recycle(size_t size) noexcept;
void getMACFromIfName();
- // look at delayed packets, and send the ones that are ready
- void pickUpReadyPacket(std::vector<XskPacketPtr>& packets);
public:
static constexpr size_t getFrameSize()
void addWorker(std::shared_ptr<XskWorker> s, const ComboAddress& dest);
[[nodiscard]] std::string getMetrics() const;
void markAsFree(XskPacketPtr&& packet);
+ [[nodiscard]] WorkerContainer& getWorkers()
+ {
+ return workers;
+ }
+ [[nodiscard]] const std::vector<pollfd>& getDescriptors() const
+ {
+ return fds;
+ }
+ // picks up available frames from uniqueEmptyFrameOffset
+ // insert entries from uniqueEmptyFrameOffset into fq
+ void fillFq(uint32_t fillSize = fillThreshold) noexcept;
+ // picks up entries that have been processed (sent) from cq and pushes them into uniqueEmptyFrameOffset
+ void recycle(size_t size) noexcept;
+ // look at delayed packets, and send the ones that are ready
+ void pickUpReadyPacket(std::vector<XskPacketPtr>& packets);
+ void pushDelayed(XskPacketPtr&& packet)
+ {
+ waitForDelay.push(std::move(packet));
+ }
};
struct iphdr;
void setIPv6Header(const ipv6hdr& ipv6Header) noexcept;
[[nodiscard]] udphdr getUDPHeader() const noexcept;
void setUDPHeader(const udphdr& udpHeader) noexcept;
- // parse IP and UDP payloads
- bool parse(bool fromSetHeader);
void changeDirectAndUpdateChecksum() noexcept;
- friend XskSocket;
- friend XskWorker;
- friend bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept;
-
constexpr static uint8_t DefaultTTL = 64;
public:
XskPacket(uint8_t* frame, size_t dataSize, size_t frameSize);
void addDelay(int relativeMilliseconds) noexcept;
void updatePacket() noexcept;
+ // parse IP and UDP payloads
+ bool parse(bool fromSetHeader);
[[nodiscard]] uint32_t getFlags() const noexcept;
+ [[nodiscard]] timespec getSendTime() const noexcept
+ {
+ return sendTime;
+ }
+ [[nodiscard]] uint64_t getFrameOffsetFrom(const uint8_t* base) const noexcept
+ {
+ return frame - base;
+ }
};
bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept;
void waitForXskSocket() noexcept;
void cleanWorkerNotification() noexcept;
void cleanSocketNotification() noexcept;
- [[nodiscard]] uint64_t frameOffset(const XskPacket& s) const noexcept;
+ [[nodiscard]] uint64_t frameOffset(const XskPacket& packet) const noexcept;
// reap empty umem entry from sharedEmptyFrameOffset into uniqueEmptyFrameOffset
void fillUniqueEmptyOffset();
// look for an empty umem entry in uniqueEmptyFrameOffset