]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
make the signingpipe multithreaded, achieving around 8000 RSASHA256/1024 signatures...
authorBert Hubert <bert.hubert@netherlabs.nl>
Tue, 1 Feb 2011 23:12:40 +0000 (23:12 +0000)
committerBert Hubert <bert.hubert@netherlabs.nl>
Tue, 1 Feb 2011 23:12:40 +0000 (23:12 +0000)
git-svn-id: svn://svn.powerdns.com/pdns/trunk/pdns@1960 d19b8d6e-7fed-0310-83ef-9ca221ded41b

pdns/common_startup.cc
pdns/signingpipe.cc
pdns/signingpipe.hh
pdns/tcpreceiver.cc

index 39329d239adb8da62f702f9592ff7a01ae9cb3c7..986c75311c4a439642d7901d37d8728c3e08b78d 100644 (file)
@@ -66,6 +66,7 @@ void declareArguments()
   ::arg().set("loglevel","Amount of logging. Higher is more. Do not set below 3")="4";
   ::arg().set("default-soa-name","name to insert in the SOA record if none set in the backend")="a.misconfigured.powerdns.server";
   ::arg().set("distributor-threads","Default number of Distributor (backend) threads to start")="3";
+  ::arg().set("signing-threads","Default number of signer threads to start")="3";
   ::arg().set("receiver-threads","Default number of Distributor (backend) threads to start")="1";
   ::arg().set("queue-limit","Maximum number of milliseconds to queue a query")="1500"; 
   ::arg().set("recursor","If recursion is desired, IP address of a recursing nameserver")="no"; 
index 15d0d2b7881eba5dacea11da6f0ddd8544565ee5..1c199063985e2557495f5065c09e8f18cee7140e 100644 (file)
@@ -1,5 +1,43 @@
 #include "signingpipe.hh"
 
+AtomicCounter ChunkedSigningPipe::s_workerid;
+
+void* ChunkedSigningPipe::helperWorker(void* p)
+{
+  ChunkedSigningPipe* us = (ChunkedSigningPipe*)p;
+  us->worker();
+  return 0;
+}
+
+ChunkedSigningPipe::ChunkedSigningPipe(DNSSECKeeper& dk, UeberBackend& db, const std::string& signerName, bool mustSign, unsigned int workers) 
+  : d_dk(dk), d_db(db), d_signer(signerName), d_chunkrecords(100), d_outstanding(0), d_numworkers(workers), d_tids(d_numworkers),
+    d_mustSign(mustSign)
+{
+  if(!d_mustSign)
+    return;
+  if(pipe(d_uppipe) < 0 || pipe(d_backpipe))
+    throw runtime_error("Unable to create communication pipes in for ChunkedSigningPipe");
+  
+  Utility::setNonBlocking(d_backpipe[0]);
+  for(unsigned int n=0; n < d_numworkers; ++n) {
+    pthread_create(&d_tids[n], 0, helperWorker, (void*) this);
+  }
+}
+
+ChunkedSigningPipe::~ChunkedSigningPipe()
+{
+  if(!d_mustSign)
+    return;
+  close(d_uppipe[1]); // this will trigger all threads to exit
+  void* res;
+  for(unsigned int n = 0; n < d_numworkers; ++n)
+    pthread_join(d_tids[n], &res);
+  
+  close(d_backpipe[1]);
+  close(d_backpipe[0]);
+  close(d_uppipe[0]);
+}
+
 bool ChunkedSigningPipe::submit(const DNSResourceRecord& rr)
 {
   if(!d_toSign.empty() && (d_toSign.begin()->qtype.getCode() != rr.qtype.getCode()  ||  !pdns_iequals(d_toSign.begin()->qname, rr.qname))) 
@@ -10,18 +48,66 @@ bool ChunkedSigningPipe::submit(const DNSResourceRecord& rr)
   return d_chunk.size() > d_chunkrecords;
 }
 
+void ChunkedSigningPipe::sendChunkToSign()
+{
+  if(!d_mustSign) {
+    copy(d_toSign.begin(), d_toSign.end(), back_inserter(d_chunk));
+    d_toSign.clear();
+    return;
+  }
+  if(!d_toSign.empty()) {
+    chunk_t* toSign = new chunk_t(d_toSign);
+    
+    if(write(d_uppipe[1], &toSign, sizeof(toSign)) != sizeof(toSign)) 
+      throw runtime_error("Partial write or error communicating to signing thread");
+    d_outstanding++;
+  }
+  chunk_t* signedChunk;
+  
+  while(d_outstanding && read(d_backpipe[0], &signedChunk, sizeof(signedChunk)) > 0) {
+    --d_outstanding;
+    copy(signedChunk->begin(), signedChunk->end(), back_inserter(d_chunk));
+    delete signedChunk;
+  }
+  
+  d_toSign.clear();
+}
+
+void ChunkedSigningPipe::worker()
+{
+  //int my_id = ++s_workerid;
+  // cout<<my_id<<" worker reporting!"<<endl;
+  chunk_t* chunk;
+  
+  DNSSECKeeper dk;
+  int res;
+  for(;;) {
+    res=read(d_uppipe[0], &chunk, sizeof(chunk));
+    if(!res) {
+      // cerr<<my_id<<" exiting"<<endl;
+      break;
+    }
+    if(res != sizeof(chunk))
+      unixDie("error or partial read from ChunkedSigningPipe main thread");
+    // cout<< my_id <<" worker signing!"<<endl;
+    addRRSigs(dk, d_db, d_signer, *chunk); // should start returning sigs separately instead of interleaved  
+    if(write(d_backpipe[1], &chunk, sizeof(chunk)) != sizeof(chunk))
+      unixDie("error writing back to ChunkedSigningPipe");
+  }
+}
+
 void ChunkedSigningPipe::flushToSign()
 {
-  addRRSigs(d_dk, d_db, d_signer, d_toSign); // should start returning sigs separately instead of interleaved
-  copy(d_toSign.begin(), d_toSign.end(), back_inserter(d_chunk));
+  sendChunkToSign();
   d_toSign.clear();
 }
 
 vector<DNSResourceRecord> ChunkedSigningPipe::getChunk(bool final)
 {
-  if(final)
+  if(final) {
+    Utility::setBlocking(d_backpipe[0]);
     flushToSign();
-  
+  }
   
   chunk_t::size_type amount=min(d_chunkrecords, d_chunk.size());
   chunk_t chunk;
index a72f9391b413474237d912a12ee41966bcca6bc8..def6dbee1b4819c7a98f6e6625b41f50806067dc 100644 (file)
@@ -1,6 +1,7 @@
 #ifndef PDNS_SIGNINGPIPE
 #define PDNS_SIGNINGPIPE
 #include <vector>
+#include <pthread.h>
 #include "dnsseckeeper.hh"
 #include "dns.hh"
 using std::string;
@@ -15,17 +16,29 @@ class ChunkedSigningPipe
 public:
   typedef vector<DNSResourceRecord> chunk_t; 
   
-  ChunkedSigningPipe(DNSSECKeeper& dk, UeberBackend& db, const std::string& signerName) : d_dk(dk), d_db(db), d_signer(signerName), d_chunkrecords(100) {}
+  ChunkedSigningPipe(DNSSECKeeper& dk, UeberBackend& db, const std::string& signerName, bool mustSign, unsigned int numWorkers=3);
+  ~ChunkedSigningPipe();
   bool submit(const DNSResourceRecord& rr);
   chunk_t getChunk(bool final=false);
 private:
   void flushToSign();  
-
+  
+  void sendChunkToSign(); // dispatch chunk to worker
+  void worker();
+  
+  static void* helperWorker(void* p);
   chunk_t d_toSign, d_chunk;
   DNSSECKeeper& d_dk;
   UeberBackend& d_db;
   string d_signer;
   chunk_t::size_type d_chunkrecords;
+  
+  int d_uppipe[2], d_backpipe[2];
+  int d_outstanding;
+  unsigned int d_numworkers;
+  vector<pthread_t> d_tids;
+  static AtomicCounter s_workerid;
+  bool d_mustSign;
 };
 
 #endif
index 96327f3ca0cf68e7fc29851629e885d63a9bc94e..ab91b931a46a57f86dfeb079155ebacdf256fc04 100644 (file)
@@ -421,7 +421,9 @@ int TCPNameserver::doAXFR(const string &target, shared_ptr<DNSPacket> q, int out
   bool narrow;
   bool NSEC3Zone=false;
   
+  
   DNSSECKeeper dk;
+  bool securedZone = dk.isSecuredZone(target);
   if(dk.getNSEC3PARAM(target, &ns3pr, &narrow)) {
     NSEC3Zone=true;
     if(narrow) {
@@ -469,7 +471,6 @@ int TCPNameserver::doAXFR(const string &target, shared_ptr<DNSPacket> q, int out
     return 0;
   }
 
-
   if(!sd.db || sd.db==(DNSBackend *)-1) {
     L<<Logger::Error<<"Error determining backend for domain '"<<target<<"' trying to serve an AXFR"<<endl;
     outpacket->setRcode(RCode::ServFail);
@@ -487,12 +488,18 @@ int TCPNameserver::doAXFR(const string &target, shared_ptr<DNSPacket> q, int out
   }
   
   UeberBackend signatureDB; 
-  ChunkedSigningPipe csp(dk, signatureDB, target);
   
-  DNSResourceRecord soa = makeDNSRRFromSOAData(sd);
-  csp.submit(soa); // an AXFR always starts with the SOA
+  // SOA *must* go out first, our signing pipe might reorder
   DLOG(L<<"Sending out SOA"<<endl);
+  DNSResourceRecord soa = makeDNSRRFromSOAData(sd);
+  outpacket->addRecord(soa);
+  if(securedZone)
+    addRRSigs(dk, signatureDB, target, outpacket->getRRS());
   
+  sendPacket(outpacket, outsock);
+  outpacket = getFreshAXFRPacket(q);
+  
+  ChunkedSigningPipe csp(dk, signatureDB, target, securedZone, ::arg().asNum("signing-threads"));
   
   typedef map<string, NSECXEntry, CanonicalCompare> nsecxrepo_t;
   nsecxrepo_t nsecxrepo;
@@ -597,12 +604,9 @@ int TCPNameserver::doAXFR(const string &target, shared_ptr<DNSPacket> q, int out
     outpacket=getFreshAXFRPacket(q);
   }
   
-  
   DLOG(L<<"Done writing out records"<<endl);
   /* and terminate with yet again the SOA record */
   outpacket=getFreshAXFRPacket(q);
-  
-  
   outpacket->addRecord(soa);
   sendPacket(outpacket, outsock);
   DLOG(L<<"last packet - close"<<endl);