]>
git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/signingpipe.cc
4 #include "signingpipe.hh"
6 #include "dns_random.hh"
9 #include <sys/socket.h>
10 #include <netinet/in.h>
11 #include <netinet/tcp.h>
14 // deal with partial reads
16 int readn(int fd
, void* buffer
, unsigned int len
)
21 res
= read(fd
, (char*)buffer
+ pos
, len
- pos
);
24 throw runtime_error("Signing Pipe remote shut down in the middle of a message");
26 //cerr<<"Got decent EOF on "<<fd<<endl;
32 if(errno
== EAGAIN
|| errno
== EINTR
) {
38 unixDie("Reading from socket in Signing Pipe loop");
49 void* ChunkedSigningPipe::helperWorker(ChunkedSigningPipe
* csp
, int fd
)
55 g_log
<<Logger::Error
<<"Unknown exception in signing thread occurred"<<endl
;
59 ChunkedSigningPipe::ChunkedSigningPipe(const DNSName
& signerName
, bool mustSign
, unsigned int workers
)
60 : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers
), d_submitted(0), d_signer(signerName
),
61 d_maxchunkrecords(100), d_threads(d_numworkers
), d_mustSign(mustSign
), d_final(false)
63 d_rrsetToSign
= make_unique
<rrset_t
>();
64 d_chunks
.push_back(vector
<DNSZoneRecord
>()); // load an empty chunk
71 for(unsigned int n
=0; n
< d_numworkers
; ++n
) {
72 if(socketpair(AF_UNIX
, SOCK_STREAM
, 0, fds
) < 0)
73 throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
74 setCloseOnExec(fds
[0]);
75 setCloseOnExec(fds
[1]);
76 d_threads
[n
] = std::thread(helperWorker
, this, fds
[1]);
77 setNonBlocking(fds
[0]);
78 d_sockets
.push_back(fds
[0]);
79 d_outstandings
[fds
[0]] = 0;
83 ChunkedSigningPipe::~ChunkedSigningPipe()
88 for(int fd
: d_sockets
) {
89 close(fd
); // this will trigger all threads to exit
92 for(auto& thread
: d_threads
) {
95 //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
100 dedupLessThan(const DNSZoneRecord
& a
, const DNSZoneRecord
&b
)
102 return make_tuple(a
.dr
.d_content
->getZoneRepresentation(), a
.dr
.d_ttl
) < make_tuple(b
.dr
.d_content
->getZoneRepresentation(), b
.dr
.d_ttl
); // XXX SLOW SLOW SLOW
105 bool dedupEqual(const DNSZoneRecord
& a
, const DNSZoneRecord
&b
)
107 return make_tuple(a
.dr
.d_content
->getZoneRepresentation(), a
.dr
.d_ttl
) == make_tuple(b
.dr
.d_content
->getZoneRepresentation(), b
.dr
.d_ttl
); // XXX SLOW SLOW SLOW
111 void ChunkedSigningPipe::dedupRRSet()
113 // our set contains contains records for one type and one name, but might not be sorted otherwise
114 sort(d_rrsetToSign
->begin(), d_rrsetToSign
->end(), dedupLessThan
);
115 d_rrsetToSign
->erase(unique(d_rrsetToSign
->begin(), d_rrsetToSign
->end(), dedupEqual
), d_rrsetToSign
->end());
118 bool ChunkedSigningPipe::submit(const DNSZoneRecord
& rr
)
121 // check if we have a full RRSET to sign
122 if(!d_rrsetToSign
->empty() && (d_rrsetToSign
->begin()->dr
.d_type
!= rr
.dr
.d_type
|| d_rrsetToSign
->begin()->dr
.d_name
!= rr
.dr
.d_name
))
127 d_rrsetToSign
->push_back(rr
);
128 return !d_chunks
.empty() && d_chunks
.front().size() >= d_maxchunkrecords
; // "you can send more"
131 pair
<vector
<int>, vector
<int> > ChunkedSigningPipe::waitForRW(bool rd
, bool wr
, int seconds
)
135 for(unsigned int n
= 0; n
< d_sockets
.size(); ++n
) {
136 if(d_eof
.count(d_sockets
[n
]))
139 memset(&pfd
, 0, sizeof(pfd
));
140 pfd
.fd
= d_sockets
[n
];
142 pfd
.events
|= POLLIN
;
144 pfd
.events
|= POLLOUT
;
148 int res
= poll(&pfds
[0], pfds
.size(), (seconds
< 0) ? -1 : (seconds
* 1000)); // -1 = infinite
150 unixDie("polling for activity from signers, "+std::to_string(d_sockets
.size()));
151 pair
<vector
<int>, vector
<int> > vects
;
152 for(unsigned int n
= 0; n
< pfds
.size(); ++n
)
153 if(pfds
[n
].revents
& POLLIN
)
154 vects
.first
.push_back(pfds
[n
].fd
);
155 else if(pfds
[n
].revents
& POLLOUT
)
156 vects
.second
.push_back(pfds
[n
].fd
);
161 void ChunkedSigningPipe::addSignedToChunks(std::unique_ptr
<chunk_t
>& signedChunk
)
163 chunk_t::const_iterator from
= signedChunk
->begin();
165 while(from
!= signedChunk
->end()) {
166 chunk_t
& fillChunk
= d_chunks
.back();
167 chunk_t::size_type room
= d_maxchunkrecords
- fillChunk
.size();
169 unsigned int fit
= std::min(room
, (chunk_t::size_type
)(signedChunk
->end() - from
));
171 d_chunks
.back().insert(fillChunk
.end(), from
, from
+ fit
);
174 if(from
!= signedChunk
->end()) // it didn't fit, so add a new chunk
175 d_chunks
.push_back(chunk_t());
179 void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
182 addSignedToChunks(d_rrsetToSign
);
183 d_rrsetToSign
->clear();
187 if(d_final
&& !d_outstanding
) // nothing to do!
190 bool wantRead
, wantWrite
;
192 wantWrite
= !d_rrsetToSign
->empty();
193 wantRead
= d_outstanding
|| wantWrite
; // if we wrote, we want to read
195 pair
<vector
<int>, vector
<int> > rwVect
;
197 rwVect
= waitForRW(wantRead
, wantWrite
, -1); // wait for something to happen
199 if(wantWrite
&& !rwVect
.second
.empty()) {
200 shuffle(rwVect
.second
.begin(), rwVect
.second
.end(), pdns::dns_random_engine()); // pick random available worker
201 auto ptr
= d_rrsetToSign
.release();
202 writen2(*rwVect
.second
.begin(), &ptr
, sizeof(ptr
));
203 d_rrsetToSign
= make_unique
<rrset_t
>();
204 d_outstandings
[*rwVect
.second
.begin()]++;
211 while(d_outstanding
) {
212 for(int fd
: rwVect
.first
) {
216 while(d_outstanding
) {
217 chunk_t
* chunk
= nullptr;
218 int res
= readn(fd
, &chunk
, sizeof(chunk
));
220 if (d_outstandings
[fd
] > 0) {
221 throw std::runtime_error("A signing pipe worker died while we were waiting for its result");
227 if(errno
!= EAGAIN
&& errno
!= EINTR
)
228 unixDie("Error reading signed chunk from thread");
233 std::unique_ptr
<rrset_t
> chunkPtr(chunk
);
236 d_outstandings
[fd
]--;
238 addSignedToChunks(chunkPtr
);
241 if(!d_outstanding
|| !d_final
)
243 rwVect
= waitForRW(true, false, -1); // wait for something to happen
247 if(wantWrite
) { // our optimization above failed, we now wait synchronously
248 rwVect
= waitForRW(false, wantWrite
, -1); // wait for something to happen
249 shuffle(rwVect
.second
.begin(), rwVect
.second
.end(), pdns::dns_random_engine()); // pick random available worker
250 auto ptr
= d_rrsetToSign
.release();
251 writen2(*rwVect
.second
.begin(), &ptr
, sizeof(ptr
));
252 d_rrsetToSign
= make_unique
<rrset_t
>();
253 d_outstandings
[*rwVect
.second
.begin()]++;
260 unsigned int ChunkedSigningPipe::getReady() const
263 for(const auto& v
: d_chunks
) {
269 void ChunkedSigningPipe::worker(int fd
)
272 UeberBackend
db("key-only");
273 DNSSECKeeper
dk(&db
);
275 chunk_t
* chunk
= nullptr;
278 res
= readn(fd
, &chunk
, sizeof(chunk
));
282 unixDie("reading object pointer to sign from pdns");
284 set
<DNSName
> authSet
;
285 authSet
.insert(d_signer
);
286 addRRSigs(dk
, db
, authSet
, *chunk
);
289 writen2(fd
, &chunk
, sizeof(chunk
));
292 catch(const PDNSException
& pe
) {
296 catch(const std::exception
& e
) {
303 catch(const PDNSException
& pe
)
305 g_log
<<Logger::Error
<<"Signing thread died because of PDNSException: "<<pe
.reason
<<endl
;
308 catch(const std::exception
& e
)
310 g_log
<<Logger::Error
<<"Signing thread died because of std::exception: "<<e
.what()<<endl
;
314 void ChunkedSigningPipe::flushToSign()
317 d_rrsetToSign
->clear();
320 vector
<DNSZoneRecord
> ChunkedSigningPipe::getChunk(bool final
)
322 if(final
&& !d_final
) {
323 // this means we should keep on reading until d_outstanding == 0
327 for(int fd
: d_sockets
) {
328 shutdown(fd
, SHUT_WR
); // perhaps this transmits EOF the other side
329 //cerr<<"shutdown of "<<fd<<endl;
333 flushToSign(); // should help us wait
334 vector
<DNSZoneRecord
> front
=d_chunks
.front();
335 d_chunks
.pop_front();
337 d_chunks
.push_back(vector
<DNSZoneRecord
>());
338 /* if(d_final && front.empty())
339 cerr<<"getChunk returning empty in final"<<endl; */