]>
Commit | Line | Data |
---|---|---|
12c86877 BH |
1 | /* |
2 | PowerDNS Versatile Database Driven Nameserver | |
e7e691cc | 3 | Copyright (C) 2002 - 2011 PowerDNS.COM BV |
12c86877 BH |
4 | |
5 | This program is free software; you can redistribute it and/or modify | |
22dc646a BH |
6 | it under the terms of the GNU General Public License version 2 |
7 | as published by the Free Software Foundation | |
f782fe38 MH |
8 | |
9 | Additionally, the license of this program contains a special | |
10 | exception which allows to distribute the program in binary form when | |
11 | it is linked against OpenSSL. | |
12c86877 BH |
12 | |
13 | This program is distributed in the hope that it will be useful, | |
14 | but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 | GNU General Public License for more details. | |
17 | ||
18 | You should have received a copy of the GNU General Public License | |
19 | along with this program; if not, write to the Free Software | |
06bd9ccf | 20 | Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA |
12c86877 | 21 | */ |
1258abe0 BH |
22 | |
23 | #ifndef DISTRIBUTOR_HH | |
24 | #define DISTRIBUTOR_HH | |
25 | ||
12c86877 BH |
26 | #include <string> |
27 | #include <deque> | |
28 | #include <queue> | |
29 | #include <vector> | |
30 | #include <pthread.h> | |
1258abe0 | 31 | #include <semaphore.h> |
76473b92 | 32 | #include <unistd.h> |
12c86877 BH |
33 | #include "logger.hh" |
34 | #include "dns.hh" | |
35 | #include "dnsbackend.hh" | |
5c409fa2 | 36 | #include "pdnsexception.hh" |
092c9cc4 BH |
37 | #include "arguments.hh" |
38 | #include "statbag.hh" | |
12c86877 | 39 | |
092c9cc4 | 40 | extern StatBag S; |
12c86877 BH |
41 | |
42 | /** the Distributor template class enables you to multithread slow question/answer | |
43 | processes. | |
44 | ||
bdc9f8d2 BH |
45 | Questions are posed to the Distributor, which can either hand back the answer, |
46 | or give it directly to a callback. Only the latter mode of operation is used in | |
47 | PowerDNS. | |
48 | ||
49 | The Distributor takes care that there are enough Backends alive at any one | |
50 | time and will try to spawn additional ones should they die. | |
12c86877 | 51 | |
bdc9f8d2 BH |
52 | The Backend needs to count the number of living instances and supply this number to |
53 | the Distributor using its numBackends() method. This is silly. | |
12c86877 | 54 | |
bdc9f8d2 BH |
55 | If an exception escapes a Backend, the distributor retires it. |
56 | */ | |
12c86877 BH |
57 | template<class Answer, class Question, class Backend> class Distributor |
58 | { | |
59 | public: | |
60 | Distributor(int n=10); //!< Create a new Distributor with \param n threads | |
61 | struct AnswerData | |
62 | { | |
63 | Answer *A; | |
64 | time_t created; | |
65 | }; | |
66 | int question(Question *, void (*)(const AnswerData &)=0); //!< Submit a question to the Distributor | |
67 | Answer *answer(void); //!< Wait for any answer from the Distributor | |
68 | Answer *wait(Question *); //!< wait for an answer to a specific question | |
69 | int timeoutWait(int id, Answer *, int); //!< wait for a specific answer, with timeout | |
70 | static void* makeThread(void *); //!< helper function to create our n threads | |
71 | void getQueueSizes(int &questions, int &answers); //!< Returns length of question queue | |
72 | ||
12c86877 BH |
73 | int getNumBusy() |
74 | { | |
75 | return d_num_threads-d_idle_threads; | |
76 | } | |
77 | ||
12c86877 BH |
78 | struct QuestionData |
79 | { | |
80 | Question *Q; | |
12c86877 BH |
81 | void (*callback)(const AnswerData &); |
82 | int id; | |
83 | }; | |
84 | ||
12c86877 | 85 | typedef pair<QuestionData, AnswerData> tuple_t; |
e7e691cc BH |
86 | bool isOverloaded() |
87 | { | |
88 | return d_overloaded; | |
89 | } | |
12c86877 BH |
90 | |
91 | private: | |
e7e691cc | 92 | bool d_overloaded; |
1258abe0 | 93 | std::queue<QuestionData> questions; |
12c86877 | 94 | pthread_mutex_t q_lock; |
12c86877 BH |
95 | |
96 | deque<tuple_t> answers; | |
97 | pthread_mutex_t a_lock; | |
98 | ||
99 | Semaphore numquestions; | |
100 | Semaphore numanswers; | |
101 | ||
102 | pthread_mutex_t to_mut; | |
103 | pthread_cond_t to_cond; | |
104 | ||
105 | int nextid; | |
106 | time_t d_last_started; | |
107 | int d_num_threads; | |
16f7d28d | 108 | AtomicCounter d_idle_threads; |
12c86877 BH |
109 | Backend *b; |
110 | }; | |
111 | ||
112 | ||
113 | //template<class Answer, class Question, class Backend>::nextid; | |
114 | ||
115 | template<class Answer, class Question, class Backend>Distributor<Answer,Question,Backend>::Distributor(int n) | |
116 | { | |
117 | b=0; | |
e7e691cc | 118 | d_overloaded = false; |
49f076e8 | 119 | nextid=0; |
16f7d28d | 120 | // d_idle_threads=0; |
12c86877 BH |
121 | d_last_started=time(0); |
122 | // sem_init(&numquestions,0,0); | |
123 | pthread_mutex_init(&q_lock,0); | |
124 | ||
125 | // sem_init(&numanswers,0,0); | |
126 | pthread_mutex_init(&a_lock,0); | |
127 | ||
128 | pthread_mutex_init(&to_mut,0); | |
129 | pthread_cond_init(&to_cond,0); | |
130 | ||
131 | pthread_t tid; | |
132 | ||
133 | d_num_threads=n; | |
134 | ||
45fdd612 | 135 | L<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl; |
12c86877 BH |
136 | |
137 | for(int i=0;i<n;i++) { | |
138 | pthread_create(&tid,0,&makeThread,static_cast<void *>(this)); | |
139 | Utility::usleep(50000); // we've overloaded mysql in the past :-) | |
140 | } | |
141 | ||
f4a38014 | 142 | L<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl; |
12c86877 BH |
143 | } |
144 | ||
145 | // start of a new thread | |
146 | template<class Answer, class Question, class Backend>void *Distributor<Answer,Question,Backend>::makeThread(void *p) | |
147 | { | |
a858f79e | 148 | pthread_detach(pthread_self()); |
12c86877 BH |
149 | try { |
150 | Backend *b=new Backend(); // this will answer our questions | |
151 | Distributor *us=static_cast<Distributor *>(p); | |
152 | int qcount; | |
153 | ||
154 | // this is so gross | |
155 | #ifndef SMTPREDIR | |
379ab445 | 156 | int queuetimeout=::arg().asNum("queue-limit"); |
12c86877 BH |
157 | #endif |
158 | // ick ick ick! | |
e7e691cc | 159 | static int overloadQueueLength=::arg().asNum("overload-queue-length"); |
12c86877 | 160 | for(;;) { |
16f7d28d | 161 | ++(us->d_idle_threads); |
1258abe0 | 162 | |
12c86877 BH |
163 | us->numquestions.getValue( &qcount ); |
164 | ||
1258abe0 | 165 | us->numquestions.wait(); |
12c86877 | 166 | |
16f7d28d | 167 | --(us->d_idle_threads); |
12c86877 BH |
168 | pthread_mutex_lock(&us->q_lock); |
169 | ||
170 | QuestionData QD=us->questions.front(); | |
171 | ||
078f4c97 BH |
172 | us->questions.pop(); |
173 | pthread_mutex_unlock(&us->q_lock); | |
174 | ||
12c86877 BH |
175 | Question *q=QD.Q; |
176 | ||
e7e691cc BH |
177 | |
178 | if(us->d_overloaded && qcount <= overloadQueueLength/10) { | |
179 | us->d_overloaded=false; | |
180 | } | |
12c86877 | 181 | |
12c86877 BH |
182 | Answer *a; |
183 | ||
12c86877 BH |
184 | #ifndef SMTPREDIR |
185 | if(queuetimeout && q->d_dt.udiff()>queuetimeout*1000) { | |
186 | delete q; | |
187 | S.inc("timedout-packets"); | |
188 | continue; | |
4957a608 | 189 | } |
12c86877 BH |
190 | #endif |
191 | // this is the only point where we interact with the backend (synchronous) | |
192 | try { | |
193 | a=b->question(q); // a can be NULL! | |
194 | delete q; | |
195 | } | |
3f81d239 | 196 | catch(const PDNSException &e) { |
12c86877 | 197 | L<<Logger::Error<<"Backend error: "<<e.reason<<endl; |
4957a608 | 198 | delete b; |
12c86877 BH |
199 | return 0; |
200 | } | |
201 | catch(...) { | |
279b5b47 | 202 | L<<Logger::Error<<Logger::NTLog<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl; |
4957a608 | 203 | delete b; |
12c86877 BH |
204 | return 0; |
205 | } | |
206 | ||
207 | AnswerData AD; | |
208 | AD.A=a; | |
209 | AD.created=time(0); | |
210 | tuple_t tuple(QD,AD); | |
211 | ||
212 | if(QD.callback) { | |
4957a608 | 213 | QD.callback(AD); |
12c86877 BH |
214 | } |
215 | else { | |
4957a608 | 216 | pthread_mutex_lock(&us->a_lock); |
12c86877 | 217 | |
4957a608 BH |
218 | us->answers.push_back(tuple); |
219 | pthread_mutex_unlock(&us->a_lock); | |
12c86877 | 220 | |
4957a608 BH |
221 | // L<<"We have an answer to send! Trying to get to to_mut lock"<<endl; |
222 | pthread_mutex_lock(&us->to_mut); | |
223 | // L<<"Yes, we got the lock, we can transmit! First we post"<<endl; | |
224 | us->numanswers.post(); | |
225 | // L<<"And now we broadcast!"<<endl; | |
226 | pthread_cond_broadcast(&us->to_cond); // for timeoutWait(); | |
227 | pthread_mutex_unlock(&us->to_mut); | |
12c86877 BH |
228 | } |
229 | } | |
230 | ||
231 | delete b; | |
232 | } | |
3f81d239 | 233 | catch(const PDNSException &AE) { |
12c86877 BH |
234 | L<<Logger::Error<<Logger::NTLog<<"Distributor caught fatal exception: "<<AE.reason<<endl; |
235 | } | |
236 | catch(...) { | |
237 | L<<Logger::Error<<Logger::NTLog<<"Caught an unknown exception when creating backend, probably"<<endl; | |
238 | } | |
239 | return 0; | |
240 | } | |
241 | ||
242 | template<class Answer, class Question, class Backend>int Distributor<Answer,Question,Backend>::question(Question* q, void (*callback)(const AnswerData &)) | |
243 | { | |
244 | if(d_num_threads==1 && callback) { // short circuit | |
245 | if(!b) { | |
246 | L<<Logger::Error<<"Engaging bypass - now operating unthreaded"<<endl; | |
247 | b=new Backend; | |
248 | } | |
249 | Answer *a; | |
250 | ||
5fe21f79 BH |
251 | try { |
252 | a=b->question(q); // a can be NULL! | |
253 | } | |
3f81d239 | 254 | catch(const PDNSException &e) { |
5fe21f79 BH |
255 | L<<Logger::Error<<"Backend error: "<<e.reason<<endl; |
256 | delete b; | |
257 | b=0; | |
258 | return 0; | |
259 | } | |
260 | catch(...) { | |
261 | L<<Logger::Error<<Logger::NTLog<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl; | |
262 | delete b; | |
263 | b=0; | |
264 | return 0; | |
265 | } | |
266 | ||
12c86877 BH |
267 | |
268 | AnswerData AD; | |
269 | AD.A=a; | |
270 | AD.created=time(0); | |
271 | callback(AD); | |
272 | return 0; | |
273 | } | |
274 | else { | |
275 | q=new Question(*q); | |
276 | } | |
277 | ||
278 | DLOG(L<<"Distributor has "<<Backend::numRunning()<<" threads available"<<endl); | |
65d0bcad BH |
279 | |
280 | /* the line below is a bit difficult. | |
281 | What happens is that we have a goal for the number of running distributor threads. Furthermore, other | |
abc1d928 | 282 | parts of PowerDNS also start backends, which get included in this count. |
65d0bcad BH |
283 | |
284 | If less than two threads now die, no new ones will be spawned. | |
285 | ||
286 | The solutionis to add '+2' below, but it is not a pretty solution. Better solution is | |
287 | to only account the number of threads within the Distributor, and not in the backend. | |
288 | ||
b634b109 | 289 | XXX FIXME |
65d0bcad BH |
290 | */ |
291 | ||
292 | if(Backend::numRunning() < d_num_threads+2 && time(0)-d_last_started>5) { | |
12c86877 | 293 | d_last_started=time(0); |
280439a9 | 294 | L<<"Distributor misses a thread ("<<Backend::numRunning()<<"<"<<d_num_threads + 2<<"), spawning new one"<<endl; |
12c86877 BH |
295 | pthread_t tid; |
296 | pthread_create(&tid,0,&makeThread,static_cast<void *>(this)); | |
297 | } | |
298 | ||
12c86877 BH |
299 | QuestionData QD; |
300 | QD.Q=q; | |
301 | QD.id=nextid++; | |
302 | QD.callback=callback; | |
078f4c97 BH |
303 | |
304 | pthread_mutex_lock(&q_lock); | |
12c86877 BH |
305 | questions.push(QD); |
306 | pthread_mutex_unlock(&q_lock); | |
1258abe0 | 307 | |
12c86877 | 308 | numquestions.post(); |
e7e691cc BH |
309 | |
310 | static int overloadQueueLength=::arg().asNum("overload-queue-length"); | |
1258abe0 | 311 | |
12c86877 BH |
312 | if(!(nextid%50)) { |
313 | int val; | |
314 | numquestions.getValue( &val ); | |
e7e691cc BH |
315 | |
316 | if(!d_overloaded) | |
317 | d_overloaded = overloadQueueLength && (val > overloadQueueLength); | |
318 | ||
379ab445 BH |
319 | if(val>::arg().asNum("max-queue-length")) { |
320 | L<<Logger::Error<<val<<" questions waiting for database attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl; | |
907ea90a | 321 | _exit(1); |
12c86877 | 322 | } |
e7e691cc | 323 | |
1258abe0 | 324 | } |
12c86877 BH |
325 | |
326 | return QD.id; | |
327 | } | |
328 | ||
329 | template<class Answer, class Question,class Backend>Answer* Distributor<Answer,Question,Backend>::answer() | |
330 | { | |
331 | numanswers.wait(); | |
332 | tuple_t tuple; | |
333 | ||
334 | pthread_mutex_lock(&a_lock); | |
335 | tuple=answers.front(); | |
336 | answers.pop_front(); | |
337 | pthread_mutex_unlock(&a_lock); | |
338 | return tuple.second.A; | |
339 | } | |
340 | ||
341 | //! Wait synchronously for the answer of the question just asked. For this to work, no answer() functions must be called | |
342 | template<class Answer, class Question,class Backend>Answer* Distributor<Answer,Question,Backend>::wait(Question *q) | |
343 | { | |
344 | for(;;) | |
345 | { | |
346 | numanswers.wait(); | |
347 | pthread_mutex_lock(&a_lock); | |
348 | ||
349 | // search if the answer is there | |
350 | tuple_t tuple=answers.front(); | |
351 | if(tuple.first==q) | |
4957a608 BH |
352 | { |
353 | answers.pop_front(); | |
354 | pthread_mutex_unlock(&a_lock); | |
355 | return tuple.second.A; | |
356 | } | |
12c86877 BH |
357 | // if not, loop again |
358 | pthread_mutex_unlock(&a_lock); | |
359 | numanswers.post(); | |
360 | } | |
361 | // FIXME: write this | |
362 | } | |
363 | ||
364 | template<class Answer, class Question,class Backend>void Distributor<Answer,Question,Backend>::getQueueSizes(int &questions, int &answers) | |
1258abe0 BH |
365 | { |
366 | numquestions.getValue( &questions ); | |
12c86877 BH |
367 | numanswers.getValue( &answers ); |
368 | } | |
369 | ||
1258abe0 | 370 | #endif // DISTRIBUTOR_HH |
12c86877 | 371 |