]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/dnsdist-tcp.cc
Merge pull request #5523 from rubenk/fix-typos-in-logmessage
[thirdparty/pdns.git] / pdns / dnsdist-tcp.cc
CommitLineData
8a5d5053 1/*
12471842
PL
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
8a5d5053 22#include "dnsdist.hh"
ca404e94 23#include "dnsdist-ecs.hh"
548c8b66 24#include "dnsparser.hh"
ff73f02b 25#include "ednsoptions.hh"
8a5d5053 26#include "dolog.hh"
0e41337b 27#include "lock.hh"
85c7ca75 28#include "gettime.hh"
8a5d5053 29#include <thread>
30#include <atomic>
31
32using std::thread;
33using std::atomic;
34
35/* TCP: the grand design.
36 We forward 'messages' between clients and downstream servers. Messages are 65k bytes large, tops.
37 An answer might theoretically consist of multiple messages (for example, in the case of AXFR), initially
38 we will not go there.
39
40 In a sense there is a strong symmetry between UDP and TCP, once a connection to a downstream has been set up.
41 This symmetry is broken because of head-of-line blocking within TCP though, necessitating additional connections
42 to guarantee performance.
43
44 So the idea is to have a 'pool' of available downstream connections, and forward messages to/from them and never queue.
45 So whenever an answer comes in, we know where it needs to go.
46
47 Let's start naively.
48*/
49
b40cffe7
RG
50static int setupTCPDownstream(shared_ptr<DownstreamState> ds, uint16_t& downstreamFailures)
51{
52 do {
53 vinfolog("TCP connecting to downstream %s (%d)", ds->remote.toStringWithPort(), downstreamFailures);
54 int sock = SSocket(ds->remote.sin4.sin_family, SOCK_STREAM, 0);
55 try {
56 if (!IsAnyAddress(ds->sourceAddr)) {
57 SSetsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 1);
ead60f83 58#ifdef IP_BIND_ADDRESS_NO_PORT
b40cffe7 59 SSetsockopt(sock, SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
ead60f83 60#endif
b40cffe7
RG
61 SBind(sock, ds->sourceAddr);
62 }
63 setNonBlocking(sock);
d987f632 64#ifdef MSG_FASTOPEN
284d460c
RG
65 if (!ds->tcpFastOpen) {
66 SConnectWithTimeout(sock, ds->remote, ds->tcpConnectTimeout);
67 }
d987f632
RG
68#else
69 SConnectWithTimeout(sock, ds->remote, ds->tcpConnectTimeout);
70#endif /* MSG_FASTOPEN */
b40cffe7 71 return sock;
578fc6e5 72 }
b40cffe7
RG
73 catch(const std::runtime_error& e) {
74 /* don't leak our file descriptor if SConnect() (for example) throws */
75 downstreamFailures++;
76 close(sock);
77 if (downstreamFailures > ds->retries) {
78 throw;
79 }
80 }
81 } while(downstreamFailures <= ds->retries);
82
83 return -1;
8a5d5053 84}
85
8a5d5053 86struct ConnectionInfo
87{
88 int fd;
89 ComboAddress remote;
19d34e95 90 ClientState* cs;
8a5d5053 91};
92
/* Upper bound on connections queued towards the worker threads: the acceptor
   drops new connections once the queued count reaches it (0 disables the check). */
254d7e4c 93uint64_t g_maxTCPQueuedConnections{1000};
9396d955
RG
/* Maximum number of queries served on one client connection before it is
   closed (0 disables the check). */
94size_t g_maxTCPQueriesPerConn{0};
/* Maximum lifetime of a client connection, compared against time() deltas,
   i.e. in seconds (0 disables the check). */
95size_t g_maxTCPConnectionDuration{0};
/* Maximum number of simultaneous connections accepted from one client
   address (0 disables the accounting altogether). */
96size_t g_maxTCPConnectionsPerClient{0};
/* Protects tcpClientsCount. */
97static std::mutex tcpClientsCountMutex;
/* Number of currently accounted connections per client address (port ignored
   by the comparator). */
98static std::map<ComboAddress,size_t,ComboAddress::addressOnlyLessThan> tcpClientsCount;
/* NOTE(review): not used in this part of the file; presumably selects the
   single shared pipe mode of TCPClientCollection -- confirm. */
edbda1ad 99bool g_useTCPSinglePipe{false};
/* Minimum interval, in seconds, between two sweeps of a worker's cached
   downstream sockets (0 disables the sweep). */
840ed663 100std::atomic<uint16_t> g_downstreamTCPCleanupInterval{60};
9396d955 101

/* Worker thread entry point, defined below. */
8a5d5053 102void* tcpClientThread(int pipefd);
103
9396d955
RG
104static void decrementTCPClientCount(const ComboAddress& client)
105{
106 if (g_maxTCPConnectionsPerClient) {
107 std::lock_guard<std::mutex> lock(tcpClientsCountMutex);
108 tcpClientsCount[client]--;
109 if (tcpClientsCount[client] == 0) {
110 tcpClientsCount.erase(client);
111 }
112 }
113}
114
8a5d5053 115void TCPClientCollection::addTCPClientThread()
a9bf3ec4 116{
edbda1ad
RG
117 int pipefds[2] = { -1, -1};
118
8a5d5053 119 vinfolog("Adding TCP Client thread");
120
edbda1ad
RG
121 if (d_useSinglePipe) {
122 pipefds[0] = d_singlePipe[0];
123 pipefds[1] = d_singlePipe[1];
adbf75f7 124 }
edbda1ad
RG
125 else {
126 if (pipe(pipefds) < 0) {
127 errlog("Error creating the TCP thread communication pipe: %s", strerror(errno));
128 return;
129 }
8a5d5053 130
edbda1ad
RG
131 if (!setNonBlocking(pipefds[1])) {
132 close(pipefds[0]);
133 close(pipefds[1]);
134 errlog("Error setting the TCP thread communication pipe non-blocking: %s", strerror(errno));
135 return;
136 }
adbf75f7
RG
137 }
138
ded1985a
RG
139 {
140 std::lock_guard<std::mutex> lock(d_mutex);
141
142 if (d_numthreads >= d_tcpclientthreads.capacity()) {
143 warnlog("Adding a new TCP client thread would exceed the vector capacity (%d/%d), skipping", d_numthreads.load(), d_tcpclientthreads.capacity());
edbda1ad
RG
144 if (!d_useSinglePipe) {
145 close(pipefds[0]);
146 close(pipefds[1]);
147 }
ded1985a
RG
148 return;
149 }
150
39386988
RG
151 try {
152 thread t1(tcpClientThread, pipefds[0]);
153 t1.detach();
154 }
155 catch(const std::runtime_error& e) {
156 /* the thread creation failed, don't leak */
157 errlog("Error creating a TCP thread: %s", e.what());
edbda1ad
RG
158 if (!d_useSinglePipe) {
159 close(pipefds[0]);
160 close(pipefds[1]);
161 }
39386988
RG
162 return;
163 }
164
ded1985a
RG
165 d_tcpclientthreads.push_back(pipefds[1]);
166 }
167
6c1ca990 168 ++d_numthreads;
8a5d5053 169}
170
3f6d07a4
RG
171static bool getNonBlockingMsgLen(int fd, uint16_t* len, int timeout)
172try
173{
174 uint16_t raw;
a683e8bd 175 size_t ret = readn2WithTimeout(fd, &raw, sizeof raw, timeout);
3f6d07a4
RG
176 if(ret != sizeof raw)
177 return false;
178 *len = ntohs(raw);
179 return true;
180}
181catch(...) {
182 return false;
183}
184
0f72fd5c 185static bool sendResponseToClient(int fd, const char* response, uint16_t responseLen)
fcffc585 186{
284d460c 187 return sendSizeAndMsgWithTimeout(fd, responseLen, response, g_tcpSendTimeout, nullptr, nullptr, 0, 0, 0);
fcffc585
RG
188}
189
9396d955
RG
/* Tell whether a connection started at `start` has used up its allowed
   lifetime.

   maxConnectionDuration: maximum lifetime in seconds, 0 meaning "no limit".
   start:                 time() value taken when the connection was accepted.
   remainingTime:         set to the number of seconds left when a limit is
                          configured and not yet reached, untouched otherwise.

   Returns true when the limit is configured and has been reached.

   time() follows the wall clock, which may be stepped backwards: in that case
   the elapsed time is treated as zero so that the remaining time never
   exceeds the configured maximum. */
static bool maxConnectionDurationReached(unsigned int maxConnectionDuration, time_t start, unsigned int& remainingTime)
{
  if (maxConnectionDuration == 0) {
    return false;
  }

  const time_t now = time(nullptr);
  const unsigned long long elapsed = (now > start) ? static_cast<unsigned long long>(now - start) : 0ULL;
  if (elapsed >= maxConnectionDuration) {
    return true;
  }

  /* elapsed < maxConnectionDuration here, so this fits in an unsigned int */
  remainingTime = maxConnectionDuration - static_cast<unsigned int>(elapsed);
  return false;
}
201
840ed663
RG
202void cleanupClosedTCPConnections(std::map<ComboAddress,int>& sockets)
203{
204 for(auto it = sockets.begin(); it != sockets.end(); ) {
205 if (isTCPSocketUsable(it->second)) {
206 ++it;
207 }
208 else {
209 close(it->second);
210 it = sockets.erase(it);
211 }
212 }
213}
214
/* Shared collection of TCP worker threads: acceptors use it to spawn workers
   and to pick the pipe of one of them, workers use it to update the count of
   queued connections. */
a9bf3ec4 215std::shared_ptr<TCPClientCollection> g_tcpclientthreads;
8a5d5053 216
/* TCP worker thread.
   Reads ConnectionInfo pointers from `pipefd`, takes ownership of each of them
   and serves the queries arriving on that client socket one at a time: rules
   are applied, then the query is answered directly, from the packet cache, or
   relayed to a downstream server over a TCP connection cached per downstream
   address in this thread. The client socket is closed when the inner loop ends
   for whatever reason. The function only leaves its outer loop by throwing
   (failure to read from the pipe). */
8a5d5053 217void* tcpClientThread(int pipefd)
218{
219 /* we get launched with a pipe on which we receive file descriptors from clients that we own
220 from that point on */
cb52e3ee 221
/* true while ds->outstanding has been incremented for the current query and not yet decremented */
c594a2f4 222 bool outstanding = false;
840ed663 223 time_t lastTCPCleanup = time(nullptr);
cb52e3ee 224
cb52e3ee 225
/* thread-local handles on the global configuration objects */
6ce2da14 226 auto localPolicy = g_policy.getLocal();
d11c4232 227 auto localRulactions = g_rulactions.getLocal();
d8c19b98 228 auto localRespRulactions = g_resprulactions.getLocal();
cf48b0ce 229 auto localCacheHitRespRulactions = g_cachehitresprulactions.getLocal();
bd1c631b 230 auto localDynBlockNMG = g_dynblockNMG.getLocal();
71c94675 231 auto localDynBlockSMT = g_dynblockSMT.getLocal();
886e2cf2 232 auto localPools = g_pools.getLocal();
d8c19b98
RG
233#ifdef HAVE_PROTOBUF
234 boost::uuids::random_generator uuidGenerator;
235#endif
d11c4232 236
/* downstream address -> connected socket, kept across client connections */
6ce2da14 237 map<ComboAddress,int> sockets;
8a5d5053 238 for(;;) {
239 ConnectionInfo* citmp, ci;
240
18861f97
RG
/* the acceptor writes a raw ConnectionInfo pointer into the pipe */
241 try {
242 readn2(pipefd, &citmp, sizeof(citmp));
243 }
244 catch(const std::runtime_error& e) {
245 throw std::runtime_error("Error reading from TCP acceptor pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode: " + e.what());
246 }
247
ded1985a 248 g_tcpclientthreads->decrementQueuedCount();
/* keep a local copy and free the heap object we now own */
8a5d5053 249 ci=*citmp;
6ce2da14 250 delete citmp;
251
8a5d5053 252 uint16_t qlen, rlen;
ca404e94
RG
253 string largerQuery;
254 vector<uint8_t> rewrittenResponse;
6ce2da14 255 shared_ptr<DownstreamState> ds;
7cea4e39
RG
256 ComboAddress dest;
257 memset(&dest, 0, sizeof(dest));
258 dest.sin4.sin_family = ci.remote.sin4.sin_family;
259 socklen_t len = dest.getSocklen();
9396d955
RG
260 size_t queriesCount = 0;
261 time_t connectionStartTime = time(NULL);
262
3f6d07a4
RG
263 if (!setNonBlocking(ci.fd))
264 goto drop;
265
7cea4e39
RG
/* local address the client connected to; fall back to the frontend's address if it cannot be retrieved */
266 if (getsockname(ci.fd, (sockaddr*)&dest, &len)) {
267 dest = ci.cs->local;
268 }
269
8a5d5053 270 try {
/* one iteration per query received on this client connection */
c594a2f4 271 for(;;) {
9396d955 272 unsigned int remainingTime = 0;
c594a2f4
RG
273 ds = nullptr;
274 outstanding = false;
275
3f6d07a4 276 if(!getNonBlockingMsgLen(ci.fd, &qlen, g_tcpRecvTimeout))
8a5d5053 277 break;
be35c484 278
e91084ce
RG
279 ci.cs->queries++;
280 g_stats.queries++;
281
9396d955
RG
282 queriesCount++;
283
/* per-connection limits: number of queries, then total duration */
284 if (g_maxTCPQueriesPerConn && queriesCount > g_maxTCPQueriesPerConn) {
285 vinfolog("Terminating TCP connection from %s because it reached the maximum number of queries per conn (%d / %d)", ci.remote.toStringWithPort(), queriesCount, g_maxTCPQueriesPerConn);
286 break;
287 }
288
289 if (maxConnectionDurationReached(g_maxTCPConnectionDuration, connectionStartTime, remainingTime)) {
290 vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", ci.remote.toStringWithPort());
291 break;
292 }
293
be35c484
RG
294 if (qlen < sizeof(dnsheader)) {
295 g_stats.nonCompliantQueries++;
296 break;
297 }
298
ff73f02b
RG
299 bool ednsAdded = false;
300 bool ecsAdded = false;
87c605c4
RG
301 /* if the query is small, allocate a bit more
302 memory to be able to spoof the content,
303 or to add ECS without allocating a new buffer */
304 size_t querySize = qlen <= 4096 ? qlen + 512 : qlen;
305 char queryBuffer[querySize];
/* `query` points to queryBuffer unless ECS handling had to build a larger copy (largerQuery) */
11e1e08b 306 const char* query = queryBuffer;
9396d955 307 readn2WithTimeout(ci.fd, queryBuffer, qlen, g_tcpRecvTimeout, remainingTime);
e91084ce 308
11e1e08b
RG
309#ifdef HAVE_DNSCRYPT
310 std::shared_ptr<DnsCryptQuery> dnsCryptQuery = 0;
311
312 if (ci.cs->dnscryptCtx) {
313 dnsCryptQuery = std::make_shared<DnsCryptQuery>();
314 uint16_t decryptedQueryLen = 0;
315 vector<uint8_t> response;
497a6e3a 316 bool decrypted = handleDnsCryptQuery(ci.cs->dnscryptCtx, queryBuffer, qlen, dnsCryptQuery, &decryptedQueryLen, true, response);
11e1e08b
RG
317
/* not decrypted: send back the response produced by the handler, if any, and close the connection */
318 if (!decrypted) {
319 if (response.size() > 0) {
a683e8bd 320 sendResponseToClient(ci.fd, reinterpret_cast<char*>(response.data()), (uint16_t) response.size());
11e1e08b
RG
321 }
322 break;
323 }
497a6e3a 324 qlen = decryptedQueryLen;
11e1e08b
RG
325 }
326#endif
2efd427d
RG
327 struct dnsheader* dh = (struct dnsheader*) query;
328
329 if(dh->qr) { // don't respond to responses
330 g_stats.nonCompliantQueries++;
331 goto drop;
332 }
333
334 if(dh->qdcount == 0) {
335 g_stats.emptyQueries++;
336 goto drop;
337 }
11e1e08b 338
e91084ce
RG
339 if (dh->rd) {
340 g_stats.rdQueries++;
341 }
342
/* remember the original header flags, they are restored before answering */
343 const uint16_t* flags = getFlagsFromDNSHeader(dh);
344 uint16_t origFlags = *flags;
cec47783 345 uint16_t qtype, qclass;
ca404e94 346 unsigned int consumed = 0;
cec47783 347 DNSName qname(query, qlen, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
7cea4e39 348 DNSQuestion dq(&qname, qtype, qclass, &dest, &ci.remote, (dnsheader*)query, querySize, qlen, true);
d8c19b98
RG
349#ifdef HAVE_PROTOBUF
350 dq.uniqueId = uuidGenerator();
351#endif
e91084ce
RG
352
353 string poolname;
354 int delayMsec=0;
614bfa37
RG
355 /* we need this one to be accurate ("real") for the protobuf message */
356 struct timespec queryRealTime;
bd1c631b 357 struct timespec now;
614bfa37
RG
358 gettime(&now);
359 gettime(&queryRealTime, true);
bd1c631b 360
/* apply dynamic blocks and rules; a false return means the connection is dropped */
0c369ddc 361 if (!processQuery(localDynBlockNMG, localDynBlockSMT, localRulactions, dq, poolname, &delayMsec, now)) {
e91084ce 362 goto drop;
d11c4232 363 }
1a2a4e68 364
497a6e3a 365 if(dq.dh->qr) { // something turned it into a response
0f72fd5c 366 restoreFlags(dh, origFlags);
fcffc585 367#ifdef HAVE_DNSCRYPT
0f72fd5c
RG
368 if (!encryptResponse(queryBuffer, &dq.len, dq.size, true, dnsCryptQuery)) {
369 goto drop;
370 }
fcffc585 371#endif
0f72fd5c 372 sendResponseToClient(ci.fd, query, dq.len);
19d34e95 373 g_stats.selfAnswered++;
d11c4232 374 goto drop;
d11c4232 375 }
376
/* select the pool, its packet cache, and the load-balancing policy (the pool's own one if set) */
886e2cf2 377 std::shared_ptr<ServerPool> serverPool = getPool(*localPools, poolname);
e6557ec8 378 std::shared_ptr<DNSDistPacketCache> packetCache = nullptr;
742c079a 379 auto policy = localPolicy->policy;
b9f8a6c8 380 if (serverPool->policy != nullptr) {
742c079a
RG
381 policy = serverPool->policy->policy;
382 }
383 {
/* the policy is invoked with the Lua mutex held */
384 std::lock_guard<std::mutex> lock(g_luamutex);
385 ds = policy(serverPool->servers, &dq);
386 packetCache = serverPool->packetCache;
387 }
ca404e94 388
/* add EDNS Client Subnet when enabled for both the query and the selected server */
ff0902ec 389 if (dq.useECS && ds && ds->useECS) {
497a6e3a 390 uint16_t newLen = dq.len;
ff0902ec 391 handleEDNSClientSubnet(queryBuffer, dq.size, consumed, &newLen, largerQuery, &ednsAdded, &ecsAdded, ci.remote, dq.ecsOverride, dq.ecsPrefixLength);
ca404e94
RG
392 if (largerQuery.empty() == false) {
393 query = largerQuery.c_str();
a683e8bd 394 dq.len = (uint16_t) largerQuery.size();
497a6e3a 395 dq.size = largerQuery.size();
ca404e94 396 } else {
497a6e3a 397 dq.len = newLen;
ca404e94
RG
398 }
399 }
400
/* packet cache lookup; expired entries are only accepted when no server was selected */
886e2cf2 401 uint32_t cacheKey = 0;
e6557ec8 402 if (packetCache && !dq.skipCache) {
886e2cf2
RG
403 char cachedResponse[4096];
404 uint16_t cachedResponseSize = sizeof cachedResponse;
1ea747c0 405 uint32_t allowExpired = ds ? 0 : g_staleCacheEntriesTTL;
a683e8bd 406 if (packetCache->get(dq, (uint16_t) consumed, dq.dh->id, cachedResponse, &cachedResponseSize, &cacheKey, allowExpired)) {
cf48b0ce
RG
407 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.local, dq.remote, (dnsheader*) cachedResponse, sizeof cachedResponse, cachedResponseSize, true, &queryRealTime);
408#ifdef HAVE_PROTOBUF
409 dr.uniqueId = dq.uniqueId;
410#endif
411 if (!processResponse(localCacheHitRespRulactions, dr, &delayMsec)) {
412 goto drop;
413 }
414
fcffc585 415#ifdef HAVE_DNSCRYPT
0f72fd5c
RG
416 if (!encryptResponse(cachedResponse, &cachedResponseSize, sizeof cachedResponse, true, dnsCryptQuery)) {
417 goto drop;
418 }
fcffc585 419#endif
0f72fd5c 420 sendResponseToClient(ci.fd, cachedResponse, cachedResponseSize);
886e2cf2
RG
421 g_stats.cacheHits++;
422 goto drop;
423 }
424 g_stats.cacheMisses++;
425 }
426
26a3cdb7
RG
/* the policy did not select any server: optionally answer ServFail, then close the connection */
427 if(!ds) {
428 g_stats.noPolicy++;
429
430 if (g_servFailOnNoPolicy) {
431 restoreFlags(dh, origFlags);
432 dq.dh->rcode = RCode::ServFail;
433 dq.dh->qr = true;
434
435#ifdef HAVE_DNSCRYPT
436 if (!encryptResponse(queryBuffer, &dq.len, dq.size, true, dnsCryptQuery)) {
437 goto drop;
438 }
439#endif
440 sendResponseToClient(ci.fd, query, dq.len);
441 }
442
443 break;
444 }
1ea747c0 445
/* reuse this thread's cached connection to the selected server, or open a new one */
18861f97 446 int dsock = -1;
b40cffe7 447 uint16_t downstreamFailures=0;
87706a76 448#ifdef MSG_FASTOPEN
284d460c 449 bool freshConn = true;
87706a76 450#endif /* MSG_FASTOPEN */
6ce2da14 451 if(sockets.count(ds->remote) == 0) {
b40cffe7 452 dsock=setupTCPDownstream(ds, downstreamFailures);
5a99e22f 453 sockets[ds->remote]=dsock;
8a5d5053 454 }
284d460c 455 else {
6ce2da14 456 dsock=sockets[ds->remote];
87706a76 457#ifdef MSG_FASTOPEN
284d460c 458 freshConn = false;
87706a76 459#endif /* MSG_FASTOPEN */
284d460c 460 }
6ce2da14 461
8a5d5053 462 ds->queries++;
463 ds->outstanding++;
c594a2f4 464 outstanding = true;
8a5d5053 465
/* (re)send the query; we come back here after every reconnection */
466 retry:;
3f6d07a4
RG
467 if (dsock < 0) {
468 sockets.erase(ds->remote);
469 break;
470 }
471
b40cffe7
RG
472 if (ds->retries > 0 && downstreamFailures > ds->retries) {
473 vinfolog("Downstream connection to %s failed %d times in a row, giving up.", ds->getName(), downstreamFailures);
3f6d07a4 474 close(dsock);
18861f97 475 dsock=-1;
3f6d07a4
RG
476 sockets.erase(ds->remote);
477 break;
478 }
479
8b92100f 480 try {
284d460c
RG
481 int socketFlags = 0;
482#ifdef MSG_FASTOPEN
/* MSG_FASTOPEN is only passed on sockets that were just created */
483 if (ds->tcpFastOpen && freshConn) {
484 socketFlags |= MSG_FASTOPEN;
fbe2a2e0 485 }
284d460c
RG
486#endif /* MSG_FASTOPEN */
487 sendSizeAndMsgWithTimeout(dsock, dq.len, query, ds->tcpSendTimeout, &ds->remote, &ds->sourceAddr, ds->sourceItf, 0, socketFlags);
8b92100f
RG
488 }
489 catch(const runtime_error& e) {
/* sending failed: replace the cached socket with a new connection and try again */
490 vinfolog("Downstream connection to %s died on us, getting a new one!", ds->getName());
491 close(dsock);
65d7f2bb
RG
492 dsock=-1;
493 sockets.erase(ds->remote);
b40cffe7
RG
494 downstreamFailures++;
495 dsock=setupTCPDownstream(ds, downstreamFailures);
5a99e22f 496 sockets[ds->remote]=dsock;
87706a76 497#ifdef MSG_FASTOPEN
284d460c 498 freshConn=true;
87706a76 499#endif /* MSG_FASTOPEN */
8b92100f
RG
500 goto retry;
501 }
502
548c8b66
RG
/* zone transfers are never cached and may span several response messages */
503 bool xfrStarted = false;
504 bool isXFR = (dq.qtype == QType::AXFR || dq.qtype == QType::IXFR);
505 if (isXFR) {
506 dq.skipCache = true;
507 }
508
/* read one response message from the downstream server */
509 getpacket:;
510
3f6d07a4 511 if(!getNonBlockingMsgLen(dsock, &rlen, ds->tcpRecvTimeout)) {
18eeccc9 512 vinfolog("Downstream connection to %s died on us phase 2, getting a new one!", ds->getName());
8a5d5053 513 close(dsock);
65d7f2bb
RG
514 dsock=-1;
515 sockets.erase(ds->remote);
b40cffe7
RG
516 downstreamFailures++;
517 dsock=setupTCPDownstream(ds, downstreamFailures);
5a99e22f 518 sockets[ds->remote]=dsock;
87706a76 519#ifdef MSG_FASTOPEN
284d460c 520 freshConn=true;
87706a76 521#endif /* MSG_FASTOPEN */
548c8b66
RG
/* a transfer that already started is not retried, the client connection is dropped instead */
522 if(xfrStarted) {
523 goto drop;
524 }
8a5d5053 525 goto retry;
526 }
527
fcffc585 528 size_t responseSize = rlen;
0f72fd5c 529 uint16_t addRoom = 0;
11e1e08b 530#ifdef HAVE_DNSCRYPT
0f72fd5c
RG
/* reserve extra room in the buffer for the DNSCrypt padding and MAC, as long as the total stays within 16 bits */
531 if (dnsCryptQuery && (UINT16_MAX - rlen) > (uint16_t) DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE) {
532 addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
11e1e08b
RG
533 }
534#endif
0f72fd5c 535 responseSize += addRoom;
11e1e08b 536 char answerbuffer[responseSize];
3f6d07a4 537 readn2WithTimeout(dsock, answerbuffer, rlen, ds->tcpRecvTimeout);
11e1e08b
RG
538 char* response = answerbuffer;
539 uint16_t responseLen = rlen;
fdb47c9f
RG
540 if (outstanding) {
541 /* might be false for {A,I}XFR */
542 --ds->outstanding;
543 outstanding = false;
544 }
ca404e94 545
f87c4aff
RG
546 if (rlen < sizeof(dnsheader)) {
547 break;
548 }
549
fcffc585 550 if (!responseContentMatches(response, responseLen, qname, qtype, qclass, ds->remote)) {
f87c4aff
RG
551 break;
552 }
553
ff73f02b 554 if (!fixUpResponse(&response, &responseLen, &responseSize, qname, origFlags, ednsAdded, ecsAdded, rewrittenResponse, addRoom)) {
fcffc585 555 break;
ca404e94 556 }
aeb36780 557
ec469dd7 558 dh = (struct dnsheader*) response;
7cea4e39 559 DNSResponse dr(&qname, qtype, qclass, &dest, &ci.remote, dh, responseSize, responseLen, true, &queryRealTime);
d8c19b98
RG
560#ifdef HAVE_PROTOBUF
561 dr.uniqueId = dq.uniqueId;
562#endif
788c3243 563 if (!processResponse(localRespRulactions, dr, &delayMsec)) {
8146444b 564 break;
d8c19b98
RG
565 }
566
e6557ec8 567 if (packetCache && !dq.skipCache) {
2714396e 568 packetCache->insert(cacheKey, qname, qtype, qclass, response, responseLen, true, dh->rcode);
886e2cf2
RG
569 }
570
11e1e08b 571#ifdef HAVE_DNSCRYPT
0f72fd5c
RG
572 if (!encryptResponse(response, &responseLen, responseSize, true, dnsCryptQuery)) {
573 goto drop;
574 }
11e1e08b 575#endif
0f72fd5c 576 if (!sendResponseToClient(ci.fd, response, responseLen)) {
fcffc585
RG
577 break;
578 }
19d34e95 579
284d460c
RG
/* NOTE(review): the SOA record count presumably tells whether more transfer
   messages are expected (a first message with a single SOA, or a later one
   without any) -- confirm against getRecordsOfTypeCount() */
580 if (isXFR) {
581 if (dh->rcode == 0 && dh->ancount != 0) {
582 if (xfrStarted == false) {
583 xfrStarted = true;
584 if (getRecordsOfTypeCount(response, responseLen, 1, QType::SOA) == 1) {
585 goto getpacket;
586 }
587 }
588 else if (getRecordsOfTypeCount(response, responseLen, 1, QType::SOA) == 0) {
548c8b66
RG
589 goto getpacket;
590 }
591 }
284d460c
RG
592 /* Don't reuse the TCP connection after an {A,I}XFR */
593 close(dsock);
594 dsock=-1;
595 sockets.erase(ds->remote);
548c8b66
RG
596 }
597
/* account for the response and record it in the response ring */
19d34e95 598 g_stats.responses++;
3fcaeeac 599 struct timespec answertime;
85c7ca75 600 gettime(&answertime);
3fcaeeac 601 unsigned int udiff = 1000000.0*DiffTime(now,answertime);
602 {
603 std::lock_guard<std::mutex> lock(g_rings.respMutex);
b9ebcf13 604 g_rings.respRing.push_back({answertime, ci.remote, qname, dq.qtype, (unsigned int)udiff, (unsigned int)responseLen, *dh, ds->remote});
3fcaeeac 605 }
606
ca404e94
RG
607 largerQuery.clear();
608 rewrittenResponse.clear();
8a5d5053 609 }
610 }
/* any exception raised while serving the connection simply ends it */
611 catch(...){}
d11c4232 612
/* common exit path for this client connection: close it and fix up the counters */
613 drop:;
8a5d5053 614
3c17e261 615 vinfolog("Closing TCP client connection with %s", ci.remote.toStringWithPort());
f6a81077
RG
616 if (ci.fd >= 0) {
617 close(ci.fd);
618 }
619 ci.fd = -1;
c594a2f4
RG
620 if (ds && outstanding) {
621 outstanding = false;
6ce2da14 622 --ds->outstanding;
c594a2f4 623 }
9396d955 624 decrementTCPClientCount(ci.remote);
840ed663
RG
625
/* from time to time, get rid of the cached downstream sockets that are no longer usable */
626 if (g_downstreamTCPCleanupInterval > 0 && (connectionStartTime > (lastTCPCleanup + g_downstreamTCPCleanupInterval))) {
627 cleanupClosedTCPConnections(sockets);
628 lastTCPCleanup = time(nullptr);
629 }
8a5d5053 630 }
631 return 0;
632}
633
8a5d5053 634/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and
635 they will hand off to worker threads & spawn more of them if required
636*/
637void* tcpAcceptorThread(void* p)
638{
639 ClientState* cs = (ClientState*) p;
9396d955 640 bool tcpClientCountIncremented = false;
8a5d5053 641 ComboAddress remote;
642 remote.sin4.sin_family = cs->local.sin4.sin_family;
643
a9bf3ec4 644 g_tcpclientthreads->addTCPClientThread();
8a5d5053 645
b719e465 646 auto acl = g_ACL.getLocal();
8a5d5053 647 for(;;) {
8de680ec
RG
648 bool queuedCounterIncremented = false;
649 ConnectionInfo* ci = nullptr;
9396d955 650 tcpClientCountIncremented = false;
8a5d5053 651 try {
3c17e261 652 ci = new ConnectionInfo;
19d34e95 653 ci->cs = cs;
3c17e261 654 ci->fd = -1;
8a5d5053 655 ci->fd = SAccept(cs->tcpFD, remote);
963bef8d 656
b719e465 657 if(!acl->match(remote)) {
658 g_stats.aclDrops++;
659 close(ci->fd);
660 delete ci;
8de680ec 661 ci=nullptr;
b719e465 662 vinfolog("Dropped TCP connection from %s because of ACL", remote.toStringWithPort());
663 continue;
664 }
8a5d5053 665
ded1985a 666 if(g_maxTCPQueuedConnections > 0 && g_tcpclientthreads->getQueuedCount() >= g_maxTCPQueuedConnections) {
6c1ca990
RG
667 close(ci->fd);
668 delete ci;
669 ci=nullptr;
670 vinfolog("Dropping TCP connection from %s because we have too many queued already", remote.toStringWithPort());
671 continue;
672 }
673
9396d955
RG
674 if (g_maxTCPConnectionsPerClient) {
675 std::lock_guard<std::mutex> lock(tcpClientsCountMutex);
676
677 if (tcpClientsCount[remote] >= g_maxTCPConnectionsPerClient) {
678 close(ci->fd);
679 delete ci;
680 ci=nullptr;
681 vinfolog("Dropping TCP connection from %s because we have too many from this client already", remote.toStringWithPort());
682 continue;
683 }
684 tcpClientsCount[remote]++;
685 tcpClientCountIncremented = true;
686 }
687
b719e465 688 vinfolog("Got TCP connection from %s", remote.toStringWithPort());
9396d955 689
8a5d5053 690 ci->remote = remote;
a9bf3ec4
RG
691 int pipe = g_tcpclientthreads->getThread();
692 if (pipe >= 0) {
8de680ec 693 queuedCounterIncremented = true;
a9bf3ec4
RG
694 writen2WithTimeout(pipe, &ci, sizeof(ci), 0);
695 }
696 else {
ded1985a 697 g_tcpclientthreads->decrementQueuedCount();
8de680ec 698 queuedCounterIncremented = false;
a9bf3ec4
RG
699 close(ci->fd);
700 delete ci;
18861f97 701 ci=nullptr;
9396d955
RG
702 if(tcpClientCountIncremented) {
703 decrementTCPClientCount(remote);
704 }
a9bf3ec4 705 }
8a5d5053 706 }
3c17e261 707 catch(std::exception& e) {
708 errlog("While reading a TCP question: %s", e.what());
709 if(ci && ci->fd >= 0)
710 close(ci->fd);
9396d955
RG
711 if(tcpClientCountIncremented) {
712 decrementTCPClientCount(remote);
713 }
3c17e261 714 delete ci;
8de680ec
RG
715 ci = nullptr;
716 if (queuedCounterIncremented) {
ded1985a 717 g_tcpclientthreads->decrementQueuedCount();
8de680ec 718 }
3c17e261 719 }
8a5d5053 720 catch(...){}
721 }
722
723 return 0;
724}
6ce2da14 725
726
6885d4bf 727bool getMsgLen32(int fd, uint32_t* len)
6ce2da14 728try
729{
6885d4bf 730 uint32_t raw;
a683e8bd 731 size_t ret = readn2(fd, &raw, sizeof raw);
ca404e94 732 if(ret != sizeof raw)
6ce2da14 733 return false;
6885d4bf 734 *len = ntohl(raw);
735 if(*len > 10000000) // arbitrary 10MB limit
736 return false;
6ce2da14 737 return true;
738}
739catch(...) {
740 return false;
741}
742
6885d4bf 743bool putMsgLen32(int fd, uint32_t len)
6ce2da14 744try
745{
6885d4bf 746 uint32_t raw = htonl(len);
a683e8bd 747 size_t ret = writen2(fd, &raw, sizeof raw);
ca404e94 748 return ret==sizeof raw;
6ce2da14 749}
750catch(...) {
751 return false;
752}