]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/signingpipe.cc
rec: ensure correct service user on debian
[thirdparty/pdns.git] / pdns / signingpipe.cc
CommitLineData
870a0fe4
AT
1#ifdef HAVE_CONFIG_H
2#include "config.h"
3#endif
8e9b7d99 4#include "signingpipe.hh"
a6ef6f7a
BH
5#include "misc.hh"
6#include <poll.h>
fa8fd4d2 7
a6ef6f7a
BH
8#include <sys/socket.h>
9#include <netinet/in.h>
10#include <netinet/tcp.h>
11#include <sched.h>
8e9b7d99 12
8d59e8ce
BH
13// deal with partial reads
14namespace {
15int readn(int fd, void* buffer, unsigned int len)
16{
17 unsigned int pos=0;
18 int res;
19 for(;;) {
20 res = read(fd, (char*)buffer + pos, len - pos);
21 if(res == 0) {
22 if(pos)
23 throw runtime_error("Signing Pipe remote shut down in the middle of a message");
24 else {
7b217f2e 25 //cerr<<"Got decent EOF on "<<fd<<endl;
8d59e8ce
BH
26 return 0;
27 }
28 }
29
30 if(res < 0) {
31 if(errno == EAGAIN || errno == EINTR) {
32 if(pos==0)
33 return -1;
34 waitForData(fd, -1);
35 continue;
36 }
37 unixDie("Reading from socket in Signing Pipe loop");
38 }
39
40 pos+=res;
41 if(pos == len)
42 break;
43 }
44 return len;
45}
46}
47
07019b51
RG
48void* ChunkedSigningPipe::helperWorker(ChunkedSigningPipe* csp, int fd)
49try {
50 csp->worker(fd);
51 return nullptr;
8267bd2c 52}
97f1f8d9 53catch(...) {
e6a9dde5 54 g_log<<Logger::Error<<"Unknown exception in signing thread occurred"<<endl;
07019b51 55 return nullptr;
bec14a20 56}
8267bd2c 57
e3200e07 58ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int workers)
c3064e57 59 : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(signerName),
07019b51 60 d_maxchunkrecords(100), d_threads(d_numworkers), d_mustSign(mustSign), d_final(false)
8267bd2c 61{
a2aaa807 62 d_rrsetToSign = new rrset_t;
90ba52e0 63 d_chunks.push_back(vector<DNSZoneRecord>()); // load an empty chunk
8d59e8ce 64
8267bd2c
BH
65 if(!d_mustSign)
66 return;
a2aaa807 67
a2aaa807
BH
68 int fds[2];
69
8267bd2c 70 for(unsigned int n=0; n < d_numworkers; ++n) {
8d59e8ce
BH
71 if(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0)
72 throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
3897b9e1 73 setCloseOnExec(fds[0]);
74 setCloseOnExec(fds[1]);
07019b51 75 d_threads[n] = std::thread(helperWorker, this, fds[1]);
3897b9e1 76 setNonBlocking(fds[0]);
8d59e8ce 77 d_sockets.push_back(fds[0]);
e3200e07 78 d_outstandings[fds[0]] = 0;
8267bd2c
BH
79 }
80}
81
82ChunkedSigningPipe::~ChunkedSigningPipe()
83{
a2aaa807 84 delete d_rrsetToSign;
07019b51 85
8267bd2c
BH
86 if(!d_mustSign)
87 return;
07019b51 88
ef7cd021 89 for(int fd : d_sockets) {
a6ef6f7a
BH
90 close(fd); // this will trigger all threads to exit
91 }
07019b51
RG
92
93 for(auto& thread : d_threads) {
94 thread.join();
a6ef6f7a 95 }
7b217f2e 96 //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
8267bd2c
BH
97}
98
a2f3b9ec 99namespace {
b9bafae0 100bool
90ba52e0 101dedupLessThan(const DNSZoneRecord& a, const DNSZoneRecord &b)
a2f3b9ec 102{
90ba52e0 103 return make_tuple(a.dr.d_content->getZoneRepresentation(), a.dr.d_ttl) < make_tuple(b.dr.d_content->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW
a2f3b9ec
BH
104}
105
90ba52e0 106bool dedupEqual(const DNSZoneRecord& a, const DNSZoneRecord &b)
a2f3b9ec 107{
90ba52e0 108 return make_tuple(a.dr.d_content->getZoneRepresentation(), a.dr.d_ttl) == make_tuple(b.dr.d_content->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW
a2f3b9ec
BH
109}
110}
111
112void ChunkedSigningPipe::dedupRRSet()
113{
114 // our set contains contains records for one type and one name, but might not be sorted otherwise
115 sort(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupLessThan);
116 d_rrsetToSign->erase(unique(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupEqual), d_rrsetToSign->end());
117}
118
90ba52e0 119bool ChunkedSigningPipe::submit(const DNSZoneRecord& rr)
8e9b7d99 120{
8d59e8ce
BH
121 ++d_submitted;
122 // check if we have a full RRSET to sign
90ba52e0 123 if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->dr.d_type != rr.dr.d_type || d_rrsetToSign->begin()->dr.d_name != rr.dr.d_name))
8e9b7d99 124 {
a2f3b9ec 125 dedupRRSet();
a2aaa807 126 sendRRSetToWorker();
8e9b7d99 127 }
a2aaa807 128 d_rrsetToSign->push_back(rr);
a2f3b9ec 129 return !d_chunks.empty() && d_chunks.front().size() >= d_maxchunkrecords; // "you can send more"
8e9b7d99
BH
130}
131
a6ef6f7a
BH
132pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
133{
ac10b8c6 134 vector<pollfd> pfds;
e3200e07 135
8d59e8ce 136 for(unsigned int n = 0; n < d_sockets.size(); ++n) {
a4da80fc 137 if(d_eof.count(d_sockets[n]))
ac10b8c6
BH
138 continue;
139 struct pollfd pfd;
140 memset(&pfd, 0, sizeof(pfd));
141 pfd.fd = d_sockets[n];
142 if(rd)
143 pfd.events |= POLLIN;
144 if(wr)
145 pfd.events |= POLLOUT;
146 pfds.push_back(pfd);
a6ef6f7a 147 }
e3200e07 148
a135720d 149 int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
a6ef6f7a 150 if(res < 0)
335da0ba 151 unixDie("polling for activity from signers, "+std::to_string(d_sockets.size()));
a6ef6f7a 152 pair<vector<int>, vector<int> > vects;
ac10b8c6 153 for(unsigned int n = 0; n < pfds.size(); ++n)
a6ef6f7a
BH
154 if(pfds[n].revents & POLLIN)
155 vects.first.push_back(pfds[n].fd);
156 else if(pfds[n].revents & POLLOUT)
157 vects.second.push_back(pfds[n].fd);
158
159 return vects;
160}
161
8d59e8ce
BH
162void ChunkedSigningPipe::addSignedToChunks(chunk_t* signedChunk)
163{
164 chunk_t::const_iterator from = signedChunk->begin();
165
166 while(from != signedChunk->end()) {
167 chunk_t& fillChunk = d_chunks.back();
168
169 chunk_t::size_type room = d_maxchunkrecords - fillChunk.size();
170
171 unsigned int fit = std::min(room, (chunk_t::size_type)(signedChunk->end() - from));
172
173 d_chunks.back().insert(fillChunk.end(), from , from + fit);
174 from+=fit;
175
176 if(from != signedChunk->end()) // it didn't fit, so add a new chunk
177 d_chunks.push_back(chunk_t());
178 }
179}
a6ef6f7a 180
a2aaa807 181void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
8267bd2c
BH
182{
183 if(!d_mustSign) {
8d59e8ce 184 addSignedToChunks(d_rrsetToSign);
a2aaa807 185 d_rrsetToSign->clear();
8267bd2c
BH
186 return;
187 }
a2aaa807 188
8d59e8ce
BH
189 if(d_final && !d_outstanding) // nothing to do!
190 return;
191
a6ef6f7a
BH
192 bool wantRead, wantWrite;
193
194 wantWrite = !d_rrsetToSign->empty();
8d59e8ce 195 wantRead = d_outstanding || wantWrite; // if we wrote, we want to read
a6ef6f7a
BH
196
197 pair<vector<int>, vector<int> > rwVect;
198
e3200e07 199 rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
a6ef6f7a
BH
200
201 if(wantWrite && !rwVect.second.empty()) {
8d59e8ce 202 random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
a6ef6f7a 203 writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
a2aaa807 204 d_rrsetToSign = new rrset_t;
e3200e07 205 d_outstandings[*rwVect.second.begin()]++;
8267bd2c 206 d_outstanding++;
a2aaa807 207 d_queued++;
8d59e8ce
BH
208 wantWrite=false;
209 }
210
211 if(wantRead) {
212 while(d_outstanding) {
213 chunk_t* chunk;
a6ef6f7a 214
ef7cd021 215 for(int fd : rwVect.first) {
8d59e8ce
BH
216 if(d_eof.count(fd))
217 continue;
218
219 while(d_outstanding) {
220 int res = readn(fd, &chunk, sizeof(chunk));
221 if(!res) {
e3200e07
RG
222 if (d_outstandings[fd] > 0) {
223 throw std::runtime_error("A signing pipe worker died while we were waiting for its result");
224 }
8d59e8ce
BH
225 d_eof.insert(fd);
226 break;
227 }
228 if(res < 0) {
229 if(errno != EAGAIN && errno != EINTR)
230 unixDie("Error reading signed chunk from thread");
231 else
232 break;
233 }
234
235 --d_outstanding;
e3200e07 236 d_outstandings[fd]--;
8d59e8ce
BH
237
238 addSignedToChunks(chunk);
239
240 delete chunk;
241 }
242 }
243 if(!d_outstanding || !d_final)
a6ef6f7a 244 break;
e3200e07 245 rwVect = waitForRW(true, false, -1); // wait for something to happen
a6ef6f7a 246 }
8267bd2c 247 }
a6ef6f7a 248
8d59e8ce 249 if(wantWrite) { // our optimization above failed, we now wait synchronously
e3200e07 250 rwVect = waitForRW(false, wantWrite, -1); // wait for something to happen
8d59e8ce
BH
251 random_shuffle(rwVect.second.begin(), rwVect.second.end()); // pick random available worker
252 writen2(*rwVect.second.begin(), &d_rrsetToSign, sizeof(d_rrsetToSign));
253 d_rrsetToSign = new rrset_t;
e3200e07 254 d_outstandings[*rwVect.second.begin()]++;
8d59e8ce
BH
255 d_outstanding++;
256 d_queued++;
a6ef6f7a
BH
257 }
258
8267bd2c
BH
259}
260
e3200e07 261unsigned int ChunkedSigningPipe::getReady() const
a2aaa807
BH
262{
263 unsigned int sum=0;
90ba52e0 264 for(const auto& v : d_chunks) {
a2aaa807
BH
265 sum += v.size();
266 }
267 return sum;
268}
07019b51
RG
269
270void ChunkedSigningPipe::worker(int fd)
a6ef6f7a 271try
8267bd2c 272{
a6ef6f7a 273 UeberBackend db("key-only");
ea99d474 274 DNSSECKeeper dk(&db);
a6ef6f7a 275
c23d888d 276 chunk_t* chunk = nullptr;
8267bd2c
BH
277 int res;
278 for(;;) {
a6ef6f7a
BH
279 res = readn(fd, &chunk, sizeof(chunk));
280 if(!res)
8267bd2c 281 break;
a6ef6f7a
BH
282 if(res < 0)
283 unixDie("reading object pointer to sign from pdns");
c23d888d
RG
284 try {
285 set<DNSName> authSet;
286 authSet.insert(d_signer);
287 addRRSigs(dk, db, authSet, *chunk);
288 ++d_signed;
289
290 writen2(fd, &chunk, sizeof(chunk));
291 chunk = nullptr;
292 }
293 catch(const PDNSException& pe) {
294 delete chunk;
295 throw;
296 }
297 catch(const std::exception& e) {
298 delete chunk;
299 throw;
300 }
8267bd2c 301 }
a6ef6f7a
BH
302 close(fd);
303}
c23d888d 304catch(const PDNSException& pe)
97f1f8d9 305{
e6a9dde5 306 g_log<<Logger::Error<<"Signing thread died because of PDNSException: "<<pe.reason<<endl;
97f1f8d9
KM
307 close(fd);
308}
c23d888d 309catch(const std::exception& e)
a6ef6f7a 310{
e6a9dde5 311 g_log<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
a6ef6f7a 312 close(fd);
8267bd2c
BH
313}
314
8e9b7d99
BH
315void ChunkedSigningPipe::flushToSign()
316{
a2aaa807
BH
317 sendRRSetToWorker();
318 d_rrsetToSign->clear();
8e9b7d99
BH
319}
320
90ba52e0 321vector<DNSZoneRecord> ChunkedSigningPipe::getChunk(bool final)
8e9b7d99 322{
a6ef6f7a
BH
323 if(final && !d_final) {
324 // this means we should keep on reading until d_outstanding == 0
325 d_final = true;
8e9b7d99 326 flushToSign();
a6ef6f7a 327
ef7cd021 328 for(int fd : d_sockets) {
a6ef6f7a 329 shutdown(fd, SHUT_WR); // perhaps this transmits EOF the other side
7b217f2e 330 //cerr<<"shutdown of "<<fd<<endl;
a6ef6f7a 331 }
8267bd2c 332 }
a6ef6f7a
BH
333 if(d_final)
334 flushToSign(); // should help us wait
90ba52e0 335 vector<DNSZoneRecord> front=d_chunks.front();
a2aaa807
BH
336 d_chunks.pop_front();
337 if(d_chunks.empty())
90ba52e0 338 d_chunks.push_back(vector<DNSZoneRecord>());
e125fc69 339/* if(d_final && front.empty())
340 cerr<<"getChunk returning empty in final"<<endl; */
a2aaa807 341 return front;
8e9b7d99 342}
8d59e8ce 343
8d59e8ce 344