]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
split up dnsdist.cc, it was getting large
authorbert hubert <bert.hubert@netherlabs.nl>
Thu, 5 Mar 2015 08:05:50 +0000 (09:05 +0100)
committerbert hubert <bert.hubert@netherlabs.nl>
Thu, 5 Mar 2015 08:05:50 +0000 (09:05 +0100)
pdns/Makefile.am
pdns/dnsdist-lua.cc [new file with mode: 0644]
pdns/dnsdist.cc
pdns/dnsdist.hh [new file with mode: 0644]
pdns/sodcrypto.cc
pdns/sodcrypto.hh

index fd9bebfefb4b4cd6e0a7e7de57eec592158428a4..0e2e2f63a62a3daa266a5b5732a175657d2f5de3 100644 (file)
@@ -561,6 +561,7 @@ dnsdist_SOURCES = \
        base64.cc base64.hh \
        dns.cc \
        dnsdist.cc \
+       dnsdist-lua.cc \
        dnslabeltext.cc \
        dnsname.cc dnsname.hh \
        dnsparser.cc dnsparser.hh \
diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc
new file mode 100644 (file)
index 0000000..ce8cdb3
--- /dev/null
@@ -0,0 +1,504 @@
+#include "dnsdist.hh"
+#include <thread>
+#include "dolog.hh"
+#include "sodcrypto.hh"
+#include "base64.hh"
+#include <fstream>
+
+using std::thread;
+
+void setupLua(bool client)
+{
+  g_lua.writeFunction("newServer", 
+                     [](boost::variant<string,std::unordered_map<std::string, std::string>> pvars, boost::optional<int> qps)
+                     { 
+                       if(auto address = boost::get<string>(&pvars)) {
+                         auto ret=std::make_shared<DownstreamState>(ComboAddress(*address, 53));
+                         ret->tid = move(thread(responderThread, ret));
+                         if(qps) {
+                           ret->qps=QPSLimiter(*qps, *qps);
+                         }
+                         g_dstates.push_back(ret);
+                         return ret;
+                       }
+                       auto vars=boost::get<std::unordered_map<std::string, std::string>>(pvars);
+                       auto ret=std::make_shared<DownstreamState>(ComboAddress(vars["address"], 53));
+
+                       ret->tid = move(thread(responderThread, ret));
+
+                       if(vars.count("qps")) {
+                         ret->qps=QPSLimiter(boost::lexical_cast<int>(vars["qps"]),boost::lexical_cast<int>(vars["qps"]));
+                       }
+
+                       if(vars.count("pool")) {
+                         ret->pools.insert(vars["pool"]);
+                       }
+
+                       if(vars.count("order")) {
+                         ret->order=boost::lexical_cast<int>(vars["order"]);
+                       }
+
+                       if(vars.count("weight")) {
+                         ret->weight=boost::lexical_cast<int>(vars["weight"]);
+                       }
+
+
+                       g_dstates.push_back(ret);
+                       std::stable_sort(g_dstates.begin(), g_dstates.end(), [](const decltype(ret)& a, const decltype(ret)& b) {
+                           return a->order < b->order;
+                         });
+                       return ret;
+                     } );
+
+
+
+
+  g_lua.writeFunction("rmServer", 
+                     [](boost::variant<std::shared_ptr<DownstreamState>, int> var)
+                     { 
+                       if(auto* rem = boost::get<shared_ptr<DownstreamState>>(&var))
+                         g_dstates.erase(remove(g_dstates.begin(), g_dstates.end(), *rem), g_dstates.end());
+                       else
+                         g_dstates.erase(g_dstates.begin() + boost::get<int>(var));
+                     } );
+
+
+  g_lua.writeFunction("setServerPolicy", [](ServerPolicy policy)  {
+      g_policy=policy;
+    });
+
+  g_lua.writeFunction("setServerPolicyLua", [](string name, policy_t policy)  {
+      g_policy=ServerPolicy{name, policy};
+    });
+
+  g_lua.writeFunction("showServerPolicy", []() {
+      g_outputBuffer=g_policy.name+"\n";
+    });
+
+
+  g_lua.registerMember("name", &ServerPolicy::name);
+  g_lua.registerMember("policy", &ServerPolicy::policy);
+  g_lua.writeFunction("newServerPolicy", [](string name, policy_t policy) { return ServerPolicy{name, policy};});
+  g_lua.writeVariable("firstAvailable", ServerPolicy{"firstAvailable", firstAvailable});
+  g_lua.writeVariable("roundrobin", ServerPolicy{"roundrobin", roundrobin});
+  g_lua.writeVariable("wrandom", ServerPolicy{"wrandom", wrandom});
+  g_lua.writeVariable("leastOutstanding", ServerPolicy{"leastOutstanding", leastOutstanding});
+  g_lua.writeFunction("addACL", [](const std::string& domain) {
+      g_ACL.addMask(domain);
+    });
+  g_lua.writeFunction("setACL", [](const vector<pair<int, string>>& parts) {
+    NetmaskGroup nmg;
+    for(const auto& p : parts) {
+      nmg.addMask(p.second);
+    }
+    g_ACL=nmg;
+  });
+  g_lua.writeFunction("showACL", []() {
+      vector<string> vec;
+      g_ACL.toStringVector(&vec);
+      string ret;
+      for(const auto& s : vec)
+       ret+=s+"\n";
+      return ret;
+    });
+  g_lua.writeFunction("shutdown", []() { _exit(0);} );
+
+
+  g_lua.writeFunction("addDomainBlock", [](const std::string& domain) { g_suffixMatchNodeFilter.add(DNSName(domain)); });
+  g_lua.writeFunction("showServers", []() {  
+      try {
+      ostringstream ret;
+      
+      boost::format fmt("%1$-3d %2% %|30t|%3$5s %|36t|%4$7.1f %|41t|%5$7d %|44t|%6$3d %|53t|%7$2d %|55t|%8$10d %|61t|%9$7d %|76t|%10$5.1f %|84t|%11$5.1f %12%" );
+      //             1        2          3       4        5       6       7       8           9        10        11
+      ret << (fmt % "#" % "Address" % "State" % "Qps" % "Qlim" % "Ord" % "Wt" % "Queries" % "Drops" % "Drate" % "Lat" % "Pools") << endl;
+
+      uint64_t totQPS{0}, totQueries{0}, totDrops{0};
+      int counter=0;
+      for(auto& s : g_dstates) {
+       string status;
+       if(s->availability == DownstreamState::Availability::Up) 
+         status = "UP";
+       else if(s->availability == DownstreamState::Availability::Down) 
+         status = "DOWN";
+       else 
+         status = (s->upStatus ? "up" : "down");
+
+       string pools;
+       for(auto& p : s->pools) {
+         if(!pools.empty())
+           pools+=" ";
+         pools+=p;
+       }
+
+       ret << (fmt % counter % s->remote.toStringWithPort() % 
+               status % 
+               s->queryLoad % s->qps.getRate() % s->order % s->weight % s->queries.load() % s->reuseds.load() % (s->dropRate) % (s->latencyUsec/1000.0) % pools) << endl;
+
+       totQPS += s->queryLoad;
+       totQueries += s->queries.load();
+       totDrops += s->reuseds.load();
+       ++counter;
+      }
+      ret<< (fmt % "All" % "" % "" 
+               % 
+            (double)totQPS % "" % "" % "" % totQueries % totDrops % "" % "" % "" ) << endl;
+
+      g_outputBuffer=ret.str();
+      }catch(std::exception& e) { g_outputBuffer=e.what(); throw; }
+    });
+
+  g_lua.writeFunction("addPoolRule", [](boost::variant<string,vector<pair<int, string>> > var, string pool) {
+      SuffixMatchNode smn;
+      NetmaskGroup nmg;
+
+      auto add=[&](string src) {
+       try {
+         smn.add(DNSName(src));
+       } catch(...) {
+         nmg.addMask(src);
+       }
+      };
+      if(auto src = boost::get<string>(&var))
+       add(*src);
+      else {
+       for(auto& a : boost::get<vector<pair<int, string>>>(var)) {
+         add(a.second);
+       }
+      }
+      if(nmg.empty())
+       g_poolrules.push_back({smn, pool});
+      else
+       g_poolrules.push_back({nmg, pool});
+
+    });
+
+  g_lua.writeFunction("showPoolRules", []() {
+      boost::format fmt("%-3d %-50s %s\n");
+      g_outputBuffer += (fmt % "#" % "Object" % "Pool").str();
+      int num=0;
+      for(const auto& lim : g_poolrules) {
+       string name;
+       if(auto nmg=boost::get<NetmaskGroup>(&lim.first)) {
+         name=nmg->toString();
+       }
+       else if(auto smn=boost::get<SuffixMatchNode>(&lim.first)) {
+         name=smn->toString(); 
+       }
+       g_outputBuffer += (fmt % num % name % lim.second).str();
+       ++num;
+      }
+    });
+
+
+  g_lua.writeFunction("addQPSLimit", [](boost::variant<string,vector<pair<int, string>> > var, int lim) {
+      SuffixMatchNode smn;
+      NetmaskGroup nmg;
+
+      auto add=[&](string src) {
+       try {
+         smn.add(DNSName(src));
+       } catch(...) {
+         nmg.addMask(src);
+       }
+      };
+      if(auto src = boost::get<string>(&var))
+       add(*src);
+      else {
+       for(auto& a : boost::get<vector<pair<int, string>>>(var)) {
+         add(a.second);
+       }
+      }
+      if(nmg.empty())
+       g_limiters.push_back({smn, QPSLimiter(lim, lim)});
+      else
+       g_limiters.push_back({nmg, QPSLimiter(lim, lim)});
+    });
+
+  g_lua.writeFunction("rmQPSLimit", [](int i) {
+      g_limiters.erase(g_limiters.begin() + i);
+    });
+
+  g_lua.writeFunction("showQPSLimits", []() {
+      boost::format fmt("%-3d %-50s %7d %8d %8d\n");
+      g_outputBuffer += (fmt % "#" % "Object" % "Lim" % "Passed" % "Blocked").str();
+      int num=0;
+      for(const auto& lim : g_limiters) {
+       string name;
+       if(auto nmg=boost::get<NetmaskGroup>(&lim.first)) {
+         name=nmg->toString();
+       }
+       else if(auto smn=boost::get<SuffixMatchNode>(&lim.first)) {
+         name=smn->toString(); 
+       }
+       g_outputBuffer += (fmt % num % name % lim.second.getRate() % lim.second.getPassed() % lim.second.getBlocked()).str();
+       ++num;
+      }
+    });
+
+
+  g_lua.writeFunction("getServers", []() {
+      vector<pair<int, std::shared_ptr<DownstreamState> > > ret;
+      int count=1;
+      for(auto& s : g_dstates) {
+       ret.push_back(make_pair(count++, s));
+      }
+      return ret;
+    });
+
+  g_lua.writeFunction("getServer", [](int i) { return g_dstates[i]; });
+
+  g_lua.registerFunction<bool(DownstreamState::*)()>("checkQPS", [](DownstreamState& s) { return s.qps.check(); });
+  g_lua.registerFunction<void(DownstreamState::*)(int)>("setQPS", [](DownstreamState& s, int lim) { s.qps = lim ? QPSLimiter(lim, lim) : QPSLimiter(); });
+  g_lua.registerFunction<void(DownstreamState::*)(string)>("addPool", [](DownstreamState& s, string pool) { s.pools.insert(pool);});
+  g_lua.registerFunction<void(DownstreamState::*)(string)>("rmPool", [](DownstreamState& s, string pool) { s.pools.erase(pool);});
+
+  g_lua.registerFunction<void(DownstreamState::*)()>("getOutstanding", [](const DownstreamState& s) { g_outputBuffer=std::to_string(s.outstanding.load()); });
+
+
+  g_lua.registerFunction("isUp", &DownstreamState::isUp);
+  g_lua.registerFunction("setDown", &DownstreamState::setDown);
+  g_lua.registerFunction("setUp", &DownstreamState::setUp);
+  g_lua.registerFunction("setAuto", &DownstreamState::setAuto);
+  g_lua.registerMember("upstatus", &DownstreamState::upStatus);
+  g_lua.registerMember("weight", &DownstreamState::weight);
+  g_lua.registerMember("order", &DownstreamState::order);
+  
+  g_lua.writeFunction("show", [](const string& arg) {
+      g_outputBuffer+=arg;
+      g_outputBuffer+="\n";
+    });
+
+  g_lua.registerFunction<void(dnsheader::*)(bool)>("setRD", [](dnsheader& dh, bool v) {
+      dh.rd=v;
+    });
+
+  g_lua.registerFunction<bool(dnsheader::*)()>("getRD", [](dnsheader& dh) {
+      return (bool)dh.rd;
+    });
+
+
+  g_lua.registerFunction<void(dnsheader::*)(bool)>("setTC", [](dnsheader& dh, bool v) {
+      dh.tc=v;
+    });
+
+  g_lua.registerFunction<void(dnsheader::*)(bool)>("setQR", [](dnsheader& dh, bool v) {
+      dh.qr=v;
+    });
+
+  std::ifstream ifs(g_vm["config"].as<string>());
+  if(!ifs) 
+    warnlog("Unable to read configuration from '%s'", g_vm["config"].as<string>());
+  else
+    infolog("Read configuration from '%s'", g_vm["config"].as<string>());
+
+  g_lua.registerFunction("tostring", &ComboAddress::toString);
+
+  g_lua.registerFunction("isPartOf", &DNSName::isPartOf);
+  g_lua.registerFunction("tostring", &DNSName::toString);
+  g_lua.writeFunction("newDNSName", [](const std::string& name) { return DNSName(name); });
+  g_lua.writeFunction("newSuffixNode", []() { return SuffixMatchNode(); });
+
+  g_lua.registerFunction("add",(void (SuffixMatchNode::*)(const DNSName&)) &SuffixMatchNode::add);
+  g_lua.registerFunction("check",(bool (SuffixMatchNode::*)(const DNSName&) const) &SuffixMatchNode::check);
+
+  g_lua.writeFunction("controlSocket", [client](const std::string& str) {
+      ComboAddress local(str, 5199);
+
+      if(client) {
+       g_serverControl = local;
+       return;
+      }
+      
+      try {
+       int sock = socket(local.sin4.sin_family, SOCK_STREAM, 0);
+       SSetsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 1);
+       SBind(sock, local);
+       SListen(sock, 5);
+       thread t(controlThread, sock, local);
+       t.detach();
+      }
+      catch(std::exception& e) {
+       errlog("Unable to bind to control socket on %s: %s", local.toStringWithPort(), e.what());
+      }
+    });
+
+  g_lua.writeFunction("getTopQueries", [](unsigned int top, boost::optional<int> labels) {
+      map<DNSName, int> counts;
+      unsigned int total=0;
+      if(!labels) {
+       for(const auto& a : g_rings.queryRing) {
+         counts[a]++;
+         total++;
+       }
+      }
+      else {
+       unsigned int lab = *labels;
+       for(auto a : g_rings.queryRing) {
+         a.trimToLabels(lab);
+         counts[a]++;
+         total++;
+       }
+
+      }
+      cout<<"Looked at "<<total<<" queries, "<<counts.size()<<" different ones"<<endl;
+      vector<pair<int, DNSName>> rcounts;
+      for(const auto& c : counts) 
+       rcounts.push_back(make_pair(c.second, c.first));
+
+      sort(rcounts.begin(), rcounts.end(), [](const decltype(rcounts)::value_type& a, 
+                                             const decltype(rcounts)::value_type& b) {
+            return b.first < a.first;
+          });
+
+      std::unordered_map<int, vector<boost::variant<string,double>>> ret;
+      unsigned int count=1, rest=0;
+      for(const auto& rc : rcounts) {
+       if(count==top+1)
+         rest+=rc.first;
+       else
+         ret.insert({count++, {rc.second.toString(), rc.first, 100.0*rc.first/total}});
+      }
+      ret.insert({count, {"Rest", rest, 100.0*rest/total}});
+      return ret;
+
+    });
+  
+  g_lua.executeCode(R"(function topQueries(top, labels) for k,v in ipairs(getTopQueries(top,labels)) do show(string.format("%4d  %-40s %4d %4.1f%%",k,v[1],v[2], v[3])) end end)");
+
+  g_lua.writeFunction("getTopResponses", [](unsigned int top, unsigned int kind, boost::optional<int> labels) {
+      map<DNSName, int> counts;
+      unsigned int total=0;
+      {
+       std::lock_guard<std::mutex> lock(g_rings.respMutex);
+       if(!labels) {
+         for(const auto& a : g_rings.respRing) {
+           if(a.rcode!=kind)
+             continue;
+           counts[a.name]++;
+           total++;
+         }
+       }
+       else {
+         unsigned int lab = *labels;
+         for(auto a : g_rings.respRing) {
+           if(a.rcode!=kind)
+             continue;
+
+           a.name.trimToLabels(lab);
+           counts[a.name]++;
+           total++;
+         }
+         
+       }
+      }
+      //      cout<<"Looked at "<<total<<" responses, "<<counts.size()<<" different ones"<<endl;
+      vector<pair<int, DNSName>> rcounts;
+      for(const auto& c : counts) 
+       rcounts.push_back(make_pair(c.second, c.first));
+
+      sort(rcounts.begin(), rcounts.end(), [](const decltype(rcounts)::value_type& a, 
+                                             const decltype(rcounts)::value_type& b) {
+            return b.first < a.first;
+          });
+
+      std::unordered_map<int, vector<boost::variant<string,double>>> ret;
+      unsigned int count=1, rest=0;
+      for(const auto& rc : rcounts) {
+       if(count==top+1)
+         rest+=rc.first;
+       else
+         ret.insert({count++, {rc.second.toString(), rc.first, 100.0*rc.first/total}});
+      }
+      ret.insert({count, {"Rest", rest, 100.0*rest/total}});
+      return ret;
+
+    });
+  
+  g_lua.executeCode(R"(function topResponses(top, kind, labels) for k,v in ipairs(getTopResponses(top, kind, labels)) do show(string.format("%4d  %-40s %4d %4.1f%%",k,v[1],v[2], v[3])) end end)");
+
+
+  g_lua.writeFunction("showResponseLatency", []() {
+
+      map<double, unsigned int> histo;
+      double bin=100;
+      for(int i=0; i < 15; ++i) {
+       histo[bin];
+       bin*=2;
+      }
+
+      double totlat=0;
+      int size=0;
+      {
+       std::lock_guard<std::mutex> lock(g_rings.respMutex);
+       for(const auto& r : g_rings.respRing) {
+         ++size;
+         auto iter = histo.lower_bound(r.usec);
+         if(iter != histo.end())
+           iter->second++;
+         else
+           histo.rbegin()++;
+         totlat+=r.usec;
+       }
+      }
+
+      g_outputBuffer = (boost::format("Average response latency: %.02f msec\n") % (0.001*totlat/size)).str();
+      double highest=0;
+      
+      for(auto iter = histo.cbegin(); iter != histo.cend(); ++iter) {
+       highest=std::max(highest, iter->second*1.0);
+      }
+      boost::format fmt("%7.2f\t%s\n");
+      g_outputBuffer += (fmt % "msec" % "").str();
+
+      for(auto iter = histo.cbegin(); iter != histo.cend(); ++iter) {
+       int stars = (70.0 * iter->second/highest);
+       char c='*';
+       if(!stars && iter->second) {
+         stars=1; // you get 1 . to show something is there..
+         if(70.0*iter->second/highest > 0.5)
+           c=':';
+         else
+           c='.';
+       }
+       g_outputBuffer += (fmt % (iter->first/1000.0) % string(stars, c)).str();
+      }
+    });
+
+  g_lua.writeFunction("newQPSLimiter", [](int rate, int burst) { return QPSLimiter(rate, burst); });
+  g_lua.registerFunction("check", &QPSLimiter::check);
+
+
+  g_lua.writeFunction("makeKey", []() {
+      g_outputBuffer="setKey("+newKey()+")\n";
+    });
+  
+  g_lua.writeFunction("setKey", [](const std::string& key) {
+      if(B64Decode(key, g_key)) 
+       throw std::runtime_error("Unable to decode "+key+" as Base64");
+    });
+
+  
+  g_lua.writeFunction("testCrypto", [](string testmsg)
+   {
+     try {
+       SodiumNonce sn, sn2;
+       sn.init();
+       sn2=sn;
+       string encrypted = sodEncryptSym(testmsg, g_key, sn);
+       string decrypted = sodDecryptSym(encrypted, g_key, sn2);
+       
+       if(testmsg == decrypted)
+        g_outputBuffer="Everything is ok!\n";
+       else
+        g_outputBuffer="Crypto failed..\n";
+       
+     }
+     catch(...) {
+       g_outputBuffer="Crypto failed..\n";
+     }});
+
+  
+
+  g_lua.executeCode(ifs);
+}
index 4e2ee566960bcbf42a21f2e8f19e5eb9c5d0280e..07d7560f5897be6457bbce199f911718680177a2 100644 (file)
     along with this program; if not, write to the Free Software
     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 */
-#include "ext/luawrapper/include/LuaContext.hpp"
-#include <boost/circular_buffer.hpp>
+
+#include "dnsdist.hh"
 #include "sstuff.hh"
 #include "misc.hh"
-#include <mutex>
 #include "statbag.hh"
 #include <netinet/tcp.h>
-#include <boost/program_options.hpp>
 
 
-#include <thread>
+
 #include <limits>
-#include <atomic>
+
 #include "arguments.hh"
 #include "dolog.hh"
 #include <readline/readline.h>
@@ -108,181 +106,10 @@ string g_outputBuffer;
 
    If all downstreams are over QPS, we pick the fastest server */
 
-struct StopWatch
-{
-#ifndef CLOCK_MONOTONIC_RAW
-#define CLOCK_MONOTONIC_RAW CLOCK_MONOTONIC
-#endif
-  struct timespec d_start{0,0};
-  void start() {  
-    if(clock_gettime(CLOCK_MONOTONIC_RAW, &d_start) < 0)
-      unixDie("Getting timestamp");
-    
-  }
-  
-  double udiff() const {
-    struct timespec now;
-    if(clock_gettime(CLOCK_MONOTONIC_RAW, &now) < 0)
-      unixDie("Getting timestamp");
-    
-    return 1000000.0*(now.tv_sec - d_start.tv_sec) + (now.tv_nsec - d_start.tv_nsec)/1000.0;
-  }
-
-  double udiffAndSet() {
-    struct timespec now;
-    if(clock_gettime(CLOCK_MONOTONIC_RAW, &now) < 0)
-      unixDie("Getting timestamp");
-    
-    auto ret= 1000000.0*(now.tv_sec - d_start.tv_sec) + (now.tv_nsec - d_start.tv_nsec)/1000.0;
-    d_start = now;
-    return ret;
-  }
-
-};
-
-class QPSLimiter
-{
-public:
-  QPSLimiter()
-  {
-  }
-
-  QPSLimiter(unsigned int rate, unsigned int burst) : d_rate(rate), d_burst(burst), d_tokens(burst)
-  {
-    d_passthrough=false;
-    d_prev.start();
-  }
-
-  unsigned int getRate() const
-  {
-    return d_passthrough? 0 : d_rate;
-  }
-
-  int getPassed() const
-  {
-    return d_passed;
-  }
-  int getBlocked() const
-  {
-    return d_blocked;
-  }
-
-  bool check()
-  {
-    if(d_passthrough)
-      return true;
-    auto delta = d_prev.udiffAndSet();
-  
-    d_tokens += 1.0*d_rate * (delta/1000000.0);
-
-    if(d_tokens > d_burst)
-      d_tokens = d_burst;
-
-    bool ret=false;
-    if(d_tokens >= 1.0) { // we need this because burst=1 is weird otherwise
-      ret=true;
-      --d_tokens;
-      d_passed++;
-    }
-    else
-      d_blocked++;
-
-    return ret; 
-  }
-private:
-  bool d_passthrough{true};
-  unsigned int d_rate;
-  unsigned int d_burst;
-  double d_tokens;
-  StopWatch d_prev;
-  unsigned int d_passed{0};
-  unsigned int d_blocked{0};
-};
-
 vector<pair<boost::variant<SuffixMatchNode,NetmaskGroup>, QPSLimiter> > g_limiters;
 vector<pair<boost::variant<SuffixMatchNode,NetmaskGroup>, string> > g_poolrules;
+Rings g_rings;
 
-struct IDState
-{
-  IDState() : origFD(-1) {}
-  IDState(const IDState& orig)
-  {
-    origFD = orig.origFD;
-    origID = orig.origID;
-    origRemote = orig.origRemote;
-    age.store(orig.age.load());
-  }
-
-  int origFD;  // set to <0 to indicate this state is empty
-  uint16_t origID;
-  ComboAddress origRemote;
-  StopWatch sentTime;
-  DNSName qname;
-  uint16_t qtype;
-  atomic<uint64_t> age;
-};
-
-struct Rings {
-  Rings()
-  {
-    clientRing.set_capacity(10000);
-    queryRing.set_capacity(10000);
-    respRing.set_capacity(10000);
-  }
-  boost::circular_buffer<ComboAddress> clientRing;
-  boost::circular_buffer<DNSName> queryRing;
-  struct Response
-  {
-    DNSName name;
-    uint16_t qtype;
-    uint8_t rcode;
-    unsigned int usec;
-  };
-  boost::circular_buffer<Response> respRing;
-  std::mutex respMutex;
-} g_rings;
-
-struct DownstreamState
-{
-  DownstreamState(const ComboAddress& remote_);
-
-  int fd;            
-  thread tid;
-  ComboAddress remote;
-  QPSLimiter qps;
-  vector<IDState> idStates;
-  atomic<uint64_t> idOffset{0};
-  atomic<uint64_t> sendErrors{0};
-  atomic<uint64_t> outstanding{0};
-  atomic<uint64_t> reuseds{0};
-  atomic<uint64_t> queries{0};
-  struct {
-    atomic<uint64_t> sendErrors{0};
-    atomic<uint64_t> reuseds{0};
-    atomic<uint64_t> queries{0};
-  } prev;
-  double queryLoad{0.0};
-  double dropRate{0.0};
-  double latencyUsec{0.0};
-  int order{1};
-  int weight{1};
-  StopWatch sw;
-  set<string> pools;
-  enum class Availability { Up, Down, Auto} availability{Availability::Auto};
-  bool upStatus{false};
-  bool isUp() const
-  {
-    if(availability == Availability::Down)
-      return false;
-    if(availability == Availability::Up)
-      return true;
-    return upStatus;
-  }
-  void setUp() { availability = Availability::Up; }
-  void setDown() { availability = Availability::Down; }
-  void setAuto() { availability = Availability::Auto; }
-};
-using servers_t =vector<std::shared_ptr<DownstreamState>>;
 servers_t g_dstates;
 
 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
@@ -345,13 +172,8 @@ struct ClientState
 std::mutex g_luamutex;
 LuaContext g_lua;
 
-typedef std::function<shared_ptr<DownstreamState>(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)> policy_t;
 
-struct ServerPolicy
-{
-  string name;
-  policy_t policy;
-} g_policy;
+ServerPolicy g_policy;
 
 shared_ptr<DownstreamState> firstAvailable(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)
 {
@@ -964,497 +786,6 @@ catch(std::exception& e)
   errlog("Control connection died: %s", e.what());
 }
 
-void setupLua(bool client)
-{
-  g_lua.writeFunction("newServer", 
-                     [](boost::variant<string,std::unordered_map<std::string, std::string>> pvars, boost::optional<int> qps)
-                     { 
-                       if(auto address = boost::get<string>(&pvars)) {
-                         auto ret=std::make_shared<DownstreamState>(ComboAddress(*address, 53));
-                         ret->tid = move(thread(responderThread, ret));
-                         if(qps) {
-                           ret->qps=QPSLimiter(*qps, *qps);
-                         }
-                         g_dstates.push_back(ret);
-                         return ret;
-                       }
-                       auto vars=boost::get<std::unordered_map<std::string, std::string>>(pvars);
-                       auto ret=std::make_shared<DownstreamState>(ComboAddress(vars["address"], 53));
-
-                       ret->tid = move(thread(responderThread, ret));
-
-                       if(vars.count("qps")) {
-                         ret->qps=QPSLimiter(boost::lexical_cast<int>(vars["qps"]),boost::lexical_cast<int>(vars["qps"]));
-                       }
-
-                       if(vars.count("pool")) {
-                         ret->pools.insert(vars["pool"]);
-                       }
-
-                       if(vars.count("order")) {
-                         ret->order=boost::lexical_cast<int>(vars["order"]);
-                       }
-
-                       if(vars.count("weight")) {
-                         ret->weight=boost::lexical_cast<int>(vars["weight"]);
-                       }
-
-
-                       g_dstates.push_back(ret);
-                       std::stable_sort(g_dstates.begin(), g_dstates.end(), [](const decltype(ret)& a, const decltype(ret)& b) {
-                           return a->order < b->order;
-                         });
-                       return ret;
-                     } );
-
-
-
-
-  g_lua.writeFunction("rmServer", 
-                     [](boost::variant<std::shared_ptr<DownstreamState>, int> var)
-                     { 
-                       if(auto* rem = boost::get<shared_ptr<DownstreamState>>(&var))
-                         g_dstates.erase(remove(g_dstates.begin(), g_dstates.end(), *rem), g_dstates.end());
-                       else
-                         g_dstates.erase(g_dstates.begin() + boost::get<int>(var));
-                     } );
-
-
-  g_lua.writeFunction("setServerPolicy", [](ServerPolicy policy)  {
-      g_policy=policy;
-    });
-
-  g_lua.writeFunction("setServerPolicyLua", [](string name, policy_t policy)  {
-      g_policy=ServerPolicy{name, policy};
-    });
-
-  g_lua.writeFunction("showServerPolicy", []() {
-      g_outputBuffer=g_policy.name+"\n";
-    });
-
-
-  g_lua.registerMember("name", &ServerPolicy::name);
-  g_lua.registerMember("policy", &ServerPolicy::policy);
-  g_lua.writeFunction("newServerPolicy", [](string name, policy_t policy) { return ServerPolicy{name, policy};});
-  g_lua.writeVariable("firstAvailable", ServerPolicy{"firstAvailable", firstAvailable});
-  g_lua.writeVariable("roundrobin", ServerPolicy{"roundrobin", roundrobin});
-  g_lua.writeVariable("wrandom", ServerPolicy{"wrandom", wrandom});
-  g_lua.writeVariable("leastOutstanding", ServerPolicy{"leastOutstanding", leastOutstanding});
-  g_lua.writeFunction("addACL", [](const std::string& domain) {
-      g_ACL.addMask(domain);
-    });
-  g_lua.writeFunction("setACL", [](const vector<pair<int, string>>& parts) {
-    NetmaskGroup nmg;
-    for(const auto& p : parts) {
-      nmg.addMask(p.second);
-    }
-    g_ACL=nmg;
-  });
-  g_lua.writeFunction("showACL", []() {
-      vector<string> vec;
-      g_ACL.toStringVector(&vec);
-      string ret;
-      for(const auto& s : vec)
-       ret+=s+"\n";
-      return ret;
-    });
-  g_lua.writeFunction("shutdown", []() { _exit(0);} );
-
-
-  g_lua.writeFunction("addDomainBlock", [](const std::string& domain) { g_suffixMatchNodeFilter.add(DNSName(domain)); });
-  g_lua.writeFunction("showServers", []() {  
-      try {
-      ostringstream ret;
-      
-      boost::format fmt("%1$-3d %2% %|30t|%3$5s %|36t|%4$7.1f %|41t|%5$7d %|44t|%6$3d %|53t|%7$2d %|55t|%8$10d %|61t|%9$7d %|76t|%10$5.1f %|84t|%11$5.1f %12%" );
-      //             1        2          3       4        5       6       7       8           9        10        11
-      ret << (fmt % "#" % "Address" % "State" % "Qps" % "Qlim" % "Ord" % "Wt" % "Queries" % "Drops" % "Drate" % "Lat" % "Pools") << endl;
-
-      uint64_t totQPS{0}, totQueries{0}, totDrops{0};
-      int counter=0;
-      for(auto& s : g_dstates) {
-       string status;
-       if(s->availability == DownstreamState::Availability::Up) 
-         status = "UP";
-       else if(s->availability == DownstreamState::Availability::Down) 
-         status = "DOWN";
-       else 
-         status = (s->upStatus ? "up" : "down");
-
-       string pools;
-       for(auto& p : s->pools) {
-         if(!pools.empty())
-           pools+=" ";
-         pools+=p;
-       }
-
-       ret << (fmt % counter % s->remote.toStringWithPort() % 
-               status % 
-               s->queryLoad % s->qps.getRate() % s->order % s->weight % s->queries.load() % s->reuseds.load() % (s->dropRate) % (s->latencyUsec/1000.0) % pools) << endl;
-
-       totQPS += s->queryLoad;
-       totQueries += s->queries.load();
-       totDrops += s->reuseds.load();
-       ++counter;
-      }
-      ret<< (fmt % "All" % "" % "" 
-               % 
-            (double)totQPS % "" % "" % "" % totQueries % totDrops % "" % "" % "" ) << endl;
-
-      g_outputBuffer=ret.str();
-      }catch(std::exception& e) { g_outputBuffer=e.what(); throw; }
-    });
-
-  g_lua.writeFunction("addPoolRule", [](boost::variant<string,vector<pair<int, string>> > var, string pool) {
-      SuffixMatchNode smn;
-      NetmaskGroup nmg;
-
-      auto add=[&](string src) {
-       try {
-         smn.add(DNSName(src));
-       } catch(...) {
-         nmg.addMask(src);
-       }
-      };
-      if(auto src = boost::get<string>(&var))
-       add(*src);
-      else {
-       for(auto& a : boost::get<vector<pair<int, string>>>(var)) {
-         add(a.second);
-       }
-      }
-      if(nmg.empty())
-       g_poolrules.push_back({smn, pool});
-      else
-       g_poolrules.push_back({nmg, pool});
-
-    });
-
-  g_lua.writeFunction("showPoolRules", []() {
-      boost::format fmt("%-3d %-50s %s\n");
-      g_outputBuffer += (fmt % "#" % "Object" % "Pool").str();
-      int num=0;
-      for(const auto& lim : g_poolrules) {
-       string name;
-       if(auto nmg=boost::get<NetmaskGroup>(&lim.first)) {
-         name=nmg->toString();
-       }
-       else if(auto smn=boost::get<SuffixMatchNode>(&lim.first)) {
-         name=smn->toString(); 
-       }
-       g_outputBuffer += (fmt % num % name % lim.second).str();
-       ++num;
-      }
-    });
-
-
-  g_lua.writeFunction("addQPSLimit", [](boost::variant<string,vector<pair<int, string>> > var, int lim) {
-      SuffixMatchNode smn;
-      NetmaskGroup nmg;
-
-      auto add=[&](string src) {
-       try {
-         smn.add(DNSName(src));
-       } catch(...) {
-         nmg.addMask(src);
-       }
-      };
-      if(auto src = boost::get<string>(&var))
-       add(*src);
-      else {
-       for(auto& a : boost::get<vector<pair<int, string>>>(var)) {
-         add(a.second);
-       }
-      }
-      if(nmg.empty())
-       g_limiters.push_back({smn, QPSLimiter(lim, lim)});
-      else
-       g_limiters.push_back({nmg, QPSLimiter(lim, lim)});
-    });
-
-  g_lua.writeFunction("rmQPSLimit", [](int i) {
-      g_limiters.erase(g_limiters.begin() + i);
-    });
-
-  g_lua.writeFunction("showQPSLimits", []() {
-      boost::format fmt("%-3d %-50s %7d %8d %8d\n");
-      g_outputBuffer += (fmt % "#" % "Object" % "Lim" % "Passed" % "Blocked").str();
-      int num=0;
-      for(const auto& lim : g_limiters) {
-       string name;
-       if(auto nmg=boost::get<NetmaskGroup>(&lim.first)) {
-         name=nmg->toString();
-       }
-       else if(auto smn=boost::get<SuffixMatchNode>(&lim.first)) {
-         name=smn->toString(); 
-       }
-       g_outputBuffer += (fmt % num % name % lim.second.getRate() % lim.second.getPassed() % lim.second.getBlocked()).str();
-       ++num;
-      }
-    });
-
-
-  g_lua.writeFunction("getServers", []() {
-      vector<pair<int, std::shared_ptr<DownstreamState> > > ret;
-      int count=1;
-      for(auto& s : g_dstates) {
-       ret.push_back(make_pair(count++, s));
-      }
-      return ret;
-    });
-
-  g_lua.writeFunction("getServer", [](int i) { return g_dstates[i]; });
-
-  g_lua.registerFunction<bool(DownstreamState::*)()>("checkQPS", [](DownstreamState& s) { return s.qps.check(); });
-  g_lua.registerFunction<void(DownstreamState::*)(int)>("setQPS", [](DownstreamState& s, int lim) { s.qps = lim ? QPSLimiter(lim, lim) : QPSLimiter(); });
-  g_lua.registerFunction<void(DownstreamState::*)(string)>("addPool", [](DownstreamState& s, string pool) { s.pools.insert(pool);});
-  g_lua.registerFunction<void(DownstreamState::*)(string)>("rmPool", [](DownstreamState& s, string pool) { s.pools.erase(pool);});
-
-  g_lua.registerFunction<void(DownstreamState::*)()>("getOutstanding", [](const DownstreamState& s) { g_outputBuffer=std::to_string(s.outstanding.load()); });
-
-
-  g_lua.registerFunction("isUp", &DownstreamState::isUp);
-  g_lua.registerFunction("setDown", &DownstreamState::setDown);
-  g_lua.registerFunction("setUp", &DownstreamState::setUp);
-  g_lua.registerFunction("setAuto", &DownstreamState::setAuto);
-  g_lua.registerMember("upstatus", &DownstreamState::upStatus);
-  g_lua.registerMember("weight", &DownstreamState::weight);
-  g_lua.registerMember("order", &DownstreamState::order);
-  
-  g_lua.writeFunction("show", [](const string& arg) {
-      g_outputBuffer+=arg;
-      g_outputBuffer+="\n";
-    });
-
-  g_lua.registerFunction<void(dnsheader::*)(bool)>("setRD", [](dnsheader& dh, bool v) {
-      dh.rd=v;
-    });
-
-  g_lua.registerFunction<bool(dnsheader::*)()>("getRD", [](dnsheader& dh) {
-      return (bool)dh.rd;
-    });
-
-
-  g_lua.registerFunction<void(dnsheader::*)(bool)>("setTC", [](dnsheader& dh, bool v) {
-      dh.tc=v;
-    });
-
-  g_lua.registerFunction<void(dnsheader::*)(bool)>("setQR", [](dnsheader& dh, bool v) {
-      dh.qr=v;
-    });
-
-  std::ifstream ifs(g_vm["config"].as<string>());
-  if(!ifs) 
-    warnlog("Unable to read configuration from '%s'", g_vm["config"].as<string>());
-  else
-    infolog("Read configuration from '%s'", g_vm["config"].as<string>());
-
-  g_lua.registerFunction("tostring", &ComboAddress::toString);
-
-  g_lua.registerFunction("isPartOf", &DNSName::isPartOf);
-  g_lua.registerFunction("tostring", &DNSName::toString);
-  g_lua.writeFunction("newDNSName", [](const std::string& name) { return DNSName(name); });
-  g_lua.writeFunction("newSuffixNode", []() { return SuffixMatchNode(); });
-
-  g_lua.registerFunction("add",(void (SuffixMatchNode::*)(const DNSName&)) &SuffixMatchNode::add);
-  g_lua.registerFunction("check",(bool (SuffixMatchNode::*)(const DNSName&) const) &SuffixMatchNode::check);
-
-  g_lua.writeFunction("controlSocket", [client](const std::string& str) {
-      ComboAddress local(str, 5199);
-
-      if(client) {
-       g_serverControl = local;
-       return;
-      }
-      
-      try {
-       int sock = socket(local.sin4.sin_family, SOCK_STREAM, 0);
-       SSetsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 1);
-       SBind(sock, local);
-       SListen(sock, 5);
-       thread t(controlThread, sock, local);
-       t.detach();
-      }
-      catch(std::exception& e) {
-       errlog("Unable to bind to control socket on %s: %s", local.toStringWithPort(), e.what());
-      }
-    });
-
-  g_lua.writeFunction("getTopQueries", [](unsigned int top, boost::optional<int> labels) {
-      map<DNSName, int> counts;
-      unsigned int total=0;
-      if(!labels) {
-       for(const auto& a : g_rings.queryRing) {
-         counts[a]++;
-         total++;
-       }
-      }
-      else {
-       unsigned int lab = *labels;
-       for(auto a : g_rings.queryRing) {
-         a.trimToLabels(lab);
-         counts[a]++;
-         total++;
-       }
-
-      }
-      cout<<"Looked at "<<total<<" queries, "<<counts.size()<<" different ones"<<endl;
-      vector<pair<int, DNSName>> rcounts;
-      for(const auto& c : counts) 
-       rcounts.push_back(make_pair(c.second, c.first));
-
-      sort(rcounts.begin(), rcounts.end(), [](const decltype(rcounts)::value_type& a, 
-                                             const decltype(rcounts)::value_type& b) {
-            return b.first < a.first;
-          });
-
-      std::unordered_map<int, vector<boost::variant<string,double>>> ret;
-      unsigned int count=1, rest=0;
-      for(const auto& rc : rcounts) {
-       if(count==top+1)
-         rest+=rc.first;
-       else
-         ret.insert({count++, {rc.second.toString(), rc.first, 100.0*rc.first/total}});
-      }
-      ret.insert({count, {"Rest", rest, 100.0*rest/total}});
-      return ret;
-
-    });
-  
-  g_lua.executeCode(R"(function topQueries(top, labels) for k,v in ipairs(getTopQueries(top,labels)) do show(string.format("%4d  %-40s %4d %4.1f%%",k,v[1],v[2], v[3])) end end)");
-
-  g_lua.writeFunction("getTopResponses", [](unsigned int top, unsigned int kind, boost::optional<int> labels) {
-      map<DNSName, int> counts;
-      unsigned int total=0;
-      {
-       std::lock_guard<std::mutex> lock(g_rings.respMutex);
-       if(!labels) {
-         for(const auto& a : g_rings.respRing) {
-           if(a.rcode!=kind)
-             continue;
-           counts[a.name]++;
-           total++;
-         }
-       }
-       else {
-         unsigned int lab = *labels;
-         for(auto a : g_rings.respRing) {
-           if(a.rcode!=kind)
-             continue;
-
-           a.name.trimToLabels(lab);
-           counts[a.name]++;
-           total++;
-         }
-         
-       }
-      }
-      //      cout<<"Looked at "<<total<<" responses, "<<counts.size()<<" different ones"<<endl;
-      vector<pair<int, DNSName>> rcounts;
-      for(const auto& c : counts) 
-       rcounts.push_back(make_pair(c.second, c.first));
-
-      sort(rcounts.begin(), rcounts.end(), [](const decltype(rcounts)::value_type& a, 
-                                             const decltype(rcounts)::value_type& b) {
-            return b.first < a.first;
-          });
-
-      std::unordered_map<int, vector<boost::variant<string,double>>> ret;
-      unsigned int count=1, rest=0;
-      for(const auto& rc : rcounts) {
-       if(count==top+1)
-         rest+=rc.first;
-       else
-         ret.insert({count++, {rc.second.toString(), rc.first, 100.0*rc.first/total}});
-      }
-      ret.insert({count, {"Rest", rest, 100.0*rest/total}});
-      return ret;
-
-    });
-  
-  g_lua.executeCode(R"(function topResponses(top, kind, labels) for k,v in ipairs(getTopResponses(top, kind, labels)) do show(string.format("%4d  %-40s %4d %4.1f%%",k,v[1],v[2], v[3])) end end)");
-
-
-  g_lua.writeFunction("showResponseLatency", []() {
-
-      map<double, unsigned int> histo;
-      double bin=100;
-      for(int i=0; i < 15; ++i) {
-       histo[bin];
-       bin*=2;
-      }
-
-      double totlat=0;
-      int size=0;
-      {
-       std::lock_guard<std::mutex> lock(g_rings.respMutex);
-       for(const auto& r : g_rings.respRing) {
-         ++size;
-         auto iter = histo.lower_bound(r.usec);
-         if(iter != histo.end())
-           iter->second++;
-         else
-           histo.rbegin()++;
-         totlat+=r.usec;
-       }
-      }
-
-      g_outputBuffer = (boost::format("Average response latency: %.02f msec\n") % (0.001*totlat/size)).str();
-      double highest=0;
-      
-      for(auto iter = histo.cbegin(); iter != histo.cend(); ++iter) {
-       highest=std::max(highest, iter->second*1.0);
-      }
-      boost::format fmt("%7.2f\t%s\n");
-      g_outputBuffer += (fmt % "msec" % "").str();
-
-      for(auto iter = histo.cbegin(); iter != histo.cend(); ++iter) {
-       int stars = (70.0 * iter->second/highest);
-       char c='*';
-       if(!stars && iter->second) {
-         stars=1; // you get 1 . to show something is there..
-         if(70.0*iter->second/highest > 0.5)
-           c=':';
-         else
-           c='.';
-       }
-       g_outputBuffer += (fmt % (iter->first/1000.0) % string(stars, c)).str();
-      }
-    });
-
-  g_lua.writeFunction("newQPSLimiter", [](int rate, int burst) { return QPSLimiter(rate, burst); });
-  g_lua.registerFunction("check", &QPSLimiter::check);
-
-
-  g_lua.writeFunction("makeKey", []() {
-      g_outputBuffer="setKey("+newKey()+")\n";
-    });
-  
-  g_lua.writeFunction("setKey", [](const std::string& key) {
-      if(B64Decode(key, g_key)) 
-       throw std::runtime_error("Unable to decode "+key+" as Base64");
-    });
-
-  
-  g_lua.writeFunction("testCrypto", [](string testmsg)
-   {
-     SodiumNonce sn, sn2;
-     sn.init();
-     sn2=sn;
-     string encrypted = sodEncryptSym(testmsg, g_key, sn);
-     string decrypted = sodDecryptSym(encrypted, g_key, sn2);
-     
-     if(testmsg == decrypted)
-       cerr<<"Everything is ok!"<<endl;
-     else
-       cerr<<"Crypto failed.."<<endl;
-     
-   });
-
-  
-
-  g_lua.executeCode(ifs);
-}
 
 
 void doClient(ComboAddress server)
diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh
new file mode 100644 (file)
index 0000000..7f410a0
--- /dev/null
@@ -0,0 +1,220 @@
+#pragma once
+#include "ext/luawrapper/include/LuaContext.hpp"
+#include <time.h>
+#include "misc.hh"
+#include "iputils.hh"
+#include "dnsname.hh"
+#include <atomic>
+#include <boost/circular_buffer.hpp>
+#include <boost/program_options.hpp>
+
+#include <mutex>
+#include <thread>
+struct StopWatch
+{
+#ifndef CLOCK_MONOTONIC_RAW
+#define CLOCK_MONOTONIC_RAW CLOCK_MONOTONIC
+#endif
+  struct timespec d_start{0,0};
+  void start() {  
+    if(clock_gettime(CLOCK_MONOTONIC_RAW, &d_start) < 0)
+      unixDie("Getting timestamp");
+    
+  }
+  
+  double udiff() const {
+    struct timespec now;
+    if(clock_gettime(CLOCK_MONOTONIC_RAW, &now) < 0)
+      unixDie("Getting timestamp");
+    
+    return 1000000.0*(now.tv_sec - d_start.tv_sec) + (now.tv_nsec - d_start.tv_nsec)/1000.0;
+  }
+
+  double udiffAndSet() {
+    struct timespec now;
+    if(clock_gettime(CLOCK_MONOTONIC_RAW, &now) < 0)
+      unixDie("Getting timestamp");
+    
+    auto ret= 1000000.0*(now.tv_sec - d_start.tv_sec) + (now.tv_nsec - d_start.tv_nsec)/1000.0;
+    d_start = now;
+    return ret;
+  }
+
+};
+
+class QPSLimiter
+{
+public:
+  QPSLimiter()
+  {
+  }
+
+  QPSLimiter(unsigned int rate, unsigned int burst) : d_rate(rate), d_burst(burst), d_tokens(burst)
+  {
+    d_passthrough=false;
+    d_prev.start();
+  }
+
+  unsigned int getRate() const
+  {
+    return d_passthrough? 0 : d_rate;
+  }
+
+  int getPassed() const
+  {
+    return d_passed;
+  }
+  int getBlocked() const
+  {
+    return d_blocked;
+  }
+
+  bool check()
+  {
+    if(d_passthrough)
+      return true;
+    auto delta = d_prev.udiffAndSet();
+  
+    d_tokens += 1.0*d_rate * (delta/1000000.0);
+
+    if(d_tokens > d_burst)
+      d_tokens = d_burst;
+
+    bool ret=false;
+    if(d_tokens >= 1.0) { // we need this because burst=1 is weird otherwise
+      ret=true;
+      --d_tokens;
+      d_passed++;
+    }
+    else
+      d_blocked++;
+
+    return ret; 
+  }
+private:
+  bool d_passthrough{true};
+  unsigned int d_rate;
+  unsigned int d_burst;
+  double d_tokens;
+  StopWatch d_prev;
+  unsigned int d_passed{0};
+  unsigned int d_blocked{0};
+};
+
+
+struct IDState
+{
+  IDState() : origFD(-1) {}
+  IDState(const IDState& orig)
+  {
+    origFD = orig.origFD;
+    origID = orig.origID;
+    origRemote = orig.origRemote;
+    age.store(orig.age.load());
+  }
+
+  int origFD;  // set to <0 to indicate this state is empty
+  uint16_t origID;
+  ComboAddress origRemote;
+  StopWatch sentTime;
+  DNSName qname;
+  uint16_t qtype;
+  std::atomic<uint64_t> age;
+};
+
+struct Rings {
+  Rings()
+  {
+    clientRing.set_capacity(10000);
+    queryRing.set_capacity(10000);
+    respRing.set_capacity(10000);
+  }
+  boost::circular_buffer<ComboAddress> clientRing;
+  boost::circular_buffer<DNSName> queryRing;
+  struct Response
+  {
+    DNSName name;
+    uint16_t qtype;
+    uint8_t rcode;
+    unsigned int usec;
+  };
+  boost::circular_buffer<Response> respRing;
+  std::mutex respMutex;
+};
+
+extern Rings  g_rings;
+
+struct DownstreamState
+{
+  DownstreamState(const ComboAddress& remote_);
+
+  int fd;            
+  std::thread tid;
+  ComboAddress remote;
+  QPSLimiter qps;
+  vector<IDState> idStates;
+  std::atomic<uint64_t> idOffset{0};
+  std::atomic<uint64_t> sendErrors{0};
+  std::atomic<uint64_t> outstanding{0};
+  std::atomic<uint64_t> reuseds{0};
+  std::atomic<uint64_t> queries{0};
+  struct {
+    std::atomic<uint64_t> sendErrors{0};
+    std::atomic<uint64_t> reuseds{0};
+    std::atomic<uint64_t> queries{0};
+  } prev;
+  double queryLoad{0.0};
+  double dropRate{0.0};
+  double latencyUsec{0.0};
+  int order{1};
+  int weight{1};
+  StopWatch sw;
+  set<string> pools;
+  enum class Availability { Up, Down, Auto} availability{Availability::Auto};
+  bool upStatus{false};
+  bool isUp() const
+  {
+    if(availability == Availability::Down)
+      return false;
+    if(availability == Availability::Up)
+      return true;
+    return upStatus;
+  }
+  void setUp() { availability = Availability::Up; }
+  void setDown() { availability = Availability::Down; }
+  void setAuto() { availability = Availability::Auto; }
+};
+using servers_t =vector<std::shared_ptr<DownstreamState>>;
+typedef std::function<shared_ptr<DownstreamState>(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh)> policy_t;
+
+
+struct ServerPolicy
+{
+  string name;
+  policy_t policy;
+};
+
+void* responderThread(std::shared_ptr<DownstreamState> state);
+extern std::mutex g_luamutex;
+extern LuaContext g_lua;
+extern ServerPolicy g_policy;
+extern servers_t g_dstates;
+extern std::string g_outputBuffer;
+
+struct dnsheader;
+std::shared_ptr<DownstreamState> firstAvailable(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
+std::shared_ptr<DownstreamState> leastOutstanding(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
+std::shared_ptr<DownstreamState> wrandom(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
+std::shared_ptr<DownstreamState> roundrobin(const servers_t& servers, const ComboAddress& remote, const DNSName& qname, uint16_t qtype, dnsheader* dh);
+extern vector<pair<boost::variant<SuffixMatchNode,NetmaskGroup>, QPSLimiter> > g_limiters;
+extern vector<pair<boost::variant<SuffixMatchNode,NetmaskGroup>, std::string> > g_poolrules;
+extern SuffixMatchNode g_suffixMatchNodeFilter;
+
+extern ComboAddress g_serverControl;
+void controlThread(int fd, ComboAddress local);
+extern NetmaskGroup g_ACL;
+void setupLua(bool client);
+extern std::string g_key;
+namespace po = boost::program_options;
+extern po::variables_map g_vm;
+
index 7c0c4f41313817cb10d0836a80bf05cb343930e3..0a502f578d7043ae474690e8be17b74062bcaa71 100644 (file)
@@ -1,4 +1,4 @@
-#include <sodium.h>
+
 #include <iostream>
 #include "namespaces.hh"
 #include "misc.hh"
index 64ed46e368fdc357db5e186210b1ad1553e7d692..dd906903a59e3481b918e5cfe08cc4895ee5e008 100644 (file)
@@ -1,7 +1,7 @@
 #pragma once
 #include <string>
 #include <stdint.h>
-
+#include <sodium.h>
 void sodTest();
 std::string newKeypair();