]>
Commit | Line | Data |
---|---|---|
8e9b7d99 BH |
1 | #include "signingpipe.hh" |
2 | ||
8267bd2c BH |
// Out-of-class definition of the static member s_workerid. None of the active
// code visible in this file reads or modifies it.
3 | AtomicCounter ChunkedSigningPipe::s_workerid; |
4 | ||
5 | void* ChunkedSigningPipe::helperWorker(void* p) | |
bec14a20 | 6 | try |
8267bd2c BH |
7 | { |
8 | ChunkedSigningPipe* us = (ChunkedSigningPipe*)p; | |
9 | us->worker(); | |
10 | return 0; | |
11 | } | |
bec14a20 BH |
12 | catch(std::exception& e) { |
13 | cerr<<"Signing thread died with error "<<e.what()<<endl; | |
14 | } | |
8267bd2c BH |
15 | |
16 | ChunkedSigningPipe::ChunkedSigningPipe(DNSSECKeeper& dk, UeberBackend& db, const std::string& signerName, bool mustSign, unsigned int workers) | |
17 | : d_dk(dk), d_db(db), d_signer(signerName), d_chunkrecords(100), d_outstanding(0), d_numworkers(workers), d_tids(d_numworkers), | |
18 | d_mustSign(mustSign) | |
19 | { | |
20 | if(!d_mustSign) | |
21 | return; | |
22 | if(pipe(d_uppipe) < 0 || pipe(d_backpipe)) | |
23 | throw runtime_error("Unable to create communication pipes in for ChunkedSigningPipe"); | |
24 | ||
25 | Utility::setNonBlocking(d_backpipe[0]); | |
26 | for(unsigned int n=0; n < d_numworkers; ++n) { | |
27 | pthread_create(&d_tids[n], 0, helperWorker, (void*) this); | |
28 | } | |
29 | } | |
30 | ||
31 | ChunkedSigningPipe::~ChunkedSigningPipe() | |
32 | { | |
33 | if(!d_mustSign) | |
34 | return; | |
35 | close(d_uppipe[1]); // this will trigger all threads to exit | |
36 | void* res; | |
37 | for(unsigned int n = 0; n < d_numworkers; ++n) | |
38 | pthread_join(d_tids[n], &res); | |
39 | ||
40 | close(d_backpipe[1]); | |
41 | close(d_backpipe[0]); | |
42 | close(d_uppipe[0]); | |
43 | } | |
44 | ||
8e9b7d99 BH |
45 | bool ChunkedSigningPipe::submit(const DNSResourceRecord& rr) |
46 | { | |
47 | if(!d_toSign.empty() && (d_toSign.begin()->qtype.getCode() != rr.qtype.getCode() || !pdns_iequals(d_toSign.begin()->qname, rr.qname))) | |
48 | { | |
49 | flushToSign(); | |
50 | } | |
51 | d_toSign.push_back(rr); | |
52 | return d_chunk.size() > d_chunkrecords; | |
53 | } | |
54 | ||
8267bd2c BH |
55 | void ChunkedSigningPipe::sendChunkToSign() |
56 | { | |
57 | if(!d_mustSign) { | |
f5f0e6fb | 58 | d_chunk.insert(d_chunk.end(), d_toSign.begin(), d_toSign.end()); |
8267bd2c BH |
59 | d_toSign.clear(); |
60 | return; | |
61 | } | |
62 | if(!d_toSign.empty()) { | |
63 | chunk_t* toSign = new chunk_t(d_toSign); | |
64 | ||
65 | if(write(d_uppipe[1], &toSign, sizeof(toSign)) != sizeof(toSign)) | |
66 | throw runtime_error("Partial write or error communicating to signing thread"); | |
67 | d_outstanding++; | |
68 | } | |
69 | chunk_t* signedChunk; | |
70 | ||
71 | while(d_outstanding && read(d_backpipe[0], &signedChunk, sizeof(signedChunk)) > 0) { | |
72 | --d_outstanding; | |
f5f0e6fb | 73 | d_chunk.insert(d_chunk.end(), signedChunk->begin(), signedChunk->end()); |
8267bd2c BH |
74 | delete signedChunk; |
75 | } | |
76 | ||
77 | d_toSign.clear(); | |
78 | } | |
79 | ||
80 | void ChunkedSigningPipe::worker() | |
81 | { | |
82 | //int my_id = ++s_workerid; | |
83 | // cout<<my_id<<" worker reporting!"<<endl; | |
84 | chunk_t* chunk; | |
85 | ||
86 | DNSSECKeeper dk; | |
87 | int res; | |
88 | for(;;) { | |
89 | res=read(d_uppipe[0], &chunk, sizeof(chunk)); | |
90 | if(!res) { | |
91 | // cerr<<my_id<<" exiting"<<endl; | |
92 | break; | |
93 | } | |
94 | if(res != sizeof(chunk)) | |
95 | unixDie("error or partial read from ChunkedSigningPipe main thread"); | |
96 | // cout<< my_id <<" worker signing!"<<endl; | |
97 | addRRSigs(dk, d_db, d_signer, *chunk); // should start returning sigs separately instead of interleaved | |
98 | if(write(d_backpipe[1], &chunk, sizeof(chunk)) != sizeof(chunk)) | |
99 | unixDie("error writing back to ChunkedSigningPipe"); | |
100 | } | |
101 | } | |
102 | ||
8e9b7d99 BH |
103 | void ChunkedSigningPipe::flushToSign() |
104 | { | |
8267bd2c | 105 | sendChunkToSign(); |
8e9b7d99 BH |
106 | d_toSign.clear(); |
107 | } | |
108 | ||
109 | vector<DNSResourceRecord> ChunkedSigningPipe::getChunk(bool final) | |
110 | { | |
8267bd2c BH |
111 | if(final) { |
112 | Utility::setBlocking(d_backpipe[0]); | |
8e9b7d99 | 113 | flushToSign(); |
8267bd2c | 114 | } |
8e9b7d99 BH |
115 | |
116 | chunk_t::size_type amount=min(d_chunkrecords, d_chunk.size()); | |
f5f0e6fb BH |
117 | chunk_t chunk(d_chunk.begin(), d_chunk.begin() + amount); |
118 | ||
119 | d_chunk.erase(d_chunk.begin(), d_chunk.begin() + amount); | |
8e9b7d99 BH |
120 | |
121 | return chunk; | |
122 | } |