]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/signingpipe.cc
Make the constructors taking a pthread_rwlock_t * private.
[thirdparty/pdns.git] / pdns / signingpipe.cc
1 #ifdef HAVE_CONFIG_H
2 #include "config.h"
3 #endif
4 #include "signingpipe.hh"
5 #include "misc.hh"
6 #include "dns_random.hh"
7 #include <poll.h>
8
9 #include <sys/socket.h>
10 #include <netinet/in.h>
11 #include <netinet/tcp.h>
12 #include <sched.h>
13
14 // deal with partial reads
15 namespace {
16 int readn(int fd, void* buffer, unsigned int len)
17 {
18 unsigned int pos=0;
19 int res;
20 for(;;) {
21 res = read(fd, (char*)buffer + pos, len - pos);
22 if(res == 0) {
23 if(pos)
24 throw runtime_error("Signing Pipe remote shut down in the middle of a message");
25 else {
26 //cerr<<"Got decent EOF on "<<fd<<endl;
27 return 0;
28 }
29 }
30
31 if(res < 0) {
32 if(errno == EAGAIN || errno == EINTR) {
33 if(pos==0)
34 return -1;
35 waitForData(fd, -1);
36 continue;
37 }
38 unixDie("Reading from socket in Signing Pipe loop");
39 }
40
41 pos+=res;
42 if(pos == len)
43 break;
44 }
45 return len;
46 }
47 }
48
49 void* ChunkedSigningPipe::helperWorker(ChunkedSigningPipe* csp, int fd)
50 try {
51 csp->worker(fd);
52 return nullptr;
53 }
54 catch(...) {
55 g_log<<Logger::Error<<"Unknown exception in signing thread occurred"<<endl;
56 return nullptr;
57 }
58
59 ChunkedSigningPipe::ChunkedSigningPipe(const DNSName& signerName, bool mustSign, unsigned int workers)
60 : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers), d_submitted(0), d_signer(signerName),
61 d_maxchunkrecords(100), d_threads(d_numworkers), d_mustSign(mustSign), d_final(false)
62 {
63 d_rrsetToSign = make_unique<rrset_t>();
64 d_chunks.push_back(vector<DNSZoneRecord>()); // load an empty chunk
65
66 if(!d_mustSign)
67 return;
68
69 int fds[2];
70
71 for(unsigned int n=0; n < d_numworkers; ++n) {
72 if(socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0)
73 throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
74 setCloseOnExec(fds[0]);
75 setCloseOnExec(fds[1]);
76 d_threads[n] = std::thread(helperWorker, this, fds[1]);
77 setNonBlocking(fds[0]);
78 d_sockets.push_back(fds[0]);
79 d_outstandings[fds[0]] = 0;
80 }
81 }
82
83 ChunkedSigningPipe::~ChunkedSigningPipe()
84 {
85 if(!d_mustSign)
86 return;
87
88 for(int fd : d_sockets) {
89 close(fd); // this will trigger all threads to exit
90 }
91
92 for(auto& thread : d_threads) {
93 thread.join();
94 }
95 //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
96 }
97
98 namespace {
99 bool
100 dedupLessThan(const DNSZoneRecord& a, const DNSZoneRecord &b)
101 {
102 return make_tuple(a.dr.d_content->getZoneRepresentation(), a.dr.d_ttl) < make_tuple(b.dr.d_content->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW
103 }
104
105 bool dedupEqual(const DNSZoneRecord& a, const DNSZoneRecord &b)
106 {
107 return make_tuple(a.dr.d_content->getZoneRepresentation(), a.dr.d_ttl) == make_tuple(b.dr.d_content->getZoneRepresentation(), b.dr.d_ttl); // XXX SLOW SLOW SLOW
108 }
109 }
110
111 void ChunkedSigningPipe::dedupRRSet()
112 {
113 // our set contains contains records for one type and one name, but might not be sorted otherwise
114 sort(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupLessThan);
115 d_rrsetToSign->erase(unique(d_rrsetToSign->begin(), d_rrsetToSign->end(), dedupEqual), d_rrsetToSign->end());
116 }
117
118 bool ChunkedSigningPipe::submit(const DNSZoneRecord& rr)
119 {
120 ++d_submitted;
121 // check if we have a full RRSET to sign
122 if(!d_rrsetToSign->empty() && (d_rrsetToSign->begin()->dr.d_type != rr.dr.d_type || d_rrsetToSign->begin()->dr.d_name != rr.dr.d_name))
123 {
124 dedupRRSet();
125 sendRRSetToWorker();
126 }
127 d_rrsetToSign->push_back(rr);
128 return !d_chunks.empty() && d_chunks.front().size() >= d_maxchunkrecords; // "you can send more"
129 }
130
131 pair<vector<int>, vector<int> > ChunkedSigningPipe::waitForRW(bool rd, bool wr, int seconds)
132 {
133 vector<pollfd> pfds;
134
135 for(unsigned int n = 0; n < d_sockets.size(); ++n) {
136 if(d_eof.count(d_sockets[n]))
137 continue;
138 struct pollfd pfd;
139 memset(&pfd, 0, sizeof(pfd));
140 pfd.fd = d_sockets[n];
141 if(rd)
142 pfd.events |= POLLIN;
143 if(wr)
144 pfd.events |= POLLOUT;
145 pfds.push_back(pfd);
146 }
147
148 int res = poll(&pfds[0], pfds.size(), (seconds < 0) ? -1 : (seconds * 1000)); // -1 = infinite
149 if(res < 0)
150 unixDie("polling for activity from signers, "+std::to_string(d_sockets.size()));
151 pair<vector<int>, vector<int> > vects;
152 for(unsigned int n = 0; n < pfds.size(); ++n)
153 if(pfds[n].revents & POLLIN)
154 vects.first.push_back(pfds[n].fd);
155 else if(pfds[n].revents & POLLOUT)
156 vects.second.push_back(pfds[n].fd);
157
158 return vects;
159 }
160
161 void ChunkedSigningPipe::addSignedToChunks(std::unique_ptr<chunk_t>& signedChunk)
162 {
163 chunk_t::const_iterator from = signedChunk->begin();
164
165 while(from != signedChunk->end()) {
166 chunk_t& fillChunk = d_chunks.back();
167 chunk_t::size_type room = d_maxchunkrecords - fillChunk.size();
168
169 unsigned int fit = std::min(room, (chunk_t::size_type)(signedChunk->end() - from));
170
171 d_chunks.back().insert(fillChunk.end(), from , from + fit);
172 from+=fit;
173
174 if(from != signedChunk->end()) // it didn't fit, so add a new chunk
175 d_chunks.push_back(chunk_t());
176 }
177 }
178
179 void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
180 {
181 if(!d_mustSign) {
182 addSignedToChunks(d_rrsetToSign);
183 d_rrsetToSign->clear();
184 return;
185 }
186
187 if(d_final && !d_outstanding) // nothing to do!
188 return;
189
190 bool wantRead, wantWrite;
191
192 wantWrite = !d_rrsetToSign->empty();
193 wantRead = d_outstanding || wantWrite; // if we wrote, we want to read
194
195 pair<vector<int>, vector<int> > rwVect;
196
197 rwVect = waitForRW(wantRead, wantWrite, -1); // wait for something to happen
198
199 if(wantWrite && !rwVect.second.empty()) {
200 shuffle(rwVect.second.begin(), rwVect.second.end(), pdns::dns_random_engine()); // pick random available worker
201 auto ptr = d_rrsetToSign.release();
202 writen2(*rwVect.second.begin(), &ptr, sizeof(ptr));
203 d_rrsetToSign = make_unique<rrset_t>();
204 d_outstandings[*rwVect.second.begin()]++;
205 d_outstanding++;
206 d_queued++;
207 wantWrite=false;
208 }
209
210 if(wantRead) {
211 while(d_outstanding) {
212 for(int fd : rwVect.first) {
213 if(d_eof.count(fd))
214 continue;
215
216 while(d_outstanding) {
217 chunk_t* chunk = nullptr;
218 int res = readn(fd, &chunk, sizeof(chunk));
219 if(!res) {
220 if (d_outstandings[fd] > 0) {
221 throw std::runtime_error("A signing pipe worker died while we were waiting for its result");
222 }
223 d_eof.insert(fd);
224 break;
225 }
226 if(res < 0) {
227 if(errno != EAGAIN && errno != EINTR)
228 unixDie("Error reading signed chunk from thread");
229 else
230 break;
231 }
232
233 std::unique_ptr<rrset_t> chunkPtr(chunk);
234 chunk = nullptr;
235 --d_outstanding;
236 d_outstandings[fd]--;
237
238 addSignedToChunks(chunkPtr);
239 }
240 }
241 if(!d_outstanding || !d_final)
242 break;
243 rwVect = waitForRW(true, false, -1); // wait for something to happen
244 }
245 }
246
247 if(wantWrite) { // our optimization above failed, we now wait synchronously
248 rwVect = waitForRW(false, wantWrite, -1); // wait for something to happen
249 shuffle(rwVect.second.begin(), rwVect.second.end(), pdns::dns_random_engine()); // pick random available worker
250 auto ptr = d_rrsetToSign.release();
251 writen2(*rwVect.second.begin(), &ptr, sizeof(ptr));
252 d_rrsetToSign = make_unique<rrset_t>();
253 d_outstandings[*rwVect.second.begin()]++;
254 d_outstanding++;
255 d_queued++;
256 }
257
258 }
259
260 unsigned int ChunkedSigningPipe::getReady() const
261 {
262 unsigned int sum=0;
263 for(const auto& v : d_chunks) {
264 sum += v.size();
265 }
266 return sum;
267 }
268
269 void ChunkedSigningPipe::worker(int fd)
270 try
271 {
272 UeberBackend db("key-only");
273 DNSSECKeeper dk(&db);
274
275 chunk_t* chunk = nullptr;
276 int res;
277 for(;;) {
278 res = readn(fd, &chunk, sizeof(chunk));
279 if(!res)
280 break;
281 if(res < 0)
282 unixDie("reading object pointer to sign from pdns");
283 try {
284 set<DNSName> authSet;
285 authSet.insert(d_signer);
286 addRRSigs(dk, db, authSet, *chunk);
287 ++d_signed;
288
289 writen2(fd, &chunk, sizeof(chunk));
290 chunk = nullptr;
291 }
292 catch(const PDNSException& pe) {
293 delete chunk;
294 throw;
295 }
296 catch(const std::exception& e) {
297 delete chunk;
298 throw;
299 }
300 }
301 close(fd);
302 }
303 catch(const PDNSException& pe)
304 {
305 g_log<<Logger::Error<<"Signing thread died because of PDNSException: "<<pe.reason<<endl;
306 close(fd);
307 }
308 catch(const std::exception& e)
309 {
310 g_log<<Logger::Error<<"Signing thread died because of std::exception: "<<e.what()<<endl;
311 close(fd);
312 }
313
314 void ChunkedSigningPipe::flushToSign()
315 {
316 sendRRSetToWorker();
317 d_rrsetToSign->clear();
318 }
319
320 vector<DNSZoneRecord> ChunkedSigningPipe::getChunk(bool final)
321 {
322 if(final && !d_final) {
323 // this means we should keep on reading until d_outstanding == 0
324 d_final = true;
325 flushToSign();
326
327 for(int fd : d_sockets) {
328 shutdown(fd, SHUT_WR); // perhaps this transmits EOF the other side
329 //cerr<<"shutdown of "<<fd<<endl;
330 }
331 }
332 if(d_final)
333 flushToSign(); // should help us wait
334 vector<DNSZoneRecord> front=d_chunks.front();
335 d_chunks.pop_front();
336 if(d_chunks.empty())
337 d_chunks.push_back(vector<DNSZoneRecord>());
338 /* if(d_final && front.empty())
339 cerr<<"getChunk returning empty in final"<<endl; */
340 return front;
341 }
342
343