pdns/dnsdistdist/dnsdist-tcp-downstream.cc

#include "dnsdist-session-cache.hh"
#include "dnsdist-tcp-downstream.hh"
#include "dnsdist-tcp-upstream.hh"
#include "dnsdist-downstream-connection.hh"

#include "dnsparser.hh"

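/* per-thread manager of the outgoing TCP connections to backends, so that idle
   connections can be parked and reused by later queries (see moveToIdle() below) */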
thread_local DownstreamTCPConnectionsManager t_downstreamTCPConnectionsManager;

ConnectionToBackend::~ConnectionToBackend()
{
  if (d_ds && d_handler) {
    --d_ds->tcpCurrentConnections;
    struct timeval now;
    gettimeofday(&now, nullptr);

    if (d_handler->isTLS()) {
      if (d_handler->hasTLSSessionBeenResumed()) {
        ++d_ds->tlsResumptions;
      }
      try {
        auto sessions = d_handler->getTLSSessions();
        if (!sessions.empty()) {
          g_sessionCache.putSessions(d_ds->getID(), now.tv_sec, std::move(sessions));
        }
      }
      catch (const std::exception& e) {
        vinfolog("Unable to get a TLS session: %s", e.what());
      }
    }
    auto diff = now - d_connectionStartTime;
    // cerr<<"connection to backend terminated after "<<d_queries<<" queries, "<<diff.tv_sec<<" seconds"<<endl;
    d_ds->updateTCPMetrics(d_queries, diff.tv_sec * 1000 + diff.tv_usec / 1000);
  }
}

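/* (re)establish the TCP connection to the backend: close the current handler if there is one,
   keeping a TLS session aside for resumption, then open and configure a new non-blocking
   socket, retrying up to d_retries times */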
bool ConnectionToBackend::reconnect()
{
  std::unique_ptr<TLSSession> tlsSession{nullptr};
  if (d_handler) {
    DEBUGLOG("closing socket "<<d_handler->getDescriptor());
    if (d_handler->isTLS()) {
      if (d_handler->hasTLSSessionBeenResumed()) {
        ++d_ds->tlsResumptions;
      }
      try {
        auto sessions = d_handler->getTLSSessions();
        if (!sessions.empty()) {
          tlsSession = std::move(sessions.back());
          sessions.pop_back();
          if (!sessions.empty()) {
            g_sessionCache.putSessions(d_ds->getID(), time(nullptr), std::move(sessions));
          }
        }
      }
      catch (const std::exception& e) {
        vinfolog("Unable to get a TLS session to resume: %s", e.what());
      }
    }
    d_handler->close();
    d_ioState.reset();
    d_handler.reset();
    --d_ds->tcpCurrentConnections;
  }

  d_fresh = true;
  d_highestStreamID = 0;
  d_proxyProtocolPayloadSent = false;

  do {
    DEBUGLOG("TCP connecting to downstream "<<d_ds->getNameWithAddr()<<" ("<<d_downstreamFailures<<")");
    DEBUGLOG("Opening TCP connection to backend "<<d_ds->getNameWithAddr());
    ++d_ds->tcpNewConnections;
    try {
      auto socket = Socket(d_ds->d_config.remote.sin4.sin_family, SOCK_STREAM, 0);
      DEBUGLOG("result of socket() is "<<socket.getHandle());

      /* disable NAGLE, which does not play nicely with delayed ACKs.
         In theory we could be wasting up to 500 milliseconds waiting for
         the other end to acknowledge our initial packet before we could
         send the rest. */
      setTCPNoDelay(socket.getHandle());

#ifdef SO_BINDTODEVICE
      if (!d_ds->d_config.sourceItfName.empty()) {
        int res = setsockopt(socket.getHandle(), SOL_SOCKET, SO_BINDTODEVICE, d_ds->d_config.sourceItfName.c_str(), d_ds->d_config.sourceItfName.length());
        if (res != 0) {
          vinfolog("Error setting up the interface on backend TCP socket '%s': %s", d_ds->getNameWithAddr(), stringerror());
        }
      }
#endif

      if (!IsAnyAddress(d_ds->d_config.sourceAddr)) {
        SSetsockopt(socket.getHandle(), SOL_SOCKET, SO_REUSEADDR, 1);
#ifdef IP_BIND_ADDRESS_NO_PORT
        if (d_ds->d_config.ipBindAddrNoPort) {
          SSetsockopt(socket.getHandle(), SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
        }
#endif
        socket.bind(d_ds->d_config.sourceAddr, false);
      }
      socket.setNonBlocking();

      gettimeofday(&d_connectionStartTime, nullptr);
      auto handler = std::make_unique<TCPIOHandler>(d_ds->d_config.d_tlsSubjectName, d_ds->d_config.d_tlsSubjectIsAddr, socket.releaseHandle(), timeval{0,0}, d_ds->d_tlsCtx);
      if (!tlsSession && d_ds->d_tlsCtx) {
        tlsSession = g_sessionCache.getSession(d_ds->getID(), d_connectionStartTime.tv_sec);
      }
      if (tlsSession) {
        handler->setTLSSession(tlsSession);
      }
      handler->tryConnect(d_ds->d_config.tcpFastOpen && isFastOpenEnabled(), d_ds->d_config.remote);
      d_queries = 0;

      d_handler = std::move(handler);
      d_ds->incCurrentConnectionsCount();
      return true;
    }
    catch (const std::runtime_error& e) {
      vinfolog("Connection to downstream server %s failed: %s", d_ds->getNameWithAddr(), e.what());
      d_downstreamFailures++;
      if (d_downstreamFailures >= d_ds->d_config.d_retries) {
        throw;
      }
    }
  }
  while (d_downstreamFailures < d_ds->d_config.d_retries);

  return false;
}

TCPConnectionToBackend::~TCPConnectionToBackend()
{
  if (d_ds && !d_pendingResponses.empty()) {
    d_ds->outstanding -= d_pendingResponses.size();
  }
}

void TCPConnectionToBackend::release()
{
  d_ds->outstanding -= d_pendingResponses.size();

  d_pendingResponses.clear();
  d_pendingQueries.clear();

  if (d_ioState) {
    d_ioState.reset();
  }
}

static uint32_t getSerialFromRawSOAContent(const std::vector<uint8_t>& raw)
{
  /* minimal size for a SOA record, as defined by RFC 1035:
     MNAME (root): 1
     RNAME (root): 1
     SERIAL: 4
     REFRESH: 4
     RETRY: 4
     EXPIRE: 4
     MINIMUM: 4
     = 22 bytes
  */
  if (raw.size() < 22) {
    throw std::runtime_error("Invalid content of size " + std::to_string(raw.size()) + " for a SOA record");
  }
  /* As RFC 1035 states that "all domain names in the RDATA section of these RRs may be compressed",
     and we don't want to parse these names, start at the end */
  uint32_t serial = 0;
  memcpy(&serial, &raw.at(raw.size() - 20), sizeof(serial));
  return ntohl(serial);
}

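/* extract the serial from the SOA record carried in the authority section of an outgoing
   IXFR query, and store it in the query so isXFRFinished() can later detect an
   "already up to date" single-SOA answer */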
static bool getSerialFromIXFRQuery(TCPQuery& query)
{
  try {
    size_t proxyPayloadSize = query.d_proxyProtocolPayloadAdded ? query.d_idstate.d_proxyProtocolPayloadSize : 0;
    if (query.d_buffer.size() <= (proxyPayloadSize + sizeof(uint16_t))) {
      return false;
    }

    size_t payloadSize = query.d_buffer.size() - sizeof(uint16_t) - proxyPayloadSize;

    MOADNSParser parser(true, reinterpret_cast<const char*>(query.d_buffer.data() + sizeof(uint16_t) + proxyPayloadSize), payloadSize);

    for (const auto& record : parser.d_answers) {
      if (record.first.d_place != DNSResourceRecord::AUTHORITY || record.first.d_class != QClass::IN || record.first.d_type != QType::SOA) {
        return false;
      }

      auto unknownContent = getRR<UnknownRecordContent>(record.first);
      if (!unknownContent) {
        return false;
      }
      auto raw = unknownContent->getRawContent();
      query.d_ixfrQuerySerial = getSerialFromRawSOAContent(raw);
      return true;
    }
  }
  catch (const MOADNSException& e) {
    DEBUGLOG("Exception when parsing IXFR TCP Query to DNS: " << e.what());
    /* ponder what to do here, shall we close the connection? */
  }

  return false;
}

static void editPayloadID(PacketBuffer& payload, uint16_t newId, size_t proxyProtocolPayloadSize, bool sizePrepended)
{
  /* we cannot do a direct cast as the alignment might be off (the size of the payload might have been prepended, which is bad enough,
     but we might also have a proxy protocol payload) */
  size_t startOfHeaderOffset = (sizePrepended ? sizeof(uint16_t) : 0) + proxyProtocolPayloadSize;
  if (payload.size() < startOfHeaderOffset + sizeof(dnsheader)) {
    throw std::runtime_error("Invalid buffer for outgoing TCP query (size " + std::to_string(payload.size()) + ")");
  }
  uint16_t id = htons(newId);
  memcpy(&payload.at(startOfHeaderOffset), &id, sizeof(id));
}

enum class QueryState : uint8_t {
  hasSizePrepended,
  noSize
};

enum class ConnectionState : uint8_t {
  needProxy,
  proxySent
};

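/* make a query ready to be written to this connection: prepend the proxy protocol payload
   if the connection still needs it (or strip it if it has already been sent), record the
   IXFR query serial when relevant, and rewrite the DNS ID to the stream ID we will use to
   match the response */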
static void prepareQueryForSending(TCPQuery& query, uint16_t id, QueryState queryState, ConnectionState connectionState)
{
  if (connectionState == ConnectionState::needProxy) {
    if (query.d_proxyProtocolPayload.size() > 0 && !query.d_proxyProtocolPayloadAdded) {
      query.d_buffer.insert(query.d_buffer.begin(), query.d_proxyProtocolPayload.begin(), query.d_proxyProtocolPayload.end());
      query.d_proxyProtocolPayloadAdded = true;
      query.d_idstate.d_proxyProtocolPayloadSize = query.d_proxyProtocolPayload.size();
    }
  }
  else if (connectionState == ConnectionState::proxySent) {
    if (query.d_proxyProtocolPayloadAdded) {
      if (query.d_buffer.size() < query.d_idstate.d_proxyProtocolPayloadSize) {
        throw std::runtime_error("Trying to remove a proxy protocol payload of size " + std::to_string(query.d_proxyProtocolPayload.size()) + " from a buffer of size " + std::to_string(query.d_buffer.size()));
      }
      // NOLINTNEXTLINE(*-narrowing-conversions): the size of the payload is limited to 2^16-1
      query.d_buffer.erase(query.d_buffer.begin(), query.d_buffer.begin() + static_cast<ssize_t>(query.d_idstate.d_proxyProtocolPayloadSize));
      query.d_proxyProtocolPayloadAdded = false;
      query.d_idstate.d_proxyProtocolPayloadSize = 0;
    }
  }
  if (query.d_idstate.qclass == QClass::IN && query.d_idstate.qtype == QType::IXFR) {
    getSerialFromIXFRQuery(query);
  }

  editPayloadID(query.d_buffer, id, query.d_proxyProtocolPayloadAdded ? query.d_idstate.d_proxyProtocolPayloadSize : 0, true);
}

IOState TCPConnectionToBackend::queueNextQuery(std::shared_ptr<TCPConnectionToBackend>& conn)
{
  conn->d_currentQuery = std::move(conn->d_pendingQueries.front());

  uint16_t id = conn->d_highestStreamID;
  prepareQueryForSending(conn->d_currentQuery.d_query, id, QueryState::hasSizePrepended, conn->needProxyProtocolPayload() ? ConnectionState::needProxy : ConnectionState::proxySent);

  conn->d_pendingQueries.pop_front();
  conn->d_state = State::sendingQueryToBackend;
  conn->d_currentPos = 0;

  return IOState::NeedWrite;
}

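/* write the current query to the backend; once it has been fully written, move it into
   d_pendingResponses, keyed by its stream ID, so the matching response can be routed back
   to the right sender */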
IOState TCPConnectionToBackend::sendQuery(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
{
  DEBUGLOG("sending query to backend "<<conn->getDS()->getNameWithAddr()<<" over FD "<<conn->d_handler->getDescriptor());

  IOState state = conn->d_handler->tryWrite(conn->d_currentQuery.d_query.d_buffer, conn->d_currentPos, conn->d_currentQuery.d_query.d_buffer.size());

  if (state != IOState::Done) {
    return state;
  }

  DEBUGLOG("query sent to backend");
  /* request sent ! */
  if (conn->d_currentQuery.d_query.d_proxyProtocolPayloadAdded) {
    conn->d_proxyProtocolPayloadSent = true;
  }
  ++conn->d_queries;
  conn->d_currentPos = 0;

  DEBUGLOG("adding a pending response for ID "<<conn->d_highestStreamID<<" and QNAME "<<conn->d_currentQuery.d_query.d_idstate.qname);
  auto res = conn->d_pendingResponses.insert({conn->d_highestStreamID, std::move(conn->d_currentQuery)});
  /* if there was already a pending response with that ID, we messed up and we don't expect more
     than one response */
  if (res.second) {
    ++conn->d_ds->outstanding;
  }
  ++conn->d_highestStreamID;
  conn->d_currentQuery.d_sender.reset();
  conn->d_currentQuery.d_query.d_buffer.clear();

  return state;
}

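/* core I/O state machine for this backend connection: flush pending queries, then read the
   two-byte length prefix followed by the response itself; on an I/O error, try to reconnect
   and replay the in-flight queries (except XFRs that have already started) before giving up */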
void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
{
  if (conn->d_handler == nullptr) {
    throw std::runtime_error("No downstream socket in " + std::string(__PRETTY_FUNCTION__) + "!");
  }

  bool connectionDied = false;
  IOState iostate = IOState::Done;
  IOStateGuard ioGuard(conn->d_ioState);
  bool reconnected = false;

  do {
    reconnected = false;

    try {
      if (conn->d_state == State::sendingQueryToBackend) {
        iostate = sendQuery(conn, now);

        while (iostate == IOState::Done && !conn->d_pendingQueries.empty()) {
          queueNextQuery(conn);
          iostate = sendQuery(conn, now);
        }

        if (iostate == IOState::Done && conn->d_pendingQueries.empty()) {
          conn->d_state = State::waitingForResponseFromBackend;
          conn->d_currentPos = 0;
          conn->d_responseBuffer.resize(sizeof(uint16_t));
          iostate = IOState::NeedRead;
        }
      }

      if (conn->d_state == State::waitingForResponseFromBackend ||
          conn->d_state == State::readingResponseSizeFromBackend) {
        DEBUGLOG("reading response size from backend");
        // then we need to allocate a new buffer (new because we might need to re-send the query if the
        // backend dies on us)
        // We also might need to read and send to the client more than one response in case of XFR (yeah!)
        conn->d_responseBuffer.resize(sizeof(uint16_t));
        iostate = conn->d_handler->tryRead(conn->d_responseBuffer, conn->d_currentPos, sizeof(uint16_t));
        if (iostate == IOState::Done) {
          DEBUGLOG("got response size from backend");
          conn->d_state = State::readingResponseFromBackend;
          conn->d_responseSize = conn->d_responseBuffer.at(0) * 256 + conn->d_responseBuffer.at(1);
          conn->d_responseBuffer.reserve(conn->d_responseSize + /* we will need to prepend the size later */ 2);
          conn->d_responseBuffer.resize(conn->d_responseSize);
          conn->d_currentPos = 0;
          conn->d_lastDataReceivedTime = now;
        }
        else if (conn->d_state == State::waitingForResponseFromBackend && conn->d_currentPos > 0) {
          conn->d_state = State::readingResponseSizeFromBackend;
        }
      }

      if (conn->d_state == State::readingResponseFromBackend) {
        DEBUGLOG("reading response from backend");
        iostate = conn->d_handler->tryRead(conn->d_responseBuffer, conn->d_currentPos, conn->d_responseSize);
        if (iostate == IOState::Done) {
          DEBUGLOG("got response from backend");
          try {
            conn->d_lastDataReceivedTime = now;
            iostate = conn->handleResponse(conn, now);
          }
          catch (const std::exception& e) {
            vinfolog("Got an exception while handling TCP response from %s (client is %s): %s", conn->d_ds ? conn->d_ds->getNameWithAddr() : "unknown", conn->d_currentQuery.d_query.d_idstate.origRemote.toStringWithPort(), e.what());
            ioGuard.release();
            conn->release();
            return;
          }
        }
      }

      if (conn->d_state != State::idle &&
          conn->d_state != State::sendingQueryToBackend &&
          conn->d_state != State::waitingForResponseFromBackend &&
          conn->d_state != State::readingResponseSizeFromBackend &&
          conn->d_state != State::readingResponseFromBackend) {
        vinfolog("Unexpected state %d in TCPConnectionToBackend::handleIO", static_cast<int>(conn->d_state));
      }
    }
    catch (const std::exception& e) {
      /* most likely an EOF because the other end closed the connection,
         but it might also be a real IO error or something else.
         Let's just drop the connection
      */
      vinfolog("Got an exception while handling (%s backend) TCP query from %s: %s", (conn->d_state == State::sendingQueryToBackend ? "writing to" : "reading from"), conn->d_currentQuery.d_query.d_idstate.origRemote.toStringWithPort(), e.what());

      if (conn->d_state == State::sendingQueryToBackend) {
        ++conn->d_ds->tcpDiedSendingQuery;
      }
      else if (conn->d_state != State::idle) {
        ++conn->d_ds->tcpDiedReadingResponse;
      }

      /* don't increase this counter when reusing connections */
      if (conn->d_fresh) {
        ++conn->d_downstreamFailures;
      }

      /* remove this FD from the IO multiplexer */
      iostate = IOState::Done;
      connectionDied = true;
    }

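    /* the connection to the backend died: if we still have retries left, reconnect and
       re-queue the queries that were in flight, otherwise notify their senders and give up */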
    if (connectionDied) {

      DEBUGLOG("connection died, number of failures is "<<conn->d_downstreamFailures<<", retries is "<<conn->d_ds->d_config.d_retries);

      if (conn->d_downstreamFailures < conn->d_ds->d_config.d_retries) {

        conn->d_ioState.reset();
        ioGuard.release();

        try {
          if (conn->reconnect()) {
            conn->d_ioState = make_unique<IOStateHandler>(*conn->d_mplexer, conn->d_handler->getDescriptor());

            /* we need to resend the queries that were in flight, if any */
            if (conn->d_state == State::sendingQueryToBackend) {
              /* we need to edit this query so it has the correct ID */
              auto query = std::move(conn->d_currentQuery);
              uint16_t id = conn->d_highestStreamID;
              prepareQueryForSending(query.d_query, id, QueryState::hasSizePrepended, ConnectionState::needProxy);
              conn->d_currentQuery = std::move(query);
            }

            /* if we notify the sender it might terminate us so we need to move these first */
            auto pendingResponses = std::move(conn->d_pendingResponses);
            conn->d_pendingResponses.clear();
            for (auto& pending : pendingResponses) {
              --conn->d_ds->outstanding;

              if (pending.second.d_query.isXFR() && pending.second.d_query.d_xfrStarted) {
                /* this one can't be restarted, sorry */
                DEBUGLOG("A XFR for which a response has already been sent cannot be restarted");
                try {
                  TCPResponse response(std::move(pending.second.d_query));
                  pending.second.d_sender->notifyIOError(now, std::move(response));
                }
                catch (const std::exception& e) {
                  vinfolog("Got an exception while notifying: %s", e.what());
                }
                catch (...) {
                  vinfolog("Got exception while notifying");
                }
              }
              else {
                conn->d_pendingQueries.push_back(std::move(pending.second));
              }
            }
            conn->d_currentPos = 0;

            if (conn->d_state == State::sendingQueryToBackend) {
              iostate = IOState::NeedWrite;
              // resume sending query
            }
            else {
              if (conn->d_pendingQueries.empty()) {
                throw std::runtime_error("TCP connection to a backend in state " + std::to_string((int)conn->d_state) + " with no pending queries");
              }

              iostate = queueNextQuery(conn);
            }

            reconnected = true;
            connectionDied = false;
          }
        }
        catch (const std::exception& e) {
          // reconnect might throw on failure, let's ignore that, we just need to know
          // it failed
        }
      }

      if (!reconnected) {
        /* reconnect failed, we give up */
        DEBUGLOG("reconnect failed, we give up");
        ++conn->d_ds->tcpGaveUp;
        conn->notifyAllQueriesFailed(now, FailureReason::gaveUp);
      }
    }

    if (conn->d_ioState) {
      if (iostate == IOState::Done) {
        conn->d_ioState->update(iostate, handleIOCallback, conn);
      }
      else {
        boost::optional<struct timeval> ttd{boost::none};
        if (iostate == IOState::NeedRead) {
          ttd = conn->getBackendReadTTD(now);
        }
        else if (conn->isFresh() && conn->d_queries == 0) {
          /* first write just after the non-blocking connect */
          ttd = conn->getBackendConnectTTD(now);
        }
        else {
          ttd = conn->getBackendWriteTTD(now);
        }

        conn->d_ioState->update(iostate, handleIOCallback, conn, ttd);
      }
    }
  }
  while (reconnected);

  ioGuard.release();
}

void TCPConnectionToBackend::handleIOCallback(int fd, FDMultiplexer::funcparam_t& param)
{
  auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(param);
  if (fd != conn->getHandle()) {
    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->getHandle()));
  }

  struct timeval now;
  gettimeofday(&now, nullptr);
  handleIO(conn, now);
}

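/* accept a new query for this backend connection: if we are idle or only waiting for
   responses, start sending it right away, otherwise append it to the queue of pending
   queries to be sent once the current write completes */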
void TCPConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query)
{
  if (!d_ioState) {
    d_ioState = make_unique<IOStateHandler>(*d_mplexer, d_handler->getDescriptor());
  }

  // if we are not already sending a query or in the middle of reading a response (so idle),
  // start sending the query
  if (d_state == State::idle || d_state == State::waitingForResponseFromBackend) {
    DEBUGLOG("Sending new query to backend right away, with ID "<<d_highestStreamID);
    d_state = State::sendingQueryToBackend;
    d_currentPos = 0;

    uint16_t id = d_highestStreamID;

    d_currentQuery = PendingRequest({sender, std::move(query)});
    prepareQueryForSending(d_currentQuery.d_query, id, QueryState::hasSizePrepended, needProxyProtocolPayload() ? ConnectionState::needProxy : ConnectionState::proxySent);

    struct timeval now;
    gettimeofday(&now, nullptr);

    auto shared = std::dynamic_pointer_cast<TCPConnectionToBackend>(shared_from_this());
    handleIO(shared, now);
  }
  else {
    DEBUGLOG("Adding new query to the queue because we are in state "<<(int)d_state);
    // store query in the list of queries to send
    d_pendingQueries.push_back(PendingRequest({sender, std::move(query)}));
  }
}

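/* a connect, write or read timeout fired for this connection: update the relevant backend
   counters, notify the senders of all queries owned by this connection that they failed,
   then release the connection */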
void TCPConnectionToBackend::handleTimeout(const struct timeval& now, bool write)
{
  /* in some cases we could retry here, reconnecting and sending our pending responses again */
  if (write) {
    if (isFresh() && d_queries == 0) {
      ++d_ds->tcpConnectTimeouts;
      vinfolog("Timeout while connecting to TCP backend %s", d_ds->getNameWithAddr());
    }
    else {
      ++d_ds->tcpWriteTimeouts;
      vinfolog("Timeout while writing to TCP backend %s", d_ds->getNameWithAddr());
    }
  }
  else {
    ++d_ds->tcpReadTimeouts;
    vinfolog("Timeout while reading from TCP backend %s", d_ds->getNameWithAddr());
  }

  try {
    notifyAllQueriesFailed(now, FailureReason::timeout);
  }
  catch (const std::exception& e) {
    vinfolog("Got an exception while notifying a timeout: %s", e.what());
  }
  catch (...) {
    vinfolog("Got exception while notifying a timeout");
  }

  release();
}

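/* report a failure (timeout or give-up) to the sender of the query being sent, of every
   queued query and of every query still waiting for a response, then release this
   connection */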
void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval& now, FailureReason reason)
{
  d_connectionDied = true;
  d_ds->reportTimeoutOrError();

  /* we might be terminated while notifying a query sender */
  d_ds->outstanding -= d_pendingResponses.size();
  auto pendingQueries = std::move(d_pendingQueries);
  d_pendingQueries.clear();
  auto pendingResponses = std::move(d_pendingResponses);
  d_pendingResponses.clear();

  auto increaseCounters = [reason](const ClientState* cs) {
    if (reason == FailureReason::timeout) {
      if (cs) {
        ++cs->tcpDownstreamTimeouts;
      }
    }
    else if (reason == FailureReason::gaveUp) {
      if (cs) {
        ++cs->tcpGaveUp;
      }
    }
  };

  try {
    if (d_state == State::sendingQueryToBackend) {
      increaseCounters(d_currentQuery.d_query.d_idstate.cs);
      auto sender = d_currentQuery.d_sender;
      if (sender->active()) {
        TCPResponse response(std::move(d_currentQuery.d_query));
        sender->notifyIOError(now, std::move(response));
      }
    }

    for (auto& query : pendingQueries) {
      increaseCounters(query.d_query.d_idstate.cs);
      auto sender = query.d_sender;
      if (sender->active()) {
        TCPResponse response(std::move(query.d_query));
        sender->notifyIOError(now, std::move(response));
      }
    }

    for (auto& response : pendingResponses) {
      increaseCounters(response.second.d_query.d_idstate.cs);
      auto sender = response.second.d_sender;
      if (sender->active()) {
        TCPResponse tresp(std::move(response.second.d_query));
        sender->notifyIOError(now, std::move(tresp));
      }
    }
  }
  catch (const std::exception& e) {
    vinfolog("Got an exception while notifying: %s", e.what());
  }
  catch (...) {
    vinfolog("Got exception while notifying");
  }

  release();
}

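/* a complete response has been read from the backend: match it to the pending query via
   its stream ID, restore the client's original DNS ID, and hand it over to the sender;
   XFR queries may yield several consecutive responses on the same connection */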
IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
{
  d_downstreamFailures = 0;

  uint16_t queryId = 0;
  try {
    queryId = getQueryIdFromResponse();
  }
  catch (const std::exception& e) {
    DEBUGLOG("Unable to get query ID");
    notifyAllQueriesFailed(now, FailureReason::unexpectedQueryID);
    throw;
  }

  auto it = d_pendingResponses.find(queryId);
  if (it == d_pendingResponses.end()) {
    DEBUGLOG("could not find any corresponding query for ID "<<queryId<<". This is likely a duplicated ID over the same TCP connection, giving up!");
    notifyAllQueriesFailed(now, FailureReason::unexpectedQueryID);
    return IOState::Done;
  }

  editPayloadID(d_responseBuffer, ntohs(it->second.d_query.d_idstate.origID), 0, false);

  auto sender = it->second.d_sender;

  if (sender->active() && it->second.d_query.isXFR()) {
    DEBUGLOG("XFR!");
    bool done = false;
    TCPResponse response;
    response.d_buffer = std::move(d_responseBuffer);
    response.d_connection = conn;
    response.d_ds = conn->d_ds;
    /* we don't move the whole IDS because we will need it for the responses still to come */
    response.d_idstate.qtype = it->second.d_query.d_idstate.qtype;
    response.d_idstate.qname = it->second.d_query.d_idstate.qname;
    DEBUGLOG("passing XFR response to client connection for "<<response.d_idstate.qname);

    it->second.d_query.d_xfrStarted = true;
    done = isXFRFinished(response, it->second.d_query);

    if (done) {
      d_pendingResponses.erase(it);
      --conn->d_ds->outstanding;
      /* marking as idle for now, so we can accept new queries if our queues are empty */
      if (d_pendingQueries.empty() && d_pendingResponses.empty()) {
        d_state = State::idle;
        t_downstreamTCPConnectionsManager.moveToIdle(conn);
      }
    }

    sender->handleXFRResponse(now, std::move(response));
    if (done) {
      d_state = State::idle;
      t_downstreamTCPConnectionsManager.moveToIdle(conn);
      return IOState::Done;
    }

    d_state = State::waitingForResponseFromBackend;
    d_currentPos = 0;
    d_responseBuffer.resize(sizeof(uint16_t));
    // get ready to read the next packet, if any
    return IOState::NeedRead;
  }

  --conn->d_ds->outstanding;
  auto ids = std::move(it->second.d_query.d_idstate);
  const double udiff = ids.queryRealTime.udiff();
  conn->d_ds->updateTCPLatency(udiff);
  if (d_responseBuffer.size() >= sizeof(dnsheader)) {
    dnsheader dh;
    memcpy(&dh, d_responseBuffer.data(), sizeof(dh));
    conn->d_ds->reportResponse(dh.rcode);
  }
  else {
    conn->d_ds->reportTimeoutOrError();
  }

  d_pendingResponses.erase(it);
  /* marking as idle for now, so we can accept new queries if our queues are empty */
  if (d_pendingQueries.empty() && d_pendingResponses.empty()) {
    d_state = State::idle;
    t_downstreamTCPConnectionsManager.moveToIdle(conn);
  }
  else if (!d_pendingResponses.empty()) {
    d_currentPos = 0;
    d_state = State::waitingForResponseFromBackend;
  }

  // be very careful that handleResponse() might trigger new queries being assigned to us,
  // which may reset our d_currentPos, d_state and/or d_responseBuffer, so we cannot assume
  // anything without checking first
  auto shared = conn;
  if (sender->active()) {
    DEBUGLOG("passing response to client connection for "<<ids.qname);
    // make sure that we still exist after calling handleResponse()
    TCPResponse response(std::move(d_responseBuffer), std::move(ids), conn, conn->d_ds);
    sender->handleResponse(now, std::move(response));
  }

  if (!d_pendingQueries.empty()) {
    DEBUGLOG("still have some queries to send");
    return queueNextQuery(shared);
  }
  else if (!d_pendingResponses.empty()) {
    DEBUGLOG("still have some responses to read");
    return IOState::NeedRead;
  }
  else {
    DEBUGLOG("nothing to do, waiting for a new query");
    d_state = State::idle;
    t_downstreamTCPConnectionsManager.moveToIdle(conn);
    return IOState::Done;
  }
}

uint16_t TCPConnectionToBackend::getQueryIdFromResponse() const
{
  if (d_responseBuffer.size() < sizeof(dnsheader)) {
    throw std::runtime_error("Unable to get query ID in a too small (" + std::to_string(d_responseBuffer.size()) + ") response from " + d_ds->getNameWithAddr());
  }

  uint16_t id;
  memcpy(&id, &d_responseBuffer.at(0), sizeof(id));
  return ntohs(id);
}

void TCPConnectionToBackend::setProxyProtocolValuesSent(std::unique_ptr<std::vector<ProxyProtocolValue>>&& proxyProtocolValuesSent)
{
  /* if we already have some values, we have already verified they match */
  if (!d_proxyProtocolValuesSent) {
    d_proxyProtocolValuesSent = std::move(proxyProtocolValuesSent);
  }
}

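/* a connection can only be reused for a query carrying exactly the same proxy protocol TLV
   values, since the proxy protocol payload is sent only once, at the beginning of the
   connection */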
bool TCPConnectionToBackend::matchesTLVs(const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs) const
{
  if (tlvs == nullptr) {
    if (d_proxyProtocolValuesSent == nullptr) {
      return true;
    }
    else {
      return false;
    }
  }

  if (d_proxyProtocolValuesSent == nullptr) {
    return false;
  }

  return *tlvs == *d_proxyProtocolValuesSent;
}

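/* decide whether this AXFR/IXFR stream is complete by counting SOA records: an AXFR is done
   when the closing SOA (the second one overall) is seen, an IXFR when the master's SOA has
   been seen three times, or immediately when the server answers an up-to-date IXFR with a
   single SOA (RFC 1995 section 2) */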
bool TCPConnectionToBackend::isXFRFinished(const TCPResponse& response, TCPQuery& query)
{
  bool done = false;

  try {
    MOADNSParser parser(true, reinterpret_cast<const char*>(response.d_buffer.data()), response.d_buffer.size());

    if (parser.d_header.rcode != 0U) {
      done = true;
    }
    else {
      for (const auto& record : parser.d_answers) {
        if (record.first.d_class != QClass::IN || record.first.d_type != QType::SOA) {
          continue;
        }

        auto unknownContent = getRR<UnknownRecordContent>(record.first);
        if (!unknownContent) {
          continue;
        }
        auto raw = unknownContent->getRawContent();
        auto serial = getSerialFromRawSOAContent(raw);
        if (query.d_xfrMasterSerial == 0) {
          // store the first SOA in our client's connection metadata
          query.d_xfrMasterSerial = serial;
          if (query.d_idstate.qtype == QType::IXFR && (query.d_xfrMasterSerial == query.d_ixfrQuerySerial || rfc1982LessThan(query.d_xfrMasterSerial, query.d_ixfrQuerySerial))) {
            /* This is the first message with a master SOA:
               RFC 1995 Section 2:
               If an IXFR query with the same or newer version number
               than that of the server is received, it is replied to
               with a single SOA record of the server's current version.
            */
            done = true;
            break;
          }
        }

        ++query.d_xfrSerialCount;
        if (serial == query.d_xfrMasterSerial) {
          ++query.d_xfrMasterSerialCount;
          // figure out whether the transfer is finished when we receive the master's SOA again
          if (query.d_xfrSerialCount == 2) {
            // seeing only two SOA records (the first and the closing one) marks a finished AXFR
            done = true;
            break;
          }
          if (query.d_xfrMasterSerialCount == 3) {
            // receiving the master's SOA 3 times marks a finished IXFR
            done = true;
            break;
          }
        }
      }
    }
  }
  catch (const MOADNSException& e) {
    DEBUGLOG("Exception when parsing TCPResponse to DNS: " << e.what());
    /* ponder what to do here, shall we close the connection? */
  }
  return done;
}

void setTCPDownstreamMaxIdleConnectionsPerBackend(uint64_t max)
{
  DownstreamTCPConnectionsManager::setMaxIdleConnectionsPerDownstream(max);
}

void setTCPDownstreamCleanupInterval(uint64_t interval)
{
  DownstreamTCPConnectionsManager::setCleanupInterval(interval);
}

void setTCPDownstreamMaxIdleTime(uint64_t max)
{
  DownstreamTCPConnectionsManager::setMaxIdleTime(max);
}