]>
Commit | Line | Data |
---|---|---|
12c86877 | 1 | /* |
12471842 PL |
2 | * This file is part of PowerDNS or dnsdist. |
3 | * Copyright -- PowerDNS.COM B.V. and its contributors | |
4 | * | |
5 | * This program is free software; you can redistribute it and/or modify | |
6 | * it under the terms of version 2 of the GNU General Public License as | |
7 | * published by the Free Software Foundation. | |
8 | * | |
9 | * In addition, for the avoidance of any doubt, permission is granted to | |
10 | * link this program with OpenSSL and to (re)distribute the binaries | |
11 | * produced as the result of such linking. | |
12 | * | |
13 | * This program is distributed in the hope that it will be useful, | |
14 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 | * GNU General Public License for more details. | |
17 | * | |
18 | * You should have received a copy of the GNU General Public License | |
19 | * along with this program; if not, write to the Free Software | |
20 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
21 | */ | |
e8c59f2d | 22 | #pragma once |
15e39ee4 OM |
23 | #ifdef HAVE_CONFIG_H |
24 | #include "config.h" | |
25 | #endif | |
12c86877 BH |
26 | #include <string> |
27 | #include <deque> | |
28 | #include <queue> | |
29 | #include <vector> | |
0ddde5fb | 30 | #include <thread> |
519f5484 | 31 | #include "threadname.hh" |
76473b92 | 32 | #include <unistd.h> |
13bdc4d9 RG |
33 | |
34 | #include "channel.hh" | |
12c86877 BH |
35 | #include "logger.hh" |
36 | #include "dns.hh" | |
37 | #include "dnsbackend.hh" | |
5c409fa2 | 38 | #include "pdnsexception.hh" |
092c9cc4 | 39 | #include "arguments.hh" |
491d03d7 | 40 | #include <atomic> |
092c9cc4 | 41 | #include "statbag.hh" |
c113acc3 | 42 | #include "gss_context.hh" |
12c86877 | 43 | |
092c9cc4 | 44 | extern StatBag S; |
12c86877 BH |
45 | |
/** The Distributor template class enables you to multithread slow question/answer
    processes.

    Questions are posed to the Distributor, which returns the answer via a callback.

    The Distributor spawns sufficient backends, and if they throw an exception,
    it will cycle the backend and retry the query that was active during the
    exception once; if the retry fails too, that query is answered with ServFail.
*/
ad7d9cd0 | 54 | |
12c86877 BH |
/**
 * Abstract interface of a distributor.
 *
 * A Question is handed to question(); the resulting Answer (which may be a
 * null pointer) is delivered through the supplied callback.
 */
template<class Answer, class Question, class Backend> class Distributor
{
public:
  //! Completion callback: receives the answer and an int supplied by the implementation.
  using callback_t = std::function<void(std::unique_ptr<Answer>&, int)>;

  static Distributor* Create(int n=1); //!< Create a new Distributor with \param n threads
  virtual int question(Question&, callback_t callback) =0; //!< Submit a question to the Distributor
  virtual int getQueueSize() =0; //!< Returns length of question queue
  virtual bool isOverloaded() =0; //!< Returns whether the implementation considers itself overloaded

  // Destruction is silent: the previous body wrote the function name to
  // stderr, which was debugging output and not part of the contract.
  virtual ~Distributor() = default;
};
65 | ||
66 | template<class Answer, class Question, class Backend> class SingleThreadDistributor | |
67 | : public Distributor<Answer, Question, Backend> | |
68 | { | |
69 | public: | |
f6c19c4f CHB |
70 | SingleThreadDistributor(const SingleThreadDistributor&) = delete; |
71 | void operator=(const SingleThreadDistributor&) = delete; | |
ad7d9cd0 | 72 | SingleThreadDistributor(); |
386c5b13 | 73 | typedef std::function<void(std::unique_ptr<Answer>&, int)> callback_t; |
c2826d2e | 74 | int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor |
cff3e04d | 75 | int getQueueSize() override { |
ad7d9cd0 MZ |
76 | return 0; |
77 | } | |
78 | ||
cff3e04d | 79 | bool isOverloaded() override |
12c86877 | 80 | { |
ad7d9cd0 MZ |
81 | return false; |
82 | } | |
83 | ||
84 | private: | |
c2826d2e | 85 | std::unique_ptr<Backend> b{nullptr}; |
ad7d9cd0 MZ |
86 | }; |
87 | ||
88 | template<class Answer, class Question, class Backend> class MultiThreadDistributor | |
89 | : public Distributor<Answer, Question, Backend> | |
90 | { | |
91 | public: | |
f6c19c4f CHB |
92 | MultiThreadDistributor(const MultiThreadDistributor&) = delete; |
93 | void operator=(const MultiThreadDistributor&) = delete; | |
491d03d7 | 94 | MultiThreadDistributor(int n); |
386c5b13 | 95 | typedef std::function<void(std::unique_ptr<Answer>&, int)> callback_t; |
c2826d2e | 96 | int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor |
0ddde5fb | 97 | void distribute(int n); |
491d03d7 | 98 | int getQueueSize() override { |
99 | return d_queued; | |
12c86877 BH |
100 | } |
101 | ||
12c86877 BH |
102 | struct QuestionData |
103 | { | |
c2826d2e RG |
104 | QuestionData(const Question& query): Q(query) |
105 | { | |
386c5b13 | 106 | start = Q.d_dt.udiff(); |
c2826d2e RG |
107 | } |
108 | ||
109 | Question Q; | |
be4b9449 AT |
110 | callback_t callback{nullptr}; |
111 | int id{0}; | |
386c5b13 | 112 | int start{0}; |
12c86877 BH |
113 | }; |
114 | ||
491d03d7 | 115 | bool isOverloaded() override |
e7e691cc | 116 | { |
d3363b40 | 117 | return d_overloadQueueLength && (d_queued > d_overloadQueueLength); |
e7e691cc | 118 | } |
c2826d2e | 119 | |
12c86877 | 120 | private: |
13bdc4d9 RG |
121 | std::vector<pdns::channel::Sender<QuestionData>> d_senders; |
122 | std::vector<pdns::channel::Receiver<QuestionData>> d_receivers; | |
123 | time_t d_last_started{0}; | |
0ddde5fb | 124 | std::atomic<unsigned int> d_queued{0}; |
13bdc4d9 RG |
125 | unsigned int d_overloadQueueLength{0}; |
126 | unsigned int d_maxQueueLength{0}; | |
127 | int d_nextid{0}; | |
128 | int d_num_threads{0}; | |
12c86877 BH |
129 | }; |
130 | ||
c2826d2e | 131 | template<class Answer, class Question, class Backend> Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n) |
ad7d9cd0 MZ |
132 | { |
133 | if( n == 1 ) | |
c2826d2e | 134 | return new SingleThreadDistributor<Answer,Question,Backend>(); |
ad7d9cd0 | 135 | else |
c2826d2e | 136 | return new MultiThreadDistributor<Answer,Question,Backend>( n ); |
ad7d9cd0 MZ |
137 | } |
138 | ||
139 | template<class Answer, class Question, class Backend>SingleThreadDistributor<Answer,Question,Backend>::SingleThreadDistributor() | |
140 | { | |
e6a9dde5 | 141 | g_log<<Logger::Error<<"Only asked for 1 backend thread - operating unthreaded"<<endl; |
07dbfb4f | 142 | try { |
c2826d2e | 143 | b=make_unique<Backend>(); |
07dbfb4f KM |
144 | } |
145 | catch(const PDNSException &AE) { | |
e6a9dde5 | 146 | g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl; |
dc386240 | 147 | _exit(1); |
07dbfb4f | 148 | } |
0ddde5fb RG |
149 | catch(const std::exception& e) { |
150 | g_log<<Logger::Error<<"Distributor caught fatal exception: "<<e.what()<<endl; | |
151 | _exit(1); | |
152 | } | |
07dbfb4f | 153 | catch(...) { |
e6a9dde5 | 154 | g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl; |
dc386240 | 155 | _exit(1); |
07dbfb4f | 156 | } |
ad7d9cd0 | 157 | } |
12c86877 | 158 | |
13bdc4d9 RG |
159 | template<class Answer, class Question, class Backend>MultiThreadDistributor<Answer,Question,Backend>::MultiThreadDistributor(int numberOfThreads) : |
160 | d_last_started(time(nullptr)), d_overloadQueueLength(::arg().asNum("overload-queue-length")), d_maxQueueLength(::arg().asNum("max-queue-length")), d_num_threads(numberOfThreads) | |
12c86877 | 161 | { |
13bdc4d9 | 162 | if (numberOfThreads < 1) { |
e6a9dde5 | 163 | g_log<<Logger::Error<<"Asked for fewer than 1 threads, nothing to do"<<endl; |
dc386240 | 164 | _exit(1); |
0469c6ed AT |
165 | } |
166 | ||
13bdc4d9 | 167 | for (int distributorIdx = 0; distributorIdx < numberOfThreads; distributorIdx++) { |
c1d76521 | 168 | auto [sender, receiver] = pdns::channel::createObjectQueue<QuestionData>(pdns::channel::SenderBlockingMode::SenderBlocking, pdns::channel::ReceiverBlockingMode::ReceiverBlocking); |
13bdc4d9 RG |
169 | d_senders.push_back(std::move(sender)); |
170 | d_receivers.push_back(std::move(receiver)); | |
171 | } | |
172 | ||
173 | g_log<<Logger::Warning<<"About to create "<<numberOfThreads<<" backend threads for UDP"<<endl; | |
174 | ||
175 | for (int distributorIdx = 0; distributorIdx < numberOfThreads; distributorIdx++) { | |
176 | std::thread t([=](){distribute(distributorIdx);}); | |
0ddde5fb | 177 | t.detach(); |
12c86877 BH |
178 | Utility::usleep(50000); // we've overloaded mysql in the past :-) |
179 | } | |
e6a9dde5 | 180 | g_log<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl; |
12c86877 BH |
181 | } |
182 | ||
ad7d9cd0 | 183 | |
12c86877 | 184 | // start of a new thread |
0ddde5fb | 185 | template<class Answer, class Question, class Backend>void MultiThreadDistributor<Answer,Question,Backend>::distribute(int ournum) |
12c86877 | 186 | { |
55941601 | 187 | // this is the longest name we can use, not a typo |
519f5484 | 188 | setThreadName("pdns/distributo"); |
491d03d7 | 189 | |
12c86877 | 190 | try { |
13bdc4d9 RG |
191 | auto b = make_unique<Backend>(); // this will answer our questions |
192 | int queuetimeout = ::arg().asNum("queue-limit"); | |
193 | auto& receiver = d_receivers.at(ournum); | |
12c86877 | 194 | |
13bdc4d9 RG |
195 | for (;;) { |
196 | auto tempQD = receiver.receive(); | |
197 | if (!tempQD) { | |
491d03d7 | 198 | unixDie("read"); |
13bdc4d9 | 199 | } |
0ddde5fb | 200 | --d_queued; |
13bdc4d9 | 201 | auto questionData = std::move(*tempQD); |
c2826d2e | 202 | std::unique_ptr<Answer> a = nullptr; |
13bdc4d9 | 203 | if (queuetimeout && questionData->Q.d_dt.udiff() > queuetimeout * 1000) { |
12c86877 BH |
204 | S.inc("timedout-packets"); |
205 | continue; | |
13bdc4d9 | 206 | } |
bf17c14b | 207 | |
13bdc4d9 | 208 | bool allowRetry = true; |
bf17c14b | 209 | retry: |
12c86877 BH |
210 | // this is the only point where we interact with the backend (synchronous) |
211 | try { | |
bf17c14b | 212 | if (!b) { |
13bdc4d9 RG |
213 | allowRetry = false; |
214 | b = make_unique<Backend>(); | |
bf17c14b | 215 | } |
13bdc4d9 | 216 | a = b->question(questionData->Q); |
12c86877 | 217 | } |
13bdc4d9 | 218 | catch (const PDNSException &e) { |
c2826d2e | 219 | b.reset(); |
bf17c14b | 220 | if (!allowRetry) { |
e6a9dde5 | 221 | g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl; |
13bdc4d9 | 222 | a = questionData->Q.replyPacket(); |
bf17c14b KM |
223 | |
224 | a->setRcode(RCode::ServFail); | |
225 | S.inc("servfail-packets"); | |
13bdc4d9 | 226 | S.ringAccount("servfail-queries", questionData->Q.qdomain, questionData->Q.qtype); |
bf17c14b | 227 | } else { |
e6a9dde5 | 228 | g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl; |
bf17c14b KM |
229 | goto retry; |
230 | } | |
12c86877 | 231 | } |
13bdc4d9 | 232 | catch (...) { |
c2826d2e | 233 | b.reset(); |
bf17c14b | 234 | if (!allowRetry) { |
13bdc4d9 RG |
235 | g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<std::this_thread::get_id()<<endl; |
236 | a = questionData->Q.replyPacket(); | |
bf17c14b KM |
237 | |
238 | a->setRcode(RCode::ServFail); | |
239 | S.inc("servfail-packets"); | |
13bdc4d9 | 240 | S.ringAccount("servfail-queries", questionData->Q.qdomain, questionData->Q.qtype); |
bf17c14b | 241 | } else { |
13bdc4d9 | 242 | g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<std::this_thread::get_id()<<" (retry once)"<<endl; |
bf17c14b KM |
243 | goto retry; |
244 | } | |
12c86877 BH |
245 | } |
246 | ||
13bdc4d9 | 247 | questionData->callback(a, questionData->start); |
15e39ee4 | 248 | #ifdef ENABLE_GSS_TSIG |
c113acc3 | 249 | if (g_doGssTSIG && a != nullptr) { |
13bdc4d9 | 250 | questionData->Q.cleanupGSS(a->d.rcode); |
647e83a5 | 251 | } |
15e39ee4 | 252 | #endif |
13bdc4d9 | 253 | questionData.reset(); |
12c86877 | 254 | } |
bf17c14b | 255 | |
c2826d2e | 256 | b.reset(); |
12c86877 | 257 | } |
13bdc4d9 | 258 | catch (const PDNSException &AE) { |
e6a9dde5 | 259 | g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl; |
dc386240 | 260 | _exit(1); |
12c86877 | 261 | } |
13bdc4d9 | 262 | catch (const std::exception& e) { |
0ddde5fb RG |
263 | g_log<<Logger::Error<<"Distributor caught fatal exception: "<<e.what()<<endl; |
264 | _exit(1); | |
265 | } | |
13bdc4d9 | 266 | catch (...) { |
e6a9dde5 | 267 | g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl; |
dc386240 | 268 | _exit(1); |
12c86877 | 269 | } |
12c86877 BH |
270 | } |
271 | ||
c2826d2e | 272 | template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback) |
12c86877 | 273 | { |
386c5b13 | 274 | int start = q.d_dt.udiff(); |
c2826d2e | 275 | std::unique_ptr<Answer> a = nullptr; |
bf17c14b KM |
276 | bool allowRetry=true; |
277 | retry: | |
ad7d9cd0 | 278 | try { |
bf17c14b KM |
279 | if (!b) { |
280 | allowRetry=false; | |
c2826d2e | 281 | b=make_unique<Backend>(); |
bf17c14b | 282 | } |
ad7d9cd0 MZ |
283 | a=b->question(q); // a can be NULL! |
284 | } | |
e6577aef | 285 | catch(const PDNSException &e) { |
c2826d2e | 286 | b.reset(); |
bf17c14b | 287 | if (!allowRetry) { |
e6a9dde5 | 288 | g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl; |
c2826d2e | 289 | a=q.replyPacket(); |
bf17c14b KM |
290 | |
291 | a->setRcode(RCode::ServFail); | |
292 | S.inc("servfail-packets"); | |
c2826d2e | 293 | S.ringAccount("servfail-queries", q.qdomain, q.qtype); |
bf17c14b | 294 | } else { |
e6a9dde5 | 295 | g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl; |
bf17c14b KM |
296 | goto retry; |
297 | } | |
12c86877 | 298 | } |
ad7d9cd0 | 299 | catch(...) { |
c2826d2e | 300 | b.reset(); |
bf17c14b | 301 | if (!allowRetry) { |
13bdc4d9 | 302 | g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<std::this_thread::get_id()<<endl; |
c2826d2e | 303 | a=q.replyPacket(); |
bf17c14b KM |
304 | |
305 | a->setRcode(RCode::ServFail); | |
306 | S.inc("servfail-packets"); | |
c2826d2e | 307 | S.ringAccount("servfail-queries", q.qdomain, q.qtype); |
bf17c14b | 308 | } else { |
13bdc4d9 | 309 | g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<std::this_thread::get_id()<<" (retry once)"<<endl; |
bf17c14b KM |
310 | goto retry; |
311 | } | |
12c86877 | 312 | } |
386c5b13 | 313 | callback(a, start); |
15e39ee4 | 314 | #ifdef ENABLE_GSS_TSIG |
c113acc3 | 315 | if (g_doGssTSIG && a != nullptr) { |
647e83a5 O |
316 | q.cleanupGSS(a->d.rcode); |
317 | } | |
15e39ee4 | 318 | #endif |
ad7d9cd0 MZ |
319 | return 0; |
320 | } | |
321 | ||
//! Thrown by MultiThreadDistributor::question() when the queue exceeds its configured maximum.
struct DistributorFatal
{
};
323 | ||
c2826d2e | 324 | template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback) |
ad7d9cd0 | 325 | { |
2301afa0 | 326 | // this is passed to other process over pipe and released there |
13bdc4d9 RG |
327 | auto questionData = std::make_unique<QuestionData>(q); |
328 | auto ret = questionData->id = d_nextid++; // might be deleted after write! | |
329 | questionData->callback = callback; | |
e7e691cc | 330 | |
101112ad | 331 | ++d_queued; |
13bdc4d9 | 332 | if (!d_senders.at(questionData->id % d_senders.size()).send(std::move(questionData))) { |
101112ad | 333 | --d_queued; |
13bdc4d9 | 334 | questionData.reset(); |
101112ad RG |
335 | unixDie("write"); |
336 | } | |
d3363b40 | 337 | |
13bdc4d9 | 338 | if (d_queued > d_maxQueueLength) { |
e6a9dde5 | 339 | g_log<<Logger::Error<< d_queued <<" questions waiting for database/backend attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl; |
491d03d7 | 340 | // this will leak the entire contents of all pipes, nothing will be freed. Respawn when this happens! |
341 | throw DistributorFatal(); | |
1258abe0 | 342 | } |
c2826d2e | 343 | |
491d03d7 | 344 | return ret; |
12c86877 | 345 | } |