/*
 * This file is part of PowerDNS or dnsdist.
 * Copyright -- PowerDNS.COM B.V. and its contributors
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In addition, for the avoidance of any doubt, permission is granted to
 * link this program with OpenSSL and to (re)distribute the binaries
 * produced as the result of such linking.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include <thread>
#include <netinet/tcp.h>
#include <queue>

#include "dnsdist.hh"
#include "dnsdist-ecs.hh"
#include "dnsdist-proxy-protocol.hh"
#include "dnsdist-rings.hh"
#include "dnsdist-tcp.hh"
#include "dnsdist-tcp-downstream.hh"
#include "dnsdist-downstream-connection.hh"
#include "dnsdist-tcp-upstream.hh"
#include "dnsdist-xpf.hh"
#include "dnsparser.hh"
#include "dolog.hh"
#include "gettime.hh"
#include "lock.hh"
#include "sstuff.hh"
#include "tcpiohandler.hh"
#include "tcpiohandler-mplexer.hh"
#include "threadname.hh"

/* TCP: the grand design.
   We forward 'messages' between clients and downstream servers. Messages are 65k bytes large, tops.
   An answer might theoretically consist of multiple messages (for example, in the case of AXFR);
   initially we will not go there.

   In a sense there is a strong symmetry between UDP and TCP, once a connection to a downstream has been set up.
   This symmetry is broken by head-of-line blocking within TCP though, necessitating additional connections
   to guarantee performance.

   So the idea is to have a 'pool' of available downstream connections, to forward messages to/from them, and to never queue.
   That way, whenever an answer comes in, we know where it needs to go.

   Let's start naively.
*/

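/* number of currently established TCP connections per client address,
   only maintained when g_maxTCPConnectionsPerClient is enforced */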
static LockGuarded<std::map<ComboAddress, size_t, ComboAddress::addressOnlyLessThan>> s_tcpClientsCount;

size_t g_maxTCPQueriesPerConn{0};
size_t g_maxTCPConnectionDuration{0};
size_t g_maxTCPConnectionsPerClient{0};
#ifdef __linux__
// On Linux this gives us 128k pending queries (the default is 8192 queries),
// which should be enough to deal with huge spikes
size_t g_tcpInternalPipeBufferSize{1024*1024};
uint64_t g_maxTCPQueuedConnections{10000};
#else
size_t g_tcpInternalPipeBufferSize{0};
uint64_t g_maxTCPQueuedConnections{1000};
#endif

int g_tcpRecvTimeout{2};
int g_tcpSendTimeout{2};
std::atomic<uint64_t> g_tcpStatesDumpRequested{0};

static void decrementTCPClientCount(const ComboAddress& client)
{
  if (g_maxTCPConnectionsPerClient) {
    auto tcpClientsCount = s_tcpClientsCount.lock();
    tcpClientsCount->at(client)--;
    if (tcpClientsCount->at(client) == 0) {
      tcpClientsCount->erase(client);
    }
  }
}

IncomingTCPConnectionState::~IncomingTCPConnectionState()
{
  decrementTCPClientCount(d_ci.remote);

  if (d_ci.cs != nullptr) {
    struct timeval now;
    gettimeofday(&now, nullptr);

    auto diff = now - d_connectionStartTime;
    d_ci.cs->updateTCPMetrics(d_queriesCount, diff.tv_sec * 1000.0 + diff.tv_usec / 1000.0);
  }

  // this would have been done when the object is destroyed anyway,
  // but that way we make sure it's done before the ConnectionInfo is destroyed,
  // closing the descriptor, instead of relying on the declaration order of the members in the class
  d_handler.close();
}

size_t IncomingTCPConnectionState::clearAllDownstreamConnections()
{
  return t_downstreamTCPConnectionsManager.clear();
}

std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getDownstreamConnection(std::shared_ptr<DownstreamState>& ds, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs, const struct timeval& now)
{
  std::shared_ptr<TCPConnectionToBackend> downstream{nullptr};

  downstream = getOwnedDownstreamConnection(ds, tlvs);

  if (!downstream) {
    /* we don't own a connection to this backend yet, let's get one (it might not be a fresh one, though) */
    downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(d_threadData.mplexer, ds, now, std::string());
    if (ds->d_config.useProxyProtocol) {
      registerOwnedDownstreamConnection(downstream);
    }
  }

  return downstream;
}

static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int crossProtocolResponsesListenPipeFD, int crossProtocolResponsesWritePipeFD);

TCPClientCollection::TCPClientCollection(size_t maxThreads): d_tcpclientthreads(maxThreads), d_maxthreads(maxThreads)
{
  for (size_t idx = 0; idx < maxThreads; idx++) {
    addTCPClientThread();
  }
}

void TCPClientCollection::addTCPClientThread()
{
  auto preparePipe = [](int fds[2], const std::string& type) -> bool {
    if (pipe(fds) < 0) {
      errlog("Error creating the TCP thread %s pipe: %s", type, stringerror());
      return false;
    }

    if (!setNonBlocking(fds[0])) {
      int err = errno;
      close(fds[0]);
      close(fds[1]);
      errlog("Error setting the TCP thread %s pipe non-blocking: %s", type, stringerror(err));
      return false;
    }

    if (!setNonBlocking(fds[1])) {
      int err = errno;
      close(fds[0]);
      close(fds[1]);
      errlog("Error setting the TCP thread %s pipe non-blocking: %s", type, stringerror(err));
      return false;
    }

    if (g_tcpInternalPipeBufferSize > 0 && getPipeBufferSize(fds[0]) < g_tcpInternalPipeBufferSize) {
      setPipeBufferSize(fds[0], g_tcpInternalPipeBufferSize);
    }

    return true;
  };

  int pipefds[2] = { -1, -1 };
  if (!preparePipe(pipefds, "communication")) {
    return;
  }

  int crossProtocolQueriesFDs[2] = { -1, -1 };
  if (!preparePipe(crossProtocolQueriesFDs, "cross-protocol queries")) {
    return;
  }

  int crossProtocolResponsesFDs[2] = { -1, -1 };
  if (!preparePipe(crossProtocolResponsesFDs, "cross-protocol responses")) {
    return;
  }

  vinfolog("Adding TCP Client thread");

  {
    if (d_numthreads >= d_tcpclientthreads.size()) {
      vinfolog("Adding a new TCP client thread would exceed the vector size (%d/%d), skipping. Consider increasing the maximum amount of TCP client threads with setMaxTCPClientThreads() in the configuration.", d_numthreads.load(), d_tcpclientthreads.size());
      close(crossProtocolQueriesFDs[0]);
      close(crossProtocolQueriesFDs[1]);
      close(crossProtocolResponsesFDs[0]);
      close(crossProtocolResponsesFDs[1]);
      close(pipefds[0]);
      close(pipefds[1]);
      return;
    }

    /* from now on this side of the pipe will be managed by that object,
       no need to worry about it */
    TCPWorkerThread worker(pipefds[1], crossProtocolQueriesFDs[1], crossProtocolResponsesFDs[1]);
    try {
      std::thread t1(tcpClientThread, pipefds[0], crossProtocolQueriesFDs[0], crossProtocolResponsesFDs[0], crossProtocolResponsesFDs[1]);
      t1.detach();
    }
    catch (const std::runtime_error& e) {
      /* the thread creation failed, don't leak */
      errlog("Error creating a TCP thread: %s", e.what());
      close(pipefds[0]);
      close(crossProtocolQueriesFDs[0]);
      close(crossProtocolResponsesFDs[0]);
      return;
    }

    d_tcpclientthreads.at(d_numthreads) = std::move(worker);
    ++d_numthreads;
  }
}

std::unique_ptr<TCPClientCollection> g_tcpclientthreads;

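/* drains the queue of responses waiting to be sent to the client,
   stopping at the first one that cannot be written without blocking */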
static IOState sendQueuedResponses(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
{
  IOState result = IOState::Done;

  while (state->active() && !state->d_queuedResponses.empty()) {
    DEBUGLOG("queue size is "<<state->d_queuedResponses.size()<<", sending the next one");
    TCPResponse resp = std::move(state->d_queuedResponses.front());
    state->d_queuedResponses.pop_front();
    state->d_state = IncomingTCPConnectionState::State::idle;
    result = state->sendResponse(state, now, std::move(resp));
    if (result != IOState::Done) {
      return result;
    }
  }

  state->d_state = IncomingTCPConnectionState::State::idle;
  return IOState::Done;
}
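
/* accounting once a response has been fully written to the client: decrement
   the number of in-flight queries and report the latency, except for XFR
   responses which may consist of several messages */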
static void handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& state, TCPResponse& currentResponse)
{
  if (currentResponse.d_idstate.qtype == QType::AXFR || currentResponse.d_idstate.qtype == QType::IXFR) {
    return;
  }

  --state->d_currentQueriesCount;

  if (currentResponse.d_selfGenerated == false && currentResponse.d_connection && currentResponse.d_connection->getDS()) {
    const auto& ds = currentResponse.d_connection->getDS();
    const auto& ids = currentResponse.d_idstate;
    double udiff = ids.sentTime.udiff();
    vinfolog("Got answer from %s, relayed to %s (%s, %d bytes), took %f usec", ds->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), (state->d_handler.isTLS() ? "DoT" : "TCP"), currentResponse.d_buffer.size(), udiff);

    auto backendProtocol = ds->getProtocol();
    if (backendProtocol == dnsdist::Protocol::DoUDP) {
      backendProtocol = dnsdist::Protocol::DoTCP;
    }
    ::handleResponseSent(ids, udiff, state->d_ci.remote, ds->d_config.remote, static_cast<unsigned int>(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, backendProtocol);
  }
  else {
    const auto& ids = currentResponse.d_idstate;
    ::handleResponseSent(ids, 0., state->d_ci.remote, ComboAddress(), static_cast<unsigned int>(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, ids.protocol);
  }

  currentResponse.d_buffer.clear();
  currentResponse.d_connection.reset();
}
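
/* DNS over TCP prefixes every message with its length, a two-byte field in
   network byte order (RFC 1035 section 4.2.2): a 300-byte query is framed as
   0x01 0x2c followed by the message itself. The helper below inserts that
   prefix right after the proxy protocol payload, if any, since the payload
   is not part of the DNS message and must not be counted in its length. */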
static void prependSizeToTCPQuery(PacketBuffer& buffer, size_t proxyProtocolPayloadSize)
{
  if (buffer.size() <= proxyProtocolPayloadSize) {
    throw std::runtime_error("The buffer size is smaller than or equal to the proxy protocol payload size");
  }

  uint16_t queryLen = proxyProtocolPayloadSize > 0 ? (buffer.size() - proxyProtocolPayloadSize) : buffer.size();
  const uint8_t sizeBytes[] = { static_cast<uint8_t>(queryLen / 256), static_cast<uint8_t>(queryLen % 256) };
  /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
     that could occur if we had to deal with the size during the processing,
     especially alignment issues */
  buffer.insert(buffer.begin() + proxyProtocolPayloadSize, sizeBytes, sizeBytes + 2);
}

bool IncomingTCPConnectionState::canAcceptNewQueries(const struct timeval& now)
{
  if (d_hadErrors) {
    DEBUGLOG("not accepting new queries because we encountered some error during the processing already");
    return false;
  }

  if (d_currentQueriesCount >= d_ci.cs->d_maxInFlightQueriesPerConn) {
    DEBUGLOG("not accepting new queries because we already have "<<d_currentQueriesCount<<" out of "<<d_ci.cs->d_maxInFlightQueriesPerConn);
    return false;
  }

  if (g_maxTCPQueriesPerConn && d_queriesCount > g_maxTCPQueriesPerConn) {
    vinfolog("not accepting new queries from %s because it reached the maximum number of queries per conn (%d / %d)", d_ci.remote.toStringWithPort(), d_queriesCount, g_maxTCPQueriesPerConn);
    return false;
  }

  if (maxConnectionDurationReached(g_maxTCPConnectionDuration, now)) {
    vinfolog("not accepting new queries from %s because it reached the maximum TCP connection duration", d_ci.remote.toStringWithPort());
    return false;
  }

  return true;
}

void IncomingTCPConnectionState::resetForNewQuery()
{
  d_buffer.resize(sizeof(uint16_t));
  d_currentPos = 0;
  d_querySize = 0;
  d_state = State::waitingForQuery;
}
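
/* connections to a backend that carried a proxy protocol payload are tied
   ('owned') to this incoming connection: nobody else can reuse them, so we
   keep track of them per backend instead of releasing them to the shared pool */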
std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getOwnedDownstreamConnection(const std::shared_ptr<DownstreamState>& ds, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs)
{
  auto it = d_ownedConnectionsToBackend.find(ds);
  if (it == d_ownedConnectionsToBackend.end()) {
    DEBUGLOG("no owned connection found for "<<ds->getName());
    return nullptr;
  }

  for (auto& conn : it->second) {
    if (conn->canBeReused(true) && conn->matchesTLVs(tlvs)) {
      DEBUGLOG("Got one owned connection accepting more for "<<ds->getName());
      conn->setReused();
      return conn;
    }
    DEBUGLOG("not accepting more for "<<ds->getName());
  }

  return nullptr;
}

void IncomingTCPConnectionState::registerOwnedDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>& conn)
{
  d_ownedConnectionsToBackend[conn->getDS()].push_front(conn);
}

/* called when the buffer has been set and the rules have been processed, and only from handleIO (sometimes indirectly via handleQuery) */
IOState IncomingTCPConnectionState::sendResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
{
  state->d_state = IncomingTCPConnectionState::State::sendingResponse;

  uint16_t responseSize = static_cast<uint16_t>(response.d_buffer.size());
  const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) };
  /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
     that could occur if we had to deal with the size during the processing,
     especially alignment issues */
  response.d_buffer.insert(response.d_buffer.begin(), sizeBytes, sizeBytes + 2);
  state->d_currentPos = 0;
  state->d_currentResponse = std::move(response);

  try {
    auto iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size());
    if (iostate == IOState::Done) {
      DEBUGLOG("response sent from "<<__PRETTY_FUNCTION__);
      handleResponseSent(state, state->d_currentResponse);
      return iostate;
    }
    else {
      state->d_lastIOBlocked = true;
      DEBUGLOG("partial write");
      return iostate;
    }
  }
  catch (const std::exception& e) {
    vinfolog("Closing TCP client connection with %s: %s", state->d_ci.remote.toStringWithPort(), e.what());
    DEBUGLOG("Closing TCP client connection: "<<e.what());
    ++state->d_ci.cs->tcpDiedSendingResponse;

    state->terminateClientConnection();

    return IOState::Done;
  }
}

void IncomingTCPConnectionState::terminateClientConnection()
{
  DEBUGLOG("terminating client connection");
  d_queuedResponses.clear();
  /* we have already released idle connections that could be reused,
     we don't care about the ones still waiting for responses */
  for (auto& backend : d_ownedConnectionsToBackend) {
    for (auto& conn : backend.second) {
      conn->release();
    }
  }
  d_ownedConnectionsToBackend.clear();

  /* meaning we will no longer be 'active' when the backend
     response or timeout comes in */
  d_ioState.reset();

  /* if we do have remaining async descriptors associated with this TLS
     connection, we need to defer the destruction of the TLS object until
     the engine has reported back, otherwise we have a use-after-free.. */
  auto afds = d_handler.getAsyncFDs();
  if (afds.empty()) {
    d_handler.close();
  }
  else {
    /* we might already be waiting, but we might also not be, because sometimes we have
       already been notified via the descriptor, not received Async again, but the async
       job still exists.. */
    auto state = shared_from_this();
    for (const auto fd : afds) {
      try {
        state->d_threadData.mplexer->addReadFD(fd, handleAsyncReady, state);
      }
      catch (...) {
      }
    }
  }
}

void IncomingTCPConnectionState::queueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
{
  // queue response
  state->d_queuedResponses.push_back(std::move(response));
  DEBUGLOG("queueing response, state is "<<(int)state->d_state<<", queue size is now "<<state->d_queuedResponses.size());

  // when the response comes from a backend, there is a real possibility that we are currently
  // idle, and thus not trying to send the response right away would make our ref count go to 0.
  // Even if we are waiting for a query, we will not wake up before the new query arrives or a
  // timeout occurs
  if (state->d_state == IncomingTCPConnectionState::State::idle ||
      state->d_state == IncomingTCPConnectionState::State::waitingForQuery) {
    auto iostate = sendQueuedResponses(state, now);

    if (iostate == IOState::Done && state->active()) {
      if (state->canAcceptNewQueries(now)) {
        state->resetForNewQuery();
        state->d_state = IncomingTCPConnectionState::State::waitingForQuery;
        iostate = IOState::NeedRead;
      }
      else {
        state->d_state = IncomingTCPConnectionState::State::idle;
      }
    }

    // for the same reason we need to update the state right away, nobody will do that for us
    if (state->active()) {
      updateIO(state, iostate, now);
    }
  }
}

void IncomingTCPConnectionState::handleAsyncReady(int fd, FDMultiplexer::funcparam_t& param)
{
  auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);

  /* if we are here, the async jobs for this SSL* are finished
     so we should be able to remove all FDs */
  auto afds = state->d_handler.getAsyncFDs();
  for (const auto afd : afds) {
    try {
      state->d_threadData.mplexer->removeReadFD(afd);
    }
    catch (...) {
    }
  }

  if (state->active()) {
    /* and now we restart our own I/O state machine */
    struct timeval now;
    gettimeofday(&now, nullptr);
    handleIO(state, now);
  }
  else {
    /* we were only waiting for the engine to come back,
       to prevent a use-after-free */
    state->d_handler.close();
  }
}
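
/* IOState::Async means the TLS engine holds descriptors of its own that we
   need to watch until it reports back, instead of the client descriptor */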
void IncomingTCPConnectionState::updateIO(std::shared_ptr<IncomingTCPConnectionState>& state, IOState newState, const struct timeval& now)
{
  if (newState == IOState::Async) {
    auto fds = state->d_handler.getAsyncFDs();
    for (const auto fd : fds) {
      state->d_threadData.mplexer->addReadFD(fd, handleAsyncReady, state);
    }
    state->d_ioState->update(IOState::Done, handleIOCallback, state);
  }
  else {
    state->d_ioState->update(newState, handleIOCallback, state, newState == IOState::NeedWrite ? state->getClientWriteTTD(now) : state->getClientReadTTD(now));
  }
}

/* called from the backend code when a new response has been received */
void IncomingTCPConnectionState::handleResponse(const struct timeval& now, TCPResponse&& response)
{
  std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();

  if (response.d_connection && response.d_connection->getDS() && response.d_connection->getDS()->d_config.useProxyProtocol) {
    // if we have added a TCP Proxy Protocol payload to a connection, don't release it to the general pool as no one else will be able to use it anyway
    if (!response.d_connection->willBeReusable(true)) {
      // if it cannot be reused even by us, release it and remove it from the list of owned connections
      const auto connIt = state->d_ownedConnectionsToBackend.find(response.d_connection->getDS());
      if (connIt != state->d_ownedConnectionsToBackend.end()) {
        auto& list = connIt->second;

        for (auto it = list.begin(); it != list.end(); ++it) {
          if (*it == response.d_connection) {
            try {
              response.d_connection->release();
            }
            catch (const std::exception& e) {
              vinfolog("Error releasing connection: %s", e.what());
            }
            list.erase(it);
            break;
          }
        }
      }
    }
  }

  if (response.d_buffer.size() < sizeof(dnsheader)) {
    state->terminateClientConnection();
    return;
  }

  try {
    auto& ids = response.d_idstate;
    unsigned int qnameWireLength;
    if (!response.d_connection || !responseContentMatches(response.d_buffer, ids.qname, ids.qtype, ids.qclass, response.d_connection->getDS(), qnameWireLength)) {
      state->terminateClientConnection();
      return;
    }

    if (response.d_connection->getDS()) {
      ++response.d_connection->getDS()->responses;
    }

    DNSResponse dr = makeDNSResponseFromIDState(ids, response.d_buffer);

    memcpy(&response.d_cleartextDH, dr.getHeader(), sizeof(response.d_cleartextDH));

    if (!processResponse(response.d_buffer, state->d_threadData.localRespRuleActions, dr, false, false)) {
      state->terminateClientConnection();
      return;
    }
  }
  catch (const std::exception& e) {
    vinfolog("Unexpected exception while handling response from backend: %s", e.what());
    state->terminateClientConnection();
    return;
  }

  ++g_stats.responses;
  ++state->d_ci.cs->responses;

  queueResponse(state, now, std::move(response));
}
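
/* responses for queries that were forwarded over a different protocol (DoH)
   come back from another thread: they are handed over to the owning TCP
   worker by writing a single raw pointer to a pipe, which POSIX guarantees
   to be atomic for writes no larger than PIPE_BUF */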
struct TCPCrossProtocolResponse
{
  TCPCrossProtocolResponse(TCPResponse&& response, std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now): d_response(std::move(response)), d_state(state), d_now(now)
  {
  }

  TCPResponse d_response;
  std::shared_ptr<IncomingTCPConnectionState> d_state;
  struct timeval d_now;
};

class TCPCrossProtocolQuerySender : public TCPQuerySender
{
public:
  TCPCrossProtocolQuerySender(std::shared_ptr<IncomingTCPConnectionState>& state): d_state(state)
  {
  }

  bool active() const override
  {
    return d_state->active();
  }

  const ClientState* getClientState() const override
  {
    return d_state->getClientState();
  }

  void handleResponse(const struct timeval& now, TCPResponse&& response) override
  {
    if (d_state->d_threadData.crossProtocolResponsesPipe == -1) {
      throw std::runtime_error("Invalid pipe descriptor in TCP Cross Protocol Query Sender");
    }

    auto ptr = new TCPCrossProtocolResponse(std::move(response), d_state, now);
    static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranteed not to be interleaved and to either fully succeed or fail");
    ssize_t sent = write(d_state->d_threadData.crossProtocolResponsesPipe, &ptr, sizeof(ptr));
    if (sent != sizeof(ptr)) {
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        ++g_stats.tcpCrossProtocolResponsePipeFull;
        vinfolog("Unable to pass a cross-protocol response to the TCP worker thread because the pipe is full");
      }
      else {
        vinfolog("Unable to pass a cross-protocol response to the TCP worker thread because we couldn't write to the pipe: %s", stringerror());
      }
      delete ptr;
    }
  }

  void handleXFRResponse(const struct timeval& now, TCPResponse&& response) override
  {
    handleResponse(now, std::move(response));
  }

  void notifyIOError(IDState&& query, const struct timeval& now) override
  {
    TCPResponse response(PacketBuffer(), std::move(query), nullptr);
    handleResponse(now, std::move(response));
  }

private:
  std::shared_ptr<IncomingTCPConnectionState> d_state;
};

class TCPCrossProtocolQuery : public CrossProtocolQuery
{
public:
  TCPCrossProtocolQuery(PacketBuffer&& buffer, IDState&& ids, std::shared_ptr<DownstreamState>& ds, std::shared_ptr<TCPCrossProtocolQuerySender>& sender): d_sender(sender)
  {
    query = InternalQuery(std::move(buffer), std::move(ids));
    downstream = ds;
    proxyProtocolPayloadSize = 0;
  }

  ~TCPCrossProtocolQuery()
  {
  }

  std::shared_ptr<TCPQuerySender> getTCPQuerySender() override
  {
    return d_sender;
  }

private:
  std::shared_ptr<TCPCrossProtocolQuerySender> d_sender;
};
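
/* processes a query that has been fully read into d_buffer: self-generated
   answers (DNSCrypt certificates, qdcount == 0, rule or cache hits) are
   queued right away, everything else is passed to a backend, possibly over
   a different protocol if the backend does DoH */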
static void handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
{
  if (state->d_querySize < sizeof(dnsheader)) {
    ++g_stats.nonCompliantQueries;
    ++state->d_ci.cs->nonCompliantQueries;
    state->terminateClientConnection();
    return;
  }

  ++state->d_queriesCount;
  ++state->d_ci.cs->queries;
  ++g_stats.queries;

  if (state->d_handler.isTLS()) {
    auto tlsVersion = state->d_handler.getTLSVersion();
    switch (tlsVersion) {
    case LibsslTLSVersion::TLS10:
      ++state->d_ci.cs->tls10queries;
      break;
    case LibsslTLSVersion::TLS11:
      ++state->d_ci.cs->tls11queries;
      break;
    case LibsslTLSVersion::TLS12:
      ++state->d_ci.cs->tls12queries;
      break;
    case LibsslTLSVersion::TLS13:
      ++state->d_ci.cs->tls13queries;
      break;
    default:
      ++state->d_ci.cs->tlsUnknownqueries;
    }
  }

  /* we need an accurate ("real") value for the response and
     to store into the IDS, but not for insertion into the
     rings for example */
  struct timespec queryRealTime;
  gettime(&queryRealTime, true);

  std::unique_ptr<DNSCryptQuery> dnsCryptQuery{nullptr};
  auto dnsCryptResponse = checkDNSCryptQuery(*state->d_ci.cs, state->d_buffer, dnsCryptQuery, queryRealTime.tv_sec, true);
  if (dnsCryptResponse) {
    TCPResponse response;
    state->d_state = IncomingTCPConnectionState::State::idle;
    ++state->d_currentQueriesCount;
    state->queueResponse(state, now, std::move(response));
    return;
  }

  {
    /* this pointer will be invalidated the second the buffer is resized, don't hold onto it! */
    auto* dh = reinterpret_cast<dnsheader*>(state->d_buffer.data());
    if (!checkQueryHeaders(dh, *state->d_ci.cs)) {
      state->terminateClientConnection();
      return;
    }

    if (dh->qdcount == 0) {
      TCPResponse response;
      dh->rcode = RCode::NotImp;
      dh->qr = true;
      response.d_selfGenerated = true;
      response.d_buffer = std::move(state->d_buffer);
      state->d_state = IncomingTCPConnectionState::State::idle;
      ++state->d_currentQueriesCount;
      state->queueResponse(state, now, std::move(response));
      return;
    }
  }

  uint16_t qtype, qclass;
  unsigned int qnameWireLength = 0;
  DNSName qname(reinterpret_cast<const char*>(state->d_buffer.data()), state->d_buffer.size(), sizeof(dnsheader), false, &qtype, &qclass, &qnameWireLength);
  dnsdist::Protocol protocol = dnsdist::Protocol::DoTCP;
  if (dnsCryptQuery) {
    protocol = dnsdist::Protocol::DNSCryptTCP;
  }
  else if (state->d_handler.isTLS()) {
    protocol = dnsdist::Protocol::DoT;
  }

  DNSQuestion dq(&qname, qtype, qclass, &state->d_proxiedDestination, &state->d_proxiedRemote, state->d_buffer, protocol, &queryRealTime);
  dq.dnsCryptQuery = std::move(dnsCryptQuery);
  dq.sni = state->d_handler.getServerNameIndication();
  if (state->d_proxyProtocolValues) {
    /* we need to copy them, because the next queries received on that connection will
       need to get the _unaltered_ values */
    dq.proxyProtocolValues = make_unique<std::vector<ProxyProtocolValue>>(*state->d_proxyProtocolValues);
  }

  if (dq.qtype == QType::AXFR || dq.qtype == QType::IXFR) {
    dq.skipCache = true;
  }

  std::shared_ptr<DownstreamState> ds;
  auto result = processQuery(dq, *state->d_ci.cs, state->d_threadData.holders, ds);

  if (result == ProcessQueryResult::Drop) {
    state->terminateClientConnection();
    return;
  }

  // the buffer might have been invalidated by now
  const dnsheader* dh = dq.getHeader();
  if (result == ProcessQueryResult::SendAnswer) {
    TCPResponse response;
    response.d_selfGenerated = true;
    response.d_idstate.origID = dh->id;
    response.d_idstate.cs = state->d_ci.cs;
    setIDStateFromDNSQuestion(response.d_idstate, dq, std::move(qname));

    memcpy(&response.d_cleartextDH, dh, sizeof(response.d_cleartextDH));
    response.d_buffer = std::move(state->d_buffer);

    state->d_state = IncomingTCPConnectionState::State::idle;
    ++state->d_currentQueriesCount;
    state->queueResponse(state, now, std::move(response));
    return;
  }

  if (result != ProcessQueryResult::PassToBackend || ds == nullptr) {
    state->terminateClientConnection();
    return;
  }

  IDState ids;
  setIDStateFromDNSQuestion(ids, dq, std::move(qname));
  ids.origID = dh->id;
  ids.cs = state->d_ci.cs;

  ++state->d_currentQueriesCount;

  std::string proxyProtocolPayload;
  if (ds->isDoH()) {
    vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", ids.qname.toLogString(), QType(ids.qtype).toString(), state->d_proxiedRemote.toStringWithPort(), (state->d_handler.isTLS() ? "DoT" : "TCP"), state->d_buffer.size(), ds->getName());

    /* we need to do this _before_ creating the cross protocol query because
       after that the buffer will have been moved */
    if (ds->d_config.useProxyProtocol) {
      proxyProtocolPayload = getProxyProtocolPayload(dq);
    }

    auto incoming = std::make_shared<TCPCrossProtocolQuerySender>(state);
    auto cpq = std::make_unique<TCPCrossProtocolQuery>(std::move(state->d_buffer), std::move(ids), ds, incoming);
    cpq->query.d_proxyProtocolPayload = std::move(proxyProtocolPayload);

    ds->passCrossProtocolQuery(std::move(cpq));
    return;
  }

  prependSizeToTCPQuery(state->d_buffer, 0);

  auto downstreamConnection = state->getDownstreamConnection(ds, dq.proxyProtocolValues, now);

  if (ds->d_config.useProxyProtocol) {
    /* if we ever sent a TLV over a connection, we can never go back */
    if (!state->d_proxyProtocolPayloadHasTLV) {
      state->d_proxyProtocolPayloadHasTLV = dq.proxyProtocolValues && !dq.proxyProtocolValues->empty();
    }

    proxyProtocolPayload = getProxyProtocolPayload(dq);
  }

  if (dq.proxyProtocolValues) {
    downstreamConnection->setProxyProtocolValuesSent(std::move(dq.proxyProtocolValues));
  }

  TCPQuery query(std::move(state->d_buffer), std::move(ids));
  query.d_proxyProtocolPayload = std::move(proxyProtocolPayload);

  vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", query.d_idstate.qname.toLogString(), QType(query.d_idstate.qtype).toString(), state->d_proxiedRemote.toStringWithPort(), (state->d_handler.isTLS() ? "DoT" : "TCP"), query.d_buffer.size(), ds->getName());
  std::shared_ptr<TCPQuerySender> incoming = state;
  downstreamConnection->queueQuery(incoming, std::move(query));
}

void IncomingTCPConnectionState::handleIOCallback(int fd, FDMultiplexer::funcparam_t& param)
{
  auto conn = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
  if (fd != conn->d_handler.getDescriptor()) {
    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->d_handler.getDescriptor()));
  }

  struct timeval now;
  gettimeofday(&now, nullptr);
  handleIO(conn, now);
}
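
/* the heart of the incoming state machine: depending on the current state we
   try to make progress on the TLS handshake, the proxy protocol header, the
   query size, the query itself or the response, moving to the next state
   every time a step completes without blocking */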
void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
{
  // why do we loop? Because the TLS layer does buffering, and thus can have data ready to read
  // even though the underlying socket is not ready, so we need to actually ask for the data first
  IOState iostate = IOState::Done;
  do {
    iostate = IOState::Done;
    IOStateGuard ioGuard(state->d_ioState);

    if (state->maxConnectionDurationReached(g_maxTCPConnectionDuration, now)) {
      vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", state->d_ci.remote.toStringWithPort());
      // will be handled by the ioGuard
      //handleNewIOState(state, IOState::Done, fd, handleIOCallback);
      return;
    }

    state->d_lastIOBlocked = false;

    try {
      if (state->d_state == IncomingTCPConnectionState::State::doingHandshake) {
        DEBUGLOG("doing handshake");
        iostate = state->d_handler.tryHandshake();
        if (iostate == IOState::Done) {
          DEBUGLOG("handshake done");
          if (state->d_handler.isTLS()) {
            if (!state->d_handler.hasTLSSessionBeenResumed()) {
              ++state->d_ci.cs->tlsNewSessions;
            }
            else {
              ++state->d_ci.cs->tlsResumptions;
            }
            if (state->d_handler.getResumedFromInactiveTicketKey()) {
              ++state->d_ci.cs->tlsInactiveTicketKey;
            }
            if (state->d_handler.getUnknownTicketKey()) {
              ++state->d_ci.cs->tlsUnknownTicketKey;
            }
          }

          state->d_handshakeDoneTime = now;
          if (expectProxyProtocolFrom(state->d_ci.remote)) {
            state->d_state = IncomingTCPConnectionState::State::readingProxyProtocolHeader;
            state->d_buffer.resize(s_proxyProtocolMinimumHeaderSize);
            state->d_proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
          }
          else {
            state->d_state = IncomingTCPConnectionState::State::readingQuerySize;
          }
        }
        else {
          state->d_lastIOBlocked = true;
        }
      }

      if (!state->d_lastIOBlocked && state->d_state == IncomingTCPConnectionState::State::readingProxyProtocolHeader) {
        do {
          DEBUGLOG("reading proxy protocol header");
          iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, state->d_proxyProtocolNeed);
          if (iostate == IOState::Done) {
            state->d_buffer.resize(state->d_currentPos);
            ssize_t remaining = isProxyHeaderComplete(state->d_buffer);
            if (remaining == 0) {
              vinfolog("Unable to consume proxy protocol header in packet from TCP client %s", state->d_ci.remote.toStringWithPort());
              ++g_stats.proxyProtocolInvalid;
              break;
            }
            else if (remaining < 0) {
              state->d_proxyProtocolNeed += -remaining;
              state->d_buffer.resize(state->d_currentPos + state->d_proxyProtocolNeed);
              /* we need to keep reading, since we might have buffered data */
              iostate = IOState::NeedRead;
            }
            else {
              /* proxy header received */
              std::vector<ProxyProtocolValue> proxyProtocolValues;
              if (!handleProxyProtocol(state->d_ci.remote, true, *state->d_threadData.holders.acl, state->d_buffer, state->d_proxiedRemote, state->d_proxiedDestination, proxyProtocolValues)) {
                vinfolog("Error handling the Proxy Protocol received from TCP client %s", state->d_ci.remote.toStringWithPort());
                break;
              }

              if (!proxyProtocolValues.empty()) {
                state->d_proxyProtocolValues = make_unique<std::vector<ProxyProtocolValue>>(std::move(proxyProtocolValues));
              }

              state->d_state = IncomingTCPConnectionState::State::readingQuerySize;
              state->d_buffer.resize(sizeof(uint16_t));
              state->d_currentPos = 0;
              state->d_proxyProtocolNeed = 0;
              break;
            }
          }
          else {
            state->d_lastIOBlocked = true;
          }
        }
        while (state->active() && !state->d_lastIOBlocked);
      }

      if (!state->d_lastIOBlocked && (state->d_state == IncomingTCPConnectionState::State::waitingForQuery ||
                                      state->d_state == IncomingTCPConnectionState::State::readingQuerySize)) {
        DEBUGLOG("reading query size");
        iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, sizeof(uint16_t));
        if (state->d_currentPos > 0) {
          /* if we got at least one byte, we can't go around sending responses */
          state->d_state = IncomingTCPConnectionState::State::readingQuerySize;
        }

        if (iostate == IOState::Done) {
          DEBUGLOG("query size received");
          state->d_state = IncomingTCPConnectionState::State::readingQuery;
          state->d_querySizeReadTime = now;
          if (state->d_queriesCount == 0) {
            state->d_firstQuerySizeReadTime = now;
          }
          state->d_querySize = state->d_buffer.at(0) * 256 + state->d_buffer.at(1);
          if (state->d_querySize < sizeof(dnsheader)) {
            /* go away */
            state->terminateClientConnection();
            return;
          }

          /* allocate a bit more memory to be able to spoof the content, get an answer from the cache
             or to add ECS without allocating a new buffer */
          state->d_buffer.resize(std::max(state->d_querySize + static_cast<size_t>(512), s_maxPacketCacheEntrySize));
          state->d_currentPos = 0;
        }
        else {
          state->d_lastIOBlocked = true;
        }
      }

      if (!state->d_lastIOBlocked && state->d_state == IncomingTCPConnectionState::State::readingQuery) {
        DEBUGLOG("reading query");
        iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, state->d_querySize);
        if (iostate == IOState::Done) {
          DEBUGLOG("query received");
          state->d_buffer.resize(state->d_querySize);

          state->d_state = IncomingTCPConnectionState::State::idle;
          handleQuery(state, now);
          /* the state might have been updated in the meantime, we don't want to override it
             in that case */
          if (state->active() && state->d_state != IncomingTCPConnectionState::State::idle) {
            if (state->d_ioState->isWaitingForRead()) {
              iostate = IOState::NeedRead;
            }
            else if (state->d_ioState->isWaitingForWrite()) {
              iostate = IOState::NeedWrite;
            }
            else {
              iostate = IOState::Done;
            }
          }
        }
        else {
          state->d_lastIOBlocked = true;
        }
      }

      if (!state->d_lastIOBlocked && state->d_state == IncomingTCPConnectionState::State::sendingResponse) {
        DEBUGLOG("sending response");
        iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size());
        if (iostate == IOState::Done) {
          DEBUGLOG("response sent from "<<__PRETTY_FUNCTION__);
          handleResponseSent(state, state->d_currentResponse);
          state->d_state = IncomingTCPConnectionState::State::idle;
        }
        else {
          state->d_lastIOBlocked = true;
        }
      }

      if (state->active() &&
          !state->d_lastIOBlocked &&
          iostate == IOState::Done &&
          (state->d_state == IncomingTCPConnectionState::State::idle ||
           state->d_state == IncomingTCPConnectionState::State::waitingForQuery))
      {
        // try sending queued responses
        DEBUGLOG("send responses, if any");
        iostate = sendQueuedResponses(state, now);

        if (!state->d_lastIOBlocked && state->active() && iostate == IOState::Done) {
          // if the query has been passed to a backend, or dropped, and the responses have been sent,
          // we can start reading again
          if (state->canAcceptNewQueries(now)) {
            state->resetForNewQuery();
            iostate = IOState::NeedRead;
          }
          else {
            state->d_state = IncomingTCPConnectionState::State::idle;
            iostate = IOState::Done;
          }
        }
      }

      if (state->d_state != IncomingTCPConnectionState::State::idle &&
          state->d_state != IncomingTCPConnectionState::State::doingHandshake &&
          state->d_state != IncomingTCPConnectionState::State::readingProxyProtocolHeader &&
          state->d_state != IncomingTCPConnectionState::State::waitingForQuery &&
          state->d_state != IncomingTCPConnectionState::State::readingQuerySize &&
          state->d_state != IncomingTCPConnectionState::State::readingQuery &&
          state->d_state != IncomingTCPConnectionState::State::sendingResponse) {
        vinfolog("Unexpected state %d in handleIO", static_cast<int>(state->d_state));
      }
    }
    catch (const std::exception& e) {
      /* most likely an EOF because the other end closed the connection,
         but it might also be a real IO error or something else.
         Let's just drop the connection
      */
      if (state->d_state == IncomingTCPConnectionState::State::idle ||
          state->d_state == IncomingTCPConnectionState::State::waitingForQuery) {
        /* no need to increase any counters in that case, the client is simply done with us */
      }
      else if (state->d_state == IncomingTCPConnectionState::State::doingHandshake ||
               state->d_state == IncomingTCPConnectionState::State::readingProxyProtocolHeader ||
               state->d_state == IncomingTCPConnectionState::State::waitingForQuery ||
               state->d_state == IncomingTCPConnectionState::State::readingQuerySize ||
               state->d_state == IncomingTCPConnectionState::State::readingQuery) {
        ++state->d_ci.cs->tcpDiedReadingQuery;
      }
      else if (state->d_state == IncomingTCPConnectionState::State::sendingResponse) {
        /* unlikely to happen here, the exception should be handled in sendResponse() */
        ++state->d_ci.cs->tcpDiedSendingResponse;
      }

      if (state->d_ioState->isWaitingForWrite() || state->d_queriesCount == 0) {
        DEBUGLOG("Got an exception while handling TCP query: "<<e.what());
        vinfolog("Got an exception while handling (%s) TCP query from %s: %s", (state->d_ioState->isWaitingForRead() ? "reading" : "writing"), state->d_ci.remote.toStringWithPort(), e.what());
      }
      else {
        vinfolog("Closing TCP client connection with %s: %s", state->d_ci.remote.toStringWithPort(), e.what());
        DEBUGLOG("Closing TCP client connection: "<<e.what());
      }
      /* remove this FD from the IO multiplexer */
      state->terminateClientConnection();
    }

    if (!state->active()) {
      DEBUGLOG("state is no longer active");
      return;
    }

    if (iostate == IOState::Done) {
      state->d_ioState->update(iostate, handleIOCallback, state);
    }
    else {
      updateIO(state, iostate, now);
    }
    ioGuard.release();
  }
  while ((iostate == IOState::NeedRead || iostate == IOState::NeedWrite) && !state->d_lastIOBlocked);
}

void IncomingTCPConnectionState::notifyIOError(IDState&& query, const struct timeval& now)
{
  std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();

  --state->d_currentQueriesCount;
  state->d_hadErrors = true;

  if (state->d_state == State::sendingResponse) {
    /* if we have responses to send, let's do that first */
  }
  else if (!state->d_queuedResponses.empty()) {
    /* stop reading and send what we have */
    try {
      auto iostate = sendQueuedResponses(state, now);

      if (state->active() && iostate != IOState::Done) {
        // we need to update the state right away, nobody will do that for us
        updateIO(state, iostate, now);
      }
    }
    catch (const std::exception& e) {
      vinfolog("Exception in notifyIOError: %s", e.what());
    }
  }
  else {
    // the backend code already tried to reconnect if it was possible
    state->terminateClientConnection();
  }
}

void IncomingTCPConnectionState::handleXFRResponse(const struct timeval& now, TCPResponse&& response)
{
  std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();
  queueResponse(state, now, std::move(response));
}

void IncomingTCPConnectionState::handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write)
{
  vinfolog("Timeout while %s TCP client %s", (write ? "writing to" : "reading from"), state->d_ci.remote.toStringWithPort());
  DEBUGLOG("client timeout");
  DEBUGLOG("Processed "<<state->d_queriesCount<<" queries, current count is "<<state->d_currentQueriesCount<<", "<<state->d_ownedConnectionsToBackend.size()<<" owned connections, "<<state->d_queuedResponses.size()<<" responses queued");

  if (write || state->d_currentQueriesCount == 0) {
    ++state->d_ci.cs->tcpClientTimeouts;
    state->d_ioState.reset();
  }
  else {
    DEBUGLOG("Going idle");
    /* we still have some queries in flight, let's just stop reading for now */
    state->d_state = IncomingTCPConnectionState::State::idle;
    state->d_ioState->update(IOState::Done, handleIOCallback, state);
  }
}
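
/* the acceptor thread passes us the address of a freshly allocated
   ConnectionInfo object over the pipe; from that point on we own it
   and are responsible for deleting it */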
static void handleIncomingTCPQuery(int pipefd, FDMultiplexer::funcparam_t& param)
{
  auto threadData = boost::any_cast<TCPClientThreadData*>(param);

  ConnectionInfo* citmp{nullptr};

  ssize_t got = read(pipefd, &citmp, sizeof(citmp));
  if (got == 0) {
    throw std::runtime_error("EOF while reading from the TCP acceptor pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode");
  }
  else if (got == -1) {
    if (errno == EAGAIN || errno == EINTR) {
      return;
    }
    throw std::runtime_error("Error while reading from the TCP acceptor pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode: " + stringerror());
  }
  else if (got != sizeof(citmp)) {
    throw std::runtime_error("Partial read while reading from the TCP acceptor pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode");
  }

  try {
    g_tcpclientthreads->decrementQueuedCount();

    struct timeval now;
    gettimeofday(&now, nullptr);
    auto state = std::make_shared<IncomingTCPConnectionState>(std::move(*citmp), *threadData, now);
    delete citmp;
    citmp = nullptr;

    IncomingTCPConnectionState::handleIO(state, now);
  }
  catch (...) {
    delete citmp;
    citmp = nullptr;
    throw;
  }
}

static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& param)
{
  auto threadData = boost::any_cast<TCPClientThreadData*>(param);
  CrossProtocolQuery* tmp{nullptr};

  ssize_t got = read(pipefd, &tmp, sizeof(tmp));
  if (got == 0) {
    throw std::runtime_error("EOF while reading from the TCP cross-protocol pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode");
  }
  else if (got == -1) {
    if (errno == EAGAIN || errno == EINTR) {
      return;
    }
    throw std::runtime_error("Error while reading from the TCP cross-protocol pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode: " + stringerror());
  }
  else if (got != sizeof(tmp)) {
    throw std::runtime_error("Partial read while reading from the TCP cross-protocol pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode");
  }

  try {
    struct timeval now;
    gettimeofday(&now, nullptr);

    std::shared_ptr<TCPQuerySender> tqs = tmp->getTCPQuerySender();
    auto query = std::move(tmp->query);
    auto downstreamServer = std::move(tmp->downstream);
    auto proxyProtocolPayloadSize = tmp->proxyProtocolPayloadSize;
    delete tmp;
    tmp = nullptr;

    try {
      auto downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::string());

      prependSizeToTCPQuery(query.d_buffer, proxyProtocolPayloadSize);
      query.d_proxyProtocolPayloadAddedSize = proxyProtocolPayloadSize;
      downstream->queueQuery(tqs, std::move(query));
    }
    catch (...) {
      tqs->notifyIOError(std::move(query.d_idstate), now);
    }
  }
  catch (...) {
    delete tmp;
    tmp = nullptr;
  }
}

static void handleCrossProtocolResponse(int pipefd, FDMultiplexer::funcparam_t& param)
{
  TCPCrossProtocolResponse* tmp{nullptr};

  ssize_t got = read(pipefd, &tmp, sizeof(tmp));
  if (got == 0) {
    throw std::runtime_error("EOF while reading from the TCP cross-protocol response pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode");
  }
  else if (got == -1) {
    if (errno == EAGAIN || errno == EINTR) {
      return;
    }
    throw std::runtime_error("Error while reading from the TCP cross-protocol response pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode: " + stringerror());
  }
  else if (got != sizeof(tmp)) {
    throw std::runtime_error("Partial read while reading from the TCP cross-protocol response pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode");
  }

  auto response = std::move(*tmp);
  delete tmp;
  tmp = nullptr;

  try {
    if (response.d_response.d_buffer.empty()) {
      response.d_state->notifyIOError(std::move(response.d_response.d_idstate), response.d_now);
    }
    else if (response.d_response.d_idstate.qtype == QType::AXFR || response.d_response.d_idstate.qtype == QType::IXFR) {
      response.d_state->handleXFRResponse(response.d_now, std::move(response.d_response));
    }
    else {
      response.d_state->handleResponse(response.d_now, std::move(response.d_response));
    }
  }
  catch (...) {
    /* no point in bubbling up from here */
  }
}
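
/* each worker thread watches three pipes: new connections handed over by the
   acceptor thread, cross-protocol queries, and cross-protocol responses
   coming back from other threads */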
static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int crossProtocolResponsesListenPipeFD, int crossProtocolResponsesWritePipeFD)
{
  /* we get launched with a pipe on which we receive file descriptors from clients that we own
     from that point on */

  setThreadName("dnsdist/tcpClie");

  try {
    TCPClientThreadData data;
    /* this is the writing end! */
    data.crossProtocolResponsesPipe = crossProtocolResponsesWritePipeFD;
    data.mplexer->addReadFD(pipefd, handleIncomingTCPQuery, &data);
    data.mplexer->addReadFD(crossProtocolQueriesPipeFD, handleCrossProtocolQuery, &data);
    data.mplexer->addReadFD(crossProtocolResponsesListenPipeFD, handleCrossProtocolResponse, &data);

    struct timeval now;
    gettimeofday(&now, nullptr);
    time_t lastTimeoutScan = now.tv_sec;

    for (;;) {
      data.mplexer->run(&now);

      try {
        t_downstreamTCPConnectionsManager.cleanupClosedConnections(now);

        if (now.tv_sec > lastTimeoutScan) {
          lastTimeoutScan = now.tv_sec;
          auto expiredReadConns = data.mplexer->getTimeouts(now, false);
          for (const auto& cbData : expiredReadConns) {
            if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
              auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
              if (cbData.first == state->d_handler.getDescriptor()) {
                vinfolog("Timeout (read) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
                state->handleTimeout(state, false);
              }
            }
            else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
              auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
              vinfolog("Timeout (read) from remote backend %s", conn->getBackendName());
              conn->handleTimeout(now, false);
            }
          }

          auto expiredWriteConns = data.mplexer->getTimeouts(now, true);
          for (const auto& cbData : expiredWriteConns) {
            if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
              auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
              if (cbData.first == state->d_handler.getDescriptor()) {
                vinfolog("Timeout (write) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
                state->handleTimeout(state, true);
              }
            }
            else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
              auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
              vinfolog("Timeout (write) from remote backend %s", conn->getBackendName());
              conn->handleTimeout(now, true);
            }
          }

          if (g_tcpStatesDumpRequested > 0) {
            /* just to keep things clean in the output, debug only */
            static std::mutex s_lock;
            std::lock_guard<decltype(s_lock)> lck(s_lock);
            if (g_tcpStatesDumpRequested > 0) {
              /* no race here, we took the lock so it can only be increased in the meantime */
              --g_tcpStatesDumpRequested;
              errlog("Dumping the TCP states, as requested:");
              data.mplexer->runForAllWatchedFDs([](bool isRead, int fd, const FDMultiplexer::funcparam_t& param, struct timeval ttd)
              {
                struct timeval lnow;
                gettimeofday(&lnow, nullptr);
                if (ttd.tv_sec > 0) {
                  errlog("- Descriptor %d is in %s state, TTD in %d", fd, (isRead ? "read" : "write"), (ttd.tv_sec - lnow.tv_sec));
                }
                else {
                  errlog("- Descriptor %d is in %s state, no TTD set", fd, (isRead ? "read" : "write"));
                }

                if (param.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
                  auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
                  errlog(" - %s", state->toString());
                }
                else if (param.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
                  auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(param);
                  errlog(" - %s", conn->toString());
                }
                else if (param.type() == typeid(TCPClientThreadData*)) {
                  errlog(" - Worker thread pipe");
                }
              });
              errlog("The TCP/DoT client cache has %d active and %d idle outgoing connections cached", t_downstreamTCPConnectionsManager.getActiveCount(), t_downstreamTCPConnectionsManager.getIdleCount());
            }
          }
        }
      }
      catch (const std::exception& e) {
        errlog("Error in TCP worker thread: %s", e.what());
      }
    }
  }
  catch (const std::exception& e) {
    errlog("Fatal error in TCP worker thread: %s", e.what());
  }
}

/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and
   they will hand off to worker threads & spawn more of them if required
*/
void tcpAcceptorThread(ClientState* cs)
{
  setThreadName("dnsdist/tcpAcce");

  bool tcpClientCountIncremented = false;
  ComboAddress remote;
  remote.sin4.sin_family = cs->local.sin4.sin_family;

  auto acl = g_ACL.getLocal();
  for (;;) {
    std::unique_ptr<ConnectionInfo> ci;
    tcpClientCountIncremented = false;
    try {
      socklen_t remlen = remote.getSocklen();
      ci = std::make_unique<ConnectionInfo>(cs);
#ifdef HAVE_ACCEPT4
      ci->fd = accept4(cs->tcpFD, reinterpret_cast<struct sockaddr*>(&remote), &remlen, SOCK_NONBLOCK);
#else
      ci->fd = accept(cs->tcpFD, reinterpret_cast<struct sockaddr*>(&remote), &remlen);
#endif
      // will be decremented when the ConnectionInfo object is destroyed, no matter the reason
      auto concurrentConnections = ++cs->tcpCurrentConnections;
      if (cs->d_tcpConcurrentConnectionsLimit > 0 && concurrentConnections > cs->d_tcpConcurrentConnectionsLimit) {
        continue;
      }

      if (concurrentConnections > cs->tcpMaxConcurrentConnections.load()) {
        cs->tcpMaxConcurrentConnections.store(concurrentConnections);
      }

      if (ci->fd < 0) {
        throw std::runtime_error((boost::format("accepting new connection on socket: %s") % stringerror()).str());
      }

      if (!acl->match(remote)) {
        ++g_stats.aclDrops;
        vinfolog("Dropped TCP connection from %s because of ACL", remote.toStringWithPort());
        continue;
      }

#ifndef HAVE_ACCEPT4
      if (!setNonBlocking(ci->fd)) {
        continue;
      }
#endif
      setTCPNoDelay(ci->fd); // disable NAGLE
      if (g_maxTCPQueuedConnections > 0 && g_tcpclientthreads->getQueuedCount() >= g_maxTCPQueuedConnections) {
        vinfolog("Dropping TCP connection from %s because we have too many queued already", remote.toStringWithPort());
        continue;
      }

      if (g_maxTCPConnectionsPerClient) {
        auto tcpClientsCount = s_tcpClientsCount.lock();

        if ((*tcpClientsCount)[remote] >= g_maxTCPConnectionsPerClient) {
          vinfolog("Dropping TCP connection from %s because we have too many from this client already", remote.toStringWithPort());
          continue;
        }
        (*tcpClientsCount)[remote]++;
        tcpClientCountIncremented = true;
      }

      vinfolog("Got TCP connection from %s", remote.toStringWithPort());

      ci->remote = remote;
      if (!g_tcpclientthreads->passConnectionToThread(std::move(ci))) {
        if (tcpClientCountIncremented) {
          decrementTCPClientCount(remote);
        }
      }
    }
    catch (const std::exception& e) {
      errlog("While reading a TCP question: %s", e.what());
      if (tcpClientCountIncremented) {
        decrementTCPClientCount(remote);
      }
    }
    catch (...) {
    }
  }
}