From: Otto Date: Fri, 28 May 2021 08:52:49 +0000 (+0200) Subject: Align TCPIOHandlerReadable and Writeable and process some review comments X-Git-Tag: dnsdist-1.7.0-alpha1~138^2~6 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4bcf780e33949041e9b38eecb677cd1e8f3c4c13;p=thirdparty%2Fpdns.git Align TCPIOHandlerReadable and Writeable and process some review comments --- diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index 3ea85cf725..322005a5c7 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -397,26 +397,39 @@ static bool isHandlerThread() } #if 0 -#define TCPLOG(x) cerr << x +#define TCPLOG(x) cerr << [](){ timeval t; gettimeofday(&t, nullptr); return t.tv_sec % 10 + t.tv_usec/1000000.0; }() << ' ' << x #else #define TCPLOG(x) #endif -static void TCPIOHandlerWritable(int fd, FDMultiplexer::funcparam_t& var); +static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var); +static void TCPIOHandlerStateChange(IOState, IOState, std::shared_ptr&); LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr& handler) { TCPLOG("asendtcp called " << data.size() << endl); - PacketID pident; - pident.tcphandler = handler; - pident.tcpsock = handler->getDescriptor(); - pident.outMSG = data; + auto pident = std::make_shared(); + pident->tcphandler = handler; + pident->tcpsock = handler->getDescriptor(); + pident->outMSG = data; + pident->highState = TCPAction::DoingWrite; - t_fdm->addWriteFD(handler->getDescriptor(), TCPIOHandlerWritable, pident); - PacketBuffer packet; - int ret = MT->waitEvent(pident, &packet, g_networkTimeoutMsec); + TCPLOG("Initial tryWrite: " << pident->outPos << '/' << pident->outMSG.size() << ' ' << " -> "); + IOState state = handler->tryWrite(pident->outMSG, pident->outPos, pident->outMSG.size()); + TCPLOG(pident->outPos << '/' << pident->outMSG.size() << endl); + + if (state == IOState::Done) { + TCPLOG("asendtcp success A" << endl); + return LWResult::Result::Success; + } + + // Will set pident->lowState + TCPIOHandlerStateChange(IOState::Done, state, pident); + + PacketBuffer packet; + int ret = MT->waitEvent(*pident, &packet, g_networkTimeoutMsec); TCPLOG("asendtcp waitEvent returned " << ret << ' ' << packet.size() << '/' << data.size() << ' '); if (ret == 0) { t_fdm->removeWriteFD(handler->getDescriptor()); @@ -437,33 +450,53 @@ LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr& ha return LWResult::Result::Success; } -static void TCPIOHandlerReadable(int fd, FDMultiplexer::funcparam_t& var); - LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, shared_ptr& handler, const bool incompleteOkay) { TCPLOG("arecvtcp called " << len << ' ' << data.size() << endl); - size_t pos = 0; data.resize(len); - TCPLOG("calling tryRead() " << data.size() << ' ' << len << endl); - /* IOState state = */ handler->tryRead(data, pos, len); - TCPLOG("arcvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl); - if (pos == len || (incompleteOkay && pos > 0)) { - data.resize(pos); - TCPLOG("acecvtcp success A" << endl); - return LWResult::Result::Success; + // We might have data already available from the TLS layer, try to get that into the buffer + size_t pos = 0; + IOState state; + try { + TCPLOG("calling tryRead() " << len << endl); + state = handler->tryRead(data, pos, len); + TCPLOG("arcvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl); + switch (state) { + case IOState::Done: + case IOState::NeedRead: + if (pos == len || (incompleteOkay && pos > 0)) { + data.resize(pos); + TCPLOG("acecvtcp success A" << endl); + return LWResult::Result::Success; + } + break; + case IOState::NeedWrite: + break; + } + } + catch (const std::exception& e) { + TCPLOG("tryRead() exception..." << e.what() << endl); + return LWResult::Result::PermanentError; } + auto pident = std::make_shared(); + pident->tcphandler = handler; + pident->tcpsock = handler->getDescriptor(); + // We might have a partial result + data.resize(pos); + pident->inMSG = data; + pident->inNeeded = len; + pident->inIncompleteOkay = incompleteOkay; + pident->highState = TCPAction::DoingRead; + data.clear(); - PacketID pident; - pident.tcphandler = handler; - pident.tcpsock = handler->getDescriptor(); - pident.inNeeded = len; - pident.inIncompleteOkay = incompleteOkay; - t_fdm->addReadFD(handler->getDescriptor(), TCPIOHandlerReadable, pident); - - int ret = MT->waitEvent(pident, &data, g_networkTimeoutMsec); - TCPLOG("arecvtcp" << ret << ' ' << data.size() << ' '); + + // Will set pident->lowState + TCPIOHandlerStateChange(IOState::Done, state, pident); + + int ret = MT->waitEvent(*pident, &data, g_networkTimeoutMsec); + TCPLOG("arecvtcp " << ret << ' ' << data.size() << ' ' ); if (ret == 0) { TCPLOG("timeout" << endl); t_fdm->removeReadFD(handler->getDescriptor()); @@ -507,12 +540,13 @@ static void handleGenUDPQueryResponse(int fd, FDMultiplexer::funcparam_t& var) // cerr<<"Had some kind of error: "<addReadFD(s.getHandle(), handleGenUDPQueryResponse, pident); PacketBuffer data; - int ret=MT->waitEvent(pident, &data, g_networkTimeoutMsec); - + if(!ret || ret==-1) { // timeout t_fdm->removeReadFD(s.getHandle()); } @@ -4054,87 +4087,146 @@ static void handleRCC(int fd, FDMultiplexer::funcparam_t& var) } } -static void TCPIOHandlerReadable(int fd, FDMultiplexer::funcparam_t& var) +static void TCPIOHandlerStateChange(IOState oldstate, IOState newstate, std::shared_ptr& pid) { - PacketID* pident=boost::any_cast(&var); - assert(pident->tcphandler); - assert(fd == pident->tcphandler->getDescriptor()); + TCPLOG("State transation " << int(oldstate) << "->" << int(newstate) << endl); - TCPLOG("TCPIOHandlerReadable" << endl); - try { - size_t pos = pident->inMSG.size(); - pident->inMSG.resize(pos + pident->inNeeded); // make room for what we'll read - IOState state = pident->tcphandler->tryRead(pident->inMSG, pos, pident->inNeeded); + pid->lowState = newstate; - switch (state) { + // handle state transitions + switch (oldstate) { + case IOState::NeedRead: + + switch (newstate) { + case IOState::NeedWrite: + TCPLOG("NeedRead -> NeedWrite: flip FD" << endl); + t_fdm->alterFDToWrite(pid->tcpsock, TCPIOHandlerIO, pid); + break; + case IOState::NeedRead: + break; case IOState::Done: + TCPLOG("Done -> removeReadFD" << endl); + t_fdm->removeReadFD(pid->tcpsock); + break; + } + break; + + case IOState::NeedWrite: + + switch (newstate) { case IOState::NeedRead: - TCPLOG("TCPIOHandlerReadable state Done or Read " << int(state) << ' ' << buffer.size() << " bytes " << pident->inNeeded << '/' << pos << endl); - pident->inMSG.resize(pos); // old content (if there) + new bytes read - pident->inNeeded -= pos; - TCPLOG("TCPIOHandlerReadable " << pident->inNeeded << ' ' << pident->inIncompleteOkay << endl); - if (pident->inNeeded == 0 || pident->inIncompleteOkay) { - // removeReadFD seems to clobber PacketID, so take a copy - PacketID pid = *pident; - t_fdm->removeReadFD(fd); - MT->sendEvent(pid, &pid.inMSG); - break; - } - TCPLOG("TCPIOHandlerReadable more? flip to write seems needed???..." << endl); - t_fdm->alterFDToWrite(fd, TCPIOHandlerWritable, *pident); + TCPLOG("NeedWrite -> NeedRead: flip FD" << endl); + t_fdm->alterFDToRead(pid->tcpsock, TCPIOHandlerIO, pid); break; case IOState::NeedWrite: - TCPLOG("NeedWrite... flip FD" << endl); - t_fdm->alterFDToWrite(fd, TCPIOHandlerWritable, *pident); + break; + case IOState::Done: + TCPLOG("Done -> removeWriteFD" << endl); + t_fdm->removeWriteFD(pid->tcpsock); break; } + break; + + case IOState::Done: + switch (newstate) { + case IOState::NeedRead: + TCPLOG("NeedRead: addReadFD" << endl); + t_fdm->addReadFD(pid->tcpsock, TCPIOHandlerIO, pid); + break; + case IOState::NeedWrite: + TCPLOG("NeedWrite: addWriteFD" << endl); + t_fdm->addWriteFD(pid->tcpsock, TCPIOHandlerIO, pid); + break; + case IOState::Done: + break; + } + break; } - catch (const std::exception& e) { - TCPLOG("TCPIOHandlerReadble exception..." << e.what() << endl); - PacketID tmp = *pident; - t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still) - PacketBuffer empty; - MT->sendEvent(tmp, &empty); // this conveys error status - } + } -static void TCPIOHandlerWritable(int fd, FDMultiplexer::funcparam_t& var) +static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var) { - PacketID* pid = boost::any_cast(&var); + auto pid = boost::any_cast>(var); assert(pid->tcphandler); assert(fd == pid->tcphandler->getDescriptor()); + IOState newstate = IOState::Done; - TCPLOG("TCPIOHandlerWritable " << pid->outPos << '/' << pid->outMSG.size() << endl); - try { - IOState state = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size()); - switch (state) { - case IOState::Done: { - TCPLOG("TCPIOHandlerWritable Done" << endl); - // removeWriteFD seems to clobber PacketID, so take a copy - PacketID tmp = *pid; - t_fdm->removeWriteFD(fd); - MT->sendEvent(tmp, &tmp.outMSG); // send back what we sent to convey everything is ok - break; + TCPLOG("TCPIOHandlerIO: lowState " << int(pid->lowState) << endl); + + // In the code below, we want to update the state of the fd before calling sendEvent + // a sendEvent might close the fd, and some poll multiplexers do not like to manipulate a closed fd + + switch (pid->highState) { + case TCPAction::DoingRead: + TCPLOG("highState: Reading" << endl); + // try reading + try { + size_t pos = pid->inMSG.size(); + pid->inMSG.resize(pos + pid->inNeeded); // make room for what we'll read + newstate = pid->tcphandler->tryRead(pid->inMSG, pos, pid->inNeeded); + switch (newstate) { + case IOState::Done: + case IOState::NeedRead: + TCPLOG("tryRead: Done or NeedRead " << int(newstate) << ' ' << pos << '/' << pid->inNeeded << endl); + pid->inMSG.resize(pos); // old content (if there) + new bytes read + pid->inNeeded -= pos; + TCPLOG("TCPIOHandlerIO " << pid->inNeeded << ' ' << pid->inIncompleteOkay << endl); + if (pid->inNeeded == 0 || pid->inIncompleteOkay) { + newstate = IOState::Done; + TCPIOHandlerStateChange(pid->lowState, newstate, pid); + MT->sendEvent(*pid, &pid->inMSG); + return; + } + break; + case IOState::NeedWrite: + break; + } } - case IOState::NeedWrite: - TCPLOG("TCPIOHandlerWritable NeedWrite" << endl); - // We'll get back later - break; - case IOState::NeedRead: - TCPLOG("NeedRead: flip FD" << endl); - pid->inNeeded = 1; - t_fdm->alterFDToRead(fd, TCPIOHandlerReadable, *pid); - break; + catch (const std::exception& e) { + newstate = IOState::Done; + TCPLOG("read exception..." << e.what() << endl); + PacketBuffer empty; + TCPIOHandlerStateChange(pid->lowState, newstate, pid); + MT->sendEvent(*pid, &empty); // this conveys error status + return; } + break; + + case TCPAction::DoingWrite: + TCPLOG("highState: Writing" << endl); + try { + TCPLOG("tryWrite: " << pid->outPos << '/' << pid->outMSG.size() << ' ' << pid << " -> "); + newstate = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size()); + TCPLOG(pid->outPos << '/' << pid->outMSG.size() << endl); + switch (newstate) { + case IOState::Done: { + TCPLOG("tryWrite: Done" << endl); + TCPIOHandlerStateChange(pid->lowState, newstate, pid); + MT->sendEvent(*pid, &pid->outMSG); // send back what we sent to convey everything is ok + return; + } + case IOState::NeedRead: + TCPLOG("tryWrite: NeedRead" << endl); + break; + case IOState::NeedWrite: + TCPLOG("tryWrite: NeedWrite" << endl); + break; + } + } + catch (const std::exception& e) { + newstate = IOState::Done; + TCPLOG("write exception..." << e.what() << endl); + PacketBuffer sent; + TCPIOHandlerStateChange(pid->lowState, newstate, pid); + MT->sendEvent(*pid, &sent); // we convey error status by sending empty string + return; + } + break; } - catch (const std::exception& e) { - TCPLOG("TCPIOHandlerWritable exception..." << e.what() << endl); - // removeWriteFD seems to clobber PacketID, so take a copy - PacketID tmp = *pid; - t_fdm->removeWriteFD(fd); - PacketBuffer sent; - MT->sendEvent(tmp, &sent); // we convey error status by sending empty string - } + + // Cases that did not end up doing a sendEvent + TCPIOHandlerStateChange(pid->lowState, newstate, pid); } // resend event to everybody chained onto it diff --git a/pdns/recursordist/docs/settings.rst b/pdns/recursordist/docs/settings.rst index 7b18b624e2..b63ec0871e 100644 --- a/pdns/recursordist/docs/settings.rst +++ b/pdns/recursordist/docs/settings.rst @@ -386,6 +386,17 @@ If `pdns-distributes-queries`_ is set, spawn this number of distributor threads handle incoming queries and distribute them to other threads based on a hash of the query, to maximize the cache hit ratio. +.. _settings-dot-to-port-853: + +``dot-to-port-853`` +------------------- +.. versionadded:: 4.6.0 + +- Boolean +- Default: ``yes`` if DoT support is compiled in, ``no`` otherwise. + +Enable DoT to forwarders that specify port 853. + .. _setting-dns64-prefix: ``dns64-prefix`` diff --git a/pdns/syncres.hh b/pdns/syncres.hh index a9bf06b960..0e820dc469 100644 --- a/pdns/syncres.hh +++ b/pdns/syncres.hh @@ -926,6 +926,8 @@ private: LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr&); LWResult::Result arecvtcp(PacketBuffer& data, size_t len, shared_ptr&, bool incompleteOkay); +enum TCPAction : uint8_t { DoingRead, DoingWrite }; + struct PacketID { PacketID() @@ -951,6 +953,8 @@ struct PacketID bool inIncompleteOkay{false}; uint16_t id{0}; // wait for a specific id/remote pair uint16_t type{0}; // and this is its type + TCPAction highState; + IOState lowState; bool operator<(const PacketID& b) const { diff --git a/pdns/tcpiohandler.hh b/pdns/tcpiohandler.hh index 33e09773f5..6b8a4c1fba 100644 --- a/pdns/tcpiohandler.hh +++ b/pdns/tcpiohandler.hh @@ -8,7 +8,7 @@ #include "misc.hh" #include "noinitvector.hh" -enum class IOState { Done, NeedRead, NeedWrite }; +enum class IOState : uint8_t { Done, NeedRead, NeedWrite }; class TLSConnection {