From: Remi Gacogne
Date: Fri, 30 Apr 2021 13:53:30 +0000 (+0200)
Subject: dnsdist: Convert most of the remaining locks to LockGuarded
X-Git-Tag: dnsdist-1.7.0-alpha1~62^2~14
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=17e1699bf256c7e511a0f5aab4d5298f992a279a;p=thirdparty%2Fpdns.git

dnsdist: Convert most of the remaining locks to LockGuarded
---
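A note on the primitives this series converts to (not part of the original
commit message): LockGuarded<T> and SharedLockGuarded<T> bundle a value with
the mutex that protects it, so the value can only be reached through a RAII
holder returned by lock() (exclusive) or read_lock() (shared). Forgetting to
take the lock then becomes a compile error instead of a data race. Below is a
minimal sketch of the idea, assuming C++17 and std::shared_mutex; the real
classes (in pdns/lock.hh in the PowerDNS tree) are more elaborate:

    #include <mutex>
    #include <shared_mutex>
    #include <utility>

    // RAII holder: owns the lock for as long as it is alive and exposes
    // the guarded value through operator* / operator->.
    template <typename T, typename LockType>
    class LockedObject
    {
    public:
      LockedObject(T& value, typename LockType::mutex_type& mutex) :
        d_lock(mutex), d_value(value)
      {
      }
      T& operator*() const { return d_value; }
      T* operator->() const { return &d_value; }

    private:
      LockType d_lock;
      T& d_value;
    };

    // Value reachable only while holding an exclusive lock.
    template <typename T>
    class LockGuarded
    {
    public:
      explicit LockGuarded(T value = T()) : d_value(std::move(value)) {}

      LockedObject<T, std::unique_lock<std::mutex>> lock()
      {
        return { d_value, d_mutex };
      }

    private:
      std::mutex d_mutex;
      T d_value;
    };

    // Same idea on top of a shared mutex: writers call lock(), readers
    // call read_lock() and only get const access to the value.
    template <typename T>
    class SharedLockGuarded
    {
    public:
      explicit SharedLockGuarded(T value = T()) : d_value(std::move(value)) {}

      LockedObject<T, std::unique_lock<std::shared_mutex>> lock()
      {
        return { d_value, d_mutex };
      }

      LockedObject<const T, std::shared_lock<std::shared_mutex>> read_lock()
      {
        return { d_value, d_mutex };
      }

    private:
      std::shared_mutex d_mutex;
      T d_value;
    };

    // Usage, mirroring the call sites in this patch:
    //   SharedLockGuarded<std::vector<unsigned int>> hashes;
    //   hashes.lock()->push_back(42);           // exclusive access
    //   auto size = hashes.read_lock()->size(); // shared, read-only access

With that shape, the conversions below are mechanical: a member plus its
ReadWriteLock collapse into a single SharedLockGuarded member, and a call
site such as "WriteLock wl(&d_lock); hashes.clear();" becomes
"hashes.lock()->clear();".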
diff --git a/pdns/dnsdist-carbon.cc b/pdns/dnsdist-carbon.cc
index 9d973b454b..e9d169e8a0 100644
--- a/pdns/dnsdist-carbon.cc
+++ b/pdns/dnsdist-carbon.cc
@@ -238,14 +238,14 @@ void carbonDumpThread()
 #endif /* HAVE_DNS_OVER_HTTPS */
 
     {
-      WriteLock wl(&g_qcount.queryLock);
       std::string qname;
-      for(auto &record: g_qcount.records) {
+      auto records = g_qcount.records.lock();
+      for (const auto &record : *records) {
         qname = record.first;
         boost::replace_all(qname, ".", "_");
         str<<"dnsdist.querycount." << qname << ".queries " << record.second << " " << now << "\r\n";
       }
-      g_qcount.records.clear();
+      records->clear();
     }
 
     const string msg = str.str();
diff --git a/pdns/dnsdist-lua.cc b/pdns/dnsdist-lua.cc
index 2c4b1c6021..cce08db5f5 100644
--- a/pdns/dnsdist-lua.cc
+++ b/pdns/dnsdist-lua.cc
@@ -1046,9 +1046,9 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
   luaCtx.writeFunction("clearQueryCounters", []() {
     unsigned int size{0};
     {
-      WriteLock wl(&g_qcount.queryLock);
-      size = g_qcount.records.size();
-      g_qcount.records.clear();
+      auto records = g_qcount.records.lock();
+      size = records->size();
+      records->clear();
     }
 
     boost::format fmt("%d records cleared from query counter buffer\n");
@@ -1057,16 +1057,15 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
 
   luaCtx.writeFunction("getQueryCounters", [](boost::optional<uint64_t> optMax) {
     setLuaNoSideEffect();
-    ReadLock rl(&g_qcount.queryLock);
+    auto records = g_qcount.records.read_lock();
     g_outputBuffer = "query counting is currently: ";
     g_outputBuffer+= g_qcount.enabled ? "enabled" : "disabled";
-    g_outputBuffer+= (boost::format(" (%d records in buffer)\n") % g_qcount.records.size()).str();
+    g_outputBuffer+= (boost::format(" (%d records in buffer)\n") % records->size()).str();
 
     boost::format fmt("%-3d %s: %d request(s)\n");
-    QueryCountRecords::iterator it;
     unsigned int max = optMax ? *optMax : 10;
     unsigned int index{1};
-    for(it = g_qcount.records.begin(); it != g_qcount.records.end() && index <= max; ++it, ++index) {
+    for (auto it = records->begin(); it != records->end() && index <= max; ++it, ++index) {
       g_outputBuffer += (fmt % index % it->first % it->second).str();
     }
   });
diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc
index ba25be0765..f4b5804ee5 100644
--- a/pdns/dnsdist.cc
+++ b/pdns/dnsdist.cc
@@ -538,10 +538,7 @@ static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<Downstrea
     return ;
   }
 
-  {
-    std::lock_guard<std::mutex> lock(state->socketsLock);
-    state->mplexer->getAvailableFDs(ready, 1000);
-  }
+  (*state->mplexer.lock())->getAvailableFDs(ready, 1000);
 }
 
 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
@@ -848,20 +845,20 @@ static bool applyRulesToQuery(LocalHolders& holders, DNSQuestion& dq, const stru
 {
   g_rings.insertQuery(now, *dq.remote, *dq.qname, dq.qtype, dq.getData().size(), *dq.getHeader());
 
-  if(g_qcount.enabled) {
+  if (g_qcount.enabled) {
     string qname = (*dq.qname).toLogString();
     bool countQuery{true};
-    if(g_qcount.filter) {
+    if (g_qcount.filter) {
       auto lock = g_lua.lock();
       std::tie (countQuery, qname) = g_qcount.filter(&dq);
     }
-    if(countQuery) {
-      WriteLock wl(&g_qcount.queryLock);
-      if(!g_qcount.records.count(qname)) {
-        g_qcount.records[qname] = 0;
+    if (countQuery) {
+      auto records = g_qcount.records.lock();
+      if (!records->count(qname)) {
+        (*records)[qname] = 0;
       }
-      g_qcount.records[qname]++;
+      (*records)[qname]++;
     }
   }
 
diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh
index 74b0647cff..30ef1ad0ce 100644
--- a/pdns/dnsdist.hh
+++ b/pdns/dnsdist.hh
@@ -757,9 +757,8 @@ struct QueryCount {
   ~QueryCount()
   {
   }
-  QueryCountRecords records;
+  SharedLockGuarded<QueryCountRecords> records;
   QueryCountFilter filter;
-  ReadWriteLock queryLock;
   bool enabled{false};
 };
 
@@ -925,13 +924,11 @@ struct DownstreamState
   ~DownstreamState();
 
   boost::uuids::uuid id;
-  std::vector<unsigned int> hashes;
-  mutable ReadWriteLock d_lock;
+  SharedLockGuarded<std::vector<unsigned int>> hashes;
   std::vector<int> sockets;
   const std::string sourceItfName;
-  std::mutex socketsLock;
   std::mutex connectLock;
-  std::unique_ptr<FDMultiplexer> mplexer{nullptr};
+  LockGuarded<std::unique_ptr<FDMultiplexer>> mplexer{nullptr};
   std::shared_ptr<TLSCtx> d_tlsCtx{nullptr};
   std::thread tid;
   const ComboAddress remote;
@@ -943,6 +940,7 @@ struct DownstreamState
   QType checkType{QType::A};
   uint16_t checkClass{QClass::IN};
   std::atomic<uint64_t> idOffset{0};
+  std::atomic<bool> hashesComputed{false};
   stat_t sendErrors{0};
   stat_t outstanding{0};
   stat_t reuseds{0};
@@ -1119,77 +1117,13 @@ struct ServerPool
 
   std::shared_ptr<DNSDistPacketCache> packetCache{nullptr};
   std::shared_ptr<ServerPolicy> policy{nullptr};
 
-  size_t countServers(bool upOnly)
-  {
-    size_t count = 0;
-    ReadLock rl(&d_lock);
-    for (const auto& server : *d_servers) {
-      if (!upOnly || std::get<1>(server)->isUp() ) {
-        count++;
-      }
-    }
-    return count;
-  }
-
-  const std::shared_ptr<ServerPolicy::NumberedServerVector> getServers()
-  {
-    std::shared_ptr<ServerPolicy::NumberedServerVector> result;
-    {
-      ReadLock rl(&d_lock);
-      result = d_servers;
-    }
-    return result;
-  }
-
-  void addServer(shared_ptr<DownstreamState>& server)
-  {
-    WriteLock wl(&d_lock);
-    /* we can't update the content of the shared pointer directly even when holding the lock,
-       as other threads might hold a copy. We can however update the pointer as long as we hold the lock. */
-    unsigned int count = static_cast<unsigned int>(d_servers->size());
-    auto newServers = std::make_shared<ServerPolicy::NumberedServerVector>(*d_servers);
-    newServers->push_back(make_pair(++count, server));
-    /* we need to reorder based on the server 'order' */
-    std::stable_sort(newServers->begin(), newServers->end(), [](const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& a, const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& b) {
-      return a.second->order < b.second->order;
-    });
-    /* and now we need to renumber for Lua (custom policies) */
-    size_t idx = 1;
-    for (auto& serv : *newServers) {
-      serv.first = idx++;
-    }
-    d_servers = newServers;
-  }
-
-  void removeServer(shared_ptr<DownstreamState>& server)
-  {
-    WriteLock wl(&d_lock);
-    /* we can't update the content of the shared pointer directly even when holding the lock,
-       as other threads might hold a copy. We can however update the pointer as long as we hold the lock. */
-    auto newServers = std::make_shared<ServerPolicy::NumberedServerVector>(*d_servers);
-    size_t idx = 1;
-    bool found = false;
-    for (auto it = newServers->begin(); it != newServers->end();) {
-      if (found) {
-        /* we need to renumber the servers placed
-           after the removed one, for Lua (custom policies) */
-        it->first = idx++;
-        it++;
-      }
-      else if (it->second == server) {
-        it = newServers->erase(it);
-        found = true;
-      } else {
-        idx++;
-        it++;
-      }
-    }
-    d_servers = newServers;
-  }
+  size_t countServers(bool upOnly);
+  const std::shared_ptr<ServerPolicy::NumberedServerVector> getServers();
+  void addServer(shared_ptr<DownstreamState>& server);
+  void removeServer(shared_ptr<DownstreamState>& server);
 
 private:
-  std::shared_ptr<ServerPolicy::NumberedServerVector> d_servers;
-  ReadWriteLock d_lock;
+  SharedLockGuarded<std::shared_ptr<ServerPolicy::NumberedServerVector>> d_servers;
   bool d_useECS{false};
 };
 
diff --git a/pdns/dnsdistdist/dnsdist-backend.cc b/pdns/dnsdistdist/dnsdist-backend.cc
index 68df19bc72..ad290d17db 100644
--- a/pdns/dnsdistdist/dnsdist-backend.cc
+++ b/pdns/dnsdistdist/dnsdist-backend.cc
@@ -35,8 +35,7 @@ bool DownstreamState::reconnect()
   for (auto& fd : sockets) {
     if (fd != -1) {
       if (sockets.size() > 1) {
-        std::lock_guard<std::mutex> lock(socketsLock);
-        mplexer->removeReadFD(fd);
+        (*mplexer.lock())->removeReadFD(fd);
       }
       /* shutdown() is needed to wake up recv() in the responderThread */
       shutdown(fd, SHUT_RDWR);
@@ -61,8 +60,7 @@ bool DownstreamState::reconnect()
       try {
         SConnect(fd, remote);
         if (sockets.size() > 1) {
-          std::lock_guard<std::mutex> lock(socketsLock);
-          mplexer->addReadFD(fd, [](int, boost::any) {});
+          (*mplexer.lock())->addReadFD(fd, [](int, boost::any) {});
         }
         connected = true;
       }
@@ -80,8 +78,7 @@ bool DownstreamState::reconnect()
       if (fd != -1) {
         if (sockets.size() > 1) {
           try {
-            std::lock_guard<std::mutex> lock(socketsLock);
-            mplexer->removeReadFD(fd);
+            (*mplexer.lock())->removeReadFD(fd);
           }
           catch (const FDMultiplexerException& e) {
             /* some sockets might not have been added to the multiplexer
@@ -105,7 +102,7 @@ void DownstreamState::stop()
 {
   std::lock_guard<std::mutex> tl(connectLock);
-  std::lock_guard<std::mutex> slock(socketsLock);
+  auto slock = mplexer.lock();
 
   for (auto& fd : sockets) {
     if (fd != -1) {
@@ -120,23 +117,24 @@ void DownstreamState::hash()
 {
   vinfolog("Computing hashes for id=%s and weight=%d", id, weight);
   auto w = weight;
-  WriteLock wl(&d_lock);
-  hashes.clear();
-  hashes.reserve(w);
+  auto lockedHashes = hashes.lock();
+  lockedHashes->clear();
+  lockedHashes->reserve(w);
   while (w > 0) {
     std::string uuid = boost::str(boost::format("%s-%d") % id % w);
     unsigned int wshash = burtleCI(reinterpret_cast<const unsigned char*>(uuid.c_str()), uuid.size(), g_hashperturb);
-    hashes.push_back(wshash);
+    lockedHashes->push_back(wshash);
     --w;
   }
-  std::sort(hashes.begin(), hashes.end());
+  std::sort(lockedHashes->begin(), lockedHashes->end());
+  hashesComputed = true;
 }
 
 void DownstreamState::setId(const boost::uuids::uuid& newId)
 {
   id = newId;
   // compute hashes only if already done
-  if (!hashes.empty()) {
+  if (hashesComputed) {
     hash();
   }
 }
@@ -148,7 +146,7 @@ void DownstreamState::setWeight(int newWeight)
     return ;
   }
   weight = newWeight;
-  if (!hashes.empty()) {
+  if (hashesComputed) {
     hash();
   }
 }
@@ -158,7 +156,7 @@ DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress
   id = getUniqueID();
   threadStarted.clear();
 
-  mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
+  *(mplexer.lock()) = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
 
   sockets.resize(numberOfSockets);
   for (auto& fd : sockets) {
@@ -194,3 +192,70 @@ void DownstreamState::incCurrentConnectionsCount()
     tcpMaxConcurrentConnections.store(currentConnectionsCount);
   }
 }
+
+size_t ServerPool::countServers(bool upOnly)
+{
+  size_t count = 0;
+  auto servers = d_servers.read_lock();
+  for (const auto& server : **servers) {
+    if (!upOnly || std::get<1>(server)->isUp() ) {
+      count++;
+    }
+  }
+  return count;
+}
+
+const std::shared_ptr<ServerPolicy::NumberedServerVector> ServerPool::getServers()
+{
+  std::shared_ptr<ServerPolicy::NumberedServerVector> result;
+  {
+    result = *(d_servers.read_lock());
+  }
+  return result;
+}
+
+void ServerPool::addServer(shared_ptr<DownstreamState>& server)
+{
+  auto servers = d_servers.lock();
+  /* we can't update the content of the shared pointer directly even when holding the lock,
+     as other threads might hold a copy. We can however update the pointer as long as we hold the lock. */
+  unsigned int count = static_cast<unsigned int>((*servers)->size());
+  auto newServers = std::make_shared<ServerPolicy::NumberedServerVector>(*(*servers));
+  newServers->push_back(make_pair(++count, server));
+  /* we need to reorder based on the server 'order' */
+  std::stable_sort(newServers->begin(), newServers->end(), [](const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& a, const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& b) {
+    return a.second->order < b.second->order;
+  });
+  /* and now we need to renumber for Lua (custom policies) */
+  size_t idx = 1;
+  for (auto& serv : *newServers) {
+    serv.first = idx++;
+  }
+  *servers = std::move(newServers);
+}
+
+void ServerPool::removeServer(shared_ptr<DownstreamState>& server)
+{
+  auto servers = d_servers.lock();
+  /* we can't update the content of the shared pointer directly even when holding the lock,
+     as other threads might hold a copy. We can however update the pointer as long as we hold the lock. */
+  auto newServers = std::make_shared<ServerPolicy::NumberedServerVector>(*(*servers));
+  size_t idx = 1;
+  bool found = false;
+  for (auto it = newServers->begin(); it != newServers->end();) {
+    if (found) {
+      /* we need to renumber the servers placed
+         after the removed one, for Lua (custom policies) */
+      it->first = idx++;
+      it++;
+    }
+    else if (it->second == server) {
+      it = newServers->erase(it);
+      found = true;
+    } else {
+      idx++;
+      it++;
+    }
+  }
+  *servers = std::move(newServers);
+}
diff --git a/pdns/dnsdistdist/dnsdist-lbpolicies.cc b/pdns/dnsdistdist/dnsdist-lbpolicies.cc
index 7138782774..d747ca763e 100644
--- a/pdns/dnsdistdist/dnsdist-lbpolicies.cc
+++ b/pdns/dnsdistdist/dnsdist-lbpolicies.cc
@@ -163,20 +163,20 @@ shared_ptr<DownstreamState> chashedFromHash(const ServerPolicy::NumberedServerVe
   for (const auto& d: servers) {
     if (d.second->isUp() && (g_consistentHashBalancingFactor == 0 || d.second->outstanding <= (targetLoad * d.second->weight))) {
       // make sure hashes have been computed
-      if (d.second->hashes.empty()) {
+      if (!d.second->hashesComputed) {
        d.second->hash();
       }
       {
-        ReadLock rl(&(d.second->d_lock));
         const auto& server = d.second;
+        auto hashes = server->hashes.read_lock();
         // we want to keep track of the last hash
-        if (min > *(server->hashes.begin())) {
-          min = *(server->hashes.begin());
+        if (min > *(hashes->begin())) {
+          min = *(hashes->begin());
           first = server;
         }
 
-        auto hash_it = std::lower_bound(server->hashes.begin(), server->hashes.end(), qhash);
-        if (hash_it != server->hashes.end()) {
+        auto hash_it = std::lower_bound(hashes->begin(), hashes->end(), qhash);
+        if (hash_it != hashes->end()) {
           if (*hash_it < sel) {
             sel = *hash_it;
             ret = server;
diff --git a/pdns/dnsdistdist/dnsdist-rules.hh b/pdns/dnsdistdist/dnsdist-rules.hh
index c40519363c..89b7cae813 100644
--- a/pdns/dnsdistdist/dnsdist-rules.hh
+++ b/pdns/dnsdistdist/dnsdist-rules.hh
@@ -40,18 +40,17 @@ public:
 
   void clear()
   {
-    std::lock_guard<std::mutex> lock(d_lock);
-    d_limits.clear();
+    d_limits.lock()->clear();
   }
 
   size_t cleanup(const struct timespec& cutOff, size_t* scannedCount=nullptr) const
   {
-    std::lock_guard<std::mutex> lock(d_lock);
-    size_t toLook = d_limits.size() / d_scanFraction + 1;
+    auto limits = d_limits.lock();
+    size_t toLook = limits->size() / d_scanFraction + 1;
     size_t lookedAt = 0;
 
     size_t removed = 0;
-    auto& sequence = d_limits.get<SequencedTag>();
+    auto& sequence = limits->get<SequencedTag>();
     for (auto entry = sequence.begin(); entry != sequence.end() && lookedAt < toLook; lookedAt++) {
       if (entry->d_limiter.seenSince(cutOff)) {
        /* entries are ordered from least recently seen to more recently
@@ -98,14 +97,14 @@ public:
     zeroport.sin4.sin_port=0;
     zeroport.truncate(zeroport.sin4.sin_family == AF_INET ? d_ipv4trunc : d_ipv6trunc);
     {
-      std::lock_guard<std::mutex> lock(d_lock);
-      auto iter = d_limits.find(zeroport);
-      if (iter == d_limits.end()) {
+      auto limits = d_limits.lock();
+      auto iter = limits->find(zeroport);
+      if (iter == limits->end()) {
         Entry e(zeroport, QPSLimiter(d_qps, d_burst));
-        iter = d_limits.insert(e).first;
+        iter = limits->insert(e).first;
       }
-      moveCacheItemToBack<SequencedTag>(d_limits, iter);
+      moveCacheItemToBack<SequencedTag>(*limits, iter);
       return !iter->d_limiter.check(d_qps, d_burst);
     }
   }
@@ -117,8 +116,7 @@ public:
 
   size_t getEntriesCount() const
   {
-    std::lock_guard<std::mutex> lock(d_lock);
-    return d_limits.size();
+    return d_limits.lock()->size();
   }
 
 private:
@@ -141,8 +139,7 @@ private:
                    >
   > qpsContainer_t;
 
-  mutable std::mutex d_lock;
-  mutable qpsContainer_t d_limits;
+  mutable LockGuarded<qpsContainer_t> d_limits;
   mutable struct timespec d_lastCleanup;
   unsigned int d_qps, d_burst, d_ipv4trunc, d_ipv6trunc, d_cleanupDelay, d_expiration;
   unsigned int d_scanFraction{10};
@@ -240,85 +237,80 @@ public:
   }
   bool matches(const DNSQuestion* dq) const override
   {
-    if(dq->remote->sin4.sin_family == AF_INET) {
-      ReadLock rl(&d_lock4);
-      auto fnd = d_ip4s.find(dq->remote->sin4.sin_addr.s_addr);
-      if(fnd == d_ip4s.end()) {
+    if (dq->remote->sin4.sin_family == AF_INET) {
+      auto ip4s = d_ip4s.read_lock();
+      auto fnd = ip4s->find(dq->remote->sin4.sin_addr.s_addr);
+      if (fnd == ip4s->end()) {
         return false;
       }
-      return time(0) < fnd->second;
+      return time(nullptr) < fnd->second;
     }
     else {
-      ReadLock rl(&d_lock6);
-      auto fnd = d_ip6s.find({*dq->remote});
-      if(fnd == d_ip6s.end()) {
+      auto ip6s = d_ip6s.read_lock();
+      auto fnd = ip6s->find({*dq->remote});
+      if (fnd == ip6s->end()) {
         return false;
       }
-      return time(0) < fnd->second;
+      return time(nullptr) < fnd->second;
     }
   }
 
   void add(const ComboAddress& ca, time_t ttd)
   {
     // think twice before adding templates here
-    if(ca.sin4.sin_family == AF_INET) {
-      WriteLock rl(&d_lock4);
-      auto res=d_ip4s.insert({ca.sin4.sin_addr.s_addr, ttd});
-      if(!res.second && (time_t)res.first->second < ttd)
+    if (ca.sin4.sin_family == AF_INET) {
+      auto res = d_ip4s.lock()->insert({ca.sin4.sin_addr.s_addr, ttd});
+      if (!res.second && (time_t)res.first->second < ttd) {
         res.first->second = (uint32_t)ttd;
+      }
     }
     else {
-      WriteLock rl(&d_lock6);
-      auto res=d_ip6s.insert({{ca}, ttd});
-      if(!res.second && (time_t)res.first->second < ttd)
+      auto res = d_ip6s.lock()->insert({{ca}, ttd});
+      if (!res.second && (time_t)res.first->second < ttd) {
         res.first->second = (uint32_t)ttd;
+      }
     }
   }
 
   void remove(const ComboAddress& ca)
   {
-    if(ca.sin4.sin_family == AF_INET) {
-      WriteLock rl(&d_lock4);
-      d_ip4s.erase(ca.sin4.sin_addr.s_addr);
+    if (ca.sin4.sin_family == AF_INET) {
+      d_ip4s.lock()->erase(ca.sin4.sin_addr.s_addr);
     }
     else {
-      WriteLock rl(&d_lock6);
-      d_ip6s.erase({ca});
+      d_ip6s.lock()->erase({ca});
     }
   }
 
   void clear()
   {
-    {
-      WriteLock rl(&d_lock4);
-      d_ip4s.clear();
-    }
-    WriteLock rl(&d_lock6);
-    d_ip6s.clear();
+    d_ip4s.lock()->clear();
+    d_ip6s.lock()->clear();
   }
 
   void cleanup()
   {
     time_t now = time(nullptr);
     {
-      WriteLock rl(&d_lock4);
-
-      for(auto iter = d_ip4s.begin(); iter != d_ip4s.end(); ) {
-        if(iter->second < now)
-          iter=d_ip4s.erase(iter);
-        else
+      auto ip4s = d_ip4s.lock();
+      for (auto iter = ip4s->begin(); iter != ip4s->end(); ) {
+        if (iter->second < now) {
+          iter = ip4s->erase(iter);
+        }
+        else {
           ++iter;
+        }
       }
-
     }
 
     {
-      WriteLock rl(&d_lock6);
-
-      for(auto iter = d_ip6s.begin(); iter != d_ip6s.end(); ) {
-        if(iter->second < now)
-          iter=d_ip6s.erase(iter);
-        else
+      auto ip6s = d_ip6s.lock();
+      for (auto iter = ip6s->begin(); iter != ip6s->end(); ) {
+        if (iter->second < now) {
+          iter = ip6s->erase(iter);
+        }
+        else {
           ++iter;
+        }
       }
     }
 
@@ -327,19 +319,19 @@
   string toString() const override
   {
-    time_t now=time(0);
+    time_t now = time(nullptr);
     uint64_t count = 0;
-    {
-      ReadLock rl(&d_lock4);
-      for(const auto& ip : d_ip4s)
-        if(now < ip.second)
-          ++count;
+
+    for (const auto& ip : *(d_ip4s.read_lock())) {
+      if (now < ip.second) {
+        ++count;
+      }
     }
-    {
-      ReadLock rl(&d_lock6);
-      for(const auto& ip : d_ip6s)
-        if(now < ip.second)
-          ++count;
+
+    for (const auto& ip : *(d_ip6s.read_lock())) {
+      if (now < ip.second) {
+        ++count;
+      }
     }
 
     return "Src: "+std::to_string(count)+" ips";
   }
@@ -354,10 +346,8 @@ private:
       return ah & (bh<<1);
     }
   };
-  std::unordered_map<IPv6, time_t, IPv6Hash> d_ip6s;
-  std::unordered_map<uint32_t, time_t> d_ip4s;
-  mutable ReadWriteLock d_lock4;
-  mutable ReadWriteLock d_lock6;
+  mutable SharedLockGuarded<std::unordered_map<IPv6, time_t, IPv6Hash>> d_ip6s;
+  mutable SharedLockGuarded<std::unordered_map<uint32_t, time_t>> d_ip4s;
 };
 
diff --git a/pdns/libssl.cc b/pdns/libssl.cc
index 5b64a19755..2d2a717b82 100644
--- a/pdns/libssl.cc
+++ b/pdns/libssl.cc
@@ -493,7 +493,7 @@ bool libssl_set_min_tls_version(std::unique_ptr<SSL_CTX, void(*)(SSL_CTX*)>& ctx
 
 OpenSSLTLSTicketKeysRing::OpenSSLTLSTicketKeysRing(size_t capacity)
 {
-  d_ticketKeys.set_capacity(capacity);
+  d_ticketKeys.lock()->set_capacity(capacity);
 }
 
 OpenSSLTLSTicketKeysRing::~OpenSSLTLSTicketKeysRing()
@@ -502,22 +502,20 @@ OpenSSLTLSTicketKeysRing::~OpenSSLTLSTicketKeysRing()
 
 void OpenSSLTLSTicketKeysRing::addKey(std::shared_ptr<OpenSSLTLSTicketKey> newKey)
 {
-  WriteLock wl(&d_lock);
-  d_ticketKeys.push_front(newKey);
+  d_ticketKeys.lock()->push_front(newKey);
 }
 
 std::shared_ptr<OpenSSLTLSTicketKey> OpenSSLTLSTicketKeysRing::getEncryptionKey()
 {
-  ReadLock rl(&d_lock);
-  return d_ticketKeys.front();
+  return d_ticketKeys.read_lock()->front();
 }
 
 std::shared_ptr<OpenSSLTLSTicketKey> OpenSSLTLSTicketKeysRing::getDecryptionKey(unsigned char name[TLS_TICKETS_KEY_NAME_SIZE], bool& activeKey)
 {
-  ReadLock rl(&d_lock);
-  for (auto& key : d_ticketKeys) {
+  auto keys = d_ticketKeys.read_lock();
+  for (auto& key : *keys) {
     if (key->nameMatches(name)) {
-      activeKey = (key == d_ticketKeys.front());
+      activeKey = (key == keys->front());
       return key;
     }
   }
@@ -526,8 +524,7 @@ std::shared_ptr<OpenSSLTLSTicketKey> OpenSSLTLSTicketKeysRing::getDecryptionKey(
 
 size_t OpenSSLTLSTicketKeysRing::getKeysCount()
 {
-  ReadLock rl(&d_lock);
-  return d_ticketKeys.size();
+  return d_ticketKeys.read_lock()->size();
 }
 
 void OpenSSLTLSTicketKeysRing::loadTicketsKeys(const std::string& keyFile)
diff --git a/pdns/libssl.hh b/pdns/libssl.hh
index 360f46a041..514b2ad7df 100644
--- a/pdns/libssl.hh
+++ b/pdns/libssl.hh
@@ -99,8 +99,7 @@ public:
   void rotateTicketsKey(time_t now);
 
 private:
-  boost::circular_buffer<std::shared_ptr<OpenSSLTLSTicketKey> > d_ticketKeys;
-  ReadWriteLock d_lock;
+  SharedLockGuarded<boost::circular_buffer<std::shared_ptr<OpenSSLTLSTicketKey> > > d_ticketKeys;
 };
 
 void* libssl_get_ticket_key_callback_data(SSL* s);
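A closing note on the ServerPool conversion above (illustration only, not
part of the commit): d_servers holds a shared_ptr to a vector that is treated
as immutable once published. Readers take the shared lock just long enough to
copy the pointer and can then walk the list with no lock held, while writers
rebuild a fresh vector and swap the pointer in under the exclusive lock. A
hedged sketch of that copy-on-write pattern, reusing the SharedLockGuarded
sketch above; Pool, Server and NumberedServers are simplified stand-ins for
the dnsdist types, and the sorting/renumbering done by the real addServer()
is omitted:

    #include <memory>
    #include <utility>
    #include <vector>

    struct Server { int order{0}; };
    using NumberedServers =
      std::vector<std::pair<unsigned int, std::shared_ptr<Server>>>;

    class Pool
    {
    public:
      // Readers copy the shared_ptr under the shared lock, then iterate
      // over the vector without holding any lock at all.
      std::shared_ptr<const NumberedServers> getServers()
      {
        return *(d_servers.read_lock());
      }

      void addServer(std::shared_ptr<Server>& server)
      {
        auto servers = d_servers.lock();
        // Never mutate **servers in place: readers may still hold a copy
        // of the shared_ptr. Build a new vector and swap the pointer in
        // while still holding the lock.
        auto newServers = std::make_shared<NumberedServers>(**servers);
        newServers->emplace_back(newServers->size() + 1, server);
        *servers = std::move(newServers);
      }

    private:
      SharedLockGuarded<std::shared_ptr<NumberedServers>> d_servers{
        std::make_shared<NumberedServers>()};
    };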