]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/distributor.hh
Merge pull request #5523 from rubenk/fix-typos-in-logmessage
[thirdparty/pdns.git] / pdns / distributor.hh
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22 #ifndef DISTRIBUTOR_HH
23 #define DISTRIBUTOR_HH
24
25 #include <string>
26 #include <deque>
27 #include <queue>
28 #include <vector>
29 #include <pthread.h>
30 #include <unistd.h>
31 #include "logger.hh"
32 #include "dns.hh"
33 #include "dnsbackend.hh"
34 #include "pdnsexception.hh"
35 #include "arguments.hh"
36 #include <atomic>
37 #include "statbag.hh"
38
39 extern StatBag S;
40
41 /** the Distributor template class enables you to multithread slow question/answer
42 processes.
43
44 Questions are posed to the Distributor, which returns the answer via a callback.
45
46 The Distributor spawns sufficient backends, and if they thrown an exception,
47 it will cycle the backend but drop the query that was active during the exception.
48 */
49
50 template<class Answer, class Question, class Backend> class Distributor
51 {
52 public:
53 static Distributor *Create(int n=1); //!< Create a new Distributor with \param n threads
54 typedef std::function<void(Answer*)> callback_t;
55 virtual int question(Question *, callback_t callback) =0; //!< Submit a question to the Distributor
56 virtual int getQueueSize() =0; //!< Returns length of question queue
57 virtual bool isOverloaded() =0;
58 };
59
60 template<class Answer, class Question, class Backend> class SingleThreadDistributor
61 : public Distributor<Answer, Question, Backend>
62 {
63 public:
64 SingleThreadDistributor();
65 typedef std::function<void(Answer*)> callback_t;
66 int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor
67 int getQueueSize() override {
68 return 0;
69 }
70
71 bool isOverloaded() override
72 {
73 return false;
74 }
75
76 ~SingleThreadDistributor() {
77 if (b) delete b;
78 }
79 private:
80 Backend *b{0};
81 };
82
83 template<class Answer, class Question, class Backend> class MultiThreadDistributor
84 : public Distributor<Answer, Question, Backend>
85 {
86 public:
87 MultiThreadDistributor(int n);
88 typedef std::function<void(Answer*)> callback_t;
89 int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor
90 static void* makeThread(void *); //!< helper function to create our n threads
91 int getQueueSize() override {
92 return d_queued;
93 }
94
95 struct QuestionData
96 {
97 Question *Q;
98 callback_t callback;
99 int id;
100 };
101
102 bool isOverloaded() override
103 {
104 return d_overloadQueueLength && (d_queued > d_overloadQueueLength);
105 }
106
107 private:
108 int nextid;
109 time_t d_last_started;
110 unsigned int d_overloadQueueLength, d_maxQueueLength;
111 int d_num_threads;
112 std::atomic<unsigned int> d_queued{0}, d_running{0};
113 std::vector<std::pair<int,int>> d_pipes;
114 };
115
116 //template<class Answer, class Question, class Backend>::nextid;
117 template<class Answer, class Question, class Backend>Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n)
118 {
119 if( n == 1 )
120 return new SingleThreadDistributor<Answer,Question,Backend>();
121 else
122 return new MultiThreadDistributor<Answer,Question,Backend>( n );
123 }
124
125 template<class Answer, class Question, class Backend>SingleThreadDistributor<Answer,Question,Backend>::SingleThreadDistributor()
126 {
127 L<<Logger::Error<<"Only asked for 1 backend thread - operating unthreaded"<<endl;
128 try {
129 b=new Backend;
130 }
131 catch(const PDNSException &AE) {
132 L<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
133 exit(1);
134 }
135 catch(...) {
136 L<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
137 exit(1);
138 }
139 }
140
141 template<class Answer, class Question, class Backend>MultiThreadDistributor<Answer,Question,Backend>::MultiThreadDistributor(int n)
142 {
143 d_num_threads=n;
144 d_overloadQueueLength=::arg().asNum("overload-queue-length");
145 d_maxQueueLength=::arg().asNum("max-queue-length");
146 nextid=0;
147 d_last_started=time(0);
148
149 pthread_t tid;
150
151
152 for(int i=0; i < n; ++i) {
153 int fds[2];
154 if(pipe(fds) < 0)
155 unixDie("Creating pipe");
156 d_pipes.push_back({fds[0],fds[1]});
157 }
158
159 if (n<1) {
160 L<<Logger::Error<<"Asked for fewer than 1 threads, nothing to do"<<endl;
161 exit(1);
162 }
163
164 L<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl;
165 for(int i=0;i<n;i++) {
166 pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
167 Utility::usleep(50000); // we've overloaded mysql in the past :-)
168 }
169 L<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
170 }
171
172
173 // start of a new thread
174 template<class Answer, class Question, class Backend>void *MultiThreadDistributor<Answer,Question,Backend>::makeThread(void *p)
175 {
176 pthread_detach(pthread_self());
177 MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p);
178 int ournum=us->d_running++;
179
180 try {
181 Backend *b=new Backend(); // this will answer our questions
182 int queuetimeout=::arg().asNum("queue-limit");
183
184 for(;;) {
185
186 QuestionData* QD;
187 if(read(us->d_pipes[ournum].first, &QD, sizeof(QD)) != sizeof(QD))
188 unixDie("read");
189 --us->d_queued;
190 Answer *a;
191
192 if(queuetimeout && QD->Q->d_dt.udiff()>queuetimeout*1000) {
193 delete QD->Q;
194 delete QD;
195 S.inc("timedout-packets");
196 continue;
197 }
198
199 bool allowRetry=true;
200 retry:
201 // this is the only point where we interact with the backend (synchronous)
202 try {
203 if (!b) {
204 allowRetry=false;
205 b=new Backend();
206 }
207 a=b->question(QD->Q);
208 delete QD->Q;
209 }
210 catch(const PDNSException &e) {
211 delete b;
212 b=NULL;
213 if (!allowRetry) {
214 L<<Logger::Error<<"Backend error: "<<e.reason<<endl;
215 a=QD->Q->replyPacket();
216
217 a->setRcode(RCode::ServFail);
218 S.inc("servfail-packets");
219 S.ringAccount("servfail-queries",QD->Q->qdomain.toLogString());
220
221 delete QD->Q;
222 } else {
223 L<<Logger::Error<<"Backend error (retry once): "<<e.reason<<endl;
224 goto retry;
225 }
226 }
227 catch(...) {
228 delete b;
229 b=NULL;
230 if (!allowRetry) {
231 L<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<endl;
232 a=QD->Q->replyPacket();
233
234 a->setRcode(RCode::ServFail);
235 S.inc("servfail-packets");
236 S.ringAccount("servfail-queries",QD->Q->qdomain.toLogString());
237
238 delete QD->Q;
239 } else {
240 L<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<" (retry once)"<<endl;
241 goto retry;
242 }
243 }
244
245 QD->callback(a);
246 delete QD;
247 }
248
249 delete b;
250 }
251 catch(const PDNSException &AE) {
252 L<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
253 exit(1);
254 }
255 catch(...) {
256 L<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
257 exit(1);
258 }
259 return 0;
260 }
261
262 template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question* q, callback_t callback)
263 {
264 Answer *a;
265 bool allowRetry=true;
266 retry:
267 try {
268 if (!b) {
269 allowRetry=false;
270 b=new Backend;
271 }
272 a=b->question(q); // a can be NULL!
273 }
274 catch(const PDNSException &e) {
275 delete b;
276 b=NULL;
277 if (!allowRetry) {
278 L<<Logger::Error<<"Backend error: "<<e.reason<<endl;
279 a=q->replyPacket();
280
281 a->setRcode(RCode::ServFail);
282 S.inc("servfail-packets");
283 S.ringAccount("servfail-queries",q->qdomain.toLogString());
284 } else {
285 L<<Logger::Error<<"Backend error (retry once): "<<e.reason<<endl;
286 goto retry;
287 }
288 }
289 catch(...) {
290 delete b;
291 b=NULL;
292 if (!allowRetry) {
293 L<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
294 a=q->replyPacket();
295
296 a->setRcode(RCode::ServFail);
297 S.inc("servfail-packets");
298 S.ringAccount("servfail-queries",q->qdomain.toLogString());
299 } else {
300 L<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<" (retry once)"<<endl;
301 goto retry;
302 }
303 }
304 callback(a);
305 return 0;
306 }
307
308 struct DistributorFatal{};
309
310 template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question* q, callback_t callback)
311 {
312 q=new Question(*q);
313
314 // this is passed to other process over pipe and released there
315 auto QD=new QuestionData();
316 QD->Q=q;
317 auto ret = QD->id = nextid++; // might be deleted after write!
318 QD->callback=callback;
319
320 if(write(d_pipes[QD->id % d_pipes.size()].second, &QD, sizeof(QD)) != sizeof(QD))
321 unixDie("write");
322
323 d_queued++;
324
325
326
327 if(d_queued > d_maxQueueLength) {
328 L<<Logger::Error<< d_queued <<" questions waiting for database/backend attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
329 // this will leak the entire contents of all pipes, nothing will be freed. Respawn when this happens!
330 throw DistributorFatal();
331 }
332
333 return ret;
334 }
335
336 #endif // DISTRIBUTOR_HH
337