+#include <future>
+#include <mutex>
+#include <boost/format.hpp>
#include "version.hh"
#include "ext/luawrapper/include/LuaContext.hpp"
#include "lua-auth4.hh"
-#include <thread>
#include "sstuff.hh"
-#include <mutex>
#include "minicurl.hh"
#include "ueberbackend.hh"
-#include <boost/format.hpp>
#include "dnsrecords.hh"
#include "dns_random.hh"
-
+#include "common_startup.hh"
#include "../modules/geoipbackend/geoipinterface.hh" // only for the enum
/* to do:
std::make_tuple(rhs.rem, rhs.url, rhsoopts);
}
};
+ struct CheckState
+ {
+ CheckState(time_t _lastAccess): lastAccess(_lastAccess) {}
+ /* current status */
+ std::atomic<bool> status{false};
+ /* first check ? */
+ std::atomic<bool> first{true};
+ /* last time the status was accessed */
+ std::atomic<time_t> lastAccess{0};
+ };
+
+ pthread_rwlock_t d_lock;
public:
+ IsUpOracle()
+ {
+ pthread_rwlock_init(&d_lock, nullptr);
+ }
+ ~IsUpOracle()
+ {
+ pthread_rwlock_destroy(&d_lock);
+ }
bool isUp(const ComboAddress& remote, const opts_t& opts);
bool isUp(const ComboAddress& remote, const std::string& url, const opts_t& opts);
bool isUp(const CheckDesc& cd);
private:
- void checkURLThread(ComboAddress rem, std::string url, const opts_t& opts);
- void checkTCPThread(ComboAddress rem, const opts_t& opts);
+ void checkURL(const CheckDesc& cd, const bool status, const bool first = false)
+ {
+ try {
+ int timeout = 1;
+ if (cd.opts.count("timeout")) {
+ timeout = std::atoi(cd.opts.at("timeout").c_str());
+ }
+ string useragent = productName();
+ if (cd.opts.count("useragent")) {
+ useragent = cd.opts.at("useragent");
+ }
+ MiniCurl mc(useragent);
- struct Checker
+ string content;
+ if (cd.opts.count("source")) {
+ ComboAddress src(cd.opts.at("source"));
+ content=mc.getURL(cd.url, &cd.rem, &src, timeout);
+ }
+ else {
+ content=mc.getURL(cd.url, &cd.rem, nullptr, timeout);
+ }
+ if (cd.opts.count("stringmatch") && content.find(cd.opts.at("stringmatch")) == string::npos) {
+ throw std::runtime_error(boost::str(boost::format("unable to match content with `%s`") % cd.opts.at("stringmatch")));
+ }
+ if(!status) {
+ g_log<<Logger::Warning<<"LUA record monitoring declaring "<<cd.rem.toString()<<" UP for URL "<<cd.url<<"!"<<endl;
+ }
+ setUp(cd);
+ }
+ catch(std::exception& ne) {
+ if(status || first)
+ g_log<<Logger::Warning<<"LUA record monitoring declaring "<<cd.rem.toString()<<" DOWN for URL "<<cd.url<<", error: "<<ne.what()<<endl;
+ setDown(cd);
+ }
+ }
+ void checkTCP(const CheckDesc& cd, const bool status, const bool first = false) {
+ try {
+ int timeout = 1;
+ if (cd.opts.count("timeout")) {
+ timeout = std::atoi(cd.opts.at("timeout").c_str());
+ }
+ Socket s(cd.rem.sin4.sin_family, SOCK_STREAM);
+ ComboAddress src;
+ s.setNonBlocking();
+ if (cd.opts.count("source")) {
+ src = ComboAddress(cd.opts.at("source"));
+ s.bind(src);
+ }
+ s.connect(cd.rem, timeout);
+ if (!status) {
+ g_log<<Logger::Warning<<"Lua record monitoring declaring TCP/IP "<<cd.rem.toStringWithPort()<<" ";
+ if(cd.opts.count("source"))
+ g_log<<"(source "<<src.toString()<<") ";
+ g_log<<"UP!"<<endl;
+ }
+ setUp(cd);
+ }
+ catch (const NetworkError& ne) {
+ if(status || first) {
+ g_log<<Logger::Warning<<"Lua record monitoring declaring TCP/IP "<<cd.rem.toStringWithPort()<<" DOWN: "<<ne.what()<<endl;
+ }
+ setDown(cd);
+ }
+ }
+ void checkThread()
{
- std::thread thr;
- bool status;
- };
+ while (true)
+ {
+ std::chrono::system_clock::time_point checkStart = std::chrono::system_clock::now();
+ std::vector<std::future<void>> results;
+ std::vector<CheckDesc> toDelete;
+ {
+ ReadLock lock{&d_lock}; // make sure there's no insertion
+ for (auto& it: d_statuses) {
+ auto& desc = it.first;
+ auto& state = it.second;
+
+ if (desc.url.empty()) { // TCP
+ results.push_back(std::async(std::launch::async, &IsUpOracle::checkTCP, this, desc, state->status.load(), state->first.load()));
+ } else { // URL
+ results.push_back(std::async(std::launch::async, &IsUpOracle::checkURL, this, desc, state->status.load(), state->first.load()));
+ }
+ if (std::chrono::system_clock::from_time_t(state->lastAccess) < (checkStart - std::chrono::seconds(g_luaHealthChecksExpireDelay))) {
+ toDelete.push_back(desc);
+ }
+ }
+ }
+ // we can release the lock as nothing will be deleted
+ for (auto& future: results) {
+ future.wait();
+ }
+ if (!toDelete.empty()) {
+ WriteLock lock{&d_lock};
+ for (auto& it: toDelete) {
+ d_statuses.erase(it);
+ }
+ }
+ std::this_thread::sleep_until(checkStart + std::chrono::seconds(g_luaHealthChecksInterval));
+ }
+ }
- typedef map<CheckDesc, Checker> statuses_t;
+ typedef map<CheckDesc, std::unique_ptr<CheckState>> statuses_t;
statuses_t d_statuses;
- std::mutex d_mutex;
+ std::unique_ptr<std::thread> d_checkerThread;
void setStatus(const CheckDesc& cd, bool status)
{
- std::lock_guard<std::mutex> l(d_mutex);
- d_statuses[cd].status=status;
+ auto& state = d_statuses[cd];
+ state->status = status;
+ if (state->first) {
+ state->first = false;
+ }
}
void setDown(const ComboAddress& rem, const std::string& url=std::string(), const opts_t& opts = opts_t())
{
setStatus(cd, true);
}
-
- bool upStatus(const ComboAddress& rem, const std::string& url=std::string(), const opts_t& opts = opts_t())
- {
- CheckDesc cd{rem, url, opts};
- std::lock_guard<std::mutex> l(d_mutex);
- return d_statuses[cd].status;
- }
};
bool IsUpOracle::isUp(const CheckDesc& cd)
{
- std::lock_guard<std::mutex> l(d_mutex);
- auto iter = d_statuses.find(cd);
- if(iter == d_statuses.end()) {
- d_statuses[cd]=Checker{std::thread(&IsUpOracle::checkTCPThread, this, cd.rem, cd.opts), false};
- return false;
+ if (!d_checkerThread) {
+ d_checkerThread = std::unique_ptr<std::thread>(new std::thread(&IsUpOracle::checkThread, this));
}
- return iter->second.status;
-
+ time_t now = time(nullptr);
+ {
+ ReadLock lock{&d_lock};
+ auto iter = d_statuses.find(cd);
+ if (iter != d_statuses.end()) {
+ iter->second->lastAccess = now;
+ return iter->second->status;
+ }
+ }
+ {
+ WriteLock lock{&d_lock};
+ // Make sure we don't insert new entry twice now we have the lock
+ if (d_statuses.find(cd) == d_statuses.end()) {
+ d_statuses[cd] = std::unique_ptr<CheckState>(new CheckState{now});
+ }
+ }
+ return false;
}
bool IsUpOracle::isUp(const ComboAddress& remote, const opts_t& opts)
bool IsUpOracle::isUp(const ComboAddress& remote, const std::string& url, const opts_t& opts)
{
CheckDesc cd{remote, url, opts};
- std::lock_guard<std::mutex> l(d_mutex);
- auto iter = d_statuses.find(cd);
- if(iter == d_statuses.end()) {
- // g_log<<Logger::Warning<<"Launching HTTP(s) status checker for "<<remote.toStringWithPort()<<" and URL "<<url<<endl;
- d_statuses[cd]=Checker{std::thread(&IsUpOracle::checkURLThread, this, remote, url, opts), false};
- return false;
- }
-
- return iter->second.status;
-}
-
-void IsUpOracle::checkTCPThread(ComboAddress rem, const opts_t& opts)
-{
- CheckDesc cd{rem, "", opts};
- setDown(cd);
- for(bool first=true;;first=false) {
- try {
- Socket s(rem.sin4.sin_family, SOCK_STREAM);
- ComboAddress src;
- s.setNonBlocking();
- if(opts.count("source")) {
- src=ComboAddress(opts.at("source"));
- s.bind(src);
- }
- s.connect(rem, 1);
- if(!isUp(cd)) {
- g_log<<Logger::Warning<<"Lua record monitoring declaring TCP/IP "<<rem.toStringWithPort()<<" ";
- if(opts.count("source"))
- g_log<<"(source "<<src.toString()<<") ";
- g_log<<"UP!"<<endl;
- }
- setUp(cd);
- }
- catch(NetworkError& ne) {
- if(isUp(rem, opts) || first)
- g_log<<Logger::Warning<<"Lua record monitoring declaring TCP/IP "<<rem.toStringWithPort()<<" DOWN: "<<ne.what()<<endl;
- setDown(cd);
- }
- sleep(1);
- }
-}
-
-
-void IsUpOracle::checkURLThread(ComboAddress rem, std::string url, const opts_t& opts)
-{
- setDown(rem, url, opts);
- for(bool first=true;;first=false) {
- try {
- string useragent = productName();
- if (opts.count("useragent")) {
- useragent = opts.at("useragent");
- }
- MiniCurl mc(useragent);
-
- string content;
- if(opts.count("source")) {
- ComboAddress src(opts.at("source"));
- content=mc.getURL(url, &rem, &src);
- }
- else {
- content=mc.getURL(url, &rem);
- }
- if(opts.count("stringmatch") && content.find(opts.at("stringmatch")) == string::npos) {
- throw std::runtime_error(boost::str(boost::format("unable to match content with `%s`") % opts.at("stringmatch")));
- }
- if(!upStatus(rem,url,opts))
- g_log<<Logger::Warning<<"LUA record monitoring declaring "<<rem.toString()<<" UP for URL "<<url<<"!"<<endl;
- setUp(rem, url,opts);
- }
- catch(std::exception& ne) {
- if(upStatus(rem,url,opts) || first)
- g_log<<Logger::Warning<<"LUA record monitoring declaring "<<rem.toString()<<" DOWN for URL "<<url<<", error: "<<ne.what()<<endl;
- setDown(rem,url,opts);
- }
- sleep(5);
- }
+ return isUp(cd);
}
-
IsUpOracle g_up;
namespace {
template<typename T, typename C>
return url.substr(pos, endpos-pos);
}
-void MiniCurl::setupURL(const std::string& str, const ComboAddress* rem, const ComboAddress* src)
+void MiniCurl::setupURL(const std::string& str, const ComboAddress* rem, const ComboAddress* src, int timeout)
{
if(rem) {
struct curl_slist *hostlist = nullptr; // THIS SHOULD BE FREED
curl_easy_setopt(d_curl, CURLOPT_URL, str.c_str());
curl_easy_setopt(d_curl, CURLOPT_WRITEFUNCTION, write_callback);
curl_easy_setopt(d_curl, CURLOPT_WRITEDATA, this);
- curl_easy_setopt(d_curl, CURLOPT_TIMEOUT, 2L);
+ curl_easy_setopt(d_curl, CURLOPT_TIMEOUT, static_cast<long>(timeout));
clearHeaders();
d_data.clear();
}
-std::string MiniCurl::getURL(const std::string& str, const ComboAddress* rem, const ComboAddress* src)
+std::string MiniCurl::getURL(const std::string& str, const ComboAddress* rem, const ComboAddress* src, int timeout)
{
- setupURL(str, rem, src);
+ setupURL(str, rem, src, timeout);
auto res = curl_easy_perform(d_curl);
long http_code = 0;
curl_easy_getinfo(d_curl, CURLINFO_RESPONSE_CODE, &http_code);
return ret;
}
-std::string MiniCurl::postURL(const std::string& str, const std::string& postdata, MiniCurlHeaders& headers)
+std::string MiniCurl::postURL(const std::string& str, const std::string& postdata, MiniCurlHeaders& headers, int timeout)
{
- setupURL(str);
+ setupURL(str, nullptr, nullptr, timeout);
setHeaders(headers);
curl_easy_setopt(d_curl, CURLOPT_POSTFIELDSIZE, postdata.size());
curl_easy_setopt(d_curl, CURLOPT_POSTFIELDS, postdata.c_str());