]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/distributor.hh
Merge pull request #7628 from tcely/patch-3
[thirdparty/pdns.git] / pdns / distributor.hh
CommitLineData
12c86877 1/*
12471842
PL
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
1258abe0
BH
22#ifndef DISTRIBUTOR_HH
23#define DISTRIBUTOR_HH
24
12c86877
BH
25#include <string>
26#include <deque>
27#include <queue>
28#include <vector>
29#include <pthread.h>
519f5484 30#include "threadname.hh"
76473b92 31#include <unistd.h>
12c86877
BH
32#include "logger.hh"
33#include "dns.hh"
34#include "dnsbackend.hh"
5c409fa2 35#include "pdnsexception.hh"
092c9cc4 36#include "arguments.hh"
491d03d7 37#include <atomic>
092c9cc4 38#include "statbag.hh"
12c86877 39
092c9cc4 40extern StatBag S;
12c86877
BH
41
42/** the Distributor template class enables you to multithread slow question/answer
43 processes.
44
ad7d9cd0 45 Questions are posed to the Distributor, which returns the answer via a callback.
bdc9f8d2 46
 The Distributor spawns sufficient backends, and if one of them throws an exception,
 it will cycle the backend and retry the active query once; if the retry fails too,
 a ServFail answer is returned for that query.
bdc9f8d2 49*/
ad7d9cd0 50
12c86877
BH
template<class Answer, class Question, class Backend> class Distributor
{
public:
  static Distributor *Create(int n=1); //!< Create a new Distributor with \param n threads
  typedef std::function<void(Answer*)> callback_t; //!< Invoked with the answer to a submitted question
  virtual int question(Question *, callback_t callback) =0; //!< Submit a question to the Distributor
  virtual int getQueueSize() =0; //!< Returns length of question queue
  virtual bool isOverloaded() =0; //!< Returns true when the implementation considers its queue too long

  // Create() hands out concrete distributors as Distributor*, so destruction
  // through the base pointer must reach the derived destructor.
  virtual ~Distributor() = default;
};
60
61template<class Answer, class Question, class Backend> class SingleThreadDistributor
62 : public Distributor<Answer, Question, Backend>
63{
64public:
f6c19c4f
CHB
65 SingleThreadDistributor(const SingleThreadDistributor&) = delete;
66 void operator=(const SingleThreadDistributor&) = delete;
ad7d9cd0 67 SingleThreadDistributor();
491d03d7 68 typedef std::function<void(Answer*)> callback_t;
69 int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor
cff3e04d 70 int getQueueSize() override {
ad7d9cd0
MZ
71 return 0;
72 }
73
cff3e04d 74 bool isOverloaded() override
12c86877 75 {
ad7d9cd0
MZ
76 return false;
77 }
78
5d47a576
AT
79 ~SingleThreadDistributor() {
80 if (b) delete b;
81 }
ad7d9cd0 82private:
491d03d7 83 Backend *b{0};
ad7d9cd0
MZ
84};
85
86template<class Answer, class Question, class Backend> class MultiThreadDistributor
87 : public Distributor<Answer, Question, Backend>
88{
89public:
f6c19c4f
CHB
90 MultiThreadDistributor(const MultiThreadDistributor&) = delete;
91 void operator=(const MultiThreadDistributor&) = delete;
491d03d7 92 MultiThreadDistributor(int n);
93 typedef std::function<void(Answer*)> callback_t;
94 int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor
12c86877 95 static void* makeThread(void *); //!< helper function to create our n threads
491d03d7 96 int getQueueSize() override {
97 return d_queued;
12c86877
BH
98 }
99
12c86877
BH
100 struct QuestionData
101 {
102 Question *Q;
491d03d7 103 callback_t callback;
12c86877
BH
104 int id;
105 };
106
491d03d7 107 bool isOverloaded() override
e7e691cc 108 {
d3363b40 109 return d_overloadQueueLength && (d_queued > d_overloadQueueLength);
e7e691cc 110 }
12c86877
BH
111
112private:
12c86877
BH
113 int nextid;
114 time_t d_last_started;
d3363b40 115 unsigned int d_overloadQueueLength, d_maxQueueLength;
12c86877 116 int d_num_threads;
491d03d7 117 std::atomic<unsigned int> d_queued{0}, d_running{0};
118 std::vector<std::pair<int,int>> d_pipes;
12c86877
BH
119};
120
12c86877 121//template<class Answer, class Question, class Backend>::nextid;
ad7d9cd0
MZ
122template<class Answer, class Question, class Backend>Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n)
123{
124 if( n == 1 )
125 return new SingleThreadDistributor<Answer,Question,Backend>();
126 else
127 return new MultiThreadDistributor<Answer,Question,Backend>( n );
128}
129
130template<class Answer, class Question, class Backend>SingleThreadDistributor<Answer,Question,Backend>::SingleThreadDistributor()
131{
e6a9dde5 132 g_log<<Logger::Error<<"Only asked for 1 backend thread - operating unthreaded"<<endl;
07dbfb4f
KM
133 try {
134 b=new Backend;
135 }
136 catch(const PDNSException &AE) {
e6a9dde5 137 g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
dc386240 138 _exit(1);
07dbfb4f
KM
139 }
140 catch(...) {
e6a9dde5 141 g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
dc386240 142 _exit(1);
07dbfb4f 143 }
ad7d9cd0 144}
12c86877 145
ad7d9cd0 146template<class Answer, class Question, class Backend>MultiThreadDistributor<Answer,Question,Backend>::MultiThreadDistributor(int n)
12c86877 147{
ad7d9cd0 148 d_num_threads=n;
d3363b40 149 d_overloadQueueLength=::arg().asNum("overload-queue-length");
150 d_maxQueueLength=::arg().asNum("max-queue-length");
49f076e8 151 nextid=0;
12c86877 152 d_last_started=time(0);
12c86877 153
12c86877
BH
154 pthread_t tid;
155
491d03d7 156
157 for(int i=0; i < n; ++i) {
158 int fds[2];
159 if(pipe(fds) < 0)
160 unixDie("Creating pipe");
161 d_pipes.push_back({fds[0],fds[1]});
162 }
163
0469c6ed 164 if (n<1) {
e6a9dde5 165 g_log<<Logger::Error<<"Asked for fewer than 1 threads, nothing to do"<<endl;
dc386240 166 _exit(1);
0469c6ed
AT
167 }
168
e6a9dde5 169 g_log<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl;
12c86877
BH
170 for(int i=0;i<n;i++) {
171 pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
172 Utility::usleep(50000); // we've overloaded mysql in the past :-)
173 }
e6a9dde5 174 g_log<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
12c86877
BH
175}
176
ad7d9cd0 177
12c86877 178// start of a new thread
ad7d9cd0 179template<class Answer, class Question, class Backend>void *MultiThreadDistributor<Answer,Question,Backend>::makeThread(void *p)
12c86877 180{
519f5484 181 setThreadName("pdns/distributo");
a858f79e 182 pthread_detach(pthread_self());
491d03d7 183 MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p);
184 int ournum=us->d_running++;
185
12c86877
BH
186 try {
187 Backend *b=new Backend(); // this will answer our questions
379ab445 188 int queuetimeout=::arg().asNum("queue-limit");
12c86877 189
491d03d7 190 for(;;) {
191
192 QuestionData* QD;
193 if(read(us->d_pipes[ournum].first, &QD, sizeof(QD)) != sizeof(QD))
194 unixDie("read");
195 --us->d_queued;
f1ad3c5b 196 Answer *a;
12c86877 197
491d03d7 198 if(queuetimeout && QD->Q->d_dt.udiff()>queuetimeout*1000) {
199 delete QD->Q;
200 delete QD;
12c86877
BH
201 S.inc("timedout-packets");
202 continue;
4957a608 203 }
bf17c14b
KM
204
205 bool allowRetry=true;
206retry:
12c86877
BH
207 // this is the only point where we interact with the backend (synchronous)
208 try {
bf17c14b
KM
209 if (!b) {
210 allowRetry=false;
07dbfb4f 211 b=new Backend();
bf17c14b
KM
212 }
213 a=b->question(QD->Q);
214 delete QD->Q;
12c86877 215 }
3f81d239 216 catch(const PDNSException &e) {
bf17c14b 217 delete b;
07dbfb4f 218 b=NULL;
bf17c14b 219 if (!allowRetry) {
e6a9dde5 220 g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
bf17c14b
KM
221 a=QD->Q->replyPacket();
222
223 a->setRcode(RCode::ServFail);
224 S.inc("servfail-packets");
74f6d6e1 225 S.ringAccount("servfail-queries",QD->Q->qdomain.toLogString());
bf17c14b
KM
226
227 delete QD->Q;
228 } else {
e6a9dde5 229 g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
bf17c14b
KM
230 goto retry;
231 }
12c86877
BH
232 }
233 catch(...) {
bf17c14b
KM
234 delete b;
235 b=NULL;
236 if (!allowRetry) {
e6a9dde5 237 g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<endl;
bf17c14b
KM
238 a=QD->Q->replyPacket();
239
240 a->setRcode(RCode::ServFail);
241 S.inc("servfail-packets");
74f6d6e1 242 S.ringAccount("servfail-queries",QD->Q->qdomain.toLogString());
bf17c14b
KM
243
244 delete QD->Q;
245 } else {
e6a9dde5 246 g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<" (retry once)"<<endl;
bf17c14b
KM
247 goto retry;
248 }
12c86877
BH
249 }
250
491d03d7 251 QD->callback(a);
252 delete QD;
12c86877 253 }
bf17c14b 254
12c86877
BH
255 delete b;
256 }
3f81d239 257 catch(const PDNSException &AE) {
e6a9dde5 258 g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
dc386240 259 _exit(1);
12c86877
BH
260 }
261 catch(...) {
e6a9dde5 262 g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
dc386240 263 _exit(1);
12c86877
BH
264 }
265 return 0;
266}
267
491d03d7 268template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question* q, callback_t callback)
12c86877 269{
ad7d9cd0 270 Answer *a;
bf17c14b
KM
271 bool allowRetry=true;
272retry:
ad7d9cd0 273 try {
bf17c14b
KM
274 if (!b) {
275 allowRetry=false;
07dbfb4f 276 b=new Backend;
bf17c14b 277 }
ad7d9cd0
MZ
278 a=b->question(q); // a can be NULL!
279 }
e6577aef 280 catch(const PDNSException &e) {
ad7d9cd0 281 delete b;
07dbfb4f 282 b=NULL;
bf17c14b 283 if (!allowRetry) {
e6a9dde5 284 g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
bf17c14b
KM
285 a=q->replyPacket();
286
287 a->setRcode(RCode::ServFail);
288 S.inc("servfail-packets");
74f6d6e1 289 S.ringAccount("servfail-queries",q->qdomain.toLogString());
bf17c14b 290 } else {
e6a9dde5 291 g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
bf17c14b
KM
292 goto retry;
293 }
12c86877 294 }
ad7d9cd0 295 catch(...) {
ad7d9cd0 296 delete b;
07dbfb4f 297 b=NULL;
bf17c14b 298 if (!allowRetry) {
e6a9dde5 299 g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
bf17c14b
KM
300 a=q->replyPacket();
301
302 a->setRcode(RCode::ServFail);
303 S.inc("servfail-packets");
74f6d6e1 304 S.ringAccount("servfail-queries",q->qdomain.toLogString());
bf17c14b 305 } else {
e6a9dde5 306 g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<" (retry once)"<<endl;
bf17c14b
KM
307 goto retry;
308 }
12c86877 309 }
491d03d7 310 callback(a);
ad7d9cd0
MZ
311 return 0;
312}
313
//! Thrown by MultiThreadDistributor::question() when the queue grows beyond its maximum length
struct DistributorFatal
{
};
315
316template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question* q, callback_t callback)
ad7d9cd0 317{
ad7d9cd0 318 q=new Question(*q);
12c86877 319
2301afa0 320 // this is passed to other process over pipe and released there
491d03d7 321 auto QD=new QuestionData();
322 QD->Q=q;
323 auto ret = QD->id = nextid++; // might be deleted after write!
324 QD->callback=callback;
e7e691cc 325
101112ad
RG
326 ++d_queued;
327 if(write(d_pipes[QD->id % d_pipes.size()].second, &QD, sizeof(QD)) != sizeof(QD)) {
328 --d_queued;
329 unixDie("write");
330 }
d3363b40 331
332 if(d_queued > d_maxQueueLength) {
e6a9dde5 333 g_log<<Logger::Error<< d_queued <<" questions waiting for database/backend attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
491d03d7 334 // this will leak the entire contents of all pipes, nothing will be freed. Respawn when this happens!
335 throw DistributorFatal();
1258abe0 336 }
491d03d7 337
338 return ret;
12c86877
BH
339}
340
1258abe0 341#endif // DISTRIBUTOR_HH
12c86877 342