#include <queue>
#include <vector>
#include <thread>
-#include <pthread.h>
#include "threadname.hh"
#include <unistd.h>
+
+#include "channel.hh"
#include "logger.hh"
#include "dns.hh"
#include "dnsbackend.hh"
}
private:
- int nextid;
- time_t d_last_started;
- unsigned int d_overloadQueueLength, d_maxQueueLength;
- int d_num_threads;
+ std::vector<pdns::channel::Sender<QuestionData>> d_senders;
+ std::vector<pdns::channel::Receiver<QuestionData>> d_receivers;
+ time_t d_last_started{0};
std::atomic<unsigned int> d_queued{0};
- std::vector<std::pair<int,int>> d_pipes;
+ unsigned int d_overloadQueueLength{0};
+ unsigned int d_maxQueueLength{0};
+ int d_nextid{0};
+ int d_num_threads{0};
};
-//template<class Answer, class Question, class Backend>::nextid;
template<class Answer, class Question, class Backend> Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n)
{
if( n == 1 )
}
}
-template<class Answer, class Question, class Backend>MultiThreadDistributor<Answer,Question,Backend>::MultiThreadDistributor(int n)
+template<class Answer, class Question, class Backend>MultiThreadDistributor<Answer,Question,Backend>::MultiThreadDistributor(int numberOfThreads) :
+ d_last_started(time(nullptr)), d_overloadQueueLength(::arg().asNum("overload-queue-length")), d_maxQueueLength(::arg().asNum("max-queue-length")), d_num_threads(numberOfThreads)
{
- d_num_threads=n;
- d_overloadQueueLength=::arg().asNum("overload-queue-length");
- d_maxQueueLength=::arg().asNum("max-queue-length");
- nextid=0;
- d_last_started=time(0);
-
- for(int i=0; i < n; ++i) {
- int fds[2];
- if(pipe(fds) < 0)
- unixDie("Creating pipe");
- d_pipes.emplace_back(fds[0], fds[1]);
- }
-
- if (n<1) {
+ if (numberOfThreads < 1) {
g_log<<Logger::Error<<"Asked for fewer than 1 threads, nothing to do"<<endl;
_exit(1);
}
- g_log<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl;
- for(int i=0;i<n;i++) {
- std::thread t([=](){distribute(i);});
+ for (int distributorIdx = 0; distributorIdx < numberOfThreads; distributorIdx++) {
+ auto [sender, receiver] = pdns::channel::createObjectQueue<QuestionData>(false, false);
+ d_senders.push_back(std::move(sender));
+ d_receivers.push_back(std::move(receiver));
+ }
+
+ g_log<<Logger::Warning<<"About to create "<<numberOfThreads<<" backend threads for UDP"<<endl;
+
+ for (int distributorIdx = 0; distributorIdx < numberOfThreads; distributorIdx++) {
+ std::thread t([=](){distribute(distributorIdx);});
t.detach();
Utility::usleep(50000); // we've overloaded mysql in the past :-)
}
setThreadName("pdns/distributo");
try {
- std::unique_ptr<Backend> b= make_unique<Backend>(); // this will answer our questions
- int queuetimeout=::arg().asNum("queue-limit");
+ auto b = make_unique<Backend>(); // this will answer our questions
+ int queuetimeout = ::arg().asNum("queue-limit");
+ auto& receiver = d_receivers.at(ournum);
- for(;;) {
-
- QuestionData* tempQD = nullptr;
- if(read(d_pipes.at(ournum).first, &tempQD, sizeof(tempQD)) != sizeof(tempQD))
+ for (;;) {
+ auto tempQD = receiver.receive();
+ if (!tempQD) {
unixDie("read");
+ }
--d_queued;
- std::unique_ptr<QuestionData> QD = std::unique_ptr<QuestionData>(tempQD);
- tempQD = nullptr;
+ auto questionData = std::move(*tempQD);
std::unique_ptr<Answer> a = nullptr;
-
- if(queuetimeout && QD->Q.d_dt.udiff()>queuetimeout*1000) {
+ if (queuetimeout && questionData->Q.d_dt.udiff() > queuetimeout * 1000) {
S.inc("timedout-packets");
continue;
- }
+ }
- bool allowRetry=true;
+ bool allowRetry = true;
retry:
// this is the only point where we interact with the backend (synchronous)
try {
if (!b) {
- allowRetry=false;
- b=make_unique<Backend>();
+ allowRetry = false;
+ b = make_unique<Backend>();
}
- a=b->question(QD->Q);
+ a = b->question(questionData->Q);
}
- catch(const PDNSException &e) {
+ catch (const PDNSException &e) {
b.reset();
if (!allowRetry) {
g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
- a=QD->Q.replyPacket();
+ a = questionData->Q.replyPacket();
a->setRcode(RCode::ServFail);
S.inc("servfail-packets");
- S.ringAccount("servfail-queries", QD->Q.qdomain, QD->Q.qtype);
+ S.ringAccount("servfail-queries", questionData->Q.qdomain, questionData->Q.qtype);
} else {
g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
goto retry;
}
}
- catch(...) {
+ catch (...) {
b.reset();
if (!allowRetry) {
- g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<endl;
- a=QD->Q.replyPacket();
+ g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<std::this_thread::get_id()<<endl;
+ a = questionData->Q.replyPacket();
a->setRcode(RCode::ServFail);
S.inc("servfail-packets");
- S.ringAccount("servfail-queries", QD->Q.qdomain, QD->Q.qtype);
+ S.ringAccount("servfail-queries", questionData->Q.qdomain, questionData->Q.qtype);
} else {
- g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<" (retry once)"<<endl;
+ g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<std::this_thread::get_id()<<" (retry once)"<<endl;
goto retry;
}
}
- QD->callback(a, QD->start);
+ questionData->callback(a, questionData->start);
#ifdef ENABLE_GSS_TSIG
if (g_doGssTSIG && a != nullptr) {
- QD->Q.cleanupGSS(a->d.rcode);
+ questionData->Q.cleanupGSS(a->d.rcode);
}
#endif
- QD.reset();
+ questionData.reset();
}
b.reset();
}
- catch(const PDNSException &AE) {
+ catch (const PDNSException &AE) {
g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
_exit(1);
}
- catch(const std::exception& e) {
+ catch (const std::exception& e) {
g_log<<Logger::Error<<"Distributor caught fatal exception: "<<e.what()<<endl;
_exit(1);
}
- catch(...) {
+ catch (...) {
g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
_exit(1);
}
catch(...) {
b.reset();
if (!allowRetry) {
- g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
+ g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<std::this_thread::get_id()<<endl;
a=q.replyPacket();
a->setRcode(RCode::ServFail);
S.inc("servfail-packets");
S.ringAccount("servfail-queries", q.qdomain, q.qtype);
} else {
- g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<" (retry once)"<<endl;
+ g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<std::this_thread::get_id()<<" (retry once)"<<endl;
goto retry;
}
}
template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
{
// this is passed to other process over pipe and released there
- auto QD=new QuestionData(q);
- auto ret = QD->id = nextid++; // might be deleted after write!
- QD->callback=callback;
+ auto questionData = std::make_unique<QuestionData>(q);
+ auto ret = questionData->id = d_nextid++; // might be deleted after write!
+ questionData->callback = callback;
++d_queued;
- if(write(d_pipes.at(QD->id % d_pipes.size()).second, &QD, sizeof(QD)) != sizeof(QD)) {
+ if (!d_senders.at(questionData->id % d_senders.size()).send(std::move(questionData))) {
--d_queued;
- delete QD;
+ questionData.reset();
unixDie("write");
}
- if(d_queued > d_maxQueueLength) {
+ if (d_queued > d_maxQueueLength) {
g_log<<Logger::Error<< d_queued <<" questions waiting for database/backend attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
// this will leak the entire contents of all pipes, nothing will be freed. Respawn when this happens!
throw DistributorFatal();