]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/distributor.hh
Merge pull request #9084 from pieterlexis/dnsdist_latency_prometheus_help
[thirdparty/pdns.git] / pdns / distributor.hh
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22 #pragma once
#include <atomic>
#include <deque>
#include <functional>
#include <limits>
#include <memory>
#include <queue>
#include <string>
#include <thread>
#include <vector>

#include <pthread.h>
#include <unistd.h>

#include "threadname.hh"
#include "logger.hh"
#include "dns.hh"
#include "dnsbackend.hh"
#include "pdnsexception.hh"
#include "arguments.hh"
#include "statbag.hh"
38
39 extern StatBag S;
40
/** The Distributor template class enables you to multithread slow question/answer
    processes.

    Questions are posed to the Distributor, which returns the answer via a callback.

    The Distributor spawns sufficient backends. If a backend throws an exception, it is
    discarded and the query is retried once on a fresh backend; if that attempt fails as
    well, a ServFail answer is handed to the callback instead.
*/
49
50 template<class Answer, class Question, class Backend> class Distributor
51 {
52 public:
53 static Distributor* Create(int n=1); //!< Create a new Distributor with \param n threads
54 typedef std::function<void(std::unique_ptr<Answer>&)> callback_t;
55 virtual int question(Question&, callback_t callback) =0; //!< Submit a question to the Distributor
56 virtual int getQueueSize() =0; //!< Returns length of question queue
57 virtual bool isOverloaded() =0;
58 virtual ~Distributor() { cerr<<__func__<<endl;}
59 };
60
61 template<class Answer, class Question, class Backend> class SingleThreadDistributor
62 : public Distributor<Answer, Question, Backend>
63 {
64 public:
65 SingleThreadDistributor(const SingleThreadDistributor&) = delete;
66 void operator=(const SingleThreadDistributor&) = delete;
67 SingleThreadDistributor();
68 typedef std::function<void(std::unique_ptr<Answer>&)> callback_t;
69 int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor
70 int getQueueSize() override {
71 return 0;
72 }
73
74 bool isOverloaded() override
75 {
76 return false;
77 }
78
79 private:
80 std::unique_ptr<Backend> b{nullptr};
81 };
82
83 template<class Answer, class Question, class Backend> class MultiThreadDistributor
84 : public Distributor<Answer, Question, Backend>
85 {
86 public:
87 MultiThreadDistributor(const MultiThreadDistributor&) = delete;
88 void operator=(const MultiThreadDistributor&) = delete;
89 MultiThreadDistributor(int n);
90 typedef std::function<void(std::unique_ptr<Answer>&)> callback_t;
91 int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor
92 void distribute(int n);
93 int getQueueSize() override {
94 return d_queued;
95 }
96
97 struct QuestionData
98 {
99 QuestionData(const Question& query): Q(query)
100 {
101 }
102
103 Question Q;
104 callback_t callback;
105 int id;
106 };
107
108 bool isOverloaded() override
109 {
110 return d_overloadQueueLength && (d_queued > d_overloadQueueLength);
111 }
112
113 private:
114 int nextid;
115 time_t d_last_started;
116 unsigned int d_overloadQueueLength, d_maxQueueLength;
117 int d_num_threads;
118 std::atomic<unsigned int> d_queued{0};
119 std::vector<std::pair<int,int>> d_pipes;
120 };
121
122 //template<class Answer, class Question, class Backend>::nextid;
123 template<class Answer, class Question, class Backend> Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n)
124 {
125 if( n == 1 )
126 return new SingleThreadDistributor<Answer,Question,Backend>();
127 else
128 return new MultiThreadDistributor<Answer,Question,Backend>( n );
129 }
130
131 template<class Answer, class Question, class Backend>SingleThreadDistributor<Answer,Question,Backend>::SingleThreadDistributor()
132 {
133 g_log<<Logger::Error<<"Only asked for 1 backend thread - operating unthreaded"<<endl;
134 try {
135 b=make_unique<Backend>();
136 }
137 catch(const PDNSException &AE) {
138 g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
139 _exit(1);
140 }
141 catch(const std::exception& e) {
142 g_log<<Logger::Error<<"Distributor caught fatal exception: "<<e.what()<<endl;
143 _exit(1);
144 }
145 catch(...) {
146 g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
147 _exit(1);
148 }
149 }
150
151 template<class Answer, class Question, class Backend>MultiThreadDistributor<Answer,Question,Backend>::MultiThreadDistributor(int n)
152 {
153 d_num_threads=n;
154 d_overloadQueueLength=::arg().asNum("overload-queue-length");
155 d_maxQueueLength=::arg().asNum("max-queue-length");
156 nextid=0;
157 d_last_started=time(0);
158
159 for(int i=0; i < n; ++i) {
160 int fds[2];
161 if(pipe(fds) < 0)
162 unixDie("Creating pipe");
163 d_pipes.push_back({fds[0],fds[1]});
164 }
165
166 if (n<1) {
167 g_log<<Logger::Error<<"Asked for fewer than 1 threads, nothing to do"<<endl;
168 _exit(1);
169 }
170
171 g_log<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl;
172 for(int i=0;i<n;i++) {
173 std::thread t(std::bind(&MultiThreadDistributor<Answer,Question,Backend>::distribute, this, i));
174 t.detach();
175 Utility::usleep(50000); // we've overloaded mysql in the past :-)
176 }
177 g_log<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
178 }
179
180
181 // start of a new thread
182 template<class Answer, class Question, class Backend>void MultiThreadDistributor<Answer,Question,Backend>::distribute(int ournum)
183 {
184 setThreadName("pdns/distributo");
185
186 try {
187 std::unique_ptr<Backend> b= make_unique<Backend>(); // this will answer our questions
188 int queuetimeout=::arg().asNum("queue-limit");
189
190 for(;;) {
191
192 QuestionData* tempQD = nullptr;
193 if(read(d_pipes.at(ournum).first, &tempQD, sizeof(tempQD)) != sizeof(tempQD))
194 unixDie("read");
195 --d_queued;
196 std::unique_ptr<QuestionData> QD = std::unique_ptr<QuestionData>(tempQD);
197 tempQD = nullptr;
198 std::unique_ptr<Answer> a = nullptr;
199
200 if(queuetimeout && QD->Q.d_dt.udiff()>queuetimeout*1000) {
201 S.inc("timedout-packets");
202 continue;
203 }
204
205 bool allowRetry=true;
206 retry:
207 // this is the only point where we interact with the backend (synchronous)
208 try {
209 if (!b) {
210 allowRetry=false;
211 b=make_unique<Backend>();
212 }
213 a=b->question(QD->Q);
214 }
215 catch(const PDNSException &e) {
216 b.reset();
217 if (!allowRetry) {
218 g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
219 a=QD->Q.replyPacket();
220
221 a->setRcode(RCode::ServFail);
222 S.inc("servfail-packets");
223 S.ringAccount("servfail-queries", QD->Q.qdomain, QD->Q.qtype);
224 } else {
225 g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
226 goto retry;
227 }
228 }
229 catch(...) {
230 b.reset();
231 if (!allowRetry) {
232 g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<endl;
233 a=QD->Q.replyPacket();
234
235 a->setRcode(RCode::ServFail);
236 S.inc("servfail-packets");
237 S.ringAccount("servfail-queries", QD->Q.qdomain, QD->Q.qtype);
238 } else {
239 g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<" (retry once)"<<endl;
240 goto retry;
241 }
242 }
243
244 QD->callback(a);
245 QD.reset();
246 }
247
248 b.reset();
249 }
250 catch(const PDNSException &AE) {
251 g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
252 _exit(1);
253 }
254 catch(const std::exception& e) {
255 g_log<<Logger::Error<<"Distributor caught fatal exception: "<<e.what()<<endl;
256 _exit(1);
257 }
258 catch(...) {
259 g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
260 _exit(1);
261 }
262 }
263
264 template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
265 {
266 std::unique_ptr<Answer> a = nullptr;
267 bool allowRetry=true;
268 retry:
269 try {
270 if (!b) {
271 allowRetry=false;
272 b=make_unique<Backend>();
273 }
274 a=b->question(q); // a can be NULL!
275 }
276 catch(const PDNSException &e) {
277 b.reset();
278 if (!allowRetry) {
279 g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
280 a=q.replyPacket();
281
282 a->setRcode(RCode::ServFail);
283 S.inc("servfail-packets");
284 S.ringAccount("servfail-queries", q.qdomain, q.qtype);
285 } else {
286 g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
287 goto retry;
288 }
289 }
290 catch(...) {
291 b.reset();
292 if (!allowRetry) {
293 g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
294 a=q.replyPacket();
295
296 a->setRcode(RCode::ServFail);
297 S.inc("servfail-packets");
298 S.ringAccount("servfail-queries", q.qdomain, q.qtype);
299 } else {
300 g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<" (retry once)"<<endl;
301 goto retry;
302 }
303 }
304 callback(a);
305 return 0;
306 }
307
308 struct DistributorFatal{};
309
310 template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
311 {
312 // this is passed to other process over pipe and released there
313 auto QD=new QuestionData(q);
314 auto ret = QD->id = nextid++; // might be deleted after write!
315 QD->callback=callback;
316
317 ++d_queued;
318 if(write(d_pipes.at(QD->id % d_pipes.size()).second, &QD, sizeof(QD)) != sizeof(QD)) {
319 --d_queued;
320 delete QD;
321 unixDie("write");
322 }
323
324 if(d_queued > d_maxQueueLength) {
325 g_log<<Logger::Error<< d_queued <<" questions waiting for database/backend attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
326 // this will leak the entire contents of all pipes, nothing will be freed. Respawn when this happens!
327 throw DistributorFatal();
328 }
329
330 return ret;
331 }