#endif /* HAVE_DNS_OVER_HTTPS */
{
- WriteLock wl(&g_qcount.queryLock);
std::string qname;
- for(auto &record: g_qcount.records) {
+ auto records = g_qcount.records.lock();
+ for (const auto &record : *records) {
qname = record.first;
boost::replace_all(qname, ".", "_");
str<<"dnsdist.querycount." << qname << ".queries " << record.second << " " << now << "\r\n";
}
- g_qcount.records.clear();
+ records->clear();
}
const string msg = str.str();
luaCtx.writeFunction("clearQueryCounters", []() {
unsigned int size{0};
{
- WriteLock wl(&g_qcount.queryLock);
- size = g_qcount.records.size();
- g_qcount.records.clear();
+ auto records = g_qcount.records.lock();
+ size = records->size();
+ records->clear();
}
boost::format fmt("%d records cleared from query counter buffer\n");
luaCtx.writeFunction("getQueryCounters", [](boost::optional<unsigned int> optMax) {
setLuaNoSideEffect();
- ReadLock rl(&g_qcount.queryLock);
+ auto records = g_qcount.records.read_lock();
g_outputBuffer = "query counting is currently: ";
g_outputBuffer+= g_qcount.enabled ? "enabled" : "disabled";
- g_outputBuffer+= (boost::format(" (%d records in buffer)\n") % g_qcount.records.size()).str();
+ g_outputBuffer+= (boost::format(" (%d records in buffer)\n") % records->size()).str();
boost::format fmt("%-3d %s: %d request(s)\n");
- QueryCountRecords::iterator it;
unsigned int max = optMax ? *optMax : 10;
unsigned int index{1};
- for(it = g_qcount.records.begin(); it != g_qcount.records.end() && index <= max; ++it, ++index) {
+ for (auto it = records->begin(); it != records->end() && index <= max; ++it, ++index) {
g_outputBuffer += (fmt % index % it->first % it->second).str();
}
});
return ;
}
- {
- std::lock_guard<std::mutex> lock(state->socketsLock);
- state->mplexer->getAvailableFDs(ready, 1000);
- }
+ (*state->mplexer.lock())->getAvailableFDs(ready, 1000);
}
// listens on a dedicated socket, lobs answers from downstream servers to original requestors
{
g_rings.insertQuery(now, *dq.remote, *dq.qname, dq.qtype, dq.getData().size(), *dq.getHeader());
- if(g_qcount.enabled) {
+ if (g_qcount.enabled) {
string qname = (*dq.qname).toLogString();
bool countQuery{true};
- if(g_qcount.filter) {
+ if (g_qcount.filter) {
auto lock = g_lua.lock();
std::tie (countQuery, qname) = g_qcount.filter(&dq);
}
- if(countQuery) {
- WriteLock wl(&g_qcount.queryLock);
- if(!g_qcount.records.count(qname)) {
- g_qcount.records[qname] = 0;
+ if (countQuery) {
+ auto records = g_qcount.records.lock();
+ if (!records->count(qname)) {
+ (*records)[qname] = 0;
}
- g_qcount.records[qname]++;
+ (*records)[qname]++;
}
}
~QueryCount()
{
}
- QueryCountRecords records;
+ SharedLockGuarded<QueryCountRecords> records;
QueryCountFilter filter;
- ReadWriteLock queryLock;
bool enabled{false};
};
~DownstreamState();
boost::uuids::uuid id;
- std::vector<unsigned int> hashes;
- mutable ReadWriteLock d_lock;
+ SharedLockGuarded<std::vector<unsigned int>> hashes;
std::vector<int> sockets;
const std::string sourceItfName;
- std::mutex socketsLock;
std::mutex connectLock;
- std::unique_ptr<FDMultiplexer> mplexer{nullptr};
+ LockGuarded<std::unique_ptr<FDMultiplexer>> mplexer{nullptr};
std::shared_ptr<TLSCtx> d_tlsCtx{nullptr};
std::thread tid;
const ComboAddress remote;
QType checkType{QType::A};
uint16_t checkClass{QClass::IN};
std::atomic<uint64_t> idOffset{0};
+ std::atomic<bool> hashesComputed{false};
stat_t sendErrors{0};
stat_t outstanding{0};
stat_t reuseds{0};
std::shared_ptr<DNSDistPacketCache> packetCache{nullptr};
std::shared_ptr<ServerPolicy> policy{nullptr};
- size_t countServers(bool upOnly)
- {
- size_t count = 0;
- ReadLock rl(&d_lock);
- for (const auto& server : *d_servers) {
- if (!upOnly || std::get<1>(server)->isUp() ) {
- count++;
- }
- }
- return count;
- }
-
- const std::shared_ptr<ServerPolicy::NumberedServerVector> getServers()
- {
- std::shared_ptr<ServerPolicy::NumberedServerVector> result;
- {
- ReadLock rl(&d_lock);
- result = d_servers;
- }
- return result;
- }
-
- void addServer(shared_ptr<DownstreamState>& server)
- {
- WriteLock wl(&d_lock);
- /* we can't update the content of the shared pointer directly even when holding the lock,
- as other threads might hold a copy. We can however update the pointer as long as we hold the lock. */
- unsigned int count = static_cast<unsigned int>(d_servers->size());
- auto newServers = std::make_shared<ServerPolicy::NumberedServerVector>(*d_servers);
- newServers->push_back(make_pair(++count, server));
- /* we need to reorder based on the server 'order' */
- std::stable_sort(newServers->begin(), newServers->end(), [](const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& a, const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& b) {
- return a.second->order < b.second->order;
- });
- /* and now we need to renumber for Lua (custom policies) */
- size_t idx = 1;
- for (auto& serv : *newServers) {
- serv.first = idx++;
- }
- d_servers = newServers;
- }
-
- void removeServer(shared_ptr<DownstreamState>& server)
- {
- WriteLock wl(&d_lock);
- /* we can't update the content of the shared pointer directly even when holding the lock,
- as other threads might hold a copy. We can however update the pointer as long as we hold the lock. */
- auto newServers = std::make_shared<ServerPolicy::NumberedServerVector>(*d_servers);
- size_t idx = 1;
- bool found = false;
- for (auto it = newServers->begin(); it != newServers->end();) {
- if (found) {
- /* we need to renumber the servers placed
- after the removed one, for Lua (custom policies) */
- it->first = idx++;
- it++;
- }
- else if (it->second == server) {
- it = newServers->erase(it);
- found = true;
- } else {
- idx++;
- it++;
- }
- }
- d_servers = newServers;
- }
+ size_t countServers(bool upOnly);
+ const std::shared_ptr<ServerPolicy::NumberedServerVector> getServers();
+ void addServer(shared_ptr<DownstreamState>& server);
+ void removeServer(shared_ptr<DownstreamState>& server);
private:
- std::shared_ptr<ServerPolicy::NumberedServerVector> d_servers;
- ReadWriteLock d_lock;
+ SharedLockGuarded<std::shared_ptr<ServerPolicy::NumberedServerVector>> d_servers;
bool d_useECS{false};
};
for (auto& fd : sockets) {
if (fd != -1) {
if (sockets.size() > 1) {
- std::lock_guard<std::mutex> lock(socketsLock);
- mplexer->removeReadFD(fd);
+ (*mplexer.lock())->removeReadFD(fd);
}
/* shutdown() is needed to wake up recv() in the responderThread */
shutdown(fd, SHUT_RDWR);
try {
SConnect(fd, remote);
if (sockets.size() > 1) {
- std::lock_guard<std::mutex> lock(socketsLock);
- mplexer->addReadFD(fd, [](int, boost::any) {});
+ (*mplexer.lock())->addReadFD(fd, [](int, boost::any) {});
}
connected = true;
}
if (fd != -1) {
if (sockets.size() > 1) {
try {
- std::lock_guard<std::mutex> lock(socketsLock);
- mplexer->removeReadFD(fd);
+ (*mplexer.lock())->removeReadFD(fd);
}
catch (const FDMultiplexerException& e) {
/* some sockets might not have been added to the multiplexer
{
std::lock_guard<std::mutex> tl(connectLock);
- std::lock_guard<std::mutex> slock(socketsLock);
+ auto slock = mplexer.lock();
for (auto& fd : sockets) {
if (fd != -1) {
{
vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
auto w = weight;
- WriteLock wl(&d_lock);
- hashes.clear();
- hashes.reserve(w);
+ auto lockedHashes = hashes.lock();
+ lockedHashes->clear();
+ lockedHashes->reserve(w);
while (w > 0) {
std::string uuid = boost::str(boost::format("%s-%d") % id % w);
unsigned int wshash = burtleCI(reinterpret_cast<const unsigned char*>(uuid.c_str()), uuid.size(), g_hashperturb);
- hashes.push_back(wshash);
+ lockedHashes->push_back(wshash);
--w;
}
- std::sort(hashes.begin(), hashes.end());
+ std::sort(lockedHashes->begin(), lockedHashes->end());
+ hashesComputed = true;
}
void DownstreamState::setId(const boost::uuids::uuid& newId)
{
id = newId;
// compute hashes only if already done
- if (!hashes.empty()) {
+ if (hashesComputed) {
hash();
}
}
return ;
}
weight = newWeight;
- if (!hashes.empty()) {
+ if (hashesComputed) {
hash();
}
}
id = getUniqueID();
threadStarted.clear();
- mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
+ *(mplexer.lock()) = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
sockets.resize(numberOfSockets);
for (auto& fd : sockets) {
tcpMaxConcurrentConnections.store(currentConnectionsCount);
}
}
+
+size_t ServerPool::countServers(bool upOnly)
+{
+ size_t count = 0;
+ auto servers = d_servers.read_lock();
+ for (const auto& server : **servers) {
+ if (!upOnly || std::get<1>(server)->isUp() ) {
+ count++;
+ }
+ }
+ return count;
+}
+
+const std::shared_ptr<ServerPolicy::NumberedServerVector> ServerPool::getServers()
+{
+ std::shared_ptr<ServerPolicy::NumberedServerVector> result;
+ {
+ result = *(d_servers.read_lock());
+ }
+ return result;
+}
+
+void ServerPool::addServer(shared_ptr<DownstreamState>& server)
+{
+ auto servers = d_servers.lock();
+  /* We can't update the content of the shared pointer directly, even when holding the lock,
+     as other threads might hold a copy. We can, however, update the pointer itself as long as we hold the lock. */
+ unsigned int count = static_cast<unsigned int>((*servers)->size());
+ auto newServers = std::make_shared<ServerPolicy::NumberedServerVector>(*(*servers));
+ newServers->push_back(make_pair(++count, server));
+ /* we need to reorder based on the server 'order' */
+ std::stable_sort(newServers->begin(), newServers->end(), [](const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& a, const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& b) {
+ return a.second->order < b.second->order;
+ });
+ /* and now we need to renumber for Lua (custom policies) */
+ size_t idx = 1;
+ for (auto& serv : *newServers) {
+ serv.first = idx++;
+ }
+ *servers = std::move(newServers);
+}
+
+void ServerPool::removeServer(shared_ptr<DownstreamState>& server)
+{
+ auto servers = d_servers.lock();
+  /* We can't update the content of the shared pointer directly, even when holding the lock,
+     as other threads might hold a copy. We can, however, update the pointer itself as long as we hold the lock. */
+ auto newServers = std::make_shared<ServerPolicy::NumberedServerVector>(*(*servers));
+ size_t idx = 1;
+ bool found = false;
+ for (auto it = newServers->begin(); it != newServers->end();) {
+ if (found) {
+ /* we need to renumber the servers placed
+ after the removed one, for Lua (custom policies) */
+ it->first = idx++;
+ it++;
+ }
+ else if (it->second == server) {
+ it = newServers->erase(it);
+ found = true;
+ } else {
+ idx++;
+ it++;
+ }
+ }
+ *servers = std::move(newServers);
+}
for (const auto& d: servers) {
if (d.second->isUp() && (g_consistentHashBalancingFactor == 0 || d.second->outstanding <= (targetLoad * d.second->weight))) {
// make sure hashes have been computed
- if (d.second->hashes.empty()) {
+ if (!d.second->hashesComputed) {
d.second->hash();
}
{
- ReadLock rl(&(d.second->d_lock));
const auto& server = d.second;
+ auto hashes = server->hashes.read_lock();
// we want to keep track of the last hash
- if (min > *(server->hashes.begin())) {
- min = *(server->hashes.begin());
+ if (min > *(hashes->begin())) {
+ min = *(hashes->begin());
first = server;
}
- auto hash_it = std::lower_bound(server->hashes.begin(), server->hashes.end(), qhash);
- if (hash_it != server->hashes.end()) {
+ auto hash_it = std::lower_bound(hashes->begin(), hashes->end(), qhash);
+ if (hash_it != hashes->end()) {
if (*hash_it < sel) {
sel = *hash_it;
ret = server;
void clear()
{
- std::lock_guard<std::mutex> lock(d_lock);
- d_limits.clear();
+ d_limits.lock()->clear();
}
size_t cleanup(const struct timespec& cutOff, size_t* scannedCount=nullptr) const
{
- std::lock_guard<std::mutex> lock(d_lock);
- size_t toLook = d_limits.size() / d_scanFraction + 1;
+ auto limits = d_limits.lock();
+ size_t toLook = limits->size() / d_scanFraction + 1;
size_t lookedAt = 0;
size_t removed = 0;
- auto& sequence = d_limits.get<SequencedTag>();
+ auto& sequence = limits->get<SequencedTag>();
for (auto entry = sequence.begin(); entry != sequence.end() && lookedAt < toLook; lookedAt++) {
if (entry->d_limiter.seenSince(cutOff)) {
/* entries are ordered from least recently seen to more recently
zeroport.sin4.sin_port=0;
zeroport.truncate(zeroport.sin4.sin_family == AF_INET ? d_ipv4trunc : d_ipv6trunc);
{
- std::lock_guard<std::mutex> lock(d_lock);
- auto iter = d_limits.find(zeroport);
- if (iter == d_limits.end()) {
+ auto limits = d_limits.lock();
+ auto iter = limits->find(zeroport);
+ if (iter == limits->end()) {
Entry e(zeroport, QPSLimiter(d_qps, d_burst));
- iter = d_limits.insert(e).first;
+ iter = limits->insert(e).first;
}
- moveCacheItemToBack<SequencedTag>(d_limits, iter);
+ moveCacheItemToBack<SequencedTag>(*limits, iter);
return !iter->d_limiter.check(d_qps, d_burst);
}
}
size_t getEntriesCount() const
{
- std::lock_guard<std::mutex> lock(d_lock);
- return d_limits.size();
+ return d_limits.lock()->size();
}
private:
>
> qpsContainer_t;
- mutable std::mutex d_lock;
- mutable qpsContainer_t d_limits;
+ mutable LockGuarded<qpsContainer_t> d_limits;
mutable struct timespec d_lastCleanup;
unsigned int d_qps, d_burst, d_ipv4trunc, d_ipv6trunc, d_cleanupDelay, d_expiration;
unsigned int d_scanFraction{10};
}
bool matches(const DNSQuestion* dq) const override
{
- if(dq->remote->sin4.sin_family == AF_INET) {
- ReadLock rl(&d_lock4);
- auto fnd = d_ip4s.find(dq->remote->sin4.sin_addr.s_addr);
- if(fnd == d_ip4s.end()) {
+ if (dq->remote->sin4.sin_family == AF_INET) {
+ auto ip4s = d_ip4s.read_lock();
+ auto fnd = ip4s->find(dq->remote->sin4.sin_addr.s_addr);
+ if (fnd == ip4s->end()) {
return false;
}
- return time(0) < fnd->second;
+ return time(nullptr) < fnd->second;
} else {
- ReadLock rl(&d_lock6);
- auto fnd = d_ip6s.find({*dq->remote});
- if(fnd == d_ip6s.end()) {
+ auto ip6s = d_ip6s.read_lock();
+ auto fnd = ip6s->find({*dq->remote});
+ if (fnd == ip6s->end()) {
return false;
}
- return time(0) < fnd->second;
+ return time(nullptr) < fnd->second;
}
}
void add(const ComboAddress& ca, time_t ttd)
{
// think twice before adding templates here
- if(ca.sin4.sin_family == AF_INET) {
- WriteLock rl(&d_lock4);
- auto res=d_ip4s.insert({ca.sin4.sin_addr.s_addr, ttd});
- if(!res.second && (time_t)res.first->second < ttd)
+ if (ca.sin4.sin_family == AF_INET) {
+ auto res = d_ip4s.lock()->insert({ca.sin4.sin_addr.s_addr, ttd});
+ if (!res.second && (time_t)res.first->second < ttd) {
res.first->second = (uint32_t)ttd;
+ }
}
else {
- WriteLock rl(&d_lock6);
- auto res=d_ip6s.insert({{ca}, ttd});
- if(!res.second && (time_t)res.first->second < ttd)
+ auto res = d_ip6s.lock()->insert({{ca}, ttd});
+ if (!res.second && (time_t)res.first->second < ttd) {
res.first->second = (uint32_t)ttd;
+ }
}
}
void remove(const ComboAddress& ca)
{
- if(ca.sin4.sin_family == AF_INET) {
- WriteLock rl(&d_lock4);
- d_ip4s.erase(ca.sin4.sin_addr.s_addr);
+ if (ca.sin4.sin_family == AF_INET) {
+ d_ip4s.lock()->erase(ca.sin4.sin_addr.s_addr);
}
else {
- WriteLock rl(&d_lock6);
- d_ip6s.erase({ca});
+ d_ip6s.lock()->erase({ca});
}
}
void clear()
{
- {
- WriteLock rl(&d_lock4);
- d_ip4s.clear();
- }
- WriteLock rl(&d_lock6);
- d_ip6s.clear();
+ d_ip4s.lock()->clear();
+ d_ip6s.lock()->clear();
}
void cleanup()
{
time_t now = time(nullptr);
{
- WriteLock rl(&d_lock4);
-
- for(auto iter = d_ip4s.begin(); iter != d_ip4s.end(); ) {
- if(iter->second < now)
- iter=d_ip4s.erase(iter);
- else
+ auto ip4s = d_ip4s.lock();
+ for (auto iter = ip4s->begin(); iter != ip4s->end(); ) {
+ if (iter->second < now) {
+ iter = ip4s->erase(iter);
+ }
+ else {
++iter;
+ }
}
-
}
{
- WriteLock rl(&d_lock6);
-
- for(auto iter = d_ip6s.begin(); iter != d_ip6s.end(); ) {
- if(iter->second < now)
- iter=d_ip6s.erase(iter);
- else
+ auto ip6s = d_ip6s.lock();
+ for (auto iter = ip6s->begin(); iter != ip6s->end(); ) {
+ if (iter->second < now) {
+ iter = ip6s->erase(iter);
+ }
+ else {
++iter;
+ }
}
}
string toString() const override
{
- time_t now=time(0);
+ time_t now = time(nullptr);
uint64_t count = 0;
- {
- ReadLock rl(&d_lock4);
- for(const auto& ip : d_ip4s)
- if(now < ip.second)
- ++count;
+
+ for (const auto& ip : *(d_ip4s.read_lock())) {
+ if (now < ip.second) {
+ ++count;
+ }
}
- {
- ReadLock rl(&d_lock6);
- for(const auto& ip : d_ip6s)
- if(now < ip.second)
- ++count;
+
+ for (const auto& ip : *(d_ip6s.read_lock())) {
+ if (now < ip.second) {
+ ++count;
+ }
}
return "Src: "+std::to_string(count)+" ips";
return ah & (bh<<1);
}
};
- std::unordered_map<IPv6, time_t, IPv6Hash> d_ip6s;
- std::unordered_map<uint32_t, time_t> d_ip4s;
- mutable ReadWriteLock d_lock4;
- mutable ReadWriteLock d_lock6;
+ mutable SharedLockGuarded<std::unordered_map<IPv6, time_t, IPv6Hash>> d_ip6s;
+ mutable SharedLockGuarded<std::unordered_map<uint32_t, time_t>> d_ip4s;
};
OpenSSLTLSTicketKeysRing::OpenSSLTLSTicketKeysRing(size_t capacity)
{
- d_ticketKeys.set_capacity(capacity);
+ d_ticketKeys.lock()->set_capacity(capacity);
}
OpenSSLTLSTicketKeysRing::~OpenSSLTLSTicketKeysRing()
void OpenSSLTLSTicketKeysRing::addKey(std::shared_ptr<OpenSSLTLSTicketKey> newKey)
{
- WriteLock wl(&d_lock);
- d_ticketKeys.push_front(newKey);
+ d_ticketKeys.lock()->push_front(newKey);
}
std::shared_ptr<OpenSSLTLSTicketKey> OpenSSLTLSTicketKeysRing::getEncryptionKey()
{
- ReadLock rl(&d_lock);
- return d_ticketKeys.front();
+ return d_ticketKeys.read_lock()->front();
}
std::shared_ptr<OpenSSLTLSTicketKey> OpenSSLTLSTicketKeysRing::getDecryptionKey(unsigned char name[TLS_TICKETS_KEY_NAME_SIZE], bool& activeKey)
{
- ReadLock rl(&d_lock);
- for (auto& key : d_ticketKeys) {
+ auto keys = d_ticketKeys.read_lock();
+ for (auto& key : *keys) {
if (key->nameMatches(name)) {
- activeKey = (key == d_ticketKeys.front());
+ activeKey = (key == keys->front());
return key;
}
}
size_t OpenSSLTLSTicketKeysRing::getKeysCount()
{
- ReadLock rl(&d_lock);
- return d_ticketKeys.size();
+ return d_ticketKeys.read_lock()->size();
}
void OpenSSLTLSTicketKeysRing::loadTicketsKeys(const std::string& keyFile)
void rotateTicketsKey(time_t now);
private:
- boost::circular_buffer<std::shared_ptr<OpenSSLTLSTicketKey> > d_ticketKeys;
- ReadWriteLock d_lock;
+ SharedLockGuarded<boost::circular_buffer<std::shared_ptr<OpenSSLTLSTicketKey> > > d_ticketKeys;
};
void* libssl_get_ticket_key_callback_data(SSL* s);