2 PowerDNS Versatile Database Driven Nameserver
3 Copyright (C) 2002 PowerDNS.COM BV
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 #ifndef DISTRIBUTOR_HH
21 #define DISTRIBUTOR_HH
29 #include <semaphore.h>
37 #include "dnsbackend.hh"
38 #include "ahuexception.hh"
41 /** the Distributor template class enables you to multithread slow question/answer
44 Questions are posed to the Distributor, which can either hand back the answer,
45 or give it directly to a callback. Only the latter mode of operation is used in
48 The Distributor takes care that there are enough Backends alive at any one
49 time and will try to spawn additional ones should they die.
51 The Backend needs to count the number of living instances and supply this number to
52 the Distributor using its static numRunning() method. This is silly.
54 If an exception escapes a Backend, the distributor retires it.
// Distributor: hands Questions to a pool of worker threads, each owning its own Backend,
// and collects the resulting Answers for answer(), wait() or timeoutWait().
56 template<class Answer, class Question, class Backend> class Distributor
59 Distributor(int n=10); //!< Create a new Distributor with \param n threads
65 int question(Question *, void (*)(const AnswerData &)=0); //!< Submit a question to the Distributor
66 Answer *answer(void); //!< Wait for any answer from the Distributor
67 Answer *wait(Question *); //!< wait for an answer to a specific question
68 int timeoutWait(int id, Answer *, int); //!< wait for a specific answer, with timeout
69 static void* makeThread(void *); //!< helper function to create our n threads
70 void getQueueSizes(int &questions, int &answers); //!< Reports the current question and answer semaphore counts through the two out-parameters
73 //! This function can run in a separate thread to output statistics on the queues
74 static void* doStats(void *p)
76 Distributor *us=static_cast<Distributor *>(p);
// Spelled getValue() to match the Semaphore accessor used by the other members; a misspelled
// name here would only fail to compile once doStats() is actually instantiated.
82 us->numquestions.getValue( &qcount );
83 us->numanswers.getValue( &acount );
85 L <<"queued questions: "<<qcount<<", pending answers: "<<acount<<endl;
// Number of worker threads currently busy: the total minus those blocked waiting for a question.
// (The enclosing accessor's signature is elided in this excerpt.)
91 return d_num_threads-d_idle_threads;
// Completion callback, set from question()'s callback argument (0 when none was given);
// presumably a member of QuestionData, whose header is elided here.
101 void (*callback)(const AnswerData &);
// A processed question paired with its answer.
106 typedef pair<QuestionData, AnswerData> tuple_t;
// Questions waiting for a worker; guarded by q_lock.
109 std::queue<QuestionData> questions;
110 pthread_mutex_t q_lock;
// Processed (question, answer) pairs waiting to be collected; guarded by a_lock.
113 deque<tuple_t> answers;
114 pthread_mutex_t a_lock;
// Counters for the two queues: workers wait() on numquestions, and each published answer post()s numanswers.
116 Semaphore numquestions;
117 Semaphore numanswers;
// to_mut/to_cond let timeoutWait() sleep until a worker broadcasts that a new answer was published.
119 pthread_mutex_t to_mut;
120 pthread_cond_t to_cond;
// Time the last worker thread was spawned; used by question() to rate-limit respawning.
123 time_t d_last_started;
130 //template<class Answer, class Question, class Backend>::nextid;
// Constructor: initialises the queue mutexes and the to_mut/to_cond pair used by timeoutWait(),
// records the spawn time used to rate-limit respawns, then launches n threads running makeThread().
// The Semaphore members are not sem_init()ed here; those calls are left commented out.
132 template<class Answer, class Question, class Backend>Distributor<Answer,Question,Backend>::Distributor(int n)
136 d_last_started=time(0);
137 // sem_init(&numquestions,0,0);
138 pthread_mutex_init(&q_lock,0);
140 // sem_init(&numanswers,0,0);
141 pthread_mutex_init(&a_lock,0);
143 pthread_mutex_init(&to_mut,0);
144 pthread_cond_init(&to_cond,0);
150 L<<Logger::Warning<<"About to create "<<n<<" backend threads"<<endl;
152 for(int i=0;i<n;i++) {
// NOTE(review): pthread_create's return value is not checked in the visible lines; a failed
// launch would go unreported here. Confirm whether that is handled elsewhere.
153 pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
// Stagger the launches (presumably microseconds, like POSIX usleep, i.e. 50ms) so the
// backends do not all connect to the database at once.
154 Utility::usleep(50000); // we've overloaded mysql in the past :-)
157 L<<"Done launching threads, ready to distribute questions"<<endl;
// Worker-thread entry point, started via pthread_create with the Distributor as its argument.
// Each worker constructs its own Backend, then repeatedly takes a question off the queue, asks
// the Backend, and publishes the (question, answer) pair for answer()/wait()/timeoutWait().
160 // start of a new thread
161 template<class Answer, class Question, class Backend>void *Distributor<Answer,Question,Backend>::makeThread(void *p)
164 Backend *b=new Backend(); // this will answer our questions
165 Distributor *us=static_cast<Distributor *>(p);
// "queue-limit": questions that waited longer than this are dropped. It is compared against
// d_dt.udiff() after scaling by 1000, presumably a millisecond limit against microseconds
// elapsed. Confirm.
170 int queuetimeout=arg().asNum("queue-limit");
// d_idle_threads counts workers blocked waiting for a question.
// NOTE(review): it is modified without a visible lock; confirm its type makes this safe.
175 us->d_idle_threads++;
177 us->numquestions.getValue( &qcount );
179 us->numquestions.wait();
181 us->d_idle_threads--;
182 pthread_mutex_lock(&us->q_lock);
184 QuestionData QD=us->questions.front();
190 pthread_mutex_unlock(&us->q_lock);
// Drop questions that sat in the queue too long; a queue-limit of 0 disables this check.
195 if(queuetimeout && q->d_dt.udiff()>queuetimeout*1000) {
197 S.inc("timedout-packets");
201 // this is the only point where we interact with the backend (synchronous)
203 a=b->question(q); // a can be NULL!
206 catch(const AhuException &e) {
207 L<<Logger::Error<<"Backend error: "<<e.reason<<endl;
// NOTE(review): C-style cast of pthread_t to unsigned long; pthread_t is not guaranteed
// to be an integer type on every platform.
212 L<<Logger::Error<<Logger::NTLog<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
220 tuple_t tuple(QD,AD);
// Publish the answer under a_lock.
226 pthread_mutex_lock(&us->a_lock);
228 us->answers.push_back(tuple);
229 pthread_mutex_unlock(&us->a_lock);
// Post and broadcast while holding to_mut. A timeoutWait() caller holds to_mut from its check
// until pthread_cond_timedwait() releases it, so it cannot miss this wakeup.
231 // L<<"We have an answer to send! Trying to get to to_mut lock"<<endl;
232 pthread_mutex_lock(&us->to_mut);
233 // L<<"Yes, we got the lock, we can transmit! First we post"<<endl;
234 us->numanswers.post();
235 // L<<"And now we broadcast!"<<endl;
236 pthread_cond_broadcast(&us->to_cond); // for timeoutWait();
237 pthread_mutex_unlock(&us->to_mut);
// Exceptions escaping the worker loop (per the messages, typically from Backend construction)
// are logged here.
243 catch(const AhuException &AE) {
244 L<<Logger::Error<<Logger::NTLog<<"Distributor caught fatal exception: "<<AE.reason<<endl;
247 L<<Logger::Error<<Logger::NTLog<<"Caught an unknown exception when creating backend, probably"<<endl;
// Submit question q.
// - With a single configured thread and a callback, the queue is bypassed and the Distributor
//   operates unthreaded (the bypass handling is on elided lines).
// - Otherwise a missing worker may be respawned, the question is queued together with its
//   callback, and an oversized backlog is reported.
// NOTE(review): the meaning of the int return value is not visible here. Confirm with callers.
252 template<class Answer, class Question, class Backend>int Distributor<Answer,Question,Backend>::question(Question* q, void (*callback)(const AnswerData &))
254 if(d_num_threads==1 && callback) { // short circuit
256 L<<Logger::Error<<"Engaging bypass - now operating unthreaded"<<endl;
273 DLOG(L<<"Distributor has "<<Backend::numRunning()<<" threads available"<<endl);
// Spawn at most one replacement worker per call when fewer Backends report running than the
// configured thread count, and only if more than 5 seconds passed since the last spawn.
274 if(Backend::numRunning()<d_num_threads && time(0)-d_last_started>5) { // add one
275 d_last_started=time(0);
276 L<<"Distributor misses a thread ("<<Backend::numRunning()<<"<"<<d_num_threads<<"), spawning new one"<<endl;
278 pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
// Enqueue the question under q_lock.
281 pthread_mutex_lock(&q_lock);
285 QD.callback=callback;
287 pthread_mutex_unlock(&q_lock);
// Compare the question backlog against the max-queue-length setting.
293 numquestions.getValue( &val );
294 if(val>arg().asNum("max-queue-length")) {
295 L<<Logger::Error<<val<<" questions waiting for database attention. Limit is "<<arg().asNum("max-queue-length")<<", respawning"<<endl;
// Return the Answer of the oldest published (question, answer) pair, read from the front of the
// answers deque under a_lock. The pointer may be NULL when the Backend produced no answer.
// NOTE(review): no emptiness check or removal is visible in this excerpt. Presumably numanswers
// is waited on and the entry popped on elided lines; confirm, since front() on an empty deque
// is undefined behaviour.
303 template<class Answer, class Question,class Backend>Answer* Distributor<Answer,Question,Backend>::answer()
308 pthread_mutex_lock(&a_lock);
309 tuple=answers.front();
311 pthread_mutex_unlock(&a_lock);
312 return tuple.second.A;
315 //! Wait synchronously for the answer of the question just asked. For this to work, no answer() functions must be called
// Polls the answers deque under a_lock. Once the front entry is the wanted one (the match test
// is elided here), it returns that entry's Answer; otherwise it unlocks and tries again.
// NOTE(review): answers.front() is called without a visible emptiness check. Confirm a guard
// exists on the elided lines.
316 template<class Answer, class Question,class Backend>Answer* Distributor<Answer,Question,Backend>::wait(Question *q)
321 pthread_mutex_lock(&a_lock);
323 // search if the answer is there
324 tuple_t tuple=answers.front();
328 pthread_mutex_unlock(&a_lock);
329 return tuple.second.A;
331 // if not, loop again
332 pthread_mutex_unlock(&a_lock);
// Report the current values of the numquestions and numanswers semaphores through the two
// out-parameters. The parameter names shadow the members of the same name; only the
// semaphores are read.
338 template<class Answer, class Question,class Backend>void Distributor<Answer,Question,Backend>::getQueueSizes(int &questions, int &answers)
340 numquestions.getValue( &questions );
341 numanswers.getValue( &answers );
344 //! Wait for the answer to question \param id, giving up after about \param timeout seconds (+150ms); returns -1 on timeout or failure. For this to work, no answer() functions must be called
345 template<class Answer, class Question,class Backend>int Distributor<Answer,Question,Backend>::timeoutWait(int id, Answer *a, int timeout)
348 struct timespec then;
350 Utility::gettimeofday(&now,0);
// Absolute deadline = now + timeout seconds + 150ms. Adding 150ms to tv_usec*1000 can reach
// about 1.15e9 ns. POSIX makes pthread_cond_timedwait() fail with EINVAL when tv_nsec is
// outside [0, 1e9), so the whole-second overflow is carried into tv_sec.
352 then.tv_sec=now.tv_sec+timeout+(150*1000000+now.tv_usec*1000)/1000000000;
353 then.tv_nsec=(150*1000000+now.tv_usec*1000)%1000000000; // 150ms
357 // L<<"About to acquire to_mut - new broadcasts will then be corked"<<endl;
358 pthread_mutex_lock(&to_mut); // start the lock to prevent races
362 // L<<"Acquired the to_mut lock - checking to see if there are already answers"<<endl;
// getValue() matches the Semaphore accessor used everywhere else in this class.
365 rc=numanswers.getValue( &val); // are there answers?
366 // L<<"Now "<<val<<" answers according to the semaphore"<<endl;
370 DLOG(L<<"There are some answers! Is the one we want among them?"<<endl);
371 DLOG(L<<"numanswers: "<<val<<", rc="<<rc<<", errno="<<errno<<endl);
373 pthread_mutex_lock(&a_lock);
375 DLOG(L<<"deque contains: "<<answers.size()<<endl);
376 // search if the answer is there
381 for(typename deque<tuple_t>::iterator tuple=answers.begin();
382 tuple!=answers.end();
385 if(!found && tuple->first.id==id)
387 numanswers.wait(); // remove from the semaphore
388 DLOG(L<<"found the answer tuple ("<<tuple->first.id<<") - may be empty answer"<<endl);
400 answers.erase(tuple);
// NOTE(review): if the elided for-increment advances tuple after this restart, the new first
// element is skipped on this pass. Confirm.
401 tuple=answers.begin(); // restart
402 if(tuple==answers.end())break;
405 else // an answer, but not the right one
407 if(time(0)-tuple->second.created>5) // delete after 5 seconds
409 L<<"Deleted unclaimed answer "<<tuple->first.id<<" due to age"<<endl;
410 answers.erase(tuple);
411 tuple=answers.begin(); // restart
412 if(tuple==answers.end())break;
417 pthread_mutex_unlock(&a_lock);
420 L<<"Right answer was NOT found - we should now sleep and recheck whenever there are new answers"<<endl;
423 pthread_mutex_unlock(&to_mut);
428 L<<"No answers!"<<endl;
431 DLOG(L<<"starting wait on condition, to see if there are new answers"<<endl);
433 // this first lets go of the to_mut lock, which will 'uncork' the pending pthread_cond_broadcasts
434 rc=pthread_cond_timedwait(&to_cond, &to_mut, &then);
435 // and then it atomically reacquires the lock
439 L<<Logger::Error<<"Timeout waiting for data"<<endl;
440 pthread_mutex_unlock(&to_mut);
443 L<<"We received a broadcast that there is new data, checking"<<endl;
447 return -1; // timeout or whatever
452 #endif // DISTRIBUTOR_HH