]>
Commit | Line | Data |
---|---|---|
1 | #ifdef HAVE_CONFIG_H | |
2 | #include "config.h" | |
3 | #endif | |
4 | #include "signingpipe.hh" | |
5 | #include "misc.hh" | |
6 | #include <poll.h> | |
7 | ||
8 | #include <sys/socket.h> | |
9 | #include <netinet/in.h> | |
10 | #include <netinet/tcp.h> | |
11 | #include <sched.h> | |
12 | ||
13 | // deal with partial reads | |
14 | namespace { | |
15 | int readn(int fd, void* buffer, unsigned int len) | |
16 | { | |
17 | unsigned int pos=0; | |
18 | int res; | |
19 | for(;;) { | |
20 | res = read(fd, (char*)buffer + pos, len - pos); | |
21 | if(res == 0) { | |
22 | if(pos) | |
23 | throw runtime_error("Signing Pipe remote shut down in the middle of a message"); | |
24 | else { | |
25 | //cerr<<"Got decent EOF on "<<fd<<endl; | |
26 | return 0; | |
27 | } | |
28 | } | |
29 | ||
30 | if(res < 0) { | |
31 | if(errno == EAGAIN || errno == EINTR) { | |
32 | if(pos==0) | |
33 | return -1; | |
34 | waitForData(fd, -1); | |
35 | continue; | |
36 | } | |
37 | unixDie("Reading from socket in Signing Pipe loop"); | |
38 | } | |
39 | ||
40 | pos+=res; | |
41 | if(pos == len) | |
42 | break; | |
43 | } | |
44 | return len; | |
45 | } | |
46 | } | |
47 | ||
48 | void* ChunkedSigningPipe::helperWorker(ChunkedSigningPipe* csp, int fd) | |
49 | try { | |
50 | csp->worker(fd); | |
51 | return nullptr; | |
52 | } | |
53 | catch(...) { | |
54 | g_log<<Logger::Error<<"Unknown exception in signing thread occurred"<<endl; | |
55 | return nullptr; | |
56 | } | |
57 | ||
58 | ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int workers) | |
59 | : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(signerName), | |
60 | d_maxchunkrecords(100), d_threads(d_numworkers), d_mustSign(mustSign), d_final(false) | |
61 | { | |
62 | d_rrsetToSign = new rrset_t; | |
63 | d_chunks.push_back(vector<DNSZoneRecord>()); // load an empty chunk | |
64 | ||
65 | if(!d_mustSign) | |
66 | return; | |
67 | ||
68 | int fds[2]; | |
69 | ||
70 | for(unsigned int n=0; n < d_numworkers; ++n) { | |
71 | if(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) | |
72 | throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe"); | |
73 | setCloseOnExec(fds[0]); | |
74 | setCloseOnExec(fds[1]); | |
75 | d_threads[n] = std::thread(helperWorker, this, fds[1]); | |
76 | setNonBlocking(fds[0]); | |
77 | d_sockets.push_back(fds[0]); | |
78 | d_outstandings[fds[0]] = 0; | |
79 | } | |
80 | } | |
81 | ||
82 | ChunkedSigningPipe::~ChunkedSigningPipe() | |
83 | { | |
84 | delete d_rrsetToSign; | |
85 | ||
86 | if(!d_mustSign) | |
87 | return; | |
88 | ||
89 | for(int fd : d_sockets) { | |
90 | close(fd); // this will trigger all threads to exit | |
91 | } | |
92 | ||
93 | for(auto& thread : d_threads) { | |
94 | thread.join(); | |
95 | } | |
96 | //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl; | |
97 | } | |
98 | ||
99 | namespace { | |
100 | bool | |
101 | dedupLessThan(const DNSZoneRecord& a, const DNSZoneRecord &b) | |
102 | { | |
103 | return make_tuple(a.dr.d_content->getZoneRepresentation(), a.dr.d_ttl) < make_tuple(b.dr.d_content->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW | |
104 | } | |
105 | ||
106 | bool dedupEqual(const DNSZoneRecord& a, const DNSZoneRecord &b) | |
107 | { | |
108 | return make_tuple(a.dr.d_content->getZoneRepresentation(), a.dr.d_ttl) == make_tuple(b.dr.d_content->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW | |
109 | } | |
110 | } | |
111 | ||
112 | void ChunkedSigningPipe::dedupRRSet() | |
113 | { | |
114 | // our set contains contains records for one type and one name, but might not be sorted otherwise | |
115 | sort(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupLessThan); | |
116 | d_rrsetToSign->erase(unique(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupEqual), d_rrsetToSign->end()); | |
117 | } | |
118 | ||
119 | bool ChunkedSigningPipe::submit(const DNSZoneRecord& rr) | |
120 | { | |
121 | ++d_submitted; | |
122 | // check if we have a full RRSET to sign | |
123 | if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->dr.d_type != rr.dr.d_type || d_rrsetToSign->begin()->dr.d_name != rr.dr.d_name)) | |
124 | { | |
125 | dedupRRSet(); | |
126 | sendRRSetToWorker(); | |
127 | } | |
128 | d_rrsetToSign->push_back(rr); | |
129 | return !d_chunks.empty() && d_chunks.front().size() >= d_maxchunkrecords; // "you can send more" | |
130 | } | |
131 | ||
132 | pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds) | |
133 | { | |
134 | vector<pollfd> pfds; | |
135 | ||
136 | for(unsigned int n = 0; n < d_sockets.size(); ++n) { | |
137 | if(d_eof.count(d_sockets[n])) | |
138 | continue; | |
139 | struct pollfd pfd; | |
140 | memset(&pfd, 0, sizeof(pfd)); | |
141 | pfd.fd = d_sockets[n]; | |
142 | if(rd) | |
143 | pfd.events |= POLLIN; | |
144 | if(wr) | |
145 | pfd.events |= POLLOUT; | |
146 | pfds.push_back(pfd); | |
147 | } | |
148 | ||
149 | int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite | |
150 | if(res < 0) | |
151 | unixDie("polling for activity from signers, "+std::to_string(d_sockets.size())); | |
152 | pair<vector<int>, vector<int> > vects; | |
153 | for(unsigned int n = 0; n < pfds.size(); ++n) | |
154 | if(pfds[n].revents & POLLIN) | |
155 | vects.first.push_back(pfds[n].fd); | |
156 | else if(pfds[n].revents & POLLOUT) | |
157 | vects.second.push_back(pfds[n].fd); | |
158 | ||
159 | return vects; | |
160 | } | |
161 | ||
162 | void ChunkedSigningPipe::addSignedToChunks(chunk_t* signedChunk) | |
163 | { | |
164 | chunk_t::const_iterator from = signedChunk->begin(); | |
165 | ||
166 | while(from != signedChunk->end()) { | |
167 | chunk_t& fillChunk = d_chunks.back(); | |
168 | ||
169 | chunk_t::size_type room = d_maxchunkrecords - fillChunk.size(); | |
170 | ||
171 | unsigned int fit = std::min(room, (chunk_t::size_type)(signedChunk->end() - from)); | |
172 | ||
173 | d_chunks.back().insert(fillChunk.end(), from , from + fit); | |
174 | from+=fit; | |
175 | ||
176 | if(from != signedChunk->end()) // it didn't fit, so add a new chunk | |
177 | d_chunks.push_back(chunk_t()); | |
178 | } | |
179 | } | |
180 | ||
181 | void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist! | |
182 | { | |
183 | if(!d_mustSign) { | |
184 | addSignedToChunks(d_rrsetToSign); | |
185 | d_rrsetToSign->clear(); | |
186 | return; | |
187 | } | |
188 | ||
189 | if(d_final && !d_outstanding) // nothing to do! | |
190 | return; | |
191 | ||
192 | bool wantRead, wantWrite; | |
193 | ||
194 | wantWrite = !d_rrsetToSign->empty(); | |
195 | wantRead = d_outstanding || wantWrite; // if we wrote, we want to read | |
196 | ||
197 | pair<vector<int>, vector<int> > rwVect; | |
198 | ||
199 | rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen | |
200 | ||
201 | if(wantWrite && !rwVect.second.empty()) { | |
202 | random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker | |
203 | writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign)); | |
204 | d_rrsetToSign = new rrset_t; | |
205 | d_outstandings[*rwVect.second.begin()]++; | |
206 | d_outstanding++; | |
207 | d_queued++; | |
208 | wantWrite=false; | |
209 | } | |
210 | ||
211 | if(wantRead) { | |
212 | while(d_outstanding) { | |
213 | chunk_t* chunk; | |
214 | ||
215 | for(int fd : rwVect.first) { | |
216 | if(d_eof.count(fd)) | |
217 | continue; | |
218 | ||
219 | while(d_outstanding) { | |
220 | int res = readn(fd, &chunk, sizeof(chunk)); | |
221 | if(!res) { | |
222 | if (d_outstandings[fd] > 0) { | |
223 | throw std::runtime_error("A signing pipe worker died while we were waiting for its result"); | |
224 | } | |
225 | d_eof.insert(fd); | |
226 | break; | |
227 | } | |
228 | if(res < 0) { | |
229 | if(errno != EAGAIN && errno != EINTR) | |
230 | unixDie("Error reading signed chunk from thread"); | |
231 | else | |
232 | break; | |
233 | } | |
234 | ||
235 | --d_outstanding; | |
236 | d_outstandings[fd]--; | |
237 | ||
238 | addSignedToChunks(chunk); | |
239 | ||
240 | delete chunk; | |
241 | } | |
242 | } | |
243 | if(!d_outstanding || !d_final) | |
244 | break; | |
245 | rwVect = waitForRW(true, false, -1); // wait for something to happen | |
246 | } | |
247 | } | |
248 | ||
249 | if(wantWrite) { // our optimization above failed, we now wait synchronously | |
250 | rwVect = waitForRW(false, wantWrite, -1); // wait for something to happen | |
251 | random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker | |
252 | writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign)); | |
253 | d_rrsetToSign = new rrset_t; | |
254 | d_outstandings[*rwVect.second.begin()]++; | |
255 | d_outstanding++; | |
256 | d_queued++; | |
257 | } | |
258 | ||
259 | } | |
260 | ||
261 | unsigned int ChunkedSigningPipe::getReady() const | |
262 | { | |
263 | unsigned int sum=0; | |
264 | for(const auto& v : d_chunks) { | |
265 | sum += v.size(); | |
266 | } | |
267 | return sum; | |
268 | } | |
269 | ||
270 | void ChunkedSigningPipe::worker(int fd) | |
271 | try | |
272 | { | |
273 | UeberBackend db("key-only"); | |
274 | DNSSECKeeper dk(&db); | |
275 | ||
276 | chunk_t* chunk = nullptr; | |
277 | int res; | |
278 | for(;;) { | |
279 | res = readn(fd, &chunk, sizeof(chunk)); | |
280 | if(!res) | |
281 | break; | |
282 | if(res < 0) | |
283 | unixDie("reading object pointer to sign from pdns"); | |
284 | try { | |
285 | set<DNSName> authSet; | |
286 | authSet.insert(d_signer); | |
287 | addRRSigs(dk, db, authSet, *chunk); | |
288 | ++d_signed; | |
289 | ||
290 | writen2(fd, &chunk, sizeof(chunk)); | |
291 | chunk = nullptr; | |
292 | } | |
293 | catch(const PDNSException& pe) { | |
294 | delete chunk; | |
295 | throw; | |
296 | } | |
297 | catch(const std::exception& e) { | |
298 | delete chunk; | |
299 | throw; | |
300 | } | |
301 | } | |
302 | close(fd); | |
303 | } | |
304 | catch(const PDNSException& pe) | |
305 | { | |
306 | g_log<<Logger::Error<<"Signing thread died because of PDNSException: "<<pe.reason<<endl; | |
307 | close(fd); | |
308 | } | |
309 | catch(const std::exception& e) | |
310 | { | |
311 | g_log<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl; | |
312 | close(fd); | |
313 | } | |
314 | ||
315 | void ChunkedSigningPipe::flushToSign() | |
316 | { | |
317 | sendRRSetToWorker(); | |
318 | d_rrsetToSign->clear(); | |
319 | } | |
320 | ||
321 | vector<DNSZoneRecord> ChunkedSigningPipe::getChunk(bool final) | |
322 | { | |
323 | if(final && !d_final) { | |
324 | // this means we should keep on reading until d_outstanding == 0 | |
325 | d_final = true; | |
326 | flushToSign(); | |
327 | ||
328 | for(int fd : d_sockets) { | |
329 | shutdown(fd, SHUT_WR); // perhaps this transmits EOF the other side | |
330 | //cerr<<"shutdown of "<<fd<<endl; | |
331 | } | |
332 | } | |
333 | if(d_final) | |
334 | flushToSign(); // should help us wait | |
335 | vector<DNSZoneRecord> front=d_chunks.front(); | |
336 | d_chunks.pop_front(); | |
337 | if(d_chunks.empty()) | |
338 | d_chunks.push_back(vector<DNSZoneRecord>()); | |
339 | /* if(d_final && front.empty()) | |
340 | cerr<<"getChunk returning empty in final"<<endl; */ | |
341 | return front; | |
342 | } | |
343 | ||
344 |