]> git.ipfire.org Git - thirdparty/pdns.git/blame_incremental - pdns/signingpipe.cc
rec: ensure correct service user on debian
[thirdparty/pdns.git] / pdns / signingpipe.cc
... / ...
CommitLineData
1#ifdef HAVE_CONFIG_H
2#include "config.h"
3#endif
4#include "signingpipe.hh"
5#include "misc.hh"
6#include <poll.h>
7
8#include <sys/socket.h>
9#include <netinet/in.h>
10#include <netinet/tcp.h>
11#include <sched.h>
12
13// deal with partial reads
14namespace {
15int readn(int fd, void* buffer, unsigned int len)
16{
17 unsigned int pos=0;
18 int res;
19 for(;;) {
20 res = read(fd, (char*)buffer + pos, len - pos);
21 if(res == 0) {
22 if(pos)
23 throw runtime_error("Signing Pipe remote shut down in the middle of a message");
24 else {
25 //cerr<<"Got decent EOF on "<<fd<<endl;
26 return 0;
27 }
28 }
29
30 if(res < 0) {
31 if(errno == EAGAIN || errno == EINTR) {
32 if(pos==0)
33 return -1;
34 waitForData(fd, -1);
35 continue;
36 }
37 unixDie("Reading from socket in Signing Pipe loop");
38 }
39
40 pos+=res;
41 if(pos == len)
42 break;
43 }
44 return len;
45}
46}
47
48void* ChunkedSigningPipe::helperWorker(ChunkedSigningPipe* csp, int fd)
49try {
50 csp->worker(fd);
51 return nullptr;
52}
53catch(...) {
54 g_log<<Logger::Error<<"Unknown exception in signing thread occurred"<<endl;
55 return nullptr;
56}
57
58ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int workers)
59 : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(signerName),
60 d_maxchunkrecords(100), d_threads(d_numworkers), d_mustSign(mustSign), d_final(false)
61{
62 d_rrsetToSign = new rrset_t;
63 d_chunks.push_back(vector<DNSZoneRecord>()); // load an empty chunk
64
65 if(!d_mustSign)
66 return;
67
68 int fds[2];
69
70 for(unsigned int n=0; n < d_numworkers; ++n) {
71 if(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0)
72 throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
73 setCloseOnExec(fds[0]);
74 setCloseOnExec(fds[1]);
75 d_threads[n] = std::thread(helperWorker, this, fds[1]);
76 setNonBlocking(fds[0]);
77 d_sockets.push_back(fds[0]);
78 d_outstandings[fds[0]] = 0;
79 }
80}
81
82ChunkedSigningPipe::~ChunkedSigningPipe()
83{
84 delete d_rrsetToSign;
85
86 if(!d_mustSign)
87 return;
88
89 for(int fd : d_sockets) {
90 close(fd); // this will trigger all threads to exit
91 }
92
93 for(auto& thread : d_threads) {
94 thread.join();
95 }
96 //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
97}
98
99namespace {
100bool
101dedupLessThan(const DNSZoneRecord& a, const DNSZoneRecord &b)
102{
103 return make_tuple(a.dr.d_content->getZoneRepresentation(), a.dr.d_ttl) < make_tuple(b.dr.d_content->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW
104}
105
106bool dedupEqual(const DNSZoneRecord& a, const DNSZoneRecord &b)
107{
108 return make_tuple(a.dr.d_content->getZoneRepresentation(), a.dr.d_ttl) == make_tuple(b.dr.d_content->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW
109}
110}
111
112void ChunkedSigningPipe::dedupRRSet()
113{
114 // our set contains contains records for one type and one name, but might not be sorted otherwise
115 sort(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupLessThan);
116 d_rrsetToSign->erase(unique(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupEqual), d_rrsetToSign->end());
117}
118
119bool ChunkedSigningPipe::submit(const DNSZoneRecord& rr)
120{
121 ++d_submitted;
122 // check if we have a full RRSET to sign
123 if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->dr.d_type != rr.dr.d_type || d_rrsetToSign->begin()->dr.d_name != rr.dr.d_name))
124 {
125 dedupRRSet();
126 sendRRSetToWorker();
127 }
128 d_rrsetToSign->push_back(rr);
129 return !d_chunks.empty() && d_chunks.front().size() >= d_maxchunkrecords; // "you can send more"
130}
131
132pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
133{
134 vector<pollfd> pfds;
135
136 for(unsigned int n = 0; n < d_sockets.size(); ++n) {
137 if(d_eof.count(d_sockets[n]))
138 continue;
139 struct pollfd pfd;
140 memset(&pfd, 0, sizeof(pfd));
141 pfd.fd = d_sockets[n];
142 if(rd)
143 pfd.events |= POLLIN;
144 if(wr)
145 pfd.events |= POLLOUT;
146 pfds.push_back(pfd);
147 }
148
149 int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
150 if(res < 0)
151 unixDie("polling for activity from signers, "+std::to_string(d_sockets.size()));
152 pair<vector<int>, vector<int> > vects;
153 for(unsigned int n = 0; n < pfds.size(); ++n)
154 if(pfds[n].revents & POLLIN)
155 vects.first.push_back(pfds[n].fd);
156 else if(pfds[n].revents & POLLOUT)
157 vects.second.push_back(pfds[n].fd);
158
159 return vects;
160}
161
162void ChunkedSigningPipe::addSignedToChunks(chunk_t* signedChunk)
163{
164 chunk_t::const_iterator from = signedChunk->begin();
165
166 while(from != signedChunk->end()) {
167 chunk_t& fillChunk = d_chunks.back();
168
169 chunk_t::size_type room = d_maxchunkrecords - fillChunk.size();
170
171 unsigned int fit = std::min(room, (chunk_t::size_type)(signedChunk->end() - from));
172
173 d_chunks.back().insert(fillChunk.end(), from , from + fit);
174 from+=fit;
175
176 if(from != signedChunk->end()) // it didn't fit, so add a new chunk
177 d_chunks.push_back(chunk_t());
178 }
179}
180
181void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
182{
183 if(!d_mustSign) {
184 addSignedToChunks(d_rrsetToSign);
185 d_rrsetToSign->clear();
186 return;
187 }
188
189 if(d_final && !d_outstanding) // nothing to do!
190 return;
191
192 bool wantRead, wantWrite;
193
194 wantWrite = !d_rrsetToSign->empty();
195 wantRead = d_outstanding || wantWrite; // if we wrote, we want to read
196
197 pair<vector<int>, vector<int> > rwVect;
198
199 rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
200
201 if(wantWrite && !rwVect.second.empty()) {
202 random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
203 writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
204 d_rrsetToSign = new rrset_t;
205 d_outstandings[*rwVect.second.begin()]++;
206 d_outstanding++;
207 d_queued++;
208 wantWrite=false;
209 }
210
211 if(wantRead) {
212 while(d_outstanding) {
213 chunk_t* chunk;
214
215 for(int fd : rwVect.first) {
216 if(d_eof.count(fd))
217 continue;
218
219 while(d_outstanding) {
220 int res = readn(fd, &chunk, sizeof(chunk));
221 if(!res) {
222 if (d_outstandings[fd] > 0) {
223 throw std::runtime_error("A signing pipe worker died while we were waiting for its result");
224 }
225 d_eof.insert(fd);
226 break;
227 }
228 if(res < 0) {
229 if(errno != EAGAIN && errno != EINTR)
230 unixDie("Error reading signed chunk from thread");
231 else
232 break;
233 }
234
235 --d_outstanding;
236 d_outstandings[fd]--;
237
238 addSignedToChunks(chunk);
239
240 delete chunk;
241 }
242 }
243 if(!d_outstanding || !d_final)
244 break;
245 rwVect = waitForRW(true, false, -1); // wait for something to happen
246 }
247 }
248
249 if(wantWrite) { // our optimization above failed, we now wait synchronously
250 rwVect = waitForRW(false, wantWrite, -1); // wait for something to happen
251 random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
252 writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
253 d_rrsetToSign = new rrset_t;
254 d_outstandings[*rwVect.second.begin()]++;
255 d_outstanding++;
256 d_queued++;
257 }
258
259}
260
261unsigned int ChunkedSigningPipe::getReady() const
262{
263 unsigned int sum=0;
264 for(const auto& v : d_chunks) {
265 sum += v.size();
266 }
267 return sum;
268}
269
270void ChunkedSigningPipe::worker(int fd)
271try
272{
273 UeberBackend db("key-only");
274 DNSSECKeeper dk(&db);
275
276 chunk_t* chunk = nullptr;
277 int res;
278 for(;;) {
279 res = readn(fd, &chunk, sizeof(chunk));
280 if(!res)
281 break;
282 if(res < 0)
283 unixDie("reading object pointer to sign from pdns");
284 try {
285 set<DNSName> authSet;
286 authSet.insert(d_signer);
287 addRRSigs(dk, db, authSet, *chunk);
288 ++d_signed;
289
290 writen2(fd, &chunk, sizeof(chunk));
291 chunk = nullptr;
292 }
293 catch(const PDNSException& pe) {
294 delete chunk;
295 throw;
296 }
297 catch(const std::exception& e) {
298 delete chunk;
299 throw;
300 }
301 }
302 close(fd);
303}
304catch(const PDNSException& pe)
305{
306 g_log<<Logger::Error<<"Signing thread died because of PDNSException: "<<pe.reason<<endl;
307 close(fd);
308}
309catch(const std::exception& e)
310{
311 g_log<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
312 close(fd);
313}
314
315void ChunkedSigningPipe::flushToSign()
316{
317 sendRRSetToWorker();
318 d_rrsetToSign->clear();
319}
320
321vector<DNSZoneRecord> ChunkedSigningPipe::getChunk(bool final)
322{
323 if(final && !d_final) {
324 // this means we should keep on reading until d_outstanding == 0
325 d_final = true;
326 flushToSign();
327
328 for(int fd : d_sockets) {
329 shutdown(fd, SHUT_WR); // perhaps this transmits EOF the other side
330 //cerr<<"shutdown of "<<fd<<endl;
331 }
332 }
333 if(d_final)
334 flushToSign(); // should help us wait
335 vector<DNSZoneRecord> front=d_chunks.front();
336 d_chunks.pop_front();
337 if(d_chunks.empty())
338 d_chunks.push_back(vector<DNSZoneRecord>());
339/* if(d_final && front.empty())
340 cerr<<"getChunk returning empty in final"<<endl; */
341 return front;
342}
343
344