* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
+
+#include "config.h"
+
+#ifdef HAVE_XSK
+
#include "gettime.hh"
#include "xsk.hh"
#include <unistd.h>
#include <vector>
-#ifdef HAVE_XSK
#include <bpf/bpf.h>
#include <bpf/libbpf.h>
extern "C"
{
uint32_t idx;
std::vector<XskPacketPtr> res;
+ // how many descriptors to packets have been filled
const auto recvSize = xsk_ring_cons__peek(&rx, recvSizeMax, &idx);
if (recvSize <= 0) {
return res;
}
+
const auto baseAddr = reinterpret_cast<uint64_t>(umem.bufBase);
uint32_t count = 0;
for (uint32_t i = 0; i < recvSize; i++) {
res.push_back(std::move(ptr));
}
}
+
+ // this releases the descriptor, but not the packet (umem entries)
+ // which will only be made available again when pushed into the fill
+ // queue
xsk_ring_cons__release(&rx, recvSize);
if (failedCount) {
*failedCount = count;
}
+
return res;
}
void XskSocket::pickUpReadyPacket(std::vector<XskPacketPtr>& packets)
auto fd = ::socket(AF_INET, SOCK_DGRAM, 0);
strncpy(ifr.ifr_name, ifName.c_str(), ifName.length() + 1);
if (ioctl(fd, SIOCGIFHWADDR, &ifr) < 0) {
- throw runtime_error("Error get MAC addr");
+ throw runtime_error("Error getting MAC addr");
}
memcpy(source, ifr.ifr_hwaddr.sa_data, sizeof(source));
close(fd);
*/
#pragma once
-#include "iputils.hh"
-#include "misc.hh"
-#include "noinitvector.hh"
-#include "lock.hh"
+#ifdef HAVE_XSK
#include <array>
#include <bits/types/struct_timespec.h>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/multi_index/hashed_index.hpp>
-#include <boost/multi_index/indexed_by.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/member.hpp>
-#include <cstdint>
-#include <cstring>
-#include <linux/types.h>
#include <memory>
#include <queue>
#include <stdexcept>
#include <unistd.h>
#include <unordered_map>
#include <vector>
+#include <linux/types.h>
-#ifdef HAVE_XSK
#include <xdp/xsk.h>
+
+#include "iputils.hh"
+#include "lock.hh"
+#include "misc.hh"
+#include "noinitvector.hh"
#endif /* HAVE_XSK */
class XskPacket;
{
xsk_umem* umem{nullptr};
uint8_t* bufBase{nullptr};
- size_t size;
+ size_t size{0};
void umemInit(size_t memSize, xsk_ring_cons* cq, xsk_ring_prod* fq, xsk_umem_config* config);
~XskUmem();
XskUmem() = default;
static constexpr size_t holdThreshold = 256;
static constexpr size_t fillThreshold = 128;
static constexpr size_t frameSize = 2048;
+ // number of entries (frames) in the umem
const size_t frameNum;
+ // ID of the network queue
const uint32_t queueId;
+ // responses that have been delayed
std::priority_queue<XskPacketPtr> waitForDelay;
const std::string ifName;
const std::string poolName;
+ // AF_XDP socket then worker waker sockets
vector<pollfd> fds;
+ // list of (indexes of) umem entries that can be reused
vector<uint64_t> uniqueEmptyFrameOffset;
+ // completion ring: queue where sent packets are stored by the kernel
xsk_ring_cons cq;
+ // rx ring: queue where the incoming packets are stored, read by XskRouter
xsk_ring_cons rx;
+ // fill ring: queue where umem entries available to be filled (put into rx) are stored
xsk_ring_prod fq;
+ // tx ring: queue where outgoing packets are stored
xsk_ring_prod tx;
std::unique_ptr<xsk_socket, void (*)(xsk_socket*)> socket;
XskUmem umem;
[[nodiscard]] uint64_t frameOffset(const XskPacket& packet) const noexcept;
int firstTimeout();
+ // pick ups as many available frames as possible from uniqueEmptyFrameOffset
+ // and put them into sharedEmptyFrameOffset
+ // then insert them into fq
void fillFq(uint32_t fillSize = fillThreshold) noexcept;
+ // picks up entries that have been processed (sent) and push them into uniqueEmptyFrameOffset
void recycle(size_t size) noexcept;
void getMACFromIfName();
+ // look at delayed packets, and send the ones that are ready
void pickUpReadyPacket(std::vector<XskPacketPtr>& packets);
public:
+ // list of free umem entries that can be reused
std::shared_ptr<LockGuarded<vector<uint64_t>>> sharedEmptyFrameOffset;
XskSocket(size_t frameNum, const std::string& ifName, uint32_t queue_id, const std::string& xskMapPath, const std::string& poolName_);
MACAddr source;
[[nodiscard]] int xskFd() const noexcept;
+ // wait until one event has occurred
int wait(int timeout);
+ // add as many packets as possible to the rx queue for sending */
void send(std::vector<XskPacketPtr>& packets);
+ // look at incoming packets in rx, return them if parsing succeeeded
std::vector<XskPacketPtr> recv(uint32_t recvSizeMax, uint32_t* failedCount);
void addWorker(std::shared_ptr<XskWorker> s, const ComboAddress& dest, bool isTCP);
};
friend bool operator<(const XskPacketPtr& s1, const XskPacketPtr& s2) noexcept;
constexpr static uint8_t DefaultTTL = 64;
+ // parse IP and UDP/TCP payloads
bool parse();
void changeDirectAndUpdateChecksum() noexcept;
uint8_t* umemBufBase;
std::shared_ptr<LockGuarded<vector<uint64_t>>> sharedEmptyFrameOffset;
vector<uint64_t> uniqueEmptyFrameOffset;
+ // queue of packets to be processed by this worker
XskPacketRing cq;
+ // queue of packets processed by this worker (to be sent, or discarded)
XskPacketRing sq;
std::string poolName;
size_t frameSize;
static int createEventfd();
static void notify(int fd);
static std::shared_ptr<XskWorker> create();
+ // notify worker that at least one packet is available for processing
void notifyWorker() noexcept;
+ // notify the router that packets are ready to be sent
void notifyXskSocket() noexcept;
void waitForXskSocket() noexcept;
void cleanWorkerNotification() noexcept;
void cleanSocketNotification() noexcept;
[[nodiscard]] uint64_t frameOffset(const XskPacket& s) const noexcept;
+ // reap empty umeme entry from sharedEmptyFrameOffset into uniqueEmptyFrameOffset
void fillUniqueEmptyOffset();
+ // look for an empty umem entry in uniqueEmptyFrameOffset
+ // then sharedEmptyFrameOffset if needed
void* getEmptyframe();
};
std::vector<pollfd> getPollFdsForWorker(XskWorker& info);