]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/distributor.hh
Merge pull request #7653 from pieterlexis/docker-ignore
[thirdparty/pdns.git] / pdns / distributor.hh
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22 #ifndef DISTRIBUTOR_HH
23 #define DISTRIBUTOR_HH
24
25 #include <string>
26 #include <deque>
27 #include <queue>
28 #include <vector>
29 #include <pthread.h>
30 #include "threadname.hh"
31 #include <unistd.h>
32 #include "logger.hh"
33 #include "dns.hh"
34 #include "dnsbackend.hh"
35 #include "pdnsexception.hh"
36 #include "arguments.hh"
37 #include <atomic>
38 #include "statbag.hh"
39
40 extern StatBag S;
41
/** The Distributor template class enables you to multithread slow question/answer
    processes.

    Questions are posed to the Distributor, which returns the answer via a callback.

    The Distributor spawns sufficient backends. If a backend throws an exception
    while answering, that backend is destroyed and the question is retried at most
    once on a fresh backend; if no attempt succeeds, a ServFail answer is passed
    to the callback.
*/

template<class Answer, class Question, class Backend> class Distributor
{
public:
  static Distributor *Create(int n=1); //!< Create a new Distributor with \param n threads
  typedef std::function<void(Answer*)> callback_t;
  virtual int question(Question *, callback_t callback) =0; //!< Submit a question to the Distributor
  virtual int getQueueSize() =0; //!< Returns length of question queue
  virtual bool isOverloaded() =0; //!< Returns true if the Distributor considers itself overloaded
  // Create() hands out derived objects as Distributor*, so destruction through
  // the base pointer must reach the derived destructor.
  virtual ~Distributor() = default;
};
60
61 template<class Answer, class Question, class Backend> class SingleThreadDistributor
62 : public Distributor<Answer, Question, Backend>
63 {
64 public:
65 SingleThreadDistributor(const SingleThreadDistributor&) = delete;
66 void operator=(const SingleThreadDistributor&) = delete;
67 SingleThreadDistributor();
68 typedef std::function<void(Answer*)> callback_t;
69 int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor
70 int getQueueSize() override {
71 return 0;
72 }
73
74 bool isOverloaded() override
75 {
76 return false;
77 }
78
79 ~SingleThreadDistributor() {
80 if (b) delete b;
81 }
82 private:
83 Backend *b{0};
84 };
85
86 template<class Answer, class Question, class Backend> class MultiThreadDistributor
87 : public Distributor<Answer, Question, Backend>
88 {
89 public:
90 MultiThreadDistributor(const MultiThreadDistributor&) = delete;
91 void operator=(const MultiThreadDistributor&) = delete;
92 MultiThreadDistributor(int n);
93 typedef std::function<void(Answer*)> callback_t;
94 int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor
95 static void* makeThread(void *); //!< helper function to create our n threads
96 int getQueueSize() override {
97 return d_queued;
98 }
99
100 struct QuestionData
101 {
102 Question *Q;
103 callback_t callback;
104 int id;
105 };
106
107 bool isOverloaded() override
108 {
109 return d_overloadQueueLength && (d_queued > d_overloadQueueLength);
110 }
111
112 private:
113 int nextid;
114 time_t d_last_started;
115 unsigned int d_overloadQueueLength, d_maxQueueLength;
116 int d_num_threads;
117 std::atomic<unsigned int> d_queued{0}, d_running{0};
118 std::vector<std::pair<int,int>> d_pipes;
119 };
120
121 //template<class Answer, class Question, class Backend>::nextid;
122 template<class Answer, class Question, class Backend>Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n)
123 {
124 if( n == 1 )
125 return new SingleThreadDistributor<Answer,Question,Backend>();
126 else
127 return new MultiThreadDistributor<Answer,Question,Backend>( n );
128 }
129
130 template<class Answer, class Question, class Backend>SingleThreadDistributor<Answer,Question,Backend>::SingleThreadDistributor()
131 {
132 g_log<<Logger::Error<<"Only asked for 1 backend thread - operating unthreaded"<<endl;
133 try {
134 b=new Backend;
135 }
136 catch(const PDNSException &AE) {
137 g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
138 _exit(1);
139 }
140 catch(...) {
141 g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
142 _exit(1);
143 }
144 }
145
146 template<class Answer, class Question, class Backend>MultiThreadDistributor<Answer,Question,Backend>::MultiThreadDistributor(int n)
147 {
148 d_num_threads=n;
149 d_overloadQueueLength=::arg().asNum("overload-queue-length");
150 d_maxQueueLength=::arg().asNum("max-queue-length");
151 nextid=0;
152 d_last_started=time(0);
153
154 pthread_t tid;
155
156
157 for(int i=0; i < n; ++i) {
158 int fds[2];
159 if(pipe(fds) < 0)
160 unixDie("Creating pipe");
161 d_pipes.push_back({fds[0],fds[1]});
162 }
163
164 if (n<1) {
165 g_log<<Logger::Error<<"Asked for fewer than 1 threads, nothing to do"<<endl;
166 _exit(1);
167 }
168
169 g_log<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl;
170 for(int i=0;i<n;i++) {
171 pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
172 Utility::usleep(50000); // we've overloaded mysql in the past :-)
173 }
174 g_log<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
175 }
176
177
178 // start of a new thread
179 template<class Answer, class Question, class Backend>void *MultiThreadDistributor<Answer,Question,Backend>::makeThread(void *p)
180 {
181 setThreadName("pdns/distributo");
182 pthread_detach(pthread_self());
183 MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p);
184 int ournum=us->d_running++;
185
186 try {
187 Backend *b=new Backend(); // this will answer our questions
188 int queuetimeout=::arg().asNum("queue-limit");
189
190 for(;;) {
191
192 QuestionData* QD;
193 if(read(us->d_pipes[ournum].first, &QD, sizeof(QD)) != sizeof(QD))
194 unixDie("read");
195 --us->d_queued;
196 Answer *a;
197
198 if(queuetimeout && QD->Q->d_dt.udiff()>queuetimeout*1000) {
199 delete QD->Q;
200 delete QD;
201 S.inc("timedout-packets");
202 continue;
203 }
204
205 bool allowRetry=true;
206 retry:
207 // this is the only point where we interact with the backend (synchronous)
208 try {
209 if (!b) {
210 allowRetry=false;
211 b=new Backend();
212 }
213 a=b->question(QD->Q);
214 delete QD->Q;
215 }
216 catch(const PDNSException &e) {
217 delete b;
218 b=NULL;
219 if (!allowRetry) {
220 g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
221 a=QD->Q->replyPacket();
222
223 a->setRcode(RCode::ServFail);
224 S.inc("servfail-packets");
225 S.ringAccount("servfail-queries",QD->Q->qdomain.toLogString());
226
227 delete QD->Q;
228 } else {
229 g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
230 goto retry;
231 }
232 }
233 catch(...) {
234 delete b;
235 b=NULL;
236 if (!allowRetry) {
237 g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<endl;
238 a=QD->Q->replyPacket();
239
240 a->setRcode(RCode::ServFail);
241 S.inc("servfail-packets");
242 S.ringAccount("servfail-queries",QD->Q->qdomain.toLogString());
243
244 delete QD->Q;
245 } else {
246 g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<" (retry once)"<<endl;
247 goto retry;
248 }
249 }
250
251 QD->callback(a);
252 delete QD;
253 }
254
255 delete b;
256 }
257 catch(const PDNSException &AE) {
258 g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
259 _exit(1);
260 }
261 catch(...) {
262 g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
263 _exit(1);
264 }
265 return 0;
266 }
267
268 template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question* q, callback_t callback)
269 {
270 Answer *a;
271 bool allowRetry=true;
272 retry:
273 try {
274 if (!b) {
275 allowRetry=false;
276 b=new Backend;
277 }
278 a=b->question(q); // a can be NULL!
279 }
280 catch(const PDNSException &e) {
281 delete b;
282 b=NULL;
283 if (!allowRetry) {
284 g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
285 a=q->replyPacket();
286
287 a->setRcode(RCode::ServFail);
288 S.inc("servfail-packets");
289 S.ringAccount("servfail-queries",q->qdomain.toLogString());
290 } else {
291 g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
292 goto retry;
293 }
294 }
295 catch(...) {
296 delete b;
297 b=NULL;
298 if (!allowRetry) {
299 g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
300 a=q->replyPacket();
301
302 a->setRcode(RCode::ServFail);
303 S.inc("servfail-packets");
304 S.ringAccount("servfail-queries",q->qdomain.toLogString());
305 } else {
306 g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<" (retry once)"<<endl;
307 goto retry;
308 }
309 }
310 callback(a);
311 return 0;
312 }
313
314 struct DistributorFatal{}; //!< Thrown by MultiThreadDistributor::question() when more questions are queued than max-queue-length allows
315
316 template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question* q, callback_t callback)
317 {
318 q=new Question(*q);
319
320 // this is passed to other process over pipe and released there
321 auto QD=new QuestionData();
322 QD->Q=q;
323 auto ret = QD->id = nextid++; // might be deleted after write!
324 QD->callback=callback;
325
326 ++d_queued;
327 if(write(d_pipes[QD->id % d_pipes.size()].second, &QD, sizeof(QD)) != sizeof(QD)) {
328 --d_queued;
329 unixDie("write");
330 }
331
332 if(d_queued > d_maxQueueLength) {
333 g_log<<Logger::Error<< d_queued <<" questions waiting for database/backend attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
334 // this will leak the entire contents of all pipes, nothing will be freed. Respawn when this happens!
335 throw DistributorFatal();
336 }
337
338 return ret;
339 }
340
341 #endif // DISTRIBUTOR_HH
342