From: Remi Gacogne Date: Thu, 7 Jan 2016 11:55:14 +0000 (+0100) Subject: dnsdist: Add a 'source' option to select the source addr/interface X-Git-Tag: dnsdist-1.0.0-alpha2~116^2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=refs%2Fpull%2F3183%2Fhead;p=thirdparty%2Fpdns.git dnsdist: Add a 'source' option to select the source addr/interface It only supports one source address/interface per downstream server. The more I tried to support more than one, the more I realized I was in fact grouping several DS into one, without the benefits of separate stats and status checking. In particular, having several source addresses means that we would get "random" failures if some addresses are allowed on the backend and some others are not. Simply adding the same backend several times with different source addresses means that only the ones with faulty addresses will be disabled. Closes #3138. --- diff --git a/pdns/README-dnsdist.md b/pdns/README-dnsdist.md index f34cfad3f8..e45ddc5098 100644 --- a/pdns/README-dnsdist.md +++ b/pdns/README-dnsdist.md @@ -174,6 +174,26 @@ parameters to `newServer`: newServer({address="192.0.2.1", tcpRecvTimeout=10, tcpSendTimeout=10}) ``` +Source address +-------------- + +In multi-homed setups, it can be useful to be able to select the source address or the outgoing +interface used by `dnsdist` to contact a downstream server. +This can be done by using the `source` parameter: +``` +newServer({address="192.0.2.1", source="192.0.2.127"}) +newServer({address="192.0.2.1", source="eth1"}) +newServer({address="192.0.2.1", source="192.0.2.127@eth1"}) +``` + +The supported values for `source` are: + * an IPv4 or IPv6 address, which must exist on the system + * an interface name + * an IPv4 or IPv6 address followed by '@' then an interface name + +Specifying the interface name is only supported on systems having IP_PKTINFO. 
+ + Configuration management ------------------------ At startup, configuration is read from the command line and the @@ -739,7 +759,7 @@ Here are all functions: * `errlog(string)`: log at level error * Server related: * `newServer("ip:port")`: instantiate a new downstream server with default settings - * `newServer({address="ip:port", qps=1000, order=1, weight=10, pool="abuse", retries=5, tcpSendTimeout=30, tcpRecvTimeout=30, checkName="a.root-servers.net.", checkType="A", mustResolve=false, useClientSubnet=true})`: + * `newServer({address="ip:port", qps=1000, order=1, weight=10, pool="abuse", retries=5, tcpSendTimeout=30, tcpRecvTimeout=30, checkName="a.root-servers.net.", checkType="A", mustResolve=false, useClientSubnet=true, source="address|interface name|address@interface"})`: instantiate a server with additional parameters * `showServers()`: output all servers * `getServer(n)`: returns server with index n diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc index 4dff75d95e..30938867e0 100644 --- a/pdns/dnsdist-lua.cc +++ b/pdns/dnsdist-lua.cc @@ -7,6 +7,7 @@ #include #include "dnswriter.hh" #include "lock.hh" +#include using std::thread; @@ -139,6 +140,8 @@ vector> setupLua(bool client, const std::string& confi if(client) { return std::make_shared(ComboAddress()); } + ComboAddress sourceAddr; + unsigned int sourceItf = 0; if(auto address = boost::get(&pvars)) { std::shared_ptr ret; try { @@ -173,16 +176,63 @@ vector> setupLua(bool client, const std::string& confi return ret; } auto vars=boost::get(pvars); + + if(vars.count("source")) { + /* handle source in the following forms: + - v4 address ("192.0.2.1") + - v6 address ("2001:DB8::1") + - interface name ("eth0") + - v4 address and interface name ("192.0.2.1@eth0") + - v6 address and interface name ("2001:DB8::1@eth0") + */ + const string source = boost::get(vars["source"]); + bool parsed = false; + std::string::size_type pos = source.find("@"); + if (pos == std::string::npos) { + /* no '@', try to parse 
that as a valid v4/v6 address */ + try { + sourceAddr = ComboAddress(source); + parsed = true; + } + catch(...) + { + } + } + + if (parsed == false) + { + /* try to parse as interface name, or v4/v6@itf */ + string itfName = source.substr(pos == std::string::npos ? 0 : pos + 1); + unsigned int itfIdx = if_nametoindex(itfName.c_str()); + + if (itfIdx != 0) { + if (pos == 0 || pos == std::string::npos) { + /* "eth0" or "@eth0" */ + sourceItf = itfIdx; + } + else { + /* "192.0.2.1@eth0" */ + sourceAddr = ComboAddress(source.substr(0, pos)); + sourceItf = itfIdx; + } + } + else + { + warnlog("Dismissing source %s because '%s' is not a valid interface name", source, itfName); + } + } + } + std::shared_ptr ret; try { - ret=std::make_shared(ComboAddress(boost::get(vars["address"]), 53)); + ret=std::make_shared(ComboAddress(boost::get(vars["address"]), 53), sourceAddr, sourceItf); } catch(std::exception& e) { g_outputBuffer="Error creating new server: "+string(e.what()); errlog("Error creating new server with address %s: %s", boost::get(vars["address"]), e.what()); return ret; } - + if(vars.count("qps")) { int qps=std::stoi(boost::get(vars["qps"])); ret->qps=QPSLimiter(qps, qps); diff --git a/pdns/dnsdist-tcp.cc b/pdns/dnsdist-tcp.cc index 3f14be24e9..e70599d655 100644 --- a/pdns/dnsdist-tcp.cc +++ b/pdns/dnsdist-tcp.cc @@ -45,11 +45,15 @@ using std::atomic; Let's start naively. 
*/ -static int setupTCPDownstream(const ComboAddress& remote) +static int setupTCPDownstream(shared_ptr ds) { - vinfolog("TCP connecting to downstream %s", remote.toStringWithPort()); - int sock = SSocket(remote.sin4.sin_family, SOCK_STREAM, 0); - SConnect(sock, remote); + vinfolog("TCP connecting to downstream %s", ds->remote.toStringWithPort()); + int sock = SSocket(ds->remote.sin4.sin_family, SOCK_STREAM, 0); + if (!IsAnyAddress(ds->sourceAddr)) { + SSetsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 1); + SBind(sock, ds->sourceAddr); + } + SConnect(sock, ds->remote); setNonBlocking(sock); return sock; } @@ -106,6 +110,20 @@ catch(...) { return false; } +static bool sendNonBlockingMsgLen(int fd, uint16_t len, int timeout, ComboAddress& dest, ComboAddress& local, unsigned int localItf) +try +{ + if (localItf == 0) + return putNonBlockingMsgLen(fd, len, timeout); + + uint16_t raw = htons(len); + ssize_t ret = sendMsgWithTimeout(fd, (char*) &raw, sizeof raw, timeout, dest, local, localItf); + return ret == sizeof raw; +} +catch(...) 
{ + return false; +} + TCPClientCollection g_tcpclientthreads; void* tcpClientThread(int pipefd) @@ -297,7 +315,7 @@ void* tcpClientThread(int pipefd) } if(sockets.count(ds->remote) == 0) { - dsock=sockets[ds->remote]=setupTCPDownstream(ds->remote); + dsock=sockets[ds->remote]=setupTCPDownstream(ds); } else dsock=sockets[ds->remote]; @@ -322,21 +340,26 @@ void* tcpClientThread(int pipefd) break; } - if(!putNonBlockingMsgLen(dsock, queryLen, ds->tcpSendTimeout)) { + if(!sendNonBlockingMsgLen(dsock, queryLen, ds->tcpSendTimeout, ds->remote, ds->sourceAddr, ds->sourceItf)) { vinfolog("Downstream connection to %s died on us, getting a new one!", ds->getName()); close(dsock); - sockets[ds->remote]=dsock=setupTCPDownstream(ds->remote); + sockets[ds->remote]=dsock=setupTCPDownstream(ds); downstream_failures++; goto retry; } try { - writen2WithTimeout(dsock, query, queryLen, ds->tcpSendTimeout); + if (ds->sourceItf == 0) { + writen2WithTimeout(dsock, query, queryLen, ds->tcpSendTimeout); + } + else { + sendMsgWithTimeout(dsock, query, queryLen, ds->tcpSendTimeout, ds->remote, ds->sourceAddr, ds->sourceItf); + } } catch(const runtime_error& e) { vinfolog("Downstream connection to %s died on us, getting a new one!", ds->getName()); close(dsock); - sockets[ds->remote]=dsock=setupTCPDownstream(ds->remote); + sockets[ds->remote]=dsock=setupTCPDownstream(ds); downstream_failures++; goto retry; } @@ -344,7 +367,7 @@ void* tcpClientThread(int pipefd) if(!getNonBlockingMsgLen(dsock, &rlen, ds->tcpRecvTimeout)) { vinfolog("Downstream connection to %s died on us phase 2, getting a new one!", ds->getName()); close(dsock); - sockets[ds->remote]=dsock=setupTCPDownstream(ds->remote); + sockets[ds->remote]=dsock=setupTCPDownstream(ds); downstream_failures++; goto retry; } diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index c14538beeb..5a064ffd6a 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -305,11 +305,13 @@ void* responderThread(std::shared_ptr state) return 0; } 
-DownstreamState::DownstreamState(const ComboAddress& remote_): checkName("a.root-servers.net."), checkType(QType::A), mustResolve(false) +DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_) { - remote = remote_; - fd = SSocket(remote.sin4.sin_family, SOCK_DGRAM, 0); + if (!IsAnyAddress(sourceAddr)) { + SSetsockopt(fd, SOL_SOCKET, SO_REUSEADDR, 1); + SBind(fd, sourceAddr); + } SConnect(fd, remote); idStates.resize(g_maxOutstanding); sw.start(); @@ -481,6 +483,20 @@ int getEDNSZ(const char* packet, unsigned int len) return 0x100 * (*z) + *(z+1); } +static ssize_t udpClientSendRequestToBackend(DownstreamState* ss, const int sd, const char* request, const size_t requestLen) +{ + if (ss->sourceItf == 0) { + return send(sd, request, requestLen, 0); + } + + struct msghdr msgh; + struct iovec iov; + char cbuf[256]; + fillMSGHdr(&msgh, &iov, cbuf, sizeof(cbuf), const_cast(request), requestLen, &ss->remote); + addCMsgSrcAddr(&msgh, cbuf, &ss->sourceAddr, ss->sourceItf); + return sendmsg(sd, &msgh, 0); +} + // listens to incoming queries, sends out to downstream servers, noting the intended return path static void* udpClientThread(ClientState* cs) try @@ -722,10 +738,10 @@ try } if (largerQuery.empty()) { - ret = send(ss->fd, query, len, 0); + ret = udpClientSendRequestToBackend(ss, ss->fd, query, len); } else { - ret = send(ss->fd, largerQuery.c_str(), largerQuery.size(), 0); + ret = udpClientSendRequestToBackend(ss, ss->fd, largerQuery.c_str(), largerQuery.size()); largerQuery.clear(); } @@ -759,24 +775,30 @@ catch(...) 
} -bool upCheck(const ComboAddress& remote, const DNSName& checkName, const QType& checkType, bool mustResolve) +static bool upCheck(DownstreamState& ds) try { vector packet; - DNSPacketWriter dpw(packet, checkName, checkType.getCode()); + DNSPacketWriter dpw(packet, ds.checkName, ds.checkType.getCode()); dnsheader * requestHeader = dpw.getHeader(); requestHeader->rd=true; - Socket sock(remote.sin4.sin_family, SOCK_DGRAM); + Socket sock(ds.remote.sin4.sin_family, SOCK_DGRAM); sock.setNonBlocking(); - sock.connect(remote); - sock.write((char*)&packet[0], packet.size()); + if (!IsAnyAddress(ds.sourceAddr)) { + sock.setReuseAddr(); + sock.bind(ds.sourceAddr); + } + sock.connect(ds.remote); + ssize_t sent = udpClientSendRequestToBackend(&ds, sock.getHandle(), (char*)&packet[0], packet.size()); + if (sent < 0) + return false; + int ret=waitForRWData(sock.getHandle(), true, 1, 0); if(ret < 0 || !ret) // error, timeout, both are down! return false; string reply; - ComboAddress dest=remote; - sock.recvFrom(reply, dest); + sock.recvFrom(reply, ds.remote); const dnsheader * responseHeader = (const dnsheader *) reply.c_str(); @@ -789,7 +811,7 @@ try return false; if (responseHeader->rcode == RCode::ServFail) return false; - if (mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) + if (ds.mustResolve && (responseHeader->rcode == RCode::NXDomain || responseHeader->rcode == RCode::Refused)) return false; // XXX fixme do bunch of checking here etc @@ -814,7 +836,7 @@ void* maintThread() for(auto& dss : g_dstates.getCopy()) { // this points to the actual shared_ptrs! if(dss->availability==DownstreamState::Availability::Auto) { - bool newState=upCheck(dss->remote, dss->checkName, dss->checkType, dss->mustResolve); + bool newState=upCheck(*dss); if(newState != dss->upStatus) { warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? 
"up" : "down"); } @@ -1240,7 +1262,7 @@ try for(auto& dss : g_dstates.getCopy()) { // it is a copy, but the internal shared_ptrs are the real deal if(dss->availability==DownstreamState::Availability::Auto) { - bool newState=upCheck(dss->remote, dss->checkName, dss->checkType, dss->mustResolve); + bool newState=upCheck(*dss); warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down"); dss->upStatus = newState; } diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index e6d13156d2..59dd339c91 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -278,15 +278,17 @@ extern TCPClientCollection g_tcpclientthreads; struct DownstreamState { - DownstreamState(const ComboAddress& remote_); + DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf); + DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0) {} int fd; std::thread tid; ComboAddress remote; QPSLimiter qps; vector idStates; - DNSName checkName; - QType checkType; + ComboAddress sourceAddr; + DNSName checkName{"a.root-servers.net."}; + QType checkType{QType::A}; std::atomic idOffset{0}; std::atomic sendErrors{0}; std::atomic outstanding{0}; @@ -305,11 +307,12 @@ struct DownstreamState int weight{1}; int tcpRecvTimeout{30}; int tcpSendTimeout{30}; + unsigned int sourceItf{0}; uint16_t retries{5}; StopWatch sw; set pools; enum class Availability { Up, Down, Auto} availability{Availability::Auto}; - bool mustResolve; + bool mustResolve{false}; bool upStatus{false}; bool useECS{false}; bool isUp() const diff --git a/pdns/dnsproxy.cc b/pdns/dnsproxy.cc index 80710a3e4f..d6464bb693 100644 --- a/pdns/dnsproxy.cc +++ b/pdns/dnsproxy.cc @@ -278,7 +278,7 @@ void DNSProxy::mainloop(void) msgh.msg_namelen = i->second.remote.getSocklen(); if(i->second.anyLocal) { - addCMsgSrcAddr(&msgh, cbuf, i->second.anyLocal.get_ptr()); + addCMsgSrcAddr(&msgh, cbuf, i->second.anyLocal.get_ptr(), 0); } if(sendmsg(i->second.outsock, 
&msgh, 0) < 0) L<(buffer), len, &dest); + addCMsgSrcAddr(&msgh, cbuf, &local, localItf); + + do { + ssize_t written = sendmsg(fd, &msgh, 0); + + if (written > 0) + return written; + + if (errno == EAGAIN) { + if (firstTry) { + int res = waitForRWData(fd, false, timeout, 0); + if (res > 0) { + /* there is room available */ + firstTry = false; + } + else if (res == 0) { + throw runtime_error("Timeout while waiting to write data"); + } else { + throw runtime_error("Error while waiting for room to write data"); + } + } + else { + throw runtime_error("Timeout while waiting to write data"); + } + } + else { + unixDie("failed in write2WithTimeout"); + } + } + while (firstTry); + + return 0; +} + template class NetmaskTree; + diff --git a/pdns/iputils.hh b/pdns/iputils.hh index e2b9741f69..7c133e2939 100644 --- a/pdns/iputils.hh +++ b/pdns/iputils.hh @@ -811,6 +811,8 @@ bool HarvestDestinationAddress(struct msghdr* msgh, ComboAddress* destination); bool HarvestTimestamp(struct msghdr* msgh, struct timeval* tv); void fillMSGHdr(struct msghdr* msgh, struct iovec* iov, char* cbuf, size_t cbufsize, char* data, size_t datalen, ComboAddress* addr); int sendfromto(int sock, const char* data, int len, int flags, const ComboAddress& from, const ComboAddress& to); +ssize_t sendMsgWithTimeout(int fd, const char* buffer, size_t len, int timeout, ComboAddress& dest, const ComboAddress& local, unsigned int localItf); + #endif extern template class NetmaskTree; diff --git a/pdns/misc.cc b/pdns/misc.cc index 5ef1138b44..b60eb6dc2d 100644 --- a/pdns/misc.cc +++ b/pdns/misc.cc @@ -868,7 +868,7 @@ Regex::Regex(const string &expr) throw PDNSException("Regular expression did not compile"); } -void addCMsgSrcAddr(struct msghdr* msgh, void* cmsgbuf, const ComboAddress* source) +void addCMsgSrcAddr(struct msghdr* msgh, void* cmsgbuf, const ComboAddress* source, int itfIndex) { struct cmsghdr *cmsg = NULL; @@ -886,6 +886,7 @@ void addCMsgSrcAddr(struct msghdr* msgh, void* cmsgbuf, const 
ComboAddress* sour pkt = (struct in6_pktinfo *) CMSG_DATA(cmsg); memset(pkt, 0, sizeof(*pkt)); pkt->ipi6_addr = source->sin6.sin6_addr; + pkt->ipi6_ifindex = itfIndex; msgh->msg_controllen = cmsg->cmsg_len; // makes valgrind happy and is slightly better style } else { @@ -903,6 +904,7 @@ void addCMsgSrcAddr(struct msghdr* msgh, void* cmsgbuf, const ComboAddress* sour pkt = (struct in_pktinfo *) CMSG_DATA(cmsg); memset(pkt, 0, sizeof(*pkt)); pkt->ipi_spec_dst = source->sin4.sin_addr; + pkt->ipi_ifindex = itfIndex; msgh->msg_controllen = cmsg->cmsg_len; #endif #ifdef IP_SENDSRCADDR diff --git a/pdns/misc.hh b/pdns/misc.hh index 4ba73f92a9..51cae92f9f 100644 --- a/pdns/misc.hh +++ b/pdns/misc.hh @@ -587,7 +587,8 @@ private: }; union ComboAddress; -void addCMsgSrcAddr(struct msghdr* msgh, void* cmsgbuf, const ComboAddress* source); +/* itfIndex is an interface index, as returned by if_nametoindex(). 0 means default. */ +void addCMsgSrcAddr(struct msghdr* msgh, void* cmsgbuf, const ComboAddress* source, int itfIndex); unsigned int getFilenumLimit(bool hardOrSoft=0); void setFilenumLimit(unsigned int lim); diff --git a/pdns/nameserver.cc b/pdns/nameserver.cc index c17406942e..57c0056250 100644 --- a/pdns/nameserver.cc +++ b/pdns/nameserver.cc @@ -292,7 +292,7 @@ void UDPNameserver::send(DNSPacket *p) fillMSGHdr(&msgh, &iov, cbuf, 0, (char*)buffer.c_str(), buffer.length(), &p->d_remote); if(p->d_anyLocal) { - addCMsgSrcAddr(&msgh, cbuf, p->d_anyLocal.get_ptr()); + addCMsgSrcAddr(&msgh, cbuf, p->d_anyLocal.get_ptr(), 0); } else { msgh.msg_control=NULL; diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index 42f2e239ca..c6074517f6 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -873,7 +873,7 @@ void startDoResolve(void *p) char cbuf[256]; fillMSGHdr(&msgh, &iov, cbuf, 0, (char*)&*packet.begin(), packet.size(), &dc->d_remote); if(dc->d_local.sin4.sin_family) - addCMsgSrcAddr(&msgh, cbuf, &dc->d_local); + addCMsgSrcAddr(&msgh, cbuf, &dc->d_local, 
0); else msgh.msg_control=NULL; sendmsg(dc->d_socket, &msgh, 0); @@ -1174,7 +1174,7 @@ string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fr char cbuf[256]; fillMSGHdr(&msgh, &iov, cbuf, 0, (char*)response.c_str(), response.length(), const_cast(&fromaddr)); if(destaddr.sin4.sin_family) { - addCMsgSrcAddr(&msgh, cbuf, &destaddr); + addCMsgSrcAddr(&msgh, cbuf, &destaddr, 0); } else { msgh.msg_control=NULL;