]> git.ipfire.org Git - thirdparty/pdns.git/blob
1644498
[thirdparty/pdns.git] /
1 #include "signingpipe.hh"
2 #include "misc.hh"
3 #include <poll.h>
4 #include <boost/foreach.hpp>
5 #include <sys/socket.h>
6 #include <netinet/in.h>
7 #include <netinet/tcp.h>
8 #include <sched.h>
9
10 // deal with partial reads
11 namespace {
12 int readn(int fd, void* buffer, unsigned int len)
13 {
14 unsigned int pos=0;
15 int res;
16 for(;;) {
17 res = read(fd, (char*)buffer + pos, len - pos);
18 if(res == 0) {
19 if(pos)
20 throw runtime_error("Signing Pipe remote shut down in the middle of a message");
21 else {
22 //cerr<<"Got decent EOF on "<<fd<<endl;
23 return 0;
24 }
25 }
26
27 if(res < 0) {
28 if(errno == EAGAIN || errno == EINTR) {
29 if(pos==0)
30 return -1;
31 waitForData(fd, -1);
32 continue;
33 }
34 unixDie("Reading from socket in Signing Pipe loop");
35 }
36
37 pos+=res;
38 if(pos == len)
39 break;
40 }
41 return len;
42 }
43 }
44
45
46 // used to pass information to the new thread
47 struct StartHelperStruct
48 {
49 StartHelperStruct(ChunkedSigningPipe* csp, int id, int fd) : d_csp(csp), d_id(id), d_fd(fd){}
50 ChunkedSigningPipe* d_csp;
51 int d_id;
52 int d_fd;
53 };
54
55 // used to launcht the new thread
56 void* ChunkedSigningPipe::helperWorker(void* p)
57 try
58 {
59 StartHelperStruct shs=*(StartHelperStruct*)p;
60 delete (StartHelperStruct*)p;
61
62 shs.d_csp->worker(shs.d_id, shs.d_fd);
63 return 0;
64 }
65 catch(std::exception& e) {
66 L<<Logger::Error<<"Signing thread died with error "<<e.what()<<endl;
67 return 0;
68 }
69
70 ChunkedSigningPipe::ChunkedSigningPipe(const std::string& signerName, bool mustSign, const pdns::string& servers, unsigned int workers)
71 : d_queued(0), d_outstanding(0), d_signer(signerName), d_maxchunkrecords(100), d_numworkers(workers), d_tids(d_numworkers),
72 d_mustSign(mustSign), d_final(false), d_submitted(0)
73 {
74 d_rrsetToSign = new rrset_t;
75 d_chunks.push_back(vector<DNSResourceRecord>()); // load an empty chunk
76
77 if(!d_mustSign)
78 return;
79
80 int fds[2];
81
82 for(unsigned int n=0; n < d_numworkers; ++n) {
83 if(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0)
84 throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
85 Utility::setCloseOnExec(fds[0]);
86 Utility::setCloseOnExec(fds[1]);
87 pthread_create(&d_tids[n], 0, helperWorker, (void*) new StartHelperStruct(this, n, fds[1]));
88 Utility::setNonBlocking(fds[0]);
89 d_sockets.push_back(fds[0]);
90 }
91 }
92
93 ChunkedSigningPipe::~ChunkedSigningPipe()
94 {
95 delete d_rrsetToSign;
96 if(!d_mustSign)
97 return;
98 BOOST_FOREACH(int fd, d_sockets) {
99 close(fd); // this will trigger all threads to exit
100 }
101
102 void* res;
103 BOOST_FOREACH(pthread_t& tid, d_tids) {
104 pthread_join(tid, &res);
105 }
106 //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
107 }
108
109 namespace {
110 bool dedupLessThan(const DNSResourceRecord& a, const DNSResourceRecord &b)
111 {
112 if(tie(a.content, a.ttl) < tie(b.content, b.ttl))
113 return true;
114 if(a.qtype.getCode() == QType::MX || a.qtype.getCode() == QType::SRV)
115 return a.priority < b.priority;
116 return false;
117 }
118
119 bool dedupEqual(const DNSResourceRecord& a, const DNSResourceRecord &b)
120 {
121 if(tie(a.content, a.ttl) != tie(b.content, b.ttl))
122 return false;
123 if(a.qtype.getCode() == QType::MX || a.qtype.getCode() == QType::SRV)
124 return a.priority == b.priority;
125 return true;
126 }
127 }
128
129 void ChunkedSigningPipe::dedupRRSet()
130 {
131 // our set contains contains records for one type and one name, but might not be sorted otherwise
132 sort(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupLessThan);
133 d_rrsetToSign->erase(unique(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupEqual), d_rrsetToSign->end());
134 }
135
136 bool ChunkedSigningPipe::submit(const DNSResourceRecord& rr)
137 {
138 ++d_submitted;
139 // check if we have a full RRSET to sign
140 if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->qtype.getCode() != rr.qtype.getCode() || !pdns_iequals(d_rrsetToSign->begin()->qname, rr.qname)))
141 {
142 dedupRRSet();
143 sendRRSetToWorker();
144 }
145 d_rrsetToSign->push_back(rr);
146 return !d_chunks.empty() && d_chunks.front().size() >= d_maxchunkrecords; // "you can send more"
147 }
148
149 pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
150 {
151 vector<pollfd> pfds;
152
153 for(unsigned int n = 0; n < d_sockets.size(); ++n) {
154 if(d_eof.count(d_sockets[n]))
155 continue;
156 struct pollfd pfd;
157 memset(&pfd, 0, sizeof(pfd));
158 pfd.fd = d_sockets[n];
159 if(rd)
160 pfd.events |= POLLIN;
161 if(wr)
162 pfd.events |= POLLOUT;
163 pfds.push_back(pfd);
164 }
165
166 int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
167 if(res < 0)
168 unixDie("polling for activity from signers, "+lexical_cast<string>(d_sockets.size()));
169 pair<vector<int>, vector<int> > vects;
170 for(unsigned int n = 0; n < pfds.size(); ++n)
171 if(pfds[n].revents & POLLIN)
172 vects.first.push_back(pfds[n].fd);
173 else if(pfds[n].revents & POLLOUT)
174 vects.second.push_back(pfds[n].fd);
175
176 return vects;
177 }
178
179 void ChunkedSigningPipe::addSignedToChunks(chunk_t* signedChunk)
180 {
181 chunk_t::const_iterator from = signedChunk->begin();
182
183 while(from != signedChunk->end()) {
184 chunk_t& fillChunk = d_chunks.back();
185
186 chunk_t::size_type room = d_maxchunkrecords - fillChunk.size();
187
188 unsigned int fit = std::min(room, (chunk_t::size_type)(signedChunk->end() - from));
189
190 d_chunks.back().insert(fillChunk.end(), from , from + fit);
191 from+=fit;
192
193 if(from != signedChunk->end()) // it didn't fit, so add a new chunk
194 d_chunks.push_back(chunk_t());
195 }
196 }
197
198 void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
199 {
200 if(!d_mustSign) {
201 addSignedToChunks(d_rrsetToSign);
202 d_rrsetToSign->clear();
203 return;
204 }
205
206 if(d_final && !d_outstanding) // nothing to do!
207 return;
208
209 bool wantRead, wantWrite;
210
211 wantWrite = !d_rrsetToSign->empty();
212 wantRead = d_outstanding || wantWrite; // if we wrote, we want to read
213
214 pair<vector<int>, vector<int> > rwVect;
215
216 rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
217
218 if(wantWrite && !rwVect.second.empty()) {
219 random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
220 writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
221 d_rrsetToSign = new rrset_t;
222 d_outstanding++;
223 d_queued++;
224 wantWrite=false;
225 }
226
227 if(wantRead) {
228 while(d_outstanding) {
229 chunk_t* chunk;
230
231 BOOST_FOREACH(int fd, rwVect.first) {
232 if(d_eof.count(fd))
233 continue;
234
235 while(d_outstanding) {
236 int res = readn(fd, &chunk, sizeof(chunk));
237 if(!res) {
238 d_eof.insert(fd);
239 break;
240 }
241 if(res < 0) {
242 if(errno != EAGAIN && errno != EINTR)
243 unixDie("Error reading signed chunk from thread");
244 else
245 break;
246 }
247
248 --d_outstanding;
249
250 addSignedToChunks(chunk);
251
252 delete chunk;
253 }
254 }
255 if(!d_outstanding || !d_final)
256 break;
257 rwVect = waitForRW(1, 0, -1); // wait for something to happen
258 }
259 }
260
261 if(wantWrite) { // our optimization above failed, we now wait synchronously
262 rwVect = waitForRW(0, wantWrite, -1); // wait for something to happen
263 random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
264 writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
265 d_rrsetToSign = new rrset_t;
266 d_outstanding++;
267 d_queued++;
268 }
269
270 }
271
272 unsigned int ChunkedSigningPipe::getReady()
273 {
274 unsigned int sum=0;
275 BOOST_FOREACH(const std::vector<DNSResourceRecord>& v, d_chunks) {
276 sum += v.size();
277 }
278 return sum;
279 }
280 void ChunkedSigningPipe::worker(int id, int fd)
281 try
282 {
283 DNSSECKeeper dk;
284 UeberBackend db("key-only");
285
286 chunk_t* chunk;
287 int res;
288 for(;;) {
289 res = readn(fd, &chunk, sizeof(chunk));
290 if(!res)
291 break;
292 if(res < 0)
293 unixDie("reading object pointer to sign from pdns");
294 set<string, CIStringCompare> authSet;
295 authSet.insert(d_signer);
296 addRRSigs(dk, db, authSet, *chunk);
297 ++d_signed;
298
299 writen2(fd, &chunk, sizeof(chunk));
300 }
301 close(fd);
302 }
303 catch(std::exception& e)
304 {
305 L<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
306 close(fd);
307 }
308
309 void ChunkedSigningPipe::flushToSign()
310 {
311 sendRRSetToWorker();
312 d_rrsetToSign->clear();
313 }
314
315 vector<DNSResourceRecord> ChunkedSigningPipe::getChunk(bool final)
316 {
317 if(final && !d_final) {
318 // this means we should keep on reading until d_outstanding == 0
319 d_final = true;
320 flushToSign();
321
322 BOOST_FOREACH(int fd, d_sockets) {
323 shutdown(fd, SHUT_WR); // perhaps this transmits EOF the other side
324 //cerr<<"shutdown of "<<fd<<endl;
325 }
326 }
327 if(d_final)
328 flushToSign(); // should help us wait
329 vector<DNSResourceRecord> front=d_chunks.front();
330 d_chunks.pop_front();
331 if(d_chunks.empty())
332 d_chunks.push_back(vector<DNSResourceRecord>());
333 if(d_final && front.empty())
334 ; // cerr<<"getChunk returning empty in final"<<endl;
335 return front;
336 }
337
338 #if 0
339
340 ServiceTuple st;
341 ComboAddress remote;
342 if(!servers.empty()) {
343 st.port=2000;
344 parseService(servers, st);
345 remote=ComboAddress(st.host, st.port);
346 }
347
348 ///
349 if(!servers.empty()) {
350 fds[0] = socket(AF_INET, SOCK_STREAM, 0);
351 fds[1] = -1;
352
353 if(connect(fds[0], (struct sockaddr*)&remote, remote.getSocklen()) < 0)
354 unixDie("Connecting to signing server");
355 }
356 else {
357 /////
358 signal(SIGCHLD, SIG_IGN);
359 if(!fork()) { // child
360 dup2(fds[1], 0);
361 execl("./pdnssec", "./pdnssec", "--config-dir=./", "signing-slave", NULL);
362 // helperWorker(new StartHelperStruct(this, n));
363 return;
364 }
365 else
366 close(fds[1]);
367 #endif
368
369 #if 0
370 bool readLStringFromSocket(int fd, string& msg)
371 {
372 msg.clear();
373 uint32_t len;
374 if(!readn(fd, &len, sizeof(len)))
375 return false;
376
377 len = ntohl(len);
378
379 scoped_array<char> buf(new char[len]);
380 readn(fd, buf.get(), len);
381
382 msg.assign(buf.get(), len);
383 return true;
384 }
385 void writeLStringToSocket(int fd, const string& msg)
386 {
387 string realmsg;
388 uint32_t len = htonl(msg.length());
389 string tot((char*)&len, 4);
390 tot+=msg;
391
392 writen2(fd, tot.c_str(), tot.length());
393 }
394
395 #endif
396