```
$ cat /etc/dnsdist.conf
-newServer2 {address="2001:4860:4860::8888", qps=1}
-newServer2 {address="2001:4860:4860::8844", qps=1}
-newServer2 {address="2620:0:ccc::2", qps=10}
-newServer2 {address="2620:0:ccd::2", qps=10}
+newServer {address="2001:4860:4860::8888", qps=1}
+newServer {address="2001:4860:4860::8844", qps=1}
+newServer {address="2620:0:ccc::2", qps=10}
+newServer {address="2620:0:ccd::2", qps=10}
newServer("192.168.1.2")
+setServerPolicy(firstAvailable) -- first server within its QPS limit
$ dnsdist --local=0.0.0.0:5200 --daemon=no
Marking downstream [2001:4860:4860::8888]:53 as 'up'
Marking downstream [2620:0:ccd::2]:53 as 'up'
Marking downstream 192.168.1.2:53 as 'up'
Listening on 0.0.0.0:5200
->
+>
```
We can now send queries to port 5200, and get answers:
TCP/IP, and in this way you can implement ANY-to-TCP even for downstream
servers that lack this feature.
+Inspecting live traffic
+-----------------------
+This is still much in flux, but for now, try:
+
+ * `topQueries(20)`: shows the top-20 queries
+ * `topQueries(20,2)`: shows the top-20 two-level domain queries (so `topQueries(20,1)` only shows TLDs)
+ * `topResponses(20, 2)`: top-20 servfail responses (use ,3 for NXDOMAIN)
+
Dynamic load balancing
----------------------
-The default load balancing policy is called 'firstAvailable', which means
-the first server that has not exceeded its QPS limit gets the traffic. If
-you don't like this default policy, you can create your own, like this for
-example:
+The default load balancing policy is called 'leastOutstanding', which means
+we pick the server with the least queries 'in the air'.
+
+Another policy, 'firstAvailable', picks the first server that has not
+exceeded its QPS limit gets the traffic.
+
+A further policy, 'wrandom' assigns queries randomly, but based on the
+'weight' parameter passed to `newServer`
+
+If you don't like the default policies you can create your own, like this
+for example:
```
counter=0
Incidentally, this is similar to setting: `setServerPolicy(roundrobin)`
which uses the C++ based roundrobin policy.
+Split horizon
+-------------
+
To implement a split horizon, try:
```
-authServer=newServer2{address="2001:888:2000:1d::2", order=12}
+authServer=newServer{address="2001:888:2000:1d::2", order=12}
-- order=12 is the current hack to make sure this server does
--- generally get used, will be replaced by dedicated pools later
+-- not generally get used, will be replaced by dedicated pools later
function splitSetup(remote, qname, qtype, dh)
if(dh:getRD() == false)
then
return authServer
else
- return firstAvailable(remote, qname, qtype, dh)
+ return leastOutstanding(remote, qname, qtype, dh)
end
end
+
+setServerPolicy(splitSetup)
```
This will forward queries that don't want recursion to a specific
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "ext/luawrapper/include/LuaContext.hpp"
+#include <boost/circular_buffer.hpp>
#include "sstuff.hh"
#include "misc.hh"
#include <mutex>
We neglect to do recvfromto() on 0.0.0.0
Receiver is currently singlethreaded (not that bad actually)
We can't compile w/o crypto
- newServer2{} is an abomination of a name
our naming is as inconsistent as only ahu can make it
lack of help()
we offer now way to log from Lua
uint16_t g_maxOutstanding;
bool g_console;
NetmaskGroup g_ACL;
-
+string g_outputBuffer;
/* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
Then we have a bunch of connected sockets for talking to downstream servers.
uint16_t origID;
ComboAddress origRemote;
StopWatch sentTime;
+ DNSName qname;
+ uint16_t qtype;
atomic<uint64_t> age;
};
+struct Rings {
+ Rings()
+ {
+ clientRing.set_capacity(10000);
+ queryRing.set_capacity(10000);
+ respRing.set_capacity(10000);
+ }
+ boost::circular_buffer<ComboAddress> clientRing;
+ boost::circular_buffer<DNSName> queryRing;
+ struct Response
+ {
+ DNSName name;
+ uint16_t qtype;
+ uint8_t rcode;
+ unsigned int usec;
+ };
+ boost::circular_buffer<Response> respRing;
+ std::mutex respMutex;
+} g_rings;
+
struct DownstreamState
{
DownstreamState(const ComboAddress& remote_);
double dropRate{0.0};
double latencyUsec{0.0};
int order{1};
+ int weight{1};
StopWatch sw;
enum class Availability { Up, Down, Auto} availability{Availability::Auto};
bool upStatus{false};
sendto(ids->origFD, packet, len, 0, (struct sockaddr*)&ids->origRemote, ids->origRemote.getSocklen());
double udiff = ids->sentTime.udiff();
vinfolog("Got answer from %s, relayed to %s, took %f usec", state->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);
+
+ std::lock_guard<std::mutex> lock(g_rings.respMutex);
+ g_rings.respRing.push_back({ids->qname, ids->qtype, (uint8_t)dh->rcode, (unsigned int)udiff});
state->latencyUsec = (127.0 * state->latencyUsec / 128.0) + udiff/128.0;
std::mutex g_luamutex;
LuaContext g_lua;
-std::function<shared_ptr<DownstreamState>(const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)> g_policy;
+typedef std::function<shared_ptr<DownstreamState>(const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)> policy_t;
+policy_t g_policy;
shared_ptr<DownstreamState> firstAvailable(const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
{
}
static int counter=0;
++counter;
+ if(g_dstates.empty())
+ return shared_ptr<DownstreamState>();
return g_dstates[counter % g_dstates.size()];
}
-shared_ptr<DownstreamState> roundrobin(const ComboAddress& remote, const DNSName& qname, uint16_t qtype)
+shared_ptr<DownstreamState> leastOutstanding(const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
+{
+ vector<pair<int, shared_ptr<DownstreamState>>> poss;
+
+ for(auto& d : g_dstates) { // w=1, w=10 -> 1, 11
+ if(d->isUp()) {
+ poss.push_back({d->outstanding.load(), d});
+ }
+ }
+ if(poss.empty())
+ return shared_ptr<DownstreamState>();
+ nth_element(poss.begin(), poss.begin(), poss.end(), [](const decltype(poss)::value_type& a, const decltype(poss)::value_type& b) { return a.first < b.first; });
+ return poss.begin()->second;
+}
+
+shared_ptr<DownstreamState> wrandom(const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
+{
+ vector<pair<int, shared_ptr<DownstreamState>>> poss;
+ int sum=0;
+ for(auto& d : g_dstates) { // w=1, w=10 -> 1, 11
+ if(d->isUp()) {
+ sum+=d->weight;
+ poss.push_back({sum, d});
+
+ }
+ }
+ int r = random() % sum;
+ auto p = upper_bound(poss.begin(), poss.end(),r, [](int r, const decltype(poss)::value_type& a) { return r < a.first;});
+ if(p==poss.end())
+ return shared_ptr<DownstreamState>();
+ return p->second;
+}
+
+shared_ptr<DownstreamState> roundrobin(const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
{
vector<shared_ptr<DownstreamState>> poss;
+
for(auto& d : g_dstates) {
- if(d->isUp())
+ if(d->isUp()) {
poss.push_back(d);
+ }
}
- static int counter=0;
- ++counter;
+
+ auto *res=&poss;
if(poss.empty())
- return g_dstates[counter % g_dstates.size()];
- return poss[counter % poss.size()];
+ res = &g_dstates;
+
+ if(res->empty())
+ return shared_ptr<DownstreamState>();
+
+ static unsigned int counter;
+
+ return (*res)[(counter++) % res->size()];
}
shared_ptr<DownstreamState> g_abuseDSS;
ComboAddress g_serverControl{"127.0.0.1:5199"};
+
+
// listens to incoming queries, sends out to downstream servers, noting the intended return path
void* udpClientThread(ClientState* cs)
try
if(dh->qr) // don't respond to responses
continue;
+
DNSName qname(packet, len, 12, false, &qtype);
+
+ g_rings.queryRing.push_back(qname);
+
if(blockFilter)
{
std::lock_guard<std::mutex> lock(g_luamutex);
else {
std::lock_guard<std::mutex> lock(g_luamutex);
ss = g_policy(remote, qname, qtype, dh).get();
+ if(!ss)
+ continue;
}
ss->queries++;
ids->origID = dh->id;
ids->origRemote = remote;
ids->sentTime.start();
-
+ ids->qname = qname;
+ ids->qtype = qtype;
dh->id = idOffset;
len = send(ss->fd, packet, len, 0);
ids.origFD = -1;
dss->reuseds++;
--dss->outstanding;
+ std::lock_guard<std::mutex> lock(g_rings.respMutex);
+ g_rings.respRing.push_back({ids.qname, ids.qtype, 0, 2000000});
}
}
}
string response;
try {
std::lock_guard<std::mutex> lock(g_luamutex);
+ g_outputBuffer.clear();
auto ret=g_lua.executeCode<
boost::optional<
boost::variant<
shared_ptr<DownstreamState>
>
>
- >("return "+line);
+ >(line);
if(ret) {
if (const auto strValue = boost::get<shared_ptr<DownstreamState>>(&*ret)) {
response=*strValue;
}
}
+ else
+ response=g_outputBuffer;
}
void setupLua(bool client)
{
g_lua.writeFunction("newServer",
- [](const std::string& address, boost::optional<int> qps)
+ [](boost::variant<string,std::unordered_map<std::string, std::string>> pvars, boost::optional<int> qps)
{
- auto ret=std::shared_ptr<DownstreamState>(new DownstreamState(ComboAddress(address, 53)));
- ret->tid = move(thread(responderThread, ret));
- if(qps) {
- ret->qps=QPSLimiter(*qps, *qps);
+ if(auto address = boost::get<string>(&pvars)) {
+ auto ret=std::shared_ptr<DownstreamState>(new DownstreamState(ComboAddress(*address, 53)));
+ ret->tid = move(thread(responderThread, ret));
+ if(qps) {
+ ret->qps=QPSLimiter(*qps, *qps);
+ }
+ g_dstates.push_back(ret);
+ return ret;
}
- g_dstates.push_back(ret);
- return ret;
- } );
-
- g_lua.writeFunction("newServer2",
- [](std::unordered_map<std::string, std::string> vars)
- {
+ auto vars=boost::get<std::unordered_map<std::string, std::string>>(pvars);
auto ret=std::shared_ptr<DownstreamState>(new DownstreamState(ComboAddress(vars["address"], 53)));
ret->tid = move(thread(responderThread, ret));
ret->order=boost::lexical_cast<int>(vars["order"]);
}
+ if(vars.count("weight")) {
+ ret->weight=boost::lexical_cast<int>(vars["weight"]);
+ }
+
+
g_dstates.push_back(ret);
std::stable_sort(g_dstates.begin(), g_dstates.end(), [](const decltype(ret)& a, const decltype(ret)& b) {
return a->order < b->order;
} );
- g_lua.writeFunction("setServerPolicy", [](std::function<shared_ptr<DownstreamState>(const ComboAddress&, const DNSName&, uint16_t, dnsheader*)> func) {
+ g_lua.writeFunction("setServerPolicy", [](policy_t func) {
+ cerr<<"Set it"<<endl;
g_policy = func;
});
g_lua.writeFunction("firstAvailable", firstAvailable);
g_lua.writeFunction("roundrobin", roundrobin);
+ g_lua.writeFunction("wrandom", wrandom);
+ g_lua.writeFunction("leastOutstanding", leastOutstanding);
g_lua.writeFunction("addACL", [](const std::string& domain) {
g_ACL.addMask(domain);
});
try {
ostringstream ret;
- boost::format fmt("%1$-3d %2% %|30t|%3$5s %|36t|%4$7.1f %|41t|%5$7d %|48t|%6$10d %|59t|%7$7d %|69t|%8$2.1f %|78t|%9$5.1f" );
-
- ret << (fmt % "#" % "Address" % "State" % "Qps" % "Qlim" % "Queries" % "Drops" % "Drate" % "Lat") << endl;
+ boost::format fmt("%1$-3d %2% %|30t|%3$5s %|36t|%4$7.1f %|41t|%5$7d %|44t|%6$3d %|53t|%7$2d %|55t|%8$10d %|61t|%9$7d %|76t|%10$5.1f %|84t|%11$5.1f" );
+ // 1 2 3 4 5 6 7 8 9 10 11
+ ret << (fmt % "#" % "Address" % "State" % "Qps" % "Qlim" % "Ord" % "Wt" % "Queries" % "Drops" % "Drate" % "Lat") << endl;
uint64_t totQPS{0}, totQueries{0}, totDrops{0};
int counter=0;
ret << (fmt % counter % s->remote.toStringWithPort() %
status %
- s->queryLoad % s->qps.getRate() % s->queries.load() % s->reuseds.load() % (s->dropRate) % (s->latencyUsec/1000.0)) << endl;
+ s->queryLoad % s->qps.getRate() % s->order % s->weight % s->queries.load() % s->reuseds.load() % (s->dropRate) % (s->latencyUsec/1000.0)) << endl;
totQPS += s->queryLoad;
totQueries += s->queries.load();
}
ret<< (fmt % "All" % "" % ""
%
- (double)totQPS % "" % totQueries % totDrops % "" % "") << endl;
+ (double)totQPS % "" % "" % "" % totQueries % totDrops % "" % "") << endl;
- return ret.str();
- }catch(std::exception& e) { cerr<<e.what()<<endl; throw; }
+ g_outputBuffer=ret.str();
+ }catch(std::exception& e) { g_outputBuffer=e.what(); throw; }
});
g_lua.registerFunction<string(DownstreamState::*)()>("tostring", [](const DownstreamState& s) { return s.remote.toStringWithPort(); });
g_lua.registerFunction<bool(DownstreamState::*)()>("checkQPS", [](DownstreamState& s) { return s.qps.check(); });
g_lua.registerFunction<void(DownstreamState::*)(int)>("setQPS", [](DownstreamState& s, int lim) { s.qps = lim ? QPSLimiter(lim, lim) : QPSLimiter(); });
+
+ g_lua.registerFunction<int(DownstreamState::*)()>("getOutstanding", [](const DownstreamState& s) { return s.outstanding.load(); });
+
+
g_lua.registerFunction("isUp", &DownstreamState::isUp);
g_lua.registerFunction("setDown", &DownstreamState::setDown);
g_lua.registerFunction("setUp", &DownstreamState::setUp);
g_lua.registerFunction("setAuto", &DownstreamState::setAuto);
g_lua.registerMember("upstatus", &DownstreamState::upStatus);
+ g_lua.registerMember("weight", &DownstreamState::weight);
+
+ g_lua.writeFunction("show", [](const string& arg) {
+ g_outputBuffer+=arg;
+ g_outputBuffer+="\n";
+ });
g_lua.registerFunction<void(dnsheader::*)(bool)>("setRD", [](dnsheader& dh, bool v) {
dh.rd=v;
errlog("Unable to bind to control socket on %s: %s", local.toStringWithPort(), e.what());
}
});
+
+ g_lua.writeFunction("getTopQueries", [](unsigned int top, boost::optional<int> labels) {
+ map<DNSName, int> counts;
+ unsigned int total=0;
+ if(!labels) {
+ for(const auto& a : g_rings.queryRing) {
+ counts[a]++;
+ total++;
+ }
+ }
+ else {
+ unsigned int lab = *labels;
+ for(auto a : g_rings.queryRing) {
+ a.trimToLabels(lab);
+ counts[a]++;
+ total++;
+ }
+
+ }
+ cout<<"Looked at "<<total<<" queries, "<<counts.size()<<" different ones"<<endl;
+ vector<pair<int, DNSName>> rcounts;
+ for(const auto& c : counts)
+ rcounts.push_back(make_pair(c.second, c.first));
+
+ sort(rcounts.begin(), rcounts.end(), [](const decltype(rcounts)::value_type& a,
+ const decltype(rcounts)::value_type& b) {
+ return b.first < a.first;
+ });
+
+ std::unordered_map<int, vector<boost::variant<string,double>>> ret;
+ unsigned int count=1, rest=0;
+ for(const auto& rc : rcounts) {
+ if(count==top+1)
+ rest+=rc.first;
+ else
+ ret.insert({count++, {rc.second.toString(), rc.first, 100.0*rc.first/total}});
+ }
+ ret.insert({count, {"Rest", rest, 100.0*rest/total}});
+ return ret;
+
+ });
+ g_lua.executeCode(R"(function topQueries(top, labels) for k,v in ipairs(getTopQueries(top,labels)) do show(string.format("%4d %-40s %4d %4.1f%%",k,v[1],v[2], v[3])) end end)");
+
+ g_lua.writeFunction("getTopResponses", [](unsigned int top, unsigned int kind, boost::optional<int> labels) {
+ map<DNSName, int> counts;
+ unsigned int total=0;
+ {
+ std::lock_guard<std::mutex> lock(g_rings.respMutex);
+ if(!labels) {
+ for(const auto& a : g_rings.respRing) {
+ if(a.rcode!=kind)
+ continue;
+ counts[a.name]++;
+ total++;
+ }
+ }
+ else {
+ unsigned int lab = *labels;
+ for(auto a : g_rings.respRing) {
+ if(a.rcode!=kind)
+ continue;
+
+ a.name.trimToLabels(lab);
+ counts[a.name]++;
+ total++;
+ }
+
+ }
+ }
+ // cout<<"Looked at "<<total<<" responses, "<<counts.size()<<" different ones"<<endl;
+ vector<pair<int, DNSName>> rcounts;
+ for(const auto& c : counts)
+ rcounts.push_back(make_pair(c.second, c.first));
+
+ sort(rcounts.begin(), rcounts.end(), [](const decltype(rcounts)::value_type& a,
+ const decltype(rcounts)::value_type& b) {
+ return b.first < a.first;
+ });
+
+ std::unordered_map<int, vector<boost::variant<string,double>>> ret;
+ unsigned int count=1, rest=0;
+ for(const auto& rc : rcounts) {
+ if(count==top+1)
+ rest+=rc.first;
+ else
+ ret.insert({count++, {rc.second.toString(), rc.first, 100.0*rc.first/total}});
+ }
+ ret.insert({count, {"Rest", rest, 100.0*rest/total}});
+ return ret;
+
+ });
+
+ g_lua.executeCode(R"(function topResponses(top, kind, labels) for k,v in ipairs(getTopResponses(top, kind, labels)) do show(string.format("%4d %-40s %4d %4.1f%%",k,v[1],v[2], v[3])) end end)");
+
+
+
g_lua.writeFunction("newQPSLimiter", [](int rate, int burst) { return QPSLimiter(rate, burst); });
g_lua.registerFunction("check", &QPSLimiter::check);
string response;
try {
std::lock_guard<std::mutex> lock(g_luamutex);
+ g_outputBuffer.clear();
auto ret=g_lua.executeCode<
boost::optional<
boost::variant<
shared_ptr<DownstreamState>
>
>
- >("return "+line);
+ >(line);
if(ret) {
if (const auto strValue = boost::get<shared_ptr<DownstreamState>>(&*ret)) {
cout<<*strValue<<endl;
}
}
+ else
+ cout << g_outputBuffer;
}
catch(std::exception& e) {
g_verbose=g_vm.count("verbose");
g_maxOutstanding = g_vm["max-outstanding"].as<uint16_t>();
- g_policy = firstAvailable;
+ g_policy = leastOutstanding;
if(g_vm.count("client")) {
setupLua(false);
if(g_vm.count("remotes")) {
for(const auto& address : g_vm["remotes"].as<vector<string>>()) {
- auto ret=std::shared_ptr<DownstreamState>(new DownstreamState(ComboAddress(address, 53)));
+ auto ret=std::make_shared<DownstreamState>(ComboAddress(address, 53));
ret->tid = move(thread(responderThread, ret));
g_dstates.push_back(ret);
}