unsigned int total=0;
{
for (const auto& shard : g_rings.d_shards) {
- std::lock_guard<std::mutex> rl(shard->respLock);
- if(!labels) {
- for(const auto& a : shard->respRing) {
+ auto rl = shard->respRing.lock();
+ if (!labels) {
+ for(const auto& a : *rl) {
if(!pred(a))
continue;
counts[a.name]++;
}
else {
unsigned int lab = *labels;
- for(const auto& a : shard->respRing) {
+ for(const auto& a : *rl) {
if(!pred(a))
continue;
StatNode root;
for (const auto& shard : g_rings.d_shards) {
- std::lock_guard<std::mutex> rl(shard->respLock);
+ auto rl = shard->respRing.lock();
- for(const auto& c : shard->respRing) {
+ for(const auto& c : *rl) {
if (now < c.when)
continue;
vector<pair<unsigned int, entry_t > > ret;
for (const auto& shard : g_rings.d_shards) {
- std::lock_guard<std::mutex> rl(shard->respLock);
+ auto rl = shard->respRing.lock();
entry_t e;
unsigned int count=1;
- for(const auto& c : shard->respRing) {
+ for(const auto& c : *rl) {
if(rcode && (rcode.get() != c.dh.rcode))
continue;
e["qname"]=c.name.toString();
counts.reserve(g_rings.getNumberOfResponseEntries());
for (const auto& shard : g_rings.d_shards) {
- std::lock_guard<std::mutex> rl(shard->respLock);
- for(const auto& c : shard->respRing) {
+ auto rl = shard->respRing.lock();
+ for(const auto& c : *rl) {
if(seconds && c.when < cutoff)
continue;
counts.reserve(g_rings.getNumberOfQueryEntries());
for (const auto& shard : g_rings.d_shards) {
- std::lock_guard<std::mutex> rl(shard->queryLock);
- for(const auto& c : shard->queryRing) {
+ auto rl = shard->queryRing.lock();
+ for(const auto& c : *rl) {
if(seconds && c.when < cutoff)
continue;
if(now < c.when)
unsigned int total=0;
{
for (const auto& shard : g_rings.d_shards) {
- std::lock_guard<std::mutex> rl(shard->queryLock);
- for(const auto& c : shard->queryRing) {
+ auto rl = shard->queryRing.lock();
+ for(const auto& c : *rl) {
counts[c.requestor]++;
total++;
}
unsigned int total=0;
if(!labels) {
for (const auto& shard : g_rings.d_shards) {
- std::lock_guard<std::mutex> rl(shard->queryLock);
- for(const auto& a : shard->queryRing) {
+ auto rl = shard->queryRing.lock();
+ for(const auto& a : *rl) {
counts[a.name]++;
total++;
}
else {
unsigned int lab = *labels;
for (const auto& shard : g_rings.d_shards) {
- std::lock_guard<std::mutex> rl(shard->queryLock);
- for(auto a : shard->queryRing) {
+ auto rl = shard->queryRing.lock();
+ for(auto a : *rl) {
a.name.trimToLabels(lab);
counts[a.name]++;
total++;
rings.reserve(g_rings.getNumberOfShards());
for (const auto& shard : g_rings.d_shards) {
{
- std::lock_guard<std::mutex> rl(shard->respLock);
- rings.push_back(shard->respRing);
+ auto rl = shard->respRing.lock();
+ rings.push_back(*rl);
}
totalEntries += rings.back().size();
}
rr.reserve(g_rings.getNumberOfResponseEntries());
for (const auto& shard : g_rings.d_shards) {
{
- std::lock_guard<std::mutex> rl(shard->queryLock);
- for (const auto& entry : shard->queryRing) {
+ auto rl = shard->queryRing.lock();
+ for (const auto& entry : *rl) {
qr.push_back(entry);
}
}
{
- std::lock_guard<std::mutex> rl(shard->respLock);
- for (const auto& entry : shard->respRing) {
+ auto rl = shard->respRing.lock();
+ for (const auto& entry : *rl) {
rr.push_back(entry);
}
}
unsigned int size=0;
{
for (const auto& shard : g_rings.d_shards) {
- std::lock_guard<std::mutex> rl(shard->respLock);
- for(const auto& r : shard->respRing) {
+ auto rl = shard->respRing.lock();
+ for(const auto& r : *rl) {
/* skip actively discovered timeouts */
if (r.usec == std::numeric_limits<unsigned int>::max())
continue;
{
std::set<ComboAddress, ComboAddress::addressOnlyLessThan> s;
for (const auto& shard : d_shards) {
- std::lock_guard<std::mutex> rl(shard->queryLock);
- for(const auto& q : shard->queryRing) {
+ auto rl = shard->queryRing.lock();
+ for (const auto& q : *rl) {
s.insert(q.requestor);
}
}
uint64_t total=0;
for (const auto& shard : d_shards) {
{
- std::lock_guard<std::mutex> rl(shard->queryLock);
- for(const auto& q : shard->queryRing) {
- counts[q.requestor]+=q.size;
+ auto rl = shard->queryRing.lock();
+ for(const auto& q : *rl) {
+ counts[q.requestor] += q.size;
total+=q.size;
}
}
{
- std::lock_guard<std::mutex> rl(shard->respLock);
- for(const auto& r : shard->respRing) {
- counts[r.requestor]+=r.size;
+ auto rl = shard->respRing.lock();
+ for(const auto& r : *rl) {
+ counts[r.requestor] += r.size;
total+=r.size;
}
}
*/
#pragma once
-#include <mutex>
#include <time.h>
#include <unordered_map>
#include "circular_buffer.hh"
#include "dnsname.hh"
#include "iputils.hh"
+#include "lock.hh"
#include "stat_t.hh"
struct Shard
{
- boost::circular_buffer<Query> queryRing;
- boost::circular_buffer<Response> respRing;
- std::mutex queryLock;
- std::mutex respLock;
+ LockGuarded<boost::circular_buffer<Query>> queryRing{boost::circular_buffer<Query>()};
+ LockGuarded<boost::circular_buffer<Response>> respRing{boost::circular_buffer<Response>()};
};
Rings(size_t capacity=10000, size_t numberOfShards=10, size_t nbLockTries=5, bool keepLockingStats=false): d_blockingQueryInserts(0), d_blockingResponseInserts(0), d_deferredQueryInserts(0), d_deferredResponseInserts(0), d_nbQueryEntries(0), d_nbResponseEntries(0), d_currentShardId(0), d_numberOfShards(numberOfShards), d_nbLockTries(nbLockTries), d_keepLockingStats(keepLockingStats)
/* resize all the rings */
for (auto& shard : d_shards) {
shard = std::unique_ptr<Shard>(new Shard());
- {
- std::lock_guard<std::mutex> wl(shard->queryLock);
- shard->queryRing.set_capacity(newCapacity / numberOfShards);
- }
- {
- std::lock_guard<std::mutex> wl(shard->respLock);
- shard->respRing.set_capacity(newCapacity / numberOfShards);
- }
+ shard->queryRing.lock()->set_capacity(newCapacity / numberOfShards);
+ shard->respRing.lock()->set_capacity(newCapacity / numberOfShards);
}
/* we just recreated the shards so they are now empty */
{
for (size_t idx = 0; idx < d_nbLockTries; idx++) {
auto& shard = getOneShard();
- std::unique_lock<std::mutex> wl(shard->queryLock, std::try_to_lock);
- if (wl.owns_lock()) {
- insertQueryLocked(shard, when, requestor, name, qtype, size, dh);
+ auto lock = shard->queryRing.try_lock();
+ if (lock.owns_lock()) {
+ insertQueryLocked(*lock, when, requestor, name, qtype, size, dh);
return;
}
if (d_keepLockingStats) {
d_blockingResponseInserts++;
}
auto& shard = getOneShard();
- std::lock_guard<std::mutex> wl(shard->queryLock);
- insertQueryLocked(shard, when, requestor, name, qtype, size, dh);
+ auto lock = shard->queryRing.lock();
+ insertQueryLocked(*lock, when, requestor, name, qtype, size, dh);
}
void insertResponse(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
{
for (size_t idx = 0; idx < d_nbLockTries; idx++) {
auto& shard = getOneShard();
- std::unique_lock<std::mutex> wl(shard->respLock, std::try_to_lock);
- if (wl.owns_lock()) {
- insertResponseLocked(shard, when, requestor, name, qtype, usec, size, dh, backend);
+ auto lock = shard->respRing.try_lock();
+ if (lock.owns_lock()) {
+ insertResponseLocked(*lock, when, requestor, name, qtype, usec, size, dh, backend);
return;
}
if (d_keepLockingStats) {
d_blockingResponseInserts++;
}
auto& shard = getOneShard();
- std::lock_guard<std::mutex> wl(shard->respLock);
- insertResponseLocked(shard, when, requestor, name, qtype, usec, size, dh, backend);
+ auto lock = shard->respRing.lock();
+ insertResponseLocked(*lock, when, requestor, name, qtype, usec, size, dh, backend);
}
void clear()
{
for (auto& shard : d_shards) {
- {
- std::lock_guard<std::mutex> wl(shard->queryLock);
- shard->queryRing.clear();
- }
- {
- std::lock_guard<std::mutex> wl(shard->respLock);
- shard->respRing.clear();
- }
+ shard->queryRing.lock()->clear();
+ shard->respRing.lock()->clear();
}
d_nbQueryEntries.store(0);
return d_shards[getShardId()];
}
- void insertQueryLocked(std::unique_ptr<Shard>& shard, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh)
+ void insertQueryLocked(boost::circular_buffer<Query>& ring, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh)
{
- if (!shard->queryRing.full()) {
+ if (!ring.full()) {
d_nbQueryEntries++;
}
- shard->queryRing.push_back({requestor, name, when, dh, size, qtype});
+ ring.push_back({requestor, name, when, dh, size, qtype});
}
- void insertResponseLocked(std::unique_ptr<Shard>& shard, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
+ void insertResponseLocked(boost::circular_buffer<Response>& ring, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
{
- if (!shard->respRing.full()) {
+ if (!ring.full()) {
d_nbResponseEntries++;
}
- shard->respRing.push_back({requestor, backend, name, when, dh, usec, size, qtype});
+ ring.push_back({requestor, backend, name, when, dh, usec, size, qtype});
}
std::atomic<size_t> d_nbQueryEntries;
}
for (const auto& shard : g_rings.d_shards) {
- std::lock_guard<std::mutex> rl(shard->queryLock);
- for(const auto& c : shard->queryRing) {
+ auto rl = shard->queryRing.lock();
+ for(const auto& c : *rl) {
if (now < c.when) {
continue;
}
}
for (const auto& shard : g_rings.d_shards) {
- std::lock_guard<std::mutex> rl(shard->respLock);
- for(const auto& c : shard->respRing) {
+ auto rl = shard->respRing.lock();
+ for(const auto& c : *rl) {
if (now < c.when) {
continue;
}
BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0U);
for (const auto& shard : rings.d_shards) {
- BOOST_CHECK_EQUAL(shard->queryRing.size(), entriesPerShard);
- for (const auto& entry : shard->queryRing) {
+ auto ring = shard->queryRing.lock();
+ BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
+ for (const auto& entry : *ring) {
BOOST_CHECK_EQUAL(entry.name, qname);
BOOST_CHECK_EQUAL(entry.qtype, qtype);
BOOST_CHECK_EQUAL(entry.size, size);
BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0U);
for (const auto& shard : rings.d_shards) {
- BOOST_CHECK_EQUAL(shard->queryRing.size(), entriesPerShard);
- for (const auto& entry : shard->queryRing) {
+ auto ring = shard->queryRing.lock();
+ BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
+ for (const auto& entry : *ring) {
BOOST_CHECK_EQUAL(entry.name, qname);
BOOST_CHECK_EQUAL(entry.qtype, qtype);
BOOST_CHECK_EQUAL(entry.size, size);
BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), maxEntries);
for (const auto& shard : rings.d_shards) {
- BOOST_CHECK_EQUAL(shard->respRing.size(), entriesPerShard);
- for (const auto& entry : shard->respRing) {
+ auto ring = shard->respRing.lock();
+ BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
+ for (const auto& entry : *ring) {
BOOST_CHECK_EQUAL(entry.name, qname);
BOOST_CHECK_EQUAL(entry.qtype, qtype);
BOOST_CHECK_EQUAL(entry.size, size);
BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries);
BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), maxEntries);
for (const auto& shard : rings.d_shards) {
- BOOST_CHECK_EQUAL(shard->respRing.size(), entriesPerShard);
- for (const auto& entry : shard->respRing) {
+ auto ring = shard->respRing.lock();
+ BOOST_CHECK_EQUAL(ring->size(), entriesPerShard);
+ for (const auto& entry : *ring) {
BOOST_CHECK_EQUAL(entry.name, qname);
BOOST_CHECK_EQUAL(entry.qtype, qtype);
BOOST_CHECK_EQUAL(entry.size, size);
for (const auto& shard : rings.d_shards) {
{
- std::lock_guard<std::mutex> rl(shard->queryLock);
- for(const auto& c : shard->queryRing) {
+ auto rl = shard->queryRing.lock();
+ for(const auto& c : *rl) {
numberOfQueries++;
// BOOST_CHECK* is slow as hell..
if(c.qtype != qtype) {
}
}
{
- std::lock_guard<std::mutex> rl(shard->respLock);
- for(const auto& c : shard->respRing) {
+ auto rl = shard->respRing.lock();
+ for(const auto& c : *rl) {
if(c.qtype != qtype) {
cerr<<"Invalid response QType!"<<endl;
return;
size_t totalQueries = 0;
size_t totalResponses = 0;
for (const auto& shard : rings.d_shards) {
- BOOST_CHECK_LE(shard->queryRing.size(), entriesPerShard);
- // verify that the shard is not empty
- BOOST_CHECK_GT(shard->queryRing.size(), (entriesPerShard * 0.5) + 1);
- // this would be optimal
- BOOST_WARN_GT(shard->queryRing.size(), entriesPerShard * 0.95);
- totalQueries += shard->queryRing.size();
- for (const auto& entry : shard->queryRing) {
- BOOST_CHECK_EQUAL(entry.name, qname);
- BOOST_CHECK_EQUAL(entry.qtype, qtype);
- BOOST_CHECK_EQUAL(entry.size, size);
- BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
- BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
+ {
+ auto ring = shard->queryRing.lock();
+ BOOST_CHECK_LE(ring->size(), entriesPerShard);
+ // verify that the shard is not empty
+ BOOST_CHECK_GT(ring->size(), (entriesPerShard * 0.5) + 1);
+ // this would be optimal
+ BOOST_WARN_GT(ring->size(), entriesPerShard * 0.95);
+ totalQueries += ring->size();
+ for (const auto& entry : *ring) {
+ BOOST_CHECK_EQUAL(entry.name, qname);
+ BOOST_CHECK_EQUAL(entry.qtype, qtype);
+ BOOST_CHECK_EQUAL(entry.size, size);
+ BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+ BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
+ }
}
- BOOST_CHECK_LE(shard->respRing.size(), entriesPerShard);
- // verify that the shard is not empty
- BOOST_CHECK_GT(shard->queryRing.size(), (entriesPerShard * 0.5) + 1);
- // this would be optimal
- BOOST_WARN_GT(shard->respRing.size(), entriesPerShard * 0.95);
- totalResponses += shard->respRing.size();
- for (const auto& entry : shard->respRing) {
- BOOST_CHECK_EQUAL(entry.name, qname);
- BOOST_CHECK_EQUAL(entry.qtype, qtype);
- BOOST_CHECK_EQUAL(entry.size, size);
- BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
- BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
- BOOST_CHECK_EQUAL(entry.usec, latency);
- BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
+ {
+ auto ring = shard->respRing.lock();
+ BOOST_CHECK_LE(ring->size(), entriesPerShard);
+ // verify that the shard is not empty
+ BOOST_CHECK_GT(ring->size(), (entriesPerShard * 0.5) + 1);
+ // this would be optimal
+ BOOST_WARN_GT(ring->size(), entriesPerShard * 0.95);
+ totalResponses += ring->size();
+ for (const auto& entry : *ring) {
+ BOOST_CHECK_EQUAL(entry.name, qname);
+ BOOST_CHECK_EQUAL(entry.qtype, qtype);
+ BOOST_CHECK_EQUAL(entry.size, size);
+ BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec);
+ BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort());
+ BOOST_CHECK_EQUAL(entry.usec, latency);
+ BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort());
+ }
}
}
BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), totalQueries);