]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - pdns/distributor.hh
auth: Wrap pthread_ objects
[thirdparty/pdns.git] / pdns / distributor.hh
index c025d897f5db4eecceb3bed22ec7406bd4d6cb40..38f08005ef58a2e61f45e658f64416fc87a9c758 100644 (file)
@@ -24,6 +24,7 @@
 #include <deque>
 #include <queue>
 #include <vector>
+#include <thread>
 #include <pthread.h>
 #include "threadname.hh"
 #include <unistd.h>
@@ -88,7 +89,7 @@ public:
   MultiThreadDistributor(int n);
   typedef std::function<void(std::unique_ptr<Answer>&)> callback_t;
   int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor
-  static void* makeThread(void *); //!< helper function to create our n threads
+  void distribute(int n);
   int getQueueSize() override {
     return d_queued;
   }
@@ -114,7 +115,7 @@ private:
   time_t d_last_started;
   unsigned int d_overloadQueueLength, d_maxQueueLength;
   int d_num_threads;
-  std::atomic<unsigned int> d_queued{0}, d_running{0};
+  std::atomic<unsigned int> d_queued{0};
   std::vector<std::pair<int,int>> d_pipes;
 };
 
@@ -137,6 +138,10 @@ template<class Answer, class Question, class Backend>SingleThreadDistributor<Ans
     g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
     _exit(1);
   }
+  catch(const std::exception& e) {
+    g_log<<Logger::Error<<"Distributor caught fatal exception: "<<e.what()<<endl;
+    _exit(1);
+  }
   catch(...) {
     g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
     _exit(1);
@@ -151,9 +156,6 @@ template<class Answer, class Question, class Backend>MultiThreadDistributor<Answ
   nextid=0;
   d_last_started=time(0);
 
-  pthread_t tid;
-  
-
   for(int i=0; i < n; ++i) {
     int fds[2];
     if(pipe(fds) < 0)
@@ -168,7 +170,8 @@ template<class Answer, class Question, class Backend>MultiThreadDistributor<Answ
 
   g_log<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl;
   for(int i=0;i<n;i++) {
-    pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
+    std::thread t(std::bind(&MultiThreadDistributor<Answer,Question,Backend>::distribute, this, i));
+    t.detach();
     Utility::usleep(50000); // we've overloaded mysql in the past :-)
   }
   g_log<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
@@ -176,12 +179,9 @@ template<class Answer, class Question, class Backend>MultiThreadDistributor<Answ
 
 
 // start of a new thread
-template<class Answer, class Question, class Backend>void *MultiThreadDistributor<Answer,Question,Backend>::makeThread(void *p)
+template<class Answer, class Question, class Backend>void MultiThreadDistributor<Answer,Question,Backend>::distribute(int ournum)
 {
   setThreadName("pdns/distributo");
-  pthread_detach(pthread_self());
-  MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p);
-  int ournum=us->d_running++;
 
   try {
     std::unique_ptr<Backend> b= make_unique<Backend>(); // this will answer our questions
@@ -190,9 +190,9 @@ template<class Answer, class Question, class Backend>void *MultiThreadDistributo
     for(;;) {
     
       QuestionData* tempQD = nullptr;
-      if(read(us->d_pipes[ournum].first, &tempQD, sizeof(tempQD)) != sizeof(tempQD))
+      if(read(d_pipes.at(ournum).first, &tempQD, sizeof(tempQD)) != sizeof(tempQD))
        unixDie("read");
-      --us->d_queued;
+      --d_queued;
       std::unique_ptr<QuestionData> QD = std::unique_ptr<QuestionData>(tempQD);
       tempQD = nullptr;
       std::unique_ptr<Answer> a = nullptr;
@@ -251,11 +251,14 @@ retry:
     g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
     _exit(1);
   }
+  catch(const std::exception& e) {
+    g_log<<Logger::Error<<"Distributor caught fatal exception: "<<e.what()<<endl;
+    _exit(1);
+  }
   catch(...) {
     g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
     _exit(1);
   }
-  return 0;
 }
 
 template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
@@ -312,7 +315,7 @@ template<class Answer, class Question, class Backend>int MultiThreadDistributor<
   QD->callback=callback;
 
   ++d_queued;
-  if(write(d_pipes[QD->id % d_pipes.size()].second, &QD, sizeof(QD)) != sizeof(QD)) {
+  if(write(d_pipes.at(QD->id % d_pipes.size()).second, &QD, sizeof(QD)) != sizeof(QD)) {
     --d_queued;
     delete QD;
     unixDie("write");