]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/signingpipe.cc
Merge pull request #13454 from omoerbeek/warnings-followup
[thirdparty/pdns.git] / pdns / signingpipe.cc
1 #ifdef HAVE_CONFIG_H
2 #include "config.h"
3 #endif
4 #include "signingpipe.hh"
5 #include "misc.hh"
6 #include "dns_random.hh"
7 #include <poll.h>
8
9 #include <sys/socket.h>
10 #include <netinet/in.h>
11 #include <netinet/tcp.h>
12 #include <sched.h>
13
14 // deal with partial reads
15 namespace {
16 int readn(int fd, void* buffer, unsigned int len)
17 {
18 unsigned int pos=0;
19 int res;
20 for(;;) {
21 res = read(fd, (char*)buffer + pos, len - pos);
22 if(res == 0) {
23 if(pos)
24 throw runtime_error("Signing Pipe remote shut down in the middle of a message");
25 else {
26 //cerr<<"Got decent EOF on "<<fd<<endl;
27 return 0;
28 }
29 }
30
31 if(res < 0) {
32 if(errno == EAGAIN || errno == EINTR) {
33 if(pos==0)
34 return -1;
35 // error handled later
36 (void)waitForData(fd, -1);
37 continue;
38 }
39 unixDie("Reading from socket in Signing Pipe loop");
40 }
41
42 pos+=res;
43 if(pos == len)
44 break;
45 }
46 return len;
47 }
48 }
49
50 void* ChunkedSigningPipe::helperWorker(ChunkedSigningPipe* csp, int fd)
51 try {
52 csp->worker(fd);
53 return nullptr;
54 }
55 catch(...) {
56 g_log<<Logger::Error<<"Unknown exception in signing thread occurred"<<endl;
57 return nullptr;
58 }
59
60 ChunkedSigningPipe::ChunkedSigningPipe(DNSName signerName, bool mustSign, unsigned int workers, unsigned int maxChunkRecords)
61 : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(std::move(signerName)),
62 d_maxchunkrecords(maxChunkRecords), d_threads(d_numworkers), d_mustSign(mustSign), d_final(false)
63 {
64 d_rrsetToSign = make_unique<rrset_t>();
65 d_chunks.push_back(vector<DNSZoneRecord>()); // load an empty chunk
66
67 if(!d_mustSign)
68 return;
69
70 int fds[2];
71
72 for(unsigned int n=0; n < d_numworkers; ++n) {
73 if(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0)
74 throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
75 setCloseOnExec(fds[0]);
76 setCloseOnExec(fds[1]);
77 d_threads[n] = std::thread(helperWorker, this, fds[1]);
78 setNonBlocking(fds[0]);
79 d_sockets.push_back(fds[0]);
80 d_outstandings[fds[0]] = 0;
81 }
82 }
83
84 ChunkedSigningPipe::~ChunkedSigningPipe()
85 {
86 if(!d_mustSign)
87 return;
88
89 for(int fd : d_sockets) {
90 close(fd); // this will trigger all threads to exit
91 }
92
93 for(auto& thread : d_threads) {
94 thread.join();
95 }
96 //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
97 }
98
99 namespace {
100 bool
101 dedupLessThan(const DNSZoneRecord& a, const DNSZoneRecord &b)
102 {
103 return std::tuple(a.dr.getContent()->getZoneRepresentation(), a.dr.d_ttl) < std::tuple(b.dr.getContent()->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW
104 }
105
106 bool dedupEqual(const DNSZoneRecord& a, const DNSZoneRecord &b)
107 {
108 return std::tuple(a.dr.getContent()->getZoneRepresentation(), a.dr.d_ttl) == std::tuple(b.dr.getContent()->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW
109 }
110 }
111
112 void ChunkedSigningPipe::dedupRRSet()
113 {
114 // our set contains contains records for one type and one name, but might not be sorted otherwise
115 sort(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupLessThan);
116 d_rrsetToSign->erase(unique(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupEqual), d_rrsetToSign->end());
117 }
118
119 bool ChunkedSigningPipe::submit(const DNSZoneRecord& rr)
120 {
121 ++d_submitted;
122 // check if we have a full RRSET to sign
123 if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->dr.d_type != rr.dr.d_type || d_rrsetToSign->begin()->dr.d_name != rr.dr.d_name))
124 {
125 dedupRRSet();
126 sendRRSetToWorker();
127 }
128 d_rrsetToSign->push_back(rr);
129 return !d_chunks.empty() && d_chunks.front().size() >= d_maxchunkrecords; // "you can send more"
130 }
131
132 pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
133 {
134 vector<pollfd> pfds;
135
136 for(int & socket : d_sockets) {
137 if(d_eof.count(socket))
138 continue;
139 struct pollfd pfd;
140 memset(&pfd, 0, sizeof(pfd));
141 pfd.fd = socket;
142 if(rd)
143 pfd.events |= POLLIN;
144 if(wr)
145 pfd.events |= POLLOUT;
146 pfds.push_back(pfd);
147 }
148
149 int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
150 if(res < 0)
151 unixDie("polling for activity from signers, "+std::to_string(d_sockets.size()));
152 pair<vector<int>, vector<int> > vects;
153 for(auto & pfd : pfds)
154 if(pfd.revents & POLLIN)
155 vects.first.push_back(pfd.fd);
156 else if(pfd.revents & POLLOUT)
157 vects.second.push_back(pfd.fd);
158
159 return vects;
160 }
161
162 void ChunkedSigningPipe::addSignedToChunks(std::unique_ptr<chunk_t>& signedChunk)
163 {
164 chunk_t::const_iterator from = signedChunk->begin();
165
166 while(from != signedChunk->end()) {
167 chunk_t& fillChunk = d_chunks.back();
168 chunk_t::size_type room = d_maxchunkrecords - fillChunk.size();
169
170 unsigned int fit = std::min(room, (chunk_t::size_type)(signedChunk->end() - from));
171
172 d_chunks.back().insert(fillChunk.end(), from , from + fit);
173 from+=fit;
174
175 if(from != signedChunk->end()) // it didn't fit, so add a new chunk
176 d_chunks.push_back(chunk_t());
177 }
178 }
179
180 void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
181 {
182 if(!d_mustSign) {
183 addSignedToChunks(d_rrsetToSign);
184 d_rrsetToSign->clear();
185 return;
186 }
187
188 if(d_final && !d_outstanding) // nothing to do!
189 return;
190
191 bool wantRead, wantWrite;
192
193 wantWrite = !d_rrsetToSign->empty();
194 wantRead = d_outstanding || wantWrite; // if we wrote, we want to read
195
196 pair<vector<int>, vector<int> > rwVect;
197
198 rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
199
200 if(wantWrite && !rwVect.second.empty()) {
201 shuffle(rwVect.second.begin(), rwVect.second.end(), pdns::dns_random_engine()); // pick random available worker
202 auto ptr = d_rrsetToSign.get();
203 writen2(*rwVect.second.begin(), &ptr, sizeof(ptr));
204 // coverity[leaked_storage]
205 static_cast<void>(d_rrsetToSign.release());
206 d_rrsetToSign = make_unique<rrset_t>();
207 d_outstandings[*rwVect.second.begin()]++;
208 d_outstanding++;
209 d_queued++;
210 wantWrite=false;
211 }
212
213 if(wantRead) {
214 while(d_outstanding) {
215 for(int fd : rwVect.first) {
216 if(d_eof.count(fd))
217 continue;
218
219 while(d_outstanding) {
220 chunk_t* chunk = nullptr;
221 int res = readn(fd, &chunk, sizeof(chunk));
222 if(!res) {
223 if (d_outstandings[fd] > 0) {
224 throw std::runtime_error("A signing pipe worker died while we were waiting for its result");
225 }
226 d_eof.insert(fd);
227 break;
228 }
229 if(res < 0) {
230 if(errno != EAGAIN && errno != EINTR)
231 unixDie("Error reading signed chunk from thread");
232 else
233 break;
234 }
235
236 std::unique_ptr<rrset_t> chunkPtr(chunk);
237 chunk = nullptr;
238 --d_outstanding;
239 d_outstandings[fd]--;
240
241 addSignedToChunks(chunkPtr);
242 }
243 }
244 if(!d_outstanding || !d_final)
245 break;
246 rwVect = waitForRW(true, false, -1); // wait for something to happen
247 }
248 }
249
250 if(wantWrite) { // our optimization above failed, we now wait synchronously
251 rwVect = waitForRW(false, wantWrite, -1); // wait for something to happen
252 shuffle(rwVect.second.begin(), rwVect.second.end(), pdns::dns_random_engine()); // pick random available worker
253 auto ptr = d_rrsetToSign.get();
254 writen2(*rwVect.second.begin(), &ptr, sizeof(ptr));
255 // coverity[leaked_storage]
256 static_cast<void>(d_rrsetToSign.release());
257 d_rrsetToSign = make_unique<rrset_t>();
258 d_outstandings[*rwVect.second.begin()]++;
259 d_outstanding++;
260 d_queued++;
261 }
262
263 }
264
265 unsigned int ChunkedSigningPipe::getReady() const
266 {
267 unsigned int sum=0;
268 for(const auto& v : d_chunks) {
269 sum += v.size();
270 }
271 return sum;
272 }
273
274 void ChunkedSigningPipe::worker(int fd)
275 try
276 {
277 UeberBackend db("key-only");
278 DNSSECKeeper dk(&db);
279
280 chunk_t* chunk = nullptr;
281 int res;
282 for(;;) {
283 res = readn(fd, &chunk, sizeof(chunk));
284 if(!res)
285 break;
286 if(res < 0)
287 unixDie("reading object pointer to sign from pdns");
288 try {
289 set<DNSName> authSet;
290 authSet.insert(d_signer);
291 addRRSigs(dk, db, authSet, *chunk);
292 ++d_signed;
293
294 writen2(fd, &chunk, sizeof(chunk));
295 chunk = nullptr;
296 }
297 catch(const PDNSException& pe) {
298 delete chunk;
299 throw;
300 }
301 catch(const std::exception& e) {
302 delete chunk;
303 throw;
304 }
305 }
306 close(fd);
307 }
308 catch(const PDNSException& pe)
309 {
310 g_log<<Logger::Error<<"Signing thread died because of PDNSException: "<<pe.reason<<endl;
311 close(fd);
312 }
313 catch(const std::exception& e)
314 {
315 g_log<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
316 close(fd);
317 }
318
319 void ChunkedSigningPipe::flushToSign()
320 {
321 sendRRSetToWorker();
322 d_rrsetToSign->clear();
323 }
324
325 vector<DNSZoneRecord> ChunkedSigningPipe::getChunk(bool final)
326 {
327 if(final && !d_final) {
328 // this means we should keep on reading until d_outstanding == 0
329 d_final = true;
330 flushToSign();
331
332 for(int fd : d_sockets) {
333 shutdown(fd, SHUT_WR); // perhaps this transmits EOF the other side
334 //cerr<<"shutdown of "<<fd<<endl;
335 }
336 }
337 if(d_final)
338 flushToSign(); // should help us wait
339 vector<DNSZoneRecord> front=d_chunks.front();
340 d_chunks.pop_front();
341 if(d_chunks.empty())
342 d_chunks.push_back(vector<DNSZoneRecord>());
343 /* if(d_final && front.empty())
344 cerr<<"getChunk returning empty in final"<<endl; */
345 return front;
346 }
347
348