]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/signingpipe.cc
Merge pull request #14195 from rgacogne/ddist-no-assertions
[thirdparty/pdns.git] / pdns / signingpipe.cc
CommitLineData
870a0fe4
AT
1#ifdef HAVE_CONFIG_H
2#include "config.h"
3#endif
8e9b7d99 4#include "signingpipe.hh"
a6ef6f7a 5#include "misc.hh"
d720eb8a 6#include "dns_random.hh"
a6ef6f7a 7#include <poll.h>
fa8fd4d2 8
a6ef6f7a
BH
9#include <sys/socket.h>
10#include <netinet/in.h>
11#include <netinet/tcp.h>
12#include <sched.h>
8e9b7d99 13
8d59e8ce
BH
14// deal with partial reads
15namespace {
16int readn(int fd, void* buffer, unsigned int len)
17{
18 unsigned int pos=0;
19 int res;
20 for(;;) {
21 res = read(fd, (char*)buffer + pos, len - pos);
22 if(res == 0) {
23 if(pos)
24 throw runtime_error("Signing Pipe remote shut down in the middle of a message");
25 else {
7b217f2e 26 //cerr<<"Got decent EOF on "<<fd<<endl;
8d59e8ce
BH
27 return 0;
28 }
29 }
30
31 if(res < 0) {
32 if(errno == EAGAIN || errno == EINTR) {
33 if(pos==0)
34 return -1;
e0e96007
AT
35 // error handled later
36 (void)waitForData(fd, -1);
8d59e8ce
BH
37 continue;
38 }
39 unixDie("Reading from socket in Signing Pipe loop");
40 }
41
42 pos+=res;
43 if(pos == len)
44 break;
45 }
46 return len;
47}
48}
49
07019b51
RG
50void* ChunkedSigningPipe::helperWorker(ChunkedSigningPipe* csp, int fd)
51try {
52 csp->worker(fd);
53 return nullptr;
8267bd2c 54}
97f1f8d9 55catch(...) {
e6a9dde5 56 g_log<<Logger::Error<<"Unknown exception in signing thread occurred"<<endl;
07019b51 57 return nullptr;
bec14a20 58}
8267bd2c 59
3af419da 60ChunkedSigningPipe::ChunkedSigningPipe(DNSName signerName, bool mustSign, unsigned int workers, unsigned int maxChunkRecords)
1290c1d2 61 : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(std::move(signerName)),
3af419da 62 d_maxchunkrecords(maxChunkRecords), d_threads(d_numworkers), d_mustSign(mustSign), d_final(false)
8267bd2c 63{
c2826d2e 64 d_rrsetToSign = make_unique<rrset_t>();
90ba52e0 65 d_chunks.push_back(vector<DNSZoneRecord>()); // load an empty chunk
8d59e8ce 66
8267bd2c
BH
67 if(!d_mustSign)
68 return;
a2aaa807 69
a2aaa807
BH
70 int fds[2];
71
8267bd2c 72 for(unsigned int n=0; n < d_numworkers; ++n) {
8d59e8ce
BH
73 if(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0)
74 throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
3897b9e1 75 setCloseOnExec(fds[0]);
76 setCloseOnExec(fds[1]);
07019b51 77 d_threads[n] = std::thread(helperWorker, this, fds[1]);
3897b9e1 78 setNonBlocking(fds[0]);
8d59e8ce 79 d_sockets.push_back(fds[0]);
e3200e07 80 d_outstandings[fds[0]] = 0;
8267bd2c
BH
81 }
82}
83
84ChunkedSigningPipe::~ChunkedSigningPipe()
85{
86 if(!d_mustSign)
87 return;
07019b51 88
ef7cd021 89 for(int fd : d_sockets) {
a6ef6f7a
BH
90 close(fd); // this will trigger all threads to exit
91 }
07019b51
RG
92
93 for(auto& thread : d_threads) {
94 thread.join();
a6ef6f7a 95 }
7b217f2e 96 //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
8267bd2c
BH
97}
98
a2f3b9ec 99namespace {
b9bafae0 100bool
90ba52e0 101dedupLessThan(const DNSZoneRecord& a, const DNSZoneRecord &b)
a2f3b9ec 102{
0b0882f5 103 return std::tuple(a.dr.getContent()->getZoneRepresentation(), a.dr.d_ttl) < std::tuple(b.dr.getContent()->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW
a2f3b9ec
BH
104}
105
90ba52e0 106bool dedupEqual(const DNSZoneRecord& a, const DNSZoneRecord &b)
a2f3b9ec 107{
0b0882f5 108 return std::tuple(a.dr.getContent()->getZoneRepresentation(), a.dr.d_ttl) == std::tuple(b.dr.getContent()->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW
a2f3b9ec
BH
109}
110}
111
112void ChunkedSigningPipe::dedupRRSet()
113{
114 // our set contains contains records for one type and one name, but might not be sorted otherwise
115 sort(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupLessThan);
116 d_rrsetToSign->erase(unique(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupEqual), d_rrsetToSign->end());
117}
118
90ba52e0 119bool ChunkedSigningPipe::submit(const DNSZoneRecord& rr)
8e9b7d99 120{
8d59e8ce
BH
121 ++d_submitted;
122 // check if we have a full RRSET to sign
90ba52e0 123 if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->dr.d_type != rr.dr.d_type || d_rrsetToSign->begin()->dr.d_name != rr.dr.d_name))
8e9b7d99 124 {
a2f3b9ec 125 dedupRRSet();
a2aaa807 126 sendRRSetToWorker();
8e9b7d99 127 }
a2aaa807 128 d_rrsetToSign->push_back(rr);
a2f3b9ec 129 return !d_chunks.empty() && d_chunks.front().size() >= d_maxchunkrecords; // "you can send more"
8e9b7d99
BH
130}
131
a6ef6f7a
BH
132pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
133{
ac10b8c6 134 vector<pollfd> pfds;
e3200e07 135
403a3a42
O
136 for(int & socket : d_sockets) {
137 if(d_eof.count(socket))
ac10b8c6
BH
138 continue;
139 struct pollfd pfd;
140 memset(&pfd, 0, sizeof(pfd));
403a3a42 141 pfd.fd = socket;
ac10b8c6
BH
142 if(rd)
143 pfd.events |= POLLIN;
144 if(wr)
145 pfd.events |= POLLOUT;
146 pfds.push_back(pfd);
a6ef6f7a 147 }
e3200e07 148
a135720d 149 int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
a6ef6f7a 150 if(res < 0)
335da0ba 151 unixDie("polling for activity from signers, "+std::to_string(d_sockets.size()));
a6ef6f7a 152 pair<vector<int>, vector<int> > vects;
d7f67000
RP
153 for(auto & pfd : pfds)
154 if(pfd.revents & POLLIN)
155 vects.first.push_back(pfd.fd);
156 else if(pfd.revents & POLLOUT)
157 vects.second.push_back(pfd.fd);
a6ef6f7a
BH
158
159 return vects;
160}
161
c2826d2e 162void ChunkedSigningPipe::addSignedToChunks(std::unique_ptr<chunk_t>& signedChunk)
8d59e8ce
BH
163{
164 chunk_t::const_iterator from = signedChunk->begin();
165
166 while(from != signedChunk->end()) {
167 chunk_t& fillChunk = d_chunks.back();
8d59e8ce
BH
168 chunk_t::size_type room = d_maxchunkrecords - fillChunk.size();
169
170 unsigned int fit = std::min(room, (chunk_t::size_type)(signedChunk->end() - from));
171
172 d_chunks.back().insert(fillChunk.end(), from , from + fit);
173 from+=fit;
c2826d2e 174
8d59e8ce
BH
175 if(from != signedChunk->end()) // it didn't fit, so add a new chunk
176 d_chunks.push_back(chunk_t());
177 }
178}
a6ef6f7a 179
a2aaa807 180void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
8267bd2c
BH
181{
182 if(!d_mustSign) {
8d59e8ce 183 addSignedToChunks(d_rrsetToSign);
a2aaa807 184 d_rrsetToSign->clear();
8267bd2c
BH
185 return;
186 }
a2aaa807 187
8d59e8ce
BH
188 if(d_final && !d_outstanding) // nothing to do!
189 return;
190
a6ef6f7a
BH
191 bool wantRead, wantWrite;
192
193 wantWrite = !d_rrsetToSign->empty();
8d59e8ce 194 wantRead = d_outstanding || wantWrite; // if we wrote, we want to read
a6ef6f7a
BH
195
196 pair<vector<int>, vector<int> > rwVect;
197
e3200e07 198 rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
a6ef6f7a
BH
199
200 if(wantWrite && !rwVect.second.empty()) {
d720eb8a 201 shuffle(rwVect.second.begin(), rwVect.second.end(), pdns::dns_random_engine()); // pick random available worker
68a9a74c
OM
202 auto ptr = d_rrsetToSign.get();
203 writen2(*rwVect.second.begin(), &ptr, sizeof(ptr));
eaf2d918
OM
204 // coverity[leaked_storage]
205 static_cast<void>(d_rrsetToSign.release());
c2826d2e 206 d_rrsetToSign = make_unique<rrset_t>();
e3200e07 207 d_outstandings[*rwVect.second.begin()]++;
8267bd2c 208 d_outstanding++;
a2aaa807 209 d_queued++;
8d59e8ce
BH
210 wantWrite=false;
211 }
212
213 if(wantRead) {
214 while(d_outstanding) {
ef7cd021 215 for(int fd : rwVect.first) {
8d59e8ce
BH
216 if(d_eof.count(fd))
217 continue;
218
219 while(d_outstanding) {
c2826d2e 220 chunk_t* chunk = nullptr;
8d59e8ce
BH
221 int res = readn(fd, &chunk, sizeof(chunk));
222 if(!res) {
e3200e07
RG
223 if (d_outstandings[fd] > 0) {
224 throw std::runtime_error("A signing pipe worker died while we were waiting for its result");
225 }
8d59e8ce
BH
226 d_eof.insert(fd);
227 break;
228 }
229 if(res < 0) {
230 if(errno != EAGAIN && errno != EINTR)
231 unixDie("Error reading signed chunk from thread");
232 else
233 break;
234 }
c2826d2e
RG
235
236 std::unique_ptr<rrset_t> chunkPtr(chunk);
237 chunk = nullptr;
8d59e8ce 238 --d_outstanding;
e3200e07 239 d_outstandings[fd]--;
8d59e8ce 240
c2826d2e 241 addSignedToChunks(chunkPtr);
8d59e8ce
BH
242 }
243 }
244 if(!d_outstanding || !d_final)
a6ef6f7a 245 break;
e3200e07 246 rwVect = waitForRW(true, false, -1); // wait for something to happen
a6ef6f7a 247 }
8267bd2c 248 }
a6ef6f7a 249
8d59e8ce 250 if(wantWrite) { // our optimization above failed, we now wait synchronously
e3200e07 251 rwVect = waitForRW(false, wantWrite, -1); // wait for something to happen
d720eb8a 252 shuffle(rwVect.second.begin(), rwVect.second.end(), pdns::dns_random_engine()); // pick random available worker
68a9a74c
OM
253 auto ptr = d_rrsetToSign.get();
254 writen2(*rwVect.second.begin(), &ptr, sizeof(ptr));
eaf2d918
OM
255 // coverity[leaked_storage]
256 static_cast<void>(d_rrsetToSign.release());
c2826d2e 257 d_rrsetToSign = make_unique<rrset_t>();
e3200e07 258 d_outstandings[*rwVect.second.begin()]++;
8d59e8ce
BH
259 d_outstanding++;
260 d_queued++;
a6ef6f7a
BH
261 }
262
8267bd2c
BH
263}
264
e3200e07 265unsigned int ChunkedSigningPipe::getReady() const
a2aaa807
BH
266{
267 unsigned int sum=0;
90ba52e0 268 for(const auto& v : d_chunks) {
a2aaa807
BH
269 sum += v.size();
270 }
271 return sum;
272}
07019b51
RG
273
274void ChunkedSigningPipe::worker(int fd)
a6ef6f7a 275try
8267bd2c 276{
a6ef6f7a 277 UeberBackend db("key-only");
ea99d474 278 DNSSECKeeper dk(&db);
a6ef6f7a 279
c23d888d 280 chunk_t* chunk = nullptr;
8267bd2c
BH
281 int res;
282 for(;;) {
a6ef6f7a
BH
283 res = readn(fd, &chunk, sizeof(chunk));
284 if(!res)
8267bd2c 285 break;
a6ef6f7a
BH
286 if(res < 0)
287 unixDie("reading object pointer to sign from pdns");
c23d888d
RG
288 try {
289 set<DNSName> authSet;
290 authSet.insert(d_signer);
291 addRRSigs(dk, db, authSet, *chunk);
292 ++d_signed;
293
294 writen2(fd, &chunk, sizeof(chunk));
295 chunk = nullptr;
296 }
297 catch(const PDNSException& pe) {
298 delete chunk;
299 throw;
300 }
301 catch(const std::exception& e) {
302 delete chunk;
303 throw;
304 }
8267bd2c 305 }
a6ef6f7a
BH
306 close(fd);
307}
c23d888d 308catch(const PDNSException& pe)
97f1f8d9 309{
e6a9dde5 310 g_log<<Logger::Error<<"Signing thread died because of PDNSException: "<<pe.reason<<endl;
97f1f8d9
KM
311 close(fd);
312}
c23d888d 313catch(const std::exception& e)
a6ef6f7a 314{
e6a9dde5 315 g_log<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
a6ef6f7a 316 close(fd);
8267bd2c
BH
317}
318
8e9b7d99
BH
319void ChunkedSigningPipe::flushToSign()
320{
a2aaa807
BH
321 sendRRSetToWorker();
322 d_rrsetToSign->clear();
8e9b7d99
BH
323}
324
90ba52e0 325vector<DNSZoneRecord> ChunkedSigningPipe::getChunk(bool final)
8e9b7d99 326{
a6ef6f7a
BH
327 if(final && !d_final) {
328 // this means we should keep on reading until d_outstanding == 0
329 d_final = true;
8e9b7d99 330 flushToSign();
a6ef6f7a 331
ef7cd021 332 for(int fd : d_sockets) {
a6ef6f7a 333 shutdown(fd, SHUT_WR); // perhaps this transmits EOF the other side
7b217f2e 334 //cerr<<"shutdown of "<<fd<<endl;
a6ef6f7a 335 }
8267bd2c 336 }
a6ef6f7a
BH
337 if(d_final)
338 flushToSign(); // should help us wait
90ba52e0 339 vector<DNSZoneRecord> front=d_chunks.front();
a2aaa807
BH
340 d_chunks.pop_front();
341 if(d_chunks.empty())
90ba52e0 342 d_chunks.push_back(vector<DNSZoneRecord>());
e125fc69 343/* if(d_final && front.empty())
344 cerr<<"getChunk returning empty in final"<<endl; */
a2aaa807 345 return front;
8e9b7d99 346}
8d59e8ce 347
8d59e8ce 348