/*
 * This file is part of PowerDNS or dnsdist.
 * Copyright -- PowerDNS.COM B.V. and its contributors
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In addition, for the avoidance of any doubt, permission is granted to
 * link this program with OpenSSL and to (re)distribute the binaries
 * produced as the result of such linking.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
#include "dnsdist.hh"
#include "dnsdist-ecs.hh"
#include "dnsdist-rings.hh"
#include "dnsdist-xpf.hh"

#include "dnsparser.hh"
#include "ednsoptions.hh"
#include "dolog.hh"
#include "lock.hh"
#include "gettime.hh"
#include "tcpiohandler.hh"
#include "threadname.hh"
#include <thread>
#include <atomic>
#include <netinet/tcp.h>

#include "sstuff.hh"

using std::thread;
using std::atomic;
/* TCP: the grand design.
   We forward 'messages' between clients and downstream servers. Messages are 65k bytes large, tops.
   An answer might theoretically consist of multiple messages (for example, in the case of AXFR), but
   initially we will not go there.

   In a sense there is a strong symmetry between UDP and TCP, once a connection to a downstream has been set up.
   This symmetry is broken because of head-of-line blocking within TCP though, necessitating additional connections
   to guarantee performance.

   So the idea is to have a 'pool' of available downstream connections, and forward messages to/from them, never queueing.
   That way, whenever an answer comes in, we know where it needs to go.

   Let's start naively.
*/
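
/* Note: DNS over TCP frames every message with a two-byte, network byte
   order length prefix (RFC 1035, section 4.2.2). This is why the code
   below keeps reading and prepending two 'size' bytes around queries and
   responses. */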

static std::mutex tcpClientsCountMutex;
static std::map<ComboAddress,size_t,ComboAddress::addressOnlyLessThan> tcpClientsCount;
static const size_t g_maxCachedConnectionsPerDownstream = 20;
uint64_t g_maxTCPQueuedConnections{1000};
size_t g_maxTCPQueriesPerConn{0};
size_t g_maxTCPConnectionDuration{0};
size_t g_maxTCPConnectionsPerClient{0};
uint16_t g_downstreamTCPCleanupInterval{60};
bool g_useTCPSinglePipe{false};

static std::unique_ptr<Socket> setupTCPDownstream(shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures)
{
  std::unique_ptr<Socket> result;

  do {
    vinfolog("TCP connecting to downstream %s (%d)", ds->remote.toStringWithPort(), downstreamFailures);
    result = std::unique_ptr<Socket>(new Socket(ds->remote.sin4.sin_family, SOCK_STREAM, 0));
    try {
      if (!IsAnyAddress(ds->sourceAddr)) {
        SSetsockopt(result->getHandle(), SOL_SOCKET, SO_REUSEADDR, 1);
#ifdef IP_BIND_ADDRESS_NO_PORT
        if (ds->ipBindAddrNoPort) {
          SSetsockopt(result->getHandle(), SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
        }
#endif
        result->bind(ds->sourceAddr, false);
      }
      result->setNonBlocking();
#ifdef MSG_FASTOPEN
      if (!ds->tcpFastOpen) {
        SConnectWithTimeout(result->getHandle(), ds->remote, /* no timeout, we will handle it ourselves */ 0);
      }
#else
      SConnectWithTimeout(result->getHandle(), ds->remote, /* no timeout, we will handle it ourselves */ 0);
#endif /* MSG_FASTOPEN */
      return result;
    }
    catch(const std::runtime_error& e) {
      vinfolog("Connection to downstream server %s failed: %s", ds->getName(), e.what());
      downstreamFailures++;
      if (downstreamFailures > ds->retries) {
        throw;
      }
    }
  } while(downstreamFailures <= ds->retries);

  return nullptr;
}

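/* A single TCP connection to a backend, kept in a per-thread cache so it
   can be reused for subsequent queries. The destructor updates the
   backend's TCP metrics with the number of queries handled and the
   duration of the connection in milliseconds. */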
class TCPConnectionToBackend
{
public:
  TCPConnectionToBackend(std::shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures, const struct timeval& now): d_ds(ds), d_connectionStartTime(now)
  {
    d_socket = setupTCPDownstream(d_ds, downstreamFailures);
    ++d_ds->tcpCurrentConnections;
  }

  ~TCPConnectionToBackend()
  {
    if (d_ds && d_socket) {
      --d_ds->tcpCurrentConnections;
      struct timeval now;
      gettimeofday(&now, nullptr);

      auto diff = now - d_connectionStartTime;
      d_ds->updateTCPMetrics(d_queries, diff.tv_sec * 1000 + diff.tv_usec / 1000);
    }
  }

  int getHandle() const
  {
    if (!d_socket) {
      throw std::runtime_error("Attempt to get the socket handle from a non-established TCP connection");
    }

    return d_socket->getHandle();
  }

  const ComboAddress& getRemote() const
  {
    return d_ds->remote;
  }

  bool isFresh() const
  {
    return d_fresh;
  }

  void incQueries()
  {
    ++d_queries;
  }

  void setReused()
  {
    d_fresh = false;
  }

private:
  std::unique_ptr<Socket> d_socket{nullptr};
  std::shared_ptr<DownstreamState> d_ds{nullptr};
  struct timeval d_connectionStartTime;
  uint64_t d_queries{0};
  bool d_fresh{true};
};
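
/* Per-thread cache of established backend connections, keyed by backend
   address. Idle connections are handed out by getConnectionToDownstream()
   and put back by releaseDownstreamConnection(), up to
   g_maxCachedConnectionsPerDownstream per backend. */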
static thread_local map<ComboAddress, std::deque<std::unique_ptr<TCPConnectionToBackend>>> t_downstreamConnections;

static std::unique_ptr<TCPConnectionToBackend> getConnectionToDownstream(std::shared_ptr<DownstreamState>& ds, uint16_t& downstreamFailures, const struct timeval& now)
{
  std::unique_ptr<TCPConnectionToBackend> result;

  const auto& it = t_downstreamConnections.find(ds->remote);
  if (it != t_downstreamConnections.end()) {
    auto& list = it->second;
    if (!list.empty()) {
      result = std::move(list.front());
      list.pop_front();
      result->setReused();
      return result;
    }
  }

  return std::unique_ptr<TCPConnectionToBackend>(new TCPConnectionToBackend(ds, downstreamFailures, now));
}

static void releaseDownstreamConnection(std::unique_ptr<TCPConnectionToBackend>&& conn)
{
  if (conn == nullptr) {
    return;
  }

  const auto& remote = conn->getRemote();
  const auto& it = t_downstreamConnections.find(remote);
  if (it != t_downstreamConnections.end()) {
    auto& list = it->second;
    if (list.size() >= g_maxCachedConnectionsPerDownstream) {
      /* too many connections queued already */
      conn.reset();
      return;
    }
    list.push_back(std::move(conn));
  }
  else {
    t_downstreamConnections[remote].push_back(std::move(conn));
  }
}

struct ConnectionInfo
{
  ConnectionInfo(ClientState* cs_): cs(cs_), fd(-1)
  {
  }
  ConnectionInfo(ConnectionInfo&& rhs): remote(rhs.remote), cs(rhs.cs), fd(rhs.fd)
  {
    rhs.cs = nullptr;
    rhs.fd = -1;
  }

  ConnectionInfo(const ConnectionInfo& rhs) = delete;
  ConnectionInfo& operator=(const ConnectionInfo& rhs) = delete;

  ConnectionInfo& operator=(ConnectionInfo&& rhs)
  {
    remote = rhs.remote;
    cs = rhs.cs;
    rhs.cs = nullptr;
    fd = rhs.fd;
    rhs.fd = -1;
    return *this;
  }

  ~ConnectionInfo()
  {
    if (fd != -1) {
      close(fd);
      fd = -1;
    }
    if (cs) {
      --cs->tcpCurrentConnections;
    }
  }

  ComboAddress remote;
  ClientState* cs{nullptr};
  int fd{-1};
};

void tcpClientThread(int pipefd);

static void decrementTCPClientCount(const ComboAddress& client)
{
  if (g_maxTCPConnectionsPerClient) {
    std::lock_guard<std::mutex> lock(tcpClientsCountMutex);
    tcpClientsCount[client]--;
    if (tcpClientsCount[client] == 0) {
      tcpClientsCount.erase(client);
    }
  }
}

void TCPClientCollection::addTCPClientThread()
{
  int pipefds[2] = { -1, -1};

  vinfolog("Adding TCP Client thread");

  if (d_useSinglePipe) {
    pipefds[0] = d_singlePipe[0];
    pipefds[1] = d_singlePipe[1];
  }
  else {
    if (pipe(pipefds) < 0) {
      errlog("Error creating the TCP thread communication pipe: %s", strerror(errno));
      return;
    }

    if (!setNonBlocking(pipefds[0])) {
      close(pipefds[0]);
      close(pipefds[1]);
      errlog("Error setting the TCP thread communication pipe non-blocking: %s", strerror(errno));
      return;
    }

    if (!setNonBlocking(pipefds[1])) {
      close(pipefds[0]);
      close(pipefds[1]);
      errlog("Error setting the TCP thread communication pipe non-blocking: %s", strerror(errno));
      return;
    }
  }

  {
    std::lock_guard<std::mutex> lock(d_mutex);

    if (d_numthreads >= d_tcpclientthreads.capacity()) {
      warnlog("Adding a new TCP client thread would exceed the vector capacity (%d/%d), skipping", d_numthreads.load(), d_tcpclientthreads.capacity());
      if (!d_useSinglePipe) {
        close(pipefds[0]);
        close(pipefds[1]);
      }
      return;
    }

    try {
      thread t1(tcpClientThread, pipefds[0]);
      t1.detach();
    }
    catch(const std::runtime_error& e) {
      /* the thread creation failed, don't leak */
      errlog("Error creating a TCP thread: %s", e.what());
      if (!d_useSinglePipe) {
        close(pipefds[0]);
        close(pipefds[1]);
      }
      return;
    }

    d_tcpclientthreads.push_back(pipefds[1]);
  }

  ++d_numthreads;
}

static void cleanupClosedTCPConnections()
{
  for(auto dsIt = t_downstreamConnections.begin(); dsIt != t_downstreamConnections.end(); ) {
    for (auto connIt = dsIt->second.begin(); connIt != dsIt->second.end(); ) {
      if (*connIt && isTCPSocketUsable((*connIt)->getHandle())) {
        ++connIt;
      }
      else {
        connIt = dsIt->second.erase(connIt);
      }
    }

    if (!dsIt->second.empty()) {
      ++dsIt;
    }
    else {
      dsIt = t_downstreamConnections.erase(dsIt);
    }
  }
}

/* Tries to read exactly toRead bytes into the buffer, starting at position pos.
   Updates pos every time a successful read occurs,
   throws an std::runtime_error in case of IO error,
   returns Done when toRead bytes have been read, and NeedRead or NeedWrite
   if the IO operation would block.
*/
// XXX could probably be implemented as a TCPIOHandler
IOState tryRead(int fd, std::vector<uint8_t>& buffer, size_t& pos, size_t toRead)
{
  if (buffer.size() < (pos + toRead)) {
    throw std::out_of_range("Calling tryRead() with a too small buffer (" + std::to_string(buffer.size()) + ") for a read of " + std::to_string(toRead) + " bytes starting at " + std::to_string(pos));
  }

  size_t got = 0;
  do {
    ssize_t res = ::read(fd, reinterpret_cast<char*>(&buffer.at(pos)), toRead - got);
    if (res == 0) {
      throw runtime_error("EOF while reading message");
    }
    if (res < 0) {
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        return IOState::NeedRead;
      }
      else {
        throw std::runtime_error(std::string("Error while reading message: ") + strerror(errno));
      }
    }

    pos += static_cast<size_t>(res);
    got += static_cast<size_t>(res);
  }
  while (got < toRead);

  return IOState::Done;
}
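
/* Typical usage, as in handleDownstreamIO() below: read the two size bytes
   first, then the payload, resuming from the same 'pos' when the
   multiplexer reports the descriptor readable again:

     IOState st = tryRead(fd, buffer, pos, sizeof(uint16_t) - pos);
     // once Done: size = buffer.at(0) * 256 + buffer.at(1);
*/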

std::unique_ptr<TCPClientCollection> g_tcpclientthreads;

class TCPClientThreadData
{
public:
  TCPClientThreadData(): localRespRulactions(g_resprulactions.getLocal()), mplexer(std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent()))
  {
  }

  LocalHolders holders;
  LocalStateHolder<vector<DNSDistResponseRuleAction> > localRespRulactions;
  std::unique_ptr<FDMultiplexer> mplexer{nullptr};
};

static void handleDownstreamIOCallback(int fd, FDMultiplexer::funcparam_t& param);

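/* All the state of one incoming TCP connection: the client-side handler
   (plain TCP or TLS), read/write buffers, the selected backend and its
   connection, timestamps used for per-connection metrics, and the current
   position in the state machine (see enum State below). */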
class IncomingTCPConnectionState
{
public:
  IncomingTCPConnectionState(ConnectionInfo&& ci, TCPClientThreadData& threadData, const struct timeval& now): d_buffer(4096), d_responseBuffer(4096), d_threadData(threadData), d_ci(std::move(ci)), d_handler(d_ci.fd, g_tcpRecvTimeout, d_ci.cs->tlsFrontend ? d_ci.cs->tlsFrontend->getContext() : nullptr, now.tv_sec), d_connectionStartTime(now)
  {
    d_ids.origDest.reset();
    d_ids.origDest.sin4.sin_family = d_ci.remote.sin4.sin_family;
    socklen_t socklen = d_ids.origDest.getSocklen();
    if (getsockname(d_ci.fd, reinterpret_cast<sockaddr*>(&d_ids.origDest), &socklen)) {
      d_ids.origDest = d_ci.cs->local;
    }
  }

  IncomingTCPConnectionState(const IncomingTCPConnectionState& rhs) = delete;
  IncomingTCPConnectionState& operator=(const IncomingTCPConnectionState& rhs) = delete;

  ~IncomingTCPConnectionState()
  {
    decrementTCPClientCount(d_ci.remote);
    if (d_ci.cs != nullptr) {
      struct timeval now;
      gettimeofday(&now, nullptr);

      auto diff = now - d_connectionStartTime;
      d_ci.cs->updateTCPMetrics(d_queriesCount, diff.tv_sec * 1000.0 + diff.tv_usec / 1000.0);
    }

    if (d_ds != nullptr) {
      if (d_outstanding) {
        --d_ds->outstanding;
        d_outstanding = false;
      }

      if (d_downstreamConnection) {
        try {
          if (d_lastIOState == IOState::NeedRead) {
            cerr<<__func__<<": removing leftover backend read FD "<<d_downstreamConnection->getHandle()<<endl;
            d_threadData.mplexer->removeReadFD(d_downstreamConnection->getHandle());
          }
          else if (d_lastIOState == IOState::NeedWrite) {
            cerr<<__func__<<": removing leftover backend write FD "<<d_downstreamConnection->getHandle()<<endl;
            d_threadData.mplexer->removeWriteFD(d_downstreamConnection->getHandle());
          }
        }
        catch(const FDMultiplexerException& e) {
          vinfolog("Got an exception when trying to remove a pending IO operation on the socket to the %s backend: %s", d_ds->getName(), e.what());
        }
        catch(const std::runtime_error& e) {
          /* might be thrown by getHandle() */
          vinfolog("Got an exception when trying to remove a pending IO operation on the socket to the %s backend: %s", d_ds->getName(), e.what());
        }
      }
    }

    try {
      if (d_lastIOState == IOState::NeedRead) {
        cerr<<__func__<<": removing leftover client read FD "<<d_ci.fd<<endl;
        d_threadData.mplexer->removeReadFD(d_ci.fd);
      }
      else if (d_lastIOState == IOState::NeedWrite) {
        cerr<<__func__<<": removing leftover client write FD "<<d_ci.fd<<endl;
        d_threadData.mplexer->removeWriteFD(d_ci.fd);
      }
    }
    catch(const FDMultiplexerException& e) {
      vinfolog("Got an exception when trying to remove a pending IO operation on an incoming TCP connection from %s: %s", d_ci.remote.toStringWithPort(), e.what());
    }
  }

  void resetForNewQuery()
  {
    d_buffer.resize(sizeof(uint16_t));
    d_currentPos = 0;
    d_querySize = 0;
    d_responseSize = 0;
    d_downstreamFailures = 0;
    d_state = State::readingQuerySize;
    d_lastIOState = IOState::Done;
  }

  boost::optional<struct timeval> getClientReadTTD(struct timeval now) const
  {
    if (g_maxTCPConnectionDuration == 0 && g_tcpRecvTimeout == 0) {
      return boost::none;
    }

    if (g_maxTCPConnectionDuration > 0) {
      auto elapsed = now.tv_sec - d_connectionStartTime.tv_sec;
      if (elapsed < 0 || (static_cast<size_t>(elapsed) >= g_maxTCPConnectionDuration)) {
        return now;
      }
      auto remaining = g_maxTCPConnectionDuration - elapsed;
      if (g_tcpRecvTimeout == 0 || remaining <= static_cast<size_t>(g_tcpRecvTimeout)) {
        now.tv_sec += remaining;
        return now;
      }
    }

    now.tv_sec += g_tcpRecvTimeout;
    return now;
  }

  boost::optional<struct timeval> getBackendReadTTD(const struct timeval& now) const
  {
    if (d_ds == nullptr) {
      throw std::runtime_error("getBackendReadTTD() called without any backend selected");
    }
    if (d_ds->tcpRecvTimeout == 0) {
      return boost::none;
    }

    struct timeval res = now;
    res.tv_sec += d_ds->tcpRecvTimeout;

    return res;
  }

  boost::optional<struct timeval> getClientWriteTTD(const struct timeval& now) const
  {
    if (g_maxTCPConnectionDuration == 0 && g_tcpSendTimeout == 0) {
      return boost::none;
    }

    struct timeval res = now;

    if (g_maxTCPConnectionDuration > 0) {
      auto elapsed = res.tv_sec - d_connectionStartTime.tv_sec;
      if (elapsed < 0 || static_cast<size_t>(elapsed) >= g_maxTCPConnectionDuration) {
        return res;
      }
      auto remaining = g_maxTCPConnectionDuration - elapsed;
      if (g_tcpSendTimeout == 0 || remaining <= static_cast<size_t>(g_tcpSendTimeout)) {
        res.tv_sec += remaining;
        return res;
      }
    }

    res.tv_sec += g_tcpSendTimeout;
    return res;
  }

  boost::optional<struct timeval> getBackendWriteTTD(const struct timeval& now) const
  {
    if (d_ds == nullptr) {
      throw std::runtime_error("getBackendWriteTTD() called without any backend selected");
    }
    if (d_ds->tcpSendTimeout == 0) {
      return boost::none;
    }

    struct timeval res = now;
    res.tv_sec += d_ds->tcpSendTimeout;

    return res;
  }

  bool maxConnectionDurationReached(unsigned int maxConnectionDuration, const struct timeval& now)
  {
    if (maxConnectionDuration) {
      time_t curtime = now.tv_sec;
      unsigned int elapsed = 0;
      if (curtime > d_connectionStartTime.tv_sec) { // To prevent issues when time goes backward
        elapsed = curtime - d_connectionStartTime.tv_sec;
      }
      if (elapsed >= maxConnectionDuration) {
        return true;
      }
      d_remainingTime = maxConnectionDuration - elapsed;
    }

    return false;
  }

  void dump() const
  {
    static std::mutex s_mutex;

    struct timeval now;
    gettimeofday(&now, 0);

    {
      std::lock_guard<std::mutex> lock(s_mutex);
      fprintf(stderr, "State is %p\n", this);
      cerr << "Current state is " << static_cast<int>(d_state) << ", got "<<d_queriesCount<<" queries so far" << endl;
      cerr << "Current time is " << now.tv_sec << " - " << now.tv_usec << endl;
      cerr << "Connection started at " << d_connectionStartTime.tv_sec << " - " << d_connectionStartTime.tv_usec << endl;
      if (d_state > State::doingHandshake) {
        cerr << "Handshake done at " << d_handshakeDoneTime.tv_sec << " - " << d_handshakeDoneTime.tv_usec << endl;
      }
      if (d_state > State::readingQuerySize) {
        cerr << "Got first query size at " << d_firstQuerySizeReadTime.tv_sec << " - " << d_firstQuerySizeReadTime.tv_usec << endl;
      }
      if (d_state > State::readingQuerySize) {
        cerr << "Got query size at " << d_querySizeReadTime.tv_sec << " - " << d_querySizeReadTime.tv_usec << endl;
      }
      if (d_state > State::readingQuery) {
        cerr << "Got query at " << d_queryReadTime.tv_sec << " - " << d_queryReadTime.tv_usec << endl;
      }
      if (d_state > State::sendingQueryToBackend) {
        cerr << "Sent query at " << d_querySentTime.tv_sec << " - " << d_querySentTime.tv_usec << endl;
      }
      if (d_state > State::readingResponseFromBackend) {
        cerr << "Got response at " << d_responseReadTime.tv_sec << " - " << d_responseReadTime.tv_usec << endl;
      }
    }
  }

  enum class State { doingHandshake, readingQuerySize, readingQuery, sendingQueryToBackend, readingResponseSizeFromBackend, readingResponseFromBackend, sendingResponse };

  std::vector<uint8_t> d_buffer;
  std::vector<uint8_t> d_responseBuffer;
  TCPClientThreadData& d_threadData;
  IDState d_ids;
  ConnectionInfo d_ci;
  TCPIOHandler d_handler;
  std::unique_ptr<TCPConnectionToBackend> d_downstreamConnection{nullptr};
  std::shared_ptr<DownstreamState> d_ds{nullptr};
  struct timeval d_connectionStartTime;
  struct timeval d_handshakeDoneTime;
  struct timeval d_firstQuerySizeReadTime;
  struct timeval d_querySizeReadTime;
  struct timeval d_queryReadTime;
  struct timeval d_querySentTime;
  struct timeval d_responseReadTime;
  size_t d_currentPos{0};
  size_t d_queriesCount{0};
  unsigned int d_remainingTime{0};
  uint16_t d_querySize{0};
  uint16_t d_responseSize{0};
  uint16_t d_downstreamFailures{0};
  State d_state{State::doingHandshake};
  IOState d_lastIOState{IOState::Done};
  bool d_readingFirstQuery{true};
  bool d_outstanding{false};
  bool d_firstResponsePacket{true};
  bool d_isXFR{false};
  bool d_xfrStarted{false};
};

static void handleIOCallback(int fd, FDMultiplexer::funcparam_t& param);
static void handleNewIOState(std::shared_ptr<IncomingTCPConnectionState>& state, IOState iostate, const int fd, FDMultiplexer::callbackfunc_t callback, boost::optional<struct timeval> ttd=boost::none);
static void handleIO(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now);
static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now);

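/* Called once a response has been fully sent to the client: either resume
   reading from the backend (XFR), terminate the connection if one of the
   per-connection limits has been reached, or reset the state to read the
   next query on the same connection. */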
static void handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now)
{
  handleNewIOState(state, IOState::Done, state->d_ci.fd, handleIOCallback);

  if (state->d_isXFR && state->d_downstreamConnection) {
    /* we need to resume reading from the backend! */
    state->d_state = IncomingTCPConnectionState::State::readingResponseSizeFromBackend;
    state->d_currentPos = 0;
    handleDownstreamIO(state, now);
    return;
  }

  if (g_maxTCPQueriesPerConn && state->d_queriesCount > g_maxTCPQueriesPerConn) {
    vinfolog("Terminating TCP connection from %s because it reached the maximum number of queries per conn (%d / %d)", state->d_ci.remote.toStringWithPort(), state->d_queriesCount, g_maxTCPQueriesPerConn);
    return;
  }

  if (state->maxConnectionDurationReached(g_maxTCPConnectionDuration, now)) {
    vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", state->d_ci.remote.toStringWithPort());
    return;
  }

  state->resetForNewQuery();

  handleIO(state, now);
}

static void sendResponse(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now)
{
  state->d_state = IncomingTCPConnectionState::State::sendingResponse;
  const uint8_t sizeBytes[] = { static_cast<uint8_t>(state->d_responseSize / 256), static_cast<uint8_t>(state->d_responseSize % 256) };
  /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
     that could occur if we had to deal with the size during the processing,
     especially alignment issues */
  state->d_responseBuffer.insert(state->d_responseBuffer.begin(), sizeBytes, sizeBytes + 2);

  state->d_currentPos = 0;

  handleIO(state, now);
}

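/* A response has been fully read from the backend: check that it matches
   the query (first packet only), apply the response rules, then hand it to
   sendResponse() and record it in the rings. */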
static void handleResponse(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now)
{
  if (state->d_responseSize < sizeof(dnsheader)) {
    return;
  }

  auto response = reinterpret_cast<char*>(&state->d_responseBuffer.at(0));
  unsigned int consumed;
  if (state->d_firstResponsePacket && !responseContentMatches(response, state->d_responseSize, state->d_ids.qname, state->d_ids.qtype, state->d_ids.qclass, state->d_ds->remote, consumed)) {
    return;
  }
  state->d_firstResponsePacket = false;

  if (state->d_outstanding) {
    --state->d_ds->outstanding;
    state->d_outstanding = false;
  }

  auto dh = reinterpret_cast<struct dnsheader*>(response);
  uint16_t addRoom = 0;
  DNSResponse dr = makeDNSResponseFromIDState(state->d_ids, dh, state->d_responseBuffer.size(), state->d_responseSize, true);
  if (dr.dnsCryptQuery) {
    addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
  }

  dnsheader cleartextDH;
  memcpy(&cleartextDH, dr.dh, sizeof(cleartextDH));

  std::vector<uint8_t> rewrittenResponse;
  size_t responseSize = state->d_responseBuffer.size();
  if (!processResponse(&response, &state->d_responseSize, &responseSize, state->d_threadData.localRespRulactions, dr, addRoom, rewrittenResponse, false)) {
    return;
  }

  if (!rewrittenResponse.empty()) {
    /* responseSize has been updated as well but we don't really care since it will match
       the capacity of rewrittenResponse anyway */
    state->d_responseBuffer = std::move(rewrittenResponse);
    state->d_responseSize = state->d_responseBuffer.size();
  } else {
    /* the size might have been updated (shrunk), if we removed the whole OPT RR for example */
    state->d_responseBuffer.resize(state->d_responseSize);
  }

  if (state->d_isXFR && !state->d_xfrStarted) {
    /* don't bother parsing the content of the response for now */
    state->d_xfrStarted = true;
  }

  sendResponse(state, now);

  ++g_stats.responses;
  struct timespec answertime;
  gettime(&answertime);
  double udiff = state->d_ids.sentTime.udiff();
  g_rings.insertResponse(answertime, state->d_ci.remote, *dr.qname, dr.qtype, static_cast<unsigned int>(udiff), static_cast<unsigned int>(state->d_responseBuffer.size()), cleartextDH, state->d_ds->remote);
}

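/* Pick up (or establish) a connection to the selected backend and start
   writing the query to it, giving up after ds->retries consecutive
   connection failures. */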
static void sendQueryToBackend(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now)
{
  auto ds = state->d_ds;
  state->d_state = IncomingTCPConnectionState::State::sendingQueryToBackend;
  state->d_currentPos = 0;
  state->d_firstResponsePacket = true;
  state->d_downstreamConnection.reset();

  if (state->d_xfrStarted) {
    /* sorry, but we are not going to resume an XFR if we have already sent some packets
       to the client */
    return;
  }

  while (state->d_downstreamFailures < state->d_ds->retries)
  {
    state->d_downstreamConnection = getConnectionToDownstream(ds, state->d_downstreamFailures, now);

    if (!state->d_downstreamConnection) {
      ++ds->tcpGaveUp;
      ++state->d_ci.cs->tcpGaveUp;
      vinfolog("Downstream connection to %s failed %d times in a row, giving up.", ds->getName(), state->d_downstreamFailures);
      return;
    }

    handleDownstreamIO(state, now);
    return;
  }

  ++ds->tcpGaveUp;
  ++state->d_ci.cs->tcpGaveUp;
  vinfolog("Downstream connection to %s failed %u times in a row, giving up.", ds->getName(), state->d_downstreamFailures);
}

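/* A full query has been read from the client: parse it, run it through the
   rules, and either answer it right away (for instance a DNSCrypt
   certificate query or a rule-generated answer) or pass it to the selected
   backend. */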
static void handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now)
{
  if (state->d_querySize < sizeof(dnsheader)) {
    ++g_stats.nonCompliantQueries;
    return;
  }

  state->d_readingFirstQuery = false;
  ++state->d_queriesCount;
  ++state->d_ci.cs->queries;
  ++g_stats.queries;

  /* we need an accurate ("real") value for the response and
     to store into the IDS, but not for insertion into the
     rings for example */
  struct timespec queryRealTime;
  gettime(&queryRealTime, true);

  auto query = reinterpret_cast<char*>(&state->d_buffer.at(0));
  std::shared_ptr<DNSCryptQuery> dnsCryptQuery{nullptr};
  auto dnsCryptResponse = checkDNSCryptQuery(*state->d_ci.cs, query, state->d_querySize, dnsCryptQuery, queryRealTime.tv_sec, true);
  if (dnsCryptResponse) {
    state->d_responseBuffer = std::move(*dnsCryptResponse);
    state->d_responseSize = state->d_responseBuffer.size();
    sendResponse(state, now);
    return;
  }

  const auto& dh = reinterpret_cast<dnsheader*>(query);
  if (!checkQueryHeaders(dh)) {
    return;
  }

  uint16_t qtype, qclass;
  unsigned int consumed = 0;
  DNSName qname(query, state->d_querySize, sizeof(dnsheader), false, &qtype, &qclass, &consumed);
  DNSQuestion dq(&qname, qtype, qclass, consumed, &state->d_ids.origDest, &state->d_ci.remote, reinterpret_cast<dnsheader*>(query), state->d_buffer.size(), state->d_querySize, true, &queryRealTime);
  dq.dnsCryptQuery = std::move(dnsCryptQuery);
  dq.sni = state->d_handler.getServerNameIndication();

  state->d_isXFR = (dq.qtype == QType::AXFR || dq.qtype == QType::IXFR);
  if (state->d_isXFR) {
    dq.skipCache = true;
  }

  state->d_ds.reset();
  auto result = processQuery(dq, *state->d_ci.cs, state->d_threadData.holders, state->d_ds);

  if (result == ProcessQueryResult::Drop) {
    return;
  }

  if (result == ProcessQueryResult::SendAnswer) {
    state->d_buffer.resize(dq.len);
    state->d_responseBuffer = std::move(state->d_buffer);
    state->d_responseSize = state->d_responseBuffer.size();
    sendResponse(state, now);
    return;
  }

  if (result != ProcessQueryResult::PassToBackend || state->d_ds == nullptr) {
    return;
  }

  state->d_buffer.resize(dq.len);
  setIDStateFromDNSQuestion(state->d_ids, dq, std::move(qname));

  const uint8_t sizeBytes[] = { static_cast<uint8_t>(dq.len / 256), static_cast<uint8_t>(dq.len % 256) };
  /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
     that could occur if we had to deal with the size during the processing,
     especially alignment issues */
  state->d_buffer.insert(state->d_buffer.begin(), sizeBytes, sizeBytes + 2);
  sendQueryToBackend(state, now);
}

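/* Update the multiplexer registration of 'fd' to match the new IO state:
   remove a stale read/write registration if the state changed, then add
   the new one with its time-to-die, if any. An unchanged NeedRead state
   only gets its TTD refreshed. */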
static void handleNewIOState(std::shared_ptr<IncomingTCPConnectionState>& state, IOState iostate, const int fd, FDMultiplexer::callbackfunc_t callback, boost::optional<struct timeval> ttd)
{
  //cerr<<"in "<<__func__<<" for fd "<<fd<<", last state was "<<(int)state->d_lastIOState<<", new state is "<<(int)iostate<<endl;

  if (state->d_lastIOState == IOState::NeedRead && iostate != IOState::NeedRead) {
    state->d_threadData.mplexer->removeReadFD(fd);
    //cerr<<__func__<<": remove read FD "<<fd<<endl;
    state->d_lastIOState = IOState::Done;
  }
  else if (state->d_lastIOState == IOState::NeedWrite && iostate != IOState::NeedWrite) {
    state->d_threadData.mplexer->removeWriteFD(fd);
    //cerr<<__func__<<": remove write FD "<<fd<<endl;
    state->d_lastIOState = IOState::Done;
  }

  if (iostate == IOState::NeedRead) {
    if (state->d_lastIOState == IOState::NeedRead) {
      if (ttd) {
        /* let's update the TTD! */
        state->d_threadData.mplexer->setReadTTD(fd, *ttd, /* we pass 0 here because we already have a TTD */ 0);
      }
      return;
    }

    state->d_lastIOState = IOState::NeedRead;
    //cerr<<__func__<<": add read FD "<<fd<<endl;
    state->d_threadData.mplexer->addReadFD(fd, callback, state, ttd ? &*ttd : nullptr);
  }
  else if (iostate == IOState::NeedWrite) {
    if (state->d_lastIOState == IOState::NeedWrite) {
      return;
    }

    state->d_lastIOState = IOState::NeedWrite;
    //cerr<<__func__<<": add write FD "<<fd<<endl;
    state->d_threadData.mplexer->addWriteFD(fd, callback, state, ttd ? &*ttd : nullptr);
  }
  else if (iostate == IOState::Done) {
    state->d_lastIOState = IOState::Done;
  }
}

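/* Drive the backend side of the state machine: write the query (possibly
   with TCP Fast Open on a fresh connection), then read the two-byte
   response size followed by the response itself. On failure the backend
   connection is dropped and we retry via sendQueryToBackend(). */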
static void handleDownstreamIO(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now)
{
  if (state->d_downstreamConnection == nullptr) {
    throw std::runtime_error("No downstream socket in " + std::string(__func__) + "!");
  }

  int fd = state->d_downstreamConnection->getHandle();
  IOState iostate = IOState::Done;
  bool connectionDied = false;

  try {
    if (state->d_state == IncomingTCPConnectionState::State::sendingQueryToBackend) {
      int socketFlags = 0;
#ifdef MSG_FASTOPEN
      if (state->d_ds->tcpFastOpen && state->d_downstreamConnection->isFresh()) {
        socketFlags |= MSG_FASTOPEN;
      }
#endif /* MSG_FASTOPEN */

      size_t sent = sendMsgWithOptions(fd, reinterpret_cast<const char *>(&state->d_buffer.at(state->d_currentPos)), state->d_buffer.size() - state->d_currentPos, &state->d_ds->remote, &state->d_ds->sourceAddr, state->d_ds->sourceItf, socketFlags);
      if (sent == state->d_buffer.size()) {
        /* request sent! */
        state->d_downstreamConnection->incQueries();
        state->d_state = IncomingTCPConnectionState::State::readingResponseSizeFromBackend;
        state->d_currentPos = 0;
        state->d_querySentTime = now;
        iostate = IOState::NeedRead;
        if (!state->d_isXFR) {
          /* don't bother with the outstanding count for XFR queries */
          ++state->d_ds->outstanding;
          state->d_outstanding = true;
        }
      }
      else {
        state->d_currentPos += sent;
        iostate = IOState::NeedWrite;
        /* disable fast open on partial write */
        state->d_downstreamConnection->setReused();
      }
    }

    if (state->d_state == IncomingTCPConnectionState::State::readingResponseSizeFromBackend) {
      // we need to allocate a new buffer (new because we might need to re-send the query if the
      // backend dies on us).
      // We also might need to read and send to the client more than one response in case of XFR (yeah!)
      // should very likely be a TCPIOHandler d_downstreamHandler
      iostate = tryRead(fd, state->d_responseBuffer, state->d_currentPos, sizeof(uint16_t) - state->d_currentPos);
      if (iostate == IOState::Done) {
        state->d_state = IncomingTCPConnectionState::State::readingResponseFromBackend;
        state->d_responseSize = state->d_responseBuffer.at(0) * 256 + state->d_responseBuffer.at(1);
        state->d_responseBuffer.resize((state->d_ids.dnsCryptQuery && (UINT16_MAX - state->d_responseSize) > static_cast<uint16_t>(DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE)) ? state->d_responseSize + DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE : state->d_responseSize);
        state->d_currentPos = 0;
      }
    }

    if (state->d_state == IncomingTCPConnectionState::State::readingResponseFromBackend) {
      iostate = tryRead(fd, state->d_responseBuffer, state->d_currentPos, state->d_responseSize - state->d_currentPos);
      if (iostate == IOState::Done) {
        handleNewIOState(state, IOState::Done, fd, handleDownstreamIOCallback);

        if (state->d_isXFR) {
          /* Don't reuse the TCP connection after an {A,I}XFR */
          /* but don't reset it either, we will need to read more messages */
        }
        else {
          releaseDownstreamConnection(std::move(state->d_downstreamConnection));
        }
        fd = -1;

        state->d_responseReadTime = now;
        handleResponse(state, now);
        return;
      }
    }

    if (state->d_state != IncomingTCPConnectionState::State::sendingQueryToBackend &&
        state->d_state != IncomingTCPConnectionState::State::readingResponseSizeFromBackend &&
        state->d_state != IncomingTCPConnectionState::State::readingResponseFromBackend) {
      vinfolog("Unexpected state %d in handleDownstreamIOCallback", static_cast<int>(state->d_state));
    }
  }
  catch(const std::exception& e) {
    /* most likely an EOF because the other end closed the connection,
       but it might also be a real IO error or something else.
       Let's just drop the connection
    */
    vinfolog("Got an exception while handling (%s backend) TCP query from %s: %s", (state->d_lastIOState == IOState::NeedRead ? "reading from" : "writing to"), state->d_ci.remote.toStringWithPort(), e.what());
    if (state->d_state == IncomingTCPConnectionState::State::sendingQueryToBackend) {
      ++state->d_ds->tcpDiedSendingQuery;
    }
    else {
      ++state->d_ds->tcpDiedReadingResponse;
    }

    /* don't increase this counter when reusing connections */
    if (state->d_downstreamConnection->isFresh()) {
      ++state->d_downstreamFailures;
    }
    if (state->d_outstanding && state->d_ds != nullptr) {
      --state->d_ds->outstanding;
      state->d_outstanding = false;
    }
    /* remove this FD from the IO multiplexer */
    iostate = IOState::Done;
    connectionDied = true;
  }

  if (iostate == IOState::Done) {
    handleNewIOState(state, iostate, fd, handleDownstreamIOCallback);
  }
  else {
    handleNewIOState(state, iostate, fd, handleDownstreamIOCallback, iostate == IOState::NeedRead ? state->getBackendReadTTD(now) : state->getBackendWriteTTD(now));
  }

  if (connectionDied) {
    sendQueryToBackend(state, now);
  }
}

static void handleDownstreamIOCallback(int fd, FDMultiplexer::funcparam_t& param)
{
  auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
  if (state->d_downstreamConnection == nullptr) {
    throw std::runtime_error("No downstream socket in " + std::string(__func__) + "!");
  }
  if (fd != state->d_downstreamConnection->getHandle()) {
    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__func__) + ", expected " + std::to_string(state->d_downstreamConnection->getHandle()));
  }

  struct timeval now;
  gettimeofday(&now, 0);
  handleDownstreamIO(state, now);
}

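/* Drive the client side of the state machine: the TLS handshake (if any),
   the two-byte query size, the query itself, and finally the writing of
   the response back to the client. */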
static void handleIO(std::shared_ptr<IncomingTCPConnectionState>& state, struct timeval& now)
{
  int fd = state->d_ci.fd;
  IOState iostate = IOState::Done;

  if (state->maxConnectionDurationReached(g_maxTCPConnectionDuration, now)) {
    vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", state->d_ci.remote.toStringWithPort());
    handleNewIOState(state, IOState::Done, fd, handleIOCallback);
    return;
  }

  try {
    if (state->d_state == IncomingTCPConnectionState::State::doingHandshake) {
      iostate = state->d_handler.tryHandshake();
      if (iostate == IOState::Done) {
        state->d_handshakeDoneTime = now;
        state->d_state = IncomingTCPConnectionState::State::readingQuerySize;
      }
    }

    if (state->d_state == IncomingTCPConnectionState::State::readingQuerySize) {
      iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, sizeof(uint16_t) - state->d_currentPos);
      if (iostate == IOState::Done) {
        state->d_state = IncomingTCPConnectionState::State::readingQuery;
        state->d_querySizeReadTime = now;
        if (state->d_queriesCount == 0) {
          state->d_firstQuerySizeReadTime = now;
        }
        state->d_querySize = state->d_buffer.at(0) * 256 + state->d_buffer.at(1);
        if (state->d_querySize < sizeof(dnsheader)) {
          /* go away */
          handleNewIOState(state, IOState::Done, fd, handleIOCallback);
          return;
        }

        /* allocate a bit more memory to be able to spoof the content,
           or to add ECS without allocating a new buffer */
        state->d_buffer.resize(state->d_querySize + 512);
        state->d_currentPos = 0;
      }
    }

    if (state->d_state == IncomingTCPConnectionState::State::readingQuery) {
      iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, state->d_querySize);
      if (iostate == IOState::Done) {
        handleNewIOState(state, IOState::Done, fd, handleIOCallback);
        handleQuery(state, now);
        return;
      }
    }

    if (state->d_state == IncomingTCPConnectionState::State::sendingResponse) {
      iostate = state->d_handler.tryWrite(state->d_responseBuffer, state->d_currentPos, state->d_responseBuffer.size());
      if (iostate == IOState::Done) {
        handleResponseSent(state, now);
        return;
      }
    }

    if (state->d_state != IncomingTCPConnectionState::State::doingHandshake &&
        state->d_state != IncomingTCPConnectionState::State::readingQuerySize &&
        state->d_state != IncomingTCPConnectionState::State::readingQuery &&
        state->d_state != IncomingTCPConnectionState::State::sendingResponse) {
      vinfolog("Unexpected state %d in handleIOCallback", static_cast<int>(state->d_state));
    }
  }
  catch(const std::exception& e) {
    /* most likely an EOF because the other end closed the connection,
       but it might also be a real IO error or something else.
       Let's just drop the connection
    */
    if (state->d_state == IncomingTCPConnectionState::State::doingHandshake ||
        state->d_state == IncomingTCPConnectionState::State::readingQuerySize ||
        state->d_state == IncomingTCPConnectionState::State::readingQuery) {
      ++state->d_ci.cs->tcpDiedReadingQuery;
    }
    else if (state->d_state == IncomingTCPConnectionState::State::sendingResponse) {
      ++state->d_ci.cs->tcpDiedSendingResponse;
    }

    if (state->d_lastIOState == IOState::NeedWrite || state->d_readingFirstQuery) {
      vinfolog("Got an exception while handling (%s) TCP query from %s: %s", (state->d_lastIOState == IOState::NeedRead ? "reading" : "writing"), state->d_ci.remote.toStringWithPort(), e.what());
    }
    else {
      vinfolog("Closing TCP client connection with %s", state->d_ci.remote.toStringWithPort());
    }
    /* remove this FD from the IO multiplexer */
    iostate = IOState::Done;
  }

  if (iostate == IOState::Done) {
    handleNewIOState(state, iostate, fd, handleIOCallback);
  }
  else {
    handleNewIOState(state, iostate, fd, handleIOCallback, iostate == IOState::NeedRead ? state->getClientReadTTD(now) : state->getClientWriteTTD(now));
  }
}

static void handleIOCallback(int fd, FDMultiplexer::funcparam_t& param)
{
  auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
  if (fd != state->d_ci.fd) {
    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__func__) + ", expected " + std::to_string(state->d_ci.fd));
  }
  struct timeval now;
  gettimeofday(&now, 0);

  handleIO(state, now);
}

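/* Invoked by the multiplexer when the acceptor thread has written a
   ConnectionInfo pointer to our pipe: take ownership of the new connection
   and kick off its client-side state machine. */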
static void handleIncomingTCPQuery(int pipefd, FDMultiplexer::funcparam_t& param)
{
  auto threadData = boost::any_cast<TCPClientThreadData*>(param);

  ConnectionInfo* citmp{nullptr};

  ssize_t got = read(pipefd, &citmp, sizeof(citmp));
  if (got == 0) {
    throw std::runtime_error("EOF while reading from the TCP acceptor pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode");
  }
  else if (got == -1) {
    if (errno == EAGAIN || errno == EINTR) {
      return;
    }
    throw std::runtime_error("Error while reading from the TCP acceptor pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode: " + strerror(errno));
  }
  else if (got != sizeof(citmp)) {
    throw std::runtime_error("Partial read while reading from the TCP acceptor pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode");
  }

  try {
    g_tcpclientthreads->decrementQueuedCount();

    struct timeval now;
    gettimeofday(&now, 0);
    auto state = std::make_shared<IncomingTCPConnectionState>(std::move(*citmp), *threadData, now);
    delete citmp;
    citmp = nullptr;

    /* let's update the remaining time */
    state->d_remainingTime = g_maxTCPConnectionDuration;

    handleIO(state, now);
  }
  catch(...) {
    delete citmp;
    citmp = nullptr;
    throw;
  }
}

void tcpClientThread(int pipefd)
{
  /* we get launched with a pipe on which we receive file descriptors from clients that we own
     from that point on */

  setThreadName("dnsdist/tcpClie");

  TCPClientThreadData data;

  data.mplexer->addReadFD(pipefd, handleIncomingTCPQuery, &data);
  struct timeval now;
  gettimeofday(&now, 0);
  time_t lastTCPCleanup = now.tv_sec;
  time_t lastTimeoutScan = now.tv_sec;

  for (;;) {
    data.mplexer->run(&now);

    if (g_downstreamTCPCleanupInterval > 0 && (now.tv_sec > (lastTCPCleanup + g_downstreamTCPCleanupInterval))) {
      cleanupClosedTCPConnections();
      lastTCPCleanup = now.tv_sec;
    }

    if (now.tv_sec > lastTimeoutScan) {
      lastTimeoutScan = now.tv_sec;
      auto expiredReadConns = data.mplexer->getTimeouts(now, false);
      for(const auto& conn : expiredReadConns) {
        auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(conn.second);
        if (conn.first == state->d_ci.fd) {
          vinfolog("Timeout (read) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
          ++state->d_ci.cs->tcpClientTimeouts;
        }
        else if (state->d_ds) {
          vinfolog("Timeout (read) from remote backend %s", state->d_ds->getName());
          ++state->d_ci.cs->tcpDownstreamTimeouts;
          ++state->d_ds->tcpReadTimeouts;
        }
        data.mplexer->removeReadFD(conn.first);
        state->d_lastIOState = IOState::Done;
      }

      auto expiredWriteConns = data.mplexer->getTimeouts(now, true);
      for(const auto& conn : expiredWriteConns) {
        auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(conn.second);
        if (conn.first == state->d_ci.fd) {
          vinfolog("Timeout (write) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
          ++state->d_ci.cs->tcpClientTimeouts;
        }
        else if (state->d_ds) {
          vinfolog("Timeout (write) from remote backend %s", state->d_ds->getName());
          ++state->d_ci.cs->tcpDownstreamTimeouts;
          ++state->d_ds->tcpWriteTimeouts;
        }
        data.mplexer->removeWriteFD(conn.first);
        state->d_lastIOState = IOState::Done;
      }
    }
  }
}

/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and
   they will hand off to worker threads & spawn more of them if required
*/
void tcpAcceptorThread(void* p)
{
  setThreadName("dnsdist/tcpAcce");
  ClientState* cs = (ClientState*) p;
  bool tcpClientCountIncremented = false;
  ComboAddress remote;
  remote.sin4.sin_family = cs->local.sin4.sin_family;

  g_tcpclientthreads->addTCPClientThread();

  auto acl = g_ACL.getLocal();
  for(;;) {
    bool queuedCounterIncremented = false;
    std::unique_ptr<ConnectionInfo> ci;
    tcpClientCountIncremented = false;
    try {
      socklen_t remlen = remote.getSocklen();
      ci = std::unique_ptr<ConnectionInfo>(new ConnectionInfo(cs));
#ifdef HAVE_ACCEPT4
      ci->fd = accept4(cs->tcpFD, reinterpret_cast<struct sockaddr*>(&remote), &remlen, SOCK_NONBLOCK);
#else
      ci->fd = accept(cs->tcpFD, reinterpret_cast<struct sockaddr*>(&remote), &remlen);
#endif
      ++cs->tcpCurrentConnections;

      if(ci->fd < 0) {
        throw std::runtime_error((boost::format("accepting new connection on socket: %s") % strerror(errno)).str());
      }

      if(!acl->match(remote)) {
        ++g_stats.aclDrops;
        vinfolog("Dropped TCP connection from %s because of ACL", remote.toStringWithPort());
        continue;
      }

#ifndef HAVE_ACCEPT4
      if (!setNonBlocking(ci->fd)) {
        continue;
      }
#endif
      setTCPNoDelay(ci->fd);  // disable Nagle's algorithm, we write full messages anyway
      if(g_maxTCPQueuedConnections > 0 && g_tcpclientthreads->getQueuedCount() >= g_maxTCPQueuedConnections) {
        vinfolog("Dropping TCP connection from %s because we have too many queued already", remote.toStringWithPort());
        continue;
      }

      if (g_maxTCPConnectionsPerClient) {
        std::lock_guard<std::mutex> lock(tcpClientsCountMutex);

        if (tcpClientsCount[remote] >= g_maxTCPConnectionsPerClient) {
          vinfolog("Dropping TCP connection from %s because we have too many from this client already", remote.toStringWithPort());
          continue;
        }
        tcpClientsCount[remote]++;
        tcpClientCountIncremented = true;
      }

      vinfolog("Got TCP connection from %s", remote.toStringWithPort());

      ci->remote = remote;
      int pipe = g_tcpclientthreads->getThread();
      if (pipe >= 0) {
        queuedCounterIncremented = true;
        auto tmp = ci.release();
        try {
          writen2WithTimeout(pipe, &tmp, sizeof(tmp), 0);
        }
        catch(...) {
          delete tmp;
          tmp = nullptr;
          throw;
        }
      }
      else {
        g_tcpclientthreads->decrementQueuedCount();
        queuedCounterIncremented = false;
        if(tcpClientCountIncremented) {
          decrementTCPClientCount(remote);
        }
      }
    }
    catch(const std::exception& e) {
      errlog("While reading a TCP question: %s", e.what());
      if(tcpClientCountIncremented) {
        decrementTCPClientCount(remote);
      }
      if (queuedCounterIncremented) {
        g_tcpclientthreads->decrementQueuedCount();
      }
    }
    catch(...){}
  }
}