// Source: thirdparty/pdns.git — pdns/dnsdistdist/dnsdist-tcp-downstream.hh
// (merge of PR #11414, fredmorcos/11325-ecdsa-pem; retrieved via git.ipfire.org gitweb)
1 #pragma once
2
3 #include <boost/multi_index_container.hpp>
4 #include <boost/multi_index/ordered_index.hpp>
5 #include <boost/multi_index/sequenced_index.hpp>
6 #include <boost/multi_index/key_extractors.hpp>
7
8 #include <queue>
9
10 #include "sstuff.hh"
11 #include "tcpiohandler-mplexer.hh"
12 #include "dnsdist.hh"
13 #include "dnsdist-tcp.hh"
14
15 class ConnectionToBackend : public std::enable_shared_from_this<ConnectionToBackend>
16 {
17 public:
18 ConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now): d_connectionStartTime(now), d_lastDataReceivedTime(now), d_ds(ds), d_mplexer(mplexer), d_enableFastOpen(ds->d_config.tcpFastOpen)
19 {
20 reconnect();
21 }
22
23 virtual ~ConnectionToBackend();
24
25 int getHandle() const
26 {
27 if (!d_handler) {
28 throw std::runtime_error("Attempt to get the socket handle from a non-established TCP connection");
29 }
30
31 return d_handler->getDescriptor();
32 }
33
34 /* whether the underlying socket has been closed under our feet, basically */
35 bool isUsable() const
36 {
37 if (!d_handler) {
38 return false;
39 }
40
41 return d_handler->isUsable();
42 }
43
44 const std::shared_ptr<DownstreamState>& getDS() const
45 {
46 return d_ds;
47 }
48
49 const ComboAddress& getRemote() const
50 {
51 return d_ds->d_config.remote;
52 }
53
54 const std::string& getBackendName() const
55 {
56 return d_ds->getName();
57 }
58
59 bool isFresh() const
60 {
61 return d_fresh;
62 }
63
64 void setReused()
65 {
66 d_fresh = false;
67 }
68
69 void disableFastOpen()
70 {
71 d_enableFastOpen = false;
72 }
73
74 bool isFastOpenEnabled()
75 {
76 return d_enableFastOpen;
77 }
78
79 /* whether a connection can be used now */
80 bool canBeReused(bool sameClient = false) const
81 {
82 if (d_connectionDied) {
83 return false;
84 }
85
86 /* we can't reuse a connection where a proxy protocol payload has been sent,
87 since:
88 - it cannot be reused for a different client
89 - we might have different TLV values for each query
90 */
91 if (d_ds && d_ds->d_config.useProxyProtocol == true && !sameClient) {
92 return false;
93 }
94
95 if (reachedMaxStreamID()) {
96 return false;
97 }
98
99 if (reachedMaxConcurrentQueries()) {
100 return false;
101 }
102
103 return true;
104 }
105
106 /* full now but will become usable later */
107 bool willBeReusable(bool sameClient) const
108 {
109 if (d_connectionDied || reachedMaxStreamID()) {
110 return false;
111 }
112
113 if (d_ds && d_ds->d_config.useProxyProtocol == true) {
114 return sameClient;
115 }
116
117 return true;
118 }
119
120 virtual bool reachedMaxStreamID() const = 0;
121 virtual bool reachedMaxConcurrentQueries() const = 0;
122 virtual bool isIdle() const = 0;
123 virtual void release() = 0;
124 virtual void stopIO()
125 {
126 }
127
128 bool matches(const std::shared_ptr<DownstreamState>& ds) const
129 {
130 if (!ds || !d_ds) {
131 return false;
132 }
133 return ds == d_ds;
134 }
135
136 virtual void queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query) = 0;
137 virtual void handleTimeout(const struct timeval& now, bool write) = 0;
138
139 struct timeval getLastDataReceivedTime() const
140 {
141 return d_lastDataReceivedTime;
142 }
143
144 virtual std::string toString() const = 0;
145
146 protected:
147 bool reconnect();
148
149 boost::optional<struct timeval> getBackendHealthCheckTTD(const struct timeval& now) const
150 {
151 if (d_ds == nullptr) {
152 throw std::runtime_error("getBackendReadTTD() without any backend selected");
153 }
154 if (d_ds->d_config.checkTimeout == 0) {
155 return boost::none;
156 }
157
158 struct timeval res = now;
159 res.tv_sec += d_ds->d_config.checkTimeout / 1000; /* ms to s */
160 res.tv_usec += (d_ds->d_config.checkTimeout % 1000) / 1000; /* remaining ms to µs */
161
162 return res;
163 }
164
165 boost::optional<struct timeval> getBackendReadTTD(const struct timeval& now) const
166 {
167 if (d_ds == nullptr) {
168 throw std::runtime_error("getBackendReadTTD() without any backend selected");
169 }
170 if (d_ds->d_config.tcpRecvTimeout == 0) {
171 return boost::none;
172 }
173
174 struct timeval res = now;
175 res.tv_sec += d_ds->d_config.tcpRecvTimeout;
176
177 return res;
178 }
179
180 boost::optional<struct timeval> getBackendWriteTTD(const struct timeval& now) const
181 {
182 if (d_ds == nullptr) {
183 throw std::runtime_error("getBackendWriteTTD() called without any backend selected");
184 }
185 if (d_ds->d_config.tcpSendTimeout == 0) {
186 return boost::none;
187 }
188
189 struct timeval res = now;
190 res.tv_sec += d_ds->d_config.tcpSendTimeout;
191
192 return res;
193 }
194
195 boost::optional<struct timeval> getBackendConnectTTD(const struct timeval& now) const
196 {
197 if (d_ds == nullptr) {
198 throw std::runtime_error("getBackendConnectTTD() called without any backend selected");
199 }
200 if (d_ds->d_config.tcpConnectTimeout == 0) {
201 return boost::none;
202 }
203
204 struct timeval res = now;
205 res.tv_sec += d_ds->d_config.tcpConnectTimeout;
206
207 return res;
208 }
209
210 struct timeval d_connectionStartTime;
211 struct timeval d_lastDataReceivedTime;
212 const std::shared_ptr<DownstreamState> d_ds{nullptr};
213 std::shared_ptr<TCPQuerySender> d_sender{nullptr};
214 std::unique_ptr<FDMultiplexer>& d_mplexer;
215 std::unique_ptr<TCPIOHandler> d_handler{nullptr};
216 std::unique_ptr<IOStateHandler> d_ioState{nullptr};
217 uint64_t d_queries{0};
218 uint32_t d_highestStreamID{0};
219 uint16_t d_downstreamFailures{0};
220 bool d_proxyProtocolPayloadSent{false};
221 bool d_enableFastOpen{false};
222 bool d_connectionDied{false};
223 bool d_fresh{true};
224 };
225
226 class TCPConnectionToBackend : public ConnectionToBackend
227 {
228 public:
229 TCPConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& /* proxyProtocolPayload*, unused but there to match the HTTP2 connections, so we can use the same templated connections manager class */): ConnectionToBackend(ds, mplexer, now), d_responseBuffer(s_maxPacketCacheEntrySize)
230 {
231 }
232
233 virtual ~TCPConnectionToBackend();
234
235 bool isIdle() const override
236 {
237 return d_state == State::idle && d_pendingQueries.size() == 0 && d_pendingResponses.size() == 0;
238 }
239
240 bool reachedMaxStreamID() const override
241 {
242 /* TCP/DoT has only 2^16 usable identifiers, DoH has 2^32 */
243 const uint32_t maximumStreamID = std::numeric_limits<uint16_t>::max() - 1;
244 return d_highestStreamID == maximumStreamID;
245 }
246
247 bool reachedMaxConcurrentQueries() const override
248 {
249 const size_t concurrent = d_pendingQueries.size() + d_pendingResponses.size();
250 if (concurrent > 0 && concurrent >= d_ds->d_config.d_maxInFlightQueriesPerConn) {
251 return true;
252 }
253 return false;
254 }
255 bool matchesTLVs(const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs) const;
256
257 void queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query) override;
258 void handleTimeout(const struct timeval& now, bool write) override;
259 void release() override;
260
261 std::string toString() const override
262 {
263 ostringstream o;
264 o << "TCP connection to backend "<<(d_ds ? d_ds->getName() : "empty")<<" over FD "<<(d_handler ? std::to_string(d_handler->getDescriptor()) : "no socket")<<", state is "<<(int)d_state<<", io state is "<<(d_ioState ? d_ioState->getState() : "empty")<<", queries count is "<<d_queries<<", pending queries count is "<<d_pendingQueries.size()<<", "<<d_pendingResponses.size()<<" pending responses";
265 return o.str();
266 }
267
268 void setProxyProtocolValuesSent(std::unique_ptr<std::vector<ProxyProtocolValue>>&& proxyProtocolValuesSent);
269
270 private:
271 /* waitingForResponseFromBackend is a state where we have not yet started reading the size,
272 so we can still switch to sending instead */
273 enum class State : uint8_t { idle, sendingQueryToBackend, waitingForResponseFromBackend, readingResponseSizeFromBackend, readingResponseFromBackend };
274 enum class FailureReason : uint8_t { /* too many attempts */ gaveUp, timeout, unexpectedQueryID };
275
276 static void handleIO(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now);
277 static void handleIOCallback(int fd, FDMultiplexer::funcparam_t& param);
278 static IOState queueNextQuery(std::shared_ptr<TCPConnectionToBackend>& conn);
279 static IOState sendQuery(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now);
280 static bool isXFRFinished(const TCPResponse& response, TCPQuery& query);
281
282 IOState handleResponse(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now);
283 uint16_t getQueryIdFromResponse() const;
284 void notifyAllQueriesFailed(const struct timeval& now, FailureReason reason);
285 bool needProxyProtocolPayload() const
286 {
287 return !d_proxyProtocolPayloadSent && (d_ds && d_ds->d_config.useProxyProtocol);
288 }
289
290 class PendingRequest
291 {
292 public:
293 std::shared_ptr<TCPQuerySender> d_sender{nullptr};
294 TCPQuery d_query;
295 };
296
297 PacketBuffer d_responseBuffer;
298 std::list<PendingRequest> d_pendingQueries;
299 std::unordered_map<uint16_t, PendingRequest> d_pendingResponses;
300 std::unique_ptr<std::vector<ProxyProtocolValue>> d_proxyProtocolValuesSent{nullptr};
301 PendingRequest d_currentQuery;
302 size_t d_currentPos{0};
303 uint16_t d_responseSize{0};
304 State d_state{State::idle};
305 };
306
/* Per-thread pool of reusable connections to backends, keyed by backend UUID.
   For each backend two Boost.MultiIndex lists are kept: "active" connections
   (currently carrying queries) and "idle" ones (parked, reusable). Each list
   is both a unique set (for O(log n) removal by pointer) and a sequence with
   most-recently-used entries at the front. */
template <class T> class DownstreamConnectionsManager
{
  struct SequencedTag {};
  struct OrderedTag {};

  typedef multi_index_container<
    std::shared_ptr<T>,
    indexed_by <
      ordered_unique<tag<OrderedTag>,
                     identity<std::shared_ptr<T>>
                     >,
      /* new elements are added to the front of the sequence */
      sequenced<tag<SequencedTag> >
      >
  > list_t;

  struct ConnectionLists
  {
    list_t d_actives;
    list_t d_idles;
  };

public:
  static void setMaxIdleConnectionsPerDownstream(size_t max)
  {
    s_maxIdleConnectionsPerDownstream = max;
  }

  /* interval, in seconds, between two cleanup passes (0 disables cleanup) */
  static void setCleanupInterval(uint16_t interval)
  {
    s_cleanupInterval = interval;
  }

  /* maximum time, in seconds, an idle connection is kept around */
  static void setMaxIdleTime(uint16_t max)
  {
    s_maxIdleTime = max;
  }

  /* Returns a connection to 'ds', reusing an existing one when possible,
     otherwise creating (and, when no proxy protocol is involved, tracking)
     a new one. Connections carrying proxy protocol data are never put in
     the pool since they are tied to a single client. */
  std::shared_ptr<T> getConnectionToDownstream(std::unique_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, const struct timeval& now, std::string&& proxyProtocolPayload)
  {
    /* connections used within the last second are considered fresh and
       skip the (more expensive) socket usability check */
    struct timeval freshCutOff = now;
    freshCutOff.tv_sec -= 1;

    auto backendId = ds->getID();

    cleanupClosedConnections(now);

    const bool haveProxyProtocol = ds->d_config.useProxyProtocol || !proxyProtocolPayload.empty();
    if (!haveProxyProtocol) {
      const auto& it = d_downstreamConnections.find(backendId);
      if (it != d_downstreamConnections.end()) {
        /* first scan idle connections, more recent first */
        auto entry = findUsableConnectionInList(now, freshCutOff, it->second.d_idles, true);
        if (entry) {
          ++ds->tcpReusedConnections;
          it->second.d_actives.insert(entry);
          return entry;
        }

        /* then scan actives ones, more recent first as well */
        entry = findUsableConnectionInList(now, freshCutOff, it->second.d_actives, false);
        if (entry) {
          ++ds->tcpReusedConnections;
          return entry;
        }
      }
    }

    auto newConnection = std::make_shared<T>(ds, mplexer, now, std::move(proxyProtocolPayload));
    if (!haveProxyProtocol) {
      auto& list = d_downstreamConnections[backendId].d_actives;
      list.template get<SequencedTag>().push_front(newConnection);
    }

    return newConnection;
  }

  /* Removes dead and expired connections from every backend's lists.
     Rate-limited: does nothing until s_cleanupInterval seconds have
     elapsed since the previous pass. */
  void cleanupClosedConnections(const struct timeval& now)
  {
    if (s_cleanupInterval == 0 || (d_nextCleanup != 0 && d_nextCleanup > now.tv_sec)) {
      return;
    }

    d_nextCleanup = now.tv_sec + s_cleanupInterval;

    struct timeval freshCutOff = now;
    freshCutOff.tv_sec -= 1;
    struct timeval idleCutOff = now;
    idleCutOff.tv_sec -= s_maxIdleTime;

    for (auto dsIt = d_downstreamConnections.begin(); dsIt != d_downstreamConnections.end(); ) {
      cleanUpList(dsIt->second.d_idles, now, freshCutOff, idleCutOff);
      cleanUpList(dsIt->second.d_actives, now, freshCutOff, idleCutOff);

      /* drop the per-backend entry altogether once both lists are empty */
      if (dsIt->second.d_idles.empty() && dsIt->second.d_actives.empty()) {
        dsIt = d_downstreamConnections.erase(dsIt);
      }
      else {
        ++dsIt;
      }
    }
  }

  /* Stops I/O on and forgets every tracked connection.
     Returns the number of connections that were tracked. */
  size_t clear()
  {
    size_t count = 0;
    for (const auto& downstream : d_downstreamConnections) {
      count += downstream.second.d_actives.size();
      for (auto& conn : downstream.second.d_actives) {
        conn->stopIO();
      }
      count += downstream.second.d_idles.size();
      for (auto& conn : downstream.second.d_idles) {
        conn->stopIO();
      }
    }

    d_downstreamConnections.clear();
    return count;
  }

  size_t count() const
  {
    return getActiveCount() + getIdleCount();
  }

  size_t getActiveCount() const
  {
    size_t count = 0;
    for (const auto& downstream : d_downstreamConnections) {
      count += downstream.second.d_actives.size();
    }
    return count;
  }

  size_t getIdleCount() const
  {
    size_t count = 0;
    for (const auto& downstream : d_downstreamConnections) {
      count += downstream.second.d_idles.size();
    }
    return count;
  }

  /* Removes 'conn' from whichever list holds it.
     Returns false if the connection was not tracked. */
  bool removeDownstreamConnection(std::shared_ptr<T>& conn)
  {
    auto backendIt = d_downstreamConnections.find(conn->getDS()->getID());
    if (backendIt == d_downstreamConnections.end()) {
      return false;
    }

    /* idle list first */
    {
      auto it = backendIt->second.d_idles.find(conn);
      if (it != backendIt->second.d_idles.end()) {
        backendIt->second.d_idles.erase(it);
        return true;
      }
    }
    /* then active */
    {
      auto it = backendIt->second.d_actives.find(conn);
      if (it != backendIt->second.d_actives.end()) {
        backendIt->second.d_actives.erase(it);
        return true;
      }
    }

    return false;
  }

  /* Moves 'conn' from the active list to the front of the idle list,
     evicting the least-recently-used idle connection if the idle list
     is full. Returns false if 'conn' was not in the active list. */
  bool moveToIdle(std::shared_ptr<T>& conn)
  {
    auto backendIt = d_downstreamConnections.find(conn->getDS()->getID());
    if (backendIt == d_downstreamConnections.end()) {
      return false;
    }

    auto it = backendIt->second.d_actives.find(conn);
    if (it == backendIt->second.d_actives.end()) {
      return false;
    }

    backendIt->second.d_actives.erase(it);

    if (backendIt->second.d_idles.size() >= s_maxIdleConnectionsPerDownstream) {
      /* the sequence front is most recently used, so pop the back (LRU) */
      backendIt->second.d_idles.template get<SequencedTag>().pop_back();
    }

    backendIt->second.d_idles.template get<SequencedTag>().push_front(conn);
    return true;
  }

protected:

  /* Erases from 'list' connections that are null, idle for longer than
     idleCutOff, or no longer usable; connections used after freshCutOff
     are kept without further checks. */
  void cleanUpList(list_t& list, const struct timeval& now, const struct timeval& freshCutOff, const struct timeval& idleCutOff)
  {
    auto& sidx = list.template get<SequencedTag>();
    for (auto connIt = sidx.begin(); connIt != sidx.end(); ) {
      if (!(*connIt)) {
        connIt = sidx.erase(connIt);
        continue;
      }

      auto& entry = *connIt;

      /* don't bother checking freshly used connections */
      if (freshCutOff < entry->getLastDataReceivedTime()) {
        ++connIt;
        continue;
      }

      if (entry->isIdle() && entry->getLastDataReceivedTime() < idleCutOff) {
        /* idle for too long */
        connIt = sidx.erase(connIt);
        continue;
      }

      if (entry->isUsable()) {
        ++connIt;
        continue;
      }

      /* not fresh, not usable: drop it */
      connIt = sidx.erase(connIt);
    }
  }

  /* Scans 'list' (most recent first) for a connection usable right now,
     marking it reused and optionally removing it from the list; dead
     entries found along the way are erased. Returns nullptr when no
     usable connection exists. */
  std::shared_ptr<T> findUsableConnectionInList(const struct timeval& now, const struct timeval& freshCutOff, list_t& list, bool removeIfFound)
  {
    auto& sidx = list.template get<SequencedTag>();
    for (auto listIt = sidx.begin(); listIt != sidx.end(); ) {
      if (!(*listIt)) {
        listIt = sidx.erase(listIt);
        continue;
      }

      auto& entry = *listIt;
      if (isConnectionUsable(entry, now, freshCutOff)) {
        entry->setReused();
        // make a copy since the iterator will be invalidated after erasing
        auto result = entry;
        if (removeIfFound) {
          sidx.erase(listIt);
        }
        return result;
      }

      if (entry->willBeReusable(false)) {
        ++listIt;
        continue;
      }

      /* that connection will not be usable later, no need to keep it in that list */
      listIt = sidx.erase(listIt);
    }

    return nullptr;
  }

  /* whether 'conn' can serve a query right now: reusable by state, and
     either used recently (after freshCutOff) or still usable at the
     socket level */
  bool isConnectionUsable(const std::shared_ptr<T>& conn, const struct timeval& now, const struct timeval& freshCutOff)
  {
    if (!conn->canBeReused()) {
      return false;
    }

    /* for connections that have not been used very recently,
       check whether they have been closed in the meantime */
    if (freshCutOff < conn->getLastDataReceivedTime()) {
      /* used recently enough, skip the check */
      return true;
    }

    return conn->isUsable();
  }

  static size_t s_maxIdleConnectionsPerDownstream;
  static uint16_t s_cleanupInterval;
  static uint16_t s_maxIdleTime;

  /* backend UUID -> its active and idle connection lists */
  std::map<boost::uuids::uuid, ConnectionLists> d_downstreamConnections;

  /* earliest epoch second at which the next cleanup pass may run */
  time_t d_nextCleanup{0};
};
589
/* defaults: keep at most 10 idle connections per backend, clean up every
   60 seconds, expire idle connections after 300 seconds */
template <class T> size_t DownstreamConnectionsManager<T>::s_maxIdleConnectionsPerDownstream{10};
template <class T> uint16_t DownstreamConnectionsManager<T>::s_cleanupInterval{60};
template <class T> uint16_t DownstreamConnectionsManager<T>::s_maxIdleTime{300};

using DownstreamTCPConnectionsManager = DownstreamConnectionsManager<TCPConnectionToBackend>;
/* one connection pool per worker thread */
extern thread_local DownstreamTCPConnectionsManager t_downstreamTCPConnectionsManager;