]>
git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/signingpipe.cc
4 #include "signingpipe.hh"
6 #include "dns_random.hh"
9 #include <sys/socket.h>
10 #include <netinet/in.h>
11 #include <netinet/tcp.h>
14 // deal with partial reads
16 int readn(int fd
, void* buffer
, unsigned int len
)
21 res
= read(fd
, (char*)buffer
+ pos
, len
- pos
);
24 throw runtime_error("Signing Pipe remote shut down in the middle of a message");
26 //cerr<<"Got decent EOF on "<<fd<<endl;
32 if(errno
== EAGAIN
|| errno
== EINTR
) {
35 // error handled later
36 (void)waitForData(fd
, -1);
39 unixDie("Reading from socket in Signing Pipe loop");
50 void* ChunkedSigningPipe::helperWorker(ChunkedSigningPipe
* csp
, int fd
)
56 g_log
<<Logger::Error
<<"Unknown exception in signing thread occurred"<<endl
;
60 ChunkedSigningPipe::ChunkedSigningPipe(DNSName signerName
, bool mustSign
, unsigned int workers
, unsigned int maxChunkRecords
)
61 : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers
), d_submitted(0), d_signer(std::move(signerName
)),
62 d_maxchunkrecords(maxChunkRecords
), d_threads(d_numworkers
), d_mustSign(mustSign
), d_final(false)
64 d_rrsetToSign
= make_unique
<rrset_t
>();
65 d_chunks
.push_back(vector
<DNSZoneRecord
>()); // load an empty chunk
72 for(unsigned int n
=0; n
< d_numworkers
; ++n
) {
73 if(socketpair(AF_UNIX
, SOCK_STREAM
, 0, fds
) < 0)
74 throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
75 setCloseOnExec(fds
[0]);
76 setCloseOnExec(fds
[1]);
77 d_threads
[n
] = std::thread(helperWorker
, this, fds
[1]);
78 setNonBlocking(fds
[0]);
79 d_sockets
.push_back(fds
[0]);
80 d_outstandings
[fds
[0]] = 0;
84 ChunkedSigningPipe::~ChunkedSigningPipe()
89 for(int fd
: d_sockets
) {
90 close(fd
); // this will trigger all threads to exit
93 for(auto& thread
: d_threads
) {
96 //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
101 dedupLessThan(const DNSZoneRecord
& a
, const DNSZoneRecord
&b
)
103 return std::tuple(a
.dr
.getContent()->getZoneRepresentation(), a
.dr
.d_ttl
) < std::tuple(b
.dr
.getContent()->getZoneRepresentation(), b
.dr
.d_ttl
); // XXX SLOW SLOW SLOW
106 bool dedupEqual(const DNSZoneRecord
& a
, const DNSZoneRecord
&b
)
108 return std::tuple(a
.dr
.getContent()->getZoneRepresentation(), a
.dr
.d_ttl
) == std::tuple(b
.dr
.getContent()->getZoneRepresentation(), b
.dr
.d_ttl
); // XXX SLOW SLOW SLOW
112 void ChunkedSigningPipe::dedupRRSet()
114 // our set contains contains records for one type and one name, but might not be sorted otherwise
115 sort(d_rrsetToSign
->begin(), d_rrsetToSign
->end(), dedupLessThan
);
116 d_rrsetToSign
->erase(unique(d_rrsetToSign
->begin(), d_rrsetToSign
->end(), dedupEqual
), d_rrsetToSign
->end());
119 bool ChunkedSigningPipe::submit(const DNSZoneRecord
& rr
)
122 // check if we have a full RRSET to sign
123 if(!d_rrsetToSign
->empty() && (d_rrsetToSign
->begin()->dr
.d_type
!= rr
.dr
.d_type
|| d_rrsetToSign
->begin()->dr
.d_name
!= rr
.dr
.d_name
))
128 d_rrsetToSign
->push_back(rr
);
129 return !d_chunks
.empty() && d_chunks
.front().size() >= d_maxchunkrecords
; // "you can send more"
132 pair
<vector
<int>, vector
<int> > ChunkedSigningPipe::waitForRW(bool rd
, bool wr
, int seconds
)
136 for(int & socket
: d_sockets
) {
137 if(d_eof
.count(socket
))
140 memset(&pfd
, 0, sizeof(pfd
));
143 pfd
.events
|= POLLIN
;
145 pfd
.events
|= POLLOUT
;
149 int res
= poll(&pfds
[0], pfds
.size(), (seconds
< 0) ? -1 : (seconds
* 1000)); // -1 = infinite
151 unixDie("polling for activity from signers, "+std::to_string(d_sockets
.size()));
152 pair
<vector
<int>, vector
<int> > vects
;
153 for(auto & pfd
: pfds
)
154 if(pfd
.revents
& POLLIN
)
155 vects
.first
.push_back(pfd
.fd
);
156 else if(pfd
.revents
& POLLOUT
)
157 vects
.second
.push_back(pfd
.fd
);
162 void ChunkedSigningPipe::addSignedToChunks(std::unique_ptr
<chunk_t
>& signedChunk
)
164 chunk_t::const_iterator from
= signedChunk
->begin();
166 while(from
!= signedChunk
->end()) {
167 chunk_t
& fillChunk
= d_chunks
.back();
168 chunk_t::size_type room
= d_maxchunkrecords
- fillChunk
.size();
170 unsigned int fit
= std::min(room
, (chunk_t::size_type
)(signedChunk
->end() - from
));
172 d_chunks
.back().insert(fillChunk
.end(), from
, from
+ fit
);
175 if(from
!= signedChunk
->end()) // it didn't fit, so add a new chunk
176 d_chunks
.push_back(chunk_t());
180 void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
183 addSignedToChunks(d_rrsetToSign
);
184 d_rrsetToSign
->clear();
188 if(d_final
&& !d_outstanding
) // nothing to do!
191 bool wantRead
, wantWrite
;
193 wantWrite
= !d_rrsetToSign
->empty();
194 wantRead
= d_outstanding
|| wantWrite
; // if we wrote, we want to read
196 pair
<vector
<int>, vector
<int> > rwVect
;
198 rwVect
= waitForRW(wantRead
, wantWrite
, -1); // wait for something to happen
200 if(wantWrite
&& !rwVect
.second
.empty()) {
201 shuffle(rwVect
.second
.begin(), rwVect
.second
.end(), pdns::dns_random_engine()); // pick random available worker
202 auto ptr
= d_rrsetToSign
.get();
203 writen2(*rwVect
.second
.begin(), &ptr
, sizeof(ptr
));
204 // coverity[leaked_storage]
205 static_cast<void>(d_rrsetToSign
.release());
206 d_rrsetToSign
= make_unique
<rrset_t
>();
207 d_outstandings
[*rwVect
.second
.begin()]++;
214 while(d_outstanding
) {
215 for(int fd
: rwVect
.first
) {
219 while(d_outstanding
) {
220 chunk_t
* chunk
= nullptr;
221 int res
= readn(fd
, &chunk
, sizeof(chunk
));
223 if (d_outstandings
[fd
] > 0) {
224 throw std::runtime_error("A signing pipe worker died while we were waiting for its result");
230 if(errno
!= EAGAIN
&& errno
!= EINTR
)
231 unixDie("Error reading signed chunk from thread");
236 std::unique_ptr
<rrset_t
> chunkPtr(chunk
);
239 d_outstandings
[fd
]--;
241 addSignedToChunks(chunkPtr
);
244 if(!d_outstanding
|| !d_final
)
246 rwVect
= waitForRW(true, false, -1); // wait for something to happen
250 if(wantWrite
) { // our optimization above failed, we now wait synchronously
251 rwVect
= waitForRW(false, wantWrite
, -1); // wait for something to happen
252 shuffle(rwVect
.second
.begin(), rwVect
.second
.end(), pdns::dns_random_engine()); // pick random available worker
253 auto ptr
= d_rrsetToSign
.get();
254 writen2(*rwVect
.second
.begin(), &ptr
, sizeof(ptr
));
255 // coverity[leaked_storage]
256 static_cast<void>(d_rrsetToSign
.release());
257 d_rrsetToSign
= make_unique
<rrset_t
>();
258 d_outstandings
[*rwVect
.second
.begin()]++;
265 unsigned int ChunkedSigningPipe::getReady() const
268 for(const auto& v
: d_chunks
) {
274 void ChunkedSigningPipe::worker(int fd
)
277 UeberBackend
db("key-only");
278 DNSSECKeeper
dk(&db
);
280 chunk_t
* chunk
= nullptr;
283 res
= readn(fd
, &chunk
, sizeof(chunk
));
287 unixDie("reading object pointer to sign from pdns");
289 set
<DNSName
> authSet
;
290 authSet
.insert(d_signer
);
291 addRRSigs(dk
, db
, authSet
, *chunk
);
294 writen2(fd
, &chunk
, sizeof(chunk
));
297 catch(const PDNSException
& pe
) {
301 catch(const std::exception
& e
) {
308 catch(const PDNSException
& pe
)
310 g_log
<<Logger::Error
<<"Signing thread died because of PDNSException: "<<pe
.reason
<<endl
;
313 catch(const std::exception
& e
)
315 g_log
<<Logger::Error
<<"Signing thread died because of std::exception: "<<e
.what()<<endl
;
319 void ChunkedSigningPipe::flushToSign()
322 d_rrsetToSign
->clear();
325 vector
<DNSZoneRecord
> ChunkedSigningPipe::getChunk(bool final
)
327 if(final
&& !d_final
) {
328 // this means we should keep on reading until d_outstanding == 0
332 for(int fd
: d_sockets
) {
333 shutdown(fd
, SHUT_WR
); // perhaps this transmits EOF the other side
334 //cerr<<"shutdown of "<<fd<<endl;
338 flushToSign(); // should help us wait
339 vector
<DNSZoneRecord
> front
=d_chunks
.front();
340 d_chunks
.pop_front();
342 d_chunks
.push_back(vector
<DNSZoneRecord
>());
343 /* if(d_final && front.empty())
344 cerr<<"getChunk returning empty in final"<<endl; */