]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/signingpipe.cc
rec: ensure correct service user on debian
[thirdparty/pdns.git] / pdns / signingpipe.cc
1 #ifdef HAVE_CONFIG_H
2 #include "config.h"
3 #endif
4 #include "signingpipe.hh"
5 #include "misc.hh"
6 #include <poll.h>
7
8 #include <sys/socket.h>
9 #include <netinet/in.h>
10 #include <netinet/tcp.h>
11 #include <sched.h>
12
13 // deal with partial reads
14 namespace {
15 int readn(int fd, void* buffer, unsigned int len)
16 {
17 unsigned int pos=0;
18 int res;
19 for(;;) {
20 res = read(fd, (char*)buffer + pos, len - pos);
21 if(res == 0) {
22 if(pos)
23 throw runtime_error("Signing Pipe remote shut down in the middle of a message");
24 else {
25 //cerr<<"Got decent EOF on "<<fd<<endl;
26 return 0;
27 }
28 }
29
30 if(res < 0) {
31 if(errno == EAGAIN || errno == EINTR) {
32 if(pos==0)
33 return -1;
34 waitForData(fd, -1);
35 continue;
36 }
37 unixDie("Reading from socket in Signing Pipe loop");
38 }
39
40 pos+=res;
41 if(pos == len)
42 break;
43 }
44 return len;
45 }
46 }
47
48 void* ChunkedSigningPipe::helperWorker(ChunkedSigningPipe* csp, int fd)
49 try {
50 csp->worker(fd);
51 return nullptr;
52 }
53 catch(...) {
54 g_log<<Logger::Error<<"Unknown exception in signing thread occurred"<<endl;
55 return nullptr;
56 }
57
58 ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int workers)
59 : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(signerName),
60 d_maxchunkrecords(100), d_threads(d_numworkers), d_mustSign(mustSign), d_final(false)
61 {
62 d_rrsetToSign = new rrset_t;
63 d_chunks.push_back(vector<DNSZoneRecord>()); // load an empty chunk
64
65 if(!d_mustSign)
66 return;
67
68 int fds[2];
69
70 for(unsigned int n=0; n < d_numworkers; ++n) {
71 if(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0)
72 throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
73 setCloseOnExec(fds[0]);
74 setCloseOnExec(fds[1]);
75 d_threads[n] = std::thread(helperWorker, this, fds[1]);
76 setNonBlocking(fds[0]);
77 d_sockets.push_back(fds[0]);
78 d_outstandings[fds[0]] = 0;
79 }
80 }
81
82 ChunkedSigningPipe::~ChunkedSigningPipe()
83 {
84 delete d_rrsetToSign;
85
86 if(!d_mustSign)
87 return;
88
89 for(int fd : d_sockets) {
90 close(fd); // this will trigger all threads to exit
91 }
92
93 for(auto& thread : d_threads) {
94 thread.join();
95 }
96 //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
97 }
98
99 namespace {
100 bool
101 dedupLessThan(const DNSZoneRecord& a, const DNSZoneRecord &b)
102 {
103 return make_tuple(a.dr.d_content->getZoneRepresentation(), a.dr.d_ttl) < make_tuple(b.dr.d_content->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW
104 }
105
106 bool dedupEqual(const DNSZoneRecord& a, const DNSZoneRecord &b)
107 {
108 return make_tuple(a.dr.d_content->getZoneRepresentation(), a.dr.d_ttl) == make_tuple(b.dr.d_content->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW
109 }
110 }
111
112 void ChunkedSigningPipe::dedupRRSet()
113 {
114 // our set contains contains records for one type and one name, but might not be sorted otherwise
115 sort(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupLessThan);
116 d_rrsetToSign->erase(unique(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupEqual), d_rrsetToSign->end());
117 }
118
119 bool ChunkedSigningPipe::submit(const DNSZoneRecord& rr)
120 {
121 ++d_submitted;
122 // check if we have a full RRSET to sign
123 if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->dr.d_type != rr.dr.d_type || d_rrsetToSign->begin()->dr.d_name != rr.dr.d_name))
124 {
125 dedupRRSet();
126 sendRRSetToWorker();
127 }
128 d_rrsetToSign->push_back(rr);
129 return !d_chunks.empty() && d_chunks.front().size() >= d_maxchunkrecords; // "you can send more"
130 }
131
132 pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
133 {
134 vector<pollfd> pfds;
135
136 for(unsigned int n = 0; n < d_sockets.size(); ++n) {
137 if(d_eof.count(d_sockets[n]))
138 continue;
139 struct pollfd pfd;
140 memset(&pfd, 0, sizeof(pfd));
141 pfd.fd = d_sockets[n];
142 if(rd)
143 pfd.events |= POLLIN;
144 if(wr)
145 pfd.events |= POLLOUT;
146 pfds.push_back(pfd);
147 }
148
149 int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
150 if(res < 0)
151 unixDie("polling for activity from signers, "+std::to_string(d_sockets.size()));
152 pair<vector<int>, vector<int> > vects;
153 for(unsigned int n = 0; n < pfds.size(); ++n)
154 if(pfds[n].revents & POLLIN)
155 vects.first.push_back(pfds[n].fd);
156 else if(pfds[n].revents & POLLOUT)
157 vects.second.push_back(pfds[n].fd);
158
159 return vects;
160 }
161
162 void ChunkedSigningPipe::addSignedToChunks(chunk_t* signedChunk)
163 {
164 chunk_t::const_iterator from = signedChunk->begin();
165
166 while(from != signedChunk->end()) {
167 chunk_t& fillChunk = d_chunks.back();
168
169 chunk_t::size_type room = d_maxchunkrecords - fillChunk.size();
170
171 unsigned int fit = std::min(room, (chunk_t::size_type)(signedChunk->end() - from));
172
173 d_chunks.back().insert(fillChunk.end(), from , from + fit);
174 from+=fit;
175
176 if(from != signedChunk->end()) // it didn't fit, so add a new chunk
177 d_chunks.push_back(chunk_t());
178 }
179 }
180
181 void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
182 {
183 if(!d_mustSign) {
184 addSignedToChunks(d_rrsetToSign);
185 d_rrsetToSign->clear();
186 return;
187 }
188
189 if(d_final && !d_outstanding) // nothing to do!
190 return;
191
192 bool wantRead, wantWrite;
193
194 wantWrite = !d_rrsetToSign->empty();
195 wantRead = d_outstanding || wantWrite; // if we wrote, we want to read
196
197 pair<vector<int>, vector<int> > rwVect;
198
199 rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
200
201 if(wantWrite && !rwVect.second.empty()) {
202 random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
203 writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
204 d_rrsetToSign = new rrset_t;
205 d_outstandings[*rwVect.second.begin()]++;
206 d_outstanding++;
207 d_queued++;
208 wantWrite=false;
209 }
210
211 if(wantRead) {
212 while(d_outstanding) {
213 chunk_t* chunk;
214
215 for(int fd : rwVect.first) {
216 if(d_eof.count(fd))
217 continue;
218
219 while(d_outstanding) {
220 int res = readn(fd, &chunk, sizeof(chunk));
221 if(!res) {
222 if (d_outstandings[fd] > 0) {
223 throw std::runtime_error("A signing pipe worker died while we were waiting for its result");
224 }
225 d_eof.insert(fd);
226 break;
227 }
228 if(res < 0) {
229 if(errno != EAGAIN && errno != EINTR)
230 unixDie("Error reading signed chunk from thread");
231 else
232 break;
233 }
234
235 --d_outstanding;
236 d_outstandings[fd]--;
237
238 addSignedToChunks(chunk);
239
240 delete chunk;
241 }
242 }
243 if(!d_outstanding || !d_final)
244 break;
245 rwVect = waitForRW(true, false, -1); // wait for something to happen
246 }
247 }
248
249 if(wantWrite) { // our optimization above failed, we now wait synchronously
250 rwVect = waitForRW(false, wantWrite, -1); // wait for something to happen
251 random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
252 writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
253 d_rrsetToSign = new rrset_t;
254 d_outstandings[*rwVect.second.begin()]++;
255 d_outstanding++;
256 d_queued++;
257 }
258
259 }
260
261 unsigned int ChunkedSigningPipe::getReady() const
262 {
263 unsigned int sum=0;
264 for(const auto& v : d_chunks) {
265 sum += v.size();
266 }
267 return sum;
268 }
269
270 void ChunkedSigningPipe::worker(int fd)
271 try
272 {
273 UeberBackend db("key-only");
274 DNSSECKeeper dk(&db);
275
276 chunk_t* chunk = nullptr;
277 int res;
278 for(;;) {
279 res = readn(fd, &chunk, sizeof(chunk));
280 if(!res)
281 break;
282 if(res < 0)
283 unixDie("reading object pointer to sign from pdns");
284 try {
285 set<DNSName> authSet;
286 authSet.insert(d_signer);
287 addRRSigs(dk, db, authSet, *chunk);
288 ++d_signed;
289
290 writen2(fd, &chunk, sizeof(chunk));
291 chunk = nullptr;
292 }
293 catch(const PDNSException& pe) {
294 delete chunk;
295 throw;
296 }
297 catch(const std::exception& e) {
298 delete chunk;
299 throw;
300 }
301 }
302 close(fd);
303 }
304 catch(const PDNSException& pe)
305 {
306 g_log<<Logger::Error<<"Signing thread died because of PDNSException: "<<pe.reason<<endl;
307 close(fd);
308 }
309 catch(const std::exception& e)
310 {
311 g_log<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
312 close(fd);
313 }
314
315 void ChunkedSigningPipe::flushToSign()
316 {
317 sendRRSetToWorker();
318 d_rrsetToSign->clear();
319 }
320
321 vector<DNSZoneRecord> ChunkedSigningPipe::getChunk(bool final)
322 {
323 if(final && !d_final) {
324 // this means we should keep on reading until d_outstanding == 0
325 d_final = true;
326 flushToSign();
327
328 for(int fd : d_sockets) {
329 shutdown(fd, SHUT_WR); // perhaps this transmits EOF the other side
330 //cerr<<"shutdown of "<<fd<<endl;
331 }
332 }
333 if(d_final)
334 flushToSign(); // should help us wait
335 vector<DNSZoneRecord> front=d_chunks.front();
336 d_chunks.pop_front();
337 if(d_chunks.empty())
338 d_chunks.push_back(vector<DNSZoneRecord>());
339 /* if(d_final && front.empty())
340 cerr<<"getChunk returning empty in final"<<endl; */
341 return front;
342 }
343
344