}
}
-
-// used to pass information to the new thread
-struct StartHelperStruct
-{
- StartHelperStruct(ChunkedSigningPipe* csp, int id, int fd) : d_csp(csp), d_id(id), d_fd(fd){}
- ChunkedSigningPipe* d_csp;
- int d_id;
- int d_fd;
-};
-
-// used to launch the new thread
-void* ChunkedSigningPipe::helperWorker(void* p)
-try
-{
- StartHelperStruct shs=*(StartHelperStruct*)p;
- delete (StartHelperStruct*)p;
-
- shs.d_csp->worker(shs.d_id, shs.d_fd);
- return 0;
+void* ChunkedSigningPipe::helperWorker(ChunkedSigningPipe* csp, int fd)
+try {
+ csp->worker(fd);
+ return nullptr;
}
catch(...) {
- L<<Logger::Error<<"Unknown exception in signing thread occurred"<<endl;
- return 0;
+ g_log<<Logger::Error<<"Unknown exception in signing thread occurred"<<endl;
+ return nullptr;
}
-ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, const string& servers, unsigned int workers)
+ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int workers)
: d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(signerName),
- d_maxchunkrecords(100), d_tids(d_numworkers), d_mustSign(mustSign), d_final(false)
+ d_maxchunkrecords(100), d_threads(d_numworkers), d_mustSign(mustSign), d_final(false)
{
d_rrsetToSign = new rrset_t;
d_chunks.push_back(vector<DNSZoneRecord>()); // load an empty chunk
throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
setCloseOnExec(fds[0]);
setCloseOnExec(fds[1]);
- pthread_create(&d_tids[n], 0, helperWorker, (void*) new StartHelperStruct(this, n, fds[1]));
+ d_threads[n] = std::thread(helperWorker, this, fds[1]);
setNonBlocking(fds[0]);
d_sockets.push_back(fds[0]);
+ d_outstandings[fds[0]] = 0;
}
}
ChunkedSigningPipe::~ChunkedSigningPipe()
{
delete d_rrsetToSign;
+
if(!d_mustSign)
return;
+
for(int fd : d_sockets) {
close(fd); // this will trigger all threads to exit
}
-
- void* res;
- for(pthread_t& tid : d_tids) {
- pthread_join(tid, &res);
+
+ for(auto& thread : d_threads) {
+ thread.join();
}
//cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
}
pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
{
vector<pollfd> pfds;
-
+
for(unsigned int n = 0; n < d_sockets.size(); ++n) {
if(d_eof.count(d_sockets[n]))
continue;
pfd.events |= POLLOUT;
pfds.push_back(pfd);
}
-
+
int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
if(res < 0)
unixDie("polling for activity from signers, "+std::to_string(d_sockets.size()));
pair<vector<int>, vector<int> > rwVect;
- rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
+ rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
if(wantWrite && !rwVect.second.empty()) {
random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
d_rrsetToSign = new rrset_t;
+ d_outstandings[*rwVect.second.begin()]++;
d_outstanding++;
d_queued++;
wantWrite=false;
while(d_outstanding) {
int res = readn(fd, &chunk, sizeof(chunk));
if(!res) {
+ if (d_outstandings[fd] > 0) {
+ throw std::runtime_error("A signing pipe worker died while we were waiting for its result");
+ }
d_eof.insert(fd);
break;
}
}
--d_outstanding;
+ d_outstandings[fd]--;
addSignedToChunks(chunk);
}
if(!d_outstanding || !d_final)
break;
- rwVect = waitForRW(1, 0, -1); // wait for something to happen
+ rwVect = waitForRW(true, false, -1); // wait for something to happen
}
}
if(wantWrite) { // our optimization above failed, we now wait synchronously
- rwVect = waitForRW(0, wantWrite, -1); // wait for something to happen
+ rwVect = waitForRW(false, wantWrite, -1); // wait for something to happen
random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
d_rrsetToSign = new rrset_t;
+ d_outstandings[*rwVect.second.begin()]++;
d_outstanding++;
d_queued++;
}
}
-unsigned int ChunkedSigningPipe::getReady()
+unsigned int ChunkedSigningPipe::getReady() const
{
unsigned int sum=0;
for(const auto& v : d_chunks) {
}
return sum;
}
-void ChunkedSigningPipe::worker(int id, int fd)
+
+void ChunkedSigningPipe::worker(int fd)
try
{
- DNSSECKeeper dk;
UeberBackend db("key-only");
+ DNSSECKeeper dk(&db);
- chunk_t* chunk;
+ chunk_t* chunk = nullptr;
int res;
for(;;) {
res = readn(fd, &chunk, sizeof(chunk));
break;
if(res < 0)
unixDie("reading object pointer to sign from pdns");
- set<DNSName> authSet;
- authSet.insert(d_signer);
- addRRSigs(dk, db, authSet, *chunk);
- ++d_signed;
-
- writen2(fd, &chunk, sizeof(chunk));
+ try {
+ set<DNSName> authSet;
+ authSet.insert(d_signer);
+ addRRSigs(dk, db, authSet, *chunk);
+ ++d_signed;
+
+ writen2(fd, &chunk, sizeof(chunk));
+ chunk = nullptr;
+ }
+ catch(const PDNSException& pe) {
+ delete chunk;
+ throw;
+ }
+ catch(const std::exception& e) {
+ delete chunk;
+ throw;
+ }
}
close(fd);
}
-catch(PDNSException& pe)
+catch(const PDNSException& pe)
{
- L<<Logger::Error<<"Signing thread died because of PDNSException: "<<pe.reason<<endl;
+ g_log<<Logger::Error<<"Signing thread died because of PDNSException: "<<pe.reason<<endl;
close(fd);
}
-catch(std::exception& e)
+catch(const std::exception& e)
{
- L<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
+ g_log<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
close(fd);
}
return front;
}
-#if 0
-
- ServiceTuple st;
- ComboAddress remote;
- if(!servers.empty()) {
- st.port=2000;
- parseService(servers, st);
- remote=ComboAddress(st.host, st.port);
- }
-
- ///
- if(!servers.empty()) {
- fds[0] = socket(AF_INET, SOCK_STREAM, 0);
- fds[1] = -1;
-
- if(connect(fds[0], (struct sockaddr*)&remote, remote.getSocklen()) < 0)
- unixDie("Connecting to signing server");
- }
- else {
-/////
- signal(SIGCHLD, SIG_IGN);
- if(!fork()) { // child
- dup2(fds[1], 0);
- execl("./pdnsutil", "./pdnsutil", "--config-dir=./", "signing-slave", NULL);
- // helperWorker(new StartHelperStruct(this, n));
- return;
- }
- else
- close(fds[1]);
-#endif
-
-#if 0
-bool readLStringFromSocket(int fd, string& msg)
-{
- msg.clear();
- uint32_t len;
- if(!readn(fd, &len, sizeof(len)))
- return false;
-
- len = ntohl(len);
-
- scoped_array<char> buf(new char[len]);
- readn(fd, buf.get(), len);
-
- msg.assign(buf.get(), len);
- return true;
-}
-void writeLStringToSocket(int fd, const string& msg)
-{
- string realmsg;
- uint32_t len = htonl(msg.length());
- string tot((char*)&len, 4);
- tot+=msg;
-
- writen2(fd, tot.c_str(), tot.length());
-}
-
-#endif