::arg().set("loglevel","Amount of logging. Higher is more. Do not set below 3")="4";
::arg().set("default-soa-name","name to insert in the SOA record if none set in the backend")="a.misconfigured.powerdns.server";
::arg().set("distributor-threads","Default number of Distributor (backend) threads to start")="3";
+ ::arg().set("signing-threads","Default number of signer threads to start")="3";
::arg().set("receiver-threads","Default number of Distributor (backend) threads to start")="1";
::arg().set("queue-limit","Maximum number of milliseconds to queue a query")="1500";
::arg().set("recursor","If recursion is desired, IP address of a recursing nameserver")="no";
#include "signingpipe.hh"
+AtomicCounter ChunkedSigningPipe::s_workerid;
+
+void* ChunkedSigningPipe::helperWorker(void* p)
+{
+ ChunkedSigningPipe* us = (ChunkedSigningPipe*)p;
+ us->worker();
+ return 0;
+}
+
+ChunkedSigningPipe::ChunkedSigningPipe(DNSSECKeeper& dk, UeberBackend& db, const std::string& signerName, bool mustSign, unsigned int workers)
+ : d_dk(dk), d_db(db), d_signer(signerName), d_chunkrecords(100), d_outstanding(0), d_numworkers(workers), d_tids(d_numworkers),
+ d_mustSign(mustSign)
+{
+ if(!d_mustSign)
+ return;
+ if(pipe(d_uppipe) < 0 || pipe(d_backpipe))
+ throw runtime_error("Unable to create communication pipes in for ChunkedSigningPipe");
+
+ Utility::setNonBlocking(d_backpipe[0]);
+ for(unsigned int n=0; n < d_numworkers; ++n) {
+ pthread_create(&d_tids[n], 0, helperWorker, (void*) this);
+ }
+}
+
+ChunkedSigningPipe::~ChunkedSigningPipe()
+{
+ if(!d_mustSign)
+ return;
+ close(d_uppipe[1]); // this will trigger all threads to exit
+ void* res;
+ for(unsigned int n = 0; n < d_numworkers; ++n)
+ pthread_join(d_tids[n], &res);
+
+ close(d_backpipe[1]);
+ close(d_backpipe[0]);
+ close(d_uppipe[0]);
+}
+
bool ChunkedSigningPipe::submit(const DNSResourceRecord& rr)
{
if(!d_toSign.empty() && (d_toSign.begin()->qtype.getCode() != rr.qtype.getCode() || !pdns_iequals(d_toSign.begin()->qname, rr.qname)))
return d_chunk.size() > d_chunkrecords;
}
+void ChunkedSigningPipe::sendChunkToSign()
+{
+ if(!d_mustSign) {
+ copy(d_toSign.begin(), d_toSign.end(), back_inserter(d_chunk));
+ d_toSign.clear();
+ return;
+ }
+ if(!d_toSign.empty()) {
+ chunk_t* toSign = new chunk_t(d_toSign);
+
+ if(write(d_uppipe[1], &toSign, sizeof(toSign)) != sizeof(toSign))
+ throw runtime_error("Partial write or error communicating to signing thread");
+ d_outstanding++;
+ }
+ chunk_t* signedChunk;
+
+ while(d_outstanding && read(d_backpipe[0], &signedChunk, sizeof(signedChunk)) > 0) {
+ --d_outstanding;
+ copy(signedChunk->begin(), signedChunk->end(), back_inserter(d_chunk));
+ delete signedChunk;
+ }
+
+ d_toSign.clear();
+}
+
+void ChunkedSigningPipe::worker()
+{
+ //int my_id = ++s_workerid;
+ // cout<<my_id<<" worker reporting!"<<endl;
+ chunk_t* chunk;
+
+ DNSSECKeeper dk;
+ int res;
+ for(;;) {
+ res=read(d_uppipe[0], &chunk, sizeof(chunk));
+ if(!res) {
+ // cerr<<my_id<<" exiting"<<endl;
+ break;
+ }
+ if(res != sizeof(chunk))
+ unixDie("error or partial read from ChunkedSigningPipe main thread");
+ // cout<< my_id <<" worker signing!"<<endl;
+ addRRSigs(dk, d_db, d_signer, *chunk); // should start returning sigs separately instead of interleaved
+ if(write(d_backpipe[1], &chunk, sizeof(chunk)) != sizeof(chunk))
+ unixDie("error writing back to ChunkedSigningPipe");
+ }
+}
+
void ChunkedSigningPipe::flushToSign()
{
- addRRSigs(d_dk, d_db, d_signer, d_toSign); // should start returning sigs separately instead of interleaved
- copy(d_toSign.begin(), d_toSign.end(), back_inserter(d_chunk));
+ sendChunkToSign();
d_toSign.clear();
}
vector<DNSResourceRecord> ChunkedSigningPipe::getChunk(bool final)
{
- if(final)
+ if(final) {
+ Utility::setBlocking(d_backpipe[0]);
flushToSign();
-
+ }
chunk_t::size_type amount=min(d_chunkrecords, d_chunk.size());
chunk_t chunk;
#ifndef PDNS_SIGNINGPIPE
#define PDNS_SIGNINGPIPE
#include <vector>
+#include <pthread.h>
#include "dnsseckeeper.hh"
#include "dns.hh"
using std::string;
public:
typedef vector<DNSResourceRecord> chunk_t;
- ChunkedSigningPipe(DNSSECKeeper& dk, UeberBackend& db, const std::string& signerName) : d_dk(dk), d_db(db), d_signer(signerName), d_chunkrecords(100) {}
+ ChunkedSigningPipe(DNSSECKeeper& dk, UeberBackend& db, const std::string& signerName, bool mustSign, unsigned int numWorkers=3);
+ ~ChunkedSigningPipe();
bool submit(const DNSResourceRecord& rr);
chunk_t getChunk(bool final=false);
private:
void flushToSign();
-
+
+ void sendChunkToSign(); // dispatch chunk to worker
+ void worker();
+
+ static void* helperWorker(void* p);
chunk_t d_toSign, d_chunk;
DNSSECKeeper& d_dk;
UeberBackend& d_db;
string d_signer;
chunk_t::size_type d_chunkrecords;
+
+ int d_uppipe[2], d_backpipe[2];
+ int d_outstanding;
+ unsigned int d_numworkers;
+ vector<pthread_t> d_tids;
+ static AtomicCounter s_workerid;
+ bool d_mustSign;
};
#endif
bool narrow;
bool NSEC3Zone=false;
+
DNSSECKeeper dk;
+ bool securedZone = dk.isSecuredZone(target);
if(dk.getNSEC3PARAM(target, &ns3pr, &narrow)) {
NSEC3Zone=true;
if(narrow) {
return 0;
}
-
if(!sd.db || sd.db==(DNSBackend *)-1) {
L<<Logger::Error<<"Error determining backend for domain '"<<target<<"' trying to serve an AXFR"<<endl;
outpacket->setRcode(RCode::ServFail);
}
UeberBackend signatureDB;
- ChunkedSigningPipe csp(dk, signatureDB, target);
- DNSResourceRecord soa = makeDNSRRFromSOAData(sd);
- csp.submit(soa); // an AXFR always starts with the SOA
+ // SOA *must* go out first, our signing pipe might reorder
DLOG(L<<"Sending out SOA"<<endl);
+ DNSResourceRecord soa = makeDNSRRFromSOAData(sd);
+ outpacket->addRecord(soa);
+ if(securedZone)
+ addRRSigs(dk, signatureDB, target, outpacket->getRRS());
+ sendPacket(outpacket, outsock);
+ outpacket = getFreshAXFRPacket(q);
+
+ ChunkedSigningPipe csp(dk, signatureDB, target, securedZone, ::arg().asNum("signing-threads"));
typedef map<string, NSECXEntry, CanonicalCompare> nsecxrepo_t;
nsecxrepo_t nsecxrepo;
outpacket=getFreshAXFRPacket(q);
}
-
DLOG(L<<"Done writing out records"<<endl);
/* and terminate with yet again the SOA record */
outpacket=getFreshAXFRPacket(q);
-
-
outpacket->addRecord(soa);
sendPacket(outpacket, outsock);
DLOG(L<<"last packet - close"<<endl);