]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Align TCPIOHandlerReadable and Writeable and process some review comments
authorOtto <otto.moerbeek@open-xchange.com>
Fri, 28 May 2021 08:52:49 +0000 (10:52 +0200)
committerOtto <otto.moerbeek@open-xchange.com>
Fri, 18 Jun 2021 06:47:42 +0000 (08:47 +0200)
pdns/pdns_recursor.cc
pdns/recursordist/docs/settings.rst
pdns/syncres.hh
pdns/tcpiohandler.hh

index 3ea85cf725dfcbab67212b684b01be3f923a12e6..322005a5c7c7b82857812d8b40129d6d9d38b3dc 100644 (file)
@@ -397,26 +397,39 @@ static bool isHandlerThread()
 }
 
 #if 0
-#define TCPLOG(x) cerr << x
+#define TCPLOG(x) cerr << [](){ timeval t; gettimeofday(&t, nullptr); return t.tv_sec % 10  + t.tv_usec/1000000.0; }() << ' ' << x
 #else
 #define TCPLOG(x)
 #endif
 
-static void TCPIOHandlerWritable(int fd, FDMultiplexer::funcparam_t& var);
+static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var);
+static void TCPIOHandlerStateChange(IOState, IOState, std::shared_ptr<PacketID>&);
 
 LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr<TCPIOHandler>& handler)
 {
   TCPLOG("asendtcp called " << data.size() << endl);
 
-  PacketID pident;
-  pident.tcphandler = handler;
-  pident.tcpsock = handler->getDescriptor();
-  pident.outMSG = data;
+  auto pident = std::make_shared<PacketID>();
+  pident->tcphandler = handler;
+  pident->tcpsock = handler->getDescriptor();
+  pident->outMSG = data;
+  pident->highState = TCPAction::DoingWrite;
 
-  t_fdm->addWriteFD(handler->getDescriptor(), TCPIOHandlerWritable, pident);
-  PacketBuffer packet;
 
-  int ret = MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
+  TCPLOG("Initial tryWrite: " << pident->outPos << '/' << pident->outMSG.size() << ' ' << " -> ");
+  IOState state = handler->tryWrite(pident->outMSG, pident->outPos, pident->outMSG.size());
+  TCPLOG(pident->outPos << '/' << pident->outMSG.size() << endl);
+
+  if (state == IOState::Done) {
+    TCPLOG("asendtcp success A" << endl);
+    return LWResult::Result::Success;
+  }
+
+  // Will set pident->lowState
+  TCPIOHandlerStateChange(IOState::Done, state, pident);
+
+  PacketBuffer packet;
+  int ret = MT->waitEvent(*pident, &packet, g_networkTimeoutMsec);
   TCPLOG("asendtcp waitEvent returned " << ret << ' ' << packet.size() << '/' << data.size() << ' ');
   if (ret == 0) {
     t_fdm->removeWriteFD(handler->getDescriptor());
@@ -437,33 +450,53 @@ LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr<TCPIOHandler>& ha
   return LWResult::Result::Success;
 }
 
-static void TCPIOHandlerReadable(int fd, FDMultiplexer::funcparam_t& var);
-
 LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, shared_ptr<TCPIOHandler>& handler, const bool incompleteOkay)
 {
   TCPLOG("arecvtcp called " << len << ' ' << data.size() << endl);
-  size_t pos = 0;
   data.resize(len);
-  TCPLOG("calling tryRead() " << data.size() << ' ' << len << endl);
-  /* IOState state = */ handler->tryRead(data, pos, len);
-  TCPLOG("arcvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl);
 
-  if (pos == len || (incompleteOkay && pos > 0)) {
-    data.resize(pos);
-    TCPLOG("acecvtcp success A" << endl);
-    return LWResult::Result::Success;
+  // We might have data already available from the TLS layer, try to get that into the buffer
+  size_t pos = 0;
+  IOState state;
+  try {
+    TCPLOG("calling tryRead() " << len << endl);
+    state = handler->tryRead(data, pos, len);
+    TCPLOG("arcvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl);
+    switch (state) {
+    case IOState::Done:
+    case IOState::NeedRead:
+      if (pos == len || (incompleteOkay && pos > 0)) {
+        data.resize(pos);
+        TCPLOG("acecvtcp success A" << endl);
+        return LWResult::Result::Success;
+      }
+      break;
+    case IOState::NeedWrite:
+      break;
+    }
+  }
+  catch (const std::exception& e) {
+    TCPLOG("tryRead() exception..." << e.what() << endl);
+    return LWResult::Result::PermanentError;
   }
 
+  auto pident = std::make_shared<PacketID>();
+  pident->tcphandler = handler;
+  pident->tcpsock = handler->getDescriptor();
+  // We might have a partial result
+  data.resize(pos);
+  pident->inMSG = data;
+  pident->inNeeded = len;
+  pident->inIncompleteOkay = incompleteOkay;
+  pident->highState = TCPAction::DoingRead;
+
   data.clear();
-  PacketID pident;
-  pident.tcphandler = handler;
-  pident.tcpsock = handler->getDescriptor();
-  pident.inNeeded = len;
-  pident.inIncompleteOkay = incompleteOkay;
-  t_fdm->addReadFD(handler->getDescriptor(), TCPIOHandlerReadable, pident);
-
-  int ret = MT->waitEvent(pident, &data, g_networkTimeoutMsec);
-  TCPLOG("arecvtcp" << ret << ' ' << data.size() << ' ');
+
+  // Will set pident->lowState
+  TCPIOHandlerStateChange(IOState::Done, state, pident);
+
+  int ret = MT->waitEvent(*pident, &data, g_networkTimeoutMsec);
+  TCPLOG("arecvtcp " << ret << ' ' << data.size() << ' ' );
   if (ret == 0) {
     TCPLOG("timeout" << endl);
     t_fdm->removeReadFD(handler->getDescriptor());
@@ -507,12 +540,13 @@ static void handleGenUDPQueryResponse(int fd, FDMultiplexer::funcparam_t& var)
     //    cerr<<"Had some kind of error: "<<ret<<", "<<stringerror()<<endl;
   }
 }
+
 PacketBuffer GenUDPQueryResponse(const ComboAddress& dest, const string& query)
 {
   Socket s(dest.sin4.sin_family, SOCK_DGRAM);
   s.setNonBlocking();
   ComboAddress local = pdns::getQueryLocalAddress(dest.sin4.sin_family, 0);
-  
+
   s.bind(local);
   s.connect(dest);
   s.send(query);
@@ -524,9 +558,8 @@ PacketBuffer GenUDPQueryResponse(const ComboAddress& dest, const string& query)
   t_fdm->addReadFD(s.getHandle(), handleGenUDPQueryResponse, pident);
 
   PacketBuffer data;
   int ret=MT->waitEvent(pident, &data, g_networkTimeoutMsec);
+
   if(!ret || ret==-1) { // timeout
     t_fdm->removeReadFD(s.getHandle());
   }
@@ -4054,87 +4087,146 @@ static void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
   }
 }
 
-static void TCPIOHandlerReadable(int fd, FDMultiplexer::funcparam_t& var)
+static void TCPIOHandlerStateChange(IOState oldstate, IOState newstate, std::shared_ptr<PacketID>& pid)
 {
-  PacketID* pident=boost::any_cast<PacketID>(&var);
-  assert(pident->tcphandler);
-  assert(fd == pident->tcphandler->getDescriptor());
+  TCPLOG("State transation " << int(oldstate) << "->" << int(newstate) << endl);
 
-  TCPLOG("TCPIOHandlerReadable" << endl);
-  try {
-    size_t pos = pident->inMSG.size();
-    pident->inMSG.resize(pos + pident->inNeeded); // make room for what we'll read
-    IOState state = pident->tcphandler->tryRead(pident->inMSG, pos, pident->inNeeded);
+  pid->lowState = newstate;
 
-    switch (state) {
+  // handle state transitions
+  switch (oldstate) {
+  case IOState::NeedRead:
+
+    switch (newstate) {
+    case IOState::NeedWrite:
+      TCPLOG("NeedRead -> NeedWrite: flip FD" << endl);
+      t_fdm->alterFDToWrite(pid->tcpsock, TCPIOHandlerIO, pid);
+      break;
+    case IOState::NeedRead:
+      break;
     case IOState::Done:
+      TCPLOG("Done -> removeReadFD" << endl);
+      t_fdm->removeReadFD(pid->tcpsock);
+      break;
+    }
+    break;
+
+  case IOState::NeedWrite:
+
+    switch (newstate) {
     case IOState::NeedRead:
-      TCPLOG("TCPIOHandlerReadable state Done or Read " << int(state) << ' ' << buffer.size() << " bytes " <<  pident->inNeeded << '/' << pos << endl);
-      pident->inMSG.resize(pos); // old content (if there) + new bytes read
-      pident->inNeeded -= pos;
-      TCPLOG("TCPIOHandlerReadable " << pident->inNeeded << ' ' << pident->inIncompleteOkay << endl);
-      if (pident->inNeeded == 0 || pident->inIncompleteOkay) {
-        // removeReadFD seems to clobber PacketID, so take a copy
-        PacketID pid = *pident;
-        t_fdm->removeReadFD(fd);
-        MT->sendEvent(pid, &pid.inMSG);
-        break;
-      }
-      TCPLOG("TCPIOHandlerReadable more? flip to write seems needed???..." << endl);
-      t_fdm->alterFDToWrite(fd, TCPIOHandlerWritable, *pident);
+      TCPLOG("NeedWrite -> NeedRead: flip FD" << endl);
+      t_fdm->alterFDToRead(pid->tcpsock, TCPIOHandlerIO, pid);
       break;
     case IOState::NeedWrite:
-      TCPLOG("NeedWrite... flip FD" << endl);
-      t_fdm->alterFDToWrite(fd, TCPIOHandlerWritable, *pident);
+      break;
+    case IOState::Done:
+      TCPLOG("Done -> removeWriteFD" << endl);
+      t_fdm->removeWriteFD(pid->tcpsock);
       break;
     }
+    break;
+
+  case IOState::Done:
+    switch (newstate) {
+    case IOState::NeedRead:
+      TCPLOG("NeedRead: addReadFD" << endl);
+      t_fdm->addReadFD(pid->tcpsock, TCPIOHandlerIO, pid);
+      break;
+    case IOState::NeedWrite:
+      TCPLOG("NeedWrite: addWriteFD" << endl);
+      t_fdm->addWriteFD(pid->tcpsock, TCPIOHandlerIO, pid);
+      break;
+    case IOState::Done:
+      break;
+    }
+    break;
   }
-  catch (const std::exception& e) {
-    TCPLOG("TCPIOHandlerReadble exception..." << e.what() << endl);
-    PacketID tmp = *pident;
-    t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
-    PacketBuffer empty;
-    MT->sendEvent(tmp, &empty); // this conveys error status
-  }
+
 }
 
-static void TCPIOHandlerWritable(int fd, FDMultiplexer::funcparam_t& var)
+static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var)
 {
-  PacketID* pid = boost::any_cast<PacketID>(&var);
+  auto pid = boost::any_cast<std::shared_ptr<PacketID>>(var);
   assert(pid->tcphandler);
   assert(fd == pid->tcphandler->getDescriptor());
+  IOState newstate = IOState::Done;
 
-  TCPLOG("TCPIOHandlerWritable " << pid->outPos << '/' << pid->outMSG.size() << endl);
-  try {
-    IOState state = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size());
-    switch (state) {
-    case IOState::Done: {
-      TCPLOG("TCPIOHandlerWritable Done" << endl);
-      // removeWriteFD seems to clobber PacketID, so take a copy
-      PacketID tmp = *pid;
-      t_fdm->removeWriteFD(fd);
-      MT->sendEvent(tmp, &tmp.outMSG);  // send back what we sent to convey everything is ok
-      break;
+  TCPLOG("TCPIOHandlerIO: lowState " << int(pid->lowState) << endl);
+
+  // In the code below, we want to update the state of the fd before calling sendEvent
+  // a sendEvent might close the fd, and some poll multiplexers do not like to manipulate a closed fd
+  
+  switch (pid->highState) {
+  case TCPAction::DoingRead:
+    TCPLOG("highState: Reading" << endl);
+    // try reading
+    try {
+      size_t pos = pid->inMSG.size();
+      pid->inMSG.resize(pos + pid->inNeeded); // make room for what we'll read
+      newstate = pid->tcphandler->tryRead(pid->inMSG, pos, pid->inNeeded);
+      switch (newstate) {
+      case IOState::Done:
+      case IOState::NeedRead:
+        TCPLOG("tryRead: Done or NeedRead " << int(newstate) << ' ' << pos << '/' << pid->inNeeded << endl);
+        pid->inMSG.resize(pos); // old content (if there) + new bytes read
+        pid->inNeeded -= pos;
+        TCPLOG("TCPIOHandlerIO " << pid->inNeeded << ' ' << pid->inIncompleteOkay << endl);
+        if (pid->inNeeded == 0 || pid->inIncompleteOkay) {
+          newstate = IOState::Done;
+          TCPIOHandlerStateChange(pid->lowState, newstate, pid);
+          MT->sendEvent(*pid, &pid->inMSG);
+          return;
+        }
+        break;
+      case IOState::NeedWrite:
+        break;
+      }
     }
-    case IOState::NeedWrite:
-      TCPLOG("TCPIOHandlerWritable NeedWrite" << endl);
-      // We'll get back later
-      break;
-    case IOState::NeedRead:
-      TCPLOG("NeedRead: flip FD" << endl);
-      pid->inNeeded = 1;
-      t_fdm->alterFDToRead(fd, TCPIOHandlerReadable, *pid);
-      break;
+    catch (const std::exception& e) {
+      newstate = IOState::Done;
+      TCPLOG("read exception..." << e.what() << endl);
+      PacketBuffer empty;
+      TCPIOHandlerStateChange(pid->lowState, newstate, pid);
+      MT->sendEvent(*pid, &empty); // this conveys error status
+      return;
     }
+    break;
+
+  case TCPAction::DoingWrite:
+    TCPLOG("highState: Writing" << endl);
+    try {
+      TCPLOG("tryWrite: " << pid->outPos << '/' << pid->outMSG.size() << ' ' << pid << " -> ");
+      newstate = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size());
+      TCPLOG(pid->outPos << '/' << pid->outMSG.size() << endl);
+      switch (newstate) {
+      case IOState::Done: {
+        TCPLOG("tryWrite: Done" << endl);
+        TCPIOHandlerStateChange(pid->lowState, newstate, pid);
+        MT->sendEvent(*pid, &pid->outMSG); // send back what we sent to convey everything is ok
+        return;
+      }
+      case IOState::NeedRead:
+        TCPLOG("tryWrite: NeedRead" << endl);
+        break;
+      case IOState::NeedWrite:
+        TCPLOG("tryWrite: NeedWrite" << endl);
+        break;
+      }
+    }
+    catch (const std::exception& e) {
+      newstate = IOState::Done;
+      TCPLOG("write exception..." << e.what() << endl);
+      PacketBuffer sent;
+      TCPIOHandlerStateChange(pid->lowState, newstate, pid);
+      MT->sendEvent(*pid, &sent); // we convey error status by sending empty string
+      return;
+    }
+    break;
   }
-  catch (const std::exception& e) {
-    TCPLOG("TCPIOHandlerWritable exception..." << e.what() << endl);
-    // removeWriteFD seems to clobber PacketID, so take a copy
-    PacketID tmp = *pid;
-    t_fdm->removeWriteFD(fd);
-    PacketBuffer sent;
-    MT->sendEvent(tmp, &sent);         // we convey error status by sending empty string
-  }
+
+  // Cases that did not end up doing a sendEvent
+  TCPIOHandlerStateChange(pid->lowState, newstate, pid);
 }
 
 // resend event to everybody chained onto it
index 7b18b624e274c92feb52f918f1ab3389eed3ca8c..b63ec0871ed4f25fc7b92f81f87396c4421b5677 100644 (file)
@@ -386,6 +386,17 @@ If `pdns-distributes-queries`_ is set, spawn this number of distributor threads
 handle incoming queries and distribute them to other threads based on a hash of the query, to maximize the cache hit
 ratio.
 
+.. _settings-dot-to-port-853:
+
+``dot-to-port-853``
+-------------------
+.. versionadded:: 4.6.0
+
+- Boolean
+- Default: ``yes`` if DoT support is compiled in, ``no`` otherwise.
+
+Enable DoT to forwarders that specify port 853.
+
 .. _setting-dns64-prefix:
 
 ``dns64-prefix``
index a9bf06b9602b643ae112e7978a06380c698c6f23..0e820dc46942e800d5f8d496074bedfb9e8ef8f7 100644 (file)
@@ -926,6 +926,8 @@ private:
 LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr<TCPIOHandler>&);
 LWResult::Result arecvtcp(PacketBuffer& data, size_t len, shared_ptr<TCPIOHandler>&, bool incompleteOkay);
 
+enum TCPAction : uint8_t { DoingRead, DoingWrite };
+
 struct PacketID
 {
   PacketID()
@@ -951,6 +953,8 @@ struct PacketID
   bool inIncompleteOkay{false};
   uint16_t id{0};  // wait for a specific id/remote pair
   uint16_t type{0};             // and this is its type
+  TCPAction highState;
+  IOState lowState;
 
   bool operator<(const PacketID& b) const
   {
index 33e09773f533363ddc341a0cdab2974363c5dc48..6b8a4c1fba67556c9ae96e0ca5b06ee413c02826 100644 (file)
@@ -8,7 +8,7 @@
 #include "misc.hh"
 #include "noinitvector.hh"
 
-enum class IOState { Done, NeedRead, NeedWrite };
+enum class IOState : uint8_t { Done, NeedRead, NeedWrite };
 
 class TLSConnection
 {