]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/distributor.hh
removed some dead code
[thirdparty/pdns.git] / pdns / distributor.hh
1 /*
2 PowerDNS Versatile Database Driven Nameserver
3 Copyright (C) 2002 PowerDNS.COM BV
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 */
19
20 #ifndef DISTRIBUTOR_HH
21 #define DISTRIBUTOR_HH
22
23
24 #include <string>
25 #include <deque>
26 #include <queue>
27 #include <vector>
28 #include <pthread.h>
29 #include <semaphore.h>
30
31 #ifndef WIN32
32 # include <unistd.h>
33 #endif // WIN32
34
35 #include "logger.hh"
36 #include "dns.hh"
37 #include "dnsbackend.hh"
38 #include "ahuexception.hh"
39
40
41 /** the Distributor template class enables you to multithread slow question/answer
42 processes.
43
44 Questions are posed to the Distributor, which can either hand back the answer,
45 or give it directly to a callback. Only the latter mode of operation is used in
46 PowerDNS.
47
48 The Distributor takes care that there are enough Backends alive at any one
49 time and will try to spawn additional ones should they die.
50
51 The Backend needs to count the number of living instances and supply this number to
52 the Distributor using its numBackends() method. This is silly.
53
54 If an exception escapes a Backend, the distributor retires it.
55 */
56 template<class Answer, class Question, class Backend> class Distributor
57 {
58 public:
59 Distributor(int n=10); //!< Create a new Distributor with \param n threads
60 struct AnswerData
61 {
62 Answer *A;
63 time_t created;
64 };
65 int question(Question *, void (*)(const AnswerData &)=0); //!< Submit a question to the Distributor
66 Answer *answer(void); //!< Wait for any answer from the Distributor
67 Answer *wait(Question *); //!< wait for an answer to a specific question
68 int timeoutWait(int id, Answer *, int); //!< wait for a specific answer, with timeout
69 static void* makeThread(void *); //!< helper function to create our n threads
70 void getQueueSizes(int &questions, int &answers); //!< Returns length of question queue
71
72
73 //! This function can run in a separate thread to output statistics on the queues
74 static void* doStats(void *p)
75 {
76 Distributor *us=static_cast<Distributor *>(p);
77 for(;;)
78 {
79 sleep(1);
80 int qcount, acount;
81
82 us->numquestions.getvalue( &qcount );
83 us->numanswers.getvalue( &acount );
84
85 L <<"queued questions: "<<qcount<<", pending answers: "<<acount<<endl;
86 }
87 }
88
89 int getNumBusy()
90 {
91 return d_num_threads-d_idle_threads;
92 }
93
94
95
96
97 struct QuestionData
98 {
99 Question *Q;
100 time_t created;
101 void (*callback)(const AnswerData &);
102 int id;
103 };
104
105
106 typedef pair<QuestionData, AnswerData> tuple_t;
107
108 private:
109 std::queue<QuestionData> questions;
110 pthread_mutex_t q_lock;
111
112
113 deque<tuple_t> answers;
114 pthread_mutex_t a_lock;
115
116 Semaphore numquestions;
117 Semaphore numanswers;
118
119 pthread_mutex_t to_mut;
120 pthread_cond_t to_cond;
121
122 int nextid;
123 time_t d_last_started;
124 int d_num_threads;
125 int d_idle_threads;
126 Backend *b;
127 };
128
129
130 //template<class Answer, class Question, class Backend>::nextid;
131
132 template<class Answer, class Question, class Backend>Distributor<Answer,Question,Backend>::Distributor(int n)
133 {
134 b=0;
135 d_idle_threads=0;
136 d_last_started=time(0);
137 // sem_init(&numquestions,0,0);
138 pthread_mutex_init(&q_lock,0);
139
140 // sem_init(&numanswers,0,0);
141 pthread_mutex_init(&a_lock,0);
142
143 pthread_mutex_init(&to_mut,0);
144 pthread_cond_init(&to_cond,0);
145
146 pthread_t tid;
147
148 d_num_threads=n;
149
150 L<<Logger::Warning<<"About to create "<<n<<" backend threads"<<endl;
151
152 for(int i=0;i<n;i++) {
153 pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
154 Utility::usleep(50000); // we've overloaded mysql in the past :-)
155 }
156
157 L<<"Done launching threads, ready to distribute questions"<<endl;
158 }
159
160 // start of a new thread
161 template<class Answer, class Question, class Backend>void *Distributor<Answer,Question,Backend>::makeThread(void *p)
162 {
163 try {
164 Backend *b=new Backend(); // this will answer our questions
165 Distributor *us=static_cast<Distributor *>(p);
166 int qcount;
167
168 // this is so gross
169 #ifndef SMTPREDIR
170 int queuetimeout=arg().asNum("queue-limit");
171 #endif
172 // ick ick ick!
173
174 for(;;) {
175 us->d_idle_threads++;
176
177 us->numquestions.getValue( &qcount );
178
179 us->numquestions.wait();
180
181 us->d_idle_threads--;
182 pthread_mutex_lock(&us->q_lock);
183
184 QuestionData QD=us->questions.front();
185
186 Question *q=QD.Q;
187
188 us->questions.pop();
189
190 pthread_mutex_unlock(&us->q_lock);
191 Answer *a;
192
193
194 #ifndef SMTPREDIR
195 if(queuetimeout && q->d_dt.udiff()>queuetimeout*1000) {
196 delete q;
197 S.inc("timedout-packets");
198 continue;
199 }
200 #endif
201 // this is the only point where we interact with the backend (synchronous)
202 try {
203 a=b->question(q); // a can be NULL!
204 delete q;
205 }
206 catch(const AhuException &e) {
207 L<<Logger::Error<<"Backend error: "<<e.reason<<endl;
208 delete b;
209 return 0;
210 }
211 catch(...) {
212 L<<Logger::Error<<Logger::NTLog<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
213 delete b;
214 return 0;
215 }
216
217 AnswerData AD;
218 AD.A=a;
219 AD.created=time(0);
220 tuple_t tuple(QD,AD);
221
222 if(QD.callback) {
223 QD.callback(AD);
224 }
225 else {
226 pthread_mutex_lock(&us->a_lock);
227
228 us->answers.push_back(tuple);
229 pthread_mutex_unlock(&us->a_lock);
230
231 // L<<"We have an answer to send! Trying to get to to_mut lock"<<endl;
232 pthread_mutex_lock(&us->to_mut);
233 // L<<"Yes, we got the lock, we can transmit! First we post"<<endl;
234 us->numanswers.post();
235 // L<<"And now we broadcast!"<<endl;
236 pthread_cond_broadcast(&us->to_cond); // for timeoutWait();
237 pthread_mutex_unlock(&us->to_mut);
238 }
239 }
240
241 delete b;
242 }
243 catch(const AhuException &AE) {
244 L<<Logger::Error<<Logger::NTLog<<"Distributor caught fatal exception: "<<AE.reason<<endl;
245 }
246 catch(...) {
247 L<<Logger::Error<<Logger::NTLog<<"Caught an unknown exception when creating backend, probably"<<endl;
248 }
249 return 0;
250 }
251
252 template<class Answer, class Question, class Backend>int Distributor<Answer,Question,Backend>::question(Question* q, void (*callback)(const AnswerData &))
253 {
254 if(d_num_threads==1 && callback) { // short circuit
255 if(!b) {
256 L<<Logger::Error<<"Engaging bypass - now operating unthreaded"<<endl;
257 b=new Backend;
258 }
259 Answer *a;
260
261 a=b->question(q);
262
263 AnswerData AD;
264 AD.A=a;
265 AD.created=time(0);
266 callback(AD);
267 return 0;
268 }
269 else {
270 q=new Question(*q);
271 }
272
273 DLOG(L<<"Distributor has "<<Backend::numRunning()<<" threads available"<<endl);
274 if(Backend::numRunning()<d_num_threads && time(0)-d_last_started>5) { // add one
275 d_last_started=time(0);
276 L<<"Distributor misses a thread ("<<Backend::numRunning()<<"<"<<d_num_threads<<"), spawning new one"<<endl;
277 pthread_t tid;
278 pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
279 }
280
281 pthread_mutex_lock(&q_lock);
282 QuestionData QD;
283 QD.Q=q;
284 QD.id=nextid++;
285 QD.callback=callback;
286 questions.push(QD);
287 pthread_mutex_unlock(&q_lock);
288
289 numquestions.post();
290
291 if(!(nextid%50)) {
292 int val;
293 numquestions.getValue( &val );
294 if(val>arg().asNum("max-queue-length")) {
295 L<<Logger::Error<<val<<" questions waiting for database attention. Limit is "<<arg().asNum("max-queue-length")<<", respawning"<<endl;
296 exit(1);
297 }
298 }
299
300 return QD.id;
301 }
302
303 template<class Answer, class Question,class Backend>Answer* Distributor<Answer,Question,Backend>::answer()
304 {
305 numanswers.wait();
306 tuple_t tuple;
307
308 pthread_mutex_lock(&a_lock);
309 tuple=answers.front();
310 answers.pop_front();
311 pthread_mutex_unlock(&a_lock);
312 return tuple.second.A;
313 }
314
315 //! Wait synchronously for the answer of the question just asked. For this to work, no answer() functions must be called
316 template<class Answer, class Question,class Backend>Answer* Distributor<Answer,Question,Backend>::wait(Question *q)
317 {
318 for(;;)
319 {
320 numanswers.wait();
321 pthread_mutex_lock(&a_lock);
322
323 // search if the answer is there
324 tuple_t tuple=answers.front();
325 if(tuple.first==q)
326 {
327 answers.pop_front();
328 pthread_mutex_unlock(&a_lock);
329 return tuple.second.A;
330 }
331 // if not, loop again
332 pthread_mutex_unlock(&a_lock);
333 numanswers.post();
334 }
335 // FIXME: write this
336 }
337
338 template<class Answer, class Question,class Backend>void Distributor<Answer,Question,Backend>::getQueueSizes(int &questions, int &answers)
339 {
340 numquestions.getValue( &questions );
341 numanswers.getValue( &answers );
342 }
343
344 //! Wait synchronously for the answer of the question just asked. For this to work, no answer() functions must be called
345 template<class Answer, class Question,class Backend>int Distributor<Answer,Question,Backend>::timeoutWait(int id, Answer *a, int timeout)
346 {
347 struct timeval now;
348 struct timespec then;
349
350 Utility::gettimeofday(&now,0);
351
352 then.tv_sec=now.tv_sec+timeout;
353 then.tv_nsec=150*1000000+now.tv_usec*1000; // 150ms
354
355 int rc;
356
357 // L<<"About to acquire to_mut - new broadcasts will then be corked"<<endl;
358 pthread_mutex_lock(&to_mut); // start the lock to prevent races
359
360 for(;;)
361 {
362 // L<<"Acquired the to_mut lock - checking to see if there are already answers"<<endl;
363
364 int val;
365 rc=numanswers.getvalue( &val); // are there answers?
366 // L<<"Now "<<val<<" answers according to the semaphore"<<endl;
367
368 if(val)
369 {
370 DLOG(L<<"There are some answers! Is the one we want among them?"<<endl);
371 DLOG(L<<"numanswers: "<<val<<", rc="<<rc<<", errno="<<errno<<endl);
372
373 pthread_mutex_lock(&a_lock);
374
375 DLOG(L<<"deque contains: "<<answers.size()<<endl);
376 // search if the answer is there
377
378 bool found=false;
379 int rc;
380
381 for(typename deque<tuple_t>::iterator tuple=answers.begin();
382 tuple!=answers.end();
383 tuple++)
384 {
385 if(!found && tuple->first.id==id)
386 {
387 numanswers.wait(); // remove from the semaphore
388 DLOG(L<<"found the answer tuple ("<<tuple->first.id<<") - may be empty answer"<<endl);
389
390 found=true;
391
392 if(!tuple->second.A)
393 rc=-1;
394 else
395 {
396 *a=*tuple->second.A;
397 rc=0;
398 }
399
400 answers.erase(tuple);
401 tuple=answers.begin(); // restart
402 if(tuple==answers.end())break;
403
404 }
405 else // an answer, but not the right one
406 {
407 if(time(0)-tuple->second.created>5) // delete after 5 seconds
408 {
409 L<<"Deleted unclaimed answer "<<tuple->first.id<<" due to age"<<endl;
410 answers.erase(tuple);
411 tuple=answers.begin(); // restart
412 if(tuple==answers.end())break;
413 numanswers.wait();
414 }
415 }
416 }
417 pthread_mutex_unlock(&a_lock);
418
419 if(!found)
420 L<<"Right answer was NOT found - we should now sleep and recheck whenever there are new answers"<<endl;
421 else
422 {
423 pthread_mutex_unlock(&to_mut);
424 return 0;
425 }
426 }
427 else
428 L<<"No answers!"<<endl;
429
430
431 DLOG(L<<"starting wait on condition, to see if there are new answers"<<endl);
432
433 // this first lets go of the to_mut lock, which will 'uncork' the pending pthread_cond_broadcasts
434 rc=pthread_cond_timedwait(&to_cond, &to_mut, &then);
435 // and then it atomically reacquires the lock
436
437 if(rc==ETIMEDOUT)
438 {
439 L<<Logger::Error<<"Timeout waiting for data"<<endl;
440 pthread_mutex_unlock(&to_mut);
441 return -ETIMEDOUT;
442 }
443 L<<"We received a broadcast that there is new data, checking"<<endl;
444
445 }
446
447 return -1; // timeout or whatever
448 // FIXME: write this
449 }
450
451
452 #endif // DISTRIBUTOR_HH
453