]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/signingpipe.cc
Merge pull request #9672 from neheb/clng
[thirdparty/pdns.git] / pdns / signingpipe.cc
CommitLineData
870a0fe4
AT
1#ifdef HAVE_CONFIG_H
2#include "config.h"
3#endif
8e9b7d99 4#include "signingpipe.hh"
a6ef6f7a 5#include "misc.hh"
d720eb8a 6#include "dns_random.hh"
a6ef6f7a 7#include <poll.h>
fa8fd4d2 8
a6ef6f7a
BH
9#include <sys/socket.h>
10#include <netinet/in.h>
11#include <netinet/tcp.h>
12#include <sched.h>
8e9b7d99 13
8d59e8ce
BH
14// deal with partial reads
15namespace {
16int readn(int fd, void* buffer, unsigned int len)
17{
18 unsigned int pos=0;
19 int res;
20 for(;;) {
21 res = read(fd, (char*)buffer + pos, len - pos);
22 if(res == 0) {
23 if(pos)
24 throw runtime_error("Signing Pipe remote shut down in the middle of a message");
25 else {
7b217f2e 26 //cerr<<"Got decent EOF on "<<fd<<endl;
8d59e8ce
BH
27 return 0;
28 }
29 }
30
31 if(res < 0) {
32 if(errno == EAGAIN || errno == EINTR) {
33 if(pos==0)
34 return -1;
35 waitForData(fd, -1);
36 continue;
37 }
38 unixDie("Reading from socket in Signing Pipe loop");
39 }
40
41 pos+=res;
42 if(pos == len)
43 break;
44 }
45 return len;
46}
47}
48
07019b51
RG
49void* ChunkedSigningPipe::helperWorker(ChunkedSigningPipe* csp, int fd)
50try {
51 csp->worker(fd);
52 return nullptr;
8267bd2c 53}
97f1f8d9 54catch(...) {
e6a9dde5 55 g_log<<Logger::Error<<"Unknown exception in signing thread occurred"<<endl;
07019b51 56 return nullptr;
bec14a20 57}
8267bd2c 58
1290c1d2
RP
59ChunkedSigningPipe::ChunkedSigningPipe(DNSName signerName, bool mustSign, unsigned int workers)
60 : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(std::move(signerName)),
07019b51 61 d_maxchunkrecords(100), d_threads(d_numworkers), d_mustSign(mustSign), d_final(false)
8267bd2c 62{
c2826d2e 63 d_rrsetToSign = make_unique<rrset_t>();
90ba52e0 64 d_chunks.push_back(vector<DNSZoneRecord>()); // load an empty chunk
8d59e8ce 65
8267bd2c
BH
66 if(!d_mustSign)
67 return;
a2aaa807 68
a2aaa807
BH
69 int fds[2];
70
8267bd2c 71 for(unsigned int n=0; n < d_numworkers; ++n) {
8d59e8ce
BH
72 if(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0)
73 throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
3897b9e1 74 setCloseOnExec(fds[0]);
75 setCloseOnExec(fds[1]);
07019b51 76 d_threads[n] = std::thread(helperWorker, this, fds[1]);
3897b9e1 77 setNonBlocking(fds[0]);
8d59e8ce 78 d_sockets.push_back(fds[0]);
e3200e07 79 d_outstandings[fds[0]] = 0;
8267bd2c
BH
80 }
81}
82
83ChunkedSigningPipe::~ChunkedSigningPipe()
84{
85 if(!d_mustSign)
86 return;
07019b51 87
ef7cd021 88 for(int fd : d_sockets) {
a6ef6f7a
BH
89 close(fd); // this will trigger all threads to exit
90 }
07019b51
RG
91
92 for(auto& thread : d_threads) {
93 thread.join();
a6ef6f7a 94 }
7b217f2e 95 //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
8267bd2c
BH
96}
97
a2f3b9ec 98namespace {
b9bafae0 99bool
90ba52e0 100dedupLessThan(const DNSZoneRecord& a, const DNSZoneRecord &b)
a2f3b9ec 101{
90ba52e0 102 return make_tuple(a.dr.d_content->getZoneRepresentation(), a.dr.d_ttl) < make_tuple(b.dr.d_content->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW
a2f3b9ec
BH
103}
104
90ba52e0 105bool dedupEqual(const DNSZoneRecord& a, const DNSZoneRecord &b)
a2f3b9ec 106{
90ba52e0 107 return make_tuple(a.dr.d_content->getZoneRepresentation(), a.dr.d_ttl) == make_tuple(b.dr.d_content->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW
a2f3b9ec
BH
108}
109}
110
111void ChunkedSigningPipe::dedupRRSet()
112{
113 // our set contains contains records for one type and one name, but might not be sorted otherwise
114 sort(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupLessThan);
115 d_rrsetToSign->erase(unique(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupEqual), d_rrsetToSign->end());
116}
117
90ba52e0 118bool ChunkedSigningPipe::submit(const DNSZoneRecord& rr)
8e9b7d99 119{
8d59e8ce
BH
120 ++d_submitted;
121 // check if we have a full RRSET to sign
90ba52e0 122 if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->dr.d_type != rr.dr.d_type || d_rrsetToSign->begin()->dr.d_name != rr.dr.d_name))
8e9b7d99 123 {
a2f3b9ec 124 dedupRRSet();
a2aaa807 125 sendRRSetToWorker();
8e9b7d99 126 }
a2aaa807 127 d_rrsetToSign->push_back(rr);
a2f3b9ec 128 return !d_chunks.empty() && d_chunks.front().size() >= d_maxchunkrecords; // "you can send more"
8e9b7d99
BH
129}
130
a6ef6f7a
BH
131pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
132{
ac10b8c6 133 vector<pollfd> pfds;
e3200e07 134
d7f67000
RP
135 for(int & d_socket : d_sockets) {
136 if(d_eof.count(d_socket))
ac10b8c6
BH
137 continue;
138 struct pollfd pfd;
139 memset(&pfd, 0, sizeof(pfd));
d7f67000 140 pfd.fd = d_socket;
ac10b8c6
BH
141 if(rd)
142 pfd.events |= POLLIN;
143 if(wr)
144 pfd.events |= POLLOUT;
145 pfds.push_back(pfd);
a6ef6f7a 146 }
e3200e07 147
a135720d 148 int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
a6ef6f7a 149 if(res < 0)
335da0ba 150 unixDie("polling for activity from signers, "+std::to_string(d_sockets.size()));
a6ef6f7a 151 pair<vector<int>, vector<int> > vects;
d7f67000
RP
152 for(auto & pfd : pfds)
153 if(pfd.revents & POLLIN)
154 vects.first.push_back(pfd.fd);
155 else if(pfd.revents & POLLOUT)
156 vects.second.push_back(pfd.fd);
a6ef6f7a
BH
157
158 return vects;
159}
160
c2826d2e 161void ChunkedSigningPipe::addSignedToChunks(std::unique_ptr<chunk_t>& signedChunk)
8d59e8ce
BH
162{
163 chunk_t::const_iterator from = signedChunk->begin();
164
165 while(from != signedChunk->end()) {
166 chunk_t& fillChunk = d_chunks.back();
8d59e8ce
BH
167 chunk_t::size_type room = d_maxchunkrecords - fillChunk.size();
168
169 unsigned int fit = std::min(room, (chunk_t::size_type)(signedChunk->end() - from));
170
171 d_chunks.back().insert(fillChunk.end(), from , from + fit);
172 from+=fit;
c2826d2e 173
8d59e8ce
BH
174 if(from != signedChunk->end()) // it didn't fit, so add a new chunk
175 d_chunks.push_back(chunk_t());
176 }
177}
a6ef6f7a 178
a2aaa807 179void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
8267bd2c
BH
180{
181 if(!d_mustSign) {
8d59e8ce 182 addSignedToChunks(d_rrsetToSign);
a2aaa807 183 d_rrsetToSign->clear();
8267bd2c
BH
184 return;
185 }
a2aaa807 186
8d59e8ce
BH
187 if(d_final && !d_outstanding) // nothing to do!
188 return;
189
a6ef6f7a
BH
190 bool wantRead, wantWrite;
191
192 wantWrite = !d_rrsetToSign->empty();
8d59e8ce 193 wantRead = d_outstanding || wantWrite; // if we wrote, we want to read
a6ef6f7a
BH
194
195 pair<vector<int>, vector<int> > rwVect;
196
e3200e07 197 rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
a6ef6f7a
BH
198
199 if(wantWrite && !rwVect.second.empty()) {
d720eb8a 200 shuffle(rwVect.second.begin(), rwVect.second.end(), pdns::dns_random_engine()); // pick random available worker
c2826d2e
RG
201 auto ptr = d_rrsetToSign.release();
202 writen2(*rwVect.second.begin(), &ptr, sizeof(ptr));
203 d_rrsetToSign = make_unique<rrset_t>();
e3200e07 204 d_outstandings[*rwVect.second.begin()]++;
8267bd2c 205 d_outstanding++;
a2aaa807 206 d_queued++;
8d59e8ce
BH
207 wantWrite=false;
208 }
209
210 if(wantRead) {
211 while(d_outstanding) {
ef7cd021 212 for(int fd : rwVect.first) {
8d59e8ce
BH
213 if(d_eof.count(fd))
214 continue;
215
216 while(d_outstanding) {
c2826d2e 217 chunk_t* chunk = nullptr;
8d59e8ce
BH
218 int res = readn(fd, &chunk, sizeof(chunk));
219 if(!res) {
e3200e07
RG
220 if (d_outstandings[fd] > 0) {
221 throw std::runtime_error("A signing pipe worker died while we were waiting for its result");
222 }
8d59e8ce
BH
223 d_eof.insert(fd);
224 break;
225 }
226 if(res < 0) {
227 if(errno != EAGAIN && errno != EINTR)
228 unixDie("Error reading signed chunk from thread");
229 else
230 break;
231 }
c2826d2e
RG
232
233 std::unique_ptr<rrset_t> chunkPtr(chunk);
234 chunk = nullptr;
8d59e8ce 235 --d_outstanding;
e3200e07 236 d_outstandings[fd]--;
8d59e8ce 237
c2826d2e 238 addSignedToChunks(chunkPtr);
8d59e8ce
BH
239 }
240 }
241 if(!d_outstanding || !d_final)
a6ef6f7a 242 break;
e3200e07 243 rwVect = waitForRW(true, false, -1); // wait for something to happen
a6ef6f7a 244 }
8267bd2c 245 }
a6ef6f7a 246
8d59e8ce 247 if(wantWrite) { // our optimization above failed, we now wait synchronously
e3200e07 248 rwVect = waitForRW(false, wantWrite, -1); // wait for something to happen
d720eb8a 249 shuffle(rwVect.second.begin(), rwVect.second.end(), pdns::dns_random_engine()); // pick random available worker
c2826d2e
RG
250 auto ptr = d_rrsetToSign.release();
251 writen2(*rwVect.second.begin(), &ptr, sizeof(ptr));
252 d_rrsetToSign = make_unique<rrset_t>();
e3200e07 253 d_outstandings[*rwVect.second.begin()]++;
8d59e8ce
BH
254 d_outstanding++;
255 d_queued++;
a6ef6f7a
BH
256 }
257
8267bd2c
BH
258}
259
e3200e07 260unsigned int ChunkedSigningPipe::getReady() const
a2aaa807
BH
261{
262 unsigned int sum=0;
90ba52e0 263 for(const auto& v : d_chunks) {
a2aaa807
BH
264 sum += v.size();
265 }
266 return sum;
267}
07019b51
RG
268
269void ChunkedSigningPipe::worker(int fd)
a6ef6f7a 270try
8267bd2c 271{
a6ef6f7a 272 UeberBackend db("key-only");
ea99d474 273 DNSSECKeeper dk(&db);
a6ef6f7a 274
c23d888d 275 chunk_t* chunk = nullptr;
8267bd2c
BH
276 int res;
277 for(;;) {
a6ef6f7a
BH
278 res = readn(fd, &chunk, sizeof(chunk));
279 if(!res)
8267bd2c 280 break;
a6ef6f7a
BH
281 if(res < 0)
282 unixDie("reading object pointer to sign from pdns");
c23d888d
RG
283 try {
284 set<DNSName> authSet;
285 authSet.insert(d_signer);
286 addRRSigs(dk, db, authSet, *chunk);
287 ++d_signed;
288
289 writen2(fd, &chunk, sizeof(chunk));
290 chunk = nullptr;
291 }
292 catch(const PDNSException& pe) {
293 delete chunk;
294 throw;
295 }
296 catch(const std::exception& e) {
297 delete chunk;
298 throw;
299 }
8267bd2c 300 }
a6ef6f7a
BH
301 close(fd);
302}
c23d888d 303catch(const PDNSException& pe)
97f1f8d9 304{
e6a9dde5 305 g_log<<Logger::Error<<"Signing thread died because of PDNSException: "<<pe.reason<<endl;
97f1f8d9
KM
306 close(fd);
307}
c23d888d 308catch(const std::exception& e)
a6ef6f7a 309{
e6a9dde5 310 g_log<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
a6ef6f7a 311 close(fd);
8267bd2c
BH
312}
313
8e9b7d99
BH
314void ChunkedSigningPipe::flushToSign()
315{
a2aaa807
BH
316 sendRRSetToWorker();
317 d_rrsetToSign->clear();
8e9b7d99
BH
318}
319
90ba52e0 320vector<DNSZoneRecord> ChunkedSigningPipe::getChunk(bool final)
8e9b7d99 321{
a6ef6f7a
BH
322 if(final && !d_final) {
323 // this means we should keep on reading until d_outstanding == 0
324 d_final = true;
8e9b7d99 325 flushToSign();
a6ef6f7a 326
ef7cd021 327 for(int fd : d_sockets) {
a6ef6f7a 328 shutdown(fd, SHUT_WR); // perhaps this transmits EOF the other side
7b217f2e 329 //cerr<<"shutdown of "<<fd<<endl;
a6ef6f7a 330 }
8267bd2c 331 }
a6ef6f7a
BH
332 if(d_final)
333 flushToSign(); // should help us wait
90ba52e0 334 vector<DNSZoneRecord> front=d_chunks.front();
a2aaa807
BH
335 d_chunks.pop_front();
336 if(d_chunks.empty())
90ba52e0 337 d_chunks.push_back(vector<DNSZoneRecord>());
e125fc69 338/* if(d_final && front.empty())
339 cerr<<"getChunk returning empty in final"<<endl; */
a2aaa807 340 return front;
8e9b7d99 341}
8d59e8ce 342
8d59e8ce 343