data->d_initial = initial;
setHealthCheckTime(dss, data);
auto* frame = xskInfo->getEmptyframe();
- auto *xskPacket = new XskPacket(frame, 0, xskInfo->frameSize);
+ auto xskPacket = std::make_unique<XskPacket>(frame, 0, xskInfo->frameSize);
xskPacket->setAddr(dss->d_config.sourceAddr, dss->d_config.sourceMACAddr, dss->d_config.remote, dss->d_config.destMACAddr);
xskPacket->setPayload(packet);
xskPacket->rewrite();
- xskInfo->sq.push(xskPacket);
+ xskInfo->pushToSendQueue(std::move(xskPacket));
const auto queryId = data->d_queryID;
map[queryId] = std::move(data);
}
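Why the switch from `new` to `std::make_unique` matters here: between allocation and the queue push, `setAddr`, `setPayload` and `rewrite` all run, and with a raw pointer any exception on that path leaks the packet. A toy illustration of the two shapes (stand-in types, not the real `XskPacket`):

```cpp
#include <memory>
#include <stdexcept>

struct Packet {
    void rewrite() { throw std::runtime_error("boom"); } // pretend this can throw
};

void rawPointer()
{
    auto* pkt = new Packet();
    pkt->rewrite();  // throws: pkt leaks, the push below never runs
    // queue.push(pkt);
}

void uniquePointer()
{
    auto pkt = std::make_unique<Packet>();
    pkt->rewrite();  // throws: pkt is freed during stack unwinding
    // queue.push(pkt.release()) would only run on success
}
```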
bool needNotify = false;
if (pollfds[0].revents & POLLIN) {
needNotify = true;
- xskInfo->cq.consume_all([&](XskPacket* packet) {
+ xskInfo->cq.consume_all([&](XskPacket* packetRaw) {
+ auto packet = XskPacketPtr(packetRaw);
if (packet->dataLen() < sizeof(dnsheader)) {
- xskInfo->sq.push(packet);
+ xskInfo->pushToSendQueue(std::move(packet));
return;
}
const auto* dh = reinterpret_cast<const struct dnsheader*>(packet->payloadData());
packet->cloneIntoPacketBuffer(data->d_buffer);
data->d_ds->submitHealthCheckResult(data->d_initial, handleResponse(data));
}
- xskInfo->sq.push(packet);
+ xskInfo->pushToSendQueue(std::move(packet));
return;
}
auto response = packet->clonePacketBuffer();
ids->xskPacketHeader.reset();
}
if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) {
- xskInfo->sq.push(packet);
+ xskInfo->pushToSendQueue(std::move(packet));
return;
}
if (response.size() > packet->capacity()) {
/* fallback to sending the packet via normal socket */
sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote);
- xskInfo->sq.push(packet);
+ xskInfo->pushToSendQueue(std::move(packet));
return;
}
packet->setHeader(*ids->xskPacketHeader);
packet->addDelay(ids->delayMsec);
}
packet->updatePacket();
- xskInfo->sq.push(packet);
+ xskInfo->pushToSendQueue(std::move(packet));
});
xskInfo->cleanSocketNotification();
}
xskPacket->setHeader(*ids->xskPacketHeader);
xskPacket->setPayload(response);
xskPacket->updatePacket();
- xskInfo->sq.push(xskPacket.release());
+ xskInfo->pushToSendQueue(std::move(xskPacket));
xskInfo->notifyXskSocket();
#endif /* HAVE_XSK */
}
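`notifyXskSocket()` and the various waker FDs are built on `createEventfd()`; for readers unfamiliar with the mechanism, this is the standard Linux eventfd wake-up pattern. A sketch of the idiom, not the exact dnsdist implementation:

```cpp
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>
#include <stdexcept>

int createWaker()
{
    // a single kernel-side counter usable as a poll()-able FD
    int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
    if (fd < 0) {
        throw std::runtime_error("eventfd failed");
    }
    return fd;
}

void notify(int fd) // producer side: makes POLLIN fire on the waker
{
    const uint64_t one = 1;
    (void)write(fd, &one, sizeof(one));
}

void clearNotification(int fd) // consumer side: reset the counter
{
    uint64_t value = 0;
    (void)read(fd, &value, sizeof(value));
}
```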
while (!xskInfo->cq.read_available()) {
xskInfo->waitForXskSocket();
}
- xskInfo->cq.consume_all([&](XskPacket* packet) {
+ xskInfo->cq.consume_all([&](XskPacket* packetRaw) {
+ auto packet = XskPacketPtr(packetRaw);
ProcessXskQuery(*cs, holders, *packet);
packet->updatePacket();
- xskInfo->sq.push(packet);
+ xskInfo->pushToSendQueue(std::move(packet));
});
xskInfo->notifyXskSocket();
}
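This is the recurring shape of the converted `consume_all` callbacks: the SPSC ring can only store raw pointers, so the lambda re-wraps the pointer in a `unique_ptr` on its first line, and every return path afterwards either moves the packet onward or destroys it. A self-contained sketch of the pattern, with a stand-in `Packet` type:

```cpp
#include <boost/lockfree/spsc_queue.hpp>
#include <memory>

struct Packet { bool valid{true}; };
using Ring = boost::lockfree::spsc_queue<Packet*, boost::lockfree::capacity<512>>;

void drain(Ring& in, Ring& out)
{
    in.consume_all([&](Packet* raw) {
        // reclaim ownership immediately
        auto packet = std::unique_ptr<Packet>(raw);
        if (!packet->valid) {
            return; // destroyed here: no forgotten-path leak
        }
        auto* fwd = packet.release();
        if (!out.push(fwd)) {
            delete fwd; // the bounded ring was full
        }
    });
}
```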
xsk->uniqueEmptyFrameOffset.push_back(xsk->frameOffset(*packet));
continue;
}
- res->worker->cq.push(packet.release());
+ res->worker->pushToProcessingQueue(std::move(packet));
needNotify.insert(res->workerWaker);
}
for (auto i : needNotify) {
if (xsk->fds[i].revents & POLLIN) {
ready--;
auto& info = xskWakerIdx.find(xsk->fds[i].fd)->worker;
- info->sq.consume_all([&](XskPacket* x) {
- if (!(x->getFlags() & XskPacket::UPDATE)) {
- xsk->uniqueEmptyFrameOffset.push_back(xsk->frameOffset(*x));
+ info->sq.consume_all([&](XskPacket* packetRaw) {
+ auto packet = XskPacketPtr(packetRaw);
+ if (!(packet->getFlags() & XskPacket::UPDATE)) {
+ xsk->uniqueEmptyFrameOffset.push_back(xsk->frameOffset(*packet));
+ // the frame has already been handed back via its offset above,
+ // so detach the wrapper instead of letting the unique_ptr free it
+ packet.release();
return;
}
- auto ptr = std::unique_ptr<XskPacket>(x);
- if (x->getFlags() & XskPacket::DELAY) {
- xsk->waitForDelay.push(std::move(ptr));
+ if (packet->getFlags() & XskPacket::DELAY) {
+ xsk->waitForDelay.push(std::move(packet));
return;
}
- fillInTx.push_back(std::move(ptr));
+ fillInTx.push_back(std::move(packet));
});
info->cleanWorkerNotification();
}
#include "xsk.hh"
#include <algorithm>
-#include <cassert>
#include <cstdint>
#include <cstring>
#include <fcntl.h>
{
return value != 0 && (value & (value - 1)) == 0;
}
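The bit trick: a power of two has exactly one bit set, and `value & (value - 1)` clears the lowest set bit, so the expression is zero only for powers of two; the `value != 0` guard excludes zero, which would otherwise pass. For instance:

```cpp
#include <cassert>
#include <cstdint>

bool isPowOfTwo(uint32_t value) // stand-alone copy for illustration
{
    return value != 0 && (value & (value - 1)) == 0;
}

int main()
{
    assert(isPowOfTwo(4096)); // 0b1'0000'0000'0000: single bit set
    assert(!isPowOfTwo(48));  // 0b110000: 48 & 47 == 0b100000 != 0
    assert(!isPowOfTwo(0));   // 0 & 0xffffffff == 0, excluded by value != 0
}
```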
+
int XskSocket::firstTimeout()
{
if (waitForDelay.empty()) {
xsk_ring_cons__release(&cq, completeSize);
}
-void XskSocket::XskUmem::umemInit(size_t memSize, xsk_ring_cons* cq, xsk_ring_prod* fq, xsk_umem_config* config)
+void XskSocket::XskUmem::umemInit(size_t memSize, xsk_ring_cons* completionQueue, xsk_ring_prod* fillQueue, xsk_umem_config* config)
{
size = memSize;
bufBase = static_cast<uint8_t*>(mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0));
if (bufBase == MAP_FAILED) {
throw std::runtime_error("mmap failed");
}
- auto ret = xsk_umem__create(&umem, bufBase, size, fq, cq, config);
+ auto ret = xsk_umem__create(&umem, bufBase, size, fillQueue, completionQueue, config);
if (ret != 0) {
munmap(bufBase, size);
throw std::runtime_error("Error creating a umem of size" + std::to_string(size) + stringerror(ret));
if (l4Protocol == IPPROTO_UDP) {
// checking udp.check == ipv4Checksum() is not needed here:
// the BPF program has already verified it
- auto* udp = reinterpret_cast<udphdr*>(l4Header);
+ const auto* udp = reinterpret_cast<const udphdr*>(l4Header);
payload = l4Header + sizeof(udphdr);
// Because of XskPacket::setHeader,
// payload == payloadEnd should be allowed
if (l4Protocol == IPPROTO_TCP) {
// checking tcp.check == ipv4Checksum() is not needed here:
// the BPF program has already verified it
- auto* tcp = reinterpret_cast<tcphdr*>(l4Header);
+ const auto* tcp = reinterpret_cast<const tcphdr*>(l4Header);
if (tcp->doff != static_cast<uint32_t>(sizeof(tcphdr) >> 2)) {
// TCP is not supported yet
return false;
workerWaker(createEventfd()), xskSocketWaker(createEventfd())
{
}
+
+void XskWorker::pushToProcessingQueue(XskPacketPtr&& packet)
+{
+ auto raw = packet.release();
+ if (!cq.push(raw)) {
+ delete raw;
+ }
+}
+
+void XskWorker::pushToSendQueue(XskPacketPtr&& packet)
+{
+ auto raw = packet.release();
+ if (!sq.push(raw)) {
+ delete raw;
+ }
+}
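These two helpers exist because `boost::lockfree::spsc_queue` with a compile-time `capacity<512>` is a bounded ring: `push()` returns `false` when it is full, and at the old call sites the raw pointer would simply leak in that case. Centralizing release-then-push also means the caller's `unique_ptr` is always empty after the call. A deterministic demonstration of the overflow path, using a deliberately tiny ring:

```cpp
#include <boost/lockfree/spsc_queue.hpp>
#include <memory>

struct Packet {};
using TinyRing = boost::lockfree::spsc_queue<Packet*, boost::lockfree::capacity<2>>;

void pushOwned(TinyRing& ring, std::unique_ptr<Packet>&& packet)
{
    auto* raw = packet.release();
    if (!ring.push(raw)) { // false once the ring holds 2 entries
        delete raw;        // overflowing packets are freed, not leaked
    }
}

int main()
{
    TinyRing ring;
    for (int i = 0; i < 4; ++i) {
        pushOwned(ring, std::make_unique<Packet>()); // last two pushes overflow
    }
    ring.consume_all([](Packet* raw) { delete raw; }); // drains the 2 stored
}
```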
+
void* XskPacket::payloadData()
{
return reinterpret_cast<void*>(payload);
};
};
struct ipv4_pseudo_header_t pseudo_header;
- assert(sizeof(pseudo_header) == 12);
+ static_assert(sizeof(pseudo_header) == 12, "IPv4 pseudo-header size is incorrect");
/* Fill in the pseudo-header. */
pseudo_header.fields.src_ip = src_ip;
};
};
struct ipv6_pseudo_header_t pseudo_header;
- assert(sizeof(pseudo_header) == 40);
+ static_assert(sizeof(pseudo_header) == 40, "IPv6 pseudo-header size is incorrect");
/* Fill in the pseudo-header. */
pseudo_header.fields.src_ip = *src_ip;
pseudo_header.fields.next_header = protocol;
return ip_checksum_partial(&pseudo_header, sizeof(pseudo_header), 0);
}
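The replaced asserts are pure layout checks, so `static_assert` both catches a padding mistake at compile time and survives `-DNDEBUG` builds. The expected sizes come straight from the pseudo-header definitions: IPv4 (RFC 768) is src(4) + dst(4) + zero(1) + protocol(1) + length(2) = 12 bytes, and IPv6 (RFC 2460) is src(16) + dst(16) + length(4) + zeros(3) + next header(1) = 40 bytes. A minimal illustration with an assumed packed layout (field names here are illustrative, not the ones in xsk.cc):

```cpp
#include <cstdint>

struct ipv4_pseudo_header {
    uint32_t src_ip;   // 4 bytes
    uint32_t dst_ip;   // 4 bytes
    uint8_t  zero;     // 1 byte, always 0
    uint8_t  protocol; // 1 byte, e.g. IPPROTO_UDP
    uint16_t length;   // 2 bytes, L4 length
} __attribute__((packed));

// 4 + 4 + 1 + 1 + 2 = 12; fails to compile if padding sneaks in
static_assert(sizeof(ipv4_pseudo_header) == 12, "IPv4 pseudo-header size is incorrect");
```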
-void XskPacket::setHeader(const PacketBuffer& buf) noexcept
+void XskPacket::setHeader(const PacketBuffer& buf)
{
memcpy(frame, buf.data(), buf.size());
payloadEnd = frame + buf.size();
flags = 0;
- const auto res = parse();
- assert(res);
+ if (!parse()) {
+ throw std::runtime_error("Error setting the XSK frame header");
+ }
}
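Turning the `assert(res)` into a throw is not just cosmetic: asserts compile out under `-DNDEBUG`, so a release build would silently keep going with a header that failed to parse. The contrast, condensed:

```cpp
#include <cassert>
#include <stdexcept>

bool parse() { return false; } // stand-in for the real parser

void withAssert()
{
    const auto res = parse();
    assert(res); // vanishes with -DNDEBUG: failure goes unnoticed
    (void)res;
}

void withThrow()
{
    if (!parse()) {
        // enforced in every build type
        throw std::runtime_error("Error setting the XSK frame header");
    }
}
```

This is also why `noexcept` comes off the `setHeader` declaration, both in the definition here and in the header below.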
std::unique_ptr<PacketBuffer> XskPacket::cloneHeadertoPacketBuffer() const
{
void setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC, bool tcp = false) noexcept;
bool setPayload(const PacketBuffer& buf);
void rewrite() noexcept;
- void setHeader(const PacketBuffer& buf) noexcept;
+ void setHeader(const PacketBuffer& buf);
XskPacket() = default;
XskPacket(void* frame, size_t dataSize, size_t frameSize);
void addDelay(int relativeMilliseconds) noexcept;
using XskPacketRing = boost::lockfree::spsc_queue<XskPacket*, boost::lockfree::capacity<512>>;
public:
- uint8_t* umemBufBase;
- std::shared_ptr<LockGuarded<vector<uint64_t>>> sharedEmptyFrameOffset;
- vector<uint64_t> uniqueEmptyFrameOffset;
// queue of packets to be processed by this worker
XskPacketRing cq;
// queue of packets processed by this worker (to be sent, or discarded)
XskPacketRing sq;
+
+ uint8_t* umemBufBase;
+ std::shared_ptr<LockGuarded<vector<uint64_t>>> sharedEmptyFrameOffset;
+ vector<uint64_t> uniqueEmptyFrameOffset;
std::string poolName;
size_t frameSize;
FDWrapper workerWaker;
static int createEventfd();
static void notify(int fd);
static std::shared_ptr<XskWorker> create();
+ void pushToProcessingQueue(XskPacketPtr&& packet);
+ void pushToSendQueue(XskPacketPtr&& packet);
// notify worker that at least one packet is available for processing
void notifyWorker() noexcept;
// notify the router that packets are ready to be sent