]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/signingpipe.cc
Merge pull request #1685 from cmouse/catch-pdnsexception-signingpipe
[thirdparty/pdns.git] / pdns / signingpipe.cc
CommitLineData
8e9b7d99 1#include "signingpipe.hh"
a6ef6f7a
BH
2#include "misc.hh"
3#include <poll.h>
a2aaa807 4#include <boost/foreach.hpp>
a6ef6f7a
BH
5#include <sys/socket.h>
6#include <netinet/in.h>
7#include <netinet/tcp.h>
8#include <sched.h>
8e9b7d99 9
8d59e8ce
BH
10// deal with partial reads
11namespace {
12int readn(int fd, void* buffer, unsigned int len)
13{
14 unsigned int pos=0;
15 int res;
16 for(;;) {
17 res = read(fd, (char*)buffer + pos, len - pos);
18 if(res == 0) {
19 if(pos)
20 throw runtime_error("Signing Pipe remote shut down in the middle of a message");
21 else {
7b217f2e 22 //cerr<<"Got decent EOF on "<<fd<<endl;
8d59e8ce
BH
23 return 0;
24 }
25 }
26
27 if(res < 0) {
28 if(errno == EAGAIN || errno == EINTR) {
29 if(pos==0)
30 return -1;
31 waitForData(fd, -1);
32 continue;
33 }
34 unixDie("Reading from socket in Signing Pipe loop");
35 }
36
37 pos+=res;
38 if(pos == len)
39 break;
40 }
41 return len;
42}
43}
44
45
46// used to pass information to the new thread
a2aaa807
BH
47struct StartHelperStruct
48{
a6ef6f7a 49 StartHelperStruct(ChunkedSigningPipe* csp, int id, int fd) : d_csp(csp), d_id(id), d_fd(fd){}
a2aaa807
BH
50 ChunkedSigningPipe* d_csp;
51 int d_id;
a6ef6f7a 52 int d_fd;
a2aaa807 53};
8267bd2c 54
a67dd0cf 55// used to launch the new thread
8267bd2c 56void* ChunkedSigningPipe::helperWorker(void* p)
bec14a20 57try
8267bd2c 58{
a2aaa807
BH
59 StartHelperStruct shs=*(StartHelperStruct*)p;
60 delete (StartHelperStruct*)p;
61
a6ef6f7a 62 shs.d_csp->worker(shs.d_id, shs.d_fd);
8267bd2c
BH
63 return 0;
64}
2e6bbd83
AT
65catch(PDNSException& pe) {
66 L<<Logger::Error<<"Signing thread died with error "<<pe.reason<<endl;
67}
bec14a20 68catch(std::exception& e) {
a4da80fc 69 L<<Logger::Error<<"Signing thread died with error "<<e.what()<<endl;
a2aaa807 70 return 0;
bec14a20 71}
8267bd2c 72
a6ef6f7a
BH
73ChunkedSigningPipe::ChunkedSigningPipe(const std::string& signerName, bool mustSign, const pdns::string& servers, unsigned int workers)
74 : d_queued(0), d_outstanding(0), d_signer(signerName), d_maxchunkrecords(100), d_numworkers(workers), d_tids(d_numworkers),
8d59e8ce 75 d_mustSign(mustSign), d_final(false), d_submitted(0)
8267bd2c 76{
a2aaa807 77 d_rrsetToSign = new rrset_t;
8d59e8ce
BH
78 d_chunks.push_back(vector<DNSResourceRecord>()); // load an empty chunk
79
8267bd2c
BH
80 if(!d_mustSign)
81 return;
a2aaa807 82
a2aaa807
BH
83 int fds[2];
84
8267bd2c 85 for(unsigned int n=0; n < d_numworkers; ++n) {
8d59e8ce
BH
86 if(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0)
87 throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
42c235e5
PD
88 Utility::setCloseOnExec(fds[0]);
89 Utility::setCloseOnExec(fds[1]);
8d59e8ce 90 pthread_create(&d_tids[n], 0, helperWorker, (void*) new StartHelperStruct(this, n, fds[1]));
a6ef6f7a 91 Utility::setNonBlocking(fds[0]);
8d59e8ce 92 d_sockets.push_back(fds[0]);
8267bd2c
BH
93 }
94}
95
96ChunkedSigningPipe::~ChunkedSigningPipe()
97{
a2aaa807 98 delete d_rrsetToSign;
8267bd2c
BH
99 if(!d_mustSign)
100 return;
a6ef6f7a
BH
101 BOOST_FOREACH(int fd, d_sockets) {
102 close(fd); // this will trigger all threads to exit
103 }
a2aaa807 104
8267bd2c 105 void* res;
a6ef6f7a
BH
106 BOOST_FOREACH(pthread_t& tid, d_tids) {
107 pthread_join(tid, &res);
108 }
7b217f2e 109 //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
8267bd2c
BH
110}
111
a2f3b9ec
BH
112namespace {
113bool dedupLessThan(const DNSResourceRecord& a, const DNSResourceRecord &b)
114{
115 if(tie(a.content, a.ttl) < tie(b.content, b.ttl))
116 return true;
117 if(a.qtype.getCode() == QType::MX || a.qtype.getCode() == QType::SRV)
118 return a.priority < b.priority;
119 return false;
120}
121
122bool dedupEqual(const DNSResourceRecord& a, const DNSResourceRecord &b)
123{
124 if(tie(a.content, a.ttl) != tie(b.content, b.ttl))
125 return false;
126 if(a.qtype.getCode() == QType::MX || a.qtype.getCode() == QType::SRV)
127 return a.priority == b.priority;
128 return true;
129}
130}
131
132void ChunkedSigningPipe::dedupRRSet()
133{
134 // our set contains contains records for one type and one name, but might not be sorted otherwise
135 sort(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupLessThan);
136 d_rrsetToSign->erase(unique(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupEqual), d_rrsetToSign->end());
137}
138
8e9b7d99
BH
139bool ChunkedSigningPipe::submit(const DNSResourceRecord& rr)
140{
8d59e8ce
BH
141 ++d_submitted;
142 // check if we have a full RRSET to sign
a2aaa807 143 if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->qtype.getCode() != rr.qtype.getCode() || !pdns_iequals(d_rrsetToSign->begin()->qname, rr.qname)))
8e9b7d99 144 {
a2f3b9ec 145 dedupRRSet();
a2aaa807 146 sendRRSetToWorker();
8e9b7d99 147 }
a2aaa807 148 d_rrsetToSign->push_back(rr);
a2f3b9ec 149 return !d_chunks.empty() && d_chunks.front().size() >= d_maxchunkrecords; // "you can send more"
8e9b7d99
BH
150}
151
a6ef6f7a
BH
152pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
153{
ac10b8c6 154 vector<pollfd> pfds;
a6ef6f7a 155
8d59e8ce 156 for(unsigned int n = 0; n < d_sockets.size(); ++n) {
a4da80fc 157 if(d_eof.count(d_sockets[n]))
ac10b8c6
BH
158 continue;
159 struct pollfd pfd;
160 memset(&pfd, 0, sizeof(pfd));
161 pfd.fd = d_sockets[n];
162 if(rd)
163 pfd.events |= POLLIN;
164 if(wr)
165 pfd.events |= POLLOUT;
166 pfds.push_back(pfd);
a6ef6f7a
BH
167 }
168
a135720d 169 int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
a6ef6f7a 170 if(res < 0)
ac10b8c6 171 unixDie("polling for activity from signers, "+lexical_cast<string>(d_sockets.size()));
a6ef6f7a 172 pair<vector<int>, vector<int> > vects;
ac10b8c6 173 for(unsigned int n = 0; n < pfds.size(); ++n)
a6ef6f7a
BH
174 if(pfds[n].revents & POLLIN)
175 vects.first.push_back(pfds[n].fd);
176 else if(pfds[n].revents & POLLOUT)
177 vects.second.push_back(pfds[n].fd);
178
179 return vects;
180}
181
8d59e8ce
BH
182void ChunkedSigningPipe::addSignedToChunks(chunk_t* signedChunk)
183{
184 chunk_t::const_iterator from = signedChunk->begin();
185
186 while(from != signedChunk->end()) {
187 chunk_t& fillChunk = d_chunks.back();
188
189 chunk_t::size_type room = d_maxchunkrecords - fillChunk.size();
190
191 unsigned int fit = std::min(room, (chunk_t::size_type)(signedChunk->end() - from));
192
193 d_chunks.back().insert(fillChunk.end(), from , from + fit);
194 from+=fit;
195
196 if(from != signedChunk->end()) // it didn't fit, so add a new chunk
197 d_chunks.push_back(chunk_t());
198 }
199}
a6ef6f7a 200
a2aaa807 201void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
8267bd2c
BH
202{
203 if(!d_mustSign) {
8d59e8ce 204 addSignedToChunks(d_rrsetToSign);
a2aaa807 205 d_rrsetToSign->clear();
8267bd2c
BH
206 return;
207 }
a2aaa807 208
8d59e8ce
BH
209 if(d_final && !d_outstanding) // nothing to do!
210 return;
211
a6ef6f7a
BH
212 bool wantRead, wantWrite;
213
214 wantWrite = !d_rrsetToSign->empty();
8d59e8ce 215 wantRead = d_outstanding || wantWrite; // if we wrote, we want to read
a6ef6f7a
BH
216
217 pair<vector<int>, vector<int> > rwVect;
218
8d59e8ce 219 rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
a6ef6f7a
BH
220
221 if(wantWrite && !rwVect.second.empty()) {
8d59e8ce 222 random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
a6ef6f7a 223 writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
a2aaa807 224 d_rrsetToSign = new rrset_t;
8267bd2c 225 d_outstanding++;
a2aaa807 226 d_queued++;
8d59e8ce
BH
227 wantWrite=false;
228 }
229
230 if(wantRead) {
231 while(d_outstanding) {
232 chunk_t* chunk;
a6ef6f7a 233
8d59e8ce
BH
234 BOOST_FOREACH(int fd, rwVect.first) {
235 if(d_eof.count(fd))
236 continue;
237
238 while(d_outstanding) {
239 int res = readn(fd, &chunk, sizeof(chunk));
240 if(!res) {
241 d_eof.insert(fd);
242 break;
243 }
244 if(res < 0) {
245 if(errno != EAGAIN && errno != EINTR)
246 unixDie("Error reading signed chunk from thread");
247 else
248 break;
249 }
250
251 --d_outstanding;
252
253 addSignedToChunks(chunk);
254
255 delete chunk;
256 }
257 }
258 if(!d_outstanding || !d_final)
a6ef6f7a 259 break;
8d59e8ce 260 rwVect = waitForRW(1, 0, -1); // wait for something to happen
a6ef6f7a 261 }
8267bd2c 262 }
a6ef6f7a 263
8d59e8ce
BH
264 if(wantWrite) { // our optimization above failed, we now wait synchronously
265 rwVect = waitForRW(0, wantWrite, -1); // wait for something to happen
266 random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
267 writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
268 d_rrsetToSign = new rrset_t;
269 d_outstanding++;
270 d_queued++;
a6ef6f7a
BH
271 }
272
8267bd2c
BH
273}
274
a2aaa807
BH
275unsigned int ChunkedSigningPipe::getReady()
276{
277 unsigned int sum=0;
278 BOOST_FOREACH(const std::vector<DNSResourceRecord>& v, d_chunks) {
279 sum += v.size();
280 }
281 return sum;
282}
a6ef6f7a
BH
283void ChunkedSigningPipe::worker(int id, int fd)
284try
8267bd2c 285{
8267bd2c 286 DNSSECKeeper dk;
a6ef6f7a
BH
287 UeberBackend db("key-only");
288
289 chunk_t* chunk;
8267bd2c
BH
290 int res;
291 for(;;) {
a6ef6f7a
BH
292 res = readn(fd, &chunk, sizeof(chunk));
293 if(!res)
8267bd2c 294 break;
a6ef6f7a
BH
295 if(res < 0)
296 unixDie("reading object pointer to sign from pdns");
8d3cbffa
BH
297 set<string, CIStringCompare> authSet;
298 authSet.insert(d_signer);
299 addRRSigs(dk, db, authSet, *chunk);
a6ef6f7a
BH
300 ++d_signed;
301
302 writen2(fd, &chunk, sizeof(chunk));
8267bd2c 303 }
a6ef6f7a
BH
304 close(fd);
305}
18e7758c 306catch(std::exception& e)
a6ef6f7a 307{
a4da80fc 308 L<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
a6ef6f7a 309 close(fd);
8267bd2c
BH
310}
311
8e9b7d99
BH
312void ChunkedSigningPipe::flushToSign()
313{
a2aaa807
BH
314 sendRRSetToWorker();
315 d_rrsetToSign->clear();
8e9b7d99
BH
316}
317
318vector<DNSResourceRecord> ChunkedSigningPipe::getChunk(bool final)
319{
a6ef6f7a
BH
320 if(final && !d_final) {
321 // this means we should keep on reading until d_outstanding == 0
322 d_final = true;
8e9b7d99 323 flushToSign();
a6ef6f7a
BH
324
325 BOOST_FOREACH(int fd, d_sockets) {
326 shutdown(fd, SHUT_WR); // perhaps this transmits EOF the other side
7b217f2e 327 //cerr<<"shutdown of "<<fd<<endl;
a6ef6f7a 328 }
8267bd2c 329 }
a6ef6f7a
BH
330 if(d_final)
331 flushToSign(); // should help us wait
a2aaa807
BH
332 vector<DNSResourceRecord> front=d_chunks.front();
333 d_chunks.pop_front();
334 if(d_chunks.empty())
335 d_chunks.push_back(vector<DNSResourceRecord>());
e125fc69 336/* if(d_final && front.empty())
337 cerr<<"getChunk returning empty in final"<<endl; */
a2aaa807 338 return front;
8e9b7d99 339}
8d59e8ce
BH
340
341#if 0
342
343 ServiceTuple st;
344 ComboAddress remote;
345 if(!servers.empty()) {
346 st.port=2000;
347 parseService(servers, st);
348 remote=ComboAddress(st.host, st.port);
349 }
350
351 ///
352 if(!servers.empty()) {
353 fds[0] = socket(AF_INET, SOCK_STREAM, 0);
354 fds[1] = -1;
355
356 if(connect(fds[0], (struct sockaddr*)&remote, remote.getSocklen()) < 0)
357 unixDie("Connecting to signing server");
358 }
359 else {
360/////
361 signal(SIGCHLD, SIG_IGN);
362 if(!fork()) { // child
363 dup2(fds[1], 0);
364 execl("./pdnssec", "./pdnssec", "--config-dir=./", "signing-slave", NULL);
365 // helperWorker(new StartHelperStruct(this, n));
366 return;
367 }
368 else
369 close(fds[1]);
370#endif
371
372#if 0
373bool readLStringFromSocket(int fd, string& msg)
374{
375 msg.clear();
376 uint32_t len;
377 if(!readn(fd, &len, sizeof(len)))
378 return false;
379
380 len = ntohl(len);
381
382 scoped_array<char> buf(new char[len]);
383 readn(fd, buf.get(), len);
384
385 msg.assign(buf.get(), len);
386 return true;
387}
388void writeLStringToSocket(int fd, const string& msg)
389{
390 string realmsg;
391 uint32_t len = htonl(msg.length());
392 string tot((char*)&len, 4);
393 tot+=msg;
394
395 writen2(fd, tot.c_str(), tot.length());
396}
397
398#endif
399