#endif /* HAVE_IPCIPHER */
{ "makeKey", true, "", "generate a new server access key, emit configuration line ready for pasting" },
{ "makeRule", true, "rule", "Make a NetmaskGroupRule() or a SuffixMatchNodeRule(), depending on how it is called" } ,
- { "MaxQPSIPRule", true, "qps, [v4Mask=32 [, v6Mask=64 [, burst=qps [, expiration=300 [, cleanupDelay=60]]]]]", "matches traffic exceeding the qps limit per subnet" },
+ { "MaxQPSIPRule", true, "qps, [v4Mask=32 [, v6Mask=64 [, burst=qps [, expiration=300 [, cleanupDelay=60 [, scanFraction=10 [, shards=10]]]]]]]", "matches traffic exceeding the qps limit per subnet" },
{ "MaxQPSRule", true, "qps", "matches traffic **not** exceeding this qps limit" },
{ "mvCacheHitResponseRule", true, "from, to", "move cache hit response rule 'from' to a position where it is in front of 'to'. 'to' can be one larger than the largest rule" },
{ "mvCacheHitResponseRuleToTop", true, "", "move the last cache hit response rule to the first position" },
return rulesToString(getTopRules(*rules, top.get_value_or(10)), vars);
});
- luaCtx.writeFunction("MaxQPSIPRule", [](unsigned int qps, boost::optional<unsigned int> ipv4trunc, boost::optional<unsigned int> ipv6trunc, boost::optional<unsigned int> burst, boost::optional<unsigned int> expiration, boost::optional<unsigned int> cleanupDelay, boost::optional<unsigned int> scanFraction) {
- return std::shared_ptr<DNSRule>(new MaxQPSIPRule(qps, burst.get_value_or(qps), ipv4trunc.get_value_or(32), ipv6trunc.get_value_or(64), expiration.get_value_or(300), cleanupDelay.get_value_or(60), scanFraction.get_value_or(10)));
+ luaCtx.writeFunction("MaxQPSIPRule", [](unsigned int qps, boost::optional<unsigned int> ipv4trunc, boost::optional<unsigned int> ipv6trunc, boost::optional<unsigned int> burst, boost::optional<unsigned int> expiration, boost::optional<unsigned int> cleanupDelay, boost::optional<unsigned int> scanFraction, boost::optional<unsigned int> shards) {
+ return std::shared_ptr<DNSRule>(new MaxQPSIPRule(qps, burst.get_value_or(qps), ipv4trunc.get_value_or(32), ipv6trunc.get_value_or(64), expiration.get_value_or(300), cleanupDelay.get_value_or(60), scanFraction.get_value_or(10), shards.get_value_or(10)));
});
luaCtx.writeFunction("MaxQPSRule", [](unsigned int qps, boost::optional<unsigned int> burst) {
class MaxQPSIPRule : public DNSRule
{
public:
- MaxQPSIPRule(unsigned int qps, unsigned int burst, unsigned int ipv4trunc=32, unsigned int ipv6trunc=64, unsigned int expiration=300, unsigned int cleanupDelay=60, unsigned int scanFraction=10):
- d_qps(qps), d_burst(burst), d_ipv4trunc(ipv4trunc), d_ipv6trunc(ipv6trunc), d_cleanupDelay(cleanupDelay), d_expiration(expiration), d_scanFraction(scanFraction)
+ MaxQPSIPRule(unsigned int qps, unsigned int burst, unsigned int ipv4trunc=32, unsigned int ipv6trunc=64, unsigned int expiration=300, unsigned int cleanupDelay=60, unsigned int scanFraction=10, size_t shardsCount=10):
+ d_shards(shardsCount), d_qps(qps), d_burst(burst), d_ipv4trunc(ipv4trunc), d_ipv6trunc(ipv6trunc), d_cleanupDelay(cleanupDelay), d_expiration(expiration), d_scanFraction(scanFraction)
{
d_cleaningUp.clear();
gettime(&d_lastCleanup, true);
void clear()
{
- d_limits.lock()->clear();
+ for (auto& shard : d_shards) {
+ shard.lock()->clear();
+ }
}
size_t cleanup(const struct timespec& cutOff, size_t* scannedCount=nullptr) const
{
- auto limits = d_limits.lock();
- size_t toLook = limits->size() / d_scanFraction + 1;
- size_t lookedAt = 0;
-
size_t removed = 0;
- auto& sequence = limits->get<SequencedTag>();
- for (auto entry = sequence.begin(); entry != sequence.end() && lookedAt < toLook; lookedAt++) {
- if (entry->d_limiter.seenSince(cutOff)) {
- /* entries are ordered from least recently seen to more recently
- seen, as soon as we see one that has not expired yet, we are
- done */
- lookedAt++;
- break;
- }
-
- entry = sequence.erase(entry);
- removed++;
+ if (scannedCount != nullptr) {
+ *scannedCount = 0;
}
- if (scannedCount != nullptr) {
- *scannedCount = lookedAt;
+ for (auto& shard : d_shards) {
+ auto limits = shard.lock();
+ const size_t toLook = std::round((1.0 * limits->size()) / d_scanFraction)+ 1;
+ size_t lookedAt = 0;
+
+ auto& sequence = limits->get<SequencedTag>();
+ for (auto entry = sequence.begin(); entry != sequence.end() && lookedAt < toLook; lookedAt++) {
+ if (entry->d_limiter.seenSince(cutOff)) {
+ /* entries are ordered from least recently seen to more recently
+ seen, as soon as we see one that has not expired yet, we are
+ done */
+ lookedAt++;
+ break;
+ }
+
+ entry = sequence.erase(entry);
+ removed++;
+ }
+
+ if (scannedCount != nullptr) {
+ *scannedCount += lookedAt;
+ }
}
return removed;
ComboAddress zeroport(dq->ids.origRemote);
zeroport.sin4.sin_port=0;
zeroport.truncate(zeroport.sin4.sin_family == AF_INET ? d_ipv4trunc : d_ipv6trunc);
+ auto hash = ComboAddress::addressOnlyHash()(zeroport);
+ auto& shard = d_shards[hash % d_shards.size()];
{
- auto limits = d_limits.lock();
+ auto limits = shard.lock();
auto iter = limits->find(zeroport);
if (iter == limits->end()) {
Entry e(zeroport, QPSLimiter(d_qps, d_burst));
size_t getEntriesCount() const
{
- return d_limits.lock()->size();
+ size_t count = 0;
+ for (auto& shard : d_shards) {
+ count += shard.lock()->size();
+ }
+ return count;
+ }
+
+ size_t getNumberOfShards() const
+ {
+ return d_shards.size();
}
private:
>
> qpsContainer_t;
- mutable LockGuarded<qpsContainer_t> d_limits;
+ mutable std::vector<LockGuarded<qpsContainer_t>> d_shards;
mutable struct timespec d_lastCleanup;
- unsigned int d_qps, d_burst, d_ipv4trunc, d_ipv6trunc, d_cleanupDelay, d_expiration;
- unsigned int d_scanFraction{10};
+ const unsigned int d_qps, d_burst, d_ipv4trunc, d_ipv6trunc, d_cleanupDelay, d_expiration;
+ const unsigned int d_scanFraction{10};
mutable std::atomic_flag d_cleaningUp;
};
:param string function: the name of a Lua function
-.. function:: MaxQPSIPRule(qps[, v4Mask[, v6Mask[, burst[, expiration[, cleanupDelay[, scanFraction]]]]]])
+.. function:: MaxQPSIPRule(qps[, v4Mask[, v6Mask[, burst[, expiration[, cleanupDelay[, scanFraction [, shards]]]]]]])
+
+ .. versionchanged:: 1.8.0
+ ``shards`` parameter added
Matches traffic for a subnet specified by ``v4Mask`` or ``v6Mask`` exceeding ``qps`` queries per second up to ``burst`` allowed.
This rule keeps track of QPS by netmask or source IP. This state is cleaned up regularly if ``cleanupDelay`` is greater than zero,
:param int expiration: How long to keep netmask or IP addresses after they have last been seen, in seconds. Default is 300
:param int cleanupDelay: The number of seconds between two cleanups. Default is 60
:param int scanFraction: The maximum fraction of the store to scan for expired entries, for example 5 would scan at most 20% of it. Default is 10 so 10%
+ :param int shards: How many shards to use, to decrease lock contention between threads. Default is 10 and is a safe default unless a very high number of threads are used to process incoming queries
.. function:: MaxQPSRule(qps)
auto removed = rule.cleanup(notExpiredTime, &scanned);
BOOST_CHECK_EQUAL(removed, 0U);
/* the first entry should still have been valid, we should not have scanned more */
- BOOST_CHECK_EQUAL(scanned, 1U);
+ BOOST_CHECK_EQUAL(scanned, rule.getNumberOfShards());
BOOST_CHECK_EQUAL(rule.getEntriesCount(), total);
/* make sure all entries are _not_ valid anymore */
expiredTime.tv_sec += 1;
removed = rule.cleanup(expiredTime, &scanned);
- BOOST_CHECK_EQUAL(removed, (total / scanFraction) + 1);
+ BOOST_CHECK_EQUAL(removed, (total / scanFraction) + 1 + rule.getNumberOfShards());
/* we should not have scanned more than scanFraction */
BOOST_CHECK_EQUAL(scanned, removed);
BOOST_CHECK_EQUAL(rule.getEntriesCount(), total - removed);