]>
Commit | Line | Data |
---|---|---|
12c86877 | 1 | /* |
12471842 PL |
2 | * This file is part of PowerDNS or dnsdist. |
3 | * Copyright -- PowerDNS.COM B.V. and its contributors | |
4 | * | |
5 | * This program is free software; you can redistribute it and/or modify | |
6 | * it under the terms of version 2 of the GNU General Public License as | |
7 | * published by the Free Software Foundation. | |
8 | * | |
9 | * In addition, for the avoidance of any doubt, permission is granted to | |
10 | * link this program with OpenSSL and to (re)distribute the binaries | |
11 | * produced as the result of such linking. | |
12 | * | |
13 | * This program is distributed in the hope that it will be useful, | |
14 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 | * GNU General Public License for more details. | |
17 | * | |
18 | * You should have received a copy of the GNU General Public License | |
19 | * along with this program; if not, write to the Free Software | |
20 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
21 | */ | |
1258abe0 BH |
22 | #ifndef DISTRIBUTOR_HH |
23 | #define DISTRIBUTOR_HH | |
24 | ||
12c86877 BH |
25 | #include <string> |
26 | #include <deque> | |
27 | #include <queue> | |
28 | #include <vector> | |
29 | #include <pthread.h> | |
76473b92 | 30 | #include <unistd.h> |
12c86877 BH |
31 | #include "logger.hh" |
32 | #include "dns.hh" | |
33 | #include "dnsbackend.hh" | |
5c409fa2 | 34 | #include "pdnsexception.hh" |
092c9cc4 | 35 | #include "arguments.hh" |
491d03d7 | 36 | #include <atomic> |
092c9cc4 | 37 | #include "statbag.hh" |
12c86877 | 38 | |
092c9cc4 | 39 | extern StatBag S; |
12c86877 BH |
40 | |
/** The Distributor template class enables you to multithread slow question/answer
    processes.

    Questions are posed to the Distributor, which returns the answer via a callback.

    The Distributor spawns sufficient backends. If a backend throws an exception,
    it is destroyed and recreated: a query that hit an already existing backend
    is retried once, and a query that still fails is answered with SERVFAIL.
*/
template<class Answer, class Question, class Backend> class Distributor
{
public:
  static Distributor *Create(int n=1); //!< Create a new Distributor with \param n threads
  //! Receives the answer for a question; the Answer pointer may be NULL.
  typedef std::function<void(Answer*)> callback_t;
  //! Create() hands out heap-allocated derived objects through this base type,
  //! so deleting via Distributor* must run the derived destructor.
  virtual ~Distributor() = default;
  virtual int question(Question *, callback_t callback) =0; //!< Submit a question to the Distributor
  virtual int getQueueSize() =0; //!< Returns length of question queue
  virtual bool isOverloaded() =0; //!< True when the distributor reports itself overloaded
};
59 | ||
60 | template<class Answer, class Question, class Backend> class SingleThreadDistributor | |
61 | : public Distributor<Answer, Question, Backend> | |
62 | { | |
63 | public: | |
64 | SingleThreadDistributor(); | |
491d03d7 | 65 | typedef std::function<void(Answer*)> callback_t; |
66 | int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor | |
cff3e04d | 67 | int getQueueSize() override { |
ad7d9cd0 MZ |
68 | return 0; |
69 | } | |
70 | ||
cff3e04d | 71 | bool isOverloaded() override |
12c86877 | 72 | { |
ad7d9cd0 MZ |
73 | return false; |
74 | } | |
75 | ||
5d47a576 AT |
76 | ~SingleThreadDistributor() { |
77 | if (b) delete b; | |
78 | } | |
ad7d9cd0 | 79 | private: |
491d03d7 | 80 | Backend *b{0}; |
ad7d9cd0 MZ |
81 | }; |
82 | ||
83 | template<class Answer, class Question, class Backend> class MultiThreadDistributor | |
84 | : public Distributor<Answer, Question, Backend> | |
85 | { | |
86 | public: | |
491d03d7 | 87 | MultiThreadDistributor(int n); |
88 | typedef std::function<void(Answer*)> callback_t; | |
89 | int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor | |
12c86877 | 90 | static void* makeThread(void *); //!< helper function to create our n threads |
491d03d7 | 91 | int getQueueSize() override { |
92 | return d_queued; | |
12c86877 BH |
93 | } |
94 | ||
12c86877 BH |
95 | struct QuestionData |
96 | { | |
97 | Question *Q; | |
491d03d7 | 98 | callback_t callback; |
12c86877 BH |
99 | int id; |
100 | }; | |
101 | ||
491d03d7 | 102 | bool isOverloaded() override |
e7e691cc | 103 | { |
d3363b40 | 104 | return d_overloadQueueLength && (d_queued > d_overloadQueueLength); |
e7e691cc | 105 | } |
12c86877 BH |
106 | |
107 | private: | |
12c86877 BH |
108 | int nextid; |
109 | time_t d_last_started; | |
d3363b40 | 110 | unsigned int d_overloadQueueLength, d_maxQueueLength; |
12c86877 | 111 | int d_num_threads; |
491d03d7 | 112 | std::atomic<unsigned int> d_queued{0}, d_running{0}; |
113 | std::vector<std::pair<int,int>> d_pipes; | |
12c86877 BH |
114 | }; |
115 | ||
12c86877 | 116 | //template<class Answer, class Question, class Backend>::nextid; |
ad7d9cd0 MZ |
117 | template<class Answer, class Question, class Backend>Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n) |
118 | { | |
119 | if( n == 1 ) | |
120 | return new SingleThreadDistributor<Answer,Question,Backend>(); | |
121 | else | |
122 | return new MultiThreadDistributor<Answer,Question,Backend>( n ); | |
123 | } | |
124 | ||
125 | template<class Answer, class Question, class Backend>SingleThreadDistributor<Answer,Question,Backend>::SingleThreadDistributor() | |
126 | { | |
127 | L<<Logger::Error<<"Only asked for 1 backend thread - operating unthreaded"<<endl; | |
07dbfb4f KM |
128 | try { |
129 | b=new Backend; | |
130 | } | |
131 | catch(const PDNSException &AE) { | |
132 | L<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl; | |
133 | exit(1); | |
134 | } | |
135 | catch(...) { | |
136 | L<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl; | |
137 | exit(1); | |
138 | } | |
ad7d9cd0 | 139 | } |
12c86877 | 140 | |
ad7d9cd0 | 141 | template<class Answer, class Question, class Backend>MultiThreadDistributor<Answer,Question,Backend>::MultiThreadDistributor(int n) |
12c86877 | 142 | { |
ad7d9cd0 | 143 | d_num_threads=n; |
d3363b40 | 144 | d_overloadQueueLength=::arg().asNum("overload-queue-length"); |
145 | d_maxQueueLength=::arg().asNum("max-queue-length"); | |
49f076e8 | 146 | nextid=0; |
12c86877 | 147 | d_last_started=time(0); |
12c86877 | 148 | |
12c86877 BH |
149 | pthread_t tid; |
150 | ||
491d03d7 | 151 | |
152 | for(int i=0; i < n; ++i) { | |
153 | int fds[2]; | |
154 | if(pipe(fds) < 0) | |
155 | unixDie("Creating pipe"); | |
156 | d_pipes.push_back({fds[0],fds[1]}); | |
157 | } | |
158 | ||
0469c6ed | 159 | if (n<1) { |
ade4cedf PD |
160 | L<<Logger::Error<<"Asked for fewer than 1 threads, nothing to do"<<endl; |
161 | exit(1); | |
0469c6ed AT |
162 | } |
163 | ||
45fdd612 | 164 | L<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl; |
12c86877 BH |
165 | for(int i=0;i<n;i++) { |
166 | pthread_create(&tid,0,&makeThread,static_cast<void *>(this)); | |
167 | Utility::usleep(50000); // we've overloaded mysql in the past :-) | |
168 | } | |
f4a38014 | 169 | L<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl; |
12c86877 BH |
170 | } |
171 | ||
ad7d9cd0 | 172 | |
12c86877 | 173 | // start of a new thread |
ad7d9cd0 | 174 | template<class Answer, class Question, class Backend>void *MultiThreadDistributor<Answer,Question,Backend>::makeThread(void *p) |
12c86877 | 175 | { |
a858f79e | 176 | pthread_detach(pthread_self()); |
491d03d7 | 177 | MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p); |
178 | int ournum=us->d_running++; | |
179 | ||
12c86877 BH |
180 | try { |
181 | Backend *b=new Backend(); // this will answer our questions | |
379ab445 | 182 | int queuetimeout=::arg().asNum("queue-limit"); |
12c86877 | 183 | |
491d03d7 | 184 | for(;;) { |
185 | ||
186 | QuestionData* QD; | |
187 | if(read(us->d_pipes[ournum].first, &QD, sizeof(QD)) != sizeof(QD)) | |
188 | unixDie("read"); | |
189 | --us->d_queued; | |
f1ad3c5b | 190 | Answer *a; |
12c86877 | 191 | |
491d03d7 | 192 | if(queuetimeout && QD->Q->d_dt.udiff()>queuetimeout*1000) { |
193 | delete QD->Q; | |
194 | delete QD; | |
12c86877 BH |
195 | S.inc("timedout-packets"); |
196 | continue; | |
4957a608 | 197 | } |
bf17c14b KM |
198 | |
199 | bool allowRetry=true; | |
200 | retry: | |
12c86877 BH |
201 | // this is the only point where we interact with the backend (synchronous) |
202 | try { | |
bf17c14b KM |
203 | if (!b) { |
204 | allowRetry=false; | |
07dbfb4f | 205 | b=new Backend(); |
bf17c14b KM |
206 | } |
207 | a=b->question(QD->Q); | |
208 | delete QD->Q; | |
12c86877 | 209 | } |
3f81d239 | 210 | catch(const PDNSException &e) { |
bf17c14b | 211 | delete b; |
07dbfb4f | 212 | b=NULL; |
bf17c14b KM |
213 | if (!allowRetry) { |
214 | L<<Logger::Error<<"Backend error: "<<e.reason<<endl; | |
215 | a=QD->Q->replyPacket(); | |
216 | ||
217 | a->setRcode(RCode::ServFail); | |
218 | S.inc("servfail-packets"); | |
74f6d6e1 | 219 | S.ringAccount("servfail-queries",QD->Q->qdomain.toLogString()); |
bf17c14b KM |
220 | |
221 | delete QD->Q; | |
222 | } else { | |
223 | L<<Logger::Error<<"Backend error (retry once): "<<e.reason<<endl; | |
224 | goto retry; | |
225 | } | |
12c86877 BH |
226 | } |
227 | catch(...) { | |
bf17c14b KM |
228 | delete b; |
229 | b=NULL; | |
230 | if (!allowRetry) { | |
231 | L<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<endl; | |
232 | a=QD->Q->replyPacket(); | |
233 | ||
234 | a->setRcode(RCode::ServFail); | |
235 | S.inc("servfail-packets"); | |
74f6d6e1 | 236 | S.ringAccount("servfail-queries",QD->Q->qdomain.toLogString()); |
bf17c14b KM |
237 | |
238 | delete QD->Q; | |
239 | } else { | |
240 | L<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<" (retry once)"<<endl; | |
241 | goto retry; | |
242 | } | |
12c86877 BH |
243 | } |
244 | ||
491d03d7 | 245 | QD->callback(a); |
246 | delete QD; | |
12c86877 | 247 | } |
bf17c14b | 248 | |
12c86877 BH |
249 | delete b; |
250 | } | |
3f81d239 | 251 | catch(const PDNSException &AE) { |
491d03d7 | 252 | L<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl; |
07dbfb4f | 253 | exit(1); |
12c86877 BH |
254 | } |
255 | catch(...) { | |
491d03d7 | 256 | L<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl; |
07dbfb4f | 257 | exit(1); |
12c86877 BH |
258 | } |
259 | return 0; | |
260 | } | |
261 | ||
491d03d7 | 262 | template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question* q, callback_t callback) |
12c86877 | 263 | { |
ad7d9cd0 | 264 | Answer *a; |
bf17c14b KM |
265 | bool allowRetry=true; |
266 | retry: | |
ad7d9cd0 | 267 | try { |
bf17c14b KM |
268 | if (!b) { |
269 | allowRetry=false; | |
07dbfb4f | 270 | b=new Backend; |
bf17c14b | 271 | } |
ad7d9cd0 MZ |
272 | a=b->question(q); // a can be NULL! |
273 | } | |
e6577aef | 274 | catch(const PDNSException &e) { |
ad7d9cd0 | 275 | delete b; |
07dbfb4f | 276 | b=NULL; |
bf17c14b KM |
277 | if (!allowRetry) { |
278 | L<<Logger::Error<<"Backend error: "<<e.reason<<endl; | |
279 | a=q->replyPacket(); | |
280 | ||
281 | a->setRcode(RCode::ServFail); | |
282 | S.inc("servfail-packets"); | |
74f6d6e1 | 283 | S.ringAccount("servfail-queries",q->qdomain.toLogString()); |
bf17c14b KM |
284 | } else { |
285 | L<<Logger::Error<<"Backend error (retry once): "<<e.reason<<endl; | |
286 | goto retry; | |
287 | } | |
12c86877 | 288 | } |
ad7d9cd0 | 289 | catch(...) { |
ad7d9cd0 | 290 | delete b; |
07dbfb4f | 291 | b=NULL; |
bf17c14b KM |
292 | if (!allowRetry) { |
293 | L<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl; | |
294 | a=q->replyPacket(); | |
295 | ||
296 | a->setRcode(RCode::ServFail); | |
297 | S.inc("servfail-packets"); | |
74f6d6e1 | 298 | S.ringAccount("servfail-queries",q->qdomain.toLogString()); |
bf17c14b KM |
299 | } else { |
300 | L<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<" (retry once)"<<endl; | |
301 | goto retry; | |
302 | } | |
12c86877 | 303 | } |
491d03d7 | 304 | callback(a); |
ad7d9cd0 MZ |
305 | return 0; |
306 | } | |
307 | ||
/** Exception type thrown by MultiThreadDistributor::question() when the number
 *  of queued questions exceeds max-queue-length.
 */
struct DistributorFatal
{
};
309 | ||
310 | template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question* q, callback_t callback) | |
ad7d9cd0 | 311 | { |
ad7d9cd0 | 312 | q=new Question(*q); |
12c86877 | 313 | |
2301afa0 | 314 | // this is passed to other process over pipe and released there |
491d03d7 | 315 | auto QD=new QuestionData(); |
316 | QD->Q=q; | |
317 | auto ret = QD->id = nextid++; // might be deleted after write! | |
318 | QD->callback=callback; | |
e7e691cc | 319 | |
491d03d7 | 320 | if(write(d_pipes[QD->id % d_pipes.size()].second, &QD, sizeof(QD)) != sizeof(QD)) |
321 | unixDie("write"); | |
1258abe0 | 322 | |
491d03d7 | 323 | d_queued++; |
e7e691cc | 324 | |
e7e691cc | 325 | |
d3363b40 | 326 | |
327 | if(d_queued > d_maxQueueLength) { | |
3172f9b6 | 328 | L<<Logger::Error<< d_queued <<" questions waiting for database/backend attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl; |
491d03d7 | 329 | // this will leak the entire contents of all pipes, nothing will be freed. Respawn when this happens! |
330 | throw DistributorFatal(); | |
1258abe0 | 331 | } |
491d03d7 | 332 | |
333 | return ret; | |
12c86877 BH |
334 | } |
335 | ||
1258abe0 | 336 | #endif // DISTRIBUTOR_HH |
12c86877 | 337 |