/*
 * This file is part of PowerDNS or dnsdist.
 * Copyright -- PowerDNS.COM B.V. and its contributors
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In addition, for the avoidance of any doubt, permission is granted to
 * link this program with OpenSSL and to (re)distribute the binaries
 * produced as the result of such linking.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
#include <cerrno>

#include <linux/bpf.h>
#include <linux/if_ether.h>
#include <linux/if_link.h>
#include <linux/if_xdp.h>
#include <linux/ipv6.h>
#include <linux/tcp.h>
#include <linux/types.h>
#include <linux/udp.h>
#include <net/if_arp.h>
#include <netinet/in.h>
#include <sys/eventfd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/timerfd.h>

#include <bpf/libbpf.h>

#include <xdp/libxdp.h>
#ifdef DEBUG_UMEM
namespace
{
// Debug-only bookkeeping of the UMEM frames: for every frame offset we remember
// which stage of the XSK pipeline the frame was last handed to.
struct UmemEntryStatus
{
  enum class Status : uint8_t
  {
    Free,
    FillQueue,
    Received,
    TXQueue
  };
  Status status{Status::Free};
};

LockGuarded<std::unordered_map<uint64_t, UmemEntryStatus>> s_umems;

// Checks that the frame at `offset` is currently in one of `validStatuses`.
// If it is not, a diagnostic naming the caller (`function`, `line`) is printed
// and the process is aborted; otherwise the frame is moved to `newStatus`.
// Note: at() throws std::out_of_range for an offset that was never registered.
void checkUmemIntegrity(const char* function, int line, uint64_t offset, const std::set<UmemEntryStatus::Status>& validStatuses, UmemEntryStatus::Status newStatus)
{
  auto umems = s_umems.lock();
  const auto currentStatus = umems->at(offset).status;
  if (validStatuses.find(currentStatus) == validStatuses.end()) {
    std::cerr << "UMEM integrity check failed at " << function << ": " << line << ": status is " << static_cast<int>(currentStatus) << ", expected: ";
    for (const auto status : validStatuses) {
      std::cerr << static_cast<int>(status) << " ";
    }
    std::cerr << std::endl;
    abort();
  }
  (*umems)[offset].status = newStatus;
}
}
#endif /* DEBUG_UMEM */
97 constexpr bool XskSocket::isPowOfTwo(uint32_t value
) noexcept
99 return value
!= 0 && (value
& (value
- 1)) == 0;
102 int XskSocket::firstTimeout()
104 if (waitForDelay
.empty()) {
109 const auto& firstTime
= waitForDelay
.top().getSendTime();
110 const auto res
= timeDifference(now
, firstTime
);
117 XskSocket::XskSocket(size_t frameNum_
, std::string ifName_
, uint32_t queue_id
, const std::string
& xskMapPath
) :
118 frameNum(frameNum_
), ifName(std::move(ifName_
)), socket(nullptr, xsk_socket__delete
), sharedEmptyFrameOffset(std::make_shared
<LockGuarded
<vector
<uint64_t>>>())
120 if (!isPowOfTwo(frameNum_
) || !isPowOfTwo(frameSize
)
121 || !isPowOfTwo(fqCapacity
) || !isPowOfTwo(cqCapacity
) || !isPowOfTwo(rxCapacity
) || !isPowOfTwo(txCapacity
)) {
122 throw std::runtime_error("The number of frame , the size of frame and the capacity of rings must is a pow of 2");
126 memset(&cq
, 0, sizeof(cq
));
127 memset(&fq
, 0, sizeof(fq
));
128 memset(&tx
, 0, sizeof(tx
));
129 memset(&rx
, 0, sizeof(rx
));
131 xsk_umem_config umemCfg
{};
132 umemCfg
.fill_size
= fqCapacity
;
133 umemCfg
.comp_size
= cqCapacity
;
134 umemCfg
.frame_size
= frameSize
;
135 umemCfg
.frame_headroom
= XSK_UMEM__DEFAULT_FRAME_HEADROOM
;
137 umem
.umemInit(frameNum_
* frameSize
, &cq
, &fq
, &umemCfg
);
140 xsk_socket_config socketCfg
{};
141 socketCfg
.rx_size
= rxCapacity
;
142 socketCfg
.tx_size
= txCapacity
;
143 socketCfg
.bind_flags
= XDP_USE_NEED_WAKEUP
;
144 socketCfg
.xdp_flags
= XDP_FLAGS_SKB_MODE
;
145 socketCfg
.libxdp_flags
= XSK_LIBBPF_FLAGS__INHIBIT_PROG_LOAD
;
146 xsk_socket
* tmp
= nullptr;
147 auto ret
= xsk_socket__create(&tmp
, ifName
.c_str(), queue_id
, umem
.umem
, &rx
, &tx
, &socketCfg
);
149 throw std::runtime_error("Error creating a xsk socket of if_name " + ifName
+ ": " + stringerror(ret
));
151 socket
= std::unique_ptr
<xsk_socket
, decltype(&xsk_socket__delete
)>(tmp
, xsk_socket__delete
);
154 uniqueEmptyFrameOffset
.reserve(frameNum
);
156 for (uint64_t idx
= 0; idx
< frameNum
; idx
++) {
157 uniqueEmptyFrameOffset
.push_back(idx
* frameSize
+ XDP_PACKET_HEADROOM
);
160 auto umems
= s_umems
.lock();
161 (*umems
)[idx
* frameSize
+ XDP_PACKET_HEADROOM
] = UmemEntryStatus();
163 #endif /* DEBUG_UMEM */
169 const auto xskfd
= xskFd();
170 fds
.push_back(pollfd
{
175 const auto xskMapFd
= FDWrapper(bpf_obj_get(xskMapPath
.c_str()));
177 if (xskMapFd
.getHandle() < 0) {
178 throw std::runtime_error("Error getting BPF map from path '" + xskMapPath
+ "'");
181 auto ret
= bpf_map_update_elem(xskMapFd
.getHandle(), &queue_id
, &xskfd
, 0);
183 throw std::runtime_error("Error inserting into xsk_map '" + xskMapPath
+ "': " + std::to_string(ret
));
// see xdp.h in contrib/
// Key layouts of the XSK destination addresses BPF maps. These presumably have
// to match the definitions used by the BPF program byte for byte. The address
// and the port are stored as copied from the sockaddr, i.e. in network byte order.
struct IPv4AndPort
{
  uint32_t addr; // sin_addr.s_addr
  uint16_t port; // sin_port
};
struct IPv6AndPort
{
  in6_addr addr; // sin6_addr
  uint16_t port; // sin6_port
};
199 static FDWrapper
getDestinationMap(const std::string
& mapPath
)
201 auto destMapFd
= FDWrapper(bpf_obj_get(mapPath
.c_str()));
202 if (destMapFd
.getHandle() < 0) {
203 throw std::runtime_error("Error getting the XSK destination addresses map path '" + mapPath
+ "'");
208 void XskSocket::clearDestinationMap(const std::string
& mapPath
, bool isV6
)
210 auto destMapFd
= getDestinationMap(mapPath
);
212 IPv4AndPort prevKey
{};
214 while (bpf_map_get_next_key(destMapFd
.getHandle(), &prevKey
, &key
) == 0) {
215 bpf_map_delete_elem(destMapFd
.getHandle(), &key
);
220 IPv6AndPort prevKey
{};
222 while (bpf_map_get_next_key(destMapFd
.getHandle(), &prevKey
, &key
) == 0) {
223 bpf_map_delete_elem(destMapFd
.getHandle(), &key
);
229 void XskSocket::addDestinationAddress(const std::string
& mapPath
, const ComboAddress
& destination
)
231 auto destMapFd
= getDestinationMap(mapPath
);
233 if (destination
.isIPv4()) {
235 key
.addr
= destination
.sin4
.sin_addr
.s_addr
;
236 key
.port
= destination
.sin4
.sin_port
;
237 auto ret
= bpf_map_update_elem(destMapFd
.getHandle(), &key
, &value
, 0);
239 throw std::runtime_error("Error inserting into xsk_map '" + mapPath
+ "': " + std::to_string(ret
));
244 key
.addr
= destination
.sin6
.sin6_addr
;
245 key
.port
= destination
.sin6
.sin6_port
;
246 auto ret
= bpf_map_update_elem(destMapFd
.getHandle(), &key
, &value
, 0);
248 throw std::runtime_error("Error inserting into XSK destination addresses map '" + mapPath
+ "': " + std::to_string(ret
));
253 void XskSocket::removeDestinationAddress(const std::string
& mapPath
, const ComboAddress
& destination
)
255 auto destMapFd
= getDestinationMap(mapPath
);
256 if (destination
.isIPv4()) {
258 key
.addr
= destination
.sin4
.sin_addr
.s_addr
;
259 key
.port
= destination
.sin4
.sin_port
;
260 bpf_map_delete_elem(destMapFd
.getHandle(), &key
);
264 key
.addr
= destination
.sin6
.sin6_addr
;
265 key
.port
= destination
.sin6
.sin6_port
;
266 bpf_map_delete_elem(destMapFd
.getHandle(), &key
);
270 void XskSocket::fillFq(uint32_t fillSize
) noexcept
273 // if we have less than holdThreshold frames in the shared queue (which might be an issue
274 // when the XskWorker needs empty frames), move frames from the unique container into the
275 // shared one. This might not be optimal right now.
276 auto frames
= sharedEmptyFrameOffset
->lock();
277 if (frames
->size() < holdThreshold
) {
278 const auto moveSize
= std::min(holdThreshold
- frames
->size(), uniqueEmptyFrameOffset
.size());
280 // NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions)
281 frames
->insert(frames
->end(), std::make_move_iterator(uniqueEmptyFrameOffset
.end() - moveSize
), std::make_move_iterator(uniqueEmptyFrameOffset
.end()));
282 uniqueEmptyFrameOffset
.resize(uniqueEmptyFrameOffset
.size() - moveSize
);
287 if (uniqueEmptyFrameOffset
.size() < fillSize
) {
292 auto toFill
= xsk_ring_prod__reserve(&fq
, fillSize
, &idx
);
296 uint32_t processed
= 0;
297 for (; processed
< toFill
; processed
++) {
298 *xsk_ring_prod__fill_addr(&fq
, idx
++) = uniqueEmptyFrameOffset
.back();
300 checkUmemIntegrity(__PRETTY_FUNCTION__
, __LINE__
, uniqueEmptyFrameOffset
.back(), {UmemEntryStatus::Status::Free
}, UmemEntryStatus::Status::FillQueue
);
301 #endif /* DEBUG_UMEM */
302 uniqueEmptyFrameOffset
.pop_back();
305 xsk_ring_prod__submit(&fq
, processed
);
308 int XskSocket::wait(int timeout
)
310 auto waitAtMost
= std::min(timeout
, firstTimeout());
311 return poll(fds
.data(), fds
.size(), waitAtMost
);
314 [[nodiscard
]] uint64_t XskSocket::frameOffset(const XskPacket
& packet
) const noexcept
316 return packet
.getFrameOffsetFrom(umem
.bufBase
);
319 [[nodiscard
]] int XskSocket::xskFd() const noexcept
321 return xsk_socket__fd(socket
.get());
324 void XskSocket::send(std::vector
<XskPacket
>& packets
)
326 while (!packets
.empty()) {
327 auto packetSize
= packets
.size();
328 if (packetSize
> std::numeric_limits
<uint32_t>::max()) {
329 packetSize
= std::numeric_limits
<uint32_t>::max();
331 size_t toSend
= std::min(static_cast<uint32_t>(packetSize
), txCapacity
);
333 auto toFill
= xsk_ring_prod__reserve(&tx
, toSend
, &idx
);
339 for (const auto& packet
: packets
) {
340 if (queued
== toFill
) {
343 *xsk_ring_prod__tx_desc(&tx
, idx
++) = {
344 .addr
= frameOffset(packet
),
345 .len
= packet
.getFrameLen(),
348 checkUmemIntegrity(__PRETTY_FUNCTION__
, __LINE__
, frameOffset(packet
), {UmemEntryStatus::Status::Free
, UmemEntryStatus::Status::Received
}, UmemEntryStatus::Status::TXQueue
);
349 #endif /* DEBUG_UMEM */
352 xsk_ring_prod__submit(&tx
, toFill
);
353 packets
.erase(packets
.begin(), packets
.begin() + toFill
);
357 std::vector
<XskPacket
> XskSocket::recv(uint32_t recvSizeMax
, uint32_t* failedCount
)
360 std::vector
<XskPacket
> res
;
361 // how many descriptors to packets have been filled
362 const auto recvSize
= xsk_ring_cons__peek(&rx
, recvSizeMax
, &idx
);
367 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
368 const auto baseAddr
= reinterpret_cast<uint64_t>(umem
.bufBase
);
370 uint32_t processed
= 0;
371 res
.reserve(recvSize
);
372 for (; processed
< recvSize
; processed
++) {
374 const auto* desc
= xsk_ring_cons__rx_desc(&rx
, idx
++);
375 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast,performance-no-int-to-ptr)
376 XskPacket packet
= XskPacket(reinterpret_cast<uint8_t*>(desc
->addr
+ baseAddr
), desc
->len
, frameSize
);
378 checkUmemIntegrity(__PRETTY_FUNCTION__
, __LINE__
, frameOffset(packet
), {UmemEntryStatus::Status::Free
, UmemEntryStatus::Status::FillQueue
}, UmemEntryStatus::Status::Received
);
379 #endif /* DEBUG_UMEM */
381 if (!packet
.parse(false)) {
386 res
.push_back(packet
);
389 catch (const std::exception
& exp
) {
390 std::cerr
<< "Exception while processing the XSK RX queue: " << exp
.what() << std::endl
;
394 std::cerr
<< "Exception while processing the XSK RX queue" << std::endl
;
399 // this releases the descriptor, but not the packet (umem entries)
400 // which will only be made available again when pushed into the fill
402 xsk_ring_cons__release(&rx
, processed
);
403 if (failedCount
!= nullptr) {
404 *failedCount
= failed
;
410 void XskSocket::pickUpReadyPacket(std::vector
<XskPacket
>& packets
)
414 while (!waitForDelay
.empty() && timeDifference(now
, waitForDelay
.top().getSendTime()) <= 0) {
415 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast)
416 auto& top
= const_cast<XskPacket
&>(waitForDelay
.top());
417 packets
.push_back(top
);
422 void XskSocket::recycle(size_t size
) noexcept
425 const auto completeSize
= xsk_ring_cons__peek(&cq
, size
, &idx
);
426 if (completeSize
== 0) {
429 uniqueEmptyFrameOffset
.reserve(uniqueEmptyFrameOffset
.size() + completeSize
);
430 uint32_t processed
= 0;
431 for (; processed
< completeSize
; ++processed
) {
432 uniqueEmptyFrameOffset
.push_back(*xsk_ring_cons__comp_addr(&cq
, idx
++));
434 checkUmemIntegrity(__PRETTY_FUNCTION__
, __LINE__
, uniqueEmptyFrameOffset
.back(), {UmemEntryStatus::Status::Received
, UmemEntryStatus::Status::TXQueue
}, UmemEntryStatus::Status::Free
);
435 #endif /* DEBUG_UMEM */
437 xsk_ring_cons__release(&cq
, processed
);
440 void XskSocket::XskUmem::umemInit(size_t memSize
, xsk_ring_cons
* completionQueue
, xsk_ring_prod
* fillQueue
, xsk_umem_config
* config
)
443 bufBase
= static_cast<uint8_t*>(mmap(nullptr, size
, PROT_READ
| PROT_WRITE
, MAP_PRIVATE
| MAP_ANONYMOUS
, -1, 0));
444 if (bufBase
== MAP_FAILED
) {
445 throw std::runtime_error("mmap failed");
447 auto ret
= xsk_umem__create(&umem
, bufBase
, size
, fillQueue
, completionQueue
, config
);
449 munmap(bufBase
, size
);
450 throw std::runtime_error("Error creating a umem of size " + std::to_string(size
) + ": " + stringerror(ret
));
454 std::string
XskSocket::getMetrics() const
456 xdp_statistics stats
{};
457 socklen_t optlen
= sizeof(stats
);
458 int err
= getsockopt(xskFd(), SOL_XDP
, XDP_STATISTICS
, &stats
, &optlen
);
462 if (optlen
!= sizeof(struct xdp_statistics
)) {
467 ret
<< "RX dropped: " << std::to_string(stats
.rx_dropped
) << std::endl
;
468 ret
<< "RX invalid descs: " << std::to_string(stats
.rx_invalid_descs
) << std::endl
;
469 ret
<< "TX invalid descs: " << std::to_string(stats
.tx_invalid_descs
) << std::endl
;
470 ret
<< "RX ring full: " << std::to_string(stats
.rx_ring_full
) << std::endl
;
471 ret
<< "RX fill ring empty descs: " << std::to_string(stats
.rx_fill_ring_empty_descs
) << std::endl
;
472 ret
<< "TX ring empty descs: " << std::to_string(stats
.tx_ring_empty_descs
) << std::endl
;
476 [[nodiscard
]] std::string
XskSocket::getXDPMode() const
478 #ifdef HAVE_BPF_XDP_QUERY
479 unsigned int itfIdx
= if_nametoindex(ifName
.c_str());
481 return "unable to get interface index";
483 bpf_xdp_query_opts info
{};
484 info
.sz
= sizeof(info
);
485 int ret
= bpf_xdp_query(static_cast<int>(itfIdx
), 0, &info
);
489 switch (info
.attach_mode
) {
490 case XDP_ATTACHED_DRV
:
491 case XDP_ATTACHED_HW
:
493 case XDP_ATTACHED_SKB
:
498 #else /* HAVE_BPF_XDP_QUERY */
500 #endif /* HAVE_BPF_XDP_QUERY */
503 void XskSocket::markAsFree(const XskPacket
& packet
)
505 auto offset
= frameOffset(packet
);
507 checkUmemIntegrity(__PRETTY_FUNCTION__
, __LINE__
, offset
, {UmemEntryStatus::Status::Received
, UmemEntryStatus::Status::TXQueue
}, UmemEntryStatus::Status::Free
);
508 #endif /* DEBUG_UMEM */
510 uniqueEmptyFrameOffset
.push_back(offset
);
513 XskSocket::XskUmem::~XskUmem()
515 if (umem
!= nullptr) {
516 xsk_umem__delete(umem
);
518 if (bufBase
!= nullptr) {
519 munmap(bufBase
, size
);
523 [[nodiscard
]] size_t XskPacket::getL4HeaderOffset() const noexcept
525 return sizeof(ethhdr
) + (v6
? (sizeof(ipv6hdr
)) : sizeof(iphdr
));
528 [[nodiscard
]] size_t XskPacket::getDataOffset() const noexcept
530 return getL4HeaderOffset() + sizeof(udphdr
);
533 [[nodiscard
]] size_t XskPacket::getDataSize() const noexcept
535 return frameLength
- getDataOffset();
538 [[nodiscard
]] ethhdr
XskPacket::getEthernetHeader() const noexcept
541 if (frameLength
>= sizeof(ethHeader
)) {
542 memcpy(ðHeader
, frame
, sizeof(ethHeader
));
547 void XskPacket::setEthernetHeader(const ethhdr
& ethHeader
) noexcept
549 if (frameLength
< sizeof(ethHeader
)) {
550 frameLength
= sizeof(ethHeader
);
552 memcpy(frame
, ðHeader
, sizeof(ethHeader
));
555 [[nodiscard
]] iphdr
XskPacket::getIPv4Header() const noexcept
558 assert(frameLength
>= (sizeof(ethhdr
) + sizeof(ipv4Header
)));
560 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
561 memcpy(&ipv4Header
, frame
+ sizeof(ethhdr
), sizeof(ipv4Header
));
565 void XskPacket::setIPv4Header(const iphdr
& ipv4Header
) noexcept
567 assert(frameLength
>= (sizeof(ethhdr
) + sizeof(iphdr
)));
569 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
570 memcpy(frame
+ sizeof(ethhdr
), &ipv4Header
, sizeof(ipv4Header
));
573 [[nodiscard
]] ipv6hdr
XskPacket::getIPv6Header() const noexcept
575 ipv6hdr ipv6Header
{};
576 assert(frameLength
>= (sizeof(ethhdr
) + sizeof(ipv6Header
)));
578 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
579 memcpy(&ipv6Header
, frame
+ sizeof(ethhdr
), sizeof(ipv6Header
));
583 void XskPacket::setIPv6Header(const ipv6hdr
& ipv6Header
) noexcept
585 assert(frameLength
>= (sizeof(ethhdr
) + sizeof(ipv6Header
)));
587 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
588 memcpy(frame
+ sizeof(ethhdr
), &ipv6Header
, sizeof(ipv6Header
));
591 [[nodiscard
]] udphdr
XskPacket::getUDPHeader() const noexcept
594 assert(frameLength
>= (sizeof(ethhdr
) + (v6
? sizeof(ipv6hdr
) : sizeof(iphdr
)) + sizeof(udpHeader
)));
595 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
596 memcpy(&udpHeader
, frame
+ getL4HeaderOffset(), sizeof(udpHeader
));
600 void XskPacket::setUDPHeader(const udphdr
& udpHeader
) noexcept
602 assert(frameLength
>= (sizeof(ethhdr
) + (v6
? sizeof(ipv6hdr
) : sizeof(iphdr
)) + sizeof(udpHeader
)));
603 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
604 memcpy(frame
+ getL4HeaderOffset(), &udpHeader
, sizeof(udpHeader
));
607 bool XskPacket::parse(bool fromSetHeader
)
609 if (frameLength
<= sizeof(ethhdr
)) {
613 auto ethHeader
= getEthernetHeader();
614 uint8_t l4Protocol
{0};
615 if (ethHeader
.h_proto
== htons(ETH_P_IP
)) {
616 if (frameLength
< (sizeof(ethhdr
) + sizeof(iphdr
) + sizeof(udphdr
))) {
620 auto ipHeader
= getIPv4Header();
621 if (ipHeader
.ihl
!= (static_cast<uint8_t>(sizeof(iphdr
) / 4))) {
622 // ip options is not supported now!
625 // check ip.check == ipv4Checksum() is not needed!
626 // We check it in BPF program
627 // we don't, actually.
628 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
629 from
= makeComboAddressFromRaw(4, reinterpret_cast<const char*>(&ipHeader
.saddr
), sizeof(ipHeader
.saddr
));
630 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
631 to
= makeComboAddressFromRaw(4, reinterpret_cast<const char*>(&ipHeader
.daddr
), sizeof(ipHeader
.daddr
));
632 l4Protocol
= ipHeader
.protocol
;
633 if (!fromSetHeader
&& (frameLength
- sizeof(ethhdr
)) != ntohs(ipHeader
.tot_len
)) {
634 // too small, or too large (trailing data), go away
638 else if (ethHeader
.h_proto
== htons(ETH_P_IPV6
)) {
639 if (frameLength
< (sizeof(ethhdr
) + sizeof(ipv6hdr
) + sizeof(udphdr
))) {
643 auto ipHeader
= getIPv6Header();
644 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
645 from
= makeComboAddressFromRaw(6, reinterpret_cast<const char*>(&ipHeader
.saddr
), sizeof(ipHeader
.saddr
));
646 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
647 to
= makeComboAddressFromRaw(6, reinterpret_cast<const char*>(&ipHeader
.daddr
), sizeof(ipHeader
.daddr
));
648 l4Protocol
= ipHeader
.nexthdr
;
649 if (!fromSetHeader
&& (frameLength
- (sizeof(ethhdr
) + sizeof(ipv6hdr
))) != ntohs(ipHeader
.payload_len
)) {
657 if (l4Protocol
!= IPPROTO_UDP
) {
661 // check udp.check == ipv4Checksum() is not needed!
662 // We check it in BPF program
663 // we don't, actually.
664 auto udpHeader
= getUDPHeader();
665 if (!fromSetHeader
) {
666 // Because of XskPacket::setHeader
667 if (getDataOffset() > frameLength
) {
671 if (getDataSize() + sizeof(udphdr
) != ntohs(udpHeader
.len
)) {
676 from
.setPort(ntohs(udpHeader
.source
));
677 to
.setPort(ntohs(udpHeader
.dest
));
681 uint32_t XskPacket::getDataLen() const noexcept
683 return getDataSize();
686 uint32_t XskPacket::getFrameLen() const noexcept
691 size_t XskPacket::getCapacity() const noexcept
696 void XskPacket::changeDirectAndUpdateChecksum() noexcept
698 auto ethHeader
= getEthernetHeader();
700 std::array
<uint8_t, ETH_ALEN
> tmp
{};
701 static_assert(tmp
.size() == sizeof(ethHeader
.h_dest
), "Size Error");
702 static_assert(tmp
.size() == sizeof(ethHeader
.h_source
), "Size Error");
703 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
704 memcpy(tmp
.data(), ethHeader
.h_dest
, tmp
.size());
705 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
706 memcpy(ethHeader
.h_dest
, ethHeader
.h_source
, tmp
.size());
707 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
708 memcpy(ethHeader
.h_source
, tmp
.data(), tmp
.size());
710 if (ethHeader
.h_proto
== htons(ETH_P_IPV6
)) {
712 auto ipv6
= getIPv6Header();
713 std::swap(ipv6
.daddr
, ipv6
.saddr
);
714 assert(ipv6
.nexthdr
== IPPROTO_UDP
);
716 auto udp
= getUDPHeader();
717 std::swap(udp
.dest
, udp
.source
);
718 udp
.len
= htons(getDataSize() + sizeof(udp
));
720 /* needed to get the correct checksum */
723 udp
.check
= tcp_udp_v6_checksum(&ipv6
);
724 rewriteIpv6Header(&ipv6
, getFrameLen());
730 auto ipv4
= getIPv4Header();
731 std::swap(ipv4
.daddr
, ipv4
.saddr
);
732 assert(ipv4
.protocol
== IPPROTO_UDP
);
734 auto udp
= getUDPHeader();
735 std::swap(udp
.dest
, udp
.source
);
736 udp
.len
= htons(getDataSize() + sizeof(udp
));
738 /* needed to get the correct checksum */
741 udp
.check
= tcp_udp_v4_checksum(&ipv4
);
742 rewriteIpv4Header(&ipv4
, getFrameLen());
746 setEthernetHeader(ethHeader
);
749 void XskPacket::rewriteIpv4Header(struct iphdr
* ipv4header
, size_t frameLen
) noexcept
751 ipv4header
->version
= 4;
752 ipv4header
->ihl
= sizeof(iphdr
) / 4;
754 ipv4header
->tot_len
= htons(frameLen
- sizeof(ethhdr
));
756 ipv4header
->frag_off
= 0;
757 ipv4header
->ttl
= DefaultTTL
;
758 ipv4header
->check
= 0;
759 ipv4header
->check
= ipv4Checksum(ipv4header
);
762 void XskPacket::rewriteIpv6Header(struct ipv6hdr
* ipv6header
, size_t frameLen
) noexcept
764 ipv6header
->version
= 6;
765 ipv6header
->priority
= 0;
766 ipv6header
->payload_len
= htons(frameLen
- sizeof(ethhdr
) - sizeof(ipv6hdr
));
767 ipv6header
->hop_limit
= DefaultTTL
;
768 memset(&ipv6header
->flow_lbl
, 0, sizeof(ipv6header
->flow_lbl
));
771 bool XskPacket::isIPV6() const noexcept
776 XskPacket::XskPacket(uint8_t* frame_
, size_t dataSize
, size_t frameSize_
) :
777 frame(frame_
), frameLength(dataSize
), frameSize(frameSize_
- XDP_PACKET_HEADROOM
)
781 PacketBuffer
XskPacket::clonePacketBuffer() const
783 const auto size
= getDataSize();
784 PacketBuffer
tmp(size
);
786 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
787 memcpy(tmp
.data(), frame
+ getDataOffset(), size
);
792 bool XskPacket::setPayload(const PacketBuffer
& buf
)
794 const auto bufSize
= buf
.size();
795 const auto currentCapacity
= getCapacity();
796 if (bufSize
== 0 || bufSize
> currentCapacity
) {
800 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
801 memcpy(frame
+ getDataOffset(), buf
.data(), bufSize
);
802 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
803 frameLength
= getDataOffset() + bufSize
;
807 void XskPacket::addDelay(const int relativeMilliseconds
) noexcept
810 sendTime
.tv_nsec
+= static_cast<int64_t>(relativeMilliseconds
) * 1000000L;
811 sendTime
.tv_sec
+= sendTime
.tv_nsec
/ 1000000000L;
812 sendTime
.tv_nsec
%= 1000000000L;
815 bool operator<(const XskPacket
& lhs
, const XskPacket
& rhs
) noexcept
817 return lhs
.getSendTime() < rhs
.getSendTime();
820 const ComboAddress
& XskPacket::getFromAddr() const noexcept
825 const ComboAddress
& XskPacket::getToAddr() const noexcept
830 void XskWorker::notify(int desc
)
834 while ((res
= write(desc
, &value
, sizeof(value
))) == EINTR
) {
836 if (res
!= sizeof(value
)) {
837 throw runtime_error("Unable Wake Up XskSocket Failed");
841 XskWorker::XskWorker() :
842 workerWaker(createEventfd()), xskSocketWaker(createEventfd())
846 void XskWorker::pushToProcessingQueue(XskPacket
& packet
)
848 #if defined(__SANITIZE_THREAD__)
849 if (!incomingPacketsQueue
.lock()->push(packet
)) {
851 if (!incomingPacketsQueue
.push(packet
)) {
857 void XskWorker::pushToSendQueue(XskPacket
& packet
)
859 #if defined(__SANITIZE_THREAD__)
860 if (!outgoingPacketsQueue
.lock()->push(packet
)) {
862 if (!outgoingPacketsQueue
.push(packet
)) {
868 const void* XskPacket::getPayloadData() const
870 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
871 return frame
+ getDataOffset();
874 void XskPacket::setAddr(const ComboAddress
& from_
, MACAddr fromMAC
, const ComboAddress
& to_
, MACAddr toMAC
) noexcept
876 auto ethHeader
= getEthernetHeader();
877 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
878 memcpy(ethHeader
.h_dest
, toMAC
.data(), toMAC
.size());
879 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
880 memcpy(ethHeader
.h_source
, fromMAC
.data(), fromMAC
.size());
881 setEthernetHeader(ethHeader
);
888 void XskPacket::rewrite() noexcept
891 auto ethHeader
= getEthernetHeader();
893 ethHeader
.h_proto
= htons(ETH_P_IP
);
895 auto ipHeader
= getIPv4Header();
896 ipHeader
.daddr
= to
.sin4
.sin_addr
.s_addr
;
897 ipHeader
.saddr
= from
.sin4
.sin_addr
.s_addr
;
899 auto udpHeader
= getUDPHeader();
900 ipHeader
.protocol
= IPPROTO_UDP
;
901 udpHeader
.source
= from
.sin4
.sin_port
;
902 udpHeader
.dest
= to
.sin4
.sin_port
;
903 udpHeader
.len
= htons(getDataSize() + sizeof(udpHeader
));
905 /* needed to get the correct checksum */
906 setIPv4Header(ipHeader
);
907 setUDPHeader(udpHeader
);
908 udpHeader
.check
= tcp_udp_v4_checksum(&ipHeader
);
909 rewriteIpv4Header(&ipHeader
, getFrameLen());
910 setIPv4Header(ipHeader
);
911 setUDPHeader(udpHeader
);
914 ethHeader
.h_proto
= htons(ETH_P_IPV6
);
916 auto ipHeader
= getIPv6Header();
917 memcpy(&ipHeader
.daddr
, &to
.sin6
.sin6_addr
, sizeof(ipHeader
.daddr
));
918 memcpy(&ipHeader
.saddr
, &from
.sin6
.sin6_addr
, sizeof(ipHeader
.saddr
));
920 auto udpHeader
= getUDPHeader();
921 ipHeader
.nexthdr
= IPPROTO_UDP
;
922 udpHeader
.source
= from
.sin6
.sin6_port
;
923 udpHeader
.dest
= to
.sin6
.sin6_port
;
924 udpHeader
.len
= htons(getDataSize() + sizeof(udpHeader
));
926 /* needed to get the correct checksum */
927 setIPv6Header(ipHeader
);
928 setUDPHeader(udpHeader
);
929 udpHeader
.check
= tcp_udp_v6_checksum(&ipHeader
);
930 setIPv6Header(ipHeader
);
931 setUDPHeader(udpHeader
);
934 setEthernetHeader(ethHeader
);
937 [[nodiscard
]] __be16
XskPacket::ipv4Checksum(const struct iphdr
* ipHeader
) noexcept
939 auto partial
= ip_checksum_partial(ipHeader
, sizeof(iphdr
), 0);
940 return ip_checksum_fold(partial
);
943 [[nodiscard
]] __be16
XskPacket::tcp_udp_v4_checksum(const struct iphdr
* ipHeader
) const noexcept
945 // ip options is not supported !!!
946 const auto l4Length
= static_cast<uint16_t>(getDataSize() + sizeof(udphdr
));
947 auto sum
= tcp_udp_v4_header_checksum_partial(ipHeader
->saddr
, ipHeader
->daddr
, ipHeader
->protocol
, l4Length
);
948 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
949 sum
= ip_checksum_partial(frame
+ getL4HeaderOffset(), l4Length
, sum
);
950 return ip_checksum_fold(sum
);
953 [[nodiscard
]] __be16
XskPacket::tcp_udp_v6_checksum(const struct ipv6hdr
* ipv6
) const noexcept
955 const auto l4Length
= static_cast<uint16_t>(getDataSize() + sizeof(udphdr
));
956 uint64_t sum
= tcp_udp_v6_header_checksum_partial(&ipv6
->saddr
, &ipv6
->daddr
, ipv6
->nexthdr
, l4Length
);
957 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
958 sum
= ip_checksum_partial(frame
+ getL4HeaderOffset(), l4Length
, sum
);
959 return ip_checksum_fold(sum
);
962 [[nodiscard
]] uint64_t XskPacket::ip_checksum_partial(const void* ptr
, const size_t len
, uint64_t sum
) noexcept
965 /* Main loop: 32 bits at a time */
966 for (position
= 0; position
< len
; position
+= sizeof(uint32_t)) {
968 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
969 memcpy(&value
, static_cast<const uint8_t*>(ptr
) + position
, sizeof(value
));
973 /* Handle un-32bit-aligned trailing bytes */
974 if ((len
- position
) >= 2) {
976 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
977 memcpy(&value
, static_cast<const uint8_t*>(ptr
) + position
, sizeof(value
));
979 position
+= sizeof(value
);
982 if ((len
- position
) > 0) {
983 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
984 const auto* ptr8
= static_cast<const uint8_t*>(ptr
) + position
;
985 sum
+= ntohs(*ptr8
<< 8); /* RFC says pad last byte */
991 [[nodiscard
]] __be16
XskPacket::ip_checksum_fold(uint64_t sum
) noexcept
993 while ((sum
& ~0xffffffffULL
) != 0U) {
994 sum
= (sum
>> 32) + (sum
& 0xffffffffULL
);
996 while ((sum
& 0xffff0000ULL
) != 0U) {
997 sum
= (sum
>> 16) + (sum
& 0xffffULL
);
1000 return static_cast<__be16
>(~sum
);
// Attribute used to pack the checksum pseudo-headers: the kernel-style __packed
// when it is available, the GCC/Clang attribute otherwise.
#ifdef __packed
#define packed_attribute __packed
#else
#define packed_attribute __attribute__((packed))
#endif
1009 // NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
1010 [[nodiscard
]] uint64_t XskPacket::tcp_udp_v4_header_checksum_partial(__be32 src_ip
, __be32 dst_ip
, uint8_t protocol
, uint16_t len
) noexcept
1020 /* The IPv4 pseudo-header is defined in RFC 793, Section 3.1. */
1021 struct ipv4_pseudo_header_t
1023 /* We use a union here to avoid aliasing issues with gcc -O2 */
1026 header packed_attribute fields
;
1027 // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays)
1031 ipv4_pseudo_header_t pseudo_header
{};
1032 static_assert(sizeof(pseudo_header
) == 12, "IPv4 pseudo-header size is incorrect");
1034 /* Fill in the pseudo-header. */
1035 pseudo_header
.fields
.src_ip
= src_ip
;
1036 pseudo_header
.fields
.dst_ip
= dst_ip
;
1037 pseudo_header
.fields
.mbz
= 0;
1038 pseudo_header
.fields
.protocol
= protocol
;
1039 pseudo_header
.fields
.length
= htons(len
);
1040 return ip_checksum_partial(&pseudo_header
, sizeof(pseudo_header
), 0);
1043 [[nodiscard
]] uint64_t XskPacket::tcp_udp_v6_header_checksum_partial(const struct in6_addr
* src_ip
, const struct in6_addr
* dst_ip
, uint8_t protocol
, uint32_t len
) noexcept
1047 struct in6_addr src_ip
;
1048 struct in6_addr dst_ip
;
1050 // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays)
1052 __uint8_t next_header
;
1054 /* The IPv6 pseudo-header is defined in RFC 2460, Section 8.1. */
1055 struct ipv6_pseudo_header_t
1057 /* We use a union here to avoid aliasing issues with gcc -O2 */
1060 header packed_attribute fields
;
1061 // NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays)
1065 ipv6_pseudo_header_t pseudo_header
{};
1066 static_assert(sizeof(pseudo_header
) == 40, "IPv6 pseudo-header size is incorrect");
1068 /* Fill in the pseudo-header. */
1069 pseudo_header
.fields
.src_ip
= *src_ip
;
1070 pseudo_header
.fields
.dst_ip
= *dst_ip
;
1071 pseudo_header
.fields
.length
= htonl(len
);
1072 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
1073 memset(pseudo_header
.fields
.mbz
, 0, sizeof(pseudo_header
.fields
.mbz
));
1074 pseudo_header
.fields
.next_header
= protocol
;
1075 return ip_checksum_partial(&pseudo_header
, sizeof(pseudo_header
), 0);
1078 void XskPacket::setHeader(PacketBuffer
& buf
)
1080 memcpy(frame
, buf
.data(), buf
.size());
1081 frameLength
= buf
.size();
1085 throw std::runtime_error("Error setting the XSK frame header");
1089 PacketBuffer
XskPacket::cloneHeaderToPacketBuffer() const
1091 const auto size
= getFrameLen() - getDataSize();
1092 PacketBuffer
tmp(size
);
1093 memcpy(tmp
.data(), frame
, size
);
1097 int XskWorker::createEventfd()
1099 auto desc
= ::eventfd(0, EFD_CLOEXEC
);
1101 throw runtime_error("Unable create eventfd");
1106 void XskWorker::waitForXskSocket() const noexcept
1108 uint64_t value
= read(workerWaker
, &value
, sizeof(value
));
1111 void XskWorker::notifyXskSocket() const
1113 notify(xskSocketWaker
);
1116 std::shared_ptr
<XskWorker
> XskWorker::create()
1118 return std::make_shared
<XskWorker
>();
1121 void XskSocket::addWorker(std::shared_ptr
<XskWorker
> worker
)
1123 const auto socketWaker
= worker
->xskSocketWaker
.getHandle();
1124 worker
->umemBufBase
= umem
.bufBase
;
1125 d_workers
.insert({socketWaker
, std::move(worker
)});
1126 fds
.push_back(pollfd
{
1132 void XskSocket::addWorkerRoute(const std::shared_ptr
<XskWorker
>& worker
, const ComboAddress
& dest
)
1134 d_workerRoutes
.lock()->insert({dest
, worker
});
1137 void XskSocket::removeWorkerRoute(const ComboAddress
& dest
)
1139 d_workerRoutes
.lock()->erase(dest
);
1142 uint64_t XskWorker::frameOffset(const XskPacket
& packet
) const noexcept
1144 return packet
.getFrameOffsetFrom(umemBufBase
);
1147 void XskWorker::notifyWorker() const
1149 notify(workerWaker
);
1152 void XskSocket::getMACFromIfName()
1155 auto desc
= FDWrapper(::socket(AF_INET
, SOCK_DGRAM
, 0));
1157 throw std::runtime_error("Error creating a socket to get the MAC address of interface " + ifName
);
1160 if (ifName
.size() >= IFNAMSIZ
) {
1161 throw std::runtime_error("Unable to get MAC address for interface " + ifName
+ ": name too long");
1164 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
1165 strncpy(ifr
.ifr_name
, ifName
.c_str(), ifName
.length() + 1);
1166 if (ioctl(desc
.getHandle(), SIOCGIFHWADDR
, &ifr
) < 0 || ifr
.ifr_hwaddr
.sa_family
!= ARPHRD_ETHER
) {
1167 throw std::runtime_error("Error getting MAC address for interface " + ifName
);
1169 static_assert(sizeof(ifr
.ifr_hwaddr
.sa_data
) >= std::tuple_size
<decltype(source
)>{}, "The size of an ARPHRD_ETHER MAC address is smaller than expected");
1170 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay)
1171 memcpy(source
.data(), ifr
.ifr_hwaddr
.sa_data
, source
.size());
1174 [[nodiscard
]] int XskSocket::timeDifference(const timespec
& lhs
, const timespec
& rhs
) noexcept
1176 const auto res
= lhs
.tv_sec
* 1000 + lhs
.tv_nsec
/ 1000000L - (rhs
.tv_sec
* 1000 + rhs
.tv_nsec
/ 1000000L);
1177 return static_cast<int>(res
);
1180 void XskWorker::cleanWorkerNotification() const noexcept
1182 uint64_t value
= read(xskSocketWaker
, &value
, sizeof(value
));
1185 void XskWorker::cleanSocketNotification() const noexcept
1187 uint64_t value
= read(workerWaker
, &value
, sizeof(value
));
1190 std::vector
<pollfd
> getPollFdsForWorker(XskWorker
& info
)
1192 std::vector
<pollfd
> fds
;
1193 int timerfd
= timerfd_create(CLOCK_MONOTONIC
, TFD_CLOEXEC
);
1195 throw std::runtime_error("create_timerfd failed");
1197 fds
.push_back(pollfd
{
1198 .fd
= info
.workerWaker
,
1202 fds
.push_back(pollfd
{
1210 void XskWorker::fillUniqueEmptyOffset()
1212 auto frames
= sharedEmptyFrameOffset
->lock();
1213 const auto moveSize
= std::min(static_cast<size_t>(32), frames
->size());
1215 // NOLINTNEXTLINE(bugprone-narrowing-conversions,cppcoreguidelines-narrowing-conversions)
1216 uniqueEmptyFrameOffset
.insert(uniqueEmptyFrameOffset
.end(), std::make_move_iterator(frames
->end() - moveSize
), std::make_move_iterator(frames
->end()));
1217 frames
->resize(frames
->size() - moveSize
);
1221 std::optional
<XskPacket
> XskWorker::getEmptyFrame()
1223 if (!uniqueEmptyFrameOffset
.empty()) {
1224 auto offset
= uniqueEmptyFrameOffset
.back();
1225 uniqueEmptyFrameOffset
.pop_back();
1226 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
1227 return XskPacket(offset
+ umemBufBase
, 0, frameSize
);
1229 fillUniqueEmptyOffset();
1230 if (!uniqueEmptyFrameOffset
.empty()) {
1231 auto offset
= uniqueEmptyFrameOffset
.back();
1232 uniqueEmptyFrameOffset
.pop_back();
1233 // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
1234 return XskPacket(offset
+ umemBufBase
, 0, frameSize
);
1236 return std::nullopt
;
1239 void XskWorker::markAsFree(const XskPacket
& packet
)
1241 auto offset
= frameOffset(packet
);
1243 checkUmemIntegrity(__PRETTY_FUNCTION__
, __LINE__
, offset
, {UmemEntryStatus::Status::Received
, UmemEntryStatus::Status::TXQueue
}, UmemEntryStatus::Status::Free
);
1244 #endif /* DEBUG_UMEM */
1245 uniqueEmptyFrameOffset
.push_back(offset
);
1248 uint32_t XskPacket::getFlags() const noexcept
1253 void XskPacket::updatePacket() noexcept
1255 if ((flags
& UPDATE
) == 0U) {
1258 if ((flags
& REWRITE
) == 0U) {
1259 changeDirectAndUpdateChecksum();
1262 #endif /* HAVE_XSK */