#include <queue>
#include <vector>
#include <pthread.h>
-#include <semaphore.h>
#include <unistd.h>
#include "logger.hh"
#include "dns.hh"
#include "dnsbackend.hh"
#include "pdnsexception.hh"
#include "arguments.hh"
+#include <atomic>
#include "statbag.hh"
extern StatBag S;
Questions are posed to the Distributor, which returns the answer via a callback.
- The Distributor takes care that there are enough Backends alive at any one
- time and will try to spawn additional ones should they die.
-
- The Backend needs to count the number of living instances and supply this number to
- the Distributor using its numBackends() method. This is silly.
-
- If an exception escapes a Backend, the distributor retires it.
+ The Distributor spawns sufficient backends, and if one of them throws an exception,
+ it will cycle the backend but drop the query that was active during the exception.
*/
-template<class Answer>struct AnswerData
-{
- Answer *A;
-};
template<class Answer, class Question, class Backend> class Distributor
{ // Abstract interface: questions are submitted together with a callback that later receives the Answer pointer
public: // NOTE(review): no virtual destructor is visible in this patch although the class has pure virtual members - confirm
static Distributor *Create(int n=1); //!< Create a new Distributor with \param n threads
-
- virtual void cleanup();
- virtual int question(Question *, void (*)(const AnswerData<Answer> &)) =0; //!< Submit a question to the Distributor
- virtual void getQueueSizes(int &questions, int &answers) =0; //!< Returns length of question queue
-
- virtual int getNumBusy() =0;
-
+ typedef std::function<void(Answer*)> callback_t; //!< Callback signature: receives a pointer to the Answer. NOTE(review): std::function needs <functional>; no direct include is visible in this patch - confirm it arrives transitively
+ virtual int question(Question *, callback_t callback) =0; //!< Submit a question to the Distributor
+ virtual int getQueueSize() =0; //!< Returns length of question queue
virtual bool isOverloaded() =0; //!< Overload indication; its meaning is defined by each implementation
-private:
};
template<class Answer, class Question, class Backend> class SingleThreadDistributor
{ // Distributor flavour without a queue: question() invokes the callback before returning and getQueueSize() is always 0
public:
SingleThreadDistributor();
- int question(Question *, void (*)(const AnswerData<Answer> &)); //!< Submit a question to the Distributor
- void getQueueSizes(int &questions, int &answers) {
- questions = 0;
- answers = 0;
- }
-
- int getNumBusy()
- {
+ typedef std::function<void(Answer*)> callback_t; //!< Callback signature: receives a pointer to the Answer
+ int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor
+ int getQueueSize() { //!< Always 0 for this implementation. NOTE(review): `override` is used on question() but no base-class clause is visible in this excerpt - presumably elided context; confirm
return 0;
}
if (b) delete b; // the null check is redundant: delete on a null pointer is a no-op
}
private:
- Backend *b;
+ Backend *b{0}; //!< Backend pointer, null-initialised; question()'s catch-all handler deletes and re-creates it
};
template<class Answer, class Question, class Backend> class MultiThreadDistributor
: public Distributor<Answer, Question, Backend>
{ // Distributor that hands each question to one of several backend threads through a per-thread pipe
public:
- MultiThreadDistributor(int n=1);
- int question(Question *, void (*)(const AnswerData<Answer> &)); //!< Submit a question to the Distributor
+ MultiThreadDistributor(int n); //!< \param n number of pipes and backend threads to create
+ typedef std::function<void(Answer*)> callback_t; //!< Callback signature: receives a pointer to the Answer
+ int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor
static void* makeThread(void *); //!< helper function to create our n threads
- void getQueueSizes(int &questions, int &answers) {
- numquestions.getValue( &questions );
- answers = 0;
- }
-
- int getNumBusy()
- {
- return d_num_threads-d_idle_threads;
+ int getQueueSize() override { //!< Current value of d_queued (incremented in question(), decremented by the worker threads)
+ return d_queued;
}
struct QuestionData
{
Question *Q; //!< Heap copy of the question made in question(); deleted by the worker thread
- void (*callback)(const AnswerData<Answer> &);
+ callback_t callback; //!< Invoked by the worker thread with the answer
int id; //!< Taken from nextid; also selects the pipe (id % d_pipes.size())
};
- bool isOverloaded()
+ bool isOverloaded() override
{
return d_overloaded;
}
private:
bool d_overloaded; //!< Updated in question() by comparing d_queued with overload-queue-length
- std::queue<QuestionData> questions;
- pthread_mutex_t q_lock;
-
- Semaphore numquestions;
-
int nextid;
time_t d_last_started;
int d_num_threads;
- AtomicCounter d_idle_threads;
+ std::atomic<unsigned int> d_queued{0}, d_running{0}; //!< d_queued: questions submitted but not yet picked up by a worker; d_running: threads started so far, which gives each thread its pipe index
+ std::vector<std::pair<int,int>> d_pipes; //!< One pipe per thread, stored as (read fd, write fd)
};
//template<class Answer, class Question, class Backend>::nextid;
d_overloaded = false;
nextid=0;
- // d_idle_threads=0;
d_last_started=time(0);
-// sem_init(&numquestions,0,0);
- pthread_mutex_init(&q_lock,0);
pthread_t tid;
+
+ for(int i=0; i < n; ++i) {
+ int fds[2];
+ if(pipe(fds) < 0)
+ unixDie("Creating pipe");
+ d_pipes.push_back({fds[0],fds[1]});
+ }
+
L<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl;
for(int i=0;i<n;i++) {
pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
L<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
}
-template<class Answer, class Question, class Backend>void Distributor<Answer,Question,Backend>::cleanup()
-{
- L<<Logger::Error<< "Cleaning up distributor" <<endl;
-}
// start of a new thread
template<class Answer, class Question, class Backend>void *MultiThreadDistributor<Answer,Question,Backend>::makeThread(void *p)
{ // Worker thread entry point: p is the MultiThreadDistributor passed to pthread_create(); always returns 0
pthread_detach(pthread_self());
+ MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p);
+ int ournum=us->d_running++; // atomic post-increment: every thread gets a distinct index, used below as its slot in d_pipes
+
try {
Backend *b=new Backend(); // this will answer our questions
- MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p);
- int qcount;
-
int queuetimeout=::arg().asNum("queue-limit");
- static int overloadQueueLength=::arg().asNum("overload-queue-length");
- for(;;) {
- ++(us->d_idle_threads);
-
- us->numquestions.getValue( &qcount );
- us->numquestions.wait();
-
- --(us->d_idle_threads);
- pthread_mutex_lock(&us->q_lock);
-
- QuestionData QD=us->questions.front();
-
- us->questions.pop();
- pthread_mutex_unlock(&us->q_lock);
-
- Question *q=QD.Q;
-
-
- if(us->d_overloaded && qcount <= overloadQueueLength/10) {
- us->d_overloaded=false;
- }
-
+ for(;;) {
+
+ QuestionData* QD;
+ if(read(us->d_pipes[ournum].first, &QD, sizeof(QD)) != sizeof(QD)) // each message on our pipe is one QuestionData pointer written by question()
+ unixDie("read");
+ --us->d_queued; // this question is no longer waiting
Answer *a;
- if(queuetimeout && q->d_dt.udiff()>queuetimeout*1000) {
- delete q;
+ if(queuetimeout && QD->Q->d_dt.udiff()>queuetimeout*1000) { // drop questions that waited longer than queue-limit (udiff() is presumably in microseconds, hence the *1000 - confirm)
+ delete QD->Q;
+ delete QD; // a timed-out question gets no callback
S.inc("timedout-packets");
continue;
}
// this is the only point where we interact with the backend (synchronous)
try {
- a=b->question(q); // a can be NULL!
- delete q;
+ a=b->question(QD->Q);
+ delete QD->Q; // the backend returned normally, so the question copy is no longer needed
}
catch(const PDNSException &e) {
L<<Logger::Error<<"Backend error: "<<e.reason<<endl;
- a=q->replyPacket();
+ delete b; // replace the backend that failed
+ b=new Backend(); // NOTE(review): if this constructor throws, b is left dangling and the exception ends the thread through the outer handlers
+ a=QD->Q->replyPacket(); // answer the failed question with a reply built from the question itself, marked ServFail below
+
a->setRcode(RCode::ServFail);
S.inc("servfail-packets");
- S.ringAccount("servfail-queries",q->qdomain);
+ S.ringAccount("servfail-queries",QD->Q->qdomain);
+
+ delete QD->Q; // done with the question on the error path as well
}
catch(...) {
- L<<Logger::Error<<Logger::NTLog<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
- a=q->replyPacket();
+ L<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<endl; // NOTE(review): cast to long here, but to unsigned long in SingleThreadDistributor::question()
+ delete b; // same recovery as the PDNSException handler above
+ b=new Backend();
+ a=QD->Q->replyPacket();
+
a->setRcode(RCode::ServFail);
S.inc("servfail-packets");
- S.ringAccount("servfail-queries",q->qdomain);
+ S.ringAccount("servfail-queries",QD->Q->qdomain);
+ delete QD->Q;
}
- AnswerData<Answer> AD;
- AD.A=a;
-
- QD.callback(AD);
+ QD->callback(a); // deliver the answer to the submitter's callback, on this worker thread
+ delete QD; // allocated in question(); this thread frees it after reading the pointer
}
delete b; // NOTE(review): unreachable as written - the for(;;) above has no break
}
catch(const PDNSException &AE) {
- L<<Logger::Error<<Logger::NTLog<<"Distributor caught fatal exception: "<<AE.reason<<endl;
+ L<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
}
catch(...) {
- L<<Logger::Error<<Logger::NTLog<<"Caught an unknown exception when creating backend, probably"<<endl;
+ L<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
}
return 0; // reached only after an exception escaped the loop; NOTE(review): nothing visible here restarts the thread, so its pipe would no longer be read - confirm
}
-template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question* q, void (*callback)(const AnswerData<Answer> &))
+template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question* q, callback_t callback)
{ // Invokes callback with the answer pointer before returning; the return value is always 0
Answer *a;
try {
S.ringAccount("servfail-queries",q->qdomain);
}
catch(...) {
- L<<Logger::Error<<Logger::NTLog<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
+ L<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
delete b; // replace the backend after an unknown exception
b=new Backend;
a=q->replyPacket(); // the reply is built from the question itself
S.inc("servfail-packets");
S.ringAccount("servfail-queries",q->qdomain);
}
- AnswerData<Answer> AD;
- AD.A=a;
- callback(AD);
+ callback(a); // delivered synchronously, on the caller's thread
return 0;
}
-template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question* q, void (*callback)(const AnswerData<Answer> &))
+struct DistributorFatal{}; //!< Thrown by MultiThreadDistributor::question() when d_queued exceeds max-queue-length
+
+/** Queue a copy of *q for one of the backend threads.
+ *
+ * The copy and a heap-allocated QuestionData are handed to a worker by writing the
+ * QuestionData pointer into one of d_pipes, selected by id % d_pipes.size(); the
+ * worker thread frees both.
+ * \param q question to copy; the original is not freed here
+ * \param callback stored in the QuestionData and invoked by the worker with the answer
+ * \return the id assigned to this question
+ * \throws DistributorFatal when d_queued exceeds max-queue-length
+ */
+template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question* q, callback_t callback)
{
- // XXX assert callback
q=new Question(*q);
- DLOG(L<<"Distributor has "<<Backend::numRunning()<<" threads available"<<endl);
-
- /* the line below is a bit difficult.
- What happens is that we have a goal for the number of running distributor threads. Furthermore, other
- parts of PowerDNS also start backends, which get included in this count.
-
- If less than two threads now die, no new ones will be spawned.
-
- The solutionis to add '+2' below, but it is not a pretty solution. Better solution is
- to only account the number of threads within the Distributor, and not in the backend.
-
- XXX FIXME
- */
-
- if(Backend::numRunning() < d_num_threads+2 && time(0)-d_last_started>5) {
- d_last_started=time(0);
- L<<"Distributor misses a thread ("<<Backend::numRunning()<<"<"<<d_num_threads + 2<<"), spawning new one"<<endl;
- pthread_t tid;
- pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
- }
-
- QuestionData QD;
- QD.Q=q;
- QD.id=nextid++;
- QD.callback=callback;
-
- pthread_mutex_lock(&q_lock);
- questions.push(QD);
- pthread_mutex_unlock(&q_lock);
-
- numquestions.post();
+ auto QD=new QuestionData();
+ QD->Q=q;
+ auto ret = QD->id = nextid++; // keep a copy of the id: the worker may delete QD as soon as the write lands
+ QD->callback=callback;
- static int overloadQueueLength=::arg().asNum("overload-queue-length");
+ // Count the question before publishing it. The worker decrements d_queued right after
+ // its read(); incrementing only after the write() let that decrement run first and wrap
+ // the unsigned counter, which another submitter could mistake for a full queue.
+ ++d_queued;
+ if(write(d_pipes[QD->id % d_pipes.size()].second, &QD, sizeof(QD)) != sizeof(QD)) {
+ // a pointer-sized pipe write is all-or-nothing (well below PIPE_BUF), so the worker
+ // never saw QD: undo the count and free what the worker would have freed
+ --d_queued;
+ delete QD->Q;
+ delete QD;
+ unixDie("write");
+ }
- if(!(nextid%50)) {
- int val;
- numquestions.getValue( &val );
-
- if(!d_overloaded)
- d_overloaded = overloadQueueLength && (val > overloadQueueLength);
+
+ static unsigned int overloadQueueLength=::arg().asNum("overload-queue-length");
+ static unsigned int maxQueueLength=::arg().asNum("max-queue-length");
- if(val>::arg().asNum("max-queue-length")) {
- L<<Logger::Error<<val<<" questions waiting for database attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
- _exit(1);
- }
+ if(overloadQueueLength)
+ d_overloaded= d_queued > overloadQueueLength;
+ if(d_queued > maxQueueLength) {
+ L<<Logger::Error<< d_queued <<" questions waiting for database attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
+ // this will leak the entire contents of all pipes, nothing will be freed. Respawn when this happens!
+ throw DistributorFatal();
}
-
- return QD.id;
+
+ return ret;
}
#endif // DISTRIBUTOR_HH
--- /dev/null
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_NO_MAIN
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include <stdlib.h>
+#include <unistd.h>
+#include <boost/test/unit_test.hpp>
+#include "distributor.hh"
+#include "dnspacket.hh"
+#include "namespaces.hh"
+
+BOOST_AUTO_TEST_SUITE(test_distributor_hh)
+
+struct Question // Minimal stand-in for the Distributor's Question template parameter
+{
+ int q; // not read by any code shown in this file
+ DTime d_dt; // read by the worker thread's queue-limit check (d_dt.udiff())
+ string qdomain; // passed to S.ringAccount("servfail-queries", ...) when the backend throws
+ DNSPacket* replyPacket() // used by the Distributor to build the reply when the backend throws
+ {
+ return new DNSPacket();
+ }
+};
+
+struct Backend // Backend stub that answers every question with a freshly allocated DNSPacket
+{
+ DNSPacket* question(Question*)
+ {
+ return new DNSPacket();
+ }
+};
+
+static std::atomic<int> g_receivedAnswers; // number of answers delivered to report(); the multi-threaded distributor invokes callbacks on its worker threads
+static void report(DNSPacket* A) // answer callback: frees the packet and counts it
+{
+ delete A;
+ g_receivedAnswers++;
+}
+
+BOOST_AUTO_TEST_CASE(test_distributor_basic) { // submits 100 questions and expects 100 callbacks
+ ::arg().set("overload-queue-length","Maximum queuelength moving to packetcache only")="0"; // 0 leaves d_overloaded untouched in MultiThreadDistributor::question()
+ ::arg().set("max-queue-length","Maximum queuelength before considering situation lost")="5000";
+ ::arg().set("queue-limit","Maximum number of milliseconds to queue a query")="1500";
+ S.declare("servfail-packets","Number of times a server-failed packet was sent out"); // counters the distributor increments via S.inc()
+ S.declare("timedout-packets", "timedout-packets");
+
+ auto d=Distributor<DNSPacket, Question, Backend>::Create(2); // Create() is not shown in this patch; presumably yields a distributor with two backend threads - confirm
+
+ int n;
+ for(n=0; n < 100; ++n) {
+ auto q = new Question(); // NOTE(review): MultiThreadDistributor::question() works on its own copy (new Question(*q)) and nothing here deletes q, so this appears to leak - confirm
+ q->d_dt.set();
+ d->question(q, report);
+ }
+ sleep(1); // gives the workers time to answer; the check below depends on this timing
+ BOOST_CHECK_EQUAL(n, g_receivedAnswers); // n is 100 after the loop: one callback per question
+};
+
+struct BackendSlow // Backend stub that takes one second per question
+{
+ DNSPacket* question(Question*)
+ {
+ sleep(1);
+ return new DNSPacket();
+ }
+};
+
+
+BOOST_AUTO_TEST_CASE(test_distributor_queue) { // overfills the queue of a slow backend and expects DistributorFatal
+ auto d=Distributor<DNSPacket, Question, BackendSlow>::Create(2);
+
+ BOOST_CHECK_EXCEPTION( {
+ int n;
+ for(n=0; n < 6000; ++n) { // more submissions than the max-queue-length of 5000; presumably relies on the setting made in test_distributor_basic - confirm
+ auto q = new Question();
+ q->d_dt.set();
+ d->question(q, report);
+ }
+ }, DistributorFatal, [](DistributorFatal) { return true; }); // any DistributorFatal satisfies the predicate
+};
+
+struct BackendDies // Backend stub whose first-constructed instance throws on its 10th question
+{
+ BackendDies()
+ {
+ d_ourcount=s_count++; // creation order: 0 for the first instance constructed
+ }
+ ~BackendDies()
+ {
+ }
+ DNSPacket* question(Question* q)
+ {
+ // cout<<"Q: "<<q->qdomain<<endl;
+ if(!d_ourcount && ++d_count == 10) { // only instance 0 counts its questions, and it fails on the 10th
+ // cerr<<"Going.. down!"<<endl;
+ throw runtime_error("kill"); // not a PDNSException, so the Distributor's catch(...) handler deals with it
+ }
+ return new DNSPacket();
+ }
+ static std::atomic<int> s_count; // number of BackendDies objects constructed so far
+ int d_count{0}; // questions seen; only ever incremented on instance 0
+ int d_ourcount; // this instance's creation index
+};
+
+std::atomic<int> BackendDies::s_count;
+
+std::atomic<int> g_receivedAnswers2; // number of answers delivered to report2()
+
+static void report2(DNSPacket* A) // answer callback for test_distributor_dies: frees the packet and counts it
+{
+ delete A;
+ g_receivedAnswers2++;
+}
+
+
+BOOST_AUTO_TEST_CASE(test_distributor_dies) { // runs 100 questions through backends of which one throws once
+ auto d=Distributor<DNSPacket, Question, BackendDies>::Create(10);
+ sleep(1); // presumably lets the threads construct their backends before questions arrive - confirm
+ g_receivedAnswers=0; // NOTE(review): this test counts into g_receivedAnswers2 (via report2); resetting g_receivedAnswers looks like a copy-paste slip - confirm
+
+ try {
+ for(int n=0; n < 100; ++n) {
+ auto q = new Question();
+ q->d_dt.set();
+ q->qdomain=std::to_string(n); // tags each question with its sequence number
+ d->question(q, report2);
+ }
+
+ sleep(1);
+ cerr<<"Queued: "<<d->getQueueSize()<<endl; // NOTE(review): the results are only printed; nothing in this test asserts on them
+ cerr<<"Received: "<<g_receivedAnswers2<<endl;
+ }
+ catch(std::exception& e) {
+ cerr<<e.what()<<endl;
+ }
+ catch(PDNSException &pe) {
+ cerr<<pe.reason<<endl;
+ }
+};
+
+
+
+BOOST_AUTO_TEST_SUITE_END();