2 #include "dnsdist-session-cache.hh"
3 #include "dnsdist-tcp-downstream.hh"
4 #include "dnsdist-tcp-upstream.hh"
5 #include "dnsdist-downstream-connection.hh"
7 #include "dnsparser.hh"
// Thread-local manager of the TCP connections to backends; the code below uses it
// to remove connections from the cache and to move them to the idle list.
9 thread_local DownstreamTCPConnectionsManager t_downstreamTCPConnectionsManager
;
11 ConnectionToBackend::~ConnectionToBackend()
13 if (d_ds
&& d_handler
) {
14 --d_ds
->tcpCurrentConnections
;
16 gettimeofday(&now
, nullptr);
18 if (d_handler
->isTLS()) {
19 if (d_handler
->hasTLSSessionBeenResumed()) {
20 ++d_ds
->tlsResumptions
;
23 auto sessions
= d_handler
->getTLSSessions();
24 if (!sessions
.empty()) {
25 g_sessionCache
.putSessions(d_ds
->getID(), now
.tv_sec
, std::move(sessions
));
28 catch (const std::exception
& e
) {
29 vinfolog("Unable to get a TLS session: %s", e
.what());
32 auto diff
= now
- d_connectionStartTime
;
33 // cerr<<"connection to backend terminated after "<<d_queries<<" queries, "<<diff.tv_sec<<" seconds"<<endl;
34 d_ds
->updateTCPMetrics(d_queries
, diff
.tv_sec
* 1000 + diff
.tv_usec
/ 1000);
// Close the existing connection to the backend, if any, and try to open a new one.
// Visible steps: TLS sessions held by the old handler are retrieved, the last one of
// the list is kept for this reconnection and the remaining ones go to the session
// cache; the stream ID and the proxy protocol state are reset; a new non-blocking
// socket is created and configured (no-delay, optional interface and source address
// binding) and a new TCPIOHandler starts connecting, resuming a TLS session when one
// is available. Failures are logged and counted in d_downstreamFailures, and the
// attempts are bounded by the configured number of retries.
// NOTE(review): short lines (braces, return statements, a few one-line statements)
// are not visible in this listing, so the exact return values should be confirmed
// against the full file.
38 bool ConnectionToBackend::reconnect()
40 std::unique_ptr
<TLSSession
> tlsSession
{nullptr};
42 DEBUGLOG("closing socket "<<d_handler
->getDescriptor());
43 if (d_handler
->isTLS()) {
44 if (d_handler
->hasTLSSessionBeenResumed()) {
45 ++d_ds
->tlsResumptions
;
48 auto sessions
= d_handler
->getTLSSessions();
49 if (!sessions
.empty()) {
// keep the last session for the connection we are about to open
50 tlsSession
= std::move(sessions
.back());
52 if (!sessions
.empty()) {
53 g_sessionCache
.putSessions(d_ds
->getID(), time(nullptr), std::move(sessions
));
57 catch (const std::exception
& e
) {
58 vinfolog("Unable to get a TLS session to resume: %s", e
.what());
64 --d_ds
->tcpCurrentConnections
;
// a new connection starts with a fresh stream ID and no proxy protocol payload sent
68 d_highestStreamID
= 0;
69 d_proxyProtocolPayloadSent
= false;
72 DEBUGLOG("TCP connecting to downstream "<<d_ds
->getNameWithAddr()<<" ("<<d_downstreamFailures
<<")");
73 DEBUGLOG("Opening TCP connection to backend "<<d_ds
->getNameWithAddr());
74 ++d_ds
->tcpNewConnections
;
76 auto socket
= Socket(d_ds
->d_config
.remote
.sin4
.sin_family
, SOCK_STREAM
, 0);
77 DEBUGLOG("result of socket() is "<<socket
.getHandle());
79 /* disable NAGLE, which does not play nicely with delayed ACKs.
80 In theory we could be wasting up to 500 milliseconds waiting for
81 the other end to acknowledge our initial packet before we could
83 setTCPNoDelay(socket
.getHandle());
85 #ifdef SO_BINDTODEVICE
86 if (!d_ds
->d_config
.sourceItfName
.empty()) {
87 int res
= setsockopt(socket
.getHandle(), SOL_SOCKET
, SO_BINDTODEVICE
, d_ds
->d_config
.sourceItfName
.c_str(), d_ds
->d_config
.sourceItfName
.length());
89 vinfolog("Error setting up the interface on backend TCP socket '%s': %s", d_ds
->getNameWithAddr(), stringerror());
94 if (!IsAnyAddress(d_ds
->d_config
.sourceAddr
)) {
95 SSetsockopt(socket
.getHandle(), SOL_SOCKET
, SO_REUSEADDR
, 1);
96 #ifdef IP_BIND_ADDRESS_NO_PORT
97 if (d_ds
->d_config
.ipBindAddrNoPort
) {
98 SSetsockopt(socket
.getHandle(), SOL_IP
, IP_BIND_ADDRESS_NO_PORT
, 1);
101 socket
.bind(d_ds
->d_config
.sourceAddr
, false);
103 socket
.setNonBlocking();
105 gettimeofday(&d_connectionStartTime
, nullptr);
106 auto handler
= std::make_unique
<TCPIOHandler
>(d_ds
->d_config
.d_tlsSubjectName
, d_ds
->d_config
.d_tlsSubjectIsAddr
, socket
.releaseHandle(), timeval
{0,0}, d_ds
->d_tlsCtx
);
107 if (!tlsSession
&& d_ds
->d_tlsCtx
) {
108 tlsSession
= g_sessionCache
.getSession(d_ds
->getID(), d_connectionStartTime
.tv_sec
);
111 handler
->setTLSSession(tlsSession
);
113 handler
->tryConnect(d_ds
->d_config
.tcpFastOpen
&& isFastOpenEnabled(), d_ds
->d_config
.remote
);
116 d_handler
= std::move(handler
);
117 d_ds
->incCurrentConnectionsCount();
120 catch (const std::runtime_error
& e
) {
121 vinfolog("Connection to downstream server %s failed: %s", d_ds
->getNameWithAddr(), e
.what());
122 d_downstreamFailures
++;
123 if (d_downstreamFailures
>= d_ds
->d_config
.d_retries
) {
128 while (d_downstreamFailures
< d_ds
->d_config
.d_retries
);
133 TCPConnectionToBackend::~TCPConnectionToBackend()
135 if (d_ds
&& !d_pendingResponses
.empty()) {
136 d_ds
->outstanding
-= d_pendingResponses
.size();
// Drop every queued query and every pending response of this connection, giving the
// corresponding outstanding slots back to the backend, then remove the connection
// from the thread-local connections manager when willBeReusable(true) says that it
// cannot be reused.
// NOTE(review): a few short lines between the clear() calls and the cast are not
// visible in this listing.
140 void TCPConnectionToBackend::release(){
141 d_ds
->outstanding
-= d_pendingResponses
.size();
143 d_pendingResponses
.clear();
144 d_pendingQueries
.clear();
150 auto shared
= std::dynamic_pointer_cast
<TCPConnectionToBackend
>(shared_from_this());
151 if (!willBeReusable(true)) {
152 /* remove ourselves from the connection cache, this might mean that our
153 reference count drops to zero after that, so we need to be careful */
154 t_downstreamTCPConnectionsManager
.removeDownstreamConnection(shared
);
/* Extract the SERIAL field from the raw (wire format) content of a SOA record.

   \param raw the RDATA of the SOA record, as received on the wire
   \return the serial, in host byte order
   \throws std::runtime_error if the content is too short to be a SOA record */
static uint32_t getSerialFromRawSOAContent(const std::vector<uint8_t>& raw)
{
  /* minimal size for a SOA record, as defined by rfc1035:
     MNAME (root): 1
     RNAME (root): 1
     SERIAL, REFRESH, RETRY, EXPIRE, MINIMUM: 4 bytes each
     = 22 bytes
  */
  constexpr size_t minimalSOAContentSize = 22;
  /* SERIAL is the first of the five 32-bit fields located at the end of the content */
  constexpr size_t trailingFieldsSize = 5 * sizeof(uint32_t);

  if (raw.size() < minimalSOAContentSize) {
    throw std::runtime_error("Invalid content of size " + std::to_string(raw.size()) + " for a SOA record");
  }

  /* As rfc1035 states that "all domain names in the RDATA section of these RRs may be compressed",
     and we don't want to parse these names, start at the end */
  const size_t serialOffset = raw.size() - trailingFieldsSize;

  /* the serial is stored in network byte order (big endian), decode it byte by byte
     so that the result does not depend on the endianness of the host */
  uint32_t serial = 0;
  for (size_t idx = 0; idx < sizeof(serial); ++idx) {
    serial = (serial << 8) | raw.at(serialOffset + idx);
  }
  return serial;
}
// Parse the IXFR query held in query.d_buffer and store the serial of the SOA record
// found in its authority section into query.d_ixfrQuerySerial.
// The parsing starts after the two-byte length prefix and after the proxy protocol
// payload, when one has been added to the buffer. A MOADNSException raised by the
// parser is caught and only logged.
// NOTE(review): the return statements are not visible in this listing, confirm the
// meaning of the returned boolean against the full file.
180 static bool getSerialFromIXFRQuery(TCPQuery
& query
)
183 size_t proxyPayloadSize
= query
.d_proxyProtocolPayloadAdded
? query
.d_idstate
.d_proxyProtocolPayloadSize
: 0;
// nothing to parse if the buffer only holds the proxy payload and the length prefix
184 if (query
.d_buffer
.size() <= (proxyPayloadSize
+ sizeof(uint16_t))) {
188 size_t payloadSize
= query
.d_buffer
.size() - sizeof(uint16_t) - proxyPayloadSize
;
190 MOADNSParser
parser(true, reinterpret_cast<const char*>(query
.d_buffer
.data() + sizeof(uint16_t) + proxyPayloadSize
), payloadSize
);
192 for (const auto& record
: parser
.d_answers
) {
// only a SOA record of class IN placed in the authority section is of interest
193 if (record
.first
.d_place
!= DNSResourceRecord::AUTHORITY
|| record
.first
.d_class
!= QClass::IN
|| record
.first
.d_type
!= QType::SOA
) {
197 auto unknownContent
= getRR
<UnknownRecordContent
>(record
.first
);
198 if (!unknownContent
) {
201 const auto& raw
= unknownContent
->getRawContent();
202 query
.d_ixfrQuerySerial
= getSerialFromRawSOAContent(raw
);
206 catch (const MOADNSException
& e
) {
207 DEBUGLOG("Exception when parsing IXFR TCP Query to DNS: " << e
.what());
208 /* ponder what to do here, shall we close the connection? */
214 static void editPayloadID(PacketBuffer
& payload
, uint16_t newId
, size_t proxyProtocolPayloadSize
, bool sizePrepended
)
216 /* we cannot do a direct cast as the alignment might be off (the size of the payload might have been prepended, which is bad enough,
217 but we might also have a proxy protocol payload */
218 size_t startOfHeaderOffset
= (sizePrepended
? sizeof(uint16_t) : 0) + proxyProtocolPayloadSize
;
219 if (payload
.size() < startOfHeaderOffset
+ sizeof(dnsheader
)) {
220 throw std::runtime_error("Invalid buffer for outgoing TCP query (size " + std::to_string(payload
.size()));
222 uint16_t id
= htons(newId
);
223 memcpy(&payload
.at(startOfHeaderOffset
), &id
, sizeof(id
));
// Layout of a query buffer passed to prepareQueryForSending; the callers in this file
// use QueryState::hasSizePrepended. The enumerators are not visible in this listing.
226 enum class QueryState
: uint8_t {
// Tells prepareQueryForSending whether the proxy protocol payload still has to be
// sent over the connection (needProxy) or has already been sent (proxySent).
// The enumerators are not visible in this listing.
231 enum class ConnectionState
: uint8_t {
236 static void prepareQueryForSending(TCPQuery
& query
, uint16_t id
, QueryState queryState
, ConnectionState connectionState
)
238 if (connectionState
== ConnectionState::needProxy
) {
239 if (query
.d_proxyProtocolPayload
.size() > 0 && !query
.d_proxyProtocolPayloadAdded
) {
240 query
.d_buffer
.insert(query
.d_buffer
.begin(), query
.d_proxyProtocolPayload
.begin(), query
.d_proxyProtocolPayload
.end());
241 query
.d_proxyProtocolPayloadAdded
= true;
242 query
.d_idstate
.d_proxyProtocolPayloadSize
= query
.d_proxyProtocolPayload
.size();
245 else if (connectionState
== ConnectionState::proxySent
) {
246 if (query
.d_proxyProtocolPayloadAdded
) {
247 if (query
.d_buffer
.size() < query
.d_idstate
.d_proxyProtocolPayloadSize
) {
248 throw std::runtime_error("Trying to remove a proxy protocol payload of size " + std::to_string(query
.d_proxyProtocolPayload
.size()) + " from a buffer of size " + std::to_string(query
.d_buffer
.size()));
250 // NOLINTNEXTLINE(*-narrowing-conversions): the size of the payload is limited to 2^16-1
251 query
.d_buffer
.erase(query
.d_buffer
.begin(), query
.d_buffer
.begin() + static_cast<ssize_t
>(query
.d_idstate
.d_proxyProtocolPayloadSize
));
252 query
.d_proxyProtocolPayloadAdded
= false;
253 query
.d_idstate
.d_proxyProtocolPayloadSize
= 0;
256 if (query
.d_idstate
.qclass
== QClass::IN
&& query
.d_idstate
.qtype
== QType::IXFR
) {
257 getSerialFromIXFRQuery(query
);
260 editPayloadID(query
.d_buffer
, id
, query
.d_proxyProtocolPayloadAdded
? query
.d_idstate
.d_proxyProtocolPayloadSize
: 0, true);
263 IOState
TCPConnectionToBackend::queueNextQuery(std::shared_ptr
<TCPConnectionToBackend
>& conn
)
265 conn
->d_currentQuery
= std::move(conn
->d_pendingQueries
.front());
267 uint16_t id
= conn
->d_highestStreamID
;
268 prepareQueryForSending(conn
->d_currentQuery
.d_query
, id
, QueryState::hasSizePrepended
, conn
->needProxyProtocolPayload() ? ConnectionState::needProxy
: ConnectionState::proxySent
);
270 conn
->d_pendingQueries
.pop_front();
271 conn
->d_state
= State::sendingQueryToBackend
;
272 conn
->d_currentPos
= 0;
274 return IOState::NeedWrite
;
// Try to write the current query to the backend.
// Once the write is complete (IOState::Done), the visible code records that the proxy
// protocol payload has been sent if it was part of the buffer, stores the query in
// d_pendingResponses under the current stream ID, increments the outstanding counter
// of the backend and the stream ID, then clears the sender and buffer of the current query.
// NOTE(review): what happens when the write is not complete, and the returned value,
// are on lines not visible in this listing.
277 IOState
TCPConnectionToBackend::sendQuery(std::shared_ptr
<TCPConnectionToBackend
>& conn
, const struct timeval
& now
)
279 DEBUGLOG("sending query to backend "<<conn
->getDS()->getNameWithAddr()<<" over FD "<<conn
->d_handler
->getDescriptor());
// d_currentPos keeps track of how much has been written so far
281 IOState state
= conn
->d_handler
->tryWrite(conn
->d_currentQuery
.d_query
.d_buffer
, conn
->d_currentPos
, conn
->d_currentQuery
.d_query
.d_buffer
.size());
283 if (state
!= IOState::Done
) {
287 DEBUGLOG("query sent to backend");
289 if (conn
->d_currentQuery
.d_query
.d_proxyProtocolPayloadAdded
) {
290 conn
->d_proxyProtocolPayloadSent
= true;
293 conn
->d_currentPos
= 0;
295 DEBUGLOG("adding a pending response for ID "<<conn
->d_highestStreamID
<<" and QNAME "<<conn
->d_currentQuery
.d_query
.d_idstate
.qname
);
296 auto res
= conn
->d_pendingResponses
.insert({conn
->d_highestStreamID
, std::move(conn
->d_currentQuery
)});
297 /* if there was already a pending response with that ID, we messed up and we don't expect more
300 ++conn
->d_ds
->outstanding
;
302 ++conn
->d_highestStreamID
;
303 conn
->d_currentQuery
.d_sender
.reset();
304 conn
->d_currentQuery
.d_query
.d_buffer
.clear();
// Drive the state machine of a connection to a backend: write the queued queries, read
// the two-byte size of a response and then the response itself, which is passed to
// handleResponse.
// When an exception is raised while doing so the connection is considered dead; as long
// as the number of failures is below the configured retries, the visible code reconnects
// and re-queues the queries that were in flight, except for XFR queries for which a
// response has already been sent: the sender of these is notified of an IO error.
// If that does not work out, all queries are failed with FailureReason::gaveUp.
// Finally the resulting IO state is registered via d_ioState, with a read, connect or
// write deadline.
// NOTE(review): many short lines (braces, try, else, break, return) are not visible in
// this listing, so the exact nesting has to be confirmed against the full file.
309 void TCPConnectionToBackend::handleIO(std::shared_ptr
<TCPConnectionToBackend
>& conn
, const struct timeval
& now
)
311 if (conn
->d_handler
== nullptr) {
312 throw std::runtime_error("No downstream socket in " + std::string(__PRETTY_FUNCTION__
) + "!");
315 bool connectionDied
= false;
316 IOState iostate
= IOState::Done
;
317 IOStateGuard
ioGuard(conn
->d_ioState
);
318 bool reconnected
= false;
// state: a query is being written to the backend
324 if (conn
->d_state
== State::sendingQueryToBackend
) {
325 iostate
= sendQuery(conn
, now
);
// keep sending as long as writes complete and queries are queued
327 while (iostate
== IOState::Done
&& !conn
->d_pendingQueries
.empty()) {
328 queueNextQuery(conn
);
329 iostate
= sendQuery(conn
, now
);
332 if (iostate
== IOState::Done
&& conn
->d_pendingQueries
.empty()) {
333 conn
->d_state
= State::waitingForResponseFromBackend
;
334 conn
->d_currentPos
= 0;
335 conn
->d_responseBuffer
.resize(sizeof(uint16_t));
336 iostate
= IOState::NeedRead
;
340 if (conn
->d_state
== State::waitingForResponseFromBackend
||
341 conn
->d_state
== State::readingResponseSizeFromBackend
) {
342 DEBUGLOG("reading response size from backend");
343 // then we need to allocate a new buffer (new because we might need to re-send the query if the
344 // backend dies on us)
345 // We also might need to read and send to the client more than one response in case of XFR (yeah!)
346 conn
->d_responseBuffer
.resize(sizeof(uint16_t));
347 iostate
= conn
->d_handler
->tryRead(conn
->d_responseBuffer
, conn
->d_currentPos
, sizeof(uint16_t));
348 if (iostate
== IOState::Done
) {
349 DEBUGLOG("got response size from backend");
350 conn
->d_state
= State::readingResponseFromBackend
;
351 conn
->d_responseSize
= conn
->d_responseBuffer
.at(0) * 256 + conn
->d_responseBuffer
.at(1);
352 conn
->d_responseBuffer
.reserve(conn
->d_responseSize
+ /* we will need to prepend the size later */ 2);
353 conn
->d_responseBuffer
.resize(conn
->d_responseSize
);
354 conn
->d_currentPos
= 0;
355 conn
->d_lastDataReceivedTime
= now
;
357 else if (conn
->d_state
== State::waitingForResponseFromBackend
&& conn
->d_currentPos
> 0) {
358 conn
->d_state
= State::readingResponseSizeFromBackend
;
// state: reading the response itself, d_responseSize bytes are expected
362 if (conn
->d_state
== State::readingResponseFromBackend
) {
363 DEBUGLOG("reading response from backend");
364 iostate
= conn
->d_handler
->tryRead(conn
->d_responseBuffer
, conn
->d_currentPos
, conn
->d_responseSize
);
365 if (iostate
== IOState::Done
) {
366 DEBUGLOG("got response from backend");
368 conn
->d_lastDataReceivedTime
= now
;
369 iostate
= conn
->handleResponse(conn
, now
);
371 catch (const std::exception
& e
) {
372 vinfolog("Got an exception while handling TCP response from %s (client is %s): %s", conn
->d_ds
? conn
->d_ds
->getNameWithAddr() : "unknown", conn
->d_currentQuery
.d_query
.d_idstate
.origRemote
.toStringWithPort(), e
.what());
380 if (conn
->d_state
!= State::idle
&&
381 conn
->d_state
!= State::sendingQueryToBackend
&&
382 conn
->d_state
!= State::waitingForResponseFromBackend
&&
383 conn
->d_state
!= State::readingResponseSizeFromBackend
&&
384 conn
->d_state
!= State::readingResponseFromBackend
) {
385 vinfolog("Unexpected state %d in TCPConnectionToBackend::handleIO", static_cast<int>(conn
->d_state
));
388 catch (const std::exception
& e
) {
389 /* most likely an EOF because the other end closed the connection,
390 but it might also be a real IO error or something else.
391 Let's just drop the connection
393 vinfolog("Got an exception while handling (%s backend) TCP query from %s: %s", (conn
->d_state
== State::sendingQueryToBackend
? "writing to" : "reading from"), conn
->d_currentQuery
.d_query
.d_idstate
.origRemote
.toStringWithPort(), e
.what());
395 if (conn
->d_state
== State::sendingQueryToBackend
) {
396 ++conn
->d_ds
->tcpDiedSendingQuery
;
398 else if (conn
->d_state
!= State::idle
) {
399 ++conn
->d_ds
->tcpDiedReadingResponse
;
402 /* don't increase this counter when reusing connections */
404 ++conn
->d_downstreamFailures
;
407 /* remove this FD from the IO multiplexer */
408 iostate
= IOState::Done
;
409 connectionDied
= true;
// recovery path: try to reconnect and to send the in-flight queries again
412 if (connectionDied
) {
414 DEBUGLOG("connection died, number of failures is "<<conn
->d_downstreamFailures
<<", retries is "<<conn
->d_ds
->d_config
.d_retries
);
416 if (conn
->d_downstreamFailures
< conn
->d_ds
->d_config
.d_retries
) {
418 conn
->d_ioState
.reset();
422 if (conn
->reconnect()) {
423 conn
->d_ioState
= make_unique
<IOStateHandler
>(*conn
->d_mplexer
, conn
->d_handler
->getDescriptor());
425 /* we need to resend the queries that were in flight, if any */
426 if (conn
->d_state
== State::sendingQueryToBackend
) {
427 /* we need to edit this query so it has the correct ID */
428 auto query
= std::move(conn
->d_currentQuery
);
429 uint16_t id
= conn
->d_highestStreamID
;
430 prepareQueryForSending(query
.d_query
, id
, QueryState::hasSizePrepended
, ConnectionState::needProxy
);
431 conn
->d_currentQuery
= std::move(query
);
434 /* if we notify the sender it might terminate us so we need to move these first */
435 auto pendingResponses
= std::move(conn
->d_pendingResponses
);
436 conn
->d_pendingResponses
.clear();
437 for (auto& pending
: pendingResponses
) {
438 --conn
->d_ds
->outstanding
;
440 if (pending
.second
.d_query
.isXFR() && pending
.second
.d_query
.d_xfrStarted
) {
441 /* this one can't be restarted, sorry */
442 DEBUGLOG("A XFR for which a response has already been sent cannot be restarted");
444 TCPResponse
response(std::move(pending
.second
.d_query
));
445 pending
.second
.d_sender
->notifyIOError(now
, std::move(response
));
447 catch (const std::exception
& e
) {
448 vinfolog("Got an exception while notifying: %s", e
.what());
451 vinfolog("Got exception while notifying");
455 conn
->d_pendingQueries
.push_back(std::move(pending
.second
));
458 conn
->d_currentPos
= 0;
460 if (conn
->d_state
== State::sendingQueryToBackend
) {
461 iostate
= IOState::NeedWrite
;
462 // resume sending query
465 if (conn
->d_pendingQueries
.empty()) {
466 throw std::runtime_error("TCP connection to a backend in state " + std::to_string((int)conn
->d_state
) + " with no pending queries");
469 iostate
= queueNextQuery(conn
);
473 connectionDied
= false;
476 catch (const std::exception
& e
) {
477 // reconnect might throw on failure, let's ignore that, we just need to know
483 /* reconnect failed, we give up */
484 DEBUGLOG("reconnect failed, we give up");
485 ++conn
->d_ds
->tcpGaveUp
;
486 conn
->notifyAllQueriesFailed(now
, FailureReason::gaveUp
);
// register the resulting IO state, with a deadline unless we are done
490 if (conn
->d_ioState
) {
491 if (iostate
== IOState::Done
) {
492 conn
->d_ioState
->update(iostate
, handleIOCallback
, conn
);
495 boost::optional
<struct timeval
> ttd
{boost::none
};
496 if (iostate
== IOState::NeedRead
) {
497 ttd
= conn
->getBackendReadTTD(now
);
499 else if (conn
->isFresh() && conn
->d_queries
== 0) {
500 /* first write just after the non-blocking connect */
501 ttd
= conn
->getBackendConnectTTD(now
);
504 ttd
= conn
->getBackendWriteTTD(now
);
507 conn
->d_ioState
->update(iostate
, handleIOCallback
, conn
, ttd
);
516 void TCPConnectionToBackend::handleIOCallback(int fd
, FDMultiplexer::funcparam_t
& param
)
518 auto conn
= boost::any_cast
<std::shared_ptr
<TCPConnectionToBackend
>>(param
);
519 if (fd
!= conn
->getHandle()) {
520 throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd
) + " received in " + std::string(__PRETTY_FUNCTION__
) + ", expected " + std::to_string(conn
->getHandle()));
524 gettimeofday(&now
, nullptr);
// Pass a new query to this connection.
// When the connection is idle or only waiting for a response, the query becomes the
// current one, is prepared with the next stream ID and handleIO is called right away.
// Otherwise the query is appended to d_pendingQueries and will be sent later.
// NOTE(review): the condition guarding the creation of the IOStateHandler, and a few
// other short lines, are not visible in this listing.
528 void TCPConnectionToBackend::queueQuery(std::shared_ptr
<TCPQuerySender
>& sender
, TCPQuery
&& query
)
531 d_ioState
= make_unique
<IOStateHandler
>(*d_mplexer
, d_handler
->getDescriptor());
534 // if we are not already sending a query or in the middle of reading a response (so idle),
535 // start sending the query
536 if (d_state
== State::idle
|| d_state
== State::waitingForResponseFromBackend
) {
537 DEBUGLOG("Sending new query to backend right away, with ID "<<d_highestStreamID
);
538 d_state
= State::sendingQueryToBackend
;
541 uint16_t id
= d_highestStreamID
;
543 d_currentQuery
= PendingRequest({sender
, std::move(query
)});
544 prepareQueryForSending(d_currentQuery
.d_query
, id
, QueryState::hasSizePrepended
, needProxyProtocolPayload() ? ConnectionState::needProxy
: ConnectionState::proxySent
);
547 gettimeofday(&now
, 0);
// handleIO needs a shared pointer to this connection
549 auto shared
= std::dynamic_pointer_cast
<TCPConnectionToBackend
>(shared_from_this());
550 handleIO(shared
, now
);
553 DEBUGLOG("Adding new query to the queue because we are in state "<<(int)d_state
);
554 // store query in the list of queries to send
555 d_pendingQueries
.push_back(PendingRequest({sender
, std::move(query
)}));
// Handle a timeout on this connection: the matching counter of the backend (connect,
// write or read timeouts) is incremented and the event is logged, then every query is
// failed with FailureReason::timeout. Exceptions raised while notifying are logged.
// NOTE(review): the branches selecting the counter (presumably based on the write
// parameter) and the end of the function are on lines not visible in this listing.
559 void TCPConnectionToBackend::handleTimeout(const struct timeval
& now
, bool write
)
561 /* in some cases we could retry, here, reconnecting and sending our pending responses again */
// no query sent yet over a fresh connection: the connection attempt itself timed out
563 if (isFresh() && d_queries
== 0) {
564 ++d_ds
->tcpConnectTimeouts
;
565 vinfolog("Timeout while connecting to TCP backend %s", d_ds
->getNameWithAddr());
568 ++d_ds
->tcpWriteTimeouts
;
569 vinfolog("Timeout while writing to TCP backend %s", d_ds
->getNameWithAddr());
573 ++d_ds
->tcpReadTimeouts
;
574 vinfolog("Timeout while reading from TCP backend %s", d_ds
->getNameWithAddr());
578 notifyAllQueriesFailed(now
, FailureReason::timeout
);
580 catch (const std::exception
& e
) {
581 vinfolog("Got an exception while notifying a timeout: %s", e
.what());
584 vinfolog("Got exception while notifying a timeout");
// Mark the connection as dead, report the error to the backend and notify the sender
// of every query handled by this connection (the one being sent, the queued ones and
// the ones waiting for a response) of an IO error, provided the sender is still active.
// The queues are moved to local variables before notifying anyone, and the outstanding
// counter of the backend is decreased by the number of pending responses.
// Exceptions raised while notifying are logged.
// NOTE(review): the counters increased for the reasons other than timeout are on lines
// not visible in this listing.
590 void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval
& now
, FailureReason reason
)
592 d_connectionDied
= true;
593 d_ds
->reportTimeoutOrError();
595 /* we might be terminated while notifying a query sender */
596 d_ds
->outstanding
-= d_pendingResponses
.size();
597 auto pendingQueries
= std::move(d_pendingQueries
);
598 d_pendingQueries
.clear();
599 auto pendingResponses
= std::move(d_pendingResponses
);
600 d_pendingResponses
.clear();
// per-frontend accounting, applied to the ClientState of each failed query
602 auto increaseCounters
= [reason
](const ClientState
* cs
) {
603 if (reason
== FailureReason::timeout
) {
605 ++cs
->tcpDownstreamTimeouts
;
608 else if (reason
== FailureReason::gaveUp
) {
// the query that was being written, if any
616 if (d_state
== State::sendingQueryToBackend
) {
617 increaseCounters(d_currentQuery
.d_query
.d_idstate
.cs
);
618 auto sender
= d_currentQuery
.d_sender
;
619 if (sender
->active()) {
620 TCPResponse
response(std::move(d_currentQuery
.d_query
));
621 sender
->notifyIOError(now
, std::move(response
));
// the queries that had not been sent yet
625 for (auto& query
: pendingQueries
) {
626 increaseCounters(query
.d_query
.d_idstate
.cs
);
627 auto sender
= query
.d_sender
;
628 if (sender
->active()) {
629 TCPResponse
response(std::move(query
.d_query
));
630 sender
->notifyIOError(now
, std::move(response
));
// the queries that were waiting for a response
634 for (auto& response
: pendingResponses
) {
635 increaseCounters(response
.second
.d_query
.d_idstate
.cs
);
636 auto sender
= response
.second
.d_sender
;
637 if (sender
->active()) {
638 TCPResponse
tresp(std::move(response
.second
.d_query
));
639 sender
->notifyIOError(now
, std::move(tresp
));
643 catch (const std::exception
& e
) {
644 vinfolog("Got an exception while notifying: %s", e
.what());
647 vinfolog("Got exception while notifying");
// Process a complete response stored in d_responseBuffer.
// The pending query is looked up by the ID found in the response; when the ID cannot be
// read or is unknown, all queries are failed with FailureReason::unexpectedQueryID.
// The ID of the response is then replaced with the original ID of the query.
// For a XFR query with an active sender the response is passed via handleXFRResponse
// and isXFRFinished tells whether the transfer is over; the pending entry is only
// erased in the visible code path that also decrements the outstanding counter.
// Otherwise latency and response code are reported to the backend, the pending entry is
// erased and the response is passed to the sender via handleResponse.
// The returned IOState tells the caller what to wait for next: NeedRead when responses
// are still expected, the result of queueNextQuery when queries are queued, and Done
// when the connection becomes idle.
// NOTE(review): several short lines (declarations of done and shared, braces, else
// branches, returns) are not visible in this listing.
653 IOState
TCPConnectionToBackend::handleResponse(std::shared_ptr
<TCPConnectionToBackend
>& conn
, const struct timeval
& now
)
// a response has been received, so the failures counter starts again from zero
655 d_downstreamFailures
= 0;
657 uint16_t queryId
= 0;
659 queryId
= getQueryIdFromResponse();
661 catch (const std::exception
& e
) {
662 DEBUGLOG("Unable to get query ID");
663 notifyAllQueriesFailed(now
, FailureReason::unexpectedQueryID
);
667 auto it
= d_pendingResponses
.find(queryId
);
668 if (it
== d_pendingResponses
.end()) {
669 DEBUGLOG("could not find any corresponding query for ID "<<queryId
<<". This is likely a duplicated ID over the same TCP connection, giving up!");
670 notifyAllQueriesFailed(now
, FailureReason::unexpectedQueryID
);
671 return IOState::Done
;
// put back the ID the client used, the buffer holds neither size prefix nor proxy payload
674 editPayloadID(d_responseBuffer
, ntohs(it
->second
.d_query
.d_idstate
.origID
), 0, false);
676 auto sender
= it
->second
.d_sender
;
678 if (sender
->active() && it
->second
.d_query
.isXFR()) {
681 TCPResponse response
;
682 response
.d_buffer
= std::move(d_responseBuffer
);
683 response
.d_connection
= conn
;
684 response
.d_ds
= conn
->d_ds
;
685 const auto& queryIDS
= it
->second
.d_query
.d_idstate
;
686 /* we don't move the whole IDS because we will need it for the responses to come */
687 response
.d_idstate
= queryIDS
.partialCloneForXFR();
688 DEBUGLOG("passing XFRresponse to client connection for "<<response
.d_idstate
.qname
);
690 it
->second
.d_query
.d_xfrStarted
= true;
691 done
= isXFRFinished(response
, it
->second
.d_query
);
694 d_pendingResponses
.erase(it
);
695 --conn
->d_ds
->outstanding
;
696 /* marking as idle for now, so we can accept new queries if our queues are empty */
697 if (d_pendingQueries
.empty() && d_pendingResponses
.empty()) {
698 d_state
= State::idle
;
699 t_downstreamTCPConnectionsManager
.moveToIdle(conn
);
703 sender
->handleXFRResponse(now
, std::move(response
));
705 d_state
= State::idle
;
706 t_downstreamTCPConnectionsManager
.moveToIdle(conn
);
707 return IOState::Done
;
710 d_state
= State::waitingForResponseFromBackend
;
712 d_responseBuffer
.resize(sizeof(uint16_t));
713 // get ready to read the next packet, if any
714 return IOState::NeedRead
;
// regular (non XFR) response from here on
717 --conn
->d_ds
->outstanding
;
718 auto ids
= std::move(it
->second
.d_query
.d_idstate
);
719 const double udiff
= ids
.queryRealTime
.udiff();
720 conn
->d_ds
->updateTCPLatency(udiff
);
721 if (d_responseBuffer
.size() >= sizeof(dnsheader
)) {
723 memcpy(&dh
, d_responseBuffer
.data(), sizeof(dh
));
724 conn
->d_ds
->reportResponse(dh
.rcode
);
727 conn
->d_ds
->reportTimeoutOrError();
730 d_pendingResponses
.erase(it
);
731 /* marking as idle for now, so we can accept new queries if our queues are empty */
732 if (d_pendingQueries
.empty() && d_pendingResponses
.empty()) {
733 d_state
= State::idle
;
734 t_downstreamTCPConnectionsManager
.moveToIdle(conn
);
736 else if (!d_pendingResponses
.empty()) {
738 d_state
= State::waitingForResponseFromBackend
;
741 // be very careful that handleResponse() might trigger new queries being assigned to us,
742 // which may reset our d_currentPos, d_state and/or d_responseBuffer, so we cannot assume
743 // anything without checking first
745 if (sender
->active()) {
746 DEBUGLOG("passing response to client connection for "<<ids
.qname
);
747 // make sure that we still exist after calling handleResponse()
748 TCPResponse
response(std::move(d_responseBuffer
), std::move(ids
), conn
, conn
->d_ds
);
749 sender
->handleResponse(now
, std::move(response
));
752 if (!d_pendingQueries
.empty()) {
753 DEBUGLOG("still have some queries to send");
754 return queueNextQuery(shared
);
756 else if (!d_pendingResponses
.empty()) {
757 DEBUGLOG("still have some responses to read");
758 return IOState::NeedRead
;
761 DEBUGLOG("nothing to do, waiting for a new query");
762 d_state
= State::idle
;
763 t_downstreamTCPConnectionsManager
.moveToIdle(conn
);
764 return IOState::Done
;
768 uint16_t TCPConnectionToBackend::getQueryIdFromResponse() const
770 if (d_responseBuffer
.size() < sizeof(dnsheader
)) {
771 throw std::runtime_error("Unable to get query ID in a too small (" + std::to_string(d_responseBuffer
.size()) + ") response from " + d_ds
->getNameWithAddr());
775 memcpy(&id
, &d_responseBuffer
.at(0), sizeof(id
));
779 void TCPConnectionToBackend::setProxyProtocolValuesSent(std::unique_ptr
<std::vector
<ProxyProtocolValue
>>&& proxyProtocolValuesSent
)
781 /* if we already have some values, we have already verified they match */
782 if (!d_proxyProtocolValuesSent
) {
783 d_proxyProtocolValuesSent
= std::move(proxyProtocolValuesSent
);
787 bool TCPConnectionToBackend::matchesTLVs(const std::unique_ptr
<std::vector
<ProxyProtocolValue
>>& tlvs
) const
789 if (tlvs
== nullptr) {
790 if (d_proxyProtocolValuesSent
== nullptr) {
798 if (d_proxyProtocolValuesSent
== nullptr) {
802 return *tlvs
== *d_proxyProtocolValuesSent
;
// Inspect a response to a XFR query in order to tell whether the transfer is over,
// keeping track of the SOA serials seen so far in the query.
// The first SOA serial seen is stored as the serial of the primary. According to the
// existing comments, an IXFR query whose serial is the same or newer than the one of
// the primary gets a single SOA record, seeing two SOA records marks a finished AXFR
// and seeing the serial of the primary three times marks a finished IXFR.
// A MOADNSException raised by the parser is caught and only logged.
// NOTE(review): the return statements, and what is done for a non-zero rcode, are on
// lines not visible in this listing.
805 bool TCPConnectionToBackend::isXFRFinished(const TCPResponse
& response
, TCPQuery
& query
)
810 MOADNSParser
parser(true, reinterpret_cast<const char*>(response
.d_buffer
.data()), response
.d_buffer
.size());
812 if (parser
.d_header
.rcode
!= 0U) {
816 for (const auto& record
: parser
.d_answers
) {
// only SOA records of class IN are relevant here
817 if (record
.first
.d_class
!= QClass::IN
|| record
.first
.d_type
!= QType::SOA
) {
821 auto unknownContent
= getRR
<UnknownRecordContent
>(record
.first
);
822 if (!unknownContent
) {
825 const auto& raw
= unknownContent
->getRawContent();
826 auto serial
= getSerialFromRawSOAContent(raw
);
827 if (query
.d_xfrPrimarySerial
== 0) {
828 // store the first SOA in our client's connection metadata
829 query
.d_xfrPrimarySerial
= serial
;
830 if (query
.d_idstate
.qtype
== QType::IXFR
&& (query
.d_xfrPrimarySerial
== query
.d_ixfrQuerySerial
|| rfc1982LessThan(query
.d_xfrPrimarySerial
, query
.d_ixfrQuerySerial
))) {
831 /* This is the first message with a primary SOA:
833 If an IXFR query with the same or newer version number
834 than that of the server is received, it is replied to
835 with a single SOA record of the server's current version.
842 ++query
.d_xfrSerialCount
;
843 if (serial
== query
.d_xfrPrimarySerial
) {
844 ++query
.d_xfrPrimarySerialCount
;
845 // figure out if it's end when receiving primary's SOA again
846 if (query
.d_xfrSerialCount
== 2) {
847 // if there are only two SOA records marks a finished AXFR
851 if (query
.d_xfrPrimarySerialCount
== 3) {
852 // receiving primary's SOA 3 times marks a finished IXFR
860 catch (const MOADNSException
& e
) {
861 DEBUGLOG("Exception when parsing TCPResponse to DNS: " << e
.what());
862 /* ponder what to do here, shall we close the connection? */
867 void setTCPDownstreamMaxIdleConnectionsPerBackend(uint64_t max
)
869 DownstreamTCPConnectionsManager::setMaxIdleConnectionsPerDownstream(max
);
872 void setTCPDownstreamCleanupInterval(uint64_t interval
)
874 DownstreamTCPConnectionsManager::setCleanupInterval(interval
);
877 void setTCPDownstreamMaxIdleTime(uint64_t max
)
879 DownstreamTCPConnectionsManager::setMaxIdleTime(max
);