2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
28 #include "threadname.hh"
32 #include "dnsbackend.hh"
33 #include "pdnsexception.hh"
34 #include "arguments.hh"
40 /** the Distributor template class enables you to multithread slow question/answer
43 Questions are posed to the Distributor, which returns the answer via a callback.
45 The Distributor spawns sufficient backends, and if they throw an exception,
46 it will cycle the backend but drop the query that was active during the exception.
49 template<class Answer, class Question, class Backend> class Distributor
52 static Distributor* Create(int n=1); //!< Create a new Distributor with \param n threads
// Signature of the answer callback: it receives the Answer through a reference
// to its owning unique_ptr.
53 typedef std::function<void(std::unique_ptr<Answer>&)> callback_t;
54 virtual int question(Question&, callback_t callback) =0; //!< Submit a question to the Distributor
55 virtual int getQueueSize() =0; //!< Returns length of question queue
56 virtual bool isOverloaded() =0; //!< Whether the implementation considers itself overloaded
// NOTE(review): the destructor prints its own function name to cerr every time a
// Distributor is destroyed - this looks like leftover debug output; confirm
// before relying on or removing it.
57 virtual ~Distributor() { cerr<<__func__<<endl;}
// Distributor implementation that works with a single Backend instance held in
// member b. Copy construction and copy assignment are deleted.
60 template<class Answer, class Question, class Backend> class SingleThreadDistributor
61 : public Distributor<Answer, Question, Backend>
64 SingleThreadDistributor(const SingleThreadDistributor&) = delete;
65 void operator=(const SingleThreadDistributor&) = delete;
66 SingleThreadDistributor();
// Same callback signature as declared in the Distributor base class.
67 typedef std::function<void(std::unique_ptr<Answer>&)> callback_t;
68 int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor
69 int getQueueSize() override {
73 bool isOverloaded() override
// The backend answering questions; starts out null and is assigned with
// make_unique<Backend>() in the constructor and in question().
79 std::unique_ptr<Backend> b{nullptr};
// Distributor implementation that hands questions to worker threads through
// pipes: question() writes a QuestionData pointer to a pipe and makeThread()
// reads it on the other end. Copy construction and copy assignment are deleted.
82 template<class Answer, class Question, class Backend> class MultiThreadDistributor
83 : public Distributor<Answer, Question, Backend>
86 MultiThreadDistributor(const MultiThreadDistributor&) = delete;
87 void operator=(const MultiThreadDistributor&) = delete;
88 MultiThreadDistributor(int n);
// Same callback signature as declared in the Distributor base class.
89 typedef std::function<void(std::unique_ptr<Answer>&)> callback_t;
90 int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor
91 static void* makeThread(void *); //!< helper function to create our n threads
92 int getQueueSize() override {
// Initializes member Q from the submitted question.
98 QuestionData(const Question& query): Q(query)
107 bool isOverloaded() override
// True only when an overload limit is configured (non-zero) and more questions
// are queued than that limit.
109 return d_overloadQueueLength && (d_queued > d_overloadQueueLength);
// Set to time(0) by the constructor.
114 time_t d_last_started;
// Filled by the constructor from the "overload-queue-length" and
// "max-queue-length" settings respectively.
115 unsigned int d_overloadQueueLength, d_maxQueueLength;
// Atomic counters, both starting at 0. d_running is post-incremented by each
// worker thread in makeThread() to obtain its own index.
117 std::atomic<unsigned int> d_queued{0}, d_running{0};
// One {fds[0], fds[1]} pair per worker thread, pushed by the constructor.
// Workers read from .first; question() writes to .second.
118 std::vector<std::pair<int,int>> d_pipes;
121 //template<class Answer, class Question, class Backend>::nextid;
// Factory: returns a newly allocated SingleThreadDistributor or a
// MultiThreadDistributor constructed with n.
// NOTE(review): presumably the single-threaded variant is chosen when n is 1
// (its constructor logs "Only asked for 1 backend thread") - confirm.
// NOTE(review): a raw pointer obtained from new is returned - confirm which
// caller is responsible for deleting it.
122 template<class Answer, class Question, class Backend> Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n)
125 return new SingleThreadDistributor<Answer,Question,Backend>();
127 return new MultiThreadDistributor<Answer,Question,Backend>( n );
// Constructor: logs that the distributor operates unthreaded and creates the
// Backend instance stored in b.
130 template<class Answer, class Question, class Backend>SingleThreadDistributor<Answer,Question,Backend>::SingleThreadDistributor()
132 g_log<<Logger::Error<<"Only asked for 1 backend thread - operating unthreaded"<<endl;
134 b=make_unique<Backend>();
// A PDNSException is reported together with its reason string.
136 catch(const PDNSException &AE) {
137 g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
// Reported for exceptions of unknown type (presumably from a catch-all handler - confirm).
141 g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
// Constructor: reads the queue limits from the configuration, prepares one pipe
// pair per worker and launches n worker threads running makeThread().
146 template<class Answer, class Question, class Backend>MultiThreadDistributor<Answer,Question,Backend>::MultiThreadDistributor(int n)
149 d_overloadQueueLength=::arg().asNum("overload-queue-length");
150 d_maxQueueLength=::arg().asNum("max-queue-length");
152 d_last_started=time(0);
// One iteration per worker: record the descriptor pair for that worker's pipe.
157 for(int i=0; i < n; ++i) {
// Presumably reached when creating the pipe failed - confirm.
160 unixDie("Creating pipe");
161 d_pipes.push_back({fds[0],fds[1]});
165 g_log<<Logger::Error<<"Asked for fewer than 1 threads, nothing to do"<<endl;
169 g_log<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl;
170 for(int i=0;i<n;i++) {
// Each thread receives this distributor as its argument.
// NOTE(review): the return value of pthread_create() is discarded, so a failed
// thread creation goes unnoticed here - confirm whether that is acceptable.
171 pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
// Pause between launches so the backends are not all started at once.
172 Utility::usleep(50000); // we've overloaded mysql in the past :-)
174 g_log<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
178 // start of a new thread
// Thread entry point passed to pthread_create() by the constructor; p is the
// MultiThreadDistributor that launched the thread. Each thread creates its own
// Backend and processes the QuestionData pointers arriving on its pipe.
179 template<class Answer, class Question, class Backend>void *MultiThreadDistributor<Answer,Question,Backend>::makeThread(void *p)
// NOTE(review): the name looks cut short, presumably to fit a platform limit on
// thread-name length - confirm.
181 setThreadName("pdns/distributo");
// Detach ourselves; the thread cannot be joined afterwards.
182 pthread_detach(pthread_self());
183 MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p);
// Post-incrementing the shared atomic counter yields this thread's index, which
// selects its pipe in d_pipes below.
184 int ournum=us->d_running++;
187 std::unique_ptr<Backend> b= make_unique<Backend>(); // this will answer our questions
188 int queuetimeout=::arg().asNum("queue-limit");
// question() writes the raw QuestionData pointer into the pipe; read exactly
// one pointer back from the read end.
192 QuestionData* tempQD = nullptr;
193 if(read(us->d_pipes[ournum].first, &tempQD, sizeof(tempQD)) != sizeof(tempQD))
// Take ownership of the object allocated by question().
196 std::unique_ptr<QuestionData> QD = std::unique_ptr<QuestionData>(tempQD);
198 std::unique_ptr<Answer> a = nullptr;
// With a non-zero queue-limit, a question whose d_dt.udiff() exceeds
// queuetimeout*1000 is counted as timed out (presumably udiff() is in
// microseconds and queue-limit in milliseconds - confirm).
200 if(queuetimeout && QD->Q.d_dt.udiff()>queuetimeout*1000) {
201 S.inc("timedout-packets");
205 bool allowRetry=true;
207 // this is the only point where we interact with the backend (synchronous)
// NOTE(review): presumably only reached when b was reset after an earlier
// failure - confirm.
211 b=make_unique<Backend>();
213 a=b->question(QD->Q);
// Two outcomes are logged for a PDNSException: a final failure that is answered
// with a ServFail reply and counted, or a single retry (presumably selected by
// allowRetry - confirm).
215 catch(const PDNSException &e) {
218 g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
219 a=QD->Q.replyPacket();
221 a->setRcode(RCode::ServFail);
222 S.inc("servfail-packets");
223 S.ringAccount("servfail-queries", QD->Q.qdomain, QD->Q.qtype);
225 g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
// The same two outcomes for exceptions of unknown type.
// NOTE(review): pthread_self() is cast to long here but to unsigned long in
// SingleThreadDistributor::question() - confirm which is intended.
232 g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<endl;
233 a=QD->Q.replyPacket();
235 a->setRcode(RCode::ServFail);
236 S.inc("servfail-packets");
237 S.ringAccount("servfail-queries", QD->Q.qdomain, QD->Q.qtype);
239 g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<" (retry once)"<<endl;
// Failures reported as fatal for the distributor thread.
250 catch(const PDNSException &AE) {
251 g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
255 g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
// Answers question q with the backend held in b. Failures are logged and
// counted as ServFail, with a "retry once" variant of each log message.
261 template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
263 std::unique_ptr<Answer> a = nullptr;
264 bool allowRetry=true;
// NOTE(review): presumably only reached when b was reset after an earlier
// failure - confirm.
269 b=make_unique<Backend>();
271 a=b->question(q); // a can be NULL!
// Two outcomes are logged for a PDNSException: a final failure counted as
// ServFail, or a single retry (presumably selected by allowRetry - confirm).
273 catch(const PDNSException &e) {
276 g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
// NOTE(review): a is dereferenced here; confirm it is assigned a reply packet
// first, as makeThread() does with replyPacket().
279 a->setRcode(RCode::ServFail);
280 S.inc("servfail-packets");
281 S.ringAccount("servfail-queries", q.qdomain, q.qtype);
283 g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
// The same two outcomes for exceptions of unknown type.
290 g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
293 a->setRcode(RCode::ServFail);
294 S.inc("servfail-packets");
295 S.ringAccount("servfail-queries", q.qdomain, q.qtype);
297 g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<" (retry once)"<<endl;
// Empty tag type thrown by MultiThreadDistributor::question() when more
// questions are queued than d_maxQueueLength allows.
305 struct DistributorFatal{};
// Queues question q for a worker thread: wraps it in a heap-allocated
// QuestionData, then writes the pointer to one of the pipes in d_pipes.
// Throws DistributorFatal when d_queued exceeds d_maxQueueLength.
307 template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
309 // this is passed to a worker thread over its pipe and released there
310 auto QD=new QuestionData(q);
311 auto ret = QD->id = nextid++; // might be deleted after write!
312 QD->callback=callback;
// The pipe is chosen from the question id modulo the number of pipes; only the
// pointer value itself is written.
// NOTE(review): d_pipes.size() is the divisor - confirm d_pipes can never be
// empty when this is called.
315 if(write(d_pipes[QD->id % d_pipes.size()].second, &QD, sizeof(QD)) != sizeof(QD)) {
321 if(d_queued > d_maxQueueLength) {
// NOTE(review): the limit is re-read from the configuration for the message
// rather than taken from d_maxQueueLength - confirm the two cannot differ.
322 g_log<<Logger::Error<< d_queued <<" questions waiting for database/backend attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
323 // this will leak the entire contents of all pipes, nothing will be freed. Respawn when this happens!
324 throw DistributorFatal();