]>
Commit | Line | Data |
---|---|---|
8e9b7d99 | 1 | #include "signingpipe.hh" |
a6ef6f7a BH |
2 | #include "misc.hh" |
3 | #include <poll.h> | |
a2aaa807 | 4 | #include <boost/foreach.hpp> |
a6ef6f7a BH |
5 | #include <sys/socket.h> |
6 | #include <netinet/in.h> | |
7 | #include <netinet/tcp.h> | |
8 | #include <sched.h> | |
8e9b7d99 | 9 | |
8d59e8ce BH |
10 | // deal with partial reads |
11 | namespace { | |
12 | int readn(int fd, void* buffer, unsigned int len) | |
13 | { | |
14 | unsigned int pos=0; | |
15 | int res; | |
16 | for(;;) { | |
17 | res = read(fd, (char*)buffer + pos, len - pos); | |
18 | if(res == 0) { | |
19 | if(pos) | |
20 | throw runtime_error("Signing Pipe remote shut down in the middle of a message"); | |
21 | else { | |
7b217f2e | 22 | //cerr<<"Got decent EOF on "<<fd<<endl; |
8d59e8ce BH |
23 | return 0; |
24 | } | |
25 | } | |
26 | ||
27 | if(res < 0) { | |
28 | if(errno == EAGAIN || errno == EINTR) { | |
29 | if(pos==0) | |
30 | return -1; | |
31 | waitForData(fd, -1); | |
32 | continue; | |
33 | } | |
34 | unixDie("Reading from socket in Signing Pipe loop"); | |
35 | } | |
36 | ||
37 | pos+=res; | |
38 | if(pos == len) | |
39 | break; | |
40 | } | |
41 | return len; | |
42 | } | |
43 | } | |
44 | ||
45 | ||
46 | // used to pass information to the new thread | |
a2aaa807 BH |
47 | struct StartHelperStruct |
48 | { | |
a6ef6f7a | 49 | StartHelperStruct(ChunkedSigningPipe* csp, int id, int fd) : d_csp(csp), d_id(id), d_fd(fd){} |
a2aaa807 BH |
50 | ChunkedSigningPipe* d_csp; |
51 | int d_id; | |
a6ef6f7a | 52 | int d_fd; |
a2aaa807 | 53 | }; |
8267bd2c | 54 | |
a67dd0cf | 55 | // used to launch the new thread |
8267bd2c | 56 | void* ChunkedSigningPipe::helperWorker(void* p) |
bec14a20 | 57 | try |
8267bd2c | 58 | { |
a2aaa807 BH |
59 | StartHelperStruct shs=*(StartHelperStruct*)p; |
60 | delete (StartHelperStruct*)p; | |
61 | ||
a6ef6f7a | 62 | shs.d_csp->worker(shs.d_id, shs.d_fd); |
8267bd2c BH |
63 | return 0; |
64 | } | |
2e6bbd83 AT |
65 | catch(PDNSException& pe) { |
66 | L<<Logger::Error<<"Signing thread died with error "<<pe.reason<<endl; | |
67 | } | |
bec14a20 | 68 | catch(std::exception& e) { |
a4da80fc | 69 | L<<Logger::Error<<"Signing thread died with error "<<e.what()<<endl; |
a2aaa807 | 70 | return 0; |
bec14a20 | 71 | } |
8267bd2c | 72 | |
a6ef6f7a BH |
73 | ChunkedSigningPipe::ChunkedSigningPipe(const std::string& signerName, bool mustSign, const pdns::string& servers, unsigned int workers) |
74 | : d_queued(0), d_outstanding(0), d_signer(signerName), d_maxchunkrecords(100), d_numworkers(workers), d_tids(d_numworkers), | |
8d59e8ce | 75 | d_mustSign(mustSign), d_final(false), d_submitted(0) |
8267bd2c | 76 | { |
a2aaa807 | 77 | d_rrsetToSign = new rrset_t; |
8d59e8ce BH |
78 | d_chunks.push_back(vector<DNSResourceRecord>()); // load an empty chunk |
79 | ||
8267bd2c BH |
80 | if(!d_mustSign) |
81 | return; | |
a2aaa807 | 82 | |
a2aaa807 BH |
83 | int fds[2]; |
84 | ||
8267bd2c | 85 | for(unsigned int n=0; n < d_numworkers; ++n) { |
8d59e8ce BH |
86 | if(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) |
87 | throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe"); | |
42c235e5 PD |
88 | Utility::setCloseOnExec(fds[0]); |
89 | Utility::setCloseOnExec(fds[1]); | |
8d59e8ce | 90 | pthread_create(&d_tids[n], 0, helperWorker, (void*) new StartHelperStruct(this, n, fds[1])); |
a6ef6f7a | 91 | Utility::setNonBlocking(fds[0]); |
8d59e8ce | 92 | d_sockets.push_back(fds[0]); |
8267bd2c BH |
93 | } |
94 | } | |
95 | ||
96 | ChunkedSigningPipe::~ChunkedSigningPipe() | |
97 | { | |
a2aaa807 | 98 | delete d_rrsetToSign; |
8267bd2c BH |
99 | if(!d_mustSign) |
100 | return; | |
a6ef6f7a BH |
101 | BOOST_FOREACH(int fd, d_sockets) { |
102 | close(fd); // this will trigger all threads to exit | |
103 | } | |
a2aaa807 | 104 | |
8267bd2c | 105 | void* res; |
a6ef6f7a BH |
106 | BOOST_FOREACH(pthread_t& tid, d_tids) { |
107 | pthread_join(tid, &res); | |
108 | } | |
7b217f2e | 109 | //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl; |
8267bd2c BH |
110 | } |
111 | ||
a2f3b9ec BH |
112 | namespace { |
113 | bool dedupLessThan(const DNSResourceRecord& a, const DNSResourceRecord &b) | |
114 | { | |
115 | if(tie(a.content, a.ttl) < tie(b.content, b.ttl)) | |
116 | return true; | |
117 | if(a.qtype.getCode() == QType::MX || a.qtype.getCode() == QType::SRV) | |
118 | return a.priority < b.priority; | |
119 | return false; | |
120 | } | |
121 | ||
122 | bool dedupEqual(const DNSResourceRecord& a, const DNSResourceRecord &b) | |
123 | { | |
124 | if(tie(a.content, a.ttl) != tie(b.content, b.ttl)) | |
125 | return false; | |
126 | if(a.qtype.getCode() == QType::MX || a.qtype.getCode() == QType::SRV) | |
127 | return a.priority == b.priority; | |
128 | return true; | |
129 | } | |
130 | } | |
131 | ||
132 | void ChunkedSigningPipe::dedupRRSet() | |
133 | { | |
134 | // our set contains contains records for one type and one name, but might not be sorted otherwise | |
135 | sort(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupLessThan); | |
136 | d_rrsetToSign->erase(unique(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupEqual), d_rrsetToSign->end()); | |
137 | } | |
138 | ||
8e9b7d99 BH |
139 | bool ChunkedSigningPipe::submit(const DNSResourceRecord& rr) |
140 | { | |
8d59e8ce BH |
141 | ++d_submitted; |
142 | // check if we have a full RRSET to sign | |
a2aaa807 | 143 | if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->qtype.getCode() != rr.qtype.getCode() || !pdns_iequals(d_rrsetToSign->begin()->qname, rr.qname))) |
8e9b7d99 | 144 | { |
a2f3b9ec | 145 | dedupRRSet(); |
a2aaa807 | 146 | sendRRSetToWorker(); |
8e9b7d99 | 147 | } |
a2aaa807 | 148 | d_rrsetToSign->push_back(rr); |
a2f3b9ec | 149 | return !d_chunks.empty() && d_chunks.front().size() >= d_maxchunkrecords; // "you can send more" |
8e9b7d99 BH |
150 | } |
151 | ||
a6ef6f7a BH |
152 | pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds) |
153 | { | |
ac10b8c6 | 154 | vector<pollfd> pfds; |
a6ef6f7a | 155 | |
8d59e8ce | 156 | for(unsigned int n = 0; n < d_sockets.size(); ++n) { |
a4da80fc | 157 | if(d_eof.count(d_sockets[n])) |
ac10b8c6 BH |
158 | continue; |
159 | struct pollfd pfd; | |
160 | memset(&pfd, 0, sizeof(pfd)); | |
161 | pfd.fd = d_sockets[n]; | |
162 | if(rd) | |
163 | pfd.events |= POLLIN; | |
164 | if(wr) | |
165 | pfd.events |= POLLOUT; | |
166 | pfds.push_back(pfd); | |
a6ef6f7a BH |
167 | } |
168 | ||
a135720d | 169 | int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite |
a6ef6f7a | 170 | if(res < 0) |
ac10b8c6 | 171 | unixDie("polling for activity from signers, "+lexical_cast<string>(d_sockets.size())); |
a6ef6f7a | 172 | pair<vector<int>, vector<int> > vects; |
ac10b8c6 | 173 | for(unsigned int n = 0; n < pfds.size(); ++n) |
a6ef6f7a BH |
174 | if(pfds[n].revents & POLLIN) |
175 | vects.first.push_back(pfds[n].fd); | |
176 | else if(pfds[n].revents & POLLOUT) | |
177 | vects.second.push_back(pfds[n].fd); | |
178 | ||
179 | return vects; | |
180 | } | |
181 | ||
8d59e8ce BH |
182 | void ChunkedSigningPipe::addSignedToChunks(chunk_t* signedChunk) |
183 | { | |
184 | chunk_t::const_iterator from = signedChunk->begin(); | |
185 | ||
186 | while(from != signedChunk->end()) { | |
187 | chunk_t& fillChunk = d_chunks.back(); | |
188 | ||
189 | chunk_t::size_type room = d_maxchunkrecords - fillChunk.size(); | |
190 | ||
191 | unsigned int fit = std::min(room, (chunk_t::size_type)(signedChunk->end() - from)); | |
192 | ||
193 | d_chunks.back().insert(fillChunk.end(), from , from + fit); | |
194 | from+=fit; | |
195 | ||
196 | if(from != signedChunk->end()) // it didn't fit, so add a new chunk | |
197 | d_chunks.push_back(chunk_t()); | |
198 | } | |
199 | } | |
a6ef6f7a | 200 | |
a2aaa807 | 201 | void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist! |
8267bd2c BH |
202 | { |
203 | if(!d_mustSign) { | |
8d59e8ce | 204 | addSignedToChunks(d_rrsetToSign); |
a2aaa807 | 205 | d_rrsetToSign->clear(); |
8267bd2c BH |
206 | return; |
207 | } | |
a2aaa807 | 208 | |
8d59e8ce BH |
209 | if(d_final && !d_outstanding) // nothing to do! |
210 | return; | |
211 | ||
a6ef6f7a BH |
212 | bool wantRead, wantWrite; |
213 | ||
214 | wantWrite = !d_rrsetToSign->empty(); | |
8d59e8ce | 215 | wantRead = d_outstanding || wantWrite; // if we wrote, we want to read |
a6ef6f7a BH |
216 | |
217 | pair<vector<int>, vector<int> > rwVect; | |
218 | ||
8d59e8ce | 219 | rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen |
a6ef6f7a BH |
220 | |
221 | if(wantWrite && !rwVect.second.empty()) { | |
8d59e8ce | 222 | random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker |
a6ef6f7a | 223 | writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign)); |
a2aaa807 | 224 | d_rrsetToSign = new rrset_t; |
8267bd2c | 225 | d_outstanding++; |
a2aaa807 | 226 | d_queued++; |
8d59e8ce BH |
227 | wantWrite=false; |
228 | } | |
229 | ||
230 | if(wantRead) { | |
231 | while(d_outstanding) { | |
232 | chunk_t* chunk; | |
a6ef6f7a | 233 | |
8d59e8ce BH |
234 | BOOST_FOREACH(int fd, rwVect.first) { |
235 | if(d_eof.count(fd)) | |
236 | continue; | |
237 | ||
238 | while(d_outstanding) { | |
239 | int res = readn(fd, &chunk, sizeof(chunk)); | |
240 | if(!res) { | |
241 | d_eof.insert(fd); | |
242 | break; | |
243 | } | |
244 | if(res < 0) { | |
245 | if(errno != EAGAIN && errno != EINTR) | |
246 | unixDie("Error reading signed chunk from thread"); | |
247 | else | |
248 | break; | |
249 | } | |
250 | ||
251 | --d_outstanding; | |
252 | ||
253 | addSignedToChunks(chunk); | |
254 | ||
255 | delete chunk; | |
256 | } | |
257 | } | |
258 | if(!d_outstanding || !d_final) | |
a6ef6f7a | 259 | break; |
8d59e8ce | 260 | rwVect = waitForRW(1, 0, -1); // wait for something to happen |
a6ef6f7a | 261 | } |
8267bd2c | 262 | } |
a6ef6f7a | 263 | |
8d59e8ce BH |
264 | if(wantWrite) { // our optimization above failed, we now wait synchronously |
265 | rwVect = waitForRW(0, wantWrite, -1); // wait for something to happen | |
266 | random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker | |
267 | writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign)); | |
268 | d_rrsetToSign = new rrset_t; | |
269 | d_outstanding++; | |
270 | d_queued++; | |
a6ef6f7a BH |
271 | } |
272 | ||
8267bd2c BH |
273 | } |
274 | ||
a2aaa807 BH |
275 | unsigned int ChunkedSigningPipe::getReady() |
276 | { | |
277 | unsigned int sum=0; | |
278 | BOOST_FOREACH(const std::vector<DNSResourceRecord>& v, d_chunks) { | |
279 | sum += v.size(); | |
280 | } | |
281 | return sum; | |
282 | } | |
a6ef6f7a BH |
283 | void ChunkedSigningPipe::worker(int id, int fd) |
284 | try | |
8267bd2c | 285 | { |
8267bd2c | 286 | DNSSECKeeper dk; |
a6ef6f7a BH |
287 | UeberBackend db("key-only"); |
288 | ||
289 | chunk_t* chunk; | |
8267bd2c BH |
290 | int res; |
291 | for(;;) { | |
a6ef6f7a BH |
292 | res = readn(fd, &chunk, sizeof(chunk)); |
293 | if(!res) | |
8267bd2c | 294 | break; |
a6ef6f7a BH |
295 | if(res < 0) |
296 | unixDie("reading object pointer to sign from pdns"); | |
8d3cbffa BH |
297 | set<string, CIStringCompare> authSet; |
298 | authSet.insert(d_signer); | |
299 | addRRSigs(dk, db, authSet, *chunk); | |
a6ef6f7a BH |
300 | ++d_signed; |
301 | ||
302 | writen2(fd, &chunk, sizeof(chunk)); | |
8267bd2c | 303 | } |
a6ef6f7a BH |
304 | close(fd); |
305 | } | |
18e7758c | 306 | catch(std::exception& e) |
a6ef6f7a | 307 | { |
a4da80fc | 308 | L<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl; |
a6ef6f7a | 309 | close(fd); |
8267bd2c BH |
310 | } |
311 | ||
8e9b7d99 BH |
312 | void ChunkedSigningPipe::flushToSign() |
313 | { | |
a2aaa807 BH |
314 | sendRRSetToWorker(); |
315 | d_rrsetToSign->clear(); | |
8e9b7d99 BH |
316 | } |
317 | ||
318 | vector<DNSResourceRecord> ChunkedSigningPipe::getChunk(bool final) | |
319 | { | |
a6ef6f7a BH |
320 | if(final && !d_final) { |
321 | // this means we should keep on reading until d_outstanding == 0 | |
322 | d_final = true; | |
8e9b7d99 | 323 | flushToSign(); |
a6ef6f7a BH |
324 | |
325 | BOOST_FOREACH(int fd, d_sockets) { | |
326 | shutdown(fd, SHUT_WR); // perhaps this transmits EOF the other side | |
7b217f2e | 327 | //cerr<<"shutdown of "<<fd<<endl; |
a6ef6f7a | 328 | } |
8267bd2c | 329 | } |
a6ef6f7a BH |
330 | if(d_final) |
331 | flushToSign(); // should help us wait | |
a2aaa807 BH |
332 | vector<DNSResourceRecord> front=d_chunks.front(); |
333 | d_chunks.pop_front(); | |
334 | if(d_chunks.empty()) | |
335 | d_chunks.push_back(vector<DNSResourceRecord>()); | |
e125fc69 | 336 | /* if(d_final && front.empty()) |
337 | cerr<<"getChunk returning empty in final"<<endl; */ | |
a2aaa807 | 338 | return front; |
8e9b7d99 | 339 | } |
8d59e8ce BH |
340 | |
341 | #if 0 | |
342 | ||
343 | ServiceTuple st; | |
344 | ComboAddress remote; | |
345 | if(!servers.empty()) { | |
346 | st.port=2000; | |
347 | parseService(servers, st); | |
348 | remote=ComboAddress(st.host, st.port); | |
349 | } | |
350 | ||
351 | /// | |
352 | if(!servers.empty()) { | |
353 | fds[0] = socket(AF_INET, SOCK_STREAM, 0); | |
354 | fds[1] = -1; | |
355 | ||
356 | if(connect(fds[0], (struct sockaddr*)&remote, remote.getSocklen()) < 0) | |
357 | unixDie("Connecting to signing server"); | |
358 | } | |
359 | else { | |
360 | ///// | |
361 | signal(SIGCHLD, SIG_IGN); | |
362 | if(!fork()) { // child | |
363 | dup2(fds[1], 0); | |
364 | execl("./pdnssec", "./pdnssec", "--config-dir=./", "signing-slave", NULL); | |
365 | // helperWorker(new StartHelperStruct(this, n)); | |
366 | return; | |
367 | } | |
368 | else | |
369 | close(fds[1]); | |
370 | #endif | |
371 | ||
372 | #if 0 | |
373 | bool readLStringFromSocket(int fd, string& msg) | |
374 | { | |
375 | msg.clear(); | |
376 | uint32_t len; | |
377 | if(!readn(fd, &len, sizeof(len))) | |
378 | return false; | |
379 | ||
380 | len = ntohl(len); | |
381 | ||
382 | scoped_array<char> buf(new char[len]); | |
383 | readn(fd, buf.get(), len); | |
384 | ||
385 | msg.assign(buf.get(), len); | |
386 | return true; | |
387 | } | |
388 | void writeLStringToSocket(int fd, const string& msg) | |
389 | { | |
390 | string realmsg; | |
391 | uint32_t len = htonl(msg.length()); | |
392 | string tot((char*)&len, 4); | |
393 | tot+=msg; | |
394 | ||
395 | writen2(fd, tot.c_str(), tot.length()); | |
396 | } | |
397 | ||
398 | #endif | |
399 |