InternalQueryState(const InternalQueryState& orig) = delete;
InternalQueryState& operator=(const InternalQueryState& orig) = delete;
+ bool isXSK() const noexcept
+ {
+#ifdef HAVE_XSK
+ return !xskPacketHeader.empty();
+#else
+ return false;
+#endif /* HAVE_XSK */
+ }
+
boost::optional<Netmask> subnet{boost::none}; // 40
ComboAddress origRemote; // 28
ComboAddress origDest; // 28
std::string poolName; // 24
StopWatch queryRealTime{true}; // 24
std::shared_ptr<DNSDistPacketCache> packetCache{nullptr}; // 16
+#ifdef HAVE_XSK
+ PacketBuffer xskPacketHeader; // 8
+#endif /* HAVE_XSK */
std::unique_ptr<DNSCryptQuery> dnsCryptQuery{nullptr}; // 8
std::unique_ptr<QTag> qTag{nullptr}; // 8
std::unique_ptr<PacketBuffer> d_packet{nullptr}; // Initial packet, so we can restart the query from the response path if needed // 8
std::unique_ptr<ProtoBufData> d_protoBufData{nullptr};
std::unique_ptr<EDNSExtendedError> d_extendedError{nullptr};
- std::unique_ptr<PacketBuffer> xskPacketHeader; // 8
boost::optional<uint32_t> tempFailureTTL{boost::none}; // 8
ClientState* cs{nullptr}; // 8
std::unique_ptr<DOHUnitInterface> du; // 8
}
bool muted = true;
- if (ids.cs && !ids.cs->muted && !ids.xskPacketHeader) {
+ if (ids.cs && !ids.cs->muted && !ids.isXSK()) {
sendUDPResponse(ids.cs->udpFD, response, dr.ids.delayMsec, ids.hopLocal, ids.hopRemote);
muted = false;
}
vinfolog("Got answer from %s, relayed to %s (UDP), took %f us", ds->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), udiff);
}
else {
- if (!ids.xskPacketHeader) {
+ if (!ids.isXSK()) {
vinfolog("Got answer from %s, NOT relayed to %s (UDP) since that frontend is muted, took %f us", ds->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), udiff);
}
else {
xskPacket->setAddr(dss->d_config.sourceAddr, dss->d_config.sourceMACAddr, dss->d_config.remote, dss->d_config.destMACAddr);
xskPacket->setPayload(packet);
xskPacket->rewrite();
- xskInfo->pushToSendQueue(std::move(xskPacket));
+ xskInfo->pushToSendQueue(std::move(*xskPacket));
const auto queryId = data->d_queryID;
map[queryId] = std::move(data);
}
if (pollfds[0].revents & POLLIN) {
needNotify = true;
#if defined(__SANITIZE_THREAD__)
- xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) {
+ xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
#else
- xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) {
+ xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
#endif
- auto packet = XskPacketPtr(packetRaw);
- if (packet->getDataLen() < sizeof(dnsheader)) {
+ if (packet.getDataLen() < sizeof(dnsheader)) {
xskInfo->markAsFree(std::move(packet));
return;
}
- const dnsheader_aligned dnsHeader(packet->getPayloadData());
+ const dnsheader_aligned dnsHeader(packet.getPayloadData());
const auto queryId = dnsHeader->id;
auto ids = dss->getState(queryId);
if (ids) {
- if (xskFd != ids->backendFD || !ids->xskPacketHeader) {
+ if (xskFd != ids->backendFD || !ids->isXSK()) {
dss->restoreState(queryId, std::move(*ids));
ids = std::nullopt;
}
if (iter != healthCheckMap.end()) {
auto data = std::move(iter->second);
healthCheckMap.erase(iter);
- packet->cloneIntoPacketBuffer(data->d_buffer);
+ packet.cloneIntoPacketBuffer(data->d_buffer);
data->d_ds->submitHealthCheckResult(data->d_initial, handleResponse(data));
}
xskInfo->markAsFree(std::move(packet));
return;
}
- auto response = packet->clonePacketBuffer();
- if (response.size() > packet->getCapacity()) {
+ auto response = packet.clonePacketBuffer();
+ if (response.size() > packet.getCapacity()) {
/* fallback to sending the packet via normal socket */
- ids->xskPacketHeader.reset();
+ ids->xskPacketHeader.clear();
}
if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) {
xskInfo->markAsFree(std::move(packet));
return;
}
vinfolog("XSK packet - processResponderPacket OK");
- if (response.size() > packet->getCapacity()) {
+ if (response.size() > packet.getCapacity()) {
/* fallback to sending the packet via normal socket */
sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote);
vinfolog("XSK packet falling back because packet is too large");
return;
}
//vinfolog("XSK packet - set header");
- packet->setHeader(*ids->xskPacketHeader);
+ packet.setHeader(ids->xskPacketHeader);
//vinfolog("XSK packet - set payload");
- if (!packet->setPayload(response)) {
+ if (!packet.setPayload(response)) {
vinfolog("Unable to set payload !");
}
if (ids->delayMsec > 0) {
vinfolog("XSK packet - adding delay");
- packet->addDelay(ids->delayMsec);
+ packet.addDelay(ids->delayMsec);
}
//vinfolog("XSK packet - update packet");
- packet->updatePacket();
+ packet.updatePacket();
//vinfolog("XSK packet pushed to send queue");
xskInfo->pushToSendQueue(std::move(packet));
});
setThreadName("dnsdist/XskRouter");
uint32_t failed;
// packets to be submitted for sending
- vector<XskPacketPtr> fillInTx;
+ vector<XskPacket> fillInTx;
const auto& fds = xsk->getDescriptors();
// list of workers that need to be notified
std::set<int> needNotify;
auto packets = xsk->recv(64, &failed);
dnsdist::metrics::g_stats.nonCompliantQueries += failed;
for (auto &packet : packets) {
- const auto dest = packet->getToAddr();
+ const auto dest = packet.getToAddr();
auto res = destIdx.find(dest);
if (res == destIdx.end()) {
xsk->markAsFree(std::move(packet));
ready--;
auto& info = xskWakerIdx.find(fds.at(fdIndex).fd)->worker;
#if defined(__SANITIZE_THREAD__)
- info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) {
+ info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
#else
- info->outgoingPacketsQueue.consume_all([&](XskPacket* packetRaw) {
+ info->outgoingPacketsQueue.consume_all([&](XskPacket& packet) {
#endif
- auto packet = XskPacketPtr(packetRaw);
- if (!(packet->getFlags() & XskPacket::UPDATE)) {
+ if (!(packet.getFlags() & XskPacket::UPDATE)) {
xsk->markAsFree(std::move(packet));
return;
}
- if (packet->getFlags() & XskPacket::DELAY) {
+ if (packet.getFlags() & XskPacket::DELAY) {
xsk->pushDelayed(std::move(packet));
return;
}
continue;
}
- if (processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids)) && ids->xskPacketHeader && ids->cs->xskInfo) {
+ if (processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids)) && ids->isXSK() && ids->cs->xskInfo) {
#ifdef HAVE_XSK
//vinfolog("processResponderPacket OK");
auto& xskInfo = ids->cs->xskInfo;
continue;
}
//vinfolog("XSK setHeader");
- xskPacket->setHeader(*ids->xskPacketHeader);
+ xskPacket->setHeader(ids->xskPacketHeader);
//vinfolog("XSK payload");
xskPacket->setPayload(response);
//vinfolog("XSK update packet");
xskPacket->updatePacket();
//vinfolog("XSK pushed to send queue");
- xskInfo->pushToSendQueue(std::move(xskPacket));
+ xskInfo->pushToSendQueue(std::move(*xskPacket));
xskInfo->notifyXskSocket();
#endif /* HAVE_XSK */
}
xskInfo->waitForXskSocket();
}
#if defined(__SANITIZE_THREAD__)
- xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) {
+ xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
#else
- xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) {
+ xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
#endif
- auto packet = XskPacketPtr(packetRaw);
- if (ProcessXskQuery(*cs, holders, *packet)) {
- packet->updatePacket();
+ if (ProcessXskQuery(*cs, holders, packet)) {
+ packet.updatePacket();
xskInfo->pushToSendQueue(std::move(packet));
}
else {
#include "gettime.hh"
#include "xsk.hh"
-#define DEBUG_UMEM 0
#ifdef DEBUG_UMEM
namespace {
struct UmemEntryStatus
}
timespec now;
gettime(&now);
- const auto& firstTime = waitForDelay.top()->getSendTime();
+ const auto& firstTime = waitForDelay.top().getSendTime();
const auto res = timeDifference(now, firstTime);
if (res <= 0) {
return 0;
return xsk_socket__fd(socket.get());
}
-void XskSocket::send(std::vector<XskPacketPtr>& packets)
+void XskSocket::send(std::vector<XskPacket>& packets)
{
while (packets.size() > 0) {
auto packetSize = packets.size();
break;
}
*xsk_ring_prod__tx_desc(&tx, idx++) = {
- .addr = frameOffset(*packet),
- .len = packet->getFrameLen(),
+ .addr = frameOffset(packet),
+ .len = packet.getFrameLen(),
.options = 0};
#ifdef DEBUG_UMEM
- checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(*packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::Received}, UmemEntryStatus::Status::TXQueue);
+ checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::Received}, UmemEntryStatus::Status::TXQueue);
#endif /* DEBUG_UMEM */
queued++;
}
}
}
-std::vector<XskPacketPtr> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCount)
+std::vector<XskPacket> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCount)
{
uint32_t idx{0};
- std::vector<XskPacketPtr> res;
+ std::vector<XskPacket> res;
// how many descriptors to packets have been filled
const auto recvSize = xsk_ring_cons__peek(&rx, recvSizeMax, &idx);
if (recvSize == 0) {
for (; processed < recvSize; processed++) {
try {
const auto* desc = xsk_ring_cons__rx_desc(&rx, idx++);
- auto ptr = std::make_unique<XskPacket>(reinterpret_cast<uint8_t*>(desc->addr + baseAddr), desc->len, frameSize);
+ XskPacket packet = XskPacket(reinterpret_cast<uint8_t*>(desc->addr + baseAddr), desc->len, frameSize);
#ifdef DEBUG_UMEM
- checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(*ptr), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::FillQueue}, UmemEntryStatus::Status::Received);
+ checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::FillQueue}, UmemEntryStatus::Status::Received);
#endif /* DEBUG_UMEM */
- if (!ptr->parse(false)) {
+ if (!packet.parse(false)) {
++failed;
- markAsFree(std::move(ptr));
+ markAsFree(std::move(packet));
}
else {
- res.push_back(std::move(ptr));
+ res.push_back(std::move(packet));
}
}
catch (const std::exception& exp) {
return res;
}
-void XskSocket::pickUpReadyPacket(std::vector<XskPacketPtr>& packets)
+void XskSocket::pickUpReadyPacket(std::vector<XskPacket>& packets)
{
timespec now;
gettime(&now);
- while (!waitForDelay.empty() && timeDifference(now, waitForDelay.top()->getSendTime()) <= 0) {
- auto& top = const_cast<XskPacketPtr&>(waitForDelay.top());
+ while (!waitForDelay.empty() && timeDifference(now, waitForDelay.top().getSendTime()) <= 0) {
+ auto& top = const_cast<XskPacket&>(waitForDelay.top());
packets.push_back(std::move(top));
waitForDelay.pop();
}
return ret.str();
}
-void XskSocket::markAsFree(XskPacketPtr&& packet)
+void XskSocket::markAsFree(XskPacket&& packet)
{
- auto offset = frameOffset(*packet);
+ auto offset = frameOffset(packet);
#ifdef DEBUG_UMEM
checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free);
#endif /* DEBUG_UMEM */
uniqueEmptyFrameOffset.push_back(offset);
- packet.release();
}
XskSocket::XskUmem::~XskUmem()
sendTime.tv_nsec %= 1000000000L;
}
-bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept
+bool operator<(const XskPacket& s1, const XskPacket& s2) noexcept
{
- return s1->getSendTime() < s2->getSendTime();
+ return s1.getSendTime() < s2.getSendTime();
}
const ComboAddress& XskPacket::getFromAddr() const noexcept
{
}
-void XskWorker::pushToProcessingQueue(XskPacketPtr&& packet)
+void XskWorker::pushToProcessingQueue(XskPacket&& packet)
{
- auto raw = packet.release();
#if defined(__SANITIZE_THREAD__)
- if (!incomingPacketsQueue.lock()->push(std::move(raw))) {
+ if (!incomingPacketsQueue.lock()->push(std::move(packet))) {
#else
- if (!incomingPacketsQueue.push(std::move(raw))) {
+ if (!incomingPacketsQueue.push(std::move(packet))) {
#endif
- markAsFree(XskPacketPtr(raw));
+ markAsFree(std::move(packet));
}
}
-void XskWorker::pushToSendQueue(XskPacketPtr&& packet)
+void XskWorker::pushToSendQueue(XskPacket&& packet)
{
- auto raw = packet.release();
#if defined(__SANITIZE_THREAD__)
- if (!outgoingPacketsQueue.lock()->push(raw)) {
+ if (!outgoingPacketsQueue.lock()->push(std::move(packet))) {
#else
- if (!outgoingPacketsQueue.push(raw)) {
+ if (!outgoingPacketsQueue.push(std::move(packet))) {
#endif
- markAsFree(XskPacketPtr(raw));
+ markAsFree(std::move(packet));
}
}
return ip_checksum_partial(&pseudo_header, sizeof(pseudo_header), 0);
}
-void XskPacket::setHeader(const PacketBuffer& buf)
+void XskPacket::setHeader(PacketBuffer& buf)
{
memcpy(frame, buf.data(), buf.size());
frameLength = buf.size();
+ buf.clear();
flags = 0;
if (!parse(true)) {
throw std::runtime_error("Error setting the XSK frame header");
}
}
-std::unique_ptr<PacketBuffer> XskPacket::cloneHeadertoPacketBuffer() const
+PacketBuffer XskPacket::cloneHeadertoPacketBuffer() const
{
const auto size = getFrameLen() - getDataSize();
- auto tmp = std::make_unique<PacketBuffer>(size);
- memcpy(tmp->data(), frame, size);
+ PacketBuffer tmp(size);
+ memcpy(tmp.data(), frame, size);
return tmp;
}
}
}
-XskPacketPtr XskWorker::getEmptyFrame()
+std::optional<XskPacket> XskWorker::getEmptyFrame()
{
if (!uniqueEmptyFrameOffset.empty()) {
auto offset = uniqueEmptyFrameOffset.back();
uniqueEmptyFrameOffset.pop_back();
- return std::make_unique<XskPacket>(offset + umemBufBase, 0, frameSize);
+ return XskPacket(offset + umemBufBase, 0, frameSize);
}
fillUniqueEmptyOffset();
if (!uniqueEmptyFrameOffset.empty()) {
auto offset = uniqueEmptyFrameOffset.back();
uniqueEmptyFrameOffset.pop_back();
- return std::make_unique<XskPacket>(offset + umemBufBase, 0, frameSize);
+ return XskPacket(offset + umemBufBase, 0, frameSize);
}
- return nullptr;
+ return std::nullopt;
}
-void XskWorker::markAsFree(XskPacketPtr&& packet)
+void XskWorker::markAsFree(XskPacket&& packet)
{
-
- auto offset = frameOffset(*packet);
+ auto offset = frameOffset(packet);
#ifdef DEBUG_UMEM
checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free);
#endif /* DEBUG_UMEM */
uniqueEmptyFrameOffset.push_back(offset);
- packet.release();
}
uint32_t XskPacket::getFlags() const noexcept
// number of entries (frames) in the umem
const size_t frameNum;
// responses that have been delayed
- std::priority_queue<XskPacketPtr> waitForDelay;
+ std::priority_queue<XskPacket> waitForDelay;
const std::string ifName;
const std::string poolName;
// AF_XDP socket then worker waker sockets
// wait until one event has occurred
[[nodiscard]] int wait(int timeout);
// add as many packets as possible to the rx queue for sending */
- void send(std::vector<XskPacketPtr>& packets);
+ void send(std::vector<XskPacket>& packets);
// look at incoming packets in rx, return them if parsing succeeeded
- [[nodiscard]] std::vector<XskPacketPtr> recv(uint32_t recvSizeMax, uint32_t* failedCount);
+ [[nodiscard]] std::vector<XskPacket> recv(uint32_t recvSizeMax, uint32_t* failedCount);
void addWorker(std::shared_ptr<XskWorker> s, const ComboAddress& dest);
[[nodiscard]] std::string getMetrics() const;
- void markAsFree(XskPacketPtr&& packet);
+ void markAsFree(XskPacket&& packet);
[[nodiscard]] WorkerContainer& getWorkers()
{
return workers;
// picks up entries that have been processed (sent) from cq and push them into uniqueEmptyFrameOffset
void recycle(size_t size) noexcept;
// look at delayed packets, and send the ones that are ready
- void pickUpReadyPacket(std::vector<XskPacketPtr>& packets);
- void pushDelayed(XskPacketPtr&& packet)
+ void pickUpReadyPacket(std::vector<XskPacket>& packets);
+ void pushDelayed(XskPacket&& packet)
{
waitForDelay.push(std::move(packet));
}
[[nodiscard]] uint32_t getFrameLen() const noexcept;
[[nodiscard]] PacketBuffer clonePacketBuffer() const;
void cloneIntoPacketBuffer(PacketBuffer& buffer) const;
- [[nodiscard]] std::unique_ptr<PacketBuffer> cloneHeadertoPacketBuffer() const;
+ [[nodiscard]] PacketBuffer cloneHeadertoPacketBuffer() const;
void setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC) noexcept;
bool setPayload(const PacketBuffer& buf);
void rewrite() noexcept;
- void setHeader(const PacketBuffer& buf);
+ void setHeader(PacketBuffer& buf);
XskPacket(uint8_t* frame, size_t dataSize, size_t frameSize);
void addDelay(int relativeMilliseconds) noexcept;
void updatePacket() noexcept;
return frame - base;
}
};
-bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept;
+bool operator<(const XskPacket& s1, const XskPacket& s2) noexcept;
/* g++ defines __SANITIZE_THREAD__
clang++ supports the nice __has_feature(thread_sanitizer),
class XskWorker
{
#if defined(__SANITIZE_THREAD__)
- using XskPacketRing = LockGuarded<boost::lockfree::spsc_queue<XskPacket*, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS*2>>>;
+ using XskPacketRing = LockGuarded<boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS*2>>>;
#else
- using XskPacketRing = boost::lockfree::spsc_queue<XskPacket*, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS*2>>;
+ using XskPacketRing = boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS*2>>;
#endif
public:
static int createEventfd();
static void notify(int fd);
static std::shared_ptr<XskWorker> create();
- void pushToProcessingQueue(XskPacketPtr&& packet);
- void pushToSendQueue(XskPacketPtr&& packet);
- void markAsFree(XskPacketPtr&& packet);
+ void pushToProcessingQueue(XskPacket&& packet);
+ void pushToSendQueue(XskPacket&& packet);
+ void markAsFree(XskPacket&& packet);
// notify worker that at least one packet is available for processing
void notifyWorker() noexcept;
// notify the router that packets are ready to be sent
void fillUniqueEmptyOffset();
// look for an empty umem entry in uniqueEmptyFrameOffset
// then sharedEmptyFrameOffset if needed
- XskPacketPtr getEmptyFrame();
+ std::optional<XskPacket> getEmptyFrame();
};
std::vector<pollfd> getPollFdsForWorker(XskWorker& info);
#else