]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/recursordist/rec-tcp.cc
Merge pull request #14018 from omoerbeek/rec-proxy-exception
[thirdparty/pdns.git] / pdns / recursordist / rec-tcp.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23 #include "rec-main.hh"
24
25 #include "arguments.hh"
26 #include "logger.hh"
27 #include "mplexer.hh"
28 #include "uuid-utils.hh"
29
30 // OLD PRE 5.0.0 situation:
31 //
32 // When pdns-distributes-queries is false with reuseport true (the default since 4.9.0), TCP queries
33 // are read and handled by worker threads. If the kernel balancing is OK for TCP sockets (observed
34 // to be good on Debian bullseye, but not good on e.g. MacOS), the TCP handling is no extra burden.
35 // In the case of MacOS all incoming TCP queries are handled by a single worker, while incoming UDP
36 // queries do get distributed round-robin over the worker threads. Do note the TCP queries might
37 // need to wait until the g_maxUDPQueriesPerRound is reached.
38 //
39 // In the case of pdns-distributes-queries true and reuseport false the queries were read and
40 // initially processed by the distributor thread(s).
41 //
// Initial processing consists of parsing, calling gettag and checking if we have a packet cache
43 // hit. If that does not produce a hit, the query is passed to an mthread in the same way as with
44 // UDP queries, but do note that the mthread processing is serviced by the distributor thread. The
45 // final answer will be sent by the same distributor thread that originally picked up the query.
46 //
47 // Changing this, and having incoming TCP queries handled by worker threads is somewhat more complex
48 // than UDP, as the socket must remain available in the distributor thread (for reading more
49 // queries), but the TCP socket must also be passed to a worker thread so it can write its
50 // answer. The in-flight bookkeeping also has to be aware of how a query is handled to do the
51 // accounting properly. I am not sure if changing the current setup is worth all this trouble,
52 // especially since the default is now to not use pdns-distributes-queries, which works well in many
53 // cases.
54 //
55 // NEW SITUATION SINCE 5.0.0:
56 //
// The drawbacks mentioned in https://github.com/PowerDNS/pdns/issues/8394 are no longer true, so an
// alternative approach would be to introduce dedicated TCP worker thread(s).
59 //
60 // This approach was implemented in https://github.com/PowerDNS/pdns/pull/13195. The distributor and
61 // worker thread(s) now no longer process TCP queries.
62
// Upper bound on the number of queries answered over one incoming TCP connection.
// finishTCPReply() only enforces it when the value is greater than zero.
size_t g_tcpMaxQueriesPerConn;
// NOTE(review): presumably the cap on simultaneous TCP connections per client address;
// the check itself is not in this part of the file - confirm.
unsigned int g_maxTCPPerClient;
// Read timeout for incoming TCP connections; finishTCPReply() adds it to timeval::tv_sec,
// so the unit is seconds.
int g_tcpTimeout;
// NOTE(review): not referenced in this part of the file; presumably controls answering
// ANY queries with a truncated reply to force TCP - confirm.
bool g_anyToTcp;

// Number of concurrently outstanding queries on one connection at which the connection's
// fd is taken out of the multiplexer read set (see doProcessTCPQuestion()).
uint16_t TCPConnection::s_maxInFlight;

// Per-thread count of open TCPConnection objects, keyed by the remote address.
thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;

// Multiplexer callback for readable client sockets; declared here because finishTCPReply()
// re-registers it before its definition.
static void handleRunningTCPQuestion(int fileDesc, FDMultiplexer::funcparam_t& var);
73
// Debug tracing helper. Change "#if 0" to "#if 1" to have TCPLOG() print a short timestamp
// (seconds modulo 10, with fraction), the fd and the message on cerr.
#if 0
#define TCPLOG(tcpsock, x) \
  do { \
    cerr << []() { timeval t; gettimeofday(&t, nullptr); return t.tv_sec % 10 + t.tv_usec/1000000.0; }() << " FD " << (tcpsock) << ' ' << x; \
  } while (0)
#else
// We do not define this as empty since that produces a duplicate case label warning from clang-tidy
// The loop body never runs, so the disabled variant prints nothing.
#define TCPLOG(pid, x) /* NOLINT(cppcoreguidelines-macro-usage) */ \
  while (false) { \
    cerr << x; /* NOLINT(bugprone-macro-parentheses) */ \
  }
#endif
86
87 std::atomic<uint32_t> TCPConnection::s_currentConnections;
88
89 TCPConnection::TCPConnection(int fileDesc, const ComboAddress& addr) :
90 data(2, 0), d_remote(addr), d_fd(fileDesc)
91 {
92 ++s_currentConnections;
93 (*t_tcpClientCounts)[d_remote]++;
94 }
95
96 TCPConnection::~TCPConnection()
97 {
98 try {
99 if (closesocket(d_fd) < 0) {
100 SLOG(g_log << Logger::Error << "Error closing socket for TCPConnection" << endl,
101 g_slogtcpin->info(Logr::Error, "Error closing socket for TCPConnection"));
102 }
103 }
104 catch (const PDNSException& e) {
105 SLOG(g_log << Logger::Error << "Error closing TCPConnection socket: " << e.reason << endl,
106 g_slogtcpin->error(Logr::Error, e.reason, "Error closing TCPConnection socket", "exception", Logging::Loggable("PDNSException")));
107 }
108
109 if (t_tcpClientCounts->count(d_remote) != 0 && (*t_tcpClientCounts)[d_remote]-- == 0) {
110 t_tcpClientCounts->erase(d_remote);
111 }
112 --s_currentConnections;
113 }
114
115 static void terminateTCPConnection(int fileDesc)
116 {
117 try {
118 t_fdm->removeReadFD(fileDesc);
119 }
120 catch (const FDMultiplexerException& fde) {
121 }
122 }
123
124 static void sendErrorOverTCP(std::unique_ptr<DNSComboWriter>& comboWriter, int rcode)
125 {
126 std::vector<uint8_t> packet;
127 if (comboWriter->d_mdp.d_header.qdcount == 0U) {
128 /* header-only */
129 packet.resize(sizeof(dnsheader));
130 }
131 else {
132 DNSPacketWriter packetWriter(packet, comboWriter->d_mdp.d_qname, comboWriter->d_mdp.d_qtype, comboWriter->d_mdp.d_qclass);
133 if (comboWriter->d_mdp.hasEDNS()) {
134 /* we try to add the EDNS OPT RR even for truncated answers,
135 as rfc6891 states:
136 "The minimal response MUST be the DNS header, question section, and an
137 OPT record. This MUST also occur when a truncated response (using
138 the DNS header's TC bit) is returned."
139 */
140 packetWriter.addOpt(512, 0, 0);
141 packetWriter.commit();
142 }
143 }
144
145 auto& header = reinterpret_cast<dnsheader&>(packet.at(0)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast) safe cast
146 header.aa = 0;
147 header.ra = 1;
148 header.qr = 1;
149 header.tc = 0;
150 header.id = comboWriter->d_mdp.d_header.id;
151 header.rd = comboWriter->d_mdp.d_header.rd;
152 header.cd = comboWriter->d_mdp.d_header.cd;
153 header.rcode = rcode;
154
155 sendResponseOverTCP(comboWriter, packet);
156 }
157
158 void finishTCPReply(std::unique_ptr<DNSComboWriter>& comboWriter, bool hadError, bool updateInFlight)
159 {
160 // update tcp connection status, closing if needed and doing the fd multiplexer accounting
161 if (updateInFlight && comboWriter->d_tcpConnection->d_requestsInFlight > 0) {
162 comboWriter->d_tcpConnection->d_requestsInFlight--;
163 }
164
165 // In the code below, we try to remove the fd from the set, but
166 // we don't know if another mthread already did the remove, so we can get a
167 // "Tried to remove unlisted fd" exception. Not that an inflight < limit test
168 // will not work since we do not know if the other mthread got an error or not.
169 if (hadError) {
170 terminateTCPConnection(comboWriter->d_socket);
171 comboWriter->d_socket = -1;
172 return;
173 }
174 comboWriter->d_tcpConnection->queriesCount++;
175 if ((g_tcpMaxQueriesPerConn > 0 && comboWriter->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) || (comboWriter->d_tcpConnection->isDropOnIdle() && comboWriter->d_tcpConnection->d_requestsInFlight == 0)) {
176 try {
177 t_fdm->removeReadFD(comboWriter->d_socket);
178 }
179 catch (FDMultiplexerException&) {
180 }
181 comboWriter->d_socket = -1;
182 return;
183 }
184
185 Utility::gettimeofday(&g_now, nullptr); // needs to be updated
186 struct timeval ttd = g_now;
187
188 // If we cross from max to max-1 in flight requests, the fd was not listened to, add it back
189 if (updateInFlight && comboWriter->d_tcpConnection->d_requestsInFlight == TCPConnection::s_maxInFlight - 1) {
190 // A read error might have happened. If we add the fd back, it will most likely error again.
191 // This is not a big issue, the next handleTCPClientReadable() will see another read error
192 // and take action.
193 ttd.tv_sec += g_tcpTimeout;
194 t_fdm->addReadFD(comboWriter->d_socket, handleRunningTCPQuestion, comboWriter->d_tcpConnection, &ttd);
195 return;
196 }
197 // fd might have been removed by read error code, or a read timeout, so expect an exception
198 try {
199 t_fdm->setReadTTD(comboWriter->d_socket, ttd, g_tcpTimeout);
200 }
201 catch (const FDMultiplexerException&) {
202 // but if the FD was removed because of a timeout while we were sending a response,
203 // we need to re-arm it. If it was an error it will error again.
204 ttd.tv_sec += g_tcpTimeout;
205 t_fdm->addReadFD(comboWriter->d_socket, handleRunningTCPQuestion, comboWriter->d_tcpConnection, &ttd);
206 }
207 }
208
209 /*
210 * A helper class that by default closes the incoming TCP connection on destruct
211 * If you want to keep the connection alive, call keep() on the guard object
212 */
213 class RunningTCPQuestionGuard
214 {
215 public:
216 RunningTCPQuestionGuard(const RunningTCPQuestionGuard&) = default;
217 RunningTCPQuestionGuard(RunningTCPQuestionGuard&&) = delete;
218 RunningTCPQuestionGuard& operator=(const RunningTCPQuestionGuard&) = default;
219 RunningTCPQuestionGuard& operator=(RunningTCPQuestionGuard&&) = delete;
220 RunningTCPQuestionGuard(int fileDesc) :
221 d_fd(fileDesc) {}
222 ~RunningTCPQuestionGuard()
223 {
224 if (d_fd != -1) {
225 terminateTCPConnection(d_fd);
226 d_fd = -1;
227 }
228 }
229 void keep()
230 {
231 d_fd = -1;
232 }
233 bool handleTCPReadResult(int /* fd */, ssize_t bytes)
234 {
235 if (bytes == 0) {
236 /* EOF */
237 return false;
238 }
239 if (bytes < 0) {
240 if (errno != EAGAIN && errno != EWOULDBLOCK) {
241 return false;
242 }
243 }
244 keep();
245 return true;
246 }
247
248 private:
249 int d_fd{-1};
250 };
251
252 static void handleNotify(std::unique_ptr<DNSComboWriter>& comboWriter, const DNSName& qname)
253 {
254 if (!t_allowNotifyFrom || !t_allowNotifyFrom->match(comboWriter->d_mappedSource)) {
255 if (!g_quiet) {
256 SLOG(g_log << Logger::Error << "[" << g_multiTasker->getTid() << "] dropping TCP NOTIFY from " << comboWriter->d_mappedSource.toString() << ", address not matched by allow-notify-from" << endl,
257 g_slogtcpin->info(Logr::Error, "Dropping TCP NOTIFY, address not matched by allow-notify-from", "source", Logging::Loggable(comboWriter->d_mappedSource)));
258 }
259
260 t_Counters.at(rec::Counter::sourceDisallowedNotify)++;
261 return;
262 }
263
264 if (!isAllowNotifyForZone(qname)) {
265 if (!g_quiet) {
266 SLOG(g_log << Logger::Error << "[" << g_multiTasker->getTid() << "] dropping TCP NOTIFY from " << comboWriter->d_mappedSource.toString() << ", for " << qname.toLogString() << ", zone not matched by allow-notify-for" << endl,
267 g_slogtcpin->info(Logr::Error, "Dropping TCP NOTIFY, zone not matched by allow-notify-for", "source", Logging::Loggable(comboWriter->d_mappedSource), "zone", Logging::Loggable(qname)));
268 }
269
270 t_Counters.at(rec::Counter::zoneDisallowedNotify)++;
271 return;
272 }
273 }
274
275 static void doProtobufLogQuery(bool logQuery, LocalStateHolder<LuaConfigItems>& luaconfsLocal, const std::unique_ptr<DNSComboWriter>& comboWriter, const DNSName& qname, QType qtype, QClass qclass, const dnsheader* dnsheader, const shared_ptr<TCPConnection>& conn)
276 {
277 try {
278 if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && comboWriter->d_policyTags.empty())) {
279 protobufLogQuery(luaconfsLocal, comboWriter->d_uuid, comboWriter->d_source, comboWriter->d_destination, comboWriter->d_mappedSource, comboWriter->d_ednssubnet.source, true, dnsheader->id, conn->qlen, qname, qtype, qclass, comboWriter->d_policyTags, comboWriter->d_requestorId, comboWriter->d_deviceId, comboWriter->d_deviceName, comboWriter->d_meta);
280 }
281 }
282 catch (const std::exception& e) {
283 if (g_logCommonErrors) {
284 SLOG(g_log << Logger::Warning << "Error parsing a TCP query packet for edns subnet: " << e.what() << endl,
285 g_slogtcpin->error(Logr::Warning, e.what(), "Error parsing a TCP query packet for edns subnet", "exception", Logging::Loggable("std::exception"), "remote", Logging::Loggable(conn->d_remote)));
286 }
287 }
288 }
289
290 static void doProcessTCPQuestion(std::unique_ptr<DNSComboWriter>& comboWriter, shared_ptr<TCPConnection>& conn, RunningTCPQuestionGuard& tcpGuard, int fileDesc)
291 {
292 RecThreadInfo::self().incNumberOfDistributedQueries();
293 struct timeval start
294 {
295 };
296 Utility::gettimeofday(&start, nullptr);
297
298 DNSName qname;
299 uint16_t qtype = 0;
300 uint16_t qclass = 0;
301 bool needECS = false;
302 string requestorId;
303 string deviceId;
304 string deviceName;
305 bool logQuery = false;
306 bool qnameParsed = false;
307
308 comboWriter->d_eventTrace.setEnabled(SyncRes::s_event_trace_enabled != 0);
309 comboWriter->d_eventTrace.add(RecEventTrace::ReqRecv);
310 auto luaconfsLocal = g_luaconfs.getLocal();
311 if (checkProtobufExport(luaconfsLocal)) {
312 needECS = true;
313 }
314 logQuery = t_protobufServers.servers && luaconfsLocal->protobufExportConfig.logQueries;
315 comboWriter->d_logResponse = t_protobufServers.servers && luaconfsLocal->protobufExportConfig.logResponses;
316
317 if (needECS || (t_pdl && (t_pdl->hasGettagFFIFunc() || t_pdl->hasGettagFunc())) || comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Notify)) {
318
319 try {
320 EDNSOptionViewMap ednsOptions;
321 comboWriter->d_ecsParsed = true;
322 comboWriter->d_ecsFound = false;
323 getQNameAndSubnet(conn->data, &qname, &qtype, &qclass,
324 comboWriter->d_ecsFound, &comboWriter->d_ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr);
325 qnameParsed = true;
326
327 if (t_pdl) {
328 try {
329 if (t_pdl->hasGettagFFIFunc()) {
330 RecursorLua4::FFIParams params(qname, qtype, comboWriter->d_destination, comboWriter->d_source, comboWriter->d_ednssubnet.source, comboWriter->d_data, comboWriter->d_policyTags, comboWriter->d_records, ednsOptions, comboWriter->d_proxyProtocolValues, requestorId, deviceId, deviceName, comboWriter->d_routingTag, comboWriter->d_rcode, comboWriter->d_ttlCap, comboWriter->d_variable, true, logQuery, comboWriter->d_logResponse, comboWriter->d_followCNAMERecords, comboWriter->d_extendedErrorCode, comboWriter->d_extendedErrorExtra, comboWriter->d_responsePaddingDisabled, comboWriter->d_meta);
331 comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTagFFI);
332 comboWriter->d_tag = t_pdl->gettag_ffi(params);
333 comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTagFFI, comboWriter->d_tag, false);
334 }
335 else if (t_pdl->hasGettagFunc()) {
336 comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTag);
337 comboWriter->d_tag = t_pdl->gettag(comboWriter->d_source, comboWriter->d_ednssubnet.source, comboWriter->d_destination, qname, qtype, &comboWriter->d_policyTags, comboWriter->d_data, ednsOptions, true, requestorId, deviceId, deviceName, comboWriter->d_routingTag, comboWriter->d_proxyProtocolValues);
338 comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTag, comboWriter->d_tag, false);
339 }
340 }
341 catch (const std::exception& e) {
342 if (g_logCommonErrors) {
343 SLOG(g_log << Logger::Warning << "Error parsing a query packet qname='" << qname << "' for tag determination, setting tag=0: " << e.what() << endl,
344 g_slogtcpin->info(Logr::Warning, "Error parsing a query packet for tag determination, setting tag=0", "remote", Logging::Loggable(conn->d_remote), "qname", Logging::Loggable(qname)));
345 }
346 }
347 }
348 }
349 catch (const std::exception& e) {
350 if (g_logCommonErrors) {
351 SLOG(g_log << Logger::Warning << "Error parsing a query packet for tag determination, setting tag=0: " << e.what() << endl,
352 g_slogtcpin->error(Logr::Warning, e.what(), "Error parsing a query packet for tag determination, setting tag=0", "exception", Logging::Loggable("std::exception"), "remote", Logging::Loggable(conn->d_remote)));
353 }
354 }
355 }
356
357 if (comboWriter->d_tag == 0 && !comboWriter->d_responsePaddingDisabled && g_paddingFrom.match(comboWriter->d_remote)) {
358 comboWriter->d_tag = g_paddingTag;
359 }
360
361 const dnsheader_aligned headerdata(conn->data.data());
362 const struct dnsheader* dnsheader = headerdata.get();
363
364 if (t_protobufServers.servers || t_outgoingProtobufServers.servers) {
365 comboWriter->d_requestorId = std::move(requestorId);
366 comboWriter->d_deviceId = std::move(deviceId);
367 comboWriter->d_deviceName = std::move(deviceName);
368 comboWriter->d_uuid = getUniqueID();
369 }
370
371 if (t_protobufServers.servers) {
372 doProtobufLogQuery(logQuery, luaconfsLocal, comboWriter, qname, qtype, qclass, dnsheader, conn);
373 }
374
375 if (t_pdl) {
376 bool ipf = t_pdl->ipfilter(comboWriter->d_source, comboWriter->d_destination, *dnsheader, comboWriter->d_eventTrace);
377 if (ipf) {
378 if (!g_quiet) {
379 SLOG(g_log << Logger::Notice << RecThreadInfo::id() << " [" << g_multiTasker->getTid() << "/" << g_multiTasker->numProcesses() << "] DROPPED TCP question from " << comboWriter->d_source.toStringWithPort() << (comboWriter->d_source != comboWriter->d_remote ? " (via " + comboWriter->d_remote.toStringWithPort() + ")" : "") << " based on policy" << endl,
380 g_slogtcpin->info(Logr::Info, "Dropped TCP question based on policy", "remote", Logging::Loggable(conn->d_remote), "source", Logging::Loggable(comboWriter->d_source)));
381 }
382 t_Counters.at(rec::Counter::policyDrops)++;
383 return;
384 }
385 }
386
387 if (comboWriter->d_mdp.d_header.qr) {
388 t_Counters.at(rec::Counter::ignoredCount)++;
389 if (g_logCommonErrors) {
390 SLOG(g_log << Logger::Error << "Ignoring answer from TCP client " << comboWriter->getRemote() << " on server socket!" << endl,
391 g_slogtcpin->info(Logr::Error, "Ignoring answer from TCP client on server socket", "remote", Logging::Loggable(comboWriter->getRemote())));
392 }
393 return;
394 }
395 if (comboWriter->d_mdp.d_header.opcode != static_cast<unsigned>(Opcode::Query) && comboWriter->d_mdp.d_header.opcode != static_cast<unsigned>(Opcode::Notify)) {
396 t_Counters.at(rec::Counter::ignoredCount)++;
397 if (g_logCommonErrors) {
398 SLOG(g_log << Logger::Error << "Ignoring unsupported opcode " << Opcode::to_s(comboWriter->d_mdp.d_header.opcode) << " from TCP client " << comboWriter->getRemote() << " on server socket!" << endl,
399 g_slogtcpin->info(Logr::Error, "Ignoring unsupported opcode from TCP client", "remote", Logging::Loggable(comboWriter->getRemote()), "opcode", Logging::Loggable(Opcode::to_s(comboWriter->d_mdp.d_header.opcode))));
400 }
401 sendErrorOverTCP(comboWriter, RCode::NotImp);
402 tcpGuard.keep();
403 return;
404 }
405 if (dnsheader->qdcount == 0U) {
406 t_Counters.at(rec::Counter::emptyQueriesCount)++;
407 if (g_logCommonErrors) {
408 SLOG(g_log << Logger::Error << "Ignoring empty (qdcount == 0) query from " << comboWriter->getRemote() << " on server socket!" << endl,
409 g_slogtcpin->info(Logr::Error, "Ignoring empty (qdcount == 0) query on server socket", "remote", Logging::Loggable(comboWriter->getRemote())));
410 }
411 sendErrorOverTCP(comboWriter, RCode::NotImp);
412 tcpGuard.keep();
413 return;
414 }
415 {
416 // We have read a proper query
417 ++t_Counters.at(rec::Counter::qcounter);
418 ++t_Counters.at(rec::Counter::tcpqcounter);
419 if (comboWriter->d_source.sin4.sin_family == AF_INET6) {
420 ++t_Counters.at(rec::Counter::ipv6qcounter);
421 }
422
423 if (comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Notify)) {
424 handleNotify(comboWriter, qname);
425 }
426
427 string response;
428 RecursorPacketCache::OptPBData pbData{boost::none};
429
430 if (comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Query)) {
431 /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable,
432 but it means that the hash would not be computed. If some script decides at a later time to mark back the answer
433 as cacheable we would cache it with a wrong tag, so better safe than sorry. */
434 comboWriter->d_eventTrace.add(RecEventTrace::PCacheCheck);
435 bool cacheHit = checkForCacheHit(qnameParsed, comboWriter->d_tag, conn->data, qname, qtype, qclass, g_now, response, comboWriter->d_qhash, pbData, true, comboWriter->d_source, comboWriter->d_mappedSource);
436 comboWriter->d_eventTrace.add(RecEventTrace::PCacheCheck, cacheHit, false);
437
438 if (cacheHit) {
439 if (!g_quiet) {
440 SLOG(g_log << Logger::Notice << RecThreadInfo::id() << " TCP question answered from packet cache tag=" << comboWriter->d_tag << " from " << comboWriter->d_source.toStringWithPort() << (comboWriter->d_source != comboWriter->d_remote ? " (via " + comboWriter->d_remote.toStringWithPort() + ")" : "") << endl,
441 g_slogtcpin->info(Logr::Notice, "TCP question answered from packet cache", "tag", Logging::Loggable(comboWriter->d_tag),
442 "qname", Logging::Loggable(qname), "qtype", Logging::Loggable(QType(qtype)),
443 "source", Logging::Loggable(comboWriter->d_source), "remote", Logging::Loggable(comboWriter->d_remote)));
444 }
445
446 bool hadError = sendResponseOverTCP(comboWriter, response);
447 finishTCPReply(comboWriter, hadError, false);
448 struct timeval now
449 {
450 };
451 Utility::gettimeofday(&now, nullptr);
452 uint64_t spentUsec = uSec(now - start);
453 t_Counters.at(rec::Histogram::cumulativeAnswers)(spentUsec);
454 comboWriter->d_eventTrace.add(RecEventTrace::AnswerSent);
455
456 if (t_protobufServers.servers && comboWriter->d_logResponse && (!luaconfsLocal->protobufExportConfig.taggedOnly || !pbData || pbData->d_tagged)) {
457 struct timeval tval
458 {
459 0, 0
460 };
461 protobufLogResponse(dnsheader, luaconfsLocal, pbData, tval, true, comboWriter->d_source, comboWriter->d_destination, comboWriter->d_mappedSource, comboWriter->d_ednssubnet, comboWriter->d_uuid, comboWriter->d_requestorId, comboWriter->d_deviceId, comboWriter->d_deviceName, comboWriter->d_meta, comboWriter->d_eventTrace, comboWriter->d_policyTags);
462 }
463
464 if (comboWriter->d_eventTrace.enabled() && (SyncRes::s_event_trace_enabled & SyncRes::event_trace_to_log) != 0) {
465 SLOG(g_log << Logger::Info << comboWriter->d_eventTrace.toString() << endl,
466 g_slogtcpin->info(Logr::Info, comboWriter->d_eventTrace.toString())); // More fancy?
467 }
468 tcpGuard.keep();
469 t_Counters.updateSnap(g_regressionTestMode);
470 return;
471 } // cache hit
472 } // query opcode
473
474 if (comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Notify)) {
475 if (!g_quiet) {
476 SLOG(g_log << Logger::Notice << RecThreadInfo::id() << " got NOTIFY for " << qname.toLogString() << " from " << comboWriter->d_source.toStringWithPort() << (comboWriter->d_source != comboWriter->d_remote ? " (via " + comboWriter->d_remote.toStringWithPort() + ")" : "") << endl,
477 g_slogtcpin->info(Logr::Notice, "Got NOTIFY", "qname", Logging::Loggable(qname), "source", Logging::Loggable(comboWriter->d_source), "remote", Logging::Loggable(comboWriter->d_remote)));
478 }
479
480 requestWipeCaches(qname);
481
482 // the operation will now be treated as a Query, generating
483 // a normal response, as the rest of the code does not
484 // check dh->opcode, but we need to ensure that the response
485 // to this request does not get put into the packet cache
486 comboWriter->d_variable = true;
487 }
488
489 // setup for startDoResolve() in an mthread
490 ++conn->d_requestsInFlight;
491 if (conn->d_requestsInFlight >= TCPConnection::s_maxInFlight) {
492 t_fdm->removeReadFD(fileDesc); // should no longer awake ourselves when there is data to read
493 }
494 else {
495 Utility::gettimeofday(&g_now, nullptr); // needed?
496 struct timeval ttd = g_now;
497 t_fdm->setReadTTD(fileDesc, ttd, g_tcpTimeout);
498 }
499 tcpGuard.keep();
500 g_multiTasker->makeThread(startDoResolve, comboWriter.release()); // deletes dc
501 } // good query
502 }
503
504 static void handleRunningTCPQuestion(int fileDesc, FDMultiplexer::funcparam_t& var)
505 {
506 auto conn = boost::any_cast<shared_ptr<TCPConnection>>(var);
507
508 RunningTCPQuestionGuard tcpGuard{fileDesc};
509
510 if (conn->state == TCPConnection::PROXYPROTOCOLHEADER) {
511 ssize_t bytes = recv(conn->getFD(), &conn->data.at(conn->proxyProtocolGot), conn->proxyProtocolNeed, 0);
512 if (bytes <= 0) {
513 tcpGuard.handleTCPReadResult(fileDesc, bytes);
514 return;
515 }
516
517 conn->proxyProtocolGot += bytes;
518 conn->data.resize(conn->proxyProtocolGot);
519 ssize_t remaining = isProxyHeaderComplete(conn->data);
520 if (remaining == 0) {
521 if (g_logCommonErrors) {
522 SLOG(g_log << Logger::Error << "Unable to consume proxy protocol header in packet from TCP client " << conn->d_remote.toStringWithPort() << endl,
523 g_slogtcpin->info(Logr::Error, "Unable to consume proxy protocol header in packet from TCP client", "remote", Logging::Loggable(conn->d_remote)));
524 }
525 ++t_Counters.at(rec::Counter::proxyProtocolInvalidCount);
526 return;
527 }
528 if (remaining < 0) {
529 conn->proxyProtocolNeed = -remaining;
530 conn->data.resize(conn->proxyProtocolGot + conn->proxyProtocolNeed);
531 tcpGuard.keep();
532 return;
533 }
534 {
535 /* proxy header received */
536 /* we ignore the TCP field for now, but we could properly set whether
537 the connection was received over UDP or TCP if needed */
538 bool tcp = false;
539 bool proxy = false;
540 size_t used = parseProxyHeader(conn->data, proxy, conn->d_source, conn->d_destination, tcp, conn->proxyProtocolValues);
541 if (used <= 0) {
542 if (g_logCommonErrors) {
543 SLOG(g_log << Logger::Error << "Unable to parse proxy protocol header in packet from TCP client " << conn->d_remote.toStringWithPort() << endl,
544 g_slogtcpin->info(Logr::Error, "Unable to parse proxy protocol header in packet from TCP client", "remote", Logging::Loggable(conn->d_remote)));
545 }
546 ++t_Counters.at(rec::Counter::proxyProtocolInvalidCount);
547 return;
548 }
549 if (static_cast<size_t>(used) > g_proxyProtocolMaximumSize) {
550 if (g_logCommonErrors) {
551 SLOG(g_log << Logger::Error << "Proxy protocol header in packet from TCP client " << conn->d_remote.toStringWithPort() << " is larger than proxy-protocol-maximum-size (" << used << "), dropping" << endl,
552 g_slogtcpin->info(Logr::Error, "Proxy protocol header in packet from TCP client is larger than proxy-protocol-maximum-size", "remote", Logging::Loggable(conn->d_remote), "size", Logging::Loggable(used)));
553 }
554 ++t_Counters.at(rec::Counter::proxyProtocolInvalidCount);
555 return;
556 }
557
558 /* Now that we have retrieved the address of the client, as advertised by the proxy
559 via the proxy protocol header, check that it is allowed by our ACL */
560 /* note that if the proxy header used a 'LOCAL' command, the original source and destination are untouched so everything should be fine */
561 conn->d_mappedSource = conn->d_source;
562 if (t_proxyMapping) {
563 if (const auto* iter = t_proxyMapping->lookup(conn->d_source)) {
564 conn->d_mappedSource = iter->second.address;
565 ++iter->second.stats.netmaskMatches;
566 }
567 }
568 if (t_allowFrom && !t_allowFrom->match(&conn->d_mappedSource)) {
569 if (!g_quiet) {
570 SLOG(g_log << Logger::Error << "[" << g_multiTasker->getTid() << "] dropping TCP query from " << conn->d_mappedSource.toString() << ", address not matched by allow-from" << endl,
571 g_slogtcpin->info(Logr::Error, "Dropping TCP query, address not matched by allow-from", "remote", Logging::Loggable(conn->d_remote)));
572 }
573
574 ++t_Counters.at(rec::Counter::unauthorizedTCP);
575 return;
576 }
577
578 conn->data.resize(2);
579 conn->state = TCPConnection::BYTE0;
580 }
581 }
582
583 if (conn->state == TCPConnection::BYTE0) {
584 ssize_t bytes = recv(conn->getFD(), conn->data.data(), 2, 0);
585 if (bytes == 1) {
586 conn->state = TCPConnection::BYTE1;
587 }
588 if (bytes == 2) {
589 conn->qlen = (((unsigned char)conn->data[0]) << 8) + (unsigned char)conn->data[1];
590 conn->data.resize(conn->qlen);
591 conn->bytesread = 0;
592 conn->state = TCPConnection::GETQUESTION;
593 }
594 if (bytes <= 0) {
595 tcpGuard.handleTCPReadResult(fileDesc, bytes);
596 return;
597 }
598 }
599
600 if (conn->state == TCPConnection::BYTE1) {
601 ssize_t bytes = recv(conn->getFD(), &conn->data[1], 1, 0);
602 if (bytes == 1) {
603 conn->state = TCPConnection::GETQUESTION;
604 conn->qlen = (((unsigned char)conn->data[0]) << 8) + (unsigned char)conn->data[1];
605 conn->data.resize(conn->qlen);
606 conn->bytesread = 0;
607 }
608 if (bytes <= 0) {
609 if (!tcpGuard.handleTCPReadResult(fileDesc, bytes)) {
610 if (g_logCommonErrors) {
611 SLOG(g_log << Logger::Error << "TCP client " << conn->d_remote.toStringWithPort() << " disconnected after first byte" << endl,
612 g_slogtcpin->info(Logr::Error, "TCP client disconnected after first byte", "remote", Logging::Loggable(conn->d_remote)));
613 }
614 }
615 return;
616 }
617 }
618
619 if (conn->state == TCPConnection::GETQUESTION) {
620 ssize_t bytes = recv(conn->getFD(), &conn->data[conn->bytesread], conn->qlen - conn->bytesread, 0);
621 if (bytes <= 0) {
622 if (!tcpGuard.handleTCPReadResult(fileDesc, bytes)) {
623 if (g_logCommonErrors) {
624 SLOG(g_log << Logger::Error << "TCP client " << conn->d_remote.toStringWithPort() << " disconnected while reading question body" << endl,
625 g_slogtcpin->info(Logr::Error, "TCP client disconnected while reading question body", "remote", Logging::Loggable(conn->d_remote)));
626 }
627 }
628 return;
629 }
630 if (bytes > std::numeric_limits<std::uint16_t>::max()) {
631 if (g_logCommonErrors) {
632 SLOG(g_log << Logger::Error << "TCP client " << conn->d_remote.toStringWithPort() << " sent an invalid question size while reading question body" << endl,
633 g_slogtcpin->info(Logr::Error, "TCP client sent an invalid question size while reading question body", "remote", Logging::Loggable(conn->d_remote)));
634 }
635 return;
636 }
637 conn->bytesread += (uint16_t)bytes;
638 if (conn->bytesread == conn->qlen) {
639 conn->state = TCPConnection::BYTE0;
640 std::unique_ptr<DNSComboWriter> comboWriter;
641 try {
642 comboWriter = std::make_unique<DNSComboWriter>(conn->data, g_now, t_pdl);
643 }
644 catch (const MOADNSException& mde) {
645 t_Counters.at(rec::Counter::clientParseError)++;
646 if (g_logCommonErrors) {
647 SLOG(g_log << Logger::Error << "Unable to parse packet from TCP client " << conn->d_remote.toStringWithPort() << endl,
648 g_slogtcpin->info(Logr::Error, "Unable to parse packet from TCP client", "remte", Logging::Loggable(conn->d_remote)));
649 }
650 return;
651 }
652
653 comboWriter->d_tcpConnection = conn; // carry the torch
654 comboWriter->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
655 comboWriter->d_tcp = true;
656 comboWriter->setRemote(conn->d_remote); // the address the query was received from
657 comboWriter->setSource(conn->d_source); // the address we assume the query is coming from, might be set by proxy protocol
658 ComboAddress dest;
659 dest.reset();
660 dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
661 socklen_t len = dest.getSocklen();
662 getsockname(conn->getFD(), reinterpret_cast<sockaddr*>(&dest), &len); // if this fails, we're ok with it NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
663 comboWriter->setLocal(dest); // the address we received the query on
664 comboWriter->setDestination(conn->d_destination); // the address we assume the query is received on, might be set by proxy protocol
665 comboWriter->setMappedSource(conn->d_mappedSource); // the address we assume the query is coming from after table based mapping
666 /* we can't move this if we want to be able to access the values in
667 all queries sent over this connection */
668 comboWriter->d_proxyProtocolValues = conn->proxyProtocolValues;
669
670 doProcessTCPQuestion(comboWriter, conn, tcpGuard, fileDesc);
671 } // reading query
672 }
673 // more to come
674 tcpGuard.keep();
675 }
676
677 //! Handle new incoming TCP connection
678 void handleNewTCPQuestion(int fileDesc, [[maybe_unused]] FDMultiplexer::funcparam_t& var)
679 {
680 ComboAddress addr;
681 socklen_t addrlen = sizeof(addr);
682 int newsock = accept(fileDesc, reinterpret_cast<struct sockaddr*>(&addr), &addrlen); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
683 if (newsock >= 0) {
684 if (g_multiTasker->numProcesses() > g_maxMThreads) {
685 t_Counters.at(rec::Counter::overCapacityDrops)++;
686 try {
687 closesocket(newsock);
688 }
689 catch (const PDNSException& e) {
690 SLOG(g_log << Logger::Error << "Error closing TCP socket after an over capacity drop: " << e.reason << endl,
691 g_slogtcpin->error(Logr::Error, e.reason, "Error closing TCP socket after an over capacity drop", "exception", Logging::Loggable("PDNSException")));
692 }
693 return;
694 }
695
696 if (t_remotes) {
697 t_remotes->push_back(addr);
698 }
699
700 ComboAddress destaddr;
701 socklen_t len = sizeof(destaddr);
702 getsockname(newsock, reinterpret_cast<sockaddr*>(&destaddr), &len); // if this fails, we're ok with it NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
703 bool fromProxyProtocolSource = expectProxyProtocol(addr, destaddr);
704 ComboAddress mappedSource = addr;
705 if (!fromProxyProtocolSource && t_proxyMapping) {
706 if (const auto* iter = t_proxyMapping->lookup(addr)) {
707 mappedSource = iter->second.address;
708 ++iter->second.stats.netmaskMatches;
709 }
710 }
711 if (!fromProxyProtocolSource && t_allowFrom && !t_allowFrom->match(&mappedSource)) {
712 if (!g_quiet) {
713 SLOG(g_log << Logger::Error << "[" << g_multiTasker->getTid() << "] dropping TCP query from " << mappedSource.toString() << ", address neither matched by allow-from nor proxy-protocol-from" << endl,
714 g_slogtcpin->info(Logr::Error, "dropping TCP query address neither matched by allow-from nor proxy-protocol-from", "source", Logging::Loggable(mappedSource)));
715 }
716 t_Counters.at(rec::Counter::unauthorizedTCP)++;
717 try {
718 closesocket(newsock);
719 }
720 catch (const PDNSException& e) {
721 SLOG(g_log << Logger::Error << "Error closing TCP socket after an ACL drop: " << e.reason << endl,
722 g_slogtcpin->error(Logr::Error, e.reason, "Error closing TCP socket after an ACL drop", "exception", Logging::Loggable("PDNSException")));
723 }
724 return;
725 }
726
727 if (g_maxTCPPerClient > 0 && t_tcpClientCounts->count(addr) > 0 && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
728 t_Counters.at(rec::Counter::tcpClientOverflow)++;
729 try {
730 closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
731 }
732 catch (const PDNSException& e) {
733 SLOG(g_log << Logger::Error << "Error closing TCP socket after an overflow drop: " << e.reason << endl,
734 g_slogtcpin->error(Logr::Error, e.reason, "Error closing TCP socket after an overflow drop", "exception", Logging::Loggable("PDNSException")));
735 }
736 return;
737 }
738
739 setNonBlocking(newsock);
740 setTCPNoDelay(newsock);
741 std::shared_ptr<TCPConnection> tcpConn = std::make_shared<TCPConnection>(newsock, addr);
742 tcpConn->d_source = addr;
743 tcpConn->d_destination = destaddr;
744 tcpConn->d_mappedSource = mappedSource;
745
746 if (fromProxyProtocolSource) {
747 tcpConn->proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
748 tcpConn->data.resize(tcpConn->proxyProtocolNeed);
749 tcpConn->state = TCPConnection::PROXYPROTOCOLHEADER;
750 }
751 else {
752 tcpConn->state = TCPConnection::BYTE0;
753 }
754
755 struct timeval ttd
756 {
757 };
758 Utility::gettimeofday(&ttd, nullptr);
759 ttd.tv_sec += g_tcpTimeout;
760
761 t_fdm->addReadFD(tcpConn->getFD(), handleRunningTCPQuestion, tcpConn, &ttd);
762 }
763 }
764
765 static void TCPIOHandlerIO(int fileDesc, FDMultiplexer::funcparam_t& var);
766
767 static void TCPIOHandlerStateChange(IOState oldstate, IOState newstate, std::shared_ptr<PacketID>& pid)
768 {
769 TCPLOG(pid->tcpsock, "State transation " << int(oldstate) << "->" << int(newstate) << endl);
770
771 pid->lowState = newstate;
772
773 // handle state transitions
774 switch (oldstate) {
775 case IOState::NeedRead:
776
777 switch (newstate) {
778 case IOState::NeedWrite:
779 TCPLOG(pid->tcpsock, "NeedRead -> NeedWrite: flip FD" << endl);
780 t_fdm->alterFDToWrite(pid->tcpsock, TCPIOHandlerIO, pid);
781 break;
782 case IOState::NeedRead:
783 break;
784 case IOState::Done:
785 TCPLOG(pid->tcpsock, "Done -> removeReadFD" << endl);
786 t_fdm->removeReadFD(pid->tcpsock);
787 break;
788 case IOState::Async:
789 throw std::runtime_error("TLS async mode not supported");
790 break;
791 }
792 break;
793
794 case IOState::NeedWrite:
795
796 switch (newstate) {
797 case IOState::NeedRead:
798 TCPLOG(pid->tcpsock, "NeedWrite -> NeedRead: flip FD" << endl);
799 t_fdm->alterFDToRead(pid->tcpsock, TCPIOHandlerIO, pid);
800 break;
801 case IOState::NeedWrite:
802 break;
803 case IOState::Done:
804 TCPLOG(pid->tcpsock, "Done -> removeWriteFD" << endl);
805 t_fdm->removeWriteFD(pid->tcpsock);
806 break;
807 case IOState::Async:
808 throw std::runtime_error("TLS async mode not supported");
809 break;
810 }
811 break;
812
813 case IOState::Done:
814 switch (newstate) {
815 case IOState::NeedRead:
816 TCPLOG(pid->tcpsock, "NeedRead: addReadFD" << endl);
817 t_fdm->addReadFD(pid->tcpsock, TCPIOHandlerIO, pid);
818 break;
819 case IOState::NeedWrite:
820 TCPLOG(pid->tcpsock, "NeedWrite: addWriteFD" << endl);
821 t_fdm->addWriteFD(pid->tcpsock, TCPIOHandlerIO, pid);
822 break;
823 case IOState::Done:
824 break;
825 case IOState::Async:
826 throw std::runtime_error("TLS async mode not supported");
827 break;
828 }
829 break;
830
831 case IOState::Async:
832 throw std::runtime_error("TLS async mode not supported");
833 break;
834 }
835 }
836
837 static void TCPIOHandlerIO(int fileDesc, FDMultiplexer::funcparam_t& var)
838 {
839 auto pid = boost::any_cast<std::shared_ptr<PacketID>>(var);
840 assert(pid->tcphandler); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay): def off assert triggers it
841 assert(fileDesc == pid->tcphandler->getDescriptor()); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay) idem
842 IOState newstate = IOState::Done;
843
844 TCPLOG(pid->tcpsock, "TCPIOHandlerIO: lowState " << int(pid->lowState) << endl);
845
846 // In the code below, we want to update the state of the fd before calling sendEvent
847 // a sendEvent might close the fd, and some poll multiplexers do not like to manipulate a closed fd
848
849 switch (pid->highState) {
850 case TCPAction::DoingRead:
851 TCPLOG(pid->tcpsock, "highState: Reading" << endl);
852 // In arecvtcp, the buffer was resized already so inWanted bytes will fit
853 // try reading
854 try {
855 newstate = pid->tcphandler->tryRead(pid->inMSG, pid->inPos, pid->inWanted);
856 switch (newstate) {
857 case IOState::Done:
858 case IOState::NeedRead:
859 TCPLOG(pid->tcpsock, "tryRead: Done or NeedRead " << int(newstate) << ' ' << pid->inPos << '/' << pid->inWanted << endl);
860 TCPLOG(pid->tcpsock, "TCPIOHandlerIO " << pid->inWanted << ' ' << pid->inIncompleteOkay << endl);
861 if (pid->inPos == pid->inWanted || (pid->inIncompleteOkay && pid->inPos > 0)) {
862 pid->inMSG.resize(pid->inPos); // old content (if there) + new bytes read, only relevant for the inIncompleteOkay case
863 newstate = IOState::Done;
864 TCPIOHandlerStateChange(pid->lowState, newstate, pid);
865 g_multiTasker->sendEvent(pid, &pid->inMSG);
866 return;
867 }
868 break;
869 case IOState::NeedWrite:
870 break;
871 case IOState::Async:
872 throw std::runtime_error("TLS async mode not supported");
873 break;
874 }
875 }
876 catch (const std::exception& e) {
877 newstate = IOState::Done;
878 TCPLOG(pid->tcpsock, "read exception..." << e.what() << endl);
879 PacketBuffer empty;
880 TCPIOHandlerStateChange(pid->lowState, newstate, pid);
881 g_multiTasker->sendEvent(pid, &empty); // this conveys error status
882 return;
883 }
884 break;
885
886 case TCPAction::DoingWrite:
887 TCPLOG(pid->tcpsock, "highState: Writing" << endl);
888 try {
889 TCPLOG(pid->tcpsock, "tryWrite: " << pid->outPos << '/' << pid->outMSG.size() << ' ' << " -> ");
890 newstate = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size());
891 TCPLOG(pid->tcpsock, pid->outPos << '/' << pid->outMSG.size() << endl);
892 switch (newstate) {
893 case IOState::Done: {
894 TCPLOG(pid->tcpsock, "tryWrite: Done" << endl);
895 TCPIOHandlerStateChange(pid->lowState, newstate, pid);
896 g_multiTasker->sendEvent(pid, &pid->outMSG); // send back what we sent to convey everything is ok
897 return;
898 }
899 case IOState::NeedRead:
900 TCPLOG(pid->tcpsock, "tryWrite: NeedRead" << endl);
901 break;
902 case IOState::NeedWrite:
903 TCPLOG(pid->tcpsock, "tryWrite: NeedWrite" << endl);
904 break;
905 case IOState::Async:
906 throw std::runtime_error("TLS async mode not supported");
907 break;
908 }
909 }
910 catch (const std::exception& e) {
911 newstate = IOState::Done;
912 TCPLOG(pid->tcpsock, "write exception..." << e.what() << endl);
913 PacketBuffer sent;
914 TCPIOHandlerStateChange(pid->lowState, newstate, pid);
915 g_multiTasker->sendEvent(pid, &sent); // we convey error status by sending empty string
916 return;
917 }
918 break;
919 }
920
921 // Cases that did not end up doing a sendEvent
922 TCPIOHandlerStateChange(pid->lowState, newstate, pid);
923 }
924
925 void checkFastOpenSysctl([[maybe_unused]] bool active, [[maybe_unused]] Logr::log_t log)
926 {
927 #ifdef __linux__
928 string line;
929 if (readFileIfThere("/proc/sys/net/ipv4/tcp_fastopen", &line)) {
930 int flag = std::stoi(line);
931 if (active && !(flag & 1)) {
932 SLOG(g_log << Logger::Error << "tcp-fast-open-connect enabled but net.ipv4.tcp_fastopen does not allow it" << endl,
933 log->info(Logr::Error, "tcp-fast-open-connect enabled but net.ipv4.tcp_fastopen does not allow it"));
934 }
935 if (!active && !(flag & 2)) {
936 SLOG(g_log << Logger::Error << "tcp-fast-open enabled but net.ipv4.tcp_fastopen does not allow it" << endl,
937 log->info(Logr::Error, "tcp-fast-open enabled but net.ipv4.tcp_fastopen does not allow it"));
938 }
939 }
940 else {
941 SLOG(g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl,
942 log->info(Logr::Notice, "Cannot determine if kernel settings allow fast-open"));
943 }
944 #else
945 SLOG(g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl,
946 log->info(Logr::Notice, "Cannot determine if kernel settings allow fast-open"));
947 #endif
948 }
949
950 void checkTFOconnect(Logr::log_t log)
951 {
952 try {
953 Socket socket(AF_INET, SOCK_STREAM);
954 socket.setNonBlocking();
955 socket.setFastOpenConnect();
956 }
957 catch (const NetworkError& e) {
958 SLOG(g_log << Logger::Error << "tcp-fast-open-connect enabled but returned error: " << e.what() << endl,
959 log->error(Logr::Error, e.what(), "tcp-fast-open-connect enabled but returned error"));
960 }
961 }
962
963 LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr<TCPIOHandler>& handler)
964 {
965 TCPLOG(handler->getDescriptor(), "asendtcp called " << data.size() << endl);
966
967 auto pident = std::make_shared<PacketID>();
968 pident->tcphandler = handler;
969 pident->tcpsock = handler->getDescriptor();
970 pident->outMSG = data;
971 pident->highState = TCPAction::DoingWrite;
972
973 IOState state = IOState::Done;
974 try {
975 TCPLOG(pident->tcpsock, "Initial tryWrite: " << pident->outPos << '/' << pident->outMSG.size() << ' ' << " -> ");
976 state = handler->tryWrite(pident->outMSG, pident->outPos, pident->outMSG.size());
977 TCPLOG(pident->tcpsock, pident->outPos << '/' << pident->outMSG.size() << endl);
978
979 if (state == IOState::Done) {
980 TCPLOG(pident->tcpsock, "asendtcp success A" << endl);
981 return LWResult::Result::Success;
982 }
983 }
984 catch (const std::exception& e) {
985 TCPLOG(pident->tcpsock, "tryWrite() exception..." << e.what() << endl);
986 return LWResult::Result::PermanentError;
987 }
988
989 // Will set pident->lowState
990 TCPIOHandlerStateChange(IOState::Done, state, pident);
991
992 PacketBuffer packet;
993 int ret = g_multiTasker->waitEvent(pident, &packet, g_networkTimeoutMsec);
994 TCPLOG(pident->tcpsock, "asendtcp waitEvent returned " << ret << ' ' << packet.size() << '/' << data.size() << ' ');
995 if (ret == 0) {
996 TCPLOG(pident->tcpsock, "timeout" << endl);
997 TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
998 return LWResult::Result::Timeout;
999 }
1000 if (ret == -1) { // error
1001 TCPLOG(pident->tcpsock, "PermanentError" << endl);
1002 TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
1003 return LWResult::Result::PermanentError;
1004 }
1005 if (packet.size() != data.size()) { // main loop tells us what it sent out, or empty in case of an error
1006 // fd housekeeping done by TCPIOHandlerIO
1007 TCPLOG(pident->tcpsock, "PermanentError size mismatch" << endl);
1008 return LWResult::Result::PermanentError;
1009 }
1010
1011 TCPLOG(pident->tcpsock, "asendtcp success" << endl);
1012 return LWResult::Result::Success;
1013 }
1014
1015 LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, shared_ptr<TCPIOHandler>& handler, const bool incompleteOkay)
1016 {
1017 TCPLOG(handler->getDescriptor(), "arecvtcp called " << len << ' ' << data.size() << endl);
1018 data.resize(len);
1019
1020 // We might have data already available from the TLS layer, try to get that into the buffer
1021 size_t pos = 0;
1022 IOState state = IOState::Done;
1023 try {
1024 TCPLOG(handler->getDescriptor(), "calling tryRead() " << len << endl);
1025 state = handler->tryRead(data, pos, len);
1026 TCPLOG(handler->getDescriptor(), "arcvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl);
1027 switch (state) {
1028 case IOState::Done:
1029 case IOState::NeedRead:
1030 if (pos == len || (incompleteOkay && pos > 0)) {
1031 data.resize(pos);
1032 TCPLOG(handler->getDescriptor(), "acecvtcp success A" << endl);
1033 return LWResult::Result::Success;
1034 }
1035 break;
1036 case IOState::NeedWrite:
1037 break;
1038 case IOState::Async:
1039 throw std::runtime_error("TLS async mode not supported");
1040 break;
1041 }
1042 }
1043 catch (const std::exception& e) {
1044 TCPLOG(handler->getDescriptor(), "tryRead() exception..." << e.what() << endl);
1045 return LWResult::Result::PermanentError;
1046 }
1047
1048 auto pident = std::make_shared<PacketID>();
1049 pident->tcphandler = handler;
1050 pident->tcpsock = handler->getDescriptor();
1051 // We might have a partial result
1052 pident->inMSG = std::move(data);
1053 pident->inPos = pos;
1054 pident->inWanted = len;
1055 pident->inIncompleteOkay = incompleteOkay;
1056 pident->highState = TCPAction::DoingRead;
1057
1058 data.clear();
1059
1060 // Will set pident->lowState
1061 TCPIOHandlerStateChange(IOState::Done, state, pident);
1062
1063 int ret = g_multiTasker->waitEvent(pident, &data, g_networkTimeoutMsec);
1064 TCPLOG(pident->tcpsock, "arecvtcp " << ret << ' ' << data.size() << ' ');
1065 if (ret == 0) {
1066 TCPLOG(pident->tcpsock, "timeout" << endl);
1067 TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
1068 return LWResult::Result::Timeout;
1069 }
1070 if (ret == -1) {
1071 TCPLOG(pident->tcpsock, "PermanentError" << endl);
1072 TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
1073 return LWResult::Result::PermanentError;
1074 }
1075 if (data.empty()) { // error, EOF or other
1076 // fd housekeeping done by TCPIOHandlerIO
1077 TCPLOG(pident->tcpsock, "EOF" << endl);
1078 return LWResult::Result::PermanentError;
1079 }
1080
1081 TCPLOG(pident->tcpsock, "arecvtcp success" << endl);
1082 return LWResult::Result::Success;
1083 }
1084
1085 void makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set<int>& tcpSockets, Logr::log_t log)
1086 {
1087 vector<string> localAddresses;
1088 stringtok(localAddresses, ::arg()["local-address"], " ,");
1089
1090 if (localAddresses.empty()) {
1091 throw PDNSException("No local address specified");
1092 }
1093
1094 #ifdef TCP_DEFER_ACCEPT
1095 auto first = true;
1096 #endif
1097 const uint16_t defaultLocalPort = ::arg().asNum("local-port");
1098 for (const auto& localAddress : localAddresses) {
1099 ComboAddress address{localAddress, defaultLocalPort};
1100 const int socketFd = socket(address.sin6.sin6_family, SOCK_STREAM, 0);
1101 if (socketFd < 0) {
1102 throw PDNSException("Making a TCP server socket for resolver: " + stringerror());
1103 }
1104
1105 setCloseOnExec(socketFd);
1106
1107 int tmp = 1;
1108 if (setsockopt(socketFd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp) < 0) {
1109 int err = errno;
1110 SLOG(g_log << Logger::Error << "Setsockopt failed for TCP listening socket" << endl,
1111 log->error(Logr::Critical, err, "Setsockopt failed for TCP listening socket"));
1112 _exit(1);
1113 }
1114 if (address.sin6.sin6_family == AF_INET6 && setsockopt(socketFd, IPPROTO_IPV6, IPV6_V6ONLY, &tmp, sizeof(tmp)) < 0) {
1115 int err = errno;
1116 SLOG(g_log << Logger::Error << "Failed to set IPv6 socket to IPv6 only, continuing anyhow: " << stringerror(err) << endl,
1117 log->error(Logr::Warning, err, "Failed to set IPv6 socket to IPv6 only, continuing anyhow"));
1118 }
1119
1120 #ifdef TCP_DEFER_ACCEPT
1121 if (setsockopt(socketFd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &tmp, sizeof tmp) >= 0) {
1122 if (first) {
1123 SLOG(g_log << Logger::Info << "Enabled TCP data-ready filter for (slight) DoS protection" << endl,
1124 log->info(Logr::Info, "Enabled TCP data-ready filter for (slight) DoS protection"));
1125 }
1126 }
1127 #endif
1128
1129 if (::arg().mustDo("non-local-bind")) {
1130 Utility::setBindAny(AF_INET, socketFd);
1131 }
1132
1133 if (g_reusePort) {
1134 #if defined(SO_REUSEPORT_LB)
1135 try {
1136 SSetsockopt(socketFd, SOL_SOCKET, SO_REUSEPORT_LB, 1);
1137 }
1138 catch (const std::exception& e) {
1139 throw PDNSException(std::string("SO_REUSEPORT_LB: ") + e.what());
1140 }
1141 #elif defined(SO_REUSEPORT)
1142 try {
1143 SSetsockopt(socketFd, SOL_SOCKET, SO_REUSEPORT, 1);
1144 }
1145 catch (const std::exception& e) {
1146 throw PDNSException(std::string("SO_REUSEPORT: ") + e.what());
1147 }
1148 #endif
1149 }
1150
1151 if (SyncRes::s_tcp_fast_open > 0) {
1152 checkFastOpenSysctl(false, log);
1153 #ifdef TCP_FASTOPEN
1154 if (setsockopt(socketFd, IPPROTO_TCP, TCP_FASTOPEN, &SyncRes::s_tcp_fast_open, sizeof SyncRes::s_tcp_fast_open) < 0) {
1155 int err = errno;
1156 SLOG(g_log << Logger::Error << "Failed to enable TCP Fast Open for listening socket: " << stringerror(err) << endl,
1157 log->error(Logr::Error, err, "Failed to enable TCP Fast Open for listening socket"));
1158 }
1159 #else
1160 SLOG(g_log << Logger::Warning << "TCP Fast Open configured but not supported for listening socket" << endl,
1161 log->info(Logr::Warning, "TCP Fast Open configured but not supported for listening socket"));
1162 #endif
1163 }
1164
1165 socklen_t socklen = address.sin4.sin_family == AF_INET ? sizeof(address.sin4) : sizeof(address.sin6);
1166 if (::bind(socketFd, reinterpret_cast<struct sockaddr*>(&address), socklen) < 0) { // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
1167 throw PDNSException("Binding TCP server socket for " + address.toStringWithPort() + ": " + stringerror());
1168 }
1169
1170 setNonBlocking(socketFd);
1171 try {
1172 setSocketSendBuffer(socketFd, 65000);
1173 }
1174 catch (const std::exception& e) {
1175 SLOG(g_log << Logger::Error << e.what() << endl,
1176 log->error(Logr::Error, e.what(), "Exception while setting socket send buffer"));
1177 }
1178
1179 listen(socketFd, 128);
1180 deferredAdds.emplace_back(socketFd, handleNewTCPQuestion);
1181 tcpSockets.insert(socketFd);
1182
1183 // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
1184 // - fd is not that which we know here, but returned from accept()
1185 SLOG(g_log << Logger::Info << "Listening for TCP queries on " << address.toStringWithPort() << endl,
1186 log->info(Logr::Info, "Listening for queries", "protocol", Logging::Loggable("TCP"), "address", Logging::Loggable(address)));
1187
1188 #ifdef TCP_DEFER_ACCEPT
1189 first = false;
1190 #endif
1191 }
1192 }