]>
git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/signingpipe.cc
4 #include "signingpipe.hh"
8 #include <sys/socket.h>
9 #include <netinet/in.h>
10 #include <netinet/tcp.h>
13 // deal with partial reads
15 int readn(int fd
, void* buffer
, unsigned int len
)
20 res
= read(fd
, (char*)buffer
+ pos
, len
- pos
);
23 throw runtime_error("Signing Pipe remote shut down in the middle of a message");
25 //cerr<<"Got decent EOF on "<<fd<<endl;
31 if(errno
== EAGAIN
|| errno
== EINTR
) {
37 unixDie("Reading from socket in Signing Pipe loop");
49 // used to pass information to the new thread
50 struct StartHelperStruct
52 StartHelperStruct(ChunkedSigningPipe
* csp
, int id
, int fd
) : d_csp(csp
), d_id(id
), d_fd(fd
){}
53 ChunkedSigningPipe
* d_csp
;
58 // used to launch the new thread
59 void* ChunkedSigningPipe::helperWorker(void* p
)
62 StartHelperStruct shs
=*(StartHelperStruct
*)p
;
63 delete (StartHelperStruct
*)p
;
65 shs
.d_csp
->worker(shs
.d_id
, shs
.d_fd
);
69 L
<<Logger::Error
<<"Unknown exception in signing thread occurred"<<endl
;
73 ChunkedSigningPipe::ChunkedSigningPipe(const DNSName
& signerName
, bool mustSign
, const string
& servers
, unsigned int workers
)
74 : d_signed(0), d_queued(0), d_outstanding(0), d_numworkers(workers
), d_submitted(0), d_signer(signerName
),
75 d_maxchunkrecords(100), d_tids(d_numworkers
), d_mustSign(mustSign
), d_final(false)
77 d_rrsetToSign
= new rrset_t
;
78 d_chunks
.push_back(vector
<DNSZoneRecord
>()); // load an empty chunk
85 for(unsigned int n
=0; n
< d_numworkers
; ++n
) {
86 if(socketpair(AF_UNIX
, SOCK_STREAM
, 0, fds
) < 0)
87 throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
88 setCloseOnExec(fds
[0]);
89 setCloseOnExec(fds
[1]);
90 pthread_create(&d_tids
[n
], 0, helperWorker
, (void*) new StartHelperStruct(this, n
, fds
[1]));
91 setNonBlocking(fds
[0]);
92 d_sockets
.push_back(fds
[0]);
96 ChunkedSigningPipe::~ChunkedSigningPipe()
101 for(int fd
: d_sockets
) {
102 close(fd
); // this will trigger all threads to exit
106 for(pthread_t
& tid
: d_tids
) {
107 pthread_join(tid
, &res
);
109 //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
114 dedupLessThan(const DNSZoneRecord
& a
, const DNSZoneRecord
&b
)
116 return make_tuple(a
.dr
.d_content
->getZoneRepresentation(), a
.dr
.d_ttl
) < make_tuple(b
.dr
.d_content
->getZoneRepresentation(), b
.dr
.d_ttl
); // XXX SLOW SLOW SLOW
119 bool dedupEqual(const DNSZoneRecord
& a
, const DNSZoneRecord
&b
)
121 return make_tuple(a
.dr
.d_content
->getZoneRepresentation(), a
.dr
.d_ttl
) == make_tuple(b
.dr
.d_content
->getZoneRepresentation(), b
.dr
.d_ttl
); // XXX SLOW SLOW SLOW
125 void ChunkedSigningPipe::dedupRRSet()
127 // our set contains records for one type and one name, but might not be sorted otherwise
128 sort(d_rrsetToSign
->begin(), d_rrsetToSign
->end(), dedupLessThan
);
129 d_rrsetToSign
->erase(unique(d_rrsetToSign
->begin(), d_rrsetToSign
->end(), dedupEqual
), d_rrsetToSign
->end());
132 bool ChunkedSigningPipe::submit(const DNSZoneRecord
& rr
)
135 // check if we have a full RRSET to sign
136 if(!d_rrsetToSign
->empty() && (d_rrsetToSign
->begin()->dr
.d_type
!= rr
.dr
.d_type
|| d_rrsetToSign
->begin()->dr
.d_name
!= rr
.dr
.d_name
))
141 d_rrsetToSign
->push_back(rr
);
142 return !d_chunks
.empty() && d_chunks
.front().size() >= d_maxchunkrecords
; // "you can send more"
145 pair
<vector
<int>, vector
<int> > ChunkedSigningPipe::waitForRW(bool rd
, bool wr
, int seconds
)
149 for(unsigned int n
= 0; n
< d_sockets
.size(); ++n
) {
150 if(d_eof
.count(d_sockets
[n
]))
153 memset(&pfd
, 0, sizeof(pfd
));
154 pfd
.fd
= d_sockets
[n
];
156 pfd
.events
|= POLLIN
;
158 pfd
.events
|= POLLOUT
;
162 int res
= poll(&pfds
[0], pfds
.size(), (seconds
< 0) ? -1 : (seconds
* 1000)); // -1 = infinite
164 unixDie("polling for activity from signers, "+std::to_string(d_sockets
.size()));
165 pair
<vector
<int>, vector
<int> > vects
;
166 for(unsigned int n
= 0; n
< pfds
.size(); ++n
)
167 if(pfds
[n
].revents
& POLLIN
)
168 vects
.first
.push_back(pfds
[n
].fd
);
169 else if(pfds
[n
].revents
& POLLOUT
)
170 vects
.second
.push_back(pfds
[n
].fd
);
175 void ChunkedSigningPipe::addSignedToChunks(chunk_t
* signedChunk
)
177 chunk_t::const_iterator from
= signedChunk
->begin();
179 while(from
!= signedChunk
->end()) {
180 chunk_t
& fillChunk
= d_chunks
.back();
182 chunk_t::size_type room
= d_maxchunkrecords
- fillChunk
.size();
184 unsigned int fit
= std::min(room
, (chunk_t::size_type
)(signedChunk
->end() - from
));
186 d_chunks
.back().insert(fillChunk
.end(), from
, from
+ fit
);
189 if(from
!= signedChunk
->end()) // it didn't fit, so add a new chunk
190 d_chunks
.push_back(chunk_t());
194 void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
197 addSignedToChunks(d_rrsetToSign
);
198 d_rrsetToSign
->clear();
202 if(d_final
&& !d_outstanding
) // nothing to do!
205 bool wantRead
, wantWrite
;
207 wantWrite
= !d_rrsetToSign
->empty();
208 wantRead
= d_outstanding
|| wantWrite
; // if we wrote, we want to read
210 pair
<vector
<int>, vector
<int> > rwVect
;
212 rwVect
= waitForRW(wantRead
, wantWrite
, -1); // wait for something to happen
214 if(wantWrite
&& !rwVect
.second
.empty()) {
215 random_shuffle(rwVect
.second
.begin(), rwVect
.second
.end()); // pick random available worker
216 writen2(*rwVect
.second
.begin(), &d_rrsetToSign
, sizeof(d_rrsetToSign
));
217 d_rrsetToSign
= new rrset_t
;
224 while(d_outstanding
) {
227 for(int fd
: rwVect
.first
) {
231 while(d_outstanding
) {
232 int res
= readn(fd
, &chunk
, sizeof(chunk
));
238 if(errno
!= EAGAIN
&& errno
!= EINTR
)
239 unixDie("Error reading signed chunk from thread");
246 addSignedToChunks(chunk
);
251 if(!d_outstanding
|| !d_final
)
253 rwVect
= waitForRW(1, 0, -1); // wait for something to happen
257 if(wantWrite
) { // our optimization above failed, we now wait synchronously
258 rwVect
= waitForRW(0, wantWrite
, -1); // wait for something to happen
259 random_shuffle(rwVect
.second
.begin(), rwVect
.second
.end()); // pick random available worker
260 writen2(*rwVect
.second
.begin(), &d_rrsetToSign
, sizeof(d_rrsetToSign
));
261 d_rrsetToSign
= new rrset_t
;
268 unsigned int ChunkedSigningPipe::getReady()
271 for(const auto& v
: d_chunks
) {
276 void ChunkedSigningPipe::worker(int id
, int fd
)
280 UeberBackend
db("key-only");
282 chunk_t
* chunk
= nullptr;
285 res
= readn(fd
, &chunk
, sizeof(chunk
));
289 unixDie("reading object pointer to sign from pdns");
291 set
<DNSName
> authSet
;
292 authSet
.insert(d_signer
);
293 addRRSigs(dk
, db
, authSet
, *chunk
);
296 writen2(fd
, &chunk
, sizeof(chunk
));
299 catch(const PDNSException
& pe
) {
303 catch(const std::exception
& e
) {
310 catch(const PDNSException
& pe
)
312 L
<<Logger::Error
<<"Signing thread died because of PDNSException: "<<pe
.reason
<<endl
;
315 catch(const std::exception
& e
)
317 L
<<Logger::Error
<<"Signing thread died because of std::exception: "<<e
.what()<<endl
;
321 void ChunkedSigningPipe::flushToSign()
324 d_rrsetToSign
->clear();
327 vector
<DNSZoneRecord
> ChunkedSigningPipe::getChunk(bool final
)
329 if(final
&& !d_final
) {
330 // this means we should keep on reading until d_outstanding == 0
334 for(int fd
: d_sockets
) {
335 shutdown(fd
, SHUT_WR
); // perhaps this transmits EOF to the other side
336 //cerr<<"shutdown of "<<fd<<endl;
340 flushToSign(); // should help us wait
341 vector
<DNSZoneRecord
> front
=d_chunks
.front();
342 d_chunks
.pop_front();
344 d_chunks
.push_back(vector
<DNSZoneRecord
>());
345 /* if(d_final && front.empty())
346 cerr<<"getChunk returning empty in final"<<endl; */
354 if(!servers
.empty()) {
356 parseService(servers
, st
);
357 remote
=ComboAddress(st
.host
, st
.port
);
361 if(!servers
.empty()) {
362 fds
[0] = socket(AF_INET
, SOCK_STREAM
, 0);
365 if(connect(fds
[0], (struct sockaddr
*)&remote
, remote
.getSocklen()) < 0)
366 unixDie("Connecting to signing server");
370 signal(SIGCHLD
, SIG_IGN
);
371 if(!fork()) { // child
373 execl("./pdnsutil", "./pdnsutil", "--config-dir=./", "signing-slave", NULL
);
374 // helperWorker(new StartHelperStruct(this, n));
382 bool readLStringFromSocket(int fd
, string
& msg
)
386 if(!readn(fd
, &len
, sizeof(len
)))
391 scoped_array
<char> buf(new char[len
]);
392 readn(fd
, buf
.get(), len
);
394 msg
.assign(buf
.get(), len
);
397 void writeLStringToSocket(int fd
, const string
& msg
)
400 uint32_t len
= htonl(msg
.length());
401 string
tot((char*)&len
, 4);
404 writen2(fd
, tot
.c_str(), tot
.length());