]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
big cleanup of distributor, with free unit tests.
authorbert hubert <bert.hubert@netherlabs.nl>
Thu, 28 May 2015 11:50:58 +0000 (13:50 +0200)
committerbert hubert <bert.hubert@netherlabs.nl>
Thu, 28 May 2015 11:51:38 +0000 (13:51 +0200)
pdns/Makefile.am
pdns/common_startup.cc
pdns/distributor.hh
pdns/test-distributor_hh.cc [new file with mode: 0644]

index 84ec5431892276728728d44685c34b030e70eba1..054c2f069966d002b68a9876bb31c4f8bd58e530 100644 (file)
@@ -964,6 +964,7 @@ testrunner_SOURCES = \
        test-base32_cc.cc \
        test-base64_cc.cc \
        test-bindparser_cc.cc \
+       test-distributor_hh.cc \
        test-dns_random_hh.cc \
        test-dnsname_cc.cc \
        test-dnsrecords_cc.cc \
index 16ccc7f50e08899485c458377b830afe3d75091a..118373aa0999f87bcaf98286014dd23601a9a32f 100644 (file)
@@ -202,10 +202,7 @@ try
   BOOST_FOREACH(DNSDistributor* d, g_distributors) {
     if(!d)
       continue;
-    int qcount, acount;
-    
-    d->getQueueSizes(qcount, acount);  // this does locking and other things, so don't get smart
-    totcount+=qcount;
+    totcount += d->getQueueSize();  // this does locking and other things, so don't get smart
   }
   return totcount;
 }
@@ -303,16 +300,16 @@ int isGuarded(char **argv)
   return !!p;
 }
 
-void sendout(const AnswerData<DNSPacket> &AD)
+void sendout(DNSPacket* a)
 {
-  if(!AD.A)
+  if(!a)
     return;
   
-  N->send(AD.A);
+  N->send(a);
 
-  int diff=AD.A->d_dt.udiff();
+  int diff=a->d_dt.udiff();
   avg_latency=(int)(0.999*avg_latency+0.001*diff);
-  delete AD.A;  
+  delete a;  
 }
 
 //! The qthread receives questions over the internet via the Nameserver class, and hands them to the Distributor for further processing
@@ -429,7 +426,12 @@ void *qthread(void *number)
     if(logDNSQueries) 
       L<<"packetcache MISS"<<endl;
 
-    distributor->question(P, &sendout); // otherwise, give to the distributor
+    try {
+      distributor->question(P, &sendout); // otherwise, give to the distributor
+    }
+    catch(DistributorFatal& df) { // when this happens, we have leaked loads of memory. Bailing out time.
+      _exit(1);
+    }
   }
   return 0;
 }
index ba3ce0e248ba902834d3b253dfcdfd878eac9308..b842bf9a3c7684546cdd4666aba66c956e61f1ec 100644 (file)
 #include <queue>
 #include <vector>
 #include <pthread.h>
-#include <semaphore.h>
 #include <unistd.h>
 #include "logger.hh"
 #include "dns.hh"
 #include "dnsbackend.hh"
 #include "pdnsexception.hh"
 #include "arguments.hh"
+#include <atomic>
 #include "statbag.hh"
 
 extern StatBag S;
@@ -44,33 +44,19 @@ extern StatBag S;
     
     Questions are posed to the Distributor, which returns the answer via a callback.
 
-    The Distributor takes care that there are enough Backends alive at any one
-    time and will try to spawn additional ones should they die.
-
-    The Backend needs to count the number of living instances and supply this number to
-    the Distributor using its numBackends() method. This is silly.
-
-    If an exception escapes a Backend, the distributor retires it.
+    The Distributor spawns sufficient backends, and if they thrown an exception,
+    it will cycle the backend but drop the query that was active during the exception.
 */
-template<class Answer>struct AnswerData
-{
-  Answer *A;
-};
 
 template<class Answer, class Question, class Backend> class Distributor
 {
 public:
   static Distributor *Create(int n=1); //!< Create a new Distributor with \param n threads
-
-  virtual void cleanup();
-  virtual int question(Question *, void (*)(const AnswerData<Answer> &)) =0; //!< Submit a question to the Distributor
-  virtual void getQueueSizes(int &questions, int &answers) =0; //!< Returns length of question queue
-
-  virtual int getNumBusy() =0;
-
+  typedef std::function<void(Answer*)> callback_t;
+  virtual int question(Question *, callback_t callback) =0; //!< Submit a question to the Distributor
+  virtual int getQueueSize() =0; //!< Returns length of question queue
   virtual bool isOverloaded() =0;
 
-private:
 };
 
 template<class Answer, class Question, class Backend> class SingleThreadDistributor
@@ -78,14 +64,9 @@ template<class Answer, class Question, class Backend> class SingleThreadDistribu
 {
 public:
   SingleThreadDistributor();
-  int question(Question *, void (*)(const AnswerData<Answer> &)); //!< Submit a question to the Distributor
-  void getQueueSizes(int &questions, int &answers) {
-    questions = 0;
-    answers = 0;
-  }
-
-  int getNumBusy()
-  {
+  typedef std::function<void(Answer*)> callback_t;
+  int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor
+  int getQueueSize() {
     return 0;
   }
 
@@ -98,49 +79,40 @@ public:
     if (b) delete b;
   }
 private:
-  Backend *b;
+  Backend *b{0};
 };
 
 template<class Answer, class Question, class Backend> class MultiThreadDistributor
     : public Distributor<Answer, Question, Backend>
 {
 public:
-  MultiThreadDistributor(int n=1);
-  int question(Question *, void (*)(const AnswerData<Answer> &)); //!< Submit a question to the Distributor
+  MultiThreadDistributor(int n);
+  typedef std::function<void(Answer*)> callback_t;
+  int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor
   static void* makeThread(void *); //!< helper function to create our n threads
-  void getQueueSizes(int &questions, int &answers) {
-      numquestions.getValue( &questions );
-      answers = 0;
-  }
-
-  int getNumBusy()
-  {
-    return d_num_threads-d_idle_threads;
+  int getQueueSize() override {
+    return d_queued;
   }
 
   struct QuestionData
   {
     Question *Q;
-    void (*callback)(const AnswerData<Answer> &);
+    callback_t callback;
     int id;
   };
 
-  bool isOverloaded()
+  bool isOverloaded() override
   {
     return d_overloaded;
   }
   
 private:
   bool d_overloaded;
-  std::queue<QuestionData> questions;
-  pthread_mutex_t q_lock;
-
-  Semaphore numquestions;
-
   int nextid;
   time_t d_last_started;
   int d_num_threads;
-  AtomicCounter d_idle_threads;
+  std::atomic<unsigned int> d_queued{0}, d_running{0};
+  std::vector<std::pair<int,int>> d_pipes;
 };
 
 //template<class Answer, class Question, class Backend>::nextid;
@@ -164,13 +136,18 @@ template<class Answer, class Question, class Backend>MultiThreadDistributor<Answ
   d_overloaded = false;
 
   nextid=0;
-  // d_idle_threads=0;
   d_last_started=time(0);
-//  sem_init(&numquestions,0,0);
-  pthread_mutex_init(&q_lock,0);
 
   pthread_t tid;
   
+
+  for(int i=0; i < n; ++i) {
+    int fds[2];
+    if(pipe(fds) < 0)
+      unixDie("Creating pipe");
+    d_pipes.push_back({fds[0],fds[1]});
+  }
+  
   L<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl;
   for(int i=0;i<n;i++) {
     pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
@@ -179,89 +156,77 @@ template<class Answer, class Question, class Backend>MultiThreadDistributor<Answ
   L<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
 }
 
-template<class Answer, class Question, class Backend>void Distributor<Answer,Question,Backend>::cleanup()
-{
-    L<<Logger::Error<< "Cleaning up distributor" <<endl;
-}
 
 // start of a new thread
 template<class Answer, class Question, class Backend>void *MultiThreadDistributor<Answer,Question,Backend>::makeThread(void *p)
 {
   pthread_detach(pthread_self());
+  MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p);
+  int ournum=us->d_running++;
+
   try {
     Backend *b=new Backend(); // this will answer our questions
-    MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p);
-    int qcount;
-
     int queuetimeout=::arg().asNum("queue-limit"); 
-    static int overloadQueueLength=::arg().asNum("overload-queue-length");
-    for(;;) {
-      ++(us->d_idle_threads);
-
-      us->numquestions.getValue( &qcount );
 
-      us->numquestions.wait();
-
-      --(us->d_idle_threads);
-      pthread_mutex_lock(&us->q_lock);
-
-      QuestionData QD=us->questions.front();
-
-      us->questions.pop();
-      pthread_mutex_unlock(&us->q_lock);
-
-      Question *q=QD.Q;
-      
-
-      if(us->d_overloaded && qcount <= overloadQueueLength/10) {
-        us->d_overloaded=false;
-      }
-      
+    for(;;) {
+    
+      QuestionData* QD;
+      if(read(us->d_pipes[ournum].first, &QD, sizeof(QD)) != sizeof(QD))
+       unixDie("read");
+      --us->d_queued;
       Answer *a; 
 
-      if(queuetimeout && q->d_dt.udiff()>queuetimeout*1000) {
-        delete q;
+      if(queuetimeout && QD->Q->d_dt.udiff()>queuetimeout*1000) {
+        delete QD->Q;
+       delete QD;
         S.inc("timedout-packets");
         continue;
       }        
       // this is the only point where we interact with the backend (synchronous)
       try {
-        a=b->question(q); // a can be NULL!
-        delete q;
+        a=b->question(QD->Q); 
+       delete QD->Q;
       }
       catch(const PDNSException &e) {
         L<<Logger::Error<<"Backend error: "<<e.reason<<endl;
-        a=q->replyPacket();
+       delete b;
+       b=new Backend();
+        a=QD->Q->replyPacket();
+
         a->setRcode(RCode::ServFail);
         S.inc("servfail-packets");
-        S.ringAccount("servfail-queries",q->qdomain);
+        S.ringAccount("servfail-queries",QD->Q->qdomain);
+
+       delete QD->Q;
       }
       catch(...) {
-        L<<Logger::Error<<Logger::NTLog<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
-        a=q->replyPacket();
+        L<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<endl;
+       delete b;
+       b=new Backend();
+        a=QD->Q->replyPacket();
+       
         a->setRcode(RCode::ServFail);
         S.inc("servfail-packets");
-        S.ringAccount("servfail-queries",q->qdomain);
+        S.ringAccount("servfail-queries",QD->Q->qdomain);
+       delete QD->Q;
       }
 
-      AnswerData<Answer> AD;
-      AD.A=a;
-
-      QD.callback(AD);
+      QD->callback(a);
+      delete QD;
     }
     
     delete b;
   }
   catch(const PDNSException &AE) {
-    L<<Logger::Error<<Logger::NTLog<<"Distributor caught fatal exception: "<<AE.reason<<endl;
+    L<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
   }
   catch(...) {
-    L<<Logger::Error<<Logger::NTLog<<"Caught an unknown exception when creating backend, probably"<<endl;
+    L<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
   }
   return 0;
 }
 
-template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question* q, void (*callback)(const AnswerData<Answer> &))
+template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question* q, callback_t callback)
 {
   Answer *a;
   try {
@@ -277,7 +242,7 @@ template<class Answer, class Question, class Backend>int SingleThreadDistributor
     S.ringAccount("servfail-queries",q->qdomain);
   }
   catch(...) {
-    L<<Logger::Error<<Logger::NTLog<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
+    L<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
     delete b;
     b=new Backend;
     a=q->replyPacket();
@@ -285,66 +250,39 @@ template<class Answer, class Question, class Backend>int SingleThreadDistributor
     S.inc("servfail-packets");
     S.ringAccount("servfail-queries",q->qdomain);
   }
-  AnswerData<Answer> AD;
-  AD.A=a;
-  callback(AD);
+  callback(a);
   return 0;
 }
 
-template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question* q, void (*callback)(const AnswerData<Answer> &))
+struct DistributorFatal{};
+
+template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question* q, callback_t callback)
 {
-  // XXX assert callback
   q=new Question(*q);
 
-  DLOG(L<<"Distributor has "<<Backend::numRunning()<<" threads available"<<endl);
-
-  /* the line below is a bit difficult.
-     What happens is that we have a goal for the number of running distributor threads. Furthermore, other
-     parts of PowerDNS also start backends, which get included in this count.
-
-     If less than two threads now die, no new ones will be spawned.
-
-     The solutionis to add '+2' below, but it is not a pretty solution. Better solution is
-     to only account the number of threads within the Distributor, and not in the backend.
-
-     XXX FIXME 
-  */
-
-  if(Backend::numRunning() < d_num_threads+2 && time(0)-d_last_started>5) { 
-    d_last_started=time(0);
-    L<<"Distributor misses a thread ("<<Backend::numRunning()<<"<"<<d_num_threads + 2<<"), spawning new one"<<endl;
-    pthread_t tid;
-    pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
-  }
-
-  QuestionData QD;
-  QD.Q=q;
-  QD.id=nextid++;
-  QD.callback=callback;
-
-  pthread_mutex_lock(&q_lock);
-  questions.push(QD);
-  pthread_mutex_unlock(&q_lock);
-
-  numquestions.post();
+  auto QD=new QuestionData();
+  QD->Q=q;
+  auto ret = QD->id = nextid++; // might be deleted after write!
+  QD->callback=callback;
   
-  static int overloadQueueLength=::arg().asNum("overload-queue-length");
+  if(write(d_pipes[QD->id % d_pipes.size()].second, &QD, sizeof(QD)) != sizeof(QD))
+    unixDie("write");
 
-  if(!(nextid%50)) {
-    int val;
-    numquestions.getValue( &val );
-    
-    if(!d_overloaded)
-      d_overloaded = overloadQueueLength && (val > overloadQueueLength);
+  d_queued++;
+  
+  static unsigned int overloadQueueLength=::arg().asNum("overload-queue-length");
+  static unsigned int maxQueueLength=::arg().asNum("max-queue-length");
 
-    if(val>::arg().asNum("max-queue-length")) {
-      L<<Logger::Error<<val<<" questions waiting for database attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
-      _exit(1);
-    }
+  if(overloadQueueLength) 
+    d_overloaded= d_queued > overloadQueueLength;
 
+  if(d_queued > maxQueueLength) {
+    L<<Logger::Error<< d_queued <<" questions waiting for database attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
+    // this will leak the entire contents of all pipes, nothing will be freed. Respawn when this happens!
+    throw DistributorFatal();
   }
-
-  return QD.id;
+   
+  return ret;
 }
 
 #endif // DISTRIBUTOR_HH
diff --git a/pdns/test-distributor_hh.cc b/pdns/test-distributor_hh.cc
new file mode 100644 (file)
index 0000000..f9e77fb
--- /dev/null
@@ -0,0 +1,144 @@
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_NO_MAIN
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include <stdlib.h>
+#include <unistd.h>
+#include <boost/test/unit_test.hpp>
+#include "distributor.hh"
+#include "dnspacket.hh"
+#include "namespaces.hh" 
+
+BOOST_AUTO_TEST_SUITE(test_distributor_hh)
+
+struct Question
+{
+  int q;
+  DTime d_dt;
+  string qdomain;
+  DNSPacket* replyPacket()
+  {
+    return new DNSPacket();
+  }
+};
+
+struct Backend
+{
+  DNSPacket* question(Question*)
+  {
+    return new DNSPacket();
+  }
+};
+
+static std::atomic<int> g_receivedAnswers;
+static void report(DNSPacket* A)
+{
+  delete A;
+  g_receivedAnswers++;
+}
+
+BOOST_AUTO_TEST_CASE(test_distributor_basic) {
+  ::arg().set("overload-queue-length","Maximum queuelength moving to packetcache only")="0";
+  ::arg().set("max-queue-length","Maximum queuelength before considering situation lost")="5000";
+  ::arg().set("queue-limit","Maximum number of milliseconds to queue a query")="1500";
+  S.declare("servfail-packets","Number of times a server-failed packet was sent out");
+  S.declare("timedout-packets", "timedout-packets");
+
+  auto d=Distributor<DNSPacket, Question, Backend>::Create(2);
+
+  int n;
+  for(n=0; n < 100; ++n)  {
+    auto q = new Question();
+    q->d_dt.set(); 
+    d->question(q, report);
+  }
+  sleep(1);
+  BOOST_CHECK_EQUAL(n, g_receivedAnswers);
+};
+
+struct BackendSlow
+{
+  DNSPacket* question(Question*)
+  {
+    sleep(1);
+    return new DNSPacket();
+  }
+};
+
+
+BOOST_AUTO_TEST_CASE(test_distributor_queue) {
+  auto d=Distributor<DNSPacket, Question, BackendSlow>::Create(2);
+
+  BOOST_CHECK_EXCEPTION( {
+    int n;
+    for(n=0; n < 6000; ++n)  {
+      auto q = new Question();
+      q->d_dt.set(); 
+      d->question(q, report);
+    }
+    }, DistributorFatal, [](DistributorFatal) { return true; });
+};
+
+struct BackendDies
+{
+  BackendDies()
+  {
+    d_ourcount=s_count++;
+  }
+  ~BackendDies()
+  {
+  }
+  DNSPacket* question(Question* q)
+  {
+    //  cout<<"Q: "<<q->qdomain<<endl;
+    if(!d_ourcount && ++d_count == 10) {
+      // cerr<<"Going.. down!"<<endl;
+      throw runtime_error("kill");
+    }
+    return new DNSPacket();
+  }
+  static std::atomic<int> s_count;
+  int d_count{0};
+  int d_ourcount;
+};
+
+std::atomic<int> BackendDies::s_count;
+
+std::atomic<int> g_receivedAnswers2;
+
+static void report2(DNSPacket* A)
+{
+  delete A;
+  g_receivedAnswers2++;
+}
+
+
+BOOST_AUTO_TEST_CASE(test_distributor_dies) {
+  auto d=Distributor<DNSPacket, Question, BackendDies>::Create(10);
+  sleep(1);
+  g_receivedAnswers=0;
+
+  try {
+    for(int n=0; n < 100; ++n)  {
+      auto q = new Question();
+      q->d_dt.set(); 
+      q->qdomain=std::to_string(n);
+      d->question(q, report2);
+    }
+
+    sleep(1);
+    cerr<<"Queued: "<<d->getQueueSize()<<endl;
+    cerr<<"Received: "<<g_receivedAnswers2<<endl;
+  }
+  catch(std::exception& e) {
+    cerr<<e.what()<<endl;
+  }
+  catch(PDNSException &pe) {
+    cerr<<pe.reason<<endl;
+  }
+};
+
+
+
+BOOST_AUTO_TEST_SUITE_END();