From: Remi Gacogne Date: Thu, 1 Mar 2018 11:19:29 +0000 (+0000) Subject: dnsdist: Add an option to use several source ports toward a backend X-Git-Tag: dnsdist-1.3.0~35^2~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=150105a20eaebc8e0041b1a41b81fd90d7dbaba3;p=thirdparty%2Fpdns.git dnsdist: Add an option to use several source ports toward a backend This is very useful if the backend is distributing queries based only on (source IP, source port, destination IP, destination port), which is for example the case of PowerDNS Recursor with several threads, reuseport set and pdns-distribute-queries not set. --- diff --git a/pdns/dnsdist-console.cc b/pdns/dnsdist-console.cc index 7d9d0209ee..d83b6c1e21 100644 --- a/pdns/dnsdist-console.cc +++ b/pdns/dnsdist-console.cc @@ -349,7 +349,7 @@ const std::vector g_consoleKeywords{ { "newQPSLimiter", true, "rate, burst", "configure a QPS limiter with that rate and that burst capacity" }, { "newRemoteLogger", true, "address:port [, timeout=2, maxQueuedEntries=100, reconnectWaitTime=1]", "create a Remote Logger object, to use with `RemoteLogAction()` and `RemoteLogResponseAction()`" }, { "newRuleAction", true, "DNS rule, DNS action [, {uuid=\"UUID\"}]", "return a pair of DNS Rule and DNS Action, to be used with `setRules()`" }, - { "newServer", true, "{address=\"ip:port\", qps=1000, order=1, weight=10, pool=\"abuse\", retries=5, tcpConnectTimeout=5, tcpSendTimeout=30, tcpRecvTimeout=30, checkName=\"a.root-servers.net.\", checkType=\"A\", maxCheckFailures=1, mustResolve=false, useClientSubnet=true, source=\"address|interface name|address@interface\"}", "instantiate a server" }, + { "newServer", true, "{address=\"ip:port\", qps=1000, order=1, weight=10, pool=\"abuse\", retries=5, tcpConnectTimeout=5, tcpSendTimeout=30, tcpRecvTimeout=30, checkName=\"a.root-servers.net.\", checkType=\"A\", maxCheckFailures=1, mustResolve=false, useClientSubnet=true, source=\"address|interface name|address@interface\", sockets=1}", "instantiate a server" }, { "newServerPolicy", true, "name, function", "create a policy object from a Lua function" }, { "newSuffixMatchNode", true, "", "returns a new SuffixMatchNode" }, { "NoRecurseAction", true, "", "strip RD bit from the question, let it go through" }, diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc index d8a8c916f2..a73b6d1b10 100644 --- a/pdns/dnsdist-lua.cc +++ b/pdns/dnsdist-lua.cc @@ -117,6 +117,7 @@ void setupLuaConfig(bool client) } ComboAddress sourceAddr; unsigned int sourceItf = 0; + size_t numberOfSockets = 1; std::set cpus; if(auto addressStr = boost::get(&pvars)) { std::shared_ptr ret; @@ -216,6 +217,10 @@ void setupLuaConfig(bool client) } } + if(vars.count("sockets")) { + numberOfSockets=std::stoi(boost::get(vars["sockets"])); + } + std::shared_ptr ret; try { ComboAddress address(boost::get(vars["address"]), 53); @@ -224,7 +229,7 @@ void setupLuaConfig(bool client) errlog("Error creating new server: %s is not a valid address for a downstream server", boost::get(vars["address"])); return ret; } - ret=std::make_shared(address, sourceAddr, sourceItf); + ret=std::make_shared(address, sourceAddr, sourceItf, numberOfSockets); } catch(const PDNSException& e) { g_outputBuffer="Error creating new server: "+string(e.reason); diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index e525ba9c05..5061e63360 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -383,6 +383,34 @@ static bool sendUDPResponse(int origFD, char* response, uint16_t responseLen, in return true; } + +static int pickBackendFD(DownstreamState* state) +{ + return state->fds[state->fdOffset++ % state->fds.size()]; +} + +static int selectBackendFD(const std::shared_ptr& state) +{ + if (state->fds.size() == 1) { + return state->fds[0]; + } + + std::set fds; + for (auto fd : state->fds) { + if (fd >= 0) { + fds.insert(fd); + } + } + + int selected = -1; + int res = waitForMultiData(fds, -1, -1, &selected); + if (res != 1) { + throw std::runtime_error("Error selecting a socket for a backend " + state->remote.toStringWithPort() + ": " + strerror(errno)); + } + + return selected; +} + // listens on a dedicated socket, lobs answers from downstream servers to original requestors void* responderThread(std::shared_ptr dss) try { @@ -403,7 +431,8 @@ try { dnsheader* dh = reinterpret_cast(packet); bool outstandingDecreased = false; try { - ssize_t got = recv(dss->fd, packet, sizeof(packet), 0); + int fd = selectBackendFD(dss); + ssize_t got = recv(fd, packet, sizeof(packet), 0); char * response = packet; size_t responseSize = sizeof(packet); @@ -541,30 +570,37 @@ catch(...) void DownstreamState::reconnect() { connected = false; - if (fd != -1) { - /* shutdown() is needed to wake up recv() in the responderThread */ - shutdown(fd, SHUT_RDWR); - close(fd); - fd = -1; - } - if (!IsAnyAddress(remote)) { - fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0); - if (!IsAnyAddress(sourceAddr)) { - SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1); - SBind(fd, sourceAddr); - } - try { - SConnect(fd, remote); - connected = true; - } - catch(const std::runtime_error& error) { - infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what()); + for (auto& fd : fds) { + if (fd != -1) { + /* shutdown() is needed to wake up recv() in the responderThread */ + shutdown(fd, SHUT_RDWR); + close(fd); + fd = -1; + } + if (!IsAnyAddress(remote)) { + fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0); + if (!IsAnyAddress(sourceAddr)) { + SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1); + SBind(fd, sourceAddr); + } + try { + SConnect(fd, remote); + connected = true; + } + catch(const std::runtime_error& error) { + infolog("Error connecting to new server with address %s: %s", remote.toStringWithPort(), error.what()); + } } } } -DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_) +DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_) { + fds.resize(numberOfSockets); + for (auto& fd : fds) { + fd = -1; + } + if (!IsAnyAddress(remote)) { reconnect(); idStates.resize(g_maxOutstanding); @@ -1462,7 +1498,8 @@ static void processUDPQuery(ClientState& cs, LocalHolders& holders, const struct dh->id = idOffset; - ssize_t ret = udpClientSendRequestToBackend(ss, ss->fd, query, dq.len); + int fd = pickBackendFD(ss); + ssize_t ret = udpClientSendRequestToBackend(ss, fd, query, dq.len); if(ret < 0) { ss->sendErrors++; @@ -1791,15 +1828,19 @@ void* healthChecksThread() warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down"); if (newState && !dss->connected) { - try { - SConnect(dss->fd, dss->remote); - dss->connected = true; - dss->tid = thread(responderThread, dss); + for (auto& fd : dss->fds) { + try { + SConnect(fd, dss->remote); + dss->connected = true; + } + catch(const std::runtime_error& error) { + infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what()); + newState = false; + dss->connected = false; + } } - catch(const std::runtime_error& error) { - infolog("Error connecting to new server with address %s: %s", dss->remote.toStringWithPort(), error.what()); - newState = false; - dss->connected = false; + if (dss->connected) { + dss->tid = thread(responderThread, dss); } } diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index cfc92475ff..131b280a95 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -523,15 +523,19 @@ extern std::shared_ptr g_tcpclientthreads; struct DownstreamState { - DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf); - DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0) {} + DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf, size_t numberOfSockets); + DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0, 1) {} ~DownstreamState() { - if (fd >= 0) - close(fd); + for (auto& fd : fds) { + if (fd >= 0) { + close(fd); + fd = -1; + } + } } - int fd{-1}; + std::vector fds; std::thread tid; ComboAddress remote; QPSLimiter qps; @@ -551,6 +555,7 @@ struct DownstreamState std::atomic queries{0}; } prev; string name; + size_t fdOffset{0}; double queryLoad{0.0}; double dropRate{0.0}; double latencyUsec{0.0}; diff --git a/pdns/dnsdistdist/docs/reference/config.rst b/pdns/dnsdistdist/docs/reference/config.rst index a89d466346..b136c5ceaa 100644 --- a/pdns/dnsdistdist/docs/reference/config.rst +++ b/pdns/dnsdistdist/docs/reference/config.rst @@ -268,8 +268,9 @@ Servers -- "address", e.g. "192.0.2.2" -- "interface name", e.g. "eth0" -- "address@interface", e.g. "192.0.2.2@eth0" - addXPF=NUM -- Add the client's IP address and port to the query, along with the original destination address and port, + addXPF=NUM, -- Add the client's IP address and port to the query, along with the original destination address and port, -- using the experimental XPF record from `draft-bellis-dnsop-xpf `_ and the specified option code. Default is disabled (0) + sockets=NUM -- Number of sockets (and thus source ports) used toward the backend server, defaults to a single one }) :param str server_string: A simple IP:PORT string.