]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Handle health checks over TCP and DNS over TLS
authorRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 27 Apr 2021 13:06:25 +0000 (15:06 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 26 Aug 2021 14:30:26 +0000 (16:30 +0200)
pdns/dnsdist-lua.cc
pdns/dnsdist.cc
pdns/dnsdist.hh
pdns/dnsdistdist/dnsdist-healthchecks.cc
pdns/dnsdistdist/dnsdist-healthchecks.hh
pdns/dnsdistdist/dnsdist-tcp-downstream.cc
pdns/dnsdistdist/dnsdist-tcp-upstream.hh
pdns/dnsdistdist/tcpiohandler-mplexer.hh
pdns/libssl.hh
pdns/sstuff.hh
pdns/tcpiohandler.hh

index d0a0822da499213a394f048305bb22cb22efcfed..4e44913e19d64d0d3147329c8ce7431ca8869a64 100644 (file)
@@ -441,13 +441,17 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
       }
 
       if(vars.count("checkFunction")) {
-        ret->checkFunction= boost::get<DownstreamState::checkfunc_t>(vars["checkFunction"]);
+        ret->checkFunction = boost::get<DownstreamState::checkfunc_t>(vars["checkFunction"]);
       }
 
       if(vars.count("checkTimeout")) {
         ret->checkTimeout = std::stoi(boost::get<string>(vars["checkTimeout"]));
       }
 
+      if (vars.count("checkTCP")) {
+        ret->d_tcpCheck = boost::get<bool>(vars.at("checkTCP"));
+      }
+
       if(vars.count("setCD")) {
         ret->setCD=boost::get<bool>(vars["setCD"]);
       }
index 16bde52b517aac30f45aff9c869412255e0e33ea..7accc1e6802708f8615d25ed799aa87f5f245c6d 100644 (file)
@@ -1712,7 +1712,7 @@ static void healthChecksThread()
 
       if (dss->availability == DownstreamState::Availability::Auto) {
         if (!queueHealthCheck(mplexer, dss)) {
-          updateHealthCheckResult(dss, false);
+          updateHealthCheckResult(dss, false, false);
         }
       }
 
index ba505177ef8f8463a703f82b02fb3956581834dd..95121e01676e0e75822e0190b517035667179bc0 100644 (file)
@@ -188,7 +188,7 @@ struct DNSResponse : DNSQuestion
 class DNSAction
 {
 public:
-  enum class Action { Drop, Nxdomain, Refused, Spoof, Allow, HeaderModify, Pool, Delay, Truncate, ServFail, None, NoOp, NoRecurse, SpoofRaw };
+  enum class Action : uint8_t { Drop, Nxdomain, Refused, Spoof, Allow, HeaderModify, Pool, Delay, Truncate, ServFail, None, NoOp, NoRecurse, SpoofRaw };
   static std::string typeToString(const Action& action)
   {
     switch(action) {
@@ -659,6 +659,7 @@ struct DownstreamState
   SharedLockGuarded<std::vector<unsigned int>> hashes;
   std::vector<int> sockets;
   const std::string sourceItfName;
+  std::string d_tlsSubjectName;
   std::mutex connectLock;
   LockGuarded<std::unique_ptr<FDMultiplexer>> mplexer{nullptr};
   std::shared_ptr<TLSCtx> d_tlsCtx{nullptr};
@@ -670,7 +671,6 @@ struct DownstreamState
   checkfunc_t checkFunction;
   DNSName checkName{"a.root-servers.net."};
   QType checkType{QType::A};
-  std::string d_tlsSubjectName;
   uint16_t checkClass{QClass::IN};
   std::atomic<uint64_t> idOffset{0};
   std::atomic<bool> hashesComputed{false};
@@ -723,7 +723,7 @@ struct DownstreamState
   uint8_t minRiseSuccesses{1};
   StopWatch sw;
   set<string> pools;
-  enum class Availability { Up, Down, Auto} availability{Availability::Auto};
+  enum class Availability : uint8_t { Up, Down, Auto} availability{Availability::Auto};
   bool mustResolve{false};
   bool upStatus{false};
   bool useECS{false};
@@ -735,6 +735,7 @@ struct DownstreamState
   bool tcpFastOpen{false};
   bool ipBindAddrNoPort{true};
   bool reconnectOnUp{false};
+  bool d_tcpCheck{false};
 
   bool isUp() const
   {
@@ -804,6 +805,11 @@ struct DownstreamState
 
   void incCurrentConnectionsCount();
 
+  bool doHealthcheckOverTCP() const
+  {
+    return d_tcpCheck || d_tlsCtx != nullptr;
+  }
+
 private:
   std::string name;
   std::string nameWithAddr;
@@ -992,7 +998,7 @@ extern std::set<std::string> g_capabilitiesToRetain;
 static const uint16_t s_udpIncomingBufferSize{1500}; // don't accept UDP queries larger than this value
 static const size_t s_maxPacketCacheEntrySize{4096}; // don't cache responses larger than this value
 
-enum class ProcessQueryResult { Drop, SendAnswer, PassToBackend };
+enum class ProcessQueryResult : uint8_t { Drop, SendAnswer, PassToBackend };
 ProcessQueryResult processQuery(DNSQuestion& dq, ClientState& cs, LocalHolders& holders, std::shared_ptr<DownstreamState>& selectedBackend);
 
 DNSResponse makeDNSResponseFromIDState(IDState& ids, PacketBuffer& data);
index 44629e7ba17e4f35ca5c601909449a289aee45b6..f1b1fd59250dacdf82aaed13d932a872f8611323 100644 (file)
  */
 
 #include "dnsdist-healthchecks.hh"
+#include "tcpiohandler-mplexer.hh"
 #include "dnswriter.hh"
 #include "dolog.hh"
 
 bool g_verboseHealthChecks{false};
 
-void updateHealthCheckResult(const std::shared_ptr<DownstreamState>& dss, bool newState)
+struct HealthCheckData
 {
+  enum class TCPState : uint8_t { WritingQuery, ReadingResponseSize, ReadingResponse };
+
+  HealthCheckData(std::shared_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, DNSName&& checkName, uint16_t checkType, uint16_t checkClass, uint16_t queryID): d_ds(ds), d_mplexer(mplexer), d_udpSocket(-1), d_checkName(std::move(checkName)), d_checkType(checkType), d_checkClass(checkClass), d_queryID(queryID)
+  {
+  }
+
+  const std::shared_ptr<DownstreamState> d_ds;
+  std::shared_ptr<FDMultiplexer> d_mplexer;
+  std::unique_ptr<TCPIOHandler> d_tcpHandler{nullptr};
+  std::unique_ptr<IOStateHandler> d_ioState{nullptr};
+  PacketBuffer d_buffer;
+  Socket d_udpSocket;
+  DNSName d_checkName;
+  struct timeval d_ttd{0, 0};
+  size_t d_bufferPos{0};
+  uint16_t d_checkType;
+  uint16_t d_checkClass;
+  uint16_t d_queryID;
+  TCPState d_tcpState{TCPState::WritingQuery};
+  bool d_initial{false};
+};
+
+void updateHealthCheckResult(const std::shared_ptr<DownstreamState>& dss, bool initial, bool newState)
+{
+  if (initial) {
+    warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
+    dss->setUpStatus(newState);
+    return;
+  }
+
   if (newState) {
     /* check succeeded */
     dss->currentCheckFailures = 0;
@@ -57,7 +88,8 @@ void updateHealthCheckResult(const std::shared_ptr<DownstreamState>& dss, bool n
       }
     }
   }
-  if(newState != dss->upStatus) {
+
+  if (newState != dss->upStatus) {
     warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
 
     if (newState && (!dss->connected || dss->reconnectOnUp)) {
@@ -81,27 +113,14 @@ static bool handleResponse(std::shared_ptr<HealthCheckData>& data)
 {
   auto& ds = data->d_ds;
   try {
-    string reply;
-    ComboAddress from;
-    data->d_sock.recvFrom(reply, from);
-
-    /* we are using a connected socket but hey.. */
-    if (from != ds->remote) {
+    if (data->d_buffer.size() < sizeof(dnsheader)) {
       if (g_verboseHealthChecks) {
-        infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), ds->remote.toStringWithPort());
-      }
-      return false;
-    }
-
-    const dnsheader * responseHeader = reinterpret_cast<const dnsheader *>(reply.c_str());
-
-    if (reply.size() < sizeof(*responseHeader)) {
-      if (g_verboseHealthChecks) {
-        infolog("Invalid health check response of size %d from backend %s, expecting at least %d", reply.size(), ds->getNameWithAddr(), sizeof(*responseHeader));
+        infolog("Invalid health check response of size %d from backend %s, expecting at least %d", data->d_buffer.size(), ds->getNameWithAddr(), sizeof(dnsheader));
       }
       return false;
     }
 
+    const dnsheader * responseHeader = reinterpret_cast<const dnsheader*>(data->d_buffer.data());
     if (responseHeader->id != data->d_queryID) {
       if (g_verboseHealthChecks) {
         infolog("Invalid health check response id %d from backend %s, expecting %d", data->d_queryID, ds->getNameWithAddr(), data->d_queryID);
@@ -132,7 +151,7 @@ static bool handleResponse(std::shared_ptr<HealthCheckData>& data)
 
     uint16_t receivedType;
     uint16_t receivedClass;
-    DNSName receivedName(reply.c_str(), reply.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
+    DNSName receivedName(reinterpret_cast<const char*>(data->d_buffer.data()), data->d_buffer.size(), sizeof(dnsheader), false, &receivedType, &receivedClass);
 
     if (receivedName != data->d_checkName || receivedType != data->d_checkType || receivedClass != data->d_checkClass) {
       if (g_verboseHealthChecks) {
@@ -159,20 +178,90 @@ static bool handleResponse(std::shared_ptr<HealthCheckData>& data)
   return true;
 }
 
-static void healthCheckCallback(int fd, FDMultiplexer::funcparam_t& param)
+static void healthCheckUDPCallback(int fd, FDMultiplexer::funcparam_t& param)
 {
   auto data = boost::any_cast<std::shared_ptr<HealthCheckData>>(param);
   data->d_mplexer->removeReadFD(fd);
-  updateHealthCheckResult(data->d_ds, handleResponse(data));
+
+  ComboAddress from;
+  from.sin4.sin_family = data->d_ds->remote.sin4.sin_family;
+  auto fromlen = from.getSocklen();
+  data->d_buffer.resize(512);
+  auto got = recvfrom(data->d_udpSocket.getHandle(), &data->d_buffer.at(0), data->d_buffer.size(), 0, reinterpret_cast<sockaddr *>(&from), &fromlen);
+  if (got < 0) {
+    if (g_verboseHealthChecks) {
+      infolog("Error receiving health check response from %s: %s", data->d_ds->remote.toStringWithPort(), stringerror());
+    }
+    updateHealthCheckResult(data->d_ds, data->d_initial, false);
+  }
+
+  /* we are using a connected socket but hey.. */
+  if (from != data->d_ds->remote) {
+    if (g_verboseHealthChecks) {
+      infolog("Invalid health check response received from %s, expecting one from %s", from.toStringWithPort(), data->d_ds->remote.toStringWithPort());
+    }
+    updateHealthCheckResult(data->d_ds, data->d_initial, false);
+  }
+
+  updateHealthCheckResult(data->d_ds, data->d_initial, handleResponse(data));
 }
 
-static void initialHealthCheckCallback(int fd, FDMultiplexer::funcparam_t& param)
+static void healthCheckTCPCallback(int fd, FDMultiplexer::funcparam_t& param)
 {
   auto data = boost::any_cast<std::shared_ptr<HealthCheckData>>(param);
-  data->d_mplexer->removeReadFD(fd);
-  bool up = handleResponse(data);
-  warnlog("Marking downstream %s as '%s'", data->d_ds->getNameWithAddr(), up ? "up" : "down");
-  data->d_ds->setUpStatus(up);
+
+  IOStateGuard ioGuard(data->d_ioState);
+  try {
+    auto ioState = IOState::Done;
+
+    if (data->d_tcpState == HealthCheckData::TCPState::WritingQuery) {
+      ioState = data->d_tcpHandler->tryWrite(data->d_buffer, data->d_bufferPos, data->d_buffer.size());
+      if (ioState == IOState::Done) {
+        data->d_bufferPos = 0;
+        data->d_buffer.resize(sizeof(uint16_t));
+        data->d_tcpState = HealthCheckData::TCPState::ReadingResponseSize;
+      }
+    }
+
+    if (data->d_tcpState == HealthCheckData::TCPState::ReadingResponseSize) {
+      ioState = data->d_tcpHandler->tryRead(data->d_buffer, data->d_bufferPos, data->d_buffer.size());
+      if (ioState == IOState::Done) {
+        data->d_bufferPos = 0;
+        uint16_t responseSize;
+        memcpy(&responseSize, &data->d_buffer.at(0), sizeof(responseSize));
+        data->d_buffer.resize(ntohs(responseSize));
+        data->d_tcpState = HealthCheckData::TCPState::ReadingResponse;
+      }
+    }
+
+    if (data->d_tcpState == HealthCheckData::TCPState::ReadingResponse) {
+      ioState = data->d_tcpHandler->tryRead(data->d_buffer, data->d_bufferPos, data->d_buffer.size());
+      if (ioState == IOState::Done) {
+        updateHealthCheckResult(data->d_ds, data->d_initial, handleResponse(data));
+      }
+    }
+
+    if (ioState == IOState::Done) {
+      /* remove us from the mplexer, we are done */
+      data->d_ioState->update(ioState, healthCheckTCPCallback, data);
+    }
+    else {
+      data->d_ioState->update(ioState, healthCheckTCPCallback, data, data->d_ttd);
+    }
+
+    /* the state has been updated, we can release the guard */
+    ioGuard.release();
+  }
+  catch (const std::exception& e) {
+    if (g_verboseHealthChecks) {
+      infolog("Error checking the health of backend %s: %s", data->d_ds->getNameWithAddr(), e.what());
+    }
+  }
+  catch (...) {
+    if (g_verboseHealthChecks) {
+      infolog("Unknown exception while checking the health of backend %s", data->d_ds->getNameWithAddr());
+    }
+  }
 }
 
 bool queueHealthCheck(std::shared_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, bool initialCheck)
@@ -204,18 +293,29 @@ bool queueHealthCheck(std::shared_ptr<FDMultiplexer>& mplexer, const std::shared
 
     PacketBuffer packet;
     GenericDNSPacketWriter<PacketBuffer> dpw(packet, checkName, checkType, checkClass);
-    dnsheader * requestHeader = dpw.getHeader();
+    dnsheader* requestHeader = dpw.getHeader();
     *requestHeader = checkHeader;
 
+    /* we need to compute that _before_ adding the proxy protocol payload */
+    uint16_t packetSize = packet.size();
+    size_t proxyProtocolPayloadSize = 0;
     if (ds->useProxyProtocol) {
       auto payload = makeLocalProxyHeader();
+      proxyProtocolPayloadSize = payload.size();
       packet.insert(packet.begin(), payload.begin(), payload.end());
     }
 
-    Socket sock(ds->remote.sin4.sin_family, SOCK_DGRAM);
+    Socket sock(ds->remote.sin4.sin_family, ds->doHealthcheckOverTCP() ? SOCK_STREAM : SOCK_DGRAM);
+
     sock.setNonBlocking();
     if (!IsAnyAddress(ds->sourceAddr)) {
       sock.setReuseAddr();
+#ifdef IP_BIND_ADDRESS_NO_PORT
+      if (ds->ipBindAddrNoPort) {
+        SSetsockopt(sock.getHandle(), SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
+      }
+#endif
+
       if (!ds->sourceItfName.empty()) {
 #ifdef SO_BINDTODEVICE
         int res = setsockopt(sock.getHandle(), SOL_SOCKET, SO_BINDTODEVICE, ds->sourceItfName.c_str(), ds->sourceItfName.length());
@@ -226,29 +326,56 @@ bool queueHealthCheck(std::shared_ptr<FDMultiplexer>& mplexer, const std::shared
       }
       sock.bind(ds->sourceAddr);
     }
-    sock.connect(ds->remote);
-    ssize_t sent = udpClientSendRequestToBackend(ds, sock.getHandle(), packet, true);
-    if (sent < 0) {
-      int ret = errno;
-      if (g_verboseHealthChecks)
-        infolog("Error while sending a health check query to backend %s: %d", ds->getNameWithAddr(), ret);
-      return false;
+
+    auto data = std::make_shared<HealthCheckData>(mplexer, ds, std::move(checkName), checkType, checkClass, queryID);
+    data->d_initial = initialCheck;
+
+    gettimeofday(&data->d_ttd, nullptr);
+    data->d_ttd.tv_sec += ds->checkTimeout / 1000; /* ms to seconds */
+    data->d_ttd.tv_usec += (ds->checkTimeout % 1000) * 1000; /* remaining ms to us */
+    if (data->d_ttd.tv_usec > 1000000) {
+      ++data->d_ttd.tv_sec;
+      data->d_ttd.tv_usec -= 1000000;
+    }
+
+    if (!ds->doHealthcheckOverTCP()) {
+      sock.connect(ds->remote);
+      data->d_udpSocket = std::move(sock);
+      ssize_t sent = udpClientSendRequestToBackend(ds, data->d_udpSocket.getHandle(), packet, true);
+      if (sent < 0) {
+        int ret = errno;
+        if (g_verboseHealthChecks) {
+          infolog("Error while sending a health check query to backend %s: %d", ds->getNameWithAddr(), ret);
+        }
+        return false;
+      }
+
+      mplexer->addReadFD(data->d_udpSocket.getHandle(), &healthCheckUDPCallback, data, &data->d_ttd);
     }
+    else {
+      data->d_tcpHandler = std::make_unique<TCPIOHandler>(ds->d_tlsSubjectName, sock.releaseHandle(), timeval{ds->checkTimeout,0}, ds->d_tlsCtx, time(nullptr));
+      data->d_ioState = std::make_unique<IOStateHandler>(*mplexer, data->d_tcpHandler->getDescriptor());
+
+      data->d_tcpHandler->tryConnect(ds->tcpFastOpen, ds->remote);
+
+      const uint8_t sizeBytes[] = { static_cast<uint8_t>(packetSize / 256), static_cast<uint8_t>(packetSize % 256) };
+      packet.insert(packet.begin() + proxyProtocolPayloadSize, sizeBytes, sizeBytes + 2);
+      data->d_buffer = std::move(packet);
 
-    auto data = std::make_shared<HealthCheckData>(mplexer, ds, std::move(sock), std::move(checkName), checkType, checkClass, queryID);
-    struct timeval ttd;
-    gettimeofday(&ttd, nullptr);
-    ttd.tv_sec += ds->checkTimeout / 1000; /* ms to seconds */
-    ttd.tv_usec += (ds->checkTimeout % 1000) * 1000; /* remaining ms to us */
-    if (ttd.tv_usec > 1000000) {
-      ++ttd.tv_sec;
-      ttd.tv_usec -= 1000000;
+      auto ioState = data->d_tcpHandler->tryWrite(data->d_buffer, data->d_bufferPos, data->d_buffer.size());
+      if (ioState == IOState::Done) {
+        data->d_bufferPos = 0;
+        data->d_buffer.resize(sizeof(uint16_t));
+        data->d_tcpState = HealthCheckData::TCPState::ReadingResponseSize;
+        ioState = IOState::NeedRead;
+      }
+
+      data->d_ioState->update(ioState, healthCheckTCPCallback, data, data->d_ttd);
     }
-    mplexer->addReadFD(data->d_sock.getHandle(), initialCheck ? &initialHealthCheckCallback : &healthCheckCallback, data, &ttd);
 
     return true;
   }
-  catch(const std::exception& e)
+  catch (const std::exception& e)
   {
     if (g_verboseHealthChecks) {
       infolog("Error checking the health of backend %s: %s", ds->getNameWithAddr(), e.what());
@@ -266,7 +393,7 @@ bool queueHealthCheck(std::shared_ptr<FDMultiplexer>& mplexer, const std::shared
 
 void handleQueuedHealthChecks(std::shared_ptr<FDMultiplexer>& mplexer, bool initial)
 {
-  while (mplexer->getWatchedFDCount(false) > 0) {
+  while (mplexer->getWatchedFDCount(false) > 0 || mplexer->getWatchedFDCount(true) > 0) {
     struct timeval now;
     int ret = mplexer->run(&now, 100);
     if (ret == -1) {
@@ -277,17 +404,52 @@ void handleQueuedHealthChecks(std::shared_ptr<FDMultiplexer>& mplexer, bool init
     }
     auto timeouts = mplexer->getTimeouts(now);
     for (const auto& timeout : timeouts) {
-      mplexer->removeReadFD(timeout.first);
       auto data = boost::any_cast<std::shared_ptr<HealthCheckData>>(timeout.second);
-      if (g_verboseHealthChecks) {
-        infolog("Timeout while waiting for the health check response from backend %s", data->d_ds->getNameWithAddr());
+      try {
+        if (data->d_ioState) {
+          data->d_ioState.reset();
+        }
+        else {
+          mplexer->removeReadFD(timeout.first);
+        }
+        if (g_verboseHealthChecks) {
+          infolog("Timeout while waiting for the health check response from backend %s", data->d_ds->getNameWithAddr());
+        }
+
+        updateHealthCheckResult(data->d_ds, initial, false);
       }
-      if (initial) {
-        warnlog("Marking downstream %s as 'down'", data->d_ds->getNameWithAddr());
-        data->d_ds->setUpStatus(false);
+      catch (const std::exception& e) {
+        if (g_verboseHealthChecks) {
+          infolog("Error while dealing with a timeout for the health check response from backend %s: %s", data->d_ds->getNameWithAddr(), e.what());
+        }
+      }
+      catch (...) {
+        if (g_verboseHealthChecks) {
+          infolog("Error while dealing with a timeout for the health check response from backend %s", data->d_ds->getNameWithAddr());
+        }
       }
-      else {
-        updateHealthCheckResult(data->d_ds, false);
+    }
+
+    timeouts = mplexer->getTimeouts(now, true);
+    for (const auto& timeout : timeouts) {
+      auto data = boost::any_cast<std::shared_ptr<HealthCheckData>>(timeout.second);
+      try {
+        data->d_ioState.reset();
+        if (g_verboseHealthChecks) {
+          infolog("Timeout while waiting for the health check response from backend %s", data->d_ds->getNameWithAddr());
+        }
+
+        updateHealthCheckResult(data->d_ds, initial, false);
+      }
+      catch (const std::exception& e) {
+        if (g_verboseHealthChecks) {
+          infolog("Error while dealing with a timeout for the health check response from backend %s: %s", data->d_ds->getNameWithAddr(), e.what());
+        }
+      }
+      catch (...) {
+        if (g_verboseHealthChecks) {
+          infolog("Error while dealing with a timeout for the health check response from backend %s", data->d_ds->getNameWithAddr());
+        }
       }
     }
   }
index 99d512b8c3979a183763fdea0e06a06dcb36e355..08ac59948d9b5ea0be99224c0941934256178ba9 100644 (file)
 #include "mplexer.hh"
 #include "sstuff.hh"
 
-struct HealthCheckData
-{
-  HealthCheckData(std::shared_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, Socket&& sock, DNSName&& checkName, uint16_t checkType, uint16_t checkClass, uint16_t queryID): d_mplexer(mplexer), d_ds(ds), d_sock(std::move(sock)), d_checkName(std::move(checkName)), d_checkType(checkType), d_checkClass(checkClass), d_queryID(queryID)
-  {
-  }
-
-  std::shared_ptr<FDMultiplexer> d_mplexer;
-  const std::shared_ptr<DownstreamState> d_ds;
-  Socket d_sock;
-  DNSName d_checkName;
-  uint16_t d_checkType;
-  uint16_t d_checkClass;
-  uint16_t d_queryID;
-};
-
 extern bool g_verboseHealthChecks;
 
-void updateHealthCheckResult(const std::shared_ptr<DownstreamState>& dss, bool newState);
+void updateHealthCheckResult(const std::shared_ptr<DownstreamState>& dss, bool initial, bool newState);
 bool queueHealthCheck(std::shared_ptr<FDMultiplexer>& mplexer, const std::shared_ptr<DownstreamState>& ds, bool initial=false);
 void handleQueuedHealthChecks(std::shared_ptr<FDMultiplexer>& mplexer, bool initial=false);
 
index e7f67a056f6da521f9c0cee9eea755449c502aee..6b11c6809412b096b4ac7f8f3d7dc96a3a9a095f 100644 (file)
@@ -167,7 +167,7 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& c
 
         try {
           if (conn->reconnect()) {
-            conn->d_ioState = make_unique<IOStateHandler>(conn->d_mplexer, conn->d_handler->getDescriptor());
+            conn->d_ioState = make_unique<IOStateHandler>(*conn->d_mplexer, conn->d_handler->getDescriptor());
 
             /* we need to resend the queries that were in flight, if any */
             for (auto& pending : conn->d_pendingResponses) {
@@ -271,7 +271,7 @@ void TCPConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender,
 {
   if (!d_sender) {
     d_sender = sender;
-    d_ioState = make_unique<IOStateHandler>(d_mplexer, d_handler->getDescriptor());
+    d_ioState = make_unique<IOStateHandler>(*d_mplexer, d_handler->getDescriptor());
   }
   else if (d_sender != sender) {
     throw std::runtime_error("Assigning a query from a different client to an existing backend connection with pending queries");
index 7db91dca5e01bd8963d9da5801c338d1efdc0656..e628e7c908b27c415e4f078647dbbe4e99f8fdd0 100644 (file)
@@ -18,7 +18,7 @@ public:
 class IncomingTCPConnectionState : public TCPQuerySender, public std::enable_shared_from_this<IncomingTCPConnectionState>
 {
 public:
-  IncomingTCPConnectionState(ConnectionInfo&& ci, TCPClientThreadData& threadData, const struct timeval& now): d_buffer(s_maxPacketCacheEntrySize), d_threadData(threadData), d_ci(std::move(ci)), d_handler(d_ci.fd, timeval{g_tcpRecvTimeout,0}, d_ci.cs->tlsFrontend ? d_ci.cs->tlsFrontend->getContext() : nullptr, now.tv_sec), d_ioState(make_unique<IOStateHandler>(threadData.mplexer, d_ci.fd)), d_connectionStartTime(now)
+  IncomingTCPConnectionState(ConnectionInfo&& ci, TCPClientThreadData& threadData, const struct timeval& now): d_buffer(s_maxPacketCacheEntrySize), d_threadData(threadData), d_ci(std::move(ci)), d_handler(d_ci.fd, timeval{g_tcpRecvTimeout,0}, d_ci.cs->tlsFrontend ? d_ci.cs->tlsFrontend->getContext() : nullptr, now.tv_sec), d_ioState(make_unique<IOStateHandler>(*threadData.mplexer, d_ci.fd)), d_connectionStartTime(now)
   {
     d_origDest.reset();
     d_origDest.sin4.sin_family = d_ci.remote.sin4.sin_family;
index c7d0c26e09f3ca64c4aef6c492e983f108d4e27d..7aae6e3df697e38469e7ed441a80e58e158f7175 100644 (file)
 class IOStateHandler
 {
 public:
-  IOStateHandler(std::unique_ptr<FDMultiplexer>& mplexer, const int fd): d_mplexer(mplexer), d_fd(fd), d_currentState(IOState::Done)
+  IOStateHandler(FDMultiplexer& mplexer, const int fd): d_mplexer(mplexer), d_fd(fd), d_currentState(IOState::Done)
   {
   }
 
-  IOStateHandler(std::unique_ptr<FDMultiplexer>& mplexer): d_mplexer(mplexer), d_fd(-1), d_currentState(IOState::Done)
+  IOStateHandler(FDMultiplexer& mplexer): d_mplexer(mplexer), d_fd(-1), d_currentState(IOState::Done)
   {
   }
 
@@ -53,12 +53,12 @@ public:
     DEBUGLOG("in "<<__PRETTY_FUNCTION__<<" for fd "<<d_fd<<", last state was "<<(int)d_currentState<<", new state is "<<(int)iostate);
     if (d_currentState == IOState::NeedRead && iostate == IOState::Done) {
       DEBUGLOG(__PRETTY_FUNCTION__<<": remove read FD "<<d_fd);
-      d_mplexer->removeReadFD(d_fd);
+      d_mplexer.removeReadFD(d_fd);
       d_currentState = IOState::Done;
     }
     else if (d_currentState == IOState::NeedWrite && iostate == IOState::Done) {
       DEBUGLOG(__PRETTY_FUNCTION__<<": remove write FD "<<d_fd);
-      d_mplexer->removeWriteFD(d_fd);
+      d_mplexer.removeWriteFD(d_fd);
       d_currentState = IOState::Done;
     }
 
@@ -66,17 +66,17 @@ public:
       if (d_currentState == IOState::NeedRead) {
         if (ttd) {
           /* let's update the TTD ! */
-          d_mplexer->setReadTTD(d_fd, *ttd, /* we pass 0 here because we already have a TTD */0);
+          d_mplexer.setReadTTD(d_fd, *ttd, /* we pass 0 here because we already have a TTD */0);
         }
         return;
       }
 
       if (d_currentState == IOState::NeedWrite) {
-        d_mplexer->alterFDToRead(d_fd, callback, callbackData, ttd ? &*ttd : nullptr);
+        d_mplexer.alterFDToRead(d_fd, callback, callbackData, ttd ? &*ttd : nullptr);
         DEBUGLOG(__PRETTY_FUNCTION__<<": alter from write to read FD "<<d_fd);
       }
       else {
-        d_mplexer->addReadFD(d_fd, callback, callbackData, ttd ? &*ttd : nullptr);
+        d_mplexer.addReadFD(d_fd, callback, callbackData, ttd ? &*ttd : nullptr);
         DEBUGLOG(__PRETTY_FUNCTION__<<": add read FD "<<d_fd);
       }
 
@@ -87,17 +87,17 @@ public:
       if (d_currentState == IOState::NeedWrite) {
         if (ttd) {
           /* let's update the TTD ! */
-          d_mplexer->setWriteTTD(d_fd, *ttd, /* we pass 0 here because we already have a TTD */0);
+          d_mplexer.setWriteTTD(d_fd, *ttd, /* we pass 0 here because we already have a TTD */0);
         }
         return;
       }
 
       if (d_currentState == IOState::NeedRead) {
-        d_mplexer->alterFDToWrite(d_fd, callback, callbackData, ttd ? &*ttd : nullptr);
+        d_mplexer.alterFDToWrite(d_fd, callback, callbackData, ttd ? &*ttd : nullptr);
         DEBUGLOG(__PRETTY_FUNCTION__<<": alter from read to write FD "<<d_fd);
       }
       else {
-        d_mplexer->addWriteFD(d_fd, callback, callbackData, ttd ? &*ttd : nullptr);
+        d_mplexer.addWriteFD(d_fd, callback, callbackData, ttd ? &*ttd : nullptr);
         DEBUGLOG(__PRETTY_FUNCTION__<<": add write FD "<<d_fd);
       }
 
@@ -110,7 +110,7 @@ public:
   }
 
 private:
-  std::unique_ptr<FDMultiplexer>& d_mplexer;
+  FDMultiplexer& d_mplexer;
   int d_fd;
   IOState d_currentState;
 };
index 514b2ad7df320b6b8e621f4ad24181a2c1251973..b090afa7b146d59200d1a79d1ffe881978de7557 100644 (file)
@@ -11,7 +11,7 @@
 #include "circular_buffer.hh"
 #include "lock.hh"
 
-enum class LibsslTLSVersion { Unknown, TLS10, TLS11, TLS12, TLS13 };
+enum class LibsslTLSVersion : uint8_t { Unknown, TLS10, TLS11, TLS12, TLS13 };
 
 class TLSConfig
 {
index 5787f41fb7431438317fb63ae12f12a78151fce6..f84d50c36409d586f46cb69b06a5464069309f56 100644 (file)
@@ -63,6 +63,14 @@ public:
     rhs.d_socket = -1;
   }
 
+  Socket& operator=(Socket&& rhs)
+  {
+    d_socket = rhs.d_socket;
+    rhs.d_socket = -1;
+    d_buffer = std::move(rhs.d_buffer);
+    return *this;
+  }
+
   ~Socket()
   {
     try {
index 6b8a4c1fba67556c9ae96e0ca5b06ee413c02826..c73d4d0999a80c0f9b4754f467e09cd6e4e66d43 100644 (file)
@@ -190,7 +190,7 @@ protected:
 class TCPIOHandler
 {
 public:
-  enum class Type { Client, Server };
+  enum class Type : uint8_t { Client, Server };
 
   TCPIOHandler(const std::string& host, int socket, const struct timeval& timeout, std::shared_ptr<TLSCtx> ctx, time_t now): d_socket(socket)
   {