]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/signingpipe.cc
Merge pull request #4318 from rgacogne/rec-anonymize-protobuf-ecs
[thirdparty/pdns.git] / pdns / signingpipe.cc
1 #ifdef HAVE_CONFIG_H
2 #include "config.h"
3 #endif
4 #include "signingpipe.hh"
5 #include "misc.hh"
6 #include <poll.h>
7
8 #include <sys/socket.h>
9 #include <netinet/in.h>
10 #include <netinet/tcp.h>
11 #include <sched.h>
12
13 // deal with partial reads
14 namespace {
15 int readn(int fd, void* buffer, unsigned int len)
16 {
17 unsigned int pos=0;
18 int res;
19 for(;;) {
20 res = read(fd, (char*)buffer + pos, len - pos);
21 if(res == 0) {
22 if(pos)
23 throw runtime_error("Signing Pipe remote shut down in the middle of a message");
24 else {
25 //cerr<<"Got decent EOF on "<<fd<<endl;
26 return 0;
27 }
28 }
29
30 if(res < 0) {
31 if(errno == EAGAIN || errno == EINTR) {
32 if(pos==0)
33 return -1;
34 waitForData(fd, -1);
35 continue;
36 }
37 unixDie("Reading from socket in Signing Pipe loop");
38 }
39
40 pos+=res;
41 if(pos == len)
42 break;
43 }
44 return len;
45 }
46 }
47
48
49 // used to pass information to the new thread
50 struct StartHelperStruct
51 {
52 StartHelperStruct(ChunkedSigningPipe* csp, int id, int fd) : d_csp(csp), d_id(id), d_fd(fd){}
53 ChunkedSigningPipe* d_csp;
54 int d_id;
55 int d_fd;
56 };
57
58 // used to launch the new thread
59 void* ChunkedSigningPipe::helperWorker(void* p)
60 try
61 {
62 StartHelperStruct shs=*(StartHelperStruct*)p;
63 delete (StartHelperStruct*)p;
64
65 shs.d_csp->worker(shs.d_id, shs.d_fd);
66 return 0;
67 }
68 catch(...) {
69 L<<Logger::Error<<"Unknown exception in signing thread occurred"<<endl;
70 return 0;
71 }
72
73 ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, const string& servers, unsigned int workers)
74 : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(signerName),
75 d_maxchunkrecords(100), d_tids(d_numworkers), d_mustSign(mustSign), d_final(false)
76 {
77 d_rrsetToSign = new rrset_t;
78 d_chunks.push_back(vector<DNSResourceRecord>()); // load an empty chunk
79
80 if(!d_mustSign)
81 return;
82
83 int fds[2];
84
85 for(unsigned int n=0; n < d_numworkers; ++n) {
86 if(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0)
87 throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
88 setCloseOnExec(fds[0]);
89 setCloseOnExec(fds[1]);
90 pthread_create(&d_tids[n], 0, helperWorker, (void*) new StartHelperStruct(this, n, fds[1]));
91 setNonBlocking(fds[0]);
92 d_sockets.push_back(fds[0]);
93 }
94 }
95
96 ChunkedSigningPipe::~ChunkedSigningPipe()
97 {
98 delete d_rrsetToSign;
99 if(!d_mustSign)
100 return;
101 for(int fd : d_sockets) {
102 close(fd); // this will trigger all threads to exit
103 }
104
105 void* res;
106 for(pthread_t& tid : d_tids) {
107 pthread_join(tid, &res);
108 }
109 //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
110 }
111
112 namespace {
113 bool
114 dedupLessThan(const DNSResourceRecord& a, const DNSResourceRecord &b)
115 {
116 return (tie(a.content, a.ttl) < tie(b.content, b.ttl));
117 }
118
119 bool dedupEqual(const DNSResourceRecord& a, const DNSResourceRecord &b)
120 {
121 return(tie(a.content, a.ttl) == tie(b.content, b.ttl));
122 }
123 }
124
125 void ChunkedSigningPipe::dedupRRSet()
126 {
127 // our set contains contains records for one type and one name, but might not be sorted otherwise
128 sort(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupLessThan);
129 d_rrsetToSign->erase(unique(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupEqual), d_rrsetToSign->end());
130 }
131
132 bool ChunkedSigningPipe::submit(const DNSResourceRecord& rr)
133 {
134 ++d_submitted;
135 // check if we have a full RRSET to sign
136 if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->qtype.getCode() != rr.qtype.getCode() || d_rrsetToSign->begin()->qname != rr.qname))
137 {
138 dedupRRSet();
139 sendRRSetToWorker();
140 }
141 d_rrsetToSign->push_back(rr);
142 return !d_chunks.empty() && d_chunks.front().size() >= d_maxchunkrecords; // "you can send more"
143 }
144
145 pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
146 {
147 vector<pollfd> pfds;
148
149 for(unsigned int n = 0; n < d_sockets.size(); ++n) {
150 if(d_eof.count(d_sockets[n]))
151 continue;
152 struct pollfd pfd;
153 memset(&pfd, 0, sizeof(pfd));
154 pfd.fd = d_sockets[n];
155 if(rd)
156 pfd.events |= POLLIN;
157 if(wr)
158 pfd.events |= POLLOUT;
159 pfds.push_back(pfd);
160 }
161
162 int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
163 if(res < 0)
164 unixDie("polling for activity from signers, "+std::to_string(d_sockets.size()));
165 pair<vector<int>, vector<int> > vects;
166 for(unsigned int n = 0; n < pfds.size(); ++n)
167 if(pfds[n].revents & POLLIN)
168 vects.first.push_back(pfds[n].fd);
169 else if(pfds[n].revents & POLLOUT)
170 vects.second.push_back(pfds[n].fd);
171
172 return vects;
173 }
174
175 void ChunkedSigningPipe::addSignedToChunks(chunk_t* signedChunk)
176 {
177 chunk_t::const_iterator from = signedChunk->begin();
178
179 while(from != signedChunk->end()) {
180 chunk_t& fillChunk = d_chunks.back();
181
182 chunk_t::size_type room = d_maxchunkrecords - fillChunk.size();
183
184 unsigned int fit = std::min(room, (chunk_t::size_type)(signedChunk->end() - from));
185
186 d_chunks.back().insert(fillChunk.end(), from , from + fit);
187 from+=fit;
188
189 if(from != signedChunk->end()) // it didn't fit, so add a new chunk
190 d_chunks.push_back(chunk_t());
191 }
192 }
193
194 void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
195 {
196 if(!d_mustSign) {
197 addSignedToChunks(d_rrsetToSign);
198 d_rrsetToSign->clear();
199 return;
200 }
201
202 if(d_final && !d_outstanding) // nothing to do!
203 return;
204
205 bool wantRead, wantWrite;
206
207 wantWrite = !d_rrsetToSign->empty();
208 wantRead = d_outstanding || wantWrite; // if we wrote, we want to read
209
210 pair<vector<int>, vector<int> > rwVect;
211
212 rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
213
214 if(wantWrite && !rwVect.second.empty()) {
215 random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
216 writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
217 d_rrsetToSign = new rrset_t;
218 d_outstanding++;
219 d_queued++;
220 wantWrite=false;
221 }
222
223 if(wantRead) {
224 while(d_outstanding) {
225 chunk_t* chunk;
226
227 for(int fd : rwVect.first) {
228 if(d_eof.count(fd))
229 continue;
230
231 while(d_outstanding) {
232 int res = readn(fd, &chunk, sizeof(chunk));
233 if(!res) {
234 d_eof.insert(fd);
235 break;
236 }
237 if(res < 0) {
238 if(errno != EAGAIN && errno != EINTR)
239 unixDie("Error reading signed chunk from thread");
240 else
241 break;
242 }
243
244 --d_outstanding;
245
246 addSignedToChunks(chunk);
247
248 delete chunk;
249 }
250 }
251 if(!d_outstanding || !d_final)
252 break;
253 rwVect = waitForRW(1, 0, -1); // wait for something to happen
254 }
255 }
256
257 if(wantWrite) { // our optimization above failed, we now wait synchronously
258 rwVect = waitForRW(0, wantWrite, -1); // wait for something to happen
259 random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
260 writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
261 d_rrsetToSign = new rrset_t;
262 d_outstanding++;
263 d_queued++;
264 }
265
266 }
267
268 unsigned int ChunkedSigningPipe::getReady()
269 {
270 unsigned int sum=0;
271 for(const std::vector<DNSResourceRecord>& v : d_chunks) {
272 sum += v.size();
273 }
274 return sum;
275 }
276 void ChunkedSigningPipe::worker(int id, int fd)
277 try
278 {
279 DNSSECKeeper dk;
280 UeberBackend db("key-only");
281
282 chunk_t* chunk;
283 int res;
284 for(;;) {
285 res = readn(fd, &chunk, sizeof(chunk));
286 if(!res)
287 break;
288 if(res < 0)
289 unixDie("reading object pointer to sign from pdns");
290 set<DNSName> authSet;
291 authSet.insert(d_signer);
292 addRRSigs(dk, db, authSet, *chunk);
293 ++d_signed;
294
295 writen2(fd, &chunk, sizeof(chunk));
296 }
297 close(fd);
298 }
299 catch(PDNSException& pe)
300 {
301 L<<Logger::Error<<"Signing thread died because of PDNSException: "<<pe.reason<<endl;
302 close(fd);
303 }
304 catch(std::exception& e)
305 {
306 L<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
307 close(fd);
308 }
309
310 void ChunkedSigningPipe::flushToSign()
311 {
312 sendRRSetToWorker();
313 d_rrsetToSign->clear();
314 }
315
316 vector<DNSResourceRecord> ChunkedSigningPipe::getChunk(bool final)
317 {
318 if(final && !d_final) {
319 // this means we should keep on reading until d_outstanding == 0
320 d_final = true;
321 flushToSign();
322
323 for(int fd : d_sockets) {
324 shutdown(fd, SHUT_WR); // perhaps this transmits EOF the other side
325 //cerr<<"shutdown of "<<fd<<endl;
326 }
327 }
328 if(d_final)
329 flushToSign(); // should help us wait
330 vector<DNSResourceRecord> front=d_chunks.front();
331 d_chunks.pop_front();
332 if(d_chunks.empty())
333 d_chunks.push_back(vector<DNSResourceRecord>());
334 /* if(d_final && front.empty())
335 cerr<<"getChunk returning empty in final"<<endl; */
336 return front;
337 }
338
339 #if 0
340
341 ServiceTuple st;
342 ComboAddress remote;
343 if(!servers.empty()) {
344 st.port=2000;
345 parseService(servers, st);
346 remote=ComboAddress(st.host, st.port);
347 }
348
349 ///
350 if(!servers.empty()) {
351 fds[0] = socket(AF_INET, SOCK_STREAM, 0);
352 fds[1] = -1;
353
354 if(connect(fds[0], (struct sockaddr*)&remote, remote.getSocklen()) < 0)
355 unixDie("Connecting to signing server");
356 }
357 else {
358 /////
359 signal(SIGCHLD, SIG_IGN);
360 if(!fork()) { // child
361 dup2(fds[1], 0);
362 execl("./pdnsutil", "./pdnsutil", "--config-dir=./", "signing-slave", NULL);
363 // helperWorker(new StartHelperStruct(this, n));
364 return;
365 }
366 else
367 close(fds[1]);
368 #endif
369
370 #if 0
371 bool readLStringFromSocket(int fd, string& msg)
372 {
373 msg.clear();
374 uint32_t len;
375 if(!readn(fd, &len, sizeof(len)))
376 return false;
377
378 len = ntohl(len);
379
380 scoped_array<char> buf(new char[len]);
381 readn(fd, buf.get(), len);
382
383 msg.assign(buf.get(), len);
384 return true;
385 }
386 void writeLStringToSocket(int fd, const string& msg)
387 {
388 string realmsg;
389 uint32_t len = htonl(msg.length());
390 string tot((char*)&len, 4);
391 tot+=msg;
392
393 writen2(fd, tot.c_str(), tot.length());
394 }
395
396 #endif
397