From bc7a7b248ecb8e9d53654b9ab7dd385f73c21c65 Mon Sep 17 00:00:00 2001 From: Otto Moerbeek Date: Tue, 11 Jan 2022 10:01:45 +0100 Subject: [PATCH] Introduce rec-main.hh and split out TCP code to rec-tcp.cc --- .not-formatted | 1 - pdns/lua-recursor4.cc | 1 + pdns/lua-recursor4.hh | 3 - pdns/pdns_recursor.cc | 1221 +-------------------------------- pdns/rec_channel_rec.cc | 1 + pdns/recursordist/Makefile.am | 2 + pdns/recursordist/rec-main.cc | 24 + pdns/recursordist/rec-main.hh | 222 ++++++ pdns/recursordist/rec-tcp.cc | 1077 +++++++++++++++++++++++++++++ pdns/syncres.hh | 2 - pdns/ws-recursor.cc | 1 + 11 files changed, 1354 insertions(+), 1201 deletions(-) create mode 100644 pdns/recursordist/rec-main.cc create mode 100644 pdns/recursordist/rec-main.hh create mode 100644 pdns/recursordist/rec-tcp.cc diff --git a/.not-formatted b/.not-formatted index fb2b51ceab..0dfb64d254 100644 --- a/.not-formatted +++ b/.not-formatted @@ -226,7 +226,6 @@ ./pdns/packethandler.cc ./pdns/packethandler.hh ./pdns/pdns_hw.cc -./pdns/pdns_recursor.cc ./pdns/pdnsexception.hh ./pdns/pdnsutil.cc ./pdns/pkcs11signers.cc diff --git a/pdns/lua-recursor4.cc b/pdns/lua-recursor4.cc index b0731d4822..b346c3f413 100644 --- a/pdns/lua-recursor4.cc +++ b/pdns/lua-recursor4.cc @@ -31,6 +31,7 @@ #include "filterpo.hh" #include "rec-snmp.hh" #include +#include "rec-main.hh" RecursorLua4::RecursorLua4() { prepareContext(); } diff --git a/pdns/lua-recursor4.hh b/pdns/lua-recursor4.hh index c877725982..2fabcc06c3 100644 --- a/pdns/lua-recursor4.hh +++ b/pdns/lua-recursor4.hh @@ -41,9 +41,6 @@ #include "lua-recursor4-ffi.hh" -PacketBuffer GenUDPQueryResponse(const ComboAddress& dest, const string& query); -unsigned int getRecursorThreadId(); - // pdns_ffi_param_t is a lightuserdata template <> struct LuaContext::Pusher diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index 090bb4f16c..5853c1339b 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -19,9 +19,8 @@ * along with this 
program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif + +#include "rec-main.hh" #include #include @@ -117,15 +116,12 @@ #include "xpf.hh" #include "rec-eventtrace.hh" -typedef map tcpClientCounts_t; - -static thread_local std::shared_ptr t_pdl; -static thread_local unsigned int t_id = 0; +thread_local std::shared_ptr t_pdl; +thread_local unsigned int t_id = 0; static thread_local std::shared_ptr t_traceRegex; -static thread_local std::unique_ptr t_tcpClientCounts; -static thread_local std::shared_ptr>> t_protobufServers{nullptr}; +thread_local std::shared_ptr>> t_protobufServers{nullptr}; static thread_local uint64_t t_protobufServersGeneration; -static thread_local std::shared_ptr>> t_outgoingProtobufServers{nullptr}; +thread_local std::shared_ptr>> t_outgoingProtobufServers{nullptr}; static thread_local uint64_t t_outgoingProtobufServersGeneration; #ifdef HAVE_FSTRM @@ -150,8 +146,6 @@ thread_local std::shared_ptr t_udrDBp; #endif /* NOD_ENABLED */ __thread struct timeval g_now; // timestamp, updated (too) frequently -typedef vector > > deferredAdd_t; - // for communicating with our threads // effectively readonly after startup struct RecThreadInfo @@ -196,7 +190,6 @@ static std::vector s_threadInfos; /* without reuseport, all listeners share the same sockets */ static deferredAdd_t g_deferredAdds; -typedef vector tcpListenSockets_t; typedef map listenSocketsAddresses_t; // is shared across all threads right now enum class PaddingMode { Always, PaddedQueries }; @@ -207,32 +200,28 @@ static std::shared_ptr g_initialDomainMap; // new threads static std::shared_ptr g_initialAllowFrom; // new thread needs to be setup with this static std::shared_ptr g_initialAllowNotifyFrom; // new threads need this to be setup static std::shared_ptr g_initialAllowNotifyFor; // new threads need this to be setup -static NetmaskGroup g_XPFAcl; +NetmaskGroup 
g_XPFAcl; static NetmaskGroup g_proxyProtocolACL; -static NetmaskGroup g_paddingFrom; +NetmaskGroup g_paddingFrom; static boost::optional g_dns64Prefix{boost::none}; static DNSName g_dns64PrefixReverse; -static size_t g_proxyProtocolMaximumSize; -static size_t g_tcpMaxQueriesPerConn; +size_t g_proxyProtocolMaximumSize; static size_t s_maxUDPQueriesPerRound; static uint64_t g_latencyStatSize; static uint32_t g_disthashseed; -static unsigned int g_maxTCPPerClient; -static unsigned int g_maxMThreads; +unsigned int g_maxMThreads; static unsigned int g_numDistributorThreads; static unsigned int g_numWorkerThreads; -static unsigned int g_paddingTag; -static int g_tcpTimeout; +unsigned int g_paddingTag; static uint16_t g_udpTruncationThreshold; static uint16_t g_xpfRRCode{0}; static PaddingMode g_paddingMode; static std::atomic statsWanted; -static std::atomic g_quiet; -static bool g_logCommonErrors; -static bool g_anyToTcp; +std::atomic g_quiet; +bool g_logCommonErrors; static bool g_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers -static bool g_reusePort{false}; -static bool g_gettagNeedsEDNSOptions{false}; +bool g_reusePort{false}; +bool g_gettagNeedsEDNSOptions{false}; static time_t g_statisticsInterval; static bool g_useIncomingECS; static bool g_useKernelTimestamp; @@ -290,98 +279,6 @@ struct ThreadMSG bool wantAnswer; }; -//! 
used to send information to a newborn mthread -struct DNSComboWriter { - DNSComboWriter(const std::string& query, const struct timeval& now): d_mdp(true, query), d_now(now), d_query(query) - { - } - - DNSComboWriter(const std::string& query, const struct timeval& now, std::unordered_set&& policyTags, LuaContext::LuaObject&& data, std::vector&& records): d_mdp(true, query), d_now(now), d_query(query), d_policyTags(std::move(policyTags)), d_records(std::move(records)), d_data(std::move(data)) - { - } - - void setRemote(const ComboAddress& sa) - { - d_remote=sa; - } - - void setSource(const ComboAddress& sa) - { - d_source=sa; - } - - void setLocal(const ComboAddress& sa) - { - d_local=sa; - } - - void setDestination(const ComboAddress& sa) - { - d_destination=sa; - } - - void setSocket(int sock) - { - d_socket=sock; - } - - string getRemote() const - { - if (d_source == d_remote) { - return d_source.toStringWithPort(); - } - return d_source.toStringWithPort() + " (proxied by " + d_remote.toStringWithPort() + ")"; - } - - std::vector d_proxyProtocolValues; - MOADNSParser d_mdp; - struct timeval d_now; - /* Remote client, might differ from d_source - in case of XPF, in which case d_source holds - the IP of the client and d_remote of the proxy - */ - ComboAddress d_remote; - ComboAddress d_source; - /* Destination address, might differ from - d_destination in case of XPF, in which case - d_destination holds the IP of the proxy and - d_local holds our own. 
*/ - ComboAddress d_local; - ComboAddress d_destination; - RecEventTrace d_eventTrace; - boost::uuids::uuid d_uuid; - string d_requestorId; - string d_deviceId; - string d_deviceName; - struct timeval d_kernelTimestamp{0,0}; - std::string d_query; - std::unordered_set d_policyTags; - std::string d_routingTag; - std::vector d_records; - LuaContext::LuaObject d_data; - EDNSSubnetOpts d_ednssubnet; - shared_ptr d_tcpConnection; - boost::optional d_extendedErrorCode{boost::none}; - string d_extendedErrorExtra; - boost::optional d_rcode{boost::none}; - int d_socket{-1}; - unsigned int d_tag{0}; - uint32_t d_qhash{0}; - uint32_t d_ttlCap{std::numeric_limits::max()}; - bool d_variable{false}; - bool d_ecsFound{false}; - bool d_ecsParsed{false}; - bool d_followCNAMERecords{false}; - bool d_logResponse{false}; - bool d_tcp{false}; - bool d_responsePaddingDisabled{false}; - std::map d_meta; -}; - -MT_t* getMT() -{ - return MT ? MT.get() : nullptr; -} ArgvMap &arg() { @@ -389,10 +286,6 @@ ArgvMap &arg() return theArg; } -unsigned int getRecursorThreadId() -{ - return t_id; -} static bool isDistributorThread() { @@ -412,136 +305,6 @@ static bool isHandlerThread() return s_threadInfos.at(t_id).isHandler; } -#if 0 -#define TCPLOG(tcpsock, x) do { cerr << [](){ timeval t; gettimeofday(&t, nullptr); return t.tv_sec % 10 + t.tv_usec/1000000.0; }() << " FD " << (tcpsock) << ' ' << x; } while (0) -#else -#define TCPLOG(pid, x) -#endif - -static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var); -static void TCPIOHandlerStateChange(IOState, IOState, std::shared_ptr&); - -LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr& handler) -{ - TCPLOG(handler->getDescriptor(), "asendtcp called " << data.size() << endl); - - auto pident = std::make_shared(); - pident->tcphandler = handler; - pident->tcpsock = handler->getDescriptor(); - pident->outMSG = data; - pident->highState = TCPAction::DoingWrite; - - IOState state; - try { - TCPLOG(pident->tcpsock, "Initial 
tryWrite: " << pident->outPos << '/' << pident->outMSG.size() << ' ' << " -> "); - state = handler->tryWrite(pident->outMSG, pident->outPos, pident->outMSG.size()); - TCPLOG(pident->tcpsock, pident->outPos << '/' << pident->outMSG.size() << endl); - - if (state == IOState::Done) { - TCPLOG(pident->tcpsock, "asendtcp success A" << endl); - return LWResult::Result::Success; - } - } - catch (const std::exception& e) { - TCPLOG(pident->tcpsock, "tryWrite() exception..." << e.what() << endl); - return LWResult::Result::PermanentError; - } - - // Will set pident->lowState - TCPIOHandlerStateChange(IOState::Done, state, pident); - - PacketBuffer packet; - int ret = MT->waitEvent(pident, &packet, g_networkTimeoutMsec); - TCPLOG(pident->tcpsock, "asendtcp waitEvent returned " << ret << ' ' << packet.size() << '/' << data.size() << ' '); - if (ret == 0) { - TCPLOG(pident->tcpsock, "timeout" << endl); - TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident); - return LWResult::Result::Timeout; - } - else if (ret == -1) { // error - TCPLOG(pident->tcpsock, "PermanentError" << endl); - TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident); - return LWResult::Result::PermanentError; - } - else if (packet.size() != data.size()) { // main loop tells us what it sent out, or empty in case of an error - // fd housekeeping done by TCPIOHandlerIO - TCPLOG(pident->tcpsock, "PermanentError size mismatch" << endl); - return LWResult::Result::PermanentError; - } - - TCPLOG(pident->tcpsock, "asendtcp success" << endl); - return LWResult::Result::Success; -} - -LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, shared_ptr& handler, const bool incompleteOkay) -{ - TCPLOG(handler->getDescriptor(), "arecvtcp called " << len << ' ' << data.size() << endl); - data.resize(len); - - // We might have data already available from the TLS layer, try to get that into the buffer - size_t pos = 0; - IOState state; - try { - TCPLOG(handler->getDescriptor(), "calling 
tryRead() " << len << endl); - state = handler->tryRead(data, pos, len); - TCPLOG(handler->getDescriptor(), "arcvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl); - switch (state) { - case IOState::Done: - case IOState::NeedRead: - if (pos == len || (incompleteOkay && pos > 0)) { - data.resize(pos); - TCPLOG(handler->getDescriptor(), "acecvtcp success A" << endl); - return LWResult::Result::Success; - } - break; - case IOState::NeedWrite: - break; - case IOState::Async: - throw std::runtime_error("TLS async mode not supported"); - break; - } - } - catch (const std::exception& e) { - TCPLOG(handler->getDescriptor(), "tryRead() exception..." << e.what() << endl); - return LWResult::Result::PermanentError; - } - - auto pident = std::make_shared(); - pident->tcphandler = handler; - pident->tcpsock = handler->getDescriptor(); - // We might have a partial result - pident->inMSG = std::move(data); - pident->inPos = pos; - pident->inWanted = len; - pident->inIncompleteOkay = incompleteOkay; - pident->highState = TCPAction::DoingRead; - - data.clear(); - - // Will set pident->lowState - TCPIOHandlerStateChange(IOState::Done, state, pident); - - int ret = MT->waitEvent(pident, &data, g_networkTimeoutMsec); - TCPLOG(pident->tcpsock, "arecvtcp " << ret << ' ' << data.size() << ' ' ); - if (ret == 0) { - TCPLOG(pident->tcpsock, "timeout" << endl); - TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident); - return LWResult::Result::Timeout; - } - else if (ret == -1) { - TCPLOG(pident->tcpsock, "PermanentError" << endl); - TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident); - return LWResult::Result::PermanentError; - } - else if (data.empty()) {// error, EOF or other - // fd housekeeping done by TCPIOHandlerIO - TCPLOG(pident->tcpsock, "EOF" << endl); - return LWResult::Result::PermanentError; - } - - TCPLOG(pident->tcpsock, "arecvtcp success" << endl); - return LWResult::Result::Success; -} static void 
handleGenUDPQueryResponse(int fd, FDMultiplexer::funcparam_t& var) { @@ -818,157 +581,8 @@ static void writePid(void) } } -uint16_t TCPConnection::s_maxInFlight; - -TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : data(2, 0), d_remote(addr), d_fd(fd) -{ - ++s_currentConnections; - (*t_tcpClientCounts)[d_remote]++; -} - -TCPConnection::~TCPConnection() -{ - try { - if(closesocket(d_fd) < 0) - g_log<count(d_remote) && !(*t_tcpClientCounts)[d_remote]--) - t_tcpClientCounts->erase(d_remote); - --s_currentConnections; -} - -std::atomic TCPConnection::s_currentConnections; - -static void terminateTCPConnection(int fd) -{ - try { - t_fdm->removeReadFD(fd); - } - catch (const FDMultiplexerException& fde) - { - } -} - -/* this function is called with both a string and a vector representing a packet */ -template -static bool sendResponseOverTCP(const std::unique_ptr& dc, const T& packet) -{ - uint8_t buf[2]; - buf[0] = packet.size() / 256; - buf[1] = packet.size() % 256; - - Utility::iovec iov[2]; - iov[0].iov_base = (void*)buf; iov[0].iov_len = 2; - iov[1].iov_base = (void*)&*packet.begin(); iov[1].iov_len = packet.size(); - - int wret = Utility::writev(dc->d_socket, iov, 2); - bool hadError = true; - - if (wret == 0) { - g_log<getRemote()<getRemote() << ": " << strerror(err) << endl; - } else if ((unsigned int)wret != 2 + packet.size()) { - g_log<getRemote()<<" for "<d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<& dc, int rcode) -{ - std::vector packet; - if (dc->d_mdp.d_header.qdcount == 0) { - /* header-only */ - packet.resize(sizeof(dnsheader)); - } - else { - DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass); - if (dc->d_mdp.hasEDNS()) { - /* we try to add the EDNS OPT RR even for truncated answers, - as rfc6891 states: - "The minimal response MUST be the DNS header, question section, and an - OPT record. This MUST also occur when a truncated response (using - the DNS header's TC bit) is returned." 
- */ - pw.addOpt(512, 0, 0); - pw.commit(); - } - } - - dnsheader& header = reinterpret_cast(packet.at(0)); - header.aa = 0; - header.ra = 1; - header.qr = 1; - header.tc = 0; - header.id = dc->d_mdp.d_header.id; - header.rd = dc->d_mdp.d_header.rd; - header.cd = dc->d_mdp.d_header.cd; - header.rcode = rcode; - - sendResponseOverTCP(dc, packet); -} - -static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var); -static void finishTCPReply(std::unique_ptr& dc, bool hadError, bool updateInFlight) -{ - // update tcp connection status, closing if needed and doing the fd multiplexer accounting - if (updateInFlight && dc->d_tcpConnection->d_requestsInFlight > 0) { - dc->d_tcpConnection->d_requestsInFlight--; - } - - // In the code below, we try to remove the fd from the set, but - // we don't know if another mthread already did the remove, so we can get a - // "Tried to remove unlisted fd" exception. Not that an inflight < limit test - // will not work since we do not know if the other mthread got an error or not. - if (hadError) { - terminateTCPConnection(dc->d_socket); - dc->d_socket = -1; - return; - } - dc->d_tcpConnection->queriesCount++; - if ((g_tcpMaxQueriesPerConn && dc->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) || - (dc->d_tcpConnection->isDropOnIdle() && dc->d_tcpConnection->d_requestsInFlight == 0)) { - try { - t_fdm->removeReadFD(dc->d_socket); - } - catch (FDMultiplexerException &) { - } - dc->d_socket = -1; - return; - } - Utility::gettimeofday(&g_now, nullptr); // needs to be updated - struct timeval ttd = g_now; - - // If we cross from max to max-1 in flight requests, the fd was not listened to, add it back - if (updateInFlight && dc->d_tcpConnection->d_requestsInFlight == TCPConnection::s_maxInFlight - 1) { - // A read error might have happened. If we add the fd back, it will most likely error again. - // This is not a big issue, the next handleTCPClientReadable() will see another read error - // and take action. 
- ttd.tv_sec += g_tcpTimeout; - t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd); - return; - } - // fd might have been removed by read error code, or a read timeout, so expect an exception - try { - t_fdm->setReadTTD(dc->d_socket, ttd, g_tcpTimeout); - } - catch (const FDMultiplexerException &) { - // but if the FD was removed because of a timeout while we were sending a response, - // we need to re-arm it. If it was an error it will error again. - ttd.tv_sec += g_tcpTimeout; - t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd); - } -} // the idea is, only do things that depend on the *response* here. Incoming accounting is on incoming. static void updateResponseStats(int res, const ComboAddress& remote, unsigned int packetsize, const DNSName* query, uint16_t qtype) @@ -1003,7 +617,7 @@ catch(...) return "Exception making error message for exception"; } -static void protobufLogQuery(LocalStateHolder& luaconfsLocal, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::unordered_set& policyTags, const std::string& requestorId, const std::string& deviceId, const std::string& deviceName, const std::map& meta) +void protobufLogQuery(LocalStateHolder& luaconfsLocal, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::unordered_set& policyTags, const std::string& requestorId, const std::string& deviceId, const std::string& deviceName, const std::map& meta) { if (!t_protobufServers) { return; @@ -1035,7 +649,7 @@ static void protobufLogQuery(LocalStateHolder& luaconfsLocal, co } } -static void protobufLogResponse(pdns::ProtoZero::RecMessage& message) +void 
protobufLogResponse(pdns::ProtoZero::RecMessage& message) { if (!t_protobufServers) { return; @@ -1047,7 +661,7 @@ static void protobufLogResponse(pdns::ProtoZero::RecMessage& message) } } -static void protobufLogResponse(const struct dnsheader* dh, LocalStateHolder& luaconfsLocal, +void protobufLogResponse(const struct dnsheader* dh, LocalStateHolder& luaconfsLocal, const RecursorPacketCache::OptPBData& pbData, const struct timeval& tv, bool tcp, const ComboAddress& source, const ComboAddress& destination, const EDNSSubnetOpts& ednssubnet, @@ -1284,7 +898,7 @@ static std::shared_ptr>> startProtobuf return result; } -static bool checkProtobufExport(LocalStateHolder& luaconfsLocal) +bool checkProtobufExport(LocalStateHolder& luaconfsLocal) { if (!luaconfsLocal->protobufExportConfig.enabled) { if (t_protobufServers) { @@ -1386,7 +1000,7 @@ static std::shared_ptr>> startFra return result; } -static bool checkFrameStreamExport(LocalStateHolder& luaconfsLocal) +bool checkFrameStreamExport(LocalStateHolder& luaconfsLocal) { if (!luaconfsLocal->frameStreamExportConfig.enabled) { if (t_frameStreamServers) { @@ -1617,7 +1231,7 @@ static bool answerIsNOData(uint16_t requestedType, int rcode, const std::vector< return true; } -static bool isAllowNotifyForZone(DNSName qname) +bool isAllowNotifyForZone(DNSName qname) { if (t_allowNotifyFor->empty()) { return false; @@ -1632,7 +1246,7 @@ static bool isAllowNotifyForZone(DNSName qname) return false; } -static void startDoResolve(void *p) +void startDoResolve(void *p) { auto dc=std::unique_ptr(reinterpret_cast(p)); try { @@ -2533,7 +2147,7 @@ static void makeControlChannelSocket(int processNum=-1) } } -static void getQNameAndSubnet(const std::string& question, DNSName* dnsname, uint16_t* qtype, uint16_t* qclass, +void getQNameAndSubnet(const std::string& question, DNSName* dnsname, uint16_t* qtype, uint16_t* qclass, bool& foundECS, EDNSSubnetOpts* ednssubnet, EDNSOptionViewMap* options, bool& foundXPF, ComboAddress* xpfSource, 
ComboAddress* xpfDest) { @@ -2604,7 +2218,7 @@ static void getQNameAndSubnet(const std::string& question, DNSName* dnsname, uin } } -static bool checkForCacheHit(bool qnameParsed, unsigned int tag, const string& data, +bool checkForCacheHit(bool qnameParsed, unsigned int tag, const string& data, DNSName& qname, uint16_t& qtype, uint16_t& qclass, const struct timeval& now, string& response, uint32_t& qhash, @@ -2656,7 +2270,7 @@ static void* pleaseWipeCaches(const DNSName& canon, bool subtree, uint16_t qtype return nullptr; } -static void requestWipeCaches(const DNSName& canon) +void requestWipeCaches(const DNSName& canon) { // send a message to the handler thread asking it // to wipe all of the caches @@ -2670,497 +2284,11 @@ static void requestWipeCaches(const DNSName& canon) } } -/* - * A helper class that by default closes the incoming TCP connection on destruct - * If you want to keep the connection alive, call keep() on the guard object - */ -class RunningTCPQuestionGuard { -public: - RunningTCPQuestionGuard(int fd) - { - d_fd = fd; - } - ~RunningTCPQuestionGuard() - { - if (d_fd != -1) { - terminateTCPConnection(d_fd); - d_fd = -1; - } - } - void keep() - { - d_fd = -1; - } - bool handleTCPReadResult(int fd, ssize_t bytes) - { - if (bytes == 0) { - /* EOF */ - return false; - } - else if (bytes < 0) { - if (errno != EAGAIN && errno != EWOULDBLOCK) { - return false; - } - } - keep(); - return true; - } - -private: - int d_fd{-1}; -}; - -static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var) -{ - shared_ptr conn=boost::any_cast >(var); - - RunningTCPQuestionGuard tcpGuard{fd}; - - if (conn->state == TCPConnection::PROXYPROTOCOLHEADER) { - ssize_t bytes = recv(conn->getFD(), &conn->data.at(conn->proxyProtocolGot), conn->proxyProtocolNeed, 0); - if (bytes <= 0) { - tcpGuard.handleTCPReadResult(fd, bytes); - return; - } - - conn->proxyProtocolGot += bytes; - conn->data.resize(conn->proxyProtocolGot); - ssize_t remaining = 
isProxyHeaderComplete(conn->data); - if (remaining == 0) { - if (g_logCommonErrors) { - g_log<d_remote.toStringWithPort() <proxyProtocolNeed = -remaining; - conn->data.resize(conn->proxyProtocolGot + conn->proxyProtocolNeed); - tcpGuard.keep(); - return; - } - else { - /* proxy header received */ - /* we ignore the TCP field for now, but we could properly set whether - the connection was received over UDP or TCP if needed */ - bool tcp; - bool proxy = false; - size_t used = parseProxyHeader(conn->data, proxy, conn->d_source, conn->d_destination, tcp, conn->proxyProtocolValues); - if (used <= 0) { - if (g_logCommonErrors) { - g_log<d_remote.toStringWithPort() <(used) > g_proxyProtocolMaximumSize) { - if (g_logCommonErrors) { - g_log<d_remote.toStringWithPort() << " is larger than proxy-protocol-maximum-size (" << used << "), dropping"<< endl; - } - ++g_stats.proxyProtocolInvalidCount; - return; - } - - /* Now that we have retrieved the address of the client, as advertised by the proxy - via the proxy protocol header, check that it is allowed by our ACL */ - /* note that if the proxy header used a 'LOCAL' command, the original source and destination are untouched so everything should be fine */ - if (t_allowFrom && !t_allowFrom->match(&conn->d_source)) { - if (!g_quiet) { - g_log<getTid()<<"] dropping TCP query from "<d_source.toString()<<", address not matched by allow-from"<data.resize(2); - conn->state = TCPConnection::BYTE0; - } - } - - if (conn->state==TCPConnection::BYTE0) { - ssize_t bytes=recv(conn->getFD(), &conn->data[0], 2, 0); - if(bytes==1) - conn->state=TCPConnection::BYTE1; - if(bytes==2) { - conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1]; - conn->data.resize(conn->qlen); - conn->bytesread=0; - conn->state=TCPConnection::GETQUESTION; - } - if (bytes <= 0) { - tcpGuard.handleTCPReadResult(fd, bytes); - return; - } - } - - if (conn->state==TCPConnection::BYTE1) { - ssize_t bytes=recv(conn->getFD(), &conn->data[1], 1, 0); - 
if(bytes==1) { - conn->state=TCPConnection::GETQUESTION; - conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1]; - conn->data.resize(conn->qlen); - conn->bytesread=0; - } - if (bytes <= 0) { - if (!tcpGuard.handleTCPReadResult(fd, bytes)) { - if(g_logCommonErrors) { - g_log<d_remote.toStringWithPort() <<" disconnected after first byte"<state==TCPConnection::GETQUESTION) { - ssize_t bytes=recv(conn->getFD(), &conn->data[conn->bytesread], conn->qlen - conn->bytesread, 0); - if (bytes <= 0) { - if (!tcpGuard.handleTCPReadResult(fd, bytes)) { - if(g_logCommonErrors) { - g_log<d_remote.toStringWithPort() <<" disconnected while reading question body"< std::numeric_limits::max()) { - if(g_logCommonErrors) { - g_log<d_remote.toStringWithPort() <<" sent an invalid question size while reading question body"<bytesread+=(uint16_t)bytes; - if(conn->bytesread==conn->qlen) { - conn->state = TCPConnection::BYTE0; - std::unique_ptr dc; - try { - dc = std::make_unique(conn->data, g_now); - } - catch(const MOADNSException &mde) { - g_stats.clientParseError++; - if (g_logCommonErrors) { - g_log<d_remote.toStringWithPort() <d_tcpConnection = conn; // carry the torch - dc->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd - dc->d_tcp=true; - dc->setRemote(conn->d_remote); - dc->setSource(conn->d_source); - ComboAddress dest; - dest.reset(); - dest.sin4.sin_family = conn->d_remote.sin4.sin_family; - socklen_t len = dest.getSocklen(); - getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it - dc->setLocal(dest); - dc->setDestination(conn->d_destination); - /* we can't move this if we want to be able to access the values in - all queries sent over this connection */ - dc->d_proxyProtocolValues = conn->proxyProtocolValues; - - struct timeval start; - Utility::gettimeofday(&start, nullptr); - - DNSName qname; - uint16_t qtype=0; - uint16_t qclass=0; - bool needECS = false; - bool needXPF = 
g_XPFAcl.match(conn->d_remote); - string requestorId; - string deviceId; - string deviceName; - bool logQuery = false; - bool qnameParsed = false; - - dc->d_eventTrace.setEnabled(SyncRes::s_event_trace_enabled); - dc->d_eventTrace.add(RecEventTrace::ReqRecv); - auto luaconfsLocal = g_luaconfs.getLocal(); - if (checkProtobufExport(luaconfsLocal)) { - needECS = true; - } - logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries; - dc->d_logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses; - -#ifdef HAVE_FSTRM - checkFrameStreamExport(luaconfsLocal); -#endif - - if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag)) || dc->d_mdp.d_header.opcode == Opcode::Notify) { - - try { - EDNSOptionViewMap ednsOptions; - bool xpfFound = false; - dc->d_ecsParsed = true; - dc->d_ecsFound = false; - getQNameAndSubnet(conn->data, &qname, &qtype, &qclass, - dc->d_ecsFound, &dc->d_ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr, - xpfFound, needXPF ? &dc->d_source : nullptr, needXPF ? 
&dc->d_destination : nullptr); - qnameParsed = true; - - if(t_pdl) { - try { - if (t_pdl->d_gettag_ffi) { - RecursorLua4::FFIParams params(qname, qtype, dc->d_destination, dc->d_source, dc->d_ednssubnet.source, dc->d_data, dc->d_policyTags, dc->d_records, ednsOptions, dc->d_proxyProtocolValues, requestorId, deviceId, deviceName, dc->d_routingTag, dc->d_rcode, dc->d_ttlCap, dc->d_variable, true, logQuery, dc->d_logResponse, dc->d_followCNAMERecords, dc->d_extendedErrorCode, dc->d_extendedErrorExtra, dc->d_responsePaddingDisabled, dc->d_meta); - dc->d_eventTrace.add(RecEventTrace::LuaGetTagFFI); - dc->d_tag = t_pdl->gettag_ffi(params); - dc->d_eventTrace.add(RecEventTrace::LuaGetTagFFI, dc->d_tag, false); - } - else if (t_pdl->d_gettag) { - dc->d_eventTrace.add(RecEventTrace::LuaGetTag); - dc->d_tag = t_pdl->gettag(dc->d_source, dc->d_ednssubnet.source, dc->d_destination, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, deviceName, dc->d_routingTag, dc->d_proxyProtocolValues); - dc->d_eventTrace.add(RecEventTrace::LuaGetTag, dc->d_tag, false); - } - } - catch(const std::exception& e) { - if(g_logCommonErrors) { - g_log<d_tag == 0 && !dc->d_responsePaddingDisabled && g_paddingFrom.match(dc->d_remote)) { - dc->d_tag = g_paddingTag; - } - - const struct dnsheader* dh = reinterpret_cast(&conn->data[0]); - - if (t_protobufServers || t_outgoingProtobufServers) { - dc->d_requestorId = requestorId; - dc->d_deviceId = deviceId; - dc->d_deviceName = deviceName; - dc->d_uuid = getUniqueID(); - } - - if(t_protobufServers) { - try { - - if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && dc->d_policyTags.empty())) { - protobufLogQuery(luaconfsLocal, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId, dc->d_deviceName, dc->d_meta); - } - } - catch (const std::exception& e) { - if (g_logCommonErrors) { - 
g_log<ipfilter(dc->d_source, dc->d_destination, *dh, dc->d_eventTrace); - if (ipf) { - if (!g_quiet) { - g_log<getTid()<<"/"<numProcesses()<<"] DROPPED TCP question from "<d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<<" based on policy"<d_mdp.d_header.qr) { - g_stats.ignoredCount++; - if (g_logCommonErrors) { - g_log<getRemote() <<" on server socket!"<d_mdp.d_header.opcode != Opcode::Query && dc->d_mdp.d_header.opcode != Opcode::Notify) { - g_stats.ignoredCount++; - if (g_logCommonErrors) { - g_log<d_mdp.d_header.opcode)<<" from TCP client "<< dc->getRemote() <<" on server socket!"<qdcount == 0) { - g_stats.emptyQueriesCount++; - if (g_logCommonErrors) { - g_log<getRemote() <<" on server socket!"<d_mdp.d_header.opcode == Opcode::Notify) { - if(!t_allowNotifyFrom || !t_allowNotifyFrom->match(dc->d_source)) { - if(!g_quiet) { - g_log<getTid()<<"] dropping TCP NOTIFY from "<d_source.toString()<<", address not matched by allow-notify-from"<getTid()<<"] dropping TCP NOTIFY from "<d_source.toString()<<", for "<d_mdp.d_header.opcode == Opcode::Query) { - /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable, - but it means that the hash would not be computed. If some script decides at a later time to mark back the answer - as cacheable we would cache it with a wrong tag, so better safe than sorry. */ - dc->d_eventTrace.add(RecEventTrace::PCacheCheck); - bool cacheHit = checkForCacheHit(qnameParsed, dc->d_tag, conn->data, qname, qtype, qclass, g_now, response, dc->d_qhash, pbData, true, dc->d_source); - dc->d_eventTrace.add(RecEventTrace::PCacheCheck, cacheHit, false); - - if (cacheHit) { - if (!g_quiet) { - g_log<d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? 
" (via "+dc->d_remote.toStringWithPort()+")" : "")<d_eventTrace.add(RecEventTrace::AnswerSent); - - if (t_protobufServers && dc->d_logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbData && !pbData->d_tagged)) { - struct timeval tv{0, 0}; - protobufLogResponse(dh, luaconfsLocal, pbData, tv, true, dc->d_source, dc->d_destination, dc->d_ednssubnet, dc->d_uuid, dc->d_requestorId, dc->d_deviceId, dc->d_deviceName, dc->d_meta, dc->d_eventTrace); - } - - if (dc->d_eventTrace.enabled() && SyncRes::s_event_trace_enabled & SyncRes::event_trace_to_log) { - g_log << Logger::Info << dc->d_eventTrace.toString() << endl; - } - tcpGuard.keep(); - return; - } // cache hit - } // query opcode - - if(dc->d_mdp.d_header.opcode == Opcode::Notify) { - if (!g_quiet) { - g_log<d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<opcode, but we need to ensure that the response - // to this request does not get put into the packet cache - dc->d_variable = true; - } - - // setup for startDoResolve() in an mthread - ++conn->d_requestsInFlight; - if (conn->d_requestsInFlight >= TCPConnection::s_maxInFlight) { - t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read - } else { - Utility::gettimeofday(&g_now, nullptr); // needed? - struct timeval ttd = g_now; - t_fdm->setReadTTD(fd, ttd, g_tcpTimeout); - } - tcpGuard.keep(); - MT->makeThread(startDoResolve, dc.release()); // deletes dc - } // good query - } // read full query - } // reading query - - // more to come - tcpGuard.keep(); -} - -static bool expectProxyProtocol(const ComboAddress& from) +bool expectProxyProtocol(const ComboAddress& from) { return g_proxyProtocolACL.match(from); } -//! 
Handle new incoming TCP connection -static void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& ) -{ - ComboAddress addr; - socklen_t addrlen=sizeof(addr); - int newsock=accept(fd, (struct sockaddr*)&addr, &addrlen); - if(newsock>=0) { - if(MT->numProcesses() > g_maxMThreads) { - g_stats.overCapacityDrops++; - try { - closesocket(newsock); - } - catch(const PDNSException& e) { - g_log<push_back(addr); - } - - bool fromProxyProtocolSource = expectProxyProtocol(addr); - if(t_allowFrom && !t_allowFrom->match(&addr) && !fromProxyProtocolSource) { - if(!g_quiet) - g_log<getTid()<<"] dropping TCP query from "<count(addr) && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) { - g_stats.tcpClientOverflow++; - try { - closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet! - } - catch(const PDNSException& e) { - g_log< tc = std::make_shared(newsock, addr); - tc->d_source = addr; - tc->d_destination.reset(); - tc->d_destination.sin4.sin_family = addr.sin4.sin_family; - socklen_t len = tc->d_destination.getSocklen(); - getsockname(tc->getFD(), reinterpret_cast(&tc->d_destination), &len); // if this fails, we're ok with it - - if (fromProxyProtocolSource) { - tc->proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize; - tc->data.resize(tc->proxyProtocolNeed); - tc->state = TCPConnection::PROXYPROTOCOLHEADER; - } - else { - tc->state = TCPConnection::BYTE0; - } - - struct timeval ttd; - Utility::gettimeofday(&ttd, nullptr); - ttd.tv_sec += g_tcpTimeout; - - t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc, &ttd); - } -} static string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, const ComboAddress& destaddr, ComboAddress source, ComboAddress destination, struct timeval tv, int fd, std::vector& proxyProtocolValues, RecEventTrace& eventTrace) { @@ -3616,145 +2744,6 @@ static void handleNewUDPQuestion(int fd, FDMultiplexer::funcparam_t& var) } } -static void 
checkFastOpenSysctl(bool active) -{ -#ifdef __linux__ - string line; - if (readFileIfThere("/proc/sys/net/ipv4/tcp_fastopen", &line)) { - int flag = std::stoi(line); - if (active && !(flag & 1)) { - g_log << Logger::Error << "tcp-fast-open-connect enabled but net.ipv4.tcp_fastopen does not allow it" << endl; - } - if (!active && !(flag & 2)) { - g_log << Logger::Error << "tcp-fast-open enabled but net.ipv4.tcp_fastopen does not allow it" << endl; - } - } - else { - g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl; - } -#else - g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl; -#endif -} - -static void checkTFOconnect() -{ - try { - Socket s(AF_INET, SOCK_STREAM); - s.setNonBlocking(); - s.setFastOpenConnect(); - } - catch (const NetworkError& e) { - g_log << Logger::Error << "tcp-fast-open-connect enabled but returned error: " << e.what() << endl; - } -} - -static void makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set& tcpSockets) -{ - int fd; - vectorlocals; - stringtok(locals,::arg()["local-address"]," ,"); - - if(locals.empty()) - throw PDNSException("No local address specified"); - - for(vector::const_iterator i=locals.begin();i!=locals.end();++i) { - ServiceTuple st; - st.port=::arg().asNum("local-port"); - parseService(*i, st); - - ComboAddress sin; - - sin.reset(); - sin.sin4.sin_family = AF_INET; - if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) { - sin.sin6.sin6_family = AF_INET6; - if(makeIPv6sockaddr(st.host, &sin.sin6) < 0) - throw PDNSException("Unable to resolve local address for TCP server on '"+ st.host +"'"); - } - - fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0); - if(fd<0) - throw PDNSException("Making a TCP server socket for resolver: "+stringerror()); - - setCloseOnExec(fd); - - int tmp=1; - if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp)<0) { - g_log<= 0) { - if(i==locals.begin()) - g_log< 0) { - checkFastOpenSysctl(false); 
-#ifdef TCP_FASTOPEN - if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &SyncRes::s_tcp_fast_open, sizeof SyncRes::s_tcp_fast_open) < 0) { - int err = errno; - g_log<& pid) -{ - TCPLOG(pid->tcpsock, "State transation " << int(oldstate) << "->" << int(newstate) << endl); - - pid->lowState = newstate; - - // handle state transitions - switch (oldstate) { - case IOState::NeedRead: - - switch (newstate) { - case IOState::NeedWrite: - TCPLOG(pid->tcpsock, "NeedRead -> NeedWrite: flip FD" << endl); - t_fdm->alterFDToWrite(pid->tcpsock, TCPIOHandlerIO, pid); - break; - case IOState::NeedRead: - break; - case IOState::Done: - TCPLOG(pid->tcpsock, "Done -> removeReadFD" << endl); - t_fdm->removeReadFD(pid->tcpsock); - break; - case IOState::Async: - throw std::runtime_error("TLS async mode not supported"); - break; - } - break; - - case IOState::NeedWrite: - - switch (newstate) { - case IOState::NeedRead: - TCPLOG(pid->tcpsock, "NeedWrite -> NeedRead: flip FD" << endl); - t_fdm->alterFDToRead(pid->tcpsock, TCPIOHandlerIO, pid); - break; - case IOState::NeedWrite: - break; - case IOState::Done: - TCPLOG(pid->tcpsock, "Done -> removeWriteFD" << endl); - t_fdm->removeWriteFD(pid->tcpsock); - break; - case IOState::Async: - throw std::runtime_error("TLS async mode not supported"); - break; - } - break; - - case IOState::Done: - switch (newstate) { - case IOState::NeedRead: - TCPLOG(pid->tcpsock, "NeedRead: addReadFD" << endl); - t_fdm->addReadFD(pid->tcpsock, TCPIOHandlerIO, pid); - break; - case IOState::NeedWrite: - TCPLOG(pid->tcpsock, "NeedWrite: addWriteFD" << endl); - t_fdm->addWriteFD(pid->tcpsock, TCPIOHandlerIO, pid); - break; - case IOState::Done: - break; - case IOState::Async: - throw std::runtime_error("TLS async mode not supported"); - break; - } - break; - - case IOState::Async: - throw std::runtime_error("TLS async mode not supported"); - break; - } - -} - -static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var) -{ - std::shared_ptr pid = 
boost::any_cast>(var); - assert(pid->tcphandler); - assert(fd == pid->tcphandler->getDescriptor()); - IOState newstate = IOState::Done; - - TCPLOG(pid->tcpsock, "TCPIOHandlerIO: lowState " << int(pid->lowState) << endl); - - // In the code below, we want to update the state of the fd before calling sendEvent - // a sendEvent might close the fd, and some poll multiplexers do not like to manipulate a closed fd - - switch (pid->highState) { - case TCPAction::DoingRead: - TCPLOG(pid->tcpsock, "highState: Reading" << endl); - // In arecvtcp, the buffer was resized already so inWanted bytes will fit - // try reading - try { - newstate = pid->tcphandler->tryRead(pid->inMSG, pid->inPos, pid->inWanted); - switch (newstate) { - case IOState::Done: - case IOState::NeedRead: - TCPLOG(pid->tcpsock, "tryRead: Done or NeedRead " << int(newstate) << ' ' << pid->inPos << '/' << pid->inWanted << endl); - TCPLOG(pid->tcpsock, "TCPIOHandlerIO " << pid->inWanted << ' ' << pid->inIncompleteOkay << endl); - if (pid->inPos == pid->inWanted || (pid->inIncompleteOkay && pid->inPos > 0)) { - pid->inMSG.resize(pid->inPos); // old content (if there) + new bytes read, only relevant for the inIncompleteOkay case - newstate = IOState::Done; - TCPIOHandlerStateChange(pid->lowState, newstate, pid); - MT->sendEvent(pid, &pid->inMSG); - return; - } - break; - case IOState::NeedWrite: - break; - case IOState::Async: - throw std::runtime_error("TLS async mode not supported"); - break; - } - } - catch (const std::exception& e) { - newstate = IOState::Done; - TCPLOG(pid->tcpsock, "read exception..." 
<< e.what() << endl); - PacketBuffer empty; - TCPIOHandlerStateChange(pid->lowState, newstate, pid); - MT->sendEvent(pid, &empty); // this conveys error status - return; - } - break; - - case TCPAction::DoingWrite: - TCPLOG(pid->tcpsock, "highState: Writing" << endl); - try { - TCPLOG(pid->tcpsock, "tryWrite: " << pid->outPos << '/' << pid->outMSG.size() << ' ' << " -> "); - newstate = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size()); - TCPLOG(pid->tcpsock, pid->outPos << '/' << pid->outMSG.size() << endl); - switch (newstate) { - case IOState::Done: { - TCPLOG(pid->tcpsock, "tryWrite: Done" << endl); - TCPIOHandlerStateChange(pid->lowState, newstate, pid); - MT->sendEvent(pid, &pid->outMSG); // send back what we sent to convey everything is ok - return; - } - case IOState::NeedRead: - TCPLOG(pid->tcpsock, "tryWrite: NeedRead" << endl); - break; - case IOState::NeedWrite: - TCPLOG(pid->tcpsock, "tryWrite: NeedWrite" << endl); - break; - case IOState::Async: - throw std::runtime_error("TLS async mode not supported"); - break; - } - } - catch (const std::exception& e) { - newstate = IOState::Done; - TCPLOG(pid->tcpsock, "write exception..." 
<< e.what() << endl); - PacketBuffer sent; - TCPIOHandlerStateChange(pid->lowState, newstate, pid); - MT->sendEvent(pid, &sent); // we convey error status by sending empty string - return; - } - break; - } - - // Cases that did not end up doing a sendEvent - TCPIOHandlerStateChange(pid->lowState, newstate, pid); -} // resend event to everybody chained onto it static void doResends(MT_t::waiters_t::iterator& iter, const std::shared_ptr& resend, const PacketBuffer& content) diff --git a/pdns/rec_channel_rec.cc b/pdns/rec_channel_rec.cc index c87c4cc56b..8f325ae428 100644 --- a/pdns/rec_channel_rec.cc +++ b/pdns/rec_channel_rec.cc @@ -40,6 +40,7 @@ #include "namespaces.hh" #include "rec-taskqueue.hh" #include "rec-tcpout.hh" +#include "rec-main.hh" std::pair PrefixDashNumberCompare::prefixAndTrailingNum(const std::string& a) { diff --git a/pdns/recursordist/Makefile.am b/pdns/recursordist/Makefile.am index d1829d4b17..420c5de448 100644 --- a/pdns/recursordist/Makefile.am +++ b/pdns/recursordist/Makefile.am @@ -160,10 +160,12 @@ pdns_recursor_SOURCES = \ rcpgenerator.cc rcpgenerator.hh \ rec-carbon.cc \ rec-eventtrace.cc rec-eventtrace.hh \ + rec-main.hh rec-main.cc \ rec-lua-conf.hh rec-lua-conf.cc \ rec-protozero.cc rec-protozero.hh \ rec-snmp.hh rec-snmp.cc \ rec-taskqueue.cc rec-taskqueue.hh \ + rec-tcp.cc \ rec-tcpout.cc rec-tcpout.hh \ rec-zonetocache.cc rec-zonetocache.hh \ rec_channel.cc rec_channel.hh rec_metrics.hh \ diff --git a/pdns/recursordist/rec-main.cc b/pdns/recursordist/rec-main.cc new file mode 100644 index 0000000000..eff5f89ed5 --- /dev/null +++ b/pdns/recursordist/rec-main.cc @@ -0,0 +1,24 @@ +/* + * This file is part of PowerDNS or dnsdist. + * Copyright -- PowerDNS.COM B.V. and its contributors + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation.
+ * + * In addition, for the avoidance of any doubt, permission is granted to + * link this program with OpenSSL and to (re)distribute the binaries + * produced as the result of such linking. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "rec-main.hh" + diff --git a/pdns/recursordist/rec-main.hh b/pdns/recursordist/rec-main.hh new file mode 100644 index 0000000000..82604275d9 --- /dev/null +++ b/pdns/recursordist/rec-main.hh @@ -0,0 +1,222 @@ +/* + * This file is part of PowerDNS or dnsdist. + * Copyright -- PowerDNS.COM B.V. and its contributors + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In addition, for the avoidance of any doubt, permission is granted to + * link this program with OpenSSL and to (re)distribute the binaries + * produced as the result of such linking. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
+ */ + +#pragma once + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "logger.hh" +#include "lua-recursor4.hh" +#include "mplexer.hh" +#include "namespaces.hh" +#include "rec-lua-conf.hh" +#include "rec-protozero.hh" +#include "syncres.hh" + + +//! used to send information to a newborn mthread +struct DNSComboWriter { + DNSComboWriter(const std::string& query, const struct timeval& now): d_mdp(true, query), d_now(now), d_query(query) + { + } + + DNSComboWriter(const std::string& query, const struct timeval& now, std::unordered_set&& policyTags, LuaContext::LuaObject&& data, std::vector&& records): d_mdp(true, query), d_now(now), d_query(query), d_policyTags(std::move(policyTags)), d_records(std::move(records)), d_data(std::move(data)) + { + } + + void setRemote(const ComboAddress& sa) + { + d_remote=sa; + } + + void setSource(const ComboAddress& sa) + { + d_source=sa; + } + + void setLocal(const ComboAddress& sa) + { + d_local=sa; + } + + void setDestination(const ComboAddress& sa) + { + d_destination=sa; + } + + void setSocket(int sock) + { + d_socket=sock; + } + + string getRemote() const + { + if (d_source == d_remote) { + return d_source.toStringWithPort(); + } + return d_source.toStringWithPort() + " (proxied by " + d_remote.toStringWithPort() + ")"; + } + + std::vector d_proxyProtocolValues; + MOADNSParser d_mdp; + struct timeval d_now; + /* Remote client, might differ from d_source + in case of XPF, in which case d_source holds + the IP of the client and d_remote of the proxy + */ + ComboAddress d_remote; + ComboAddress d_source; + /* Destination address, might differ from + d_destination in case of XPF, in which case + d_destination holds the IP of the proxy and + d_local holds our own. 
*/ + ComboAddress d_local; + ComboAddress d_destination; + RecEventTrace d_eventTrace; + boost::uuids::uuid d_uuid; + string d_requestorId; + string d_deviceId; + string d_deviceName; + struct timeval d_kernelTimestamp{0,0}; + std::string d_query; + std::unordered_set d_policyTags; + std::string d_routingTag; + std::vector d_records; + LuaContext::LuaObject d_data; + EDNSSubnetOpts d_ednssubnet; + shared_ptr d_tcpConnection; + boost::optional d_extendedErrorCode{boost::none}; + string d_extendedErrorExtra; + boost::optional d_rcode{boost::none}; + int d_socket{-1}; + unsigned int d_tag{0}; + uint32_t d_qhash{0}; + uint32_t d_ttlCap{std::numeric_limits::max()}; + bool d_variable{false}; + bool d_ecsFound{false}; + bool d_ecsParsed{false}; + bool d_followCNAMERecords{false}; + bool d_logResponse{false}; + bool d_tcp{false}; + bool d_responsePaddingDisabled{false}; + std::map d_meta; +}; + + +typedef MTasker, PacketBuffer, PacketIDCompare> MT_t; +extern thread_local std::unique_ptr MT; // the big MTasker + +extern thread_local FDMultiplexer* t_fdm; +extern bool g_logCommonErrors; +extern size_t g_proxyProtocolMaximumSize; +extern std::atomic g_quiet; +extern NetmaskGroup g_XPFAcl; +extern thread_local std::shared_ptr>> t_protobufServers; +extern thread_local std::shared_ptr t_pdl; +extern bool g_gettagNeedsEDNSOptions; +extern NetmaskGroup g_paddingFrom; +extern unsigned int g_paddingTag; +extern thread_local std::shared_ptr>> t_outgoingProtobufServers; +extern unsigned int g_maxMThreads; +extern bool g_reusePort; +extern bool g_anyToTcp; +extern size_t g_tcpMaxQueriesPerConn; +extern unsigned int g_maxTCPPerClient; +extern int g_tcpTimeout; + +typedef map tcpClientCounts_t; +extern thread_local std::unique_ptr t_tcpClientCounts; + +typedef vector > > deferredAdd_t; + +inline MT_t* getMT() +{ + return MT ? 
MT.get() : nullptr; +} + +extern thread_local unsigned int t_id; + +inline unsigned int getRecursorThreadId() +{ + return t_id; +} + +/* this function is called with both a string and a vector representing a packet */ +template +static bool sendResponseOverTCP(const std::unique_ptr& dc, const T& packet) +{ + uint8_t buf[2]; + buf[0] = packet.size() / 256; + buf[1] = packet.size() % 256; + + Utility::iovec iov[2]; + iov[0].iov_base = (void*)buf; iov[0].iov_len = 2; + iov[1].iov_base = (void*)&*packet.begin(); iov[1].iov_len = packet.size(); + + int wret = Utility::writev(dc->d_socket, iov, 2); + bool hadError = true; + + if (wret == 0) { + g_log<getRemote()<getRemote() << ": " << strerror(err) << endl; + } else if ((unsigned int)wret != 2 + packet.size()) { + g_log<getRemote()<<" for "<d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<& luaconfsLocal); +bool checkFrameStreamExport(LocalStateHolder& luaconfsLocal); +void getQNameAndSubnet(const std::string& question, DNSName* dnsname, uint16_t* qtype, uint16_t* qclass, + bool& foundECS, EDNSSubnetOpts* ednssubnet, EDNSOptionViewMap* options, + bool& foundXPF, ComboAddress* xpfSource, ComboAddress* xpfDest); +void protobufLogQuery(LocalStateHolder& luaconfsLocal, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::unordered_set& policyTags, const std::string& requestorId, const std::string& deviceId, const std::string& deviceName, const std::map& meta); +bool isAllowNotifyForZone(DNSName qname); +bool checkForCacheHit(bool qnameParsed, unsigned int tag, const string& data, + DNSName& qname, uint16_t& qtype, uint16_t& qclass, + const struct timeval& now, + string& response, uint32_t& qhash, + RecursorPacketCache::OptPBData& pbData, bool tcp, const ComboAddress& source); +void protobufLogResponse(pdns::ProtoZero::RecMessage& message); +void 
protobufLogResponse(const struct dnsheader* dh, LocalStateHolder& luaconfsLocal, + const RecursorPacketCache::OptPBData& pbData, const struct timeval& tv, + bool tcp, const ComboAddress& source, const ComboAddress& destination, + const EDNSSubnetOpts& ednssubnet, + const boost::uuids::uuid& uniqueId, const string& requestorId, const string& deviceId, + const string& deviceName, const std::map& meta, + const RecEventTrace& eventTrace); +void requestWipeCaches(const DNSName& canon); +void startDoResolve(void *p); +bool expectProxyProtocol(const ComboAddress& from); +void finishTCPReply(std::unique_ptr& dc, bool hadError, bool updateInFlight); +void checkFastOpenSysctl(bool active); +void checkTFOconnect(); +void makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set& tcpSockets); +void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& ); diff --git a/pdns/recursordist/rec-tcp.cc b/pdns/recursordist/rec-tcp.cc new file mode 100644 index 0000000000..86740f347f --- /dev/null +++ b/pdns/recursordist/rec-tcp.cc @@ -0,0 +1,1077 @@ +/* + * This file is part of PowerDNS or dnsdist. + * Copyright -- PowerDNS.COM B.V. and its contributors + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. + * + * In addition, for the avoidance of any doubt, permission is granted to + * link this program with OpenSSL and to (re)distribute the binaries + * produced as the result of such linking. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +#include "rec-main.hh" + +#include "arguments.hh" +#include "logger.hh" +#include "mplexer.hh" +#include "uuid-utils.hh" + +size_t g_tcpMaxQueriesPerConn; +unsigned int g_maxTCPPerClient; +int g_tcpTimeout; +bool g_anyToTcp; + +uint16_t TCPConnection::s_maxInFlight; +std::atomic TCPConnection::s_currentConnections; + +typedef map tcpClientCounts_t; +thread_local std::unique_ptr t_tcpClientCounts; + +static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var); + +#if 0 +#define TCPLOG(tcpsock, x) do { cerr << [](){ timeval t; gettimeofday(&t, nullptr); return t.tv_sec % 10 + t.tv_usec/1000000.0; }() << " FD " << (tcpsock) << ' ' << x; } while (0) +#else +#define TCPLOG(pid, x) +#endif + +TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : data(2, 0), d_remote(addr), d_fd(fd) +{ + ++s_currentConnections; + (*t_tcpClientCounts)[d_remote]++; +} + +TCPConnection::~TCPConnection() +{ + try { + if(closesocket(d_fd) < 0) + g_log<count(d_remote) && !(*t_tcpClientCounts)[d_remote]--) + t_tcpClientCounts->erase(d_remote); + --s_currentConnections; +} + + +static void terminateTCPConnection(int fd) +{ + try { + t_fdm->removeReadFD(fd); + } + catch (const FDMultiplexerException& fde) + { + } +} + +static void sendErrorOverTCP(std::unique_ptr& dc, int rcode) +{ + std::vector packet; + if (dc->d_mdp.d_header.qdcount == 0) { + /* header-only */ + packet.resize(sizeof(dnsheader)); + } + else { + DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass); + if (dc->d_mdp.hasEDNS()) { + /* we try to add the EDNS OPT RR even for truncated answers, + as rfc6891 states: + "The minimal response MUST be the DNS header, question section, and an + OPT record. 
This MUST also occur when a truncated response (using + the DNS header's TC bit) is returned." + */ + pw.addOpt(512, 0, 0); + pw.commit(); + } + } + + dnsheader& header = reinterpret_cast(packet.at(0)); + header.aa = 0; + header.ra = 1; + header.qr = 1; + header.tc = 0; + header.id = dc->d_mdp.d_header.id; + header.rd = dc->d_mdp.d_header.rd; + header.cd = dc->d_mdp.d_header.cd; + header.rcode = rcode; + + sendResponseOverTCP(dc, packet); +} + +void finishTCPReply(std::unique_ptr& dc, bool hadError, bool updateInFlight) +{ + // update tcp connection status, closing if needed and doing the fd multiplexer accounting + if (updateInFlight && dc->d_tcpConnection->d_requestsInFlight > 0) { + dc->d_tcpConnection->d_requestsInFlight--; + } + + // In the code below, we try to remove the fd from the set, but + // we don't know if another mthread already did the remove, so we can get a + // "Tried to remove unlisted fd" exception. Note that an inflight < limit test + // will not work since we do not know if the other mthread got an error or not. + if (hadError) { + terminateTCPConnection(dc->d_socket); + dc->d_socket = -1; + return; + } + dc->d_tcpConnection->queriesCount++; + if ((g_tcpMaxQueriesPerConn && dc->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) || + (dc->d_tcpConnection->isDropOnIdle() && dc->d_tcpConnection->d_requestsInFlight == 0)) { + try { + t_fdm->removeReadFD(dc->d_socket); + } + catch (FDMultiplexerException &) { + } + dc->d_socket = -1; + return; + } + + Utility::gettimeofday(&g_now, nullptr); // needs to be updated + struct timeval ttd = g_now; + + // If we cross from max to max-1 in flight requests, the fd was not listened to, add it back + if (updateInFlight && dc->d_tcpConnection->d_requestsInFlight == TCPConnection::s_maxInFlight - 1) { + // A read error might have happened. If we add the fd back, it will most likely error again.
+ // This is not a big issue, the next handleTCPClientReadable() will see another read error + // and take action. + ttd.tv_sec += g_tcpTimeout; + t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd); + return; + } + // fd might have been removed by read error code, or a read timeout, so expect an exception + try { + t_fdm->setReadTTD(dc->d_socket, ttd, g_tcpTimeout); + } + catch (const FDMultiplexerException &) { + // but if the FD was removed because of a timeout while we were sending a response, + // we need to re-arm it. If it was an error it will error again. + ttd.tv_sec += g_tcpTimeout; + t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd); + } +} + +/* + * A helper class that by default closes the incoming TCP connection on destruct + * If you want to keep the connection alive, call keep() on the guard object + */ +class RunningTCPQuestionGuard { +public: + RunningTCPQuestionGuard(int fd) + { + d_fd = fd; + } + ~RunningTCPQuestionGuard() + { + if (d_fd != -1) { + terminateTCPConnection(d_fd); + d_fd = -1; + } + } + void keep() + { + d_fd = -1; + } + bool handleTCPReadResult(int fd, ssize_t bytes) + { + if (bytes == 0) { + /* EOF */ + return false; + } + else if (bytes < 0) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + return false; + } + } + keep(); + return true; + } + +private: + int d_fd{-1}; +}; + +static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var) +{ + shared_ptr conn=boost::any_cast >(var); + + RunningTCPQuestionGuard tcpGuard{fd}; + + if (conn->state == TCPConnection::PROXYPROTOCOLHEADER) { + ssize_t bytes = recv(conn->getFD(), &conn->data.at(conn->proxyProtocolGot), conn->proxyProtocolNeed, 0); + if (bytes <= 0) { + tcpGuard.handleTCPReadResult(fd, bytes); + return; + } + + conn->proxyProtocolGot += bytes; + conn->data.resize(conn->proxyProtocolGot); + ssize_t remaining = isProxyHeaderComplete(conn->data); + if (remaining == 0) { + if (g_logCommonErrors) 
{ + g_log<d_remote.toStringWithPort() <proxyProtocolNeed = -remaining; + conn->data.resize(conn->proxyProtocolGot + conn->proxyProtocolNeed); + tcpGuard.keep(); + return; + } + else { + /* proxy header received */ + /* we ignore the TCP field for now, but we could properly set whether + the connection was received over UDP or TCP if needed */ + bool tcp; + bool proxy = false; + size_t used = parseProxyHeader(conn->data, proxy, conn->d_source, conn->d_destination, tcp, conn->proxyProtocolValues); + if (used <= 0) { + if (g_logCommonErrors) { + g_log<d_remote.toStringWithPort() <(used) > g_proxyProtocolMaximumSize) { + if (g_logCommonErrors) { + g_log<d_remote.toStringWithPort() << " is larger than proxy-protocol-maximum-size (" << used << "), dropping"<< endl; + } + ++g_stats.proxyProtocolInvalidCount; + return; + } + + /* Now that we have retrieved the address of the client, as advertised by the proxy + via the proxy protocol header, check that it is allowed by our ACL */ + /* note that if the proxy header used a 'LOCAL' command, the original source and destination are untouched so everything should be fine */ + if (t_allowFrom && !t_allowFrom->match(&conn->d_source)) { + if (!g_quiet) { + g_log<getTid()<<"] dropping TCP query from "<d_source.toString()<<", address not matched by allow-from"<data.resize(2); + conn->state = TCPConnection::BYTE0; + } + } + + if (conn->state==TCPConnection::BYTE0) { + ssize_t bytes=recv(conn->getFD(), &conn->data[0], 2, 0); + if(bytes==1) + conn->state=TCPConnection::BYTE1; + if(bytes==2) { + conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1]; + conn->data.resize(conn->qlen); + conn->bytesread=0; + conn->state=TCPConnection::GETQUESTION; + } + if (bytes <= 0) { + tcpGuard.handleTCPReadResult(fd, bytes); + return; + } + } + + if (conn->state==TCPConnection::BYTE1) { + ssize_t bytes=recv(conn->getFD(), &conn->data[1], 1, 0); + if(bytes==1) { + conn->state=TCPConnection::GETQUESTION; + conn->qlen=(((unsigned 
char)conn->data[0]) << 8)+ (unsigned char)conn->data[1]; + conn->data.resize(conn->qlen); + conn->bytesread=0; + } + if (bytes <= 0) { + if (!tcpGuard.handleTCPReadResult(fd, bytes)) { + if(g_logCommonErrors) { + g_log<d_remote.toStringWithPort() <<" disconnected after first byte"<state==TCPConnection::GETQUESTION) { + ssize_t bytes=recv(conn->getFD(), &conn->data[conn->bytesread], conn->qlen - conn->bytesread, 0); + if (bytes <= 0) { + if (!tcpGuard.handleTCPReadResult(fd, bytes)) { + if(g_logCommonErrors) { + g_log<d_remote.toStringWithPort() <<" disconnected while reading question body"< std::numeric_limits::max()) { + if(g_logCommonErrors) { + g_log<d_remote.toStringWithPort() <<" sent an invalid question size while reading question body"<bytesread+=(uint16_t)bytes; + if(conn->bytesread==conn->qlen) { + conn->state = TCPConnection::BYTE0; + std::unique_ptr dc; + try { + dc = std::make_unique(conn->data, g_now); + } + catch(const MOADNSException &mde) { + g_stats.clientParseError++; + if (g_logCommonErrors) { + g_log<d_remote.toStringWithPort() <d_tcpConnection = conn; // carry the torch + dc->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd + dc->d_tcp=true; + dc->setRemote(conn->d_remote); + dc->setSource(conn->d_source); + ComboAddress dest; + dest.reset(); + dest.sin4.sin_family = conn->d_remote.sin4.sin_family; + socklen_t len = dest.getSocklen(); + getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it + dc->setLocal(dest); + dc->setDestination(conn->d_destination); + /* we can't move this if we want to be able to access the values in + all queries sent over this connection */ + dc->d_proxyProtocolValues = conn->proxyProtocolValues; + + struct timeval start; + Utility::gettimeofday(&start, nullptr); + + DNSName qname; + uint16_t qtype=0; + uint16_t qclass=0; + bool needECS = false; + bool needXPF = g_XPFAcl.match(conn->d_remote); + string requestorId; + string deviceId; + string deviceName; 
+ bool logQuery = false; + bool qnameParsed = false; + + dc->d_eventTrace.setEnabled(SyncRes::s_event_trace_enabled); + dc->d_eventTrace.add(RecEventTrace::ReqRecv); + auto luaconfsLocal = g_luaconfs.getLocal(); + if (checkProtobufExport(luaconfsLocal)) { + needECS = true; + } + logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries; + dc->d_logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses; + +#ifdef HAVE_FSTRM + checkFrameStreamExport(luaconfsLocal); +#endif + + if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag)) || dc->d_mdp.d_header.opcode == Opcode::Notify) { + + try { + EDNSOptionViewMap ednsOptions; + bool xpfFound = false; + dc->d_ecsParsed = true; + dc->d_ecsFound = false; + getQNameAndSubnet(conn->data, &qname, &qtype, &qclass, + dc->d_ecsFound, &dc->d_ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr, + xpfFound, needXPF ? &dc->d_source : nullptr, needXPF ? &dc->d_destination : nullptr); + qnameParsed = true; + + if(t_pdl) { + try { + if (t_pdl->d_gettag_ffi) { + RecursorLua4::FFIParams params(qname, qtype, dc->d_destination, dc->d_source, dc->d_ednssubnet.source, dc->d_data, dc->d_policyTags, dc->d_records, ednsOptions, dc->d_proxyProtocolValues, requestorId, deviceId, deviceName, dc->d_routingTag, dc->d_rcode, dc->d_ttlCap, dc->d_variable, true, logQuery, dc->d_logResponse, dc->d_followCNAMERecords, dc->d_extendedErrorCode, dc->d_extendedErrorExtra, dc->d_responsePaddingDisabled, dc->d_meta); + dc->d_eventTrace.add(RecEventTrace::LuaGetTagFFI); + dc->d_tag = t_pdl->gettag_ffi(params); + dc->d_eventTrace.add(RecEventTrace::LuaGetTagFFI, dc->d_tag, false); + } + else if (t_pdl->d_gettag) { + dc->d_eventTrace.add(RecEventTrace::LuaGetTag); + dc->d_tag = t_pdl->gettag(dc->d_source, dc->d_ednssubnet.source, dc->d_destination, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, deviceName, dc->d_routingTag, 
dc->d_proxyProtocolValues); + dc->d_eventTrace.add(RecEventTrace::LuaGetTag, dc->d_tag, false); + } + } + catch(const std::exception& e) { + if(g_logCommonErrors) { + g_log<d_tag == 0 && !dc->d_responsePaddingDisabled && g_paddingFrom.match(dc->d_remote)) { + dc->d_tag = g_paddingTag; + } + + const struct dnsheader* dh = reinterpret_cast(&conn->data[0]); + + if (t_protobufServers || t_outgoingProtobufServers) { + dc->d_requestorId = requestorId; + dc->d_deviceId = deviceId; + dc->d_deviceName = deviceName; + dc->d_uuid = getUniqueID(); + } + + if(t_protobufServers) { + try { + + if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && dc->d_policyTags.empty())) { + protobufLogQuery(luaconfsLocal, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId, dc->d_deviceName, dc->d_meta); + } + } + catch (const std::exception& e) { + if (g_logCommonErrors) { + g_log<ipfilter(dc->d_source, dc->d_destination, *dh, dc->d_eventTrace); + if (ipf) { + if (!g_quiet) { + g_log<getTid()<<"/"<numProcesses()<<"] DROPPED TCP question from "<d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? 
" (via "+dc->d_remote.toStringWithPort()+")" : "")<<" based on policy"<d_mdp.d_header.qr) { + g_stats.ignoredCount++; + if (g_logCommonErrors) { + g_log<getRemote() <<" on server socket!"<d_mdp.d_header.opcode != Opcode::Query && dc->d_mdp.d_header.opcode != Opcode::Notify) { + g_stats.ignoredCount++; + if (g_logCommonErrors) { + g_log<d_mdp.d_header.opcode)<<" from TCP client "<< dc->getRemote() <<" on server socket!"<qdcount == 0) { + g_stats.emptyQueriesCount++; + if (g_logCommonErrors) { + g_log<getRemote() <<" on server socket!"<d_mdp.d_header.opcode == Opcode::Notify) { + if(!t_allowNotifyFrom || !t_allowNotifyFrom->match(dc->d_source)) { + if(!g_quiet) { + g_log<getTid()<<"] dropping TCP NOTIFY from "<d_source.toString()<<", address not matched by allow-notify-from"<getTid()<<"] dropping TCP NOTIFY from "<d_source.toString()<<", for "<d_mdp.d_header.opcode == Opcode::Query) { + /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable, + but it means that the hash would not be computed. If some script decides at a later time to mark back the answer + as cacheable we would cache it with a wrong tag, so better safe than sorry. */ + dc->d_eventTrace.add(RecEventTrace::PCacheCheck); + bool cacheHit = checkForCacheHit(qnameParsed, dc->d_tag, conn->data, qname, qtype, qclass, g_now, response, dc->d_qhash, pbData, true, dc->d_source); + dc->d_eventTrace.add(RecEventTrace::PCacheCheck, cacheHit, false); + + if (cacheHit) { + if (!g_quiet) { + g_log<d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? 
" (via "+dc->d_remote.toStringWithPort()+")" : "")<d_eventTrace.add(RecEventTrace::AnswerSent); + + if (t_protobufServers && dc->d_logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbData && !pbData->d_tagged)) { + struct timeval tv{0, 0}; + protobufLogResponse(dh, luaconfsLocal, pbData, tv, true, dc->d_source, dc->d_destination, dc->d_ednssubnet, dc->d_uuid, dc->d_requestorId, dc->d_deviceId, dc->d_deviceName, dc->d_meta, dc->d_eventTrace); + } + + if (dc->d_eventTrace.enabled() && SyncRes::s_event_trace_enabled & SyncRes::event_trace_to_log) { + g_log << Logger::Info << dc->d_eventTrace.toString() << endl; + } + tcpGuard.keep(); + return; + } // cache hit + } // query opcode + + if(dc->d_mdp.d_header.opcode == Opcode::Notify) { + if (!g_quiet) { + g_log<d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<opcode, but we need to ensure that the response + // to this request does not get put into the packet cache + dc->d_variable = true; + } + + // setup for startDoResolve() in an mthread + ++conn->d_requestsInFlight; + if (conn->d_requestsInFlight >= TCPConnection::s_maxInFlight) { + t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read + } else { + Utility::gettimeofday(&g_now, nullptr); // needed? + struct timeval ttd = g_now; + t_fdm->setReadTTD(fd, ttd, g_tcpTimeout); + } + tcpGuard.keep(); + MT->makeThread(startDoResolve, dc.release()); // deletes dc + } // good query + } // read full query + } // reading query + + // more to come + tcpGuard.keep(); +} + +//! 
Handle new incoming TCP connection +void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& ) +{ + ComboAddress addr; + socklen_t addrlen=sizeof(addr); + int newsock=accept(fd, (struct sockaddr*)&addr, &addrlen); + if(newsock>=0) { + if(MT->numProcesses() > g_maxMThreads) { + g_stats.overCapacityDrops++; + try { + closesocket(newsock); + } + catch(const PDNSException& e) { + g_log<push_back(addr); + } + + bool fromProxyProtocolSource = expectProxyProtocol(addr); + if(t_allowFrom && !t_allowFrom->match(&addr) && !fromProxyProtocolSource) { + if(!g_quiet) + g_log<getTid()<<"] dropping TCP query from "<count(addr) && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) { + g_stats.tcpClientOverflow++; + try { + closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet! + } + catch(const PDNSException& e) { + g_log< tc = std::make_shared(newsock, addr); + tc->d_source = addr; + tc->d_destination.reset(); + tc->d_destination.sin4.sin_family = addr.sin4.sin_family; + socklen_t len = tc->d_destination.getSocklen(); + getsockname(tc->getFD(), reinterpret_cast(&tc->d_destination), &len); // if this fails, we're ok with it + + if (fromProxyProtocolSource) { + tc->proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize; + tc->data.resize(tc->proxyProtocolNeed); + tc->state = TCPConnection::PROXYPROTOCOLHEADER; + } + else { + tc->state = TCPConnection::BYTE0; + } + + struct timeval ttd; + Utility::gettimeofday(&ttd, nullptr); + ttd.tv_sec += g_tcpTimeout; + + t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc, &ttd); + } +} + +static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var); + +static void TCPIOHandlerStateChange(IOState oldstate, IOState newstate, std::shared_ptr& pid) +{ + TCPLOG(pid->tcpsock, "State transation " << int(oldstate) << "->" << int(newstate) << endl); + + pid->lowState = newstate; + + // handle state transitions + switch (oldstate) { + case IOState::NeedRead: + + switch (newstate) { + 
case IOState::NeedWrite: + TCPLOG(pid->tcpsock, "NeedRead -> NeedWrite: flip FD" << endl); + t_fdm->alterFDToWrite(pid->tcpsock, TCPIOHandlerIO, pid); + break; + case IOState::NeedRead: + break; + case IOState::Done: + TCPLOG(pid->tcpsock, "Done -> removeReadFD" << endl); + t_fdm->removeReadFD(pid->tcpsock); + break; + case IOState::Async: + throw std::runtime_error("TLS async mode not supported"); + break; + } + break; + + case IOState::NeedWrite: + + switch (newstate) { + case IOState::NeedRead: + TCPLOG(pid->tcpsock, "NeedWrite -> NeedRead: flip FD" << endl); + t_fdm->alterFDToRead(pid->tcpsock, TCPIOHandlerIO, pid); + break; + case IOState::NeedWrite: + break; + case IOState::Done: + TCPLOG(pid->tcpsock, "Done -> removeWriteFD" << endl); + t_fdm->removeWriteFD(pid->tcpsock); + break; + case IOState::Async: + throw std::runtime_error("TLS async mode not supported"); + break; + } + break; + + case IOState::Done: + switch (newstate) { + case IOState::NeedRead: + TCPLOG(pid->tcpsock, "NeedRead: addReadFD" << endl); + t_fdm->addReadFD(pid->tcpsock, TCPIOHandlerIO, pid); + break; + case IOState::NeedWrite: + TCPLOG(pid->tcpsock, "NeedWrite: addWriteFD" << endl); + t_fdm->addWriteFD(pid->tcpsock, TCPIOHandlerIO, pid); + break; + case IOState::Done: + break; + case IOState::Async: + throw std::runtime_error("TLS async mode not supported"); + break; + } + break; + + case IOState::Async: + throw std::runtime_error("TLS async mode not supported"); + break; + } + +} + +static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var) +{ + std::shared_ptr pid = boost::any_cast>(var); + assert(pid->tcphandler); + assert(fd == pid->tcphandler->getDescriptor()); + IOState newstate = IOState::Done; + + TCPLOG(pid->tcpsock, "TCPIOHandlerIO: lowState " << int(pid->lowState) << endl); + + // In the code below, we want to update the state of the fd before calling sendEvent + // a sendEvent might close the fd, and some poll multiplexers do not like to manipulate a closed fd + + 
switch (pid->highState) { + case TCPAction::DoingRead: + TCPLOG(pid->tcpsock, "highState: Reading" << endl); + // In arecvtcp, the buffer was resized already so inWanted bytes will fit + // try reading + try { + newstate = pid->tcphandler->tryRead(pid->inMSG, pid->inPos, pid->inWanted); + switch (newstate) { + case IOState::Done: + case IOState::NeedRead: + TCPLOG(pid->tcpsock, "tryRead: Done or NeedRead " << int(newstate) << ' ' << pid->inPos << '/' << pid->inWanted << endl); + TCPLOG(pid->tcpsock, "TCPIOHandlerIO " << pid->inWanted << ' ' << pid->inIncompleteOkay << endl); + if (pid->inPos == pid->inWanted || (pid->inIncompleteOkay && pid->inPos > 0)) { + pid->inMSG.resize(pid->inPos); // old content (if there) + new bytes read, only relevant for the inIncompleteOkay case + newstate = IOState::Done; + TCPIOHandlerStateChange(pid->lowState, newstate, pid); + MT->sendEvent(pid, &pid->inMSG); + return; + } + break; + case IOState::NeedWrite: + break; + case IOState::Async: + throw std::runtime_error("TLS async mode not supported"); + break; + } + } + catch (const std::exception& e) { + newstate = IOState::Done; + TCPLOG(pid->tcpsock, "read exception..." 
<< e.what() << endl); + PacketBuffer empty; + TCPIOHandlerStateChange(pid->lowState, newstate, pid); + MT->sendEvent(pid, &empty); // this conveys error status + return; + } + break; + + case TCPAction::DoingWrite: + TCPLOG(pid->tcpsock, "highState: Writing" << endl); + try { + TCPLOG(pid->tcpsock, "tryWrite: " << pid->outPos << '/' << pid->outMSG.size() << ' ' << " -> "); + newstate = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size()); + TCPLOG(pid->tcpsock, pid->outPos << '/' << pid->outMSG.size() << endl); + switch (newstate) { + case IOState::Done: { + TCPLOG(pid->tcpsock, "tryWrite: Done" << endl); + TCPIOHandlerStateChange(pid->lowState, newstate, pid); + MT->sendEvent(pid, &pid->outMSG); // send back what we sent to convey everything is ok + return; + } + case IOState::NeedRead: + TCPLOG(pid->tcpsock, "tryWrite: NeedRead" << endl); + break; + case IOState::NeedWrite: + TCPLOG(pid->tcpsock, "tryWrite: NeedWrite" << endl); + break; + case IOState::Async: + throw std::runtime_error("TLS async mode not supported"); + break; + } + } + catch (const std::exception& e) { + newstate = IOState::Done; + TCPLOG(pid->tcpsock, "write exception..." 
<< e.what() << endl); + PacketBuffer sent; + TCPIOHandlerStateChange(pid->lowState, newstate, pid); + MT->sendEvent(pid, &sent); // we convey error status by sending empty string + return; + } + break; + } + + // Cases that did not end up doing a sendEvent + TCPIOHandlerStateChange(pid->lowState, newstate, pid); +} + + +void checkFastOpenSysctl(bool active) +{ +#ifdef __linux__ + string line; + if (readFileIfThere("/proc/sys/net/ipv4/tcp_fastopen", &line)) { + int flag = std::stoi(line); + if (active && !(flag & 1)) { + g_log << Logger::Error << "tcp-fast-open-connect enabled but net.ipv4.tcp_fastopen does not allow it" << endl; + } + if (!active && !(flag & 2)) { + g_log << Logger::Error << "tcp-fast-open enabled but net.ipv4.tcp_fastopen does not allow it" << endl; + } + } + else { + g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl; + } +#else + g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl; +#endif +} + +void checkTFOconnect() +{ + try { + Socket s(AF_INET, SOCK_STREAM); + s.setNonBlocking(); + s.setFastOpenConnect(); + } + catch (const NetworkError& e) { + g_log << Logger::Error << "tcp-fast-open-connect enabled but returned error: " << e.what() << endl; + } +} + + + +LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr& handler) +{ + TCPLOG(handler->getDescriptor(), "asendtcp called " << data.size() << endl); + + auto pident = std::make_shared(); + pident->tcphandler = handler; + pident->tcpsock = handler->getDescriptor(); + pident->outMSG = data; + pident->highState = TCPAction::DoingWrite; + + IOState state; + try { + TCPLOG(pident->tcpsock, "Initial tryWrite: " << pident->outPos << '/' << pident->outMSG.size() << ' ' << " -> "); + state = handler->tryWrite(pident->outMSG, pident->outPos, pident->outMSG.size()); + TCPLOG(pident->tcpsock, pident->outPos << '/' << pident->outMSG.size() << endl); + + if (state == IOState::Done) { + TCPLOG(pident->tcpsock, "asendtcp 
success A" << endl); + return LWResult::Result::Success; + } + } + catch (const std::exception& e) { + TCPLOG(pident->tcpsock, "tryWrite() exception..." << e.what() << endl); + return LWResult::Result::PermanentError; + } + + // Will set pident->lowState + TCPIOHandlerStateChange(IOState::Done, state, pident); + + PacketBuffer packet; + int ret = MT->waitEvent(pident, &packet, g_networkTimeoutMsec); + TCPLOG(pident->tcpsock, "asendtcp waitEvent returned " << ret << ' ' << packet.size() << '/' << data.size() << ' '); + if (ret == 0) { + TCPLOG(pident->tcpsock, "timeout" << endl); + TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident); + return LWResult::Result::Timeout; + } + else if (ret == -1) { // error + TCPLOG(pident->tcpsock, "PermanentError" << endl); + TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident); + return LWResult::Result::PermanentError; + } + else if (packet.size() != data.size()) { // main loop tells us what it sent out, or empty in case of an error + // fd housekeeping done by TCPIOHandlerIO + TCPLOG(pident->tcpsock, "PermanentError size mismatch" << endl); + return LWResult::Result::PermanentError; + } + + TCPLOG(pident->tcpsock, "asendtcp success" << endl); + return LWResult::Result::Success; +} + +LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, shared_ptr& handler, const bool incompleteOkay) +{ + TCPLOG(handler->getDescriptor(), "arecvtcp called " << len << ' ' << data.size() << endl); + data.resize(len); + + // We might have data already available from the TLS layer, try to get that into the buffer + size_t pos = 0; + IOState state; + try { + TCPLOG(handler->getDescriptor(), "calling tryRead() " << len << endl); + state = handler->tryRead(data, pos, len); + TCPLOG(handler->getDescriptor(), "arcvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl); + switch (state) { + case IOState::Done: + case IOState::NeedRead: + if (pos == len || (incompleteOkay && pos > 0)) { + 
data.resize(pos); + TCPLOG(handler->getDescriptor(), "acecvtcp success A" << endl); + return LWResult::Result::Success; + } + break; + case IOState::NeedWrite: + break; + case IOState::Async: + throw std::runtime_error("TLS async mode not supported"); + break; + } + } + catch (const std::exception& e) { + TCPLOG(handler->getDescriptor(), "tryRead() exception..." << e.what() << endl); + return LWResult::Result::PermanentError; + } + + auto pident = std::make_shared(); + pident->tcphandler = handler; + pident->tcpsock = handler->getDescriptor(); + // We might have a partial result + pident->inMSG = std::move(data); + pident->inPos = pos; + pident->inWanted = len; + pident->inIncompleteOkay = incompleteOkay; + pident->highState = TCPAction::DoingRead; + + data.clear(); + + // Will set pident->lowState + TCPIOHandlerStateChange(IOState::Done, state, pident); + + int ret = MT->waitEvent(pident, &data, g_networkTimeoutMsec); + TCPLOG(pident->tcpsock, "arecvtcp " << ret << ' ' << data.size() << ' ' ); + if (ret == 0) { + TCPLOG(pident->tcpsock, "timeout" << endl); + TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident); + return LWResult::Result::Timeout; + } + else if (ret == -1) { + TCPLOG(pident->tcpsock, "PermanentError" << endl); + TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident); + return LWResult::Result::PermanentError; + } + else if (data.empty()) {// error, EOF or other + // fd housekeeping done by TCPIOHandlerIO + TCPLOG(pident->tcpsock, "EOF" << endl); + return LWResult::Result::PermanentError; + } + + TCPLOG(pident->tcpsock, "arecvtcp success" << endl); + return LWResult::Result::Success; +} + +void makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set& tcpSockets) +{ + int fd; + vectorlocals; + stringtok(locals,::arg()["local-address"]," ,"); + + if(locals.empty()) + throw PDNSException("No local address specified"); + + for(vector::const_iterator i=locals.begin();i!=locals.end();++i) { + ServiceTuple st; + 
st.port=::arg().asNum("local-port"); + parseService(*i, st); + + ComboAddress sin; + + sin.reset(); + sin.sin4.sin_family = AF_INET; + if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) { + sin.sin6.sin6_family = AF_INET6; + if(makeIPv6sockaddr(st.host, &sin.sin6) < 0) + throw PDNSException("Unable to resolve local address for TCP server on '"+ st.host +"'"); + } + + fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0); + if(fd<0) + throw PDNSException("Making a TCP server socket for resolver: "+stringerror()); + + setCloseOnExec(fd); + + int tmp=1; + if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp)<0) { + g_log<= 0) { + if(i==locals.begin()) + g_log< 0) { + checkFastOpenSysctl(false); +#ifdef TCP_FASTOPEN + if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &SyncRes::s_tcp_fast_open, sizeof SyncRes::s_tcp_fast_open) < 0) { + int err = errno; + g_log< g_recCache; extern thread_local std::unique_ptr t_packetCache; -typedef MTasker, PacketBuffer, PacketIDCompare> MT_t; -MT_t* getMT(); struct RecursorStats { diff --git a/pdns/ws-recursor.cc b/pdns/ws-recursor.cc index 84feae9286..de4b6e5fec 100644 --- a/pdns/ws-recursor.cc +++ b/pdns/ws-recursor.cc @@ -43,6 +43,7 @@ #include "rpzloader.hh" #include "uuid-utils.hh" #include "tcpiohandler.hh" +#include "rec-main.hh" extern thread_local FDMultiplexer* t_fdm; -- 2.47.2