]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/xsk.cc
Merge pull request #14020 from omoerbeek/rec-compiling-rust-dcos
[thirdparty/pdns.git] / pdns / xsk.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "config.h"
24
25 #ifdef HAVE_XSK
26
#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <fcntl.h>
#include <iterator>
#include <linux/bpf.h>
#include <linux/if_ether.h>
#include <linux/if_link.h>
#include <linux/if_xdp.h>
#include <linux/ip.h>
#include <linux/ipv6.h>
#include <linux/tcp.h>
#include <linux/types.h>
#include <linux/udp.h>
#include <net/if.h>
#include <net/if_arp.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdexcept>
#include <sys/eventfd.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <unistd.h>
#include <vector>
53
54 #include <bpf/bpf.h>
55 #include <bpf/libbpf.h>
56 extern "C"
57 {
58 #include <xdp/libxdp.h>
59 }
60
61 #include "gettime.hh"
62 #include "xsk.hh"
63
#ifdef DEBUG_UMEM
namespace
{
// Debug-only record of the last status assigned to a UMEM frame.
struct UmemEntryStatus
{
  enum class Status : uint8_t
  {
    Free,
    FillQueue,
    Received,
    TXQueue
  };
  Status status{Status::Free};
};

// Status of every known frame, keyed by its offset.
LockGuarded<std::unordered_map<uint64_t, UmemEntryStatus>> s_umems;

// Verifies that the frame at 'offset' is currently in one of 'validStatuses',
// then records 'newStatus' for it. On mismatch the caller location
// ('function', 'line'), the actual status and the accepted ones are written
// to stderr and the process is aborted. An offset that was never registered
// makes unordered_map::at() throw.
void checkUmemIntegrity(const char* function, int line, uint64_t offset, const std::set<UmemEntryStatus::Status>& validStatuses, UmemEntryStatus::Status newStatus)
{
  auto umems = s_umems.lock();
  auto& entry = umems->at(offset);
  if (validStatuses.find(entry.status) == validStatuses.end()) {
    std::cerr << "UMEM integrity check failed at " << function << ": " << line << ": status is " << static_cast<int>(entry.status) << ", expected: ";
    for (auto accepted = validStatuses.begin(); accepted != validStatuses.end(); ++accepted) {
      std::cerr << static_cast<int>(*accepted) << " ";
    }
    std::cerr << std::endl;
    abort();
  }
  entry.status = newStatus;
}
}
#endif /* DEBUG_UMEM */
96
97 constexpr bool XskSocket::isPowOfTwo(uint32_t value) noexcept
98 {
99 return value != 0 && (value & (value - 1)) == 0;
100 }
101
102 int XskSocket::firstTimeout()
103 {
104 if (waitForDelay.empty()) {
105 return -1;
106 }
107 timespec now{};
108 gettime(&now);
109 const auto& firstTime = waitForDelay.top().getSendTime();
110 const auto res = timeDifference(now, firstTime);
111 if (res <= 0) {
112 return 0;
113 }
114 return res;
115 }
116
117 XskSocket::XskSocket(size_t frameNum_, std::string ifName_, uint32_t queue_id, const std::string& xskMapPath) :
118 frameNum(frameNum_), ifName(std::move(ifName_)), socket(nullptr, xsk_socket__delete), sharedEmptyFrameOffset(std::make_shared<LockGuarded<vector<uint64_t>>>())
119 {
120 if (!isPowOfTwo(frameNum_) || !isPowOfTwo(frameSize)
121 || !isPowOfTwo(fqCapacity) || !isPowOfTwo(cqCapacity) || !isPowOfTwo(rxCapacity) || !isPowOfTwo(txCapacity)) {
122 throw std::runtime_error("The number of frame , the size of frame and the capacity of rings must is a pow of 2");
123 }
124 getMACFromIfName();
125
126 memset(&cq, 0, sizeof(cq));
127 memset(&fq, 0, sizeof(fq));
128 memset(&tx, 0, sizeof(tx));
129 memset(&rx, 0, sizeof(rx));
130
131 xsk_umem_config umemCfg{};
132 umemCfg.fill_size = fqCapacity;
133 umemCfg.comp_size = cqCapacity;
134 umemCfg.frame_size = frameSize;
135 umemCfg.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM;
136 umemCfg.flags = 0;
137 umem.umemInit(frameNum_ * frameSize, &cq, &fq, &umemCfg);
138
139 {
140 xsk_socket_config socketCfg{};
141 socketCfg.rx_size = rxCapacity;
142 socketCfg.tx_size = txCapacity;
143 socketCfg.bind_flags = XDP_USE_NEED_WAKEUP;
144 socketCfg.xdp_flags = XDP_FLAGS_SKB_MODE;
145 socketCfg.libxdp_flags = XSK_LIBBPF_FLAGS__INHIBIT_PROG_LOAD;
146 xsk_socket* tmp = nullptr;
147 auto ret = xsk_socket__create(&tmp, ifName.c_str(), queue_id, umem.umem, &rx, &tx, &socketCfg);
148 if (ret != 0) {
149 throw std::runtime_error("Error creating a xsk socket of if_name " + ifName + ": " + stringerror(ret));
150 }
151 socket = std::unique_ptr<xsk_socket, decltype(&xsk_socket__delete)>(tmp, xsk_socket__delete);
152 }
153
154 uniqueEmptyFrameOffset.reserve(frameNum);
155 {
156 for (uint64_t idx = 0; idx < frameNum; idx++) {
157 uniqueEmptyFrameOffset.push_back(idx * frameSize + XDP_PACKET_HEADROOM);
158 #ifdef DEBUG_UMEM
159 {
160 auto umems = s_umems.lock();
161 (*umems)[idx * frameSize + XDP_PACKET_HEADROOM] = UmemEntryStatus();
162 }
163 #endif /* DEBUG_UMEM */
164 }
165 }
166
167 fillFq(fqCapacity);
168
169 const auto xskfd = xskFd();
170 fds.push_back(pollfd{
171 .fd = xskfd,
172 .events = POLLIN,
173 .revents = 0});
174
175 const auto xskMapFd = FDWrapper(bpf_obj_get(xskMapPath.c_str()));
176
177 if (xskMapFd.getHandle() < 0) {
178 throw std::runtime_error("Error getting BPF map from path '" + xskMapPath + "'");
179 }
180
181 auto ret = bpf_map_update_elem(xskMapFd.getHandle(), &queue_id, &xskfd, 0);
182 if (ret != 0) {
183 throw std::runtime_error("Error inserting into xsk_map '" + xskMapPath + "': " + std::to_string(ret));
184 }
185 }
186
// Keys of the XSK destination addresses maps; see xdp.h in contrib/

// IPv4 key: the address and the port are stored exactly as found in the
// sockaddr_in of the destination.
struct IPv4AndPort
{
  uint32_t addr;
  uint16_t port;
};

// IPv6 key: the address and the port are stored exactly as found in the
// sockaddr_in6 of the destination.
struct IPv6AndPort
{
  in6_addr addr;
  uint16_t port;
};
198
199 static FDWrapper getDestinationMap(const std::string& mapPath)
200 {
201 auto destMapFd = FDWrapper(bpf_obj_get(mapPath.c_str()));
202 if (destMapFd.getHandle() < 0) {
203 throw std::runtime_error("Error getting the XSK destination addresses map path '" + mapPath + "'");
204 }
205 return destMapFd;
206 }
207
208 void XskSocket::clearDestinationMap(const std::string& mapPath, bool isV6)
209 {
210 auto destMapFd = getDestinationMap(mapPath);
211 if (!isV6) {
212 IPv4AndPort prevKey{};
213 IPv4AndPort key{};
214 while (bpf_map_get_next_key(destMapFd.getHandle(), &prevKey, &key) == 0) {
215 bpf_map_delete_elem(destMapFd.getHandle(), &key);
216 prevKey = key;
217 }
218 }
219 else {
220 IPv6AndPort prevKey{};
221 IPv6AndPort key{};
222 while (bpf_map_get_next_key(destMapFd.getHandle(), &prevKey, &key) == 0) {
223 bpf_map_delete_elem(destMapFd.getHandle(), &key);
224 prevKey = key;
225 }
226 }
227 }
228
229 void XskSocket::addDestinationAddress(const std::string& mapPath, const ComboAddress& destination)
230 {
231 auto destMapFd = getDestinationMap(mapPath);
232 bool value = true;
233 if (destination.isIPv4()) {
234 IPv4AndPort key{};
235 key.addr = destination.sin4.sin_addr.s_addr;
236 key.port = destination.sin4.sin_port;
237 auto ret = bpf_map_update_elem(destMapFd.getHandle(), &key, &value, 0);
238 if (ret != 0) {
239 throw std::runtime_error("Error inserting into xsk_map '" + mapPath + "': " + std::to_string(ret));
240 }
241 }
242 else {
243 IPv6AndPort key{};
244 key.addr = destination.sin6.sin6_addr;
245 key.port = destination.sin6.sin6_port;
246 auto ret = bpf_map_update_elem(destMapFd.getHandle(), &key, &value, 0);
247 if (ret != 0) {
248 throw std::runtime_error("Error inserting into XSK destination addresses map '" + mapPath + "': " + std::to_string(ret));
249 }
250 }
251 }
252
253 void XskSocket::removeDestinationAddress(const std::string& mapPath, const ComboAddress& destination)
254 {
255 auto destMapFd = getDestinationMap(mapPath);
256 if (destination.isIPv4()) {
257 IPv4AndPort key{};
258 key.addr = destination.sin4.sin_addr.s_addr;
259 key.port = destination.sin4.sin_port;
260 bpf_map_delete_elem(destMapFd.getHandle(), &key);
261 }
262 else {
263 IPv6AndPort key{};
264 key.addr = destination.sin6.sin6_addr;
265 key.port = destination.sin6.sin6_port;
266 bpf_map_delete_elem(destMapFd.getHandle(), &key);
267 }
268 }
269
270 void XskSocket::fillFq(uint32_t fillSize) noexcept
271 {
272 {
273 // if we have less than holdThreshold frames in the shared queue (which might be an issue
274 // when the XskWorker needs empty frames), move frames from the unique container into the
275 // shared one. This might not be optimal right now.
276 auto frames = sharedEmptyFrameOffset->lock();
277 if (frames->size() < holdThreshold) {
278 const auto moveSize = std::min(holdThreshold - frames->size(), uniqueEmptyFrameOffset.size());
279 if (moveSize > 0) {
280 // NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions)
281 frames->insert(frames->end(), std::make_move_iterator(uniqueEmptyFrameOffset.end() - moveSize), std::make_move_iterator(uniqueEmptyFrameOffset.end()));
282 uniqueEmptyFrameOffset.resize(uniqueEmptyFrameOffset.size() - moveSize);
283 }
284 }
285 }
286
287 if (uniqueEmptyFrameOffset.size() < fillSize) {
288 return;
289 }
290
291 uint32_t idx{0};
292 auto toFill = xsk_ring_prod__reserve(&fq, fillSize, &idx);
293 if (toFill == 0) {
294 return;
295 }
296 uint32_t processed = 0;
297 for (; processed < toFill; processed++) {
298 *xsk_ring_prod__fill_addr(&fq, idx++) = uniqueEmptyFrameOffset.back();
299 #ifdef DEBUG_UMEM
300 checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, uniqueEmptyFrameOffset.back(), {UmemEntryStatus::Status::Free}, UmemEntryStatus::Status::FillQueue);
301 #endif /* DEBUG_UMEM */
302 uniqueEmptyFrameOffset.pop_back();
303 }
304
305 xsk_ring_prod__submit(&fq, processed);
306 }
307
308 int XskSocket::wait(int timeout)
309 {
310 auto waitAtMost = std::min(timeout, firstTimeout());
311 return poll(fds.data(), fds.size(), waitAtMost);
312 }
313
314 [[nodiscard]] uint64_t XskSocket::frameOffset(const XskPacket& packet) const noexcept
315 {
316 return packet.getFrameOffsetFrom(umem.bufBase);
317 }
318
319 [[nodiscard]] int XskSocket::xskFd() const noexcept
320 {
321 return xsk_socket__fd(socket.get());
322 }
323
324 void XskSocket::send(std::vector<XskPacket>& packets)
325 {
326 while (!packets.empty()) {
327 auto packetSize = packets.size();
328 if (packetSize > std::numeric_limits<uint32_t>::max()) {
329 packetSize = std::numeric_limits<uint32_t>::max();
330 }
331 size_t toSend = std::min(static_cast<uint32_t>(packetSize), txCapacity);
332 uint32_t idx{0};
333 auto toFill = xsk_ring_prod__reserve(&tx, toSend, &idx);
334 if (toFill == 0) {
335 return;
336 }
337
338 size_t queued = 0;
339 for (const auto& packet : packets) {
340 if (queued == toFill) {
341 break;
342 }
343 *xsk_ring_prod__tx_desc(&tx, idx++) = {
344 .addr = frameOffset(packet),
345 .len = packet.getFrameLen(),
346 .options = 0};
347 #ifdef DEBUG_UMEM
348 checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::Received}, UmemEntryStatus::Status::TXQueue);
349 #endif /* DEBUG_UMEM */
350 queued++;
351 }
352 xsk_ring_prod__submit(&tx, toFill);
353 packets.erase(packets.begin(), packets.begin() + toFill);
354 }
355 }
356
357 std::vector<XskPacket> XskSocket::recv(uint32_t recvSizeMax, uint32_t* failedCount)
358 {
359 uint32_t idx{0};
360 std::vector<XskPacket> res;
361 // how many descriptors to packets have been filled
362 const auto recvSize = xsk_ring_cons__peek(&rx, recvSizeMax, &idx);
363 if (recvSize == 0) {
364 return res;
365 }
366
367 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
368 const auto baseAddr = reinterpret_cast<uint64_t>(umem.bufBase);
369 uint32_t failed = 0;
370 uint32_t processed = 0;
371 res.reserve(recvSize);
372 for (; processed < recvSize; processed++) {
373 try {
374 const auto* desc = xsk_ring_cons__rx_desc(&rx, idx++);
375 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast,performance-no-int-to-ptr)
376 XskPacket packet = XskPacket(reinterpret_cast<uint8_t*>(desc->addr + baseAddr), desc->len, frameSize);
377 #ifdef DEBUG_UMEM
378 checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, frameOffset(packet), {UmemEntryStatus::Status::Free, UmemEntryStatus::Status::FillQueue}, UmemEntryStatus::Status::Received);
379 #endif /* DEBUG_UMEM */
380
381 if (!packet.parse(false)) {
382 ++failed;
383 markAsFree(packet);
384 }
385 else {
386 res.push_back(packet);
387 }
388 }
389 catch (const std::exception& exp) {
390 std::cerr << "Exception while processing the XSK RX queue: " << exp.what() << std::endl;
391 break;
392 }
393 catch (...) {
394 std::cerr << "Exception while processing the XSK RX queue" << std::endl;
395 break;
396 }
397 }
398
399 // this releases the descriptor, but not the packet (umem entries)
400 // which will only be made available again when pushed into the fill
401 // queue
402 xsk_ring_cons__release(&rx, processed);
403 if (failedCount != nullptr) {
404 *failedCount = failed;
405 }
406
407 return res;
408 }
409
410 void XskSocket::pickUpReadyPacket(std::vector<XskPacket>& packets)
411 {
412 timespec now{};
413 gettime(&now);
414 while (!waitForDelay.empty() && timeDifference(now, waitForDelay.top().getSendTime()) <= 0) {
415 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast)
416 auto& top = const_cast<XskPacket&>(waitForDelay.top());
417 packets.push_back(top);
418 waitForDelay.pop();
419 }
420 }
421
422 void XskSocket::recycle(size_t size) noexcept
423 {
424 uint32_t idx{0};
425 const auto completeSize = xsk_ring_cons__peek(&cq, size, &idx);
426 if (completeSize == 0) {
427 return;
428 }
429 uniqueEmptyFrameOffset.reserve(uniqueEmptyFrameOffset.size() + completeSize);
430 uint32_t processed = 0;
431 for (; processed < completeSize; ++processed) {
432 uniqueEmptyFrameOffset.push_back(*xsk_ring_cons__comp_addr(&cq, idx++));
433 #ifdef DEBUG_UMEM
434 checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, uniqueEmptyFrameOffset.back(), {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free);
435 #endif /* DEBUG_UMEM */
436 }
437 xsk_ring_cons__release(&cq, processed);
438 }
439
440 void XskSocket::XskUmem::umemInit(size_t memSize, xsk_ring_cons* completionQueue, xsk_ring_prod* fillQueue, xsk_umem_config* config)
441 {
442 size = memSize;
443 bufBase = static_cast<uint8_t*>(mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0));
444 if (bufBase == MAP_FAILED) {
445 throw std::runtime_error("mmap failed");
446 }
447 auto ret = xsk_umem__create(&umem, bufBase, size, fillQueue, completionQueue, config);
448 if (ret != 0) {
449 munmap(bufBase, size);
450 throw std::runtime_error("Error creating a umem of size " + std::to_string(size) + ": " + stringerror(ret));
451 }
452 }
453
454 std::string XskSocket::getMetrics() const
455 {
456 xdp_statistics stats{};
457 socklen_t optlen = sizeof(stats);
458 int err = getsockopt(xskFd(), SOL_XDP, XDP_STATISTICS, &stats, &optlen);
459 if (err != 0) {
460 return "";
461 }
462 if (optlen != sizeof(struct xdp_statistics)) {
463 return "";
464 }
465
466 ostringstream ret;
467 ret << "RX dropped: " << std::to_string(stats.rx_dropped) << std::endl;
468 ret << "RX invalid descs: " << std::to_string(stats.rx_invalid_descs) << std::endl;
469 ret << "TX invalid descs: " << std::to_string(stats.tx_invalid_descs) << std::endl;
470 ret << "RX ring full: " << std::to_string(stats.rx_ring_full) << std::endl;
471 ret << "RX fill ring empty descs: " << std::to_string(stats.rx_fill_ring_empty_descs) << std::endl;
472 ret << "TX ring empty descs: " << std::to_string(stats.tx_ring_empty_descs) << std::endl;
473 return ret.str();
474 }
475
476 [[nodiscard]] std::string XskSocket::getXDPMode() const
477 {
478 #ifdef HAVE_BPF_XDP_QUERY
479 unsigned int itfIdx = if_nametoindex(ifName.c_str());
480 if (itfIdx == 0) {
481 return "unable to get interface index";
482 }
483 bpf_xdp_query_opts info{};
484 info.sz = sizeof(info);
485 int ret = bpf_xdp_query(static_cast<int>(itfIdx), 0, &info);
486 if (ret != 0) {
487 return {};
488 }
489 switch (info.attach_mode) {
490 case XDP_ATTACHED_DRV:
491 case XDP_ATTACHED_HW:
492 return "native";
493 case XDP_ATTACHED_SKB:
494 return "emulated";
495 default:
496 return "unknown";
497 }
498 #else /* HAVE_BPF_XDP_QUERY */
499 return "undetected";
500 #endif /* HAVE_BPF_XDP_QUERY */
501 }
502
503 void XskSocket::markAsFree(const XskPacket& packet)
504 {
505 auto offset = frameOffset(packet);
506 #ifdef DEBUG_UMEM
507 checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free);
508 #endif /* DEBUG_UMEM */
509
510 uniqueEmptyFrameOffset.push_back(offset);
511 }
512
513 XskSocket::XskUmem::~XskUmem()
514 {
515 if (umem != nullptr) {
516 xsk_umem__delete(umem);
517 }
518 if (bufBase != nullptr) {
519 munmap(bufBase, size);
520 }
521 }
522
523 [[nodiscard]] size_t XskPacket::getL4HeaderOffset() const noexcept
524 {
525 return sizeof(ethhdr) + (v6 ? (sizeof(ipv6hdr)) : sizeof(iphdr));
526 }
527
528 [[nodiscard]] size_t XskPacket::getDataOffset() const noexcept
529 {
530 return getL4HeaderOffset() + sizeof(udphdr);
531 }
532
533 [[nodiscard]] size_t XskPacket::getDataSize() const noexcept
534 {
535 return frameLength - getDataOffset();
536 }
537
538 [[nodiscard]] ethhdr XskPacket::getEthernetHeader() const noexcept
539 {
540 ethhdr ethHeader{};
541 if (frameLength >= sizeof(ethHeader)) {
542 memcpy(&ethHeader, frame, sizeof(ethHeader));
543 }
544 return ethHeader;
545 }
546
547 void XskPacket::setEthernetHeader(const ethhdr& ethHeader) noexcept
548 {
549 if (frameLength < sizeof(ethHeader)) {
550 frameLength = sizeof(ethHeader);
551 }
552 memcpy(frame, &ethHeader, sizeof(ethHeader));
553 }
554
555 [[nodiscard]] iphdr XskPacket::getIPv4Header() const noexcept
556 {
557 iphdr ipv4Header{};
558 assert(frameLength >= (sizeof(ethhdr) + sizeof(ipv4Header)));
559 assert(!v6);
560 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
561 memcpy(&ipv4Header, frame + sizeof(ethhdr), sizeof(ipv4Header));
562 return ipv4Header;
563 }
564
565 void XskPacket::setIPv4Header(const iphdr& ipv4Header) noexcept
566 {
567 assert(frameLength >= (sizeof(ethhdr) + sizeof(iphdr)));
568 assert(!v6);
569 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
570 memcpy(frame + sizeof(ethhdr), &ipv4Header, sizeof(ipv4Header));
571 }
572
573 [[nodiscard]] ipv6hdr XskPacket::getIPv6Header() const noexcept
574 {
575 ipv6hdr ipv6Header{};
576 assert(frameLength >= (sizeof(ethhdr) + sizeof(ipv6Header)));
577 assert(v6);
578 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
579 memcpy(&ipv6Header, frame + sizeof(ethhdr), sizeof(ipv6Header));
580 return ipv6Header;
581 }
582
583 void XskPacket::setIPv6Header(const ipv6hdr& ipv6Header) noexcept
584 {
585 assert(frameLength >= (sizeof(ethhdr) + sizeof(ipv6Header)));
586 assert(v6);
587 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
588 memcpy(frame + sizeof(ethhdr), &ipv6Header, sizeof(ipv6Header));
589 }
590
591 [[nodiscard]] udphdr XskPacket::getUDPHeader() const noexcept
592 {
593 udphdr udpHeader{};
594 assert(frameLength >= (sizeof(ethhdr) + (v6 ? sizeof(ipv6hdr) : sizeof(iphdr)) + sizeof(udpHeader)));
595 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
596 memcpy(&udpHeader, frame + getL4HeaderOffset(), sizeof(udpHeader));
597 return udpHeader;
598 }
599
600 void XskPacket::setUDPHeader(const udphdr& udpHeader) noexcept
601 {
602 assert(frameLength >= (sizeof(ethhdr) + (v6 ? sizeof(ipv6hdr) : sizeof(iphdr)) + sizeof(udpHeader)));
603 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
604 memcpy(frame + getL4HeaderOffset(), &udpHeader, sizeof(udpHeader));
605 }
606
607 bool XskPacket::parse(bool fromSetHeader)
608 {
609 if (frameLength <= sizeof(ethhdr)) {
610 return false;
611 }
612
613 auto ethHeader = getEthernetHeader();
614 uint8_t l4Protocol{0};
615 if (ethHeader.h_proto == htons(ETH_P_IP)) {
616 if (frameLength < (sizeof(ethhdr) + sizeof(iphdr) + sizeof(udphdr))) {
617 return false;
618 }
619 v6 = false;
620 auto ipHeader = getIPv4Header();
621 if (ipHeader.ihl != (static_cast<uint8_t>(sizeof(iphdr) / 4))) {
622 // ip options is not supported now!
623 return false;
624 }
625 // check ip.check == ipv4Checksum() is not needed!
626 // We check it in BPF program
627 // we don't, actually.
628 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
629 from = makeComboAddressFromRaw(4, reinterpret_cast<const char*>(&ipHeader.saddr), sizeof(ipHeader.saddr));
630 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
631 to = makeComboAddressFromRaw(4, reinterpret_cast<const char*>(&ipHeader.daddr), sizeof(ipHeader.daddr));
632 l4Protocol = ipHeader.protocol;
633 if (!fromSetHeader && (frameLength - sizeof(ethhdr)) != ntohs(ipHeader.tot_len)) {
634 // too small, or too large (trailing data), go away
635 return false;
636 }
637 }
638 else if (ethHeader.h_proto == htons(ETH_P_IPV6)) {
639 if (frameLength < (sizeof(ethhdr) + sizeof(ipv6hdr) + sizeof(udphdr))) {
640 return false;
641 }
642 v6 = true;
643 auto ipHeader = getIPv6Header();
644 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
645 from = makeComboAddressFromRaw(6, reinterpret_cast<const char*>(&ipHeader.saddr), sizeof(ipHeader.saddr));
646 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
647 to = makeComboAddressFromRaw(6, reinterpret_cast<const char*>(&ipHeader.daddr), sizeof(ipHeader.daddr));
648 l4Protocol = ipHeader.nexthdr;
649 if (!fromSetHeader && (frameLength - (sizeof(ethhdr) + sizeof(ipv6hdr))) != ntohs(ipHeader.payload_len)) {
650 return false;
651 }
652 }
653 else {
654 return false;
655 }
656
657 if (l4Protocol != IPPROTO_UDP) {
658 return false;
659 }
660
661 // check udp.check == ipv4Checksum() is not needed!
662 // We check it in BPF program
663 // we don't, actually.
664 auto udpHeader = getUDPHeader();
665 if (!fromSetHeader) {
666 // Because of XskPacket::setHeader
667 if (getDataOffset() > frameLength) {
668 return false;
669 }
670
671 if (getDataSize() + sizeof(udphdr) != ntohs(udpHeader.len)) {
672 return false;
673 }
674 }
675
676 from.setPort(ntohs(udpHeader.source));
677 to.setPort(ntohs(udpHeader.dest));
678 return true;
679 }
680
681 uint32_t XskPacket::getDataLen() const noexcept
682 {
683 return getDataSize();
684 }
685
686 uint32_t XskPacket::getFrameLen() const noexcept
687 {
688 return frameLength;
689 }
690
691 size_t XskPacket::getCapacity() const noexcept
692 {
693 return frameSize;
694 }
695
696 void XskPacket::changeDirectAndUpdateChecksum() noexcept
697 {
698 auto ethHeader = getEthernetHeader();
699 {
700 std::array<uint8_t, ETH_ALEN> tmp{};
701 static_assert(tmp.size() == sizeof(ethHeader.h_dest), "Size Error");
702 static_assert(tmp.size() == sizeof(ethHeader.h_source), "Size Error");
703 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
704 memcpy(tmp.data(), ethHeader.h_dest, tmp.size());
705 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
706 memcpy(ethHeader.h_dest, ethHeader.h_source, tmp.size());
707 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
708 memcpy(ethHeader.h_source, tmp.data(), tmp.size());
709 }
710 if (ethHeader.h_proto == htons(ETH_P_IPV6)) {
711 // IPV6
712 auto ipv6 = getIPv6Header();
713 std::swap(ipv6.daddr, ipv6.saddr);
714 assert(ipv6.nexthdr == IPPROTO_UDP);
715
716 auto udp = getUDPHeader();
717 std::swap(udp.dest, udp.source);
718 udp.len = htons(getDataSize() + sizeof(udp));
719 udp.check = 0;
720 /* needed to get the correct checksum */
721 setIPv6Header(ipv6);
722 setUDPHeader(udp);
723 udp.check = tcp_udp_v6_checksum(&ipv6);
724 rewriteIpv6Header(&ipv6, getFrameLen());
725 setIPv6Header(ipv6);
726 setUDPHeader(udp);
727 }
728 else {
729 // IPV4
730 auto ipv4 = getIPv4Header();
731 std::swap(ipv4.daddr, ipv4.saddr);
732 assert(ipv4.protocol == IPPROTO_UDP);
733
734 auto udp = getUDPHeader();
735 std::swap(udp.dest, udp.source);
736 udp.len = htons(getDataSize() + sizeof(udp));
737 udp.check = 0;
738 /* needed to get the correct checksum */
739 setIPv4Header(ipv4);
740 setUDPHeader(udp);
741 udp.check = tcp_udp_v4_checksum(&ipv4);
742 rewriteIpv4Header(&ipv4, getFrameLen());
743 setIPv4Header(ipv4);
744 setUDPHeader(udp);
745 }
746 setEthernetHeader(ethHeader);
747 }
748
749 void XskPacket::rewriteIpv4Header(struct iphdr* ipv4header, size_t frameLen) noexcept
750 {
751 ipv4header->version = 4;
752 ipv4header->ihl = sizeof(iphdr) / 4;
753 ipv4header->tos = 0;
754 ipv4header->tot_len = htons(frameLen - sizeof(ethhdr));
755 ipv4header->id = 0;
756 ipv4header->frag_off = 0;
757 ipv4header->ttl = DefaultTTL;
758 ipv4header->check = 0;
759 ipv4header->check = ipv4Checksum(ipv4header);
760 }
761
762 void XskPacket::rewriteIpv6Header(struct ipv6hdr* ipv6header, size_t frameLen) noexcept
763 {
764 ipv6header->version = 6;
765 ipv6header->priority = 0;
766 ipv6header->payload_len = htons(frameLen - sizeof(ethhdr) - sizeof(ipv6hdr));
767 ipv6header->hop_limit = DefaultTTL;
768 memset(&ipv6header->flow_lbl, 0, sizeof(ipv6header->flow_lbl));
769 }
770
771 bool XskPacket::isIPV6() const noexcept
772 {
773 return v6;
774 }
775
776 XskPacket::XskPacket(uint8_t* frame_, size_t dataSize, size_t frameSize_) :
777 frame(frame_), frameLength(dataSize), frameSize(frameSize_ - XDP_PACKET_HEADROOM)
778 {
779 }
780
781 PacketBuffer XskPacket::clonePacketBuffer() const
782 {
783 const auto size = getDataSize();
784 PacketBuffer tmp(size);
785 if (size > 0) {
786 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
787 memcpy(tmp.data(), frame + getDataOffset(), size);
788 }
789 return tmp;
790 }
791
792 bool XskPacket::setPayload(const PacketBuffer& buf)
793 {
794 const auto bufSize = buf.size();
795 const auto currentCapacity = getCapacity();
796 if (bufSize == 0 || bufSize > currentCapacity) {
797 return false;
798 }
799 flags |= UPDATE;
800 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
801 memcpy(frame + getDataOffset(), buf.data(), bufSize);
802 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
803 frameLength = getDataOffset() + bufSize;
804 return true;
805 }
806
807 void XskPacket::addDelay(const int relativeMilliseconds) noexcept
808 {
809 gettime(&sendTime);
810 sendTime.tv_nsec += static_cast<int64_t>(relativeMilliseconds) * 1000000L;
811 sendTime.tv_sec += sendTime.tv_nsec / 1000000000L;
812 sendTime.tv_nsec %= 1000000000L;
813 }
814
815 bool operator<(const XskPacket& lhs, const XskPacket& rhs) noexcept
816 {
817 return lhs.getSendTime() < rhs.getSendTime();
818 }
819
820 const ComboAddress& XskPacket::getFromAddr() const noexcept
821 {
822 return from;
823 }
824
825 const ComboAddress& XskPacket::getToAddr() const noexcept
826 {
827 return to;
828 }
829
830 void XskWorker::notify(int desc)
831 {
832 uint64_t value = 1;
833 ssize_t res = 0;
834 while ((res = write(desc, &value, sizeof(value))) == EINTR) {
835 }
836 if (res != sizeof(value)) {
837 throw runtime_error("Unable Wake Up XskSocket Failed");
838 }
839 }
840
841 XskWorker::XskWorker() :
842 workerWaker(createEventfd()), xskSocketWaker(createEventfd())
843 {
844 }
845
846 void XskWorker::pushToProcessingQueue(XskPacket& packet)
847 {
848 #if defined(__SANITIZE_THREAD__)
849 if (!incomingPacketsQueue.lock()->push(packet)) {
850 #else
851 if (!incomingPacketsQueue.push(packet)) {
852 #endif
853 markAsFree(packet);
854 }
855 }
856
857 void XskWorker::pushToSendQueue(XskPacket& packet)
858 {
859 #if defined(__SANITIZE_THREAD__)
860 if (!outgoingPacketsQueue.lock()->push(packet)) {
861 #else
862 if (!outgoingPacketsQueue.push(packet)) {
863 #endif
864 markAsFree(packet);
865 }
866 }
867
868 const void* XskPacket::getPayloadData() const
869 {
870 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
871 return frame + getDataOffset();
872 }
873
874 void XskPacket::setAddr(const ComboAddress& from_, MACAddr fromMAC, const ComboAddress& to_, MACAddr toMAC) noexcept
875 {
876 auto ethHeader = getEthernetHeader();
877 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
878 memcpy(ethHeader.h_dest, toMAC.data(), toMAC.size());
879 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
880 memcpy(ethHeader.h_source, fromMAC.data(), fromMAC.size());
881 setEthernetHeader(ethHeader);
882 to = to_;
883 from = from_;
884 v6 = !to.isIPv4();
885 flags = 0;
886 }
887
888 void XskPacket::rewrite() noexcept
889 {
890 flags |= REWRITE;
891 auto ethHeader = getEthernetHeader();
892 if (!v6) {
893 ethHeader.h_proto = htons(ETH_P_IP);
894
895 auto ipHeader = getIPv4Header();
896 ipHeader.daddr = to.sin4.sin_addr.s_addr;
897 ipHeader.saddr = from.sin4.sin_addr.s_addr;
898
899 auto udpHeader = getUDPHeader();
900 ipHeader.protocol = IPPROTO_UDP;
901 udpHeader.source = from.sin4.sin_port;
902 udpHeader.dest = to.sin4.sin_port;
903 udpHeader.len = htons(getDataSize() + sizeof(udpHeader));
904 udpHeader.check = 0;
905 /* needed to get the correct checksum */
906 setIPv4Header(ipHeader);
907 setUDPHeader(udpHeader);
908 udpHeader.check = tcp_udp_v4_checksum(&ipHeader);
909 rewriteIpv4Header(&ipHeader, getFrameLen());
910 setIPv4Header(ipHeader);
911 setUDPHeader(udpHeader);
912 }
913 else {
914 ethHeader.h_proto = htons(ETH_P_IPV6);
915
916 auto ipHeader = getIPv6Header();
917 memcpy(&ipHeader.daddr, &to.sin6.sin6_addr, sizeof(ipHeader.daddr));
918 memcpy(&ipHeader.saddr, &from.sin6.sin6_addr, sizeof(ipHeader.saddr));
919
920 auto udpHeader = getUDPHeader();
921 ipHeader.nexthdr = IPPROTO_UDP;
922 udpHeader.source = from.sin6.sin6_port;
923 udpHeader.dest = to.sin6.sin6_port;
924 udpHeader.len = htons(getDataSize() + sizeof(udpHeader));
925 udpHeader.check = 0;
926 /* needed to get the correct checksum */
927 setIPv6Header(ipHeader);
928 setUDPHeader(udpHeader);
929 udpHeader.check = tcp_udp_v6_checksum(&ipHeader);
930 setIPv6Header(ipHeader);
931 setUDPHeader(udpHeader);
932 }
933
934 setEthernetHeader(ethHeader);
935 }
936
937 [[nodiscard]] __be16 XskPacket::ipv4Checksum(const struct iphdr* ipHeader) noexcept
938 {
939 auto partial = ip_checksum_partial(ipHeader, sizeof(iphdr), 0);
940 return ip_checksum_fold(partial);
941 }
942
943 [[nodiscard]] __be16 XskPacket::tcp_udp_v4_checksum(const struct iphdr* ipHeader) const noexcept
944 {
945 // ip options is not supported !!!
946 const auto l4Length = static_cast<uint16_t>(getDataSize() + sizeof(udphdr));
947 auto sum = tcp_udp_v4_header_checksum_partial(ipHeader->saddr, ipHeader->daddr, ipHeader->protocol, l4Length);
948 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
949 sum = ip_checksum_partial(frame + getL4HeaderOffset(), l4Length, sum);
950 return ip_checksum_fold(sum);
951 }
952
953 [[nodiscard]] __be16 XskPacket::tcp_udp_v6_checksum(const struct ipv6hdr* ipv6) const noexcept
954 {
955 const auto l4Length = static_cast<uint16_t>(getDataSize() + sizeof(udphdr));
956 uint64_t sum = tcp_udp_v6_header_checksum_partial(&ipv6->saddr, &ipv6->daddr, ipv6->nexthdr, l4Length);
957 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
958 sum = ip_checksum_partial(frame + getL4HeaderOffset(), l4Length, sum);
959 return ip_checksum_fold(sum);
960 }
961
962 [[nodiscard]] uint64_t XskPacket::ip_checksum_partial(const void* ptr, const size_t len, uint64_t sum) noexcept
963 {
964 size_t position{0};
965 /* Main loop: 32 bits at a time */
966 for (position = 0; position < len; position += sizeof(uint32_t)) {
967 uint32_t value{};
968 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
969 memcpy(&value, static_cast<const uint8_t*>(ptr) + position, sizeof(value));
970 sum += value;
971 }
972
973 /* Handle un-32bit-aligned trailing bytes */
974 if ((len - position) >= 2) {
975 uint16_t value{};
976 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
977 memcpy(&value, static_cast<const uint8_t*>(ptr) + position, sizeof(value));
978 sum += value;
979 position += sizeof(value);
980 }
981
982 if ((len - position) > 0) {
983 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
984 const auto* ptr8 = static_cast<const uint8_t*>(ptr) + position;
985 sum += ntohs(*ptr8 << 8); /* RFC says pad last byte */
986 }
987
988 return sum;
989 }
990
991 [[nodiscard]] __be16 XskPacket::ip_checksum_fold(uint64_t sum) noexcept
992 {
993 while ((sum & ~0xffffffffULL) != 0U) {
994 sum = (sum >> 32) + (sum & 0xffffffffULL);
995 }
996 while ((sum & 0xffff0000ULL) != 0U) {
997 sum = (sum >> 16) + (sum & 0xffffULL);
998 }
999
1000 return static_cast<__be16>(~sum);
1001 }
1002
/* packed_attribute expands to the platform's __packed macro when there is
   one, and to the GCC/clang packed attribute otherwise. */
#ifdef __packed
#define packed_attribute __packed
#else
#define packed_attribute __attribute__((packed))
#endif
1008
1009 // NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
1010 [[nodiscard]] uint64_t XskPacket::tcp_udp_v4_header_checksum_partial(__be32 src_ip, __be32 dst_ip, uint8_t protocol, uint16_t len) noexcept
1011 {
1012 struct header
1013 {
1014 __be32 src_ip;
1015 __be32 dst_ip;
1016 __uint8_t mbz;
1017 __uint8_t protocol;
1018 __be16 length;
1019 };
1020 /* The IPv4 pseudo-header is defined in RFC 793, Section 3.1. */
1021 struct ipv4_pseudo_header_t
1022 {
1023 /* We use a union here to avoid aliasing issues with gcc -O2 */
1024 union
1025 {
1026 header packed_attribute fields;
1027 // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays)
1028 uint32_t words[3];
1029 };
1030 };
1031 ipv4_pseudo_header_t pseudo_header{};
1032 static_assert(sizeof(pseudo_header) == 12, "IPv4 pseudo-header size is incorrect");
1033
1034 /* Fill in the pseudo-header. */
1035 pseudo_header.fields.src_ip = src_ip;
1036 pseudo_header.fields.dst_ip = dst_ip;
1037 pseudo_header.fields.mbz = 0;
1038 pseudo_header.fields.protocol = protocol;
1039 pseudo_header.fields.length = htons(len);
1040 return ip_checksum_partial(&pseudo_header, sizeof(pseudo_header), 0);
1041 }
1042
1043 [[nodiscard]] uint64_t XskPacket::tcp_udp_v6_header_checksum_partial(const struct in6_addr* src_ip, const struct in6_addr* dst_ip, uint8_t protocol, uint32_t len) noexcept
1044 {
1045 struct header
1046 {
1047 struct in6_addr src_ip;
1048 struct in6_addr dst_ip;
1049 __be32 length;
1050 // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays)
1051 __uint8_t mbz[3];
1052 __uint8_t next_header;
1053 };
1054 /* The IPv6 pseudo-header is defined in RFC 2460, Section 8.1. */
1055 struct ipv6_pseudo_header_t
1056 {
1057 /* We use a union here to avoid aliasing issues with gcc -O2 */
1058 union
1059 {
1060 header packed_attribute fields;
1061 // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays)
1062 uint32_t words[10];
1063 };
1064 };
1065 ipv6_pseudo_header_t pseudo_header{};
1066 static_assert(sizeof(pseudo_header) == 40, "IPv6 pseudo-header size is incorrect");
1067
1068 /* Fill in the pseudo-header. */
1069 pseudo_header.fields.src_ip = *src_ip;
1070 pseudo_header.fields.dst_ip = *dst_ip;
1071 pseudo_header.fields.length = htonl(len);
1072 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
1073 memset(pseudo_header.fields.mbz, 0, sizeof(pseudo_header.fields.mbz));
1074 pseudo_header.fields.next_header = protocol;
1075 return ip_checksum_partial(&pseudo_header, sizeof(pseudo_header), 0);
1076 }
1077
1078 void XskPacket::setHeader(PacketBuffer& buf)
1079 {
1080 memcpy(frame, buf.data(), buf.size());
1081 frameLength = buf.size();
1082 buf.clear();
1083 flags = 0;
1084 if (!parse(true)) {
1085 throw std::runtime_error("Error setting the XSK frame header");
1086 }
1087 }
1088
1089 PacketBuffer XskPacket::cloneHeaderToPacketBuffer() const
1090 {
1091 const auto size = getFrameLen() - getDataSize();
1092 PacketBuffer tmp(size);
1093 memcpy(tmp.data(), frame, size);
1094 return tmp;
1095 }
1096
1097 int XskWorker::createEventfd()
1098 {
1099 auto desc = ::eventfd(0, EFD_CLOEXEC);
1100 if (desc < 0) {
1101 throw runtime_error("Unable create eventfd");
1102 }
1103 return desc;
1104 }
1105
1106 void XskWorker::waitForXskSocket() const noexcept
1107 {
1108 uint64_t value = read(workerWaker, &value, sizeof(value));
1109 }
1110
1111 void XskWorker::notifyXskSocket() const
1112 {
1113 notify(xskSocketWaker);
1114 }
1115
1116 std::shared_ptr<XskWorker> XskWorker::create()
1117 {
1118 return std::make_shared<XskWorker>();
1119 }
1120
1121 void XskSocket::addWorker(std::shared_ptr<XskWorker> worker)
1122 {
1123 const auto socketWaker = worker->xskSocketWaker.getHandle();
1124 worker->umemBufBase = umem.bufBase;
1125 d_workers.insert({socketWaker, std::move(worker)});
1126 fds.push_back(pollfd{
1127 .fd = socketWaker,
1128 .events = POLLIN,
1129 .revents = 0});
1130 };
1131
1132 void XskSocket::addWorkerRoute(const std::shared_ptr<XskWorker>& worker, const ComboAddress& dest)
1133 {
1134 d_workerRoutes.lock()->insert({dest, worker});
1135 }
1136
1137 void XskSocket::removeWorkerRoute(const ComboAddress& dest)
1138 {
1139 d_workerRoutes.lock()->erase(dest);
1140 }
1141
1142 uint64_t XskWorker::frameOffset(const XskPacket& packet) const noexcept
1143 {
1144 return packet.getFrameOffsetFrom(umemBufBase);
1145 }
1146
1147 void XskWorker::notifyWorker() const
1148 {
1149 notify(workerWaker);
1150 }
1151
1152 void XskSocket::getMACFromIfName()
1153 {
1154 ifreq ifr{};
1155 auto desc = FDWrapper(::socket(AF_INET, SOCK_DGRAM, 0));
1156 if (desc < 0) {
1157 throw std::runtime_error("Error creating a socket to get the MAC address of interface " + ifName);
1158 }
1159
1160 if (ifName.size() >= IFNAMSIZ) {
1161 throw std::runtime_error("Unable to get MAC address for interface " + ifName + ": name too long");
1162 }
1163
1164 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
1165 strncpy(ifr.ifr_name, ifName.c_str(), ifName.length() + 1);
1166 if (ioctl(desc.getHandle(), SIOCGIFHWADDR, &ifr) < 0 || ifr.ifr_hwaddr.sa_family != ARPHRD_ETHER) {
1167 throw std::runtime_error("Error getting MAC address for interface " + ifName);
1168 }
1169 static_assert(sizeof(ifr.ifr_hwaddr.sa_data) >= std::tuple_size<decltype(source)>{}, "The size of an ARPHRD_ETHER MAC address is smaller than expected");
1170 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
1171 memcpy(source.data(), ifr.ifr_hwaddr.sa_data, source.size());
1172 }
1173
1174 [[nodiscard]] int XskSocket::timeDifference(const timespec& lhs, const timespec& rhs) noexcept
1175 {
1176 const auto res = lhs.tv_sec * 1000 + lhs.tv_nsec / 1000000L - (rhs.tv_sec * 1000 + rhs.tv_nsec / 1000000L);
1177 return static_cast<int>(res);
1178 }
1179
1180 void XskWorker::cleanWorkerNotification() const noexcept
1181 {
1182 uint64_t value = read(xskSocketWaker, &value, sizeof(value));
1183 }
1184
1185 void XskWorker::cleanSocketNotification() const noexcept
1186 {
1187 uint64_t value = read(workerWaker, &value, sizeof(value));
1188 }
1189
1190 std::vector<pollfd> getPollFdsForWorker(XskWorker& info)
1191 {
1192 std::vector<pollfd> fds;
1193 int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
1194 if (timerfd < 0) {
1195 throw std::runtime_error("create_timerfd failed");
1196 }
1197 fds.push_back(pollfd{
1198 .fd = info.workerWaker,
1199 .events = POLLIN,
1200 .revents = 0,
1201 });
1202 fds.push_back(pollfd{
1203 .fd = timerfd,
1204 .events = POLLIN,
1205 .revents = 0,
1206 });
1207 return fds;
1208 }
1209
1210 void XskWorker::fillUniqueEmptyOffset()
1211 {
1212 auto frames = sharedEmptyFrameOffset->lock();
1213 const auto moveSize = std::min(static_cast<size_t>(32), frames->size());
1214 if (moveSize > 0) {
1215 // NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions)
1216 uniqueEmptyFrameOffset.insert(uniqueEmptyFrameOffset.end(), std::make_move_iterator(frames->end() - moveSize), std::make_move_iterator(frames->end()));
1217 frames->resize(frames->size() - moveSize);
1218 }
1219 }
1220
1221 std::optional<XskPacket> XskWorker::getEmptyFrame()
1222 {
1223 if (!uniqueEmptyFrameOffset.empty()) {
1224 auto offset = uniqueEmptyFrameOffset.back();
1225 uniqueEmptyFrameOffset.pop_back();
1226 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
1227 return XskPacket(offset + umemBufBase, 0, frameSize);
1228 }
1229 fillUniqueEmptyOffset();
1230 if (!uniqueEmptyFrameOffset.empty()) {
1231 auto offset = uniqueEmptyFrameOffset.back();
1232 uniqueEmptyFrameOffset.pop_back();
1233 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
1234 return XskPacket(offset + umemBufBase, 0, frameSize);
1235 }
1236 return std::nullopt;
1237 }
1238
1239 void XskWorker::markAsFree(const XskPacket& packet)
1240 {
1241 auto offset = frameOffset(packet);
1242 #ifdef DEBUG_UMEM
1243 checkUmemIntegrity(__PRETTY_FUNCTION__, __LINE__, offset, {UmemEntryStatus::Status::Received, UmemEntryStatus::Status::TXQueue}, UmemEntryStatus::Status::Free);
1244 #endif /* DEBUG_UMEM */
1245 uniqueEmptyFrameOffset.push_back(offset);
1246 }
1247
1248 uint32_t XskPacket::getFlags() const noexcept
1249 {
1250 return flags;
1251 }
1252
1253 void XskPacket::updatePacket() noexcept
1254 {
1255 if ((flags & UPDATE) == 0U) {
1256 return;
1257 }
1258 if ((flags & REWRITE) == 0U) {
1259 changeDirectAndUpdateChecksum();
1260 }
1261 }
1262 #endif /* HAVE_XSK */