/*
 * This file is part of PowerDNS or dnsdist.
 * Copyright -- PowerDNS.COM B.V. and its contributors
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In addition, for the avoidance of any doubt, permission is granted to
 * link this program with OpenSSL and to (re)distribute the binaries
 * produced as the result of such linking.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
26 #include <unordered_map>
28 #include <boost/variant.hpp>
30 #include "circular_buffer.hh"
/* Members of the per-entry records and of the per-shard storage.
   NOTE(review): the enclosing struct declarations are not visible in this
   excerpt -- confirm which record each `requestor` / `ds` member belongs to. */
39 ComboAddress requestor;
48 ComboAddress requestor;
54 ComboAddress ds; // who handled it
/* rings of Query and Response entries; their capacity is set by setCapacity() */
59 boost::circular_buffer<Query> queryRing;
60 boost::circular_buffer<Response> respRing;
/* Constructor: zeroes every counter, stores the number of shards, the number of
   lock attempts and the locking-statistics flag, then calls setCapacity() to
   create the shards.
   NOTE(review): the statement guarded by the `numberOfShards <= 1` test is not
   visible in this excerpt -- confirm what is done for the single-shard case. */
65 Rings(size_t capacity=10000, size_t numberOfShards=1, size_t nbLockTries=5, bool keepLockingStats=false): d_blockingQueryInserts(0), d_blockingResponseInserts(0), d_deferredQueryInserts(0), d_deferredResponseInserts(0), d_nbQueryEntries(0), d_nbResponseEntries(0), d_currentShardId(0), d_numberOfShards(numberOfShards), d_nbLockTries(nbLockTries), d_keepLockingStats(keepLockingStats)
67 setCapacity(capacity, numberOfShards);
68 if (numberOfShards <= 1) {
/* Declared here, defined elsewhere. Presumably reports the `numentries` biggest
   bandwidth consumers found in the rings -- TODO confirm against the definition. */
72 std::unordered_map<int, vector<boost::variant<string,double> > > getTopBandwidth(unsigned int numentries);
/* Declared here, defined elsewhere. Presumably counts the distinct requestor
   addresses currently held in the rings -- TODO confirm against the definition. */
73 size_t numDistinctRequestors();
74 /* This function should only be called at configuration time before any query or response has been inserted */
75 void setCapacity(size_t newCapacity, size_t numberOfShards)
77 if (numberOfShards < d_numberOfShards) {
78 throw std::runtime_error("Decreasing the number of shards in the query and response rings is not supported");
81 d_shards.resize(numberOfShards);
82 d_numberOfShards = numberOfShards;
84 /* resize all the rings */
85 for (auto& shard : d_shards) {
86 shard = std::unique_ptr<Shard>(new Shard());
88 std::lock_guard<std::mutex> wl(shard->queryLock);
89 shard->queryRing.set_capacity(newCapacity / numberOfShards);
92 std::lock_guard<std::mutex> wl(shard->respLock);
93 shard->respRing.set_capacity(newCapacity / numberOfShards);
97 /* we just recreated the shards so they are now empty */
99 d_nbResponseEntries = 0;
/* Updates d_nbLockTries, the number of non-blocking lock attempts made by
   insertQuery() and insertResponse() before they wait for a lock.
   NOTE(review): the lines between the `d_numberOfShards <= 1` test and the
   assignment below are not visible in this excerpt -- confirm which branch
   stores `retries` and what the single-shard case does. */
102 void setNumberOfLockRetries(size_t retries)
104 if (d_numberOfShards <= 1) {
107 d_nbLockTries = retries;
111 size_t getNumberOfShards() const
113 return d_numberOfShards;
116 size_t getNumberOfQueryEntries() const
118 return d_nbQueryEntries;
121 size_t getNumberOfResponseEntries() const
123 return d_nbResponseEntries;
126 void insertQuery(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh)
128 for (size_t idx = 0; idx < d_nbLockTries; idx++) {
129 auto& shard = getOneShard();
130 std::unique_lock<std::mutex> wl(shard->queryLock, std::try_to_lock);
131 if (wl.owns_lock()) {
132 insertQueryLocked(shard, when, requestor, name, qtype, size, dh);
135 if (d_keepLockingStats) {
136 d_deferredQueryInserts++;
140 /* out of luck, let's just wait */
141 if (d_keepLockingStats) {
142 d_blockingResponseInserts++;
144 auto& shard = getOneShard();
145 std::lock_guard<std::mutex> wl(shard->queryLock);
146 insertQueryLocked(shard, when, requestor, name, qtype, size, dh);
149 void insertResponse(const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
151 for (size_t idx = 0; idx < d_nbLockTries; idx++) {
152 auto& shard = getOneShard();
153 std::unique_lock<std::mutex> wl(shard->respLock, std::try_to_lock);
154 if (wl.owns_lock()) {
155 insertResponseLocked(shard, when, requestor, name, qtype, usec, size, dh, backend);
158 if (d_keepLockingStats) {
159 d_deferredResponseInserts++;
163 /* out of luck, let's just wait */
164 if (d_keepLockingStats) {
165 d_blockingResponseInserts++;
167 auto& shard = getOneShard();
168 std::lock_guard<std::mutex> wl(shard->respLock);
169 insertResponseLocked(shard, when, requestor, name, qtype, usec, size, dh, backend);
/* NOTE(review): the signature of the enclosing function is not visible in this
   excerpt. The statements below empty the query ring and the response ring of
   every shard, each while holding the corresponding lock. */
174 for (auto& shard : d_shards) {
176 std::lock_guard<std::mutex> wl(shard->queryLock);
177 shard->queryRing.clear();
180 std::lock_guard<std::mutex> wl(shard->respLock);
181 shard->respRing.clear();
/* the rings are empty now: reset the entry counters, the shard cursor
   and the locking statistics */
185 d_nbQueryEntries.store(0);
186 d_nbResponseEntries.store(0);
187 d_currentShardId.store(0);
188 d_blockingQueryInserts.store(0);
189 d_blockingResponseInserts.store(0);
190 d_deferredQueryInserts.store(0);
191 d_deferredResponseInserts.store(0);
/* the shards, (re)created by setCapacity() */
194 std::vector<std::unique_ptr<Shard> > d_shards;
/* locking statistics, only touched by the insertion functions when
   d_keepLockingStats is set: the 'deferred' counters are meant to count failed
   non-blocking lock attempts, the 'blocking' ones insertions that ended up
   waiting for a lock */
195 std::atomic<uint64_t> d_blockingQueryInserts;
196 std::atomic<uint64_t> d_blockingResponseInserts;
197 std::atomic<uint64_t> d_deferredQueryInserts;
198 std::atomic<uint64_t> d_deferredResponseInserts;
/* NOTE(review): the header of the enclosing function is not visible in this
   excerpt; getOneShard() calls getShardId(), presumably this function.
   Post-increments the shared cursor and reduces the previous value modulo the
   number of shards to obtain an index into d_shards. */
203 return (d_currentShardId++ % d_numberOfShards);
206 std::unique_ptr<Shard>& getOneShard()
208 return d_shards[getShardId()];
211 void insertQueryLocked(std::unique_ptr<Shard>& shard, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, uint16_t size, const struct dnsheader& dh)
213 if (!shard->queryRing.full()) {
216 shard->queryRing.push_back({when, requestor, name, size, qtype, dh});
219 void insertResponseLocked(std::unique_ptr<Shard>& shard, const struct timespec& when, const ComboAddress& requestor, const DNSName& name, uint16_t qtype, unsigned int usec, unsigned int size, const struct dnsheader& dh, const ComboAddress& backend)
221 if (!shard->respRing.full()) {
222 d_nbResponseEntries++;
224 shard->respRing.push_back({when, requestor, name, qtype, usec, size, dh, backend});
/* entry counters reported by getNumberOfQueryEntries() and
   getNumberOfResponseEntries() */
227 std::atomic<size_t> d_nbQueryEntries;
228 std::atomic<size_t> d_nbResponseEntries;
/* cursor used to pick a shard: post-incremented, then reduced modulo
   d_numberOfShards */
229 std::atomic<size_t> d_currentShardId;
/* number of shards, updated by setCapacity() */
231 size_t d_numberOfShards;
/* number of non-blocking lock attempts made by insertQuery() and
   insertResponse() before they wait for a lock */
232 size_t d_nbLockTries = 5;
/* whether the deferred / blocking insertion counters are updated */
233 bool d_keepLockingStats{false};
/* Rings instance defined in another translation unit */
236 extern Rings g_rings;