d_config.sourceMACAddr = d_xskSockets.at(0)->getSourceMACAddress();
for (auto& xsk : d_xskSockets) {
- auto xskInfo = XskWorker::create();
+ auto xskInfo = XskWorker::create(XskWorker::Type::Bidirectional);
d_xskInfos.push_back(xskInfo);
xsk->addWorker(xskInfo);
- xskInfo->sharedEmptyFrameOffset = xsk->sharedEmptyFrameOffset;
+ xskInfo->setSharedFrames(xsk->sharedEmptyFrameOffset);
}
reconnect(false);
}
std::shared_ptr<XskSocket> socket;
parseXskVars(vars, socket);
if (socket) {
- udpCS->xskInfo = XskWorker::create();
- udpCS->xskInfo->sharedEmptyFrameOffset = socket->sharedEmptyFrameOffset;
+ udpCS->xskInfo = XskWorker::create(XskWorker::Type::Bidirectional);
+ udpCS->xskInfo->setSharedFrames(socket->sharedEmptyFrameOffset);
socket->addWorker(udpCS->xskInfo);
socket->addWorkerRoute(udpCS->xskInfo, loc);
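+ // also create an outgoing-only worker, used by the responder threads to send responses coming from the backends over this XSK socket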
+ udpCS->xskInfoResponder = XskWorker::create(XskWorker::Type::OutgoingOnly);
+ udpCS->xskInfoResponder->setSharedFrames(socket->sharedEmptyFrameOffset);
+ socket->addWorker(udpCS->xskInfoResponder);
vinfolog("Enabling XSK in %s mode for incoming UDP packets to %s", socket->getXDPMode(), loc.toStringWithPort());
}
#endif /* HAVE_XSK */
std::shared_ptr<XskSocket> socket;
parseXskVars(vars, socket);
if (socket) {
- udpCS->xskInfo = XskWorker::create();
- udpCS->xskInfo->sharedEmptyFrameOffset = socket->sharedEmptyFrameOffset;
+ udpCS->xskInfo = XskWorker::create(XskWorker::Type::Bidirectional);
+ udpCS->xskInfo->setSharedFrames(socket->sharedEmptyFrameOffset);
socket->addWorker(udpCS->xskInfo);
socket->addWorkerRoute(udpCS->xskInfo, loc);
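+ // also create an outgoing-only worker, used by the responder threads to send responses coming from the backends over this XSK socket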
+ udpCS->xskInfoResponder = XskWorker::create(XskWorker::Type::OutgoingOnly);
+ udpCS->xskInfoResponder->setSharedFrames(socket->sharedEmptyFrameOffset);
+ socket->addWorker(udpCS->xskInfoResponder);
vinfolog("Enabling XSK in %s mode for incoming UDP packets to %s", socket->getXDPMode(), loc.toStringWithPort());
}
#endif /* HAVE_XSK */
if ((pollfds[0].revents & POLLIN) != 0) {
needNotify = true;
xskInfo->cleanSocketNotification();
-#if defined(__SANITIZE_THREAD__)
- xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
- xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
+ xskInfo->processIncomingFrames([&](XskPacket& packet) {
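+ // frames too short to hold a DNS header can be released right away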
if (packet.getDataLen() < sizeof(dnsheader)) {
xskInfo->markAsFree(packet);
return;
}
if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) {
xskInfo->markAsFree(packet);
- infolog("XSK packet pushed to queue because processResponderPacket failed");
+ vinfolog("XSK packet dropped because processResponderPacket failed");
return;
}
if (response.size() > packet.getCapacity()) {
if ((fds.at(fdIndex).revents & POLLIN) != 0) {
ready--;
const auto& info = xsk->getWorkerByDescriptor(fds.at(fdIndex).fd);
-#if defined(__SANITIZE_THREAD__)
- info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
- info->outgoingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
+ info->processOutgoingFrames([&](XskPacket& packet) {
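+ // frames that the worker did not flag as updated have nothing to send, release them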
if ((packet.getFlags() & XskPacket::UPDATE) == 0) {
xsk->markAsFree(packet);
return;
LocalHolders holders;
for (;;) {
-#if defined(__SANITIZE_THREAD__)
- while (xskInfo->incomingPacketsQueue.lock()->read_available() == 0U) {
-#else
- while (xskInfo->incomingPacketsQueue.read_available() == 0U) {
-#endif
+ while (!xskInfo->hasIncomingFrames()) {
xskInfo->waitForXskSocket();
}
-#if defined(__SANITIZE_THREAD__)
- xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
- xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
+ xskInfo->processIncomingFrames([&](XskPacket& packet) {
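+ // queries that XskProcessQuery() fully handled are rewritten in place and queued to be sent back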
if (XskProcessQuery(*clientState, holders, packet)) {
packet.updatePacket();
xskInfo->pushToSendQueue(packet);
return;
}
- if (processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids)) && ids->isXSK() && ids->cs->xskInfo) {
+ if (processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids)) && ids->isXSK() && ids->cs->xskInfoResponder) {
#ifdef HAVE_XSK
- auto& xskInfo = ids->cs->xskInfo;
+ auto& xskInfo = ids->cs->xskInfoResponder;
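+ // if the shared pool of empty frames is exhausted, we have no choice but to drop this response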
auto xskPacket = xskInfo->getEmptyFrame();
if (!xskPacket) {
continue;
std::shared_ptr<DOH3Frontend> doh3Frontend{nullptr};
std::shared_ptr<BPFFilter> d_filter{nullptr};
std::shared_ptr<XskWorker> xskInfo{nullptr};
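+ // outgoing-only XSK worker used by the responder threads to send responses coming from the backends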
+ std::shared_ptr<XskWorker> xskInfoResponder{nullptr};
size_t d_maxInFlightQueriesPerConn{1};
size_t d_tcpConcurrentConnectionsLimit{0};
int udpFD{-1};
void XskSocket::fillFq(uint32_t fillSize) noexcept
{
- {
+ if (uniqueEmptyFrameOffset.size() < fillSize) {
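+ // not enough empty frames in our unique container, take as many as we can from the shared one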
+ auto frames = sharedEmptyFrameOffset->lock();
+ const auto moveSize = std::min(static_cast<size_t>(fillSize), frames->size());
+ if (moveSize > 0) {
+ // NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions)
+ uniqueEmptyFrameOffset.insert(uniqueEmptyFrameOffset.end(), std::make_move_iterator(frames->end() - moveSize), std::make_move_iterator(frames->end()));
+ frames->resize(frames->size() - moveSize);
+ }
+ }
+ else if (uniqueEmptyFrameOffset.size() > (10 * fillSize)) {
// if we have less than holdThreshold frames in the shared queue (which might be an issue
// when the XskWorker needs empty frames), move frames from the unique container into the
// shared one. This might not be optimal right now.
}
}
- if (uniqueEmptyFrameOffset.size() < fillSize) {
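+ // never try to fill more descriptors than we have empty frames for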
+ fillSize = std::min(fillSize, static_cast<uint32_t>(uniqueEmptyFrameOffset.size()));
+ if (fillSize == 0) {
return;
}
}
}
catch (const std::exception& exp) {
- std::cerr << "Exception while processing the XSK RX queue: " << exp.what() << std::endl;
+ ++failed;
+ ++processed;
break;
}
catch (...) {
- std::cerr << "Exception while processing the XSK RX queue" << std::endl;
+ ++failed;
+ ++processed;
break;
}
}
}
}
-XskWorker::XskWorker() :
- workerWaker(createEventfd()), xskSocketWaker(createEventfd())
+XskWorker::XskWorker(XskWorker::Type type) :
+ d_type(type), workerWaker(createEventfd()), xskSocketWaker(createEventfd())
{
}
void XskWorker::pushToProcessingQueue(XskPacket& packet)
{
-#if defined(__SANITIZE_THREAD__)
- if (!incomingPacketsQueue.lock()->push(packet)) {
-#else
- if (!incomingPacketsQueue.push(packet)) {
-#endif
+ if (d_type == Type::OutgoingOnly) {
+ throw std::runtime_error("Trying to push an incoming packet into an outgoing-only XSK Worker");
+ }
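+ // if the incoming ring is full, release the frame rather than leaking it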
+ if (!d_incomingPacketsQueue.push(packet)) {
markAsFree(packet);
}
}
void XskWorker::pushToSendQueue(XskPacket& packet)
{
-#if defined(__SANITIZE_THREAD__)
- if (!outgoingPacketsQueue.lock()->push(packet)) {
-#else
- if (!outgoingPacketsQueue.push(packet)) {
-#endif
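+ // if the outgoing ring is full, the packet cannot be sent, so give the frame back rather than leaking it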
+ if (!d_outgoingPacketsQueue.push(packet)) {
markAsFree(packet);
}
}
/* needed to get the correct checksum */
setIPv4Header(ipHeader);
setUDPHeader(udpHeader);
- udpHeader.check = tcp_udp_v4_checksum(&ipHeader);
+ //udpHeader.check = tcp_udp_v4_checksum(&ipHeader);
rewriteIpv4Header(&ipHeader, getFrameLen());
setIPv4Header(ipHeader);
setUDPHeader(udpHeader);
notify(xskSocketWaker);
}
-std::shared_ptr<XskWorker> XskWorker::create()
+std::shared_ptr<XskWorker> XskWorker::create(Type type)
{
- return std::make_shared<XskWorker>();
+ return std::make_shared<XskWorker>(type);
}
void XskSocket::addWorker(std::shared_ptr<XskWorker> worker)
{
const auto socketWaker = worker->xskSocketWaker.getHandle();
- worker->umemBufBase = umem.bufBase;
+ worker->setUmemBufBase(umem.bufBase);
d_workers.insert({socketWaker, std::move(worker)});
fds.push_back(pollfd{
.fd = socketWaker,
d_workerRoutes.lock()->erase(dest);
}
+void XskWorker::setSharedFrames(std::shared_ptr<LockGuarded<vector<uint64_t>>>& frames)
+{
+ d_sharedEmptyFrameOffset = frames;
+}
+
+void XskWorker::setUmemBufBase(uint8_t* base)
+{
+ d_umemBufBase = base;
+}
+
uint64_t XskWorker::frameOffset(const XskPacket& packet) const noexcept
{
- return packet.getFrameOffsetFrom(umemBufBase);
+ return packet.getFrameOffsetFrom(d_umemBufBase);
}
void XskWorker::notifyWorker() const
notify(workerWaker);
}
+bool XskWorker::hasIncomingFrames()
+{
+ if (d_type == Type::OutgoingOnly) {
+ throw std::runtime_error("Looking for incoming packets in an outgoing-only XSK Worker");
+ }
+
+ return d_incomingPacketsQueue.read_available() != 0U;
+}
+
+void XskWorker::processIncomingFrames(const std::function<void(XskPacket& packet)>& callback)
+{
+ if (d_type == Type::OutgoingOnly) {
+ throw std::runtime_error("Looking for incoming packets in an outgoing-only XSK Worker");
+ }
+
+ d_incomingPacketsQueue.consume_all(callback);
+}
+
+void XskWorker::processOutgoingFrames(const std::function<void(XskPacket& packet)>& callback)
+{
+ d_outgoingPacketsQueue.consume_all(callback);
+}
+
void XskSocket::getMACFromIfName()
{
ifreq ifr{};
std::optional<XskPacket> XskWorker::getEmptyFrame()
{
- auto frames = sharedEmptyFrameOffset->lock();
+ auto frames = d_sharedEmptyFrameOffset->lock();
if (frames->empty()) {
return std::nullopt;
}
auto offset = frames->back();
frames->pop_back();
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
- return XskPacket(offset + umemBufBase, 0, frameSize);
+ return XskPacket(offset + d_umemBufBase, 0, d_frameSize);
}
void XskWorker::markAsFree(const XskPacket& packet)
checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free);
#endif /* DEBUG_UMEM */
{
- auto frames = sharedEmptyFrameOffset->lock();
+ auto frames = d_sharedEmptyFrameOffset->lock();
frames->push_back(offset);
}
}
// We allocate frames that are placed into the descriptors in the fill queue, allowing the kernel to put incoming packets into the frames and place descriptors into the rx queue.
// Once we have read the descriptors from the rx queue we release them, but we own the frames.
// After we are done with the frame, we place them into descriptors of either the fill queue (empty frames) or tx queues (packets to be sent).
-// Once the kernel is done, it places descriptors referencing these frames into the cq where we can recycle them (packets destined to the tx queue or empty frame to the fill queue queue).
+// Once the kernel is done, it places descriptors referencing these frames into the cq where we can recycle them (packets destined to the tx queue or empty frame to the fill queue).
// XskSocket routes packets to multiple worker threads registered on XskSocket via XskSocket::addWorker based on the destination port number of the packet.
// The kernel and the worker thread holding XskWorker will wake up the XskSocket through XskFd and the Eventfd corresponding to each worker thread, respectively.
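+ // To sum up, a frame moves from the fill queue (the kernel stores an incoming packet into it) to the rx queue (we process it),
+ // then into either a tx queue (packet to send) or back to the fill queue (empty frame), and is finally recycled via the cq.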
};
bool operator<(const XskPacket& lhs, const XskPacket& rhs) noexcept;
-/* g++ defines __SANITIZE_THREAD__
- clang++ supports the nice __has_feature(thread_sanitizer),
- let's merge them */
-#if defined(__has_feature)
-#if __has_feature(thread_sanitizer)
-#define __SANITIZE_THREAD__ 1
-#endif
-#endif
-
// XskWorker obtains XskPackets of specific ports in the NIC from XskSocket through cq.
// After finishing processing the packet, XskWorker puts the packet into sq so that XskSocket decides whether to send it through the network card according to XskPacket::flags.
// XskWorker wakes up XskSocket via xskSocketWaker after putting the packets in sq.
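+ // In practice a worker thread loops on: waiting until incoming frames are available, draining them with processIncomingFrames(),
+ // pushing the resulting packets to the outgoing ring via pushToSendQueue(), then waking the XskSocket up through xskSocketWaker.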
class XskWorker
{
-#if defined(__SANITIZE_THREAD__)
- using XskPacketRing = LockGuarded<boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>>;
-#else
- using XskPacketRing = boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>;
-#endif
-
public:
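+ // OutgoingOnly workers can only send packets to the XSK socket, while Bidirectional ones also receive incoming packets from it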
+ enum class Type : uint8_t { OutgoingOnly, Bidirectional };
+
+private:
+ using XskPacketRing = boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<XSK_RING_CONS__DEFAULT_NUM_DESCS * 2>>;
// queue of packets to be processed by this worker
- XskPacketRing incomingPacketsQueue;
+ XskPacketRing d_incomingPacketsQueue;
// queue of packets processed by this worker (to be sent, or discarded)
- XskPacketRing outgoingPacketsQueue;
-
- uint8_t* umemBufBase{nullptr};
+ XskPacketRing d_outgoingPacketsQueue;
// list of frames that are shared with the XskRouter
- std::shared_ptr<LockGuarded<vector<uint64_t>>> sharedEmptyFrameOffset;
- const size_t frameSize{XskSocket::getFrameSize()};
+ std::shared_ptr<LockGuarded<vector<uint64_t>>> d_sharedEmptyFrameOffset;
+ uint8_t* d_umemBufBase{nullptr};
+ const size_t d_frameSize{XskSocket::getFrameSize()};
+ Type d_type;
+
+public:
FDWrapper workerWaker;
FDWrapper xskSocketWaker;
- XskWorker();
static int createEventfd();
static void notify(int desc);
- static std::shared_ptr<XskWorker> create();
+ static std::shared_ptr<XskWorker> create(Type);
+
+ XskWorker(Type);
+ void setSharedFrames(std::shared_ptr<LockGuarded<vector<uint64_t>>>& frames);
+ void setUmemBufBase(uint8_t* base);
void pushToProcessingQueue(XskPacket& packet);
void pushToSendQueue(XskPacket& packet);
+ bool hasIncomingFrames();
+ void processIncomingFrames(const std::function<void(XskPacket& packet)>& callback);
+ void processOutgoingFrames(const std::function<void(XskPacket& packet)>& callback);
void markAsFree(const XskPacket& packet);
// notify worker that at least one packet is available for processing
void notifyWorker() const;