]>
Commit | Line | Data |
---|---|---|
12c86877 | 1 | /* |
12471842 PL |
2 | * This file is part of PowerDNS or dnsdist. |
3 | * Copyright -- PowerDNS.COM B.V. and its contributors | |
4 | * | |
5 | * This program is free software; you can redistribute it and/or modify | |
6 | * it under the terms of version 2 of the GNU General Public License as | |
7 | * published by the Free Software Foundation. | |
8 | * | |
9 | * In addition, for the avoidance of any doubt, permission is granted to | |
10 | * link this program with OpenSSL and to (re)distribute the binaries | |
11 | * produced as the result of such linking. | |
12 | * | |
13 | * This program is distributed in the hope that it will be useful, | |
14 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 | * GNU General Public License for more details. | |
17 | * | |
18 | * You should have received a copy of the GNU General Public License | |
19 | * along with this program; if not, write to the Free Software | |
20 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
21 | */ | |
1258abe0 BH |
22 | #ifndef DISTRIBUTOR_HH |
23 | #define DISTRIBUTOR_HH | |
24 | ||
12c86877 BH |
25 | #include <string> |
26 | #include <deque> | |
27 | #include <queue> | |
28 | #include <vector> | |
29 | #include <pthread.h> | |
519f5484 | 30 | #include "threadname.hh" |
76473b92 | 31 | #include <unistd.h> |
12c86877 BH |
32 | #include "logger.hh" |
33 | #include "dns.hh" | |
34 | #include "dnsbackend.hh" | |
5c409fa2 | 35 | #include "pdnsexception.hh" |
092c9cc4 | 36 | #include "arguments.hh" |
491d03d7 | 37 | #include <atomic> |
092c9cc4 | 38 | #include "statbag.hh" |
12c86877 | 39 | |
092c9cc4 | 40 | extern StatBag S; |
12c86877 BH |
41 | |
42 | /** the Distributor template class enables you to multithread slow question/answer | |
43 | processes. | |
44 | ||
ad7d9cd0 | 45 | Questions are posed to the Distributor, which returns the answer via a callback. |
bdc9f8d2 | 46 | |
491d03d7 | 47 | The Distributor spawns sufficient backends, and if they throw an exception,
48 | it will cycle the backend but drop the query that was active during the exception. | |
bdc9f8d2 | 49 | */ |
ad7d9cd0 | 50 | |
12c86877 BH |
/** Abstract interface shared by the single-threaded and multi-threaded distributors.

    Instances are obtained through Create(), which returns a heap-allocated
    concrete distributor as a pointer to this base class. Because such an object
    can only be destroyed through a Distributor pointer, the destructor is virtual.
*/
template<class Answer, class Question, class Backend> class Distributor
{
public:
  static Distributor *Create(int n=1); //!< Create a new Distributor with \param n threads
  typedef std::function<void(Answer*)> callback_t; //!< Invoked with the answer to a submitted question
  virtual ~Distributor() = default; //!< Virtual so that deleting via a base pointer runs the derived destructor
  virtual int question(Question *, callback_t callback) =0; //!< Submit a question to the Distributor
  virtual int getQueueSize() =0; //!< Returns length of question queue
  virtual bool isOverloaded() =0; //!< Returns true if the implementation considers itself overloaded
};
60 | ||
61 | template<class Answer, class Question, class Backend> class SingleThreadDistributor | |
62 | : public Distributor<Answer, Question, Backend> | |
63 | { | |
64 | public: | |
f6c19c4f CHB |
65 | SingleThreadDistributor(const SingleThreadDistributor&) = delete; |
66 | void operator=(const SingleThreadDistributor&) = delete; | |
ad7d9cd0 | 67 | SingleThreadDistributor(); |
491d03d7 | 68 | typedef std::function<void(Answer*)> callback_t; |
69 | int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor | |
cff3e04d | 70 | int getQueueSize() override { |
ad7d9cd0 MZ |
71 | return 0; |
72 | } | |
73 | ||
cff3e04d | 74 | bool isOverloaded() override |
12c86877 | 75 | { |
ad7d9cd0 MZ |
76 | return false; |
77 | } | |
78 | ||
5d47a576 AT |
79 | ~SingleThreadDistributor() { |
80 | if (b) delete b; | |
81 | } | |
ad7d9cd0 | 82 | private: |
491d03d7 | 83 | Backend *b{0}; |
ad7d9cd0 MZ |
84 | }; |
85 | ||
86 | template<class Answer, class Question, class Backend> class MultiThreadDistributor | |
87 | : public Distributor<Answer, Question, Backend> | |
88 | { | |
89 | public: | |
f6c19c4f CHB |
90 | MultiThreadDistributor(const MultiThreadDistributor&) = delete; |
91 | void operator=(const MultiThreadDistributor&) = delete; | |
491d03d7 | 92 | MultiThreadDistributor(int n); |
93 | typedef std::function<void(Answer*)> callback_t; | |
94 | int question(Question *, callback_t callback) override; //!< Submit a question to the Distributor | |
12c86877 | 95 | static void* makeThread(void *); //!< helper function to create our n threads |
491d03d7 | 96 | int getQueueSize() override { |
97 | return d_queued; | |
12c86877 BH |
98 | } |
99 | ||
12c86877 BH |
100 | struct QuestionData |
101 | { | |
102 | Question *Q; | |
491d03d7 | 103 | callback_t callback; |
12c86877 BH |
104 | int id; |
105 | }; | |
106 | ||
491d03d7 | 107 | bool isOverloaded() override |
e7e691cc | 108 | { |
d3363b40 | 109 | return d_overloadQueueLength && (d_queued > d_overloadQueueLength); |
e7e691cc | 110 | } |
12c86877 BH |
111 | |
112 | private: | |
12c86877 BH |
113 | int nextid; |
114 | time_t d_last_started; | |
d3363b40 | 115 | unsigned int d_overloadQueueLength, d_maxQueueLength; |
12c86877 | 116 | int d_num_threads; |
491d03d7 | 117 | std::atomic<unsigned int> d_queued{0}, d_running{0}; |
118 | std::vector<std::pair<int,int>> d_pipes; | |
12c86877 BH |
119 | }; |
120 | ||
12c86877 | 121 | //template<class Answer, class Question, class Backend>::nextid; |
ad7d9cd0 MZ |
122 | template<class Answer, class Question, class Backend>Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n) |
123 | { | |
124 | if( n == 1 ) | |
125 | return new SingleThreadDistributor<Answer,Question,Backend>(); | |
126 | else | |
127 | return new MultiThreadDistributor<Answer,Question,Backend>( n ); | |
128 | } | |
129 | ||
130 | template<class Answer, class Question, class Backend>SingleThreadDistributor<Answer,Question,Backend>::SingleThreadDistributor() | |
131 | { | |
e6a9dde5 | 132 | g_log<<Logger::Error<<"Only asked for 1 backend thread - operating unthreaded"<<endl; |
07dbfb4f KM |
133 | try { |
134 | b=new Backend; | |
135 | } | |
136 | catch(const PDNSException &AE) { | |
e6a9dde5 | 137 | g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl; |
dc386240 | 138 | _exit(1); |
07dbfb4f KM |
139 | } |
140 | catch(...) { | |
e6a9dde5 | 141 | g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl; |
dc386240 | 142 | _exit(1); |
07dbfb4f | 143 | } |
ad7d9cd0 | 144 | } |
12c86877 | 145 | |
ad7d9cd0 | 146 | template<class Answer, class Question, class Backend>MultiThreadDistributor<Answer,Question,Backend>::MultiThreadDistributor(int n) |
12c86877 | 147 | { |
ad7d9cd0 | 148 | d_num_threads=n; |
d3363b40 | 149 | d_overloadQueueLength=::arg().asNum("overload-queue-length"); |
150 | d_maxQueueLength=::arg().asNum("max-queue-length"); | |
49f076e8 | 151 | nextid=0; |
12c86877 | 152 | d_last_started=time(0); |
12c86877 | 153 | |
12c86877 BH |
154 | pthread_t tid; |
155 | ||
491d03d7 | 156 | |
157 | for(int i=0; i < n; ++i) { | |
158 | int fds[2]; | |
159 | if(pipe(fds) < 0) | |
160 | unixDie("Creating pipe"); | |
161 | d_pipes.push_back({fds[0],fds[1]}); | |
162 | } | |
163 | ||
0469c6ed | 164 | if (n<1) { |
e6a9dde5 | 165 | g_log<<Logger::Error<<"Asked for fewer than 1 threads, nothing to do"<<endl; |
dc386240 | 166 | _exit(1); |
0469c6ed AT |
167 | } |
168 | ||
e6a9dde5 | 169 | g_log<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl; |
12c86877 BH |
170 | for(int i=0;i<n;i++) { |
171 | pthread_create(&tid,0,&makeThread,static_cast<void *>(this)); | |
172 | Utility::usleep(50000); // we've overloaded mysql in the past :-) | |
173 | } | |
e6a9dde5 | 174 | g_log<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl; |
12c86877 BH |
175 | } |
176 | ||
ad7d9cd0 | 177 | |
12c86877 | 178 | // start of a new thread |
ad7d9cd0 | 179 | template<class Answer, class Question, class Backend>void *MultiThreadDistributor<Answer,Question,Backend>::makeThread(void *p) |
12c86877 | 180 | { |
519f5484 | 181 | setThreadName("pdns/distributo"); |
a858f79e | 182 | pthread_detach(pthread_self()); |
491d03d7 | 183 | MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p); |
184 | int ournum=us->d_running++; | |
185 | ||
12c86877 BH |
186 | try { |
187 | Backend *b=new Backend(); // this will answer our questions | |
379ab445 | 188 | int queuetimeout=::arg().asNum("queue-limit"); |
12c86877 | 189 | |
491d03d7 | 190 | for(;;) { |
191 | ||
192 | QuestionData* QD; | |
193 | if(read(us->d_pipes[ournum].first, &QD, sizeof(QD)) != sizeof(QD)) | |
194 | unixDie("read"); | |
195 | --us->d_queued; | |
f1ad3c5b | 196 | Answer *a; |
12c86877 | 197 | |
491d03d7 | 198 | if(queuetimeout && QD->Q->d_dt.udiff()>queuetimeout*1000) { |
199 | delete QD->Q; | |
200 | delete QD; | |
12c86877 BH |
201 | S.inc("timedout-packets"); |
202 | continue; | |
4957a608 | 203 | } |
bf17c14b KM |
204 | |
205 | bool allowRetry=true; | |
206 | retry: | |
12c86877 BH |
207 | // this is the only point where we interact with the backend (synchronous) |
208 | try { | |
bf17c14b KM |
209 | if (!b) { |
210 | allowRetry=false; | |
07dbfb4f | 211 | b=new Backend(); |
bf17c14b KM |
212 | } |
213 | a=b->question(QD->Q); | |
214 | delete QD->Q; | |
12c86877 | 215 | } |
3f81d239 | 216 | catch(const PDNSException &e) { |
bf17c14b | 217 | delete b; |
07dbfb4f | 218 | b=NULL; |
bf17c14b | 219 | if (!allowRetry) { |
e6a9dde5 | 220 | g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl; |
bf17c14b KM |
221 | a=QD->Q->replyPacket(); |
222 | ||
223 | a->setRcode(RCode::ServFail); | |
224 | S.inc("servfail-packets"); | |
74f6d6e1 | 225 | S.ringAccount("servfail-queries",QD->Q->qdomain.toLogString()); |
bf17c14b KM |
226 | |
227 | delete QD->Q; | |
228 | } else { | |
e6a9dde5 | 229 | g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl; |
bf17c14b KM |
230 | goto retry; |
231 | } | |
12c86877 BH |
232 | } |
233 | catch(...) { | |
bf17c14b KM |
234 | delete b; |
235 | b=NULL; | |
236 | if (!allowRetry) { | |
e6a9dde5 | 237 | g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<endl; |
bf17c14b KM |
238 | a=QD->Q->replyPacket(); |
239 | ||
240 | a->setRcode(RCode::ServFail); | |
241 | S.inc("servfail-packets"); | |
74f6d6e1 | 242 | S.ringAccount("servfail-queries",QD->Q->qdomain.toLogString()); |
bf17c14b KM |
243 | |
244 | delete QD->Q; | |
245 | } else { | |
e6a9dde5 | 246 | g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(long)pthread_self()<<" (retry once)"<<endl; |
bf17c14b KM |
247 | goto retry; |
248 | } | |
12c86877 BH |
249 | } |
250 | ||
491d03d7 | 251 | QD->callback(a); |
252 | delete QD; | |
12c86877 | 253 | } |
bf17c14b | 254 | |
12c86877 BH |
255 | delete b; |
256 | } | |
3f81d239 | 257 | catch(const PDNSException &AE) { |
e6a9dde5 | 258 | g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl; |
dc386240 | 259 | _exit(1); |
12c86877 BH |
260 | } |
261 | catch(...) { | |
e6a9dde5 | 262 | g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl; |
dc386240 | 263 | _exit(1); |
12c86877 BH |
264 | } |
265 | return 0; | |
266 | } | |
267 | ||
491d03d7 | 268 | template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question* q, callback_t callback) |
12c86877 | 269 | { |
ad7d9cd0 | 270 | Answer *a; |
bf17c14b KM |
271 | bool allowRetry=true; |
272 | retry: | |
ad7d9cd0 | 273 | try { |
bf17c14b KM |
274 | if (!b) { |
275 | allowRetry=false; | |
07dbfb4f | 276 | b=new Backend; |
bf17c14b | 277 | } |
ad7d9cd0 MZ |
278 | a=b->question(q); // a can be NULL! |
279 | } | |
e6577aef | 280 | catch(const PDNSException &e) { |
ad7d9cd0 | 281 | delete b; |
07dbfb4f | 282 | b=NULL; |
bf17c14b | 283 | if (!allowRetry) { |
e6a9dde5 | 284 | g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl; |
bf17c14b KM |
285 | a=q->replyPacket(); |
286 | ||
287 | a->setRcode(RCode::ServFail); | |
288 | S.inc("servfail-packets"); | |
74f6d6e1 | 289 | S.ringAccount("servfail-queries",q->qdomain.toLogString()); |
bf17c14b | 290 | } else { |
e6a9dde5 | 291 | g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl; |
bf17c14b KM |
292 | goto retry; |
293 | } | |
12c86877 | 294 | } |
ad7d9cd0 | 295 | catch(...) { |
ad7d9cd0 | 296 | delete b; |
07dbfb4f | 297 | b=NULL; |
bf17c14b | 298 | if (!allowRetry) { |
e6a9dde5 | 299 | g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl; |
bf17c14b KM |
300 | a=q->replyPacket(); |
301 | ||
302 | a->setRcode(RCode::ServFail); | |
303 | S.inc("servfail-packets"); | |
74f6d6e1 | 304 | S.ringAccount("servfail-queries",q->qdomain.toLogString()); |
bf17c14b | 305 | } else { |
e6a9dde5 | 306 | g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<" (retry once)"<<endl; |
bf17c14b KM |
307 | goto retry; |
308 | } | |
12c86877 | 309 | } |
491d03d7 | 310 | callback(a); |
ad7d9cd0 MZ |
311 | return 0; |
312 | } | |
313 | ||
//! Empty tag type thrown by MultiThreadDistributor::question() when the queue exceeds its maximum length
struct DistributorFatal
{
};
315 | ||
316 | template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question* q, callback_t callback) | |
ad7d9cd0 | 317 | { |
ad7d9cd0 | 318 | q=new Question(*q); |
12c86877 | 319 | |
2301afa0 | 320 | // this is passed to other process over pipe and released there |
491d03d7 | 321 | auto QD=new QuestionData(); |
322 | QD->Q=q; | |
323 | auto ret = QD->id = nextid++; // might be deleted after write! | |
324 | QD->callback=callback; | |
e7e691cc | 325 | |
101112ad RG |
326 | ++d_queued; |
327 | if(write(d_pipes[QD->id % d_pipes.size()].second, &QD, sizeof(QD)) != sizeof(QD)) { | |
328 | --d_queued; | |
329 | unixDie("write"); | |
330 | } | |
d3363b40 | 331 | |
332 | if(d_queued > d_maxQueueLength) { | |
e6a9dde5 | 333 | g_log<<Logger::Error<< d_queued <<" questions waiting for database/backend attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl; |
491d03d7 | 334 | // this will leak the entire contents of all pipes, nothing will be freed. Respawn when this happens! |
335 | throw DistributorFatal(); | |
1258abe0 | 336 | } |
491d03d7 | 337 | |
338 | return ret; | |
12c86877 BH |
339 | } |
340 | ||
1258abe0 | 341 | #endif // DISTRIBUTOR_HH |
12c86877 | 342 |