2 #include "dnsdist-session-cache.hh"
3 #include "dnsdist-tcp-downstream.hh"
4 #include "dnsdist-tcp-upstream.hh"
5 #include "dnsdist-downstream-connection.hh"
7 #include "dnsparser.hh"
// One connection manager per worker thread: tracks outgoing TCP connections
// to backends (used below via moveToIdle()) so idle connections can be reused.
9 thread_local DownstreamTCPConnectionsManager t_downstreamTCPConnectionsManager
;
// Destructor: updates the backend's connection counters/metrics and, for TLS
// connections, stores any resumable TLS session(s) in the shared session
// cache keyed by the backend's ID.
// NOTE(review): this extraction is missing lines (opening braces, the `try`
// matching the `catch` below, the declaration of `now`) — verify control flow
// against the original file.
11 ConnectionToBackend::~ConnectionToBackend()
13 if (d_ds
&& d_handler
) {
14 --d_ds
->tcpCurrentConnections
;
// Timestamp used both for the session-cache entry and the lifetime metric.
16 gettimeofday(&now
, nullptr);
18 if (d_handler
->isTLS()) {
19 if (d_handler
->hasTLSSessionBeenResumed()) {
20 ++d_ds
->tlsResumptions
;
// Harvest the handler's TLS session tickets for later resumption.
23 auto sessions
= d_handler
->getTLSSessions();
24 if (!sessions
.empty()) {
25 g_sessionCache
.putSessions(d_ds
->getID(), now
.tv_sec
, std::move(sessions
));
28 catch (const std::exception
& e
) {
29 vinfolog("Unable to get a TLS session: %s", e
.what());
// Total connection lifetime, reported to the backend metrics in milliseconds.
32 auto diff
= now
- d_connectionStartTime
;
33 // cerr<<"connection to backend terminated after "<<d_queries<<" queries, "<<diff.tv_sec<<" seconds"<<endl;
34 d_ds
->updateTCPMetrics(d_queries
, diff
.tv_sec
* 1000 + diff
.tv_usec
/ 1000);
// (Re)establishes the TCP — possibly TLS — connection to the backend.
// Saves any resumable TLS session from the previous connection first (keeping
// one for our own resumption attempt, caching the rest), then retries
// connecting until success or d_retries consecutive failures.
// NOTE(review): many lines are missing from this extraction (braces, `try`
// statements, `return` statements, the do/while opening) — verify against the
// original file. The bool result presumably reports connection success.
38 bool ConnectionToBackend::reconnect()
40 std::unique_ptr
<TLSSession
> tlsSession
{nullptr};
42 DEBUGLOG("closing socket "<<d_handler
->getDescriptor());
43 if (d_handler
->isTLS()) {
44 if (d_handler
->hasTLSSessionBeenResumed()) {
45 ++d_ds
->tlsResumptions
;
// Keep the most recent session for this reconnect, cache the others.
48 auto sessions
= d_handler
->getTLSSessions();
49 if (!sessions
.empty()) {
50 tlsSession
= std::move(sessions
.back());
52 if (!sessions
.empty()) {
53 g_sessionCache
.putSessions(d_ds
->getID(), time(nullptr), std::move(sessions
));
57 catch (const std::exception
& e
) {
58 vinfolog("Unable to get a TLS session to resume: %s", e
.what());
64 --d_ds
->tcpCurrentConnections
;
// Fresh connection: restart stream IDs and proxy-protocol state.
68 d_highestStreamID
= 0;
69 d_proxyProtocolPayloadSent
= false;
72 DEBUGLOG("TCP connecting to downstream "<<d_ds
->getNameWithAddr()<<" ("<<d_downstreamFailures
<<")");
73 DEBUGLOG("Opening TCP connection to backend "<<d_ds
->getNameWithAddr());
74 ++d_ds
->tcpNewConnections
;
76 auto socket
= Socket(d_ds
->d_config
.remote
.sin4
.sin_family
, SOCK_STREAM
, 0);
77 DEBUGLOG("result of socket() is "<<socket
.getHandle());
79 /* disable NAGLE, which does not play nicely with delayed ACKs.
80 In theory we could be wasting up to 500 milliseconds waiting for
81 the other end to acknowledge our initial packet before we could
83 setTCPNoDelay(socket
.getHandle());
// Optionally pin the outgoing socket to a configured network interface.
85 #ifdef SO_BINDTODEVICE
86 if (!d_ds
->d_config
.sourceItfName
.empty()) {
87 int res
= setsockopt(socket
.getHandle(), SOL_SOCKET
, SO_BINDTODEVICE
, d_ds
->d_config
.sourceItfName
.c_str(), d_ds
->d_config
.sourceItfName
.length());
89 vinfolog("Error setting up the interface on backend TCP socket '%s': %s", d_ds
->getNameWithAddr(), stringerror());
// Bind to the configured source address, if any.
94 if (!IsAnyAddress(d_ds
->d_config
.sourceAddr
)) {
95 SSetsockopt(socket
.getHandle(), SOL_SOCKET
, SO_REUSEADDR
, 1);
// IP_BIND_ADDRESS_NO_PORT (Linux) delays local port selection until
// connect(), reducing ephemeral-port exhaustion when binding a source addr.
96 #ifdef IP_BIND_ADDRESS_NO_PORT
97 if (d_ds
->d_config
.ipBindAddrNoPort
) {
98 SSetsockopt(socket
.getHandle(), SOL_IP
, IP_BIND_ADDRESS_NO_PORT
, 1);
101 socket
.bind(d_ds
->d_config
.sourceAddr
, false);
103 socket
.setNonBlocking();
105 gettimeofday(&d_connectionStartTime
, nullptr);
106 auto handler
= std::make_unique
<TCPIOHandler
>(d_ds
->d_config
.d_tlsSubjectName
, d_ds
->d_config
.d_tlsSubjectIsAddr
, socket
.releaseHandle(), timeval
{0,0}, d_ds
->d_tlsCtx
);
// No session carried over from the previous connection: try the shared cache.
107 if (!tlsSession
&& d_ds
->d_tlsCtx
) {
108 tlsSession
= g_sessionCache
.getSession(d_ds
->getID(), d_connectionStartTime
.tv_sec
);
111 handler
->setTLSSession(tlsSession
);
113 handler
->tryConnect(d_ds
->d_config
.tcpFastOpen
&& isFastOpenEnabled(), d_ds
->d_config
.remote
);
116 d_handler
= std::move(handler
);
117 d_ds
->incCurrentConnectionsCount();
120 catch (const std::runtime_error
& e
) {
121 vinfolog("Connection to downstream server %s failed: %s", d_ds
->getNameWithAddr(), e
.what());
122 d_downstreamFailures
++;
123 if (d_downstreamFailures
>= d_ds
->d_config
.d_retries
) {
128 while (d_downstreamFailures
< d_ds
->d_config
.d_retries
);
// Destructor: if responses were still pending when the connection goes away,
// remove them from the backend's outstanding-queries counter.
133 TCPConnectionToBackend::~TCPConnectionToBackend()
135 if (d_ds
&& !d_pendingResponses
.empty()) {
136 d_ds
->outstanding
-= d_pendingResponses
.size();
// Drops all pending queries and responses for this connection, fixing the
// backend's outstanding counter first so accounting stays consistent.
140 void TCPConnectionToBackend::release(){
141 d_ds
->outstanding
-= d_pendingResponses
.size();
143 d_pendingResponses
.clear();
144 d_pendingQueries
.clear();
// Extracts the 32-bit zone serial from the raw RDATA of a SOA record.
// The serial is read 20 bytes before the end of the RDATA because the owner
// names at the start may be compressed and therefore have variable length,
// while the five trailing 32-bit fields (serial, refresh, retry, expire,
// minimum) have fixed size.
// NOTE(review): the declaration of `serial` (a uint32_t, per the memcpy and
// ntohl below) is missing from this extraction.
151 static uint32_t getSerialFromRawSOAContent(const std::vector
<uint8_t>& raw
)
153 /* minimal size for a SOA record, as defined by rfc1035:
163 if (raw
.size() < 22) {
164 throw std::runtime_error("Invalid content of size " + std::to_string(raw
.size()) + " for a SOA record");
166 /* As rfc1025 states that "all domain names in the RDATA section of these RRs may be compressed",
167 and we don't want to parse these names, start at the end */
169 memcpy(&serial
, &raw
.at(raw
.size() - 20), sizeof(serial
));
// Serial is stored in network byte order on the wire.
170 return ntohl(serial
);
// Parses an IXFR query and stores the serial of the SOA found in its
// authority section into query.d_ixfrQuerySerial. The parse offset skips the
// optional proxy-protocol payload (if one was prepended to the buffer) and
// the 2-byte TCP length prefix.
// NOTE(review): lines are missing from this extraction (braces, the `try`
// matching the catch, `return`/`continue` statements) — the bool result
// presumably reports whether a serial was found; verify against the original.
173 static bool getSerialFromIXFRQuery(TCPQuery
& query
)
176 size_t proxyPayloadSize
= query
.d_proxyProtocolPayloadAdded
? query
.d_idstate
.d_proxyProtocolPayloadSize
: 0;
// Too small to contain anything past the length prefix (and proxy payload).
177 if (query
.d_buffer
.size() <= (proxyPayloadSize
+ sizeof(uint16_t))) {
181 size_t payloadSize
= query
.d_buffer
.size() - sizeof(uint16_t) - proxyPayloadSize
;
183 MOADNSParser
parser(true, reinterpret_cast<const char*>(query
.d_buffer
.data() + sizeof(uint16_t) + proxyPayloadSize
), payloadSize
);
// Look for the IN SOA record in the authority section only.
185 for (const auto& record
: parser
.d_answers
) {
186 if (record
.first
.d_place
!= DNSResourceRecord::AUTHORITY
|| record
.first
.d_class
!= QClass::IN
|| record
.first
.d_type
!= QType::SOA
) {
190 auto unknownContent
= getRR
<UnknownRecordContent
>(record
.first
);
191 if (!unknownContent
) {
194 auto raw
= unknownContent
->getRawContent();
195 query
.d_ixfrQuerySerial
= getSerialFromRawSOAContent(raw
);
199 catch (const MOADNSException
& e
) {
200 DEBUGLOG("Exception when parsing IXFR TCP Query to DNS: " << e
.what());
201 /* ponder what to do here, shall we close the connection? */
// Rewrites the DNS message ID inside `payload` to `newId` (network byte
// order), skipping the optional proxy-protocol payload and, when
// sizePrepended is true, the 2-byte TCP length prefix. memcpy is used instead
// of a cast because the header may not be suitably aligned at that offset.
// NOTE(review): the runtime_error message below appears truncated (unbalanced
// parenthesis in the string) — likely lost lines in this extraction.
207 static void editPayloadID(PacketBuffer
& payload
, uint16_t newId
, size_t proxyProtocolPayloadSize
, bool sizePrepended
)
209 /* we cannot do a direct cast as the alignment might be off (the size of the payload might have been prepended, which is bad enough,
210 but we might also have a proxy protocol payload */
211 size_t startOfHeaderOffset
= (sizePrepended
? sizeof(uint16_t) : 0) + proxyProtocolPayloadSize
;
212 if (payload
.size() < startOfHeaderOffset
+ sizeof(dnsheader
)) {
213 throw std::runtime_error("Invalid buffer for outgoing TCP query (size " + std::to_string(payload
.size()));
215 uint16_t id
= htons(newId
);
216 memcpy(&payload
.at(startOfHeaderOffset
), &id
, sizeof(id
));
// Whether the query buffer carries the 2-byte TCP length prefix.
// NOTE(review): enumerator lines are missing from this extraction; callers
// below use QueryState::hasSizePrepended — verify against the original file.
219 enum class QueryState
: uint8_t {
// Proxy-protocol state of the connection; callers below use
// ConnectionState::needProxy and ConnectionState::proxySent (enumerator
// lines missing from this extraction as well).
224 enum class ConnectionState
: uint8_t {
// Prepares a query buffer for (re)sending on this connection:
// - needProxy: prepend the proxy-protocol payload if present and not yet
//   added, recording its size in the query's id-state;
// - proxySent: strip a previously-added payload (the connection has already
//   sent one for an earlier query) and reset the recorded size;
// then records the IXFR query serial for IN/IXFR queries, and finally
// rewrites the message ID at the correct offset.
// NOTE(review): brace lines are missing from this extraction.
229 static void prepareQueryForSending(TCPQuery
& query
, uint16_t id
, QueryState queryState
, ConnectionState connectionState
)
231 if (connectionState
== ConnectionState::needProxy
) {
232 if (query
.d_proxyProtocolPayload
.size() > 0 && !query
.d_proxyProtocolPayloadAdded
) {
233 query
.d_buffer
.insert(query
.d_buffer
.begin(), query
.d_proxyProtocolPayload
.begin(), query
.d_proxyProtocolPayload
.end());
234 query
.d_proxyProtocolPayloadAdded
= true;
235 query
.d_idstate
.d_proxyProtocolPayloadSize
= query
.d_proxyProtocolPayload
.size();
238 else if (connectionState
== ConnectionState::proxySent
) {
239 if (query
.d_proxyProtocolPayloadAdded
) {
240 if (query
.d_buffer
.size() < query
.d_idstate
.d_proxyProtocolPayloadSize
) {
241 throw std::runtime_error("Trying to remove a proxy protocol payload of size " + std::to_string(query
.d_proxyProtocolPayload
.size()) + " from a buffer of size " + std::to_string(query
.d_buffer
.size()));
243 // NOLINTNEXTLINE(*-narrowing-conversions): the size of the payload is limited to 2^16-1
244 query
.d_buffer
.erase(query
.d_buffer
.begin(), query
.d_buffer
.begin() + static_cast<ssize_t
>(query
.d_idstate
.d_proxyProtocolPayloadSize
));
245 query
.d_proxyProtocolPayloadAdded
= false;
246 query
.d_idstate
.d_proxyProtocolPayloadSize
= 0;
// Remember the client's IXFR serial so isXFRFinished() can compare later.
249 if (query
.d_idstate
.qclass
== QClass::IN
&& query
.d_idstate
.qtype
== QType::IXFR
) {
250 getSerialFromIXFRQuery(query
);
253 editPayloadID(query
.d_buffer
, id
, query
.d_proxyProtocolPayloadAdded
? query
.d_idstate
.d_proxyProtocolPayloadSize
: 0, true);
// Pops the next pending query, prepares it under the next stream ID with the
// proper proxy-protocol state, switches the connection to
// sendingQueryToBackend and resets the write position. Always returns
// NeedWrite so the caller arms the multiplexer for writing.
256 IOState
TCPConnectionToBackend::queueNextQuery(std::shared_ptr
<TCPConnectionToBackend
>& conn
)
258 conn
->d_currentQuery
= std::move(conn
->d_pendingQueries
.front());
260 uint16_t id
= conn
->d_highestStreamID
;
261 prepareQueryForSending(conn
->d_currentQuery
.d_query
, id
, QueryState::hasSizePrepended
, conn
->needProxyProtocolPayload() ? ConnectionState::needProxy
: ConnectionState::proxySent
);
263 conn
->d_pendingQueries
.pop_front();
264 conn
->d_state
= State::sendingQueryToBackend
;
265 conn
->d_currentPos
= 0;
267 return IOState::NeedWrite
;
// Attempts to write the current query buffer to the backend. If the write is
// incomplete the needed IO state is propagated (the early return is among the
// lines missing from this extraction). Once fully sent: records that the
// proxy-protocol payload went out, registers the query in d_pendingResponses
// under its stream ID, bumps the backend's outstanding counter, advances the
// stream ID and releases the current query's sender/buffer.
270 IOState
TCPConnectionToBackend::sendQuery(std::shared_ptr
<TCPConnectionToBackend
>& conn
, const struct timeval
& now
)
272 DEBUGLOG("sending query to backend "<<conn
->getDS()->getNameWithAddr()<<" over FD "<<conn
->d_handler
->getDescriptor());
274 IOState state
= conn
->d_handler
->tryWrite(conn
->d_currentQuery
.d_query
.d_buffer
, conn
->d_currentPos
, conn
->d_currentQuery
.d_query
.d_buffer
.size());
276 if (state
!= IOState::Done
) {
280 DEBUGLOG("query sent to backend");
282 if (conn
->d_currentQuery
.d_query
.d_proxyProtocolPayloadAdded
) {
283 conn
->d_proxyProtocolPayloadSent
= true;
286 conn
->d_currentPos
= 0;
288 DEBUGLOG("adding a pending response for ID "<<conn
->d_highestStreamID
<<" and QNAME "<<conn
->d_currentQuery
.d_query
.d_idstate
.qname
);
289 auto res
= conn
->d_pendingResponses
.insert({conn
->d_highestStreamID
, std::move(conn
->d_currentQuery
)});
290 /* if there was already a pending response with that ID, we messed up and we don't expect more
293 ++conn
->d_ds
->outstanding
;
295 ++conn
->d_highestStreamID
;
296 conn
->d_currentQuery
.d_sender
.reset();
297 conn
->d_currentQuery
.d_query
.d_buffer
.clear();
// Main IO state machine for the backend connection:
//  1. send queued queries while writes complete;
//  2. read the 2-byte response length, then the response body;
//  3. dispatch complete responses via handleResponse();
//  4. on IO failure, classify the error (died sending vs reading), and if
//     the failure budget allows, reconnect and replay in-flight queries —
//     except XFRs that already delivered data to the client, which are
//     failed via notifyIOError;
//  5. finally re-arm the multiplexer with the proper read/write/connect TTD.
// NOTE(review): a large number of lines (try blocks, braces, early returns)
// are missing from this extraction — verify control flow against the
// original file before relying on it.
302 void TCPConnectionToBackend::handleIO(std::shared_ptr
<TCPConnectionToBackend
>& conn
, const struct timeval
& now
)
304 if (conn
->d_handler
== nullptr) {
305 throw std::runtime_error("No downstream socket in " + std::string(__PRETTY_FUNCTION__
) + "!");
308 bool connectionDied
= false;
309 IOState iostate
= IOState::Done
;
310 IOStateGuard
ioGuard(conn
->d_ioState
);
311 bool reconnected
= false;
// Phase 1: flush the current query, then any further pending queries.
317 if (conn
->d_state
== State::sendingQueryToBackend
) {
318 iostate
= sendQuery(conn
, now
);
320 while (iostate
== IOState::Done
&& !conn
->d_pendingQueries
.empty()) {
321 queueNextQuery(conn
);
322 iostate
= sendQuery(conn
, now
);
// Everything sent: switch to reading the response length prefix.
325 if (iostate
== IOState::Done
&& conn
->d_pendingQueries
.empty()) {
326 conn
->d_state
= State::waitingForResponseFromBackend
;
327 conn
->d_currentPos
= 0;
328 conn
->d_responseBuffer
.resize(sizeof(uint16_t));
329 iostate
= IOState::NeedRead
;
333 if (conn
->d_state
== State::waitingForResponseFromBackend
||
334 conn
->d_state
== State::readingResponseSizeFromBackend
) {
335 DEBUGLOG("reading response size from backend");
336 // then we need to allocate a new buffer (new because we might need to re-send the query if the
337 // backend dies on us)
338 // We also might need to read and send to the client more than one response in case of XFR (yeah!)
339 conn
->d_responseBuffer
.resize(sizeof(uint16_t));
340 iostate
= conn
->d_handler
->tryRead(conn
->d_responseBuffer
, conn
->d_currentPos
, sizeof(uint16_t));
341 if (iostate
== IOState::Done
) {
342 DEBUGLOG("got response size from backend");
343 conn
->d_state
= State::readingResponseFromBackend
;
// Big-endian 16-bit length prefix, decoded byte by byte.
344 conn
->d_responseSize
= conn
->d_responseBuffer
.at(0) * 256 + conn
->d_responseBuffer
.at(1);
345 conn
->d_responseBuffer
.reserve(conn
->d_responseSize
+ /* we will need to prepend the size later */ 2);
346 conn
->d_responseBuffer
.resize(conn
->d_responseSize
);
347 conn
->d_currentPos
= 0;
348 conn
->d_lastDataReceivedTime
= now
;
// Partial length read: remember we are mid-way through the size prefix.
350 else if (conn
->d_state
== State::waitingForResponseFromBackend
&& conn
->d_currentPos
> 0) {
351 conn
->d_state
= State::readingResponseSizeFromBackend
;
355 if (conn
->d_state
== State::readingResponseFromBackend
) {
356 DEBUGLOG("reading response from backend");
357 iostate
= conn
->d_handler
->tryRead(conn
->d_responseBuffer
, conn
->d_currentPos
, conn
->d_responseSize
);
358 if (iostate
== IOState::Done
) {
359 DEBUGLOG("got response from backend");
361 conn
->d_lastDataReceivedTime
= now
;
362 iostate
= conn
->handleResponse(conn
, now
);
364 catch (const std::exception
& e
) {
365 vinfolog("Got an exception while handling TCP response from %s (client is %s): %s", conn
->d_ds
? conn
->d_ds
->getNameWithAddr() : "unknown", conn
->d_currentQuery
.d_query
.d_idstate
.origRemote
.toStringWithPort(), e
.what());
// Defensive check: any state outside the known set is logged.
373 if (conn
->d_state
!= State::idle
&&
374 conn
->d_state
!= State::sendingQueryToBackend
&&
375 conn
->d_state
!= State::waitingForResponseFromBackend
&&
376 conn
->d_state
!= State::readingResponseSizeFromBackend
&&
377 conn
->d_state
!= State::readingResponseFromBackend
) {
378 vinfolog("Unexpected state %d in TCPConnectionToBackend::handleIO", static_cast<int>(conn
->d_state
));
381 catch (const std::exception
& e
) {
382 /* most likely an EOF because the other end closed the connection,
383 but it might also be a real IO error or something else.
384 Let's just drop the connection
386 vinfolog("Got an exception while handling (%s backend) TCP query from %s: %s", (conn
->d_state
== State::sendingQueryToBackend
? "writing to" : "reading from"), conn
->d_currentQuery
.d_query
.d_idstate
.origRemote
.toStringWithPort(), e
.what());
// Classify the death for metrics: while writing vs while reading.
388 if (conn
->d_state
== State::sendingQueryToBackend
) {
389 ++conn
->d_ds
->tcpDiedSendingQuery
;
391 else if (conn
->d_state
!= State::idle
) {
392 ++conn
->d_ds
->tcpDiedReadingResponse
;
395 /* don't increase this counter when reusing connections */
397 ++conn
->d_downstreamFailures
;
400 /* remove this FD from the IO multiplexer */
401 iostate
= IOState::Done
;
402 connectionDied
= true;
405 if (connectionDied
) {
407 DEBUGLOG("connection died, number of failures is "<<conn
->d_downstreamFailures
<<", retries is "<<conn
->d_ds
->d_config
.d_retries
);
// Still within the retry budget: attempt to reconnect and replay.
409 if (conn
->d_downstreamFailures
< conn
->d_ds
->d_config
.d_retries
) {
411 conn
->d_ioState
.reset();
415 if (conn
->reconnect()) {
416 conn
->d_ioState
= make_unique
<IOStateHandler
>(*conn
->d_mplexer
, conn
->d_handler
->getDescriptor());
418 /* we need to resend the queries that were in flight, if any */
419 if (conn
->d_state
== State::sendingQueryToBackend
) {
420 /* we need to edit this query so it has the correct ID */
421 auto query
= std::move(conn
->d_currentQuery
);
422 uint16_t id
= conn
->d_highestStreamID
;
423 prepareQueryForSending(query
.d_query
, id
, QueryState::hasSizePrepended
, ConnectionState::needProxy
);
424 conn
->d_currentQuery
= std::move(query
);
427 /* if we notify the sender it might terminate us so we need to move these first */
428 auto pendingResponses
= std::move(conn
->d_pendingResponses
);
429 conn
->d_pendingResponses
.clear();
430 for (auto& pending
: pendingResponses
) {
431 --conn
->d_ds
->outstanding
;
// XFRs that already streamed data to the client cannot be replayed.
433 if (pending
.second
.d_query
.isXFR() && pending
.second
.d_query
.d_xfrStarted
) {
434 /* this one can't be restarted, sorry */
435 DEBUGLOG("A XFR for which a response has already been sent cannot be restarted");
437 TCPResponse
response(std::move(pending
.second
.d_query
));
438 pending
.second
.d_sender
->notifyIOError(now
, std::move(response
));
440 catch (const std::exception
& e
) {
441 vinfolog("Got an exception while notifying: %s", e
.what());
444 vinfolog("Got exception while notifying");
// Replayable in-flight query: back into the pending queue.
448 conn
->d_pendingQueries
.push_back(std::move(pending
.second
));
451 conn
->d_currentPos
= 0;
453 if (conn
->d_state
== State::sendingQueryToBackend
) {
454 iostate
= IOState::NeedWrite
;
455 // resume sending query
458 if (conn
->d_pendingQueries
.empty()) {
459 throw std::runtime_error("TCP connection to a backend in state " + std::to_string((int)conn
->d_state
) + " with no pending queries");
462 iostate
= queueNextQuery(conn
);
466 connectionDied
= false;
469 catch (const std::exception
& e
) {
470 // reconnect might throw on failure, let's ignore that, we just need to know
476 /* reconnect failed, we give up */
477 DEBUGLOG("reconnect failed, we give up");
478 ++conn
->d_ds
->tcpGaveUp
;
479 conn
->notifyAllQueriesFailed(now
, FailureReason::gaveUp
);
// Phase 5: re-arm the multiplexer with the correct timeout for the state.
483 if (conn
->d_ioState
) {
484 if (iostate
== IOState::Done
) {
485 conn
->d_ioState
->update(iostate
, handleIOCallback
, conn
);
488 boost::optional
<struct timeval
> ttd
{boost::none
};
489 if (iostate
== IOState::NeedRead
) {
490 ttd
= conn
->getBackendReadTTD(now
);
492 else if (conn
->isFresh() && conn
->d_queries
== 0) {
493 /* first write just after the non-blocking connect */
494 ttd
= conn
->getBackendConnectTTD(now
);
497 ttd
= conn
->getBackendWriteTTD(now
);
500 conn
->d_ioState
->update(iostate
, handleIOCallback
, conn
, ttd
);
// FDMultiplexer callback: recovers the shared_ptr to the connection from the
// opaque parameter and sanity-checks the descriptor. The tail of this
// function (the declaration of `now` and, presumably, the call into
// handleIO) is missing from this extraction — verify against the original.
509 void TCPConnectionToBackend::handleIOCallback(int fd
, FDMultiplexer::funcparam_t
& param
)
511 auto conn
= boost::any_cast
<std::shared_ptr
<TCPConnectionToBackend
>>(param
);
512 if (fd
!= conn
->getHandle()) {
513 throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd
) + " received in " + std::string(__PRETTY_FUNCTION__
) + ", expected " + std::to_string(conn
->getHandle()));
517 gettimeofday(&now
, nullptr);
// Accepts a new query from a client-side sender. If the connection is idle or
// only waiting for responses, the query becomes the current one and is
// prepared and pushed out immediately via handleIO(); otherwise it is
// appended to d_pendingQueries for later.
// NOTE(review): lines are missing here (braces, and likely a condition
// guarding the d_ioState creation) — verify against the original file.
521 void TCPConnectionToBackend::queueQuery(std::shared_ptr
<TCPQuerySender
>& sender
, TCPQuery
&& query
)
524 d_ioState
= make_unique
<IOStateHandler
>(*d_mplexer
, d_handler
->getDescriptor());
527 // if we are not already sending a query or in the middle of reading a response (so idle),
528 // start sending the query
529 if (d_state
== State::idle
|| d_state
== State::waitingForResponseFromBackend
) {
530 DEBUGLOG("Sending new query to backend right away, with ID "<<d_highestStreamID
);
531 d_state
= State::sendingQueryToBackend
;
534 uint16_t id
= d_highestStreamID
;
536 d_currentQuery
= PendingRequest({sender
, std::move(query
)});
537 prepareQueryForSending(d_currentQuery
.d_query
, id
, QueryState::hasSizePrepended
, needProxyProtocolPayload() ? ConnectionState::needProxy
: ConnectionState::proxySent
);
540 gettimeofday(&now
, 0);
// handleIO takes shared_ptr<TCPConnectionToBackend>, so downcast ourselves.
542 auto shared
= std::dynamic_pointer_cast
<TCPConnectionToBackend
>(shared_from_this());
543 handleIO(shared
, now
);
546 DEBUGLOG("Adding new query to the queue because we are in state "<<(int)d_state
);
547 // store query in the list of queries to send
548 d_pendingQueries
.push_back(PendingRequest({sender
, std::move(query
)}));
// Called when a backend IO timeout fires. Classifies it as a connect timeout
// (fresh connection, no query sent yet), a write timeout, or a read timeout —
// updating the matching backend counter — then fails all pending queries with
// FailureReason::timeout. Exceptions raised while notifying are swallowed
// with a log line (the enclosing try and branch structure are among the
// missing lines of this extraction).
552 void TCPConnectionToBackend::handleTimeout(const struct timeval
& now
, bool write
)
554 /* in some cases we could retry, here, reconnecting and sending our pending responses again */
556 if (isFresh() && d_queries
== 0) {
557 ++d_ds
->tcpConnectTimeouts
;
558 vinfolog("Timeout while connecting to TCP backend %s", d_ds
->getNameWithAddr());
561 ++d_ds
->tcpWriteTimeouts
;
562 vinfolog("Timeout while writing to TCP backend %s", d_ds
->getNameWithAddr());
566 ++d_ds
->tcpReadTimeouts
;
567 vinfolog("Timeout while reading from TCP backend %s", d_ds
->getNameWithAddr());
571 notifyAllQueriesFailed(now
, FailureReason::timeout
);
573 catch (const std::exception
& e
) {
574 vinfolog("Got an exception while notifying a timeout: %s", e
.what());
577 vinfolog("Got exception while notifying a timeout");
// Fails every query owned by this connection — the current one (if being
// sent), the pending-to-send ones and the in-flight pending responses —
// notifying each still-active sender via notifyIOError, bumping the
// per-frontend timeout/gave-up counters through increaseCounters, and fixing
// the backend's outstanding counter. The pending containers are moved into
// locals first because a notified sender may terminate this connection
// re-entrantly (see the comment at original line 588).
583 void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval
& now
, FailureReason reason
)
585 d_connectionDied
= true;
586 d_ds
->reportTimeoutOrError();
588 /* we might be terminated while notifying a query sender */
589 d_ds
->outstanding
-= d_pendingResponses
.size();
590 auto pendingQueries
= std::move(d_pendingQueries
);
591 d_pendingQueries
.clear();
592 auto pendingResponses
= std::move(d_pendingResponses
);
593 d_pendingResponses
.clear();
// Per-frontend counter bump, keyed by the failure reason.
595 auto increaseCounters
= [reason
](const ClientState
* cs
) {
596 if (reason
== FailureReason::timeout
) {
598 ++cs
->tcpDownstreamTimeouts
;
601 else if (reason
== FailureReason::gaveUp
) {
// Current query being written, if any.
609 if (d_state
== State::sendingQueryToBackend
) {
610 increaseCounters(d_currentQuery
.d_query
.d_idstate
.cs
);
611 auto sender
= d_currentQuery
.d_sender
;
612 if (sender
->active()) {
613 TCPResponse
response(std::move(d_currentQuery
.d_query
));
614 sender
->notifyIOError(now
, std::move(response
));
// Queries still waiting to be sent.
618 for (auto& query
: pendingQueries
) {
619 increaseCounters(query
.d_query
.d_idstate
.cs
);
620 auto sender
= query
.d_sender
;
621 if (sender
->active()) {
622 TCPResponse
response(std::move(query
.d_query
));
623 sender
->notifyIOError(now
, std::move(response
));
// Queries already sent, whose responses will now never arrive.
627 for (auto& response
: pendingResponses
) {
628 increaseCounters(response
.second
.d_query
.d_idstate
.cs
);
629 auto sender
= response
.second
.d_sender
;
630 if (sender
->active()) {
631 TCPResponse
tresp(std::move(response
.second
.d_query
));
632 sender
->notifyIOError(now
, std::move(tresp
));
636 catch (const std::exception
& e
) {
637 vinfolog("Got an exception while notifying: %s", e
.what());
640 vinfolog("Got exception while notifying");
// Processes a fully-read response: matches it to the pending query by its
// message ID, restores the client's original ID in the buffer, then either
// forwards an XFR chunk to the sender (keeping the query pending until
// isXFRFinished() says the transfer is done) or delivers a regular response,
// updating latency/rcode metrics and deciding the next IO state: send the
// next pending query, keep reading responses, or go idle (handing the
// connection back to t_downstreamTCPConnectionsManager).
// NOTE(review): lines are missing from this extraction — declarations such
// as `done`, `dh` (a dnsheader) and `shared`, plus try/brace/return lines —
// verify against the original file.
646 IOState
TCPConnectionToBackend::handleResponse(std::shared_ptr
<TCPConnectionToBackend
>& conn
, const struct timeval
& now
)
// A successful response resets the failure budget for this connection.
648 d_downstreamFailures
= 0;
650 uint16_t queryId
= 0;
652 queryId
= getQueryIdFromResponse();
654 catch (const std::exception
& e
) {
655 DEBUGLOG("Unable to get query ID");
656 notifyAllQueriesFailed(now
, FailureReason::unexpectedQueryID
);
660 auto it
= d_pendingResponses
.find(queryId
);
661 if (it
== d_pendingResponses
.end()) {
662 DEBUGLOG("could not find any corresponding query for ID "<<queryId
<<". This is likely a duplicated ID over the same TCP connection, giving up!");
663 notifyAllQueriesFailed(now
, FailureReason::unexpectedQueryID
);
664 return IOState::Done
;
// Restore the ID the client originally used before handing the buffer back.
667 editPayloadID(d_responseBuffer
, ntohs(it
->second
.d_query
.d_idstate
.origID
), 0, false);
669 auto sender
= it
->second
.d_sender
;
// XFR path: possibly many responses for one query.
671 if (sender
->active() && it
->second
.d_query
.isXFR()) {
674 TCPResponse response
;
675 response
.d_buffer
= std::move(d_responseBuffer
);
676 response
.d_connection
= conn
;
677 response
.d_ds
= conn
->d_ds
;
678 /* we don't move the whole IDS because we will need for the responses to come */
679 response
.d_idstate
.qtype
= it
->second
.d_query
.d_idstate
.qtype
;
680 response
.d_idstate
.qname
= it
->second
.d_query
.d_idstate
.qname
;
681 DEBUGLOG("passing XFRresponse to client connection for "<<response
.d_idstate
.qname
);
683 it
->second
.d_query
.d_xfrStarted
= true;
684 done
= isXFRFinished(response
, it
->second
.d_query
);
687 d_pendingResponses
.erase(it
);
688 --conn
->d_ds
->outstanding
;
689 /* marking as idle for now, so we can accept new queries if our queues are empty */
690 if (d_pendingQueries
.empty() && d_pendingResponses
.empty()) {
691 d_state
= State::idle
;
692 t_downstreamTCPConnectionsManager
.moveToIdle(conn
);
696 sender
->handleXFRResponse(now
, std::move(response
));
698 d_state
= State::idle
;
699 t_downstreamTCPConnectionsManager
.moveToIdle(conn
);
700 return IOState::Done
;
// XFR not finished: keep reading more messages for the same query.
703 d_state
= State::waitingForResponseFromBackend
;
705 d_responseBuffer
.resize(sizeof(uint16_t));
706 // get ready to read the next packet, if any
707 return IOState::NeedRead
;
// Regular (non-XFR) response path.
710 --conn
->d_ds
->outstanding
;
711 auto ids
= std::move(it
->second
.d_query
.d_idstate
);
712 const double udiff
= ids
.queryRealTime
.udiff();
713 conn
->d_ds
->updateTCPLatency(udiff
);
714 if (d_responseBuffer
.size() >= sizeof(dnsheader
)) {
716 memcpy(&dh
, d_responseBuffer
.data(), sizeof(dh
));
717 conn
->d_ds
->reportResponse(dh
.rcode
);
720 conn
->d_ds
->reportTimeoutOrError();
723 d_pendingResponses
.erase(it
);
724 /* marking as idle for now, so we can accept new queries if our queues are empty */
725 if (d_pendingQueries
.empty() && d_pendingResponses
.empty()) {
726 d_state
= State::idle
;
727 t_downstreamTCPConnectionsManager
.moveToIdle(conn
);
729 else if (!d_pendingResponses
.empty()) {
731 d_state
= State::waitingForResponseFromBackend
;
734 // be very careful that handleResponse() might trigger new queries being assigned to us,
735 // which may reset our d_currentPos, d_state and/or d_responseBuffer, so we cannot assume
736 // anything without checking first
738 if (sender
->active()) {
739 DEBUGLOG("passing response to client connection for "<<ids
.qname
);
740 // make sure that we still exist after calling handleResponse()
741 TCPResponse
response(std::move(d_responseBuffer
), std::move(ids
), conn
, conn
->d_ds
);
742 sender
->handleResponse(now
, std::move(response
));
745 if (!d_pendingQueries
.empty()) {
746 DEBUGLOG("still have some queries to send");
747 return queueNextQuery(shared
);
749 else if (!d_pendingResponses
.empty()) {
750 DEBUGLOG("still have some responses to read");
751 return IOState::NeedRead
;
754 DEBUGLOG("nothing to do, waiting for a new query");
755 d_state
= State::idle
;
756 t_downstreamTCPConnectionsManager
.moveToIdle(conn
);
757 return IOState::Done
;
// Reads the 16-bit message ID from the start of the response buffer, throwing
// if the buffer is smaller than a DNS header.
// NOTE(review): the declaration of `id` and the return statement (presumably
// converting with ntohs) are missing from this extraction.
761 uint16_t TCPConnectionToBackend::getQueryIdFromResponse() const
763 if (d_responseBuffer
.size() < sizeof(dnsheader
)) {
764 throw std::runtime_error("Unable to get query ID in a too small (" + std::to_string(d_responseBuffer
.size()) + ") response from " + d_ds
->getNameWithAddr());
768 memcpy(&id
, &d_responseBuffer
.at(0), sizeof(id
));
// Records the proxy-protocol TLVs sent over this connection — but only the
// first time: later calls are no-ops because matchesTLVs() has already
// verified that any new values are identical to the stored ones.
772 void TCPConnectionToBackend::setProxyProtocolValuesSent(std::unique_ptr
<std::vector
<ProxyProtocolValue
>>&& proxyProtocolValuesSent
)
774 /* if we already have some values, we have already verified they match */
775 if (!d_proxyProtocolValuesSent
) {
776 d_proxyProtocolValuesSent
= std::move(proxyProtocolValuesSent
);
// Returns whether `tlvs` matches the TLVs already sent on this connection:
// null-vs-null cases are handled first, otherwise deep equality is used.
// NOTE(review): the return statements inside the first branches are missing
// from this extraction — verify against the original file.
780 bool TCPConnectionToBackend::matchesTLVs(const std::unique_ptr
<std::vector
<ProxyProtocolValue
>>& tlvs
) const
782 if (tlvs
== nullptr) {
783 if (d_proxyProtocolValuesSent
== nullptr) {
791 if (d_proxyProtocolValuesSent
== nullptr) {
795 return *tlvs
== *d_proxyProtocolValuesSent
;
// Heuristically detects the end of an XFR stream by counting SOA records:
// - the first SOA seen becomes the master serial;
// - an IXFR whose requested serial equals (or is RFC 1982-newer than) the
//   master serial is answered with a single SOA and is finished immediately;
// - an AXFR is finished once the master SOA has been seen twice
//   (d_xfrSerialCount == 2 with only SOAs counted);
// - an IXFR is finished once the master SOA has been seen three times.
// Parse errors are logged; the missing lines of this extraction presumably
// make that case return "not finished" — verify against the original file.
798 bool TCPConnectionToBackend::isXFRFinished(const TCPResponse
& response
, TCPQuery
& query
)
803 MOADNSParser
parser(true, reinterpret_cast<const char*>(response
.d_buffer
.data()), response
.d_buffer
.size());
// A non-zero rcode ends the transfer (handled in lines missing here).
805 if (parser
.d_header
.rcode
!= 0U) {
809 for (const auto& record
: parser
.d_answers
) {
810 if (record
.first
.d_class
!= QClass::IN
|| record
.first
.d_type
!= QType::SOA
) {
814 auto unknownContent
= getRR
<UnknownRecordContent
>(record
.first
);
815 if (!unknownContent
) {
818 auto raw
= unknownContent
->getRawContent();
819 auto serial
= getSerialFromRawSOAContent(raw
);
820 if (query
.d_xfrMasterSerial
== 0) {
821 // store the first SOA in our client's connection metadata
822 query
.d_xfrMasterSerial
= serial
;
823 if (query
.d_idstate
.qtype
== QType::IXFR
&& (query
.d_xfrMasterSerial
== query
.d_ixfrQuerySerial
|| rfc1982LessThan(query
.d_xfrMasterSerial
, query
.d_ixfrQuerySerial
))) {
824 /* This is the first message with a master SOA:
826 If an IXFR query with the same or newer version number
827 than that of the server is received, it is replied to
828 with a single SOA record of the server's current version.
835 ++query
.d_xfrSerialCount
;
836 if (serial
== query
.d_xfrMasterSerial
) {
837 ++query
.d_xfrMasterSerialCount
;
838 // figure out if it's end when receiving master's SOA again
839 if (query
.d_xfrSerialCount
== 2) {
840 // if there are only two SOA records marks a finished AXFR
844 if (query
.d_xfrMasterSerialCount
== 3) {
845 // receiving master's SOA 3 times marks a finished IXFR
853 catch (const MOADNSException
& e
) {
854 DEBUGLOG("Exception when parsing TCPResponse to DNS: " << e
.what());
855 /* ponder what to do here, shall we close the connection? */
860 void setTCPDownstreamMaxIdleConnectionsPerBackend(uint64_t max
)
862 DownstreamTCPConnectionsManager::setMaxIdleConnectionsPerDownstream(max
);
865 void setTCPDownstreamCleanupInterval(uint64_t interval
)
867 DownstreamTCPConnectionsManager::setCleanupInterval(interval
);
870 void setTCPDownstreamMaxIdleTime(uint64_t max
)
872 DownstreamTCPConnectionsManager::setMaxIdleTime(max
);