]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/distributor.hh
add OpenSSL exception to PowerDNS, Netherlabs, van Dijk and Hubert copyrights
[thirdparty/pdns.git] / pdns / distributor.hh
CommitLineData
12c86877
BH
1/*
2 PowerDNS Versatile Database Driven Nameserver
e7e691cc 3 Copyright (C) 2002 - 2011 PowerDNS.COM BV
12c86877
BH
4
5 This program is free software; you can redistribute it and/or modify
22dc646a
BH
6 it under the terms of the GNU General Public License version 2
7 as published by the Free Software Foundation
f782fe38
MH
8
9 Additionally, the license of this program contains a special
10 exception which allows to distribute the program in binary form when
11 it is linked against OpenSSL.
12c86877
BH
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
06bd9ccf 20 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
12c86877 21*/
1258abe0
BH
22
23#ifndef DISTRIBUTOR_HH
24#define DISTRIBUTOR_HH
25
12c86877
BH
26#include <string>
27#include <deque>
28#include <queue>
29#include <vector>
30#include <pthread.h>
1258abe0 31#include <semaphore.h>
76473b92 32#include <unistd.h>
12c86877
BH
33#include "logger.hh"
34#include "dns.hh"
35#include "dnsbackend.hh"
5c409fa2 36#include "pdnsexception.hh"
092c9cc4
BH
37#include "arguments.hh"
38#include "statbag.hh"
12c86877 39
092c9cc4 40extern StatBag S;
12c86877
BH
41
42/** the Distributor template class enables you to multithread slow question/answer
43 processes.
44
bdc9f8d2
BH
45 Questions are posed to the Distributor, which can either hand back the answer,
46 or give it directly to a callback. Only the latter mode of operation is used in
47 PowerDNS.
48
49 The Distributor takes care that there are enough Backends alive at any one
50 time and will try to spawn additional ones should they die.
12c86877 51
bdc9f8d2
BH
52 The Backend needs to count the number of living instances and supply this number to
53 the Distributor using its numBackends() method. This is silly.
12c86877 54
bdc9f8d2
BH
55 If an exception escapes a Backend, the distributor retires it.
56*/
12c86877
BH
57template<class Answer, class Question, class Backend> class Distributor
58{
59public:
60 Distributor(int n=10); //!< Create a new Distributor with \param n threads
61 struct AnswerData
62 {
63 Answer *A;
64 time_t created;
65 };
66 int question(Question *, void (*)(const AnswerData &)=0); //!< Submit a question to the Distributor
67 Answer *answer(void); //!< Wait for any answer from the Distributor
68 Answer *wait(Question *); //!< wait for an answer to a specific question
69 int timeoutWait(int id, Answer *, int); //!< wait for a specific answer, with timeout
70 static void* makeThread(void *); //!< helper function to create our n threads
71 void getQueueSizes(int &questions, int &answers); //!< Returns length of question queue
72
12c86877
BH
73 int getNumBusy()
74 {
75 return d_num_threads-d_idle_threads;
76 }
77
12c86877
BH
78 struct QuestionData
79 {
80 Question *Q;
12c86877
BH
81 void (*callback)(const AnswerData &);
82 int id;
83 };
84
12c86877 85 typedef pair<QuestionData, AnswerData> tuple_t;
e7e691cc
BH
86 bool isOverloaded()
87 {
88 return d_overloaded;
89 }
12c86877
BH
90
91private:
e7e691cc 92 bool d_overloaded;
1258abe0 93 std::queue<QuestionData> questions;
12c86877 94 pthread_mutex_t q_lock;
12c86877
BH
95
96 deque<tuple_t> answers;
97 pthread_mutex_t a_lock;
98
99 Semaphore numquestions;
100 Semaphore numanswers;
101
102 pthread_mutex_t to_mut;
103 pthread_cond_t to_cond;
104
105 int nextid;
106 time_t d_last_started;
107 int d_num_threads;
16f7d28d 108 AtomicCounter d_idle_threads;
12c86877
BH
109 Backend *b;
110};
111
112
113//template<class Answer, class Question, class Backend>::nextid;
114
115template<class Answer, class Question, class Backend>Distributor<Answer,Question,Backend>::Distributor(int n)
116{
117 b=0;
e7e691cc 118 d_overloaded = false;
49f076e8 119 nextid=0;
16f7d28d 120 // d_idle_threads=0;
12c86877
BH
121 d_last_started=time(0);
122// sem_init(&numquestions,0,0);
123 pthread_mutex_init(&q_lock,0);
124
125// sem_init(&numanswers,0,0);
126 pthread_mutex_init(&a_lock,0);
127
128 pthread_mutex_init(&to_mut,0);
129 pthread_cond_init(&to_cond,0);
130
131 pthread_t tid;
132
133 d_num_threads=n;
134
45fdd612 135 L<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl;
12c86877
BH
136
137 for(int i=0;i<n;i++) {
138 pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
139 Utility::usleep(50000); // we've overloaded mysql in the past :-)
140 }
141
f4a38014 142 L<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
12c86877
BH
143}
144
145// start of a new thread
146template<class Answer, class Question, class Backend>void *Distributor<Answer,Question,Backend>::makeThread(void *p)
147{
a858f79e 148 pthread_detach(pthread_self());
12c86877
BH
149 try {
150 Backend *b=new Backend(); // this will answer our questions
151 Distributor *us=static_cast<Distributor *>(p);
152 int qcount;
153
154 // this is so gross
155#ifndef SMTPREDIR
379ab445 156 int queuetimeout=::arg().asNum("queue-limit");
12c86877
BH
157#endif
158 // ick ick ick!
e7e691cc 159 static int overloadQueueLength=::arg().asNum("overload-queue-length");
12c86877 160 for(;;) {
16f7d28d 161 ++(us->d_idle_threads);
1258abe0 162
12c86877
BH
163 us->numquestions.getValue( &qcount );
164
1258abe0 165 us->numquestions.wait();
12c86877 166
16f7d28d 167 --(us->d_idle_threads);
12c86877
BH
168 pthread_mutex_lock(&us->q_lock);
169
170 QuestionData QD=us->questions.front();
171
078f4c97
BH
172 us->questions.pop();
173 pthread_mutex_unlock(&us->q_lock);
174
12c86877
BH
175 Question *q=QD.Q;
176
e7e691cc
BH
177
178 if(us->d_overloaded && qcount <= overloadQueueLength/10) {
179 us->d_overloaded=false;
180 }
12c86877 181
12c86877
BH
182 Answer *a;
183
12c86877
BH
184#ifndef SMTPREDIR
185 if(queuetimeout && q->d_dt.udiff()>queuetimeout*1000) {
186 delete q;
187 S.inc("timedout-packets");
188 continue;
4957a608 189 }
12c86877
BH
190#endif
191 // this is the only point where we interact with the backend (synchronous)
192 try {
193 a=b->question(q); // a can be NULL!
194 delete q;
195 }
3f81d239 196 catch(const PDNSException &e) {
12c86877 197 L<<Logger::Error<<"Backend error: "<<e.reason<<endl;
4957a608 198 delete b;
12c86877
BH
199 return 0;
200 }
201 catch(...) {
279b5b47 202 L<<Logger::Error<<Logger::NTLog<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
4957a608 203 delete b;
12c86877
BH
204 return 0;
205 }
206
207 AnswerData AD;
208 AD.A=a;
209 AD.created=time(0);
210 tuple_t tuple(QD,AD);
211
212 if(QD.callback) {
4957a608 213 QD.callback(AD);
12c86877
BH
214 }
215 else {
4957a608 216 pthread_mutex_lock(&us->a_lock);
12c86877 217
4957a608
BH
218 us->answers.push_back(tuple);
219 pthread_mutex_unlock(&us->a_lock);
12c86877 220
4957a608
BH
221 // L<<"We have an answer to send! Trying to get to to_mut lock"<<endl;
222 pthread_mutex_lock(&us->to_mut);
223 // L<<"Yes, we got the lock, we can transmit! First we post"<<endl;
224 us->numanswers.post();
225 // L<<"And now we broadcast!"<<endl;
226 pthread_cond_broadcast(&us->to_cond); // for timeoutWait();
227 pthread_mutex_unlock(&us->to_mut);
12c86877
BH
228 }
229 }
230
231 delete b;
232 }
3f81d239 233 catch(const PDNSException &AE) {
12c86877
BH
234 L<<Logger::Error<<Logger::NTLog<<"Distributor caught fatal exception: "<<AE.reason<<endl;
235 }
236 catch(...) {
237 L<<Logger::Error<<Logger::NTLog<<"Caught an unknown exception when creating backend, probably"<<endl;
238 }
239 return 0;
240}
241
242template<class Answer, class Question, class Backend>int Distributor<Answer,Question,Backend>::question(Question* q, void (*callback)(const AnswerData &))
243{
244 if(d_num_threads==1 && callback) { // short circuit
245 if(!b) {
246 L<<Logger::Error<<"Engaging bypass - now operating unthreaded"<<endl;
247 b=new Backend;
248 }
249 Answer *a;
250
5fe21f79
BH
251 try {
252 a=b->question(q); // a can be NULL!
253 }
3f81d239 254 catch(const PDNSException &e) {
5fe21f79
BH
255 L<<Logger::Error<<"Backend error: "<<e.reason<<endl;
256 delete b;
257 b=0;
258 return 0;
259 }
260 catch(...) {
261 L<<Logger::Error<<Logger::NTLog<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
262 delete b;
263 b=0;
264 return 0;
265 }
266
12c86877
BH
267
268 AnswerData AD;
269 AD.A=a;
270 AD.created=time(0);
271 callback(AD);
272 return 0;
273 }
274 else {
275 q=new Question(*q);
276 }
277
278 DLOG(L<<"Distributor has "<<Backend::numRunning()<<" threads available"<<endl);
65d0bcad
BH
279
280 /* the line below is a bit difficult.
281 What happens is that we have a goal for the number of running distributor threads. Furthermore, other
abc1d928 282 parts of PowerDNS also start backends, which get included in this count.
65d0bcad
BH
283
284 If less than two threads now die, no new ones will be spawned.
285
286 The solutionis to add '+2' below, but it is not a pretty solution. Better solution is
287 to only account the number of threads within the Distributor, and not in the backend.
288
b634b109 289 XXX FIXME
65d0bcad
BH
290 */
291
292 if(Backend::numRunning() < d_num_threads+2 && time(0)-d_last_started>5) {
12c86877 293 d_last_started=time(0);
280439a9 294 L<<"Distributor misses a thread ("<<Backend::numRunning()<<"<"<<d_num_threads + 2<<"), spawning new one"<<endl;
12c86877
BH
295 pthread_t tid;
296 pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
297 }
298
12c86877
BH
299 QuestionData QD;
300 QD.Q=q;
301 QD.id=nextid++;
302 QD.callback=callback;
078f4c97
BH
303
304 pthread_mutex_lock(&q_lock);
12c86877
BH
305 questions.push(QD);
306 pthread_mutex_unlock(&q_lock);
1258abe0 307
12c86877 308 numquestions.post();
e7e691cc
BH
309
310 static int overloadQueueLength=::arg().asNum("overload-queue-length");
1258abe0 311
12c86877
BH
312 if(!(nextid%50)) {
313 int val;
314 numquestions.getValue( &val );
e7e691cc
BH
315
316 if(!d_overloaded)
317 d_overloaded = overloadQueueLength && (val > overloadQueueLength);
318
379ab445
BH
319 if(val>::arg().asNum("max-queue-length")) {
320 L<<Logger::Error<<val<<" questions waiting for database attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
907ea90a 321 _exit(1);
12c86877 322 }
e7e691cc 323
1258abe0 324 }
12c86877
BH
325
326 return QD.id;
327}
328
329template<class Answer, class Question,class Backend>Answer* Distributor<Answer,Question,Backend>::answer()
330{
331 numanswers.wait();
332 tuple_t tuple;
333
334 pthread_mutex_lock(&a_lock);
335 tuple=answers.front();
336 answers.pop_front();
337 pthread_mutex_unlock(&a_lock);
338 return tuple.second.A;
339}
340
341//! Wait synchronously for the answer of the question just asked. For this to work, no answer() functions must be called
342template<class Answer, class Question,class Backend>Answer* Distributor<Answer,Question,Backend>::wait(Question *q)
343{
344 for(;;)
345 {
346 numanswers.wait();
347 pthread_mutex_lock(&a_lock);
348
349 // search if the answer is there
350 tuple_t tuple=answers.front();
351 if(tuple.first==q)
4957a608
BH
352 {
353 answers.pop_front();
354 pthread_mutex_unlock(&a_lock);
355 return tuple.second.A;
356 }
12c86877
BH
357 // if not, loop again
358 pthread_mutex_unlock(&a_lock);
359 numanswers.post();
360 }
361 // FIXME: write this
362}
363
364template<class Answer, class Question,class Backend>void Distributor<Answer,Question,Backend>::getQueueSizes(int &questions, int &answers)
1258abe0
BH
365{
366 numquestions.getValue( &questions );
12c86877
BH
367 numanswers.getValue( &answers );
368}
369
1258abe0 370#endif // DISTRIBUTOR_HH
12c86877 371