]>
git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/signingpipe.cc
2f8a2acd6737ee33e3caaa2b16ef5576231c3242
4 #include "signingpipe.hh"
8 #include <sys/socket.h>
9 #include <netinet/in.h>
10 #include <netinet/tcp.h>
13 // deal with partial reads
15 int readn(int fd
, void* buffer
, unsigned int len
)
20 res
= read(fd
, (char*)buffer
+ pos
, len
- pos
);
23 throw runtime_error("Signing Pipe remote shut down in the middle of a message");
25 //cerr<<"Got decent EOF on "<<fd<<endl;
31 if(errno
== EAGAIN
|| errno
== EINTR
) {
37 unixDie("Reading from socket in Signing Pipe loop");
48 void* ChunkedSigningPipe::helperWorker(ChunkedSigningPipe
* csp
, int fd
)
54 L
<<Logger::Error
<<"Unknown exception in signing thread occurred"<<endl
;
58 ChunkedSigningPipe::ChunkedSigningPipe(const DNSName
& signerName
, bool mustSign
, unsigned int workers
)
59 : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers
), d_submitted(0), d_signer(signerName
),
60 d_maxchunkrecords(100), d_threads(d_numworkers
), d_mustSign(mustSign
), d_final(false)
62 d_rrsetToSign
= new rrset_t
;
63 d_chunks
.push_back(vector
<DNSZoneRecord
>()); // load an empty chunk
70 for(unsigned int n
=0; n
< d_numworkers
; ++n
) {
71 if(socketpair(AF_UNIX
, SOCK_STREAM
, 0, fds
) < 0)
72 throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
73 setCloseOnExec(fds
[0]);
74 setCloseOnExec(fds
[1]);
75 d_threads
[n
] = std::thread(helperWorker
, this, fds
[1]);
76 setNonBlocking(fds
[0]);
77 d_sockets
.push_back(fds
[0]);
78 d_outstandings
[fds
[0]] = 0;
82 ChunkedSigningPipe::~ChunkedSigningPipe()
89 for(int fd
: d_sockets
) {
90 close(fd
); // this will trigger all threads to exit
93 for(auto& thread
: d_threads
) {
96 //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
101 dedupLessThan(const DNSZoneRecord
& a
, const DNSZoneRecord
&b
)
103 return make_tuple(a
.dr
.d_content
->getZoneRepresentation(), a
.dr
.d_ttl
) < make_tuple(b
.dr
.d_content
->getZoneRepresentation(), b
.dr
.d_ttl
); // XXX SLOW SLOW SLOW
106 bool dedupEqual(const DNSZoneRecord
& a
, const DNSZoneRecord
&b
)
108 return make_tuple(a
.dr
.d_content
->getZoneRepresentation(), a
.dr
.d_ttl
) == make_tuple(b
.dr
.d_content
->getZoneRepresentation(), b
.dr
.d_ttl
); // XXX SLOW SLOW SLOW
112 void ChunkedSigningPipe::dedupRRSet()
114 // our set contains contains records for one type and one name, but might not be sorted otherwise
115 sort(d_rrsetToSign
->begin(), d_rrsetToSign
->end(), dedupLessThan
);
116 d_rrsetToSign
->erase(unique(d_rrsetToSign
->begin(), d_rrsetToSign
->end(), dedupEqual
), d_rrsetToSign
->end());
119 bool ChunkedSigningPipe::submit(const DNSZoneRecord
& rr
)
122 // check if we have a full RRSET to sign
123 if(!d_rrsetToSign
->empty() && (d_rrsetToSign
->begin()->dr
.d_type
!= rr
.dr
.d_type
|| d_rrsetToSign
->begin()->dr
.d_name
!= rr
.dr
.d_name
))
128 d_rrsetToSign
->push_back(rr
);
129 return !d_chunks
.empty() && d_chunks
.front().size() >= d_maxchunkrecords
; // "you can send more"
132 pair
<vector
<int>, vector
<int> > ChunkedSigningPipe::waitForRW(bool rd
, bool wr
, int seconds
)
136 for(unsigned int n
= 0; n
< d_sockets
.size(); ++n
) {
137 if(d_eof
.count(d_sockets
[n
]))
140 memset(&pfd
, 0, sizeof(pfd
));
141 pfd
.fd
= d_sockets
[n
];
143 pfd
.events
|= POLLIN
;
145 pfd
.events
|= POLLOUT
;
149 int res
= poll(&pfds
[0], pfds
.size(), (seconds
< 0) ? -1 : (seconds
* 1000)); // -1 = infinite
151 unixDie("polling for activity from signers, "+std::to_string(d_sockets
.size()));
152 pair
<vector
<int>, vector
<int> > vects
;
153 for(unsigned int n
= 0; n
< pfds
.size(); ++n
)
154 if(pfds
[n
].revents
& POLLIN
)
155 vects
.first
.push_back(pfds
[n
].fd
);
156 else if(pfds
[n
].revents
& POLLOUT
)
157 vects
.second
.push_back(pfds
[n
].fd
);
162 void ChunkedSigningPipe::addSignedToChunks(chunk_t
* signedChunk
)
164 chunk_t::const_iterator from
= signedChunk
->begin();
166 while(from
!= signedChunk
->end()) {
167 chunk_t
& fillChunk
= d_chunks
.back();
169 chunk_t::size_type room
= d_maxchunkrecords
- fillChunk
.size();
171 unsigned int fit
= std::min(room
, (chunk_t::size_type
)(signedChunk
->end() - from
));
173 d_chunks
.back().insert(fillChunk
.end(), from
, from
+ fit
);
176 if(from
!= signedChunk
->end()) // it didn't fit, so add a new chunk
177 d_chunks
.push_back(chunk_t());
181 void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
184 addSignedToChunks(d_rrsetToSign
);
185 d_rrsetToSign
->clear();
189 if(d_final
&& !d_outstanding
) // nothing to do!
192 bool wantRead
, wantWrite
;
194 wantWrite
= !d_rrsetToSign
->empty();
195 wantRead
= d_outstanding
|| wantWrite
; // if we wrote, we want to read
197 pair
<vector
<int>, vector
<int> > rwVect
;
199 rwVect
= waitForRW(wantRead
, wantWrite
, -1); // wait for something to happen
201 if(wantWrite
&& !rwVect
.second
.empty()) {
202 random_shuffle(rwVect
.second
.begin(), rwVect
.second
.end()); // pick random available worker
203 writen2(*rwVect
.second
.begin(), &d_rrsetToSign
, sizeof(d_rrsetToSign
));
204 d_rrsetToSign
= new rrset_t
;
205 d_outstandings
[*rwVect
.second
.begin()]++;
212 while(d_outstanding
) {
215 for(int fd
: rwVect
.first
) {
219 while(d_outstanding
) {
220 int res
= readn(fd
, &chunk
, sizeof(chunk
));
222 if (d_outstandings
[fd
] > 0) {
223 throw std::runtime_error("A signing pipe worker died while we were waiting for its result");
229 if(errno
!= EAGAIN
&& errno
!= EINTR
)
230 unixDie("Error reading signed chunk from thread");
236 d_outstandings
[fd
]--;
238 addSignedToChunks(chunk
);
243 if(!d_outstanding
|| !d_final
)
245 rwVect
= waitForRW(true, false, -1); // wait for something to happen
249 if(wantWrite
) { // our optimization above failed, we now wait synchronously
250 rwVect
= waitForRW(false, wantWrite
, -1); // wait for something to happen
251 random_shuffle(rwVect
.second
.begin(), rwVect
.second
.end()); // pick random available worker
252 writen2(*rwVect
.second
.begin(), &d_rrsetToSign
, sizeof(d_rrsetToSign
));
253 d_rrsetToSign
= new rrset_t
;
254 d_outstandings
[*rwVect
.second
.begin()]++;
261 unsigned int ChunkedSigningPipe::getReady() const
264 for(const auto& v
: d_chunks
) {
270 void ChunkedSigningPipe::worker(int fd
)
274 UeberBackend
db("key-only");
276 chunk_t
* chunk
= nullptr;
279 res
= readn(fd
, &chunk
, sizeof(chunk
));
283 unixDie("reading object pointer to sign from pdns");
285 set
<DNSName
> authSet
;
286 authSet
.insert(d_signer
);
287 addRRSigs(dk
, db
, authSet
, *chunk
);
290 writen2(fd
, &chunk
, sizeof(chunk
));
293 catch(const PDNSException
& pe
) {
297 catch(const std::exception
& e
) {
304 catch(const PDNSException
& pe
)
306 L
<<Logger::Error
<<"Signing thread died because of PDNSException: "<<pe
.reason
<<endl
;
309 catch(const std::exception
& e
)
311 L
<<Logger::Error
<<"Signing thread died because of std::exception: "<<e
.what()<<endl
;
315 void ChunkedSigningPipe::flushToSign()
318 d_rrsetToSign
->clear();
321 vector
<DNSZoneRecord
> ChunkedSigningPipe::getChunk(bool final
)
323 if(final
&& !d_final
) {
324 // this means we should keep on reading until d_outstanding == 0
328 for(int fd
: d_sockets
) {
329 shutdown(fd
, SHUT_WR
); // perhaps this transmits EOF the other side
330 //cerr<<"shutdown of "<<fd<<endl;
334 flushToSign(); // should help us wait
335 vector
<DNSZoneRecord
> front
=d_chunks
.front();
336 d_chunks
.pop_front();
338 d_chunks
.push_back(vector
<DNSZoneRecord
>());
339 /* if(d_final && front.empty())
340 cerr<<"getChunk returning empty in final"<<endl; */