From: Otto Date: Wed, 19 May 2021 10:29:38 +0000 (+0200) Subject: Handle IOState::NeedWrite/NeedRead by flipping the status X-Git-Tag: dnsdist-1.7.0-alpha1~138^2~11 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=66242d006ab49ac4c94a50e452a109377de0dd64;p=thirdparty%2Fpdns.git Handle IOState::NeedWrite/NeedRead by flipping the status --- diff --git a/pdns/lwres.cc b/pdns/lwres.cc index 8c3e431b7a..706b3926a4 100644 --- a/pdns/lwres.cc +++ b/pdns/lwres.cc @@ -349,8 +349,8 @@ LWResult::Result asyncresolve(const ComboAddress& ip, const DNSName& domain, int s.bind(localip); std::shared_ptr tlsCtx{nullptr}; - TCPIOHandler handler("", s.releaseHandle(), timeout, tlsCtx, now->tv_sec); - IOState state = handler.tryConnect(SyncRes::s_tcp_fast_open_connect, ip); + auto handler = std::make_shared("", s.releaseHandle(), timeout, tlsCtx, now->tv_sec); + /* auto state = */ handler->tryConnect(SyncRes::s_tcp_fast_open_connect, ip); uint16_t tlen=htons(vpacket.size()); char *lenP=(char*)&tlen; diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index 49d59b01dc..551eb6c122 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -396,52 +396,25 @@ static bool isHandlerThread() return s_threadInfos.at(t_id).isHandler; } -static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var); - -LWResult::Result asendtcp(const PacketBuffer& data, Socket* sock) -{ - PacketID pident; - pident.tcpsock=sock->getHandle(); - pident.outMSG = data; - - t_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident); - PacketBuffer packet; - - int ret = MT->waitEvent(pident, &packet, g_networkTimeoutMsec); - if (ret == 0) { //timeout - t_fdm->removeWriteFD(sock->getHandle()); - return LWResult::Result::Timeout; - } - else if (ret == -1) { // error - t_fdm->removeWriteFD(sock->getHandle()); - return LWResult::Result::PermanentError; - } - else if (packet.size() != data.size()) { // main loop tells us what it sent out, or empty in case of an error - return LWResult::Result::PermanentError; - } - - return LWResult::Result::Success; -} - static void TCPIOHandlerWritable(int fd, FDMultiplexer::funcparam_t& var); -LWResult::Result asendtcp(const PacketBuffer& data, TCPIOHandler& handler) +LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr& handler) { PacketID pident; - pident.tcphandler = &handler; - pident.tcpsock = handler.getDescriptor(); + pident.tcphandler = handler; + pident.tcpsock = handler->getDescriptor(); pident.outMSG = data; - t_fdm->addWriteFD(handler.getDescriptor(), TCPIOHandlerWritable, pident); + t_fdm->addWriteFD(handler->getDescriptor(), TCPIOHandlerWritable, pident); PacketBuffer packet; int ret = MT->waitEvent(pident, &packet, g_networkTimeoutMsec); if (ret == 0) { //timeout - t_fdm->removeWriteFD(handler.getDescriptor()); + t_fdm->removeWriteFD(handler->getDescriptor()); return LWResult::Result::Timeout; } else if (ret == -1) { // error - t_fdm->removeWriteFD(handler.getDescriptor()); + t_fdm->removeWriteFD(handler->getDescriptor()); return LWResult::Result::PermanentError; } else if (packet.size() != data.size()) { // main loop tells us what it sent out, or empty in case of an error @@ -451,53 +424,26 @@ LWResult::Result asendtcp(const PacketBuffer& data, TCPIOHandler& handler) return LWResult::Result::Success; } -static void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var); - -LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, Socket* sock, const bool incompleteOkay) -{ - data.clear(); - PacketID pident; - pident.tcpsock=sock->getHandle(); - pident.inNeeded=len; - pident.inIncompleteOkay=incompleteOkay; - t_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident); - - int ret = MT->waitEvent(pident, &data, g_networkTimeoutMsec); - if (ret == 0) { - t_fdm->removeReadFD(sock->getHandle()); - return LWResult::Result::Timeout; - } - else if (ret == -1) { - t_fdm->removeWriteFD(sock->getHandle()); - return LWResult::Result::PermanentError; - } - else if (data.empty()) {// error, EOF or other - return LWResult::Result::PermanentError; - } - - return LWResult::Result::Success; -} - static void TCPIOHandlerReadable(int fd, FDMultiplexer::funcparam_t& var); -LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, TCPIOHandler& handler, const bool incompleteOkay) +LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, shared_ptr& handler, const bool incompleteOkay) { data.clear(); PacketID pident; - pident.tcphandler = &handler; - pident.tcpsock = handler.getDescriptor(); + pident.tcphandler = handler; + pident.tcpsock = handler->getDescriptor(); pident.inNeeded = len; pident.inIncompleteOkay = incompleteOkay; - t_fdm->addReadFD(handler.getDescriptor(), TCPIOHandlerReadable, pident); + t_fdm->addReadFD(handler->getDescriptor(), TCPIOHandlerReadable, pident); int ret = MT->waitEvent(pident, &data, g_networkTimeoutMsec); if (ret == 0) { - t_fdm->removeReadFD(handler.getDescriptor()); + t_fdm->removeReadFD(handler->getDescriptor()); return LWResult::Result::Timeout; } else if (ret == -1) { - t_fdm->removeWriteFD(handler.getDescriptor()); + t_fdm->removeWriteFD(handler->getDescriptor()); return LWResult::Result::PermanentError; } else if (data.empty()) {// error, EOF or other @@ -4078,37 +4024,6 @@ static void handleRCC(int fd, FDMultiplexer::funcparam_t& var) } } -static void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var) -{ - PacketID* pident=boost::any_cast(&var); - // cerr<<"handleTCPClientReadable called for fd "<inNeeded: "<inNeeded<<", "<sock->getHandle()< buffer(new char[pident->inNeeded]); - - ssize_t ret=recv(fd, buffer.get(), pident->inNeeded,0); - if(ret > 0) { - pident->inMSG.insert(pident->inMSG.end(), &buffer[0], &buffer[ret]); - pident->inNeeded -= (size_t)ret; - if(!pident->inNeeded || pident->inIncompleteOkay) { - // cerr<<"Got entire load of "<inMSG.size()<<" bytes"<inMSG; - - t_fdm->removeReadFD(fd); - MT->sendEvent(pid, &msg); - } - else { - // cerr<<"Still have "<inNeeded<<" left to go"<removeReadFD(fd); // pident might now be invalid (it isn't, but still) - PacketBuffer empty; - MT->sendEvent(tmp, &empty); // this conveys error status - } -} - static void TCPIOHandlerReadable(int fd, FDMultiplexer::funcparam_t& var) { PacketID* pident=boost::any_cast(&var); @@ -4129,14 +4044,14 @@ static void TCPIOHandlerReadable(int fd, FDMultiplexer::funcparam_t& var) pident->inMSG.insert(pident->inMSG.end(), buffer.data(), buffer.data() + pos); pident->inNeeded -= pos; if (pident->inNeeded == 0 || pident->inIncompleteOkay) { + // removeReadFD seems to clobber PacketID, so take a copy PacketID pid = *pident; - PacketBuffer msg = pident->inMSG; t_fdm->removeReadFD(fd); - MT->sendEvent(pid, &msg); + MT->sendEvent(pid, &pid.inMSG); } break; case IOState::NeedWrite: - // What to do? + t_fdm->alterFDToWrite(fd, TCPIOHandlerWritable, var); break; } } @@ -4148,26 +4063,6 @@ static void TCPIOHandlerReadable(int fd, FDMultiplexer::funcparam_t& var) } } -static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var) -{ - PacketID* pid = boost::any_cast(&var); - ssize_t ret = send(fd, pid->outMSG.data() + pid->outPos, pid->outMSG.size() - pid->outPos,0); - if (ret > 0) { - pid->outPos += (ssize_t)ret; - if (pid->outPos == pid->outMSG.size()) { - PacketID tmp=*pid; - t_fdm->removeWriteFD(fd); - MT->sendEvent(tmp, &tmp.outMSG); // send back what we sent to convey everything is ok - } - } - else { // error or EOF - PacketID tmp(*pid); - t_fdm->removeWriteFD(fd); - PacketBuffer sent; - MT->sendEvent(tmp, &sent); // we convey error status by sending empty string - } -} - static void TCPIOHandlerWritable(int fd, FDMultiplexer::funcparam_t& var) { PacketID* pid = boost::any_cast(&var); @@ -4178,6 +4073,7 @@ static void TCPIOHandlerWritable(int fd, FDMultiplexer::funcparam_t& var) IOState state = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size()); switch (state) { case IOState::Done: { + // removeWriteFD seems to clobber PacketID, so take a copy PacketID tmp = *pid; t_fdm->removeWriteFD(fd); MT->sendEvent(tmp, &tmp.outMSG); // send back what we sent to convey everything is ok @@ -4187,11 +4083,12 @@ static void TCPIOHandlerWritable(int fd, FDMultiplexer::funcparam_t& var) // We'll get back later break; case IOState::NeedRead: - // What to do? + t_fdm->alterFDToRead(fd, TCPIOHandlerReadable, var); break; } } catch (const std::runtime_error& e) { + // removeWriteFD seems to clobber PacketID, so take a copy PacketID tmp = *pid; t_fdm->removeWriteFD(fd); PacketBuffer sent; @@ -5881,3 +5778,5 @@ int main(int argc, char **argv) return ret; } + +bool g_verbose; // XXX FIX ME XXX, see tcpiohandler.cc diff --git a/pdns/rec-carbon.cc b/pdns/rec-carbon.cc index 07d117bdbc..6dbbf4b8d9 100644 --- a/pdns/rec-carbon.cc +++ b/pdns/rec-carbon.cc @@ -49,16 +49,18 @@ try for(const auto& carbonServer: carbonServers) { ComboAddress remote(carbonServer, 2003); Socket s(remote.sin4.sin_family, SOCK_STREAM); - s.setNonBlocking(); - s.connect(remote); // we do the connect so the first attempt happens while we gather stats - + std::shared_ptr tlsCtx{nullptr}; + const int timeout = (g_networkTimeoutMsec + 999) / 1000; // XXX tcpiohandler's unit is seconds + auto handler = std::make_shared("", s.releaseHandle(), timeout, tlsCtx, time(nullptr)); + handler->tryConnect(SyncRes::s_tcp_fast_open_connect, remote);// we do the connect so the first attempt happens while we gather stats + if(msg.empty()) { auto all = getAllStatsMap(StatComponent::Carbon); - + ostringstream str; time_t now=time(0); - + for(const auto& val : all) { str<getPort() == 853; -// Option below is for debugging purposes ony -#define USE_TCP_ONLY 0 - -#if !USE_TCP_ONLY - gotAnswer = doResolveAtThisIP(prefix, qname, qtype, lwr, ednsmask, auth, sendRDQuery, wasForwarded, - tns->first, *remoteIP, false, truncated, spoofed); - if (spoofed || (gotAnswer && truncated)) { -#else - { -#endif + if (!forceTCP) { + gotAnswer = doResolveAtThisIP(prefix, qname, qtype, lwr, ednsmask, auth, sendRDQuery, wasForwarded, + tns->first, *remoteIP, false, truncated, spoofed); + } + if (forceTCP || (spoofed || (gotAnswer && truncated))) { /* retry, over TCP this time */ gotAnswer = doResolveAtThisIP(prefix, qname, qtype, lwr, ednsmask, auth, sendRDQuery, wasForwarded, tns->first, *remoteIP, true, truncated, spoofed); diff --git a/pdns/syncres.hh b/pdns/syncres.hh index 3a5e71a76d..19c7706133 100644 --- a/pdns/syncres.hh +++ b/pdns/syncres.hh @@ -919,12 +919,9 @@ private: LogMode d_lm; }; -class Socket; /* external functions, opaque to us */ -LWResult::Result asendtcp(const PacketBuffer& data, Socket* sock); -LWResult::Result arecvtcp(PacketBuffer& data, size_t len, Socket* sock, bool incompleteOkay); -LWResult::Result asendtcp(const PacketBuffer& data, TCPIOHandler&); -LWResult::Result arecvtcp(PacketBuffer& data, size_t len, TCPIOHandler&, bool incompleteOkay); +LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr&); +LWResult::Result arecvtcp(PacketBuffer& data, size_t len, shared_ptr&, bool incompleteOkay); struct PacketID { @@ -941,7 +938,7 @@ struct PacketID typedef set chain_t; mutable chain_t chain; - TCPIOHandler *tcphandler{nullptr}; + shared_ptr tcphandler{nullptr}; size_t inNeeded{0}; // if this is set, we'll read until inNeeded bytes are read string::size_type outPos{0}; // how far we are along in the outMSG mutable uint32_t nearMisses{0}; // number of near misses - host correct, id wrong diff --git a/pdns/ws-recursor.cc b/pdns/ws-recursor.cc index ed6e4e2c5e..4be72fc98e 100644 --- a/pdns/ws-recursor.cc +++ b/pdns/ws-recursor.cc @@ -42,6 +42,7 @@ #include "rec-lua-conf.hh" #include "rpzloader.hh" #include "uuid-utils.hh" +#include "tcpiohandler.hh" extern thread_local FDMultiplexer* t_fdm; @@ -1249,10 +1250,14 @@ void AsyncWebServer::serveConnection(std::shared_ptr client) const { yarl.initialize(&req); client->setNonBlocking(); + const int timeout = (g_networkTimeoutMsec + 999) / 1000; // XXX tcpiohandler's unit is seconds + std::shared_ptr tlsCtx{nullptr}; + auto handler = std::make_shared("", client->releaseHandle(), timeout, tlsCtx, time(nullptr)); + PacketBuffer data; try { while(!req.complete) { - auto ret = arecvtcp(data, 16384, client.get(), true); + auto ret = arecvtcp(data, 16384, handler, true); if (ret == LWResult::Result::Success) { string str(reinterpret_cast(data.data()), data.size()); req.complete = yarl.feed(str); @@ -1282,7 +1287,7 @@ void AsyncWebServer::serveConnection(std::shared_ptr client) const { logResponse(resp, remote, logprefix); // now send the reply - if (asendtcp(reply, client.get()) != LWResult::Result::Success || reply.empty()) { + if (asendtcp(reply, handler) != LWResult::Result::Success || reply.empty()) { g_log<