]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/dnsdistdist/dnsdist-tcp-downstream.cc
rec: Update new b-root-server.net addresses in built-in hints.
[thirdparty/pdns.git] / pdns / dnsdistdist / dnsdist-tcp-downstream.cc
2 #include "dnsdist-session-cache.hh"
3 #include "dnsdist-tcp-downstream.hh"
4 #include "dnsdist-tcp-upstream.hh"
5 #include "dnsdist-downstream-connection.hh"
7 #include "dnsparser.hh"
// One manager of downstream TCP connections per thread (thread_local).
// In this file it is used by handleResponse(), which hands connections with
// nothing left to do back to it via moveToIdle().
9 thread_local DownstreamTCPConnectionsManager t_downstreamTCPConnectionsManager;
11 ConnectionToBackend::~ConnectionToBackend()
12 {
13 if (d_ds && d_handler) {
14 --d_ds->tcpCurrentConnections;
15 struct timeval now;
16 gettimeofday(&now, nullptr);
18 if (d_handler->isTLS()) {
19 if (d_handler->hasTLSSessionBeenResumed()) {
20 ++d_ds->tlsResumptions;
21 }
22 try {
23 auto sessions = d_handler->getTLSSessions();
24 if (!sessions.empty()) {
25 g_sessionCache.putSessions(d_ds->getID(), now.tv_sec, std::move(sessions));
26 }
27 }
28 catch (const std::exception& e) {
29 vinfolog("Unable to get a TLS session: %s", e.what());
30 }
31 }
32 auto diff = now - d_connectionStartTime;
33 // cerr<<"connection to backend terminated after "<<d_queries<<" queries, "<<diff.tv_sec<<" seconds"<<endl;
34 d_ds->updateTCPMetrics(d_queries, diff.tv_sec * 1000 + diff.tv_usec / 1000);
35 }
36 }
38 bool ConnectionToBackend::reconnect()
39 {
40 std::unique_ptr<TLSSession> tlsSession{nullptr};
41 if (d_handler) {
42 DEBUGLOG("closing socket "<<d_handler->getDescriptor());
43 if (d_handler->isTLS()) {
44 if (d_handler->hasTLSSessionBeenResumed()) {
45 ++d_ds->tlsResumptions;
46 }
47 try {
48 auto sessions = d_handler->getTLSSessions();
49 if (!sessions.empty()) {
50 tlsSession = std::move(sessions.back());
51 sessions.pop_back();
52 if (!sessions.empty()) {
53 g_sessionCache.putSessions(d_ds->getID(), time(nullptr), std::move(sessions));
54 }
55 }
56 }
57 catch (const std::exception& e) {
58 vinfolog("Unable to get a TLS session to resume: %s", e.what());
59 }
60 }
61 d_handler->close();
62 d_ioState.reset();
63 d_handler.reset();
64 --d_ds->tcpCurrentConnections;
65 }
67 d_fresh = true;
68 d_highestStreamID = 0;
69 d_proxyProtocolPayloadSent = false;
71 do {
72 DEBUGLOG("TCP connecting to downstream "<<d_ds->getNameWithAddr()<<" ("<<d_downstreamFailures<<")");
73 DEBUGLOG("Opening TCP connection to backend "<<d_ds->getNameWithAddr());
74 ++d_ds->tcpNewConnections;
75 try {
76 auto socket = Socket(d_ds->d_config.remote.sin4.sin_family, SOCK_STREAM, 0);
77 DEBUGLOG("result of socket() is "<<socket.getHandle());
79 /* disable NAGLE, which does not play nicely with delayed ACKs.
80 In theory we could be wasting up to 500 milliseconds waiting for
81 the other end to acknowledge our initial packet before we could
82 send the rest. */
83 setTCPNoDelay(socket.getHandle());
86 if (!d_ds->d_config.sourceItfName.empty()) {
87 int res = setsockopt(socket.getHandle(), SOL_SOCKET, SO_BINDTODEVICE, d_ds->d_config.sourceItfName.c_str(), d_ds->d_config.sourceItfName.length());
88 if (res != 0) {
89 vinfolog("Error setting up the interface on backend TCP socket '%s': %s", d_ds->getNameWithAddr(), stringerror());
90 }
91 }
92 #endif
94 if (!IsAnyAddress(d_ds->d_config.sourceAddr)) {
95 SSetsockopt(socket.getHandle(), SOL_SOCKET, SO_REUSEADDR, 1);
97 if (d_ds->d_config.ipBindAddrNoPort) {
98 SSetsockopt(socket.getHandle(), SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
99 }
100 #endif
101 socket.bind(d_ds->d_config.sourceAddr, false);
102 }
103 socket.setNonBlocking();
105 gettimeofday(&d_connectionStartTime, nullptr);
106 auto handler = std::make_unique<TCPIOHandler>(d_ds->d_config.d_tlsSubjectName, d_ds->d_config.d_tlsSubjectIsAddr, socket.releaseHandle(), timeval{0,0}, d_ds->d_tlsCtx);
107 if (!tlsSession && d_ds->d_tlsCtx) {
108 tlsSession = g_sessionCache.getSession(d_ds->getID(), d_connectionStartTime.tv_sec);
109 }
110 if (tlsSession) {
111 handler->setTLSSession(tlsSession);
112 }
113 handler->tryConnect(d_ds->d_config.tcpFastOpen && isFastOpenEnabled(), d_ds->d_config.remote);
114 d_queries = 0;
116 d_handler = std::move(handler);
117 d_ds->incCurrentConnectionsCount();
118 return true;
119 }
120 catch (const std::runtime_error& e) {
121 vinfolog("Connection to downstream server %s failed: %s", d_ds->getNameWithAddr(), e.what());
122 d_downstreamFailures++;
123 if (d_downstreamFailures >= d_ds->d_config.d_retries) {
124 throw;
125 }
126 }
127 }
128 while (d_downstreamFailures < d_ds->d_config.d_retries);
130 return false;
131 }
133 TCPConnectionToBackend::~TCPConnectionToBackend()
134 {
135 if (d_ds && !d_pendingResponses.empty()) {
136 d_ds->outstanding -= d_pendingResponses.size();
137 }
138 }
140 void TCPConnectionToBackend::release(){
141 d_ds->outstanding -= d_pendingResponses.size();
143 d_pendingResponses.clear();
144 d_pendingQueries.clear();
146 if (d_ioState) {
147 d_ioState.reset();
148 }
149 }
/* Extracts the SERIAL field from the raw RDATA of a SOA record and returns it
   in host byte order.

   RFC 1035 layout of a SOA RDATA, with the smallest possible (root) names:
     MNAME 1 + RNAME 1 + SERIAL 4 + REFRESH 4 + RETRY 4 + EXPIRE 4 + MINIMUM 4
     = 22 bytes
   Since RFC 1035 allows the two leading names to be compressed and we do not
   want to parse them, the serial is located from the end of the content: it is
   the first of the five trailing 32-bit fields, 20 bytes before the end.

   Throws std::runtime_error if the content is shorter than 22 bytes. */
static uint32_t getSerialFromRawSOAContent(const std::vector<uint8_t>& raw)
{
  const size_t contentSize = raw.size();
  if (contentSize < 22) {
    throw std::runtime_error("Invalid content of size " + std::to_string(contentSize) + " for a SOA record");
  }

  const size_t serialOffset = contentSize - 20;
  uint32_t networkOrderSerial = 0;
  /* memcpy because nothing guarantees a suitable alignment for a direct read */
  memcpy(&networkOrderSerial, &raw.at(serialOffset), sizeof(networkOrderSerial));
  return ntohl(networkOrderSerial);
}
173 static bool getSerialFromIXFRQuery(TCPQuery& query)
174 {
175 try {
176 size_t proxyPayloadSize = query.d_proxyProtocolPayloadAdded ? query.d_idstate.d_proxyProtocolPayloadSize : 0;
177 if (query.d_buffer.size() <= (proxyPayloadSize + sizeof(uint16_t))) {
178 return false;
179 }
181 size_t payloadSize = query.d_buffer.size() - sizeof(uint16_t) - proxyPayloadSize;
183 MOADNSParser parser(true, reinterpret_cast<const char*>(query.d_buffer.data() + sizeof(uint16_t) + proxyPayloadSize), payloadSize);
185 for (const auto& record : parser.d_answers) {
186 if (record.first.d_place != DNSResourceRecord::AUTHORITY || record.first.d_class != QClass::IN || record.first.d_type != QType::SOA) {
187 return false;
188 }
190 auto unknownContent = getRR<UnknownRecordContent>(record.first);
191 if (!unknownContent) {
192 return false;
193 }
194 auto raw = unknownContent->getRawContent();
195 query.d_ixfrQuerySerial = getSerialFromRawSOAContent(raw);
196 return true;
197 }
198 }
199 catch (const MOADNSException& e) {
200 DEBUGLOG("Exception when parsing IXFR TCP Query to DNS: " << e.what());
201 /* ponder what to do here, shall we close the connection? */
202 }
204 return false;
205 }
207 static void editPayloadID(PacketBuffer& payload, uint16_t newId, size_t proxyProtocolPayloadSize, bool sizePrepended)
208 {
209 /* we cannot do a direct cast as the alignment might be off (the size of the payload might have been prepended, which is bad enough,
210 but we might also have a proxy protocol payload */
211 size_t startOfHeaderOffset = (sizePrepended ? sizeof(uint16_t) : 0) + proxyProtocolPayloadSize;
212 if (payload.size() < startOfHeaderOffset + sizeof(dnsheader)) {
213 throw std::runtime_error("Invalid buffer for outgoing TCP query (size " + std::to_string(payload.size()));
214 }
215 uint16_t id = htons(newId);
216 memcpy(&payload.at(startOfHeaderOffset), &id, sizeof(id));
217 }
/* Tells prepareQueryForSending() whether the query buffer starts with the
   two-byte size prefix. */
enum class QueryState : uint8_t
{
  hasSizePrepended,
  noSize
};
/* Tells prepareQueryForSending() whether the proxy protocol payload still has
   to be sent over the connection (needProxy) or has been sent already
   (proxySent). */
enum class ConnectionState : uint8_t
{
  needProxy,
  proxySent
};
229 static void prepareQueryForSending(TCPQuery& query, uint16_t id, QueryState queryState, ConnectionState connectionState)
230 {
231 if (connectionState == ConnectionState::needProxy) {
232 if (query.d_proxyProtocolPayload.size() > 0 && !query.d_proxyProtocolPayloadAdded) {
233 query.d_buffer.insert(query.d_buffer.begin(), query.d_proxyProtocolPayload.begin(), query.d_proxyProtocolPayload.end());
234 query.d_proxyProtocolPayloadAdded = true;
235 query.d_idstate.d_proxyProtocolPayloadSize = query.d_proxyProtocolPayload.size();
236 }
237 }
238 else if (connectionState == ConnectionState::proxySent) {
239 if (query.d_proxyProtocolPayloadAdded) {
240 if (query.d_buffer.size() < query.d_idstate.d_proxyProtocolPayloadSize) {
241 throw std::runtime_error("Trying to remove a proxy protocol payload of size " + std::to_string(query.d_proxyProtocolPayload.size()) + " from a buffer of size " + std::to_string(query.d_buffer.size()));
242 }
243 // NOLINTNEXTLINE(*-narrowing-conversions): the size of the payload is limited to 2^16-1
244 query.d_buffer.erase(query.d_buffer.begin(), query.d_buffer.begin() + static_cast<ssize_t>(query.d_idstate.d_proxyProtocolPayloadSize));
245 query.d_proxyProtocolPayloadAdded = false;
246 query.d_idstate.d_proxyProtocolPayloadSize = 0;
247 }
248 }
249 if (query.d_idstate.qclass == QClass::IN && query.d_idstate.qtype == QType::IXFR) {
250 getSerialFromIXFRQuery(query);
251 }
253 editPayloadID(query.d_buffer, id, query.d_proxyProtocolPayloadAdded ? query.d_idstate.d_proxyProtocolPayloadSize : 0, true);
254 }
256 IOState TCPConnectionToBackend::queueNextQuery(std::shared_ptr<TCPConnectionToBackend>& conn)
257 {
258 conn->d_currentQuery = std::move(conn->d_pendingQueries.front());
260 uint16_t id = conn->d_highestStreamID;
261 prepareQueryForSending(conn->d_currentQuery.d_query, id, QueryState::hasSizePrepended, conn->needProxyProtocolPayload() ? ConnectionState::needProxy : ConnectionState::proxySent);
263 conn->d_pendingQueries.pop_front();
264 conn->d_state = State::sendingQueryToBackend;
265 conn->d_currentPos = 0;
267 return IOState::NeedWrite;
268 }
270 IOState TCPConnectionToBackend::sendQuery(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
271 {
272 DEBUGLOG("sending query to backend "<<conn->getDS()->getNameWithAddr()<<" over FD "<<conn->d_handler->getDescriptor());
274 IOState state = conn->d_handler->tryWrite(conn->d_currentQuery.d_query.d_buffer, conn->d_currentPos, conn->d_currentQuery.d_query.d_buffer.size());
276 if (state != IOState::Done) {
277 return state;
278 }
280 DEBUGLOG("query sent to backend");
281 /* request sent ! */
282 if (conn->d_currentQuery.d_query.d_proxyProtocolPayloadAdded) {
283 conn->d_proxyProtocolPayloadSent = true;
284 }
285 ++conn->d_queries;
286 conn->d_currentPos = 0;
288 DEBUGLOG("adding a pending response for ID "<<conn->d_highestStreamID<<" and QNAME "<<conn->d_currentQuery.d_query.d_idstate.qname);
289 auto res = conn->d_pendingResponses.insert({conn->d_highestStreamID, std::move(conn->d_currentQuery)});
290 /* if there was already a pending response with that ID, we messed up and we don't expect more
291 than one response */
292 if (res.second) {
293 ++conn->d_ds->outstanding;
294 }
295 ++conn->d_highestStreamID;
296 conn->d_currentQuery.d_sender.reset();
297 conn->d_currentQuery.d_query.d_buffer.clear();
299 return state;
300 }
// Drives the I/O state machine of a connection to a backend:
//  - sendingQueryToBackend: writes the current query, then every queued one, for
//    as long as the writes complete;
//  - waitingForResponseFromBackend / readingResponseSizeFromBackend: reads the
//    two-byte size prefix of the next response;
//  - readingResponseFromBackend: reads the response and passes it to handleResponse().
// The resulting I/O state is then registered via conn->d_ioState, with a timeout
// for reads and writes.
// If any I/O step throws, the connection is considered dead: while
// d_downstreamFailures is below the backend's d_retries setting we reconnect and
// requeue the queries that were in flight, otherwise every query is notified of
// the failure.
// Throws std::runtime_error when called on a connection without a handler.
302 void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
303 {
304 if (conn->d_handler == nullptr) {
305 throw std::runtime_error("No downstream socket in " + std::string(__PRETTY_FUNCTION__) + "!");
306 }
308 bool connectionDied = false;
309 IOState iostate = IOState::Done;
// NOTE(review): presumably the guard cleans up conn->d_ioState if we leave this
// function without calling release() on it -- confirm against IOStateGuard
310 IOStateGuard ioGuard(conn->d_ioState);
311 bool reconnected = false;
// we only loop after a successful reconnect, to immediately send the requeued queries
313 do {
314 reconnected = false;
316 try {
317 if (conn->d_state == State::sendingQueryToBackend) {
318 iostate = sendQuery(conn, now);
// keep writing for as long as the writes complete and queries are waiting
320 while (iostate == IOState::Done && !conn->d_pendingQueries.empty()) {
321 queueNextQuery(conn);
322 iostate = sendQuery(conn, now);
323 }
// everything has been written, get ready to read the size of the first response
325 if (iostate == IOState::Done && conn->d_pendingQueries.empty()) {
326 conn->d_state = State::waitingForResponseFromBackend;
327 conn->d_currentPos = 0;
328 conn->d_responseBuffer.resize(sizeof(uint16_t));
329 iostate = IOState::NeedRead;
330 }
331 }
333 if (conn->d_state == State::waitingForResponseFromBackend ||
334 conn->d_state == State::readingResponseSizeFromBackend) {
335 DEBUGLOG("reading response size from backend");
336 // then we need to allocate a new buffer (new because we might need to re-send the query if the
337 // backend dies on us)
338 // We also might need to read and send to the client more than one response in case of XFR (yeah!)
339 conn->d_responseBuffer.resize(sizeof(uint16_t));
340 iostate = conn->d_handler->tryRead(conn->d_responseBuffer, conn->d_currentPos, sizeof(uint16_t));
341 if (iostate == IOState::Done) {
342 DEBUGLOG("got response size from backend");
343 conn->d_state = State::readingResponseFromBackend;
// the size prefix is a 16-bit value with the most significant byte first
344 conn->d_responseSize = conn->d_responseBuffer.at(0) * 256 + conn->d_responseBuffer.at(1);
345 conn->d_responseBuffer.reserve(conn->d_responseSize + /* we will need to prepend the size later */ 2);
346 conn->d_responseBuffer.resize(conn->d_responseSize);
347 conn->d_currentPos = 0;
348 conn->d_lastDataReceivedTime = now;
349 }
// a partial read of the size prefix: remember that we are in the middle of it
350 else if (conn->d_state == State::waitingForResponseFromBackend && conn->d_currentPos > 0) {
351 conn->d_state = State::readingResponseSizeFromBackend;
352 }
353 }
355 if (conn->d_state == State::readingResponseFromBackend) {
356 DEBUGLOG("reading response from backend");
357 iostate = conn->d_handler->tryRead(conn->d_responseBuffer, conn->d_currentPos, conn->d_responseSize);
358 if (iostate == IOState::Done) {
359 DEBUGLOG("got response from backend");
// a failure while handling a complete response is not treated as an I/O error:
// no reconnection is attempted, the connection is released instead
360 try {
361 conn->d_lastDataReceivedTime = now;
362 iostate = conn->handleResponse(conn, now);
363 }
364 catch (const std::exception& e) {
365 vinfolog("Got an exception while handling TCP response from %s (client is %s): %s", conn->d_ds ? conn->d_ds->getNameWithAddr() : "unknown", conn->d_currentQuery.d_query.d_idstate.origRemote.toStringWithPort(), e.what());
366 ioGuard.release();
367 conn->release();
368 return;
369 }
370 }
371 }
// sanity check: log any state that is not one of the five known ones
373 if (conn->d_state != State::idle &&
374 conn->d_state != State::sendingQueryToBackend &&
375 conn->d_state != State::waitingForResponseFromBackend &&
376 conn->d_state != State::readingResponseSizeFromBackend &&
377 conn->d_state != State::readingResponseFromBackend) {
378 vinfolog("Unexpected state %d in TCPConnectionToBackend::handleIO", static_cast<int>(conn->d_state));
379 }
380 }
381 catch (const std::exception& e) {
382 /* most likely an EOF because the other end closed the connection,
383 but it might also be a real IO error or something else.
384 Let's just drop the connection
385 */
386 vinfolog("Got an exception while handling (%s backend) TCP query from %s: %s", (conn->d_state == State::sendingQueryToBackend ? "writing to" : "reading from"), conn->d_currentQuery.d_query.d_idstate.origRemote.toStringWithPort(), e.what());
388 if (conn->d_state == State::sendingQueryToBackend) {
389 ++conn->d_ds->tcpDiedSendingQuery;
390 }
391 else if (conn->d_state != State::idle) {
392 ++conn->d_ds->tcpDiedReadingResponse;
393 }
395 /* don't increase this counter when reusing connections */
396 if (conn->d_fresh) {
397 ++conn->d_downstreamFailures;
398 }
400 /* remove this FD from the IO multiplexer */
401 iostate = IOState::Done;
402 connectionDied = true;
403 }
405 if (connectionDied) {
407 DEBUGLOG("connection died, number of failures is "<<conn->d_downstreamFailures<<", retries is "<<conn->d_ds->d_config.d_retries);
409 if (conn->d_downstreamFailures < conn->d_ds->d_config.d_retries) {
411 conn->d_ioState.reset();
412 ioGuard.release();
414 try {
415 if (conn->reconnect()) {
// the descriptor has changed, we need a new I/O state handler for it
416 conn->d_ioState = make_unique<IOStateHandler>(*conn->d_mplexer, conn->d_handler->getDescriptor());
418 /* we need to resend the queries that were in flight, if any */
419 if (conn->d_state == State::sendingQueryToBackend) {
420 /* we need to edit this query so it has the correct ID */
421 auto query = std::move(conn->d_currentQuery);
422 uint16_t id = conn->d_highestStreamID;
423 prepareQueryForSending(query.d_query, id, QueryState::hasSizePrepended, ConnectionState::needProxy);
424 conn->d_currentQuery = std::move(query);
425 }
427 /* if we notify the sender it might terminate us so we need to move these first */
428 auto pendingResponses = std::move(conn->d_pendingResponses);
429 conn->d_pendingResponses.clear();
430 for (auto& pending : pendingResponses) {
431 --conn->d_ds->outstanding;
433 if (pending.second.d_query.isXFR() && pending.second.d_query.d_xfrStarted) {
434 /* this one can't be restarted, sorry */
435 DEBUGLOG("A XFR for which a response has already been sent cannot be restarted");
436 try {
437 TCPResponse response(std::move(pending.second.d_query));
438 pending.second.d_sender->notifyIOError(now, std::move(response));
439 }
440 catch (const std::exception& e) {
441 vinfolog("Got an exception while notifying: %s", e.what());
442 }
443 catch (...) {
444 vinfolog("Got exception while notifying");
445 }
446 }
447 else {
// this query was waiting for its response, it will be sent again over the new connection
448 conn->d_pendingQueries.push_back(std::move(pending.second));
449 }
450 }
451 conn->d_currentPos = 0;
453 if (conn->d_state == State::sendingQueryToBackend) {
454 iostate = IOState::NeedWrite;
455 // resume sending query
456 }
457 else {
458 if (conn->d_pendingQueries.empty()) {
459 throw std::runtime_error("TCP connection to a backend in state " + std::to_string((int)conn->d_state) + " with no pending queries");
460 }
462 iostate = queueNextQuery(conn);
463 }
465 reconnected = true;
466 connectionDied = false;
467 }
468 }
469 catch (const std::exception& e) {
470 // reconnect might throw on failure, let's ignore that, we just need to know
471 // it failed
472 }
473 }
475 if (!reconnected) {
476 /* reconnect failed, we give up */
477 DEBUGLOG("reconnect failed, we give up");
478 ++conn->d_ds->tcpGaveUp;
479 conn->notifyAllQueriesFailed(now, FailureReason::gaveUp);
480 }
481 }
// register the I/O state we ended up with: no timeout when we are done, otherwise
// the read, connect or write timeout depending on what we are waiting for
483 if (conn->d_ioState) {
484 if (iostate == IOState::Done) {
485 conn->d_ioState->update(iostate, handleIOCallback, conn);
486 }
487 else {
488 boost::optional<struct timeval> ttd{boost::none};
489 if (iostate == IOState::NeedRead) {
490 ttd = conn->getBackendReadTTD(now);
491 }
492 else if (conn->isFresh() && conn->d_queries == 0) {
493 /* first write just after the non-blocking connect */
494 ttd = conn->getBackendConnectTTD(now);
495 }
496 else {
497 ttd = conn->getBackendWriteTTD(now);
498 }
500 conn->d_ioState->update(iostate, handleIOCallback, conn, ttd);
501 }
502 }
503 }
504 while (reconnected);
506 ioGuard.release();
507 }
509 void TCPConnectionToBackend::handleIOCallback(int fd, FDMultiplexer::funcparam_t& param)
510 {
511 auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(param);
512 if (fd != conn->getHandle()) {
513 throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->getHandle()));
514 }
516 struct timeval now;
517 gettimeofday(&now, nullptr);
518 handleIO(conn, now);
519 }
521 void TCPConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query)
522 {
523 if (!d_ioState) {
524 d_ioState = make_unique<IOStateHandler>(*d_mplexer, d_handler->getDescriptor());
525 }
527 // if we are not already sending a query or in the middle of reading a response (so idle),
528 // start sending the query
529 if (d_state == State::idle || d_state == State::waitingForResponseFromBackend) {
530 DEBUGLOG("Sending new query to backend right away, with ID "<<d_highestStreamID);
531 d_state = State::sendingQueryToBackend;
532 d_currentPos = 0;
534 uint16_t id = d_highestStreamID;
536 d_currentQuery = PendingRequest({sender, std::move(query)});
537 prepareQueryForSending(d_currentQuery.d_query, id, QueryState::hasSizePrepended, needProxyProtocolPayload() ? ConnectionState::needProxy : ConnectionState::proxySent);
539 struct timeval now;
540 gettimeofday(&now, 0);
542 auto shared = std::dynamic_pointer_cast<TCPConnectionToBackend>(shared_from_this());
543 handleIO(shared, now);
544 }
545 else {
546 DEBUGLOG("Adding new query to the queue because we are in state "<<(int)d_state);
547 // store query in the list of queries to send
548 d_pendingQueries.push_back(PendingRequest({sender, std::move(query)}));
549 }
550 }
552 void TCPConnectionToBackend::handleTimeout(const struct timeval& now, bool write)
553 {
554 /* in some cases we could retry, here, reconnecting and sending our pending responses again */
555 if (write) {
556 if (isFresh() && d_queries == 0) {
557 ++d_ds->tcpConnectTimeouts;
558 vinfolog("Timeout while connecting to TCP backend %s", d_ds->getNameWithAddr());
559 }
560 else {
561 ++d_ds->tcpWriteTimeouts;
562 vinfolog("Timeout while writing to TCP backend %s", d_ds->getNameWithAddr());
563 }
564 }
565 else {
566 ++d_ds->tcpReadTimeouts;
567 vinfolog("Timeout while reading from TCP backend %s", d_ds->getNameWithAddr());
568 }
570 try {
571 notifyAllQueriesFailed(now, FailureReason::timeout);
572 }
573 catch (const std::exception& e) {
574 vinfolog("Got an exception while notifying a timeout: %s", e.what());
575 }
576 catch (...) {
577 vinfolog("Got exception while notifying a timeout");
578 }
580 release();
581 }
583 void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval& now, FailureReason reason)
584 {
585 d_connectionDied = true;
586 d_ds->reportTimeoutOrError();
588 /* we might be terminated while notifying a query sender */
589 d_ds->outstanding -= d_pendingResponses.size();
590 auto pendingQueries = std::move(d_pendingQueries);
591 d_pendingQueries.clear();
592 auto pendingResponses = std::move(d_pendingResponses);
593 d_pendingResponses.clear();
595 auto increaseCounters = [reason](const ClientState* cs) {
596 if (reason == FailureReason::timeout) {
597 if (cs) {
598 ++cs->tcpDownstreamTimeouts;
599 }
600 }
601 else if (reason == FailureReason::gaveUp) {
602 if (cs) {
603 ++cs->tcpGaveUp;
604 }
605 }
606 };
608 try {
609 if (d_state == State::sendingQueryToBackend) {
610 increaseCounters(d_currentQuery.d_query.d_idstate.cs);
611 auto sender = d_currentQuery.d_sender;
612 if (sender->active()) {
613 TCPResponse response(std::move(d_currentQuery.d_query));
614 sender->notifyIOError(now, std::move(response));
615 }
616 }
618 for (auto& query : pendingQueries) {
619 increaseCounters(query.d_query.d_idstate.cs);
620 auto sender = query.d_sender;
621 if (sender->active()) {
622 TCPResponse response(std::move(query.d_query));
623 sender->notifyIOError(now, std::move(response));
624 }
625 }
627 for (auto& response : pendingResponses) {
628 increaseCounters(response.second.d_query.d_idstate.cs);
629 auto sender = response.second.d_sender;
630 if (sender->active()) {
631 TCPResponse tresp(std::move(response.second.d_query));
632 sender->notifyIOError(now, std::move(tresp));
633 }
634 }
635 }
636 catch (const std::exception& e) {
637 vinfolog("Got an exception while notifying: %s", e.what());
638 }
639 catch (...) {
640 vinfolog("Got exception while notifying");
641 }
643 release();
644 }
// Processes a complete response read from the backend into d_responseBuffer.
// The response is matched to a pending query via its DNS ID; the ID is then
// rewritten from the query's stored origID and the response is passed to the
// sender of the query, if that sender is still active.
// For zone transfers (XFR) the pending entry is kept until isXFRFinished()
// reports the end of the transfer, since several responses are expected.
// Returns the next I/O state: NeedWrite if a queued query is now being sent,
// NeedRead if more responses are expected, Done when the connection is idle or
// when the ID did not match any pending query.
// Re-throws the exception raised by getQueryIdFromResponse(), after having
// notified all queries of the failure.
646 IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
647 {
// we got a full response, so the backend is alive: reset the failure counter
648 d_downstreamFailures = 0;
650 uint16_t queryId = 0;
651 try {
652 queryId = getQueryIdFromResponse();
653 }
654 catch (const std::exception& e) {
655 DEBUGLOG("Unable to get query ID");
656 notifyAllQueriesFailed(now, FailureReason::unexpectedQueryID);
657 throw;
658 }
660 auto it = d_pendingResponses.find(queryId);
661 if (it == d_pendingResponses.end()) {
662 DEBUGLOG("could not find any corresponding query for ID "<<queryId<<". This is likely a duplicated ID over the same TCP connection, giving up!");
663 notifyAllQueriesFailed(now, FailureReason::unexpectedQueryID);
664 return IOState::Done;
665 }
// rewrite the ID from the stored origID; no size prefix and no proxy protocol payload in a response buffer
667 editPayloadID(d_responseBuffer, ntohs(it->second.d_query.d_idstate.origID), 0, false);
669 auto sender = it->second.d_sender;
671 if (sender->active() && it->second.d_query.isXFR()) {
673 bool done = false;
674 TCPResponse response;
675 response.d_buffer = std::move(d_responseBuffer);
676 response.d_connection = conn;
677 response.d_ds = conn->d_ds;
678 /* we don't move the whole IDS because we will need for the responses to come */
679 response.d_idstate.qtype = it->second.d_query.d_idstate.qtype;
680 response.d_idstate.qname = it->second.d_query.d_idstate.qname;
681 DEBUGLOG("passing XFRresponse to client connection for "<<response.d_idstate.qname);
// from now on this transfer cannot be restarted over a new connection (see handleIO)
683 it->second.d_query.d_xfrStarted = true;
684 done = isXFRFinished(response, it->second.d_query);
686 if (done) {
687 d_pendingResponses.erase(it);
688 --conn->d_ds->outstanding;
689 /* marking as idle for now, so we can accept new queries if our queues are empty */
690 if (d_pendingQueries.empty() && d_pendingResponses.empty()) {
691 d_state = State::idle;
692 t_downstreamTCPConnectionsManager.moveToIdle(conn);
693 }
694 }
696 sender->handleXFRResponse(now, std::move(response));
697 if (done) {
698 d_state = State::idle;
699 t_downstreamTCPConnectionsManager.moveToIdle(conn);
700 return IOState::Done;
701 }
703 d_state = State::waitingForResponseFromBackend;
704 d_currentPos = 0;
705 d_responseBuffer.resize(sizeof(uint16_t));
706 // get ready to read the next packet, if any
707 return IOState::NeedRead;
708 }
// regular (non-XFR) response, or the sender is no longer active: the query is done
// either way, so update the backend's counters and latency before dropping the entry
710 --conn->d_ds->outstanding;
711 auto ids = std::move(it->second.d_query.d_idstate);
712 const double udiff = ids.queryRealTime.udiff();
713 conn->d_ds->updateTCPLatency(udiff);
714 if (d_responseBuffer.size() >= sizeof(dnsheader)) {
715 dnsheader dh;
716 memcpy(&dh, d_responseBuffer.data(), sizeof(dh));
717 conn->d_ds->reportResponse(dh.rcode);
718 }
719 else {
720 conn->d_ds->reportTimeoutOrError();
721 }
723 d_pendingResponses.erase(it);
724 /* marking as idle for now, so we can accept new queries if our queues are empty */
725 if (d_pendingQueries.empty() && d_pendingResponses.empty()) {
726 d_state = State::idle;
727 t_downstreamTCPConnectionsManager.moveToIdle(conn);
728 }
729 else if (!d_pendingResponses.empty()) {
730 d_currentPos = 0;
731 d_state = State::waitingForResponseFromBackend;
732 }
734 // be very careful that handleResponse() might trigger new queries being assigned to us,
735 // which may reset our d_currentPos, d_state and/or d_responseBuffer, so we cannot assume
736 // anything without checking first
737 auto shared = conn;
738 if (sender->active()) {
739 DEBUGLOG("passing response to client connection for "<<ids.qname);
740 // make sure that we still exist after calling handleResponse()
741 TCPResponse response(std::move(d_responseBuffer), std::move(ids), conn, conn->d_ds);
742 sender->handleResponse(now, std::move(response));
743 }
// the queues are checked again here because the call above might have changed them
745 if (!d_pendingQueries.empty()) {
746 DEBUGLOG("still have some queries to send");
747 return queueNextQuery(shared);
748 }
749 else if (!d_pendingResponses.empty()) {
750 DEBUGLOG("still have some responses to read");
751 return IOState::NeedRead;
752 }
753 else {
754 DEBUGLOG("nothing to do, waiting for a new query");
755 d_state = State::idle;
756 t_downstreamTCPConnectionsManager.moveToIdle(conn);
757 return IOState::Done;
758 }
759 }
761 uint16_t TCPConnectionToBackend::getQueryIdFromResponse() const
762 {
763 if (d_responseBuffer.size() < sizeof(dnsheader)) {
764 throw std::runtime_error("Unable to get query ID in a too small (" + std::to_string(d_responseBuffer.size()) + ") response from " + d_ds->getNameWithAddr());
765 }
767 uint16_t id;
768 memcpy(&id, &d_responseBuffer.at(0), sizeof(id));
769 return ntohs(id);
770 }
772 void TCPConnectionToBackend::setProxyProtocolValuesSent(std::unique_ptr<std::vector<ProxyProtocolValue>>&& proxyProtocolValuesSent)
773 {
774 /* if we already have some values, we have already verified they match */
775 if (!d_proxyProtocolValuesSent) {
776 d_proxyProtocolValuesSent = std::move(proxyProtocolValuesSent);
777 }
778 }
780 bool TCPConnectionToBackend::matchesTLVs(const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs) const
781 {
782 if (tlvs == nullptr) {
783 if (d_proxyProtocolValuesSent == nullptr) {
784 return true;
785 }
786 else {
787 return false;
788 }
789 }
791 if (d_proxyProtocolValuesSent == nullptr) {
792 return false;
793 }
795 return *tlvs == *d_proxyProtocolValuesSent;
796 }
798 bool TCPConnectionToBackend::isXFRFinished(const TCPResponse& response, TCPQuery& query)
799 {
800 bool done = false;
802 try {
803 MOADNSParser parser(true, reinterpret_cast<const char*>(response.d_buffer.data()), response.d_buffer.size());
805 if (parser.d_header.rcode != 0U) {
806 done = true;
807 }
808 else {
809 for (const auto& record : parser.d_answers) {
810 if (record.first.d_class != QClass::IN || record.first.d_type != QType::SOA) {
811 continue;
812 }
814 auto unknownContent = getRR<UnknownRecordContent>(record.first);
815 if (!unknownContent) {
816 continue;
817 }
818 auto raw = unknownContent->getRawContent();
819 auto serial = getSerialFromRawSOAContent(raw);
820 if (query.d_xfrMasterSerial == 0) {
821 // store the first SOA in our client's connection metadata
822 query.d_xfrMasterSerial = serial;
823 if (query.d_idstate.qtype == QType::IXFR && (query.d_xfrMasterSerial == query.d_ixfrQuerySerial || rfc1982LessThan(query.d_xfrMasterSerial, query.d_ixfrQuerySerial))) {
824 /* This is the first message with a master SOA:
825 RFC 1995 Section 2:
826 If an IXFR query with the same or newer version number
827 than that of the server is received, it is replied to
828 with a single SOA record of the server's current version.
829 */
830 done = true;
831 break;
832 }
833 }
835 ++query.d_xfrSerialCount;
836 if (serial == query.d_xfrMasterSerial) {
837 ++query.d_xfrMasterSerialCount;
838 // figure out if it's end when receiving master's SOA again
839 if (query.d_xfrSerialCount == 2) {
840 // if there are only two SOA records marks a finished AXFR
841 done = true;
842 break;
843 }
844 if (query.d_xfrMasterSerialCount == 3) {
845 // receiving master's SOA 3 times marks a finished IXFR
846 done = true;
847 break;
848 }
849 }
850 }
851 }
852 }
853 catch (const MOADNSException& e) {
854 DEBUGLOG("Exception when parsing TCPResponse to DNS: " << e.what());
855 /* ponder what to do here, shall we close the connection? */
856 }
857 return done;
858 }
860 void setTCPDownstreamMaxIdleConnectionsPerBackend(uint64_t max)
861 {
862 DownstreamTCPConnectionsManager::setMaxIdleConnectionsPerDownstream(max);
863 }
865 void setTCPDownstreamCleanupInterval(uint64_t interval)
866 {
867 DownstreamTCPConnectionsManager::setCleanupInterval(interval);
868 }
870 void setTCPDownstreamMaxIdleTime(uint64_t max)
871 {
872 DownstreamTCPConnectionsManager::setMaxIdleTime(max);
873 }