]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/signingpipe.cc
snap
[thirdparty/pdns.git] / pdns / signingpipe.cc
CommitLineData
870a0fe4
AT
1#ifdef HAVE_CONFIG_H
2#include "config.h"
3#endif
8e9b7d99 4#include "signingpipe.hh"
a6ef6f7a
BH
5#include "misc.hh"
6#include <poll.h>
a2aaa807 7#include <boost/foreach.hpp>
a6ef6f7a
BH
8#include <sys/socket.h>
9#include <netinet/in.h>
10#include <netinet/tcp.h>
11#include <sched.h>
8e9b7d99 12
8d59e8ce
BH
13// deal with partial reads
14namespace {
15int readn(int fd, void* buffer, unsigned int len)
16{
17 unsigned int pos=0;
18 int res;
19 for(;;) {
20 res = read(fd, (char*)buffer + pos, len - pos);
21 if(res == 0) {
22 if(pos)
23 throw runtime_error("Signing Pipe remote shut down in the middle of a message");
24 else {
7b217f2e 25 //cerr<<"Got decent EOF on "<<fd<<endl;
8d59e8ce
BH
26 return 0;
27 }
28 }
29
30 if(res < 0) {
31 if(errno == EAGAIN || errno == EINTR) {
32 if(pos==0)
33 return -1;
34 waitForData(fd, -1);
35 continue;
36 }
37 unixDie("Reading from socket in Signing Pipe loop");
38 }
39
40 pos+=res;
41 if(pos == len)
42 break;
43 }
44 return len;
45}
46}
47
48
49// used to pass information to the new thread
a2aaa807
BH
50struct StartHelperStruct
51{
a6ef6f7a 52 StartHelperStruct(ChunkedSigningPipe* csp, int id, int fd) : d_csp(csp), d_id(id), d_fd(fd){}
a2aaa807
BH
53 ChunkedSigningPipe* d_csp;
54 int d_id;
a6ef6f7a 55 int d_fd;
a2aaa807 56};
8267bd2c 57
a67dd0cf 58// used to launch the new thread
8267bd2c 59void* ChunkedSigningPipe::helperWorker(void* p)
bec14a20 60try
8267bd2c 61{
a2aaa807
BH
62 StartHelperStruct shs=*(StartHelperStruct*)p;
63 delete (StartHelperStruct*)p;
64
a6ef6f7a 65 shs.d_csp->worker(shs.d_id, shs.d_fd);
8267bd2c
BH
66 return 0;
67}
97f1f8d9 68catch(...) {
c057bfaa 69 L<<Logger::Error<<"Unknown exception in signing thread occurred"<<endl;
a2aaa807 70 return 0;
bec14a20 71}
8267bd2c 72
bab008e2 73ChunkedSigningPipe::ChunkedSigningPipe(const string& signerName, bool mustSign, const string& servers, unsigned int workers)
a6ef6f7a 74 : d_queued(0), d_outstanding(0), d_signer(signerName), d_maxchunkrecords(100), d_numworkers(workers), d_tids(d_numworkers),
8d59e8ce 75 d_mustSign(mustSign), d_final(false), d_submitted(0)
8267bd2c 76{
a2aaa807 77 d_rrsetToSign = new rrset_t;
8d59e8ce
BH
78 d_chunks.push_back(vector<DNSResourceRecord>()); // load an empty chunk
79
8267bd2c
BH
80 if(!d_mustSign)
81 return;
a2aaa807 82
a2aaa807
BH
83 int fds[2];
84
8267bd2c 85 for(unsigned int n=0; n < d_numworkers; ++n) {
8d59e8ce
BH
86 if(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0)
87 throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
3897b9e1 88 setCloseOnExec(fds[0]);
89 setCloseOnExec(fds[1]);
8d59e8ce 90 pthread_create(&d_tids[n], 0, helperWorker, (void*) new StartHelperStruct(this, n, fds[1]));
3897b9e1 91 setNonBlocking(fds[0]);
8d59e8ce 92 d_sockets.push_back(fds[0]);
8267bd2c
BH
93 }
94}
95
96ChunkedSigningPipe::~ChunkedSigningPipe()
97{
a2aaa807 98 delete d_rrsetToSign;
8267bd2c
BH
99 if(!d_mustSign)
100 return;
a6ef6f7a
BH
101 BOOST_FOREACH(int fd, d_sockets) {
102 close(fd); // this will trigger all threads to exit
103 }
a2aaa807 104
8267bd2c 105 void* res;
a6ef6f7a
BH
106 BOOST_FOREACH(pthread_t& tid, d_tids) {
107 pthread_join(tid, &res);
108 }
7b217f2e 109 //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
8267bd2c
BH
110}
111
a2f3b9ec 112namespace {
b9bafae0
KM
113bool
114dedupLessThan(const DNSResourceRecord& a, const DNSResourceRecord &b)
a2f3b9ec 115{
b9bafae0 116 return (tie(a.content, a.ttl) < tie(b.content, b.ttl));
a2f3b9ec
BH
117}
118
119bool dedupEqual(const DNSResourceRecord& a, const DNSResourceRecord &b)
120{
b9bafae0 121 return(tie(a.content, a.ttl) == tie(b.content, b.ttl));
a2f3b9ec
BH
122}
123}
124
125void ChunkedSigningPipe::dedupRRSet()
126{
127 // our set contains contains records for one type and one name, but might not be sorted otherwise
128 sort(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupLessThan);
129 d_rrsetToSign->erase(unique(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupEqual), d_rrsetToSign->end());
130}
131
8e9b7d99
BH
132bool ChunkedSigningPipe::submit(const DNSResourceRecord& rr)
133{
8d59e8ce
BH
134 ++d_submitted;
135 // check if we have a full RRSET to sign
675fa24c 136 if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->qtype.getCode() != rr.qtype.getCode() || d_rrsetToSign->begin()->qname != rr.qname))
8e9b7d99 137 {
a2f3b9ec 138 dedupRRSet();
a2aaa807 139 sendRRSetToWorker();
8e9b7d99 140 }
a2aaa807 141 d_rrsetToSign->push_back(rr);
a2f3b9ec 142 return !d_chunks.empty() && d_chunks.front().size() >= d_maxchunkrecords; // "you can send more"
8e9b7d99
BH
143}
144
a6ef6f7a
BH
145pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
146{
ac10b8c6 147 vector<pollfd> pfds;
a6ef6f7a 148
8d59e8ce 149 for(unsigned int n = 0; n < d_sockets.size(); ++n) {
a4da80fc 150 if(d_eof.count(d_sockets[n]))
ac10b8c6
BH
151 continue;
152 struct pollfd pfd;
153 memset(&pfd, 0, sizeof(pfd));
154 pfd.fd = d_sockets[n];
155 if(rd)
156 pfd.events |= POLLIN;
157 if(wr)
158 pfd.events |= POLLOUT;
159 pfds.push_back(pfd);
a6ef6f7a
BH
160 }
161
a135720d 162 int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
a6ef6f7a 163 if(res < 0)
ac10b8c6 164 unixDie("polling for activity from signers, "+lexical_cast<string>(d_sockets.size()));
a6ef6f7a 165 pair<vector<int>, vector<int> > vects;
ac10b8c6 166 for(unsigned int n = 0; n < pfds.size(); ++n)
a6ef6f7a
BH
167 if(pfds[n].revents & POLLIN)
168 vects.first.push_back(pfds[n].fd);
169 else if(pfds[n].revents & POLLOUT)
170 vects.second.push_back(pfds[n].fd);
171
172 return vects;
173}
174
8d59e8ce
BH
175void ChunkedSigningPipe::addSignedToChunks(chunk_t* signedChunk)
176{
177 chunk_t::const_iterator from = signedChunk->begin();
178
179 while(from != signedChunk->end()) {
180 chunk_t& fillChunk = d_chunks.back();
181
182 chunk_t::size_type room = d_maxchunkrecords - fillChunk.size();
183
184 unsigned int fit = std::min(room, (chunk_t::size_type)(signedChunk->end() - from));
185
186 d_chunks.back().insert(fillChunk.end(), from , from + fit);
187 from+=fit;
188
189 if(from != signedChunk->end()) // it didn't fit, so add a new chunk
190 d_chunks.push_back(chunk_t());
191 }
192}
a6ef6f7a 193
a2aaa807 194void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
8267bd2c
BH
195{
196 if(!d_mustSign) {
8d59e8ce 197 addSignedToChunks(d_rrsetToSign);
a2aaa807 198 d_rrsetToSign->clear();
8267bd2c
BH
199 return;
200 }
a2aaa807 201
8d59e8ce
BH
202 if(d_final && !d_outstanding) // nothing to do!
203 return;
204
a6ef6f7a
BH
205 bool wantRead, wantWrite;
206
207 wantWrite = !d_rrsetToSign->empty();
8d59e8ce 208 wantRead = d_outstanding || wantWrite; // if we wrote, we want to read
a6ef6f7a
BH
209
210 pair<vector<int>, vector<int> > rwVect;
211
8d59e8ce 212 rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
a6ef6f7a
BH
213
214 if(wantWrite && !rwVect.second.empty()) {
8d59e8ce 215 random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
a6ef6f7a 216 writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
a2aaa807 217 d_rrsetToSign = new rrset_t;
8267bd2c 218 d_outstanding++;
a2aaa807 219 d_queued++;
8d59e8ce
BH
220 wantWrite=false;
221 }
222
223 if(wantRead) {
224 while(d_outstanding) {
225 chunk_t* chunk;
a6ef6f7a 226
8d59e8ce
BH
227 BOOST_FOREACH(int fd, rwVect.first) {
228 if(d_eof.count(fd))
229 continue;
230
231 while(d_outstanding) {
232 int res = readn(fd, &chunk, sizeof(chunk));
233 if(!res) {
234 d_eof.insert(fd);
235 break;
236 }
237 if(res < 0) {
238 if(errno != EAGAIN && errno != EINTR)
239 unixDie("Error reading signed chunk from thread");
240 else
241 break;
242 }
243
244 --d_outstanding;
245
246 addSignedToChunks(chunk);
247
248 delete chunk;
249 }
250 }
251 if(!d_outstanding || !d_final)
a6ef6f7a 252 break;
8d59e8ce 253 rwVect = waitForRW(1, 0, -1); // wait for something to happen
a6ef6f7a 254 }
8267bd2c 255 }
a6ef6f7a 256
8d59e8ce
BH
257 if(wantWrite) { // our optimization above failed, we now wait synchronously
258 rwVect = waitForRW(0, wantWrite, -1); // wait for something to happen
259 random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
260 writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
261 d_rrsetToSign = new rrset_t;
262 d_outstanding++;
263 d_queued++;
a6ef6f7a
BH
264 }
265
8267bd2c
BH
266}
267
a2aaa807
BH
268unsigned int ChunkedSigningPipe::getReady()
269{
270 unsigned int sum=0;
271 BOOST_FOREACH(const std::vector<DNSResourceRecord>& v, d_chunks) {
272 sum += v.size();
273 }
274 return sum;
275}
a6ef6f7a
BH
276void ChunkedSigningPipe::worker(int id, int fd)
277try
8267bd2c 278{
8267bd2c 279 DNSSECKeeper dk;
a6ef6f7a
BH
280 UeberBackend db("key-only");
281
282 chunk_t* chunk;
8267bd2c
BH
283 int res;
284 for(;;) {
a6ef6f7a
BH
285 res = readn(fd, &chunk, sizeof(chunk));
286 if(!res)
8267bd2c 287 break;
a6ef6f7a
BH
288 if(res < 0)
289 unixDie("reading object pointer to sign from pdns");
8d3cbffa
BH
290 set<string, CIStringCompare> authSet;
291 authSet.insert(d_signer);
292 addRRSigs(dk, db, authSet, *chunk);
a6ef6f7a
BH
293 ++d_signed;
294
295 writen2(fd, &chunk, sizeof(chunk));
8267bd2c 296 }
a6ef6f7a
BH
297 close(fd);
298}
97f1f8d9
KM
299catch(PDNSException& pe)
300{
301 L<<Logger::Error<<"Signing thread died because of PDNSException: "<<pe.reason<<endl;
302 close(fd);
303}
18e7758c 304catch(std::exception& e)
a6ef6f7a 305{
a4da80fc 306 L<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
a6ef6f7a 307 close(fd);
8267bd2c
BH
308}
309
8e9b7d99
BH
310void ChunkedSigningPipe::flushToSign()
311{
a2aaa807
BH
312 sendRRSetToWorker();
313 d_rrsetToSign->clear();
8e9b7d99
BH
314}
315
316vector<DNSResourceRecord> ChunkedSigningPipe::getChunk(bool final)
317{
a6ef6f7a
BH
318 if(final && !d_final) {
319 // this means we should keep on reading until d_outstanding == 0
320 d_final = true;
8e9b7d99 321 flushToSign();
a6ef6f7a
BH
322
323 BOOST_FOREACH(int fd, d_sockets) {
324 shutdown(fd, SHUT_WR); // perhaps this transmits EOF the other side
7b217f2e 325 //cerr<<"shutdown of "<<fd<<endl;
a6ef6f7a 326 }
8267bd2c 327 }
a6ef6f7a
BH
328 if(d_final)
329 flushToSign(); // should help us wait
a2aaa807
BH
330 vector<DNSResourceRecord> front=d_chunks.front();
331 d_chunks.pop_front();
332 if(d_chunks.empty())
333 d_chunks.push_back(vector<DNSResourceRecord>());
e125fc69 334/* if(d_final && front.empty())
335 cerr<<"getChunk returning empty in final"<<endl; */
a2aaa807 336 return front;
8e9b7d99 337}
8d59e8ce
BH
338
#if 0
  // Disabled fragment, kept for reference only: it is not a complete
  // definition (the final `else {` is never closed) and refers to names
  // (`servers`, `fds`) that are not declared inside it.

  ServiceTuple st;
  ComboAddress remote;
  if(!servers.empty()) {
    st.port=2000;
    parseService(servers, st);
    remote=ComboAddress(st.host, st.port);
  }

  ///
  if(!servers.empty()) {
    fds[0] = socket(AF_INET, SOCK_STREAM, 0);
    fds[1] = -1;

    if(connect(fds[0], (struct sockaddr*)&remote, remote.getSocklen()) < 0)
      unixDie("Connecting to signing server");
  }
  else {
/////
    signal(SIGCHLD, SIG_IGN);
    if(!fork()) { // child
      dup2(fds[1], 0);
      execl("./pdnssec", "./pdnssec", "--config-dir=./", "signing-slave", NULL);
      // helperWorker(new StartHelperStruct(this, n));
      return;
    }
    else
      close(fds[1]);
#endif
369
#if 0
// Disabled helpers for exchanging length-prefixed strings over a socket.
// The length travels first as a 32 bit integer in network byte order.

// Read one length-prefixed string from `fd` into `msg`.
// Returns false when readn() returns 0 for the length, true otherwise.
// NOTE(review): a -1 from readn() is treated like success here and the result
// of the second readn() is not checked - fix before ever enabling this code.
bool readLStringFromSocket(int fd, string& msg)
{
  msg.clear();
  uint32_t len;
  if(!readn(fd, &len, sizeof(len)))
    return false;

  len = ntohl(len);

  scoped_array<char> buf(new char[len]);
  readn(fd, buf.get(), len);

  msg.assign(buf.get(), len);
  return true;
}

// Write `msg` to `fd`, preceded by its length in network byte order.
void writeLStringToSocket(int fd, const string& msg)
{
  string realmsg;
  uint32_t len = htonl(msg.length());
  string tot((char*)&len, 4);
  tot+=msg;

  writen2(fd, tot.c_str(), tot.length());
}

#endif
397