From: Mark Zealey Date: Mon, 2 Dec 2013 08:57:42 +0000 (+0200) Subject: fixes PowerDNS/pdns#650 X-Git-Tag: rec-3.6.0-rc1~240^2~5 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=ad7d9cd065b288a6f59bcb2628aa4ffcd278c7cd;p=thirdparty%2Fpdns.git fixes PowerDNS/pdns#650 The attached patch changes the distributor code in the following ways: 1) Remove (as far as i can see) unused functions which allow for the fetching of the answer from an answers queue. I struggle to understand why this would ever be useful compared to having a callback; it also reduces code complexity and removes some locks. Also allows (2): 2) Split into 3 classes - the new ones being SingleThreadDistributor and MultiThreadDistributor - removes some of the conditional statements. It also means that in distributor-threads=1 mode, NO additional distributor threads are forked (unlike the existing code), and the class will also use less memory and generally be more efficient. This has been tested in that the pdns server starts up and answers questions correctly in both modes, it's more of an RFC attempt to clean up the code a bit. --- diff --git a/pdns/common_startup.cc b/pdns/common_startup.cc index eb75054918..0d62057862 100644 --- a/pdns/common_startup.cc +++ b/pdns/common_startup.cc @@ -211,7 +211,7 @@ int isGuarded(char **argv) return !!p; } -void sendout(const DNSDistributor::AnswerData &AD) +void sendout(const AnswerData &AD) { if(!AD.A) return; @@ -228,7 +228,7 @@ void sendout(const DNSDistributor::AnswerData &AD) void *qthread(void *number) { DNSPacket *P; - DNSDistributor *distributor = new DNSDistributor(::arg().asNum("distributor-threads")); // the big dispatcher! + DNSDistributor *distributor = DNSDistributor::Create(::arg().asNum("distributor-threads")); // the big dispatcher! DNSPacket question; DNSPacket cached; diff --git a/pdns/distributor.hh b/pdns/distributor.hh index 09187e6a65..9b72bb28ac 100644 --- a/pdns/distributor.hh +++ b/pdns/distributor.hh @@ -42,9 +42,7 @@ extern StatBag S; /** the Distributor template class enables you to multithread slow question/answer processes. - Questions are posed to the Distributor, which can either hand back the answer, - or give it directly to a callback. Only the latter mode of operation is used in - PowerDNS. + Questions are posed to the Distributor, which returns the answer via a callback. The Distributor takes care that there are enough Backends alive at any one time and will try to spawn additional ones should they die. @@ -54,21 +52,64 @@ extern StatBag S; If an exception escapes a Backend, the distributor retires it. */ +templatestruct AnswerData +{ + Answer *A; + time_t created; +}; + template class Distributor { public: - Distributor(int n=10); //!< Create a new Distributor with \param n threads - struct AnswerData + static Distributor *Create(int n=1); //!< Create a new Distributor with \param n threads + + virtual void cleanup(); + virtual int question(Question *, void (*)(const AnswerData &)) {return 0;}; //!< Submit a question to the Distributor + virtual void getQueueSizes(int &questions, int &answers) {}; //!< Returns length of question queue + + virtual int getNumBusy() {return 0;}; + + virtual bool isOverloaded() {return false;}; + +private: +}; + +template class SingleThreadDistributor + : public Distributor +{ +public: + SingleThreadDistributor(); + int question(Question *, void (*)(const AnswerData &)); //!< Submit a question to the Distributor + void getQueueSizes(int &questions, int &answers) { + questions = 0; + answers = 0; + } + + int getNumBusy() + { + return 0; + } + + bool isOverloaded() { - Answer *A; - time_t created; - }; - int question(Question *, void (*)(const AnswerData &)=0); //!< Submit a question to the Distributor - Answer *answer(void); //!< Wait for any answer from the Distributor - Answer *wait(Question *); //!< wait for an answer to a specific question - int timeoutWait(int id, Answer *, int); //!< wait for a specific answer, with timeout + return false; + } + +private: + Backend *b; +}; + +template class MultiThreadDistributor + : public Distributor +{ +public: + MultiThreadDistributor(int n=1); + int question(Question *, void (*)(const AnswerData &)); //!< Submit a question to the Distributor static void* makeThread(void *); //!< helper function to create our n threads - void getQueueSizes(int &questions, int &answers); //!< Returns length of question queue + void getQueueSizes(int &questions, int &answers) { + numquestions.getValue( &questions ); + answers = 0; + } int getNumBusy() { @@ -78,11 +119,10 @@ public: struct QuestionData { Question *Q; - void (*callback)(const AnswerData &); + void (*callback)(const AnswerData &); int id; }; - typedef pair tuple_t; bool isOverloaded() { return d_overloaded; @@ -92,12 +132,8 @@ private: bool d_overloaded; std::queue questions; pthread_mutex_t q_lock; - - deque answers; - pthread_mutex_t a_lock; Semaphore numquestions; - Semaphore numanswers; pthread_mutex_t to_mut; pthread_cond_t to_cond; @@ -106,49 +142,59 @@ private: time_t d_last_started; int d_num_threads; AtomicCounter d_idle_threads; - Backend *b; }; - //template::nextid; +templateDistributor* Distributor::Create(int n) +{ + if( n == 1 ) + return new SingleThreadDistributor(); + else + return new MultiThreadDistributor( n ); +} + +templateSingleThreadDistributor::SingleThreadDistributor() +{ + L<Distributor::Distributor(int n) +templateMultiThreadDistributor::MultiThreadDistributor(int n) { - b=0; + d_num_threads=n; d_overloaded = false; + nextid=0; // d_idle_threads=0; d_last_started=time(0); // sem_init(&numquestions,0,0); pthread_mutex_init(&q_lock,0); -// sem_init(&numanswers,0,0); - pthread_mutex_init(&a_lock,0); - pthread_mutex_init(&to_mut,0); pthread_cond_init(&to_cond,0); pthread_t tid; - d_num_threads=n; - L<(this)); Utility::usleep(50000); // we've overloaded mysql in the past :-) } - L<void Distributor::cleanup() +{ + L<void *Distributor::makeThread(void *p) +templatevoid *MultiThreadDistributor::makeThread(void *p) { pthread_detach(pthread_self()); try { Backend *b=new Backend(); // this will answer our questions - Distributor *us=static_cast(p); + MultiThreadDistributor *us=static_cast(p); int qcount; // this is so gross @@ -204,28 +250,11 @@ templatevoid *Distributor AD; AD.A=a; AD.created=time(0); - tuple_t tuple(QD,AD); - if(QD.callback) { - QD.callback(AD); - } - else { - pthread_mutex_lock(&us->a_lock); - - us->answers.push_back(tuple); - pthread_mutex_unlock(&us->a_lock); - - // L<<"We have an answer to send! Trying to get to to_mut lock"<to_mut); - // L<<"Yes, we got the lock, we can transmit! First we post"<numanswers.post(); - // L<<"And now we broadcast!"<to_cond); // for timeoutWait(); - pthread_mutex_unlock(&us->to_mut); - } + QD.callback(AD); } delete b; @@ -239,41 +268,35 @@ templatevoid *Distributorint Distributor::question(Question* q, void (*callback)(const AnswerData &)) +templateint SingleThreadDistributor::question(Question* q, void (*callback)(const AnswerData &)) { - if(d_num_threads==1 && callback) { // short circuit - if(!b) { - L<question(q); // a can be NULL! - } - catch(const PDNSException &e) { - L<question(q); // a can be NULL! + } + catch(const AhuException &e) { + L< AD; + AD.A=a; + AD.created=time(0); + callback(AD); + return 0; +} + +templateint MultiThreadDistributor::question(Question* q, void (*callback)(const AnswerData &)) +{ + // XXX assert callback + q=new Question(*q); DLOG(L<<"Distributor has "<int DistributorAnswer* Distributor::answer() -{ - numanswers.wait(); - tuple_t tuple; - - pthread_mutex_lock(&a_lock); - tuple=answers.front(); - answers.pop_front(); - pthread_mutex_unlock(&a_lock); - return tuple.second.A; -} - -//! Wait synchronously for the answer of the question just asked. For this to work, no answer() functions must be called -templateAnswer* Distributor::wait(Question *q) -{ - for(;;) - { - numanswers.wait(); - pthread_mutex_lock(&a_lock); - - // search if the answer is there - tuple_t tuple=answers.front(); - if(tuple.first==q) - { - answers.pop_front(); - pthread_mutex_unlock(&a_lock); - return tuple.second.A; - } - // if not, loop again - pthread_mutex_unlock(&a_lock); - numanswers.post(); - } - // FIXME: write this -} - -templatevoid Distributor::getQueueSizes(int &questions, int &answers) -{ - numquestions.getValue( &questions ); - numanswers.getValue( &answers ); -} - #endif // DISTRIBUTOR_HH