#include "dnsdist-session-cache.hh"
#include "dnsdist-tcp-downstream.hh"
#include "dnsdist-tcp-upstream.hh"
#include "dnsdist-downstream-connection.hh"

#include "dnsparser.hh"

thread_local DownstreamTCPConnectionsManager t_downstreamTCPConnectionsManager;
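
/* When a connection to a backend goes away, update the per-backend metrics
   (current number of TCP connections, number of queries, duration) and, for
   TLS connections, store the TLS sessions so that future connections to the
   same backend can attempt session resumption */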
ConnectionToBackend::~ConnectionToBackend()
{
  if (d_ds && d_handler) {
    --d_ds->tcpCurrentConnections;
    struct timeval now;
    gettimeofday(&now, nullptr);

    if (d_handler->isTLS()) {
      if (d_handler->hasTLSSessionBeenResumed()) {
        ++d_ds->tlsResumptions;
      }
      try {
        auto sessions = d_handler->getTLSSessions();
        if (!sessions.empty()) {
          g_sessionCache.putSessions(d_ds->getID(), now.tv_sec, std::move(sessions));
        }
      }
      catch (const std::exception& e) {
        vinfolog("Unable to get a TLS session: %s", e.what());
      }
    }
    auto diff = now - d_connectionStartTime;
    // cerr<<"connection to backend terminated after "<<d_queries<<" queries, "<<diff.tv_sec<<" seconds"<<endl;
    d_ds->updateTCPMetrics(d_queries, diff.tv_sec * 1000 + diff.tv_usec / 1000);
  }
}
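
/* Close the current connection, if any, keeping at most one TLS session
   aside for resumption, then open a new TCP (and possibly TLS) connection to
   the backend, retrying until d_retries attempts have failed.
   Returns true on success, throws or returns false otherwise */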
bool ConnectionToBackend::reconnect()
{
  std::unique_ptr<TLSSession> tlsSession{nullptr};
  if (d_handler) {
    DEBUGLOG("closing socket "<<d_handler->getDescriptor());
    if (d_handler->isTLS()) {
      if (d_handler->hasTLSSessionBeenResumed()) {
        ++d_ds->tlsResumptions;
      }
      try {
        auto sessions = d_handler->getTLSSessions();
        if (!sessions.empty()) {
          tlsSession = std::move(sessions.back());
          sessions.pop_back();
          if (!sessions.empty()) {
            g_sessionCache.putSessions(d_ds->getID(), time(nullptr), std::move(sessions));
          }
        }
      }
      catch (const std::exception& e) {
        vinfolog("Unable to get a TLS session to resume: %s", e.what());
      }
    }
    d_handler->close();
    d_ioState.reset();
    d_handler.reset();
    --d_ds->tcpCurrentConnections;
  }

  d_fresh = true;
  d_highestStreamID = 0;
  d_proxyProtocolPayloadSent = false;

  do {
    DEBUGLOG("TCP connecting to downstream "<<d_ds->getNameWithAddr()<<" ("<<d_downstreamFailures<<")");
    DEBUGLOG("Opening TCP connection to backend "<<d_ds->getNameWithAddr());
    ++d_ds->tcpNewConnections;
    try {
      auto socket = Socket(d_ds->d_config.remote.sin4.sin_family, SOCK_STREAM, 0);
      DEBUGLOG("result of socket() is "<<socket.getHandle());

      /* disable NAGLE, which does not play nicely with delayed ACKs.
         In theory we could be wasting up to 500 milliseconds waiting for
         the other end to acknowledge our initial packet before we could
         send the rest. */
      setTCPNoDelay(socket.getHandle());

#ifdef SO_BINDTODEVICE
      if (!d_ds->d_config.sourceItfName.empty()) {
        int res = setsockopt(socket.getHandle(), SOL_SOCKET, SO_BINDTODEVICE, d_ds->d_config.sourceItfName.c_str(), d_ds->d_config.sourceItfName.length());
        if (res != 0) {
          vinfolog("Error setting up the interface on backend TCP socket '%s': %s", d_ds->getNameWithAddr(), stringerror());
        }
      }
#endif

      if (!IsAnyAddress(d_ds->d_config.sourceAddr)) {
        SSetsockopt(socket.getHandle(), SOL_SOCKET, SO_REUSEADDR, 1);
#ifdef IP_BIND_ADDRESS_NO_PORT
        if (d_ds->d_config.ipBindAddrNoPort) {
          SSetsockopt(socket.getHandle(), SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
        }
#endif
        socket.bind(d_ds->d_config.sourceAddr, false);
      }
      socket.setNonBlocking();

      gettimeofday(&d_connectionStartTime, nullptr);
      auto handler = std::make_unique<TCPIOHandler>(d_ds->d_config.d_tlsSubjectName, d_ds->d_config.d_tlsSubjectIsAddr, socket.releaseHandle(), timeval{0, 0}, d_ds->d_tlsCtx);
      if (!tlsSession && d_ds->d_tlsCtx) {
        tlsSession = g_sessionCache.getSession(d_ds->getID(), d_connectionStartTime.tv_sec);
      }
      if (tlsSession) {
        handler->setTLSSession(tlsSession);
      }
      handler->tryConnect(d_ds->d_config.tcpFastOpen && isFastOpenEnabled(), d_ds->d_config.remote);
      d_queries = 0;

      d_handler = std::move(handler);
      d_ds->incCurrentConnectionsCount();
      return true;
    }
    catch (const std::runtime_error& e) {
      vinfolog("Connection to downstream server %s failed: %s", d_ds->getNameWithAddr(), e.what());
      d_downstreamFailures++;
      if (d_downstreamFailures >= d_ds->d_config.d_retries) {
        throw;
      }
    }
  }
  while (d_downstreamFailures < d_ds->d_config.d_retries);

  return false;
}
TCPConnectionToBackend::~TCPConnectionToBackend()
{
  if (d_ds && !d_pendingResponses.empty()) {
    d_ds->outstanding -= d_pendingResponses.size();
  }
}
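
/* Give up on this connection: drop all pending queries and responses, stop
   monitoring the socket, and remove ourselves from the per-thread connection
   cache if the connection cannot be reused */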
void TCPConnectionToBackend::release()
{
  d_ds->outstanding -= d_pendingResponses.size();

  d_pendingResponses.clear();
  d_pendingQueries.clear();

  if (d_ioState) {
    d_ioState.reset();
  }

  auto shared = std::dynamic_pointer_cast<TCPConnectionToBackend>(shared_from_this());
  if (!willBeReusable(true)) {
    /* remove ourselves from the connection cache, this might mean that our
       reference count drops to zero after that, so we need to be careful */
    t_downstreamTCPConnectionsManager.removeDownstreamConnection(shared);
  }
}
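
/* Extract the 32-bit serial from the raw RDATA of a SOA record, counting
   20 bytes back from the end so we do not have to parse the (possibly
   compressed) MNAME and RNAME fields */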
static uint32_t getSerialFromRawSOAContent(const std::vector<uint8_t>& raw)
{
  /* minimal size for a SOA record, as defined by rfc1035:
     MNAME (root): 1
     RNAME (root): 1
     SERIAL: 4
     REFRESH: 4
     RETRY: 4
     EXPIRE: 4
     MINIMUM: 4
     = 22 bytes
  */
  if (raw.size() < 22) {
    throw std::runtime_error("Invalid content of size " + std::to_string(raw.size()) + " for a SOA record");
  }
  /* As rfc1035 states that "all domain names in the RDATA section of these RRs may be compressed",
     and we don't want to parse these names, start at the end */
  uint32_t serial = 0;
  memcpy(&serial, &raw.at(raw.size() - 20), sizeof(serial));
  return ntohl(serial);
}
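
/* For an IXFR query, parse the SOA record in the authority section and store
   its serial in the query, so that we can later recognize the single-SOA
   answer a server sends when the client is already up to date */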
static bool getSerialFromIXFRQuery(TCPQuery& query)
{
  try {
    size_t proxyPayloadSize = query.d_proxyProtocolPayloadAdded ? query.d_idstate.d_proxyProtocolPayloadSize : 0;
    if (query.d_buffer.size() <= (proxyPayloadSize + sizeof(uint16_t))) {
      return false;
    }

    size_t payloadSize = query.d_buffer.size() - sizeof(uint16_t) - proxyPayloadSize;

    MOADNSParser parser(true, reinterpret_cast<const char*>(query.d_buffer.data() + sizeof(uint16_t) + proxyPayloadSize), payloadSize);

    for (const auto& record : parser.d_answers) {
      if (record.first.d_place != DNSResourceRecord::AUTHORITY || record.first.d_class != QClass::IN || record.first.d_type != QType::SOA) {
        return false;
      }

      auto unknownContent = getRR<UnknownRecordContent>(record.first);
      if (!unknownContent) {
        return false;
      }
      const auto& raw = unknownContent->getRawContent();
      query.d_ixfrQuerySerial = getSerialFromRawSOAContent(raw);
      return true;
    }
  }
  catch (const MOADNSException& e) {
    DEBUGLOG("Exception when parsing IXFR TCP Query to DNS: " << e.what());
    /* ponder what to do here, shall we close the connection? */
  }

  return false;
}
static void editPayloadID(PacketBuffer& payload, uint16_t newId, size_t proxyProtocolPayloadSize, bool sizePrepended)
{
  /* we cannot do a direct cast as the alignment might be off (the size of the payload might have been prepended, which is bad enough,
     but we might also have a proxy protocol payload) */
  size_t startOfHeaderOffset = (sizePrepended ? sizeof(uint16_t) : 0) + proxyProtocolPayloadSize;
  if (payload.size() < startOfHeaderOffset + sizeof(dnsheader)) {
    throw std::runtime_error("Invalid buffer for outgoing TCP query (size " + std::to_string(payload.size()) + ")");
  }
  uint16_t id = htons(newId);
  memcpy(&payload.at(startOfHeaderOffset), &id, sizeof(id));
}
enum class QueryState : uint8_t {
  hasSizePrepended,
  noSize
};

enum class ConnectionState : uint8_t {
  needProxy,
  proxySent
};
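
/* Get a query ready for this connection: add the proxy protocol payload if
   the connection still needs it, or strip it if it has already been sent,
   record the requested serial for IXFR queries, and rewrite the DNS ID to
   the ID assigned on this connection */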
static void prepareQueryForSending(TCPQuery& query, uint16_t id, QueryState queryState, ConnectionState connectionState)
{
  if (connectionState == ConnectionState::needProxy) {
    if (query.d_proxyProtocolPayload.size() > 0 && !query.d_proxyProtocolPayloadAdded) {
      query.d_buffer.insert(query.d_buffer.begin(), query.d_proxyProtocolPayload.begin(), query.d_proxyProtocolPayload.end());
      query.d_proxyProtocolPayloadAdded = true;
      query.d_idstate.d_proxyProtocolPayloadSize = query.d_proxyProtocolPayload.size();
    }
  }
  else if (connectionState == ConnectionState::proxySent) {
    if (query.d_proxyProtocolPayloadAdded) {
      if (query.d_buffer.size() < query.d_idstate.d_proxyProtocolPayloadSize) {
        throw std::runtime_error("Trying to remove a proxy protocol payload of size " + std::to_string(query.d_proxyProtocolPayload.size()) + " from a buffer of size " + std::to_string(query.d_buffer.size()));
      }
      // NOLINTNEXTLINE(*-narrowing-conversions): the size of the payload is limited to 2^16-1
      query.d_buffer.erase(query.d_buffer.begin(), query.d_buffer.begin() + static_cast<ssize_t>(query.d_idstate.d_proxyProtocolPayloadSize));
      query.d_proxyProtocolPayloadAdded = false;
      query.d_idstate.d_proxyProtocolPayloadSize = 0;
    }
  }
  if (query.d_idstate.qclass == QClass::IN && query.d_idstate.qtype == QType::IXFR) {
    getSerialFromIXFRQuery(query);
  }

  editPayloadID(query.d_buffer, id, query.d_proxyProtocolPayloadAdded ? query.d_idstate.d_proxyProtocolPayloadSize : 0, true);
}
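
/* Pop the next pending query, prepare it for this connection, and switch to
   the sendingQueryToBackend state */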
IOState TCPConnectionToBackend::queueNextQuery(std::shared_ptr<TCPConnectionToBackend>& conn)
{
  conn->d_currentQuery = std::move(conn->d_pendingQueries.front());

  uint16_t id = conn->d_highestStreamID;
  prepareQueryForSending(conn->d_currentQuery.d_query, id, QueryState::hasSizePrepended, conn->needProxyProtocolPayload() ? ConnectionState::needProxy : ConnectionState::proxySent);

  conn->d_pendingQueries.pop_front();
  conn->d_state = State::sendingQueryToBackend;
  conn->d_currentPos = 0;

  return IOState::NeedWrite;
}
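
/* Try to write the current query to the backend. Once it has been fully
   written, register it in d_pendingResponses under its stream ID so that the
   matching response can be found later */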
IOState TCPConnectionToBackend::sendQuery(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
{
  DEBUGLOG("sending query to backend "<<conn->getDS()->getNameWithAddr()<<" over FD "<<conn->d_handler->getDescriptor());

  IOState state = conn->d_handler->tryWrite(conn->d_currentQuery.d_query.d_buffer, conn->d_currentPos, conn->d_currentQuery.d_query.d_buffer.size());

  if (state != IOState::Done) {
    return state;
  }

  DEBUGLOG("query sent to backend");
  /* request sent! */
  if (conn->d_currentQuery.d_query.d_proxyProtocolPayloadAdded) {
    conn->d_proxyProtocolPayloadSent = true;
  }
  ++conn->d_queries;
  conn->d_currentPos = 0;

  DEBUGLOG("adding a pending response for ID "<<conn->d_highestStreamID<<" and QNAME "<<conn->d_currentQuery.d_query.d_idstate.qname);
  auto res = conn->d_pendingResponses.insert({conn->d_highestStreamID, std::move(conn->d_currentQuery)});
  /* if there was already a pending response with that ID, we messed up:
     we don't expect more than one response per ID */
  if (res.second) {
    ++conn->d_ds->outstanding;
  }
  ++conn->d_highestStreamID;
  conn->d_currentQuery.d_sender.reset();
  conn->d_currentQuery.d_query.d_buffer.clear();

  return state;
}
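
/* Main I/O state machine for a backend connection: send as many pending
   queries as possible, then read the two-byte response length followed by
   the response itself. If the connection dies and the configured number of
   retries has not been reached, reconnect and re-send the in-flight queries,
   except XFRs that have already been partially forwarded; otherwise fail all
   pending queries. Finally, update the I/O multiplexer with the next
   expected event and its timeout */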
void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
{
  if (conn->d_handler == nullptr) {
    throw std::runtime_error("No downstream socket in " + std::string(__PRETTY_FUNCTION__) + "!");
  }

  bool connectionDied = false;
  IOState iostate = IOState::Done;
  IOStateGuard ioGuard(conn->d_ioState);
  bool reconnected = false;

  do {
    reconnected = false;

    try {
      if (conn->d_state == State::sendingQueryToBackend) {
        iostate = sendQuery(conn, now);

        while (iostate == IOState::Done && !conn->d_pendingQueries.empty()) {
          queueNextQuery(conn);
          iostate = sendQuery(conn, now);
        }

        if (iostate == IOState::Done && conn->d_pendingQueries.empty()) {
          conn->d_state = State::waitingForResponseFromBackend;
          conn->d_currentPos = 0;
          conn->d_responseBuffer.resize(sizeof(uint16_t));
          iostate = IOState::NeedRead;
        }
      }

      if (conn->d_state == State::waitingForResponseFromBackend ||
          conn->d_state == State::readingResponseSizeFromBackend) {
        DEBUGLOG("reading response size from backend");
        // then we need to allocate a new buffer (new because we might need to re-send the query if the
        // backend dies on us)
        // We also might need to read and send to the client more than one response in case of XFR (yeah!)
        conn->d_responseBuffer.resize(sizeof(uint16_t));
        iostate = conn->d_handler->tryRead(conn->d_responseBuffer, conn->d_currentPos, sizeof(uint16_t));
        if (iostate == IOState::Done) {
          DEBUGLOG("got response size from backend");
          conn->d_state = State::readingResponseFromBackend;
          conn->d_responseSize = conn->d_responseBuffer.at(0) * 256 + conn->d_responseBuffer.at(1);
          conn->d_responseBuffer.reserve(conn->d_responseSize + /* we will need to prepend the size later */ 2);
          conn->d_responseBuffer.resize(conn->d_responseSize);
          conn->d_currentPos = 0;
          conn->d_lastDataReceivedTime = now;
        }
        else if (conn->d_state == State::waitingForResponseFromBackend && conn->d_currentPos > 0) {
          conn->d_state = State::readingResponseSizeFromBackend;
        }
      }

      if (conn->d_state == State::readingResponseFromBackend) {
        DEBUGLOG("reading response from backend");
        iostate = conn->d_handler->tryRead(conn->d_responseBuffer, conn->d_currentPos, conn->d_responseSize);
        if (iostate == IOState::Done) {
          DEBUGLOG("got response from backend");
          try {
            conn->d_lastDataReceivedTime = now;
            iostate = conn->handleResponse(conn, now);
          }
          catch (const std::exception& e) {
            vinfolog("Got an exception while handling TCP response from %s (client is %s): %s", conn->d_ds ? conn->d_ds->getNameWithAddr() : "unknown", conn->d_currentQuery.d_query.d_idstate.origRemote.toStringWithPort(), e.what());
            ioGuard.release();
            conn->release();
            return;
          }
        }
      }

      if (conn->d_state != State::idle &&
          conn->d_state != State::sendingQueryToBackend &&
          conn->d_state != State::waitingForResponseFromBackend &&
          conn->d_state != State::readingResponseSizeFromBackend &&
          conn->d_state != State::readingResponseFromBackend) {
        vinfolog("Unexpected state %d in TCPConnectionToBackend::handleIO", static_cast<int>(conn->d_state));
      }
    }
    catch (const std::exception& e) {
      /* most likely an EOF because the other end closed the connection,
         but it might also be a real IO error or something else.
         Let's just drop the connection
      */
      vinfolog("Got an exception while handling (%s backend) TCP query from %s: %s", (conn->d_state == State::sendingQueryToBackend ? "writing to" : "reading from"), conn->d_currentQuery.d_query.d_idstate.origRemote.toStringWithPort(), e.what());

      if (conn->d_state == State::sendingQueryToBackend) {
        ++conn->d_ds->tcpDiedSendingQuery;
      }
      else if (conn->d_state != State::idle) {
        ++conn->d_ds->tcpDiedReadingResponse;
      }

      /* don't increase this counter when reusing connections */
      if (conn->d_fresh) {
        ++conn->d_downstreamFailures;
      }

      /* remove this FD from the IO multiplexer */
      iostate = IOState::Done;
      connectionDied = true;
    }

    if (connectionDied) {

      DEBUGLOG("connection died, number of failures is "<<conn->d_downstreamFailures<<", retries is "<<conn->d_ds->d_config.d_retries);

      if (conn->d_downstreamFailures < conn->d_ds->d_config.d_retries) {

        conn->d_ioState.reset();
        ioGuard.release();

        try {
          if (conn->reconnect()) {
            conn->d_ioState = make_unique<IOStateHandler>(*conn->d_mplexer, conn->d_handler->getDescriptor());

            /* we need to resend the queries that were in flight, if any */
            if (conn->d_state == State::sendingQueryToBackend) {
              /* we need to edit this query so it has the correct ID */
              auto query = std::move(conn->d_currentQuery);
              uint16_t id = conn->d_highestStreamID;
              prepareQueryForSending(query.d_query, id, QueryState::hasSizePrepended, ConnectionState::needProxy);
              conn->d_currentQuery = std::move(query);
            }

            /* if we notify the sender it might terminate us so we need to move these first */
            auto pendingResponses = std::move(conn->d_pendingResponses);
            conn->d_pendingResponses.clear();
            for (auto& pending : pendingResponses) {
              --conn->d_ds->outstanding;

              if (pending.second.d_query.isXFR() && pending.second.d_query.d_xfrStarted) {
                /* this one can't be restarted, sorry */
                DEBUGLOG("A XFR for which a response has already been sent cannot be restarted");
                try {
                  TCPResponse response(std::move(pending.second.d_query));
                  pending.second.d_sender->notifyIOError(now, std::move(response));
                }
                catch (const std::exception& e) {
                  vinfolog("Got an exception while notifying: %s", e.what());
                }
                catch (...) {
                  vinfolog("Got exception while notifying");
                }
              }
              else {
                conn->d_pendingQueries.push_back(std::move(pending.second));
              }
            }
            conn->d_currentPos = 0;

            if (conn->d_state == State::sendingQueryToBackend) {
              iostate = IOState::NeedWrite;
              // resume sending query
            }
            else {
              if (conn->d_pendingQueries.empty()) {
                throw std::runtime_error("TCP connection to a backend in state " + std::to_string((int)conn->d_state) + " with no pending queries");
              }

              iostate = queueNextQuery(conn);
            }

            reconnected = true;
            connectionDied = false;
          }
        }
        catch (const std::exception& e) {
          // reconnect might throw on failure, let's ignore that, we just need to know
          // it failed
        }
      }

      if (!reconnected) {
        /* reconnect failed, we give up */
        DEBUGLOG("reconnect failed, we give up");
        ++conn->d_ds->tcpGaveUp;
        conn->notifyAllQueriesFailed(now, FailureReason::gaveUp);
      }
    }

    if (conn->d_ioState) {
      if (iostate == IOState::Done) {
        conn->d_ioState->update(iostate, handleIOCallback, conn);
      }
      else {
        boost::optional<struct timeval> ttd{boost::none};
        if (iostate == IOState::NeedRead) {
          ttd = conn->getBackendReadTTD(now);
        }
        else if (conn->isFresh() && conn->d_queries == 0) {
          /* first write just after the non-blocking connect */
          ttd = conn->getBackendConnectTTD(now);
        }
        else {
          ttd = conn->getBackendWriteTTD(now);
        }

        conn->d_ioState->update(iostate, handleIOCallback, conn, ttd);
      }
    }
  }
  while (reconnected);

  ioGuard.release();
}
void TCPConnectionToBackend::handleIOCallback(int fd, FDMultiplexer::funcparam_t& param)
{
  auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(param);
  if (fd != conn->getHandle()) {
    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->getHandle()));
  }

  struct timeval now;
  gettimeofday(&now, nullptr);
  handleIO(conn, now);
}
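
/* Accept a new query from a client-side sender: if the connection is idle or
   only waiting for responses, start sending it right away, otherwise queue it
   until the current write completes */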
void TCPConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query)
{
  if (!d_ioState) {
    d_ioState = make_unique<IOStateHandler>(*d_mplexer, d_handler->getDescriptor());
  }

  // if we are not already sending a query or in the middle of reading a response (so idle),
  // start sending the query
  if (d_state == State::idle || d_state == State::waitingForResponseFromBackend) {
    DEBUGLOG("Sending new query to backend right away, with ID "<<d_highestStreamID);
    d_state = State::sendingQueryToBackend;
    d_currentPos = 0;

    uint16_t id = d_highestStreamID;

    d_currentQuery = PendingRequest({sender, std::move(query)});
    prepareQueryForSending(d_currentQuery.d_query, id, QueryState::hasSizePrepended, needProxyProtocolPayload() ? ConnectionState::needProxy : ConnectionState::proxySent);

    struct timeval now;
    gettimeofday(&now, nullptr);

    auto shared = std::dynamic_pointer_cast<TCPConnectionToBackend>(shared_from_this());
    handleIO(shared, now);
  }
  else {
    DEBUGLOG("Adding new query to the queue because we are in state "<<(int)d_state);
    // store the query in the list of queries to send
    d_pendingQueries.push_back(PendingRequest({sender, std::move(query)}));
  }
}
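
/* Called when the backend connection times out: update the relevant
   connect / write / read timeout counters, notify all pending senders that
   their queries failed, then release the connection */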
void TCPConnectionToBackend::handleTimeout(const struct timeval& now, bool write)
{
  /* in some cases we could retry here, reconnecting and sending our pending responses again */
  if (write) {
    if (isFresh() && d_queries == 0) {
      ++d_ds->tcpConnectTimeouts;
      vinfolog("Timeout while connecting to TCP backend %s", d_ds->getNameWithAddr());
    }
    else {
      ++d_ds->tcpWriteTimeouts;
      vinfolog("Timeout while writing to TCP backend %s", d_ds->getNameWithAddr());
    }
  }
  else {
    ++d_ds->tcpReadTimeouts;
    vinfolog("Timeout while reading from TCP backend %s", d_ds->getNameWithAddr());
  }

  try {
    notifyAllQueriesFailed(now, FailureReason::timeout);
  }
  catch (const std::exception& e) {
    vinfolog("Got an exception while notifying a timeout: %s", e.what());
  }
  catch (...) {
    vinfolog("Got exception while notifying a timeout");
  }

  release();
}
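
/* Notify every sender with a query on this connection (currently being sent,
   still queued, or waiting for its response) that it will not get an answer,
   updating the per-frontend timeout / give-up counters, then release the
   connection */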
void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval& now, FailureReason reason)
{
  d_connectionDied = true;
  d_ds->reportTimeoutOrError();

  /* we might be terminated while notifying a query sender */
  d_ds->outstanding -= d_pendingResponses.size();
  auto pendingQueries = std::move(d_pendingQueries);
  d_pendingQueries.clear();
  auto pendingResponses = std::move(d_pendingResponses);
  d_pendingResponses.clear();

  auto increaseCounters = [reason](const ClientState* cs) {
    if (reason == FailureReason::timeout) {
      if (cs) {
        ++cs->tcpDownstreamTimeouts;
      }
    }
    else if (reason == FailureReason::gaveUp) {
      if (cs) {
        ++cs->tcpGaveUp;
      }
    }
  };

  try {
    if (d_state == State::sendingQueryToBackend) {
      increaseCounters(d_currentQuery.d_query.d_idstate.cs);
      auto sender = d_currentQuery.d_sender;
      if (sender->active()) {
        TCPResponse response(std::move(d_currentQuery.d_query));
        sender->notifyIOError(now, std::move(response));
      }
    }

    for (auto& query : pendingQueries) {
      increaseCounters(query.d_query.d_idstate.cs);
      auto sender = query.d_sender;
      if (sender->active()) {
        TCPResponse response(std::move(query.d_query));
        sender->notifyIOError(now, std::move(response));
      }
    }

    for (auto& response : pendingResponses) {
      increaseCounters(response.second.d_query.d_idstate.cs);
      auto sender = response.second.d_sender;
      if (sender->active()) {
        TCPResponse tresp(std::move(response.second.d_query));
        sender->notifyIOError(now, std::move(tresp));
      }
    }
  }
  catch (const std::exception& e) {
    vinfolog("Got an exception while notifying: %s", e.what());
  }
  catch (...) {
    vinfolog("Got exception while notifying");
  }

  release();
}
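
/* Process a complete response read from the backend: match it to the pending
   query via its ID, restore the original client-side ID, update the backend
   metrics, and hand the response over to the sender. XFR responses keep the
   connection busy until isXFRFinished() reports that the transfer is
   complete. Returns the next I/O operation this connection is waiting for */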
IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
{
  d_downstreamFailures = 0;

  uint16_t queryId = 0;
  try {
    queryId = getQueryIdFromResponse();
  }
  catch (const std::exception& e) {
    DEBUGLOG("Unable to get query ID");
    notifyAllQueriesFailed(now, FailureReason::unexpectedQueryID);
    throw;
  }

  auto it = d_pendingResponses.find(queryId);
  if (it == d_pendingResponses.end()) {
    DEBUGLOG("could not find any corresponding query for ID "<<queryId<<". This is likely a duplicated ID over the same TCP connection, giving up!");
    notifyAllQueriesFailed(now, FailureReason::unexpectedQueryID);
    return IOState::Done;
  }

  editPayloadID(d_responseBuffer, ntohs(it->second.d_query.d_idstate.origID), 0, false);

  auto sender = it->second.d_sender;

  if (sender->active() && it->second.d_query.isXFR()) {
    DEBUGLOG("XFR!");
    bool done = false;
    TCPResponse response;
    response.d_buffer = std::move(d_responseBuffer);
    response.d_connection = conn;
    response.d_ds = conn->d_ds;
    const auto& queryIDS = it->second.d_query.d_idstate;
    /* we don't move the whole IDS because we will need it for the responses to come */
    response.d_idstate = queryIDS.partialCloneForXFR();
    DEBUGLOG("passing XFR response to client connection for "<<response.d_idstate.qname);

    it->second.d_query.d_xfrStarted = true;
    done = isXFRFinished(response, it->second.d_query);

    if (done) {
      d_pendingResponses.erase(it);
      --conn->d_ds->outstanding;
      /* marking as idle for now, so we can accept new queries if our queues are empty */
      if (d_pendingQueries.empty() && d_pendingResponses.empty()) {
        d_state = State::idle;
        t_downstreamTCPConnectionsManager.moveToIdle(conn);
      }
    }

    sender->handleXFRResponse(now, std::move(response));
    if (done) {
      d_state = State::idle;
      t_downstreamTCPConnectionsManager.moveToIdle(conn);
      return IOState::Done;
    }

    d_state = State::waitingForResponseFromBackend;
    d_currentPos = 0;
    d_responseBuffer.resize(sizeof(uint16_t));
    // get ready to read the next packet, if any
    return IOState::NeedRead;
  }

  --conn->d_ds->outstanding;
  auto ids = std::move(it->second.d_query.d_idstate);
  const double udiff = ids.queryRealTime.udiff();
  conn->d_ds->updateTCPLatency(udiff);
  if (d_responseBuffer.size() >= sizeof(dnsheader)) {
    dnsheader dh;
    memcpy(&dh, d_responseBuffer.data(), sizeof(dh));
    conn->d_ds->reportResponse(dh.rcode);
  }
  else {
    conn->d_ds->reportTimeoutOrError();
  }

  d_pendingResponses.erase(it);
  /* marking as idle for now, so we can accept new queries if our queues are empty */
  if (d_pendingQueries.empty() && d_pendingResponses.empty()) {
    d_state = State::idle;
    t_downstreamTCPConnectionsManager.moveToIdle(conn);
  }
  else if (!d_pendingResponses.empty()) {
    d_currentPos = 0;
    d_state = State::waitingForResponseFromBackend;
  }

  // be very careful: handleResponse() might trigger new queries being assigned to us,
  // which may reset our d_currentPos, d_state and/or d_responseBuffer, so we cannot assume
  // anything without checking first
  auto shared = conn;
  if (sender->active()) {
    DEBUGLOG("passing response to client connection for "<<ids.qname);
    // make sure that we still exist after calling handleResponse()
    TCPResponse response(std::move(d_responseBuffer), std::move(ids), conn, conn->d_ds);
    sender->handleResponse(now, std::move(response));
  }

  if (!d_pendingQueries.empty()) {
    DEBUGLOG("still have some queries to send");
    return queueNextQuery(shared);
  }
  else if (!d_pendingResponses.empty()) {
    DEBUGLOG("still have some responses to read");
    return IOState::NeedRead;
  }
  else {
    DEBUGLOG("nothing to do, waiting for a new query");
    d_state = State::idle;
    t_downstreamTCPConnectionsManager.moveToIdle(conn);
    return IOState::Done;
  }
}
uint16_t TCPConnectionToBackend::getQueryIdFromResponse() const
{
  if (d_responseBuffer.size() < sizeof(dnsheader)) {
    throw std::runtime_error("Unable to get query ID in a too small (" + std::to_string(d_responseBuffer.size()) + ") response from " + d_ds->getNameWithAddr());
  }

  uint16_t id;
  memcpy(&id, &d_responseBuffer.at(0), sizeof(id));
  return ntohs(id);
}
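
/* Keep a copy of the proxy protocol TLV values sent over this connection;
   matchesTLVs() checks whether a given set of values is identical to it
   (both being absent also counts as a match) */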
void TCPConnectionToBackend::setProxyProtocolValuesSent(std::unique_ptr<std::vector<ProxyProtocolValue>>&& proxyProtocolValuesSent)
{
  /* if we already have some values, we have already verified they match */
  if (!d_proxyProtocolValuesSent) {
    d_proxyProtocolValuesSent = std::move(proxyProtocolValuesSent);
  }
}

bool TCPConnectionToBackend::matchesTLVs(const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs) const
{
  if (tlvs == nullptr) {
    if (d_proxyProtocolValuesSent == nullptr) {
      return true;
    }
    else {
      return false;
    }
  }

  if (d_proxyProtocolValuesSent == nullptr) {
    return false;
  }

  return *tlvs == *d_proxyProtocolValuesSent;
}
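
/* Decide whether this response ends the zone transfer by counting SOA
   records: a non-zero rcode ends it immediately, seeing the primary serial
   as the second SOA ends an AXFR, seeing it a third time ends an IXFR, and
   an IXFR answered with a single SOA (client already up to date, RFC 1995
   section 2) ends right away */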
bool TCPConnectionToBackend::isXFRFinished(const TCPResponse& response, TCPQuery& query)
{
  bool done = false;

  try {
    MOADNSParser parser(true, reinterpret_cast<const char*>(response.d_buffer.data()), response.d_buffer.size());

    if (parser.d_header.rcode != 0U) {
      done = true;
    }
    else {
      for (const auto& record : parser.d_answers) {
        if (record.first.d_class != QClass::IN || record.first.d_type != QType::SOA) {
          continue;
        }

        auto unknownContent = getRR<UnknownRecordContent>(record.first);
        if (!unknownContent) {
          continue;
        }
        const auto& raw = unknownContent->getRawContent();
        auto serial = getSerialFromRawSOAContent(raw);
        if (query.d_xfrPrimarySerial == 0) {
          // store the first SOA in our client's connection metadata
          query.d_xfrPrimarySerial = serial;
          if (query.d_idstate.qtype == QType::IXFR && (query.d_xfrPrimarySerial == query.d_ixfrQuerySerial || rfc1982LessThan(query.d_xfrPrimarySerial, query.d_ixfrQuerySerial))) {
            /* This is the first message with a primary SOA:
               RFC 1995 Section 2:
                 If an IXFR query with the same or newer version number
                 than that of the server is received, it is replied to
                 with a single SOA record of the server's current version.
            */
            done = true;
            break;
          }
        }

        ++query.d_xfrSerialCount;
        if (serial == query.d_xfrPrimarySerial) {
          ++query.d_xfrPrimarySerialCount;
          // figure out whether this is the end, i.e. we received the primary's SOA again
          if (query.d_xfrSerialCount == 2) {
            // two SOA records in total mark a finished AXFR
            done = true;
            break;
          }
          if (query.d_xfrPrimarySerialCount == 3) {
            // receiving the primary's SOA 3 times marks a finished IXFR
            done = true;
            break;
          }
        }
      }
    }
  }
  catch (const MOADNSException& e) {
    DEBUGLOG("Exception when parsing TCPResponse to DNS: " << e.what());
    /* ponder what to do here, shall we close the connection? */
  }
  return done;
}
void setTCPDownstreamMaxIdleConnectionsPerBackend(uint64_t max)
{
  DownstreamTCPConnectionsManager::setMaxIdleConnectionsPerDownstream(max);
}

void setTCPDownstreamCleanupInterval(uint64_t interval)
{
  DownstreamTCPConnectionsManager::setCleanupInterval(interval);
}

void setTCPDownstreamMaxIdleTime(uint64_t max)
{
  DownstreamTCPConnectionsManager::setMaxIdleTime(max);
}