]>
Commit | Line | Data |
---|---|---|
8a5d5053 | 1 | /* |
12471842 PL |
2 | * This file is part of PowerDNS or dnsdist. |
3 | * Copyright -- PowerDNS.COM B.V. and its contributors | |
4 | * | |
5 | * This program is free software; you can redistribute it and/or modify | |
6 | * it under the terms of version 2 of the GNU General Public License as | |
7 | * published by the Free Software Foundation. | |
8 | * | |
9 | * In addition, for the avoidance of any doubt, permission is granted to | |
10 | * link this program with OpenSSL and to (re)distribute the binaries | |
11 | * produced as the result of such linking. | |
12 | * | |
13 | * This program is distributed in the hope that it will be useful, | |
14 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 | * GNU General Public License for more details. | |
17 | * | |
18 | * You should have received a copy of the GNU General Public License | |
19 | * along with this program; if not, write to the Free Software | |
20 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
21 | */ | |
086a43eb RG |
22 | |
23 | #include <thread> | |
24 | #include <netinet/tcp.h> | |
25 | #include <queue> | |
26 | ||
8a5d5053 | 27 | #include "dnsdist.hh" |
ca404e94 | 28 | #include "dnsdist-ecs.hh" |
bde73d5b | 29 | #include "dnsdist-proxy-protocol.hh" |
03b00917 | 30 | #include "dnsdist-rings.hh" |
2c0e81bb | 31 | #include "dnsdist-tcp.hh" |
086a43eb | 32 | #include "dnsdist-tcp-downstream.hh" |
8a2dd7db | 33 | #include "dnsdist-downstream-connection.hh" |
086a43eb | 34 | #include "dnsdist-tcp-upstream.hh" |
53c57da7 | 35 | #include "dnsdist-xpf.hh" |
548c8b66 | 36 | #include "dnsparser.hh" |
8a5d5053 | 37 | #include "dolog.hh" |
85c7ca75 | 38 | #include "gettime.hh" |
086a43eb RG |
39 | #include "lock.hh" |
40 | #include "sstuff.hh" | |
a227f47d | 41 | #include "tcpiohandler.hh" |
086a43eb | 42 | #include "tcpiohandler-mplexer.hh" |
519f5484 | 43 | #include "threadname.hh" |
8a5d5053 | 44 | |
a227f47d RG |
45 | /* TCP: the grand design. |
46 | We forward 'messages' between clients and downstream servers. Messages are 65k bytes large, tops. | |
47 | An answer might theoretically consist of multiple messages (for example, in the case of AXFR), initially | |
8a5d5053 | 48 | we will not go there. |
49 | ||
50 | In a sense there is a strong symmetry between UDP and TCP, once a connection to a downstream has been setup. | |
51 | This symmetry is broken because of head-of-line blocking within TCP though, necessitating additional connections | |
52 | to guarantee performance. | |
53 | ||
54 | So the idea is to have a 'pool' of available downstream connections, and forward messages to/from them and never queue. | |
55 | So whenever an answer comes in, we know where it needs to go. | |
56 | ||
57 | Let's start naively. | |
58 | */ | |
59 | ||
/* per-client-address count of currently active TCP connections,
   used to enforce g_maxTCPConnectionsPerClient (see decrementTCPClientCount) */
static LockGuarded<std::map<ComboAddress,size_t,ComboAddress::addressOnlyLessThan>> s_tcpClientsCount;

size_t g_maxTCPQueriesPerConn{0};       // 0 disables the check (see canAcceptNewQueries)
size_t g_maxTCPConnectionDuration{0};   // passed to maxConnectionDurationReached
size_t g_maxTCPConnectionsPerClient{0}; // 0 disables per-client tracking
#ifdef __linux__
// On Linux this gives us 128k pending queries (default is 8192 queries),
// which should be enough to deal with huge spikes
size_t g_tcpInternalPipeBufferSize{1024*1024};
uint64_t g_maxTCPQueuedConnections{10000};
#else
size_t g_tcpInternalPipeBufferSize{0};
uint64_t g_maxTCPQueuedConnections{1000};
#endif

// client-facing read/write timeouts, in seconds
int g_tcpRecvTimeout{2};
int g_tcpSendTimeout{2};
// incremented to request a dump of the TCP connection states
std::atomic<uint64_t> g_tcpStatesDumpRequested{0};
d0ae6360 | 78 | |
086a43eb | 79 | static void decrementTCPClientCount(const ComboAddress& client) |
cff9aa03 | 80 | { |
086a43eb | 81 | if (g_maxTCPConnectionsPerClient) { |
3d313832 RG |
82 | auto tcpClientsCount = s_tcpClientsCount.lock(); |
83 | tcpClientsCount->at(client)--; | |
84 | if (tcpClientsCount->at(client) == 0) { | |
85 | tcpClientsCount->erase(client); | |
d0ae6360 RG |
86 | } |
87 | } | |
d0ae6360 RG |
88 | } |
89 | ||
/* releases the per-client connection slot, reports per-frontend TCP metrics,
   and closes the client-side TLS/TCP handler */
IncomingTCPConnectionState::~IncomingTCPConnectionState()
{
  decrementTCPClientCount(d_ci.remote);

  if (d_ci.cs != nullptr) {
    struct timeval now;
    gettimeofday(&now, nullptr);

    /* connection duration, converted to milliseconds for the metrics call */
    auto diff = now - d_connectionStartTime;
    d_ci.cs->updateTCPMetrics(d_queriesCount, diff.tv_sec * 1000.0 + diff.tv_usec / 1000.0);
  }

  // would have been done when the object is destroyed anyway,
  // but that way we make sure it's done before the ConnectionInfo is destroyed,
  // closing the descriptor, instead of relying on the declaration order of the objects in the class
  d_handler.close();
}
107 | ||
97e8507d | 108 | size_t IncomingTCPConnectionState::clearAllDownstreamConnections() |
40a81af4 | 109 | { |
9fd8dc1f | 110 | return t_downstreamTCPConnectionsManager.clear(); |
40a81af4 RG |
111 | } |
112 | ||
c4f2293b | 113 | std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getDownstreamConnection(std::shared_ptr<DownstreamState>& ds, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs, const struct timeval& now) |
8a5d5053 | 114 | { |
086a43eb | 115 | std::shared_ptr<TCPConnectionToBackend> downstream{nullptr}; |
275bcb5b | 116 | |
645a1ca4 | 117 | downstream = getOwnedDownstreamConnection(ds, tlvs); |
275bcb5b | 118 | |
086a43eb | 119 | if (!downstream) { |
645a1ca4 | 120 | /* we don't have a connection to this backend owned yet, let's get one (it might not be a fresh one, though) */ |
9fd8dc1f | 121 | downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(d_threadData.mplexer, ds, now, std::string()); |
8cf75ac2 | 122 | if (ds->d_config.useProxyProtocol) { |
645a1ca4 RG |
123 | registerOwnedDownstreamConnection(downstream); |
124 | } | |
275bcb5b RG |
125 | } |
126 | ||
086a43eb | 127 | return downstream; |
9396d955 RG |
128 | } |
129 | ||
ae3b96d9 | 130 | static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int crossProtocolResponsesListenPipeFD, int crossProtocolResponsesWritePipeFD); |
086a43eb | 131 | |
2c0e81bb | 132 | TCPClientCollection::TCPClientCollection(size_t maxThreads): d_tcpclientthreads(maxThreads), d_maxthreads(maxThreads) |
cea9f70d | 133 | { |
ae3b96d9 RG |
134 | for (size_t idx = 0; idx < maxThreads; idx++) { |
135 | addTCPClientThread(); | |
136 | } | |
cea9f70d RG |
137 | } |
138 | ||
8a5d5053 | 139 | void TCPClientCollection::addTCPClientThread() |
a9bf3ec4 | 140 | { |
2c0e81bb RG |
141 | auto preparePipe = [](int fds[2], const std::string& type) -> bool { |
142 | if (pipe(fds) < 0) { | |
143 | errlog("Error creating the TCP thread %s pipe: %s", type, stringerror()); | |
144 | return false; | |
edbda1ad | 145 | } |
8a5d5053 | 146 | |
2c0e81bb | 147 | if (!setNonBlocking(fds[0])) { |
a702a96c | 148 | int err = errno; |
2c0e81bb RG |
149 | close(fds[0]); |
150 | close(fds[1]); | |
151 | errlog("Error setting the TCP thread %s pipe non-blocking: %s", type, stringerror(err)); | |
152 | return false; | |
3b07fd1b RG |
153 | } |
154 | ||
2c0e81bb | 155 | if (!setNonBlocking(fds[1])) { |
a702a96c | 156 | int err = errno; |
2c0e81bb RG |
157 | close(fds[0]); |
158 | close(fds[1]); | |
159 | errlog("Error setting the TCP thread %s pipe non-blocking: %s", type, stringerror(err)); | |
160 | return false; | |
edbda1ad | 161 | } |
4ce513a4 | 162 | |
2c0e81bb RG |
163 | if (g_tcpInternalPipeBufferSize > 0 && getPipeBufferSize(fds[0]) < g_tcpInternalPipeBufferSize) { |
164 | setPipeBufferSize(fds[0], g_tcpInternalPipeBufferSize); | |
4ce513a4 | 165 | } |
2c0e81bb RG |
166 | |
167 | return true; | |
168 | }; | |
169 | ||
170 | int pipefds[2] = { -1, -1}; | |
171 | if (!preparePipe(pipefds, "communication")) { | |
172 | return; | |
adbf75f7 RG |
173 | } |
174 | ||
ae3b96d9 RG |
175 | int crossProtocolQueriesFDs[2] = { -1, -1}; |
176 | if (!preparePipe(crossProtocolQueriesFDs, "cross-protocol queries")) { | |
177 | return; | |
178 | } | |
179 | ||
180 | int crossProtocolResponsesFDs[2] = { -1, -1}; | |
181 | if (!preparePipe(crossProtocolResponsesFDs, "cross-protocol responses")) { | |
2c0e81bb RG |
182 | return; |
183 | } | |
184 | ||
185 | vinfolog("Adding TCP Client thread"); | |
186 | ||
ded1985a | 187 | { |
90e4fcd8 | 188 | if (d_numthreads >= d_tcpclientthreads.size()) { |
06e4b948 | 189 | vinfolog("Adding a new TCP client thread would exceed the vector size (%d/%d), skipping. Consider increasing the maximum amount of TCP client threads with setMaxTCPClientThreads() in the configuration.", d_numthreads.load(), d_tcpclientthreads.size()); |
ae3b96d9 RG |
190 | close(crossProtocolQueriesFDs[0]); |
191 | close(crossProtocolQueriesFDs[1]); | |
192 | close(crossProtocolResponsesFDs[0]); | |
193 | close(crossProtocolResponsesFDs[1]); | |
2c0e81bb RG |
194 | close(pipefds[0]); |
195 | close(pipefds[1]); | |
ded1985a RG |
196 | return; |
197 | } | |
198 | ||
2c0e81bb RG |
199 | /* from now on this side of the pipe will be managed by that object, |
200 | no need to worry about it */ | |
ae3b96d9 | 201 | TCPWorkerThread worker(pipefds[1], crossProtocolQueriesFDs[1], crossProtocolResponsesFDs[1]); |
39386988 | 202 | try { |
ae3b96d9 | 203 | std::thread t1(tcpClientThread, pipefds[0], crossProtocolQueriesFDs[0], crossProtocolResponsesFDs[0], crossProtocolResponsesFDs[1]); |
39386988 RG |
204 | t1.detach(); |
205 | } | |
d094bec2 | 206 | catch (const std::runtime_error& e) { |
39386988 RG |
207 | /* the thread creation failed, don't leak */ |
208 | errlog("Error creating a TCP thread: %s", e.what()); | |
2c0e81bb | 209 | close(pipefds[0]); |
ae3b96d9 RG |
210 | close(crossProtocolQueriesFDs[0]); |
211 | close(crossProtocolResponsesFDs[0]); | |
39386988 RG |
212 | return; |
213 | } | |
214 | ||
2c0e81bb | 215 | d_tcpclientthreads.at(d_numthreads) = std::move(worker); |
dd9c8246 | 216 | ++d_numthreads; |
ded1985a | 217 | } |
8a5d5053 | 218 | } |
219 | ||
/* the global collection of TCP worker threads */
std::unique_ptr<TCPClientCollection> g_tcpclientthreads;
d0ae6360 | 221 | |
44e34991 RG |
/* drains the queue of pending responses for this client connection,
   sending them one at a time. Returns IOState::Done when the queue is
   empty (or the connection went inactive), otherwise the IO state of the
   response currently being (partially) written. */
static IOState sendQueuedResponses(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
{
  IOState result = IOState::Done;

  while (state->active() && !state->d_queuedResponses.empty()) {
    DEBUGLOG("queue size is "<<state->d_queuedResponses.size()<<", sending the next one");
    TCPResponse resp = std::move(state->d_queuedResponses.front());
    state->d_queuedResponses.pop_front();
    /* reset to idle before sendResponse() switches us to sendingResponse */
    state->d_state = IncomingTCPConnectionState::State::idle;
    result = state->sendResponse(state, now, std::move(resp));
    if (result != IOState::Done) {
      /* the write blocked (or the connection was terminated), stop here */
      return result;
    }
  }

  state->d_state = IncomingTCPConnectionState::State::idle;
  return IOState::Done;
}
240 | ||
/* bookkeeping after a response has been fully written to the client:
   decrements the in-flight counter, reports the response to the common
   ::handleResponseSent() accounting, and releases the response buffer and
   backend connection. XFR responses are skipped since several messages can
   be sent for a single query. */
static void handleResponseSent(std::shared_ptr<IncomingTCPConnectionState>& state, TCPResponse& currentResponse)
{
  if (currentResponse.d_idstate.qtype == QType::AXFR || currentResponse.d_idstate.qtype == QType::IXFR) {
    return;
  }

  --state->d_currentQueriesCount;

  if (currentResponse.d_selfGenerated == false && currentResponse.d_connection && currentResponse.d_connection->getDS()) {
    /* the response came from a backend over TCP */
    const auto& ds = currentResponse.d_connection->getDS();
    const auto& ids = currentResponse.d_idstate;
    double udiff = ids.sentTime.udiff();
    vinfolog("Got answer from %s, relayed to %s (%s, %d bytes), took %f usec", ds->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), (state->d_handler.isTLS() ? "DoT" : "TCP"), currentResponse.d_buffer.size(), udiff);

    /* report the transport actually used: a UDP-configured backend is
       still reached over TCP from this code path */
    auto backendProtocol = ds->getProtocol();
    if (backendProtocol == dnsdist::Protocol::DoUDP) {
      backendProtocol = dnsdist::Protocol::DoTCP;
    }
    ::handleResponseSent(ids, udiff, state->d_ci.remote, ds->d_config.remote, static_cast<unsigned int>(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, backendProtocol);
  } else {
    /* self-generated answer (or no backend connection): no backend address, no latency */
    const auto& ids = currentResponse.d_idstate;
    ::handleResponseSent(ids, 0., state->d_ci.remote, ComboAddress(), static_cast<unsigned int>(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, ids.protocol);
  }

  currentResponse.d_buffer.clear();
  currentResponse.d_connection.reset();
}
cff9aa03 | 268 | |
acf2a221 | 269 | static void prependSizeToTCPQuery(PacketBuffer& buffer, size_t proxyProtocolPayloadSize) |
2c0e81bb | 270 | { |
acf2a221 RG |
271 | if (buffer.size() <= proxyProtocolPayloadSize) { |
272 | throw std::runtime_error("The payload size is smaller or equal to the buffer size"); | |
273 | } | |
274 | ||
275 | uint16_t queryLen = proxyProtocolPayloadSize > 0 ? (buffer.size() - proxyProtocolPayloadSize) : buffer.size(); | |
2c0e81bb RG |
276 | const uint8_t sizeBytes[] = { static_cast<uint8_t>(queryLen / 256), static_cast<uint8_t>(queryLen % 256) }; |
277 | /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes | |
278 | that could occur if we had to deal with the size during the processing, | |
279 | especially alignment issues */ | |
acf2a221 | 280 | buffer.insert(buffer.begin() + proxyProtocolPayloadSize, sizeBytes, sizeBytes + 2); |
2c0e81bb RG |
281 | } |
282 | ||
/* decides whether this client connection may read another query right now,
   based on earlier processing errors and the configured per-connection limits */
bool IncomingTCPConnectionState::canAcceptNewQueries(const struct timeval& now)
{
  if (d_hadErrors) {
    DEBUGLOG("not accepting new queries because we encountered some error during the processing already");
    return false;
  }

  /* too many queries still in flight to the backends for this connection */
  if (d_currentQueriesCount >= d_ci.cs->d_maxInFlightQueriesPerConn) {
    DEBUGLOG("not accepting new queries because we already have "<<d_currentQueriesCount<<" out of "<<d_ci.cs->d_maxInFlightQueriesPerConn);
    return false;
  }

  /* g_maxTCPQueriesPerConn == 0 means no limit */
  if (g_maxTCPQueriesPerConn && d_queriesCount > g_maxTCPQueriesPerConn) {
    vinfolog("not accepting new queries from %s because it reached the maximum number of queries per conn (%d / %d)", d_ci.remote.toStringWithPort(), d_queriesCount, g_maxTCPQueriesPerConn);
    return false;
  }

  if (maxConnectionDurationReached(g_maxTCPConnectionDuration, now)) {
    vinfolog("not accepting new queries from %s because it reached the maximum TCP connection duration", d_ci.remote.toStringWithPort());
    return false;
  }

  return true;
}
307 | ||
086a43eb | 308 | void IncomingTCPConnectionState::resetForNewQuery() |
d0ae6360 | 309 | { |
086a43eb RG |
310 | d_buffer.resize(sizeof(uint16_t)); |
311 | d_currentPos = 0; | |
312 | d_querySize = 0; | |
4253054a | 313 | d_state = State::waitingForQuery; |
086a43eb | 314 | } |
d0ae6360 | 315 | |
645a1ca4 | 316 | std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getOwnedDownstreamConnection(const std::shared_ptr<DownstreamState>& ds, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs) |
20ad5012 | 317 | { |
645a1ca4 RG |
318 | auto it = d_ownedConnectionsToBackend.find(ds); |
319 | if (it == d_ownedConnectionsToBackend.end()) { | |
320 | DEBUGLOG("no owned connection found for "<<ds->getName()); | |
20ad5012 RG |
321 | return nullptr; |
322 | } | |
323 | ||
324 | for (auto& conn : it->second) { | |
645a1ca4 RG |
325 | if (conn->canBeReused(true) && conn->matchesTLVs(tlvs)) { |
326 | DEBUGLOG("Got one owned connection accepting more for "<<ds->getName()); | |
c4f2293b | 327 | conn->setReused(); |
20ad5012 RG |
328 | return conn; |
329 | } | |
4f0a424e | 330 | DEBUGLOG("not accepting more for "<<ds->getName()); |
20ad5012 RG |
331 | } |
332 | ||
333 | return nullptr; | |
334 | } | |
335 | ||
645a1ca4 | 336 | void IncomingTCPConnectionState::registerOwnedDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>& conn) |
20ad5012 | 337 | { |
645a1ca4 | 338 | d_ownedConnectionsToBackend[conn->getDS()].push_front(conn); |
20ad5012 RG |
339 | } |
340 | ||
44e34991 RG |
/* called when the buffer has been set and the rules have been processed, and only from handleIO (sometimes indirectly via handleQuery) */
/* prepends the TCP length prefix, then tries to write the response to the
   client. Returns the resulting IO state; on error the client connection is
   terminated and Done is returned. */
IOState IncomingTCPConnectionState::sendResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
{
  state->d_state = IncomingTCPConnectionState::State::sendingResponse;

  /* two-byte, network-order length prefix required for DNS over TCP */
  uint16_t responseSize = static_cast<uint16_t>(response.d_buffer.size());
  const uint8_t sizeBytes[] = { static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256) };
  /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
     that could occur if we had to deal with the size during the processing,
     especially alignment issues */
  response.d_buffer.insert(response.d_buffer.begin(), sizeBytes, sizeBytes + 2);
  state->d_currentPos = 0;
  state->d_currentResponse = std::move(response);

  try {
    auto iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size());
    if (iostate == IOState::Done) {
      DEBUGLOG("response sent from "<<__PRETTY_FUNCTION__);
      handleResponseSent(state, state->d_currentResponse);
      return iostate;
    } else {
      /* partial write: remember we blocked so the IO loop knows to wait */
      state->d_lastIOBlocked = true;
      DEBUGLOG("partial write");
      return iostate;
    }
  }
  catch (const std::exception& e) {
    vinfolog("Closing TCP client connection with %s: %s", state->d_ci.remote.toStringWithPort(), e.what());
    DEBUGLOG("Closing TCP client connection: "<<e.what());
    ++state->d_ci.cs->tcpDiedSendingResponse;

    state->terminateClientConnection();

    return IOState::Done;
  }
}
377 | ||
9c2ca55f RG |
/* tears down this client connection: drops queued responses, releases owned
   backend connections, stops our IO watching, and closes the TLS/TCP handler
   (possibly deferred while async TLS engine jobs are still pending) */
void IncomingTCPConnectionState::terminateClientConnection()
{
  DEBUGLOG("terminating client connection");
  d_queuedResponses.clear();
  /* we have already released idle connections that could be reused,
     we don't care about the ones still waiting for responses */
  for (auto& backend : d_ownedConnectionsToBackend) {
    for (auto& conn : backend.second) {
      conn->release();
    }
  }
  d_ownedConnectionsToBackend.clear();

  /* meaning we will no longer be 'active' when the backend
     response or timeout comes in */
  d_ioState.reset();

  /* if we do have remaining async descriptors associated with this TLS
     connection, we need to defer the destruction of the TLS object until
     the engine has reported back, otherwise we have a use-after-free.. */
  auto afds = d_handler.getAsyncFDs();
  if (afds.empty()) {
    d_handler.close();
  }
  else {
    /* we might already be waiting, but we might also not because sometimes we have already been
       notified via the descriptor, not received Async again, but the async job still exists.. */
    auto state = shared_from_this();
    for (const auto fd : afds) {
      try {
        state->d_threadData.mplexer->addReadFD(fd, handleAsyncReady, state);
      }
      catch (...) {
      }
    }

  }
}
416 | ||
/* queues a response for this client connection and, when the connection is
   not currently busy with IO, immediately tries to flush the queue and
   re-arm the IO state machine */
void IncomingTCPConnectionState::queueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
{
  // queue response
  state->d_queuedResponses.push_back(std::move(response));
  DEBUGLOG("queueing response, state is "<<(int)state->d_state<<", queue size is now "<<state->d_queuedResponses.size());

  // when the response comes from a backend, there is a real possibility that we are currently
  // idle, and thus not trying to send the response right away would make our ref count go to 0.
  // Even if we are waiting for a query, we will not wake up before the new query arrives or a
  // timeout occurs
  if (state->d_state == IncomingTCPConnectionState::State::idle ||
      state->d_state == IncomingTCPConnectionState::State::waitingForQuery) {
    auto iostate = sendQueuedResponses(state, now);

    if (iostate == IOState::Done && state->active()) {
      /* queue fully flushed: either go back to reading queries or go idle */
      if (state->canAcceptNewQueries(now)) {
        state->resetForNewQuery();
        state->d_state = IncomingTCPConnectionState::State::waitingForQuery;
        iostate = IOState::NeedRead;
      }
      else {
        state->d_state = IncomingTCPConnectionState::State::idle;
      }
    }

    // for the same reason we need to update the state right away, nobody will do that for us
    if (state->active()) {
      updateIO(state, iostate, now);
    }
  }
}
448 | ||
/* multiplexer callback invoked when an async TLS engine descriptor becomes
   readable, meaning the pending async jobs for this connection are done */
void IncomingTCPConnectionState::handleAsyncReady(int fd, FDMultiplexer::funcparam_t& param)
{
  auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);

  /* If we are here, the async jobs for this SSL* are finished
     so we should be able to remove all FDs */
  auto afds = state->d_handler.getAsyncFDs();
  for (const auto afd : afds) {
    try {
      state->d_threadData.mplexer->removeReadFD(afd);
    }
    catch (...) {
    }
  }

  if (state->active()) {
    /* and now we restart our own I/O state machine */
    struct timeval now;
    gettimeofday(&now, nullptr);
    handleIO(state, now);
  }
  else {
    /* we were only waiting for the engine to come back,
       to prevent a use-after-free */
    state->d_handler.close();
  }
}
476 | ||
/* re-arms the multiplexer for this connection according to the new IO state.
   For Async, the TLS engine descriptors are watched instead of our own
   socket (which is set to Done); otherwise the read/write TTD is refreshed. */
void IncomingTCPConnectionState::updateIO(std::shared_ptr<IncomingTCPConnectionState>& state, IOState newState, const struct timeval& now)
{
  if (newState == IOState::Async) {
    auto fds = state->d_handler.getAsyncFDs();
    for (const auto fd : fds) {
      state->d_threadData.mplexer->addReadFD(fd, handleAsyncReady, state);
    }
    state->d_ioState->update(IOState::Done, handleIOCallback, state);
  }
  else {
    state->d_ioState->update(newState, handleIOCallback, state, newState == IOState::NeedWrite ? state->getClientWriteTTD(now) : state->getClientReadTTD(now));
  }
}
490 | ||
/* called from the backend code when a new response has been received */
/* validates the response against the stored query state, runs the response
   rules, updates the counters and queues the response for the client.
   Any validation/processing failure terminates the client connection. */
void IncomingTCPConnectionState::handleResponse(const struct timeval& now, TCPResponse&& response)
{
  std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();

  if (response.d_connection && response.d_connection->getDS() && response.d_connection->getDS()->d_config.useProxyProtocol) {
    // if we have added a TCP Proxy Protocol payload to a connection, don't release it to the general pool as no one else will be able to use it anyway
    if (!response.d_connection->willBeReusable(true)) {
      // if it can't be reused even by us, well
      const auto connIt = state->d_ownedConnectionsToBackend.find(response.d_connection->getDS());
      if (connIt != state->d_ownedConnectionsToBackend.end()) {
        auto& list = connIt->second;

        /* find this exact connection in our owned list, release and drop it */
        for (auto it = list.begin(); it != list.end(); ++it) {
          if (*it == response.d_connection) {
            try {
              response.d_connection->release();
            }
            catch (const std::exception& e) {
              vinfolog("Error releasing connection: %s", e.what());
            }
            list.erase(it);
            break;
          }
        }
      }
    }
  }

  /* too short to even hold a DNS header */
  if (response.d_buffer.size() < sizeof(dnsheader)) {
    state->terminateClientConnection();
    return;
  }

  try {
    auto& ids = response.d_idstate;
    unsigned int qnameWireLength;
    /* make sure the response actually matches the query we sent */
    if (!response.d_connection || !responseContentMatches(response.d_buffer, ids.qname, ids.qtype, ids.qclass, response.d_connection->getDS(), qnameWireLength)) {
      state->terminateClientConnection();
      return;
    }

    if (response.d_connection->getDS()) {
      ++response.d_connection->getDS()->responses;
    }

    DNSResponse dr = makeDNSResponseFromIDState(ids, response.d_buffer);

    /* keep a copy of the cleartext header before the response rules can alter it */
    memcpy(&response.d_cleartextDH, dr.getHeader(), sizeof(response.d_cleartextDH));

    if (!processResponse(response.d_buffer, state->d_threadData.localRespRuleActions, dr, false, false)) {
      state->terminateClientConnection();
      return;
    }
  }
  catch (const std::exception& e) {
    vinfolog("Unexpected exception while handling response from backend: %s", e.what());
    state->terminateClientConnection();
    return;
  }

  ++g_stats.responses;
  ++state->d_ci.cs->responses;

  queueResponse(state, now, std::move(response));
}
6ce2da14 | 557 | |
ae3b96d9 RG |
/* bundles a response with its client connection state and timestamp so it
   can be handed over (as a raw pointer) through the cross-protocol responses
   pipe to the TCP worker thread owning the connection */
struct TCPCrossProtocolResponse
{
  TCPCrossProtocolResponse(TCPResponse&& response, std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now): d_response(std::move(response)), d_state(state), d_now(now)
  {
  }

  TCPResponse d_response;                               // the response to deliver
  std::shared_ptr<IncomingTCPConnectionState> d_state;  // keeps the connection alive in transit
  struct timeval d_now;                                 // timestamp captured at hand-over
};
568 | ||
/* TCPQuerySender implementation that relays responses back to the TCP worker
   thread owning the client connection, by writing a pointer to a
   heap-allocated TCPCrossProtocolResponse into that thread's pipe */
class TCPCrossProtocolQuerySender : public TCPQuerySender
{
public:
  TCPCrossProtocolQuerySender(std::shared_ptr<IncomingTCPConnectionState>& state): d_state(state)
  {
  }

  bool active() const override
  {
    return d_state->active();
  }

  const ClientState* getClientState() const override
  {
    return d_state->getClientState();
  }

  /* hands the response over to the owning worker thread; ownership of the
     heap-allocated wrapper passes through the pipe, the reader deletes it */
  void handleResponse(const struct timeval& now, TCPResponse&& response) override
  {
    if (d_state->d_threadData.crossProtocolResponsesPipe == -1) {
      throw std::runtime_error("Invalid pipe descriptor in TCP Cross Protocol Query Sender");
    }

    auto ptr = new TCPCrossProtocolResponse(std::move(response), d_state, now);
    static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranteed not to be interleaved and to either fully succeed or fail");
    ssize_t sent = write(d_state->d_threadData.crossProtocolResponsesPipe, &ptr, sizeof(ptr));
    if (sent != sizeof(ptr)) {
      /* the write failed entirely (see the static_assert above), so we still own ptr */
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        ++g_stats.tcpCrossProtocolResponsePipeFull;
        vinfolog("Unable to pass a cross-protocol response to the TCP worker thread because the pipe is full");
      }
      else {
        vinfolog("Unable to pass a cross-protocol response to the TCP worker thread because we couldn't write to the pipe: %s", stringerror());
      }
      delete ptr;
    }
  }

  void handleXFRResponse(const struct timeval& now, TCPResponse&& response) override
  {
    handleResponse(now, std::move(response));
  }

  /* reports an IO error as an empty response so the worker can clean up */
  void notifyIOError(IDState&& query, const struct timeval& now) override
  {
    TCPResponse response(PacketBuffer(), std::move(query), nullptr);
    handleResponse(now, std::move(response));
  }

private:
  std::shared_ptr<IncomingTCPConnectionState> d_state;
};
621 | ||
89e62bd8 RG |
/* CrossProtocolQuery wrapper for a query that originated on a TCP client
   connection, keeping the sender around so the response can find its way
   back to the owning worker thread */
class TCPCrossProtocolQuery : public CrossProtocolQuery
{
public:
  TCPCrossProtocolQuery(PacketBuffer&& buffer, IDState&& ids, std::shared_ptr<DownstreamState>& ds, std::shared_ptr<TCPCrossProtocolQuerySender>& sender): d_sender(sender)
  {
    query = InternalQuery(std::move(buffer), std::move(ids));
    downstream = ds;
    /* no proxy protocol payload has been added at this point */
    proxyProtocolPayloadSize = 0;
  }

  ~TCPCrossProtocolQuery()
  {
  }

  std::shared_ptr<TCPQuerySender> getTCPQuerySender() override
  {
    return d_sender;
  }

private:
  std::shared_ptr<TCPCrossProtocolQuerySender> d_sender;
};
644 | ||
/* Process one complete query just read from an incoming TCP connection:
   validate it, run it through the rule/cache pipeline (processQuery), then
   either queue a self-generated answer back to the client, hand the query to
   a DoH backend as a cross-protocol query, or queue it on a TCP connection to
   the selected backend.
   Note: several branches move state->d_buffer; the order of operations below
   (logging, proxy-protocol payload generation, cross-protocol query creation)
   is deliberate and must not be rearranged. */
static void handleQuery(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
{
  /* too short to even contain a DNS header: not a DNS query */
  if (state->d_querySize < sizeof(dnsheader)) {
    ++g_stats.nonCompliantQueries;
    ++state->d_ci.cs->nonCompliantQueries;
    state->terminateClientConnection();
    return;
  }

  ++state->d_queriesCount;
  ++state->d_ci.cs->queries;
  ++g_stats.queries;

  /* per-TLS-version accounting for DoT connections */
  if (state->d_handler.isTLS()) {
    auto tlsVersion = state->d_handler.getTLSVersion();
    switch (tlsVersion) {
    case LibsslTLSVersion::TLS10:
      ++state->d_ci.cs->tls10queries;
      break;
    case LibsslTLSVersion::TLS11:
      ++state->d_ci.cs->tls11queries;
      break;
    case LibsslTLSVersion::TLS12:
      ++state->d_ci.cs->tls12queries;
      break;
    case LibsslTLSVersion::TLS13:
      ++state->d_ci.cs->tls13queries;
      break;
    default:
      ++state->d_ci.cs->tlsUnknownqueries;
    }
  }

  /* we need an accurate ("real") value for the response and
     to store into the IDS, but not for insertion into the
     rings for example */
  struct timespec queryRealTime;
  gettime(&queryRealTime, true);

  std::unique_ptr<DNSCryptQuery> dnsCryptQuery{nullptr};
  auto dnsCryptResponse = checkDNSCryptQuery(*state->d_ci.cs, state->d_buffer, dnsCryptQuery, queryRealTime.tv_sec, true);
  if (dnsCryptResponse) {
    /* the DNSCrypt layer produced an answer (e.g. a certificate response) on its own */
    /* NOTE(review): `response` is queued without its d_buffer being filled from
       *dnsCryptResponse — confirm queueResponse() handles this self-generated,
       empty-buffer case as intended */
    TCPResponse response;
    state->d_state = IncomingTCPConnectionState::State::idle;
    ++state->d_currentQueriesCount;
    state->queueResponse(state, now, std::move(response));
    return;
  }

  {
    /* this pointer will be invalidated the second the buffer is resized, don't hold onto it! */
    auto* dh = reinterpret_cast<dnsheader*>(state->d_buffer.data());
    if (!checkQueryHeaders(dh, *state->d_ci.cs)) {
      state->terminateClientConnection();
      return;
    }

    /* no question section: answer NotImp ourselves instead of bothering a backend */
    if (dh->qdcount == 0) {
      TCPResponse response;
      dh->rcode = RCode::NotImp;
      dh->qr = true;
      response.d_selfGenerated = true;
      response.d_buffer = std::move(state->d_buffer);
      state->d_state = IncomingTCPConnectionState::State::idle;
      ++state->d_currentQueriesCount;
      state->queueResponse(state, now, std::move(response));
      return;
    }
  }

  uint16_t qtype, qclass;
  unsigned int qnameWireLength = 0;
  DNSName qname(reinterpret_cast<const char*>(state->d_buffer.data()), state->d_buffer.size(), sizeof(dnsheader), false, &qtype, &qclass, &qnameWireLength);
  /* classify the incoming transport for rules/logging purposes */
  dnsdist::Protocol protocol = dnsdist::Protocol::DoTCP;
  if (dnsCryptQuery) {
    protocol = dnsdist::Protocol::DNSCryptTCP;
  }
  else if (state->d_handler.isTLS()) {
    protocol = dnsdist::Protocol::DoT;
  }

  DNSQuestion dq(&qname, qtype, qclass, &state->d_proxiedDestination, &state->d_proxiedRemote, state->d_buffer, protocol, &queryRealTime);
  dq.dnsCryptQuery = std::move(dnsCryptQuery);
  dq.sni = state->d_handler.getServerNameIndication();
  if (state->d_proxyProtocolValues) {
    /* we need to copy them, because the next queries received on that connection will
       need to get the _unaltered_ values */
    dq.proxyProtocolValues = make_unique<std::vector<ProxyProtocolValue>>(*state->d_proxyProtocolValues);
  }

  /* zone transfers are never cacheable */
  if (dq.qtype == QType::AXFR || dq.qtype == QType::IXFR) {
    dq.skipCache = true;
  }

  std::shared_ptr<DownstreamState> ds;
  auto result = processQuery(dq, *state->d_ci.cs, state->d_threadData.holders, ds);

  if (result == ProcessQueryResult::Drop) {
    state->terminateClientConnection();
    return;
  }

  // the buffer might have been invalidated by now
  const dnsheader* dh = dq.getHeader();
  if (result == ProcessQueryResult::SendAnswer) {
    /* the rules (or the cache) produced an answer: queue it directly to the client */
    TCPResponse response;
    response.d_selfGenerated = true;
    response.d_idstate.origID = dh->id;
    response.d_idstate.cs = state->d_ci.cs;
    setIDStateFromDNSQuestion(response.d_idstate, dq, std::move(qname));

    memcpy(&response.d_cleartextDH, dh, sizeof(response.d_cleartextDH));
    response.d_buffer = std::move(state->d_buffer);

    state->d_state = IncomingTCPConnectionState::State::idle;
    ++state->d_currentQueriesCount;
    state->queueResponse(state, now, std::move(response));
    return;
  }

  if (result != ProcessQueryResult::PassToBackend || ds == nullptr) {
    state->terminateClientConnection();
    return;
  }

  IDState ids;
  setIDStateFromDNSQuestion(ids, dq, std::move(qname));
  ids.origID = dh->id;
  ids.cs = state->d_ci.cs;

  ++state->d_currentQueriesCount;

  std::string proxyProtocolPayload;
  if (ds->isDoH()) {
    /* DoH backend: the query leaves this thread as a cross-protocol query */
    vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", ids.qname.toLogString(), QType(ids.qtype).toString(), state->d_proxiedRemote.toStringWithPort(), (state->d_handler.isTLS() ? "DoT" : "TCP"), state->d_buffer.size(), ds->getName());

    /* we need to do this _before_ creating the cross protocol query because
       after that the buffer will have been moved */
    if (ds->d_config.useProxyProtocol) {
      proxyProtocolPayload = getProxyProtocolPayload(dq);
    }

    auto incoming = std::make_shared<TCPCrossProtocolQuerySender>(state);
    auto cpq = std::make_unique<TCPCrossProtocolQuery>(std::move(state->d_buffer), std::move(ids), ds, incoming);
    cpq->query.d_proxyProtocolPayload = std::move(proxyProtocolPayload);

    ds->passCrossProtocolQuery(std::move(cpq));
    return;
  }

  /* plain TCP backend: prepend the two-byte length prefix in place */
  prependSizeToTCPQuery(state->d_buffer, 0);

  auto downstreamConnection = state->getDownstreamConnection(ds, dq.proxyProtocolValues, now);

  if (ds->d_config.useProxyProtocol) {
    /* if we ever sent a TLV over a connection, we can never go back */
    if (!state->d_proxyProtocolPayloadHasTLV) {
      state->d_proxyProtocolPayloadHasTLV = dq.proxyProtocolValues && !dq.proxyProtocolValues->empty();
    }

    proxyProtocolPayload = getProxyProtocolPayload(dq);
  }

  if (dq.proxyProtocolValues) {
    downstreamConnection->setProxyProtocolValuesSent(std::move(dq.proxyProtocolValues));
  }

  TCPQuery query(std::move(state->d_buffer), std::move(ids));
  query.d_proxyProtocolPayload = std::move(proxyProtocolPayload);

  vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", query.d_idstate.qname.toLogString(), QType(query.d_idstate.qtype).toString(), state->d_proxiedRemote.toStringWithPort(), (state->d_handler.isTLS() ? "DoT" : "TCP"), query.d_buffer.size(), ds->getName());
  std::shared_ptr<TCPQuerySender> incoming = state;
  downstreamConnection->queueQuery(incoming, std::move(query));
}
3f6d07a4 | 819 | |
086a43eb | 820 | void IncomingTCPConnectionState::handleIOCallback(int fd, FDMultiplexer::funcparam_t& param) |
d0ae6360 | 821 | { |
086a43eb | 822 | auto conn = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param); |
9c2ca55f RG |
823 | if (fd != conn->d_handler.getDescriptor()) { |
824 | throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->d_handler.getDescriptor())); | |
d0ae6360 | 825 | } |
3f6d07a4 | 826 | |
086a43eb | 827 | struct timeval now; |
b7eb6b6e | 828 | gettimeofday(&now, nullptr); |
086a43eb RG |
829 | handleIO(conn, now); |
830 | } | |
8b92100f | 831 | |
086a43eb RG |
832 | void IncomingTCPConnectionState::handleIO(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now) |
833 | { | |
086a43eb RG |
834 | // why do we loop? Because the TLS layer does buffering, and thus can have data ready to read |
835 | // even though the underlying socket is not ready, so we need to actually ask for the data first | |
086a43eb RG |
836 | IOState iostate = IOState::Done; |
837 | do { | |
838 | iostate = IOState::Done; | |
839 | IOStateGuard ioGuard(state->d_ioState); | |
7129b5c4 | 840 | |
086a43eb RG |
841 | if (state->maxConnectionDurationReached(g_maxTCPConnectionDuration, now)) { |
842 | vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", state->d_ci.remote.toStringWithPort()); | |
843 | // will be handled by the ioGuard | |
844 | //handleNewIOState(state, IOState::Done, fd, handleIOCallback); | |
845 | return; | |
d0ae6360 | 846 | } |
ca404e94 | 847 | |
4253054a RG |
848 | state->d_lastIOBlocked = false; |
849 | ||
086a43eb RG |
850 | try { |
851 | if (state->d_state == IncomingTCPConnectionState::State::doingHandshake) { | |
20ad5012 | 852 | DEBUGLOG("doing handshake"); |
086a43eb RG |
853 | iostate = state->d_handler.tryHandshake(); |
854 | if (iostate == IOState::Done) { | |
20ad5012 | 855 | DEBUGLOG("handshake done"); |
086a43eb RG |
856 | if (state->d_handler.isTLS()) { |
857 | if (!state->d_handler.hasTLSSessionBeenResumed()) { | |
858 | ++state->d_ci.cs->tlsNewSessions; | |
fbfb8fb8 RG |
859 | } |
860 | else { | |
086a43eb RG |
861 | ++state->d_ci.cs->tlsResumptions; |
862 | } | |
863 | if (state->d_handler.getResumedFromInactiveTicketKey()) { | |
864 | ++state->d_ci.cs->tlsInactiveTicketKey; | |
865 | } | |
866 | if (state->d_handler.getUnknownTicketKey()) { | |
867 | ++state->d_ci.cs->tlsUnknownTicketKey; | |
fbfb8fb8 RG |
868 | } |
869 | } | |
d8c19b98 | 870 | |
086a43eb | 871 | state->d_handshakeDoneTime = now; |
448d66d4 RG |
872 | if (expectProxyProtocolFrom(state->d_ci.remote)) { |
873 | state->d_state = IncomingTCPConnectionState::State::readingProxyProtocolHeader; | |
874 | state->d_buffer.resize(s_proxyProtocolMinimumHeaderSize); | |
875 | state->d_proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize; | |
876 | } | |
877 | else { | |
878 | state->d_state = IncomingTCPConnectionState::State::readingQuerySize; | |
879 | } | |
0ffc95ec | 880 | } |
086a43eb | 881 | else { |
4253054a | 882 | state->d_lastIOBlocked = true; |
0ffc95ec | 883 | } |
d0ae6360 | 884 | } |
a6e9e107 | 885 | |
4253054a | 886 | if (!state->d_lastIOBlocked && state->d_state == IncomingTCPConnectionState::State::readingProxyProtocolHeader) { |
448d66d4 | 887 | do { |
40a81af4 | 888 | DEBUGLOG("reading proxy protocol header"); |
448d66d4 RG |
889 | iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, state->d_proxyProtocolNeed); |
890 | if (iostate == IOState::Done) { | |
891 | state->d_buffer.resize(state->d_currentPos); | |
892 | ssize_t remaining = isProxyHeaderComplete(state->d_buffer); | |
893 | if (remaining == 0) { | |
894 | vinfolog("Unable to consume proxy protocol header in packet from TCP client %s", state->d_ci.remote.toStringWithPort()); | |
895 | ++g_stats.proxyProtocolInvalid; | |
896 | break; | |
897 | } | |
898 | else if (remaining < 0) { | |
899 | state->d_proxyProtocolNeed += -remaining; | |
900 | state->d_buffer.resize(state->d_currentPos + state->d_proxyProtocolNeed); | |
901 | /* we need to keep reading, since we might have buffered data */ | |
902 | iostate = IOState::NeedRead; | |
903 | } | |
904 | else { | |
905 | /* proxy header received */ | |
906 | std::vector<ProxyProtocolValue> proxyProtocolValues; | |
907 | if (!handleProxyProtocol(state->d_ci.remote, true, *state->d_threadData.holders.acl, state->d_buffer, state->d_proxiedRemote, state->d_proxiedDestination, proxyProtocolValues)) { | |
908 | vinfolog("Error handling the Proxy Protocol received from TCP client %s", state->d_ci.remote.toStringWithPort()); | |
909 | break; | |
910 | } | |
911 | ||
912 | if (!proxyProtocolValues.empty()) { | |
913 | state->d_proxyProtocolValues = make_unique<std::vector<ProxyProtocolValue>>(std::move(proxyProtocolValues)); | |
914 | } | |
915 | ||
916 | state->d_state = IncomingTCPConnectionState::State::readingQuerySize; | |
917 | state->d_buffer.resize(sizeof(uint16_t)); | |
918 | state->d_currentPos = 0; | |
919 | state->d_proxyProtocolNeed = 0; | |
920 | break; | |
921 | } | |
922 | } | |
923 | else { | |
4253054a | 924 | state->d_lastIOBlocked = true; |
448d66d4 RG |
925 | } |
926 | } | |
4253054a | 927 | while (state->active() && !state->d_lastIOBlocked); |
448d66d4 RG |
928 | } |
929 | ||
4253054a RG |
930 | if (!state->d_lastIOBlocked && (state->d_state == IncomingTCPConnectionState::State::waitingForQuery || |
931 | state->d_state == IncomingTCPConnectionState::State::readingQuerySize)) { | |
20ad5012 | 932 | DEBUGLOG("reading query size"); |
086a43eb | 933 | iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, sizeof(uint16_t)); |
4253054a RG |
934 | if (state->d_currentPos > 0) { |
935 | /* if we got at least one byte, we can't go around sending responses */ | |
936 | state->d_state = IncomingTCPConnectionState::State::readingQuerySize; | |
937 | } | |
938 | ||
086a43eb | 939 | if (iostate == IOState::Done) { |
20ad5012 | 940 | DEBUGLOG("query size received"); |
086a43eb RG |
941 | state->d_state = IncomingTCPConnectionState::State::readingQuery; |
942 | state->d_querySizeReadTime = now; | |
943 | if (state->d_queriesCount == 0) { | |
944 | state->d_firstQuerySizeReadTime = now; | |
945 | } | |
946 | state->d_querySize = state->d_buffer.at(0) * 256 + state->d_buffer.at(1); | |
947 | if (state->d_querySize < sizeof(dnsheader)) { | |
948 | /* go away */ | |
d46e6c91 | 949 | state->terminateClientConnection(); |
086a43eb RG |
950 | return; |
951 | } | |
e724d52a | 952 | |
086a43eb RG |
953 | /* allocate a bit more memory to be able to spoof the content, get an answer from the cache |
954 | or to add ECS without allocating a new buffer */ | |
955 | state->d_buffer.resize(std::max(state->d_querySize + static_cast<size_t>(512), s_maxPacketCacheEntrySize)); | |
956 | state->d_currentPos = 0; | |
957 | } | |
958 | else { | |
4253054a | 959 | state->d_lastIOBlocked = true; |
086a43eb | 960 | } |
e724d52a | 961 | } |
d0ae6360 | 962 | |
4253054a | 963 | if (!state->d_lastIOBlocked && state->d_state == IncomingTCPConnectionState::State::readingQuery) { |
20ad5012 | 964 | DEBUGLOG("reading query"); |
086a43eb RG |
965 | iostate = state->d_handler.tryRead(state->d_buffer, state->d_currentPos, state->d_querySize); |
966 | if (iostate == IOState::Done) { | |
20ad5012 | 967 | DEBUGLOG("query received"); |
341d2553 | 968 | state->d_buffer.resize(state->d_querySize); |
20ad5012 | 969 | |
4253054a RG |
970 | state->d_state = IncomingTCPConnectionState::State::idle; |
971 | handleQuery(state, now); | |
4253054a RG |
972 | /* the state might have been updated in the meantime, we don't want to override it |
973 | in that case */ | |
974 | if (state->active() && state->d_state != IncomingTCPConnectionState::State::idle) { | |
e82bf80f RG |
975 | if (state->d_ioState->isWaitingForRead()) { |
976 | iostate = IOState::NeedRead; | |
977 | } | |
978 | else if (state->d_ioState->isWaitingForWrite()) { | |
979 | iostate = IOState::NeedWrite; | |
980 | } | |
981 | else { | |
982 | iostate = IOState::Done; | |
983 | } | |
610e1bcc | 984 | } |
846b63bb | 985 | } |
086a43eb | 986 | else { |
4253054a | 987 | state->d_lastIOBlocked = true; |
086a43eb | 988 | } |
d0ae6360 | 989 | } |
d0ae6360 | 990 | |
4253054a | 991 | if (!state->d_lastIOBlocked && state->d_state == IncomingTCPConnectionState::State::sendingResponse) { |
20ad5012 | 992 | DEBUGLOG("sending response"); |
086a43eb RG |
993 | iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size()); |
994 | if (iostate == IOState::Done) { | |
4253054a | 995 | DEBUGLOG("response sent from "<<__PRETTY_FUNCTION__); |
40a81af4 | 996 | handleResponseSent(state, state->d_currentResponse); |
4253054a RG |
997 | state->d_state = IncomingTCPConnectionState::State::idle; |
998 | } | |
999 | else { | |
1000 | state->d_lastIOBlocked = true; | |
1001 | } | |
1002 | } | |
1003 | ||
1004 | if (state->active() && | |
1005 | !state->d_lastIOBlocked && | |
1006 | iostate == IOState::Done && | |
1007 | (state->d_state == IncomingTCPConnectionState::State::idle || | |
1008 | state->d_state == IncomingTCPConnectionState::State::waitingForQuery)) | |
1009 | { | |
86348274 | 1010 | // try sending queued responses |
40a81af4 | 1011 | DEBUGLOG("send responses, if any"); |
4253054a RG |
1012 | iostate = sendQueuedResponses(state, now); |
1013 | ||
d46e6c91 | 1014 | if (!state->d_lastIOBlocked && state->active() && iostate == IOState::Done) { |
4253054a RG |
1015 | // if the query has been passed to a backend, or dropped, and the responses have been sent, |
1016 | // we can start reading again | |
ee189f5c RG |
1017 | if (state->canAcceptNewQueries(now)) { |
1018 | state->resetForNewQuery(); | |
1019 | iostate = IOState::NeedRead; | |
1020 | } | |
1021 | else { | |
1022 | state->d_state = IncomingTCPConnectionState::State::idle; | |
1023 | iostate = IOState::Done; | |
44e34991 | 1024 | } |
61d10a4d | 1025 | } |
8a5d5053 | 1026 | } |
d0ae6360 | 1027 | |
086a43eb RG |
1028 | if (state->d_state != IncomingTCPConnectionState::State::idle && |
1029 | state->d_state != IncomingTCPConnectionState::State::doingHandshake && | |
448d66d4 | 1030 | state->d_state != IncomingTCPConnectionState::State::readingProxyProtocolHeader && |
4253054a | 1031 | state->d_state != IncomingTCPConnectionState::State::waitingForQuery && |
086a43eb RG |
1032 | state->d_state != IncomingTCPConnectionState::State::readingQuerySize && |
1033 | state->d_state != IncomingTCPConnectionState::State::readingQuery && | |
1034 | state->d_state != IncomingTCPConnectionState::State::sendingResponse) { | |
1035 | vinfolog("Unexpected state %d in handleIOCallback", static_cast<int>(state->d_state)); | |
d0ae6360 | 1036 | } |
4ab01344 | 1037 | } |
4253054a | 1038 | catch (const std::exception& e) { |
086a43eb RG |
1039 | /* most likely an EOF because the other end closed the connection, |
1040 | but it might also be a real IO error or something else. | |
1041 | Let's just drop the connection | |
1042 | */ | |
1043 | if (state->d_state == IncomingTCPConnectionState::State::idle || | |
3aeefc8e RG |
1044 | state->d_state == IncomingTCPConnectionState::State::waitingForQuery) { |
1045 | /* no need to increase any counters in that case, the client is simply done with us */ | |
1046 | } | |
1047 | else if (state->d_state == IncomingTCPConnectionState::State::doingHandshake || | |
1048 | state->d_state != IncomingTCPConnectionState::State::readingProxyProtocolHeader || | |
1049 | state->d_state == IncomingTCPConnectionState::State::waitingForQuery || | |
1050 | state->d_state == IncomingTCPConnectionState::State::readingQuerySize || | |
1051 | state->d_state == IncomingTCPConnectionState::State::readingQuery) { | |
086a43eb RG |
1052 | ++state->d_ci.cs->tcpDiedReadingQuery; |
1053 | } | |
1054 | else if (state->d_state == IncomingTCPConnectionState::State::sendingResponse) { | |
97e8507d | 1055 | /* unlikely to happen here, the exception should be handled in sendResponse() */ |
086a43eb | 1056 | ++state->d_ci.cs->tcpDiedSendingResponse; |
d0ae6360 | 1057 | } |
a227f47d | 1058 | |
e82bf80f | 1059 | if (state->d_ioState->isWaitingForWrite() || state->d_queriesCount == 0) { |
20ad5012 | 1060 | DEBUGLOG("Got an exception while handling TCP query: "<<e.what()); |
e82bf80f | 1061 | vinfolog("Got an exception while handling (%s) TCP query from %s: %s", (state->d_ioState->isWaitingForRead() ? "reading" : "writing"), state->d_ci.remote.toStringWithPort(), e.what()); |
086a43eb RG |
1062 | } |
1063 | else { | |
c6f504cb | 1064 | vinfolog("Closing TCP client connection with %s: %s", state->d_ci.remote.toStringWithPort(), e.what()); |
20ad5012 | 1065 | DEBUGLOG("Closing TCP client connection: "<<e.what()); |
086a43eb RG |
1066 | } |
1067 | /* remove this FD from the IO multiplexer */ | |
40a81af4 | 1068 | state->terminateClientConnection(); |
a6e9e107 RG |
1069 | } |
1070 | ||
4253054a | 1071 | if (!state->active()) { |
40a81af4 | 1072 | DEBUGLOG("state is no longer active"); |
4253054a RG |
1073 | return; |
1074 | } | |
1075 | ||
086a43eb | 1076 | if (iostate == IOState::Done) { |
086a43eb | 1077 | state->d_ioState->update(iostate, handleIOCallback, state); |
d0ae6360 RG |
1078 | } |
1079 | else { | |
489caa9f | 1080 | updateIO(state, iostate, now); |
d0ae6360 | 1081 | } |
086a43eb RG |
1082 | ioGuard.release(); |
1083 | } | |
4253054a | 1084 | while ((iostate == IOState::NeedRead || iostate == IOState::NeedWrite) && !state->d_lastIOBlocked); |
086a43eb RG |
1085 | } |
1086 | ||
2c0e81bb | 1087 | void IncomingTCPConnectionState::notifyIOError(IDState&& query, const struct timeval& now) |
086a43eb | 1088 | { |
2c0e81bb RG |
1089 | std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this(); |
1090 | ||
da029f91 | 1091 | --state->d_currentQueriesCount; |
40a81af4 | 1092 | state->d_hadErrors = true; |
da029f91 | 1093 | |
c6c6f96e | 1094 | if (state->d_state == State::sendingResponse) { |
086a43eb RG |
1095 | /* if we have responses to send, let's do that first */ |
1096 | } | |
c6c6f96e | 1097 | else if (!state->d_queuedResponses.empty()) { |
086a43eb | 1098 | /* stop reading and send what we have */ |
9c2ca55f | 1099 | try { |
df5f9e85 RG |
1100 | auto iostate = sendQueuedResponses(state, now); |
1101 | ||
1102 | if (state->active() && iostate != IOState::Done) { | |
1103 | // we need to update the state right away, nobody will do that for us | |
489caa9f | 1104 | updateIO(state, iostate, now); |
df5f9e85 | 1105 | } |
9c2ca55f RG |
1106 | } |
1107 | catch (const std::exception& e) { | |
df5f9e85 | 1108 | vinfolog("Exception in notifyIOError: %s", e.what()); |
9c2ca55f | 1109 | } |
d0ae6360 RG |
1110 | } |
1111 | else { | |
086a43eb | 1112 | // the backend code already tried to reconnect if it was possible |
40a81af4 | 1113 | state->terminateClientConnection(); |
d0ae6360 RG |
1114 | } |
1115 | } | |
1116 | ||
2c0e81bb | 1117 | void IncomingTCPConnectionState::handleXFRResponse(const struct timeval& now, TCPResponse&& response) |
77c3701d | 1118 | { |
2c0e81bb | 1119 | std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this(); |
4253054a | 1120 | queueResponse(state, now, std::move(response)); |
086a43eb | 1121 | } |
77c3701d | 1122 | |
c6c6f96e | 1123 | void IncomingTCPConnectionState::handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write) |
086a43eb | 1124 | { |
40a81af4 | 1125 | vinfolog("Timeout while %s TCP client %s", (write ? "writing to" : "reading from"), state->d_ci.remote.toStringWithPort()); |
20ad5012 | 1126 | DEBUGLOG("client timeout"); |
645a1ca4 | 1127 | DEBUGLOG("Processed "<<state->d_queriesCount<<" queries, current count is "<<state->d_currentQueriesCount<<", "<<state->d_ownedConnectionsToBackend.size()<<" owned connections, "<<state->d_queuedResponses.size()<<" response queued"); |
c6c6f96e RG |
1128 | |
1129 | if (write || state->d_currentQueriesCount == 0) { | |
1130 | ++state->d_ci.cs->tcpClientTimeouts; | |
40a81af4 | 1131 | state->d_ioState.reset(); |
c6c6f96e RG |
1132 | } |
1133 | else { | |
1134 | DEBUGLOG("Going idle"); | |
1135 | /* we still have some queries in flight, let's just stop reading for now */ | |
1136 | state->d_state = IncomingTCPConnectionState::State::idle; | |
1137 | state->d_ioState->update(IOState::Done, handleIOCallback, state); | |
c6c6f96e | 1138 | } |
77c3701d RG |
1139 | } |
1140 | ||
d0ae6360 RG |
1141 | static void handleIncomingTCPQuery(int pipefd, FDMultiplexer::funcparam_t& param) |
1142 | { | |
1143 | auto threadData = boost::any_cast<TCPClientThreadData*>(param); | |
1144 | ||
1145 | ConnectionInfo* citmp{nullptr}; | |
1146 | ||
ea87ba72 RG |
1147 | ssize_t got = read(pipefd, &citmp, sizeof(citmp)); |
1148 | if (got == 0) { | |
acadc544 | 1149 | throw std::runtime_error("EOF while reading from the TCP acceptor pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode"); |
ea87ba72 RG |
1150 | } |
1151 | else if (got == -1) { | |
1152 | if (errno == EAGAIN || errno == EINTR) { | |
1153 | return; | |
1154 | } | |
a702a96c | 1155 | throw std::runtime_error("Error while reading from the TCP acceptor pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode:" + stringerror()); |
d0ae6360 | 1156 | } |
ea87ba72 RG |
1157 | else if (got != sizeof(citmp)) { |
1158 | throw std::runtime_error("Partial read while reading from the TCP acceptor pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode"); | |
d0ae6360 RG |
1159 | } |
1160 | ||
acadc544 RG |
1161 | try { |
1162 | g_tcpclientthreads->decrementQueuedCount(); | |
d0ae6360 | 1163 | |
acadc544 | 1164 | struct timeval now; |
b7eb6b6e | 1165 | gettimeofday(&now, nullptr); |
acadc544 RG |
1166 | auto state = std::make_shared<IncomingTCPConnectionState>(std::move(*citmp), *threadData, now); |
1167 | delete citmp; | |
1168 | citmp = nullptr; | |
d0ae6360 | 1169 | |
086a43eb | 1170 | IncomingTCPConnectionState::handleIO(state, now); |
acadc544 | 1171 | } |
2c0e81bb | 1172 | catch (...) { |
acadc544 RG |
1173 | delete citmp; |
1174 | citmp = nullptr; | |
1175 | throw; | |
1176 | } | |
d0ae6360 RG |
1177 | } |
1178 | ||
2c0e81bb RG |
1179 | static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& param) |
1180 | { | |
1181 | auto threadData = boost::any_cast<TCPClientThreadData*>(param); | |
1182 | CrossProtocolQuery* tmp{nullptr}; | |
1183 | ||
1184 | ssize_t got = read(pipefd, &tmp, sizeof(tmp)); | |
1185 | if (got == 0) { | |
1186 | throw std::runtime_error("EOF while reading from the TCP cross-protocol pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode"); | |
1187 | } | |
1188 | else if (got == -1) { | |
1189 | if (errno == EAGAIN || errno == EINTR) { | |
1190 | return; | |
1191 | } | |
1192 | throw std::runtime_error("Error while reading from the TCP cross-protocol pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode:" + stringerror()); | |
1193 | } | |
1194 | else if (got != sizeof(tmp)) { | |
1195 | throw std::runtime_error("Partial read while reading from the TCP cross-protocol pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode"); | |
1196 | } | |
1197 | ||
1198 | try { | |
1199 | struct timeval now; | |
1200 | gettimeofday(&now, nullptr); | |
1201 | ||
585c03c4 | 1202 | std::shared_ptr<TCPQuerySender> tqs = tmp->getTCPQuerySender(); |
2c0e81bb RG |
1203 | auto query = std::move(tmp->query); |
1204 | auto downstreamServer = std::move(tmp->downstream); | |
acf2a221 | 1205 | auto proxyProtocolPayloadSize = tmp->proxyProtocolPayloadSize; |
2c0e81bb RG |
1206 | delete tmp; |
1207 | tmp = nullptr; | |
1208 | ||
e82bf80f | 1209 | try { |
9fd8dc1f | 1210 | auto downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::string()); |
2c0e81bb | 1211 | |
e82bf80f | 1212 | prependSizeToTCPQuery(query.d_buffer, proxyProtocolPayloadSize); |
1c9c001c | 1213 | query.d_proxyProtocolPayloadAddedSize = proxyProtocolPayloadSize; |
e82bf80f RG |
1214 | downstream->queueQuery(tqs, std::move(query)); |
1215 | } | |
1216 | catch (...) { | |
1217 | tqs->notifyIOError(std::move(query.d_idstate), now); | |
1218 | } | |
2c0e81bb RG |
1219 | } |
1220 | catch (...) { | |
1221 | delete tmp; | |
1222 | tmp = nullptr; | |
2c0e81bb RG |
1223 | } |
1224 | } | |
1225 | ||
ae3b96d9 RG |
1226 | static void handleCrossProtocolResponse(int pipefd, FDMultiplexer::funcparam_t& param) |
1227 | { | |
1228 | TCPCrossProtocolResponse* tmp{nullptr}; | |
1229 | ||
1230 | ssize_t got = read(pipefd, &tmp, sizeof(tmp)); | |
1231 | if (got == 0) { | |
1232 | throw std::runtime_error("EOF while reading from the TCP cross-protocol response pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode"); | |
1233 | } | |
1234 | else if (got == -1) { | |
1235 | if (errno == EAGAIN || errno == EINTR) { | |
1236 | return; | |
1237 | } | |
1238 | throw std::runtime_error("Error while reading from the TCP cross-protocol response pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode:" + stringerror()); | |
1239 | } | |
1240 | else if (got != sizeof(tmp)) { | |
1241 | throw std::runtime_error("Partial read while reading from the TCP cross-protocol response pipe (" + std::to_string(pipefd) + ") in " + std::string(isNonBlocking(pipefd) ? "non-blocking" : "blocking") + " mode"); | |
1242 | } | |
1243 | ||
1244 | auto response = std::move(*tmp); | |
1245 | delete tmp; | |
1246 | tmp = nullptr; | |
1247 | ||
f05cd66c RG |
1248 | try { |
1249 | if (response.d_response.d_buffer.empty()) { | |
1250 | response.d_state->notifyIOError(std::move(response.d_response.d_idstate), response.d_now); | |
1251 | } | |
1252 | else if (response.d_response.d_idstate.qtype == QType::AXFR || response.d_response.d_idstate.qtype == QType::IXFR) { | |
1253 | response.d_state->handleXFRResponse(response.d_now, std::move(response.d_response)); | |
1254 | } | |
1255 | else { | |
1256 | response.d_state->handleResponse(response.d_now, std::move(response.d_response)); | |
1257 | } | |
ae3b96d9 | 1258 | } |
f05cd66c RG |
1259 | catch (...) { |
1260 | /* no point bubbling up from there */ | |
ae3b96d9 RG |
1261 | } |
1262 | } | |
1263 | ||
1264 | static void tcpClientThread(int pipefd, int crossProtocolQueriesPipeFD, int crossProtocolResponsesListenPipeFD, int crossProtocolResponsesWritePipeFD) | |
d0ae6360 RG |
1265 | { |
1266 | /* we get launched with a pipe on which we receive file descriptors from clients that we own | |
1267 | from that point on */ | |
1268 | ||
1269 | setThreadName("dnsdist/tcpClie"); | |
1270 | ||
55a2979f RG |
1271 | try { |
1272 | TCPClientThreadData data; | |
1273 | /* this is the writing end! */ | |
1274 | data.crossProtocolResponsesPipe = crossProtocolResponsesWritePipeFD; | |
1275 | data.mplexer->addReadFD(pipefd, handleIncomingTCPQuery, &data); | |
1276 | data.mplexer->addReadFD(crossProtocolQueriesPipeFD, handleCrossProtocolQuery, &data); | |
1277 | data.mplexer->addReadFD(crossProtocolResponsesListenPipeFD, handleCrossProtocolResponse, &data); | |
8904f4fd | 1278 | |
55a2979f RG |
1279 | struct timeval now; |
1280 | gettimeofday(&now, nullptr); | |
1281 | time_t lastTimeoutScan = now.tv_sec; | |
1282 | ||
1283 | for (;;) { | |
1284 | data.mplexer->run(&now); | |
1285 | ||
1286 | try { | |
9fd8dc1f | 1287 | t_downstreamTCPConnectionsManager.cleanupClosedConnections(now); |
55a2979f RG |
1288 | |
1289 | if (now.tv_sec > lastTimeoutScan) { | |
1290 | lastTimeoutScan = now.tv_sec; | |
1291 | auto expiredReadConns = data.mplexer->getTimeouts(now, false); | |
1292 | for (const auto& cbData : expiredReadConns) { | |
1293 | if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) { | |
1294 | auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second); | |
1295 | if (cbData.first == state->d_handler.getDescriptor()) { | |
1296 | vinfolog("Timeout (read) from remote TCP client %s", state->d_ci.remote.toStringWithPort()); | |
1297 | state->handleTimeout(state, false); | |
1298 | } | |
da63cb08 | 1299 | } |
55a2979f RG |
1300 | else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) { |
1301 | auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second); | |
1302 | vinfolog("Timeout (read) from remote backend %s", conn->getBackendName()); | |
1303 | conn->handleTimeout(now, false); | |
da63cb08 | 1304 | } |
55a2979f | 1305 | } |
da63cb08 | 1306 | |
55a2979f RG |
1307 | auto expiredWriteConns = data.mplexer->getTimeouts(now, true); |
1308 | for (const auto& cbData : expiredWriteConns) { | |
1309 | if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) { | |
1310 | auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second); | |
1311 | if (cbData.first == state->d_handler.getDescriptor()) { | |
1312 | vinfolog("Timeout (write) from remote TCP client %s", state->d_ci.remote.toStringWithPort()); | |
1313 | state->handleTimeout(state, true); | |
1314 | } | |
da63cb08 | 1315 | } |
55a2979f RG |
1316 | else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) { |
1317 | auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second); | |
1318 | vinfolog("Timeout (write) from remote backend %s", conn->getBackendName()); | |
1319 | conn->handleTimeout(now, true); | |
da63cb08 | 1320 | } |
55a2979f RG |
1321 | } |
1322 | ||
1323 | if (g_tcpStatesDumpRequested > 0) { | |
1324 | /* just to keep things clean in the output, debug only */ | |
1325 | static std::mutex s_lock; | |
1326 | std::lock_guard<decltype(s_lock)> lck(s_lock); | |
1327 | if (g_tcpStatesDumpRequested > 0) { | |
1328 | /* no race here, we took the lock so it can only be increased in the meantime */ | |
1329 | --g_tcpStatesDumpRequested; | |
1330 | errlog("Dumping the TCP states, as requested:"); | |
1331 | data.mplexer->runForAllWatchedFDs([](bool isRead, int fd, const FDMultiplexer::funcparam_t& param, struct timeval ttd) | |
1332 | { | |
1333 | struct timeval lnow; | |
1334 | gettimeofday(&lnow, nullptr); | |
1335 | if (ttd.tv_sec > 0) { | |
1336 | errlog("- Descriptor %d is in %s state, TTD in %d", fd, (isRead ? "read" : "write"), (ttd.tv_sec-lnow.tv_sec)); | |
1337 | } | |
1338 | else { | |
1339 | errlog("- Descriptor %d is in %s state, no TTD set", fd, (isRead ? "read" : "write")); | |
1340 | } | |
1341 | ||
1342 | if (param.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) { | |
1343 | auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param); | |
1344 | errlog(" - %s", state->toString()); | |
1345 | } | |
1346 | else if (param.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) { | |
1347 | auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(param); | |
1348 | errlog(" - %s", conn->toString()); | |
1349 | } | |
1350 | else if (param.type() == typeid(TCPClientThreadData*)) { | |
1351 | errlog(" - Worker thread pipe"); | |
1352 | } | |
1353 | }); | |
4a62d2bf | 1354 | errlog("The TCP/DoT client cache has %d active and %d idle outgoing connections cached", t_downstreamTCPConnectionsManager.getActiveCount(), t_downstreamTCPConnectionsManager.getIdleCount()); |
da63cb08 | 1355 | } |
55a2979f | 1356 | } |
fb59decc | 1357 | } |
da63cb08 | 1358 | } |
55a2979f RG |
1359 | catch (const std::exception& e) { |
1360 | errlog("Error in TCP worker thread: %s", e.what()); | |
1361 | } | |
d0ae6360 | 1362 | } |
8a5d5053 | 1363 | } |
55a2979f RG |
1364 | catch (const std::exception& e) { |
1365 | errlog("Fatal error in TCP worker thread: %s", e.what()); | |
1366 | } | |
8a5d5053 | 1367 | } |
1368 | ||
d0ae6360 | 1369 | /* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and |
8a5d5053 | 1370 | they will hand off to worker threads & spawn more of them if required |
1371 | */ | |
fd51c832 | 1372 | void tcpAcceptorThread(ClientState* cs) |
8a5d5053 | 1373 | { |
519f5484 | 1374 | setThreadName("dnsdist/tcpAcce"); |
fd51c832 | 1375 | |
9396d955 | 1376 | bool tcpClientCountIncremented = false; |
8a5d5053 | 1377 | ComboAddress remote; |
1378 | remote.sin4.sin_family = cs->local.sin4.sin_family; | |
d0ae6360 | 1379 | |
b719e465 | 1380 | auto acl = g_ACL.getLocal(); |
8a5d5053 | 1381 | for(;;) { |
275bcb5b | 1382 | std::unique_ptr<ConnectionInfo> ci; |
9396d955 | 1383 | tcpClientCountIncremented = false; |
8a5d5053 | 1384 | try { |
0c8d25a7 | 1385 | socklen_t remlen = remote.getSocklen(); |
d094bec2 | 1386 | ci = std::make_unique<ConnectionInfo>(cs); |
0c8d25a7 | 1387 | #ifdef HAVE_ACCEPT4 |
3e425868 | 1388 | ci->fd = accept4(cs->tcpFD, reinterpret_cast<struct sockaddr*>(&remote), &remlen, SOCK_NONBLOCK); |
0c8d25a7 | 1389 | #else |
3e425868 | 1390 | ci->fd = accept(cs->tcpFD, reinterpret_cast<struct sockaddr*>(&remote), &remlen); |
0c8d25a7 | 1391 | #endif |
d094bec2 | 1392 | // will be decremented when the ConnectionInfo object is destroyed, no matter the reason |
cc8416ef | 1393 | auto concurrentConnections = ++cs->tcpCurrentConnections; |
519a0072 RG |
1394 | if (cs->d_tcpConcurrentConnectionsLimit > 0 && concurrentConnections > cs->d_tcpConcurrentConnectionsLimit) { |
1395 | continue; | |
1396 | } | |
1397 | ||
05e59526 RG |
1398 | if (concurrentConnections > cs->tcpMaxConcurrentConnections.load()) { |
1399 | cs->tcpMaxConcurrentConnections.store(concurrentConnections); | |
cc8416ef | 1400 | } |
cff9aa03 | 1401 | |
d094bec2 | 1402 | if (ci->fd < 0) { |
a702a96c | 1403 | throw std::runtime_error((boost::format("accepting new connection on socket: %s") % stringerror()).str()); |
0c8d25a7 | 1404 | } |
963bef8d | 1405 | |
d094bec2 | 1406 | if (!acl->match(remote)) { |
cb167afd | 1407 | ++g_stats.aclDrops; |
b719e465 | 1408 | vinfolog("Dropped TCP connection from %s because of ACL", remote.toStringWithPort()); |
1409 | continue; | |
1410 | } | |
8a5d5053 | 1411 | |
0c8d25a7 RG |
1412 | #ifndef HAVE_ACCEPT4 |
1413 | if (!setNonBlocking(ci->fd)) { | |
0c8d25a7 RG |
1414 | continue; |
1415 | } | |
1416 | #endif | |
c75d3c10 | 1417 | setTCPNoDelay(ci->fd); // disable NAGLE |
d094bec2 | 1418 | if (g_maxTCPQueuedConnections > 0 && g_tcpclientthreads->getQueuedCount() >= g_maxTCPQueuedConnections) { |
6c1ca990 RG |
1419 | vinfolog("Dropping TCP connection from %s because we have too many queued already", remote.toStringWithPort()); |
1420 | continue; | |
1421 | } | |
1422 | ||
9396d955 | 1423 | if (g_maxTCPConnectionsPerClient) { |
3d313832 | 1424 | auto tcpClientsCount = s_tcpClientsCount.lock(); |
9396d955 | 1425 | |
3d313832 | 1426 | if ((*tcpClientsCount)[remote] >= g_maxTCPConnectionsPerClient) { |
9396d955 RG |
1427 | vinfolog("Dropping TCP connection from %s because we have too many from this client already", remote.toStringWithPort()); |
1428 | continue; | |
1429 | } | |
3d313832 | 1430 | (*tcpClientsCount)[remote]++; |
9396d955 RG |
1431 | tcpClientCountIncremented = true; |
1432 | } | |
1433 | ||
b719e465 | 1434 | vinfolog("Got TCP connection from %s", remote.toStringWithPort()); |
9396d955 | 1435 | |
8a5d5053 | 1436 | ci->remote = remote; |
2c0e81bb | 1437 | if (!g_tcpclientthreads->passConnectionToThread(std::move(ci))) { |
d094bec2 | 1438 | if (tcpClientCountIncremented) { |
9396d955 RG |
1439 | decrementTCPClientCount(remote); |
1440 | } | |
a9bf3ec4 | 1441 | } |
8a5d5053 | 1442 | } |
d094bec2 | 1443 | catch (const std::exception& e) { |
3c17e261 | 1444 | errlog("While reading a TCP question: %s", e.what()); |
d094bec2 | 1445 | if (tcpClientCountIncremented) { |
9396d955 RG |
1446 | decrementTCPClientCount(remote); |
1447 | } | |
3c17e261 | 1448 | } |
d094bec2 | 1449 | catch (...){} |
8a5d5053 | 1450 | } |
8a5d5053 | 1451 | } |