#include "signingpipe.hh"
#include "misc.hh"
#include <poll.h>
-#include <boost/foreach.hpp>
+
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
}
}
-
-// used to pass information to the new thread
-struct StartHelperStruct
-{
- StartHelperStruct(ChunkedSigningPipe* csp, int id, int fd) : d_csp(csp), d_id(id), d_fd(fd){}
- ChunkedSigningPipe* d_csp;
- int d_id;
- int d_fd;
-};
-
-// used to launch the new thread
-void* ChunkedSigningPipe::helperWorker(void* p)
-try
-{
- StartHelperStruct shs=*(StartHelperStruct*)p;
- delete (StartHelperStruct*)p;
-
- shs.d_csp->worker(shs.d_id, shs.d_fd);
- return 0;
+void* ChunkedSigningPipe::helperWorker(ChunkedSigningPipe* csp, int fd)
+try {
+ csp->worker(fd);
+ return nullptr;
}
catch(...) {
- L<<Logger::Error<<"Unknown exception in signing thread occurred"<<endl;
- return 0;
+ g_log<<Logger::Error<<"Unknown exception in signing thread occurred"<<endl;
+ return nullptr;
}
-ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, const string& servers, unsigned int workers)
- : d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(signerName),
- d_maxchunkrecords(100), d_tids(d_numworkers), d_mustSign(mustSign), d_final(false)
+ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int workers)
+ : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(signerName),
+ d_maxchunkrecords(100), d_threads(d_numworkers), d_mustSign(mustSign), d_final(false)
{
d_rrsetToSign = new rrset_t;
- d_chunks.push_back(vector<DNSResourceRecord>()); // load an empty chunk
+ d_chunks.push_back(vector<DNSZoneRecord>()); // load an empty chunk
if(!d_mustSign)
return;
throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
setCloseOnExec(fds[0]);
setCloseOnExec(fds[1]);
- pthread_create(&d_tids[n], 0, helperWorker, (void*) new StartHelperStruct(this, n, fds[1]));
+ d_threads[n] = std::thread(helperWorker, this, fds[1]);
setNonBlocking(fds[0]);
d_sockets.push_back(fds[0]);
+ d_outstandings[fds[0]] = 0;
}
}
ChunkedSigningPipe::~ChunkedSigningPipe()
{
delete d_rrsetToSign;
+
if(!d_mustSign)
return;
+
for(int fd : d_sockets) {
close(fd); // this will trigger all threads to exit
}
-
- void* res;
- for(pthread_t& tid : d_tids) {
- pthread_join(tid, &res);
+
+ for(auto& thread : d_threads) {
+ thread.join();
}
//cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
}
namespace {
bool
-dedupLessThan(const DNSResourceRecord& a, const DNSResourceRecord &b)
+dedupLessThan(const DNSZoneRecord& a, const DNSZoneRecord &b)
{
- return (tie(a.content, a.ttl) < tie(b.content, b.ttl));
+ return make_tuple(a.dr.d_content->getZoneRepresentation(), a.dr.d_ttl) < make_tuple(b.dr.d_content->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW
}
-bool dedupEqual(const DNSResourceRecord& a, const DNSResourceRecord &b)
+bool dedupEqual(const DNSZoneRecord& a, const DNSZoneRecord &b)
{
- return(tie(a.content, a.ttl) == tie(b.content, b.ttl));
+ return make_tuple(a.dr.d_content->getZoneRepresentation(), a.dr.d_ttl) == make_tuple(b.dr.d_content->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW
}
}
d_rrsetToSign->erase(unique(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupEqual), d_rrsetToSign->end());
}
-bool ChunkedSigningPipe::submit(const DNSResourceRecord& rr)
+bool ChunkedSigningPipe::submit(const DNSZoneRecord& rr)
{
++d_submitted;
// check if we have a full RRSET to sign
- if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->qtype.getCode() != rr.qtype.getCode() || d_rrsetToSign->begin()->qname != rr.qname))
+ if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->dr.d_type != rr.dr.d_type || d_rrsetToSign->begin()->dr.d_name != rr.dr.d_name))
{
dedupRRSet();
sendRRSetToWorker();
pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
{
vector<pollfd> pfds;
-
+
for(unsigned int n = 0; n < d_sockets.size(); ++n) {
if(d_eof.count(d_sockets[n]))
continue;
pfd.events |= POLLOUT;
pfds.push_back(pfd);
}
-
+
int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
if(res < 0)
- unixDie("polling for activity from signers, "+lexical_cast<string>(d_sockets.size()));
+ unixDie("polling for activity from signers, "+std::to_string(d_sockets.size()));
pair<vector<int>, vector<int> > vects;
for(unsigned int n = 0; n < pfds.size(); ++n)
if(pfds[n].revents & POLLIN)
pair<vector<int>, vector<int> > rwVect;
- rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
+ rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
if(wantWrite && !rwVect.second.empty()) {
random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
d_rrsetToSign = new rrset_t;
+ d_outstandings[*rwVect.second.begin()]++;
d_outstanding++;
d_queued++;
wantWrite=false;
while(d_outstanding) {
int res = readn(fd, &chunk, sizeof(chunk));
if(!res) {
+ if (d_outstandings[fd] > 0) {
+ throw std::runtime_error("A signing pipe worker died while we were waiting for its result");
+ }
d_eof.insert(fd);
break;
}
}
--d_outstanding;
+ d_outstandings[fd]--;
addSignedToChunks(chunk);
}
if(!d_outstanding || !d_final)
break;
- rwVect = waitForRW(1, 0, -1); // wait for something to happen
+ rwVect = waitForRW(true, false, -1); // wait for something to happen
}
}
if(wantWrite) { // our optimization above failed, we now wait synchronously
- rwVect = waitForRW(0, wantWrite, -1); // wait for something to happen
+ rwVect = waitForRW(false, wantWrite, -1); // wait for something to happen
random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
d_rrsetToSign = new rrset_t;
+ d_outstandings[*rwVect.second.begin()]++;
d_outstanding++;
d_queued++;
}
}
-unsigned int ChunkedSigningPipe::getReady()
+unsigned int ChunkedSigningPipe::getReady() const
{
unsigned int sum=0;
- for(const std::vector<DNSResourceRecord>& v : d_chunks) {
+ for(const auto& v : d_chunks) {
sum += v.size();
}
return sum;
}
-void ChunkedSigningPipe::worker(int id, int fd)
+
+void ChunkedSigningPipe::worker(int fd)
try
{
- DNSSECKeeper dk;
UeberBackend db("key-only");
+ DNSSECKeeper dk(&db);
- chunk_t* chunk;
+ chunk_t* chunk = nullptr;
int res;
for(;;) {
res = readn(fd, &chunk, sizeof(chunk));
break;
if(res < 0)
unixDie("reading object pointer to sign from pdns");
- set<DNSName> authSet;
- authSet.insert(d_signer);
- addRRSigs(dk, db, authSet, *chunk);
- ++d_signed;
-
- writen2(fd, &chunk, sizeof(chunk));
+ try {
+ set<DNSName> authSet;
+ authSet.insert(d_signer);
+ addRRSigs(dk, db, authSet, *chunk);
+ ++d_signed;
+
+ writen2(fd, &chunk, sizeof(chunk));
+ chunk = nullptr;
+ }
+ catch(const PDNSException& pe) {
+ delete chunk;
+ throw;
+ }
+ catch(const std::exception& e) {
+ delete chunk;
+ throw;
+ }
}
close(fd);
}
-catch(PDNSException& pe)
+catch(const PDNSException& pe)
{
- L<<Logger::Error<<"Signing thread died because of PDNSException: "<<pe.reason<<endl;
+ g_log<<Logger::Error<<"Signing thread died because of PDNSException: "<<pe.reason<<endl;
close(fd);
}
-catch(std::exception& e)
+catch(const std::exception& e)
{
- L<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
+ g_log<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
close(fd);
}
d_rrsetToSign->clear();
}
-vector<DNSResourceRecord> ChunkedSigningPipe::getChunk(bool final)
+vector<DNSZoneRecord> ChunkedSigningPipe::getChunk(bool final)
{
if(final && !d_final) {
// this means we should keep on reading until d_outstanding == 0
}
if(d_final)
flushToSign(); // should help us wait
- vector<DNSResourceRecord> front=d_chunks.front();
+ vector<DNSZoneRecord> front=d_chunks.front();
d_chunks.pop_front();
if(d_chunks.empty())
- d_chunks.push_back(vector<DNSResourceRecord>());
+ d_chunks.push_back(vector<DNSZoneRecord>());
/* if(d_final && front.empty())
cerr<<"getChunk returning empty in final"<<endl; */
return front;
}
-#if 0
-
- ServiceTuple st;
- ComboAddress remote;
- if(!servers.empty()) {
- st.port=2000;
- parseService(servers, st);
- remote=ComboAddress(st.host, st.port);
- }
-
- ///
- if(!servers.empty()) {
- fds[0] = socket(AF_INET, SOCK_STREAM, 0);
- fds[1] = -1;
-
- if(connect(fds[0], (struct sockaddr*)&remote, remote.getSocklen()) < 0)
- unixDie("Connecting to signing server");
- }
- else {
-/////
- signal(SIGCHLD, SIG_IGN);
- if(!fork()) { // child
- dup2(fds[1], 0);
- execl("./pdnssec", "./pdnssec", "--config-dir=./", "signing-slave", NULL);
- // helperWorker(new StartHelperStruct(this, n));
- return;
- }
- else
- close(fds[1]);
-#endif
-
-#if 0
-bool readLStringFromSocket(int fd, string& msg)
-{
- msg.clear();
- uint32_t len;
- if(!readn(fd, &len, sizeof(len)))
- return false;
-
- len = ntohl(len);
-
- scoped_array<char> buf(new char[len]);
- readn(fd, buf.get(), len);
-
- msg.assign(buf.get(), len);
- return true;
-}
-void writeLStringToSocket(int fd, const string& msg)
-{
- string realmsg;
- uint32_t len = htonl(msg.length());
- string tot((char*)&len, 4);
- tot+=msg;
-
- writen2(fd, tot.c_str(), tot.length());
-}
-
-#endif