]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - pdns/signingpipe.cc
dnsdist: print stats from expungeByName
[thirdparty/pdns.git] / pdns / signingpipe.cc
index 6cd81097d7a5b2367008da2e541278ad52f18662..6c98dd54c62b3da241c5eca1596ed4397871f16b 100644 (file)
@@ -45,34 +45,19 @@ int readn(int fd, void* buffer, unsigned int len)
 }
 }
 
-
-// used to pass information to the new thread
-struct StartHelperStruct
-{
-  StartHelperStruct(ChunkedSigningPipe* csp, int id, int fd) : d_csp(csp), d_id(id), d_fd(fd){}
-  ChunkedSigningPipe* d_csp;
-  int d_id;
-  int d_fd;
-};
-
-// used to launch the new thread
-void* ChunkedSigningPipe::helperWorker(void* p)
-try
-{
-  StartHelperStruct shs=*(StartHelperStruct*)p;
-  delete (StartHelperStruct*)p;
-  
-  shs.d_csp->worker(shs.d_id, shs.d_fd);
-  return 0;
+void* ChunkedSigningPipe::helperWorker(ChunkedSigningPipe* csp, int fd)
+try {
+  csp->worker(fd);
+  return nullptr;
 }
 catch(...) {
-  L<<Logger::Error<<"Unknown exception in signing thread occurred"<<endl;
-  return 0;
+  g_log<<Logger::Error<<"Unknown exception in signing thread occurred"<<endl;
+  return nullptr;
 }
 
-ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, const string& servers, unsigned int workers)
+ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int workers)
   : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(signerName),
-    d_maxchunkrecords(100), d_tids(d_numworkers), d_mustSign(mustSign), d_final(false)
+    d_maxchunkrecords(100), d_threads(d_numworkers), d_mustSign(mustSign), d_final(false)
 {
   d_rrsetToSign = new rrset_t;
   d_chunks.push_back(vector<DNSZoneRecord>()); // load an empty chunk
@@ -87,24 +72,26 @@ ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign,
       throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
     setCloseOnExec(fds[0]);
     setCloseOnExec(fds[1]);
-    pthread_create(&d_tids[n], 0, helperWorker, (void*) new StartHelperStruct(this, n, fds[1]));
+    d_threads[n] = std::thread(helperWorker, this, fds[1]);
     setNonBlocking(fds[0]);
     d_sockets.push_back(fds[0]);
+    d_outstandings[fds[0]] = 0;
   }
 }
 
 ChunkedSigningPipe::~ChunkedSigningPipe()
 {
   delete d_rrsetToSign;
+
   if(!d_mustSign)
     return;
+
   for(int fd :  d_sockets) {
     close(fd); // this will trigger all threads to exit
   }
-    
-  void* res;
-  for(pthread_t& tid :  d_tids) {
-    pthread_join(tid, &res);
+
+  for(auto& thread : d_threads) {
+    thread.join();
   }
   //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
 }
@@ -145,7 +132,7 @@ bool ChunkedSigningPipe::submit(const DNSZoneRecord& rr)
 pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
 {
   vector<pollfd> pfds;
-  
+
   for(unsigned int n = 0; n < d_sockets.size(); ++n) {    
     if(d_eof.count(d_sockets[n]))  
       continue;
@@ -158,7 +145,7 @@ pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr,
       pfd.events |= POLLOUT;
     pfds.push_back(pfd);
   }
-  
+
   int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
   if(res < 0)
     unixDie("polling for activity from signers, "+std::to_string(d_sockets.size()));
@@ -209,12 +196,13 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
   
   pair<vector<int>, vector<int> > rwVect;
   
-  rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen  
+  rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
   
   if(wantWrite && !rwVect.second.empty()) {
     random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
     writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
     d_rrsetToSign = new rrset_t;
+    d_outstandings[*rwVect.second.begin()]++;
     d_outstanding++;
     d_queued++;
     wantWrite=false;
@@ -231,6 +219,9 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
         while(d_outstanding) {
           int res = readn(fd, &chunk, sizeof(chunk));
           if(!res) {
+            if (d_outstandings[fd] > 0) {
+              throw std::runtime_error("A signing pipe worker died while we were waiting for its result");
+            }
             d_eof.insert(fd);
             break;
           }
@@ -242,6 +233,7 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
           }
           
           --d_outstanding;
+          d_outstandings[fd]--;
           
           addSignedToChunks(chunk);
           
@@ -250,22 +242,23 @@ void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
       }
       if(!d_outstanding || !d_final)
         break;
-      rwVect = waitForRW(1, 0, -1); // wait for something to happen  
+      rwVect = waitForRW(true, false, -1); // wait for something to happen
     }
   }
   
   if(wantWrite) {  // our optimization above failed, we now wait synchronously
-    rwVect = waitForRW(0, wantWrite, -1); // wait for something to happen  
+    rwVect = waitForRW(false, wantWrite, -1); // wait for something to happen
     random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
     writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
     d_rrsetToSign = new rrset_t;
+    d_outstandings[*rwVect.second.begin()]++;
     d_outstanding++;
     d_queued++;
   }
   
 }
 
-unsigned int ChunkedSigningPipe::getReady()
+unsigned int ChunkedSigningPipe::getReady() const
 {
    unsigned int sum=0; 
    for(const auto& v :  d_chunks) {
@@ -273,11 +266,12 @@ unsigned int ChunkedSigningPipe::getReady()
    }
    return sum;
 }
-void ChunkedSigningPipe::worker(int id, int fd)
+
+void ChunkedSigningPipe::worker(int fd)
 try
 {
-  DNSSECKeeper dk;
   UeberBackend db("key-only");
+  DNSSECKeeper dk(&db);
   
   chunk_t* chunk = nullptr;
   int res;
@@ -309,12 +303,12 @@ try
 }
 catch(const PDNSException& pe)
 {
-  L<<Logger::Error<<"Signing thread died because of PDNSException: "<<pe.reason<<endl;
+  g_log<<Logger::Error<<"Signing thread died because of PDNSException: "<<pe.reason<<endl;
   close(fd);
 }
 catch(const std::exception& e)
 {
-  L<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
+  g_log<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
   close(fd);
 }
 
@@ -347,62 +341,4 @@ vector<DNSZoneRecord> ChunkedSigningPipe::getChunk(bool final)
   return front;
 }
 
-#if 0
-
-  ServiceTuple st;
-  ComboAddress remote;
-  if(!servers.empty()) {
-    st.port=2000;
-    parseService(servers, st);
-    remote=ComboAddress(st.host, st.port);
-  }
-  
-  ///
-    if(!servers.empty()) {
-      fds[0] = socket(AF_INET, SOCK_STREAM, 0);
-      fds[1] = -1;
-      
-      if(connect(fds[0], (struct sockaddr*)&remote, remote.getSocklen()) < 0)
-        unixDie("Connecting to signing server");
-    }
-    else {
-/////
-      signal(SIGCHLD, SIG_IGN);
-      if(!fork()) { // child
-        dup2(fds[1], 0);
-        execl("./pdnsutil", "./pdnsutil", "--config-dir=./", "signing-slave", NULL);
-        // helperWorker(new StartHelperStruct(this, n));
-        return;
-      }
-      else 
-        close(fds[1]);
-#endif
-
-#if 0
-bool readLStringFromSocket(int fd, string& msg)
-{
-  msg.clear();
-  uint32_t len;
-  if(!readn(fd, &len, sizeof(len)))
-    return false;
-  
-  len = ntohl(len);
-  
-  scoped_array<char> buf(new char[len]);
-  readn(fd, buf.get(), len);
-  
-  msg.assign(buf.get(), len);
-  return true;
-}
-void writeLStringToSocket(int fd, const string& msg)
-{
-  string realmsg;
-  uint32_t len = htonl(msg.length());
-  string tot((char*)&len, 4);
-  tot+=msg;
-  
-  writen2(fd, tot.c_str(), tot.length());
-}
-
-#endif