]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/distributor.hh
rec: allow exception to proxy protocal usage for specific listen addresses
[thirdparty/pdns.git] / pdns / distributor.hh
CommitLineData
12c86877 1/*
12471842
PL
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
e8c59f2d 22#pragma once
15e39ee4
OM
23#ifdef HAVE_CONFIG_H
24#include "config.h"
25#endif
12c86877
BH
26#include <string>
27#include <deque>
28#include <queue>
29#include <vector>
0ddde5fb 30#include <thread>
519f5484 31#include "threadname.hh"
76473b92 32#include <unistd.h>
13bdc4d9
RG
33
34#include "channel.hh"
12c86877
BH
35#include "logger.hh"
36#include "dns.hh"
37#include "dnsbackend.hh"
5c409fa2 38#include "pdnsexception.hh"
092c9cc4 39#include "arguments.hh"
491d03d7 40#include <atomic>
092c9cc4 41#include "statbag.hh"
c113acc3 42#include "gss_context.hh"
12c86877 43
092c9cc4 44extern StatBag S;
12c86877
BH
45
46/** the Distributor template class enables you to multithread slow question/answer
47 processes.
48
ad7d9cd0 49 Questions are posed to the Distributor, which returns the answer via a callback.
bdc9f8d2 50
491d03d7 51 The Distributor spawns sufficient backends, and if they thrown an exception,
52 it will cycle the backend but drop the query that was active during the exception.
bdc9f8d2 53*/
ad7d9cd0 54
12c86877
BH
55template<class Answer, class Question, class Backend> class Distributor
56{
57public:
c2826d2e 58 static Distributor* Create(int n=1); //!< Create a new Distributor with \param n threads
386c5b13 59 typedef std::function<void(std::unique_ptr<Answer>&, int)> callback_t;
c2826d2e 60 virtual int question(Question&, callback_t callback) =0; //!< Submit a question to the Distributor
491d03d7 61 virtual int getQueueSize() =0; //!< Returns length of question queue
d70682f7 62 virtual bool isOverloaded() =0;
c2826d2e 63 virtual ~Distributor() { cerr<<__func__<<endl;}
ad7d9cd0
MZ
64};
65
66template<class Answer, class Question, class Backend> class SingleThreadDistributor
67 : public Distributor<Answer, Question, Backend>
68{
69public:
f6c19c4f
CHB
70 SingleThreadDistributor(const SingleThreadDistributor&) = delete;
71 void operator=(const SingleThreadDistributor&) = delete;
ad7d9cd0 72 SingleThreadDistributor();
386c5b13 73 typedef std::function<void(std::unique_ptr<Answer>&, int)> callback_t;
c2826d2e 74 int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor
cff3e04d 75 int getQueueSize() override {
ad7d9cd0
MZ
76 return 0;
77 }
78
cff3e04d 79 bool isOverloaded() override
12c86877 80 {
ad7d9cd0
MZ
81 return false;
82 }
83
84private:
c2826d2e 85 std::unique_ptr<Backend> b{nullptr};
ad7d9cd0
MZ
86};
87
88template<class Answer, class Question, class Backend> class MultiThreadDistributor
89 : public Distributor<Answer, Question, Backend>
90{
91public:
f6c19c4f
CHB
92 MultiThreadDistributor(const MultiThreadDistributor&) = delete;
93 void operator=(const MultiThreadDistributor&) = delete;
491d03d7 94 MultiThreadDistributor(int n);
386c5b13 95 typedef std::function<void(std::unique_ptr<Answer>&, int)> callback_t;
c2826d2e 96 int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor
0ddde5fb 97 void distribute(int n);
491d03d7 98 int getQueueSize() override {
99 return d_queued;
12c86877
BH
100 }
101
12c86877
BH
102 struct QuestionData
103 {
c2826d2e
RG
104 QuestionData(const Question& query): Q(query)
105 {
386c5b13 106 start = Q.d_dt.udiff();
c2826d2e
RG
107 }
108
109 Question Q;
be4b9449
AT
110 callback_t callback{nullptr};
111 int id{0};
386c5b13 112 int start{0};
12c86877
BH
113 };
114
491d03d7 115 bool isOverloaded() override
e7e691cc 116 {
d3363b40 117 return d_overloadQueueLength && (d_queued > d_overloadQueueLength);
e7e691cc 118 }
c2826d2e 119
12c86877 120private:
13bdc4d9
RG
121 std::vector<pdns::channel::Sender<QuestionData>> d_senders;
122 std::vector<pdns::channel::Receiver<QuestionData>> d_receivers;
123 time_t d_last_started{0};
0ddde5fb 124 std::atomic<unsigned int> d_queued{0};
13bdc4d9
RG
125 unsigned int d_overloadQueueLength{0};
126 unsigned int d_maxQueueLength{0};
127 int d_nextid{0};
128 int d_num_threads{0};
12c86877
BH
129};
130
c2826d2e 131template<class Answer, class Question, class Backend> Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n)
ad7d9cd0
MZ
132{
133 if( n == 1 )
c2826d2e 134 return new SingleThreadDistributor<Answer,Question,Backend>();
ad7d9cd0 135 else
c2826d2e 136 return new MultiThreadDistributor<Answer,Question,Backend>( n );
ad7d9cd0
MZ
137}
138
139template<class Answer, class Question, class Backend>SingleThreadDistributor<Answer,Question,Backend>::SingleThreadDistributor()
140{
e6a9dde5 141 g_log<<Logger::Error<<"Only asked for 1 backend thread - operating unthreaded"<<endl;
07dbfb4f 142 try {
c2826d2e 143 b=make_unique<Backend>();
07dbfb4f
KM
144 }
145 catch(const PDNSException &AE) {
e6a9dde5 146 g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
dc386240 147 _exit(1);
07dbfb4f 148 }
0ddde5fb
RG
149 catch(const std::exception& e) {
150 g_log<<Logger::Error<<"Distributor caught fatal exception: "<<e.what()<<endl;
151 _exit(1);
152 }
07dbfb4f 153 catch(...) {
e6a9dde5 154 g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
dc386240 155 _exit(1);
07dbfb4f 156 }
ad7d9cd0 157}
12c86877 158
13bdc4d9
RG
159template<class Answer, class Question, class Backend>MultiThreadDistributor<Answer,Question,Backend>::MultiThreadDistributor(int numberOfThreads) :
160 d_last_started(time(nullptr)), d_overloadQueueLength(::arg().asNum("overload-queue-length")), d_maxQueueLength(::arg().asNum("max-queue-length")), d_num_threads(numberOfThreads)
12c86877 161{
13bdc4d9 162 if (numberOfThreads < 1) {
e6a9dde5 163 g_log<<Logger::Error<<"Asked for fewer than 1 threads, nothing to do"<<endl;
dc386240 164 _exit(1);
0469c6ed
AT
165 }
166
13bdc4d9 167 for (int distributorIdx = 0; distributorIdx < numberOfThreads; distributorIdx++) {
c1d76521 168 auto [sender, receiver] = pdns::channel::createObjectQueue<QuestionData>(pdns::channel::SenderBlockingMode::SenderBlocking, pdns::channel::ReceiverBlockingMode::ReceiverBlocking);
13bdc4d9
RG
169 d_senders.push_back(std::move(sender));
170 d_receivers.push_back(std::move(receiver));
171 }
172
173 g_log<<Logger::Warning<<"About to create "<<numberOfThreads<<" backend threads for UDP"<<endl;
174
175 for (int distributorIdx = 0; distributorIdx < numberOfThreads; distributorIdx++) {
176 std::thread t([=](){distribute(distributorIdx);});
0ddde5fb 177 t.detach();
12c86877
BH
178 Utility::usleep(50000); // we've overloaded mysql in the past :-)
179 }
e6a9dde5 180 g_log<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
12c86877
BH
181}
182
ad7d9cd0 183
12c86877 184// start of a new thread
0ddde5fb 185template<class Answer, class Question, class Backend>void MultiThreadDistributor<Answer,Question,Backend>::distribute(int ournum)
12c86877 186{
55941601 187 // this is the longest name we can use, not a typo
519f5484 188 setThreadName("pdns/distributo");
491d03d7 189
12c86877 190 try {
13bdc4d9
RG
191 auto b = make_unique<Backend>(); // this will answer our questions
192 int queuetimeout = ::arg().asNum("queue-limit");
193 auto& receiver = d_receivers.at(ournum);
12c86877 194
13bdc4d9
RG
195 for (;;) {
196 auto tempQD = receiver.receive();
197 if (!tempQD) {
491d03d7 198 unixDie("read");
13bdc4d9 199 }
0ddde5fb 200 --d_queued;
13bdc4d9 201 auto questionData = std::move(*tempQD);
c2826d2e 202 std::unique_ptr<Answer> a = nullptr;
13bdc4d9 203 if (queuetimeout && questionData->Q.d_dt.udiff() > queuetimeout * 1000) {
12c86877
BH
204 S.inc("timedout-packets");
205 continue;
13bdc4d9 206 }
bf17c14b 207
13bdc4d9 208 bool allowRetry = true;
bf17c14b 209retry:
12c86877
BH
210 // this is the only point where we interact with the backend (synchronous)
211 try {
bf17c14b 212 if (!b) {
13bdc4d9
RG
213 allowRetry = false;
214 b = make_unique<Backend>();
bf17c14b 215 }
13bdc4d9 216 a = b->question(questionData->Q);
12c86877 217 }
13bdc4d9 218 catch (const PDNSException &e) {
c2826d2e 219 b.reset();
bf17c14b 220 if (!allowRetry) {
e6a9dde5 221 g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
13bdc4d9 222 a = questionData->Q.replyPacket();
bf17c14b
KM
223
224 a->setRcode(RCode::ServFail);
225 S.inc("servfail-packets");
13bdc4d9 226 S.ringAccount("servfail-queries", questionData->Q.qdomain, questionData->Q.qtype);
bf17c14b 227 } else {
e6a9dde5 228 g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
bf17c14b
KM
229 goto retry;
230 }
12c86877 231 }
13bdc4d9 232 catch (...) {
c2826d2e 233 b.reset();
bf17c14b 234 if (!allowRetry) {
13bdc4d9
RG
235 g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<std::this_thread::get_id()<<endl;
236 a = questionData->Q.replyPacket();
bf17c14b
KM
237
238 a->setRcode(RCode::ServFail);
239 S.inc("servfail-packets");
13bdc4d9 240 S.ringAccount("servfail-queries", questionData->Q.qdomain, questionData->Q.qtype);
bf17c14b 241 } else {
13bdc4d9 242 g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<std::this_thread::get_id()<<" (retry once)"<<endl;
bf17c14b
KM
243 goto retry;
244 }
12c86877
BH
245 }
246
13bdc4d9 247 questionData->callback(a, questionData->start);
15e39ee4 248#ifdef ENABLE_GSS_TSIG
c113acc3 249 if (g_doGssTSIG && a != nullptr) {
13bdc4d9 250 questionData->Q.cleanupGSS(a->d.rcode);
647e83a5 251 }
15e39ee4 252#endif
13bdc4d9 253 questionData.reset();
12c86877 254 }
bf17c14b 255
c2826d2e 256 b.reset();
12c86877 257 }
13bdc4d9 258 catch (const PDNSException &AE) {
e6a9dde5 259 g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
dc386240 260 _exit(1);
12c86877 261 }
13bdc4d9 262 catch (const std::exception& e) {
0ddde5fb
RG
263 g_log<<Logger::Error<<"Distributor caught fatal exception: "<<e.what()<<endl;
264 _exit(1);
265 }
13bdc4d9 266 catch (...) {
e6a9dde5 267 g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
dc386240 268 _exit(1);
12c86877 269 }
12c86877
BH
270}
271
c2826d2e 272template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
12c86877 273{
386c5b13 274 int start = q.d_dt.udiff();
c2826d2e 275 std::unique_ptr<Answer> a = nullptr;
bf17c14b
KM
276 bool allowRetry=true;
277retry:
ad7d9cd0 278 try {
bf17c14b
KM
279 if (!b) {
280 allowRetry=false;
c2826d2e 281 b=make_unique<Backend>();
bf17c14b 282 }
ad7d9cd0
MZ
283 a=b->question(q); // a can be NULL!
284 }
e6577aef 285 catch(const PDNSException &e) {
c2826d2e 286 b.reset();
bf17c14b 287 if (!allowRetry) {
e6a9dde5 288 g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
c2826d2e 289 a=q.replyPacket();
bf17c14b
KM
290
291 a->setRcode(RCode::ServFail);
292 S.inc("servfail-packets");
c2826d2e 293 S.ringAccount("servfail-queries", q.qdomain, q.qtype);
bf17c14b 294 } else {
e6a9dde5 295 g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
bf17c14b
KM
296 goto retry;
297 }
12c86877 298 }
ad7d9cd0 299 catch(...) {
c2826d2e 300 b.reset();
bf17c14b 301 if (!allowRetry) {
13bdc4d9 302 g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<std::this_thread::get_id()<<endl;
c2826d2e 303 a=q.replyPacket();
bf17c14b
KM
304
305 a->setRcode(RCode::ServFail);
306 S.inc("servfail-packets");
c2826d2e 307 S.ringAccount("servfail-queries", q.qdomain, q.qtype);
bf17c14b 308 } else {
13bdc4d9 309 g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<std::this_thread::get_id()<<" (retry once)"<<endl;
bf17c14b
KM
310 goto retry;
311 }
12c86877 312 }
386c5b13 313 callback(a, start);
15e39ee4 314#ifdef ENABLE_GSS_TSIG
c113acc3 315 if (g_doGssTSIG && a != nullptr) {
647e83a5
O
316 q.cleanupGSS(a->d.rcode);
317 }
15e39ee4 318#endif
ad7d9cd0
MZ
319 return 0;
320}
321
491d03d7 322struct DistributorFatal{};
323
c2826d2e 324template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
ad7d9cd0 325{
2301afa0 326 // this is passed to other process over pipe and released there
13bdc4d9
RG
327 auto questionData = std::make_unique<QuestionData>(q);
328 auto ret = questionData->id = d_nextid++; // might be deleted after write!
329 questionData->callback = callback;
e7e691cc 330
101112ad 331 ++d_queued;
13bdc4d9 332 if (!d_senders.at(questionData->id % d_senders.size()).send(std::move(questionData))) {
101112ad 333 --d_queued;
13bdc4d9 334 questionData.reset();
101112ad
RG
335 unixDie("write");
336 }
d3363b40 337
13bdc4d9 338 if (d_queued > d_maxQueueLength) {
e6a9dde5 339 g_log<<Logger::Error<< d_queued <<" questions waiting for database/backend attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
491d03d7 340 // this will leak the entire contents of all pipes, nothing will be freed. Respawn when this happens!
341 throw DistributorFatal();
1258abe0 342 }
c2826d2e 343
491d03d7 344 return ret;
12c86877 345}