]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/recursordist/rec-tcp.cc
Merge pull request #14018 from omoerbeek/rec-proxy-exception
[thirdparty/pdns.git] / pdns / recursordist / rec-tcp.cc
CommitLineData
bc7a7b24
OM
1/*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22
23#include "rec-main.hh"
24
25#include "arguments.hh"
26#include "logger.hh"
27#include "mplexer.hh"
28#include "uuid-utils.hh"
29
dbe7b146
OM
30// OLD PRE 5.0.0 situation:
31//
84b183ca 32// When pdns-distributes-queries is false with reuseport true (the default since 4.9.0), TCP queries
f30f6502
OM
33// are read and handled by worker threads. If the kernel balancing is OK for TCP sockets (observed
34// to be good on Debian bullseye, but not good on e.g. MacOS), the TCP handling is no extra burden.
35// In the case of MacOS all incoming TCP queries are handled by a single worker, while incoming UDP
36// queries do get distributed round-robin over the worker threads. Do note the TCP queries might
37// need to wait until the g_maxUDPQueriesPerRound is reached.
38//
dbe7b146 39// In the case of pdns-distributes-queries true and reuseport false the queries were read and
f30f6502
OM
40// initially processed by the distributor thread(s).
41//
42// Initial processing consists of parsing, calling gettag and checking if we have a packet cache
43// hit. If that does not produce a hit, the query is passed to an mthread in the same way as with
44// UDP queries, but do note that the mthread processing is serviced by the distributor thread. The
45// final answer will be sent by the same distributor thread that originally picked up the query.
46//
47// Changing this, and having incoming TCP queries handled by worker threads is somewhat more complex
dbe7b146 48// than UDP, as the socket must remain available in the distributor thread (for reading more
f30f6502
OM
49// queries), but the TCP socket must also be passed to a worker thread so it can write its
50// answer. The in-flight bookkeeping also has to be aware of how a query is handled to do the
51// accounting properly. I am not sure if changing the current setup is worth all this trouble,
84b183ca 52// especially since the default is now to not use pdns-distributes-queries, which works well in many
f30f6502
OM
53// cases.
54//
dbe7b146
OM
55// NEW SITUATION SINCE 5.0.0:
56//
f30f6502
OM
57// The drawbacks mentioned in https://github.com/PowerDNS/pdns/issues/8394 are no longer true, so an
58// alternative approach would be to introduce dedicated TCP worker thread(s).
48ec0d7e 59//
dbe7b146
OM
60// This approach was implemented in https://github.com/PowerDNS/pdns/pull/13195. The distributor and
61// worker thread(s) now no longer process TCP queries.
48ec0d7e 62
bc7a7b24
OM
63size_t g_tcpMaxQueriesPerConn;
64unsigned int g_maxTCPPerClient;
65int g_tcpTimeout;
66bool g_anyToTcp;
67
68uint16_t TCPConnection::s_maxInFlight;
bc7a7b24 69
bc7a7b24
OM
70thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
71
c0db60c3 72static void handleRunningTCPQuestion(int fileDesc, FDMultiplexer::funcparam_t& var);
#if 0
/* Debug variant: prefixes each message with a (seconds mod 10) timestamp and the socket. */
#define TCPLOG(sock, msg) \
  do { \
    cerr << []() { \
      timeval now; \
      gettimeofday(&now, nullptr); \
      return now.tv_sec % 10 + now.tv_usec / 1000000.0; \
    }() << " FD " << (sock) << ' ' << msg; \
  } while (0)
#else
/* Disabled variant. It is intentionally not an empty expansion: an empty TCPLOG
   produces a duplicate case label warning from clang-tidy. The loop body never
   runs, so msg is compiled but never evaluated. */
#define TCPLOG(sock, msg) /* NOLINT(cppcoreguidelines-macro-usage) */ \
  while (false) { \
    cerr << msg; /* NOLINT(bugprone-macro-parentheses) */ \
  }
#endif
86
b35191d7
OM
87std::atomic<uint32_t> TCPConnection::s_currentConnections;
88
c0db60c3
OM
89TCPConnection::TCPConnection(int fileDesc, const ComboAddress& addr) :
90 data(2, 0), d_remote(addr), d_fd(fileDesc)
bc7a7b24
OM
91{
92 ++s_currentConnections;
93 (*t_tcpClientCounts)[d_remote]++;
94}
95
96TCPConnection::~TCPConnection()
97{
98 try {
c0db60c3 99 if (closesocket(d_fd) < 0) {
ab26d8d5 100 SLOG(g_log << Logger::Error << "Error closing socket for TCPConnection" << endl,
91092a9f 101 g_slogtcpin->info(Logr::Error, "Error closing socket for TCPConnection"));
c0db60c3 102 }
bc7a7b24 103 }
8c1cb8aa 104 catch (const PDNSException& e) {
ab26d8d5 105 SLOG(g_log << Logger::Error << "Error closing TCPConnection socket: " << e.reason << endl,
91092a9f 106 g_slogtcpin->error(Logr::Error, e.reason, "Error closing TCPConnection socket", "exception", Logging::Loggable("PDNSException")));
bc7a7b24
OM
107 }
108
c0db60c3 109 if (t_tcpClientCounts->count(d_remote) != 0 && (*t_tcpClientCounts)[d_remote]-- == 0) {
bc7a7b24 110 t_tcpClientCounts->erase(d_remote);
c0db60c3 111 }
bc7a7b24
OM
112 --s_currentConnections;
113}
114
c0db60c3 115static void terminateTCPConnection(int fileDesc)
bc7a7b24
OM
116{
117 try {
c0db60c3 118 t_fdm->removeReadFD(fileDesc);
bc7a7b24 119 }
8c1cb8aa 120 catch (const FDMultiplexerException& fde) {
bc7a7b24
OM
121 }
122}
123
c0db60c3 124static void sendErrorOverTCP(std::unique_ptr<DNSComboWriter>& comboWriter, int rcode)
bc7a7b24
OM
125{
126 std::vector<uint8_t> packet;
b4a5be57 127 if (comboWriter->d_mdp.d_header.qdcount == 0U) {
bc7a7b24
OM
128 /* header-only */
129 packet.resize(sizeof(dnsheader));
130 }
131 else {
c0db60c3
OM
132 DNSPacketWriter packetWriter(packet, comboWriter->d_mdp.d_qname, comboWriter->d_mdp.d_qtype, comboWriter->d_mdp.d_qclass);
133 if (comboWriter->d_mdp.hasEDNS()) {
bc7a7b24
OM
134 /* we try to add the EDNS OPT RR even for truncated answers,
135 as rfc6891 states:
136 "The minimal response MUST be the DNS header, question section, and an
137 OPT record. This MUST also occur when a truncated response (using
138 the DNS header's TC bit) is returned."
139 */
c0db60c3
OM
140 packetWriter.addOpt(512, 0, 0);
141 packetWriter.commit();
bc7a7b24
OM
142 }
143 }
144
c0db60c3 145 auto& header = reinterpret_cast<dnsheader&>(packet.at(0)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast) safe cast
bc7a7b24
OM
146 header.aa = 0;
147 header.ra = 1;
148 header.qr = 1;
149 header.tc = 0;
c0db60c3
OM
150 header.id = comboWriter->d_mdp.d_header.id;
151 header.rd = comboWriter->d_mdp.d_header.rd;
152 header.cd = comboWriter->d_mdp.d_header.cd;
bc7a7b24
OM
153 header.rcode = rcode;
154
c0db60c3 155 sendResponseOverTCP(comboWriter, packet);
bc7a7b24
OM
156}
157
c0db60c3 158void finishTCPReply(std::unique_ptr<DNSComboWriter>& comboWriter, bool hadError, bool updateInFlight)
bc7a7b24
OM
159{
160 // update tcp connection status, closing if needed and doing the fd multiplexer accounting
c0db60c3
OM
161 if (updateInFlight && comboWriter->d_tcpConnection->d_requestsInFlight > 0) {
162 comboWriter->d_tcpConnection->d_requestsInFlight--;
bc7a7b24
OM
163 }
164
165 // In the code below, we try to remove the fd from the set, but
166 // we don't know if another mthread already did the remove, so we can get a
167 // "Tried to remove unlisted fd" exception. Not that an inflight < limit test
168 // will not work since we do not know if the other mthread got an error or not.
169 if (hadError) {
c0db60c3
OM
170 terminateTCPConnection(comboWriter->d_socket);
171 comboWriter->d_socket = -1;
bc7a7b24
OM
172 return;
173 }
c0db60c3
OM
174 comboWriter->d_tcpConnection->queriesCount++;
175 if ((g_tcpMaxQueriesPerConn > 0 && comboWriter->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) || (comboWriter->d_tcpConnection->isDropOnIdle() && comboWriter->d_tcpConnection->d_requestsInFlight == 0)) {
bc7a7b24 176 try {
c0db60c3 177 t_fdm->removeReadFD(comboWriter->d_socket);
bc7a7b24 178 }
8c1cb8aa 179 catch (FDMultiplexerException&) {
bc7a7b24 180 }
c0db60c3 181 comboWriter->d_socket = -1;
bc7a7b24
OM
182 return;
183 }
184
185 Utility::gettimeofday(&g_now, nullptr); // needs to be updated
186 struct timeval ttd = g_now;
187
188 // If we cross from max to max-1 in flight requests, the fd was not listened to, add it back
c0db60c3 189 if (updateInFlight && comboWriter->d_tcpConnection->d_requestsInFlight == TCPConnection::s_maxInFlight - 1) {
bc7a7b24
OM
190 // A read error might have happened. If we add the fd back, it will most likely error again.
191 // This is not a big issue, the next handleTCPClientReadable() will see another read error
192 // and take action.
193 ttd.tv_sec += g_tcpTimeout;
c0db60c3 194 t_fdm->addReadFD(comboWriter->d_socket, handleRunningTCPQuestion, comboWriter->d_tcpConnection, &ttd);
bc7a7b24
OM
195 return;
196 }
197 // fd might have been removed by read error code, or a read timeout, so expect an exception
198 try {
c0db60c3 199 t_fdm->setReadTTD(comboWriter->d_socket, ttd, g_tcpTimeout);
bc7a7b24 200 }
8c1cb8aa 201 catch (const FDMultiplexerException&) {
bc7a7b24
OM
202 // but if the FD was removed because of a timeout while we were sending a response,
203 // we need to re-arm it. If it was an error it will error again.
204 ttd.tv_sec += g_tcpTimeout;
c0db60c3 205 t_fdm->addReadFD(comboWriter->d_socket, handleRunningTCPQuestion, comboWriter->d_tcpConnection, &ttd);
bc7a7b24
OM
206 }
207}
208
209/*
210 * A helper class that by default closes the incoming TCP connection on destruct
211 * If you want to keep the connection alive, call keep() on the guard object
212 */
8c1cb8aa
OM
213class RunningTCPQuestionGuard
214{
bc7a7b24 215public:
c0db60c3
OM
216 RunningTCPQuestionGuard(const RunningTCPQuestionGuard&) = default;
217 RunningTCPQuestionGuard(RunningTCPQuestionGuard&&) = delete;
218 RunningTCPQuestionGuard& operator=(const RunningTCPQuestionGuard&) = default;
219 RunningTCPQuestionGuard& operator=(RunningTCPQuestionGuard&&) = delete;
032240be
OM
220 RunningTCPQuestionGuard(int fileDesc) :
221 d_fd(fileDesc) {}
bc7a7b24
OM
222 ~RunningTCPQuestionGuard()
223 {
224 if (d_fd != -1) {
225 terminateTCPConnection(d_fd);
226 d_fd = -1;
227 }
228 }
229 void keep()
230 {
231 d_fd = -1;
232 }
8b428a6b 233 bool handleTCPReadResult(int /* fd */, ssize_t bytes)
bc7a7b24
OM
234 {
235 if (bytes == 0) {
236 /* EOF */
237 return false;
238 }
c0db60c3 239 if (bytes < 0) {
bc7a7b24
OM
240 if (errno != EAGAIN && errno != EWOULDBLOCK) {
241 return false;
242 }
243 }
244 keep();
245 return true;
246 }
247
248private:
249 int d_fd{-1};
250};
251
f30f6502
OM
252static void handleNotify(std::unique_ptr<DNSComboWriter>& comboWriter, const DNSName& qname)
253{
254 if (!t_allowNotifyFrom || !t_allowNotifyFrom->match(comboWriter->d_mappedSource)) {
255 if (!g_quiet) {
256 SLOG(g_log << Logger::Error << "[" << g_multiTasker->getTid() << "] dropping TCP NOTIFY from " << comboWriter->d_mappedSource.toString() << ", address not matched by allow-notify-from" << endl,
257 g_slogtcpin->info(Logr::Error, "Dropping TCP NOTIFY, address not matched by allow-notify-from", "source", Logging::Loggable(comboWriter->d_mappedSource)));
258 }
259
260 t_Counters.at(rec::Counter::sourceDisallowedNotify)++;
261 return;
262 }
263
264 if (!isAllowNotifyForZone(qname)) {
265 if (!g_quiet) {
266 SLOG(g_log << Logger::Error << "[" << g_multiTasker->getTid() << "] dropping TCP NOTIFY from " << comboWriter->d_mappedSource.toString() << ", for " << qname.toLogString() << ", zone not matched by allow-notify-for" << endl,
267 g_slogtcpin->info(Logr::Error, "Dropping TCP NOTIFY, zone not matched by allow-notify-for", "source", Logging::Loggable(comboWriter->d_mappedSource), "zone", Logging::Loggable(qname)));
268 }
269
270 t_Counters.at(rec::Counter::zoneDisallowedNotify)++;
271 return;
272 }
273}
274
275static void doProtobufLogQuery(bool logQuery, LocalStateHolder<LuaConfigItems>& luaconfsLocal, const std::unique_ptr<DNSComboWriter>& comboWriter, const DNSName& qname, QType qtype, QClass qclass, const dnsheader* dnsheader, const shared_ptr<TCPConnection>& conn)
276{
277 try {
278 if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && comboWriter->d_policyTags.empty())) {
279 protobufLogQuery(luaconfsLocal, comboWriter->d_uuid, comboWriter->d_source, comboWriter->d_destination, comboWriter->d_mappedSource, comboWriter->d_ednssubnet.source, true, dnsheader->id, conn->qlen, qname, qtype, qclass, comboWriter->d_policyTags, comboWriter->d_requestorId, comboWriter->d_deviceId, comboWriter->d_deviceName, comboWriter->d_meta);
280 }
281 }
282 catch (const std::exception& e) {
283 if (g_logCommonErrors) {
284 SLOG(g_log << Logger::Warning << "Error parsing a TCP query packet for edns subnet: " << e.what() << endl,
285 g_slogtcpin->error(Logr::Warning, e.what(), "Error parsing a TCP query packet for edns subnet", "exception", Logging::Loggable("std::exception"), "remote", Logging::Loggable(conn->d_remote)));
286 }
287 }
288}
cc79ff2b
OM
289
290static void doProcessTCPQuestion(std::unique_ptr<DNSComboWriter>& comboWriter, shared_ptr<TCPConnection>& conn, RunningTCPQuestionGuard& tcpGuard, int fileDesc)
291{
9bed6715 292 RecThreadInfo::self().incNumberOfDistributedQueries();
f30f6502
OM
293 struct timeval start
294 {
295 };
cc79ff2b
OM
296 Utility::gettimeofday(&start, nullptr);
297
298 DNSName qname;
299 uint16_t qtype = 0;
300 uint16_t qclass = 0;
301 bool needECS = false;
302 string requestorId;
303 string deviceId;
304 string deviceName;
305 bool logQuery = false;
306 bool qnameParsed = false;
307
308 comboWriter->d_eventTrace.setEnabled(SyncRes::s_event_trace_enabled != 0);
309 comboWriter->d_eventTrace.add(RecEventTrace::ReqRecv);
310 auto luaconfsLocal = g_luaconfs.getLocal();
311 if (checkProtobufExport(luaconfsLocal)) {
312 needECS = true;
313 }
314 logQuery = t_protobufServers.servers && luaconfsLocal->protobufExportConfig.logQueries;
315 comboWriter->d_logResponse = t_protobufServers.servers && luaconfsLocal->protobufExportConfig.logResponses;
316
3bc38426 317 if (needECS || (t_pdl && (t_pdl->hasGettagFFIFunc() || t_pdl->hasGettagFunc())) || comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Notify)) {
cc79ff2b
OM
318
319 try {
320 EDNSOptionViewMap ednsOptions;
321 comboWriter->d_ecsParsed = true;
322 comboWriter->d_ecsFound = false;
323 getQNameAndSubnet(conn->data, &qname, &qtype, &qclass,
324 comboWriter->d_ecsFound, &comboWriter->d_ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr);
325 qnameParsed = true;
326
327 if (t_pdl) {
328 try {
3bc38426 329 if (t_pdl->hasGettagFFIFunc()) {
cc79ff2b
OM
330 RecursorLua4::FFIParams params(qname, qtype, comboWriter->d_destination, comboWriter->d_source, comboWriter->d_ednssubnet.source, comboWriter->d_data, comboWriter->d_policyTags, comboWriter->d_records, ednsOptions, comboWriter->d_proxyProtocolValues, requestorId, deviceId, deviceName, comboWriter->d_routingTag, comboWriter->d_rcode, comboWriter->d_ttlCap, comboWriter->d_variable, true, logQuery, comboWriter->d_logResponse, comboWriter->d_followCNAMERecords, comboWriter->d_extendedErrorCode, comboWriter->d_extendedErrorExtra, comboWriter->d_responsePaddingDisabled, comboWriter->d_meta);
331 comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTagFFI);
332 comboWriter->d_tag = t_pdl->gettag_ffi(params);
333 comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTagFFI, comboWriter->d_tag, false);
334 }
3bc38426 335 else if (t_pdl->hasGettagFunc()) {
cc79ff2b
OM
336 comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTag);
337 comboWriter->d_tag = t_pdl->gettag(comboWriter->d_source, comboWriter->d_ednssubnet.source, comboWriter->d_destination, qname, qtype, &comboWriter->d_policyTags, comboWriter->d_data, ednsOptions, true, requestorId, deviceId, deviceName, comboWriter->d_routingTag, comboWriter->d_proxyProtocolValues);
338 comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTag, comboWriter->d_tag, false);
339 }
340 }
341 catch (const std::exception& e) {
342 if (g_logCommonErrors) {
343 SLOG(g_log << Logger::Warning << "Error parsing a query packet qname='" << qname << "' for tag determination, setting tag=0: " << e.what() << endl,
344 g_slogtcpin->info(Logr::Warning, "Error parsing a query packet for tag determination, setting tag=0", "remote", Logging::Loggable(conn->d_remote), "qname", Logging::Loggable(qname)));
345 }
346 }
347 }
348 }
349 catch (const std::exception& e) {
350 if (g_logCommonErrors) {
351 SLOG(g_log << Logger::Warning << "Error parsing a query packet for tag determination, setting tag=0: " << e.what() << endl,
352 g_slogtcpin->error(Logr::Warning, e.what(), "Error parsing a query packet for tag determination, setting tag=0", "exception", Logging::Loggable("std::exception"), "remote", Logging::Loggable(conn->d_remote)));
353 }
354 }
355 }
356
357 if (comboWriter->d_tag == 0 && !comboWriter->d_responsePaddingDisabled && g_paddingFrom.match(comboWriter->d_remote)) {
358 comboWriter->d_tag = g_paddingTag;
359 }
360
361 const dnsheader_aligned headerdata(conn->data.data());
362 const struct dnsheader* dnsheader = headerdata.get();
363
364 if (t_protobufServers.servers || t_outgoingProtobufServers.servers) {
4c5a50dc
OM
365 comboWriter->d_requestorId = std::move(requestorId);
366 comboWriter->d_deviceId = std::move(deviceId);
367 comboWriter->d_deviceName = std::move(deviceName);
cc79ff2b
OM
368 comboWriter->d_uuid = getUniqueID();
369 }
370
371 if (t_protobufServers.servers) {
f30f6502 372 doProtobufLogQuery(logQuery, luaconfsLocal, comboWriter, qname, qtype, qclass, dnsheader, conn);
cc79ff2b
OM
373 }
374
375 if (t_pdl) {
376 bool ipf = t_pdl->ipfilter(comboWriter->d_source, comboWriter->d_destination, *dnsheader, comboWriter->d_eventTrace);
377 if (ipf) {
378 if (!g_quiet) {
379 SLOG(g_log << Logger::Notice << RecThreadInfo::id() << " [" << g_multiTasker->getTid() << "/" << g_multiTasker->numProcesses() << "] DROPPED TCP question from " << comboWriter->d_source.toStringWithPort() << (comboWriter->d_source != comboWriter->d_remote ? " (via " + comboWriter->d_remote.toStringWithPort() + ")" : "") << " based on policy" << endl,
380 g_slogtcpin->info(Logr::Info, "Dropped TCP question based on policy", "remote", Logging::Loggable(conn->d_remote), "source", Logging::Loggable(comboWriter->d_source)));
381 }
382 t_Counters.at(rec::Counter::policyDrops)++;
383 return;
384 }
385 }
386
387 if (comboWriter->d_mdp.d_header.qr) {
388 t_Counters.at(rec::Counter::ignoredCount)++;
389 if (g_logCommonErrors) {
390 SLOG(g_log << Logger::Error << "Ignoring answer from TCP client " << comboWriter->getRemote() << " on server socket!" << endl,
391 g_slogtcpin->info(Logr::Error, "Ignoring answer from TCP client on server socket", "remote", Logging::Loggable(comboWriter->getRemote())));
392 }
393 return;
394 }
395 if (comboWriter->d_mdp.d_header.opcode != static_cast<unsigned>(Opcode::Query) && comboWriter->d_mdp.d_header.opcode != static_cast<unsigned>(Opcode::Notify)) {
396 t_Counters.at(rec::Counter::ignoredCount)++;
397 if (g_logCommonErrors) {
398 SLOG(g_log << Logger::Error << "Ignoring unsupported opcode " << Opcode::to_s(comboWriter->d_mdp.d_header.opcode) << " from TCP client " << comboWriter->getRemote() << " on server socket!" << endl,
399 g_slogtcpin->info(Logr::Error, "Ignoring unsupported opcode from TCP client", "remote", Logging::Loggable(comboWriter->getRemote()), "opcode", Logging::Loggable(Opcode::to_s(comboWriter->d_mdp.d_header.opcode))));
400 }
401 sendErrorOverTCP(comboWriter, RCode::NotImp);
402 tcpGuard.keep();
403 return;
404 }
405 if (dnsheader->qdcount == 0U) {
406 t_Counters.at(rec::Counter::emptyQueriesCount)++;
407 if (g_logCommonErrors) {
408 SLOG(g_log << Logger::Error << "Ignoring empty (qdcount == 0) query from " << comboWriter->getRemote() << " on server socket!" << endl,
409 g_slogtcpin->info(Logr::Error, "Ignoring empty (qdcount == 0) query on server socket", "remote", Logging::Loggable(comboWriter->getRemote())));
410 }
411 sendErrorOverTCP(comboWriter, RCode::NotImp);
412 tcpGuard.keep();
413 return;
414 }
415 {
416 // We have read a proper query
cc79ff2b
OM
417 ++t_Counters.at(rec::Counter::qcounter);
418 ++t_Counters.at(rec::Counter::tcpqcounter);
9bed6715
OM
419 if (comboWriter->d_source.sin4.sin_family == AF_INET6) {
420 ++t_Counters.at(rec::Counter::ipv6qcounter);
421 }
cc79ff2b
OM
422
423 if (comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Notify)) {
f30f6502 424 handleNotify(comboWriter, qname);
cc79ff2b
OM
425 }
426
427 string response;
428 RecursorPacketCache::OptPBData pbData{boost::none};
429
430 if (comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Query)) {
431 /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable,
432 but it means that the hash would not be computed. If some script decides at a later time to mark back the answer
433 as cacheable we would cache it with a wrong tag, so better safe than sorry. */
434 comboWriter->d_eventTrace.add(RecEventTrace::PCacheCheck);
435 bool cacheHit = checkForCacheHit(qnameParsed, comboWriter->d_tag, conn->data, qname, qtype, qclass, g_now, response, comboWriter->d_qhash, pbData, true, comboWriter->d_source, comboWriter->d_mappedSource);
436 comboWriter->d_eventTrace.add(RecEventTrace::PCacheCheck, cacheHit, false);
437
438 if (cacheHit) {
439 if (!g_quiet) {
440 SLOG(g_log << Logger::Notice << RecThreadInfo::id() << " TCP question answered from packet cache tag=" << comboWriter->d_tag << " from " << comboWriter->d_source.toStringWithPort() << (comboWriter->d_source != comboWriter->d_remote ? " (via " + comboWriter->d_remote.toStringWithPort() + ")" : "") << endl,
441 g_slogtcpin->info(Logr::Notice, "TCP question answered from packet cache", "tag", Logging::Loggable(comboWriter->d_tag),
442 "qname", Logging::Loggable(qname), "qtype", Logging::Loggable(QType(qtype)),
443 "source", Logging::Loggable(comboWriter->d_source), "remote", Logging::Loggable(comboWriter->d_remote)));
444 }
445
446 bool hadError = sendResponseOverTCP(comboWriter, response);
447 finishTCPReply(comboWriter, hadError, false);
448 struct timeval now
f30f6502
OM
449 {
450 };
cc79ff2b
OM
451 Utility::gettimeofday(&now, nullptr);
452 uint64_t spentUsec = uSec(now - start);
453 t_Counters.at(rec::Histogram::cumulativeAnswers)(spentUsec);
454 comboWriter->d_eventTrace.add(RecEventTrace::AnswerSent);
455
456 if (t_protobufServers.servers && comboWriter->d_logResponse && (!luaconfsLocal->protobufExportConfig.taggedOnly || !pbData || pbData->d_tagged)) {
457 struct timeval tval
f30f6502
OM
458 {
459 0, 0
460 };
cc79ff2b
OM
461 protobufLogResponse(dnsheader, luaconfsLocal, pbData, tval, true, comboWriter->d_source, comboWriter->d_destination, comboWriter->d_mappedSource, comboWriter->d_ednssubnet, comboWriter->d_uuid, comboWriter->d_requestorId, comboWriter->d_deviceId, comboWriter->d_deviceName, comboWriter->d_meta, comboWriter->d_eventTrace, comboWriter->d_policyTags);
462 }
463
464 if (comboWriter->d_eventTrace.enabled() && (SyncRes::s_event_trace_enabled & SyncRes::event_trace_to_log) != 0) {
465 SLOG(g_log << Logger::Info << comboWriter->d_eventTrace.toString() << endl,
466 g_slogtcpin->info(Logr::Info, comboWriter->d_eventTrace.toString())); // More fancy?
467 }
468 tcpGuard.keep();
469 t_Counters.updateSnap(g_regressionTestMode);
470 return;
471 } // cache hit
472 } // query opcode
473
474 if (comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Notify)) {
475 if (!g_quiet) {
476 SLOG(g_log << Logger::Notice << RecThreadInfo::id() << " got NOTIFY for " << qname.toLogString() << " from " << comboWriter->d_source.toStringWithPort() << (comboWriter->d_source != comboWriter->d_remote ? " (via " + comboWriter->d_remote.toStringWithPort() + ")" : "") << endl,
477 g_slogtcpin->info(Logr::Notice, "Got NOTIFY", "qname", Logging::Loggable(qname), "source", Logging::Loggable(comboWriter->d_source), "remote", Logging::Loggable(comboWriter->d_remote)));
478 }
479
480 requestWipeCaches(qname);
481
482 // the operation will now be treated as a Query, generating
483 // a normal response, as the rest of the code does not
484 // check dh->opcode, but we need to ensure that the response
485 // to this request does not get put into the packet cache
486 comboWriter->d_variable = true;
487 }
488
489 // setup for startDoResolve() in an mthread
490 ++conn->d_requestsInFlight;
491 if (conn->d_requestsInFlight >= TCPConnection::s_maxInFlight) {
492 t_fdm->removeReadFD(fileDesc); // should no longer awake ourselves when there is data to read
493 }
494 else {
495 Utility::gettimeofday(&g_now, nullptr); // needed?
496 struct timeval ttd = g_now;
497 t_fdm->setReadTTD(fileDesc, ttd, g_tcpTimeout);
498 }
499 tcpGuard.keep();
500 g_multiTasker->makeThread(startDoResolve, comboWriter.release()); // deletes dc
501 } // good query
502}
503
f30f6502 504static void handleRunningTCPQuestion(int fileDesc, FDMultiplexer::funcparam_t& var)
bc7a7b24 505{
c0db60c3 506 auto conn = boost::any_cast<shared_ptr<TCPConnection>>(var);
bc7a7b24 507
c0db60c3 508 RunningTCPQuestionGuard tcpGuard{fileDesc};
bc7a7b24
OM
509
510 if (conn->state == TCPConnection::PROXYPROTOCOLHEADER) {
511 ssize_t bytes = recv(conn->getFD(), &conn->data.at(conn->proxyProtocolGot), conn->proxyProtocolNeed, 0);
512 if (bytes <= 0) {
c0db60c3 513 tcpGuard.handleTCPReadResult(fileDesc, bytes);
bc7a7b24
OM
514 return;
515 }
516
517 conn->proxyProtocolGot += bytes;
518 conn->data.resize(conn->proxyProtocolGot);
519 ssize_t remaining = isProxyHeaderComplete(conn->data);
520 if (remaining == 0) {
521 if (g_logCommonErrors) {
ab26d8d5 522 SLOG(g_log << Logger::Error << "Unable to consume proxy protocol header in packet from TCP client " << conn->d_remote.toStringWithPort() << endl,
91092a9f 523 g_slogtcpin->info(Logr::Error, "Unable to consume proxy protocol header in packet from TCP client", "remote", Logging::Loggable(conn->d_remote)));
bc7a7b24 524 }
7d3d2f4f 525 ++t_Counters.at(rec::Counter::proxyProtocolInvalidCount);
bc7a7b24
OM
526 return;
527 }
c0db60c3 528 if (remaining < 0) {
bc7a7b24
OM
529 conn->proxyProtocolNeed = -remaining;
530 conn->data.resize(conn->proxyProtocolGot + conn->proxyProtocolNeed);
531 tcpGuard.keep();
532 return;
533 }
c0db60c3 534 {
bc7a7b24
OM
535 /* proxy header received */
536 /* we ignore the TCP field for now, but we could properly set whether
537 the connection was received over UDP or TCP if needed */
c0db60c3 538 bool tcp = false;
bc7a7b24
OM
539 bool proxy = false;
540 size_t used = parseProxyHeader(conn->data, proxy, conn->d_source, conn->d_destination, tcp, conn->proxyProtocolValues);
541 if (used <= 0) {
542 if (g_logCommonErrors) {
ab26d8d5 543 SLOG(g_log << Logger::Error << "Unable to parse proxy protocol header in packet from TCP client " << conn->d_remote.toStringWithPort() << endl,
91092a9f 544 g_slogtcpin->info(Logr::Error, "Unable to parse proxy protocol header in packet from TCP client", "remote", Logging::Loggable(conn->d_remote)));
bc7a7b24 545 }
7d3d2f4f 546 ++t_Counters.at(rec::Counter::proxyProtocolInvalidCount);
bc7a7b24
OM
547 return;
548 }
c0db60c3 549 if (static_cast<size_t>(used) > g_proxyProtocolMaximumSize) {
bc7a7b24 550 if (g_logCommonErrors) {
ab26d8d5 551 SLOG(g_log << Logger::Error << "Proxy protocol header in packet from TCP client " << conn->d_remote.toStringWithPort() << " is larger than proxy-protocol-maximum-size (" << used << "), dropping" << endl,
91092a9f 552 g_slogtcpin->info(Logr::Error, "Proxy protocol header in packet from TCP client is larger than proxy-protocol-maximum-size", "remote", Logging::Loggable(conn->d_remote), "size", Logging::Loggable(used)));
bc7a7b24 553 }
7d3d2f4f 554 ++t_Counters.at(rec::Counter::proxyProtocolInvalidCount);
bc7a7b24
OM
555 return;
556 }
557
558 /* Now that we have retrieved the address of the client, as advertised by the proxy
559 via the proxy protocol header, check that it is allowed by our ACL */
560 /* note that if the proxy header used a 'LOCAL' command, the original source and destination are untouched so everything should be fine */
e81063e5
OM
561 conn->d_mappedSource = conn->d_source;
562 if (t_proxyMapping) {
032240be 563 if (const auto* iter = t_proxyMapping->lookup(conn->d_source)) {
c0db60c3
OM
564 conn->d_mappedSource = iter->second.address;
565 ++iter->second.stats.netmaskMatches;
e81063e5
OM
566 }
567 }
568 if (t_allowFrom && !t_allowFrom->match(&conn->d_mappedSource)) {
bc7a7b24 569 if (!g_quiet) {
e8e12e8c 570 SLOG(g_log << Logger::Error << "[" << g_multiTasker->getTid() << "] dropping TCP query from " << conn->d_mappedSource.toString() << ", address not matched by allow-from" << endl,
91092a9f 571 g_slogtcpin->info(Logr::Error, "Dropping TCP query, address not matched by allow-from", "remote", Logging::Loggable(conn->d_remote)));
bc7a7b24
OM
572 }
573
7d3d2f4f 574 ++t_Counters.at(rec::Counter::unauthorizedTCP);
bc7a7b24
OM
575 return;
576 }
577
578 conn->data.resize(2);
579 conn->state = TCPConnection::BYTE0;
580 }
581 }
582
8c1cb8aa 583 if (conn->state == TCPConnection::BYTE0) {
c0db60c3
OM
584 ssize_t bytes = recv(conn->getFD(), conn->data.data(), 2, 0);
585 if (bytes == 1) {
8c1cb8aa 586 conn->state = TCPConnection::BYTE1;
c0db60c3 587 }
8c1cb8aa
OM
588 if (bytes == 2) {
589 conn->qlen = (((unsigned char)conn->data[0]) << 8) + (unsigned char)conn->data[1];
bc7a7b24 590 conn->data.resize(conn->qlen);
8c1cb8aa
OM
591 conn->bytesread = 0;
592 conn->state = TCPConnection::GETQUESTION;
bc7a7b24
OM
593 }
594 if (bytes <= 0) {
c0db60c3 595 tcpGuard.handleTCPReadResult(fileDesc, bytes);
bc7a7b24
OM
596 return;
597 }
598 }
599
8c1cb8aa
OM
600 if (conn->state == TCPConnection::BYTE1) {
601 ssize_t bytes = recv(conn->getFD(), &conn->data[1], 1, 0);
602 if (bytes == 1) {
603 conn->state = TCPConnection::GETQUESTION;
604 conn->qlen = (((unsigned char)conn->data[0]) << 8) + (unsigned char)conn->data[1];
bc7a7b24 605 conn->data.resize(conn->qlen);
8c1cb8aa 606 conn->bytesread = 0;
bc7a7b24
OM
607 }
608 if (bytes <= 0) {
c0db60c3 609 if (!tcpGuard.handleTCPReadResult(fileDesc, bytes)) {
8c1cb8aa 610 if (g_logCommonErrors) {
ab26d8d5 611 SLOG(g_log << Logger::Error << "TCP client " << conn->d_remote.toStringWithPort() << " disconnected after first byte" << endl,
91092a9f 612 g_slogtcpin->info(Logr::Error, "TCP client disconnected after first byte", "remote", Logging::Loggable(conn->d_remote)));
bc7a7b24
OM
613 }
614 }
615 return;
616 }
617 }
618
8c1cb8aa
OM
619 if (conn->state == TCPConnection::GETQUESTION) {
620 ssize_t bytes = recv(conn->getFD(), &conn->data[conn->bytesread], conn->qlen - conn->bytesread, 0);
bc7a7b24 621 if (bytes <= 0) {
c0db60c3 622 if (!tcpGuard.handleTCPReadResult(fileDesc, bytes)) {
8c1cb8aa 623 if (g_logCommonErrors) {
ab26d8d5 624 SLOG(g_log << Logger::Error << "TCP client " << conn->d_remote.toStringWithPort() << " disconnected while reading question body" << endl,
91092a9f 625 g_slogtcpin->info(Logr::Error, "TCP client disconnected while reading question body", "remote", Logging::Loggable(conn->d_remote)));
bc7a7b24
OM
626 }
627 }
628 return;
629 }
c0db60c3 630 if (bytes > std::numeric_limits<std::uint16_t>::max()) {
8c1cb8aa 631 if (g_logCommonErrors) {
ab26d8d5 632 SLOG(g_log << Logger::Error << "TCP client " << conn->d_remote.toStringWithPort() << " sent an invalid question size while reading question body" << endl,
91092a9f 633 g_slogtcpin->info(Logr::Error, "TCP client sent an invalid question size while reading question body", "remote", Logging::Loggable(conn->d_remote)));
bc7a7b24
OM
634 }
635 return;
636 }
8c1cb8aa
OM
637 conn->bytesread += (uint16_t)bytes;
638 if (conn->bytesread == conn->qlen) {
bc7a7b24 639 conn->state = TCPConnection::BYTE0;
c0db60c3 640 std::unique_ptr<DNSComboWriter> comboWriter;
bc7a7b24 641 try {
c0db60c3 642 comboWriter = std::make_unique<DNSComboWriter>(conn->data, g_now, t_pdl);
bc7a7b24 643 }
8c1cb8aa 644 catch (const MOADNSException& mde) {
7d3d2f4f 645 t_Counters.at(rec::Counter::clientParseError)++;
bc7a7b24 646 if (g_logCommonErrors) {
ab26d8d5 647 SLOG(g_log << Logger::Error << "Unable to parse packet from TCP client " << conn->d_remote.toStringWithPort() << endl,
91092a9f 648 g_slogtcpin->info(Logr::Error, "Unable to parse packet from TCP client", "remte", Logging::Loggable(conn->d_remote)));
bc7a7b24
OM
649 }
650 return;
651 }
cfbcc0f3 652
c0db60c3
OM
653 comboWriter->d_tcpConnection = conn; // carry the torch
654 comboWriter->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
655 comboWriter->d_tcp = true;
656 comboWriter->setRemote(conn->d_remote); // the address the query was received from
657 comboWriter->setSource(conn->d_source); // the address we assume the query is coming from, might be set by proxy protocol
bc7a7b24
OM
658 ComboAddress dest;
659 dest.reset();
660 dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
661 socklen_t len = dest.getSocklen();
b4a5be57 662 getsockname(conn->getFD(), reinterpret_cast<sockaddr*>(&dest), &len); // if this fails, we're ok with it NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
c0db60c3
OM
663 comboWriter->setLocal(dest); // the address we received the query on
664 comboWriter->setDestination(conn->d_destination); // the address we assume the query is received on, might be set by proxy protocol
665 comboWriter->setMappedSource(conn->d_mappedSource); // the address we assume the query is coming from after table based mapping
bc7a7b24
OM
666 /* we can't move this if we want to be able to access the values in
667 all queries sent over this connection */
c0db60c3 668 comboWriter->d_proxyProtocolValues = conn->proxyProtocolValues;
bc7a7b24 669
cc79ff2b
OM
670 doProcessTCPQuestion(comboWriter, conn, tcpGuard, fileDesc);
671 } // reading query
672 }
bc7a7b24
OM
673 // more to come
674 tcpGuard.keep();
675}
676
677//! Handle new incoming TCP connection
c0db60c3 678void handleNewTCPQuestion(int fileDesc, [[maybe_unused]] FDMultiplexer::funcparam_t& var)
bc7a7b24
OM
679{
680 ComboAddress addr;
8c1cb8aa 681 socklen_t addrlen = sizeof(addr);
b4a5be57 682 int newsock = accept(fileDesc, reinterpret_cast<struct sockaddr*>(&addr), &addrlen); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
8c1cb8aa 683 if (newsock >= 0) {
e8e12e8c 684 if (g_multiTasker->numProcesses() > g_maxMThreads) {
7d3d2f4f 685 t_Counters.at(rec::Counter::overCapacityDrops)++;
bc7a7b24
OM
686 try {
687 closesocket(newsock);
688 }
8c1cb8aa 689 catch (const PDNSException& e) {
ab26d8d5 690 SLOG(g_log << Logger::Error << "Error closing TCP socket after an over capacity drop: " << e.reason << endl,
91092a9f 691 g_slogtcpin->error(Logr::Error, e.reason, "Error closing TCP socket after an over capacity drop", "exception", Logging::Loggable("PDNSException")));
bc7a7b24
OM
692 }
693 return;
694 }
695
8c1cb8aa 696 if (t_remotes) {
bc7a7b24
OM
697 t_remotes->push_back(addr);
698 }
699
8a6888f7
OM
700 ComboAddress destaddr;
701 socklen_t len = sizeof(destaddr);
702 getsockname(newsock, reinterpret_cast<sockaddr*>(&destaddr), &len); // if this fails, we're ok with it NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
703 bool fromProxyProtocolSource = expectProxyProtocol(addr, destaddr);
e81063e5
OM
704 ComboAddress mappedSource = addr;
705 if (!fromProxyProtocolSource && t_proxyMapping) {
032240be 706 if (const auto* iter = t_proxyMapping->lookup(addr)) {
c0db60c3
OM
707 mappedSource = iter->second.address;
708 ++iter->second.stats.netmaskMatches;
e81063e5
OM
709 }
710 }
711 if (!fromProxyProtocolSource && t_allowFrom && !t_allowFrom->match(&mappedSource)) {
c0db60c3 712 if (!g_quiet) {
e8e12e8c 713 SLOG(g_log << Logger::Error << "[" << g_multiTasker->getTid() << "] dropping TCP query from " << mappedSource.toString() << ", address neither matched by allow-from nor proxy-protocol-from" << endl,
91092a9f 714 g_slogtcpin->info(Logr::Error, "dropping TCP query address neither matched by allow-from nor proxy-protocol-from", "source", Logging::Loggable(mappedSource)));
c0db60c3 715 }
7d3d2f4f 716 t_Counters.at(rec::Counter::unauthorizedTCP)++;
bc7a7b24
OM
717 try {
718 closesocket(newsock);
719 }
8c1cb8aa 720 catch (const PDNSException& e) {
ab26d8d5 721 SLOG(g_log << Logger::Error << "Error closing TCP socket after an ACL drop: " << e.reason << endl,
91092a9f 722 g_slogtcpin->error(Logr::Error, e.reason, "Error closing TCP socket after an ACL drop", "exception", Logging::Loggable("PDNSException")));
bc7a7b24
OM
723 }
724 return;
725 }
726
c0db60c3 727 if (g_maxTCPPerClient > 0 && t_tcpClientCounts->count(addr) > 0 && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
7d3d2f4f 728 t_Counters.at(rec::Counter::tcpClientOverflow)++;
bc7a7b24
OM
729 try {
730 closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
731 }
8c1cb8aa 732 catch (const PDNSException& e) {
ab26d8d5 733 SLOG(g_log << Logger::Error << "Error closing TCP socket after an overflow drop: " << e.reason << endl,
91092a9f 734 g_slogtcpin->error(Logr::Error, e.reason, "Error closing TCP socket after an overflow drop", "exception", Logging::Loggable("PDNSException")));
bc7a7b24
OM
735 }
736 return;
737 }
738
739 setNonBlocking(newsock);
34b7ae04 740 setTCPNoDelay(newsock);
c0db60c3
OM
741 std::shared_ptr<TCPConnection> tcpConn = std::make_shared<TCPConnection>(newsock, addr);
742 tcpConn->d_source = addr;
8a6888f7 743 tcpConn->d_destination = destaddr;
c0db60c3 744 tcpConn->d_mappedSource = mappedSource;
bc7a7b24
OM
745
746 if (fromProxyProtocolSource) {
c0db60c3
OM
747 tcpConn->proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
748 tcpConn->data.resize(tcpConn->proxyProtocolNeed);
749 tcpConn->state = TCPConnection::PROXYPROTOCOLHEADER;
bc7a7b24
OM
750 }
751 else {
c0db60c3 752 tcpConn->state = TCPConnection::BYTE0;
bc7a7b24
OM
753 }
754
032240be
OM
755 struct timeval ttd
756 {
757 };
bc7a7b24
OM
758 Utility::gettimeofday(&ttd, nullptr);
759 ttd.tv_sec += g_tcpTimeout;
760
c0db60c3 761 t_fdm->addReadFD(tcpConn->getFD(), handleRunningTCPQuestion, tcpConn, &ttd);
bc7a7b24
OM
762 }
763}
764
c0db60c3 765static void TCPIOHandlerIO(int fileDesc, FDMultiplexer::funcparam_t& var);
bc7a7b24
OM
766
767static void TCPIOHandlerStateChange(IOState oldstate, IOState newstate, std::shared_ptr<PacketID>& pid)
768{
769 TCPLOG(pid->tcpsock, "State transation " << int(oldstate) << "->" << int(newstate) << endl);
770
771 pid->lowState = newstate;
772
773 // handle state transitions
774 switch (oldstate) {
775 case IOState::NeedRead:
776
777 switch (newstate) {
778 case IOState::NeedWrite:
779 TCPLOG(pid->tcpsock, "NeedRead -> NeedWrite: flip FD" << endl);
780 t_fdm->alterFDToWrite(pid->tcpsock, TCPIOHandlerIO, pid);
781 break;
782 case IOState::NeedRead:
783 break;
784 case IOState::Done:
785 TCPLOG(pid->tcpsock, "Done -> removeReadFD" << endl);
786 t_fdm->removeReadFD(pid->tcpsock);
787 break;
788 case IOState::Async:
789 throw std::runtime_error("TLS async mode not supported");
790 break;
791 }
792 break;
793
794 case IOState::NeedWrite:
795
796 switch (newstate) {
797 case IOState::NeedRead:
798 TCPLOG(pid->tcpsock, "NeedWrite -> NeedRead: flip FD" << endl);
799 t_fdm->alterFDToRead(pid->tcpsock, TCPIOHandlerIO, pid);
800 break;
801 case IOState::NeedWrite:
802 break;
803 case IOState::Done:
804 TCPLOG(pid->tcpsock, "Done -> removeWriteFD" << endl);
805 t_fdm->removeWriteFD(pid->tcpsock);
806 break;
807 case IOState::Async:
808 throw std::runtime_error("TLS async mode not supported");
809 break;
810 }
811 break;
812
813 case IOState::Done:
814 switch (newstate) {
815 case IOState::NeedRead:
816 TCPLOG(pid->tcpsock, "NeedRead: addReadFD" << endl);
817 t_fdm->addReadFD(pid->tcpsock, TCPIOHandlerIO, pid);
818 break;
819 case IOState::NeedWrite:
820 TCPLOG(pid->tcpsock, "NeedWrite: addWriteFD" << endl);
821 t_fdm->addWriteFD(pid->tcpsock, TCPIOHandlerIO, pid);
822 break;
823 case IOState::Done:
824 break;
825 case IOState::Async:
826 throw std::runtime_error("TLS async mode not supported");
827 break;
828 }
829 break;
830
831 case IOState::Async:
832 throw std::runtime_error("TLS async mode not supported");
833 break;
834 }
bc7a7b24
OM
835}
836
c0db60c3 837static void TCPIOHandlerIO(int fileDesc, FDMultiplexer::funcparam_t& var)
bc7a7b24 838{
c0db60c3 839 auto pid = boost::any_cast<std::shared_ptr<PacketID>>(var);
b4a5be57
OM
840 assert(pid->tcphandler); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay): def off assert triggers it
841 assert(fileDesc == pid->tcphandler->getDescriptor()); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay) idem
bc7a7b24
OM
842 IOState newstate = IOState::Done;
843
844 TCPLOG(pid->tcpsock, "TCPIOHandlerIO: lowState " << int(pid->lowState) << endl);
845
846 // In the code below, we want to update the state of the fd before calling sendEvent
847 // a sendEvent might close the fd, and some poll multiplexers do not like to manipulate a closed fd
848
849 switch (pid->highState) {
850 case TCPAction::DoingRead:
851 TCPLOG(pid->tcpsock, "highState: Reading" << endl);
852 // In arecvtcp, the buffer was resized already so inWanted bytes will fit
853 // try reading
854 try {
855 newstate = pid->tcphandler->tryRead(pid->inMSG, pid->inPos, pid->inWanted);
856 switch (newstate) {
857 case IOState::Done:
858 case IOState::NeedRead:
859 TCPLOG(pid->tcpsock, "tryRead: Done or NeedRead " << int(newstate) << ' ' << pid->inPos << '/' << pid->inWanted << endl);
860 TCPLOG(pid->tcpsock, "TCPIOHandlerIO " << pid->inWanted << ' ' << pid->inIncompleteOkay << endl);
861 if (pid->inPos == pid->inWanted || (pid->inIncompleteOkay && pid->inPos > 0)) {
862 pid->inMSG.resize(pid->inPos); // old content (if there) + new bytes read, only relevant for the inIncompleteOkay case
863 newstate = IOState::Done;
864 TCPIOHandlerStateChange(pid->lowState, newstate, pid);
e8e12e8c 865 g_multiTasker->sendEvent(pid, &pid->inMSG);
bc7a7b24
OM
866 return;
867 }
868 break;
869 case IOState::NeedWrite:
870 break;
871 case IOState::Async:
872 throw std::runtime_error("TLS async mode not supported");
873 break;
874 }
875 }
876 catch (const std::exception& e) {
877 newstate = IOState::Done;
878 TCPLOG(pid->tcpsock, "read exception..." << e.what() << endl);
879 PacketBuffer empty;
880 TCPIOHandlerStateChange(pid->lowState, newstate, pid);
e8e12e8c 881 g_multiTasker->sendEvent(pid, &empty); // this conveys error status
bc7a7b24
OM
882 return;
883 }
884 break;
885
886 case TCPAction::DoingWrite:
887 TCPLOG(pid->tcpsock, "highState: Writing" << endl);
888 try {
889 TCPLOG(pid->tcpsock, "tryWrite: " << pid->outPos << '/' << pid->outMSG.size() << ' ' << " -> ");
890 newstate = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size());
891 TCPLOG(pid->tcpsock, pid->outPos << '/' << pid->outMSG.size() << endl);
892 switch (newstate) {
893 case IOState::Done: {
894 TCPLOG(pid->tcpsock, "tryWrite: Done" << endl);
895 TCPIOHandlerStateChange(pid->lowState, newstate, pid);
e8e12e8c 896 g_multiTasker->sendEvent(pid, &pid->outMSG); // send back what we sent to convey everything is ok
bc7a7b24
OM
897 return;
898 }
899 case IOState::NeedRead:
900 TCPLOG(pid->tcpsock, "tryWrite: NeedRead" << endl);
901 break;
902 case IOState::NeedWrite:
903 TCPLOG(pid->tcpsock, "tryWrite: NeedWrite" << endl);
904 break;
905 case IOState::Async:
906 throw std::runtime_error("TLS async mode not supported");
907 break;
908 }
909 }
910 catch (const std::exception& e) {
911 newstate = IOState::Done;
912 TCPLOG(pid->tcpsock, "write exception..." << e.what() << endl);
913 PacketBuffer sent;
914 TCPIOHandlerStateChange(pid->lowState, newstate, pid);
e8e12e8c 915 g_multiTasker->sendEvent(pid, &sent); // we convey error status by sending empty string
bc7a7b24
OM
916 return;
917 }
918 break;
919 }
920
921 // Cases that did not end up doing a sendEvent
922 TCPIOHandlerStateChange(pid->lowState, newstate, pid);
923}
924
3bb98d97 925void checkFastOpenSysctl([[maybe_unused]] bool active, [[maybe_unused]] Logr::log_t log)
bc7a7b24
OM
926{
927#ifdef __linux__
928 string line;
929 if (readFileIfThere("/proc/sys/net/ipv4/tcp_fastopen", &line)) {
930 int flag = std::stoi(line);
931 if (active && !(flag & 1)) {
62b191dc
OM
932 SLOG(g_log << Logger::Error << "tcp-fast-open-connect enabled but net.ipv4.tcp_fastopen does not allow it" << endl,
933 log->info(Logr::Error, "tcp-fast-open-connect enabled but net.ipv4.tcp_fastopen does not allow it"));
bc7a7b24
OM
934 }
935 if (!active && !(flag & 2)) {
62b191dc
OM
936 SLOG(g_log << Logger::Error << "tcp-fast-open enabled but net.ipv4.tcp_fastopen does not allow it" << endl,
937 log->info(Logr::Error, "tcp-fast-open enabled but net.ipv4.tcp_fastopen does not allow it"));
bc7a7b24
OM
938 }
939 }
940 else {
62b191dc
OM
941 SLOG(g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl,
942 log->info(Logr::Notice, "Cannot determine if kernel settings allow fast-open"));
8c1cb8aa 943 }
bc7a7b24 944#else
62b191dc
OM
945 SLOG(g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl,
946 log->info(Logr::Notice, "Cannot determine if kernel settings allow fast-open"));
bc7a7b24
OM
947#endif
948}
949
43f91cad 950void checkTFOconnect(Logr::log_t log)
bc7a7b24
OM
951{
952 try {
c0db60c3
OM
953 Socket socket(AF_INET, SOCK_STREAM);
954 socket.setNonBlocking();
955 socket.setFastOpenConnect();
bc7a7b24
OM
956 }
957 catch (const NetworkError& e) {
62b191dc
OM
958 SLOG(g_log << Logger::Error << "tcp-fast-open-connect enabled but returned error: " << e.what() << endl,
959 log->error(Logr::Error, e.what(), "tcp-fast-open-connect enabled but returned error"));
bc7a7b24
OM
960 }
961}
962
bc7a7b24
OM
963LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr<TCPIOHandler>& handler)
964{
965 TCPLOG(handler->getDescriptor(), "asendtcp called " << data.size() << endl);
966
967 auto pident = std::make_shared<PacketID>();
968 pident->tcphandler = handler;
969 pident->tcpsock = handler->getDescriptor();
970 pident->outMSG = data;
971 pident->highState = TCPAction::DoingWrite;
972
c0db60c3 973 IOState state = IOState::Done;
bc7a7b24
OM
974 try {
975 TCPLOG(pident->tcpsock, "Initial tryWrite: " << pident->outPos << '/' << pident->outMSG.size() << ' ' << " -> ");
976 state = handler->tryWrite(pident->outMSG, pident->outPos, pident->outMSG.size());
977 TCPLOG(pident->tcpsock, pident->outPos << '/' << pident->outMSG.size() << endl);
978
979 if (state == IOState::Done) {
980 TCPLOG(pident->tcpsock, "asendtcp success A" << endl);
981 return LWResult::Result::Success;
982 }
983 }
984 catch (const std::exception& e) {
985 TCPLOG(pident->tcpsock, "tryWrite() exception..." << e.what() << endl);
986 return LWResult::Result::PermanentError;
987 }
988
989 // Will set pident->lowState
990 TCPIOHandlerStateChange(IOState::Done, state, pident);
991
992 PacketBuffer packet;
e8e12e8c 993 int ret = g_multiTasker->waitEvent(pident, &packet, g_networkTimeoutMsec);
bc7a7b24
OM
994 TCPLOG(pident->tcpsock, "asendtcp waitEvent returned " << ret << ' ' << packet.size() << '/' << data.size() << ' ');
995 if (ret == 0) {
996 TCPLOG(pident->tcpsock, "timeout" << endl);
997 TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
998 return LWResult::Result::Timeout;
999 }
c0db60c3 1000 if (ret == -1) { // error
bc7a7b24
OM
1001 TCPLOG(pident->tcpsock, "PermanentError" << endl);
1002 TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
1003 return LWResult::Result::PermanentError;
1004 }
c0db60c3 1005 if (packet.size() != data.size()) { // main loop tells us what it sent out, or empty in case of an error
bc7a7b24
OM
1006 // fd housekeeping done by TCPIOHandlerIO
1007 TCPLOG(pident->tcpsock, "PermanentError size mismatch" << endl);
1008 return LWResult::Result::PermanentError;
1009 }
1010
1011 TCPLOG(pident->tcpsock, "asendtcp success" << endl);
1012 return LWResult::Result::Success;
1013}
1014
1015LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, shared_ptr<TCPIOHandler>& handler, const bool incompleteOkay)
1016{
1017 TCPLOG(handler->getDescriptor(), "arecvtcp called " << len << ' ' << data.size() << endl);
1018 data.resize(len);
1019
1020 // We might have data already available from the TLS layer, try to get that into the buffer
1021 size_t pos = 0;
c0db60c3 1022 IOState state = IOState::Done;
bc7a7b24
OM
1023 try {
1024 TCPLOG(handler->getDescriptor(), "calling tryRead() " << len << endl);
1025 state = handler->tryRead(data, pos, len);
1026 TCPLOG(handler->getDescriptor(), "arcvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl);
1027 switch (state) {
1028 case IOState::Done:
1029 case IOState::NeedRead:
1030 if (pos == len || (incompleteOkay && pos > 0)) {
1031 data.resize(pos);
1032 TCPLOG(handler->getDescriptor(), "acecvtcp success A" << endl);
1033 return LWResult::Result::Success;
1034 }
1035 break;
1036 case IOState::NeedWrite:
1037 break;
1038 case IOState::Async:
1039 throw std::runtime_error("TLS async mode not supported");
1040 break;
1041 }
1042 }
1043 catch (const std::exception& e) {
1044 TCPLOG(handler->getDescriptor(), "tryRead() exception..." << e.what() << endl);
1045 return LWResult::Result::PermanentError;
1046 }
1047
1048 auto pident = std::make_shared<PacketID>();
1049 pident->tcphandler = handler;
1050 pident->tcpsock = handler->getDescriptor();
1051 // We might have a partial result
1052 pident->inMSG = std::move(data);
1053 pident->inPos = pos;
1054 pident->inWanted = len;
1055 pident->inIncompleteOkay = incompleteOkay;
1056 pident->highState = TCPAction::DoingRead;
1057
1058 data.clear();
1059
1060 // Will set pident->lowState
1061 TCPIOHandlerStateChange(IOState::Done, state, pident);
1062
e8e12e8c 1063 int ret = g_multiTasker->waitEvent(pident, &data, g_networkTimeoutMsec);
8c1cb8aa 1064 TCPLOG(pident->tcpsock, "arecvtcp " << ret << ' ' << data.size() << ' ');
bc7a7b24
OM
1065 if (ret == 0) {
1066 TCPLOG(pident->tcpsock, "timeout" << endl);
1067 TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
1068 return LWResult::Result::Timeout;
1069 }
c0db60c3 1070 if (ret == -1) {
bc7a7b24
OM
1071 TCPLOG(pident->tcpsock, "PermanentError" << endl);
1072 TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
1073 return LWResult::Result::PermanentError;
1074 }
c0db60c3 1075 if (data.empty()) { // error, EOF or other
bc7a7b24
OM
1076 // fd housekeeping done by TCPIOHandlerIO
1077 TCPLOG(pident->tcpsock, "EOF" << endl);
1078 return LWResult::Result::PermanentError;
1079 }
1080
1081 TCPLOG(pident->tcpsock, "arecvtcp success" << endl);
1082 return LWResult::Result::Success;
1083}
1084
d61b8a01 1085void makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set<int>& tcpSockets, Logr::log_t log)
bc7a7b24 1086{
ddbd9c34
FM
1087 vector<string> localAddresses;
1088 stringtok(localAddresses, ::arg()["local-address"], " ,");
bc7a7b24 1089
ddbd9c34 1090 if (localAddresses.empty()) {
bc7a7b24 1091 throw PDNSException("No local address specified");
ddbd9c34 1092 }
bc7a7b24 1093
73e77f7f 1094#ifdef TCP_DEFER_ACCEPT
ddbd9c34 1095 auto first = true;
73e77f7f 1096#endif
94fc1dc5 1097 const uint16_t defaultLocalPort = ::arg().asNum("local-port");
ddbd9c34 1098 for (const auto& localAddress : localAddresses) {
94fc1dc5 1099 ComboAddress address{localAddress, defaultLocalPort};
ddbd9c34
FM
1100 const int socketFd = socket(address.sin6.sin6_family, SOCK_STREAM, 0);
1101 if (socketFd < 0) {
8c1cb8aa 1102 throw PDNSException("Making a TCP server socket for resolver: " + stringerror());
ddbd9c34 1103 }
bc7a7b24 1104
ddbd9c34 1105 setCloseOnExec(socketFd);
bc7a7b24 1106
8c1cb8aa 1107 int tmp = 1;
ddbd9c34 1108 if (setsockopt(socketFd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp) < 0) {
62b191dc
OM
1109 int err = errno;
1110 SLOG(g_log << Logger::Error << "Setsockopt failed for TCP listening socket" << endl,
1111 log->error(Logr::Critical, err, "Setsockopt failed for TCP listening socket"));
c0db60c3 1112 _exit(1);
bc7a7b24 1113 }
ddbd9c34 1114 if (address.sin6.sin6_family == AF_INET6 && setsockopt(socketFd, IPPROTO_IPV6, IPV6_V6ONLY, &tmp, sizeof(tmp)) < 0) {
bc7a7b24 1115 int err = errno;
c0db60c3 1116 SLOG(g_log << Logger::Error << "Failed to set IPv6 socket to IPv6 only, continuing anyhow: " << stringerror(err) << endl,
c42e8958 1117 log->error(Logr::Warning, err, "Failed to set IPv6 socket to IPv6 only, continuing anyhow"));
bc7a7b24
OM
1118 }
1119
1120#ifdef TCP_DEFER_ACCEPT
ddbd9c34
FM
1121 if (setsockopt(socketFd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &tmp, sizeof tmp) >= 0) {
1122 if (first) {
d8d1d955
OM
1123 SLOG(g_log << Logger::Info << "Enabled TCP data-ready filter for (slight) DoS protection" << endl,
1124 log->info(Logr::Info, "Enabled TCP data-ready filter for (slight) DoS protection"));
6cb5db59 1125 }
bc7a7b24
OM
1126 }
1127#endif
1128
ddbd9c34
FM
1129 if (::arg().mustDo("non-local-bind")) {
1130 Utility::setBindAny(AF_INET, socketFd);
1131 }
bc7a7b24
OM
1132
1133 if (g_reusePort) {
1134#if defined(SO_REUSEPORT_LB)
1135 try {
398bb0f1 1136 SSetsockopt(socketFd, SOL_SOCKET, SO_REUSEPORT_LB, 1);
bc7a7b24
OM
1137 }
1138 catch (const std::exception& e) {
1139 throw PDNSException(std::string("SO_REUSEPORT_LB: ") + e.what());
1140 }
1141#elif defined(SO_REUSEPORT)
1142 try {
ddbd9c34 1143 SSetsockopt(socketFd, SOL_SOCKET, SO_REUSEPORT, 1);
bc7a7b24
OM
1144 }
1145 catch (const std::exception& e) {
1146 throw PDNSException(std::string("SO_REUSEPORT: ") + e.what());
1147 }
1148#endif
1149 }
1150
1151 if (SyncRes::s_tcp_fast_open > 0) {
62b191dc 1152 checkFastOpenSysctl(false, log);
bc7a7b24 1153#ifdef TCP_FASTOPEN
ddbd9c34 1154 if (setsockopt(socketFd, IPPROTO_TCP, TCP_FASTOPEN, &SyncRes::s_tcp_fast_open, sizeof SyncRes::s_tcp_fast_open) < 0) {
bc7a7b24 1155 int err = errno;
c0db60c3 1156 SLOG(g_log << Logger::Error << "Failed to enable TCP Fast Open for listening socket: " << stringerror(err) << endl,
d8d1d955 1157 log->error(Logr::Error, err, "Failed to enable TCP Fast Open for listening socket"));
bc7a7b24
OM
1158 }
1159#else
d8d1d955
OM
1160 SLOG(g_log << Logger::Warning << "TCP Fast Open configured but not supported for listening socket" << endl,
1161 log->info(Logr::Warning, "TCP Fast Open configured but not supported for listening socket"));
bc7a7b24
OM
1162#endif
1163 }
1164
ddbd9c34 1165 socklen_t socklen = address.sin4.sin_family == AF_INET ? sizeof(address.sin4) : sizeof(address.sin6);
b4a5be57 1166 if (::bind(socketFd, reinterpret_cast<struct sockaddr*>(&address), socklen) < 0) { // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
94fc1dc5 1167 throw PDNSException("Binding TCP server socket for " + address.toStringWithPort() + ": " + stringerror());
ddbd9c34 1168 }
bc7a7b24 1169
ddbd9c34 1170 setNonBlocking(socketFd);
bc7a7b24 1171 try {
ddbd9c34 1172 setSocketSendBuffer(socketFd, 65000);
bc7a7b24
OM
1173 }
1174 catch (const std::exception& e) {
d8d1d955
OM
1175 SLOG(g_log << Logger::Error << e.what() << endl,
1176 log->error(Logr::Error, e.what(), "Exception while setting socket send buffer"));
bc7a7b24
OM
1177 }
1178
ddbd9c34
FM
1179 listen(socketFd, 128);
1180 deferredAdds.emplace_back(socketFd, handleNewTCPQuestion);
1181 tcpSockets.insert(socketFd);
bc7a7b24
OM
1182
1183 // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
1184 // - fd is not that which we know here, but returned from accept()
ddbd9c34
FM
1185 SLOG(g_log << Logger::Info << "Listening for TCP queries on " << address.toStringWithPort() << endl,
1186 log->info(Logr::Info, "Listening for queries", "protocol", Logging::Loggable("TCP"), "address", Logging::Loggable(address)));
1187
73e77f7f 1188#ifdef TCP_DEFER_ACCEPT
ddbd9c34 1189 first = false;
73e77f7f 1190#endif
bc7a7b24
OM
1191 }
1192}