1 #include "signingpipe.hh"
4 #include <boost/foreach.hpp>
5 #include <sys/socket.h>
6 #include <netinet/in.h>
7 #include <netinet/tcp.h>
10 // deal with partial reads
12 int readn(int fd, void* buffer, unsigned int len)
17 res = read(fd, (char*)buffer + pos, len - pos);
20 throw runtime_error("Signing Pipe remote shut down in the middle of a message");
22 //cerr<<"Got decent EOF on "<<fd<<endl;
28 if(errno == EAGAIN || errno == EINTR) {
34 unixDie("Reading from socket in Signing Pipe loop");
46 // used to pass information to the new thread
47 struct StartHelperStruct
49 StartHelperStruct(ChunkedSigningPipe* csp, int id, int fd) : d_csp(csp), d_id(id), d_fd(fd){}
50 ChunkedSigningPipe* d_csp;
55 // used to launcht the new thread
56 void* ChunkedSigningPipe::helperWorker(void* p)
59 StartHelperStruct shs=*(StartHelperStruct*)p;
60 delete (StartHelperStruct*)p;
62 shs.d_csp->worker(shs.d_id, shs.d_fd);
65 catch(std::exception& e) {
66 L<<Logger::Error<<"Signing thread died with error "<<e.what()<<endl;
70 ChunkedSigningPipe::ChunkedSigningPipe(const std::string& signerName, bool mustSign, const pdns::string& servers, unsigned int workers)
71 : d_queued(0), d_outstanding(0), d_signer(signerName), d_maxchunkrecords(100), d_numworkers(workers), d_tids(d_numworkers),
72 d_mustSign(mustSign), d_final(false), d_submitted(0)
74 d_rrsetToSign = new rrset_t;
75 d_chunks.push_back(vector<DNSResourceRecord>()); // load an empty chunk
82 for(unsigned int n=0; n < d_numworkers; ++n) {
83 if(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0)
84 throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
85 Utility::setCloseOnExec(fds[0]);
86 Utility::setCloseOnExec(fds[1]);
87 pthread_create(&d_tids[n], 0, helperWorker, (void*) new StartHelperStruct(this, n, fds[1]));
88 Utility::setNonBlocking(fds[0]);
89 d_sockets.push_back(fds[0]);
93 ChunkedSigningPipe::~ChunkedSigningPipe()
98 BOOST_FOREACH(int fd, d_sockets) {
99 close(fd); // this will trigger all threads to exit
103 BOOST_FOREACH(pthread_t& tid, d_tids) {
104 pthread_join(tid, &res);
106 //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
110 bool dedupLessThan(const DNSResourceRecord& a, const DNSResourceRecord &b)
112 if(tie(a.content, a.ttl) < tie(b.content, b.ttl))
114 if(a.qtype.getCode() == QType::MX || a.qtype.getCode() == QType::SRV)
115 return a.priority < b.priority;
119 bool dedupEqual(const DNSResourceRecord& a, const DNSResourceRecord &b)
121 if(tie(a.content, a.ttl) != tie(b.content, b.ttl))
123 if(a.qtype.getCode() == QType::MX || a.qtype.getCode() == QType::SRV)
124 return a.priority == b.priority;
129 void ChunkedSigningPipe::dedupRRSet()
131 // our set contains contains records for one type and one name, but might not be sorted otherwise
132 sort(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupLessThan);
133 d_rrsetToSign->erase(unique(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupEqual), d_rrsetToSign->end());
136 bool ChunkedSigningPipe::submit(const DNSResourceRecord& rr)
139 // check if we have a full RRSET to sign
140 if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->qtype.getCode() != rr.qtype.getCode() || !pdns_iequals(d_rrsetToSign->begin()->qname, rr.qname)))
145 d_rrsetToSign->push_back(rr);
146 return !d_chunks.empty() && d_chunks.front().size() >= d_maxchunkrecords; // "you can send more"
149 pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
153 for(unsigned int n = 0; n < d_sockets.size(); ++n) {
154 if(d_eof.count(d_sockets[n]))
157 memset(&pfd, 0, sizeof(pfd));
158 pfd.fd = d_sockets[n];
160 pfd.events |= POLLIN;
162 pfd.events |= POLLOUT;
166 int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
168 unixDie("polling for activity from signers, "+lexical_cast<string>(d_sockets.size()));
169 pair<vector<int>, vector<int> > vects;
170 for(unsigned int n = 0; n < pfds.size(); ++n)
171 if(pfds[n].revents & POLLIN)
172 vects.first.push_back(pfds[n].fd);
173 else if(pfds[n].revents & POLLOUT)
174 vects.second.push_back(pfds[n].fd);
179 void ChunkedSigningPipe::addSignedToChunks(chunk_t* signedChunk)
181 chunk_t::const_iterator from = signedChunk->begin();
183 while(from != signedChunk->end()) {
184 chunk_t& fillChunk = d_chunks.back();
186 chunk_t::size_type room = d_maxchunkrecords - fillChunk.size();
188 unsigned int fit = std::min(room, (chunk_t::size_type)(signedChunk->end() - from));
190 d_chunks.back().insert(fillChunk.end(), from , from + fit);
193 if(from != signedChunk->end()) // it didn't fit, so add a new chunk
194 d_chunks.push_back(chunk_t());
198 void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
201 addSignedToChunks(d_rrsetToSign);
202 d_rrsetToSign->clear();
206 if(d_final && !d_outstanding) // nothing to do!
209 bool wantRead, wantWrite;
211 wantWrite = !d_rrsetToSign->empty();
212 wantRead = d_outstanding || wantWrite; // if we wrote, we want to read
214 pair<vector<int>, vector<int> > rwVect;
216 rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
218 if(wantWrite && !rwVect.second.empty()) {
219 random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
220 writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
221 d_rrsetToSign = new rrset_t;
228 while(d_outstanding) {
231 BOOST_FOREACH(int fd, rwVect.first) {
235 while(d_outstanding) {
236 int res = readn(fd, &chunk, sizeof(chunk));
242 if(errno != EAGAIN && errno != EINTR)
243 unixDie("Error reading signed chunk from thread");
250 addSignedToChunks(chunk);
255 if(!d_outstanding || !d_final)
257 rwVect = waitForRW(1, 0, -1); // wait for something to happen
261 if(wantWrite) { // our optimization above failed, we now wait synchronously
262 rwVect = waitForRW(0, wantWrite, -1); // wait for something to happen
263 random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
264 writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
265 d_rrsetToSign = new rrset_t;
272 unsigned int ChunkedSigningPipe::getReady()
275 BOOST_FOREACH(const std::vector<DNSResourceRecord>& v, d_chunks) {
280 void ChunkedSigningPipe::worker(int id, int fd)
284 UeberBackend db("key-only");
289 res = readn(fd, &chunk, sizeof(chunk));
293 unixDie("reading object pointer to sign from pdns");
294 set<string, CIStringCompare> authSet;
295 authSet.insert(d_signer);
296 addRRSigs(dk, db, authSet, *chunk);
299 writen2(fd, &chunk, sizeof(chunk));
303 catch(std::exception& e)
305 L<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
309 void ChunkedSigningPipe::flushToSign()
312 d_rrsetToSign->clear();
315 vector<DNSResourceRecord> ChunkedSigningPipe::getChunk(bool final)
317 if(final && !d_final) {
318 // this means we should keep on reading until d_outstanding == 0
322 BOOST_FOREACH(int fd, d_sockets) {
323 shutdown(fd, SHUT_WR); // perhaps this transmits EOF the other side
324 //cerr<<"shutdown of "<<fd<<endl;
328 flushToSign(); // should help us wait
329 vector<DNSResourceRecord> front=d_chunks.front();
330 d_chunks.pop_front();
332 d_chunks.push_back(vector<DNSResourceRecord>());
333 if(d_final && front.empty())
334 ; // cerr<<"getChunk returning empty in final"<<endl;
342 if(!servers.empty()) {
344 parseService(servers, st);
345 remote=ComboAddress(st.host, st.port);
349 if(!servers.empty()) {
350 fds[0] = socket(AF_INET, SOCK_STREAM, 0);
353 if(connect(fds[0], (struct sockaddr*)&remote, remote.getSocklen()) < 0)
354 unixDie("Connecting to signing server");
358 signal(SIGCHLD, SIG_IGN);
359 if(!fork()) { // child
361 execl("./pdnssec", "./pdnssec", "--config-dir=./", "signing-slave", NULL);
362 // helperWorker(new StartHelperStruct(this, n));
370 bool readLStringFromSocket(int fd, string& msg)
374 if(!readn(fd, &len, sizeof(len)))
379 scoped_array<char> buf(new char[len]);
380 readn(fd, buf.get(), len);
382 msg.assign(buf.get(), len);
385 void writeLStringToSocket(int fd, const string& msg)
388 uint32_t len = htonl(msg.length());
389 string tot((char*)&len, 4);
392 writen2(fd, tot.c_str(), tot.length());