From: Remi Gacogne Date: Thu, 28 Dec 2023 10:53:49 +0000 (+0100) Subject: dnsdist: Cleanup of the XSK code, fixing alignment issues X-Git-Tag: dnsdist-1.9.0-rc1^2~35 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=677f9d2b7da2e1c5833eb26003be307487b8099c;p=thirdparty%2Fpdns.git dnsdist: Cleanup of the XSK code, fixing alignment issues Also add UMEM checks for debugging. --- diff --git a/contrib/xdp.py b/contrib/xdp.py index bd96ddb1ce..1b9187007f 100644 --- a/contrib/xdp.py +++ b/contrib/xdp.py @@ -14,7 +14,7 @@ DROP_ACTION = 1 TC_ACTION = 2 # The interface on wich the filter will be attached -DEV = "eth0" +DEV = "eth1" # The list of blocked IPv4, IPv6 and QNames # IP format : (IPAddress, Action) diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc index 3338ceea31..c4ee519628 100644 --- a/pdns/dnsdist-lua.cc +++ b/pdns/dnsdist-lua.cc @@ -783,9 +783,7 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck) if (socket) { udpCS->xskInfo = XskWorker::create(); udpCS->xskInfo->sharedEmptyFrameOffset = socket->sharedEmptyFrameOffset; - socket->addWorker(udpCS->xskInfo, loc, false); - // tcpCS->xskInfo=XskWorker::create(); - // TODO: socket->addWorker(tcpCS->xskInfo, loc, true); + socket->addWorker(udpCS->xskInfo, loc); } #endif /* HAVE_XSK */ g_frontends.push_back(std::move(udpCS)); @@ -837,9 +835,7 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck) if (socket) { udpCS->xskInfo = XskWorker::create(); udpCS->xskInfo->sharedEmptyFrameOffset = socket->sharedEmptyFrameOffset; - socket->addWorker(udpCS->xskInfo, loc, false); - // TODO tcpCS->xskInfo=XskWorker::create(); - // TODO socket->addWorker(tcpCS->xskInfo, loc, true); + socket->addWorker(udpCS->xskInfo, loc); } #endif /* HAVE_XSK */ g_frontends.push_back(std::move(udpCS)); diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index ae1d00aa1f..f3e8408143 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -761,7 +761,12 @@ static void handleResponseForUDPClient(InternalQueryState& ids, PacketBuffer& re vinfolog("Got answer from %s, relayed to %s (UDP), took %f us", ds->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), udiff); } else { - vinfolog("Got answer from %s, NOT relayed to %s (UDP) since that frontend is muted, took %f us", ds->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), udiff); + if (!ids.xskPacketHeader) { + vinfolog("Got answer from %s, NOT relayed to %s (UDP) since that frontend is muted, took %f us", ds->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), udiff); + } + else { + vinfolog("Got answer from %s, relayed to %s (UDP via XSK), took %f us", ds->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), udiff); + } } handleResponseSent(ids, udiff, dr.ids.origRemote, ds->d_config.remote, response.size(), cleartextDH, ds->getProtocol(), true); @@ -779,8 +784,10 @@ static void XskHealthCheck(std::shared_ptr& dss, std::unordered auto packet = getHealthCheckPacket(dss, nullptr, data); data->d_initial = initial; setHealthCheckTime(dss, data); - auto* frame = xskInfo->getEmptyframe(); - auto xskPacket = std::make_unique(frame, 0, xskInfo->frameSize); + auto xskPacket = xskInfo->getEmptyFrame(); + if (!xskPacket) { + return; + } xskPacket->setAddr(dss->d_config.sourceAddr, dss->d_config.sourceMACAddr, dss->d_config.remote, dss->d_config.destMACAddr); xskPacket->setPayload(packet); xskPacket->rewrite(); @@ -853,14 +860,18 @@ void responderThread(std::shared_ptr dss) bool needNotify = false; if (pollfds[0].revents & POLLIN) { needNotify = true; - xskInfo->cq.consume_all([&](XskPacket* packetRaw) { +#if defined(__SANITIZE_THREAD__) + xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) { +#else + xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) { +#endif auto packet = XskPacketPtr(packetRaw); - if (packet->dataLen() < sizeof(dnsheader)) { - xskInfo->pushToSendQueue(std::move(packet)); + if (packet->getDataLen() < sizeof(dnsheader)) { + xskInfo->markAsFree(std::move(packet)); return; } - const auto* dh = reinterpret_cast(packet->payloadData()); - const auto queryId = dh->id; + const dnsheader_aligned dnsHeader(packet->getPayloadData()); + const auto queryId = dnsHeader->id; auto ids = dss->getState(queryId); if (ids) { if (xskFd != ids->backendFD || !ids->xskPacketHeader) { @@ -877,30 +888,40 @@ void responderThread(std::shared_ptr dss) packet->cloneIntoPacketBuffer(data->d_buffer); data->d_ds->submitHealthCheckResult(data->d_initial, handleResponse(data)); } - xskInfo->pushToSendQueue(std::move(packet)); + xskInfo->markAsFree(std::move(packet)); return; } auto response = packet->clonePacketBuffer(); - if (response.size() > packet->capacity()) { + if (response.size() > packet->getCapacity()) { /* fallback to sending the packet via normal socket */ ids->xskPacketHeader.reset(); } if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) { - xskInfo->pushToSendQueue(std::move(packet)); + xskInfo->markAsFree(std::move(packet)); + vinfolog("XSK packet pushed to queue because processResponderPacket failed"); return; } - if (response.size() > packet->capacity()) { + vinfolog("XSK packet - processResponderPacket OK"); + if (response.size() > packet->getCapacity()) { /* fallback to sending the packet via normal socket */ sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote); - xskInfo->pushToSendQueue(std::move(packet)); + vinfolog("XSK packet falling back because packet is too large"); + xskInfo->markAsFree(std::move(packet)); return; } + //vinfolog("XSK packet - set header"); packet->setHeader(*ids->xskPacketHeader); - packet->setPayload(response); + //vinfolog("XSK packet - set payload"); + if (!packet->setPayload(response)) { + vinfolog("Unable to set payload !"); + } if (ids->delayMsec > 0) { + vinfolog("XSK packet - adding delay"); packet->addDelay(ids->delayMsec); } + //vinfolog("XSK packet - update packet"); packet->updatePacket(); + //vinfolog("XSK packet pushed to send queue"); xskInfo->pushToSendQueue(std::move(packet)); }); xskInfo->cleanSocketNotification(); @@ -1001,12 +1022,19 @@ void responderThread(std::shared_ptr dss) if (processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids)) && ids->xskPacketHeader && ids->cs->xskInfo) { #ifdef HAVE_XSK + //vinfolog("processResponderPacket OK"); auto& xskInfo = ids->cs->xskInfo; - auto* frame = xskInfo->getEmptyframe(); - auto xskPacket = std::make_unique(frame, 0, xskInfo->frameSize); + auto xskPacket = xskInfo->getEmptyFrame(); + if (!xskPacket) { + continue; + } + //vinfolog("XSK setHeader"); xskPacket->setHeader(*ids->xskPacketHeader); + //vinfolog("XSK payload"); xskPacket->setPayload(response); + //vinfolog("XSK update packet"); xskPacket->updatePacket(); + //vinfolog("XSK pushed to send queue"); xskInfo->pushToSendQueue(std::move(xskPacket)); xskInfo->notifyXskSocket(); #endif /* HAVE_XSK */ @@ -2026,7 +2054,7 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct } #ifdef HAVE_XSK -static void ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet) +static bool ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& packet) { uint16_t queryId = 0; const auto& remote = packet.getFromAddr(); @@ -2043,13 +2071,13 @@ static void ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p try { bool expectProxyProtocol = false; if (!isXskQueryAcceptable(packet, cs, holders, expectProxyProtocol)) { - return; + return false; } auto query = packet.clonePacketBuffer(); std::vector proxyProtocolValues; if (expectProxyProtocol && !handleProxyProtocol(remote, false, *holders.acl, query, ids.origRemote, ids.origDest, proxyProtocolValues)) { - return; + return false; } ids.queryRealTime.start(); @@ -2057,7 +2085,7 @@ static void ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p auto dnsCryptResponse = checkDNSCryptQuery(cs, query, ids.dnsCryptQuery, ids.queryRealTime.d_start.tv_sec, false); if (dnsCryptResponse) { packet.setPayload(query); - return; + return true; } { @@ -2066,7 +2094,7 @@ static void ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p queryId = ntohs(dnsHeader->id); if (!checkQueryHeaders(dnsHeader.get(), cs)) { - return; + return false; } if (dnsHeader->qdcount == 0) { @@ -2076,7 +2104,7 @@ static void ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p return true; }); packet.setPayload(query); - return; + return true; } } @@ -2095,7 +2123,7 @@ static void ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p auto result = processQuery(dq, holders, ss); if (result == ProcessQueryResult::Drop) { - return; + return false; } if (result == ProcessQueryResult::SendAnswer) { @@ -2103,11 +2131,11 @@ static void ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p if (dq.ids.delayMsec > 0) { packet.addDelay(dq.ids.delayMsec); } - return; + return true; } if (result != ProcessQueryResult::PassToBackend || ss == nullptr) { - return; + return false; } // the buffer might have been invalidated by now (resized) @@ -2125,11 +2153,12 @@ static void ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p cpq->query.d_proxyProtocolPayload = std::move(proxyProtocolPayload); ss->passCrossProtocolQuery(std::move(cpq)); - return; + return false; } if (!ss->xskInfo) { assignOutgoingUDPQueryToBackend(ss, dh->id, dq, query, true); + return false; } else { int fd = ss->xskInfo->workerWaker; @@ -2138,11 +2167,13 @@ static void ProcessXskQuery(ClientState& cs, LocalHolders& holders, XskPacket& p packet.setAddr(ss->d_config.sourceAddr,ss->d_config.sourceMACAddr, ss->d_config.remote,ss->d_config.destMACAddr); packet.setPayload(query); packet.rewrite(); + return true; } } catch (const std::exception& e) { vinfolog("Got an error in UDP question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what()); } + return false; } #endif /* HAVE_XSK */ @@ -2246,14 +2277,26 @@ static void xskClientThread(ClientState* cs) LocalHolders holders; for (;;) { - while (!xskInfo->cq.read_available()) { +#if defined(__SANITIZE_THREAD__) + while (!xskInfo->incomingPacketsQueue.lock()->read_available()) { +#else + while (!xskInfo->incomingPacketsQueue.read_available()) { +#endif xskInfo->waitForXskSocket(); } - xskInfo->cq.consume_all([&](XskPacket* packetRaw) { +#if defined(__SANITIZE_THREAD__) + xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) { +#else + xskInfo->incomingPacketsQueue.consume_all([&](XskPacket* packetRaw) { +#endif auto packet = XskPacketPtr(packetRaw); - ProcessXskQuery(*cs, holders, *packet); - packet->updatePacket(); - xskInfo->pushToSendQueue(std::move(packet)); + if (ProcessXskQuery(*cs, holders, *packet)) { + packet->updatePacket(); + xskInfo->pushToSendQueue(std::move(packet)); + } + else { + xskInfo->markAsFree(std::move(packet)); + } }); xskInfo->notifyXskSocket(); } @@ -3642,59 +3685,67 @@ void XskRouter(std::shared_ptr xsk) const auto& xskWakerIdx = xsk->workers.get<0>(); const auto& destIdx = xsk->workers.get<1>(); while (true) { - auto ready = xsk->wait(-1); - // descriptor 0 gets incoming AF_XDP packets - if (xsk->fds[0].revents & POLLIN) { - auto packets = xsk->recv(64, &failed); - dnsdist::metrics::g_stats.nonCompliantQueries += failed; - for (auto &packet : packets) { - const auto dest = packet->getToAddr(); - auto res = destIdx.find(dest); - if (res == destIdx.end()) { - xsk->uniqueEmptyFrameOffset.push_back(xsk->frameOffset(*packet)); - continue; + try { + auto ready = xsk->wait(-1); + // descriptor 0 gets incoming AF_XDP packets + if (xsk->fds[0].revents & POLLIN) { + auto packets = xsk->recv(64, &failed); + dnsdist::metrics::g_stats.nonCompliantQueries += failed; + for (auto &packet : packets) { + const auto dest = packet->getToAddr(); + auto res = destIdx.find(dest); + if (res == destIdx.end()) { + xsk->markAsFree(std::move(packet)); + continue; + } + res->worker->pushToProcessingQueue(std::move(packet)); + needNotify.insert(res->workerWaker); } - res->worker->pushToProcessingQueue(std::move(packet)); - needNotify.insert(res->workerWaker); - } - for (auto i : needNotify) { - uint64_t x = 1; - auto written = write(i, &x, sizeof(x)); - if (written != sizeof(x)) { - // oh, well, the worker is clearly overloaded - // but there is nothing we can do about it, - // and hopefully the queue will be processed eventually + for (auto i : needNotify) { + uint64_t x = 1; + auto written = write(i, &x, sizeof(x)); + if (written != sizeof(x)) { + // oh, well, the worker is clearly overloaded + // but there is nothing we can do about it, + // and hopefully the queue will be processed eventually + } } - } - needNotify.clear(); - ready--; - } - const auto backup = ready; - for (size_t i = 1; i < size && ready > 0; i++) { - if (xsk->fds[i].revents & POLLIN) { + needNotify.clear(); ready--; - auto& info = xskWakerIdx.find(xsk->fds[i].fd)->worker; - info->sq.consume_all([&](XskPacket* packetRaw) { - auto packet = XskPacketPtr(packetRaw); - if (!(packet->getFlags() & XskPacket::UPDATE)) { - xsk->uniqueEmptyFrameOffset.push_back(xsk->frameOffset(*packet)); - packet.release(); - return; - } - if (packet->getFlags() & XskPacket::DELAY) { - xsk->waitForDelay.push(std::move(packet)); - return; - } - fillInTx.push_back(std::move(packet)); - }); - info->cleanWorkerNotification(); } + const auto backup = ready; + for (size_t i = 1; i < size && ready > 0; i++) { + if (xsk->fds[i].revents & POLLIN) { + ready--; + auto& info = xskWakerIdx.find(xsk->fds[i].fd)->worker; +#if defined(__SANITIZE_THREAD__) + info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket* packetRaw) { +#else + info->outgoingPacketsQueue.consume_all([&](XskPacket* packetRaw) { +#endif + auto packet = XskPacketPtr(packetRaw); + if (!(packet->getFlags() & XskPacket::UPDATE)) { + xsk->markAsFree(std::move(packet)); + return; + } + if (packet->getFlags() & XskPacket::DELAY) { + xsk->waitForDelay.push(std::move(packet)); + return; + } + fillInTx.push_back(std::move(packet)); + }); + info->cleanWorkerNotification(); + } + } + xsk->pickUpReadyPacket(fillInTx); + xsk->recycle(4096); + xsk->fillFq(); + xsk->send(fillInTx); + ready = backup; + } + catch (...) { + vinfolog("Exception in XSK router loop"); } - xsk->pickUpReadyPacket(fillInTx); - xsk->recycle(64); - xsk->fillFq(); - xsk->send(fillInTx); - ready = backup; } } #endif /* HAVE_XSK */ diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index 9b5d33e38c..0081c492a6 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -985,7 +985,7 @@ public: if (d_config.sourceAddr.sin4.sin_family == 0) { throw runtime_error("invalid source addr"); } - xsk->addWorker(xskInfo, d_config.sourceAddr, getProtocol() != dnsdist::Protocol::DoUDP); + xsk->addWorker(xskInfo, d_config.sourceAddr); d_config.sourceMACAddr = xsk->source; xskInfo->sharedEmptyFrameOffset = xsk->sharedEmptyFrameOffset; } diff --git a/pdns/xsk.cc b/pdns/xsk.cc index f1fa45fbf1..2f438f9fd9 100644 --- a/pdns/xsk.cc +++ b/pdns/xsk.cc @@ -24,22 +24,17 @@ #ifdef HAVE_XSK -#include "gettime.hh" -#include "xsk.hh" - #include #include #include #include #include #include -#include #include #include #include #include #include -#include #include #include #include @@ -60,6 +55,36 @@ extern "C" #include } +#include "gettime.hh" +#include "xsk.hh" + +#define DEBUG_UMEM 0 +#ifdef DEBUG_UMEM +namespace { +struct UmemEntryStatus +{ + enum class Status: uint8_t { Free, FillQueue, Received, TXQueue }; + Status status{Status::Free}; +}; + +LockGuarded> s_umems; + +void checkUmemIntegrity(const char* function, int line, uint64_t offset, const std::set& validStatuses, UmemEntryStatus::Status newStatus) +{ + auto umems = s_umems.lock(); + if (validStatuses.count(umems->at(offset).status) == 0) { + std::cerr << "UMEM integrity check failed at " << function << ": " << line << ": status is " << static_cast(umems->at(offset).status) << ", expected: "; + for (const auto status : validStatuses) { + std::cerr << static_cast(status) << " "; + } + std::cerr << std::endl; + abort(); + } + (*umems)[offset].status = newStatus; +} +} +#endif /* DEBUG_UMEM */ + constexpr bool XskSocket::isPowOfTwo(uint32_t value) noexcept { return value != 0 && (value & (value - 1)) == 0; @@ -79,6 +104,7 @@ int XskSocket::firstTimeout() } return res; } + XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queue_id, const std::string& xskMapPath, const std::string& poolName_) : frameNum(frameNum_), queueId(queue_id), ifName(ifName_), poolName(poolName_), socket(nullptr, xsk_socket__delete), sharedEmptyFrameOffset(std::make_shared>>()) { @@ -92,6 +118,7 @@ XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queu memset(&fq, 0, sizeof(fq)); memset(&tx, 0, sizeof(tx)); memset(&rx, 0, sizeof(rx)); + xsk_umem_config umemCfg; umemCfg.fill_size = fqCapacity; umemCfg.comp_size = cqCapacity; @@ -99,6 +126,7 @@ XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queu umemCfg.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM; umemCfg.flags = 0; umem.umemInit(frameNum_ * frameSize, &cq, &fq, &umemCfg); + { xsk_socket_config socketCfg; socketCfg.rx_size = rxCapacity; @@ -109,106 +137,165 @@ XskSocket::XskSocket(size_t frameNum_, const std::string& ifName_, uint32_t queu xsk_socket* tmp = nullptr; auto ret = xsk_socket__create(&tmp, ifName.c_str(), queue_id, umem.umem, &rx, &tx, &socketCfg); if (ret != 0) { - throw std::runtime_error("Error creating a xsk socket of if_name" + ifName + stringerror(ret)); + throw std::runtime_error("Error creating a xsk socket of if_name " + ifName + ": " + stringerror(ret)); } - socket = std::unique_ptr(tmp, xsk_socket__delete); + socket = std::unique_ptr(tmp, xsk_socket__delete); } - for (uint64_t i = 0; i < frameNum; i++) { - uniqueEmptyFrameOffset.push_back(i * frameSize + XDP_PACKET_HEADROOM); + + uniqueEmptyFrameOffset.reserve(frameNum); + { + for (uint64_t i = 0; i < frameNum; i++) { + //uniqueEmptyFrameOffset.push_back(i * frameSize); + uniqueEmptyFrameOffset.push_back(i * frameSize + XDP_PACKET_HEADROOM); +#ifdef DEBUG_UMEM + { + auto umems = s_umems.lock(); + (*umems)[i * frameSize + XDP_PACKET_HEADROOM] = UmemEntryStatus(); + } +#endif /* DEBUG_UMEM */ + } } + fillFq(fqCapacity); + const auto xskfd = xskFd(); fds.push_back(pollfd{ .fd = xskfd, .events = POLLIN, .revents = 0}); + const auto xskMapFd = FDWrapper(bpf_obj_get(xskMapPath.c_str())); + if (xskMapFd.getHandle() < 0) { throw std::runtime_error("Error getting BPF map from path '" + xskMapPath + "'"); } + auto ret = bpf_map_update_elem(xskMapFd.getHandle(), &queue_id, &xskfd, 0); if (ret) { throw std::runtime_error("Error inserting into xsk_map '" + xskMapPath + "': " + std::to_string(ret)); } } + void XskSocket::fillFq(uint32_t fillSize) noexcept { { +#warning why are we collecting frames from unique into shared here, even though we need unique ones? auto frames = sharedEmptyFrameOffset->lock(); if (frames->size() < holdThreshold) { const auto moveSize = std::min(holdThreshold - frames->size(), uniqueEmptyFrameOffset.size()); if (moveSize > 0) { frames->insert(frames->end(), std::make_move_iterator(uniqueEmptyFrameOffset.end() - moveSize), std::make_move_iterator(uniqueEmptyFrameOffset.end())); + uniqueEmptyFrameOffset.resize(uniqueEmptyFrameOffset.size() - moveSize); } } } + if (uniqueEmptyFrameOffset.size() < fillSize) { return; } - uint32_t idx; - if (xsk_ring_prod__reserve(&fq, fillSize, &idx) != fillSize) { + + uint32_t idx{0}; + auto toFill = xsk_ring_prod__reserve(&fq, fillSize, &idx); + if (toFill == 0) { return; } uint32_t processed = 0; - for (; processed < fillSize; processed++) { + for (; processed < toFill; processed++) { *xsk_ring_prod__fill_addr(&fq, idx++) = uniqueEmptyFrameOffset.back(); +#ifdef DEBUG_UMEM + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, uniqueEmptyFrameOffset.back(), {UmemEntryStatus::Status::Free}, UmemEntryStatus::Status::FillQueue); +#endif /* DEBUG_UMEM */ uniqueEmptyFrameOffset.pop_back(); } + xsk_ring_prod__submit(&fq, processed); } + int XskSocket::wait(int timeout) { - return poll(fds.data(), fds.size(), static_cast(std::min(static_cast(timeout), static_cast(firstTimeout())))); + auto waitAtMost = std::min(timeout, firstTimeout()); + return poll(fds.data(), fds.size(), waitAtMost); } + [[nodiscard]] uint64_t XskSocket::frameOffset(const XskPacket& packet) const noexcept { - return reinterpret_cast(packet.frame) - reinterpret_cast(umem.bufBase); + return packet.frame - umem.bufBase; } -int XskSocket::xskFd() const noexcept { return xsk_socket__fd(socket.get()); } +[[nodiscard]] int XskSocket::xskFd() const noexcept { + return xsk_socket__fd(socket.get()); +} void XskSocket::send(std::vector& packets) { - const auto packetSize = packets.size(); - if (packetSize == 0) { - return; - } - uint32_t idx{0}; - if (xsk_ring_prod__reserve(&tx, packetSize, &idx) != packets.size()) { - return; - } + while (packets.size() > 0) { + auto packetSize = packets.size(); + if (packetSize > std::numeric_limits::max()) { + packetSize = std::numeric_limits::max(); + } + size_t toSend = std::min(static_cast(packetSize), txCapacity); + uint32_t idx{0}; + auto toFill = xsk_ring_prod__reserve(&tx, toSend, &idx); + if (toFill == 0) { + return; + } - for (const auto& packet : packets) { - *xsk_ring_prod__tx_desc(&tx, idx++) = { - .addr = frameOffset(*packet), - .len = packet->FrameLen(), - .options = 0}; + size_t queued = 0; + for (const auto& packet : packets) { + if (queued == toFill) { + break; + } + *xsk_ring_prod__tx_desc(&tx, idx++) = { + .addr = frameOffset(*packet), + .len = packet->getFrameLen(), + .options = 0}; +#ifdef DEBUG_UMEM + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(*packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::Received}, UmemEntryStatus::Status::TXQueue); +#endif /* DEBUG_UMEM */ + queued++; + } + xsk_ring_prod__submit(&tx, toFill); + packets.erase(packets.begin(), packets.begin() + toFill); } - xsk_ring_prod__submit(&tx, packetSize); - packets.clear(); } + std::vector XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCount) { - uint32_t idx; + uint32_t idx{0}; std::vector res; // how many descriptors to packets have been filled const auto recvSize = xsk_ring_cons__peek(&rx, recvSizeMax, &idx); - if (recvSize <= 0) { + if (recvSize == 0) { return res; } const auto baseAddr = reinterpret_cast(umem.bufBase); uint32_t failed = 0; uint32_t processed = 0; + res.reserve(recvSize); for (; processed < recvSize; processed++) { - const auto* desc = xsk_ring_cons__rx_desc(&rx, idx++); - auto ptr = std::make_unique(reinterpret_cast(desc->addr + baseAddr), desc->len, frameSize); - if (!ptr->parse()) { - ++failed; - uniqueEmptyFrameOffset.push_back(frameOffset(*ptr)); + try { + const auto* desc = xsk_ring_cons__rx_desc(&rx, idx++); + auto ptr = std::make_unique(reinterpret_cast(desc->addr + baseAddr), desc->len, frameSize); +#ifdef DEBUG_UMEM + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(*ptr), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::FillQueue}, UmemEntryStatus::Status::Received); +#endif /* DEBUG_UMEM */ + + if (!ptr->parse(false)) { + ++failed; + markAsFree(std::move(ptr)); + } + else { + res.push_back(std::move(ptr)); + } + } + catch (const std::exception& exp) { + std::cerr << "Exception while processing the XSK RX queue: " << exp.what() << std::endl; + break; } - else { - res.push_back(std::move(ptr)); + catch (...) { + std::cerr << "Exception while processing the XSK RX queue" << std::endl; + break; } } @@ -222,6 +309,7 @@ std::vector XskSocket::recv(uint32_t recvSizeMax, uint32_t* failed return res; } + void XskSocket::pickUpReadyPacket(std::vector& packets) { timespec now; @@ -232,17 +320,23 @@ void XskSocket::pickUpReadyPacket(std::vector& packets) waitForDelay.pop(); } } + void XskSocket::recycle(size_t size) noexcept { - uint32_t idx; + uint32_t idx{0}; const auto completeSize = xsk_ring_cons__peek(&cq, size, &idx); - if (completeSize <= 0) { + if (completeSize == 0) { return; } - for (uint32_t processed = 0; processed < completeSize; ++processed) { + uniqueEmptyFrameOffset.reserve(uniqueEmptyFrameOffset.size() + completeSize); + uint32_t processed = 0; + for (; processed < completeSize; ++processed) { uniqueEmptyFrameOffset.push_back(*xsk_ring_cons__comp_addr(&cq, idx++)); +#ifdef DEBUG_UMEM + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, uniqueEmptyFrameOffset.back(), {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free); +#endif /* DEBUG_UMEM */ } - xsk_ring_cons__release(&cq, completeSize); + xsk_ring_cons__release(&cq, processed); } void XskSocket::XskUmem::umemInit(size_t memSize, xsk_ring_cons* completionQueue, xsk_ring_prod* fillQueue, xsk_umem_config* config) @@ -255,7 +349,7 @@ void XskSocket::XskUmem::umemInit(size_t memSize, xsk_ring_cons* completionQueue auto ret = xsk_umem__create(&umem, bufBase, size, fillQueue, completionQueue, config); if (ret != 0) { munmap(bufBase, size); - throw std::runtime_error("Error creating a umem of size" + std::to_string(size) + stringerror(ret)); + throw std::runtime_error("Error creating a umem of size " + std::to_string(size) + ": " + stringerror(ret)); } } @@ -277,10 +371,21 @@ std::string XskSocket::getMetrics() const ret << "TX invalid descs: " << std::to_string(stats.tx_invalid_descs) << std::endl; ret << "RX ring full: " << std::to_string(stats.rx_ring_full) << std::endl; ret << "RX fill ring empty descs: " << std::to_string(stats.rx_fill_ring_empty_descs) << std::endl; - ret << "RX ring empty descs: " << std::to_string(stats.tx_ring_empty_descs) << std::endl; + ret << "TX ring empty descs: " << std::to_string(stats.tx_ring_empty_descs) << std::endl; return ret.str(); } +void XskSocket::markAsFree(XskPacketPtr&& packet) +{ + auto offset = frameOffset(*packet); +#ifdef DEBUG_UMEM + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free); +#endif /* DEBUG_UMEM */ + + uniqueEmptyFrameOffset.push_back(offset); + packet.release(); +} + XskSocket::XskUmem::~XskUmem() { if (umem) { @@ -291,198 +396,276 @@ XskSocket::XskUmem::~XskUmem() } } -bool XskPacket::parse() +[[nodiscard]] size_t XskPacket::getL4HeaderOffset() const noexcept +{ + return sizeof(ethhdr) + (v6 ? (sizeof(ipv6hdr)) : sizeof(iphdr)); +} + +[[nodiscard]] size_t XskPacket::getDataOffset() const noexcept +{ + return getL4HeaderOffset() + sizeof(udphdr); +} + +[[nodiscard]] size_t XskPacket::getDataSize() const noexcept +{ + return frameLength - getDataOffset(); +} + +[[nodiscard]] ethhdr XskPacket::getEthernetHeader() const noexcept +{ + ethhdr ethHeader{}; + assert(frameLength >= sizeof(ethHeader)); + memcpy(ðHeader, frame, sizeof(ethHeader)); + return ethHeader; +} + +void XskPacket::setEthernetHeader(const ethhdr& ethHeader) noexcept { - // payloadEnd must bigger than payload + sizeof(ethhdr) + sizoef(iphdr) + sizeof(udphdr) - auto* eth = reinterpret_cast(frame); - uint8_t l4Protocol; - if (eth->h_proto == htons(ETH_P_IP)) { - auto* ip = reinterpret_cast(eth + 1); - if (ip->ihl != static_cast(sizeof(iphdr) >> 2)) { - // ip->ihl*4 != sizeof(iphdr) + assert(frameLength >= sizeof(ethHeader)); + memcpy(frame, ðHeader, sizeof(ethHeader)); +} + +[[nodiscard]] iphdr XskPacket::getIPv4Header() const noexcept +{ + iphdr ipv4Header{}; + assert(frameLength >= (sizeof(ethhdr) + sizeof(ipv4Header))); + assert(!v6); + memcpy(&ipv4Header, frame + sizeof(ethhdr), sizeof(ipv4Header)); + return ipv4Header; +} + +void XskPacket::setIPv4Header(const iphdr& ipv4Header) noexcept +{ + assert(frameLength >= (sizeof(ethhdr) + sizeof(iphdr))); + assert(!v6); + memcpy(frame + sizeof(ethhdr), &ipv4Header, sizeof(ipv4Header)); +} + +[[nodiscard]] ipv6hdr XskPacket::getIPv6Header() const noexcept +{ + ipv6hdr ipv6Header{}; + assert(frameLength >= (sizeof(ethhdr) + sizeof(ipv6Header))); + assert(v6); + memcpy(&ipv6Header, frame + sizeof(ethhdr), sizeof(ipv6Header)); + return ipv6Header; +} + +void XskPacket::setIPv6Header(const ipv6hdr& ipv6Header) noexcept +{ + assert(frameLength >= (sizeof(ethhdr) + sizeof(ipv6Header))); + assert(v6); + memcpy(frame + sizeof(ethhdr), &ipv6Header, sizeof(ipv6Header)); +} + +[[nodiscard]] udphdr XskPacket::getUDPHeader() const noexcept +{ + udphdr udpHeader{}; + assert(frameLength >= (sizeof(ethhdr) + (v6 ? sizeof(ipv6hdr) : sizeof(iphdr)) + sizeof(udpHeader))); + memcpy(&udpHeader, frame + getL4HeaderOffset(), sizeof(udpHeader)); + return udpHeader; +} + +void XskPacket::setUDPHeader(const udphdr& udpHeader) noexcept +{ + assert(frameLength >= (sizeof(ethhdr) + (v6 ? sizeof(ipv6hdr) : sizeof(iphdr)) + sizeof(udpHeader))); + memcpy(frame + getL4HeaderOffset(), &udpHeader, sizeof(udpHeader)); +} + +bool XskPacket::parse(bool fromSetHeader) +{ + if (frameLength <= sizeof(ethhdr)) { + return false; + } + + auto ethHeader = getEthernetHeader(); + uint8_t l4Protocol{0}; + if (ethHeader.h_proto == htons(ETH_P_IP)) { + if (frameLength < (sizeof(ethhdr) + sizeof(iphdr) + sizeof(udphdr))) { + return false; + } + v6 = false; + auto ipHeader = getIPv4Header(); + if (ipHeader.ihl != (static_cast(sizeof(iphdr) / 4))) { // ip options is not supported now! return false; } // check ip.check == ipv4Checksum() is not needed! // We check it in BPF program - from = makeComboAddressFromRaw(4, reinterpret_cast(&ip->saddr), sizeof(ip->saddr)); - to = makeComboAddressFromRaw(4, reinterpret_cast(&ip->daddr), sizeof(ip->daddr)); - l4Protocol = ip->protocol; - l4Header = reinterpret_cast(ip + 1); - payloadEnd = std::min(reinterpret_cast(ip) + ntohs(ip->tot_len), payloadEnd); - } - else if (eth->h_proto == htons(ETH_P_IPV6)) { - auto* ipv6 = reinterpret_cast(eth + 1); - l4Header = reinterpret_cast(ipv6 + 1); - if (l4Header >= payloadEnd) { + // we don't, actually. + from = makeComboAddressFromRaw(4, reinterpret_cast(&ipHeader.saddr), sizeof(ipHeader.saddr)); + to = makeComboAddressFromRaw(4, reinterpret_cast(&ipHeader.daddr), sizeof(ipHeader.daddr)); + l4Protocol = ipHeader.protocol; + if (!fromSetHeader && (frameLength - sizeof(ethhdr)) != ntohs(ipHeader.tot_len)) { + // too small, or too large (trailing data), go away + return false; + } + } + else if (ethHeader.h_proto == htons(ETH_P_IPV6)) { + if (frameLength < (sizeof(ethhdr) + sizeof(ipv6hdr) + sizeof(udphdr))) { + return false; + } + v6 = true; + auto ipHeader = getIPv6Header(); + from = makeComboAddressFromRaw(6, reinterpret_cast(&ipHeader.saddr), sizeof(ipHeader.saddr)); + to = makeComboAddressFromRaw(6, reinterpret_cast(&ipHeader.daddr), sizeof(ipHeader.daddr)); + l4Protocol = ipHeader.nexthdr; + if (!fromSetHeader && (frameLength - (sizeof(ethhdr) + sizeof(ipv6hdr))) != ntohs(ipHeader.payload_len)) { return false; } - from = makeComboAddressFromRaw(6, reinterpret_cast(&ipv6->saddr), sizeof(ipv6->saddr)); - to = makeComboAddressFromRaw(6, reinterpret_cast(&ipv6->daddr), sizeof(ipv6->daddr)); - l4Protocol = ipv6->nexthdr; - payloadEnd = std::min(l4Header + ntohs(ipv6->payload_len), payloadEnd); } else { return false; } - if (l4Protocol == IPPROTO_UDP) { - // check udp.check == ipv4Checksum() is not needed! - // We check it in BPF program - const auto* udp = reinterpret_cast(l4Header); - payload = l4Header + sizeof(udphdr); - // Because of XskPacket::setHeader - // payload = payloadEnd should be allow - if (payload > payloadEnd) { - return false; - } - payloadEnd = std::min(l4Header + ntohs(udp->len), payloadEnd); - from.setPort(ntohs(udp->source)); - to.setPort(ntohs(udp->dest)); - return true; + + if (l4Protocol != IPPROTO_UDP) { + return false; } - if (l4Protocol == IPPROTO_TCP) { - // check tcp.check == ipv4Checksum() is not needed! - // We check it in BPF program - const auto* tcp = reinterpret_cast(l4Header); - if (tcp->doff != static_cast(sizeof(tcphdr) >> 2)) { - // tcp is not supported now! + + // check udp.check == ipv4Checksum() is not needed! + // We check it in BPF program + // we don't, actually. + auto udpHeader = getUDPHeader(); + if (!fromSetHeader) { + // Because of XskPacket::setHeader + if (getDataOffset() > frameLength) { return false; } - payload = l4Header + sizeof(tcphdr); - // - if (payload > payloadEnd) { + + if (getDataSize() + sizeof(udphdr) != ntohs(udpHeader.len)) { return false; } - from.setPort(ntohs(tcp->source)); - to.setPort(ntohs(tcp->dest)); - flags |= TCP; - return true; } - // ipv6 extension header is not supported now! - return false; + + from.setPort(ntohs(udpHeader.source)); + to.setPort(ntohs(udpHeader.dest)); + return true; } -uint32_t XskPacket::dataLen() const noexcept +uint32_t XskPacket::getDataLen() const noexcept { - return payloadEnd - payload; + return getDataSize(); } -uint32_t XskPacket::FrameLen() const noexcept + +uint32_t XskPacket::getFrameLen() const noexcept { - return payloadEnd - frame; + return frameLength; } -size_t XskPacket::capacity() const noexcept + +size_t XskPacket::getCapacity() const noexcept { - return frameEnd - payloadEnd; + return frameSize; } void XskPacket::changeDirectAndUpdateChecksum() noexcept { - auto* eth = reinterpret_cast(frame); + auto ethHeader = getEthernetHeader(); { uint8_t tmp[ETH_ALEN]; - static_assert(sizeof(tmp) == sizeof(eth->h_dest), "Size Error"); - static_assert(sizeof(tmp) == sizeof(eth->h_source), "Size Error"); - memcpy(tmp, eth->h_dest, sizeof(tmp)); - memcpy(eth->h_dest, eth->h_source, sizeof(tmp)); - memcpy(eth->h_source, tmp, sizeof(tmp)); + static_assert(sizeof(tmp) == sizeof(ethHeader.h_dest), "Size Error"); + static_assert(sizeof(tmp) == sizeof(ethHeader.h_source), "Size Error"); + memcpy(tmp, ethHeader.h_dest, sizeof(tmp)); + memcpy(ethHeader.h_dest, ethHeader.h_source, sizeof(tmp)); + memcpy(ethHeader.h_source, tmp, sizeof(tmp)); } - if (eth->h_proto == htons(ETH_P_IPV6)) { + if (ethHeader.h_proto == htons(ETH_P_IPV6)) { // IPV6 - auto* ipv6 = reinterpret_cast(eth + 1); - std::swap(ipv6->daddr, ipv6->saddr); - if (ipv6->nexthdr == IPPROTO_UDP) { - // UDP - auto* udp = reinterpret_cast(ipv6 + 1); - std::swap(udp->dest, udp->source); - udp->len = htons(payloadEnd - reinterpret_cast(udp)); - udp->check = 0; - udp->check = tcp_udp_v6_checksum(); - } - else { - // TCP - auto* tcp = reinterpret_cast(ipv6 + 1); - std::swap(tcp->dest, tcp->source); - // TODO - } - rewriteIpv6Header(ipv6); + auto ipv6 = getIPv6Header(); + std::swap(ipv6.daddr, ipv6.saddr); + assert(ipv6.nexthdr == IPPROTO_UDP); + + auto udp = getUDPHeader(); + std::swap(udp.dest, udp.source); + udp.len = htons(getDataSize() + sizeof(udp)); + udp.check = 0; + /* needed to get the correct checksum */ + setIPv6Header(ipv6); + setUDPHeader(udp); + udp.check = tcp_udp_v6_checksum(&ipv6); + rewriteIpv6Header(&ipv6, getFrameLen()); + setIPv6Header(ipv6); + setUDPHeader(udp); } else { // IPV4 - auto* ipv4 = reinterpret_cast(eth + 1); - std::swap(ipv4->daddr, ipv4->saddr); - if (ipv4->protocol == IPPROTO_UDP) { - // UDP - auto* udp = reinterpret_cast(ipv4 + 1); - std::swap(udp->dest, udp->source); - udp->len = htons(payloadEnd - reinterpret_cast(udp)); - udp->check = 0; - udp->check = tcp_udp_v4_checksum(); - } - else { - // TCP - auto* tcp = reinterpret_cast(ipv4 + 1); - std::swap(tcp->dest, tcp->source); - // TODO - } - rewriteIpv4Header(ipv4); + auto ipv4 = getIPv4Header(); + std::swap(ipv4.daddr, ipv4.saddr); + assert(ipv4.protocol == IPPROTO_UDP); + + auto udp = getUDPHeader(); + std::swap(udp.dest, udp.source); + udp.len = htons(getDataSize() + sizeof(udp)); + udp.check = 0; + /* needed to get the correct checksum */ + setIPv4Header(ipv4); + setUDPHeader(udp); + udp.check = tcp_udp_v4_checksum(&ipv4); + rewriteIpv4Header(&ipv4, getFrameLen()); + setIPv4Header(ipv4); + setUDPHeader(udp); } + setEthernetHeader(ethHeader); } -void XskPacket::rewriteIpv4Header(void* ipv4header) noexcept + +void XskPacket::rewriteIpv4Header(struct iphdr* ipv4header, size_t frameLen) noexcept { - auto* ipv4 = static_cast(ipv4header); - ipv4->version = 4; - ipv4->ihl = sizeof(iphdr) / 4; - ipv4->tos = 0; - ipv4->tot_len = htons(payloadEnd - reinterpret_cast(ipv4)); - ipv4->id = 0; - ipv4->frag_off = 0; - ipv4->ttl = DefaultTTL; - ipv4->check = 0; - ipv4->check = ipv4Checksum(); + ipv4header->version = 4; + ipv4header->ihl = sizeof(iphdr) / 4; + ipv4header->tos = 0; + ipv4header->tot_len = htons(frameLen - sizeof(ethhdr)); + ipv4header->id = 0; + ipv4header->frag_off = 0; + ipv4header->ttl = DefaultTTL; + ipv4header->check = 0; + ipv4header->check = ipv4Checksum(ipv4header); } -void XskPacket::rewriteIpv6Header(void* ipv6header) noexcept + +void XskPacket::rewriteIpv6Header(struct ipv6hdr* ipv6header, size_t frameLen) noexcept { - auto* ipv6 = static_cast(ipv6header); - ipv6->version = 6; - ipv6->priority = 0; - ipv6->payload_len = htons(payloadEnd - reinterpret_cast(ipv6 + 1)); - ipv6->hop_limit = DefaultTTL; - memset(&ipv6->flow_lbl, 0, sizeof(ipv6->flow_lbl)); + ipv6header->version = 6; + ipv6header->priority = 0; + ipv6header->payload_len = htons(frameLen - sizeof(ethhdr) - sizeof(ipv6hdr)); + ipv6header->hop_limit = DefaultTTL; + memset(&ipv6header->flow_lbl, 0, sizeof(ipv6header->flow_lbl)); } bool XskPacket::isIPV6() const noexcept { - const auto* eth = reinterpret_cast(frame); - return eth->h_proto == htons(ETH_P_IPV6); + return v6; } -XskPacket::XskPacket(void* frame_, size_t dataSize, size_t frameSize) : - frame(static_cast(frame_)), payloadEnd(static_cast(frame) + dataSize), frameEnd(static_cast(frame) + frameSize - XDP_PACKET_HEADROOM) + +XskPacket::XskPacket(uint8_t* frame_, size_t dataSize, size_t frameSize) : + frame(frame_), frameLength(dataSize), frameSize(frameSize - XDP_PACKET_HEADROOM) { } + PacketBuffer XskPacket::clonePacketBuffer() const { - const auto size = dataLen(); + const auto size = getDataSize(); PacketBuffer tmp(size); - memcpy(tmp.data(), payload, size); + memcpy(tmp.data(), frame + getDataOffset(), size); return tmp; } + void XskPacket::cloneIntoPacketBuffer(PacketBuffer& buffer) const { - const auto size = dataLen(); + const auto size = getDataSize(); buffer.resize(size); - memcpy(buffer.data(), payload, size); + memcpy(buffer.data(), frame + getDataOffset(), size); } + bool XskPacket::setPayload(const PacketBuffer& buf) { const auto bufSize = buf.size(); - if (bufSize == 0 || bufSize > capacity()) { + const auto currentCapacity = getCapacity(); + if (bufSize == 0 || bufSize > currentCapacity) { return false; } flags |= UPDATE; - memcpy(payload, buf.data(), bufSize); - payloadEnd = payload + bufSize; + memcpy(frame + getDataOffset(), buf.data(), bufSize); + frameLength = getDataOffset() + bufSize; return true; } + void XskPacket::addDelay(const int relativeMilliseconds) noexcept { gettime(&sendTime); @@ -490,18 +673,22 @@ void XskPacket::addDelay(const int relativeMilliseconds) noexcept sendTime.tv_sec += sendTime.tv_nsec / 1000000000L; sendTime.tv_nsec %= 1000000000L; } + bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept { return s1->sendTime < s2->sendTime; } + const ComboAddress& XskPacket::getFromAddr() const noexcept { return from; } + const ComboAddress& XskPacket::getToAddr() const noexcept { return to; } + void XskWorker::notify(int fd) { uint64_t value = 1; @@ -512,6 +699,7 @@ void XskWorker::notify(int fd) throw runtime_error("Unable Wake Up XskSocket Failed"); } } + XskWorker::XskWorker() : workerWaker(createEventfd()), xskSocketWaker(createEventfd()) { @@ -520,154 +708,158 @@ XskWorker::XskWorker() : void XskWorker::pushToProcessingQueue(XskPacketPtr&& packet) { auto raw = packet.release(); - if (!cq.push(raw)) { - delete raw; +#if defined(__SANITIZE_THREAD__) + if (!incomingPacketsQueue.lock()->push(std::move(raw))) { +#else + if (!incomingPacketsQueue.push(std::move(raw))) { +#endif + markAsFree(XskPacketPtr(raw)); } } void XskWorker::pushToSendQueue(XskPacketPtr&& packet) { auto raw = packet.release(); - if (!sq.push(raw)) { - delete raw; +#if defined(__SANITIZE_THREAD__) + if (!outgoingPacketsQueue.lock()->push(raw)) { +#else + if (!outgoingPacketsQueue.push(raw)) { +#endif + markAsFree(XskPacketPtr(raw)); } } -void* XskPacket::payloadData() -{ - return reinterpret_cast(payload); -} -const void* XskPacket::payloadData() const +const void* XskPacket::getPayloadData() const { - return reinterpret_cast(payload); + return frame + getDataOffset(); } -void XskPacket::setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC, bool tcp) noexcept + +void XskPacket::setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC) noexcept { - auto* eth = reinterpret_cast(frame); - memcpy(eth->h_dest, &toMAC[0], sizeof(MACAddr)); - memcpy(eth->h_source, &fromMAC[0], sizeof(MACAddr)); + auto ethHeader = getEthernetHeader(); + memcpy(ethHeader.h_dest, &toMAC[0], sizeof(MACAddr)); + memcpy(ethHeader.h_source, &fromMAC[0], sizeof(MACAddr)); + setEthernetHeader(ethHeader); to = to_; from = from_; - l4Header = frame + sizeof(ethhdr) + (to.isIPv4() ? sizeof(iphdr) : sizeof(ipv6hdr)); - if (tcp) { - flags = TCP; - payload = l4Header + sizeof(tcphdr); - } - else { - flags = 0; - payload = l4Header + sizeof(udphdr); - } + v6 = !to.isIPv4(); + flags = 0; } + void XskPacket::rewrite() noexcept { flags |= REWRITE; - auto* eth = reinterpret_cast(frame); - if (to.isIPv4()) { - eth->h_proto = htons(ETH_P_IP); - auto* ipv4 = reinterpret_cast(eth + 1); - - ipv4->daddr = to.sin4.sin_addr.s_addr; - ipv4->saddr = from.sin4.sin_addr.s_addr; - if (flags & XskPacket::TCP) { - auto* tcp = reinterpret_cast(ipv4 + 1); - ipv4->protocol = IPPROTO_TCP; - tcp->source = from.sin4.sin_port; - tcp->dest = to.sin4.sin_port; - // TODO - } - else { - auto* udp = reinterpret_cast(ipv4 + 1); - ipv4->protocol = IPPROTO_UDP; - udp->source = from.sin4.sin_port; - udp->dest = to.sin4.sin_port; - udp->len = htons(payloadEnd - reinterpret_cast(udp)); - udp->check = 0; - udp->check = tcp_udp_v4_checksum(); - } - rewriteIpv4Header(ipv4); + auto ethHeader = getEthernetHeader(); + if (!v6) { + ethHeader.h_proto = htons(ETH_P_IP); + + auto ipHeader = getIPv4Header(); + ipHeader.daddr = to.sin4.sin_addr.s_addr; + ipHeader.saddr = from.sin4.sin_addr.s_addr; + + auto udpHeader = getUDPHeader(); + ipHeader.protocol = IPPROTO_UDP; + udpHeader.source = from.sin4.sin_port; + udpHeader.dest = to.sin4.sin_port; + udpHeader.len = htons(getDataSize()); + udpHeader.check = 0; + /* needed to get the correct checksum */ + setIPv4Header(ipHeader); + setUDPHeader(udpHeader); + udpHeader.check = tcp_udp_v4_checksum(&ipHeader); + rewriteIpv4Header(&ipHeader, getFrameLen()); + setIPv4Header(ipHeader); + setUDPHeader(udpHeader); } else { - auto* ipv6 = reinterpret_cast(eth + 1); - memcpy(&ipv6->daddr, &to.sin6.sin6_addr, sizeof(ipv6->daddr)); - memcpy(&ipv6->saddr, &from.sin6.sin6_addr, sizeof(ipv6->saddr)); - if (flags & XskPacket::TCP) { - auto* tcp = reinterpret_cast(ipv6 + 1); - ipv6->nexthdr = IPPROTO_TCP; - tcp->source = from.sin6.sin6_port; - tcp->dest = to.sin6.sin6_port; - // TODO - } - else { - auto* udp = reinterpret_cast(ipv6 + 1); - ipv6->nexthdr = IPPROTO_UDP; - udp->source = from.sin6.sin6_port; - udp->dest = to.sin6.sin6_port; - udp->len = htons(payloadEnd - reinterpret_cast(udp)); - udp->check = 0; - udp->check = tcp_udp_v6_checksum(); - } + ethHeader.h_proto = htons(ETH_P_IPV6); + + auto ipHeader = getIPv6Header(); + memcpy(&ipHeader.daddr, &to.sin6.sin6_addr, sizeof(ipHeader.daddr)); + memcpy(&ipHeader.saddr, &from.sin6.sin6_addr, sizeof(ipHeader.saddr)); + + auto udpHeader = getUDPHeader(); + ipHeader.nexthdr = IPPROTO_UDP; + udpHeader.source = from.sin6.sin6_port; + udpHeader.dest = to.sin6.sin6_port; + udpHeader.len = htons(getDataSize()); + udpHeader.check = 0; + /* needed to get the correct checksum */ + setIPv6Header(ipHeader); + setUDPHeader(udpHeader); + udpHeader.check = tcp_udp_v6_checksum(&ipHeader); + setIPv6Header(ipHeader); + setUDPHeader(udpHeader); } + + setEthernetHeader(ethHeader); } -[[nodiscard]] __be16 XskPacket::ipv4Checksum() const noexcept +[[nodiscard]] __be16 XskPacket::ipv4Checksum(const struct iphdr* ip) noexcept { - auto* ip = reinterpret_cast(frame + sizeof(ethhdr)); - return ip_checksum_fold(ip_checksum_partial(ip, sizeof(iphdr), 0)); + auto partial = ip_checksum_partial(ip, sizeof(iphdr), 0); + return ip_checksum_fold(partial); } -[[nodiscard]] __be16 XskPacket::tcp_udp_v4_checksum() const noexcept + +[[nodiscard]] __be16 XskPacket::tcp_udp_v4_checksum(const struct iphdr* ip) const noexcept { - const auto* ip = reinterpret_cast(frame + sizeof(ethhdr)); // ip options is not supported !!! - const auto l4Length = static_cast(payloadEnd - l4Header); + const auto l4Length = static_cast(getDataSize() + sizeof(udphdr)); auto sum = tcp_udp_v4_header_checksum_partial(ip->saddr, ip->daddr, ip->protocol, l4Length); - sum = ip_checksum_partial(l4Header, l4Length, sum); + sum = ip_checksum_partial(frame + getL4HeaderOffset(), l4Length, sum); return ip_checksum_fold(sum); } -[[nodiscard]] __be16 XskPacket::tcp_udp_v6_checksum() const noexcept + +[[nodiscard]] __be16 XskPacket::tcp_udp_v6_checksum(const struct ipv6hdr* ipv6) const noexcept { - const auto* ipv6 = reinterpret_cast(frame + sizeof(ethhdr)); - const auto l4Length = static_cast(payloadEnd - l4Header); + const auto l4Length = static_cast(getDataSize() + sizeof(udphdr)); uint64_t sum = tcp_udp_v6_header_checksum_partial(&ipv6->saddr, &ipv6->daddr, ipv6->nexthdr, l4Length); - sum = ip_checksum_partial(l4Header, l4Length, sum); + sum = ip_checksum_partial(frame + getL4HeaderOffset(), l4Length, sum); return ip_checksum_fold(sum); } -#ifndef __packed -#define __packed __attribute__((packed)) -#endif -[[nodiscard]] uint64_t XskPacket::ip_checksum_partial(const void* p, size_t len, uint64_t sum) noexcept +[[nodiscard]] uint64_t XskPacket::ip_checksum_partial(const void* ptr, const size_t len, uint64_t sum) noexcept { - /* Main loop: 32 bits at a time. - * We take advantage of intel's ability to do unaligned memory - * accesses with minimal additional cost. Other architectures - * probably want to be more careful here. - */ - const uint32_t* p32 = (const uint32_t*)(p); - for (; len >= sizeof(*p32); len -= sizeof(*p32)) - sum += *p32++; + size_t position{0}; + /* Main loop: 32 bits at a time */ + for (position = 0; position < len; position += sizeof(uint32_t)) { + uint32_t value{}; + memcpy(&value, reinterpret_cast(ptr) + position, sizeof(value)); + sum += value; + } /* Handle un-32bit-aligned trailing bytes */ - const uint16_t* p16 = (const uint16_t*)(p32); - if (len >= 2) { - sum += *p16++; - len -= sizeof(*p16); + if ((len - position) >= 2) { + uint16_t value{}; + memcpy(&value, reinterpret_cast(ptr) + position, sizeof(value)); + sum += value; + position += sizeof(value); } - if (len > 0) { - const uint8_t* p8 = (const uint8_t*)(p16); + + if ((len - position) > 0) { + const auto* p8 = static_cast(ptr) + position; sum += ntohs(*p8 << 8); /* RFC says pad last byte */ } return sum; } + [[nodiscard]] __be16 XskPacket::ip_checksum_fold(uint64_t sum) noexcept { - while (sum & ~0xffffffffULL) + while (sum & ~0xffffffffULL) { sum = (sum >> 32) + (sum & 0xffffffffULL); - while (sum & 0xffff0000ULL) + } + while (sum & 0xffff0000ULL) { sum = (sum >> 16) + (sum & 0xffffULL); + } - return ~sum; + return static_cast<__be16>(~sum); } + +#ifndef __packed +#define __packed __attribute__((packed)) +#endif + [[nodiscard]] uint64_t XskPacket::tcp_udp_v4_header_checksum_partial(__be32 src_ip, __be32 dst_ip, uint8_t protocol, uint16_t len) noexcept { struct header @@ -699,6 +891,7 @@ void XskPacket::rewrite() noexcept pseudo_header.fields.length = htons(len); return ip_checksum_partial(&pseudo_header, sizeof(pseudo_header), 0); } + [[nodiscard]] uint64_t XskPacket::tcp_udp_v6_header_checksum_partial(const struct in6_addr* src_ip, const struct in6_addr* dst_ip, uint8_t protocol, uint32_t len) noexcept { struct header @@ -730,22 +923,25 @@ void XskPacket::rewrite() noexcept pseudo_header.fields.next_header = protocol; return ip_checksum_partial(&pseudo_header, sizeof(pseudo_header), 0); } + void XskPacket::setHeader(const PacketBuffer& buf) { memcpy(frame, buf.data(), buf.size()); - payloadEnd = frame + buf.size(); + frameLength = buf.size(); flags = 0; - if (!parse()) { + if (!parse(true)) { throw std::runtime_error("Error setting the XSK frame header"); } } + std::unique_ptr XskPacket::cloneHeadertoPacketBuffer() const { - const auto size = payload - frame; + const auto size = getFrameLen() - getDataSize(); auto tmp = std::make_unique(size); memcpy(tmp->data(), frame, size); return tmp; } + int XskWorker::createEventfd() { auto fd = ::eventfd(0, EFD_CLOEXEC); @@ -754,10 +950,12 @@ int XskWorker::createEventfd() } return fd; } + void XskWorker::waitForXskSocket() noexcept { uint64_t x = read(workerWaker, &x, sizeof(x)); } + void XskWorker::notifyXskSocket() noexcept { notify(xskSocketWaker); @@ -767,7 +965,8 @@ std::shared_ptr XskWorker::create() { return std::make_shared(); } -void XskSocket::addWorker(std::shared_ptr s, const ComboAddress& dest, bool isTCP) + +void XskSocket::addWorker(std::shared_ptr s, const ComboAddress& dest) { extern std::atomic g_configurationDone; if (g_configurationDone) { @@ -792,14 +991,17 @@ void XskSocket::addWorker(std::shared_ptr s, const ComboAddress& dest .events = POLLIN, .revents = 0}); }; + uint64_t XskWorker::frameOffset(const XskPacket& s) const noexcept { return s.frame - umemBufBase; } + void XskWorker::notifyWorker() noexcept { notify(workerWaker); } + void XskSocket::getMACFromIfName() { ifreq ifr{}; @@ -819,19 +1021,23 @@ void XskSocket::getMACFromIfName() static_assert(sizeof(ifr.ifr_hwaddr.sa_data) >= std::tuple_size{}, "The size of an ARPHRD_ETHER MAC address is smaller than expected"); memcpy(source.data(), ifr.ifr_hwaddr.sa_data, source.size()); } + [[nodiscard]] int XskSocket::timeDifference(const timespec& t1, const timespec& t2) noexcept { const auto res = t1.tv_sec * 1000 + t1.tv_nsec / 1000000L - (t2.tv_sec * 1000 + t2.tv_nsec / 1000000L); return static_cast(res); } + void XskWorker::cleanWorkerNotification() noexcept { uint64_t x = read(xskSocketWaker, &x, sizeof(x)); } + void XskWorker::cleanSocketNotification() noexcept { uint64_t x = read(workerWaker, &x, sizeof(x)); } + std::vector getPollFdsForWorker(XskWorker& info) { std::vector fds; @@ -851,33 +1057,49 @@ std::vector getPollFdsForWorker(XskWorker& info) }); return fds; } + void XskWorker::fillUniqueEmptyOffset() { auto frames = sharedEmptyFrameOffset->lock(); const auto moveSize = std::min(static_cast(32), frames->size()); if (moveSize > 0) { uniqueEmptyFrameOffset.insert(uniqueEmptyFrameOffset.end(), std::make_move_iterator(frames->end() - moveSize), std::make_move_iterator(frames->end())); + frames->resize(frames->size() - moveSize); } } -void* XskWorker::getEmptyframe() + +XskPacketPtr XskWorker::getEmptyFrame() { if (!uniqueEmptyFrameOffset.empty()) { auto offset = uniqueEmptyFrameOffset.back(); uniqueEmptyFrameOffset.pop_back(); - return offset + umemBufBase; + return std::make_unique(offset + umemBufBase, 0, frameSize); } fillUniqueEmptyOffset(); if (!uniqueEmptyFrameOffset.empty()) { auto offset = uniqueEmptyFrameOffset.back(); uniqueEmptyFrameOffset.pop_back(); - return offset + umemBufBase; + return std::make_unique(offset + umemBufBase, 0, frameSize); } return nullptr; } + +void XskWorker::markAsFree(XskPacketPtr&& packet) +{ + + auto offset = frameOffset(*packet); +#ifdef DEBUG_UMEM + checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free); +#endif /* DEBUG_UMEM */ + uniqueEmptyFrameOffset.push_back(offset); + packet.release(); +} + uint32_t XskPacket::getFlags() const noexcept { return flags; } + void XskPacket::updatePacket() noexcept { if (!(flags & UPDATE)) { diff --git a/pdns/xsk.hh b/pdns/xsk.hh index 6715571091..551c23074c 100644 --- a/pdns/xsk.hh +++ b/pdns/xsk.hh @@ -30,15 +30,17 @@ #include #include #include +#include #include #include #include -#include -#include +//#include #include #include #include +#include #include +#include #include @@ -59,8 +61,14 @@ using XskPacketPtr = std::unique_ptr; // We use an XskSocket to manage an AF_XDP Socket corresponding to a NIC queue. // The XDP program running in the kernel redirects the data to the XskSocket in userspace. +// We allocate frames that are placed into the descriptors in the fill queue, allowing the kernel to put incoming packets into the frames and place descriptors into the rx queue. +// Once we have read the descriptors from the rx queue we release them, but we own the frames. +// After we are done with the frame, we place them into descriptors of either the fill queue (empty frames) or tx queues (packets to be sent). +// Once the kernel is done, it places descriptors referencing these frames into the cq where we can recycle them (packets destined to the tx queue or empty frame to the fill queue queue). + // XskSocket routes packets to multiple worker threads registered on XskSocket via XskSocket::addWorker based on the destination port number of the packet. // The kernel and the worker thread holding XskWorker will wake up the XskSocket through XskFd and the Eventfd corresponding to each worker thread, respectively. + class XskSocket { struct XskRouteInfo @@ -85,7 +93,9 @@ class XskSocket boost::multi_index::hashed_unique>, boost::multi_index::hashed_unique, ComboAddress::addressPortOnlyHash>>> workers; + // number of frames to keep in sharedEmptyFrameOffset static constexpr size_t holdThreshold = 256; + // number of frames to insert into the fill queue static constexpr size_t fillThreshold = 128; static constexpr size_t frameSize = 2048; // number of entries (frames) in the umem @@ -98,7 +108,10 @@ class XskSocket const std::string poolName; // AF_XDP socket then worker waker sockets vector fds; - // list of (indexes of) umem entries that can be reused + // list of frames, aka (indexes of) umem entries that can be reused to fill fq, + // collected from packets that we could not route (unknown destination), + // could not parse, were dropped during processing (!UPDATE), or + // simply recycled from cq after being processed by the kernel vector uniqueEmptyFrameOffset; // completion ring: queue where sent packets are stored by the kernel xsk_ring_cons cq; @@ -122,119 +135,152 @@ class XskSocket friend void XskRouter(std::shared_ptr xsk); [[nodiscard]] uint64_t frameOffset(const XskPacket& packet) const noexcept; - int firstTimeout(); - // pick ups as many available frames as possible from uniqueEmptyFrameOffset - // and put them into sharedEmptyFrameOffset - // then insert them into fq + [[nodiscard]] int firstTimeout(); + // pick ups available frames from uniqueEmptyFrameOffset + // insert entries from uniqueEmptyFrameOffset into fq void fillFq(uint32_t fillSize = fillThreshold) noexcept; - // picks up entries that have been processed (sent) and push them into uniqueEmptyFrameOffset + // picks up entries that have been processed (sent) from cq and push them into uniqueEmptyFrameOffset void recycle(size_t size) noexcept; void getMACFromIfName(); // look at delayed packets, and send the ones that are ready void pickUpReadyPacket(std::vector& packets); public: + static constexpr size_t getFrameSize() + { + return frameSize; + } // list of free umem entries that can be reused std::shared_ptr>> sharedEmptyFrameOffset; XskSocket(size_t frameNum, const std::string& ifName, uint32_t queue_id, const std::string& xskMapPath, const std::string& poolName_); MACAddr source; [[nodiscard]] int xskFd() const noexcept; // wait until one event has occurred - int wait(int timeout); + [[nodiscard]] int wait(int timeout); // add as many packets as possible to the rx queue for sending */ void send(std::vector& packets); // look at incoming packets in rx, return them if parsing succeeeded - std::vector recv(uint32_t recvSizeMax, uint32_t* failedCount); - void addWorker(std::shared_ptr s, const ComboAddress& dest, bool isTCP); - std::string getMetrics() const; + [[nodiscard]] std::vector recv(uint32_t recvSizeMax, uint32_t* failedCount); + void addWorker(std::shared_ptr s, const ComboAddress& dest); + [[nodiscard]] std::string getMetrics() const; + void markAsFree(XskPacketPtr&& packet); }; + +struct iphdr; +struct ipv6hdr; + class XskPacket { public: enum Flags : uint32_t { - TCP = 1 << 0, - UPDATE = 1 << 1, - DELAY = 1 << 3, - REWRITE = 1 << 4 + UPDATE = 1 << 0, + DELAY = 1 << 1, + REWRITE = 1 << 2 }; private: ComboAddress from; ComboAddress to; timespec sendTime; - uint8_t* frame; - uint8_t* l4Header; - uint8_t* payload; - uint8_t* payloadEnd; - uint8_t* frameEnd; + uint8_t* frame{nullptr}; + size_t frameLength{0}; + size_t frameSize{0}; uint32_t flags{0}; + bool v6{false}; + + // You must set ipHeader.check = 0 before calling this method + [[nodiscard]] static __be16 ipv4Checksum(const struct iphdr*) noexcept; + [[nodiscard]] static uint64_t ip_checksum_partial(const void* p, size_t len, uint64_t sum) noexcept; + [[nodiscard]] static __be16 ip_checksum_fold(uint64_t sum) noexcept; + [[nodiscard]] static uint64_t tcp_udp_v4_header_checksum_partial(__be32 src_ip, __be32 dst_ip, uint8_t protocol, uint16_t len) noexcept; + [[nodiscard]] static uint64_t tcp_udp_v6_header_checksum_partial(const struct in6_addr* src_ip, const struct in6_addr* dst_ip, uint8_t protocol, uint32_t len) noexcept; + static void rewriteIpv4Header(struct iphdr* ipv4header, size_t frameLen) noexcept; + static void rewriteIpv6Header(struct ipv6hdr* ipv6header, size_t frameLen) noexcept; + + // You must set l4Header.check = 0 before calling this method + // ip options is not supported + [[nodiscard]] __be16 tcp_udp_v4_checksum(const struct iphdr*) const noexcept; + // You must set l4Header.check = 0 before calling this method + [[nodiscard]] __be16 tcp_udp_v6_checksum(const struct ipv6hdr*) const noexcept; + /* offset of the L4 (udphdr) header (after ethhdr and iphdr/ipv6hdr) */ + [[nodiscard]] size_t getL4HeaderOffset() const noexcept; + /* offset of the data after the UDP header */ + [[nodiscard]] size_t getDataOffset() const noexcept; + [[nodiscard]] size_t getDataSize() const noexcept; + [[nodiscard]] ethhdr getEthernetHeader() const noexcept; + void setEthernetHeader(const ethhdr& ethHeader) noexcept; + [[nodiscard]] iphdr getIPv4Header() const noexcept; + void setIPv4Header(const iphdr& ipv4Header) noexcept; + [[nodiscard]] ipv6hdr getIPv6Header() const noexcept; + void setIPv6Header(const ipv6hdr& ipv6Header) noexcept; + [[nodiscard]] udphdr getUDPHeader() const noexcept; + void setUDPHeader(const udphdr& udpHeader) noexcept; + // parse IP and UDP payloads + bool parse(bool fromSetHeader); + void changeDirectAndUpdateChecksum() noexcept; friend XskSocket; friend XskWorker; friend bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept; constexpr static uint8_t DefaultTTL = 64; - // parse IP and UDP/TCP payloads - bool parse(); - void changeDirectAndUpdateChecksum() noexcept; - - // You must set ipHeader.check = 0 before call this method - [[nodiscard]] __be16 ipv4Checksum() const noexcept; - // You must set l4Header.check = 0 before call this method - // ip options is not supported - [[nodiscard]] __be16 tcp_udp_v4_checksum() const noexcept; - // You must set l4Header.check = 0 before call this method - [[nodiscard]] __be16 tcp_udp_v6_checksum() const noexcept; - [[nodiscard]] static uint64_t ip_checksum_partial(const void* p, size_t len, uint64_t sum) noexcept; - [[nodiscard]] static __be16 ip_checksum_fold(uint64_t sum) noexcept; - [[nodiscard]] static uint64_t tcp_udp_v4_header_checksum_partial(__be32 src_ip, __be32 dst_ip, uint8_t protocol, uint16_t len) noexcept; - [[nodiscard]] static uint64_t tcp_udp_v6_header_checksum_partial(const struct in6_addr* src_ip, const struct in6_addr* dst_ip, uint8_t protocol, uint32_t len) noexcept; - void rewriteIpv4Header(void* ipv4header) noexcept; - void rewriteIpv6Header(void* ipv6header) noexcept; public: [[nodiscard]] const ComboAddress& getFromAddr() const noexcept; [[nodiscard]] const ComboAddress& getToAddr() const noexcept; - [[nodiscard]] const void* payloadData() const; + [[nodiscard]] const void* getPayloadData() const; [[nodiscard]] bool isIPV6() const noexcept; - [[nodiscard]] size_t capacity() const noexcept; - [[nodiscard]] uint32_t dataLen() const noexcept; - [[nodiscard]] uint32_t FrameLen() const noexcept; + [[nodiscard]] size_t getCapacity() const noexcept; + [[nodiscard]] uint32_t getDataLen() const noexcept; + [[nodiscard]] uint32_t getFrameLen() const noexcept; [[nodiscard]] PacketBuffer clonePacketBuffer() const; void cloneIntoPacketBuffer(PacketBuffer& buffer) const; [[nodiscard]] std::unique_ptr cloneHeadertoPacketBuffer() const; - [[nodiscard]] void* payloadData(); - void setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC, bool tcp = false) noexcept; + void setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC) noexcept; bool setPayload(const PacketBuffer& buf); void rewrite() noexcept; void setHeader(const PacketBuffer& buf); - XskPacket() = default; - XskPacket(void* frame, size_t dataSize, size_t frameSize); + XskPacket(uint8_t* frame, size_t dataSize, size_t frameSize); void addDelay(int relativeMilliseconds) noexcept; void updatePacket() noexcept; [[nodiscard]] uint32_t getFlags() const noexcept; }; bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept; +/* g++ defines __SANITIZE_THREAD__ + clang++ supports the nice __has_feature(thread_sanitizer), + let's merge them */ +#if defined(__has_feature) +#if __has_feature(thread_sanitizer) +#define __SANITIZE_THREAD__ 1 +#endif +#endif + // XskWorker obtains XskPackets of specific ports in the NIC from XskSocket through cq. // After finishing processing the packet, XskWorker puts the packet into sq so that XskSocket decides whether to send it through the network card according to XskPacket::flags. // XskWorker wakes up XskSocket via xskSocketWaker after putting the packets in sq. class XskWorker { - using XskPacketRing = boost::lockfree::spsc_queue>; +#if defined(__SANITIZE_THREAD__) + using XskPacketRing = LockGuarded>>; +#else + using XskPacketRing = boost::lockfree::spsc_queue>; +#endif public: // queue of packets to be processed by this worker - XskPacketRing cq; + XskPacketRing incomingPacketsQueue; // queue of packets processed by this worker (to be sent, or discarded) - XskPacketRing sq; + XskPacketRing outgoingPacketsQueue; uint8_t* umemBufBase; + // list of frames that are shared with the XskRouter std::shared_ptr>> sharedEmptyFrameOffset; + // list of frames that we own, used to generate new packets (health-check) vector uniqueEmptyFrameOffset; std::string poolName; - size_t frameSize; + const size_t frameSize{XskSocket::getFrameSize()}; FDWrapper workerWaker; FDWrapper xskSocketWaker; @@ -244,6 +290,7 @@ public: static std::shared_ptr create(); void pushToProcessingQueue(XskPacketPtr&& packet); void pushToSendQueue(XskPacketPtr&& packet); + void markAsFree(XskPacketPtr&& packet); // notify worker that at least one packet is available for processing void notifyWorker() noexcept; // notify the router that packets are ready to be sent @@ -252,11 +299,11 @@ public: void cleanWorkerNotification() noexcept; void cleanSocketNotification() noexcept; [[nodiscard]] uint64_t frameOffset(const XskPacket& s) const noexcept; - // reap empty umeme entry from sharedEmptyFrameOffset into uniqueEmptyFrameOffset + // reap empty umem entry from sharedEmptyFrameOffset into uniqueEmptyFrameOffset void fillUniqueEmptyOffset(); // look for an empty umem entry in uniqueEmptyFrameOffset // then sharedEmptyFrameOffset if needed - void* getEmptyframe(); + XskPacketPtr getEmptyFrame(); }; std::vector getPollFdsForWorker(XskWorker& info); #else