]>
Commit | Line | Data |
---|---|---|
870a0fe4 AT |
1 | #ifdef HAVE_CONFIG_H |
2 | #include "config.h" | |
3 | #endif | |
8e9b7d99 | 4 | #include "signingpipe.hh" |
a6ef6f7a | 5 | #include "misc.hh" |
d720eb8a | 6 | #include "dns_random.hh" |
a6ef6f7a | 7 | #include <poll.h> |
fa8fd4d2 | 8 | |
a6ef6f7a BH |
9 | #include <sys/socket.h> |
10 | #include <netinet/in.h> | |
11 | #include <netinet/tcp.h> | |
12 | #include <sched.h> | |
8e9b7d99 | 13 | |
8d59e8ce BH |
14 | // deal with partial reads |
15 | namespace { | |
16 | int readn(int fd, void* buffer, unsigned int len) | |
17 | { | |
18 | unsigned int pos=0; | |
19 | int res; | |
20 | for(;;) { | |
21 | res = read(fd, (char*)buffer + pos, len - pos); | |
22 | if(res == 0) { | |
23 | if(pos) | |
24 | throw runtime_error("Signing Pipe remote shut down in the middle of a message"); | |
25 | else { | |
7b217f2e | 26 | //cerr<<"Got decent EOF on "<<fd<<endl; |
8d59e8ce BH |
27 | return 0; |
28 | } | |
29 | } | |
30 | ||
31 | if(res < 0) { | |
32 | if(errno == EAGAIN || errno == EINTR) { | |
33 | if(pos==0) | |
34 | return -1; | |
e0e96007 AT |
35 | // error handled later |
36 | (void)waitForData(fd, -1); | |
8d59e8ce BH |
37 | continue; |
38 | } | |
39 | unixDie("Reading from socket in Signing Pipe loop"); | |
40 | } | |
41 | ||
42 | pos+=res; | |
43 | if(pos == len) | |
44 | break; | |
45 | } | |
46 | return len; | |
47 | } | |
48 | } | |
49 | ||
07019b51 RG |
50 | void* ChunkedSigningPipe::helperWorker(ChunkedSigningPipe* csp, int fd) |
51 | try { | |
52 | csp->worker(fd); | |
53 | return nullptr; | |
8267bd2c | 54 | } |
97f1f8d9 | 55 | catch(...) { |
e6a9dde5 | 56 | g_log<<Logger::Error<<"Unknown exception in signing thread occurred"<<endl; |
07019b51 | 57 | return nullptr; |
bec14a20 | 58 | } |
8267bd2c | 59 | |
3af419da | 60 | ChunkedSigningPipe::ChunkedSigningPipe(DNSName signerName, bool mustSign, unsigned int workers, unsigned int maxChunkRecords) |
1290c1d2 | 61 | : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(std::move(signerName)), |
3af419da | 62 | d_maxchunkrecords(maxChunkRecords), d_threads(d_numworkers), d_mustSign(mustSign), d_final(false) |
8267bd2c | 63 | { |
c2826d2e | 64 | d_rrsetToSign = make_unique<rrset_t>(); |
90ba52e0 | 65 | d_chunks.push_back(vector<DNSZoneRecord>()); // load an empty chunk |
8d59e8ce | 66 | |
8267bd2c BH |
67 | if(!d_mustSign) |
68 | return; | |
a2aaa807 | 69 | |
a2aaa807 BH |
70 | int fds[2]; |
71 | ||
8267bd2c | 72 | for(unsigned int n=0; n < d_numworkers; ++n) { |
8d59e8ce BH |
73 | if(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) |
74 | throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe"); | |
3897b9e1 | 75 | setCloseOnExec(fds[0]); |
76 | setCloseOnExec(fds[1]); | |
07019b51 | 77 | d_threads[n] = std::thread(helperWorker, this, fds[1]); |
3897b9e1 | 78 | setNonBlocking(fds[0]); |
8d59e8ce | 79 | d_sockets.push_back(fds[0]); |
e3200e07 | 80 | d_outstandings[fds[0]] = 0; |
8267bd2c BH |
81 | } |
82 | } | |
83 | ||
84 | ChunkedSigningPipe::~ChunkedSigningPipe() | |
85 | { | |
86 | if(!d_mustSign) | |
87 | return; | |
07019b51 | 88 | |
ef7cd021 | 89 | for(int fd : d_sockets) { |
a6ef6f7a BH |
90 | close(fd); // this will trigger all threads to exit |
91 | } | |
07019b51 RG |
92 | |
93 | for(auto& thread : d_threads) { | |
94 | thread.join(); | |
a6ef6f7a | 95 | } |
7b217f2e | 96 | //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl; |
8267bd2c BH |
97 | } |
98 | ||
a2f3b9ec | 99 | namespace { |
b9bafae0 | 100 | bool |
90ba52e0 | 101 | dedupLessThan(const DNSZoneRecord& a, const DNSZoneRecord &b) |
a2f3b9ec | 102 | { |
0b0882f5 | 103 | return std::tuple(a.dr.getContent()->getZoneRepresentation(), a.dr.d_ttl) < std::tuple(b.dr.getContent()->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW |
a2f3b9ec BH |
104 | } |
105 | ||
90ba52e0 | 106 | bool dedupEqual(const DNSZoneRecord& a, const DNSZoneRecord &b) |
a2f3b9ec | 107 | { |
0b0882f5 | 108 | return std::tuple(a.dr.getContent()->getZoneRepresentation(), a.dr.d_ttl) == std::tuple(b.dr.getContent()->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW |
a2f3b9ec BH |
109 | } |
110 | } | |
111 | ||
112 | void ChunkedSigningPipe::dedupRRSet() | |
113 | { | |
114 | // our set contains contains records for one type and one name, but might not be sorted otherwise | |
115 | sort(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupLessThan); | |
116 | d_rrsetToSign->erase(unique(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupEqual), d_rrsetToSign->end()); | |
117 | } | |
118 | ||
90ba52e0 | 119 | bool ChunkedSigningPipe::submit(const DNSZoneRecord& rr) |
8e9b7d99 | 120 | { |
8d59e8ce BH |
121 | ++d_submitted; |
122 | // check if we have a full RRSET to sign | |
90ba52e0 | 123 | if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->dr.d_type != rr.dr.d_type || d_rrsetToSign->begin()->dr.d_name != rr.dr.d_name)) |
8e9b7d99 | 124 | { |
a2f3b9ec | 125 | dedupRRSet(); |
a2aaa807 | 126 | sendRRSetToWorker(); |
8e9b7d99 | 127 | } |
a2aaa807 | 128 | d_rrsetToSign->push_back(rr); |
a2f3b9ec | 129 | return !d_chunks.empty() && d_chunks.front().size() >= d_maxchunkrecords; // "you can send more" |
8e9b7d99 BH |
130 | } |
131 | ||
a6ef6f7a BH |
132 | pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds) |
133 | { | |
ac10b8c6 | 134 | vector<pollfd> pfds; |
e3200e07 | 135 | |
403a3a42 O |
136 | for(int & socket : d_sockets) { |
137 | if(d_eof.count(socket)) | |
ac10b8c6 BH |
138 | continue; |
139 | struct pollfd pfd; | |
140 | memset(&pfd, 0, sizeof(pfd)); | |
403a3a42 | 141 | pfd.fd = socket; |
ac10b8c6 BH |
142 | if(rd) |
143 | pfd.events |= POLLIN; | |
144 | if(wr) | |
145 | pfd.events |= POLLOUT; | |
146 | pfds.push_back(pfd); | |
a6ef6f7a | 147 | } |
e3200e07 | 148 | |
a135720d | 149 | int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite |
a6ef6f7a | 150 | if(res < 0) |
335da0ba | 151 | unixDie("polling for activity from signers, "+std::to_string(d_sockets.size())); |
a6ef6f7a | 152 | pair<vector<int>, vector<int> > vects; |
d7f67000 RP |
153 | for(auto & pfd : pfds) |
154 | if(pfd.revents & POLLIN) | |
155 | vects.first.push_back(pfd.fd); | |
156 | else if(pfd.revents & POLLOUT) | |
157 | vects.second.push_back(pfd.fd); | |
a6ef6f7a BH |
158 | |
159 | return vects; | |
160 | } | |
161 | ||
c2826d2e | 162 | void ChunkedSigningPipe::addSignedToChunks(std::unique_ptr<chunk_t>& signedChunk) |
8d59e8ce BH |
163 | { |
164 | chunk_t::const_iterator from = signedChunk->begin(); | |
165 | ||
166 | while(from != signedChunk->end()) { | |
167 | chunk_t& fillChunk = d_chunks.back(); | |
8d59e8ce BH |
168 | chunk_t::size_type room = d_maxchunkrecords - fillChunk.size(); |
169 | ||
170 | unsigned int fit = std::min(room, (chunk_t::size_type)(signedChunk->end() - from)); | |
171 | ||
172 | d_chunks.back().insert(fillChunk.end(), from , from + fit); | |
173 | from+=fit; | |
c2826d2e | 174 | |
8d59e8ce BH |
175 | if(from != signedChunk->end()) // it didn't fit, so add a new chunk |
176 | d_chunks.push_back(chunk_t()); | |
177 | } | |
178 | } | |
a6ef6f7a | 179 | |
a2aaa807 | 180 | void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist! |
8267bd2c BH |
181 | { |
182 | if(!d_mustSign) { | |
8d59e8ce | 183 | addSignedToChunks(d_rrsetToSign); |
a2aaa807 | 184 | d_rrsetToSign->clear(); |
8267bd2c BH |
185 | return; |
186 | } | |
a2aaa807 | 187 | |
8d59e8ce BH |
188 | if(d_final && !d_outstanding) // nothing to do! |
189 | return; | |
190 | ||
a6ef6f7a BH |
191 | bool wantRead, wantWrite; |
192 | ||
193 | wantWrite = !d_rrsetToSign->empty(); | |
8d59e8ce | 194 | wantRead = d_outstanding || wantWrite; // if we wrote, we want to read |
a6ef6f7a BH |
195 | |
196 | pair<vector<int>, vector<int> > rwVect; | |
197 | ||
e3200e07 | 198 | rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen |
a6ef6f7a BH |
199 | |
200 | if(wantWrite && !rwVect.second.empty()) { | |
d720eb8a | 201 | shuffle(rwVect.second.begin(), rwVect.second.end(), pdns::dns_random_engine()); // pick random available worker |
68a9a74c OM |
202 | auto ptr = d_rrsetToSign.get(); |
203 | writen2(*rwVect.second.begin(), &ptr, sizeof(ptr)); | |
eaf2d918 OM |
204 | // coverity[leaked_storage] |
205 | static_cast<void>(d_rrsetToSign.release()); | |
c2826d2e | 206 | d_rrsetToSign = make_unique<rrset_t>(); |
e3200e07 | 207 | d_outstandings[*rwVect.second.begin()]++; |
8267bd2c | 208 | d_outstanding++; |
a2aaa807 | 209 | d_queued++; |
8d59e8ce BH |
210 | wantWrite=false; |
211 | } | |
212 | ||
213 | if(wantRead) { | |
214 | while(d_outstanding) { | |
ef7cd021 | 215 | for(int fd : rwVect.first) { |
8d59e8ce BH |
216 | if(d_eof.count(fd)) |
217 | continue; | |
218 | ||
219 | while(d_outstanding) { | |
c2826d2e | 220 | chunk_t* chunk = nullptr; |
8d59e8ce BH |
221 | int res = readn(fd, &chunk, sizeof(chunk)); |
222 | if(!res) { | |
e3200e07 RG |
223 | if (d_outstandings[fd] > 0) { |
224 | throw std::runtime_error("A signing pipe worker died while we were waiting for its result"); | |
225 | } | |
8d59e8ce BH |
226 | d_eof.insert(fd); |
227 | break; | |
228 | } | |
229 | if(res < 0) { | |
230 | if(errno != EAGAIN && errno != EINTR) | |
231 | unixDie("Error reading signed chunk from thread"); | |
232 | else | |
233 | break; | |
234 | } | |
c2826d2e RG |
235 | |
236 | std::unique_ptr<rrset_t> chunkPtr(chunk); | |
237 | chunk = nullptr; | |
8d59e8ce | 238 | --d_outstanding; |
e3200e07 | 239 | d_outstandings[fd]--; |
8d59e8ce | 240 | |
c2826d2e | 241 | addSignedToChunks(chunkPtr); |
8d59e8ce BH |
242 | } |
243 | } | |
244 | if(!d_outstanding || !d_final) | |
a6ef6f7a | 245 | break; |
e3200e07 | 246 | rwVect = waitForRW(true, false, -1); // wait for something to happen |
a6ef6f7a | 247 | } |
8267bd2c | 248 | } |
a6ef6f7a | 249 | |
8d59e8ce | 250 | if(wantWrite) { // our optimization above failed, we now wait synchronously |
e3200e07 | 251 | rwVect = waitForRW(false, wantWrite, -1); // wait for something to happen |
d720eb8a | 252 | shuffle(rwVect.second.begin(), rwVect.second.end(), pdns::dns_random_engine()); // pick random available worker |
68a9a74c OM |
253 | auto ptr = d_rrsetToSign.get(); |
254 | writen2(*rwVect.second.begin(), &ptr, sizeof(ptr)); | |
eaf2d918 OM |
255 | // coverity[leaked_storage] |
256 | static_cast<void>(d_rrsetToSign.release()); | |
c2826d2e | 257 | d_rrsetToSign = make_unique<rrset_t>(); |
e3200e07 | 258 | d_outstandings[*rwVect.second.begin()]++; |
8d59e8ce BH |
259 | d_outstanding++; |
260 | d_queued++; | |
a6ef6f7a BH |
261 | } |
262 | ||
8267bd2c BH |
263 | } |
264 | ||
e3200e07 | 265 | unsigned int ChunkedSigningPipe::getReady() const |
a2aaa807 BH |
266 | { |
267 | unsigned int sum=0; | |
90ba52e0 | 268 | for(const auto& v : d_chunks) { |
a2aaa807 BH |
269 | sum += v.size(); |
270 | } | |
271 | return sum; | |
272 | } | |
07019b51 RG |
273 | |
274 | void ChunkedSigningPipe::worker(int fd) | |
a6ef6f7a | 275 | try |
8267bd2c | 276 | { |
a6ef6f7a | 277 | UeberBackend db("key-only"); |
ea99d474 | 278 | DNSSECKeeper dk(&db); |
a6ef6f7a | 279 | |
c23d888d | 280 | chunk_t* chunk = nullptr; |
8267bd2c BH |
281 | int res; |
282 | for(;;) { | |
a6ef6f7a BH |
283 | res = readn(fd, &chunk, sizeof(chunk)); |
284 | if(!res) | |
8267bd2c | 285 | break; |
a6ef6f7a BH |
286 | if(res < 0) |
287 | unixDie("reading object pointer to sign from pdns"); | |
c23d888d RG |
288 | try { |
289 | set<DNSName> authSet; | |
290 | authSet.insert(d_signer); | |
291 | addRRSigs(dk, db, authSet, *chunk); | |
292 | ++d_signed; | |
293 | ||
294 | writen2(fd, &chunk, sizeof(chunk)); | |
295 | chunk = nullptr; | |
296 | } | |
297 | catch(const PDNSException& pe) { | |
298 | delete chunk; | |
299 | throw; | |
300 | } | |
301 | catch(const std::exception& e) { | |
302 | delete chunk; | |
303 | throw; | |
304 | } | |
8267bd2c | 305 | } |
a6ef6f7a BH |
306 | close(fd); |
307 | } | |
c23d888d | 308 | catch(const PDNSException& pe) |
97f1f8d9 | 309 | { |
e6a9dde5 | 310 | g_log<<Logger::Error<<"Signing thread died because of PDNSException: "<<pe.reason<<endl; |
97f1f8d9 KM |
311 | close(fd); |
312 | } | |
c23d888d | 313 | catch(const std::exception& e) |
a6ef6f7a | 314 | { |
e6a9dde5 | 315 | g_log<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl; |
a6ef6f7a | 316 | close(fd); |
8267bd2c BH |
317 | } |
318 | ||
8e9b7d99 BH |
319 | void ChunkedSigningPipe::flushToSign() |
320 | { | |
a2aaa807 BH |
321 | sendRRSetToWorker(); |
322 | d_rrsetToSign->clear(); | |
8e9b7d99 BH |
323 | } |
324 | ||
90ba52e0 | 325 | vector<DNSZoneRecord> ChunkedSigningPipe::getChunk(bool final) |
8e9b7d99 | 326 | { |
a6ef6f7a BH |
327 | if(final && !d_final) { |
328 | // this means we should keep on reading until d_outstanding == 0 | |
329 | d_final = true; | |
8e9b7d99 | 330 | flushToSign(); |
a6ef6f7a | 331 | |
ef7cd021 | 332 | for(int fd : d_sockets) { |
a6ef6f7a | 333 | shutdown(fd, SHUT_WR); // perhaps this transmits EOF the other side |
7b217f2e | 334 | //cerr<<"shutdown of "<<fd<<endl; |
a6ef6f7a | 335 | } |
8267bd2c | 336 | } |
a6ef6f7a BH |
337 | if(d_final) |
338 | flushToSign(); // should help us wait | |
90ba52e0 | 339 | vector<DNSZoneRecord> front=d_chunks.front(); |
a2aaa807 BH |
340 | d_chunks.pop_front(); |
341 | if(d_chunks.empty()) | |
90ba52e0 | 342 | d_chunks.push_back(vector<DNSZoneRecord>()); |
e125fc69 | 343 | /* if(d_final && front.empty()) |
344 | cerr<<"getChunk returning empty in final"<<endl; */ | |
a2aaa807 | 345 | return front; |
8e9b7d99 | 346 | } |
8d59e8ce | 347 | |
8d59e8ce | 348 |