2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
23 #include "dnsdist-ecs.hh"
24 #include "dnsdist-rings.hh"
25 #include "dnsdist-xpf.hh"
27 #include "dnsparser.hh"
28 #include "ednsoptions.hh"
32 #include "tcpiohandler.hh"
33 #include "threadname.hh"
36 #include <netinet/tcp.h>
43 /* TCP: the grand design.
44 We forward 'messages' between clients and downstream servers. Messages are 65k bytes large, tops.
45 An answer might theoretically consist of multiple messages (for example, in the case of AXFR), initially
48 In a sense there is a strong symmetry between UDP and TCP, once a connection to a downstream has been setup.
49 This symmetry is broken because of head-of-line blocking within TCP though, necessitating additional connections
50 to guarantee performance.
52 So the idea is to have a 'pool' of available downstream connections, and forward messages to/from them and never queue.
53 So whenever an answer comes in, we know where it needs to go.
/* Global TCP tuning knobs and per-client accounting.
   tcpClientsCount tracks live TCP connections per client address
   (port ignored, see addressOnlyLessThan), guarded by tcpClientsCountMutex. */
58 static std::mutex tcpClientsCountMutex
;
59 static std::map
<ComboAddress
,size_t,ComboAddress::addressOnlyLessThan
> tcpClientsCount
;
/* Upper bound on idle backend connections cached per downstream address. */
60 static const size_t g_maxCachedConnectionsPerDownstream
= 20;
61 uint64_t g_maxTCPQueuedConnections
{1000};
/* 0 for the three knobs below means "no limit". */
62 size_t g_maxTCPQueriesPerConn
{0};
63 size_t g_maxTCPConnectionDuration
{0};
64 size_t g_maxTCPConnectionsPerClient
{0};
/* Interval, in seconds, between sweeps of closed cached downstream connections. */
65 uint16_t g_downstreamTCPCleanupInterval
{60};
66 bool g_useTCPSinglePipe
{false};
/* Opens a new non-blocking TCP socket to the backend described by ds,
   applying the configured source address (and SO_REUSEADDR /
   IP_BIND_ADDRESS_NO_PORT when requested). downstreamFailures counts
   consecutive failed attempts; the retry loop gives up once it exceeds
   ds->retries. Returns the connected (or connecting) socket. */
68 static std::unique_ptr
<Socket
> setupTCPDownstream(shared_ptr
<DownstreamState
>& ds
, uint16_t& downstreamFailures
)
70 std::unique_ptr
<Socket
> result
;
73 vinfolog("TCP connecting to downstream %s (%d)", ds
->remote
.toStringWithPort(), downstreamFailures
);
74 result
= std::unique_ptr
<Socket
>(new Socket(ds
->remote
.sin4
.sin_family
, SOCK_STREAM
, 0));
/* bind to the configured source address, if any */
76 if (!IsAnyAddress(ds
->sourceAddr
)) {
77 SSetsockopt(result
->getHandle(), SOL_SOCKET
, SO_REUSEADDR
, 1);
78 #ifdef IP_BIND_ADDRESS_NO_PORT
79 if (ds
->ipBindAddrNoPort
) {
/* let the kernel delay source-port allocation until connect() */
80 SSetsockopt(result
->getHandle(), SOL_IP
, IP_BIND_ADDRESS_NO_PORT
, 1);
83 result
->bind(ds
->sourceAddr
, false);
85 result
->setNonBlocking();
/* the connect() is non-blocking; completion is handled by the IO multiplexer */
87 if (!ds
->tcpFastOpen
) {
88 SConnectWithTimeout(result
->getHandle(), ds
->remote
, /* no timeout, we will handle it ourselves */ 0);
91 SConnectWithTimeout(result
->getHandle(), ds
->remote
, /* no timeout, we will handle it ourselves */ 0);
92 #endif /* MSG_FASTOPEN */
95 catch(const std::runtime_error
& e
) {
96 vinfolog("Connection to downstream server %s failed: %s", ds
->getName(), e
.what());
/* too many consecutive failures: propagate instead of retrying forever */
98 if (downstreamFailures
> ds
->retries
) {
102 } while(downstreamFailures
<= ds
->retries
);
/* RAII wrapper around one TCP connection to a downstream backend.
   Construction opens the socket (via setupTCPDownstream) and bumps the
   backend's tcpCurrentConnections gauge; destruction decrements it and
   reports per-connection metrics (query count, duration in ms). */
107 class TCPConnectionToBackend
110 TCPConnectionToBackend(std::shared_ptr
<DownstreamState
>& ds
, uint16_t& downstreamFailures
, const struct timeval
& now
): d_ds(ds
), d_connectionStartTime(now
)
112 d_socket
= setupTCPDownstream(d_ds
, downstreamFailures
);
113 ++d_ds
->tcpCurrentConnections
;
116 ~TCPConnectionToBackend()
118 if (d_ds
&& d_socket
) {
119 --d_ds
->tcpCurrentConnections
;
121 gettimeofday(&now
, nullptr);
/* connection lifetime, converted to milliseconds for the metrics */
123 auto diff
= now
- d_connectionStartTime
;
124 d_ds
->updateTCPMetrics(d_queries
, diff
.tv_sec
* 1000 + diff
.tv_usec
/ 1000);
/* Returns the underlying FD; throws if the socket was never established. */
128 int getHandle() const
131 throw std::runtime_error("Attempt to get the socket handle from a non-established TCP connection");
134 return d_socket
->getHandle();
137 const ComboAddress
& getRemote() const
158 std::unique_ptr
<Socket
> d_socket
{nullptr};
159 std::shared_ptr
<DownstreamState
> d_ds
{nullptr};
160 struct timeval d_connectionStartTime
;
/* number of queries sent over this connection, reported at destruction */
161 uint64_t d_queries
{0};
/* Per-thread cache of idle backend connections, keyed by the backend's
   address; thread_local so no locking is needed. */
165 static thread_local map
<ComboAddress
, std::deque
<std::unique_ptr
<TCPConnectionToBackend
>>> t_downstreamConnections
;
/* Returns a connection to the given backend, reusing a cached one from
   t_downstreamConnections when available, otherwise opening a fresh one. */
167 static std::unique_ptr
<TCPConnectionToBackend
> getConnectionToDownstream(std::shared_ptr
<DownstreamState
>& ds
, uint16_t& downstreamFailures
, const struct timeval
& now
)
169 std::unique_ptr
<TCPConnectionToBackend
> result
;
171 const auto& it
= t_downstreamConnections
.find(ds
->remote
);
172 if (it
!= t_downstreamConnections
.end()) {
173 auto& list
= it
->second
;
/* take the oldest cached connection */
175 result
= std::move(list
.front());
/* no cached connection available: establish a new one */
182 return std::unique_ptr
<TCPConnectionToBackend
>(new TCPConnectionToBackend(ds
, downstreamFailures
, now
));
/* Puts a backend connection back into the per-thread cache for reuse,
   unless the cache for that backend already holds
   g_maxCachedConnectionsPerDownstream entries (the connection is then
   simply dropped). */
185 static void releaseDownstreamConnection(std::unique_ptr
<TCPConnectionToBackend
>&& conn
)
187 if (conn
== nullptr) {
191 const auto& remote
= conn
->getRemote();
192 const auto& it
= t_downstreamConnections
.find(remote
);
193 if (it
!= t_downstreamConnections
.end()) {
194 auto& list
= it
->second
;
195 if (list
.size() >= g_maxCachedConnectionsPerDownstream
) {
196 /* too many connections queued already */
200 list
.push_back(std::move(conn
));
/* first cached connection for this backend */
203 t_downstreamConnections
[remote
].push_back(std::move(conn
));
/* Describes one accepted client connection: the listening frontend (cs),
   the accepted FD and the client's address. Move-only: copying would
   duplicate FD ownership. The move assignment decrements the frontend's
   tcpCurrentConnections for the connection being replaced. */
207 struct ConnectionInfo
209 ConnectionInfo(ClientState
* cs_
): cs(cs_
), fd(-1)
212 ConnectionInfo(ConnectionInfo
&& rhs
): remote(rhs
.remote
), cs(rhs
.cs
), fd(rhs
.fd
)
218 ConnectionInfo(const ConnectionInfo
& rhs
) = delete;
219 ConnectionInfo
& operator=(const ConnectionInfo
& rhs
) = delete;
221 ConnectionInfo
& operator=(ConnectionInfo
&& rhs
)
238 --cs
->tcpCurrentConnections
;
243 ClientState
* cs
{nullptr};
/* worker entry point, defined below; pipefd is the read end used to
   receive new ConnectionInfo pointers */
247 void tcpClientThread(int pipefd
);
/* Decrements the per-client connection counter (only maintained when a
   per-client limit is configured), erasing the entry once it reaches zero
   so the map does not grow unboundedly. */
249 static void decrementTCPClientCount(const ComboAddress
& client
)
251 if (g_maxTCPConnectionsPerClient
) {
252 std::lock_guard
<std::mutex
> lock(tcpClientsCountMutex
);
253 tcpClientsCount
[client
]--;
254 if (tcpClientsCount
[client
] == 0) {
255 tcpClientsCount
.erase(client
);
/* Spawns one TCP worker thread and registers the write end of its
   communication pipe in d_tcpclientthreads. In single-pipe mode all
   workers share d_singlePipe; otherwise a dedicated non-blocking pipe is
   created per worker. Errors are logged and the FDs are cleaned up. */
260 void TCPClientCollection::addTCPClientThread()
262 int pipefds
[2] = { -1, -1};
264 vinfolog("Adding TCP Client thread");
266 if (d_useSinglePipe
) {
267 pipefds
[0] = d_singlePipe
[0];
268 pipefds
[1] = d_singlePipe
[1];
271 if (pipe(pipefds
) < 0) {
272 errlog("Error creating the TCP thread communication pipe: %s", strerror(errno
));
276 if (!setNonBlocking(pipefds
[0])) {
279 errlog("Error setting the TCP thread communication pipe non-blocking: %s", strerror(errno
));
283 if (!setNonBlocking(pipefds
[1])) {
286 errlog("Error setting the TCP thread communication pipe non-blocking: %s", strerror(errno
));
/* the lock protects d_tcpclientthreads and d_numthreads below */
292 std::lock_guard
<std::mutex
> lock(d_mutex
);
294 if (d_numthreads
>= d_tcpclientthreads
.capacity()) {
295 warnlog("Adding a new TCP client thread would exceed the vector capacity (%d/%d), skipping", d_numthreads
.load(), d_tcpclientthreads
.capacity())
296 if (!d_useSinglePipe
) {
304 thread
t1(tcpClientThread
, pipefds
[0]);
307 catch(const std::runtime_error
& e
) {
308 /* the thread creation failed, don't leak */
309 errlog("Error creating a TCP thread: %s", e
.what());
310 if (!d_useSinglePipe
) {
/* hand the write end to the dispatcher */
317 d_tcpclientthreads
.push_back(pipefds
[1]);
/* Sweeps the per-thread cache of backend connections, dropping entries
   whose socket is no longer usable (per isTCPSocketUsable) and erasing
   backends whose deque has become empty. */
323 static void cleanupClosedTCPConnections()
325 for(auto dsIt
= t_downstreamConnections
.begin(); dsIt
!= t_downstreamConnections
.end(); ) {
326 for (auto connIt
= dsIt
->second
.begin(); connIt
!= dsIt
->second
.end(); ) {
327 if (*connIt
&& isTCPSocketUsable((*connIt
)->getHandle())) {
/* dead or empty entry: erase() returns the next iterator */
331 connIt
= dsIt
->second
.erase(connIt
);
335 if (!dsIt
->second
.empty()) {
339 dsIt
= t_downstreamConnections
.erase(dsIt
);
344 /* Tries to read exactly toRead bytes into the buffer, starting at position pos.
345 Updates pos every time a successful read occurs,
346 throws an std::runtime_error in case of IO error,
347 return Done when toRead bytes have been read, needRead or needWrite if the IO operation
350 // XXX could probably be implemented as a TCPIOHandler
/* Reads exactly toRead bytes from fd into buffer starting at pos,
   advancing pos (and the local byte counter) on every successful read.
   Returns IOState::Done when all bytes were read, IOState::NeedRead when
   the socket would block (EAGAIN/EWOULDBLOCK); throws std::out_of_range
   on a too-small buffer and std::runtime_error on EOF or read error. */
351 IOState
tryRead(int fd
, std::vector
<uint8_t>& buffer
, size_t& pos
, size_t toRead
)
353 if (buffer
.size() < (pos
+ toRead
)) {
354 throw std::out_of_range("Calling tryRead() with a too small buffer (" + std::to_string(buffer
.size()) + ") for a read of " + std::to_string(toRead
) + " bytes starting at " + std::to_string(pos
));
359 ssize_t res
= ::read(fd
, reinterpret_cast<char*>(&buffer
.at(pos
)), toRead
- got
);
361 throw runtime_error("EOF while reading message");
/* non-blocking socket: a would-block error means "retry later" */
364 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
) {
365 return IOState::NeedRead
;
368 throw std::runtime_error(std::string("Error while reading message: ") + strerror(errno
));
372 pos
+= static_cast<size_t>(res
);
373 got
+= static_cast<size_t>(res
);
375 while (got
< toRead
);
377 return IOState::Done
;
/* Global collection of TCP worker threads, created elsewhere at startup. */
380 std::unique_ptr
<TCPClientCollection
> g_tcpclientthreads
;
/* Per-worker-thread state: local copies of the Lua-configurable holders,
   the response rule actions, and this thread's FD multiplexer. */
382 class TCPClientThreadData
385 TCPClientThreadData(): localRespRulactions(g_resprulactions
.getLocal()), mplexer(std::unique_ptr
<FDMultiplexer
>(FDMultiplexer::getMultiplexerSilent()))
389 LocalHolders holders
;
390 LocalStateHolder
<vector
<DNSDistResponseRuleAction
> > localRespRulactions
;
391 std::unique_ptr
<FDMultiplexer
> mplexer
{nullptr};
/* forward declaration: multiplexer callback for backend-side IO events */
394 static void handleDownstreamIOCallback(int fd
, FDMultiplexer::funcparam_t
& param
);
/* State machine for one incoming client TCP connection: owns the client
   FD (via d_ci / d_handler), the current query/response buffers, the
   selected backend and all the timing milestones used for metrics.
   The constructor records the original destination address of the
   connection (getsockname), falling back to the frontend's listen
   address on failure. Non-copyable: the state owns FDs and counters. */
396 class IncomingTCPConnectionState
399 IncomingTCPConnectionState(ConnectionInfo
&& ci
, TCPClientThreadData
& threadData
, const struct timeval
& now
): d_buffer(4096), d_responseBuffer(4096), d_threadData(threadData
), d_ci(std::move(ci
)), d_handler(d_ci
.fd
, g_tcpRecvTimeout
, d_ci
.cs
->tlsFrontend
? d_ci
.cs
->tlsFrontend
->getContext() : nullptr, now
.tv_sec
), d_connectionStartTime(now
)
401 d_ids
.origDest
.reset();
402 d_ids
.origDest
.sin4
.sin_family
= d_ci
.remote
.sin4
.sin_family
;
403 socklen_t socklen
= d_ids
.origDest
.getSocklen();
404 if (getsockname(d_ci
.fd
, reinterpret_cast<sockaddr
*>(&d_ids
.origDest
), &socklen
)) {
/* getsockname() failed: use the frontend's configured address instead */
405 d_ids
.origDest
= d_ci
.cs
->local
;
409 IncomingTCPConnectionState(const IncomingTCPConnectionState
& rhs
) = delete;
410 IncomingTCPConnectionState
& operator=(const IncomingTCPConnectionState
& rhs
) = delete;
/* Tears the connection state down: releases the per-client connection
   count, reports duration/query metrics to the frontend, fixes up the
   backend's outstanding counter, and removes any FD still registered
   with this thread's multiplexer (both backend and client side).
   Multiplexer/runtime errors are logged, never propagated — this is a
   destructor. */
412 ~IncomingTCPConnectionState()
414 decrementTCPClientCount(d_ci
.remote
);
415 if (d_ci
.cs
!= nullptr) {
417 gettimeofday(&now
, nullptr);
419 auto diff
= now
- d_connectionStartTime
;
/* duration in milliseconds */
420 d_ci
.cs
->updateTCPMetrics(d_queriesCount
, diff
.tv_sec
* 1000.0 + diff
.tv_usec
/ 1000.0);
423 if (d_ds
!= nullptr) {
426 d_outstanding
= false;
/* a leftover FD here indicates a bookkeeping bug, hence the cerr traces */
429 if (d_downstreamConnection
) {
431 if (d_lastIOState
== IOState::NeedRead
) {
432 cerr
<<__func__
<<": removing leftover backend read FD "<<d_downstreamConnection
->getHandle()<<endl
;
433 d_threadData
.mplexer
->removeReadFD(d_downstreamConnection
->getHandle());
435 else if (d_lastIOState
== IOState::NeedWrite
) {
436 cerr
<<__func__
<<": removing leftover backend write FD "<<d_downstreamConnection
->getHandle()<<endl
;
437 d_threadData
.mplexer
->removeWriteFD(d_downstreamConnection
->getHandle());
440 catch(const FDMultiplexerException
& e
) {
441 vinfolog("Got an exception when trying to remove a pending IO operation on the socket to the %s backend: %s", d_ds
->getName(), e
.what());
443 catch(const std::runtime_error
& e
) {
444 /* might be thrown by getHandle() */
445 vinfolog("Got an exception when trying to remove a pending IO operation on the socket to the %s backend: %s", d_ds
->getName(), e
.what());
/* same cleanup for the client-side FD */
451 if (d_lastIOState
== IOState::NeedRead
) {
452 cerr
<<__func__
<<": removing leftover client read FD "<<d_ci
.fd
<<endl
;
453 d_threadData
.mplexer
->removeReadFD(d_ci
.fd
);
455 else if (d_lastIOState
== IOState::NeedWrite
) {
456 cerr
<<__func__
<<": removing leftover client write FD "<<d_ci
.fd
<<endl
;
457 d_threadData
.mplexer
->removeWriteFD(d_ci
.fd
);
460 catch(const FDMultiplexerException
& e
) {
461 vinfolog("Got an exception when trying to remove a pending IO operation on an incoming TCP connection from %s: %s", d_ci
.remote
.toStringWithPort(), e
.what());
/* Prepares the state machine for the next query on the same connection:
   shrink the buffer to the 2-byte length prefix, clear the failure
   counter and go back to readingQuerySize. */
465 void resetForNewQuery()
467 d_buffer
.resize(sizeof(uint16_t));
471 d_downstreamFailures
= 0;
472 d_state
= State::readingQuerySize
;
473 d_lastIOState
= IOState::Done
;
/* Computes the absolute deadline for reading from the client, as the
   minimum of the per-read timeout (g_tcpRecvTimeout) and what remains
   of the maximum connection duration. Takes now BY VALUE and returns it
   with tv_sec advanced. Returns boost::none when neither limit is set. */
476 boost::optional
<struct timeval
> getClientReadTTD(struct timeval now
) const
478 if (g_maxTCPConnectionDuration
== 0 && g_tcpRecvTimeout
== 0) {
482 if (g_maxTCPConnectionDuration
> 0) {
483 auto elapsed
= now
.tv_sec
- d_connectionStartTime
.tv_sec
;
/* elapsed < 0 guards against the clock going backward */
484 if (elapsed
< 0 || (static_cast<size_t>(elapsed
) >= g_maxTCPConnectionDuration
)) {
487 auto remaining
= g_maxTCPConnectionDuration
- elapsed
;
/* the remaining connection budget is the tighter bound */
488 if (g_tcpRecvTimeout
== 0 || remaining
<= static_cast<size_t>(g_tcpRecvTimeout
)) {
489 now
.tv_sec
+= remaining
;
494 now
.tv_sec
+= g_tcpRecvTimeout
;
/* Computes the absolute deadline for reading from the selected backend,
   based on its tcpRecvTimeout; boost::none when that timeout is 0.
   Throws if called before a backend has been selected. */
498 boost::optional
<struct timeval
> getBackendReadTTD(const struct timeval
& now
) const
500 if (d_ds
== nullptr) {
501 throw std::runtime_error("getBackendReadTTD() without any backend selected");
503 if (d_ds
->tcpRecvTimeout
== 0) {
507 struct timeval res
= now
;
508 res
.tv_sec
+= d_ds
->tcpRecvTimeout
;
/* Computes the absolute deadline for writing to the client, as the
   minimum of g_tcpSendTimeout and what remains of the maximum
   connection duration. Returns boost::none when neither limit is set. */
513 boost::optional
<struct timeval
> getClientWriteTTD(const struct timeval
& now
) const
515 if (g_maxTCPConnectionDuration
== 0 && g_tcpSendTimeout
== 0) {
519 struct timeval res
= now
;
521 if (g_maxTCPConnectionDuration
> 0) {
/* elapsed < 0 guards against the clock going backward */
522 auto elapsed
= res
.tv_sec
- d_connectionStartTime
.tv_sec
;
523 if (elapsed
< 0 || static_cast<size_t>(elapsed
) >= g_maxTCPConnectionDuration
) {
526 auto remaining
= g_maxTCPConnectionDuration
- elapsed
;
/* the remaining connection budget is the tighter bound */
527 if (g_tcpSendTimeout
== 0 || remaining
<= static_cast<size_t>(g_tcpSendTimeout
)) {
528 res
.tv_sec
+= remaining
;
533 res
.tv_sec
+= g_tcpSendTimeout
;
537 boost::optional
<struct timeval
> getBackendWriteTTD(const struct timeval
& now
) const
539 if (d_ds
== nullptr) {
540 throw std::runtime_error("getBackendReadTTD() called without any backend selected");
542 if (d_ds
->tcpSendTimeout
== 0) {
546 struct timeval res
= now
;
547 res
.tv_sec
+= d_ds
->tcpSendTimeout
;
/* Returns whether this connection has exceeded maxConnectionDuration
   seconds (0 disables the check). On success it also refreshes
   d_remainingTime with the seconds still available. */
552 bool maxConnectionDurationReached(unsigned int maxConnectionDuration
, const struct timeval
& now
)
554 if (maxConnectionDuration
) {
555 time_t curtime
= now
.tv_sec
;
556 unsigned int elapsed
= 0;
557 if (curtime
> d_connectionStartTime
.tv_sec
) { // To prevent issues when time goes backward
558 elapsed
= curtime
- d_connectionStartTime
.tv_sec
;
560 if (elapsed
>= maxConnectionDuration
) {
563 d_remainingTime
= maxConnectionDuration
- elapsed
;
/* Debugging output: dumps this state's progress and all recorded timing
   milestones to stderr, serialized by a static mutex so concurrent
   threads do not interleave their output. Milestones are only printed
   once the state machine has passed the corresponding stage. */
571 static std::mutex s_mutex
;
574 gettimeofday(&now
, 0);
577 std::lock_guard
<std::mutex
> lock(s_mutex
);
578 fprintf(stderr
, "State is %p\n", this);
579 cerr
<< "Current state is " << static_cast<int>(d_state
) << ", got "<<d_queriesCount
<<" queries so far" << endl
;
580 cerr
<< "Current time is " << now
.tv_sec
<< " - " << now
.tv_usec
<< endl
;
581 cerr
<< "Connection started at " << d_connectionStartTime
.tv_sec
<< " - " << d_connectionStartTime
.tv_usec
<< endl
;
582 if (d_state
> State::doingHandshake
) {
583 cerr
<< "Handshake done at " << d_handshakeDoneTime
.tv_sec
<< " - " << d_handshakeDoneTime
.tv_usec
<< endl
;
585 if (d_state
> State::readingQuerySize
) {
586 cerr
<< "Got first query size at " << d_firstQuerySizeReadTime
.tv_sec
<< " - " << d_firstQuerySizeReadTime
.tv_usec
<< endl
;
588 if (d_state
> State::readingQuerySize
) {
589 cerr
<< "Got query size at " << d_querySizeReadTime
.tv_sec
<< " - " << d_querySizeReadTime
.tv_usec
<< endl
;
591 if (d_state
> State::readingQuery
) {
592 cerr
<< "Got query at " << d_queryReadTime
.tv_sec
<< " - " << d_queryReadTime
.tv_usec
<< endl
;
594 if (d_state
> State::sendingQueryToBackend
) {
595 cerr
<< "Sent query at " << d_querySentTime
.tv_sec
<< " - " << d_querySentTime
.tv_usec
<< endl
;
597 if (d_state
> State::readingResponseFromBackend
) {
598 cerr
<< "Got response at " << d_responseReadTime
.tv_sec
<< " - " << d_responseReadTime
.tv_usec
<< endl
;
/* Lifecycle of one query/response exchange; values are ordered so that
   "d_state > X" means stage X has completed (used by the dump above). */
603 enum class State
{ doingHandshake
, readingQuerySize
, readingQuery
, sendingQueryToBackend
, readingResponseSizeFromBackend
, readingResponseFromBackend
, sendingResponse
};
/* query bytes read from the client (length-prefixed) */
605 std::vector
<uint8_t> d_buffer
;
/* response bytes read from the backend / to be written to the client */
606 std::vector
<uint8_t> d_responseBuffer
;
607 TCPClientThreadData
& d_threadData
;
610 TCPIOHandler d_handler
;
/* connection to the currently selected backend, if any */
611 std::unique_ptr
<TCPConnectionToBackend
> d_downstreamConnection
{nullptr};
612 std::shared_ptr
<DownstreamState
> d_ds
{nullptr};
/* timing milestones, filled in as the state machine advances */
613 struct timeval d_connectionStartTime
;
614 struct timeval d_handshakeDoneTime
;
615 struct timeval d_firstQuerySizeReadTime
;
616 struct timeval d_querySizeReadTime
;
617 struct timeval d_queryReadTime
;
618 struct timeval d_querySentTime
;
619 struct timeval d_responseReadTime
;
/* read/write offset within the buffer currently being processed */
620 size_t d_currentPos
{0};
621 size_t d_queriesCount
{0};
622 unsigned int d_remainingTime
{0};
623 uint16_t d_querySize
{0};
624 uint16_t d_responseSize
{0};
625 uint16_t d_downstreamFailures
{0};
626 State d_state
{State::doingHandshake
};
/* last IO registration made with the multiplexer, so it can be undone */
627 IOState d_lastIOState
{IOState::Done
};
628 bool d_readingFirstQuery
{true};
/* whether this query is counted in the backend's outstanding gauge */
629 bool d_outstanding
{false};
630 bool d_firstResponsePacket
{true};
632 bool d_xfrStarted
{false};
/* Forward declarations for the IO state-machine drivers below.
   handleNewIOState reconciles the multiplexer registration for fd with
   the desired IOState, with an optional absolute deadline (ttd). */
635 static void handleIOCallback(int fd
, FDMultiplexer::funcparam_t
& param
);
636 static void handleNewIOState(std::shared_ptr
<IncomingTCPConnectionState
>& state
, IOState iostate
, const int fd
, FDMultiplexer::callbackfunc_t callback
, boost::optional
<struct timeval
> ttd
=boost::none
);
637 static void handleIO(std::shared_ptr
<IncomingTCPConnectionState
>& state
, struct timeval
& now
);
638 static void handleDownstreamIO(std::shared_ptr
<IncomingTCPConnectionState
>& state
, struct timeval
& now
);
/* Invoked once a response has been fully written to the client.
   For XFR it resumes reading further messages from the backend;
   otherwise it enforces the per-connection query and duration limits
   and resets the state machine to wait for the next query. */
640 static void handleResponseSent(std::shared_ptr
<IncomingTCPConnectionState
>& state
, struct timeval
& now
)
642 handleNewIOState(state
, IOState::Done
, state
->d_ci
.fd
, handleIOCallback
);
644 if (state
->d_isXFR
&& state
->d_downstreamConnection
) {
645 /* we need to resume reading from the backend! */
646 state
->d_state
= IncomingTCPConnectionState::State::readingResponseSizeFromBackend
;
647 state
->d_currentPos
= 0;
648 handleDownstreamIO(state
, now
);
652 if (g_maxTCPQueriesPerConn
&& state
->d_queriesCount
> g_maxTCPQueriesPerConn
) {
653 vinfolog("Terminating TCP connection from %s because it reached the maximum number of queries per conn (%d / %d)", state
->d_ci
.remote
.toStringWithPort(), state
->d_queriesCount
, g_maxTCPQueriesPerConn
);
657 if (state
->maxConnectionDurationReached(g_maxTCPConnectionDuration
, now
)) {
658 vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", state
->d_ci
.remote
.toStringWithPort());
662 state
->resetForNewQuery();
664 handleIO(state
, now
);
/* Switches the state machine to sendingResponse: prepends the 2-byte,
   network-order length prefix to the response buffer and kicks the
   client-side IO loop to start writing it out. */
667 static void sendResponse(std::shared_ptr
<IncomingTCPConnectionState
>& state
, struct timeval
& now
)
669 state
->d_state
= IncomingTCPConnectionState::State::sendingResponse
;
/* big-endian 16-bit length: high byte, then low byte */
670 const uint8_t sizeBytes
[] = { static_cast<uint8_t>(state
->d_responseSize
/ 256), static_cast<uint8_t>(state
->d_responseSize
% 256) };
671 /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
672 that could occur if we had to deal with the size during the processing,
673 especially alignment issues */
674 state
->d_responseBuffer
.insert(state
->d_responseBuffer
.begin(), sizeBytes
, sizeBytes
+ 2);
676 state
->d_currentPos
= 0;
678 handleIO(state
, now
);
/* Processes a complete response received from the backend: validates it
   against the stored query (qname/qtype/qclass) on the first packet,
   releases the outstanding slot, runs the response rules/policies
   (processResponse), then forwards it to the client and records the
   exchange in the in-memory rings. */
681 static void handleResponse(std::shared_ptr
<IncomingTCPConnectionState
>& state
, struct timeval
& now
)
683 if (state
->d_responseSize
< sizeof(dnsheader
)) {
687 auto response
= reinterpret_cast<char*>(&state
->d_responseBuffer
.at(0));
688 unsigned int consumed
;
/* only the first packet of an XFR stream carries the original question */
689 if (state
->d_firstResponsePacket
&& !responseContentMatches(response
, state
->d_responseSize
, state
->d_ids
.qname
, state
->d_ids
.qtype
, state
->d_ids
.qclass
, state
->d_ds
->remote
, consumed
)) {
692 state
->d_firstResponsePacket
= false;
694 if (state
->d_outstanding
) {
695 --state
->d_ds
->outstanding
;
696 state
->d_outstanding
= false;
699 auto dh
= reinterpret_cast<struct dnsheader
*>(response
);
700 uint16_t addRoom
= 0;
701 DNSResponse dr
= makeDNSResponseFromIDState(state
->d_ids
, dh
, state
->d_responseBuffer
.size(), state
->d_responseSize
, true);
/* DNSCrypt responses may need extra room for padding and the MAC */
702 if (dr
.dnsCryptQuery
) {
703 addRoom
= DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
;
/* keep a cleartext copy of the header for the rings before any rewriting */
706 dnsheader cleartextDH
;
707 memcpy(&cleartextDH
, dr
.dh
, sizeof(cleartextDH
));
709 std::vector
<uint8_t> rewrittenResponse
;
710 size_t responseSize
= state
->d_responseBuffer
.size();
711 if (!processResponse(&response
, &state
->d_responseSize
, &responseSize
, state
->d_threadData
.localRespRulactions
, dr
, addRoom
, rewrittenResponse
, false)) {
715 if (!rewrittenResponse
.empty()) {
716 /* responseSize has been updated as well but we don't really care since it will match
717 the capacity of rewrittenResponse anyway */
718 state
->d_responseBuffer
= std::move(rewrittenResponse
);
719 state
->d_responseSize
= state
->d_responseBuffer
.size();
721 /* the size might have been updated (shrinked) if we removed the whole OPT RR, for example) */
722 state
->d_responseBuffer
.resize(state
->d_responseSize
);
725 if (state
->d_isXFR
&& !state
->d_xfrStarted
) {
726 /* don't bother parsing the content of the response for now */
727 state
->d_xfrStarted
= true;
730 sendResponse(state
, now
);
/* record the exchange (latency, sizes, cleartext header) in the rings */
733 struct timespec answertime
;
734 gettime(&answertime
);
735 double udiff
= state
->d_ids
.sentTime
.udiff();
736 g_rings
.insertResponse(answertime
, state
->d_ci
.remote
, *dr
.qname
, dr
.qtype
, static_cast<unsigned int>(udiff
), static_cast<unsigned int>(state
->d_responseBuffer
.size()), cleartextDH
, state
->d_ds
->remote
);
/* Starts (or retries) sending the current query to the selected backend:
   obtains a (possibly cached) connection and drives the backend IO loop,
   retrying with a fresh connection up to ds->retries times. Gives up —
   bumping the frontend's tcpGaveUp counter — when no connection can be
   obtained, or when an XFR transfer was already partially relayed. */
739 static void sendQueryToBackend(std::shared_ptr
<IncomingTCPConnectionState
>& state
, struct timeval
& now
)
741 auto ds
= state
->d_ds
;
742 state
->d_state
= IncomingTCPConnectionState::State::sendingQueryToBackend
;
743 state
->d_currentPos
= 0;
744 state
->d_firstResponsePacket
= true;
745 state
->d_downstreamConnection
.reset();
747 if (state
->d_xfrStarted
) {
748 /* sorry, but we are not going to resume a XFR if we have already sent some packets
753 while (state
->d_downstreamFailures
< state
->d_ds
->retries
)
755 state
->d_downstreamConnection
= getConnectionToDownstream(ds
, state
->d_downstreamFailures
, now
);
757 if (!state
->d_downstreamConnection
) {
759 ++state
->d_ci
.cs
->tcpGaveUp
;
760 vinfolog("Downstream connection to %s failed %d times in a row, giving up.", ds
->getName(), state
->d_downstreamFailures
);
/* connection obtained: hand over to the backend IO state machine */
764 handleDownstreamIO(state
, now
);
/* retry budget exhausted */
769 ++state
->d_ci
.cs
->tcpGaveUp
;
770 vinfolog("Downstream connection to %s failed %u times in a row, giving up.", ds
->getName(), state
->d_downstreamFailures
);
/* Processes a complete query read from the client: handles DNSCrypt
   (possibly answering immediately with a certificate response), sanity
   checks the headers, parses the question, runs the rules/policy
   (processQuery) and either answers directly, drops, or forwards to the
   selected backend with the 2-byte length prefix prepended. */
773 static void handleQuery(std::shared_ptr
<IncomingTCPConnectionState
>& state
, struct timeval
& now
)
775 if (state
->d_querySize
< sizeof(dnsheader
)) {
776 ++g_stats
.nonCompliantQueries
;
780 state
->d_readingFirstQuery
= false;
781 ++state
->d_queriesCount
;
782 ++state
->d_ci
.cs
->queries
;
785 /* we need an accurate ("real") value for the response and
786 to store into the IDS, but not for insertion into the
788 struct timespec queryRealTime
;
789 gettime(&queryRealTime
, true);
791 auto query
= reinterpret_cast<char*>(&state
->d_buffer
.at(0));
792 std::shared_ptr
<DNSCryptQuery
> dnsCryptQuery
{nullptr};
793 auto dnsCryptResponse
= checkDNSCryptQuery(*state
->d_ci
.cs
, query
, state
->d_querySize
, dnsCryptQuery
, queryRealTime
.tv_sec
, true);
/* e.g. a DNSCrypt certificate request that we can answer right away */
794 if (dnsCryptResponse
) {
795 state
->d_responseBuffer
= std::move(*dnsCryptResponse
);
796 state
->d_responseSize
= state
->d_responseBuffer
.size();
797 sendResponse(state
, now
);
801 const auto& dh
= reinterpret_cast<dnsheader
*>(query
);
802 if (!checkQueryHeaders(dh
)) {
806 uint16_t qtype
, qclass
;
807 unsigned int consumed
= 0;
808 DNSName
qname(query
, state
->d_querySize
, sizeof(dnsheader
), false, &qtype
, &qclass
, &consumed
);
809 DNSQuestion
dq(&qname
, qtype
, qclass
, consumed
, &state
->d_ids
.origDest
, &state
->d_ci
.remote
, reinterpret_cast<dnsheader
*>(query
), state
->d_buffer
.size(), state
->d_querySize
, true, &queryRealTime
);
810 dq
.dnsCryptQuery
= std::move(dnsCryptQuery
);
812 state
->d_isXFR
= (dq
.qtype
== QType::AXFR
|| dq
.qtype
== QType::IXFR
);
813 if (state
->d_isXFR
) {
818 auto result
= processQuery(dq
, *state
->d_ci
.cs
, state
->d_threadData
.holders
, state
->d_ds
);
820 if (result
== ProcessQueryResult::Drop
) {
/* a rule (or the cache) produced the answer locally */
824 if (result
== ProcessQueryResult::SendAnswer
) {
825 state
->d_buffer
.resize(dq
.len
);
826 state
->d_responseBuffer
= std::move(state
->d_buffer
);
827 state
->d_responseSize
= state
->d_responseBuffer
.size();
828 sendResponse(state
, now
);
832 if (result
!= ProcessQueryResult::PassToBackend
|| state
->d_ds
== nullptr) {
836 state
->d_buffer
.resize(dq
.len
);
837 setIDStateFromDNSQuestion(state
->d_ids
, dq
, std::move(qname
));
/* big-endian 16-bit length: high byte, then low byte */
839 const uint8_t sizeBytes
[] = { static_cast<uint8_t>(dq
.len
/ 256), static_cast<uint8_t>(dq
.len
% 256) };
840 /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
841 that could occur if we had to deal with the size during the processing,
842 especially alignment issues */
843 state
->d_buffer
.insert(state
->d_buffer
.begin(), sizeBytes
, sizeBytes
+ 2);
844 sendQueryToBackend(state
, now
);
/* Reconciles the multiplexer registration for fd with the desired
   iostate: removes a stale read/write registration when the direction
   changed, then adds (or just refreshes the TTD of) the new one.
   d_lastIOState tracks what is currently registered so this function is
   idempotent. */
847 static void handleNewIOState(std::shared_ptr
<IncomingTCPConnectionState
>& state
, IOState iostate
, const int fd
, FDMultiplexer::callbackfunc_t callback
, boost::optional
<struct timeval
> ttd
)
849 //cerr<<"in "<<__func__<<" for fd "<<fd<<", last state was "<<(int)state->d_lastIOState<<", new state is "<<(int)iostate<<endl;
851 if (state
->d_lastIOState
== IOState::NeedRead
&& iostate
!= IOState::NeedRead
) {
852 state
->d_threadData
.mplexer
->removeReadFD(fd
);
853 //cerr<<__func__<<": remove read FD "<<fd<<endl;
854 state
->d_lastIOState
= IOState::Done
;
856 else if (state
->d_lastIOState
== IOState::NeedWrite
&& iostate
!= IOState::NeedWrite
) {
857 state
->d_threadData
.mplexer
->removeWriteFD(fd
);
858 //cerr<<__func__<<": remove write FD "<<fd<<endl;
859 state
->d_lastIOState
= IOState::Done
;
862 if (iostate
== IOState::NeedRead
) {
/* already registered for read: only the deadline needs refreshing */
863 if (state
->d_lastIOState
== IOState::NeedRead
) {
865 /* let's update the TTD ! */
866 state
->d_threadData
.mplexer
->setReadTTD(fd
, *ttd
, /* we pass 0 here because we already have a TTD */0);
871 state
->d_lastIOState
= IOState::NeedRead
;
872 //cerr<<__func__<<": add read FD "<<fd<<endl;
873 state
->d_threadData
.mplexer
->addReadFD(fd
, callback
, state
, ttd
? &*ttd
: nullptr);
875 else if (iostate
== IOState::NeedWrite
) {
876 if (state
->d_lastIOState
== IOState::NeedWrite
) {
880 state
->d_lastIOState
= IOState::NeedWrite
;
881 //cerr<<__func__<<": add write FD "<<fd<<endl;
882 state
->d_threadData
.mplexer
->addWriteFD(fd
, callback
, state
, ttd
? &*ttd
: nullptr);
884 else if (iostate
== IOState::Done
) {
885 state
->d_lastIOState
= IOState::Done
;
/* Drives the backend side of the state machine for one IO readiness
   event: writes the (length-prefixed) query — using TCP Fast Open on a
   fresh connection when enabled — then reads the 2-byte response size
   and the response body. On a complete response it hands over to
   handleResponse and returns the connection to the cache (except for
   XFR). On error it updates the failure counters and retries the query
   on a new connection via sendQueryToBackend. */
889 static void handleDownstreamIO(std::shared_ptr
<IncomingTCPConnectionState
>& state
, struct timeval
& now
)
891 if (state
->d_downstreamConnection
== nullptr) {
892 throw std::runtime_error("No downstream socket in " + std::string(__func__
) + "!");
895 int fd
= state
->d_downstreamConnection
->getHandle();
896 IOState iostate
= IOState::Done
;
897 bool connectionDied
= false;
900 if (state
->d_state
== IncomingTCPConnectionState::State::sendingQueryToBackend
) {
/* TFO only makes sense on the very first write of a fresh connection */
903 if (state
->d_ds
->tcpFastOpen
&& state
->d_downstreamConnection
->isFresh()) {
904 socketFlags
|= MSG_FASTOPEN
;
906 #endif /* MSG_FASTOPEN */
908 size_t sent
= sendMsgWithOptions(fd
, reinterpret_cast<const char *>(&state
->d_buffer
.at(state
->d_currentPos
)), state
->d_buffer
.size() - state
->d_currentPos
, &state
->d_ds
->remote
, &state
->d_ds
->sourceAddr
, state
->d_ds
->sourceItf
, socketFlags
);
909 if (sent
== state
->d_buffer
.size()) {
/* query fully written: switch to reading the response size */
911 state
->d_downstreamConnection
->incQueries();
912 state
->d_state
= IncomingTCPConnectionState::State::readingResponseSizeFromBackend
;
913 state
->d_currentPos
= 0;
914 state
->d_querySentTime
= now
;
915 iostate
= IOState::NeedRead
;
916 if (!state
->d_isXFR
) {
917 /* don't bother with the outstanding count for XFR queries */
918 ++state
->d_ds
->outstanding
;
919 state
->d_outstanding
= true;
923 state
->d_currentPos
+= sent
;
924 iostate
= IOState::NeedWrite
;
925 /* disable fast open on partial write */
926 state
->d_downstreamConnection
->setReused();
930 if (state
->d_state
== IncomingTCPConnectionState::State::readingResponseSizeFromBackend
) {
931 // then we need to allocate a new buffer (new because we might need to re-send the query if the
932 // backend dies on us
933 // We also might need to read and send to the client more than one response in case of XFR (yeah!)
934 // should very likely be a TCPIOHandler d_downstreamHandler
935 iostate
= tryRead(fd
, state
->d_responseBuffer
, state
->d_currentPos
, sizeof(uint16_t) - state
->d_currentPos
);
936 if (iostate
== IOState::Done
) {
937 state
->d_state
= IncomingTCPConnectionState::State::readingResponseFromBackend
;
/* response size is big-endian: high byte first */
938 state
->d_responseSize
= state
->d_responseBuffer
.at(0) * 256 + state
->d_responseBuffer
.at(1);
939 state
->d_responseBuffer
.resize((state
->d_ids
.dnsCryptQuery
&& (UINT16_MAX
- state
->d_responseSize
) > static_cast<uint16_t>(DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
)) ? state
->d_responseSize
+ DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE
: state
->d_responseSize
);
940 state
->d_currentPos
= 0;
944 if (state
->d_state
== IncomingTCPConnectionState::State::readingResponseFromBackend
) {
945 iostate
= tryRead(fd
, state
->d_responseBuffer
, state
->d_currentPos
, state
->d_responseSize
- state
->d_currentPos
);
946 if (iostate
== IOState::Done
) {
/* full response received: stop watching the backend FD */
947 handleNewIOState(state
, IOState::Done
, fd
, handleDownstreamIOCallback
);
949 if (state
->d_isXFR
) {
950 /* Don't reuse the TCP connection after an {A,I}XFR */
951 /* but don't reset it either, we will need to read more messages */
954 releaseDownstreamConnection(std::move(state
->d_downstreamConnection
));
958 state
->d_responseReadTime
= now
;
959 handleResponse(state
, now
);
964 if (state
->d_state
!= IncomingTCPConnectionState::State::sendingQueryToBackend
&&
965 state
->d_state
!= IncomingTCPConnectionState::State::readingResponseSizeFromBackend
&&
966 state
->d_state
!= IncomingTCPConnectionState::State::readingResponseFromBackend
) {
967 vinfolog("Unexpected state %d in handleDownstreamIOCallback", static_cast<int>(state
->d_state
));
970 catch(const std::exception
& e
) {
971 /* most likely an EOF because the other end closed the connection,
972 but it might also be a real IO error or something else.
973 Let's just drop the connection
975 vinfolog("Got an exception while handling (%s backend) TCP query from %s: %s", (state
->d_lastIOState
== IOState::NeedRead
? "reading from" : "writing to"), state
->d_ci
.remote
.toStringWithPort(), e
.what());
976 if (state
->d_state
== IncomingTCPConnectionState::State::sendingQueryToBackend
) {
977 ++state
->d_ds
->tcpDiedSendingQuery
;
980 ++state
->d_ds
->tcpDiedReadingResponse
;
983 /* don't increase this counter when reusing connections */
984 if (state
->d_downstreamConnection
->isFresh()) {
985 ++state
->d_downstreamFailures
;
987 if (state
->d_outstanding
&& state
->d_ds
!= nullptr) {
988 --state
->d_ds
->outstanding
;
989 state
->d_outstanding
= false;
991 /* remove this FD from the IO multiplexer */
992 iostate
= IOState::Done
;
993 connectionDied
= true;
996 if (iostate
== IOState::Done
) {
997 handleNewIOState(state
, iostate
, fd
, handleDownstreamIOCallback
);
/* keep watching the FD, with the direction-appropriate deadline */
1000 handleNewIOState(state
, iostate
, fd
, handleDownstreamIOCallback
, iostate
== IOState::NeedRead
? state
->getBackendReadTTD(now
) : state
->getBackendWriteTTD(now
));
/* the backend died: try again on a fresh connection */
1003 if (connectionDied
) {
1004 sendQueryToBackend(state
, now
);
// FDMultiplexer callback for backend (downstream) socket events.
// Recovers the shared connection state from the opaque parameter, validates
// that the event really belongs to our current downstream socket, then
// delegates the actual work to handleDownstreamIO().
1008 static void handleDownstreamIOCallback(int fd
, FDMultiplexer::funcparam_t
& param
)
1010 auto state
= boost::any_cast
<std::shared_ptr
<IncomingTCPConnectionState
>>(param
);
// Defensive checks: a callback firing without a downstream connection, or for
// a stale FD, would mean multiplexer bookkeeping went wrong — fail loudly.
1011 if (state
->d_downstreamConnection
== nullptr) {
1012 throw std::runtime_error("No downstream socket in " + std::string(__func__
) + "!");
1014 if (fd
!= state
->d_downstreamConnection
->getHandle()) {
1015 throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd
) + " received in " + std::string(__func__
) + ", expected " + std::to_string(state
->d_downstreamConnection
->getHandle()));
// Refresh the current time (declaration of 'now' is outside this extract)
// so timeout (TTD) computations downstream are accurate.
1019 gettimeofday(&now
, 0);
1020 handleDownstreamIO(state
, now
);
// Drive one pass of client-side I/O for an incoming TCP connection, advancing
// its state machine: doingHandshake -> readingQuerySize -> readingQuery
// (-> query processing) and, separately, sendingResponse. Each step may
// complete (Done) or need to wait for readiness (NeedRead/NeedWrite), in which
// case the FD is re-armed in the multiplexer with an appropriate timeout.
// NOTE(review): some original lines (braces, early returns) are missing from
// this extract; annotations below describe only what is visible.
1023 static void handleIO(std::shared_ptr
<IncomingTCPConnectionState
>& state
, struct timeval
& now
)
1025 int fd
= state
->d_ci
.fd
;
1026 IOState iostate
= IOState::Done
;
// Enforce the global per-connection duration limit before doing any work.
1028 if (state
->maxConnectionDurationReached(g_maxTCPConnectionDuration
, now
)) {
1029 vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", state
->d_ci
.remote
.toStringWithPort());
1030 handleNewIOState(state
, IOState::Done
, fd
, handleIOCallback
);
// Step 1: (TLS) handshake with the client; on completion start reading the
// 2-byte DNS-over-TCP length prefix.
1035 if (state
->d_state
== IncomingTCPConnectionState::State::doingHandshake
) {
1036 iostate
= state
->d_handler
.tryHandshake();
1037 if (iostate
== IOState::Done
) {
1038 state
->d_handshakeDoneTime
= now
;
1039 state
->d_state
= IncomingTCPConnectionState::State::readingQuerySize
;
// Step 2: read the 16-bit, network-order query length prefix.
1043 if (state
->d_state
== IncomingTCPConnectionState::State::readingQuerySize
) {
1044 iostate
= state
->d_handler
.tryRead(state
->d_buffer
, state
->d_currentPos
, sizeof(uint16_t) - state
->d_currentPos
);
1045 if (iostate
== IOState::Done
) {
1046 state
->d_state
= IncomingTCPConnectionState::State::readingQuery
;
1047 state
->d_querySizeReadTime
= now
;
// Remember when the very first query of this connection started arriving,
// for latency accounting.
1048 if (state
->d_queriesCount
== 0) {
1049 state
->d_firstQuerySizeReadTime
= now
;
// Decode the big-endian length prefix manually from the first two bytes.
1051 state
->d_querySize
= state
->d_buffer
.at(0) * 256 + state
->d_buffer
.at(1);
// A query smaller than a DNS header cannot be valid: drop the connection.
1052 if (state
->d_querySize
< sizeof(dnsheader
)) {
1054 handleNewIOState(state
, IOState::Done
, fd
, handleIOCallback
);
1058 /* allocate a bit more memory to be able to spoof the content,
1059 or to add ECS without allocating a new buffer */
1060 state
->d_buffer
.resize(state
->d_querySize
+ 512);
1061 state
->d_currentPos
= 0;
// Step 3: read the query payload itself; once complete, unregister the FD
// and hand the query to the processing path.
1065 if (state
->d_state
== IncomingTCPConnectionState::State::readingQuery
) {
1066 iostate
= state
->d_handler
.tryRead(state
->d_buffer
, state
->d_currentPos
, state
->d_querySize
);
1067 if (iostate
== IOState::Done
) {
1068 handleNewIOState(state
, IOState::Done
, fd
, handleIOCallback
);
1069 handleQuery(state
, now
);
// Step 4 (separate entry): write the prepared response back to the client.
1074 if (state
->d_state
== IncomingTCPConnectionState::State::sendingResponse
) {
1075 iostate
= state
->d_handler
.tryWrite(state
->d_responseBuffer
, state
->d_currentPos
, state
->d_responseBuffer
.size());
1076 if (iostate
== IOState::Done
) {
1077 handleResponseSent(state
, now
);
// Anything else is a state-machine bug worth logging.
1082 if (state
->d_state
!= IncomingTCPConnectionState::State::doingHandshake
&&
1083 state
->d_state
!= IncomingTCPConnectionState::State::readingQuerySize
&&
1084 state
->d_state
!= IncomingTCPConnectionState::State::readingQuery
&&
1085 state
->d_state
!= IncomingTCPConnectionState::State::sendingResponse
) {
1086 vinfolog("Unexpected state %d in handleIOCallback", static_cast<int>(state
->d_state
));
// On any exception the client connection is dropped; update the per-frontend
// counter matching the phase we died in.
1089 catch(const std::exception
& e
) {
1090 /* most likely an EOF because the other end closed the connection,
1091 but it might also be a real IO error or something else.
1092 Let's just drop the connection
1094 if (state
->d_state
== IncomingTCPConnectionState::State::doingHandshake
||
1095 state
->d_state
== IncomingTCPConnectionState::State::readingQuerySize
||
1096 state
->d_state
== IncomingTCPConnectionState::State::readingQuery
) {
1097 ++state
->d_ci
.cs
->tcpDiedReadingQuery
;
1099 else if (state
->d_state
== IncomingTCPConnectionState::State::sendingResponse
) {
1100 ++state
->d_ci
.cs
->tcpDiedSendingResponse
;
// Only log noisily when it is unlikely to be a benign client-side close
// (i.e. we were writing, or still reading the very first query).
1103 if (state
->d_lastIOState
== IOState::NeedWrite
|| state
->d_readingFirstQuery
) {
1104 vinfolog("Got an exception while handling (%s) TCP query from %s: %s", (state
->d_lastIOState
== IOState::NeedRead
? "reading" : "writing"), state
->d_ci
.remote
.toStringWithPort(), e
.what());
1107 vinfolog("Closing TCP client connection with %s", state
->d_ci
.remote
.toStringWithPort());
1109 /* remove this FD from the IO multiplexer */
1110 iostate
= IOState::Done
;
// Apply the final I/O state: unregister (Done) or re-arm with a client
// read/write timeout (TTD) matching the pending direction.
1113 if (iostate
== IOState::Done
) {
1114 handleNewIOState(state
, iostate
, fd
, handleIOCallback
);
1117 handleNewIOState(state
, iostate
, fd
, handleIOCallback
, iostate
== IOState::NeedRead
? state
->getClientReadTTD(now
) : state
->getClientWriteTTD(now
));
// FDMultiplexer callback for client-facing socket events.
// Recovers the shared connection state from the opaque parameter, checks the
// FD matches the client connection, refreshes the clock and runs handleIO().
1121 static void handleIOCallback(int fd
, FDMultiplexer::funcparam_t
& param
)
1123 auto state
= boost::any_cast
<std::shared_ptr
<IncomingTCPConnectionState
>>(param
);
// A mismatched descriptor would indicate broken multiplexer bookkeeping.
1124 if (fd
!= state
->d_ci
.fd
) {
1125 throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd
) + " received in " + std::string(__func__
) + ", expected " + std::to_string(state
->d_ci
.fd
));
// Refresh the current time (declaration of 'now' is outside this extract)
// before driving the client-side state machine.
1128 gettimeofday(&now
, 0);
1130 handleIO(state
, now
);
// Callback invoked when the acceptor thread pushes a new connection down the
// worker's pipe: reads a raw ConnectionInfo pointer from the pipe, wraps it in
// a new IncomingTCPConnectionState and starts processing it.
1133 static void handleIncomingTCPQuery(int pipefd
, FDMultiplexer::funcparam_t
& param
)
1135 auto threadData
= boost::any_cast
<TCPClientThreadData
*>(param
);
// The pipe transports a pointer value, not the object itself; ownership of the
// pointed-to ConnectionInfo passes to this thread on a successful full read.
1137 ConnectionInfo
* citmp
{nullptr};
1139 ssize_t got
= read(pipefd
, &citmp
, sizeof(citmp
));
// Distinguish EOF, transient errors (EAGAIN/EINTR — just retry later),
// hard errors and short reads; a pointer must arrive whole or not at all.
1141 throw std::runtime_error("EOF while reading from the TCP acceptor pipe (" + std::to_string(pipefd
) + ") in " + std::string(isNonBlocking(pipefd
) ? "non-blocking" : "blocking") + " mode");
1143 else if (got
== -1) {
1144 if (errno
== EAGAIN
|| errno
== EINTR
) {
1147 throw std::runtime_error("Error while reading from the TCP acceptor pipe (" + std::to_string(pipefd
) + ") in " + std::string(isNonBlocking(pipefd
) ? "non-blocking" : "blocking") + " mode:" + strerror(errno
));
1149 else if (got
!= sizeof(citmp
)) {
1150 throw std::runtime_error("Partial read while reading from the TCP acceptor pipe (" + std::to_string(pipefd
) + ") in " + std::string(isNonBlocking(pipefd
) ? "non-blocking" : "blocking") + " mode");
// The connection has been dequeued; balance the counter the acceptor bumped.
1154 g_tcpclientthreads
->decrementQueuedCount();
1157 gettimeofday(&now
, 0);
// Take ownership of the ConnectionInfo by moving it into the new state object.
1158 auto state
= std::make_shared
<IncomingTCPConnectionState
>(std::move(*citmp
), *threadData
, now
);
1162 /* let's update the remaining time */
1163 state
->d_remainingTime
= g_maxTCPConnectionDuration
;
// Kick off the client-side I/O state machine for this new connection.
1165 handleIO(state
, now
);
// TCP worker thread main loop. Receives new client connections over 'pipefd'
// (see handleIncomingTCPQuery), runs the event multiplexer, and periodically
// performs housekeeping: closing idle downstream connections and expiring
// read/write operations whose deadline (TTD) has passed.
1174 void tcpClientThread(int pipefd
)
1176 /* we get launched with a pipe on which we receive file descriptors from clients that we own
1177 from that point on */
1179 setThreadName("dnsdist/tcpClie");
1181 TCPClientThreadData data
;
// New connections arrive over the pipe; register it with this thread's
// multiplexer so handleIncomingTCPQuery() fires when the acceptor writes.
1183 data
.mplexer
->addReadFD(pipefd
, handleIncomingTCPQuery
, &data
);
1185 gettimeofday(&now
, 0);
1186 time_t lastTCPCleanup
= now
.tv_sec
;
1187 time_t lastTimeoutScan
= now
.tv_sec
;
// Main event loop: run() dispatches ready FDs and updates 'now'.
1190 data
.mplexer
->run(&now
);
// Periodic cleanup of closed/idle downstream TCP connections.
1192 if (g_downstreamTCPCleanupInterval
> 0 && (now
.tv_sec
> (lastTCPCleanup
+ g_downstreamTCPCleanupInterval
))) {
1193 cleanupClosedTCPConnections();
1194 lastTCPCleanup
= now
.tv_sec
;
// At most once per second, sweep for expired read and write operations.
1197 if (now
.tv_sec
> lastTimeoutScan
) {
1198 lastTimeoutScan
= now
.tv_sec
;
// Expired reads: conn.first is the timed-out FD; if it is the client-facing
// FD the client timed out, otherwise it was a backend read.
1199 auto expiredReadConns
= data
.mplexer
->getTimeouts(now
, false);
1200 for(const auto& conn
: expiredReadConns
) {
1201 auto state
= boost::any_cast
<std::shared_ptr
<IncomingTCPConnectionState
>>(conn
.second
);
1202 if (conn
.first
== state
->d_ci
.fd
) {
1203 vinfolog("Timeout (read) from remote TCP client %s", state
->d_ci
.remote
.toStringWithPort());
1204 ++state
->d_ci
.cs
->tcpClientTimeouts
;
1206 else if (state
->d_ds
) {
1207 vinfolog("Timeout (read) from remote backend %s", state
->d_ds
->getName());
1208 ++state
->d_ci
.cs
->tcpDownstreamTimeouts
;
1209 ++state
->d_ds
->tcpReadTimeouts
;
// Drop the expired FD from the multiplexer and mark the I/O settled so the
// state's bookkeeping stays consistent.
1211 data
.mplexer
->removeReadFD(conn
.first
);
1212 state
->d_lastIOState
= IOState::Done
;
// Same sweep for expired writes (client vs backend, mirrored counters).
1215 auto expiredWriteConns
= data
.mplexer
->getTimeouts(now
, true);
1216 for(const auto& conn
: expiredWriteConns
) {
1217 auto state
= boost::any_cast
<std::shared_ptr
<IncomingTCPConnectionState
>>(conn
.second
);
1218 if (conn
.first
== state
->d_ci
.fd
) {
1219 vinfolog("Timeout (write) from remote TCP client %s", state
->d_ci
.remote
.toStringWithPort());
1220 ++state
->d_ci
.cs
->tcpClientTimeouts
;
1222 else if (state
->d_ds
) {
1223 vinfolog("Timeout (write) from remote backend %s", state
->d_ds
->getName());
1224 ++state
->d_ci
.cs
->tcpDownstreamTimeouts
;
1225 ++state
->d_ds
->tcpWriteTimeouts
;
1227 data
.mplexer
->removeWriteFD(conn
.first
);
1228 state
->d_lastIOState
= IOState::Done
;
1234 /* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and
1235 they will hand off to worker threads & spawn more of them if required
// Acceptor thread: accepts client TCP connections on a ClientState's listening
// socket, applies the ACL and connection limits, then hands each accepted
// connection (as a raw ConnectionInfo pointer) to a worker thread via a pipe.
// NOTE(review): this extract ends mid-function; trailing lines are not visible.
1237 void tcpAcceptorThread(void* p
)
1239 setThreadName("dnsdist/tcpAcce");
1240 ClientState
* cs
= (ClientState
*) p
;
1241 bool tcpClientCountIncremented
= false;
1242 ComboAddress remote
;
1243 remote
.sin4
.sin_family
= cs
->local
.sin4
.sin_family
;
1245 g_tcpclientthreads
->addTCPClientThread();
1247 auto acl
= g_ACL
.getLocal();
// Per-iteration bookkeeping flags: whether we bumped the queued-connection
// counter and the per-client connection count, so errors can roll them back.
1249 bool queuedCounterIncremented
= false;
1250 std::unique_ptr
<ConnectionInfo
> ci
;
1251 tcpClientCountIncremented
= false;
1253 socklen_t remlen
= remote
.getSocklen();
1254 ci
= std::unique_ptr
<ConnectionInfo
>(new ConnectionInfo(cs
));
// accept4() sets the socket non-blocking atomically when available
// (HAVE_ACCEPT4); the plain accept() fallback is made non-blocking below.
1256 ci
->fd
= accept4(cs
->tcpFD
, reinterpret_cast<struct sockaddr
*>(&remote
), &remlen
, SOCK_NONBLOCK
);
1258 ci
->fd
= accept(cs
->tcpFD
, reinterpret_cast<struct sockaddr
*>(&remote
), &remlen
);
1260 ++cs
->tcpCurrentConnections
;
1263 throw std::runtime_error((boost::format("accepting new connection on socket: %s") % strerror(errno
)).str());
// Reject clients not allowed by the ACL.
1266 if(!acl
->match(remote
)) {
1268 vinfolog("Dropped TCP connection from %s because of ACL", remote
.toStringWithPort());
1272 #ifndef HAVE_ACCEPT4
1273 if (!setNonBlocking(ci
->fd
)) {
1277 setTCPNoDelay(ci
->fd
); // disable NAGLE
// Backpressure: drop new connections when too many are already queued for
// the worker threads.
1278 if(g_maxTCPQueuedConnections
> 0 && g_tcpclientthreads
->getQueuedCount() >= g_maxTCPQueuedConnections
) {
1279 vinfolog("Dropping TCP connection from %s because we have too many queued already", remote
.toStringWithPort());
// Per-client connection cap, guarded by tcpClientsCountMutex since the map
// is shared between acceptor threads.
1283 if (g_maxTCPConnectionsPerClient
) {
1284 std::lock_guard
<std::mutex
> lock(tcpClientsCountMutex
);
1286 if (tcpClientsCount
[remote
] >= g_maxTCPConnectionsPerClient
) {
1287 vinfolog("Dropping TCP connection from %s because we have too many from this client already", remote
.toStringWithPort());
1290 tcpClientsCount
[remote
]++;
1291 tcpClientCountIncremented
= true;
1294 vinfolog("Got TCP connection from %s", remote
.toStringWithPort());
1296 ci
->remote
= remote
;
// Pick a worker thread's pipe, release ownership of the ConnectionInfo and
// write the raw pointer into the pipe; the worker takes ownership on read.
1297 int pipe
= g_tcpclientthreads
->getThread();
1299 queuedCounterIncremented
= true;
1300 auto tmp
= ci
.release();
1302 writen2WithTimeout(pipe
, &tmp
, sizeof(tmp
), 0);
// Failure path (original surrounding lines missing here): roll back the
// queued count and the per-client connection count that were bumped above.
1311 g_tcpclientthreads
->decrementQueuedCount();
1312 queuedCounterIncremented
= false;
1313 if(tcpClientCountIncremented
) {
1314 decrementTCPClientCount(remote
);
// Any exception during accept/handoff: log and undo the counters so limits
// are not leaked by failed connections.
1318 catch(const std::exception
& e
) {
1319 errlog("While reading a TCP question: %s", e
.what());
1320 if(tcpClientCountIncremented
) {
1321 decrementTCPClientCount(remote
);
1323 if (queuedCounterIncremented
) {
1324 g_tcpclientthreads
->decrementQueuedCount();