From: Remi Gacogne Date: Fri, 23 Jun 2023 13:46:16 +0000 (+0200) Subject: auth: Refactor the MultiThreadDistributor using pdns::channel X-Git-Tag: rec-5.0.0-alpha1~69^2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=13bdc4d9627dd28df025a5837ae84004f21cb023;p=thirdparty%2Fpdns.git auth: Refactor the MultiThreadDistributor using pdns::channel --- diff --git a/pdns/distributor.hh b/pdns/distributor.hh index 2f52454ac4..d021cfa0f7 100644 --- a/pdns/distributor.hh +++ b/pdns/distributor.hh @@ -28,9 +28,10 @@ #include #include #include -#include #include "threadname.hh" #include + +#include "channel.hh" #include "logger.hh" #include "dns.hh" #include "dnsbackend.hh" @@ -117,15 +118,16 @@ public: } private: - int nextid; - time_t d_last_started; - unsigned int d_overloadQueueLength, d_maxQueueLength; - int d_num_threads; + std::vector> d_senders; + std::vector> d_receivers; + time_t d_last_started{0}; std::atomic d_queued{0}; - std::vector> d_pipes; + unsigned int d_overloadQueueLength{0}; + unsigned int d_maxQueueLength{0}; + int d_nextid{0}; + int d_num_threads{0}; }; -//template::nextid; template Distributor* Distributor::Create(int n) { if( n == 1 ) @@ -154,29 +156,24 @@ templateSingleThreadDistributorMultiThreadDistributor::MultiThreadDistributor(int n) +templateMultiThreadDistributor::MultiThreadDistributor(int numberOfThreads) : + d_last_started(time(nullptr)), d_overloadQueueLength(::arg().asNum("overload-queue-length")), d_maxQueueLength(::arg().asNum("max-queue-length")), d_num_threads(numberOfThreads) { - d_num_threads=n; - d_overloadQueueLength=::arg().asNum("overload-queue-length"); - d_maxQueueLength=::arg().asNum("max-queue-length"); - nextid=0; - d_last_started=time(0); - - for(int i=0; i < n; ++i) { - int fds[2]; - if(pipe(fds) < 0) - unixDie("Creating pipe"); - d_pipes.emplace_back(fds[0], fds[1]); - } - - if (n<1) { + if (numberOfThreads < 1) { g_log<(false, false); + d_senders.push_back(std::move(sender)); + d_receivers.push_back(std::move(receiver)); + } + + g_log<void MultiThreadDistributor setThreadName("pdns/distributo"); try { - std::unique_ptr b= make_unique(); // this will answer our questions - int queuetimeout=::arg().asNum("queue-limit"); + auto b = make_unique(); // this will answer our questions + int queuetimeout = ::arg().asNum("queue-limit"); + auto& receiver = d_receivers.at(ournum); - for(;;) { - - QuestionData* tempQD = nullptr; - if(read(d_pipes.at(ournum).first, &tempQD, sizeof(tempQD)) != sizeof(tempQD)) + for (;;) { + auto tempQD = receiver.receive(); + if (!tempQD) { unixDie("read"); + } --d_queued; - std::unique_ptr QD = std::unique_ptr(tempQD); - tempQD = nullptr; + auto questionData = std::move(*tempQD); std::unique_ptr a = nullptr; - - if(queuetimeout && QD->Q.d_dt.udiff()>queuetimeout*1000) { + if (queuetimeout && questionData->Q.d_dt.udiff() > queuetimeout * 1000) { S.inc("timedout-packets"); continue; - } + } - bool allowRetry=true; + bool allowRetry = true; retry: // this is the only point where we interact with the backend (synchronous) try { if (!b) { - allowRetry=false; - b=make_unique(); + allowRetry = false; + b = make_unique(); } - a=b->question(QD->Q); + a = b->question(questionData->Q); } - catch(const PDNSException &e) { + catch (const PDNSException &e) { b.reset(); if (!allowRetry) { g_log<Q.replyPacket(); + a = questionData->Q.replyPacket(); a->setRcode(RCode::ServFail); S.inc("servfail-packets"); - S.ringAccount("servfail-queries", QD->Q.qdomain, QD->Q.qtype); + S.ringAccount("servfail-queries", questionData->Q.qdomain, questionData->Q.qtype); } else { g_log<Q.replyPacket(); + g_log<Q.replyPacket(); a->setRcode(RCode::ServFail); S.inc("servfail-packets"); - S.ringAccount("servfail-queries", QD->Q.qdomain, QD->Q.qtype); + S.ringAccount("servfail-queries", questionData->Q.qdomain, questionData->Q.qtype); } else { - g_log<callback(a, QD->start); + questionData->callback(a, questionData->start); #ifdef ENABLE_GSS_TSIG if (g_doGssTSIG && a != nullptr) { - QD->Q.cleanupGSS(a->d.rcode); + questionData->Q.cleanupGSS(a->d.rcode); } #endif - QD.reset(); + questionData.reset(); } b.reset(); } - catch(const PDNSException &AE) { + catch (const PDNSException &AE) { g_log<setRcode(RCode::ServFail); S.inc("servfail-packets"); S.ringAccount("servfail-queries", q.qdomain, q.qtype); } else { - g_log<int MultiThreadDistributor::question(Question& q, callback_t callback) { // this is passed to other process over pipe and released there - auto QD=new QuestionData(q); - auto ret = QD->id = nextid++; // might be deleted after write! - QD->callback=callback; + auto questionData = std::make_unique(q); + auto ret = questionData->id = d_nextid++; // might be deleted after write! + questionData->callback = callback; ++d_queued; - if(write(d_pipes.at(QD->id % d_pipes.size()).second, &QD, sizeof(QD)) != sizeof(QD)) { + if (!d_senders.at(questionData->id % d_senders.size()).send(std::move(questionData))) { --d_queued; - delete QD; + questionData.reset(); unixDie("write"); } - if(d_queued > d_maxQueueLength) { + if (d_queued > d_maxQueueLength) { g_log<