ClientState& cs;
ComboAddress local;
LocalStateHolder<NetmaskGroup>& acl;
+ int socket{-1};
};
-static void acceptNewConnection(int socket, TCPAcceptorParam& param)
+static void acceptNewConnection(const TCPAcceptorParam& param)
{
auto& cs = param.cs;
auto& acl = param.acl;
+ int socket = param.socket;
bool tcpClientCountIncremented = false;
ComboAddress remote;
remote.sin4.sin_family = param.local.sin4.sin_family;
/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and
they will hand off to worker threads & spawn more of them if required
*/
-void tcpAcceptorThread(ClientState* cs)
+void tcpAcceptorThread(std::vector<ClientState*> states)
{
setThreadName("dnsdist/tcpAcce");
auto acl = g_ACL.getLocal();
- struct TCPAcceptorParam param{*cs, cs->local, acl};
+ std::vector<TCPAcceptorParam> params;
+ params.reserve(states.size());
- if (cs->d_additionalAddresses.empty()) {
+ for (auto& state : states) {
+ params.emplace_back(TCPAcceptorParam{*state, state->local, acl, state->tcpFD});
+ for (const auto& [addr, socket] : state->d_additionalAddresses) {
+ params.emplace_back(TCPAcceptorParam{*state, addr, acl, socket});
+ }
+ }
+
+ if (params.size() == 1) {
while (true) {
- acceptNewConnection(cs->tcpFD, param);
+ acceptNewConnection(params.at(0));
}
}
else {
auto acceptCallback = [](int socket, FDMultiplexer::funcparam_t& funcparam) {
- auto acceptorParam = boost::any_cast<TCPAcceptorParam*>(funcparam);
- acceptNewConnection(socket, *acceptorParam);
+ auto acceptorParam = boost::any_cast<const TCPAcceptorParam*>(funcparam);
+ acceptNewConnection(*acceptorParam);
};
- std::vector<TCPAcceptorParam> additionalParams;
auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
- mplexer->addReadFD(cs->tcpFD, acceptCallback, ¶m);
- for (const auto& [addr, socket] : cs->d_additionalAddresses) {
- additionalParams.emplace_back(TCPAcceptorParam{*cs, addr, acl});
- }
- size_t idx = 0;
- for (const auto& [addr, socket] : cs->d_additionalAddresses) {
- mplexer->addReadFD(socket, acceptCallback, &additionalParams.at(idx));
- idx++;
+ for (size_t idx = 0; idx < params.size(); idx++) {
+ const auto& param = params.at(idx);
+ mplexer->addReadFD(param.socket, acceptCallback, ¶m);
}
struct timeval tv;
#endif /* DISABLE_RECVMMSG */
// listens to incoming queries, sends out to downstream servers, noting the intended return path
-static void udpClientThread(ClientState* cs)
+static void udpClientThread(std::vector<ClientState*> states)
{
try {
setThreadName("dnsdist/udpClie");
#ifndef DISABLE_RECVMMSG
#if defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE)
if (g_udpVectorSize > 1) {
- MultipleMessagesUDPClientThread(cs, holders);
+ MultipleMessagesUDPClientThread(states.at(0), holders);
}
else
#endif /* defined(HAVE_RECVMMSG) && defined(HAVE_SENDMMSG) && defined(MSG_WAITFORONE) */
- we use it for self-generated responses (from rule or cache)
but we only accept incoming payloads up to that size
*/
+ struct UDPStateParam
+ {
+ ClientState* cs{nullptr};
+ size_t maxIncomingPacketSize{0};
+ int socket{-1};
+ };
const size_t initialBufferSize = getInitialUDPPacketBufferSize();
- const size_t maxIncomingPacketSize = getMaximumIncomingPacketSize(*cs);
PacketBuffer packet(initialBufferSize);
struct msghdr msgh;
struct iovec iov;
- /* used by HarvestDestinationAddress */
- cmsgbuf_aligned cbuf;
-
ComboAddress remote;
ComboAddress dest;
- remote.sin4.sin_family = cs->local.sin4.sin_family;
- fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), reinterpret_cast<char*>(&packet.at(0)), maxIncomingPacketSize, &remote);
- for(;;) {
+ auto handleOnePacket = [&packet, &iov, &holders, &msgh, &remote, &dest, initialBufferSize](const UDPStateParam& param) {
packet.resize(initialBufferSize);
iov.iov_base = &packet.at(0);
iov.iov_len = packet.size();
- ssize_t got = recvmsg(cs->udpFD, &msgh, 0);
+ ssize_t got = recvmsg(param.socket, &msgh, 0);
if (got < 0 || static_cast<size_t>(got) < sizeof(struct dnsheader)) {
++g_stats.nonCompliantQueries;
- ++cs->nonCompliantQueries;
- continue;
+ ++param.cs->nonCompliantQueries;
+ return;
}
packet.resize(static_cast<size_t>(got));
- processUDPQuery(*cs, holders, &msgh, remote, dest, packet, nullptr, nullptr, nullptr, nullptr);
+ processUDPQuery(*param.cs, holders, &msgh, remote, dest, packet, nullptr, nullptr, nullptr, nullptr);
+ };
+
+ std::vector<UDPStateParam> params;
+ for (auto& state : states) {
+ const size_t maxIncomingPacketSize = getMaximumIncomingPacketSize(*state);
+ params.emplace_back(UDPStateParam{state, maxIncomingPacketSize, state->udpFD});
+ }
+
+ if (params.size() == 1) {
+ auto param = params.at(0);
+ remote.sin4.sin_family = param.cs->local.sin4.sin_family;
+ /* used by HarvestDestinationAddress */
+ cmsgbuf_aligned cbuf;
+ fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), reinterpret_cast<char*>(&packet.at(0)), param.maxIncomingPacketSize, &remote);
+ while (true) {
+ handleOnePacket(param);
+ }
+ }
+ else {
+ auto callback = [&remote, &msgh, &iov, &packet, &handleOnePacket, initialBufferSize](int socket, FDMultiplexer::funcparam_t& funcparam) {
+ auto param = boost::any_cast<const UDPStateParam*>(funcparam);
+ remote.sin4.sin_family = param->cs->local.sin4.sin_family;
+ packet.resize(initialBufferSize);
+ /* used by HarvestDestinationAddress */
+ cmsgbuf_aligned cbuf;
+ fillMSGHdr(&msgh, &iov, &cbuf, sizeof(cbuf), reinterpret_cast<char*>(&packet.at(0)), param->maxIncomingPacketSize, &remote);
+ handleOnePacket(*param);
+ };
+ auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
+ for (size_t idx = 0; idx < params.size(); idx++) {
+ const auto& param = params.at(idx);
+ mplexer->addReadFD(param.socket, callback, ¶m);
+ }
+
+ struct timeval tv;
+ while (true) {
+ mplexer->run(&tv);
+ }
}
}
}
- catch(const std::exception &e)
- {
+ catch (const std::exception &e) {
errlog("UDP client thread died because of exception: %s", e.what());
}
- catch(const PDNSException &e)
- {
+ catch (const PDNSException &e) {
errlog("UDP client thread died because of PowerDNS exception: %s", e.reason);
}
- catch(...)
- {
+ catch (...) {
errlog("UDP client thread died because of an exception: %s", "unknown");
}
}
handleQueuedHealthChecks(*mplexer, true);
}
+ std::vector<ClientState*> tcpStates;
+ std::vector<ClientState*> udpStates;
for(auto& cs : g_frontends) {
if (cs->dohFrontend != nullptr) {
#ifdef HAVE_DNS_OVER_HTTPS
continue;
}
if (cs->udpFD >= 0) {
- thread t1(udpClientThread, cs.get());
+#ifdef USE_SINGLE_ACCEPTOR_THREAD
+ udpStates.push_back(cs.get());
+#else /* USE_SINGLE_ACCEPTOR_THREAD */
+ thread t1(udpClientThread, std::vector<ClientState*>{ cs.get() });
if (!cs->cpus.empty()) {
mapThreadToCPUList(t1.native_handle(), cs->cpus);
}
t1.detach();
+#endif /* USE_SINGLE_ACCEPTOR_THREAD */
}
else if (cs->tcpFD >= 0) {
- thread t1(tcpAcceptorThread, cs.get());
+#ifdef USE_SINGLE_ACCEPTOR_THREAD
+ tcpStates.push_back(cs.get());
+#else /* USE_SINGLE_ACCEPTOR_THREAD */
+ thread t1(tcpAcceptorThread, std::vector<ClientState*>{cs.get() });
if (!cs->cpus.empty()) {
mapThreadToCPUList(t1.native_handle(), cs->cpus);
}
t1.detach();
+#endif /* USE_SINGLE_ACCEPTOR_THREAD */
}
}
-
+#ifdef USE_SINGLE_ACCEPTOR_THREAD
+ if (!udpStates.empty()) {
+ thread udp(udpClientThread, udpStates);
+ udp.detach();
+ }
+ if (!tcpStates.empty()) {
+ thread tcp(tcpAcceptorThread, tcpStates);
+ tcp.detach();
+ }
+#endif /* USE_SINGLE_ACCEPTOR_THREAD */
dnsdist::ServiceDiscovery::run();
#ifndef DISABLE_CARBON