]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/dnsdist-tcp.cc
rec: Don't account chained queries more than once
[thirdparty/pdns.git] / pdns / dnsdist-tcp.cc
CommitLineData
8a5d5053 1/*
12471842
PL
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
8a5d5053 22#include "dnsdist.hh"
ca404e94 23#include "dnsdist-ecs.hh"
548c8b66 24#include "dnsparser.hh"
ff73f02b 25#include "ednsoptions.hh"
8a5d5053 26#include "dolog.hh"
0e41337b 27#include "lock.hh"
85c7ca75 28#include "gettime.hh"
8a5d5053 29#include <thread>
30#include <atomic>
31
32using std::thread;
33using std::atomic;
34
/* TCP: the grand design.
   We forward 'messages' between clients and downstream servers. Messages are 65k bytes large, tops.
   An answer might theoretically consist of multiple messages (for example, in the case of AXFR);
   initially we will not go there.

   In a sense there is a strong symmetry between UDP and TCP, once a connection to a downstream has been set up.
   This symmetry is broken because of head-of-line blocking within TCP though, necessitating additional connections
   to guarantee performance.

   So the idea is to have a 'pool' of available downstream connections, and to forward messages to/from them and never queue.
   So whenever an answer comes in, we know where it needs to go.

   Let's start naively.
*/
49
b40cffe7
RG
50static int setupTCPDownstream(shared_ptr<DownstreamState> ds, uint16_t& downstreamFailures)
51{
52 do {
53 vinfolog("TCP connecting to downstream %s (%d)", ds->remote.toStringWithPort(), downstreamFailures);
54 int sock = SSocket(ds->remote.sin4.sin_family, SOCK_STREAM, 0);
55 try {
56 if (!IsAnyAddress(ds->sourceAddr)) {
57 SSetsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 1);
ead60f83 58#ifdef IP_BIND_ADDRESS_NO_PORT
5602f131
DM
59 if (ds->ipBindAddrNoPort) {
60 SSetsockopt(sock, SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
61 }
ead60f83 62#endif
b40cffe7
RG
63 SBind(sock, ds->sourceAddr);
64 }
65 setNonBlocking(sock);
d987f632 66#ifdef MSG_FASTOPEN
284d460c
RG
67 if (!ds->tcpFastOpen) {
68 SConnectWithTimeout(sock, ds->remote, ds->tcpConnectTimeout);
69 }
d987f632
RG
70#else
71 SConnectWithTimeout(sock, ds->remote, ds->tcpConnectTimeout);
72#endif /* MSG_FASTOPEN */
b40cffe7 73 return sock;
578fc6e5 74 }
b40cffe7
RG
75 catch(const std::runtime_error& e) {
76 /* don't leak our file descriptor if SConnect() (for example) throws */
77 downstreamFailures++;
78 close(sock);
79 if (downstreamFailures > ds->retries) {
80 throw;
81 }
82 }
83 } while(downstreamFailures <= ds->retries);
84
85 return -1;
8a5d5053 86}
87
8a5d5053 88struct ConnectionInfo
89{
90 int fd;
91 ComboAddress remote;
19d34e95 92 ClientState* cs;
8a5d5053 93};
94
254d7e4c 95uint64_t g_maxTCPQueuedConnections{1000};
9396d955
RG
96size_t g_maxTCPQueriesPerConn{0};
97size_t g_maxTCPConnectionDuration{0};
98size_t g_maxTCPConnectionsPerClient{0};
99static std::mutex tcpClientsCountMutex;
100static std::map<ComboAddress,size_t,ComboAddress::addressOnlyLessThan> tcpClientsCount;
edbda1ad 101bool g_useTCPSinglePipe{false};
840ed663 102std::atomic<uint16_t> g_downstreamTCPCleanupInterval{60};
9396d955 103
8a5d5053 104void* tcpClientThread(int pipefd);
105
9396d955
RG
106static void decrementTCPClientCount(const ComboAddress& client)
107{
108 if (g_maxTCPConnectionsPerClient) {
109 std::lock_guard<std::mutex> lock(tcpClientsCountMutex);
110 tcpClientsCount[client]--;
111 if (tcpClientsCount[client] == 0) {
112 tcpClientsCount.erase(client);
113 }
114 }
115}
116
8a5d5053 117void TCPClientCollection::addTCPClientThread()
a9bf3ec4 118{
edbda1ad
RG
119 int pipefds[2] = { -1, -1};
120
8a5d5053 121 vinfolog("Adding TCP Client thread");
122
edbda1ad
RG
123 if (d_useSinglePipe) {
124 pipefds[0] = d_singlePipe[0];
125 pipefds[1] = d_singlePipe[1];
adbf75f7 126 }
edbda1ad
RG
127 else {
128 if (pipe(pipefds) < 0) {
129 errlog("Error creating the TCP thread communication pipe: %s", strerror(errno));
130 return;
131 }
8a5d5053 132
edbda1ad
RG
133 if (!setNonBlocking(pipefds[1])) {
134 close(pipefds[0]);
135 close(pipefds[1]);
136 errlog("Error setting the TCP thread communication pipe non-blocking: %s", strerror(errno));
137 return;
138 }
adbf75f7
RG
139 }
140
ded1985a
RG
141 {
142 std::lock_guard<std::mutex> lock(d_mutex);
143
144 if (d_numthreads >= d_tcpclientthreads.capacity()) {
145 warnlog("Adding a new TCP client thread would exceed the vector capacity (%d/%d), skipping", d_numthreads.load(), d_tcpclientthreads.capacity());
edbda1ad
RG
146 if (!d_useSinglePipe) {
147 close(pipefds[0]);
148 close(pipefds[1]);
149 }
ded1985a
RG
150 return;
151 }
152
39386988
RG
153 try {
154 thread t1(tcpClientThread, pipefds[0]);
155 t1.detach();
156 }
157 catch(const std::runtime_error& e) {
158 /* the thread creation failed, don't leak */
159 errlog("Error creating a TCP thread: %s", e.what());
edbda1ad
RG
160 if (!d_useSinglePipe) {
161 close(pipefds[0]);
162 close(pipefds[1]);
163 }
39386988
RG
164 return;
165 }
166
ded1985a
RG
167 d_tcpclientthreads.push_back(pipefds[1]);
168 }
169
6c1ca990 170 ++d_numthreads;
8a5d5053 171}
172
3f6d07a4
RG
173static bool getNonBlockingMsgLen(int fd, uint16_t* len, int timeout)
174try
175{
176 uint16_t raw;
a683e8bd 177 size_t ret = readn2WithTimeout(fd, &raw, sizeof raw, timeout);
3f6d07a4
RG
178 if(ret != sizeof raw)
179 return false;
180 *len = ntohs(raw);
181 return true;
182}
183catch(...) {
184 return false;
185}
186
0f72fd5c 187static bool sendResponseToClient(int fd, const char* response, uint16_t responseLen)
fcffc585 188{
284d460c 189 return sendSizeAndMsgWithTimeout(fd, responseLen, response, g_tcpSendTimeout, nullptr, nullptr, 0, 0, 0);
fcffc585
RG
190}
191
9396d955
RG
/* Tell whether a connection started at 'start' has been open for at least
   'maxConnectionDuration' seconds.

   A 'maxConnectionDuration' of 0 disables the limit: false is returned and
   'remainingTime' is not modified. Otherwise, when the limit has not been
   reached yet, 'remainingTime' receives the number of seconds left; it is
   left untouched when the limit has been reached. */
static bool maxConnectionDurationReached(unsigned int maxConnectionDuration, time_t start, unsigned int& remainingTime)
{
  if (maxConnectionDuration == 0) {
    return false;
  }

  const time_t rightNow = time(nullptr);
  /* an elapsed time of zero is assumed if the clock went backward */
  const unsigned int secondsOpen = (rightNow > start) ? static_cast<unsigned int>(rightNow - start) : 0;

  if (secondsOpen >= maxConnectionDuration) {
    return true;
  }

  remainingTime = maxConnectionDuration - secondsOpen;
  return false;
}
207
840ed663
RG
208void cleanupClosedTCPConnections(std::map<ComboAddress,int>& sockets)
209{
210 for(auto it = sockets.begin(); it != sockets.end(); ) {
211 if (isTCPSocketUsable(it->second)) {
212 ++it;
213 }
214 else {
215 close(it->second);
216 it = sockets.erase(it);
217 }
218 }
219}
220
a9bf3ec4 221std::shared_ptr<TCPClientCollection> g_tcpclientthreads;
8a5d5053 222
8a5d5053 223void* tcpClientThread(int pipefd)
224{
225 /* we get launched with a pipe on which we receive file descriptors from clients that we own
226 from that point on */
cb52e3ee 227
c594a2f4 228 bool outstanding = false;
840ed663 229 time_t lastTCPCleanup = time(nullptr);
cb52e3ee 230
0beaa5c8 231 LocalHolders holders;
d8c19b98 232 auto localRespRulactions = g_resprulactions.getLocal();
57847d65
RG
233#ifdef HAVE_DNSCRYPT
234 /* when the answer is encrypted in place, we need to get a copy
235 of the original header before encryption to fill the ring buffer */
236 dnsheader dhCopy;
237#endif
d11c4232 238
6ce2da14 239 map<ComboAddress,int> sockets;
8a5d5053 240 for(;;) {
241 ConnectionInfo* citmp, ci;
242
18861f97
RG
243 try {
244 readn2(pipefd, &citmp, sizeof(citmp));
245 }
246 catch(const std::runtime_error& e) {
247 throw std::runtime_error("Error reading from TCP acceptor pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode: " + e.what());
248 }
249
ded1985a 250 g_tcpclientthreads->decrementQueuedCount();
8a5d5053 251 ci=*citmp;
6ce2da14 252 delete citmp;
253
8a5d5053 254 uint16_t qlen, rlen;
ca404e94 255 vector<uint8_t> rewrittenResponse;
6ce2da14 256 shared_ptr<DownstreamState> ds;
7cea4e39 257 ComboAddress dest;
1caa9c83 258 dest.reset();
7cea4e39
RG
259 dest.sin4.sin_family = ci.remote.sin4.sin_family;
260 socklen_t len = dest.getSocklen();
9396d955
RG
261 size_t queriesCount = 0;
262 time_t connectionStartTime = time(NULL);
0beaa5c8 263 std::vector<char> queryBuffer;
3f6d07a4 264
7cea4e39
RG
265 if (getsockname(ci.fd, (sockaddr*)&dest, &len)) {
266 dest = ci.cs->local;
267 }
268
8a5d5053 269 try {
c594a2f4 270 for(;;) {
9396d955 271 unsigned int remainingTime = 0;
c594a2f4
RG
272 ds = nullptr;
273 outstanding = false;
274
3f6d07a4 275 if(!getNonBlockingMsgLen(ci.fd, &qlen, g_tcpRecvTimeout))
8a5d5053 276 break;
be35c484 277
0beaa5c8
RG
278 queriesCount++;
279
280 if (qlen < sizeof(dnsheader)) {
281 g_stats.nonCompliantQueries++;
282 break;
283 }
284
e91084ce
RG
285 ci.cs->queries++;
286 g_stats.queries++;
287
9396d955
RG
288 if (g_maxTCPQueriesPerConn && queriesCount > g_maxTCPQueriesPerConn) {
289 vinfolog("Terminating TCP connection from %s because it reached the maximum number of queries per conn (%d / %d)", ci.remote.toStringWithPort(), queriesCount, g_maxTCPQueriesPerConn);
290 break;
291 }
292
293 if (maxConnectionDurationReached(g_maxTCPConnectionDuration, connectionStartTime, remainingTime)) {
294 vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", ci.remote.toStringWithPort());
295 break;
296 }
297
ff73f02b
RG
298 bool ednsAdded = false;
299 bool ecsAdded = false;
0beaa5c8 300 /* allocate a bit more memory to be able to spoof the content,
87c605c4 301 or to add ECS without allocating a new buffer */
0beaa5c8
RG
302 queryBuffer.reserve(qlen + 512);
303
304 char* query = &queryBuffer[0];
305 readn2WithTimeout(ci.fd, query, qlen, g_tcpRecvTimeout, remainingTime);
e91084ce 306
11e1e08b 307#ifdef HAVE_DNSCRYPT
0beaa5c8 308 std::shared_ptr<DnsCryptQuery> dnsCryptQuery = nullptr;
11e1e08b
RG
309
310 if (ci.cs->dnscryptCtx) {
311 dnsCryptQuery = std::make_shared<DnsCryptQuery>();
312 uint16_t decryptedQueryLen = 0;
313 vector<uint8_t> response;
0beaa5c8 314 bool decrypted = handleDnsCryptQuery(ci.cs->dnscryptCtx, query, qlen, dnsCryptQuery, &decryptedQueryLen, true, response);
11e1e08b
RG
315
316 if (!decrypted) {
317 if (response.size() > 0) {
a683e8bd 318 sendResponseToClient(ci.fd, reinterpret_cast<char*>(response.data()), (uint16_t) response.size());
11e1e08b
RG
319 }
320 break;
321 }
497a6e3a 322 qlen = decryptedQueryLen;
11e1e08b
RG
323 }
324#endif
0beaa5c8 325 struct dnsheader* dh = reinterpret_cast<struct dnsheader*>(query);
2efd427d 326
0beaa5c8 327 if (!checkQueryHeaders(dh)) {
2efd427d
RG
328 goto drop;
329 }
330
e91084ce
RG
331 const uint16_t* flags = getFlagsFromDNSHeader(dh);
332 uint16_t origFlags = *flags;
cec47783 333 uint16_t qtype, qclass;
ca404e94 334 unsigned int consumed = 0;
cec47783 335 DNSName qname(query, qlen, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
0beaa5c8 336 DNSQuestion dq(&qname, qtype, qclass, &dest, &ci.remote, dh, queryBuffer.capacity(), qlen, true);
e91084ce
RG
337
338 string poolname;
339 int delayMsec=0;
614bfa37
RG
340 /* we need this one to be accurate ("real") for the protobuf message */
341 struct timespec queryRealTime;
bd1c631b 342 struct timespec now;
614bfa37
RG
343 gettime(&now);
344 gettime(&queryRealTime, true);
bd1c631b 345
0beaa5c8 346 if (!processQuery(holders, dq, poolname, &delayMsec, now)) {
e91084ce 347 goto drop;
d11c4232 348 }
1a2a4e68 349
497a6e3a 350 if(dq.dh->qr) { // something turned it into a response
0f72fd5c 351 restoreFlags(dh, origFlags);
fcffc585 352#ifdef HAVE_DNSCRYPT
0beaa5c8 353 if (!encryptResponse(query, &dq.len, dq.size, true, dnsCryptQuery, nullptr, nullptr)) {
0f72fd5c
RG
354 goto drop;
355 }
fcffc585 356#endif
0f72fd5c 357 sendResponseToClient(ci.fd, query, dq.len);
19d34e95 358 g_stats.selfAnswered++;
d11c4232 359 goto drop;
d11c4232 360 }
361
0beaa5c8 362 std::shared_ptr<ServerPool> serverPool = getPool(*holders.pools, poolname);
e6557ec8 363 std::shared_ptr<DNSDistPacketCache> packetCache = nullptr;
0beaa5c8 364 auto policy = holders.policy->policy;
b9f8a6c8 365 if (serverPool->policy != nullptr) {
742c079a
RG
366 policy = serverPool->policy->policy;
367 }
368 {
369 std::lock_guard<std::mutex> lock(g_luamutex);
370 ds = policy(serverPool->servers, &dq);
371 packetCache = serverPool->packetCache;
372 }
ca404e94 373
ff0902ec 374 if (dq.useECS && ds && ds->useECS) {
497a6e3a 375 uint16_t newLen = dq.len;
0beaa5c8
RG
376 if (!handleEDNSClientSubnet(query, dq.size, consumed, &newLen, &ednsAdded, &ecsAdded, ci.remote, dq.ecsOverride, dq.ecsPrefixLength)) {
377 vinfolog("Dropping query from %s because we couldn't insert the ECS value", ci.remote.toStringWithPort());
378 goto drop;
ca404e94 379 }
0beaa5c8 380 dq.len = newLen;
ca404e94
RG
381 }
382
886e2cf2 383 uint32_t cacheKey = 0;
e6557ec8 384 if (packetCache && !dq.skipCache) {
886e2cf2
RG
385 char cachedResponse[4096];
386 uint16_t cachedResponseSize = sizeof cachedResponse;
1ea747c0 387 uint32_t allowExpired = ds ? 0 : g_staleCacheEntriesTTL;
a683e8bd 388 if (packetCache->get(dq, (uint16_t) consumed, dq.dh->id, cachedResponse, &cachedResponseSize, &cacheKey, allowExpired)) {
cf48b0ce
RG
389 DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.local, dq.remote, (dnsheader*) cachedResponse, sizeof cachedResponse, cachedResponseSize, true, &queryRealTime);
390#ifdef HAVE_PROTOBUF
391 dr.uniqueId = dq.uniqueId;
392#endif
0beaa5c8 393 if (!processResponse(holders.cacheHitRespRulactions, dr, &delayMsec)) {
cf48b0ce
RG
394 goto drop;
395 }
396
fcffc585 397#ifdef HAVE_DNSCRYPT
57847d65 398 if (!encryptResponse(cachedResponse, &cachedResponseSize, sizeof cachedResponse, true, dnsCryptQuery, nullptr, nullptr)) {
0f72fd5c
RG
399 goto drop;
400 }
fcffc585 401#endif
0f72fd5c 402 sendResponseToClient(ci.fd, cachedResponse, cachedResponseSize);
886e2cf2
RG
403 g_stats.cacheHits++;
404 goto drop;
405 }
406 g_stats.cacheMisses++;
407 }
408
26a3cdb7
RG
409 if(!ds) {
410 g_stats.noPolicy++;
411
412 if (g_servFailOnNoPolicy) {
413 restoreFlags(dh, origFlags);
414 dq.dh->rcode = RCode::ServFail;
415 dq.dh->qr = true;
416
417#ifdef HAVE_DNSCRYPT
0beaa5c8 418 if (!encryptResponse(query, &dq.len, dq.size, true, dnsCryptQuery, nullptr, nullptr)) {
26a3cdb7
RG
419 goto drop;
420 }
421#endif
422 sendResponseToClient(ci.fd, query, dq.len);
423 }
424
425 break;
426 }
1ea747c0 427
18861f97 428 int dsock = -1;
b40cffe7 429 uint16_t downstreamFailures=0;
87706a76 430#ifdef MSG_FASTOPEN
284d460c 431 bool freshConn = true;
87706a76 432#endif /* MSG_FASTOPEN */
6ce2da14 433 if(sockets.count(ds->remote) == 0) {
b40cffe7 434 dsock=setupTCPDownstream(ds, downstreamFailures);
5a99e22f 435 sockets[ds->remote]=dsock;
8a5d5053 436 }
284d460c 437 else {
6ce2da14 438 dsock=sockets[ds->remote];
87706a76 439#ifdef MSG_FASTOPEN
284d460c 440 freshConn = false;
87706a76 441#endif /* MSG_FASTOPEN */
284d460c 442 }
6ce2da14 443
8a5d5053 444 ds->queries++;
445 ds->outstanding++;
c594a2f4 446 outstanding = true;
8a5d5053 447
448 retry:;
3f6d07a4
RG
449 if (dsock < 0) {
450 sockets.erase(ds->remote);
451 break;
452 }
453
b40cffe7
RG
454 if (ds->retries > 0 && downstreamFailures > ds->retries) {
455 vinfolog("Downstream connection to %s failed %d times in a row, giving up.", ds->getName(), downstreamFailures);
3f6d07a4 456 close(dsock);
18861f97 457 dsock=-1;
3f6d07a4
RG
458 sockets.erase(ds->remote);
459 break;
460 }
461
8b92100f 462 try {
284d460c
RG
463 int socketFlags = 0;
464#ifdef MSG_FASTOPEN
465 if (ds->tcpFastOpen && freshConn) {
466 socketFlags |= MSG_FASTOPEN;
fbe2a2e0 467 }
284d460c
RG
468#endif /* MSG_FASTOPEN */
469 sendSizeAndMsgWithTimeout(dsock, dq.len, query, ds->tcpSendTimeout, &ds->remote, &ds->sourceAddr, ds->sourceItf, 0, socketFlags);
8b92100f
RG
470 }
471 catch(const runtime_error& e) {
472 vinfolog("Downstream connection to %s died on us, getting a new one!", ds->getName());
473 close(dsock);
65d7f2bb
RG
474 dsock=-1;
475 sockets.erase(ds->remote);
b40cffe7
RG
476 downstreamFailures++;
477 dsock=setupTCPDownstream(ds, downstreamFailures);
5a99e22f 478 sockets[ds->remote]=dsock;
87706a76 479#ifdef MSG_FASTOPEN
284d460c 480 freshConn=true;
87706a76 481#endif /* MSG_FASTOPEN */
8b92100f
RG
482 goto retry;
483 }
484
548c8b66
RG
485 bool xfrStarted = false;
486 bool isXFR = (dq.qtype == QType::AXFR || dq.qtype == QType::IXFR);
487 if (isXFR) {
488 dq.skipCache = true;
489 }
490
491 getpacket:;
492
3f6d07a4 493 if(!getNonBlockingMsgLen(dsock, &rlen, ds->tcpRecvTimeout)) {
18eeccc9 494 vinfolog("Downstream connection to %s died on us phase 2, getting a new one!", ds->getName());
8a5d5053 495 close(dsock);
65d7f2bb
RG
496 dsock=-1;
497 sockets.erase(ds->remote);
b40cffe7
RG
498 downstreamFailures++;
499 dsock=setupTCPDownstream(ds, downstreamFailures);
5a99e22f 500 sockets[ds->remote]=dsock;
87706a76 501#ifdef MSG_FASTOPEN
284d460c 502 freshConn=true;
87706a76 503#endif /* MSG_FASTOPEN */
548c8b66
RG
504 if(xfrStarted) {
505 goto drop;
506 }
8a5d5053 507 goto retry;
508 }
509
fcffc585 510 size_t responseSize = rlen;
0f72fd5c 511 uint16_t addRoom = 0;
11e1e08b 512#ifdef HAVE_DNSCRYPT
0f72fd5c
RG
513 if (dnsCryptQuery && (UINT16_MAX - rlen) > (uint16_t) DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE) {
514 addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
11e1e08b
RG
515 }
516#endif
0f72fd5c 517 responseSize += addRoom;
11e1e08b 518 char answerbuffer[responseSize];
3f6d07a4 519 readn2WithTimeout(dsock, answerbuffer, rlen, ds->tcpRecvTimeout);
11e1e08b
RG
520 char* response = answerbuffer;
521 uint16_t responseLen = rlen;
fdb47c9f
RG
522 if (outstanding) {
523 /* might be false for {A,I}XFR */
524 --ds->outstanding;
525 outstanding = false;
526 }
ca404e94 527
f87c4aff
RG
528 if (rlen < sizeof(dnsheader)) {
529 break;
530 }
531
fcffc585 532 if (!responseContentMatches(response, responseLen, qname, qtype, qclass, ds->remote)) {
f87c4aff
RG
533 break;
534 }
535
ff73f02b 536 if (!fixUpResponse(&response, &responseLen, &responseSize, qname, origFlags, ednsAdded, ecsAdded, rewrittenResponse, addRoom)) {
fcffc585 537 break;
ca404e94 538 }
aeb36780 539
ec469dd7 540 dh = (struct dnsheader*) response;
7cea4e39 541 DNSResponse dr(&qname, qtype, qclass, &dest, &ci.remote, dh, responseSize, responseLen, true, &queryRealTime);
d8c19b98
RG
542#ifdef HAVE_PROTOBUF
543 dr.uniqueId = dq.uniqueId;
544#endif
788c3243 545 if (!processResponse(localRespRulactions, dr, &delayMsec)) {
8146444b 546 break;
d8c19b98
RG
547 }
548
e6557ec8 549 if (packetCache && !dq.skipCache) {
2714396e 550 packetCache->insert(cacheKey, qname, qtype, qclass, response, responseLen, true, dh->rcode);
886e2cf2
RG
551 }
552
11e1e08b 553#ifdef HAVE_DNSCRYPT
57847d65 554 if (!encryptResponse(response, &responseLen, responseSize, true, dnsCryptQuery, &dh, &dhCopy)) {
0f72fd5c
RG
555 goto drop;
556 }
11e1e08b 557#endif
0f72fd5c 558 if (!sendResponseToClient(ci.fd, response, responseLen)) {
fcffc585
RG
559 break;
560 }
19d34e95 561
284d460c
RG
562 if (isXFR) {
563 if (dh->rcode == 0 && dh->ancount != 0) {
564 if (xfrStarted == false) {
565 xfrStarted = true;
566 if (getRecordsOfTypeCount(response, responseLen, 1, QType::SOA) == 1) {
567 goto getpacket;
568 }
569 }
570 else if (getRecordsOfTypeCount(response, responseLen, 1, QType::SOA) == 0) {
548c8b66
RG
571 goto getpacket;
572 }
573 }
284d460c
RG
574 /* Don't reuse the TCP connection after an {A,I}XFR */
575 close(dsock);
576 dsock=-1;
577 sockets.erase(ds->remote);
548c8b66
RG
578 }
579
19d34e95 580 g_stats.responses++;
3fcaeeac 581 struct timespec answertime;
85c7ca75 582 gettime(&answertime);
3fcaeeac 583 unsigned int udiff = 1000000.0*DiffTime(now,answertime);
584 {
585 std::lock_guard<std::mutex> lock(g_rings.respMutex);
b9ebcf13 586 g_rings.respRing.push_back({answertime, ci.remote, qname, dq.qtype, (unsigned int)udiff, (unsigned int)responseLen, *dh, ds->remote});
3fcaeeac 587 }
588
ca404e94 589 rewrittenResponse.clear();
8a5d5053 590 }
591 }
592 catch(...){}
d11c4232 593
594 drop:;
8a5d5053 595
3c17e261 596 vinfolog("Closing TCP client connection with %s", ci.remote.toStringWithPort());
f6a81077
RG
597 if (ci.fd >= 0) {
598 close(ci.fd);
599 }
600 ci.fd = -1;
c594a2f4
RG
601 if (ds && outstanding) {
602 outstanding = false;
6ce2da14 603 --ds->outstanding;
c594a2f4 604 }
9396d955 605 decrementTCPClientCount(ci.remote);
840ed663
RG
606
607 if (g_downstreamTCPCleanupInterval > 0 && (connectionStartTime > (lastTCPCleanup + g_downstreamTCPCleanupInterval))) {
608 cleanupClosedTCPConnections(sockets);
609 lastTCPCleanup = time(nullptr);
610 }
8a5d5053 611 }
612 return 0;
613}
614
8a5d5053 615/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and
616 they will hand off to worker threads & spawn more of them if required
617*/
618void* tcpAcceptorThread(void* p)
619{
620 ClientState* cs = (ClientState*) p;
9396d955 621 bool tcpClientCountIncremented = false;
8a5d5053 622 ComboAddress remote;
623 remote.sin4.sin_family = cs->local.sin4.sin_family;
624
a9bf3ec4 625 g_tcpclientthreads->addTCPClientThread();
8a5d5053 626
b719e465 627 auto acl = g_ACL.getLocal();
8a5d5053 628 for(;;) {
8de680ec
RG
629 bool queuedCounterIncremented = false;
630 ConnectionInfo* ci = nullptr;
9396d955 631 tcpClientCountIncremented = false;
8a5d5053 632 try {
0c8d25a7 633 socklen_t remlen = remote.getSocklen();
3c17e261 634 ci = new ConnectionInfo;
19d34e95 635 ci->cs = cs;
3c17e261 636 ci->fd = -1;
0c8d25a7
RG
637#ifdef HAVE_ACCEPT4
638 ci->fd = accept4(cs->tcpFD, (struct sockaddr*)&remote, &remlen, SOCK_NONBLOCK);
639#else
640 ci->fd = accept(cs->tcpFD, (struct sockaddr*)&remote, &remlen);
641#endif
642 if(ci->fd < 0) {
643 throw std::runtime_error((boost::format("accepting new connection on socket: %s") % strerror(errno)).str());
644 }
963bef8d 645
b719e465 646 if(!acl->match(remote)) {
647 g_stats.aclDrops++;
648 close(ci->fd);
649 delete ci;
8de680ec 650 ci=nullptr;
b719e465 651 vinfolog("Dropped TCP connection from %s because of ACL", remote.toStringWithPort());
652 continue;
653 }
8a5d5053 654
0c8d25a7
RG
655#ifndef HAVE_ACCEPT4
656 if (!setNonBlocking(ci->fd)) {
657 close(ci->fd);
658 delete ci;
659 ci=nullptr;
660 continue;
661 }
662#endif
663
ded1985a 664 if(g_maxTCPQueuedConnections > 0 && g_tcpclientthreads->getQueuedCount() >= g_maxTCPQueuedConnections) {
6c1ca990
RG
665 close(ci->fd);
666 delete ci;
667 ci=nullptr;
668 vinfolog("Dropping TCP connection from %s because we have too many queued already", remote.toStringWithPort());
669 continue;
670 }
671
9396d955
RG
672 if (g_maxTCPConnectionsPerClient) {
673 std::lock_guard<std::mutex> lock(tcpClientsCountMutex);
674
675 if (tcpClientsCount[remote] >= g_maxTCPConnectionsPerClient) {
676 close(ci->fd);
677 delete ci;
678 ci=nullptr;
679 vinfolog("Dropping TCP connection from %s because we have too many from this client already", remote.toStringWithPort());
680 continue;
681 }
682 tcpClientsCount[remote]++;
683 tcpClientCountIncremented = true;
684 }
685
b719e465 686 vinfolog("Got TCP connection from %s", remote.toStringWithPort());
9396d955 687
8a5d5053 688 ci->remote = remote;
a9bf3ec4
RG
689 int pipe = g_tcpclientthreads->getThread();
690 if (pipe >= 0) {
8de680ec 691 queuedCounterIncremented = true;
a9bf3ec4
RG
692 writen2WithTimeout(pipe, &ci, sizeof(ci), 0);
693 }
694 else {
ded1985a 695 g_tcpclientthreads->decrementQueuedCount();
8de680ec 696 queuedCounterIncremented = false;
a9bf3ec4
RG
697 close(ci->fd);
698 delete ci;
18861f97 699 ci=nullptr;
9396d955
RG
700 if(tcpClientCountIncremented) {
701 decrementTCPClientCount(remote);
702 }
a9bf3ec4 703 }
8a5d5053 704 }
3c17e261 705 catch(std::exception& e) {
706 errlog("While reading a TCP question: %s", e.what());
707 if(ci && ci->fd >= 0)
708 close(ci->fd);
9396d955
RG
709 if(tcpClientCountIncremented) {
710 decrementTCPClientCount(remote);
711 }
3c17e261 712 delete ci;
8de680ec
RG
713 ci = nullptr;
714 if (queuedCounterIncremented) {
ded1985a 715 g_tcpclientthreads->decrementQueuedCount();
8de680ec 716 }
3c17e261 717 }
8a5d5053 718 catch(...){}
719 }
720
721 return 0;
722}
6ce2da14 723
724
6885d4bf 725bool getMsgLen32(int fd, uint32_t* len)
6ce2da14 726try
727{
6885d4bf 728 uint32_t raw;
a683e8bd 729 size_t ret = readn2(fd, &raw, sizeof raw);
ca404e94 730 if(ret != sizeof raw)
6ce2da14 731 return false;
6885d4bf 732 *len = ntohl(raw);
733 if(*len > 10000000) // arbitrary 10MB limit
734 return false;
6ce2da14 735 return true;
736}
737catch(...) {
738 return false;
739}
740
6885d4bf 741bool putMsgLen32(int fd, uint32_t len)
6ce2da14 742try
743{
6885d4bf 744 uint32_t raw = htonl(len);
a683e8bd 745 size_t ret = writen2(fd, &raw, sizeof raw);
ca404e94 746 return ret==sizeof raw;
6ce2da14 747}
748catch(...) {
749 return false;
750}