]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/distributor.hh
Standardize license text in all PDNS files
[thirdparty/pdns.git] / pdns / distributor.hh
CommitLineData
12c86877 1/*
12471842
PL
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
1258abe0
BH
22#ifndef DISTRIBUTOR_HH
23#define DISTRIBUTOR_HH
24
12c86877
BH
25#include <string>
26#include <deque>
27#include <queue>
28#include <vector>
29#include <pthread.h>
76473b92 30#include <unistd.h>
12c86877
BH
31#include "logger.hh"
32#include "dns.hh"
33#include "dnsbackend.hh"
5c409fa2 34#include "pdnsexception.hh"
092c9cc4 35#include "arguments.hh"
491d03d7 36#include <atomic>
092c9cc4 37#include "statbag.hh"
12c86877 38
092c9cc4 39extern StatBag S;
12c86877
BH
40
41/** the Distributor template class enables you to multithread slow question/answer
42 processes.
43
ad7d9cd0 44 Questions are posed to the Distributor, which returns the answer via a callback.
bdc9f8d2 45
491d03d7 46 The Distributor spawns sufficient backends, and if they throw an exception,
47 it will cycle the backend but drop the query that was active during the exception.
bdc9f8d2 48*/
ad7d9cd0 49
12c86877
BH
template<class Answer, class Question, class Backend> class Distributor
{
public:
  static Distributor *Create(int n=1); //!< Create a new Distributor with \param n threads
  typedef std::function<void(Answer*)> callback_t; //!< Invoked with the answer to a submitted question

  // Create() hands out derived objects through a Distributor pointer, so the
  // destructor has to be virtual for a delete through that pointer to run the
  // derived destructor.
  virtual ~Distributor() = default;

  virtual int question(Question *, callback_t callback) =0; //!< Submit a question to the Distributor
  virtual int getQueueSize() =0; //!< Returns length of question queue
  virtual bool isOverloaded() =0; //!< Returns true if the implementation reports itself as overloaded
};
59
60template<class Answer, class Question, class Backend> class SingleThreadDistributor
61 : public Distributor<Answer, Question, Backend>
62{
63public:
64 SingleThreadDistributor();
491d03d7 65 typedef std::function<void(Answer*)> callback_t;
66 int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor
cff3e04d 67 int getQueueSize() override {
ad7d9cd0
MZ
68 return 0;
69 }
70
cff3e04d 71 bool isOverloaded() override
12c86877 72 {
ad7d9cd0
MZ
73 return false;
74 }
75
5d47a576
AT
76 ~SingleThreadDistributor() {
77 if (b) delete b;
78 }
ad7d9cd0 79private:
491d03d7 80 Backend *b{0};
ad7d9cd0
MZ
81};
82
83template<class Answer, class Question, class Backend> class MultiThreadDistributor
84 : public Distributor<Answer, Question, Backend>
85{
86public:
491d03d7 87 MultiThreadDistributor(int n);
88 typedef std::function<void(Answer*)> callback_t;
89 int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor
12c86877 90 static void* makeThread(void *); //!< helper function to create our n threads
491d03d7 91 int getQueueSize() override {
92 return d_queued;
12c86877
BH
93 }
94
12c86877
BH
95 struct QuestionData
96 {
97 Question *Q;
491d03d7 98 callback_t callback;
12c86877
BH
99 int id;
100 };
101
491d03d7 102 bool isOverloaded() override
e7e691cc 103 {
d3363b40 104 return d_overloadQueueLength && (d_queued > d_overloadQueueLength);
e7e691cc 105 }
12c86877
BH
106
107private:
12c86877
BH
108 int nextid;
109 time_t d_last_started;
d3363b40 110 unsigned int d_overloadQueueLength, d_maxQueueLength;
12c86877 111 int d_num_threads;
491d03d7 112 std::atomic<unsigned int> d_queued{0}, d_running{0};
113 std::vector<std::pair<int,int>> d_pipes;
12c86877
BH
114};
115
12c86877 116//template<class Answer, class Question, class Backend>::nextid;
ad7d9cd0
MZ
117template<class Answer, class Question, class Backend>Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n)
118{
119 if( n == 1 )
120 return new SingleThreadDistributor<Answer,Question,Backend>();
121 else
122 return new MultiThreadDistributor<Answer,Question,Backend>( n );
123}
124
125template<class Answer, class Question, class Backend>SingleThreadDistributor<Answer,Question,Backend>::SingleThreadDistributor()
126{
127 L<<Logger::Error<<"Only asked for 1 backend thread - operating unthreaded"<<endl;
07dbfb4f
KM
128 try {
129 b=new Backend;
130 }
131 catch(const PDNSException &AE) {
132 L<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
133 exit(1);
134 }
135 catch(...) {
136 L<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
137 exit(1);
138 }
ad7d9cd0 139}
12c86877 140
ad7d9cd0 141template<class Answer, class Question, class Backend>MultiThreadDistributor<Answer,Question,Backend>::MultiThreadDistributor(int n)
12c86877 142{
ad7d9cd0 143 d_num_threads=n;
d3363b40 144 d_overloadQueueLength=::arg().asNum("overload-queue-length");
145 d_maxQueueLength=::arg().asNum("max-queue-length");
49f076e8 146 nextid=0;
12c86877 147 d_last_started=time(0);
12c86877 148
12c86877
BH
149 pthread_t tid;
150
491d03d7 151
152 for(int i=0; i < n; ++i) {
153 int fds[2];
154 if(pipe(fds) < 0)
155 unixDie("Creating pipe");
156 d_pipes.push_back({fds[0],fds[1]});
157 }
158
0469c6ed 159 if (n<1) {
ade4cedf
PD
160 L<<Logger::Error<<"Asked for fewer than 1 threads, nothing to do"<<endl;
161 exit(1);
0469c6ed
AT
162 }
163
45fdd612 164 L<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl;
12c86877
BH
165 for(int i=0;i<n;i++) {
166 pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
167 Utility::usleep(50000); // we've overloaded mysql in the past :-)
168 }
f4a38014 169 L<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
12c86877
BH
170}
171
ad7d9cd0 172
12c86877 173// start of a new thread
ad7d9cd0 174template<class Answer, class Question, class Backend>void *MultiThreadDistributor<Answer,Question,Backend>::makeThread(void *p)
12c86877 175{
a858f79e 176 pthread_detach(pthread_self());
491d03d7 177 MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p);
178 int ournum=us->d_running++;
179
12c86877
BH
180 try {
181 Backend *b=new Backend(); // this will answer our questions
379ab445 182 int queuetimeout=::arg().asNum("queue-limit");
12c86877 183
491d03d7 184 for(;;) {
185
186 QuestionData* QD;
187 if(read(us->d_pipes[ournum].first, &QD, sizeof(QD)) != sizeof(QD))
188 unixDie("read");
189 --us->d_queued;
f1ad3c5b 190 Answer *a;
12c86877 191
491d03d7 192 if(queuetimeout && QD->Q->d_dt.udiff()>queuetimeout*1000) {
193 delete QD->Q;
194 delete QD;
12c86877
BH
195 S.inc("timedout-packets");
196 continue;
4957a608 197 }
bf17c14b
KM
198
199 bool allowRetry=true;
200retry:
12c86877
BH
201 // this is the only point where we interact with the backend (synchronous)
202 try {
bf17c14b
KM
203 if (!b) {
204 allowRetry=false;
07dbfb4f 205 b=new Backend();
bf17c14b
KM
206 }
207 a=b->question(QD->Q);
208 delete QD->Q;
12c86877 209 }
3f81d239 210 catch(const PDNSException &e) {
bf17c14b 211 delete b;
07dbfb4f 212 b=NULL;
bf17c14b
KM
213 if (!allowRetry) {
214 L<<Logger::Error<<"Backend error: "<<e.reason<<endl;
215 a=QD->Q->replyPacket();
216
217 a->setRcode(RCode::ServFail);
218 S.inc("servfail-packets");
74f6d6e1 219 S.ringAccount("servfail-queries",QD->Q->qdomain.toLogString());
bf17c14b
KM
220
221 delete QD->Q;
222 } else {
223 L<<Logger::Error<<"Backend error (retry once): "<<e.reason<<endl;
224 goto retry;
225 }
12c86877
BH
226 }
227 catch(...) {
bf17c14b
KM
228 delete b;
229 b=NULL;
230 if (!allowRetry) {
231 L<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<endl;
232 a=QD->Q->replyPacket();
233
234 a->setRcode(RCode::ServFail);
235 S.inc("servfail-packets");
74f6d6e1 236 S.ringAccount("servfail-queries",QD->Q->qdomain.toLogString());
bf17c14b
KM
237
238 delete QD->Q;
239 } else {
240 L<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<" (retry once)"<<endl;
241 goto retry;
242 }
12c86877
BH
243 }
244
491d03d7 245 QD->callback(a);
246 delete QD;
12c86877 247 }
bf17c14b 248
12c86877
BH
249 delete b;
250 }
3f81d239 251 catch(const PDNSException &AE) {
491d03d7 252 L<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
07dbfb4f 253 exit(1);
12c86877
BH
254 }
255 catch(...) {
491d03d7 256 L<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
07dbfb4f 257 exit(1);
12c86877
BH
258 }
259 return 0;
260}
261
491d03d7 262template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question* q, callback_t callback)
12c86877 263{
ad7d9cd0 264 Answer *a;
bf17c14b
KM
265 bool allowRetry=true;
266retry:
ad7d9cd0 267 try {
bf17c14b
KM
268 if (!b) {
269 allowRetry=false;
07dbfb4f 270 b=new Backend;
bf17c14b 271 }
ad7d9cd0
MZ
272 a=b->question(q); // a can be NULL!
273 }
e6577aef 274 catch(const PDNSException &e) {
ad7d9cd0 275 delete b;
07dbfb4f 276 b=NULL;
bf17c14b
KM
277 if (!allowRetry) {
278 L<<Logger::Error<<"Backend error: "<<e.reason<<endl;
279 a=q->replyPacket();
280
281 a->setRcode(RCode::ServFail);
282 S.inc("servfail-packets");
74f6d6e1 283 S.ringAccount("servfail-queries",q->qdomain.toLogString());
bf17c14b
KM
284 } else {
285 L<<Logger::Error<<"Backend error (retry once): "<<e.reason<<endl;
286 goto retry;
287 }
12c86877 288 }
ad7d9cd0 289 catch(...) {
ad7d9cd0 290 delete b;
07dbfb4f 291 b=NULL;
bf17c14b
KM
292 if (!allowRetry) {
293 L<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
294 a=q->replyPacket();
295
296 a->setRcode(RCode::ServFail);
297 S.inc("servfail-packets");
74f6d6e1 298 S.ringAccount("servfail-queries",q->qdomain.toLogString());
bf17c14b
KM
299 } else {
300 L<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<" (retry once)"<<endl;
301 goto retry;
302 }
12c86877 303 }
491d03d7 304 callback(a);
ad7d9cd0
MZ
305 return 0;
306}
307
//! Thrown by MultiThreadDistributor::question() when the queue exceeds its maximum length
struct DistributorFatal
{
};
309
310template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question* q, callback_t callback)
ad7d9cd0 311{
ad7d9cd0 312 q=new Question(*q);
12c86877 313
2301afa0 314 // this is passed to other process over pipe and released there
491d03d7 315 auto QD=new QuestionData();
316 QD->Q=q;
317 auto ret = QD->id = nextid++; // might be deleted after write!
318 QD->callback=callback;
e7e691cc 319
491d03d7 320 if(write(d_pipes[QD->id % d_pipes.size()].second, &QD, sizeof(QD)) != sizeof(QD))
321 unixDie("write");
1258abe0 322
491d03d7 323 d_queued++;
e7e691cc 324
e7e691cc 325
d3363b40 326
327 if(d_queued > d_maxQueueLength) {
3172f9b6 328 L<<Logger::Error<< d_queued <<" questions waiting for database/backend attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
491d03d7 329 // this will leak the entire contents of all pipes, nothing will be freed. Respawn when this happens!
330 throw DistributorFatal();
1258abe0 331 }
491d03d7 332
333 return ret;
12c86877
BH
334}
335
1258abe0 336#endif // DISTRIBUTOR_HH
12c86877 337