/*
 * This file is part of PowerDNS or dnsdist.
 * Copyright -- PowerDNS.COM B.V. and its contributors
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In addition, for the avoidance of any doubt, permission is granted to
 * link this program with OpenSSL and to (re)distribute the binaries
 * produced as the result of such linking.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
#include "dnsdist.hh"
#include "dnsdist-ecs.hh"
#include "dnsparser.hh"
#include "ednsoptions.hh"
#include "dolog.hh"
#include "lock.hh"
#include "gettime.hh"
#include <thread>
#include <atomic>

using std::thread;
using std::atomic;
/* TCP: the grand design.
   We forward 'messages' between clients and downstream servers. Messages are 65535 bytes large, tops.
   An answer might theoretically consist of multiple messages (for example, in the case of AXFR);
   initially we did not go there, though the {A,I}XFR handling below does deal with multi-message
   answers these days.

   In a sense there is a strong symmetry between UDP and TCP, once a connection to a downstream has been set up.
   This symmetry is broken by head-of-line blocking within TCP though, necessitating additional connections
   to guarantee performance.

   So the idea is to have a 'pool' of available downstream connections, and to forward messages to/from them and never queue.
   That way, whenever an answer comes in, we know where it needs to go.

   Let's start naively.
*/

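/* Opens a TCP connection to the given downstream server, retrying on failure.
   When TCP Fast Open support is compiled in and enabled for this backend
   (ds->tcpFastOpen), the connect is deferred: the SYN will be carried by the
   first sendmsg() flagged with MSG_FASTOPEN, so SConnectWithTimeout() is
   skipped here. downstreamFailures counts consecutive failures, also across
   calls since it is passed by reference; once it exceeds ds->retries the
   last error is rethrown to the caller. */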
static int setupTCPDownstream(shared_ptr<DownstreamState> ds, uint16_t& downstreamFailures)
{
  do {
    vinfolog("TCP connecting to downstream %s (%d)", ds->remote.toStringWithPort(), downstreamFailures);
    int sock = SSocket(ds->remote.sin4.sin_family, SOCK_STREAM, 0);
    try {
      if (!IsAnyAddress(ds->sourceAddr)) {
        SSetsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 1);
#ifdef IP_BIND_ADDRESS_NO_PORT
        SSetsockopt(sock, SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
#endif
        SBind(sock, ds->sourceAddr);
      }
      setNonBlocking(sock);
#ifdef MSG_FASTOPEN
      if (!ds->tcpFastOpen) {
        SConnectWithTimeout(sock, ds->remote, ds->tcpConnectTimeout);
      }
#else
      SConnectWithTimeout(sock, ds->remote, ds->tcpConnectTimeout);
#endif /* MSG_FASTOPEN */
      return sock;
    }
    catch(const std::runtime_error& e) {
      /* don't leak our file descriptor if SConnect() (for example) throws */
      downstreamFailures++;
      close(sock);
      if (downstreamFailures > ds->retries) {
        throw;
      }
    }
  } while(downstreamFailures <= ds->retries);

  return -1;
}

struct ConnectionInfo
{
  int fd;
  ComboAddress remote;
  ClientState* cs;
};

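/* TCP tunables, all settable from the configuration. When g_useTCPSinglePipe
   is set, every worker thread reads from one shared pipe instead of a private
   one, so whichever idle worker reads first picks up the next connection. */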
uint64_t g_maxTCPQueuedConnections{1000};
size_t g_maxTCPQueriesPerConn{0};
size_t g_maxTCPConnectionDuration{0};
size_t g_maxTCPConnectionsPerClient{0};
static std::mutex tcpClientsCountMutex;
static std::map<ComboAddress,size_t,ComboAddress::addressOnlyLessThan> tcpClientsCount;
bool g_useTCPSinglePipe{false};
std::atomic<uint16_t> g_downstreamTCPCleanupInterval{60};

void* tcpClientThread(int pipefd);

static void decrementTCPClientCount(const ComboAddress& client)
{
  if (g_maxTCPConnectionsPerClient) {
    std::lock_guard<std::mutex> lock(tcpClientsCountMutex);
    tcpClientsCount[client]--;
    if (tcpClientsCount[client] == 0) {
      tcpClientsCount.erase(client);
    }
  }
}

void TCPClientCollection::addTCPClientThread()
{
  int pipefds[2] = { -1, -1};

  vinfolog("Adding TCP Client thread");

  if (d_useSinglePipe) {
    pipefds[0] = d_singlePipe[0];
    pipefds[1] = d_singlePipe[1];
  }
  else {
    if (pipe(pipefds) < 0) {
      errlog("Error creating the TCP thread communication pipe: %s", strerror(errno));
      return;
    }

    if (!setNonBlocking(pipefds[1])) {
      close(pipefds[0]);
      close(pipefds[1]);
      errlog("Error setting the TCP thread communication pipe non-blocking: %s", strerror(errno));
      return;
    }
  }

  {
    std::lock_guard<std::mutex> lock(d_mutex);

    if (d_numthreads >= d_tcpclientthreads.capacity()) {
      warnlog("Adding a new TCP client thread would exceed the vector capacity (%d/%d), skipping", d_numthreads.load(), d_tcpclientthreads.capacity());
      if (!d_useSinglePipe) {
        close(pipefds[0]);
        close(pipefds[1]);
      }
      return;
    }

    try {
      thread t1(tcpClientThread, pipefds[0]);
      t1.detach();
    }
    catch(const std::runtime_error& e) {
      /* the thread creation failed, don't leak */
      errlog("Error creating a TCP thread: %s", e.what());
      if (!d_useSinglePipe) {
        close(pipefds[0]);
        close(pipefds[1]);
      }
      return;
    }

    d_tcpclientthreads.push_back(pipefds[1]);
  }

  ++d_numthreads;
}

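/* DNS over TCP frames every message with a 16-bit length prefix in network
   byte order (RFC 1035, section 4.2.2). Reading one message is therefore a
   two-step dance, sketched here with error handling omitted:

     uint16_t len;
     readn2WithTimeout(fd, &len, sizeof len, timeout);  // 1. read the frame length
     len = ntohs(len);
     readn2WithTimeout(fd, buf, len, timeout);          // 2. read the payload

   getNonBlockingMsgLen() implements step 1; the payload reads happen at the
   call sites. */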
static bool getNonBlockingMsgLen(int fd, uint16_t* len, int timeout)
try
{
  uint16_t raw;
  size_t ret = readn2WithTimeout(fd, &raw, sizeof raw, timeout);
  if(ret != sizeof raw)
    return false;
  *len = ntohs(raw);
  return true;
}
catch(...) {
  return false;
}

static bool sendResponseToClient(int fd, const char* response, uint16_t responseLen)
{
  return sendSizeAndMsgWithTimeout(fd, responseLen, response, g_tcpSendTimeout, nullptr, nullptr, 0, 0, 0);
}

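/* Returns true once at least maxConnectionDuration seconds have elapsed since
   'start'; otherwise stores the number of seconds left in remainingTime, so
   callers can use it to cap their read timeouts. A duration of 0 disables the
   check entirely. */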
static bool maxConnectionDurationReached(unsigned int maxConnectionDuration, time_t start, unsigned int& remainingTime)
{
  if (maxConnectionDuration) {
    time_t elapsed = time(NULL) - start;
    if (elapsed >= maxConnectionDuration) {
      return true;
    }
    remainingTime = maxConnectionDuration - elapsed;
  }
  return false;
}

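/* Walks this thread's map of cached downstream connections and closes the
   ones the kernel reports as no longer usable, so they are not picked up
   again for a new query later on. */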
void cleanupClosedTCPConnections(std::map<ComboAddress,int>& sockets)
{
  for(auto it = sockets.begin(); it != sockets.end(); ) {
    if (isTCPSocketUsable(it->second)) {
      ++it;
    }
    else {
      close(it->second);
      it = sockets.erase(it);
    }
  }
}

std::shared_ptr<TCPClientCollection> g_tcpclientthreads;

void* tcpClientThread(int pipefd)
{
  /* we get launched with a pipe on which we receive pointers to ConnectionInfo
     objects, each carrying a client file descriptor that we own from that
     point on */

  bool outstanding = false;
  time_t lastTCPCleanup = time(nullptr);

  auto localPolicy = g_policy.getLocal();
  auto localRulactions = g_rulactions.getLocal();
  auto localRespRulactions = g_resprulactions.getLocal();
  auto localCacheHitRespRulactions = g_cachehitresprulactions.getLocal();
  auto localDynBlockNMG = g_dynblockNMG.getLocal();
  auto localDynBlockSMT = g_dynblockSMT.getLocal();
  auto localPools = g_pools.getLocal();
#ifdef HAVE_PROTOBUF
  boost::uuids::random_generator uuidGenerator;
#endif
#ifdef HAVE_DNSCRYPT
  /* when the answer is encrypted in place, we need to get a copy
     of the original header before encryption to fill the ring buffer */
  dnsheader dhCopy;
#endif

  map<ComboAddress,int> sockets;
  for(;;) {
    ConnectionInfo* citmp{nullptr};
    ConnectionInfo ci;

    try {
      readn2(pipefd, &citmp, sizeof(citmp));
    }
    catch(const std::runtime_error& e) {
      throw std::runtime_error("Error reading from TCP acceptor pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode: " + e.what());
    }

    g_tcpclientthreads->decrementQueuedCount();
    ci=*citmp;
    delete citmp;

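    /* the ConnectionInfo was allocated by the acceptor thread and handed
       over through the pipe; having copied it, we now own the client
       descriptor in ci.fd, to be closed at the drop label below */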
    uint16_t qlen, rlen;
    string largerQuery;
    vector<uint8_t> rewrittenResponse;
    shared_ptr<DownstreamState> ds;
    ComboAddress dest;
    memset(&dest, 0, sizeof(dest));
    dest.sin4.sin_family = ci.remote.sin4.sin_family;
    socklen_t len = dest.getSocklen();
    size_t queriesCount = 0;
    time_t connectionStartTime = time(NULL);

    if (!setNonBlocking(ci.fd))
      goto drop;

    if (getsockname(ci.fd, (sockaddr*)&dest, &len)) {
      dest = ci.cs->local;
    }

    try {
      for(;;) {
        unsigned int remainingTime = 0;
        ds = nullptr;
        outstanding = false;

        if(!getNonBlockingMsgLen(ci.fd, &qlen, g_tcpRecvTimeout))
          break;

        ci.cs->queries++;
        g_stats.queries++;

        queriesCount++;

        if (g_maxTCPQueriesPerConn && queriesCount > g_maxTCPQueriesPerConn) {
          vinfolog("Terminating TCP connection from %s because it reached the maximum number of queries per conn (%d / %d)", ci.remote.toStringWithPort(), queriesCount, g_maxTCPQueriesPerConn);
          break;
        }

        if (maxConnectionDurationReached(g_maxTCPConnectionDuration, connectionStartTime, remainingTime)) {
          vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", ci.remote.toStringWithPort());
          break;
        }

        if (qlen < sizeof(dnsheader)) {
          g_stats.nonCompliantQueries++;
          break;
        }

        bool ednsAdded = false;
        bool ecsAdded = false;
        /* if the query is small, allocate a bit more
           memory to be able to spoof the content,
           or to add ECS without allocating a new buffer */
        size_t querySize = qlen <= 4096 ? qlen + 512 : qlen;
        char queryBuffer[querySize];
        const char* query = queryBuffer;
        readn2WithTimeout(ci.fd, queryBuffer, qlen, g_tcpRecvTimeout, remainingTime);

#ifdef HAVE_DNSCRYPT
        std::shared_ptr<DnsCryptQuery> dnsCryptQuery = nullptr;

        if (ci.cs->dnscryptCtx) {
          dnsCryptQuery = std::make_shared<DnsCryptQuery>();
          uint16_t decryptedQueryLen = 0;
          vector<uint8_t> response;
          bool decrypted = handleDnsCryptQuery(ci.cs->dnscryptCtx, queryBuffer, qlen, dnsCryptQuery, &decryptedQueryLen, true, response);

          if (!decrypted) {
            if (response.size() > 0) {
              sendResponseToClient(ci.fd, reinterpret_cast<char*>(response.data()), (uint16_t) response.size());
            }
            break;
          }
          qlen = decryptedQueryLen;
        }
#endif
        struct dnsheader* dh = (struct dnsheader*) query;

        if(dh->qr) { // don't respond to responses
          g_stats.nonCompliantQueries++;
          goto drop;
        }

        if(dh->qdcount == 0) {
          g_stats.emptyQueries++;
          goto drop;
        }

        if (dh->rd) {
          g_stats.rdQueries++;
        }

        const uint16_t* flags = getFlagsFromDNSHeader(dh);
        uint16_t origFlags = *flags;
        uint16_t qtype, qclass;
        unsigned int consumed = 0;
        DNSName qname(query, qlen, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
        DNSQuestion dq(&qname, qtype, qclass, &dest, &ci.remote, (dnsheader*)query, querySize, qlen, true);
#ifdef HAVE_PROTOBUF
        dq.uniqueId = uuidGenerator();
#endif

        string poolname;
        int delayMsec=0;
        /* we need this one to be accurate ("real") for the protobuf message */
        struct timespec queryRealTime;
        struct timespec now;
        gettime(&now);
        gettime(&queryRealTime, true);

        if (!processQuery(localDynBlockNMG, localDynBlockSMT, localRulactions, dq, poolname, &delayMsec, now)) {
          goto drop;
        }

        if(dq.dh->qr) { // something turned it into a response
          restoreFlags(dh, origFlags);
#ifdef HAVE_DNSCRYPT
          if (!encryptResponse(queryBuffer, &dq.len, dq.size, true, dnsCryptQuery, nullptr, nullptr)) {
            goto drop;
          }
#endif
          sendResponseToClient(ci.fd, query, dq.len);
          g_stats.selfAnswered++;
          goto drop;
        }

        std::shared_ptr<ServerPool> serverPool = getPool(*localPools, poolname);
        std::shared_ptr<DNSDistPacketCache> packetCache = nullptr;
        auto policy = localPolicy->policy;
        if (serverPool->policy != nullptr) {
          policy = serverPool->policy->policy;
        }
        {
          std::lock_guard<std::mutex> lock(g_luamutex);
          ds = policy(serverPool->servers, &dq);
          packetCache = serverPool->packetCache;
        }

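        /* inserting an EDNS Client Subnet option can grow the query beyond
           the spare room allocated above; in that case handleEDNSClientSubnet()
           hands back the rewritten packet in largerQuery and we point 'query'
           at that instead of queryBuffer */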
        if (dq.useECS && ds && ds->useECS) {
          uint16_t newLen = dq.len;
          handleEDNSClientSubnet(queryBuffer, dq.size, consumed, &newLen, largerQuery, &ednsAdded, &ecsAdded, ci.remote, dq.ecsOverride, dq.ecsPrefixLength);
          if (largerQuery.empty() == false) {
            query = largerQuery.c_str();
            dq.len = (uint16_t) largerQuery.size();
            dq.size = largerQuery.size();
          } else {
            dq.len = newLen;
          }
        }

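        /* the cache key filled in by the lookup below is reused when
           inserting the downstream answer later on, so the packet only
           gets hashed once */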
        uint32_t cacheKey = 0;
        if (packetCache && !dq.skipCache) {
          char cachedResponse[4096];
          uint16_t cachedResponseSize = sizeof cachedResponse;
          uint32_t allowExpired = ds ? 0 : g_staleCacheEntriesTTL;
          if (packetCache->get(dq, (uint16_t) consumed, dq.dh->id, cachedResponse, &cachedResponseSize, &cacheKey, allowExpired)) {
            DNSResponse dr(dq.qname, dq.qtype, dq.qclass, dq.local, dq.remote, (dnsheader*) cachedResponse, sizeof cachedResponse, cachedResponseSize, true, &queryRealTime);
#ifdef HAVE_PROTOBUF
            dr.uniqueId = dq.uniqueId;
#endif
            if (!processResponse(localCacheHitRespRulactions, dr, &delayMsec)) {
              goto drop;
            }

#ifdef HAVE_DNSCRYPT
            if (!encryptResponse(cachedResponse, &cachedResponseSize, sizeof cachedResponse, true, dnsCryptQuery, nullptr, nullptr)) {
              goto drop;
            }
#endif
            sendResponseToClient(ci.fd, cachedResponse, cachedResponseSize);
            g_stats.cacheHits++;
            goto drop;
          }
          g_stats.cacheMisses++;
        }

        if(!ds) {
          g_stats.noPolicy++;

          if (g_servFailOnNoPolicy) {
            restoreFlags(dh, origFlags);
            dq.dh->rcode = RCode::ServFail;
            dq.dh->qr = true;

#ifdef HAVE_DNSCRYPT
            if (!encryptResponse(queryBuffer, &dq.len, dq.size, true, dnsCryptQuery, nullptr, nullptr)) {
              goto drop;
            }
#endif
            sendResponseToClient(ci.fd, query, dq.len);
          }

          break;
        }

        int dsock = -1;
        uint16_t downstreamFailures=0;
#ifdef MSG_FASTOPEN
        bool freshConn = true;
#endif /* MSG_FASTOPEN */
        if(sockets.count(ds->remote) == 0) {
          dsock=setupTCPDownstream(ds, downstreamFailures);
          sockets[ds->remote]=dsock;
        }
        else {
          dsock=sockets[ds->remote];
#ifdef MSG_FASTOPEN
          freshConn = false;
#endif /* MSG_FASTOPEN */
        }

        ds->queries++;
        ds->outstanding++;
        outstanding = true;

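        /* 'retry' and 'getpacket' below form a small goto-driven state
           machine: retry re-sends the current query on a fresh downstream
           connection after a failure, while getpacket loops to read further
           messages belonging to the same answer, as needed for {A,I}XFR */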
      retry:;
        if (dsock < 0) {
          sockets.erase(ds->remote);
          break;
        }

        if (ds->retries > 0 && downstreamFailures > ds->retries) {
          vinfolog("Downstream connection to %s failed %d times in a row, giving up.", ds->getName(), downstreamFailures);
          close(dsock);
          dsock=-1;
          sockets.erase(ds->remote);
          break;
        }

        try {
          int socketFlags = 0;
#ifdef MSG_FASTOPEN
          if (ds->tcpFastOpen && freshConn) {
            socketFlags |= MSG_FASTOPEN;
          }
#endif /* MSG_FASTOPEN */
          sendSizeAndMsgWithTimeout(dsock, dq.len, query, ds->tcpSendTimeout, &ds->remote, &ds->sourceAddr, ds->sourceItf, 0, socketFlags);
        }
        catch(const runtime_error& e) {
          vinfolog("Downstream connection to %s died on us, getting a new one!", ds->getName());
          close(dsock);
          dsock=-1;
          sockets.erase(ds->remote);
          downstreamFailures++;
          dsock=setupTCPDownstream(ds, downstreamFailures);
          sockets[ds->remote]=dsock;
#ifdef MSG_FASTOPEN
          freshConn=true;
#endif /* MSG_FASTOPEN */
          goto retry;
        }

        bool xfrStarted = false;
        bool isXFR = (dq.qtype == QType::AXFR || dq.qtype == QType::IXFR);
        if (isXFR) {
          dq.skipCache = true;
        }

      getpacket:;

        if(!getNonBlockingMsgLen(dsock, &rlen, ds->tcpRecvTimeout)) {
          vinfolog("Downstream connection to %s died on us phase 2, getting a new one!", ds->getName());
          close(dsock);
          dsock=-1;
          sockets.erase(ds->remote);
          downstreamFailures++;
          dsock=setupTCPDownstream(ds, downstreamFailures);
          sockets[ds->remote]=dsock;
#ifdef MSG_FASTOPEN
          freshConn=true;
#endif /* MSG_FASTOPEN */
          if(xfrStarted) {
            goto drop;
          }
          goto retry;
        }

        size_t responseSize = rlen;
        uint16_t addRoom = 0;
#ifdef HAVE_DNSCRYPT
        if (dnsCryptQuery && (UINT16_MAX - rlen) > (uint16_t) DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE) {
          addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
        }
#endif
        responseSize += addRoom;
        char answerbuffer[responseSize];
        readn2WithTimeout(dsock, answerbuffer, rlen, ds->tcpRecvTimeout);
        char* response = answerbuffer;
        uint16_t responseLen = rlen;
        if (outstanding) {
          /* might be false for {A,I}XFR */
          --ds->outstanding;
          outstanding = false;
        }

        if (rlen < sizeof(dnsheader)) {
          break;
        }

        if (!responseContentMatches(response, responseLen, qname, qtype, qclass, ds->remote)) {
          break;
        }

        if (!fixUpResponse(&response, &responseLen, &responseSize, qname, origFlags, ednsAdded, ecsAdded, rewrittenResponse, addRoom)) {
          break;
        }

        dh = (struct dnsheader*) response;
        DNSResponse dr(&qname, qtype, qclass, &dest, &ci.remote, dh, responseSize, responseLen, true, &queryRealTime);
#ifdef HAVE_PROTOBUF
        dr.uniqueId = dq.uniqueId;
#endif
        if (!processResponse(localRespRulactions, dr, &delayMsec)) {
          break;
        }

        if (packetCache && !dq.skipCache) {
          packetCache->insert(cacheKey, qname, qtype, qclass, response, responseLen, true, dh->rcode);
        }

#ifdef HAVE_DNSCRYPT
        if (!encryptResponse(response, &responseLen, responseSize, true, dnsCryptQuery, &dh, &dhCopy)) {
          goto drop;
        }
#endif
        if (!sendResponseToClient(ci.fd, response, responseLen)) {
          break;
        }

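        /* an {A,I}XFR stream is terminated by a second SOA record: if the
           first message contains exactly one SOA more messages will follow,
           and we keep reading until a message carrying a SOA closes the
           transfer */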
        if (isXFR) {
          if (dh->rcode == 0 && dh->ancount != 0) {
            if (xfrStarted == false) {
              xfrStarted = true;
              if (getRecordsOfTypeCount(response, responseLen, 1, QType::SOA) == 1) {
                goto getpacket;
              }
            }
            else if (getRecordsOfTypeCount(response, responseLen, 1, QType::SOA) == 0) {
              goto getpacket;
            }
          }
          /* Don't reuse the TCP connection after an {A,I}XFR */
          close(dsock);
          dsock=-1;
          sockets.erase(ds->remote);
        }

        g_stats.responses++;
        struct timespec answertime;
        gettime(&answertime);
        unsigned int udiff = 1000000.0*DiffTime(now,answertime);
        {
          std::lock_guard<std::mutex> lock(g_rings.respMutex);
          g_rings.respRing.push_back({answertime, ci.remote, qname, dq.qtype, (unsigned int)udiff, (unsigned int)responseLen, *dh, ds->remote});
        }

        largerQuery.clear();
        rewrittenResponse.clear();
      }
    }
    catch(...){}

  drop:;

    vinfolog("Closing TCP client connection with %s", ci.remote.toStringWithPort());
    if (ci.fd >= 0) {
      close(ci.fd);
    }
    ci.fd = -1;
    if (ds && outstanding) {
      outstanding = false;
      --ds->outstanding;
    }
    decrementTCPClientCount(ci.remote);

    if (g_downstreamTCPCleanupInterval > 0 && (connectionStartTime > (lastTCPCleanup + g_downstreamTCPCleanupInterval))) {
      cleanupClosedTCPConnections(sockets);
      lastTCPCleanup = time(nullptr);
    }
  }
  return 0;
}

/* spawn as many of these as required, they call accept() on a socket on which
   they receive incoming TCP connections, which they hand off to the worker
   threads, spawning more of those if required
*/
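/* The hand-off to a worker is deliberately minimal: the acceptor writes the
   raw ConnectionInfo pointer (sizeof(ci) bytes) into the chosen worker's
   pipe, and tcpClientThread() reads it back and takes ownership of the
   object and the client descriptor it holds. */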
void* tcpAcceptorThread(void* p)
{
  ClientState* cs = (ClientState*) p;
  bool tcpClientCountIncremented = false;
  ComboAddress remote;
  remote.sin4.sin_family = cs->local.sin4.sin_family;

  g_tcpclientthreads->addTCPClientThread();

  auto acl = g_ACL.getLocal();
  for(;;) {
    bool queuedCounterIncremented = false;
    ConnectionInfo* ci = nullptr;
    tcpClientCountIncremented = false;
    try {
      ci = new ConnectionInfo;
      ci->cs = cs;
      ci->fd = -1;
      ci->fd = SAccept(cs->tcpFD, remote);

      if(!acl->match(remote)) {
        g_stats.aclDrops++;
        close(ci->fd);
        delete ci;
        ci=nullptr;
        vinfolog("Dropped TCP connection from %s because of ACL", remote.toStringWithPort());
        continue;
      }

      if(g_maxTCPQueuedConnections > 0 && g_tcpclientthreads->getQueuedCount() >= g_maxTCPQueuedConnections) {
        close(ci->fd);
        delete ci;
        ci=nullptr;
        vinfolog("Dropping TCP connection from %s because we have too many queued already", remote.toStringWithPort());
        continue;
      }

      if (g_maxTCPConnectionsPerClient) {
        std::lock_guard<std::mutex> lock(tcpClientsCountMutex);

        if (tcpClientsCount[remote] >= g_maxTCPConnectionsPerClient) {
          close(ci->fd);
          delete ci;
          ci=nullptr;
          vinfolog("Dropping TCP connection from %s because we have too many from this client already", remote.toStringWithPort());
          continue;
        }
        tcpClientsCount[remote]++;
        tcpClientCountIncremented = true;
      }

      vinfolog("Got TCP connection from %s", remote.toStringWithPort());

      ci->remote = remote;
      int pipe = g_tcpclientthreads->getThread();
      if (pipe >= 0) {
        queuedCounterIncremented = true;
        writen2WithTimeout(pipe, &ci, sizeof(ci), 0);
      }
      else {
        g_tcpclientthreads->decrementQueuedCount();
        queuedCounterIncremented = false;
        close(ci->fd);
        delete ci;
        ci=nullptr;
        if(tcpClientCountIncremented) {
          decrementTCPClientCount(remote);
        }
      }
    }
    catch(const std::exception& e) {
      errlog("While reading a TCP question: %s", e.what());
      if(ci && ci->fd >= 0)
        close(ci->fd);
      if(tcpClientCountIncremented) {
        decrementTCPClientCount(remote);
      }
      delete ci;
      ci = nullptr;
      if (queuedCounterIncremented) {
        g_tcpclientthreads->decrementQueuedCount();
      }
    }
    catch(...){}
  }

  return 0;
}

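/* 32-bit variants of the length-prefix framing above, for callers that need
   to frame messages larger than the 64KB a 16-bit prefix can describe; the
   10MB cap is an arbitrary sanity limit, as the comment below notes */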
bool getMsgLen32(int fd, uint32_t* len)
try
{
  uint32_t raw;
  size_t ret = readn2(fd, &raw, sizeof raw);
  if(ret != sizeof raw)
    return false;
  *len = ntohl(raw);
  if(*len > 10000000) // arbitrary 10MB limit
    return false;
  return true;
}
catch(...) {
  return false;
}

bool putMsgLen32(int fd, uint32_t len)
try
{
  uint32_t raw = htonl(len);
  size_t ret = writen2(fd, &raw, sizeof raw);
  return ret==sizeof raw;
}
catch(...) {
  return false;
}