From: Remi Gacogne
Date: Fri, 16 Apr 2021 13:39:53 +0000 (+0200)
Subject: dnsdist: Move the ring buffers to LockGuarded
X-Git-Tag: dnsdist-1.7.0-alpha1~62^2~28
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ecab77e5007d82e2c339928d003438d6b555a1f3;p=thirdparty%2Fpdns.git

dnsdist: Move the ring buffers to LockGuarded
---

diff --git a/pdns/dnsdist-lua-inspection.cc b/pdns/dnsdist-lua-inspection.cc
index 163df71d04..b1d6a38b33 100644
--- a/pdns/dnsdist-lua-inspection.cc
+++ b/pdns/dnsdist-lua-inspection.cc
@@ -33,9 +33,9 @@ static std::unordered_map>> g
   unsigned int total=0;
   {
     for (const auto& shard : g_rings.d_shards) {
-      std::lock_guard<std::mutex> rl(shard->respLock);
-      if(!labels) {
-        for(const auto& a : shard->respRing) {
+      auto rl = shard->respRing.lock();
+      if (!labels) {
+        for(const auto& a : *rl) {
           if(!pred(a))
             continue;
           counts[a.name]++;
@@ -44,7 +44,7 @@ static std::unordered_map>> g
       }
       else {
         unsigned int lab = *labels;
-        for(const auto& a : shard->respRing) {
+        for(const auto& a : *rl) {
           if(!pred(a))
             continue;
 
@@ -115,9 +115,9 @@ static void statNodeRespRing(statvisitor_t visitor, unsigned int seconds)
   StatNode root;
   for (const auto& shard : g_rings.d_shards) {
-    std::lock_guard<std::mutex> rl(shard->respLock);
+    auto rl = shard->respRing.lock();
 
-    for(const auto& c : shard->respRing) {
+    for(const auto& c : *rl) {
       if (now < c.when)
         continue;
 
@@ -139,11 +139,11 @@ static vector > > getRespRi
   vector<entry_t> ret;
   for (const auto& shard : g_rings.d_shards) {
-    std::lock_guard<std::mutex> rl(shard->respLock);
+    auto rl = shard->respRing.lock();
 
     entry_t e;
     unsigned int count=1;
-    for(const auto& c : shard->respRing) {
+    for(const auto& c : *rl) {
       if(rcode && (rcode.get() != c.dh.rcode))
         continue;
       e["qname"]=c.name.toString();
@@ -167,8 +167,8 @@ static counts_t exceedRespGen(unsigned int rate, int seconds, std::function
-    std::lock_guard<std::mutex> rl(shard->respLock);
-    for(const auto& c : shard->respRing) {
+    auto rl = shard->respRing.lock();
+    for(const auto& c : *rl) {
       if(seconds && c.when < cutoff)
         continue;
@@ -196,8 +196,8 @@ static counts_t exceedQueryGen(unsigned int rate, int seconds, std::function
-    std::lock_guard<std::mutex> rl(shard->queryLock);
-    for(const auto& c : shard->queryRing) {
+    auto rl = shard->queryRing.lock();
+    for(const auto& c : *rl) {
       if(seconds && c.when < cutoff)
         continue;
       if(now < c.when)
@@ -239,8 +239,8 @@ void setupLuaInspection(LuaContext& luaCtx)
       unsigned int total=0;
       {
         for (const auto& shard : g_rings.d_shards) {
-          std::lock_guard<std::mutex> rl(shard->queryLock);
-          for(const auto& c : shard->queryRing) {
+          auto rl = shard->queryRing.lock();
+          for(const auto& c : *rl) {
             counts[c.requestor]++;
             total++;
           }
@@ -272,8 +272,8 @@ void setupLuaInspection(LuaContext& luaCtx)
       unsigned int total=0;
       if(!labels) {
         for (const auto& shard : g_rings.d_shards) {
-          std::lock_guard<std::mutex> rl(shard->queryLock);
-          for(const auto& a : shard->queryRing) {
+          auto rl = shard->queryRing.lock();
+          for(const auto& a : *rl) {
             counts[a.name]++;
             total++;
           }
@@ -282,8 +282,8 @@ void setupLuaInspection(LuaContext& luaCtx)
       else {
         unsigned int lab = *labels;
         for (const auto& shard : g_rings.d_shards) {
-          std::lock_guard<std::mutex> rl(shard->queryLock);
-          for(auto a : shard->queryRing) {
+          auto rl = shard->queryRing.lock();
+          for(auto a : *rl) {
             a.name.trimToLabels(lab);
             counts[a.name]++;
             total++;
@@ -330,8 +330,8 @@ void setupLuaInspection(LuaContext& luaCtx)
       rings.reserve(g_rings.getNumberOfShards());
       for (const auto& shard : g_rings.d_shards) {
         {
-          std::lock_guard<std::mutex> rl(shard->respLock);
-          rings.push_back(shard->respRing);
+          auto rl = shard->respRing.lock();
+          rings.push_back(*rl);
         }
         totalEntries += rings.back().size();
       }
@@ -426,14 +426,14 @@ void setupLuaInspection(LuaContext& luaCtx)
       rr.reserve(g_rings.getNumberOfResponseEntries());
       for (const auto& shard : g_rings.d_shards) {
         {
-          std::lock_guard<std::mutex> rl(shard->queryLock);
-          for (const auto& entry : shard->queryRing) {
+          auto rl = shard->queryRing.lock();
+          for (const auto& entry : *rl) {
             qr.push_back(entry);
           }
         }
         {
-          std::lock_guard<std::mutex> rl(shard->respLock);
-          for (const auto& entry : shard->respRing) {
+          auto rl = shard->respRing.lock();
+          for (const auto& entry : *rl) {
             rr.push_back(entry);
           }
         }
@@ -544,8 +544,8 @@ void setupLuaInspection(LuaContext& luaCtx)
       unsigned int size=0;
       {
         for (const auto& shard : g_rings.d_shards) {
-          std::lock_guard<std::mutex> rl(shard->respLock);
-          for(const auto& r : shard->respRing) {
+          auto rl = shard->respRing.lock();
+          for(const auto& r : *rl) {
             /* skip actively discovered timeouts */
             if (r.usec == std::numeric_limits<unsigned int>::max())
               continue;
diff --git a/pdns/dnsdist-rings.cc b/pdns/dnsdist-rings.cc
index 8bb5ecef75..ce24a5f646 100644
--- a/pdns/dnsdist-rings.cc
+++ b/pdns/dnsdist-rings.cc
@@ -28,8 +28,8 @@ size_t Rings::numDistinctRequestors()
 {
   std::set s;
   for (const auto& shard : d_shards) {
-    std::lock_guard<std::mutex> rl(shard->queryLock);
-    for(const auto& q : shard->queryRing) {
+    auto rl = shard->queryRing.lock();
+    for (const auto& q : *rl) {
       s.insert(q.requestor);
     }
   }
@@ -42,16 +42,16 @@ std::unordered_map>> Rings::getTopBand
   uint64_t total=0;
   for (const auto& shard : d_shards) {
     {
-      std::lock_guard<std::mutex> rl(shard->queryLock);
-      for(const auto& q : shard->queryRing) {
-        counts[q.requestor]+=q.size;
+      auto rl = shard->queryRing.lock();
+      for(const auto& q : *rl) {
+        counts[q.requestor] += q.size;
         total+=q.size;
       }
     }
     {
-      std::lock_guard<std::mutex> rl(shard->respLock);
-      for(const auto& r : shard->respRing) {
-        counts[r.requestor]+=r.size;
+      auto rl = shard->respRing.lock();
+      for(const auto& r : *rl) {
+        counts[r.requestor] += r.size;
         total+=r.size;
       }
     }
diff --git a/pdns/dnsdist-rings.hh b/pdns/dnsdist-rings.hh
index d987880668..bc424a35c0 100644
--- a/pdns/dnsdist-rings.hh
+++ b/pdns/dnsdist-rings.hh
@@ -21,7 +21,6 @@
  */
 #pragma once
-#include <mutex>
 #include 
 #include 
 
@@ -30,6 +29,7 @@
 #include "circular_buffer.hh"
 #include "dnsname.hh"
 #include "iputils.hh"
+#include "lock.hh"
 #include "stat_t.hh"
 
@@ -57,10 +57,8 @@ struct Rings {
   struct Shard
   {
-    boost::circular_buffer<Query> queryRing;
-    boost::circular_buffer<Response> respRing;
-    std::mutex queryLock;
-    std::mutex respLock;
+    LockGuarded<boost::circular_buffer<Query>> queryRing{boost::circular_buffer<Query>()};
+    LockGuarded<boost::circular_buffer<Response>> respRing{boost::circular_buffer<Response>()};
   };
 
   Rings(size_t capacity=10000, size_t numberOfShards=10, size_t nbLockTries=5, bool keepLockingStats=false): d_blockingQueryInserts(0), d_blockingResponseInserts(0), d_deferredQueryInserts(0), d_deferredResponseInserts(0), d_nbQueryEntries(0), d_nbResponseEntries(0), d_currentShardId(0), d_numberOfShards(numberOfShards), d_nbLockTries(nbLockTries), d_keepLockingStats(keepLockingStats)
@@ -83,14 +81,8 @@
   {
     /* resize all the rings */
     for (auto& shard : d_shards) {
       shard = std::unique_ptr<Shard>(new Shard());
-      {
-        std::lock_guard<std::mutex> wl(shard->queryLock);
-        shard->queryRing.set_capacity(newCapacity / numberOfShards);
-      }
-      {
-        std::lock_guard<std::mutex> wl(shard->respLock);
-        shard->respRing.set_capacity(newCapacity / numberOfShards);
-      }
+      shard->queryRing.lock()->set_capacity(newCapacity / numberOfShards);
+      shard->respRing.lock()->set_capacity(newCapacity / numberOfShards);
     }
 
     /* we just recreated the shards so they are now empty */
@@ -126,9 +118,9 @@
   {
     for (size_t idx = 0; idx < d_nbLockTries; idx++) {
       auto& shard = getOneShard();
-      std::unique_lock<std::mutex> wl(shard->queryLock, std::try_to_lock);
-      if (wl.owns_lock()) {
-        insertQueryLocked(shard, when, requestor, name, qtype, size, dh);
+      auto lock = shard->queryRing.try_lock();
+      if (lock.owns_lock()) {
+        insertQueryLocked(*lock, when, requestor, name, qtype, size, dh);
         return;
       }
       if (d_keepLockingStats) {
@@ -141,17 +133,17 @@ struct Rings {
       d_blockingResponseInserts++;
     }
     auto& shard = getOneShard();
-    std::lock_guard<std::mutex> wl(shard->queryLock);
-    insertQueryLocked(shard, when, requestor, name, qtype, size, dh);
+    auto lock = shard->queryRing.lock();
+    insertQueryLocked(*lock, when, requestor, name, qtype, size, dh);
   }
 
   void insertResponse(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
   {
     for (size_t idx = 0; idx < d_nbLockTries; idx++) {
       auto& shard = getOneShard();
-      std::unique_lock<std::mutex> wl(shard->respLock, std::try_to_lock);
-      if (wl.owns_lock()) {
-        insertResponseLocked(shard, when, requestor, name, qtype, usec, size, dh, backend);
+      auto lock = shard->respRing.try_lock();
+      if (lock.owns_lock()) {
+        insertResponseLocked(*lock, when, requestor, name, qtype, usec, size, dh, backend);
         return;
       }
       if (d_keepLockingStats) {
@@ -164,21 +156,15 @@ struct Rings {
      d_blockingResponseInserts++;
     }
     auto& shard = getOneShard();
-    std::lock_guard<std::mutex> wl(shard->respLock);
-    insertResponseLocked(shard, when, requestor, name, qtype, usec, size, dh, backend);
+    auto lock = shard->respRing.lock();
+    insertResponseLocked(*lock, when, requestor, name, qtype, usec, size, dh, backend);
   }
 
   void clear()
   {
     for (auto& shard : d_shards) {
-      {
-        std::lock_guard<std::mutex> wl(shard->queryLock);
-        shard->queryRing.clear();
-      }
-      {
-        std::lock_guard<std::mutex> wl(shard->respLock);
-        shard->respRing.clear();
-      }
+      shard->queryRing.lock()->clear();
+      shard->respRing.lock()->clear();
     }
 
     d_nbQueryEntries.store(0);
@@ -211,20 +197,20 @@ private:
     return d_shards[getShardId()];
   }
 
-  void insertQueryLocked(std::unique_ptr<Shard>& shard, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh)
+  void insertQueryLocked(boost::circular_buffer<Query>& ring, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh)
   {
-    if (!shard->queryRing.full()) {
+    if (!ring.full()) {
       d_nbQueryEntries++;
     }
-    shard->queryRing.push_back({requestor, name, when, dh, size, qtype});
+    ring.push_back({requestor, name, when, dh, size, qtype});
   }
 
-  void insertResponseLocked(std::unique_ptr<Shard>& shard, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
+  void insertResponseLocked(boost::circular_buffer<Response>& ring, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
   {
-    if (!shard->respRing.full()) {
+    if (!ring.full()) {
       d_nbResponseEntries++;
     }
-    shard->respRing.push_back({requestor, backend, name, when, dh, usec, size, qtype});
+    ring.push_back({requestor, backend, name, when, dh, usec, size, qtype});
   }
 
   std::atomic<size_t> d_nbQueryEntries;
diff --git a/pdns/dnsdistdist/dnsdist-dynblocks.cc b/pdns/dnsdistdist/dnsdist-dynblocks.cc
index 1c03aedff3..ef33e33d98 100644
--- a/pdns/dnsdistdist/dnsdist-dynblocks.cc
+++ b/pdns/dnsdistdist/dnsdist-dynblocks.cc
@@ -290,8 +290,8 @@ void DynBlockRulesGroup::processQueryRules(counts_t& counts, const struct timesp
   }
 
   for (const auto& shard : g_rings.d_shards) {
-    std::lock_guard<std::mutex> rl(shard->queryLock);
-    for(const auto& c : shard->queryRing) {
+    auto rl = shard->queryRing.lock();
+    for(const auto& c : *rl) {
       if (now < c.when) {
         continue;
       }
@@ -349,8 +349,8 @@ void DynBlockRulesGroup::processResponseRules(counts_t& counts, StatNode& root,
   }
 
   for (const auto& shard : g_rings.d_shards) {
-    std::lock_guard<std::mutex> rl(shard->respLock);
-    for(const auto& c : shard->respRing) {
+    auto rl = shard->respRing.lock();
+    for(const auto& c : *rl) {
       if (now < c.when) {
         continue;
       }
diff --git a/pdns/dnsdistdist/test-dnsdistrings_cc.cc b/pdns/dnsdistdist/test-dnsdistrings_cc.cc
index fee1f51dd9..b0194bc95d 100644
--- a/pdns/dnsdistdist/test-dnsdistrings_cc.cc
+++ b/pdns/dnsdistdist/test-dnsdistrings_cc.cc
@@ -40,8 +40,9 @@ static void test_ring(size_t maxEntries, size_t numberOfShards, size_t nbLockTri
   BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
   BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0U);
   for (const auto& shard : rings.d_shards) {
-    BOOST_CHECK_EQUAL(shard->queryRing.size(), entriesPerShard);
-    for (const auto& entry : shard->queryRing) {
+    auto ring = shard->queryRing.lock();
+    BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
+    for (const auto& entry : *ring) {
       BOOST_CHECK_EQUAL(entry.name, qname);
       BOOST_CHECK_EQUAL(entry.qtype, qtype);
       BOOST_CHECK_EQUAL(entry.size, size);
@@ -57,8 +58,9 @@ static void test_ring(size_t maxEntries, size_t numberOfShards, size_t nbLockTri
   BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
   BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0U);
   for (const auto& shard : rings.d_shards) {
-    BOOST_CHECK_EQUAL(shard->queryRing.size(), entriesPerShard);
-    for (const auto& entry : shard->queryRing) {
+    auto ring = shard->queryRing.lock();
+    BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
+    for (const auto& entry : *ring) {
       BOOST_CHECK_EQUAL(entry.name, qname);
       BOOST_CHECK_EQUAL(entry.qtype, qtype);
       BOOST_CHECK_EQUAL(entry.size, size);
@@ -77,8 +79,9 @@ static void test_ring(size_t maxEntries, size_t numberOfShards, size_t nbLockTri
   BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
   BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), maxEntries);
   for (const auto& shard : rings.d_shards) {
-    BOOST_CHECK_EQUAL(shard->respRing.size(), entriesPerShard);
-    for (const auto& entry : shard->respRing) {
+    auto ring = shard->respRing.lock();
+    BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
+    for (const auto& entry : *ring) {
       BOOST_CHECK_EQUAL(entry.name, qname);
       BOOST_CHECK_EQUAL(entry.qtype, qtype);
       BOOST_CHECK_EQUAL(entry.size, size);
@@ -96,8 +99,9 @@ static void test_ring(size_t maxEntries, size_t numberOfShards, size_t nbLockTri
   BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
   BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), maxEntries);
   for (const auto& shard : rings.d_shards) {
-    BOOST_CHECK_EQUAL(shard->respRing.size(), entriesPerShard);
-    for (const auto& entry : shard->respRing) {
+    auto ring = shard->respRing.lock();
+    BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
+    for (const auto& entry : *ring) {
      BOOST_CHECK_EQUAL(entry.name, qname);
       BOOST_CHECK_EQUAL(entry.qtype, qtype);
       BOOST_CHECK_EQUAL(entry.size, size);
@@ -130,8 +134,8 @@ static void ringReaderThread(Rings& rings, std::atomic& done, size_t numbe
   for (const auto& shard : rings.d_shards) {
     {
-      std::lock_guard<std::mutex> rl(shard->queryLock);
-      for(const auto& c : shard->queryRing) {
+      auto rl = shard->queryRing.lock();
+      for(const auto& c : *rl) {
        numberOfQueries++;
        // BOOST_CHECK* is slow as hell..
        if(c.qtype != qtype) {
@@ -141,8 +145,8 @@ static void ringReaderThread(Rings& rings, std::atomic& done, size_t numbe
      }
    }
    {
-      std::lock_guard<std::mutex> rl(shard->respLock);
-      for(const auto& c : shard->respRing) {
+      auto rl = shard->respRing.lock();
+      for(const auto& c : *rl) {
        if(c.qtype != qtype) {
          cerr<<"Invalid response QType!"<
-    BOOST_CHECK_LE(shard->queryRing.size(), entriesPerShard);
-    // verify that the shard is not empty
-    BOOST_CHECK_GT(shard->queryRing.size(), (entriesPerShard * 0.5) + 1);
-    // this would be optimal
-    BOOST_WARN_GT(shard->queryRing.size(), entriesPerShard * 0.95);
-    totalQueries += shard->queryRing.size();
-    for (const auto& entry : shard->queryRing) {
-      BOOST_CHECK_EQUAL(entry.name, qname);
-      BOOST_CHECK_EQUAL(entry.qtype, qtype);
-      BOOST_CHECK_EQUAL(entry.size, size);
-      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
-      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
+    {
+      auto ring = shard->queryRing.lock();
+      BOOST_CHECK_LE(ring->size(), entriesPerShard);
+      // verify that the shard is not empty
+      BOOST_CHECK_GT(ring->size(), (entriesPerShard * 0.5) + 1);
+      // this would be optimal
+      BOOST_WARN_GT(ring->size(), entriesPerShard * 0.95);
+      totalQueries += ring->size();
+      for (const auto& entry : *ring) {
+        BOOST_CHECK_EQUAL(entry.name, qname);
+        BOOST_CHECK_EQUAL(entry.qtype, qtype);
+        BOOST_CHECK_EQUAL(entry.size, size);
+        BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+        BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
+      }
     }
-    BOOST_CHECK_LE(shard->respRing.size(), entriesPerShard);
-    // verify that the shard is not empty
-    BOOST_CHECK_GT(shard->queryRing.size(), (entriesPerShard * 0.5) + 1);
-    // this would be optimal
-    BOOST_WARN_GT(shard->respRing.size(), entriesPerShard * 0.95);
-    totalResponses += shard->respRing.size();
-    for (const auto& entry : shard->respRing) {
-      BOOST_CHECK_EQUAL(entry.name, qname);
-      BOOST_CHECK_EQUAL(entry.qtype, qtype);
-      BOOST_CHECK_EQUAL(entry.size, size);
-      BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
-      BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
-      BOOST_CHECK_EQUAL(entry.usec, latency);
-      BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
+    {
+      auto ring = shard->respRing.lock();
+      BOOST_CHECK_LE(ring->size(), entriesPerShard);
+      // verify that the shard is not empty
+      BOOST_CHECK_GT(ring->size(), (entriesPerShard * 0.5) + 1);
+      // this would be optimal
+      BOOST_WARN_GT(ring->size(), entriesPerShard * 0.95);
+      totalResponses += ring->size();
+      for (const auto& entry : *ring) {
+        BOOST_CHECK_EQUAL(entry.name, qname);
+        BOOST_CHECK_EQUAL(entry.qtype, qtype);
+        BOOST_CHECK_EQUAL(entry.size, size);
+        BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+        BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
+        BOOST_CHECK_EQUAL(entry.usec, latency);
+        BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
+      }
     }
   }
   BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), totalQueries);
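
Note: the LockGuarded wrapper that the new call sites rely on comes from lock.hh, which is added to the include list above but is not part of this diff. As a minimal sketch of the interface the changed code assumes (lock(), try_lock(), owns_lock(), and pointer-like access to the guarded buffer), something like the following would fit; the names and details below are illustrative assumptions, not the actual pdns implementation:

#include <mutex>
#include <utility>

// Minimal illustration of a mutex-plus-value wrapper: the guarded value can
// only be reached through a holder object that keeps the lock for its lifetime.
template <typename T>
class LockGuarded
{
public:
  class Holder
  {
  public:
    Holder(T* value, std::unique_lock<std::mutex>&& lock): d_lock(std::move(lock)), d_value(value)
    {
    }
    // only meaningful after try_lock(); lock() always owns the mutex
    bool owns_lock() const { return d_lock.owns_lock(); }
    // only dereference when owns_lock() is true
    T& operator*() const { return *d_value; }
    T* operator->() const { return d_value; }
  private:
    std::unique_lock<std::mutex> d_lock;
    T* d_value;
  };

  explicit LockGuarded(T&& value): d_value(std::move(value))
  {
  }

  // blocks until the mutex has been acquired
  Holder lock()
  {
    return Holder(&d_value, std::unique_lock<std::mutex>(d_mutex));
  }

  // returns immediately; the caller must check owns_lock() before using the holder
  Holder try_lock()
  {
    return Holder(&d_value, std::unique_lock<std::mutex>(d_mutex, std::try_to_lock));
  }

private:
  std::mutex d_mutex;
  T d_value;
};

With such a wrapper, the point of the refactoring is that the mutex and the ring it protects travel together: call sites like shard->respRing.lock() or shard->queryRing.try_lock() can no longer touch a ring without going through its lock, something the separate queryLock/respLock members previously enforced only by convention.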