From: bert hubert Date: Thu, 28 May 2015 11:50:58 +0000 (+0200) Subject: big cleanup of distributor, with free unit tests. X-Git-Tag: dnsdist-1.0.0-alpha1~248^2~79^2~8^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=491d03d7e0aad1e560ea5c1202c2dd4d00792074;p=thirdparty%2Fpdns.git big cleanup of distributor, with free unit tests. --- diff --git a/pdns/Makefile.am b/pdns/Makefile.am index 84ec543189..054c2f0699 100644 --- a/pdns/Makefile.am +++ b/pdns/Makefile.am @@ -964,6 +964,7 @@ testrunner_SOURCES = \ test-base32_cc.cc \ test-base64_cc.cc \ test-bindparser_cc.cc \ + test-distributor_hh.cc \ test-dns_random_hh.cc \ test-dnsname_cc.cc \ test-dnsrecords_cc.cc \ diff --git a/pdns/common_startup.cc b/pdns/common_startup.cc index 16ccc7f50e..118373aa09 100644 --- a/pdns/common_startup.cc +++ b/pdns/common_startup.cc @@ -202,10 +202,7 @@ try BOOST_FOREACH(DNSDistributor* d, g_distributors) { if(!d) continue; - int qcount, acount; - - d->getQueueSizes(qcount, acount); // this does locking and other things, so don't get smart - totcount+=qcount; + totcount += d->getQueueSize(); // this does locking and other things, so don't get smart } return totcount; } @@ -303,16 +300,16 @@ int isGuarded(char **argv) return !!p; } -void sendout(const AnswerData &AD) +void sendout(DNSPacket* a) { - if(!AD.A) + if(!a) return; - N->send(AD.A); + N->send(a); - int diff=AD.A->d_dt.udiff(); + int diff=a->d_dt.udiff(); avg_latency=(int)(0.999*avg_latency+0.001*diff); - delete AD.A; + delete a; } //! The qthread receives questions over the internet via the Nameserver class, and hands them to the Distributor for further processing @@ -429,7 +426,12 @@ void *qthread(void *number) if(logDNSQueries) L<<"packetcache MISS"<question(P, &sendout); // otherwise, give to the distributor + try { + distributor->question(P, &sendout); // otherwise, give to the distributor + } + catch(DistributorFatal& df) { // when this happens, we have leaked loads of memory. Bailing out time. + _exit(1); + } } return 0; } diff --git a/pdns/distributor.hh b/pdns/distributor.hh index ba3ce0e248..b842bf9a3c 100644 --- a/pdns/distributor.hh +++ b/pdns/distributor.hh @@ -28,13 +28,13 @@ #include #include #include -#include #include #include "logger.hh" #include "dns.hh" #include "dnsbackend.hh" #include "pdnsexception.hh" #include "arguments.hh" +#include #include "statbag.hh" extern StatBag S; @@ -44,33 +44,19 @@ extern StatBag S; Questions are posed to the Distributor, which returns the answer via a callback. - The Distributor takes care that there are enough Backends alive at any one - time and will try to spawn additional ones should they die. - - The Backend needs to count the number of living instances and supply this number to - the Distributor using its numBackends() method. This is silly. - - If an exception escapes a Backend, the distributor retires it. + The Distributor spawns sufficient backends, and if they thrown an exception, + it will cycle the backend but drop the query that was active during the exception. */ -templatestruct AnswerData -{ - Answer *A; -}; template class Distributor { public: static Distributor *Create(int n=1); //!< Create a new Distributor with \param n threads - - virtual void cleanup(); - virtual int question(Question *, void (*)(const AnswerData &)) =0; //!< Submit a question to the Distributor - virtual void getQueueSizes(int &questions, int &answers) =0; //!< Returns length of question queue - - virtual int getNumBusy() =0; - + typedef std::function callback_t; + virtual int question(Question *, callback_t callback) =0; //!< Submit a question to the Distributor + virtual int getQueueSize() =0; //!< Returns length of question queue virtual bool isOverloaded() =0; -private: }; template class SingleThreadDistributor @@ -78,14 +64,9 @@ template class SingleThreadDistribu { public: SingleThreadDistributor(); - int question(Question *, void (*)(const AnswerData &)); //!< Submit a question to the Distributor - void getQueueSizes(int &questions, int &answers) { - questions = 0; - answers = 0; - } - - int getNumBusy() - { + typedef std::function callback_t; + int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor + int getQueueSize() { return 0; } @@ -98,49 +79,40 @@ public: if (b) delete b; } private: - Backend *b; + Backend *b{0}; }; template class MultiThreadDistributor : public Distributor { public: - MultiThreadDistributor(int n=1); - int question(Question *, void (*)(const AnswerData &)); //!< Submit a question to the Distributor + MultiThreadDistributor(int n); + typedef std::function callback_t; + int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor static void* makeThread(void *); //!< helper function to create our n threads - void getQueueSizes(int &questions, int &answers) { - numquestions.getValue( &questions ); - answers = 0; - } - - int getNumBusy() - { - return d_num_threads-d_idle_threads; + int getQueueSize() override { + return d_queued; } struct QuestionData { Question *Q; - void (*callback)(const AnswerData &); + callback_t callback; int id; }; - bool isOverloaded() + bool isOverloaded() override { return d_overloaded; } private: bool d_overloaded; - std::queue questions; - pthread_mutex_t q_lock; - - Semaphore numquestions; - int nextid; time_t d_last_started; int d_num_threads; - AtomicCounter d_idle_threads; + std::atomic d_queued{0}, d_running{0}; + std::vector> d_pipes; }; //template::nextid; @@ -164,13 +136,18 @@ templateMultiThreadDistributor(this)); @@ -179,89 +156,77 @@ templateMultiThreadDistributorvoid Distributor::cleanup() -{ - L<void *MultiThreadDistributor::makeThread(void *p) { pthread_detach(pthread_self()); + MultiThreadDistributor *us=static_cast(p); + int ournum=us->d_running++; + try { Backend *b=new Backend(); // this will answer our questions - MultiThreadDistributor *us=static_cast(p); - int qcount; - int queuetimeout=::arg().asNum("queue-limit"); - static int overloadQueueLength=::arg().asNum("overload-queue-length"); - for(;;) { - ++(us->d_idle_threads); - - us->numquestions.getValue( &qcount ); - us->numquestions.wait(); - - --(us->d_idle_threads); - pthread_mutex_lock(&us->q_lock); - - QuestionData QD=us->questions.front(); - - us->questions.pop(); - pthread_mutex_unlock(&us->q_lock); - - Question *q=QD.Q; - - - if(us->d_overloaded && qcount <= overloadQueueLength/10) { - us->d_overloaded=false; - } - + for(;;) { + + QuestionData* QD; + if(read(us->d_pipes[ournum].first, &QD, sizeof(QD)) != sizeof(QD)) + unixDie("read"); + --us->d_queued; Answer *a; - if(queuetimeout && q->d_dt.udiff()>queuetimeout*1000) { - delete q; + if(queuetimeout && QD->Q->d_dt.udiff()>queuetimeout*1000) { + delete QD->Q; + delete QD; S.inc("timedout-packets"); continue; } // this is the only point where we interact with the backend (synchronous) try { - a=b->question(q); // a can be NULL! - delete q; + a=b->question(QD->Q); + delete QD->Q; } catch(const PDNSException &e) { L<replyPacket(); + delete b; + b=new Backend(); + a=QD->Q->replyPacket(); + a->setRcode(RCode::ServFail); S.inc("servfail-packets"); - S.ringAccount("servfail-queries",q->qdomain); + S.ringAccount("servfail-queries",QD->Q->qdomain); + + delete QD->Q; } catch(...) { - L<replyPacket(); + L<Q->replyPacket(); + a->setRcode(RCode::ServFail); S.inc("servfail-packets"); - S.ringAccount("servfail-queries",q->qdomain); + S.ringAccount("servfail-queries",QD->Q->qdomain); + delete QD->Q; } - AnswerData AD; - AD.A=a; - - QD.callback(AD); + QD->callback(a); + delete QD; } delete b; } catch(const PDNSException &AE) { - L<int SingleThreadDistributor::question(Question* q, void (*callback)(const AnswerData &)) +templateint SingleThreadDistributor::question(Question* q, callback_t callback) { Answer *a; try { @@ -277,7 +242,7 @@ templateint SingleThreadDistributor S.ringAccount("servfail-queries",q->qdomain); } catch(...) { - L<replyPacket(); @@ -285,66 +250,39 @@ templateint SingleThreadDistributor S.inc("servfail-packets"); S.ringAccount("servfail-queries",q->qdomain); } - AnswerData AD; - AD.A=a; - callback(AD); + callback(a); return 0; } -templateint MultiThreadDistributor::question(Question* q, void (*callback)(const AnswerData &)) +struct DistributorFatal{}; + +templateint MultiThreadDistributor::question(Question* q, callback_t callback) { - // XXX assert callback q=new Question(*q); - DLOG(L<<"Distributor has "<5) { - d_last_started=time(0); - L<<"Distributor misses a thread ("<(this)); - } - - QuestionData QD; - QD.Q=q; - QD.id=nextid++; - QD.callback=callback; - - pthread_mutex_lock(&q_lock); - questions.push(QD); - pthread_mutex_unlock(&q_lock); - - numquestions.post(); + auto QD=new QuestionData(); + QD->Q=q; + auto ret = QD->id = nextid++; // might be deleted after write! + QD->callback=callback; - static int overloadQueueLength=::arg().asNum("overload-queue-length"); + if(write(d_pipes[QD->id % d_pipes.size()].second, &QD, sizeof(QD)) != sizeof(QD)) + unixDie("write"); - if(!(nextid%50)) { - int val; - numquestions.getValue( &val ); - - if(!d_overloaded) - d_overloaded = overloadQueueLength && (val > overloadQueueLength); + d_queued++; + + static unsigned int overloadQueueLength=::arg().asNum("overload-queue-length"); + static unsigned int maxQueueLength=::arg().asNum("max-queue-length"); - if(val>::arg().asNum("max-queue-length")) { - L< overloadQueueLength; + if(d_queued > maxQueueLength) { + L< +#include +#include +#include "distributor.hh" +#include "dnspacket.hh" +#include "namespaces.hh" + +BOOST_AUTO_TEST_SUITE(test_distributor_hh) + +struct Question +{ + int q; + DTime d_dt; + string qdomain; + DNSPacket* replyPacket() + { + return new DNSPacket(); + } +}; + +struct Backend +{ + DNSPacket* question(Question*) + { + return new DNSPacket(); + } +}; + +static std::atomic g_receivedAnswers; +static void report(DNSPacket* A) +{ + delete A; + g_receivedAnswers++; +} + +BOOST_AUTO_TEST_CASE(test_distributor_basic) { + ::arg().set("overload-queue-length","Maximum queuelength moving to packetcache only")="0"; + ::arg().set("max-queue-length","Maximum queuelength before considering situation lost")="5000"; + ::arg().set("queue-limit","Maximum number of milliseconds to queue a query")="1500"; + S.declare("servfail-packets","Number of times a server-failed packet was sent out"); + S.declare("timedout-packets", "timedout-packets"); + + auto d=Distributor::Create(2); + + int n; + for(n=0; n < 100; ++n) { + auto q = new Question(); + q->d_dt.set(); + d->question(q, report); + } + sleep(1); + BOOST_CHECK_EQUAL(n, g_receivedAnswers); +}; + +struct BackendSlow +{ + DNSPacket* question(Question*) + { + sleep(1); + return new DNSPacket(); + } +}; + + +BOOST_AUTO_TEST_CASE(test_distributor_queue) { + auto d=Distributor::Create(2); + + BOOST_CHECK_EXCEPTION( { + int n; + for(n=0; n < 6000; ++n) { + auto q = new Question(); + q->d_dt.set(); + d->question(q, report); + } + }, DistributorFatal, [](DistributorFatal) { return true; }); +}; + +struct BackendDies +{ + BackendDies() + { + d_ourcount=s_count++; + } + ~BackendDies() + { + } + DNSPacket* question(Question* q) + { + // cout<<"Q: "<qdomain< s_count; + int d_count{0}; + int d_ourcount; +}; + +std::atomic BackendDies::s_count; + +std::atomic g_receivedAnswers2; + +static void report2(DNSPacket* A) +{ + delete A; + g_receivedAnswers2++; +} + + +BOOST_AUTO_TEST_CASE(test_distributor_dies) { + auto d=Distributor::Create(10); + sleep(1); + g_receivedAnswers=0; + + try { + for(int n=0; n < 100; ++n) { + auto q = new Question(); + q->d_dt.set(); + q->qdomain=std::to_string(n); + d->question(q, report2); + } + + sleep(1); + cerr<<"Queued: "<getQueueSize()<