]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
auth: lua refactor health checks monitoring
authorCharles-Henri Bruyand <charles-henri.bruyand@open-xchange.com>
Thu, 29 Aug 2019 10:36:57 +0000 (12:36 +0200)
committerPeter van Dijk <peter.van.dijk@powerdns.com>
Mon, 9 Dec 2019 08:41:57 +0000 (09:41 +0100)
docs/lua-records/functions.rst
docs/settings.rst
pdns/common_startup.cc
pdns/common_startup.hh
pdns/lua-record.cc
pdns/minicurl.cc
pdns/minicurl.hh
regression-tests.auth-py/authtests.py

index 3b30327dab42ed0a7d821cbd03c99cdfe3af5158..227a2df96923d043b86fd3fb6c96799749c0cd27 100644 (file)
@@ -49,6 +49,7 @@ Record creation functions
   - ``selector``: used to pick the IP address from list of viable candidates. Choices include 'pickclosest', 'random', 'hashed', 'all' (default to 'random').
   - ``backupSelector``: used to pick the IP address from list of all candidates if all addresses are down. Choices include 'pickclosest', 'random', 'hashed', 'all' (default to 'random').
   - ``source``: Source IP address to check from
+  - ``timeout``: Maximum time in seconds that you allow the check to take (default 2)
 
 
 .. function:: ifurlup(url, addresses[, options])
@@ -68,6 +69,7 @@ Record creation functions
   - ``selector``: used to pick the IP address from list of viable candidates. Choices include 'pickclosest', 'random', 'hashed', 'all' (default to 'random').
   - ``backupSelector``: used to pick the IP address from list of all candidates if all addresses are down. Choices include 'pickclosest', 'random', 'hashed', 'all' (default to 'random').
   - ``source``: Source IP address to check from
+  - ``timeout``: Maximum time in seconds that you allow the check to take (default 2)
   - ``stringmatch``: check ``url`` for this string, only declare 'up' if found
   - ``useragent``: Set the HTTP "User-Agent" header in the requests. By default it is set to "PowerDNS Authoritative Server"
 
index a5d7b2b9c4dbbc34e39714be516e57a08f964d0d..2bf913eb7871d55139afb6239cc8937cf910be55 100644 (file)
@@ -877,6 +877,32 @@ e.g. error = 3, warning = 4, notice = 5, info = 6
 
 Script to be used to edit incoming AXFRs, see :ref:`modes-of-operation-axfrfilter`
 
+.. _setting-lua-health-checks-expire-delay:
+
+``lua-health-checks-expire-delay``
+-----------------------------
+
+-  Integer
+-  Default: 3600
+
+.. versionadded:: 4.3.0
+
+Amount of time (in seconds) to expire (remove) a LUA monitoring check when the record
+isn't used any more (either deleted or modified).
+
+.. _setting-lua-health-checks-interval:
+
+``lua-health-checks-interval``
+-----------------------------
+
+-  Integer
+-  Default: 5
+
+.. versionadded:: 4.3.0
+
+Amount of time (in seconds) between subsequent monitoring health checks. Does nothing
+if the checks take more than that time to execute.
+
 .. _setting-lua-prequery-script:
 
 ``lua-prequery-script``
index 9ad7459e21805234f204b9b6f3c0428215bf7ff7..37a843634f3613a21dbaaa500099a7c9d0b8c60e 100644 (file)
@@ -43,6 +43,8 @@ bool g_8bitDNS;
 #ifdef HAVE_LUA_RECORDS
 bool g_doLuaRecord;
 int g_luaRecordExecLimit;
+time_t g_luaHealthChecksInterval{5};
+time_t g_luaHealthChecksExpireDelay{3600};
 #endif
 typedef Distributor<DNSPacket,DNSPacket,PacketHandler> DNSDistributor;
 
@@ -217,6 +219,9 @@ void declareArguments()
 #ifdef HAVE_LUA_RECORDS
   ::arg().setSwitch("enable-lua-records", "Process LUA records for all zones (metadata overrides this)")="no";
   ::arg().set("lua-records-exec-limit", "LUA records scripts execution limit (instructions count). Values <= 0 mean no limit")="1000";
+  ::arg().set("lua-health-checks-expire-delay", "Stops doing health checks after the record hasn't been unused for that delay (in seconds)")="3600";
+  ::arg().set("lua-health-checks-interval", "LUA records health checks monitoring interval in seconds")="5";
+  ::arg().set("lua-health-checks-timeout", "Maximum time in milliseconds that you allow the LUA monitoring health checks to take")="500";
 #endif
   ::arg().setSwitch("axfr-lower-serial", "Also AXFR a zone from a master with a lower serial")="no";
 
@@ -519,6 +524,8 @@ void mainthread()
    g_doLuaRecord = ::arg().mustDo("enable-lua-records");
    g_LuaRecordSharedState = (::arg()["enable-lua-records"] == "shared");
    g_luaRecordExecLimit = ::arg().asNum("lua-records-exec-limit");
+   g_luaHealthChecksInterval = ::arg().asNum("lua-health-checks-interval");
+   g_luaHealthChecksExpireDelay = ::arg().asNum("lua-health-checks-expire-delay");
 #endif
 
    DNSPacket::s_udpTruncationThreshold = std::max(512, ::arg().asNum("udp-truncation-threshold"));
index d7c0869a8ba1759f3988ffe57c988150221dba03..01f2d041e776212d77c95df4832fa7084f41efda 100644 (file)
@@ -58,6 +58,8 @@ extern bool g_8bitDNS;
 #ifdef HAVE_LUA_RECORDS
 extern bool g_doLuaRecord;
 extern bool g_LuaRecordSharedState;
+extern time_t g_luaHealthChecksInterval;
+extern time_t g_luaHealthChecksExpireDelay;
 #endif // HAVE_LUA_RECORDS
 
 #endif // COMMON_STARTUP_HH
index f3f6bc1776042b232127bf3b0e2bf2efd3fb8e2f..6e1418ff91c446cbcfe4ddd4666ab1431dfe1480 100644 (file)
@@ -1,15 +1,15 @@
+#include <future>
+#include <mutex>
+#include <boost/format.hpp>
 #include "version.hh"
 #include "ext/luawrapper/include/LuaContext.hpp"
 #include "lua-auth4.hh"
-#include <thread>
 #include "sstuff.hh"
-#include <mutex>
 #include "minicurl.hh"
 #include "ueberbackend.hh"
-#include <boost/format.hpp>
 #include "dnsrecords.hh"
 #include "dns_random.hh"
-
+#include "common_startup.hh"
 #include "../modules/geoipbackend/geoipinterface.hh" // only for the enum
 
 /* to do:
@@ -60,30 +60,145 @@ private:
         std::make_tuple(rhs.rem, rhs.url, rhsoopts);
     }
   };
+  struct CheckState
+  {
+    CheckState(time_t _lastAccess): lastAccess(_lastAccess) {}
+    /* current status */
+    std::atomic<bool> status{false};
+    /* first check ? */
+    std::atomic<bool> first{true};
+    /* last time the status was accessed */
+    std::atomic<time_t> lastAccess{0};
+  };
+
+  pthread_rwlock_t d_lock;
 public:
+  IsUpOracle()
+  {
+    pthread_rwlock_init(&d_lock, nullptr);
+  }
+  ~IsUpOracle()
+  {
+    pthread_rwlock_destroy(&d_lock);
+  }
   bool isUp(const ComboAddress& remote, const opts_t& opts);
   bool isUp(const ComboAddress& remote, const std::string& url, const opts_t& opts);
   bool isUp(const CheckDesc& cd);
 
 private:
-  void checkURLThread(ComboAddress rem, std::string url, const opts_t& opts);
-  void checkTCPThread(ComboAddress rem, const opts_t& opts);
+  void checkURL(const CheckDesc& cd, const bool status, const bool first = false)
+  {
+    try {
+      int timeout = 1;
+      if (cd.opts.count("timeout")) {
+        timeout = std::atoi(cd.opts.at("timeout").c_str());
+      }
+      string useragent = productName();
+      if (cd.opts.count("useragent")) {
+        useragent = cd.opts.at("useragent");
+      }
+      MiniCurl mc(useragent);
 
-  struct Checker
+      string content;
+      if (cd.opts.count("source")) {
+        ComboAddress src(cd.opts.at("source"));
+        content=mc.getURL(cd.url, &cd.rem, &src, timeout);
+      }
+      else {
+        content=mc.getURL(cd.url, &cd.rem, nullptr, timeout);
+      }
+      if (cd.opts.count("stringmatch") && content.find(cd.opts.at("stringmatch")) == string::npos) {
+        throw std::runtime_error(boost::str(boost::format("unable to match content with `%s`") % cd.opts.at("stringmatch")));
+      }
+      if(!status) {
+        g_log<<Logger::Warning<<"LUA record monitoring declaring "<<cd.rem.toString()<<" UP for URL "<<cd.url<<"!"<<endl;
+      }
+      setUp(cd);
+    }
+    catch(std::exception& ne) {
+      if(status || first)
+        g_log<<Logger::Warning<<"LUA record monitoring declaring "<<cd.rem.toString()<<" DOWN for URL "<<cd.url<<", error: "<<ne.what()<<endl;
+      setDown(cd);
+    }
+  }
+  void checkTCP(const CheckDesc& cd, const bool status, const bool first = false) {
+    try {
+      int timeout = 1;
+      if (cd.opts.count("timeout")) {
+        timeout = std::atoi(cd.opts.at("timeout").c_str());
+      }
+      Socket s(cd.rem.sin4.sin_family, SOCK_STREAM);
+      ComboAddress src;
+      s.setNonBlocking();
+      if (cd.opts.count("source")) {
+        src = ComboAddress(cd.opts.at("source"));
+        s.bind(src);
+      }
+      s.connect(cd.rem, timeout);
+      if (!status) {
+        g_log<<Logger::Warning<<"Lua record monitoring declaring TCP/IP "<<cd.rem.toStringWithPort()<<" ";
+        if(cd.opts.count("source"))
+          g_log<<"(source "<<src.toString()<<") ";
+        g_log<<"UP!"<<endl;
+      }
+      setUp(cd);
+    }
+    catch (const NetworkError& ne) {
+      if(status || first) {
+        g_log<<Logger::Warning<<"Lua record monitoring declaring TCP/IP "<<cd.rem.toStringWithPort()<<" DOWN: "<<ne.what()<<endl;
+      }
+      setDown(cd);
+    }
+  }
+  void checkThread()
   {
-    std::thread thr;
-    bool status;
-  };
+    while (true)
+    {
+      std::chrono::system_clock::time_point checkStart = std::chrono::system_clock::now();
+      std::vector<std::future<void>> results;
+      std::vector<CheckDesc> toDelete;
+      {
+        ReadLock lock{&d_lock}; // make sure there's no insertion
+        for (auto& it: d_statuses) {
+          auto& desc = it.first;
+          auto& state = it.second;
+
+          if (desc.url.empty()) { // TCP
+            results.push_back(std::async(std::launch::async, &IsUpOracle::checkTCP, this, desc, state->status.load(), state->first.load()));
+          } else { // URL
+            results.push_back(std::async(std::launch::async, &IsUpOracle::checkURL, this, desc, state->status.load(), state->first.load()));
+          }
+          if (std::chrono::system_clock::from_time_t(state->lastAccess) < (checkStart - std::chrono::seconds(g_luaHealthChecksExpireDelay))) {
+            toDelete.push_back(desc);
+          }
+        }
+      }
+      // we can release the lock as nothing will be deleted
+      for (auto& future: results) {
+        future.wait();
+      }
+      if (!toDelete.empty()) {
+        WriteLock lock{&d_lock};
+        for (auto& it: toDelete) {
+          d_statuses.erase(it);
+        }
+      }
+      std::this_thread::sleep_until(checkStart + std::chrono::seconds(g_luaHealthChecksInterval));
+    }
+  }
 
-  typedef map<CheckDesc, Checker> statuses_t;
+  typedef map<CheckDesc, std::unique_ptr<CheckState>> statuses_t;
   statuses_t d_statuses;
 
-  std::mutex d_mutex;
+  std::unique_ptr<std::thread> d_checkerThread;
 
   void setStatus(const CheckDesc& cd, bool status)
   {
-    std::lock_guard<std::mutex> l(d_mutex);
-    d_statuses[cd].status=status;
+    auto& state = d_statuses[cd];
+    state->status = status;
+    if (state->first) {
+      state->first = false;
+    }
   }
 
   void setDown(const ComboAddress& rem, const std::string& url=std::string(), const opts_t& opts = opts_t())
@@ -108,25 +223,30 @@ private:
   {
     setStatus(cd, true);
   }
-
-  bool upStatus(const ComboAddress& rem, const std::string& url=std::string(), const opts_t& opts = opts_t())
-  {
-    CheckDesc cd{rem, url, opts};
-    std::lock_guard<std::mutex> l(d_mutex);
-    return d_statuses[cd].status;
-  }
 };
 
 bool IsUpOracle::isUp(const CheckDesc& cd)
 {
-  std::lock_guard<std::mutex> l(d_mutex);
-  auto iter = d_statuses.find(cd);
-  if(iter == d_statuses.end()) {
-    d_statuses[cd]=Checker{std::thread(&IsUpOracle::checkTCPThread, this, cd.rem, cd.opts), false};
-    return false;
+  if (!d_checkerThread) {
+    d_checkerThread = std::unique_ptr<std::thread>(new std::thread(&IsUpOracle::checkThread, this));
   }
-  return iter->second.status;
-
+  time_t now = time(nullptr);
+  {
+    ReadLock lock{&d_lock};
+    auto iter = d_statuses.find(cd);
+    if (iter != d_statuses.end()) {
+      iter->second->lastAccess = now;
+      return iter->second->status;
+    }
+  }
+  {
+    WriteLock lock{&d_lock};
+    // Make sure we don't insert new entry twice now we have the lock
+    if (d_statuses.find(cd) == d_statuses.end()) {
+      d_statuses[cd] = std::unique_ptr<CheckState>(new CheckState{now});
+    }
+  }
+  return false;
 }
 
 bool IsUpOracle::isUp(const ComboAddress& remote, const opts_t& opts)
@@ -138,85 +258,9 @@ bool IsUpOracle::isUp(const ComboAddress& remote, const opts_t& opts)
 bool IsUpOracle::isUp(const ComboAddress& remote, const std::string& url, const opts_t& opts)
 {
   CheckDesc cd{remote, url, opts};
-  std::lock_guard<std::mutex> l(d_mutex);
-  auto iter = d_statuses.find(cd);
-  if(iter == d_statuses.end()) {
-    //    g_log<<Logger::Warning<<"Launching HTTP(s) status checker for "<<remote.toStringWithPort()<<" and URL "<<url<<endl;
-    d_statuses[cd]=Checker{std::thread(&IsUpOracle::checkURLThread, this, remote, url, opts), false};
-    return false;
-  }
-
-  return iter->second.status;
-}
-
-void IsUpOracle::checkTCPThread(ComboAddress rem, const opts_t& opts)
-{
-  CheckDesc cd{rem, "", opts};
-  setDown(cd);
-  for(bool first=true;;first=false) {
-    try {
-      Socket s(rem.sin4.sin_family, SOCK_STREAM);
-      ComboAddress src;
-      s.setNonBlocking();
-      if(opts.count("source")) {
-        src=ComboAddress(opts.at("source"));
-        s.bind(src);
-      }
-      s.connect(rem, 1);
-      if(!isUp(cd)) {
-        g_log<<Logger::Warning<<"Lua record monitoring declaring TCP/IP "<<rem.toStringWithPort()<<" ";
-        if(opts.count("source"))
-          g_log<<"(source "<<src.toString()<<") ";
-        g_log<<"UP!"<<endl;
-      }
-      setUp(cd);
-    }
-    catch(NetworkError& ne) {
-      if(isUp(rem, opts) || first)
-        g_log<<Logger::Warning<<"Lua record monitoring declaring TCP/IP "<<rem.toStringWithPort()<<" DOWN: "<<ne.what()<<endl;
-      setDown(cd);
-    }
-    sleep(1);
-  }
-}
-
-
-void IsUpOracle::checkURLThread(ComboAddress rem, std::string url, const opts_t& opts)
-{
-  setDown(rem, url, opts);
-  for(bool first=true;;first=false) {
-    try {
-      string useragent = productName();
-      if (opts.count("useragent")) {
-        useragent = opts.at("useragent");
-      }
-      MiniCurl mc(useragent);
-
-      string content;
-      if(opts.count("source")) {
-        ComboAddress src(opts.at("source"));
-        content=mc.getURL(url, &rem, &src);
-      }
-      else {
-        content=mc.getURL(url, &rem);
-      }
-      if(opts.count("stringmatch") && content.find(opts.at("stringmatch")) == string::npos) {
-        throw std::runtime_error(boost::str(boost::format("unable to match content with `%s`") % opts.at("stringmatch")));
-      }
-      if(!upStatus(rem,url,opts))
-        g_log<<Logger::Warning<<"LUA record monitoring declaring "<<rem.toString()<<" UP for URL "<<url<<"!"<<endl;
-      setUp(rem, url,opts);
-    }
-    catch(std::exception& ne) {
-      if(upStatus(rem,url,opts) || first)
-        g_log<<Logger::Warning<<"LUA record monitoring declaring "<<rem.toString()<<" DOWN for URL "<<url<<", error: "<<ne.what()<<endl;
-      setDown(rem,url,opts);
-    }
-    sleep(5);
-  }
+  return isUp(cd);
 }
 
-
 IsUpOracle g_up;
 namespace {
 template<typename T, typename C>
index 5c846d09bd67fb666d975e3de048e3615cf808aa..68ad5f5e2078d486e9ae3dc542c1a9b8dde4a9b5 100644 (file)
@@ -74,7 +74,7 @@ static string extractHostFromURL(const std::string& url)
   return url.substr(pos, endpos-pos);
 }
 
-void MiniCurl::setupURL(const std::string& str, const ComboAddress* rem, const ComboAddress* src)
+void MiniCurl::setupURL(const std::string& str, const ComboAddress* rem, const ComboAddress* src, int timeout)
 {
   if(rem) {
     struct curl_slist *hostlist = nullptr; // THIS SHOULD BE FREED
@@ -111,15 +111,15 @@ void MiniCurl::setupURL(const std::string& str, const ComboAddress* rem, const C
   curl_easy_setopt(d_curl, CURLOPT_URL, str.c_str());
   curl_easy_setopt(d_curl, CURLOPT_WRITEFUNCTION, write_callback);
   curl_easy_setopt(d_curl, CURLOPT_WRITEDATA, this);
-  curl_easy_setopt(d_curl, CURLOPT_TIMEOUT, 2L);
+  curl_easy_setopt(d_curl, CURLOPT_TIMEOUT, static_cast<long>(timeout));
 
   clearHeaders();
   d_data.clear();
 }
 
-std::string MiniCurl::getURL(const std::string& str, const ComboAddress* rem, const ComboAddress* src)
+std::string MiniCurl::getURL(const std::string& str, const ComboAddress* rem, const ComboAddress* src, int timeout)
 {
-  setupURL(str, rem, src);
+  setupURL(str, rem, src, timeout);
   auto res = curl_easy_perform(d_curl);
   long http_code = 0;
   curl_easy_getinfo(d_curl, CURLINFO_RESPONSE_CODE, &http_code);
@@ -132,9 +132,9 @@ std::string MiniCurl::getURL(const std::string& str, const ComboAddress* rem, co
   return ret;
 }
 
-std::string MiniCurl::postURL(const std::string& str, const std::string& postdata, MiniCurlHeaders& headers)
+std::string MiniCurl::postURL(const std::string& str, const std::string& postdata, MiniCurlHeaders& headers, int timeout)
 {
-  setupURL(str);
+  setupURL(str, nullptr, nullptr, timeout);
   setHeaders(headers);
   curl_easy_setopt(d_curl, CURLOPT_POSTFIELDSIZE, postdata.size());
   curl_easy_setopt(d_curl, CURLOPT_POSTFIELDS, postdata.c_str());
index a4a366398103536e4efdb4d038c69da1d8f94e05..50b144a1cfd7e10bd85c357aee338e78467f1ce7 100644 (file)
@@ -38,14 +38,14 @@ public:
   MiniCurl(const string& useragent="MiniCurl/0.0");
   ~MiniCurl();
   MiniCurl& operator=(const MiniCurl&) = delete;
-  std::string getURL(const std::string& str, const ComboAddress* rem=0, const ComboAddress* src=0);
-  std::string postURL(const std::string& str, const std::string& postdata, MiniCurlHeaders& headers);
+  std::string getURL(const std::string& str, const ComboAddress* rem=nullptr, const ComboAddress* src=nullptr, int timeout = 2);
+  std::string postURL(const std::string& str, const std::string& postdata, MiniCurlHeaders& headers, int timeout = 2);
 private:
   CURL *d_curl;
   static size_t write_callback(char *ptr, size_t size, size_t nmemb, void *userdata);
   std::string d_data;
   struct curl_slist* d_header_list = nullptr;
-  void setupURL(const std::string& str, const ComboAddress* rem=0, const ComboAddress* src=0);
+  void setupURL(const std::string& str, const ComboAddress* rem=nullptr, const ComboAddress* src=nullptr, int timeout = 2);
   void setHeaders(const MiniCurlHeaders& headers);
   void clearHeaders();
 };
index f91c03510fbd93ca285882ac8e7431f3495586ce..6bb1f686752f6498508cb92d08b7f54294f32e01 100644 (file)
@@ -180,8 +180,8 @@ options {
         authcmd.append('--local-port=%s' % cls._authPort)
         authcmd.append('--loglevel=9')
         authcmd.append('--enable-lua-records')
+        authcmd.append('--lua-health-checks-interval=1')
         print(' '.join(authcmd))
-
         logFile = os.path.join(confdir, 'pdns.log')
         with open(logFile, 'w') as fdLog:
             cls._auths[ipaddress] = subprocess.Popen(authcmd, close_fds=True,