./pdns/packethandler.cc
./pdns/packethandler.hh
./pdns/pdns_hw.cc
-./pdns/pdns_recursor.cc
./pdns/pdnsexception.hh
./pdns/pdnsutil.cc
./pdns/pkcs11signers.cc
#include "filterpo.hh"
#include "rec-snmp.hh"
#include <unordered_set>
+#include "rec-main.hh"
RecursorLua4::RecursorLua4() { prepareContext(); }
#include "lua-recursor4-ffi.hh"
-PacketBuffer GenUDPQueryResponse(const ComboAddress& dest, const string& query);
-unsigned int getRecursorThreadId();
-
// pdns_ffi_param_t is a lightuserdata
template <>
struct LuaContext::Pusher<pdns_ffi_param*>
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
+
+#include "rec-main.hh"
#include <netdb.h>
#include <sys/stat.h>
#include "xpf.hh"
#include "rec-eventtrace.hh"
-typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
-
-static thread_local std::shared_ptr<RecursorLua4> t_pdl;
-static thread_local unsigned int t_id = 0;
+thread_local std::shared_ptr<RecursorLua4> t_pdl;
+thread_local unsigned int t_id = 0;
static thread_local std::shared_ptr<Regex> t_traceRegex;
-static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
-static thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_protobufServers{nullptr};
+thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_protobufServers{nullptr};
static thread_local uint64_t t_protobufServersGeneration;
-static thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_outgoingProtobufServers{nullptr};
+thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_outgoingProtobufServers{nullptr};
static thread_local uint64_t t_outgoingProtobufServersGeneration;
#ifdef HAVE_FSTRM
#endif /* NOD_ENABLED */
__thread struct timeval g_now; // timestamp, updated (too) frequently
-typedef vector<pair<int, boost::function< void(int, boost::any&) > > > deferredAdd_t;
-
// for communicating with our threads
// effectively readonly after startup
struct RecThreadInfo
/* without reuseport, all listeners share the same sockets */
static deferredAdd_t g_deferredAdds;
-typedef vector<int> tcpListenSockets_t;
typedef map<int, ComboAddress> listenSocketsAddresses_t; // is shared across all threads right now
enum class PaddingMode { Always, PaddedQueries };
static std::shared_ptr<NetmaskGroup> g_initialAllowFrom; // new thread needs to be setup with this
static std::shared_ptr<NetmaskGroup> g_initialAllowNotifyFrom; // new threads need this to be setup
static std::shared_ptr<notifyset_t> g_initialAllowNotifyFor; // new threads need this to be setup
-static NetmaskGroup g_XPFAcl;
+NetmaskGroup g_XPFAcl;
static NetmaskGroup g_proxyProtocolACL;
-static NetmaskGroup g_paddingFrom;
+NetmaskGroup g_paddingFrom;
static boost::optional<ComboAddress> g_dns64Prefix{boost::none};
static DNSName g_dns64PrefixReverse;
-static size_t g_proxyProtocolMaximumSize;
-static size_t g_tcpMaxQueriesPerConn;
+size_t g_proxyProtocolMaximumSize;
static size_t s_maxUDPQueriesPerRound;
static uint64_t g_latencyStatSize;
static uint32_t g_disthashseed;
-static unsigned int g_maxTCPPerClient;
-static unsigned int g_maxMThreads;
+unsigned int g_maxMThreads;
static unsigned int g_numDistributorThreads;
static unsigned int g_numWorkerThreads;
-static unsigned int g_paddingTag;
-static int g_tcpTimeout;
+unsigned int g_paddingTag;
static uint16_t g_udpTruncationThreshold;
static uint16_t g_xpfRRCode{0};
static PaddingMode g_paddingMode;
static std::atomic<bool> statsWanted;
-static std::atomic<bool> g_quiet;
-static bool g_logCommonErrors;
-static bool g_anyToTcp;
+std::atomic<bool> g_quiet;
+bool g_logCommonErrors;
static bool g_weDistributeQueries; // if true, 1 or more threads listen on the incoming query sockets and distribute them to workers
-static bool g_reusePort{false};
-static bool g_gettagNeedsEDNSOptions{false};
+bool g_reusePort{false};
+bool g_gettagNeedsEDNSOptions{false};
static time_t g_statisticsInterval;
static bool g_useIncomingECS;
static bool g_useKernelTimestamp;
bool wantAnswer;
};
-//! used to send information to a newborn mthread
-struct DNSComboWriter {
- DNSComboWriter(const std::string& query, const struct timeval& now): d_mdp(true, query), d_now(now), d_query(query)
- {
- }
-
- DNSComboWriter(const std::string& query, const struct timeval& now, std::unordered_set<std::string>&& policyTags, LuaContext::LuaObject&& data, std::vector<DNSRecord>&& records): d_mdp(true, query), d_now(now), d_query(query), d_policyTags(std::move(policyTags)), d_records(std::move(records)), d_data(std::move(data))
- {
- }
-
- void setRemote(const ComboAddress& sa)
- {
- d_remote=sa;
- }
-
- void setSource(const ComboAddress& sa)
- {
- d_source=sa;
- }
-
- void setLocal(const ComboAddress& sa)
- {
- d_local=sa;
- }
-
- void setDestination(const ComboAddress& sa)
- {
- d_destination=sa;
- }
-
- void setSocket(int sock)
- {
- d_socket=sock;
- }
-
- string getRemote() const
- {
- if (d_source == d_remote) {
- return d_source.toStringWithPort();
- }
- return d_source.toStringWithPort() + " (proxied by " + d_remote.toStringWithPort() + ")";
- }
-
- std::vector<ProxyProtocolValue> d_proxyProtocolValues;
- MOADNSParser d_mdp;
- struct timeval d_now;
- /* Remote client, might differ from d_source
- in case of XPF, in which case d_source holds
- the IP of the client and d_remote of the proxy
- */
- ComboAddress d_remote;
- ComboAddress d_source;
- /* Destination address, might differ from
- d_destination in case of XPF, in which case
- d_destination holds the IP of the proxy and
- d_local holds our own. */
- ComboAddress d_local;
- ComboAddress d_destination;
- RecEventTrace d_eventTrace;
- boost::uuids::uuid d_uuid;
- string d_requestorId;
- string d_deviceId;
- string d_deviceName;
- struct timeval d_kernelTimestamp{0,0};
- std::string d_query;
- std::unordered_set<std::string> d_policyTags;
- std::string d_routingTag;
- std::vector<DNSRecord> d_records;
- LuaContext::LuaObject d_data;
- EDNSSubnetOpts d_ednssubnet;
- shared_ptr<TCPConnection> d_tcpConnection;
- boost::optional<uint16_t> d_extendedErrorCode{boost::none};
- string d_extendedErrorExtra;
- boost::optional<int> d_rcode{boost::none};
- int d_socket{-1};
- unsigned int d_tag{0};
- uint32_t d_qhash{0};
- uint32_t d_ttlCap{std::numeric_limits<uint32_t>::max()};
- bool d_variable{false};
- bool d_ecsFound{false};
- bool d_ecsParsed{false};
- bool d_followCNAMERecords{false};
- bool d_logResponse{false};
- bool d_tcp{false};
- bool d_responsePaddingDisabled{false};
- std::map<std::string, RecursorLua4::MetaValue> d_meta;
-};
-
-MT_t* getMT()
-{
- return MT ? MT.get() : nullptr;
-}
ArgvMap &arg()
{
return theArg;
}
-unsigned int getRecursorThreadId()
-{
- return t_id;
-}
static bool isDistributorThread()
{
return s_threadInfos.at(t_id).isHandler;
}
-#if 0
-#define TCPLOG(tcpsock, x) do { cerr << [](){ timeval t; gettimeofday(&t, nullptr); return t.tv_sec % 10 + t.tv_usec/1000000.0; }() << " FD " << (tcpsock) << ' ' << x; } while (0)
-#else
-#define TCPLOG(pid, x)
-#endif
-
-static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var);
-static void TCPIOHandlerStateChange(IOState, IOState, std::shared_ptr<PacketID>&);
-
-LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr<TCPIOHandler>& handler)
-{
- TCPLOG(handler->getDescriptor(), "asendtcp called " << data.size() << endl);
-
- auto pident = std::make_shared<PacketID>();
- pident->tcphandler = handler;
- pident->tcpsock = handler->getDescriptor();
- pident->outMSG = data;
- pident->highState = TCPAction::DoingWrite;
-
- IOState state;
- try {
- TCPLOG(pident->tcpsock, "Initial tryWrite: " << pident->outPos << '/' << pident->outMSG.size() << ' ' << " -> ");
- state = handler->tryWrite(pident->outMSG, pident->outPos, pident->outMSG.size());
- TCPLOG(pident->tcpsock, pident->outPos << '/' << pident->outMSG.size() << endl);
-
- if (state == IOState::Done) {
- TCPLOG(pident->tcpsock, "asendtcp success A" << endl);
- return LWResult::Result::Success;
- }
- }
- catch (const std::exception& e) {
- TCPLOG(pident->tcpsock, "tryWrite() exception..." << e.what() << endl);
- return LWResult::Result::PermanentError;
- }
-
- // Will set pident->lowState
- TCPIOHandlerStateChange(IOState::Done, state, pident);
-
- PacketBuffer packet;
- int ret = MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
- TCPLOG(pident->tcpsock, "asendtcp waitEvent returned " << ret << ' ' << packet.size() << '/' << data.size() << ' ');
- if (ret == 0) {
- TCPLOG(pident->tcpsock, "timeout" << endl);
- TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
- return LWResult::Result::Timeout;
- }
- else if (ret == -1) { // error
- TCPLOG(pident->tcpsock, "PermanentError" << endl);
- TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
- return LWResult::Result::PermanentError;
- }
- else if (packet.size() != data.size()) { // main loop tells us what it sent out, or empty in case of an error
- // fd housekeeping done by TCPIOHandlerIO
- TCPLOG(pident->tcpsock, "PermanentError size mismatch" << endl);
- return LWResult::Result::PermanentError;
- }
-
- TCPLOG(pident->tcpsock, "asendtcp success" << endl);
- return LWResult::Result::Success;
-}
-
-LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, shared_ptr<TCPIOHandler>& handler, const bool incompleteOkay)
-{
- TCPLOG(handler->getDescriptor(), "arecvtcp called " << len << ' ' << data.size() << endl);
- data.resize(len);
-
- // We might have data already available from the TLS layer, try to get that into the buffer
- size_t pos = 0;
- IOState state;
- try {
- TCPLOG(handler->getDescriptor(), "calling tryRead() " << len << endl);
- state = handler->tryRead(data, pos, len);
- TCPLOG(handler->getDescriptor(), "arcvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl);
- switch (state) {
- case IOState::Done:
- case IOState::NeedRead:
- if (pos == len || (incompleteOkay && pos > 0)) {
- data.resize(pos);
- TCPLOG(handler->getDescriptor(), "acecvtcp success A" << endl);
- return LWResult::Result::Success;
- }
- break;
- case IOState::NeedWrite:
- break;
- case IOState::Async:
- throw std::runtime_error("TLS async mode not supported");
- break;
- }
- }
- catch (const std::exception& e) {
- TCPLOG(handler->getDescriptor(), "tryRead() exception..." << e.what() << endl);
- return LWResult::Result::PermanentError;
- }
-
- auto pident = std::make_shared<PacketID>();
- pident->tcphandler = handler;
- pident->tcpsock = handler->getDescriptor();
- // We might have a partial result
- pident->inMSG = std::move(data);
- pident->inPos = pos;
- pident->inWanted = len;
- pident->inIncompleteOkay = incompleteOkay;
- pident->highState = TCPAction::DoingRead;
-
- data.clear();
-
- // Will set pident->lowState
- TCPIOHandlerStateChange(IOState::Done, state, pident);
-
- int ret = MT->waitEvent(pident, &data, g_networkTimeoutMsec);
- TCPLOG(pident->tcpsock, "arecvtcp " << ret << ' ' << data.size() << ' ' );
- if (ret == 0) {
- TCPLOG(pident->tcpsock, "timeout" << endl);
- TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
- return LWResult::Result::Timeout;
- }
- else if (ret == -1) {
- TCPLOG(pident->tcpsock, "PermanentError" << endl);
- TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
- return LWResult::Result::PermanentError;
- }
- else if (data.empty()) {// error, EOF or other
- // fd housekeeping done by TCPIOHandlerIO
- TCPLOG(pident->tcpsock, "EOF" << endl);
- return LWResult::Result::PermanentError;
- }
-
- TCPLOG(pident->tcpsock, "arecvtcp success" << endl);
- return LWResult::Result::Success;
-}
static void handleGenUDPQueryResponse(int fd, FDMultiplexer::funcparam_t& var)
{
}
}
-uint16_t TCPConnection::s_maxInFlight;
-
-TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : data(2, 0), d_remote(addr), d_fd(fd)
-{
- ++s_currentConnections;
- (*t_tcpClientCounts)[d_remote]++;
-}
-
-TCPConnection::~TCPConnection()
-{
- try {
- if(closesocket(d_fd) < 0)
- g_log<<Logger::Error<<"Error closing socket for TCPConnection"<<endl;
- }
- catch(const PDNSException& e) {
- g_log<<Logger::Error<<"Error closing TCPConnection socket: "<<e.reason<<endl;
- }
-
- if(t_tcpClientCounts->count(d_remote) && !(*t_tcpClientCounts)[d_remote]--)
- t_tcpClientCounts->erase(d_remote);
- --s_currentConnections;
-}
-
-std::atomic<uint32_t> TCPConnection::s_currentConnections;
-
-static void terminateTCPConnection(int fd)
-{
- try {
- t_fdm->removeReadFD(fd);
- }
- catch (const FDMultiplexerException& fde)
- {
- }
-}
-
-/* this function is called with both a string and a vector<uint8_t> representing a packet */
-template <class T>
-static bool sendResponseOverTCP(const std::unique_ptr<DNSComboWriter>& dc, const T& packet)
-{
- uint8_t buf[2];
- buf[0] = packet.size() / 256;
- buf[1] = packet.size() % 256;
-
- Utility::iovec iov[2];
- iov[0].iov_base = (void*)buf; iov[0].iov_len = 2;
- iov[1].iov_base = (void*)&*packet.begin(); iov[1].iov_len = packet.size();
-
- int wret = Utility::writev(dc->d_socket, iov, 2);
- bool hadError = true;
-
- if (wret == 0) {
- g_log<<Logger::Warning<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
- } else if (wret < 0 ) {
- int err = errno;
- g_log << Logger::Warning << "Error writing TCP answer to " << dc->getRemote() << ": " << strerror(err) << endl;
- } else if ((unsigned int)wret != 2 + packet.size()) {
- g_log<<Logger::Warning<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<wret<<")"<<endl;
- } else {
- hadError = false;
- }
-
- return hadError;
-}
-
-static void sendErrorOverTCP(std::unique_ptr<DNSComboWriter>& dc, int rcode)
-{
- std::vector<uint8_t> packet;
- if (dc->d_mdp.d_header.qdcount == 0) {
- /* header-only */
- packet.resize(sizeof(dnsheader));
- }
- else {
- DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
- if (dc->d_mdp.hasEDNS()) {
- /* we try to add the EDNS OPT RR even for truncated answers,
- as rfc6891 states:
- "The minimal response MUST be the DNS header, question section, and an
- OPT record. This MUST also occur when a truncated response (using
- the DNS header's TC bit) is returned."
- */
- pw.addOpt(512, 0, 0);
- pw.commit();
- }
- }
-
- dnsheader& header = reinterpret_cast<dnsheader&>(packet.at(0));
- header.aa = 0;
- header.ra = 1;
- header.qr = 1;
- header.tc = 0;
- header.id = dc->d_mdp.d_header.id;
- header.rd = dc->d_mdp.d_header.rd;
- header.cd = dc->d_mdp.d_header.cd;
- header.rcode = rcode;
-
- sendResponseOverTCP(dc, packet);
-}
-
-static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var);
-static void finishTCPReply(std::unique_ptr<DNSComboWriter>& dc, bool hadError, bool updateInFlight)
-{
- // update tcp connection status, closing if needed and doing the fd multiplexer accounting
- if (updateInFlight && dc->d_tcpConnection->d_requestsInFlight > 0) {
- dc->d_tcpConnection->d_requestsInFlight--;
- }
-
- // In the code below, we try to remove the fd from the set, but
- // we don't know if another mthread already did the remove, so we can get a
- // "Tried to remove unlisted fd" exception. Not that an inflight < limit test
- // will not work since we do not know if the other mthread got an error or not.
- if (hadError) {
- terminateTCPConnection(dc->d_socket);
- dc->d_socket = -1;
- return;
- }
- dc->d_tcpConnection->queriesCount++;
- if ((g_tcpMaxQueriesPerConn && dc->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) ||
- (dc->d_tcpConnection->isDropOnIdle() && dc->d_tcpConnection->d_requestsInFlight == 0)) {
- try {
- t_fdm->removeReadFD(dc->d_socket);
- }
- catch (FDMultiplexerException &) {
- }
- dc->d_socket = -1;
- return;
- }
- Utility::gettimeofday(&g_now, nullptr); // needs to be updated
- struct timeval ttd = g_now;
-
- // If we cross from max to max-1 in flight requests, the fd was not listened to, add it back
- if (updateInFlight && dc->d_tcpConnection->d_requestsInFlight == TCPConnection::s_maxInFlight - 1) {
- // A read error might have happened. If we add the fd back, it will most likely error again.
- // This is not a big issue, the next handleTCPClientReadable() will see another read error
- // and take action.
- ttd.tv_sec += g_tcpTimeout;
- t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd);
- return;
- }
- // fd might have been removed by read error code, or a read timeout, so expect an exception
- try {
- t_fdm->setReadTTD(dc->d_socket, ttd, g_tcpTimeout);
- }
- catch (const FDMultiplexerException &) {
- // but if the FD was removed because of a timeout while we were sending a response,
- // we need to re-arm it. If it was an error it will error again.
- ttd.tv_sec += g_tcpTimeout;
- t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd);
- }
-}
// the idea is, only do things that depend on the *response* here. Incoming accounting is on incoming.
static void updateResponseStats(int res, const ComboAddress& remote, unsigned int packetsize, const DNSName* query, uint16_t qtype)
return "Exception making error message for exception";
}
-static void protobufLogQuery(LocalStateHolder<LuaConfigItems>& luaconfsLocal, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::unordered_set<std::string>& policyTags, const std::string& requestorId, const std::string& deviceId, const std::string& deviceName, const std::map<std::string, RecursorLua4::MetaValue>& meta)
+void protobufLogQuery(LocalStateHolder<LuaConfigItems>& luaconfsLocal, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::unordered_set<std::string>& policyTags, const std::string& requestorId, const std::string& deviceId, const std::string& deviceName, const std::map<std::string, RecursorLua4::MetaValue>& meta)
{
if (!t_protobufServers) {
return;
}
}
-static void protobufLogResponse(pdns::ProtoZero::RecMessage& message)
+void protobufLogResponse(pdns::ProtoZero::RecMessage& message)
{
if (!t_protobufServers) {
return;
}
}
-static void protobufLogResponse(const struct dnsheader* dh, LocalStateHolder<LuaConfigItems>& luaconfsLocal,
+void protobufLogResponse(const struct dnsheader* dh, LocalStateHolder<LuaConfigItems>& luaconfsLocal,
const RecursorPacketCache::OptPBData& pbData, const struct timeval& tv,
bool tcp, const ComboAddress& source, const ComboAddress& destination,
const EDNSSubnetOpts& ednssubnet,
return result;
}
-static bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
+bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
{
if (!luaconfsLocal->protobufExportConfig.enabled) {
if (t_protobufServers) {
return result;
}
-static bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
+bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
{
if (!luaconfsLocal->frameStreamExportConfig.enabled) {
if (t_frameStreamServers) {
return true;
}
-static bool isAllowNotifyForZone(DNSName qname)
+bool isAllowNotifyForZone(DNSName qname)
{
if (t_allowNotifyFor->empty()) {
return false;
return false;
}
-static void startDoResolve(void *p)
+void startDoResolve(void *p)
{
auto dc=std::unique_ptr<DNSComboWriter>(reinterpret_cast<DNSComboWriter*>(p));
try {
}
}
-static void getQNameAndSubnet(const std::string& question, DNSName* dnsname, uint16_t* qtype, uint16_t* qclass,
+void getQNameAndSubnet(const std::string& question, DNSName* dnsname, uint16_t* qtype, uint16_t* qclass,
bool& foundECS, EDNSSubnetOpts* ednssubnet, EDNSOptionViewMap* options,
bool& foundXPF, ComboAddress* xpfSource, ComboAddress* xpfDest)
{
}
}
-static bool checkForCacheHit(bool qnameParsed, unsigned int tag, const string& data,
+bool checkForCacheHit(bool qnameParsed, unsigned int tag, const string& data,
DNSName& qname, uint16_t& qtype, uint16_t& qclass,
const struct timeval& now,
string& response, uint32_t& qhash,
return nullptr;
}
-static void requestWipeCaches(const DNSName& canon)
+void requestWipeCaches(const DNSName& canon)
{
// send a message to the handler thread asking it
// to wipe all of the caches
}
}
-/*
- * A helper class that by default closes the incoming TCP connection on destruct
- * If you want to keep the connection alive, call keep() on the guard object
- */
-class RunningTCPQuestionGuard {
-public:
- RunningTCPQuestionGuard(int fd)
- {
- d_fd = fd;
- }
- ~RunningTCPQuestionGuard()
- {
- if (d_fd != -1) {
- terminateTCPConnection(d_fd);
- d_fd = -1;
- }
- }
- void keep()
- {
- d_fd = -1;
- }
- bool handleTCPReadResult(int fd, ssize_t bytes)
- {
- if (bytes == 0) {
- /* EOF */
- return false;
- }
- else if (bytes < 0) {
- if (errno != EAGAIN && errno != EWOULDBLOCK) {
- return false;
- }
- }
- keep();
- return true;
- }
-
-private:
- int d_fd{-1};
-};
-
-static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
-{
- shared_ptr<TCPConnection> conn=boost::any_cast<shared_ptr<TCPConnection> >(var);
-
- RunningTCPQuestionGuard tcpGuard{fd};
-
- if (conn->state == TCPConnection::PROXYPROTOCOLHEADER) {
- ssize_t bytes = recv(conn->getFD(), &conn->data.at(conn->proxyProtocolGot), conn->proxyProtocolNeed, 0);
- if (bytes <= 0) {
- tcpGuard.handleTCPReadResult(fd, bytes);
- return;
- }
-
- conn->proxyProtocolGot += bytes;
- conn->data.resize(conn->proxyProtocolGot);
- ssize_t remaining = isProxyHeaderComplete(conn->data);
- if (remaining == 0) {
- if (g_logCommonErrors) {
- g_log<<Logger::Error<<"Unable to consume proxy protocol header in packet from TCP client "<< conn->d_remote.toStringWithPort() <<endl;
- }
- ++g_stats.proxyProtocolInvalidCount;
- return;
- }
- else if (remaining < 0) {
- conn->proxyProtocolNeed = -remaining;
- conn->data.resize(conn->proxyProtocolGot + conn->proxyProtocolNeed);
- tcpGuard.keep();
- return;
- }
- else {
- /* proxy header received */
- /* we ignore the TCP field for now, but we could properly set whether
- the connection was received over UDP or TCP if needed */
- bool tcp;
- bool proxy = false;
- size_t used = parseProxyHeader(conn->data, proxy, conn->d_source, conn->d_destination, tcp, conn->proxyProtocolValues);
- if (used <= 0) {
- if (g_logCommonErrors) {
- g_log<<Logger::Error<<"Unable to parse proxy protocol header in packet from TCP client "<< conn->d_remote.toStringWithPort() <<endl;
- }
- ++g_stats.proxyProtocolInvalidCount;
- return;
- }
- else if (static_cast<size_t>(used) > g_proxyProtocolMaximumSize) {
- if (g_logCommonErrors) {
- g_log<<Logger::Error<<"Proxy protocol header in packet from TCP client "<< conn->d_remote.toStringWithPort() << " is larger than proxy-protocol-maximum-size (" << used << "), dropping"<< endl;
- }
- ++g_stats.proxyProtocolInvalidCount;
- return;
- }
-
- /* Now that we have retrieved the address of the client, as advertised by the proxy
- via the proxy protocol header, check that it is allowed by our ACL */
- /* note that if the proxy header used a 'LOCAL' command, the original source and destination are untouched so everything should be fine */
- if (t_allowFrom && !t_allowFrom->match(&conn->d_source)) {
- if (!g_quiet) {
- g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<conn->d_source.toString()<<", address not matched by allow-from"<<endl;
- }
-
- ++g_stats.unauthorizedTCP;
- return;
- }
-
- conn->data.resize(2);
- conn->state = TCPConnection::BYTE0;
- }
- }
-
- if (conn->state==TCPConnection::BYTE0) {
- ssize_t bytes=recv(conn->getFD(), &conn->data[0], 2, 0);
- if(bytes==1)
- conn->state=TCPConnection::BYTE1;
- if(bytes==2) {
- conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
- conn->data.resize(conn->qlen);
- conn->bytesread=0;
- conn->state=TCPConnection::GETQUESTION;
- }
- if (bytes <= 0) {
- tcpGuard.handleTCPReadResult(fd, bytes);
- return;
- }
- }
-
- if (conn->state==TCPConnection::BYTE1) {
- ssize_t bytes=recv(conn->getFD(), &conn->data[1], 1, 0);
- if(bytes==1) {
- conn->state=TCPConnection::GETQUESTION;
- conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
- conn->data.resize(conn->qlen);
- conn->bytesread=0;
- }
- if (bytes <= 0) {
- if (!tcpGuard.handleTCPReadResult(fd, bytes)) {
- if(g_logCommonErrors) {
- g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" disconnected after first byte"<<endl;
- }
- }
- return;
- }
- }
-
- if(conn->state==TCPConnection::GETQUESTION) {
- ssize_t bytes=recv(conn->getFD(), &conn->data[conn->bytesread], conn->qlen - conn->bytesread, 0);
- if (bytes <= 0) {
- if (!tcpGuard.handleTCPReadResult(fd, bytes)) {
- if(g_logCommonErrors) {
- g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" disconnected while reading question body"<<endl;
- }
- }
- return;
- }
- else if (bytes > std::numeric_limits<std::uint16_t>::max()) {
- if(g_logCommonErrors) {
- g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" sent an invalid question size while reading question body"<<endl;
- }
- return;
- }
- conn->bytesread+=(uint16_t)bytes;
- if(conn->bytesread==conn->qlen) {
- conn->state = TCPConnection::BYTE0;
- std::unique_ptr<DNSComboWriter> dc;
- try {
- dc = std::make_unique<DNSComboWriter>(conn->data, g_now);
- }
- catch(const MOADNSException &mde) {
- g_stats.clientParseError++;
- if (g_logCommonErrors) {
- g_log<<Logger::Error<<"Unable to parse packet from TCP client "<< conn->d_remote.toStringWithPort() <<endl;
- }
- return;
- }
- dc->d_tcpConnection = conn; // carry the torch
- dc->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
- dc->d_tcp=true;
- dc->setRemote(conn->d_remote);
- dc->setSource(conn->d_source);
- ComboAddress dest;
- dest.reset();
- dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
- socklen_t len = dest.getSocklen();
- getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it
- dc->setLocal(dest);
- dc->setDestination(conn->d_destination);
- /* we can't move this if we want to be able to access the values in
- all queries sent over this connection */
- dc->d_proxyProtocolValues = conn->proxyProtocolValues;
-
- struct timeval start;
- Utility::gettimeofday(&start, nullptr);
-
- DNSName qname;
- uint16_t qtype=0;
- uint16_t qclass=0;
- bool needECS = false;
- bool needXPF = g_XPFAcl.match(conn->d_remote);
- string requestorId;
- string deviceId;
- string deviceName;
- bool logQuery = false;
- bool qnameParsed = false;
-
- dc->d_eventTrace.setEnabled(SyncRes::s_event_trace_enabled);
- dc->d_eventTrace.add(RecEventTrace::ReqRecv);
- auto luaconfsLocal = g_luaconfs.getLocal();
- if (checkProtobufExport(luaconfsLocal)) {
- needECS = true;
- }
- logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
- dc->d_logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
-
-#ifdef HAVE_FSTRM
- checkFrameStreamExport(luaconfsLocal);
-#endif
-
- if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag)) || dc->d_mdp.d_header.opcode == Opcode::Notify) {
-
- try {
- EDNSOptionViewMap ednsOptions;
- bool xpfFound = false;
- dc->d_ecsParsed = true;
- dc->d_ecsFound = false;
- getQNameAndSubnet(conn->data, &qname, &qtype, &qclass,
- dc->d_ecsFound, &dc->d_ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr,
- xpfFound, needXPF ? &dc->d_source : nullptr, needXPF ? &dc->d_destination : nullptr);
- qnameParsed = true;
-
- if(t_pdl) {
- try {
- if (t_pdl->d_gettag_ffi) {
- RecursorLua4::FFIParams params(qname, qtype, dc->d_destination, dc->d_source, dc->d_ednssubnet.source, dc->d_data, dc->d_policyTags, dc->d_records, ednsOptions, dc->d_proxyProtocolValues, requestorId, deviceId, deviceName, dc->d_routingTag, dc->d_rcode, dc->d_ttlCap, dc->d_variable, true, logQuery, dc->d_logResponse, dc->d_followCNAMERecords, dc->d_extendedErrorCode, dc->d_extendedErrorExtra, dc->d_responsePaddingDisabled, dc->d_meta);
- dc->d_eventTrace.add(RecEventTrace::LuaGetTagFFI);
- dc->d_tag = t_pdl->gettag_ffi(params);
- dc->d_eventTrace.add(RecEventTrace::LuaGetTagFFI, dc->d_tag, false);
- }
- else if (t_pdl->d_gettag) {
- dc->d_eventTrace.add(RecEventTrace::LuaGetTag);
- dc->d_tag = t_pdl->gettag(dc->d_source, dc->d_ednssubnet.source, dc->d_destination, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, deviceName, dc->d_routingTag, dc->d_proxyProtocolValues);
- dc->d_eventTrace.add(RecEventTrace::LuaGetTag, dc->d_tag, false);
- }
- }
- catch(const std::exception& e) {
- if(g_logCommonErrors) {
- g_log<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
- }
- }
- }
- }
- catch(const std::exception& e)
- {
- if (g_logCommonErrors) {
- g_log<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
- }
- }
- }
-
- if (dc->d_tag == 0 && !dc->d_responsePaddingDisabled && g_paddingFrom.match(dc->d_remote)) {
- dc->d_tag = g_paddingTag;
- }
-
- const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(&conn->data[0]);
-
- if (t_protobufServers || t_outgoingProtobufServers) {
- dc->d_requestorId = requestorId;
- dc->d_deviceId = deviceId;
- dc->d_deviceName = deviceName;
- dc->d_uuid = getUniqueID();
- }
-
- if(t_protobufServers) {
- try {
-
- if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && dc->d_policyTags.empty())) {
- protobufLogQuery(luaconfsLocal, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId, dc->d_deviceName, dc->d_meta);
- }
- }
- catch (const std::exception& e) {
- if (g_logCommonErrors) {
- g_log<<Logger::Warning<<"Error parsing a TCP query packet for edns subnet: "<<e.what()<<endl;
- }
- }
- }
-
- if (t_pdl) {
- bool ipf = t_pdl->ipfilter(dc->d_source, dc->d_destination, *dh, dc->d_eventTrace);
- if (ipf) {
- if (!g_quiet) {
- g_log<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED TCP question from "<<dc->d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<<" based on policy"<<endl;
- }
- g_stats.policyDrops++;
- return;
- }
- }
-
- if (dc->d_mdp.d_header.qr) {
- g_stats.ignoredCount++;
- if (g_logCommonErrors) {
- g_log<<Logger::Error<<"Ignoring answer from TCP client "<< dc->getRemote() <<" on server socket!"<<endl;
- }
- return;
- }
- if (dc->d_mdp.d_header.opcode != Opcode::Query && dc->d_mdp.d_header.opcode != Opcode::Notify) {
- g_stats.ignoredCount++;
- if (g_logCommonErrors) {
- g_log<<Logger::Error<<"Ignoring unsupported opcode "<<Opcode::to_s(dc->d_mdp.d_header.opcode)<<" from TCP client "<< dc->getRemote() <<" on server socket!"<<endl;
- }
- sendErrorOverTCP(dc, RCode::NotImp);
- tcpGuard.keep();
- return;
- }
- else if (dh->qdcount == 0) {
- g_stats.emptyQueriesCount++;
- if (g_logCommonErrors) {
- g_log<<Logger::Error<<"Ignoring empty (qdcount == 0) query from "<< dc->getRemote() <<" on server socket!"<<endl;
- }
- sendErrorOverTCP(dc, RCode::NotImp);
- tcpGuard.keep();
- return;
- }
- else {
- // We have read a proper query
- ++g_stats.qcounter;
- ++g_stats.tcpqcounter;
-
- if(dc->d_mdp.d_header.opcode == Opcode::Notify) {
- if(!t_allowNotifyFrom || !t_allowNotifyFrom->match(dc->d_source)) {
- if(!g_quiet) {
- g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP NOTIFY from "<<dc->d_source.toString()<<", address not matched by allow-notify-from"<<endl;
- }
-
- g_stats.sourceDisallowedNotify++;
- return;
- }
-
- if(!isAllowNotifyForZone(qname)) {
- if(!g_quiet) {
- g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP NOTIFY from "<<dc->d_source.toString()<<", for "<<qname.toLogString()<<", zone not matched by allow-notify-for"<<endl;
- }
-
- g_stats.zoneDisallowedNotify++;
- return;
- }
- }
-
- string response;
- RecursorPacketCache::OptPBData pbData{boost::none};
-
- if(dc->d_mdp.d_header.opcode == Opcode::Query) {
- /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable,
- but it means that the hash would not be computed. If some script decides at a later time to mark back the answer
- as cacheable we would cache it with a wrong tag, so better safe than sorry. */
- dc->d_eventTrace.add(RecEventTrace::PCacheCheck);
- bool cacheHit = checkForCacheHit(qnameParsed, dc->d_tag, conn->data, qname, qtype, qclass, g_now, response, dc->d_qhash, pbData, true, dc->d_source);
- dc->d_eventTrace.add(RecEventTrace::PCacheCheck, cacheHit, false);
-
- if (cacheHit) {
- if (!g_quiet) {
- g_log<<Logger::Notice<<t_id<< " TCP question answered from packet cache tag="<<dc->d_tag<<" from "<<dc->d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<<endl;
- }
-
- bool hadError = sendResponseOverTCP(dc, response);
- finishTCPReply(dc, hadError, false);
- struct timeval now;
- Utility::gettimeofday(&now, nullptr);
- uint64_t spentUsec = uSec(now - start);
- g_stats.cumulativeAnswers(spentUsec);
- dc->d_eventTrace.add(RecEventTrace::AnswerSent);
-
- if (t_protobufServers && dc->d_logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbData && !pbData->d_tagged)) {
- struct timeval tv{0, 0};
- protobufLogResponse(dh, luaconfsLocal, pbData, tv, true, dc->d_source, dc->d_destination, dc->d_ednssubnet, dc->d_uuid, dc->d_requestorId, dc->d_deviceId, dc->d_deviceName, dc->d_meta, dc->d_eventTrace);
- }
-
- if (dc->d_eventTrace.enabled() && SyncRes::s_event_trace_enabled & SyncRes::event_trace_to_log) {
- g_log << Logger::Info << dc->d_eventTrace.toString() << endl;
- }
- tcpGuard.keep();
- return;
- } // cache hit
- } // query opcode
-
- if(dc->d_mdp.d_header.opcode == Opcode::Notify) {
- if (!g_quiet) {
- g_log<<Logger::Notice<<t_id<< " got NOTIFY for "<<qname.toLogString()<<" from "<<dc->d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<<endl;
- }
-
- requestWipeCaches(qname);
-
- // the operation will now be treated as a Query, generating
- // a normal response, as the rest of the code does not
- // check dh->opcode, but we need to ensure that the response
- // to this request does not get put into the packet cache
- dc->d_variable = true;
- }
-
- // setup for startDoResolve() in an mthread
- ++conn->d_requestsInFlight;
- if (conn->d_requestsInFlight >= TCPConnection::s_maxInFlight) {
- t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read
- } else {
- Utility::gettimeofday(&g_now, nullptr); // needed?
- struct timeval ttd = g_now;
- t_fdm->setReadTTD(fd, ttd, g_tcpTimeout);
- }
- tcpGuard.keep();
- MT->makeThread(startDoResolve, dc.release()); // deletes dc
- } // good query
- } // read full query
- } // reading query
-
- // more to come
- tcpGuard.keep();
-}
-
-static bool expectProxyProtocol(const ComboAddress& from)
+//! Tells whether `from` is matched by g_proxyProtocolACL
+bool expectProxyProtocol(const ComboAddress& from)
{
return g_proxyProtocolACL.match(from);
}
-//! Handle new incoming TCP connection
-static void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
-{
- ComboAddress addr;
- socklen_t addrlen=sizeof(addr);
- int newsock=accept(fd, (struct sockaddr*)&addr, &addrlen);
- if(newsock>=0) {
- if(MT->numProcesses() > g_maxMThreads) {
- g_stats.overCapacityDrops++;
- try {
- closesocket(newsock);
- }
- catch(const PDNSException& e) {
- g_log<<Logger::Error<<"Error closing TCP socket after an over capacity drop: "<<e.reason<<endl;
- }
- return;
- }
-
- if(t_remotes) {
- t_remotes->push_back(addr);
- }
-
- bool fromProxyProtocolSource = expectProxyProtocol(addr);
- if(t_allowFrom && !t_allowFrom->match(&addr) && !fromProxyProtocolSource) {
- if(!g_quiet)
- g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<addr.toString()<<", address neither matched by allow-from nor proxy-protocol-from"<<endl;
-
- g_stats.unauthorizedTCP++;
- try {
- closesocket(newsock);
- }
- catch(const PDNSException& e) {
- g_log<<Logger::Error<<"Error closing TCP socket after an ACL drop: "<<e.reason<<endl;
- }
- return;
- }
-
- if(g_maxTCPPerClient && t_tcpClientCounts->count(addr) && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
- g_stats.tcpClientOverflow++;
- try {
- closesocket(newsock); // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
- }
- catch(const PDNSException& e) {
- g_log<<Logger::Error<<"Error closing TCP socket after an overflow drop: "<<e.reason<<endl;
- }
- return;
- }
-
- setNonBlocking(newsock);
- std::shared_ptr<TCPConnection> tc = std::make_shared<TCPConnection>(newsock, addr);
- tc->d_source = addr;
- tc->d_destination.reset();
- tc->d_destination.sin4.sin_family = addr.sin4.sin_family;
- socklen_t len = tc->d_destination.getSocklen();
- getsockname(tc->getFD(), reinterpret_cast<sockaddr*>(&tc->d_destination), &len); // if this fails, we're ok with it
-
- if (fromProxyProtocolSource) {
- tc->proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
- tc->data.resize(tc->proxyProtocolNeed);
- tc->state = TCPConnection::PROXYPROTOCOLHEADER;
- }
- else {
- tc->state = TCPConnection::BYTE0;
- }
-
- struct timeval ttd;
- Utility::gettimeofday(&ttd, nullptr);
- ttd.tv_sec += g_tcpTimeout;
-
- t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc, &ttd);
- }
-}
static string* doProcessUDPQuestion(const std::string& question, const ComboAddress& fromaddr, const ComboAddress& destaddr, ComboAddress source, ComboAddress destination, struct timeval tv, int fd, std::vector<ProxyProtocolValue>& proxyProtocolValues, RecEventTrace& eventTrace)
{
}
}
-static void checkFastOpenSysctl(bool active)
-{
-#ifdef __linux__
- string line;
- if (readFileIfThere("/proc/sys/net/ipv4/tcp_fastopen", &line)) {
- int flag = std::stoi(line);
- if (active && !(flag & 1)) {
- g_log << Logger::Error << "tcp-fast-open-connect enabled but net.ipv4.tcp_fastopen does not allow it" << endl;
- }
- if (!active && !(flag & 2)) {
- g_log << Logger::Error << "tcp-fast-open enabled but net.ipv4.tcp_fastopen does not allow it" << endl;
- }
- }
- else {
- g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl;
- }
-#else
- g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl;
-#endif
-}
-
-static void checkTFOconnect()
-{
- try {
- Socket s(AF_INET, SOCK_STREAM);
- s.setNonBlocking();
- s.setFastOpenConnect();
- }
- catch (const NetworkError& e) {
- g_log << Logger::Error << "tcp-fast-open-connect enabled but returned error: " << e.what() << endl;
- }
-}
-
-static void makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set<int>& tcpSockets)
-{
- int fd;
- vector<string>locals;
- stringtok(locals,::arg()["local-address"]," ,");
-
- if(locals.empty())
- throw PDNSException("No local address specified");
-
- for(vector<string>::const_iterator i=locals.begin();i!=locals.end();++i) {
- ServiceTuple st;
- st.port=::arg().asNum("local-port");
- parseService(*i, st);
-
- ComboAddress sin;
-
- sin.reset();
- sin.sin4.sin_family = AF_INET;
- if(!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
- sin.sin6.sin6_family = AF_INET6;
- if(makeIPv6sockaddr(st.host, &sin.sin6) < 0)
- throw PDNSException("Unable to resolve local address for TCP server on '"+ st.host +"'");
- }
-
- fd=socket(sin.sin6.sin6_family, SOCK_STREAM, 0);
- if(fd<0)
- throw PDNSException("Making a TCP server socket for resolver: "+stringerror());
-
- setCloseOnExec(fd);
-
- int tmp=1;
- if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp)<0) {
- g_log<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
- exit(1);
- }
- if(sin.sin6.sin6_family == AF_INET6 && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &tmp, sizeof(tmp)) < 0) {
- int err = errno;
- g_log<<Logger::Error<<"Failed to set IPv6 socket to IPv6 only, continuing anyhow: "<<strerror(err)<<endl;
- }
-
-#ifdef TCP_DEFER_ACCEPT
- if(setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &tmp, sizeof tmp) >= 0) {
- if(i==locals.begin())
- g_log<<Logger::Info<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
- }
-#endif
-
- if( ::arg().mustDo("non-local-bind") )
- Utility::setBindAny(AF_INET, fd);
-
- if (g_reusePort) {
-#if defined(SO_REUSEPORT_LB)
- try {
- SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT_LB, 1);
- }
- catch (const std::exception& e) {
- throw PDNSException(std::string("SO_REUSEPORT_LB: ") + e.what());
- }
-#elif defined(SO_REUSEPORT)
- try {
- SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
- }
- catch (const std::exception& e) {
- throw PDNSException(std::string("SO_REUSEPORT: ") + e.what());
- }
-#endif
- }
-
- if (SyncRes::s_tcp_fast_open > 0) {
- checkFastOpenSysctl(false);
-#ifdef TCP_FASTOPEN
- if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &SyncRes::s_tcp_fast_open, sizeof SyncRes::s_tcp_fast_open) < 0) {
- int err = errno;
- g_log<<Logger::Error<<"Failed to enable TCP Fast Open for listening socket: "<<strerror(err)<<endl;
- }
-#else
- g_log<<Logger::Warning<<"TCP Fast Open configured but not supported for listening socket"<<endl;
-#endif
- }
-
- sin.sin4.sin_port = htons(st.port);
- socklen_t socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
- if (::bind(fd, (struct sockaddr *)&sin, socklen )<0)
- throw PDNSException("Binding TCP server socket for "+ st.host +": "+stringerror());
-
- setNonBlocking(fd);
- try {
- setSocketSendBuffer(fd, 65000);
- }
- catch (const std::exception& e) {
- g_log<<Logger::Error<<e.what()<<endl;
- }
-
- listen(fd, 128);
- deferredAdds.emplace_back(fd, handleNewTCPQuestion);
- tcpSockets.insert(fd);
-
- // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
- // - fd is not that which we know here, but returned from accept()
- if(sin.sin4.sin_family == AF_INET)
- g_log<<Logger::Info<<"Listening for TCP queries on "<< sin.toString() <<":"<<st.port<<endl;
- else
- g_log<<Logger::Info<<"Listening for TCP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
- }
-}
-
static void makeUDPServerSockets(deferredAdd_t& deferredAdds)
{
int one=1;
}
}
-static void TCPIOHandlerStateChange(IOState oldstate, IOState newstate, std::shared_ptr<PacketID>& pid)
-{
- TCPLOG(pid->tcpsock, "State transation " << int(oldstate) << "->" << int(newstate) << endl);
-
- pid->lowState = newstate;
-
- // handle state transitions
- switch (oldstate) {
- case IOState::NeedRead:
-
- switch (newstate) {
- case IOState::NeedWrite:
- TCPLOG(pid->tcpsock, "NeedRead -> NeedWrite: flip FD" << endl);
- t_fdm->alterFDToWrite(pid->tcpsock, TCPIOHandlerIO, pid);
- break;
- case IOState::NeedRead:
- break;
- case IOState::Done:
- TCPLOG(pid->tcpsock, "Done -> removeReadFD" << endl);
- t_fdm->removeReadFD(pid->tcpsock);
- break;
- case IOState::Async:
- throw std::runtime_error("TLS async mode not supported");
- break;
- }
- break;
-
- case IOState::NeedWrite:
-
- switch (newstate) {
- case IOState::NeedRead:
- TCPLOG(pid->tcpsock, "NeedWrite -> NeedRead: flip FD" << endl);
- t_fdm->alterFDToRead(pid->tcpsock, TCPIOHandlerIO, pid);
- break;
- case IOState::NeedWrite:
- break;
- case IOState::Done:
- TCPLOG(pid->tcpsock, "Done -> removeWriteFD" << endl);
- t_fdm->removeWriteFD(pid->tcpsock);
- break;
- case IOState::Async:
- throw std::runtime_error("TLS async mode not supported");
- break;
- }
- break;
-
- case IOState::Done:
- switch (newstate) {
- case IOState::NeedRead:
- TCPLOG(pid->tcpsock, "NeedRead: addReadFD" << endl);
- t_fdm->addReadFD(pid->tcpsock, TCPIOHandlerIO, pid);
- break;
- case IOState::NeedWrite:
- TCPLOG(pid->tcpsock, "NeedWrite: addWriteFD" << endl);
- t_fdm->addWriteFD(pid->tcpsock, TCPIOHandlerIO, pid);
- break;
- case IOState::Done:
- break;
- case IOState::Async:
- throw std::runtime_error("TLS async mode not supported");
- break;
- }
- break;
-
- case IOState::Async:
- throw std::runtime_error("TLS async mode not supported");
- break;
- }
-
-}
-
-static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var)
-{
- std::shared_ptr<PacketID> pid = boost::any_cast<std::shared_ptr<PacketID>>(var);
- assert(pid->tcphandler);
- assert(fd == pid->tcphandler->getDescriptor());
- IOState newstate = IOState::Done;
-
- TCPLOG(pid->tcpsock, "TCPIOHandlerIO: lowState " << int(pid->lowState) << endl);
-
- // In the code below, we want to update the state of the fd before calling sendEvent
- // a sendEvent might close the fd, and some poll multiplexers do not like to manipulate a closed fd
-
- switch (pid->highState) {
- case TCPAction::DoingRead:
- TCPLOG(pid->tcpsock, "highState: Reading" << endl);
- // In arecvtcp, the buffer was resized already so inWanted bytes will fit
- // try reading
- try {
- newstate = pid->tcphandler->tryRead(pid->inMSG, pid->inPos, pid->inWanted);
- switch (newstate) {
- case IOState::Done:
- case IOState::NeedRead:
- TCPLOG(pid->tcpsock, "tryRead: Done or NeedRead " << int(newstate) << ' ' << pid->inPos << '/' << pid->inWanted << endl);
- TCPLOG(pid->tcpsock, "TCPIOHandlerIO " << pid->inWanted << ' ' << pid->inIncompleteOkay << endl);
- if (pid->inPos == pid->inWanted || (pid->inIncompleteOkay && pid->inPos > 0)) {
- pid->inMSG.resize(pid->inPos); // old content (if there) + new bytes read, only relevant for the inIncompleteOkay case
- newstate = IOState::Done;
- TCPIOHandlerStateChange(pid->lowState, newstate, pid);
- MT->sendEvent(pid, &pid->inMSG);
- return;
- }
- break;
- case IOState::NeedWrite:
- break;
- case IOState::Async:
- throw std::runtime_error("TLS async mode not supported");
- break;
- }
- }
- catch (const std::exception& e) {
- newstate = IOState::Done;
- TCPLOG(pid->tcpsock, "read exception..." << e.what() << endl);
- PacketBuffer empty;
- TCPIOHandlerStateChange(pid->lowState, newstate, pid);
- MT->sendEvent(pid, &empty); // this conveys error status
- return;
- }
- break;
-
- case TCPAction::DoingWrite:
- TCPLOG(pid->tcpsock, "highState: Writing" << endl);
- try {
- TCPLOG(pid->tcpsock, "tryWrite: " << pid->outPos << '/' << pid->outMSG.size() << ' ' << " -> ");
- newstate = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size());
- TCPLOG(pid->tcpsock, pid->outPos << '/' << pid->outMSG.size() << endl);
- switch (newstate) {
- case IOState::Done: {
- TCPLOG(pid->tcpsock, "tryWrite: Done" << endl);
- TCPIOHandlerStateChange(pid->lowState, newstate, pid);
- MT->sendEvent(pid, &pid->outMSG); // send back what we sent to convey everything is ok
- return;
- }
- case IOState::NeedRead:
- TCPLOG(pid->tcpsock, "tryWrite: NeedRead" << endl);
- break;
- case IOState::NeedWrite:
- TCPLOG(pid->tcpsock, "tryWrite: NeedWrite" << endl);
- break;
- case IOState::Async:
- throw std::runtime_error("TLS async mode not supported");
- break;
- }
- }
- catch (const std::exception& e) {
- newstate = IOState::Done;
- TCPLOG(pid->tcpsock, "write exception..." << e.what() << endl);
- PacketBuffer sent;
- TCPIOHandlerStateChange(pid->lowState, newstate, pid);
- MT->sendEvent(pid, &sent); // we convey error status by sending empty string
- return;
- }
- break;
- }
-
- // Cases that did not end up doing a sendEvent
- TCPIOHandlerStateChange(pid->lowState, newstate, pid);
-}
// resend event to everybody chained onto it
static void doResends(MT_t::waiters_t::iterator& iter, const std::shared_ptr<PacketID>& resend, const PacketBuffer& content)
#include "namespaces.hh"
#include "rec-taskqueue.hh"
#include "rec-tcpout.hh"
+#include "rec-main.hh"
std::pair<std::string, std::string> PrefixDashNumberCompare::prefixAndTrailingNum(const std::string& a)
{
rcpgenerator.cc rcpgenerator.hh \
rec-carbon.cc \
rec-eventtrace.cc rec-eventtrace.hh \
+ rec-main.hh rec-main.cc \
rec-lua-conf.hh rec-lua-conf.cc \
rec-protozero.cc rec-protozero.hh \
rec-snmp.hh rec-snmp.cc \
rec-taskqueue.cc rec-taskqueue.hh \
+ rec-tcp.cc \
rec-tcpout.cc rec-tcpout.hh \
rec-zonetocache.cc rec-zonetocache.hh \
rec_channel.cc rec_channel.hh rec_metrics.hh \
--- /dev/null
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include "rec-main.hh"
+
--- /dev/null
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "logger.hh"
+#include "lua-recursor4.hh"
+#include "mplexer.hh"
+#include "namespaces.hh"
+#include "rec-lua-conf.hh"
+#include "rec-protozero.hh"
+#include "syncres.hh"
+
+
+//! used to send information to a newborn mthread
+struct DNSComboWriter {
+ // Hands `query` to the MOADNSParser member (d_mdp) and keeps a copy of it in d_query;
+ // `now` is stored in d_now.
+ DNSComboWriter(const std::string& query, const struct timeval& now): d_mdp(true, query), d_now(now), d_query(query)
+ {
+ }
+
+ // Same as above, and additionally takes ownership (by move) of the policy tags,
+ // the Lua data object and the records supplied by the caller.
+ DNSComboWriter(const std::string& query, const struct timeval& now, std::unordered_set<std::string>&& policyTags, LuaContext::LuaObject&& data, std::vector<DNSRecord>&& records): d_mdp(true, query), d_now(now), d_query(query), d_policyTags(std::move(policyTags)), d_records(std::move(records)), d_data(std::move(data))
+ {
+ }
+
+ // Plain setters for the address and socket members below
+ void setRemote(const ComboAddress& sa)
+ {
+ d_remote=sa;
+ }
+
+ void setSource(const ComboAddress& sa)
+ {
+ d_source=sa;
+ }
+
+ void setLocal(const ComboAddress& sa)
+ {
+ d_local=sa;
+ }
+
+ void setDestination(const ComboAddress& sa)
+ {
+ d_destination=sa;
+ }
+
+ void setSocket(int sock)
+ {
+ d_socket=sock;
+ }
+
+ // Printable form of the client: d_source with its port, followed by
+ // " (proxied by <d_remote>)" when d_source and d_remote differ.
+ string getRemote() const
+ {
+ if (d_source == d_remote) {
+ return d_source.toStringWithPort();
+ }
+ return d_source.toStringWithPort() + " (proxied by " + d_remote.toStringWithPort() + ")";
+ }
+
+ // NOTE: the constructors' initializer lists follow the declaration order of
+ // d_mdp, d_now, d_query, d_policyTags, d_records and d_data; keep both in sync.
+ std::vector<ProxyProtocolValue> d_proxyProtocolValues;
+ MOADNSParser d_mdp;
+ struct timeval d_now;
+ /* Remote client, might differ from d_source
+ in case of XPF, in which case d_source holds
+ the IP of the client and d_remote of the proxy
+ */
+ ComboAddress d_remote;
+ ComboAddress d_source;
+ /* Destination address, might differ from
+ d_destination in case of XPF, in which case
+ d_destination holds the IP of the proxy and
+ d_local holds our own. */
+ ComboAddress d_local;
+ ComboAddress d_destination;
+ RecEventTrace d_eventTrace;
+ boost::uuids::uuid d_uuid;
+ string d_requestorId;
+ string d_deviceId;
+ string d_deviceName;
+ struct timeval d_kernelTimestamp{0,0};
+ std::string d_query;
+ std::unordered_set<std::string> d_policyTags;
+ std::string d_routingTag;
+ std::vector<DNSRecord> d_records;
+ LuaContext::LuaObject d_data;
+ EDNSSubnetOpts d_ednssubnet;
+ shared_ptr<TCPConnection> d_tcpConnection;
+ boost::optional<uint16_t> d_extendedErrorCode{boost::none};
+ string d_extendedErrorExtra;
+ boost::optional<int> d_rcode{boost::none};
+ // -1 until setSocket() is called
+ int d_socket{-1};
+ unsigned int d_tag{0};
+ uint32_t d_qhash{0};
+ uint32_t d_ttlCap{std::numeric_limits<uint32_t>::max()};
+ bool d_variable{false};
+ bool d_ecsFound{false};
+ bool d_ecsParsed{false};
+ bool d_followCNAMERecords{false};
+ bool d_logResponse{false};
+ bool d_tcp{false};
+ bool d_responsePaddingDisabled{false};
+ std::map<std::string, RecursorLua4::MetaValue> d_meta;
+};
+
+
+typedef MTasker<std::shared_ptr<PacketID>, PacketBuffer, PacketIDCompare> MT_t;
+extern thread_local std::unique_ptr<MT_t> MT; // the big MTasker
+
+extern thread_local FDMultiplexer* t_fdm;
+extern bool g_logCommonErrors;
+extern size_t g_proxyProtocolMaximumSize;
+extern std::atomic<bool> g_quiet;
+extern NetmaskGroup g_XPFAcl;
+extern thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_protobufServers;
+extern thread_local std::shared_ptr<RecursorLua4> t_pdl;
+extern bool g_gettagNeedsEDNSOptions;
+extern NetmaskGroup g_paddingFrom;
+extern unsigned int g_paddingTag;
+extern thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> t_outgoingProtobufServers;
+extern unsigned int g_maxMThreads;
+extern bool g_reusePort;
+extern bool g_anyToTcp;
+extern size_t g_tcpMaxQueriesPerConn;
+extern unsigned int g_maxTCPPerClient;
+extern int g_tcpTimeout;
+
+typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
+extern thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
+
+typedef vector<pair<int, boost::function< void(int, boost::any&) > > > deferredAdd_t;
+
+/* Non-owning pointer to the thread_local MTasker held by MT, or nullptr when MT is empty */
+inline MT_t* getMT()
+{
+  // unique_ptr::get() already yields nullptr for an empty pointer
+  return MT.get();
+}
+
+extern thread_local unsigned int t_id;
+
+/* Accessor for the thread_local t_id of the calling thread */
+inline unsigned int getRecursorThreadId()
+{
+ return t_id;
+}
+
+/* this function is called with both a string and a vector<uint8_t> representing a packet */
+/* Writes `packet` to dc->d_socket, preceded by a two-byte length prefix, using a
+   single writev() call.
+   Returns true when something went wrong (EOF, write error or partial write; each
+   is logged as a warning), false when the whole answer was written. */
+template <class T>
+static bool sendResponseOverTCP(const std::unique_ptr<DNSComboWriter>& dc, const T& packet)
+{
+  // length prefix: high byte first, then low byte
+  uint8_t buf[2];
+  buf[0] = packet.size() / 256;
+  buf[1] = packet.size() % 256;
+
+  Utility::iovec iov[2];
+  iov[0].iov_base = static_cast<void*>(buf);
+  iov[0].iov_len = 2;
+  // data() is well-defined even for an empty packet, whereas dereferencing
+  // begin() of an empty container is not
+  iov[1].iov_base = const_cast<void*>(static_cast<const void*>(packet.data()));
+  iov[1].iov_len = packet.size();
+
+  int wret = Utility::writev(dc->d_socket, iov, 2);
+  bool hadError = true;
+
+  if (wret == 0) {
+    g_log<<Logger::Warning<<"EOF writing TCP answer to "<<dc->getRemote()<<endl;
+  }
+  else if (wret < 0) {
+    // save errno before any further call can overwrite it
+    int err = errno;
+    g_log << Logger::Warning << "Error writing TCP answer to " << dc->getRemote() << ": " << strerror(err) << endl;
+  }
+  else if (static_cast<size_t>(wret) != 2 + packet.size()) {
+    g_log<<Logger::Warning<<"Oops, partial answer sent to "<<dc->getRemote()<<" for "<<dc->d_mdp.d_qname<<" (size="<< (2 + packet.size()) <<", sent "<<wret<<")"<<endl;
+  }
+  else {
+    hadError = false;
+  }
+
+  return hadError;
+}
+
+PacketBuffer GenUDPQueryResponse(const ComboAddress& dest, const string& query);
+bool checkProtobufExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal);
+bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal);
+void getQNameAndSubnet(const std::string& question, DNSName* dnsname, uint16_t* qtype, uint16_t* qclass,
+ bool& foundECS, EDNSSubnetOpts* ednssubnet, EDNSOptionViewMap* options,
+ bool& foundXPF, ComboAddress* xpfSource, ComboAddress* xpfDest);
+void protobufLogQuery(LocalStateHolder<LuaConfigItems>& luaconfsLocal, const boost::uuids::uuid& uniqueId, const ComboAddress& remote, const ComboAddress& local, const Netmask& ednssubnet, bool tcp, uint16_t id, size_t len, const DNSName& qname, uint16_t qtype, uint16_t qclass, const std::unordered_set<std::string>& policyTags, const std::string& requestorId, const std::string& deviceId, const std::string& deviceName, const std::map<std::string, RecursorLua4::MetaValue>& meta);
+bool isAllowNotifyForZone(DNSName qname);
+bool checkForCacheHit(bool qnameParsed, unsigned int tag, const string& data,
+ DNSName& qname, uint16_t& qtype, uint16_t& qclass,
+ const struct timeval& now,
+ string& response, uint32_t& qhash,
+ RecursorPacketCache::OptPBData& pbData, bool tcp, const ComboAddress& source);
+void protobufLogResponse(pdns::ProtoZero::RecMessage& message);
+void protobufLogResponse(const struct dnsheader* dh, LocalStateHolder<LuaConfigItems>& luaconfsLocal,
+ const RecursorPacketCache::OptPBData& pbData, const struct timeval& tv,
+ bool tcp, const ComboAddress& source, const ComboAddress& destination,
+ const EDNSSubnetOpts& ednssubnet,
+ const boost::uuids::uuid& uniqueId, const string& requestorId, const string& deviceId,
+ const string& deviceName, const std::map<std::string, RecursorLua4::MetaValue>& meta,
+ const RecEventTrace& eventTrace);
+void requestWipeCaches(const DNSName& canon);
+void startDoResolve(void *p);
+bool expectProxyProtocol(const ComboAddress& from);
+void finishTCPReply(std::unique_ptr<DNSComboWriter>& dc, bool hadError, bool updateInFlight);
+void checkFastOpenSysctl(bool active);
+void checkTFOconnect();
+void makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set<int>& tcpSockets);
+void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& );
--- /dev/null
+/*
+ * This file is part of PowerDNS or dnsdist.
+ * Copyright -- PowerDNS.COM B.V. and its contributors
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In addition, for the avoidance of any doubt, permission is granted to
+ * link this program with OpenSSL and to (re)distribute the binaries
+ * produced as the result of such linking.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include "rec-main.hh"
+
+#include "arguments.hh"
+#include "logger.hh"
+#include "mplexer.hh"
+#include "uuid-utils.hh"
+
+size_t g_tcpMaxQueriesPerConn;
+unsigned int g_maxTCPPerClient;
+int g_tcpTimeout;
+bool g_anyToTcp;
+
+uint16_t TCPConnection::s_maxInFlight;
+std::atomic<uint32_t> TCPConnection::s_currentConnections;
+
+typedef map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan> tcpClientCounts_t;
+thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
+
+static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var);
+
+#if 0
+#define TCPLOG(tcpsock, x) do { cerr << [](){ timeval t; gettimeofday(&t, nullptr); return t.tv_sec % 10 + t.tv_usec/1000000.0; }() << " FD " << (tcpsock) << ' ' << x; } while (0)
+#else
+#define TCPLOG(pid, x)
+#endif
+
+// Initialises `data` with (2, 0), remembers the peer address and fd, and records the
+// new connection in s_currentConnections and in the thread_local t_tcpClientCounts
+// entry for d_remote (created on first use by operator[]).
+TCPConnection::TCPConnection(int fd, const ComboAddress& addr) : data(2, 0), d_remote(addr), d_fd(fd)
+{
+ ++s_currentConnections;
+ (*t_tcpClientCounts)[d_remote]++;
+}
+
+// Closes the socket (failures are only logged) and undoes the accounting done by
+// the constructor.
+TCPConnection::~TCPConnection()
+{
+  try {
+    if (closesocket(d_fd) < 0) {
+      g_log<<Logger::Error<<"Error closing socket for TCPConnection"<<endl;
+    }
+  }
+  catch (const PDNSException& e) {
+    g_log<<Logger::Error<<"Error closing TCPConnection socket: "<<e.reason<<endl;
+  }
+
+  // Take this connection off the per-client tally and drop the entry as soon as
+  // the client has no connection left. Testing the value *before* decrementing
+  // (as a post-decrement does) would leave a zero-count entry behind for every
+  // client ever seen, and would wrap the unsigned counter on a zero count.
+  auto entry = t_tcpClientCounts->find(d_remote);
+  if (entry != t_tcpClientCounts->end()) {
+    if (entry->second <= 1) {
+      t_tcpClientCounts->erase(entry);
+    }
+    else {
+      --entry->second;
+    }
+  }
+  --s_currentConnections;
+}
+
+
+/* Stops watching fd for readability on t_fdm; an FDMultiplexerException raised by
+   the removal is deliberately swallowed */
+static void terminateTCPConnection(int fd)
+{
+  try {
+    t_fdm->removeReadFD(fd);
+  }
+  catch (const FDMultiplexerException&) {
+    // best effort only, nothing to do
+  }
+}
+
+/* Builds a minimal response carrying `rcode` for the query held in dc and sends
+   it with sendResponseOverTCP(); the result of the send is not inspected. */
+static void sendErrorOverTCP(std::unique_ptr<DNSComboWriter>& dc, int rcode)
+{
+  auto& query = dc->d_mdp;
+  std::vector<uint8_t> reply;
+
+  if (query.d_header.qdcount != 0) {
+    DNSPacketWriter writer(reply, query.d_qname, query.d_qtype, query.d_qclass);
+    if (query.hasEDNS()) {
+      /* we try to add the EDNS OPT RR even for truncated answers,
+         as rfc6891 states:
+         "The minimal response MUST be the DNS header, question section, and an
+         OPT record.  This MUST also occur when a truncated response (using
+         the DNS header's TC bit) is returned."
+      */
+      writer.addOpt(512, 0, 0);
+      writer.commit();
+    }
+  }
+  else {
+    /* no question to echo back: header-only reply */
+    reply.resize(sizeof(dnsheader));
+  }
+
+  // patch the header in place: echo id/rd/cd from the query, set the rest ourselves
+  auto& replyHeader = reinterpret_cast<dnsheader&>(reply.at(0));
+  replyHeader.id = query.d_header.id;
+  replyHeader.rd = query.d_header.rd;
+  replyHeader.cd = query.d_header.cd;
+  replyHeader.qr = 1;
+  replyHeader.ra = 1;
+  replyHeader.aa = 0;
+  replyHeader.tc = 0;
+  replyHeader.rcode = rcode;
+
+  sendResponseOverTCP(dc, reply);
+}
+
+// Bookkeeping to run once a reply has been (or failed to be) sent on a TCP connection.
+//  - dc:             the query context; dc->d_socket is set to -1 when this function
+//                    removes the fd from the multiplexer
+//  - hadError:       true when sending the reply failed; the fd is then removed from t_fdm
+//  - updateInFlight: when true, d_requestsInFlight is decremented (if > 0) and the fd is
+//                    re-added for reading when the count lands on s_maxInFlight - 1
+// Otherwise the read timeout of the fd is refreshed, re-adding the fd if it is no longer listed.
+void finishTCPReply(std::unique_ptr<DNSComboWriter>& dc, bool hadError, bool updateInFlight)
+{
+ // update tcp connection status, closing if needed and doing the fd multiplexer accounting
+ if (updateInFlight && dc->d_tcpConnection->d_requestsInFlight > 0) {
+ dc->d_tcpConnection->d_requestsInFlight--;
+ }
+
+ // In the code below, we try to remove the fd from the set, but
+ // we don't know if another mthread already did the remove, so we can get a
+ // "Tried to remove unlisted fd" exception. Note that an inflight < limit test
+ // will not work since we do not know if the other mthread got an error or not.
+ if (hadError) {
+ terminateTCPConnection(dc->d_socket);
+ dc->d_socket = -1;
+ return;
+ }
+ dc->d_tcpConnection->queriesCount++;
+ // stop reading from this connection once the per-connection query limit is reached
+ // (a limit of 0 disables the check), or when it is marked drop-on-idle and nothing is in flight
+ if ((g_tcpMaxQueriesPerConn && dc->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) ||
+ (dc->d_tcpConnection->isDropOnIdle() && dc->d_tcpConnection->d_requestsInFlight == 0)) {
+ try {
+ t_fdm->removeReadFD(dc->d_socket);
+ }
+ catch (FDMultiplexerException &) {
+ }
+ dc->d_socket = -1;
+ return;
+ }
+
+ Utility::gettimeofday(&g_now, nullptr); // needs to be updated
+ struct timeval ttd = g_now;
+
+ // If we cross from max to max-1 in flight requests, the fd was not listened to, add it back
+ if (updateInFlight && dc->d_tcpConnection->d_requestsInFlight == TCPConnection::s_maxInFlight - 1) {
+ // A read error might have happened. If we add the fd back, it will most likely error again.
+ // This is not a big issue, the next handleTCPClientReadable() will see another read error
+ // and take action.
+ ttd.tv_sec += g_tcpTimeout;
+ t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd);
+ return;
+ }
+ // fd might have been removed by read error code, or a read timeout, so expect an exception
+ try {
+ t_fdm->setReadTTD(dc->d_socket, ttd, g_tcpTimeout);
+ }
+ catch (const FDMultiplexerException &) {
+ // but if the FD was removed because of a timeout while we were sending a response,
+ // we need to re-arm it. If it was an error it will error again.
+ ttd.tv_sec += g_tcpTimeout;
+ t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd);
+ }
+}
+
+/*
+ * A helper class that by default closes the incoming TCP connection on destruct
+ * If you want to keep the connection alive, call keep() on the guard object
+ */
+class RunningTCPQuestionGuard
+{
+public:
+  //! Starts guarding fd; explicit so that a plain int never silently becomes a guard
+  explicit RunningTCPQuestionGuard(int fd) :
+    d_fd(fd)
+  {
+  }
+
+  // A copy would call terminateTCPConnection() a second time on the same fd
+  RunningTCPQuestionGuard(const RunningTCPQuestionGuard&) = delete;
+  RunningTCPQuestionGuard& operator=(const RunningTCPQuestionGuard&) = delete;
+
+  //! Calls terminateTCPConnection() on the guarded fd unless keep() was called first
+  ~RunningTCPQuestionGuard()
+  {
+    if (d_fd != -1) {
+      terminateTCPConnection(d_fd);
+      d_fd = -1;
+    }
+  }
+
+  //! Disarms the guard: the destructor will leave the fd alone
+  void keep()
+  {
+    d_fd = -1;
+  }
+
+  /*! Interprets the return value of a read call.
+      Returns false on EOF (bytes == 0) and on an error other than
+      EAGAIN/EWOULDBLOCK, leaving the guard armed. Otherwise disarms the guard
+      and returns true. The fd argument is not used. */
+  bool handleTCPReadResult(int fd, ssize_t bytes)
+  {
+    if (bytes == 0) {
+      /* EOF */
+      return false;
+    }
+    if (bytes < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
+      return false;
+    }
+    keep();
+    return true;
+  }
+
+private:
+  int d_fd{-1};
+};
+
+ /*
+  * Read callback for an accepted client TCP connection.
+  * `var` carries the shared_ptr<TCPConnection> registered together with the fd.
+  * The function advances the connection's read state
+  * (PROXYPROTOCOLHEADER -> BYTE0 -> BYTE1 -> GETQUESTION) with whatever bytes
+  * recv() delivers. Once a complete question has been read it is either
+  * answered directly (error response, packet cache hit) or handed to
+  * startDoResolve() in a new mthread.
+  * Every return path that does not call tcpGuard.keep() first lets the guard
+  * terminate the connection.
+  */
+ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
+ {
+   shared_ptr<TCPConnection> conn=boost::any_cast<shared_ptr<TCPConnection> >(var);
+
+   RunningTCPQuestionGuard tcpGuard{fd};
+
+   if (conn->state == TCPConnection::PROXYPROTOCOLHEADER) {
+     ssize_t bytes = recv(conn->getFD(), &conn->data.at(conn->proxyProtocolGot), conn->proxyProtocolNeed, 0);
+     if (bytes <= 0) {
+       tcpGuard.handleTCPReadResult(fd, bytes);
+       return;
+     }
+
+     conn->proxyProtocolGot += bytes;
+     conn->data.resize(conn->proxyProtocolGot);
+     ssize_t remaining = isProxyHeaderComplete(conn->data);
+     if (remaining == 0) {
+       if (g_logCommonErrors) {
+         g_log<<Logger::Error<<"Unable to consume proxy protocol header in packet from TCP client "<< conn->d_remote.toStringWithPort() <<endl;
+       }
+       ++g_stats.proxyProtocolInvalidCount;
+       return;
+     }
+     else if (remaining < 0) {
+       // a negative value is treated as the number of bytes still missing:
+       // make room for them and wait for the next read event
+       conn->proxyProtocolNeed = -remaining;
+       conn->data.resize(conn->proxyProtocolGot + conn->proxyProtocolNeed);
+       tcpGuard.keep();
+       return;
+     }
+     else {
+       /* proxy header received */
+       /* we ignore the TCP field for now, but we could properly set whether
+          the connection was received over UDP or TCP if needed */
+       bool tcp;
+       bool proxy = false;
+       // Signed on purpose: with an unsigned type a negative (error) result would never
+       // satisfy the `used <= 0` test below and would be misreported as an oversized header
+       ssize_t used = parseProxyHeader(conn->data, proxy, conn->d_source, conn->d_destination, tcp, conn->proxyProtocolValues);
+       if (used <= 0) {
+         if (g_logCommonErrors) {
+           g_log<<Logger::Error<<"Unable to parse proxy protocol header in packet from TCP client "<< conn->d_remote.toStringWithPort() <<endl;
+         }
+         ++g_stats.proxyProtocolInvalidCount;
+         return;
+       }
+       else if (static_cast<size_t>(used) > g_proxyProtocolMaximumSize) {
+         if (g_logCommonErrors) {
+           g_log<<Logger::Error<<"Proxy protocol header in packet from TCP client "<< conn->d_remote.toStringWithPort() << " is larger than proxy-protocol-maximum-size (" << used << "), dropping"<< endl;
+         }
+         ++g_stats.proxyProtocolInvalidCount;
+         return;
+       }
+
+       /* Now that we have retrieved the address of the client, as advertised by the proxy
+          via the proxy protocol header, check that it is allowed by our ACL */
+       /* note that if the proxy header used a 'LOCAL' command, the original source and destination are untouched so everything should be fine */
+       if (t_allowFrom && !t_allowFrom->match(&conn->d_source)) {
+         if (!g_quiet) {
+           g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<conn->d_source.toString()<<", address not matched by allow-from"<<endl;
+         }
+
+         ++g_stats.unauthorizedTCP;
+         return;
+       }
+
+       conn->data.resize(2);
+       conn->state = TCPConnection::BYTE0;
+     }
+   }
+
+   // BYTE0/BYTE1: read the two-byte, big-endian length prefix of the question
+   if (conn->state==TCPConnection::BYTE0) {
+     ssize_t bytes=recv(conn->getFD(), &conn->data[0], 2, 0);
+     if(bytes==1)
+       conn->state=TCPConnection::BYTE1;
+     if(bytes==2) {
+       conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
+       conn->data.resize(conn->qlen);
+       conn->bytesread=0;
+       conn->state=TCPConnection::GETQUESTION;
+     }
+     if (bytes <= 0) {
+       tcpGuard.handleTCPReadResult(fd, bytes);
+       return;
+     }
+   }
+
+   if (conn->state==TCPConnection::BYTE1) {
+     ssize_t bytes=recv(conn->getFD(), &conn->data[1], 1, 0);
+     if(bytes==1) {
+       conn->state=TCPConnection::GETQUESTION;
+       conn->qlen=(((unsigned char)conn->data[0]) << 8)+ (unsigned char)conn->data[1];
+       conn->data.resize(conn->qlen);
+       conn->bytesread=0;
+     }
+     if (bytes <= 0) {
+       if (!tcpGuard.handleTCPReadResult(fd, bytes)) {
+         if(g_logCommonErrors) {
+           g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" disconnected after first byte"<<endl;
+         }
+       }
+       return;
+     }
+   }
+
+   if(conn->state==TCPConnection::GETQUESTION) {
+     ssize_t bytes=recv(conn->getFD(), &conn->data[conn->bytesread], conn->qlen - conn->bytesread, 0);
+     if (bytes <= 0) {
+       if (!tcpGuard.handleTCPReadResult(fd, bytes)) {
+         if(g_logCommonErrors) {
+           g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" disconnected while reading question body"<<endl;
+         }
+       }
+       return;
+     }
+     else if (bytes > std::numeric_limits<std::uint16_t>::max()) {
+       if(g_logCommonErrors) {
+         g_log<<Logger::Error<<"TCP client "<< conn->d_remote.toStringWithPort() <<" sent an invalid question size while reading question body"<<endl;
+       }
+       return;
+     }
+     conn->bytesread+=(uint16_t)bytes;
+     if(conn->bytesread==conn->qlen) {
+       // full question received: the next bytes on this connection start a new length prefix
+       conn->state = TCPConnection::BYTE0;
+       std::unique_ptr<DNSComboWriter> dc;
+       try {
+         dc = std::make_unique<DNSComboWriter>(conn->data, g_now);
+       }
+       catch(const MOADNSException &mde) {
+         g_stats.clientParseError++;
+         if (g_logCommonErrors) {
+           g_log<<Logger::Error<<"Unable to parse packet from TCP client "<< conn->d_remote.toStringWithPort() <<endl;
+         }
+         return;
+       }
+       dc->d_tcpConnection = conn; // carry the torch
+       dc->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
+       dc->d_tcp=true;
+       dc->setRemote(conn->d_remote);
+       dc->setSource(conn->d_source);
+       ComboAddress dest;
+       dest.reset();
+       dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
+       socklen_t len = dest.getSocklen();
+       getsockname(conn->getFD(), (sockaddr*)&dest, &len); // if this fails, we're ok with it
+       dc->setLocal(dest);
+       dc->setDestination(conn->d_destination);
+       /* we can't move this if we want to be able to access the values in
+          all queries sent over this connection */
+       dc->d_proxyProtocolValues = conn->proxyProtocolValues;
+
+       // reference point for the time spent on a packet cache hit (see below)
+       struct timeval start;
+       Utility::gettimeofday(&start, nullptr);
+
+       DNSName qname;
+       uint16_t qtype=0;
+       uint16_t qclass=0;
+       bool needECS = false;
+       bool needXPF = g_XPFAcl.match(conn->d_remote);
+       string requestorId;
+       string deviceId;
+       string deviceName;
+       bool logQuery = false;
+       bool qnameParsed = false;
+
+       dc->d_eventTrace.setEnabled(SyncRes::s_event_trace_enabled);
+       dc->d_eventTrace.add(RecEventTrace::ReqRecv);
+       auto luaconfsLocal = g_luaconfs.getLocal();
+       if (checkProtobufExport(luaconfsLocal)) {
+         needECS = true;
+       }
+       logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
+       dc->d_logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
+
+ #ifdef HAVE_FSTRM
+       checkFrameStreamExport(luaconfsLocal);
+ #endif
+
+       // The qname (and ECS/XPF data) is only parsed up front when something needs it:
+       // protobuf export, XPF, a Lua gettag hook, or a NOTIFY
+       if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag)) || dc->d_mdp.d_header.opcode == Opcode::Notify) {
+
+         try {
+           EDNSOptionViewMap ednsOptions;
+           bool xpfFound = false;
+           dc->d_ecsParsed = true;
+           dc->d_ecsFound = false;
+           getQNameAndSubnet(conn->data, &qname, &qtype, &qclass,
+                             dc->d_ecsFound, &dc->d_ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr,
+                             xpfFound, needXPF ? &dc->d_source : nullptr, needXPF ? &dc->d_destination : nullptr);
+           qnameParsed = true;
+
+           if(t_pdl) {
+             try {
+               if (t_pdl->d_gettag_ffi) {
+                 RecursorLua4::FFIParams params(qname, qtype, dc->d_destination, dc->d_source, dc->d_ednssubnet.source, dc->d_data, dc->d_policyTags, dc->d_records, ednsOptions, dc->d_proxyProtocolValues, requestorId, deviceId, deviceName, dc->d_routingTag, dc->d_rcode, dc->d_ttlCap, dc->d_variable, true, logQuery, dc->d_logResponse, dc->d_followCNAMERecords, dc->d_extendedErrorCode, dc->d_extendedErrorExtra, dc->d_responsePaddingDisabled, dc->d_meta);
+                 dc->d_eventTrace.add(RecEventTrace::LuaGetTagFFI);
+                 dc->d_tag = t_pdl->gettag_ffi(params);
+                 dc->d_eventTrace.add(RecEventTrace::LuaGetTagFFI, dc->d_tag, false);
+               }
+               else if (t_pdl->d_gettag) {
+                 dc->d_eventTrace.add(RecEventTrace::LuaGetTag);
+                 dc->d_tag = t_pdl->gettag(dc->d_source, dc->d_ednssubnet.source, dc->d_destination, qname, qtype, &dc->d_policyTags, dc->d_data, ednsOptions, true, requestorId, deviceId, deviceName, dc->d_routingTag, dc->d_proxyProtocolValues);
+                 dc->d_eventTrace.add(RecEventTrace::LuaGetTag, dc->d_tag, false);
+               }
+             }
+             catch(const std::exception& e) {
+               if(g_logCommonErrors) {
+                 g_log<<Logger::Warning<<"Error parsing a query packet qname='"<<qname<<"' for tag determination, setting tag=0: "<<e.what()<<endl;
+               }
+             }
+           }
+         }
+         catch(const std::exception& e)
+         {
+           if (g_logCommonErrors) {
+             g_log<<Logger::Warning<<"Error parsing a query packet for tag determination, setting tag=0: "<<e.what()<<endl;
+           }
+         }
+       }
+
+       if (dc->d_tag == 0 && !dc->d_responsePaddingDisabled && g_paddingFrom.match(dc->d_remote)) {
+         dc->d_tag = g_paddingTag;
+       }
+
+       const struct dnsheader* dh = reinterpret_cast<const struct dnsheader*>(&conn->data[0]);
+
+       if (t_protobufServers || t_outgoingProtobufServers) {
+         dc->d_requestorId = requestorId;
+         dc->d_deviceId = deviceId;
+         dc->d_deviceName = deviceName;
+         dc->d_uuid = getUniqueID();
+       }
+
+       if(t_protobufServers) {
+         try {
+
+           if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && dc->d_policyTags.empty())) {
+             protobufLogQuery(luaconfsLocal, dc->d_uuid, dc->d_source, dc->d_destination, dc->d_ednssubnet.source, true, dh->id, conn->qlen, qname, qtype, qclass, dc->d_policyTags, dc->d_requestorId, dc->d_deviceId, dc->d_deviceName, dc->d_meta);
+           }
+         }
+         catch (const std::exception& e) {
+           if (g_logCommonErrors) {
+             g_log<<Logger::Warning<<"Error parsing a TCP query packet for edns subnet: "<<e.what()<<endl;
+           }
+         }
+       }
+
+       if (t_pdl) {
+         bool ipf = t_pdl->ipfilter(dc->d_source, dc->d_destination, *dh, dc->d_eventTrace);
+         if (ipf) {
+           if (!g_quiet) {
+             g_log<<Logger::Notice<<t_id<<" ["<<MT->getTid()<<"/"<<MT->numProcesses()<<"] DROPPED TCP question from "<<dc->d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<<" based on policy"<<endl;
+           }
+           g_stats.policyDrops++;
+           return;
+         }
+       }
+
+       if (dc->d_mdp.d_header.qr) {
+         g_stats.ignoredCount++;
+         if (g_logCommonErrors) {
+           g_log<<Logger::Error<<"Ignoring answer from TCP client "<< dc->getRemote() <<" on server socket!"<<endl;
+         }
+         return;
+       }
+       if (dc->d_mdp.d_header.opcode != Opcode::Query && dc->d_mdp.d_header.opcode != Opcode::Notify) {
+         g_stats.ignoredCount++;
+         if (g_logCommonErrors) {
+           g_log<<Logger::Error<<"Ignoring unsupported opcode "<<Opcode::to_s(dc->d_mdp.d_header.opcode)<<" from TCP client "<< dc->getRemote() <<" on server socket!"<<endl;
+         }
+         sendErrorOverTCP(dc, RCode::NotImp);
+         tcpGuard.keep();
+         return;
+       }
+       else if (dh->qdcount == 0) {
+         g_stats.emptyQueriesCount++;
+         if (g_logCommonErrors) {
+           g_log<<Logger::Error<<"Ignoring empty (qdcount == 0) query from "<< dc->getRemote() <<" on server socket!"<<endl;
+         }
+         sendErrorOverTCP(dc, RCode::NotImp);
+         tcpGuard.keep();
+         return;
+       }
+       else {
+         // We have read a proper query
+         ++g_stats.qcounter;
+         ++g_stats.tcpqcounter;
+
+         if(dc->d_mdp.d_header.opcode == Opcode::Notify) {
+           if(!t_allowNotifyFrom || !t_allowNotifyFrom->match(dc->d_source)) {
+             if(!g_quiet) {
+               g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP NOTIFY from "<<dc->d_source.toString()<<", address not matched by allow-notify-from"<<endl;
+             }
+
+             g_stats.sourceDisallowedNotify++;
+             return;
+           }
+
+           if(!isAllowNotifyForZone(qname)) {
+             if(!g_quiet) {
+               g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP NOTIFY from "<<dc->d_source.toString()<<", for "<<qname.toLogString()<<", zone not matched by allow-notify-for"<<endl;
+             }
+
+             g_stats.zoneDisallowedNotify++;
+             return;
+           }
+         }
+
+         string response;
+         RecursorPacketCache::OptPBData pbData{boost::none};
+
+         if(dc->d_mdp.d_header.opcode == Opcode::Query) {
+           /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable,
+              but it means that the hash would not be computed. If some script decides at a later time to mark back the answer
+              as cacheable we would cache it with a wrong tag, so better safe than sorry. */
+           dc->d_eventTrace.add(RecEventTrace::PCacheCheck);
+           bool cacheHit = checkForCacheHit(qnameParsed, dc->d_tag, conn->data, qname, qtype, qclass, g_now, response, dc->d_qhash, pbData, true, dc->d_source);
+           dc->d_eventTrace.add(RecEventTrace::PCacheCheck, cacheHit, false);
+
+           if (cacheHit) {
+             if (!g_quiet) {
+               g_log<<Logger::Notice<<t_id<< " TCP question answered from packet cache tag="<<dc->d_tag<<" from "<<dc->d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<<endl;
+             }
+
+             bool hadError = sendResponseOverTCP(dc, response);
+             finishTCPReply(dc, hadError, false);
+             struct timeval now;
+             Utility::gettimeofday(&now, nullptr);
+             uint64_t spentUsec = uSec(now - start);
+             g_stats.cumulativeAnswers(spentUsec);
+             dc->d_eventTrace.add(RecEventTrace::AnswerSent);
+
+             if (t_protobufServers && dc->d_logResponse && !(luaconfsLocal->protobufExportConfig.taggedOnly && pbData && !pbData->d_tagged)) {
+               struct timeval tv{0, 0};
+               protobufLogResponse(dh, luaconfsLocal, pbData, tv, true, dc->d_source, dc->d_destination, dc->d_ednssubnet, dc->d_uuid, dc->d_requestorId, dc->d_deviceId, dc->d_deviceName, dc->d_meta, dc->d_eventTrace);
+             }
+
+             if (dc->d_eventTrace.enabled() && SyncRes::s_event_trace_enabled & SyncRes::event_trace_to_log) {
+               g_log << Logger::Info << dc->d_eventTrace.toString() << endl;
+             }
+             tcpGuard.keep();
+             return;
+           } // cache hit
+         } // query opcode
+
+         if(dc->d_mdp.d_header.opcode == Opcode::Notify) {
+           if (!g_quiet) {
+             g_log<<Logger::Notice<<t_id<< " got NOTIFY for "<<qname.toLogString()<<" from "<<dc->d_source.toStringWithPort()<<(dc->d_source != dc->d_remote ? " (via "+dc->d_remote.toStringWithPort()+")" : "")<<endl;
+           }
+
+           requestWipeCaches(qname);
+
+           // the operation will now be treated as a Query, generating
+           // a normal response, as the rest of the code does not
+           // check dh->opcode, but we need to ensure that the response
+           // to this request does not get put into the packet cache
+           dc->d_variable = true;
+         }
+
+         // setup for startDoResolve() in an mthread
+         ++conn->d_requestsInFlight;
+         if (conn->d_requestsInFlight >= TCPConnection::s_maxInFlight) {
+           t_fdm->removeReadFD(fd); // should no longer awake ourselves when there is data to read
+         } else {
+           Utility::gettimeofday(&g_now, nullptr); // needed?
+           struct timeval ttd = g_now;
+           t_fdm->setReadTTD(fd, ttd, g_tcpTimeout);
+         }
+         tcpGuard.keep();
+         MT->makeThread(startDoResolve, dc.release()); // deletes dc
+       } // good query
+     } // read full query
+   } // reading query
+
+   // more to come
+   tcpGuard.keep();
+ }
+
+ //! Handle new incoming TCP connection
+ /*! Accepts one connection from listening socket `fd`. The new socket is closed
+  *  again when there are too many mthreads, when the peer is matched neither by
+  *  allow-from nor by the proxy-protocol sources, or when the peer already has
+  *  the maximum number of connections. Otherwise a TCPConnection is created and
+  *  registered with handleRunningTCPQuestion() as its read callback.
+  */
+ void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
+ {
+   ComboAddress addr;
+   socklen_t addrlen = sizeof(addr);
+   const int newsock = accept(fd, (struct sockaddr*)&addr, &addrlen);
+   if (newsock < 0) {
+     return;
+   }
+
+   // Close the socket we just accepted; a failure to close is logged with the given prefix
+   auto dropNewSocket = [newsock](const char* closeErrorPrefix) {
+     try {
+       closesocket(newsock);
+     }
+     catch (const PDNSException& e) {
+       g_log<<Logger::Error<<closeErrorPrefix<<e.reason<<endl;
+     }
+   };
+
+   if (MT->numProcesses() > g_maxMThreads) {
+     g_stats.overCapacityDrops++;
+     dropNewSocket("Error closing TCP socket after an over capacity drop: ");
+     return;
+   }
+
+   if (t_remotes) {
+     t_remotes->push_back(addr);
+   }
+
+   const bool fromProxyProtocolSource = expectProxyProtocol(addr);
+   if (t_allowFrom && !t_allowFrom->match(&addr) && !fromProxyProtocolSource) {
+     if (!g_quiet) {
+       g_log<<Logger::Error<<"["<<MT->getTid()<<"] dropping TCP query from "<<addr.toString()<<", address neither matched by allow-from nor proxy-protocol-from"<<endl;
+     }
+     g_stats.unauthorizedTCP++;
+     dropNewSocket("Error closing TCP socket after an ACL drop: ");
+     return;
+   }
+
+   if (g_maxTCPPerClient) {
+     const auto counted = t_tcpClientCounts->find(addr);
+     if (counted != t_tcpClientCounts->end() && counted->second >= g_maxTCPPerClient) {
+       g_stats.tcpClientOverflow++;
+       // don't call TCPConnection::closeAndCleanup here - did not enter it in the counts yet!
+       dropNewSocket("Error closing TCP socket after an overflow drop: ");
+       return;
+     }
+   }
+
+   setNonBlocking(newsock);
+   auto connection = std::make_shared<TCPConnection>(newsock, addr);
+   connection->d_source = addr;
+   connection->d_destination.reset();
+   connection->d_destination.sin4.sin_family = addr.sin4.sin_family;
+   socklen_t destLen = connection->d_destination.getSocklen();
+   getsockname(connection->getFD(), reinterpret_cast<sockaddr*>(&connection->d_destination), &destLen); // if this fails, we're ok with it
+
+   if (!fromProxyProtocolSource) {
+     connection->state = TCPConnection::BYTE0;
+   }
+   else {
+     // peers listed as proxy-protocol sources must send a proxy header first
+     connection->proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
+     connection->data.resize(connection->proxyProtocolNeed);
+     connection->state = TCPConnection::PROXYPROTOCOLHEADER;
+   }
+
+   struct timeval deadline;
+   Utility::gettimeofday(&deadline, nullptr);
+   deadline.tv_sec += g_tcpTimeout;
+
+   t_fdm->addReadFD(connection->getFD(), handleRunningTCPQuestion, connection, &deadline);
+ }
+
+ static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var);
+
+ /*
+  * Record `newstate` in pid->lowState and bring the registration of pid->tcpsock
+  * in t_fdm in line with the transition from `oldstate` to `newstate`:
+  * add, flip or remove the read/write registration as needed.
+  * Throws std::runtime_error when either state is IOState::Async.
+  */
+ static void TCPIOHandlerStateChange(IOState oldstate, IOState newstate, std::shared_ptr<PacketID>& pid)
+ {
+   TCPLOG(pid->tcpsock, "State transation " << int(oldstate) << "->" << int(newstate) << endl);
+
+   pid->lowState = newstate;
+
+   // Async is not supported, neither as origin nor as target
+   if (oldstate == IOState::Async || newstate == IOState::Async) {
+     throw std::runtime_error("TLS async mode not supported");
+   }
+
+   // Staying in the same state needs no change of registration
+   if (oldstate == newstate) {
+     return;
+   }
+
+   if (oldstate == IOState::NeedRead) {
+     if (newstate == IOState::NeedWrite) {
+       TCPLOG(pid->tcpsock, "NeedRead -> NeedWrite: flip FD" << endl);
+       t_fdm->alterFDToWrite(pid->tcpsock, TCPIOHandlerIO, pid);
+     }
+     else if (newstate == IOState::Done) {
+       TCPLOG(pid->tcpsock, "Done -> removeReadFD" << endl);
+       t_fdm->removeReadFD(pid->tcpsock);
+     }
+   }
+   else if (oldstate == IOState::NeedWrite) {
+     if (newstate == IOState::NeedRead) {
+       TCPLOG(pid->tcpsock, "NeedWrite -> NeedRead: flip FD" << endl);
+       t_fdm->alterFDToRead(pid->tcpsock, TCPIOHandlerIO, pid);
+     }
+     else if (newstate == IOState::Done) {
+       TCPLOG(pid->tcpsock, "Done -> removeWriteFD" << endl);
+       t_fdm->removeWriteFD(pid->tcpsock);
+     }
+   }
+   else if (oldstate == IOState::Done) {
+     if (newstate == IOState::NeedRead) {
+       TCPLOG(pid->tcpsock, "NeedRead: addReadFD" << endl);
+       t_fdm->addReadFD(pid->tcpsock, TCPIOHandlerIO, pid);
+     }
+     else if (newstate == IOState::NeedWrite) {
+       TCPLOG(pid->tcpsock, "NeedWrite: addWriteFD" << endl);
+       t_fdm->addWriteFD(pid->tcpsock, TCPIOHandlerIO, pid);
+     }
+   }
+ }
+
+ // Multiplexer callback for a TCP socket on which an mthread is waiting (see asendtcp/arecvtcp).
+ // `var` carries the shared_ptr<PacketID> describing the pending operation. Depending on
+ // pid->highState the function continues the read or the write. When the operation is complete
+ // or has failed, the waiting mthread is woken with MT->sendEvent(); an empty buffer conveys an
+ // error. Otherwise only the fd registration is updated to the state the handler reported.
+ static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var)
+ {
+ std::shared_ptr<PacketID> pid = boost::any_cast<std::shared_ptr<PacketID>>(var);
+ assert(pid->tcphandler);
+ assert(fd == pid->tcphandler->getDescriptor());
+ IOState newstate = IOState::Done;
+
+ TCPLOG(pid->tcpsock, "TCPIOHandlerIO: lowState " << int(pid->lowState) << endl);
+
+ // In the code below, we want to update the state of the fd before calling sendEvent
+ // a sendEvent might close the fd, and some poll multiplexers do not like to manipulate a closed fd
+
+ switch (pid->highState) {
+ case TCPAction::DoingRead:
+ TCPLOG(pid->tcpsock, "highState: Reading" << endl);
+ // In arecvtcp, the buffer was resized already so inWanted bytes will fit
+ // try reading
+ try {
+ newstate = pid->tcphandler->tryRead(pid->inMSG, pid->inPos, pid->inWanted);
+ switch (newstate) {
+ case IOState::Done:
+ case IOState::NeedRead:
+ TCPLOG(pid->tcpsock, "tryRead: Done or NeedRead " << int(newstate) << ' ' << pid->inPos << '/' << pid->inWanted << endl);
+ TCPLOG(pid->tcpsock, "TCPIOHandlerIO " << pid->inWanted << ' ' << pid->inIncompleteOkay << endl);
+ // The read is finished when everything wanted has arrived, or when a partial
+ // result is acceptable and at least one byte is available
+ if (pid->inPos == pid->inWanted || (pid->inIncompleteOkay && pid->inPos > 0)) {
+ pid->inMSG.resize(pid->inPos); // old content (if there) + new bytes read, only relevant for the inIncompleteOkay case
+ newstate = IOState::Done;
+ TCPIOHandlerStateChange(pid->lowState, newstate, pid);
+ MT->sendEvent(pid, &pid->inMSG);
+ return;
+ }
+ break;
+ case IOState::NeedWrite:
+ break;
+ case IOState::Async:
+ throw std::runtime_error("TLS async mode not supported");
+ break;
+ }
+ }
+ catch (const std::exception& e) {
+ // Any exception (including the Async one thrown above) ends the operation
+ newstate = IOState::Done;
+ TCPLOG(pid->tcpsock, "read exception..." << e.what() << endl);
+ PacketBuffer empty;
+ TCPIOHandlerStateChange(pid->lowState, newstate, pid);
+ MT->sendEvent(pid, &empty); // this conveys error status
+ return;
+ }
+ break;
+
+ case TCPAction::DoingWrite:
+ TCPLOG(pid->tcpsock, "highState: Writing" << endl);
+ try {
+ TCPLOG(pid->tcpsock, "tryWrite: " << pid->outPos << '/' << pid->outMSG.size() << ' ' << " -> ");
+ newstate = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size());
+ TCPLOG(pid->tcpsock, pid->outPos << '/' << pid->outMSG.size() << endl);
+ switch (newstate) {
+ case IOState::Done: {
+ TCPLOG(pid->tcpsock, "tryWrite: Done" << endl);
+ TCPIOHandlerStateChange(pid->lowState, newstate, pid);
+ MT->sendEvent(pid, &pid->outMSG); // send back what we sent to convey everything is ok
+ return;
+ }
+ case IOState::NeedRead:
+ TCPLOG(pid->tcpsock, "tryWrite: NeedRead" << endl);
+ break;
+ case IOState::NeedWrite:
+ TCPLOG(pid->tcpsock, "tryWrite: NeedWrite" << endl);
+ break;
+ case IOState::Async:
+ throw std::runtime_error("TLS async mode not supported");
+ break;
+ }
+ }
+ catch (const std::exception& e) {
+ newstate = IOState::Done;
+ TCPLOG(pid->tcpsock, "write exception..." << e.what() << endl);
+ PacketBuffer sent;
+ TCPIOHandlerStateChange(pid->lowState, newstate, pid);
+ MT->sendEvent(pid, &sent); // we convey error status by sending empty string
+ return;
+ }
+ break;
+ }
+
+ // Cases that did not end up doing a sendEvent
+ TCPIOHandlerStateChange(pid->lowState, newstate, pid);
+ }
+
+
+ /*
+  * Warn when TCP Fast Open is configured but the kernel setting does not allow it.
+  * `active` selects the flag tested in net.ipv4.tcp_fastopen: bit 1 for
+  * tcp-fast-open-connect (active == true), bit 2 for tcp-fast-open (active == false).
+  * Only logs; never changes any setting. On non-Linux builds, or when the sysctl
+  * cannot be read or parsed, a notice is logged instead.
+  */
+ void checkFastOpenSysctl(bool active)
+ {
+ #ifdef __linux__
+   string line;
+   if (readFileIfThere("/proc/sys/net/ipv4/tcp_fastopen", &line)) {
+     int flag = 0;
+     try {
+       flag = std::stoi(line);
+     }
+     catch (const std::exception&) {
+       // std::stoi throws on empty or non-numeric content; treat that like an unreadable sysctl
+       g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl;
+       return;
+     }
+     if (active && !(flag & 1)) {
+       g_log << Logger::Error << "tcp-fast-open-connect enabled but net.ipv4.tcp_fastopen does not allow it" << endl;
+     }
+     if (!active && !(flag & 2)) {
+       g_log << Logger::Error << "tcp-fast-open enabled but net.ipv4.tcp_fastopen does not allow it" << endl;
+     }
+   }
+   else {
+     g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl;
+   }
+ #else
+   g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl;
+ #endif
+ }
+
+ /*
+  * Probe whether the fast-open-connect option can be set on a fresh non-blocking
+  * IPv4 TCP socket. A NetworkError raised while doing so is logged as an error;
+  * the probe socket only lives for the duration of the check.
+  */
+ void checkTFOconnect()
+ {
+   try {
+     Socket probe(AF_INET, SOCK_STREAM);
+     probe.setNonBlocking();
+     probe.setFastOpenConnect();
+   }
+   catch (const NetworkError& err) {
+     g_log << Logger::Error << "tcp-fast-open-connect enabled but returned error: " << err.what() << endl;
+   }
+ }
+
+
+
+ /*
+  * Send `data` over `handler` on behalf of the calling mthread.
+  * A first write is attempted right away. If it does not complete, the socket is
+  * registered with the multiplexer and the mthread waits (up to
+  * g_networkTimeoutMsec) for TCPIOHandlerIO to report the outcome.
+  * Returns Success, Timeout or PermanentError.
+  */
+ LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr<TCPIOHandler>& handler)
+ {
+   TCPLOG(handler->getDescriptor(), "asendtcp called " << data.size() << endl);
+
+   auto writeJob = std::make_shared<PacketID>();
+   writeJob->tcphandler = handler;
+   writeJob->tcpsock = handler->getDescriptor();
+   writeJob->outMSG = data;
+   writeJob->highState = TCPAction::DoingWrite;
+
+   IOState firstTry;
+   try {
+     TCPLOG(writeJob->tcpsock, "Initial tryWrite: " << writeJob->outPos << '/' << writeJob->outMSG.size() << ' ' << " -> ");
+     firstTry = handler->tryWrite(writeJob->outMSG, writeJob->outPos, writeJob->outMSG.size());
+     TCPLOG(writeJob->tcpsock, writeJob->outPos << '/' << writeJob->outMSG.size() << endl);
+
+     if (firstTry == IOState::Done) {
+       TCPLOG(writeJob->tcpsock, "asendtcp success A" << endl);
+       return LWResult::Result::Success;
+     }
+   }
+   catch (const std::exception& e) {
+     TCPLOG(writeJob->tcpsock, "tryWrite() exception..." << e.what() << endl);
+     return LWResult::Result::PermanentError;
+   }
+
+   // Will set writeJob->lowState
+   TCPIOHandlerStateChange(IOState::Done, firstTry, writeJob);
+
+   PacketBuffer reported;
+   const int waitResult = MT->waitEvent(writeJob, &reported, g_networkTimeoutMsec);
+   TCPLOG(writeJob->tcpsock, "asendtcp waitEvent returned " << waitResult << ' ' << reported.size() << '/' << data.size() << ' ');
+
+   if (waitResult == 0) {
+     TCPLOG(writeJob->tcpsock, "timeout" << endl);
+     TCPIOHandlerStateChange(writeJob->lowState, IOState::Done, writeJob);
+     return LWResult::Result::Timeout;
+   }
+   if (waitResult == -1) { // error
+     TCPLOG(writeJob->tcpsock, "PermanentError" << endl);
+     TCPIOHandlerStateChange(writeJob->lowState, IOState::Done, writeJob);
+     return LWResult::Result::PermanentError;
+   }
+   // main loop tells us what it sent out, or empty in case of an error
+   if (reported.size() != data.size()) {
+     // fd housekeeping done by TCPIOHandlerIO
+     TCPLOG(writeJob->tcpsock, "PermanentError size mismatch" << endl);
+     return LWResult::Result::PermanentError;
+   }
+
+   TCPLOG(writeJob->tcpsock, "asendtcp success" << endl);
+   return LWResult::Result::Success;
+ }
+
+ /*
+  * Receive up to `len` bytes from `handler` into `data` on behalf of the calling mthread.
+  * A first read is attempted right away. If it does not satisfy the request, the
+  * socket is registered with the multiplexer and the mthread waits (up to
+  * g_networkTimeoutMsec) for TCPIOHandlerIO to deliver the bytes.
+  * With `incompleteOkay` any non-empty partial read counts as success.
+  * Returns Success, Timeout or PermanentError.
+  */
+ LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, shared_ptr<TCPIOHandler>& handler, const bool incompleteOkay)
+ {
+   TCPLOG(handler->getDescriptor(), "arecvtcp called " << len << ' ' << data.size() << endl);
+   data.resize(len);
+
+   // We might have data already available from the TLS layer, try to get that into the buffer
+   size_t received = 0;
+   IOState firstTry;
+   try {
+     TCPLOG(handler->getDescriptor(), "calling tryRead() " << len << endl);
+     firstTry = handler->tryRead(data, received, len);
+     TCPLOG(handler->getDescriptor(), "arcvtcp tryRead() returned " << int(firstTry) << ' ' << received << '/' << len << endl);
+
+     if (firstTry == IOState::Async) {
+       throw std::runtime_error("TLS async mode not supported");
+     }
+     const bool readStates = firstTry == IOState::Done || firstTry == IOState::NeedRead;
+     const bool satisfied = received == len || (incompleteOkay && received > 0);
+     if (readStates && satisfied) {
+       data.resize(received);
+       TCPLOG(handler->getDescriptor(), "acecvtcp success A" << endl);
+       return LWResult::Result::Success;
+     }
+   }
+   catch (const std::exception& e) {
+     TCPLOG(handler->getDescriptor(), "tryRead() exception..." << e.what() << endl);
+     return LWResult::Result::PermanentError;
+   }
+
+   auto readJob = std::make_shared<PacketID>();
+   readJob->tcphandler = handler;
+   readJob->tcpsock = handler->getDescriptor();
+   // We might have a partial result
+   readJob->inMSG = std::move(data);
+   data.clear();
+   readJob->inPos = received;
+   readJob->inWanted = len;
+   readJob->inIncompleteOkay = incompleteOkay;
+   readJob->highState = TCPAction::DoingRead;
+
+   // Will set readJob->lowState
+   TCPIOHandlerStateChange(IOState::Done, firstTry, readJob);
+
+   const int waitResult = MT->waitEvent(readJob, &data, g_networkTimeoutMsec);
+   TCPLOG(readJob->tcpsock, "arecvtcp " << waitResult << ' ' << data.size() << ' ' );
+
+   if (waitResult == 0) {
+     TCPLOG(readJob->tcpsock, "timeout" << endl);
+     TCPIOHandlerStateChange(readJob->lowState, IOState::Done, readJob);
+     return LWResult::Result::Timeout;
+   }
+   if (waitResult == -1) {
+     TCPLOG(readJob->tcpsock, "PermanentError" << endl);
+     TCPIOHandlerStateChange(readJob->lowState, IOState::Done, readJob);
+     return LWResult::Result::PermanentError;
+   }
+   if (data.empty()) { // error, EOF or other
+     // fd housekeeping done by TCPIOHandlerIO
+     TCPLOG(readJob->tcpsock, "EOF" << endl);
+     return LWResult::Result::PermanentError;
+   }
+
+   TCPLOG(readJob->tcpsock, "arecvtcp success" << endl);
+   return LWResult::Result::Success;
+ }
+
+ /*
+  * Create one listening TCP socket per entry of the "local-address" setting.
+  * Each socket is configured (address reuse, optional IPv6-only, defer-accept,
+  * non-local bind, reuseport, fast open), bound to the address and "local-port"
+  * (or the port given in the entry), made non-blocking and put in listening state.
+  * For every socket a (fd, handleNewTCPQuestion) pair is appended to `deferredAdds`
+  * and the fd is inserted into `tcpSockets`.
+  * Throws PDNSException when no address is configured, an address cannot be
+  * resolved, the socket cannot be created, a reuseport option cannot be set or
+  * bind() fails. Exits the process when SO_REUSEADDR cannot be set.
+  */
+ void makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set<int>& tcpSockets)
+ {
+   vector<string> locals;
+   stringtok(locals,::arg()["local-address"]," ,");
+
+   if (locals.empty()) {
+     throw PDNSException("No local address specified");
+   }
+
+   for (const auto& local : locals) {
+     // the defer-accept notice is only logged once, for the first address
+     const bool firstAddress = (&local == &locals.front());
+
+     ServiceTuple st;
+     st.port=::arg().asNum("local-port");
+     parseService(local, st);
+
+     ComboAddress sin;
+
+     sin.reset();
+     sin.sin4.sin_family = AF_INET;
+     // try IPv4 first, fall back to IPv6 when the host is not an IPv4 literal
+     if (!IpToU32(st.host, (uint32_t*)&sin.sin4.sin_addr.s_addr)) {
+       sin.sin6.sin6_family = AF_INET6;
+       if (makeIPv6sockaddr(st.host, &sin.sin6) < 0) {
+         throw PDNSException("Unable to resolve local address for TCP server on '"+ st.host +"'");
+       }
+     }
+
+     const int fd = socket(sin.sin6.sin6_family, SOCK_STREAM, 0);
+     if (fd < 0) {
+       throw PDNSException("Making a TCP server socket for resolver: "+stringerror());
+     }
+
+     setCloseOnExec(fd);
+
+     int tmp=1;
+     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp)<0) {
+       g_log<<Logger::Error<<"Setsockopt failed for TCP listening socket"<<endl;
+       exit(1);
+     }
+     if (sin.sin6.sin6_family == AF_INET6 && setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &tmp, sizeof(tmp)) < 0) {
+       int err = errno;
+       g_log<<Logger::Error<<"Failed to set IPv6 socket to IPv6 only, continuing anyhow: "<<strerror(err)<<endl;
+     }
+
+ #ifdef TCP_DEFER_ACCEPT
+     if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &tmp, sizeof tmp) >= 0) {
+       if (firstAddress) {
+         g_log<<Logger::Info<<"Enabled TCP data-ready filter for (slight) DoS protection"<<endl;
+       }
+     }
+ #endif
+
+     // NOTE(review): AF_INET is passed even for IPv6 sockets - confirm against Utility::setBindAny
+     if (::arg().mustDo("non-local-bind")) {
+       Utility::setBindAny(AF_INET, fd);
+     }
+
+     if (g_reusePort) {
+ #if defined(SO_REUSEPORT_LB)
+       try {
+         SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT_LB, 1);
+       }
+       catch (const std::exception& e) {
+         throw PDNSException(std::string("SO_REUSEPORT_LB: ") + e.what());
+       }
+ #elif defined(SO_REUSEPORT)
+       try {
+         SSetsockopt(fd, SOL_SOCKET, SO_REUSEPORT, 1);
+       }
+       catch (const std::exception& e) {
+         throw PDNSException(std::string("SO_REUSEPORT: ") + e.what());
+       }
+ #endif
+     }
+
+     if (SyncRes::s_tcp_fast_open > 0) {
+       checkFastOpenSysctl(false);
+ #ifdef TCP_FASTOPEN
+       if (setsockopt(fd, IPPROTO_TCP, TCP_FASTOPEN, &SyncRes::s_tcp_fast_open, sizeof SyncRes::s_tcp_fast_open) < 0) {
+         int err = errno;
+         g_log<<Logger::Error<<"Failed to enable TCP Fast Open for listening socket: "<<strerror(err)<<endl;
+       }
+ #else
+       g_log<<Logger::Warning<<"TCP Fast Open configured but not supported for listening socket"<<endl;
+ #endif
+     }
+
+     sin.sin4.sin_port = htons(st.port);
+     socklen_t socklen=sin.sin4.sin_family==AF_INET ? sizeof(sin.sin4) : sizeof(sin.sin6);
+     if (::bind(fd, (struct sockaddr *)&sin, socklen )<0) {
+       throw PDNSException("Binding TCP server socket for "+ st.host +": "+stringerror());
+     }
+
+     setNonBlocking(fd);
+     try {
+       setSocketSendBuffer(fd, 65000);
+     }
+     catch (const std::exception& e) {
+       g_log<<Logger::Error<<e.what()<<endl;
+     }
+
+     // A failing listen() used to go unnoticed; report it so that a socket that
+     // never accepts connections can be diagnosed. Processing continues as before.
+     if (listen(fd, 128) < 0) {
+       int err = errno;
+       g_log<<Logger::Error<<"Failed to listen on TCP server socket for "<<st.host<<": "<<strerror(err)<<endl;
+     }
+     deferredAdds.emplace_back(fd, handleNewTCPQuestion);
+     tcpSockets.insert(fd);
+
+     // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
+     //  - fd is not that which we know here, but returned from accept()
+     if (sin.sin4.sin_family == AF_INET) {
+       g_log<<Logger::Info<<"Listening for TCP queries on "<< sin.toString() <<":"<<st.port<<endl;
+     }
+     else {
+       g_log<<Logger::Info<<"Listening for TCP queries on ["<< sin.toString() <<"]:"<<st.port<<endl;
+     }
+   }
+ }
};
extern std::unique_ptr<MemRecursorCache> g_recCache;
extern thread_local std::unique_ptr<RecursorPacketCache> t_packetCache;
-typedef MTasker<std::shared_ptr<PacketID>, PacketBuffer, PacketIDCompare> MT_t;
-MT_t* getMT();
struct RecursorStats
{
#include "rpzloader.hh"
#include "uuid-utils.hh"
#include "tcpiohandler.hh"
+#include "rec-main.hh"
extern thread_local FDMultiplexer* t_fdm;