]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
fixes PowerDNS/pdns#650
authorMark Zealey <mark@markandruth.co.uk>
Mon, 2 Dec 2013 08:57:42 +0000 (10:57 +0200)
committerMark Zealey <mark@markandruth.co.uk>
Mon, 2 Dec 2013 08:57:42 +0000 (10:57 +0200)
The attached patch changes the distributor code in the following ways:

1) Remove (as far as i can see) unused functions which allow for the fetching of the answer from an answers queue. I struggle to understand why this would ever be useful compared to having a callback; it also reduces code complexity and removes some locks. Also allows (2):
2) Split into 3 classes - the new ones being SingleThreadDistributor and MultiThreadDistributor - removes some of the conditional statements. It also means that in distributor-threads=1 mode, NO additional distributor threads are forked (unlike the existing code), and the class will also use less memory and generally be more efficient.

This has been tested in that the pdns server starts up and answers questions correctly in both modes, it's more of an RFC attempt to clean up the code a bit.

pdns/common_startup.cc
pdns/distributor.hh

index eb75054918ef18c16aaf984742d2f391b9909750..0d6205786213017f8a03c073ea9da9db9b378bde 100644 (file)
@@ -211,7 +211,7 @@ int isGuarded(char **argv)
   return !!p;
 }
 
-void sendout(const DNSDistributor::AnswerData &AD)
+void sendout(const AnswerData<DNSPacket> &AD)
 {
   if(!AD.A)
     return;
@@ -228,7 +228,7 @@ void sendout(const DNSDistributor::AnswerData &AD)
 void *qthread(void *number)
 {
   DNSPacket *P;
-  DNSDistributor *distributor = new DNSDistributor(::arg().asNum("distributor-threads")); // the big dispatcher!
+  DNSDistributor *distributor = DNSDistributor::Create(::arg().asNum("distributor-threads")); // the big dispatcher!
   DNSPacket question;
   DNSPacket cached;
 
index 09187e6a6593d585cf7d102f38a1d1b1c8a390d2..9b72bb28ac6ee3e206cc590ed09794149a2bd8b6 100644 (file)
@@ -42,9 +42,7 @@ extern StatBag S;
 /** the Distributor template class enables you to multithread slow question/answer 
     processes. 
     
-    Questions are posed to the Distributor, which can either hand back the answer,
-    or give it directly to a callback. Only the latter mode of operation is used in 
-    PowerDNS. 
+    Questions are posed to the Distributor, which returns the answer via a callback.
 
     The Distributor takes care that there are enough Backends alive at any one
     time and will try to spawn additional ones should they die.
@@ -54,21 +52,64 @@ extern StatBag S;
 
     If an exception escapes a Backend, the distributor retires it.
 */
+template<class Answer>struct AnswerData
+{
+  Answer *A;
+  time_t created;
+};
+
 template<class Answer, class Question, class Backend> class Distributor
 {
 public:
-  Distributor(int n=10); //!< Create a new Distributor with \param n threads
-  struct AnswerData
+  static Distributor *Create(int n=1); //!< Create a new Distributor with \param n threads
+
+  virtual void cleanup();
+  virtual int question(Question *, void (*)(const AnswerData<Answer> &)) {return 0;}; //!< Submit a question to the Distributor
+  virtual void getQueueSizes(int &questions, int &answers) {}; //!< Returns length of question queue
+
+  virtual int getNumBusy() {return 0;};
+
+  virtual bool isOverloaded() {return false;};
+
+private:
+};
+
+template<class Answer, class Question, class Backend> class SingleThreadDistributor
+    : public Distributor<Answer, Question, Backend>
+{
+public:
+  SingleThreadDistributor();
+  int question(Question *, void (*)(const AnswerData<Answer> &)); //!< Submit a question to the Distributor
+  void getQueueSizes(int &questions, int &answers) {
+    questions = 0;
+    answers = 0;
+  }
+
+  int getNumBusy()
+  {
+    return 0;
+  }
+
+  bool isOverloaded()
   {
-    Answer *A;
-    time_t created;
-  };  
-  int question(Question *, void (*)(const AnswerData &)=0); //!< Submit a question to the Distributor
-  Answer *answer(void); //!< Wait for any answer from the Distributor
-  Answer *wait(Question *); //!< wait for an answer to a specific question
-  int timeoutWait(int id, Answer *, int); //!< wait for a specific answer, with timeout
+    return false;
+  }
+
+private:
+  Backend *b;
+};
+
+template<class Answer, class Question, class Backend> class MultiThreadDistributor
+    : public Distributor<Answer, Question, Backend>
+{
+public:
+  MultiThreadDistributor(int n=1);
+  int question(Question *, void (*)(const AnswerData<Answer> &)); //!< Submit a question to the Distributor
   static void* makeThread(void *); //!< helper function to create our n threads
-  void getQueueSizes(int &questions, int &answers); //!< Returns length of question queue
+  void getQueueSizes(int &questions, int &answers) {
+      numquestions.getValue( &questions );
+      answers = 0;
+  }
 
   int getNumBusy()
   {
@@ -78,11 +119,10 @@ public:
   struct QuestionData
   {
     Question *Q;
-    void (*callback)(const AnswerData &);
+    void (*callback)(const AnswerData<Answer> &);
     int id;
   };
 
-  typedef pair<QuestionData, AnswerData> tuple_t;
   bool isOverloaded()
   {
     return d_overloaded;
@@ -92,12 +132,8 @@ private:
   bool d_overloaded;
   std::queue<QuestionData> questions;
   pthread_mutex_t q_lock;
-  
-  deque<tuple_t> answers;
-  pthread_mutex_t a_lock;
 
   Semaphore numquestions;
-  Semaphore numanswers;
 
   pthread_mutex_t to_mut;
   pthread_cond_t to_cond;
@@ -106,49 +142,59 @@ private:
   time_t d_last_started;
   int d_num_threads;
   AtomicCounter d_idle_threads;
-  Backend *b;
 };
 
-
 //template<class Answer, class Question, class Backend>::nextid;
+template<class Answer, class Question, class Backend>Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n)
+{
+    if( n == 1 )
+        return new SingleThreadDistributor<Answer,Question,Backend>();
+    else
+        return new MultiThreadDistributor<Answer,Question,Backend>( n );
+}
+
+template<class Answer, class Question, class Backend>SingleThreadDistributor<Answer,Question,Backend>::SingleThreadDistributor()
+{
+  L<<Logger::Error<<"Only asked for 1 backend thread - operating unthreaded"<<endl;
+  b=new Backend;
+}
 
-template<class Answer, class Question, class Backend>Distributor<Answer,Question,Backend>::Distributor(int n)
+template<class Answer, class Question, class Backend>MultiThreadDistributor<Answer,Question,Backend>::MultiThreadDistributor(int n)
 {
-  b=0;
+  d_num_threads=n;
   d_overloaded = false;
+
   nextid=0;
   // d_idle_threads=0;
   d_last_started=time(0);
 //  sem_init(&numquestions,0,0);
   pthread_mutex_init(&q_lock,0);
 
-//  sem_init(&numanswers,0,0);
-  pthread_mutex_init(&a_lock,0);
-
   pthread_mutex_init(&to_mut,0);
   pthread_cond_init(&to_cond,0);
 
   pthread_t tid;
   
-  d_num_threads=n;
-
   L<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl;
-
   for(int i=0;i<n;i++) {
     pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
     Utility::usleep(50000); // we've overloaded mysql in the past :-)
   }
-
   L<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
 }
 
+template<class Answer, class Question, class Backend>void Distributor<Answer,Question,Backend>::cleanup()
+{
+    L<<Logger::Error<< "Cleaning up distributor" <<endl;
+}
+
 // start of a new thread
-template<class Answer, class Question, class Backend>void *Distributor<Answer,Question,Backend>::makeThread(void *p)
+template<class Answer, class Question, class Backend>void *MultiThreadDistributor<Answer,Question,Backend>::makeThread(void *p)
 {
   pthread_detach(pthread_self());
   try {
     Backend *b=new Backend(); // this will answer our questions
-    Distributor *us=static_cast<Distributor *>(p);
+    MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p);
     int qcount;
 
     // this is so gross
@@ -204,28 +250,11 @@ template<class Answer, class Question, class Backend>void *Distributor<Answer,Qu
         return 0;
       }
 
-      AnswerData AD;
+      AnswerData<Answer> AD;
       AD.A=a;
       AD.created=time(0);
-      tuple_t tuple(QD,AD);
 
-      if(QD.callback) {
-        QD.callback(AD);
-      }
-      else {
-        pthread_mutex_lock(&us->a_lock);
-
-        us->answers.push_back(tuple);
-        pthread_mutex_unlock(&us->a_lock);
-      
-        //       L<<"We have an answer to send! Trying to get to to_mut lock"<<endl;
-        pthread_mutex_lock(&us->to_mut); 
-        // L<<"Yes, we got the lock, we can transmit! First we post"<<endl;
-        us->numanswers.post();
-        // L<<"And now we broadcast!"<<endl;
-        pthread_cond_broadcast(&us->to_cond); // for timeoutWait(); 
-        pthread_mutex_unlock(&us->to_mut);
-      }
+      QD.callback(AD);
     }
     
     delete b;
@@ -239,41 +268,35 @@ template<class Answer, class Question, class Backend>void *Distributor<Answer,Qu
   return 0;
 }
 
-template<class Answer, class Question, class Backend>int Distributor<Answer,Question,Backend>::question(Question* q, void (*callback)(const AnswerData &))
+template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question* q, void (*callback)(const AnswerData<Answer> &))
 {
-  if(d_num_threads==1 && callback) {  // short circuit
-    if(!b) {
-      L<<Logger::Error<<"Engaging bypass - now operating unthreaded"<<endl;
-      b=new Backend;
-    }
-    Answer *a;
-
-    try {
-      a=b->question(q); // a can be NULL!
-    }
-    catch(const PDNSException &e) {
-      L<<Logger::Error<<"Backend error: "<<e.reason<<endl;
-      delete b;
-      b=0;
-      return 0;
-    }
-    catch(...) {
-      L<<Logger::Error<<Logger::NTLog<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
-      delete b;
-      b=0;
-      return 0;
-    }
-
-
-    AnswerData AD;
-    AD.A=a;
-    AD.created=time(0);
-    callback(AD); 
+  Answer *a;
+  try {
+    a=b->question(q); // a can be NULL!
+  }
+  catch(const AhuException &e) {
+    L<<Logger::Error<<"Backend error: "<<e.reason<<endl;
+    delete b;
+    b=new Backend;
     return 0;
   }
-  else {
-    q=new Question(*q);
+  catch(...) {
+    L<<Logger::Error<<Logger::NTLog<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
+    delete b;
+    b=new Backend;
+    return 0;
   }
+  AnswerData<Answer> AD;
+  AD.A=a;
+  AD.created=time(0);
+  callback(AD);
+  return 0;
+}
+
+template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question* q, void (*callback)(const AnswerData<Answer> &))
+{
+  // XXX assert callback
+  q=new Question(*q);
 
   DLOG(L<<"Distributor has "<<Backend::numRunning()<<" threads available"<<endl);
 
@@ -326,46 +349,5 @@ template<class Answer, class Question, class Backend>int Distributor<Answer,Ques
   return QD.id;
 }
 
-template<class Answer, class Question,class Backend>Answer* Distributor<Answer,Question,Backend>::answer()
-{
-  numanswers.wait();
-  tuple_t tuple;
-
-  pthread_mutex_lock(&a_lock);
-  tuple=answers.front();
-  answers.pop_front();
-  pthread_mutex_unlock(&a_lock);
-  return tuple.second.A;
-}
-
-//! Wait synchronously for the answer of the question just asked. For this to work, no answer() functions must be called
-template<class Answer, class Question,class Backend>Answer* Distributor<Answer,Question,Backend>::wait(Question *q)
-{
-  for(;;)
-    {
-      numanswers.wait();
-      pthread_mutex_lock(&a_lock);
-      
-      // search if the answer is there
-      tuple_t tuple=answers.front();
-      if(tuple.first==q)
-        {
-          answers.pop_front();
-          pthread_mutex_unlock(&a_lock);
-          return tuple.second.A;
-        }
-      // if not, loop again
-      pthread_mutex_unlock(&a_lock);
-      numanswers.post();
-    }
-  // FIXME: write this
-}
-
-template<class Answer, class Question,class Backend>void Distributor<Answer,Question,Backend>::getQueueSizes(int &questions, int &answers)
-{
-  numquestions.getValue( &questions );
-  numanswers.getValue( &answers );
-}
-
 #endif // DISTRIBUTOR_HH