]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - pdns/distributor.hh
Merge pull request #9070 from rgacogne/boost-173
[thirdparty/pdns.git] / pdns / distributor.hh
index aa53f64a11050ef3458b5c4f41dff856fb8f1c45..38f08005ef58a2e61f45e658f64416fc87a9c758 100644 (file)
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  */
-#ifndef DISTRIBUTOR_HH
-#define DISTRIBUTOR_HH
-
+#pragma once
 #include <string>
 #include <deque>
 #include <queue>
 #include <vector>
+#include <thread>
 #include <pthread.h>
 #include "threadname.hh"
 #include <unistd.h>
@@ -51,11 +50,12 @@ extern StatBag S;
 template<class Answer, class Question, class Backend> class Distributor
 {
 public:
-  static Distributor *Create(int n=1); //!< Create a new Distributor with \param n threads
-  typedef std::function<void(Answer*)> callback_t;
-  virtual int question(Question *, callback_t callback) =0; //!< Submit a question to the Distributor
+  static DistributorCreate(int n=1); //!< Create a new Distributor with \param n threads
+  typedef std::function<void(std::unique_ptr<Answer>&)> callback_t;
+  virtual int question(Question&, callback_t callback) =0; //!< Submit a question to the Distributor
   virtual int getQueueSize() =0; //!< Returns length of question queue
   virtual bool isOverloaded() =0;
+  virtual ~Distributor() { cerr<<__func__<<endl;}
 };
 
 template<class Answer, class Question, class Backend> class SingleThreadDistributor
@@ -65,8 +65,8 @@ public:
   SingleThreadDistributor(const SingleThreadDistributor&) = delete;
   void operator=(const SingleThreadDistributor&) = delete;
   SingleThreadDistributor();
-  typedef std::function<void(Answer*)> callback_t;
-  int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor
+  typedef std::function<void(std::unique_ptr<Answer>&)> callback_t;
+  int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor
   int getQueueSize() override {
     return 0;
   }
@@ -76,11 +76,8 @@ public:
     return false;
   }
 
-  ~SingleThreadDistributor() {
-    if (b) delete b;
-  }
 private:
-  Backend *b{0};
+  std::unique_ptr<Backend> b{nullptr};
 };
 
 template<class Answer, class Question, class Backend> class MultiThreadDistributor
@@ -90,16 +87,20 @@ public:
   MultiThreadDistributor(const MultiThreadDistributor&) = delete;
   void operator=(const MultiThreadDistributor&) = delete;
   MultiThreadDistributor(int n);
-  typedef std::function<void(Answer*)> callback_t;
-  int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor
-  static void* makeThread(void *); //!< helper function to create our n threads
+  typedef std::function<void(std::unique_ptr<Answer>&)> callback_t;
+  int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor
+  void distribute(int n);
   int getQueueSize() override {
     return d_queued;
   }
 
   struct QuestionData
   {
-    Question *Q;
+    QuestionData(const Question& query): Q(query)
+    {
+    }
+
+    Question Q;
     callback_t callback;
     int id;
   };
@@ -108,35 +109,39 @@ public:
   {
     return d_overloadQueueLength && (d_queued > d_overloadQueueLength);
   }
-  
+
 private:
   int nextid;
   time_t d_last_started;
   unsigned int d_overloadQueueLength, d_maxQueueLength;
   int d_num_threads;
-  std::atomic<unsigned int> d_queued{0}, d_running{0};
+  std::atomic<unsigned int> d_queued{0};
   std::vector<std::pair<int,int>> d_pipes;
 };
 
 //template<class Answer, class Question, class Backend>::nextid;
-template<class Answer, class Question, class Backend>Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n)
+template<class Answer, class Question, class Backend> Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n)
 {
     if( n == 1 )
-        return new SingleThreadDistributor<Answer,Question,Backend>();
+      return new SingleThreadDistributor<Answer,Question,Backend>();
     else
-        return new MultiThreadDistributor<Answer,Question,Backend>( n );
+      return new MultiThreadDistributor<Answer,Question,Backend>( n );
 }
 
 template<class Answer, class Question, class Backend>SingleThreadDistributor<Answer,Question,Backend>::SingleThreadDistributor()
 {
   g_log<<Logger::Error<<"Only asked for 1 backend thread - operating unthreaded"<<endl;
   try {
-    b=new Backend;
+    b=make_unique<Backend>();
   }
   catch(const PDNSException &AE) {
     g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
     _exit(1);
   }
+  catch(const std::exception& e) {
+    g_log<<Logger::Error<<"Distributor caught fatal exception: "<<e.what()<<endl;
+    _exit(1);
+  }
   catch(...) {
     g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
     _exit(1);
@@ -151,9 +156,6 @@ template<class Answer, class Question, class Backend>MultiThreadDistributor<Answ
   nextid=0;
   d_last_started=time(0);
 
-  pthread_t tid;
-  
-
   for(int i=0; i < n; ++i) {
     int fds[2];
     if(pipe(fds) < 0)
@@ -168,7 +170,8 @@ template<class Answer, class Question, class Backend>MultiThreadDistributor<Answ
 
   g_log<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl;
   for(int i=0;i<n;i++) {
-    pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
+    std::thread t(std::bind(&MultiThreadDistributor<Answer,Question,Backend>::distribute, this, i));
+    t.detach();
     Utility::usleep(50000); // we've overloaded mysql in the past :-)
   }
   g_log<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
@@ -176,28 +179,25 @@ template<class Answer, class Question, class Backend>MultiThreadDistributor<Answ
 
 
 // start of a new thread
-template<class Answer, class Question, class Backend>void *MultiThreadDistributor<Answer,Question,Backend>::makeThread(void *p)
+template<class Answer, class Question, class Backend>void MultiThreadDistributor<Answer,Question,Backend>::distribute(int ournum)
 {
   setThreadName("pdns/distributo");
-  pthread_detach(pthread_self());
-  MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p);
-  int ournum=us->d_running++;
 
   try {
-    Backend *b=new Backend(); // this will answer our questions
+    std::unique_ptr<Backend> b= make_unique<Backend>(); // this will answer our questions
     int queuetimeout=::arg().asNum("queue-limit"); 
 
     for(;;) {
     
-      QuestionData* QD;
-      if(read(us->d_pipes[ournum].first, &QD, sizeof(QD)) != sizeof(QD))
+      QuestionData* tempQD = nullptr;
+      if(read(d_pipes.at(ournum).first, &tempQD, sizeof(tempQD)) != sizeof(tempQD))
        unixDie("read");
-      --us->d_queued;
-      Answer *a; 
+      --d_queued;
+      std::unique_ptr<QuestionData> QD = std::unique_ptr<QuestionData>(tempQD);
+      tempQD = nullptr;
+      std::unique_ptr<Answer> a = nullptr;
 
-      if(queuetimeout && QD->Q->d_dt.udiff()>queuetimeout*1000) {
-        delete QD->Q;
-       delete QD;
+      if(queuetimeout && QD->Q.d_dt.udiff()>queuetimeout*1000) {
         S.inc("timedout-packets");
         continue;
       }        
@@ -208,40 +208,33 @@ retry:
       try {
         if (!b) {
           allowRetry=false;
-          b=new Backend();
+          b=make_unique<Backend>();
         }
         a=b->question(QD->Q);
-        delete QD->Q;
       }
       catch(const PDNSException &e) {
-        delete b;
-        b=NULL;
+        b.reset();
         if (!allowRetry) {
           g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
-          a=QD->Q->replyPacket();
+          a=QD->Q.replyPacket();
 
           a->setRcode(RCode::ServFail);
           S.inc("servfail-packets");
-          S.ringAccount("servfail-queries",QD->Q->qdomain.toLogString());
-
-          delete QD->Q;
+          S.ringAccount("servfail-queries", QD->Q.qdomain, QD->Q.qtype);
         } else {
           g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
           goto retry;
         }
       }
       catch(...) {
-        delete b;
-        b=NULL;
+        b.reset();
         if (!allowRetry) {
           g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<endl;
-          a=QD->Q->replyPacket();
+          a=QD->Q.replyPacket();
 
           a->setRcode(RCode::ServFail);
           S.inc("servfail-packets");
-          S.ringAccount("servfail-queries",QD->Q->qdomain.toLogString());
-
-          delete QD->Q;
+          S.ringAccount("servfail-queries", QD->Q.qdomain, QD->Q.qtype);
         } else {
           g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<" (retry once)"<<endl;
           goto retry;
@@ -249,59 +242,60 @@ retry:
       }
 
       QD->callback(a);
-      delete QD;
+      QD.reset();
     }
 
-    delete b;
+    b.reset();
   }
   catch(const PDNSException &AE) {
     g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
     _exit(1);
   }
+  catch(const std::exception& e) {
+    g_log<<Logger::Error<<"Distributor caught fatal exception: "<<e.what()<<endl;
+    _exit(1);
+  }
   catch(...) {
     g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
     _exit(1);
   }
-  return 0;
 }
 
-template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question* q, callback_t callback)
+template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
 {
-  Answer *a;
+  std::unique_ptr<Answer> a = nullptr;
   bool allowRetry=true;
 retry:
   try {
     if (!b) {
       allowRetry=false;
-      b=new Backend;
+      b=make_unique<Backend>();
     }
     a=b->question(q); // a can be NULL!
   }
   catch(const PDNSException &e) {
-    delete b;
-    b=NULL;
+    b.reset();
     if (!allowRetry) {
       g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
-      a=q->replyPacket();
+      a=q.replyPacket();
 
       a->setRcode(RCode::ServFail);
       S.inc("servfail-packets");
-      S.ringAccount("servfail-queries",q->qdomain.toLogString());
+      S.ringAccount("servfail-queries", q.qdomain, q.qtype);
     } else {
       g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
       goto retry;
     }
   }
   catch(...) {
-    delete b;
-    b=NULL;
+    b.reset();
     if (!allowRetry) {
       g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
-      a=q->replyPacket();
+      a=q.replyPacket();
 
       a->setRcode(RCode::ServFail);
       S.inc("servfail-packets");
-      S.ringAccount("servfail-queries",q->qdomain.toLogString());
+      S.ringAccount("servfail-queries", q.qdomain, q.qtype);
     } else {
       g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<" (retry once)"<<endl;
       goto retry;
@@ -313,31 +307,25 @@ retry:
 
 struct DistributorFatal{};
 
-template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question* q, callback_t callback)
+template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
 {
-  q=new Question(*q);
-
   // this is passed to other process over pipe and released there
-  auto QD=new QuestionData();
-  QD->Q=q;
+  auto QD=new QuestionData(q);
   auto ret = QD->id = nextid++; // might be deleted after write!
   QD->callback=callback;
-  
-  if(write(d_pipes[QD->id % d_pipes.size()].second, &QD, sizeof(QD)) != sizeof(QD))
-    unixDie("write");
-
-  d_queued++;
-
 
+  ++d_queued;
+  if(write(d_pipes.at(QD->id % d_pipes.size()).second, &QD, sizeof(QD)) != sizeof(QD)) {
+    --d_queued;
+    delete QD;
+    unixDie("write");
+  }
 
   if(d_queued > d_maxQueueLength) {
     g_log<<Logger::Error<< d_queued <<" questions waiting for database/backend attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
     // this will leak the entire contents of all pipes, nothing will be freed. Respawn when this happens!
     throw DistributorFatal();
   }
-   
+
   return ret;
 }
-
-#endif // DISTRIBUTOR_HH
-