]>
git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/signingpipe.cc
4 #include "signingpipe.hh"
7 #include <boost/foreach.hpp>
8 #include <sys/socket.h>
9 #include <netinet/in.h>
10 #include <netinet/tcp.h>
13 // deal with partial reads
15 int readn(int fd
, void* buffer
, unsigned int len
)
20 res
= read(fd
, (char*)buffer
+ pos
, len
- pos
);
23 throw runtime_error("Signing Pipe remote shut down in the middle of a message");
25 //cerr<<"Got decent EOF on "<<fd<<endl;
31 if(errno
== EAGAIN
|| errno
== EINTR
) {
37 unixDie("Reading from socket in Signing Pipe loop");
49 // used to pass information to the new thread
50 struct StartHelperStruct
52 StartHelperStruct(ChunkedSigningPipe
* csp
, int id
, int fd
) : d_csp(csp
), d_id(id
), d_fd(fd
){}
53 ChunkedSigningPipe
* d_csp
;
58 // used to launch the new thread
59 void* ChunkedSigningPipe::helperWorker(void* p
)
62 StartHelperStruct shs
=*(StartHelperStruct
*)p
;
63 delete (StartHelperStruct
*)p
;
65 shs
.d_csp
->worker(shs
.d_id
, shs
.d_fd
);
69 L
<<Logger::Error
<<"Unknown exception in signing thread occurred"<<endl
;
73 ChunkedSigningPipe::ChunkedSigningPipe(const string
& signerName
, bool mustSign
, const string
& servers
, unsigned int workers
)
74 : d_queued(0), d_outstanding(0), d_signer(signerName
), d_maxchunkrecords(100), d_numworkers(workers
), d_tids(d_numworkers
),
75 d_mustSign(mustSign
), d_final(false), d_submitted(0)
77 d_rrsetToSign
= new rrset_t
;
78 d_chunks
.push_back(vector
<DNSResourceRecord
>()); // load an empty chunk
85 for(unsigned int n
=0; n
< d_numworkers
; ++n
) {
86 if(socketpair(AF_UNIX
, SOCK_STREAM
, 0, fds
) < 0)
87 throw runtime_error("Unable to create communication socket in for ChunkedSigningPipe");
88 setCloseOnExec(fds
[0]);
89 setCloseOnExec(fds
[1]);
90 pthread_create(&d_tids
[n
], 0, helperWorker
, (void*) new StartHelperStruct(this, n
, fds
[1]));
91 setNonBlocking(fds
[0]);
92 d_sockets
.push_back(fds
[0]);
96 ChunkedSigningPipe::~ChunkedSigningPipe()
101 BOOST_FOREACH(int fd
, d_sockets
) {
102 close(fd
); // this will trigger all threads to exit
106 BOOST_FOREACH(pthread_t
& tid
, d_tids
) {
107 pthread_join(tid
, &res
);
109 //cout<<"Did: "<<d_signed<<", records (!= chunks) submitted: "<<d_submitted<<endl;
114 dedupLessThan(const DNSResourceRecord
& a
, const DNSResourceRecord
&b
)
116 return (tie(a
.content
, a
.ttl
) < tie(b
.content
, b
.ttl
));
119 bool dedupEqual(const DNSResourceRecord
& a
, const DNSResourceRecord
&b
)
121 return(tie(a
.content
, a
.ttl
) == tie(b
.content
, b
.ttl
));
125 void ChunkedSigningPipe::dedupRRSet()
127 // our set contains contains records for one type and one name, but might not be sorted otherwise
128 sort(d_rrsetToSign
->begin(), d_rrsetToSign
->end(), dedupLessThan
);
129 d_rrsetToSign
->erase(unique(d_rrsetToSign
->begin(), d_rrsetToSign
->end(), dedupEqual
), d_rrsetToSign
->end());
132 bool ChunkedSigningPipe::submit(const DNSResourceRecord
& rr
)
135 // check if we have a full RRSET to sign
136 if(!d_rrsetToSign
->empty() && (d_rrsetToSign
->begin()->qtype
.getCode() != rr
.qtype
.getCode() || d_rrsetToSign
->begin()->qname
!= rr
.qname
))
141 d_rrsetToSign
->push_back(rr
);
142 return !d_chunks
.empty() && d_chunks
.front().size() >= d_maxchunkrecords
; // "you can send more"
145 pair
<vector
<int>, vector
<int> > ChunkedSigningPipe::waitForRW(bool rd
, bool wr
, int seconds
)
149 for(unsigned int n
= 0; n
< d_sockets
.size(); ++n
) {
150 if(d_eof
.count(d_sockets
[n
]))
153 memset(&pfd
, 0, sizeof(pfd
));
154 pfd
.fd
= d_sockets
[n
];
156 pfd
.events
|= POLLIN
;
158 pfd
.events
|= POLLOUT
;
162 int res
= poll(&pfds
[0], pfds
.size(), (seconds
< 0) ? -1 : (seconds
* 1000)); // -1 = infinite
164 unixDie("polling for activity from signers, "+lexical_cast
<string
>(d_sockets
.size()));
165 pair
<vector
<int>, vector
<int> > vects
;
166 for(unsigned int n
= 0; n
< pfds
.size(); ++n
)
167 if(pfds
[n
].revents
& POLLIN
)
168 vects
.first
.push_back(pfds
[n
].fd
);
169 else if(pfds
[n
].revents
& POLLOUT
)
170 vects
.second
.push_back(pfds
[n
].fd
);
175 void ChunkedSigningPipe::addSignedToChunks(chunk_t
* signedChunk
)
177 chunk_t::const_iterator from
= signedChunk
->begin();
179 while(from
!= signedChunk
->end()) {
180 chunk_t
& fillChunk
= d_chunks
.back();
182 chunk_t::size_type room
= d_maxchunkrecords
- fillChunk
.size();
184 unsigned int fit
= std::min(room
, (chunk_t::size_type
)(signedChunk
->end() - from
));
186 d_chunks
.back().insert(fillChunk
.end(), from
, from
+ fit
);
189 if(from
!= signedChunk
->end()) // it didn't fit, so add a new chunk
190 d_chunks
.push_back(chunk_t());
194 void ChunkedSigningPipe::sendRRSetToWorker() // it sounds so socialist!
197 addSignedToChunks(d_rrsetToSign
);
198 d_rrsetToSign
->clear();
202 if(d_final
&& !d_outstanding
) // nothing to do!
205 bool wantRead
, wantWrite
;
207 wantWrite
= !d_rrsetToSign
->empty();
208 wantRead
= d_outstanding
|| wantWrite
; // if we wrote, we want to read
210 pair
<vector
<int>, vector
<int> > rwVect
;
212 rwVect
= waitForRW(wantRead
, wantWrite
, -1); // wait for something to happen
214 if(wantWrite
&& !rwVect
.second
.empty()) {
215 random_shuffle(rwVect
.second
.begin(), rwVect
.second
.end()); // pick random available worker
216 writen2(*rwVect
.second
.begin(), &d_rrsetToSign
, sizeof(d_rrsetToSign
));
217 d_rrsetToSign
= new rrset_t
;
224 while(d_outstanding
) {
227 BOOST_FOREACH(int fd
, rwVect
.first
) {
231 while(d_outstanding
) {
232 int res
= readn(fd
, &chunk
, sizeof(chunk
));
238 if(errno
!= EAGAIN
&& errno
!= EINTR
)
239 unixDie("Error reading signed chunk from thread");
246 addSignedToChunks(chunk
);
251 if(!d_outstanding
|| !d_final
)
253 rwVect
= waitForRW(1, 0, -1); // wait for something to happen
257 if(wantWrite
) { // our optimization above failed, we now wait synchronously
258 rwVect
= waitForRW(0, wantWrite
, -1); // wait for something to happen
259 random_shuffle(rwVect
.second
.begin(), rwVect
.second
.end()); // pick random available worker
260 writen2(*rwVect
.second
.begin(), &d_rrsetToSign
, sizeof(d_rrsetToSign
));
261 d_rrsetToSign
= new rrset_t
;
268 unsigned int ChunkedSigningPipe::getReady()
271 BOOST_FOREACH(const std::vector
<DNSResourceRecord
>& v
, d_chunks
) {
276 void ChunkedSigningPipe::worker(int id
, int fd
)
280 UeberBackend
db("key-only");
285 res
= readn(fd
, &chunk
, sizeof(chunk
));
289 unixDie("reading object pointer to sign from pdns");
290 set
<string
, CIStringCompare
> authSet
;
291 authSet
.insert(d_signer
);
292 addRRSigs(dk
, db
, authSet
, *chunk
);
295 writen2(fd
, &chunk
, sizeof(chunk
));
299 catch(PDNSException
& pe
)
301 L
<<Logger::Error
<<"Signing thread died because of PDNSException: "<<pe
.reason
<<endl
;
304 catch(std::exception
& e
)
306 L
<<Logger::Error
<<"Signing thread died because of std::exception: "<<e
.what()<<endl
;
310 void ChunkedSigningPipe::flushToSign()
313 d_rrsetToSign
->clear();
316 vector
<DNSResourceRecord
> ChunkedSigningPipe::getChunk(bool final
)
318 if(final
&& !d_final
) {
319 // this means we should keep on reading until d_outstanding == 0
323 BOOST_FOREACH(int fd
, d_sockets
) {
324 shutdown(fd
, SHUT_WR
); // perhaps this transmits EOF the other side
325 //cerr<<"shutdown of "<<fd<<endl;
329 flushToSign(); // should help us wait
330 vector
<DNSResourceRecord
> front
=d_chunks
.front();
331 d_chunks
.pop_front();
333 d_chunks
.push_back(vector
<DNSResourceRecord
>());
334 /* if(d_final && front.empty())
335 cerr<<"getChunk returning empty in final"<<endl; */
343 if(!servers
.empty()) {
345 parseService(servers
, st
);
346 remote
=ComboAddress(st
.host
, st
.port
);
350 if(!servers
.empty()) {
351 fds
[0] = socket(AF_INET
, SOCK_STREAM
, 0);
354 if(connect(fds
[0], (struct sockaddr
*)&remote
, remote
.getSocklen()) < 0)
355 unixDie("Connecting to signing server");
359 signal(SIGCHLD
, SIG_IGN
);
360 if(!fork()) { // child
362 execl("./pdnssec", "./pdnssec", "--config-dir=./", "signing-slave", NULL
);
363 // helperWorker(new StartHelperStruct(this, n));
371 bool readLStringFromSocket(int fd
, string
& msg
)
375 if(!readn(fd
, &len
, sizeof(len
)))
380 scoped_array
<char> buf(new char[len
]);
381 readn(fd
, buf
.get(), len
);
383 msg
.assign(buf
.get(), len
);
386 void writeLStringToSocket(int fd
, const string
& msg
)
389 uint32_t len
= htonl(msg
.length());
390 string
tot((char*)&len
, 4);
393 writen2(fd
, tot
.c_str(), tot
.length());