]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/distributor.hh
Merge pull request #8342 from chbruyand/pipebackend-unused-warning
[thirdparty/pdns.git] / pdns / distributor.hh
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22 #ifndef DISTRIBUTOR_HH
23 #define DISTRIBUTOR_HH
24
25 #include <string>
26 #include <deque>
27 #include <queue>
28 #include <vector>
29 #include <pthread.h>
30 #include "threadname.hh"
31 #include <unistd.h>
32 #include "logger.hh"
33 #include "dns.hh"
34 #include "dnsbackend.hh"
35 #include "pdnsexception.hh"
36 #include "arguments.hh"
37 #include <atomic>
38 #include "statbag.hh"
39
40 extern StatBag S;
41
/** The Distributor template class enables you to multithread slow question/answer
    processes.

    Questions are posed to the Distributor, which returns the answer via a callback.

    The Distributor spawns sufficient backends, and if one of them throws an exception,
    it will cycle the backend and retry the query that was active once; if the retry
    fails as well, the query is answered with ServFail.
*/
50
/** Abstract interface shared by the single-threaded and multi-threaded distributors.

    \tparam Answer   type handed to the callback, wrapped in a std::unique_ptr
    \tparam Question type of the queries that are submitted
    \tparam Backend  type that produces an Answer for a Question
*/
template<class Answer, class Question, class Backend> class Distributor
{
public:
  static Distributor* Create(int n=1); //!< Create a new Distributor with \param n threads
  typedef std::function<void(std::unique_ptr<Answer>&)> callback_t; //!< Invoked with the answer to a question
  virtual int question(Question&, callback_t callback) =0; //!< Submit a question to the Distributor
  virtual int getQueueSize() =0; //!< Returns length of question queue
  virtual bool isOverloaded() =0; //!< Returns true if the implementation considers itself overloaded
  // Destruction must be silent: this used to print the function name to stderr,
  // which was leftover debugging output.
  virtual ~Distributor() = default;
};
61
62 template<class Answer, class Question, class Backend> class SingleThreadDistributor
63 : public Distributor<Answer, Question, Backend>
64 {
65 public:
66 SingleThreadDistributor(const SingleThreadDistributor&) = delete;
67 void operator=(const SingleThreadDistributor&) = delete;
68 SingleThreadDistributor();
69 typedef std::function<void(std::unique_ptr<Answer>&)> callback_t;
70 int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor
71 int getQueueSize() override {
72 return 0;
73 }
74
75 bool isOverloaded() override
76 {
77 return false;
78 }
79
80 private:
81 std::unique_ptr<Backend> b{nullptr};
82 };
83
84 template<class Answer, class Question, class Backend> class MultiThreadDistributor
85 : public Distributor<Answer, Question, Backend>
86 {
87 public:
88 MultiThreadDistributor(const MultiThreadDistributor&) = delete;
89 void operator=(const MultiThreadDistributor&) = delete;
90 MultiThreadDistributor(int n);
91 typedef std::function<void(std::unique_ptr<Answer>&)> callback_t;
92 int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor
93 static void* makeThread(void *); //!< helper function to create our n threads
94 int getQueueSize() override {
95 return d_queued;
96 }
97
98 struct QuestionData
99 {
100 QuestionData(const Question& query): Q(query)
101 {
102 }
103
104 Question Q;
105 callback_t callback;
106 int id;
107 };
108
109 bool isOverloaded() override
110 {
111 return d_overloadQueueLength && (d_queued > d_overloadQueueLength);
112 }
113
114 private:
115 int nextid;
116 time_t d_last_started;
117 unsigned int d_overloadQueueLength, d_maxQueueLength;
118 int d_num_threads;
119 std::atomic<unsigned int> d_queued{0}, d_running{0};
120 std::vector<std::pair<int,int>> d_pipes;
121 };
122
123 //template<class Answer, class Question, class Backend>::nextid;
124 template<class Answer, class Question, class Backend> Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n)
125 {
126 if( n == 1 )
127 return new SingleThreadDistributor<Answer,Question,Backend>();
128 else
129 return new MultiThreadDistributor<Answer,Question,Backend>( n );
130 }
131
132 template<class Answer, class Question, class Backend>SingleThreadDistributor<Answer,Question,Backend>::SingleThreadDistributor()
133 {
134 g_log<<Logger::Error<<"Only asked for 1 backend thread - operating unthreaded"<<endl;
135 try {
136 b=make_unique<Backend>();
137 }
138 catch(const PDNSException &AE) {
139 g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
140 _exit(1);
141 }
142 catch(...) {
143 g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
144 _exit(1);
145 }
146 }
147
148 template<class Answer, class Question, class Backend>MultiThreadDistributor<Answer,Question,Backend>::MultiThreadDistributor(int n)
149 {
150 d_num_threads=n;
151 d_overloadQueueLength=::arg().asNum("overload-queue-length");
152 d_maxQueueLength=::arg().asNum("max-queue-length");
153 nextid=0;
154 d_last_started=time(0);
155
156 pthread_t tid;
157
158
159 for(int i=0; i < n; ++i) {
160 int fds[2];
161 if(pipe(fds) < 0)
162 unixDie("Creating pipe");
163 d_pipes.push_back({fds[0],fds[1]});
164 }
165
166 if (n<1) {
167 g_log<<Logger::Error<<"Asked for fewer than 1 threads, nothing to do"<<endl;
168 _exit(1);
169 }
170
171 g_log<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl;
172 for(int i=0;i<n;i++) {
173 pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
174 Utility::usleep(50000); // we've overloaded mysql in the past :-)
175 }
176 g_log<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
177 }
178
179
180 // start of a new thread
181 template<class Answer, class Question, class Backend>void *MultiThreadDistributor<Answer,Question,Backend>::makeThread(void *p)
182 {
183 setThreadName("pdns/distributo");
184 pthread_detach(pthread_self());
185 MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p);
186 int ournum=us->d_running++;
187
188 try {
189 std::unique_ptr<Backend> b= make_unique<Backend>(); // this will answer our questions
190 int queuetimeout=::arg().asNum("queue-limit");
191
192 for(;;) {
193
194 QuestionData* tempQD = nullptr;
195 if(read(us->d_pipes[ournum].first, &tempQD, sizeof(tempQD)) != sizeof(tempQD))
196 unixDie("read");
197 --us->d_queued;
198 std::unique_ptr<QuestionData> QD = std::unique_ptr<QuestionData>(tempQD);
199 tempQD = nullptr;
200 std::unique_ptr<Answer> a = nullptr;
201
202 if(queuetimeout && QD->Q.d_dt.udiff()>queuetimeout*1000) {
203 S.inc("timedout-packets");
204 continue;
205 }
206
207 bool allowRetry=true;
208 retry:
209 // this is the only point where we interact with the backend (synchronous)
210 try {
211 if (!b) {
212 allowRetry=false;
213 b=make_unique<Backend>();
214 }
215 a=b->question(QD->Q);
216 }
217 catch(const PDNSException &e) {
218 b.reset();
219 if (!allowRetry) {
220 g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
221 a=QD->Q.replyPacket();
222
223 a->setRcode(RCode::ServFail);
224 S.inc("servfail-packets");
225 S.ringAccount("servfail-queries", QD->Q.qdomain, QD->Q.qtype);
226 } else {
227 g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
228 goto retry;
229 }
230 }
231 catch(...) {
232 b.reset();
233 if (!allowRetry) {
234 g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<endl;
235 a=QD->Q.replyPacket();
236
237 a->setRcode(RCode::ServFail);
238 S.inc("servfail-packets");
239 S.ringAccount("servfail-queries", QD->Q.qdomain, QD->Q.qtype);
240 } else {
241 g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<" (retry once)"<<endl;
242 goto retry;
243 }
244 }
245
246 QD->callback(a);
247 QD.reset();
248 }
249
250 b.reset();
251 }
252 catch(const PDNSException &AE) {
253 g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
254 _exit(1);
255 }
256 catch(...) {
257 g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
258 _exit(1);
259 }
260 return 0;
261 }
262
263 template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
264 {
265 std::unique_ptr<Answer> a = nullptr;
266 bool allowRetry=true;
267 retry:
268 try {
269 if (!b) {
270 allowRetry=false;
271 b=make_unique<Backend>();
272 }
273 a=b->question(q); // a can be NULL!
274 }
275 catch(const PDNSException &e) {
276 b.reset();
277 if (!allowRetry) {
278 g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
279 a=q.replyPacket();
280
281 a->setRcode(RCode::ServFail);
282 S.inc("servfail-packets");
283 S.ringAccount("servfail-queries", q.qdomain, q.qtype);
284 } else {
285 g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
286 goto retry;
287 }
288 }
289 catch(...) {
290 b.reset();
291 if (!allowRetry) {
292 g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
293 a=q.replyPacket();
294
295 a->setRcode(RCode::ServFail);
296 S.inc("servfail-packets");
297 S.ringAccount("servfail-queries", q.qdomain, q.qtype);
298 } else {
299 g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<" (retry once)"<<endl;
300 goto retry;
301 }
302 }
303 callback(a);
304 return 0;
305 }
306
//! Thrown by MultiThreadDistributor::question() when the queue exceeds its maximum length
struct DistributorFatal
{
};
308
309 template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
310 {
311 // this is passed to other process over pipe and released there
312 auto QD=new QuestionData(q);
313 auto ret = QD->id = nextid++; // might be deleted after write!
314 QD->callback=callback;
315
316 ++d_queued;
317 if(write(d_pipes[QD->id % d_pipes.size()].second, &QD, sizeof(QD)) != sizeof(QD)) {
318 --d_queued;
319 delete QD;
320 unixDie("write");
321 }
322
323 if(d_queued > d_maxQueueLength) {
324 g_log<<Logger::Error<< d_queued <<" questions waiting for database/backend attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
325 // this will leak the entire contents of all pipes, nothing will be freed. Respawn when this happens!
326 throw DistributorFatal();
327 }
328
329 return ret;
330 }
331
332 #endif // DISTRIBUTOR_HH
333