]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Add response delay pools feature for Squid-to-client speed limiting.
authorEduard Bagdasaryan <eduard.bagdasaryan@measurement-factory.com>
Sun, 19 Feb 2017 17:13:27 +0000 (06:13 +1300)
committerAmos Jeffries <squid3@treenet.co.nz>
Sun, 19 Feb 2017 17:13:27 +0000 (06:13 +1300)
The feature restricts Squid-to-client bandwidth only.  It applies to
both cache hits and misses.

  * Rationale *

  This may be useful for limiting the bandwidth of specific responses.
  There are situations where doing this is difficult (or impossible)
  by means of netfilter/iptables, which operate on
  TCP/IP packets and IP address information for filtering. In other
  words, it is sometimes problematic to 'extract' a single response from
  TCP/IP data flow at system level. For example, a single Squid-to-client
  TCP connection can transmit multiple responses (persistent connections,
  pipelining or HTTP/2 connection multiplexing) or be encrypted
  (HTTPS proxy mode).

  * Description *

  When Squid starts delivering the final HTTP response to a client,
  Squid checks response_delay_pool_access rules (supporting fast ACLs
  only), in the order they were declared. The first rule with a
  matching ACL wins.  If (and only if) an "allow" rule won, Squid
  assigns the response to the corresponding named delay pool.

  If a response is assigned to a delay pool, the response becomes
  subject to the configured bucket and aggregate bandwidth limits of
  that pool, similar to the current "class 2" server-side delay pools,
  but with a brand new, dedicated "individual" filled bucket assigned to
  the matched response.

  The new feature serves the same purpose as the existing client-side
  pools: both features limit Squid-to-client bandwidth. Their common
  interface was placed into a new base BandwidthBucket class.  The
  difference is that client-side pools do not aggregate clients and
  always use one bucket per client IP. It is possible for a response to
  become subject to both of these pools. In such situations, only the
  matched response delay pool will be used for Squid-to-client speed limiting.

  * Limitations *

  The accurate SMP support (with the aggregate bucket shared among
  workers) is outside this patch scope. In SMP configurations,
  Squid should automatically divide the aggregate_speed_limit and
  max_aggregate_size values among the configured number of Squid
  workers.

  * Also: *

  Fixed ClientDelayConfig which did not perform cleanup on
  destruction, causing memory problems detected by Valgrind. It was not
  possible to fix this with minimal changes because of linker problems
  with SquidConfig while checking with test-builds.sh. So I had
  to refactor ClientDelayConfig module, separating configuration code
  (old ClientDelayConfig class) from configured data (a new
  ClientDelayPools class) and minimizing dependencies with SquidConfig.

26 files changed:
src/BandwidthBucket.cc [new file with mode: 0644]
src/BandwidthBucket.h [new file with mode: 0644]
src/ClientDelayConfig.cc
src/ClientDelayConfig.h
src/ClientInfo.h
src/Makefile.am
src/MessageBucket.cc [new file with mode: 0644]
src/MessageBucket.h [new file with mode: 0644]
src/MessageDelayPools.cc [new file with mode: 0644]
src/MessageDelayPools.h [new file with mode: 0644]
src/SquidConfig.h
src/cache_cf.cc
src/cf.data.depend
src/cf.data.pre
src/client_db.cc
src/client_side.cc
src/client_side.h
src/comm.cc
src/comm/IoCallback.cc
src/comm/Loops.h
src/comm/Write.cc
src/comm/Write.h
src/comm/forward.h
src/fde.h
src/http/Stream.cc
src/http/Stream.h

diff --git a/src/BandwidthBucket.cc b/src/BandwidthBucket.cc
new file mode 100644 (file)
index 0000000..d919a44
--- /dev/null
@@ -0,0 +1,107 @@
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+
+#if USE_DELAY_POOLS
+
+#include "BandwidthBucket.h"
+#include "ClientInfo.h"
+#include "comm/Connection.h"
+#include "Debug.h"
+#include "fde.h"
+
+BandwidthBucket::BandwidthBucket(const int speed, const int initialLevelPercent, const double sizeLimit) :
+    bucketLevel( sizeLimit * (initialLevelPercent / 100.0)),
+    selectWaiting(false),
+    writeSpeedLimit(speed),
+    bucketSizeLimit(sizeLimit)
+{
+    getCurrentTime();
+    /* put current time to have something sensible here */
+    prevTime = current_dtime;
+}
+
+void
+BandwidthBucket::refillBucket()
+{
+    if (noLimit())
+        return;
+    // all these times are in seconds, with double precision
+    const double currTime = current_dtime;
+    const double timePassed = currTime - prevTime;
+
+    // Calculate allowance for the time passed. Use double to avoid
+    // accumulating rounding errors for small intervals. For example, always
+    // adding 1 byte instead of 1.4 results in 29% bandwidth allocation error.
+    const double gain = timePassed * writeSpeedLimit;
+
+    // to further combat error accumulation during micro updates,
+    // quit before updating time if we cannot add at least one byte
+    if (gain < 1.0)
+        return;
+
+    prevTime = currTime;
+
+    // for "first" connections, drain initial fat before refilling but keep
+    // updating prevTime to avoid bursts after the fat is gone
+    if (bucketLevel > bucketSizeLimit) {
+        debugs(77, 4, "not refilling while draining initial fat");
+        return;
+    }
+
+    bucketLevel += gain;
+
+    // obey quota limits
+    if (bucketLevel > bucketSizeLimit)
+        bucketLevel = bucketSizeLimit;
+}
+
+bool
+BandwidthBucket::applyQuota(int &nleft, Comm::IoCallback *state)
+{
+    const int q = quota();
+    if (!q)
+        return false;
+    else if (q < 0)
+        return true;
+    const int nleft_corrected = min(nleft, q);
+    if (nleft != nleft_corrected) {
+        debugs(77, 5, state->conn << " writes only " <<
+               nleft_corrected << " out of " << nleft);
+        nleft = nleft_corrected;
+    }
+    return true;
+}
+
+void
+BandwidthBucket::reduceBucket(const int len)
+{
+    if (len <= 0 || noLimit())
+        return;
+    bucketLevel -= len;
+    if (bucketLevel < 0.0) {
+        debugs(77, DBG_IMPORTANT, "drained too much"); // should not happen
+        bucketLevel = 0;
+    }
+}
+
+BandwidthBucket *
+BandwidthBucket::SelectBucket(fde *f)
+{
+    BandwidthBucket *bucket = f->writeQuotaHandler.getRaw();
+    if (!bucket) {
+        ClientInfo *clientInfo = f->clientInfo;
+        if (clientInfo && clientInfo->writeLimitingActive)
+            bucket = clientInfo;
+    }
+    return bucket;
+}
+
+#endif /* USE_DELAY_POOLS */
+
diff --git a/src/BandwidthBucket.h b/src/BandwidthBucket.h
new file mode 100644 (file)
index 0000000..c6ba7f3
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef BANDWIDTHBUCKET_H
+#define BANDWIDTHBUCKET_H
+
+#if USE_DELAY_POOLS
+
+#include "comm/IoCallback.h"
+
+class fde;
+
+/// Base class for Squid-to-client bandwidth limiting
+class BandwidthBucket
+{
+public:
+    BandwidthBucket(const int speed, const int initialLevelPercent, const double sizeLimit);
+    virtual ~BandwidthBucket() {}
+
+    static BandwidthBucket *SelectBucket(fde *f);
+
+    /// \returns the number of bytes this bucket allows to write,
+    /// also considering aggregates, if any. Negative quota means
+    /// no limitations by this bucket.
+    virtual int quota() = 0;
+    /// Adjusts nleft to not exceed the current bucket quota value,
+    /// if needed.
+    virtual bool applyQuota(int &nleft, Comm::IoCallback *state);
+    /// Will plan another write call.
+    virtual void scheduleWrite(Comm::IoCallback *state) = 0;
+    /// Performs cleanup when the related file descriptor becomes closed.
+    virtual void onFdClosed() { selectWaiting = false; }
+    /// Decreases the bucket level.
+    virtual void reduceBucket(const int len);
+    /// Whether this bucket will not do bandwidth limiting.
+    bool noLimit() const { return writeSpeedLimit < 0; }
+
+protected:
+    /// Increases the bucket level with the writeSpeedLimit speed.
+    void refillBucket();
+
+public:
+    double bucketLevel; ///< how much can be written now
+    bool selectWaiting; ///< is between commSetSelect and commHandleWrite
+
+protected:
+    double prevTime; ///< previous time when we checked
+    double writeSpeedLimit; ///< Write speed limit in bytes per second.
+    double bucketSizeLimit; ///< maximum bucket size
+};
+
+#endif /* USE_DELAY_POOLS */
+
+#endif
+
index 1f20dfddd5b390c34bcb68b6f57960cc62ef0012..07e0a067af6fbf89e4a7f9341fcb1f70e65a4f4b 100644 (file)
 #include "Parsing.h"
 #include "Store.h"
 
+ClientDelayPool::~ClientDelayPool()
+{
+    if (access)
+        aclDestroyAccessList(&access);
+}
+
 void ClientDelayPool::dump(StoreEntry * entry, unsigned int poolNumberMinusOne) const
 {
     LOCAL_ARRAY(char, nom, 32);
@@ -23,81 +29,85 @@ void ClientDelayPool::dump(StoreEntry * entry, unsigned int poolNumberMinusOne)
     storeAppendPrintf(entry, "\n");
 }
 
+ClientDelayPools *
+ClientDelayPools::Instance()
+{
+    static ClientDelayPools pools;
+    return &pools;
+}
+
+ClientDelayPools::~ClientDelayPools()
+{
+    pools.clear();
+}
+
 void
 ClientDelayConfig::finalize()
 {
-    for (unsigned int i = 0; i < pools.size(); ++i) {
+    for (unsigned int i = 0; i < pools().size(); ++i) {
         /* pools require explicit 'allow' to assign a client into them */
-        if (!pools[i].access) {
-            debugs(77, DBG_IMPORTANT, "client_delay_pool #" << (i+1) <<
+        if (!pool(i).access) {
+            debugs(77, DBG_IMPORTANT, "WARNING: client_delay_pool #" << (i+1) <<
                    " has no client_delay_access configured. " <<
                    "No client will ever use it.");
         }
     }
 }
 
-void ClientDelayConfig::freePoolCount()
+void ClientDelayConfig::dumpPoolCount(StoreEntry * entry, const char *name) const
 {
-    pools.clear();
+    const auto &pools_ = ClientDelayPools::Instance()->pools;
+    if (pools_.size()) {
+        storeAppendPrintf(entry, "%s %d\n", name, static_cast<int>(pools_.size()));
+        for (unsigned int i = 0; i < pools_.size(); ++i)
+            pools_[i]->dump(entry, i);
+    }
 }
 
-void ClientDelayConfig::dumpPoolCount(StoreEntry * entry, const char *name) const
+void
+ClientDelayConfig::freePools()
 {
-    if (pools.size()) {
-        storeAppendPrintf(entry, "%s %d\n", name, (int)pools.size());
-        for (unsigned int i = 0; i < pools.size(); ++i)
-            pools[i].dump(entry, i);
-    }
+    pools().clear();
 }
 
 void ClientDelayConfig::parsePoolCount()
 {
-    if (pools.size()) {
-        debugs(3, DBG_CRITICAL, "parse_client_delay_pool_count: multiple client_delay_pools lines, aborting all previous client_delay_pools config");
-        clean();
+    if (pools().size()) {
+        debugs(3, DBG_CRITICAL, "parse_client_delay_pool_count: multiple client_delay_pools lines, " <<
+               "aborting all previous client_delay_pools config");
+        freePools();
     }
     unsigned short pools_;
     ConfigParser::ParseUShort(&pools_);
-    for (int i = 0; i < pools_; ++i) {
-        pools.push_back(ClientDelayPool());
-    }
+    for (int i = 0; i < pools_; ++i)
+        pools().push_back(new ClientDelayPool());
 }
 
 void ClientDelayConfig::parsePoolRates()
 {
-    unsigned short pool;
-    ConfigParser::ParseUShort(&pool);
-
-    if (pool < 1 || pool > pools.size()) {
-        debugs(3, DBG_CRITICAL, "parse_client_delay_pool_rates: Ignoring pool " << pool << " not in 1 .. " << pools.size());
-        return;
+    if (unsigned short poolId = parsePoolId()) {
+        --poolId;
+        pool(poolId).rate = GetInteger();
+        pool(poolId).highwatermark = GetInteger64();
     }
-
-    --pool;
-
-    pools[pool].rate = GetInteger();
-    pools[pool].highwatermark = GetInteger64();
 }
 
 void ClientDelayConfig::parsePoolAccess(ConfigParser &parser)
 {
-    unsigned short pool;
-
-    ConfigParser::ParseUShort(&pool);
-
-    if (pool < 1 || pool > pools.size()) {
-        debugs(3, DBG_CRITICAL, "parse_client_delay_pool_rates: Ignoring pool " << pool << " not in 1 .. " << pools.size());
-        return;
-    }
-
-    --pool;
-    aclParseAccessLine("client_delay_access", parser, &pools[pool].access);
+    if (const unsigned short poolId = parsePoolId())
+        aclParseAccessLine("client_delay_access", parser, &(pool(poolId-1).access));
 }
 
-void ClientDelayConfig::clean()
+unsigned short
+ClientDelayConfig::parsePoolId()
 {
-    for (unsigned int i = 0; i < pools.size(); ++i) {
-        aclDestroyAccessList(&pools[i].access);
+    unsigned short poolId = 0;
+    ConfigParser::ParseUShort(&poolId);
+    if (poolId < 1 || poolId > pools().size()) {
+        debugs(3, DBG_CRITICAL, "parse_client_delay_pool_rates: Ignoring pool " <<
+               poolId << " not in 1 .. " << pools().size());
+        return 0;
     }
+    return poolId;
 }
 
index 0217150d1e169d729ab9d630bcc9dd6c66fd145e..5465780a878c3bb188e54ca40631426c97f2adca 100644 (file)
@@ -10,6 +10,7 @@
 #define SQUID_CLIENTDELAYCONFIG_H
 
 #include "acl/forward.h"
+#include "base/RefCount.h"
 
 #include <vector>
 
@@ -19,18 +20,35 @@ class ConfigParser;
 /// \ingroup DelayPoolsAPI
 
 /* represents one client write limiting delay 'pool' */
-class ClientDelayPool
+class ClientDelayPool : public RefCountable
 {
 public:
+    typedef RefCount<ClientDelayPool> Pointer;
+
     ClientDelayPool()
-        :   access(NULL), rate(0), highwatermark(0) {}
+        :   access(nullptr), rate(0), highwatermark(0) {}
+    ~ClientDelayPool();
+    ClientDelayPool(const ClientDelayPool &) = delete;
+    ClientDelayPool &operator=(const ClientDelayPool &) = delete;
+
     void dump (StoreEntry * entry, unsigned int poolNumberMinusOne) const;
     acl_access *access;
     int rate;
     int64_t highwatermark;
 };
 
-typedef std::vector<ClientDelayPool> ClientDelayPools;
+class ClientDelayPools
+{
+public:
+    ClientDelayPools(const ClientDelayPools &) = delete;
+    ClientDelayPools &operator=(const ClientDelayPools &) = delete;
+    static ClientDelayPools *Instance();
+
+    std::vector<ClientDelayPool::Pointer> pools;
+private:
+    ClientDelayPools() {}
+    ~ClientDelayPools();
+};
 
 /* represents configuration of client write limiting delay pools */
 class ClientDelayConfig
@@ -38,7 +56,10 @@ class ClientDelayConfig
 public:
     ClientDelayConfig()
         :   initial(50) {}
-    void freePoolCount();
+    ClientDelayConfig(const ClientDelayConfig &) = delete;
+    ClientDelayConfig &operator=(const ClientDelayConfig &) = delete;
+
+    void freePools();
     void dumpPoolCount(StoreEntry * entry, const char *name) const;
     /* parsing of client_delay_pools - number of pools */
     void parsePoolCount();
@@ -51,9 +72,11 @@ public:
 
     /* initial bucket level, how fill bucket at startup */
     unsigned short initial;
-    ClientDelayPools pools;
+
 private:
-    void clean();
+    unsigned short parsePoolId();
+    std::vector<ClientDelayPool::Pointer> &pools() { return ClientDelayPools::Instance()->pools; }
+    ClientDelayPool &pool(const int i) { return *(ClientDelayPools::Instance()->pools.at(i)); }
 };
 
 #endif // SQUID_CLIENTDELAYCONFIG_H
index 9bc03c62b46d3bd302d7a73414ee7844b3c87174..9458f52078b93e9695e45201728f10568b7523de 100644 (file)
@@ -9,6 +9,9 @@
 #ifndef SQUID__SRC_CLIENTINFO_H
 #define SQUID__SRC_CLIENTINFO_H
 
+#if USE_DELAY_POOLS
+#include "BandwidthBucket.h"
+#endif
 #include "base/ByteCounter.h"
 #include "cbdata.h"
 #include "enums.h"
 class CommQuotaQueue;
 #endif
 
-class ClientInfo
+class ClientInfo : public hash_link
+#if USE_DELAY_POOLS
+    , public BandwidthBucket
+#endif
 {
     MEMPROXY_CLASS(ClientInfo);
 
@@ -32,8 +38,6 @@ public:
     explicit ClientInfo(const Ip::Address &);
     ~ClientInfo();
 
-    hash_link hash;             /* must be first */
-
     Ip::Address addr;
 
     struct Protocol {
@@ -58,17 +62,12 @@ public:
     int n_established;          /* number of current established connections */
     time_t last_seen;
 #if USE_DELAY_POOLS
-    double writeSpeedLimit;///< Write speed limit in bytes per second, can be less than 1, if too close to zero this could result in timeouts from client
-    double prevTime; ///< previous time when we checked
-    double bucketSize; ///< how much can be written now
-    double bucketSizeLimit;  ///< maximum bucket size
     bool writeLimitingActive; ///< Is write limiter active
     bool firstTimeConnection;///< is this first time connection for this client
 
     CommQuotaQueue *quotaQueue; ///< clients waiting for more write quota
     int rationedQuota; ///< precomputed quota preserving fairness among clients
     int rationedCount; ///< number of clients that will receive rationedQuota
-    bool selectWaiting; ///< is between commSetSelect and commHandleWrite
     bool eventWaiting; ///< waiting for commHandleWriteHelper event to fire
 
     // all those functions access Comm fd_table and are defined in comm.cc
@@ -79,8 +78,13 @@ public:
     unsigned int quotaPeekReserv() const; ///< returns the next reserv. to pop
     void quotaDequeue(); ///< pops queue head from queue
     void kickQuotaQueue(); ///< schedule commHandleWriteHelper call
-    int quotaForDequed(); ///< allocate quota for a just dequeued client
-    void refillBucket(); ///< adds bytes to bucket based on rate and time
+
+    /* BandwidthBucket API */
+    virtual int quota() override; ///< allocate quota for a just dequeued client
+    virtual bool applyQuota(int &nleft, Comm::IoCallback *state) override;
+    virtual void scheduleWrite(Comm::IoCallback *state) override;
+    virtual void onFdClosed() override;
+    virtual void reduceBucket(int len) override;
 
     void quotaDumpQueue(); ///< dumps quota queue for debugging
 
index cc4ae2c2c98eb3e263e2bd113eae8855bc8561c6..c4ae896eec90a3bfeb06c87efc250c6b325be1aa 100644 (file)
@@ -88,6 +88,8 @@ endif
 DIST_SUBDIRS += esi
 
 DELAY_POOL_ALL_SOURCE = \
+       BandwidthBucket.cc \
+       BandwidthBucket.h \
        CommonPool.h \
        CompositePoolNode.h \
        delay_pools.cc \
@@ -109,6 +111,10 @@ DELAY_POOL_ALL_SOURCE = \
        DelayUser.h \
        DelayVector.cc \
        DelayVector.h \
+       MessageBucket.cc \
+       MessageBucket.h \
+       MessageDelayPools.h \
+       MessageDelayPools.cc \
        NullDelayId.h \
        ClientDelayConfig.cc \
        ClientDelayConfig.h
diff --git a/src/MessageBucket.cc b/src/MessageBucket.cc
new file mode 100644 (file)
index 0000000..b296e57
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+
+#if USE_DELAY_POOLS
+#include "comm/Connection.h"
+#include "DelayPools.h"
+#include "fde.h"
+#include "MessageBucket.h"
+
+MessageBucket::MessageBucket(const int speed, const int initialLevelPercent,
+                             const double sizeLimit, MessageDelayPool::Pointer pool) :
+    BandwidthBucket(speed, initialLevelPercent, sizeLimit),
+    theAggregate(pool) {}
+
+int
+MessageBucket::quota()
+{
+    refillBucket();
+    theAggregate->refillBucket();
+    if (theAggregate->noLimit())
+        return bucketLevel;
+    else if (noLimit())
+        return theAggregate->level();
+    else
+        return min(bucketLevel, static_cast<double>(theAggregate->level()));
+}
+
+void
+MessageBucket::reduceBucket(int len)
+{
+    BandwidthBucket::reduceBucket(len);
+    theAggregate->bytesIn(len);
+}
+
+void
+MessageBucket::scheduleWrite(Comm::IoCallback *state)
+{
+    fde *F = &fd_table[state->conn->fd];
+    if (!F->writeQuotaHandler->selectWaiting) {
+        F->writeQuotaHandler->selectWaiting = true;
+        // message delay pools limit this write; see checkTimeouts()
+        SetSelect(state->conn->fd, COMM_SELECT_WRITE, Comm::HandleWrite, state, 0);
+    }
+}
+
+#endif /* USE_DELAY_POOLS */
+
diff --git a/src/MessageBucket.h b/src/MessageBucket.h
new file mode 100644 (file)
index 0000000..b10129c
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef MESSAGEBUCKET_H
+#define MESSAGEBUCKET_H
+
+#if USE_DELAY_POOLS
+
+#include "BandwidthBucket.h"
+#include "base/RefCount.h"
+#include "comm/forward.h"
+#include "MessageDelayPools.h"
+
+/// Limits Squid-to-client bandwidth for each matching response
+class MessageBucket : public RefCountable, public BandwidthBucket
+{
+    MEMPROXY_CLASS(MessageBucket);
+
+public:
+    typedef RefCount<MessageBucket> Pointer;
+
+    MessageBucket(const int speed, const int initialLevelPercent, const double sizeLimit, MessageDelayPool::Pointer pool);
+
+    /* BandwidthBucket API */
+    virtual int quota() override;
+    virtual void scheduleWrite(Comm::IoCallback *state) override;
+    virtual void reduceBucket(int len);
+
+private:
+    MessageDelayPool::Pointer theAggregate;
+};
+
+#endif /* USE_DELAY_POOLS */
+
+#endif
+
diff --git a/src/MessageDelayPools.cc b/src/MessageDelayPools.cc
new file mode 100644 (file)
index 0000000..36d9640
--- /dev/null
@@ -0,0 +1,202 @@
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+
+#if USE_DELAY_POOLS
+#include "acl/Gadgets.h"
+#include "cache_cf.h"
+#include "ConfigParser.h"
+#include "DelaySpec.h"
+#include "event.h"
+#include "MessageBucket.h"
+#include "MessageDelayPools.h"
+#include "Parsing.h"
+#include "SquidTime.h"
+#include "Store.h"
+
+#include <algorithm>
+#include <map>
+
+MessageDelayPools::~MessageDelayPools()
+{
+    freePools();
+}
+
+MessageDelayPools *
+MessageDelayPools::Instance()
+{
+    static MessageDelayPools pools;
+    return &pools;
+}
+
+MessageDelayPool::Pointer
+MessageDelayPools::pool(const SBuf &name)
+{
+    auto it = std::find_if(pools.begin(), pools.end(),
+    [&name](const MessageDelayPool::Pointer p) { return p->poolName == name; });
+    return it == pools.end() ? 0 : *it;
+}
+
+void
+MessageDelayPools::add(MessageDelayPool *p)
+{
+    const auto it = std::find_if(pools.begin(), pools.end(),
+    [&p](const MessageDelayPool::Pointer mp) { return mp->poolName == p->poolName; });
+    if (it != pools.end()) {
+        debugs(3, DBG_CRITICAL, "WARNING: Ignoring duplicate " << p->poolName << " response delay pool");
+        return;
+    }
+    pools.push_back(p);
+}
+
+void
+MessageDelayPools::freePools()
+{
+    pools.clear();
+}
+
+MessageDelayPool::MessageDelayPool(const SBuf &name, int64_t bucketSpeed, int64_t bucketSize,
+                                   int64_t aggregateSpeed, int64_t aggregateSize, uint16_t initialBucketPercent):
+    access(0),
+    poolName(name),
+    individualRestore(bucketSpeed),
+    individualMaximum(bucketSize),
+    aggregateRestore(aggregateSpeed),
+    aggregateMaximum(aggregateSize),
+    initialBucketLevel(initialBucketPercent),
+    lastUpdate(squid_curtime)
+{
+    theBucket.level() = aggregateMaximum;
+}
+
+MessageDelayPool::~MessageDelayPool()
+{
+    if (access)
+        aclDestroyAccessList(&access);
+}
+
+void
+MessageDelayPool::refillBucket()
+{
+    if (noLimit())
+        return;
+    const int incr = squid_curtime - lastUpdate;
+    if (incr >= 1) {
+        lastUpdate = squid_curtime;
+        DelaySpec spec;
+        spec.restore_bps = aggregateRestore;
+        spec.max_bytes = aggregateMaximum;
+        theBucket.update(spec, incr);
+    }
+}
+
+void
+MessageDelayPool::dump(StoreEntry *entry) const
+{
+    SBuf name("response_delay_pool_access ");
+    name.append(poolName);
+    dump_acl_access(entry, name.c_str(), access);
+    storeAppendPrintf(entry, "response_delay_pool parameters %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %d\n",
+                      individualRestore, individualMaximum, aggregateRestore, aggregateMaximum, initialBucketLevel);
+    storeAppendPrintf(entry, "\n");
+}
+
+MessageBucket::Pointer
+MessageDelayPool::createBucket()
+{
+    return new MessageBucket(individualRestore, initialBucketLevel, individualMaximum, this);
+}
+
+void
+MessageDelayConfig::parseResponseDelayPool()
+{
+    static const SBuf bucketSpeedLimit("individual-restore");
+    static const SBuf maxBucketSize("individual-maximum");
+    static const SBuf aggregateSpeedLimit("aggregate-restore");
+    static const SBuf maxAggregateSize("aggregate-maximum");
+    static const SBuf initialBucketPercent("initial-bucket-level");
+
+    static std::map<SBuf, int64_t> params;
+    params[bucketSpeedLimit] = -1;
+    params[maxBucketSize] = -1;
+    params[aggregateSpeedLimit] = -1;
+    params[maxAggregateSize] = -1;
+    params[initialBucketPercent] = 50;
+
+    const SBuf name(ConfigParser::NextToken());
+    if (name.isEmpty()) {
+        debugs(3, DBG_CRITICAL, "FATAL: response_delay_pool missing required \"name\" parameter.");
+        self_destruct();
+    }
+
+    char *key = nullptr;
+    char *value = nullptr;
+    while (ConfigParser::NextKvPair(key, value)) {
+        if (!value) {
+            debugs(3, DBG_CRITICAL, "FATAL: '" << key << "' option missing value");
+            self_destruct();
+        }
+        auto it = params.find(SBuf(key));
+        if (it == params.end()) {
+            debugs(3, DBG_CRITICAL, "FATAL: response_delay_pool unknown option '" << key << "'");
+            self_destruct();
+        }
+        it->second = (it->first == initialBucketPercent) ? xatos(value) : xatoll(value, 10);
+    }
+
+    const char *fatalMsg = nullptr;
+    if ((params[bucketSpeedLimit] < 0) != (params[maxBucketSize] < 0))
+        fatalMsg = "'individual-restore' and 'individual-maximum'";
+    else if ((params[aggregateSpeedLimit] < 0) != (params[maxAggregateSize] < 0))
+        fatalMsg = "'aggregate-restore' and 'aggregate-maximum'";
+
+    if (fatalMsg) {
+        debugs(3, DBG_CRITICAL, "FATAL: must use " << fatalMsg << " options in conjunction");
+        self_destruct();
+    }
+
+    MessageDelayPool *pool = new MessageDelayPool(name,
+            params[bucketSpeedLimit],
+            params[maxBucketSize],
+            params[aggregateSpeedLimit],
+            params[maxAggregateSize],
+            static_cast<uint16_t>(params[initialBucketPercent])
+                                                 );
+    MessageDelayPools::Instance()->add(pool);
+}
+
+void
+MessageDelayConfig::parseResponseDelayPoolAccess() {
+    const char *token = ConfigParser::NextToken();
+    if (!token) {
+        debugs(3, DBG_CRITICAL, "ERROR: required pool_name option missing");
+        return;
+    }
+    MessageDelayPool::Pointer pool = MessageDelayPools::Instance()->pool(SBuf(token));
+    static ConfigParser parser;
+    if (pool)
+        aclParseAccessLine("response_delay_pool_access", parser, &pool->access);
+}
+
+void
+MessageDelayConfig::freePools()
+{
+    MessageDelayPools::Instance()->freePools();
+}
+
+void
+MessageDelayConfig::dumpResponseDelayPoolParameters(StoreEntry *entry, const char *name)
+{
+    auto &pools = MessageDelayPools::Instance()->pools;
+    for (auto pool: pools)
+        pool->dump(entry);
+}
+
+#endif
+
diff --git a/src/MessageDelayPools.h b/src/MessageDelayPools.h
new file mode 100644 (file)
index 0000000..73b499d
--- /dev/null
@@ -0,0 +1,134 @@
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef MESSAGEDELAYPOOLS_H
+#define MESSAGEDELAYPOOLS_H
+
+#if USE_DELAY_POOLS
+
+#include "acl/Acl.h"
+#include "base/RefCount.h"
+#include "DelayBucket.h"
+#include "DelayPools.h"
+
+class MessageBucket;
+typedef RefCount<MessageBucket> MessageBucketPointer;
+
+/// \ingroup DelayPoolsAPI
+/// Represents one 'response' delay pool, creates individual response
+/// buckets and performs aggregate limiting for them
+class MessageDelayPool : public RefCountable
+{
+public:
+    typedef RefCount<MessageDelayPool> Pointer;
+
+    MessageDelayPool(const SBuf &name, int64_t bucketSpeed, int64_t bucketSize,
+                     int64_t aggregateSpeed, int64_t aggregateSize, uint16_t initialBucketPercent);
+    ~MessageDelayPool();
+    MessageDelayPool(const MessageDelayPool &) = delete;
+    MessageDelayPool &operator=(const MessageDelayPool &) = delete;
+
+    /// Increases the aggregate bucket level with the aggregateRestore speed.
+    void refillBucket();
+    /// decreases the aggregate level
+    void bytesIn(int qty) { if (!noLimit()) theBucket.bytesIn(qty); }
+    /// current aggregate level
+    int level() { return theBucket.level(); }
+    /// creates an individual response bucket
+    MessageBucketPointer createBucket();
+    /// whether the aggregate bucket has no limit
+    bool noLimit () const { return aggregateRestore < 0; }
+
+    void dump (StoreEntry * entry) const;
+
+    acl_access *access;
+    /// the response delay pool name
+    SBuf poolName;
+    /// the speed limit of an individual bucket (bytes/s)
+    int64_t individualRestore;
+    /// the maximum size of an individual bucket
+    int64_t individualMaximum;
+    /// the speed limit of the aggregate bucket (bytes/s)
+    int64_t aggregateRestore;
+    /// the maximum size of the aggregate bucket
+    int64_t aggregateMaximum;
+    /// the initial bucket size as a percentage of individualMaximum
+    uint16_t initialBucketLevel;
+    /// the aggregate bucket
+    DelayBucket theBucket;
+
+private:
+    /// Time the aggregate bucket level was last refilled.
+    time_t lastUpdate;
+};
+
+/// \ingroup DelayPoolsAPI
+/// represents all configured 'response' delay pools
+class MessageDelayPools
+{
+public:
+    MessageDelayPools(const MessageDelayPools &) = delete;
+    MessageDelayPools &operator=(const MessageDelayPools &) = delete;
+
+    static MessageDelayPools *Instance();
+
+    /// returns a MessageDelayPool with a given name or null otherwise
+    MessageDelayPool::Pointer pool(const SBuf &name);
+    /// appends a single MessageDelayPool, created during configuration
+    void add(MessageDelayPool *pool);
+    /// memory cleanup, performed during reconfiguration
+    void freePools();
+
+    std::vector<MessageDelayPool::Pointer> pools;
+
+private:
+    MessageDelayPools() {}
+    ~MessageDelayPools();
+    void Stats() { } // TODO
+};
+
+/// represents configuration for response delay pools
+class MessageDelayConfig
+{
+public:
+    void parseResponseDelayPool();
+    void dumpResponseDelayPoolParameters(StoreEntry *e, const char *name);
+    void parseResponseDelayPoolAccess();
+    void freePools();
+};
+
+#define free_response_delay_pool_access(X)
+#define dump_response_delay_pool_access(X, Y, Z)
+
+inline void
+free_response_delay_pool_parameters(MessageDelayConfig * cfg)
+{
+    cfg->freePools();
+}
+
+inline void
+dump_response_delay_pool_parameters(StoreEntry *entry, const char *name, MessageDelayConfig &cfg)
+{
+    cfg.dumpResponseDelayPoolParameters(entry, name);
+}
+
+inline void
+parse_response_delay_pool_parameters(MessageDelayConfig * cfg)
+{
+    cfg->parseResponseDelayPool();
+}
+
+inline void
+parse_response_delay_pool_access(MessageDelayConfig * cfg)
+{
+    cfg->parseResponseDelayPoolAccess();
+}
+
+#endif
+#endif
+
index 8770018f4d9923c8e6e5c8a15a8a1c78f1cd345c..f42acdaeb87a71825325427c795cf27e499e6d7c 100644 (file)
 #include "acl/forward.h"
 #include "base/RefCount.h"
 #include "base/YesNoNone.h"
+#if USE_DELAY_POOLS
 #include "ClientDelayConfig.h"
 #include "DelayConfig.h"
+#endif
 #include "helper/ChildConfig.h"
 #include "HttpHeaderTools.h"
 #include "ip/Address.h"
+#if USE_DELAY_POOLS
+#include "MessageDelayPools.h"
+#endif
 #include "Notes.h"
 #include "security/forward.h"
 #include "SquidTime.h"
@@ -433,6 +438,7 @@ public:
 
     DelayConfig Delay;
     ClientDelayConfig ClientDelay;
+    MessageDelayConfig MessageDelay;
 #endif
 
     struct {
index a37a29ba97e7179ad4309831acef00a6d9686eda..ae3dccd5a35b846f970c76ef99630bee41efc5d7 100644 (file)
@@ -42,6 +42,7 @@
 #include "log/Config.h"
 #include "log/CustomLog.h"
 #include "MemBuf.h"
+#include "MessageDelayPools.h"
 #include "mgr/ActionPasswordList.h"
 #include "mgr/Registration.h"
 #include "neighbors.h"
@@ -1680,7 +1681,7 @@ parse_delay_pool_access(DelayConfig * cfg)
 static void
 free_client_delay_pool_count(ClientDelayConfig * cfg)
 {
-    cfg->freePoolCount();
+    cfg->freePools();
 }
 
 static void
index d11e7e2cd74baeb31b9280ead3b7f76d2804f3f5..fb9ea2f569a5996c19140f4993dd96ed0bc2963c 100644 (file)
@@ -33,6 +33,8 @@ delay_pool_rates      delay_class
 client_delay_pool_access       acl
 client_delay_pool_count
 client_delay_pool_rates
+response_delay_pool_access     acl
+response_delay_pool_parameters
 denyinfo               acl
 eol
 externalAclHelper      auth_param
index d9cc7f8c5de02031e27e6ad3d91275936e4375d7..d7a520aab8560678d50d8535c25555519d558dee 100644 (file)
@@ -7202,6 +7202,64 @@ DOC_START
        See also client_delay_parameters and client_delay_pools.
 DOC_END
 
+NAME: response_delay_pool
+TYPE: response_delay_pool_parameters
+DEFAULT: none
+IFDEF: USE_DELAY_POOLS
+LOC: Config.MessageDelay
+DOC_START
+       This option configures client response bandwidth limits using the
+       following format:
+
+       response_delay_pool name [option=value] ...
+
+       name    the response delay pool name
+
+       available options:
+
+               individual-restore      The speed limit of an individual
+                                       bucket (bytes/s). To be used in conjunction
+                                       with 'individual-maximum'.
+
+               individual-maximum      The maximum number of bytes which can
+                                       be placed into the individual bucket. To be used
+                                       in conjunction with 'individual-restore'.
+
+               aggregate-restore       The speed limit for the aggregate
+                                       bucket (bytes/s). To be used in conjunction with
+                                       'aggregate-maximum'.
+
+               aggregate-maximum       The maximum number of bytes which can
+                                       be placed into the aggregate bucket. To be used
+                                       in conjunction with 'aggregate-restore'.
+
+               initial-bucket-level    The initial bucket size as a percentage
+                                       of individual-maximum.
+
+       Individual and/or aggregate bucket options may be omitted,
+       implying no individual and/or aggregate speed limitation.
+       See also response_delay_pool_access and delay_parameters for
+       terminology details.
+DOC_END
+
+NAME: response_delay_pool_access
+TYPE: response_delay_pool_access
+DEFAULT: none
+DEFAULT_DOC: Deny use of the pool, unless allow rules exist in squid.conf for the pool.
+IFDEF: USE_DELAY_POOLS
+LOC: Config.MessageDelay
+DOC_START
+       Determines whether a specific named response delay pool is used
+       for the transaction. The syntax for this directive is:
+
+       response_delay_pool_access pool_name allow|deny acl_name
+
+       All response_delay_pool_access options are checked in the order
+       they appear in this configuration file. The first rule with a
+       matching ACL wins. If (and only if) an "allow" rule won, Squid
+       assigns the response to the corresponding named delay pool.
+DOC_END
+
 COMMENT_START
  WCCPv1 AND WCCPv2 CONFIGURATION OPTIONS
  -----------------------------------------------------------------------------
index f528321d3bf56b19f6dc8b1d4935d6b71f40efeb..bde5da619ec216c4d9aefca7d4bbe8842ffe357e 100644 (file)
@@ -53,40 +53,31 @@ static int cleanup_removed;
 #endif
 
 ClientInfo::ClientInfo(const Ip::Address &ip) :
+#if USE_DELAY_POOLS
+    BandwidthBucket(0, 0, 0),
+#endif
     addr(ip),
     n_established(0),
     last_seen(0)
 #if USE_DELAY_POOLS
-    , writeSpeedLimit(0),
-    prevTime(0),
-    bucketSize(0),
-    bucketSizeLimit(0),
-    writeLimitingActive(false),
+    , writeLimitingActive(false),
     firstTimeConnection(true),
     quotaQueue(nullptr),
     rationedQuota(0),
     rationedCount(0),
-    selectWaiting(false),
     eventWaiting(false)
 #endif
 {
     debugs(77, 9, "ClientInfo constructed, this=" << static_cast<void*>(this));
-
-#if USE_DELAY_POOLS
-    getCurrentTime();
-    /* put current time to have something sensible here */
-    prevTime = current_dtime;
-#endif
-
     char *buf = static_cast<char*>(xmalloc(MAX_IPSTRLEN)); // becomes hash.key
-    hash.key = addr.toStr(buf,MAX_IPSTRLEN);
+    key = addr.toStr(buf,MAX_IPSTRLEN);
 }
 
 static ClientInfo *
 clientdbAdd(const Ip::Address &addr)
 {
     ClientInfo *c = new ClientInfo(addr);
-    hash_join(client_table, &c->hash);
+    hash_join(client_table, static_cast<hash_link*>(c));
     ++statCounter.client_http.clients;
 
     if ((statCounter.client_http.clients > max_clients) && !cleanup_running && cleanup_scheduled < 2) {
@@ -277,7 +268,6 @@ void
 clientdbDump(StoreEntry * sentry)
 {
     const char *name;
-    ClientInfo *c;
     int icp_total = 0;
     int icp_hits = 0;
     int http_total = 0;
@@ -285,8 +275,9 @@ clientdbDump(StoreEntry * sentry)
     storeAppendPrintf(sentry, "Cache Clients:\n");
     hash_first(client_table);
 
-    while ((c = (ClientInfo *) hash_next(client_table))) {
-        storeAppendPrintf(sentry, "Address: %s\n", hashKeyStr(&c->hash));
+    while (hash_link *hash = hash_next(client_table)) {
+        const ClientInfo *c = reinterpret_cast<const ClientInfo *>(hash);
+        storeAppendPrintf(sentry, "Address: %s\n", hashKeyStr(hash));
         if ( (name = fqdncache_gethostbyaddr(c->addr, 0)) ) {
             storeAppendPrintf(sentry, "Name:    %s\n", name);
         }
@@ -344,7 +335,7 @@ clientdbFreeItem(void *data)
 
 ClientInfo::~ClientInfo()
 {
-    safe_free(hash.key);
+    safe_free(key);
 
 #if USE_DELAY_POOLS
     if (CommQuotaQueue *q = quotaQueue) {
@@ -399,7 +390,7 @@ clientdbGC(void *)
         if (age < 60)
             continue;
 
-        hash_remove_link(client_table, &c->hash);
+        hash_remove_link(client_table, static_cast<hash_link*>(c));
 
         clientdbFreeItem(c);
 
@@ -438,30 +429,22 @@ clientdbStartGC(void)
 Ip::Address *
 client_entry(Ip::Address *current)
 {
-    ClientInfo *c = NULL;
     char key[MAX_IPSTRLEN];
+    hash_first(client_table);
 
     if (current) {
         current->toStr(key,MAX_IPSTRLEN);
-        hash_first(client_table);
-        while ((c = (ClientInfo *) hash_next(client_table))) {
-            if (!strcmp(key, hashKeyStr(&c->hash)))
+        while (hash_link *hash = hash_next(client_table)) {
+            if (!strcmp(key, hashKeyStr(hash)))
                 break;
         }
-
-        c = (ClientInfo *) hash_next(client_table);
-    } else {
-        hash_first(client_table);
-        c = (ClientInfo *) hash_next(client_table);
     }
 
-    hash_last(client_table);
+    ClientInfo *c = reinterpret_cast<ClientInfo *>(hash_next(client_table));
 
-    if (c)
-        return (&c->addr);
-    else
-        return (NULL);
+    hash_last(client_table);
 
+    return c ? &c->addr : nullptr;
 }
 
 variable_list *
index 4a52cf4870ec4f4523ffff59050d55046642166e..19cff95024fefe1d294377b121745e76855f4d7c 100644 (file)
 #endif
 #if USE_DELAY_POOLS
 #include "ClientInfo.h"
+#include "MessageDelayPools.h"
 #endif
 #if USE_OPENSSL
 #include "ssl/bio.h"
@@ -2488,7 +2489,7 @@ ConnStateData::start()
     if (Config.onoff.client_db) {
         /* it was said several times that client write limiter does not work if client_db is disabled */
 
-        ClientDelayPools& pools(Config.ClientDelay.pools);
+        auto &pools = ClientDelayPools::Instance()->pools;
         ACLFilledChecklist ch(NULL, NULL, NULL);
 
         // TODO: we check early to limit error response bandwith but we
@@ -2500,8 +2501,8 @@ ConnStateData::start()
         for (unsigned int pool = 0; pool < pools.size(); ++pool) {
 
             /* pools require explicit 'allow' to assign a client into them */
-            if (pools[pool].access) {
-                ch.changeAcl(pools[pool].access);
+            if (pools[pool]->access) {
+                ch.changeAcl(pools[pool]->access);
                 allow_t answer = ch.fastCheck();
                 if (answer == ACCESS_ALLOWED) {
 
@@ -2515,8 +2516,8 @@ ConnStateData::start()
 
                     /* setup write limiter for this request */
                     const double burst = floor(0.5 +
-                                               (pools[pool].highwatermark * Config.ClientDelay.initial)/100.0);
-                    cli->setWriteLimiter(pools[pool].rate, burst, pools[pool].highwatermark);
+                                               (pools[pool]->highwatermark * Config.ClientDelay.initial)/100.0);
+                    cli->setWriteLimiter(pools[pool]->rate, burst, pools[pool]->highwatermark);
                     break;
                 } else {
                     debugs(83, 4, HERE << "Delay pool " << pool << " skipped because ACL " << answer);
index 8b0c1dd03507c369ebdb1fe09eb18c03f51bb6d0..3876a87981609c87e4140940807d4502aa6edca3 100644 (file)
@@ -27,6 +27,9 @@
 #include "security/Handshake.h"
 #include "ssl/support.h"
 #endif
+#if USE_DELAY_POOLS
+#include "MessageBucket.h"
+#endif
 
 class ClientHttpRequest;
 class HttpHdrRangeSpec;
index 83f781c6a6d07a38b68777a84f63cad6b1be6abf..10fa8f55c0c170bf8718954e80b62e61335afbfd 100644 (file)
@@ -901,12 +901,9 @@ _comm_close(int fd, char const *file, int line)
     }
 
 #if USE_DELAY_POOLS
-    if (ClientInfo *clientInfo = F->clientInfo) {
-        if (clientInfo->selectWaiting) {
-            clientInfo->selectWaiting = false;
-            // kick queue or it will get stuck as commWriteHandle is not called
-            clientInfo->kickQuotaQueue();
-        }
+    if (BandwidthBucket *bucket = BandwidthBucket::SelectBucket(F)) {
+        if (bucket->selectWaiting)
+            bucket->onFdClosed();
     }
 #endif
 
@@ -1334,7 +1331,7 @@ ClientInfo::kickQuotaQueue()
 {
     if (!eventWaiting && !selectWaiting && hasQueue()) {
         // wait at least a second if the bucket is empty
-        const double delay = (bucketSize < 1.0) ? 1.0 : 0.0;
+        const double delay = (bucketLevel < 1.0) ? 1.0 : 0.0;
         eventAdd("commHandleWriteHelper", &commHandleWriteHelper,
                  quotaQueue, delay, 0, true);
         eventWaiting = true;
@@ -1343,7 +1340,7 @@ ClientInfo::kickQuotaQueue()
 
 /// calculates how much to write for a single dequeued client
 int
-ClientInfo::quotaForDequed()
+ClientInfo::quota()
 {
     /* If we have multiple clients and give full bucketSize to each client then
      * clt1 may often get a lot more because clt1->clt2 time distance in the
@@ -1359,7 +1356,7 @@ ClientInfo::quotaForDequed()
 
         // Rounding errors do not accumulate here, but we round down to avoid
         // negative bucket sizes after write with rationedCount=1.
-        rationedQuota = static_cast<int>(floor(bucketSize/rationedCount));
+        rationedQuota = static_cast<int>(floor(bucketLevel/rationedCount));
         debugs(77,5, HERE << "new rationedQuota: " << rationedQuota <<
                '*' << rationedCount);
     }
@@ -1373,48 +1370,50 @@ ClientInfo::quotaForDequed()
     return rationedQuota;
 }
 
-///< adds bytes to the quota bucket based on the rate and passed time
+bool
+ClientInfo::applyQuota(int &nleft, Comm::IoCallback *state)
+{
+    assert(hasQueue());
+    assert(quotaPeekFd() == state->conn->fd);
+    quotaDequeue(); // we will write or requeue below
+    if (nleft > 0 && !BandwidthBucket::applyQuota(nleft, state)) {
+        state->quotaQueueReserv = quotaEnqueue(state->conn->fd);
+        kickQuotaQueue();
+        return false;
+    }
+    return true;
+}
+
 void
-ClientInfo::refillBucket()
+ClientInfo::scheduleWrite(Comm::IoCallback *state)
 {
-    // all these times are in seconds, with double precision
-    const double currTime = current_dtime;
-    const double timePassed = currTime - prevTime;
-
-    // Calculate allowance for the time passed. Use double to avoid
-    // accumulating rounding errors for small intervals. For example, always
-    // adding 1 byte instead of 1.4 results in 29% bandwidth allocation error.
-    const double gain = timePassed * writeSpeedLimit;
-
-    debugs(77,5, HERE << currTime << " clt" << (const char*)hash.key << ": " <<
-           bucketSize << " + (" << timePassed << " * " << writeSpeedLimit <<
-           " = " << gain << ')');
-
-    // to further combat error accumulation during micro updates,
-    // quit before updating time if we cannot add at least one byte
-    if (gain < 1.0)
-        return;
-
-    prevTime = currTime;
-
-    // for "first" connections, drain initial fat before refilling but keep
-    // updating prevTime to avoid bursts after the fat is gone
-    if (bucketSize > bucketSizeLimit) {
-        debugs(77,4, HERE << "not refilling while draining initial fat");
-        return;
+    if (writeLimitingActive) {
+        state->quotaQueueReserv = quotaEnqueue(state->conn->fd);
+        kickQuotaQueue();
     }
+}
 
-    bucketSize += gain;
+void
+ClientInfo::onFdClosed()
+{
+    BandwidthBucket::onFdClosed();
+    // kick queue or it will get stuck as commWriteHandle is not called
+    kickQuotaQueue();
+}
 
-    // obey quota limits
-    if (bucketSize > bucketSizeLimit)
-        bucketSize = bucketSizeLimit;
+void
+ClientInfo::reduceBucket(const int len)
+{
+    if (len > 0)
+        BandwidthBucket::reduceBucket(len);
+    // even if we wrote nothing, we were served; give others a chance
+    kickQuotaQueue();
 }
 
 void
 ClientInfo::setWriteLimiter(const int aWriteSpeedLimit, const double anInitialBurst, const double aHighWatermark)
 {
-    debugs(77,5, HERE << "Write limits for " << (const char*)hash.key <<
+    debugs(77,5, "Write limits for " << (const char*)key <<
            " speed=" << aWriteSpeedLimit << " burst=" << anInitialBurst <<
            " highwatermark=" << aHighWatermark);
 
@@ -1431,7 +1430,7 @@ ClientInfo::setWriteLimiter(const int aWriteSpeedLimit, const double anInitialBu
         assert(!quotaQueue);
         quotaQueue = new CommQuotaQueue(this);
 
-        bucketSize = anInitialBurst;
+        bucketLevel = anInitialBurst;
         prevTime = current_dtime;
     }
 }
@@ -1451,7 +1450,7 @@ CommQuotaQueue::~CommQuotaQueue()
 unsigned int
 CommQuotaQueue::enqueue(int fd)
 {
-    debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key <<
+    debugs(77,5, "clt" << (const char*)clientInfo->key <<
            ": FD " << fd << " with qqid" << (ins+1) << ' ' << fds.size());
     fds.push_back(fd);
     return ++ins;
@@ -1462,13 +1461,13 @@ void
 CommQuotaQueue::dequeue()
 {
     assert(!fds.empty());
-    debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key <<
+    debugs(77,5, "clt" << (const char*)clientInfo->key <<
            ": FD " << fds.front() << " with qqid" << (outs+1) << ' ' <<
            fds.size());
     fds.pop_front();
     ++outs;
 }
-#endif
+#endif /* USE_DELAY_POOLS */
 
 /*
  * hm, this might be too general-purpose for all the places we'd
@@ -1576,7 +1575,16 @@ checkTimeouts(void)
             debugs(5, 5, "checkTimeouts: FD " << fd << " auto write timeout");
             Comm::SetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0);
             COMMIO_FD_WRITECB(fd)->finish(Comm::COMM_ERROR, ETIMEDOUT);
-        } else if (AlreadyTimedOut(F))
+#if USE_DELAY_POOLS
+        } else if (F->writeQuotaHandler != nullptr && COMMIO_FD_WRITECB(fd)->conn != nullptr) {
+            if (!F->writeQuotaHandler->selectWaiting && F->writeQuotaHandler->quota() && !F->closing()) {
+                F->writeQuotaHandler->selectWaiting = true;
+                Comm::SetSelect(fd, COMM_SELECT_WRITE, Comm::HandleWrite, COMMIO_FD_WRITECB(fd), 0);
+            }
+            continue;
+#endif
+        }
+        else if (AlreadyTimedOut(F))
             continue;
 
         debugs(5, 5, "checkTimeouts: FD " << fd << " Expired");
index 3ba2bd3ad68f3e22b534bf34c758f1c6a8529fc6..92666fc1e53f14beac20532ae528da585660635f 100644 (file)
@@ -69,13 +69,9 @@ void
 Comm::IoCallback::selectOrQueueWrite()
 {
 #if USE_DELAY_POOLS
-    // stand in line if there is one
-    if (ClientInfo *clientInfo = fd_table[conn->fd].clientInfo) {
-        if (clientInfo->writeLimitingActive) {
-            quotaQueueReserv = clientInfo->quotaEnqueue(conn->fd);
-            clientInfo->kickQuotaQueue();
-            return;
-        }
+    if (BandwidthBucket *bucket = BandwidthBucket::SelectBucket(&fd_table[conn->fd])) {
+        bucket->scheduleWrite(this);
+        return;
     }
 #endif
 
index 5d62bf2f8c36dbeecb19d4ecac50eb57b88753a2..37f3e06f82669d914f85b206c4727118b7a6a8c9 100644 (file)
@@ -24,9 +24,6 @@ namespace Comm
 /// Initialize the module on Squid startup
 void SelectLoopInit(void);
 
-/// Mark an FD to be watched for its IO status.
-void SetSelect(int, unsigned int, PF *, void *, time_t);
-
 /// reset/undo/unregister the watch for an FD which was set by Comm::SetSelect()
 void ResetSelect(int);
 
index 010ef8623798d326ad76295321c9473a63eebd29..b8cc11e60389dc6de0d27d49d13a9db477bb4aad 100644 (file)
@@ -7,8 +7,10 @@
  */
 
 #include "squid.h"
+#include "cbdata.h"
 #include "comm/Connection.h"
 #include "comm/IoCallback.h"
+#include "comm/Loops.h"
 #include "comm/Write.h"
 #include "fd.h"
 #include "fde.h"
@@ -59,7 +61,8 @@ Comm::HandleWrite(int fd, void *data)
     int len = 0;
     int nleft;
 
-    assert(state->conn != NULL && state->conn->fd == fd);
+    assert(state->conn != NULL);
+    assert(state->conn->fd == fd);
 
     PROF_start(commHandleWrite);
     debugs(5, 5, HERE << state->conn << ": off " <<
@@ -68,35 +71,13 @@ Comm::HandleWrite(int fd, void *data)
     nleft = state->size - state->offset;
 
 #if USE_DELAY_POOLS
-    ClientInfo * clientInfo=fd_table[fd].clientInfo;
-
-    if (clientInfo && !clientInfo->writeLimitingActive)
-        clientInfo = NULL; // we only care about quota limits here
-
-    if (clientInfo) {
-        assert(clientInfo->selectWaiting);
-        clientInfo->selectWaiting = false;
-
-        assert(clientInfo->hasQueue());
-        assert(clientInfo->quotaPeekFd() == fd);
-        clientInfo->quotaDequeue(); // we will write or requeue below
-
-        if (nleft > 0) {
-            const int quota = clientInfo->quotaForDequed();
-            if (!quota) {  // if no write quota left, queue this fd
-                state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
-                clientInfo->kickQuotaQueue();
-                PROF_stop(commHandleWrite);
-                return;
-            }
-
-            const int nleft_corrected = min(nleft, quota);
-            if (nleft != nleft_corrected) {
-                debugs(5, 5, HERE << state->conn << " writes only " <<
-                       nleft_corrected << " out of " << nleft);
-                nleft = nleft_corrected;
-            }
-
+    BandwidthBucket *bucket = BandwidthBucket::SelectBucket(&fd_table[fd]);
+    if (bucket) {
+        assert(bucket->selectWaiting);
+        bucket->selectWaiting = false;
+        if (nleft > 0 && !bucket->applyQuota(nleft, state)) {
+            PROF_stop(commHandleWrite);
+            return;
         }
     }
 #endif /* USE_DELAY_POOLS */
@@ -108,18 +89,9 @@ Comm::HandleWrite(int fd, void *data)
     debugs(5, 5, HERE << "write() returns " << len);
 
 #if USE_DELAY_POOLS
-    if (clientInfo) {
-        if (len > 0) {
-            /* we wrote data - drain them from bucket */
-            clientInfo->bucketSize -= len;
-            if (clientInfo->bucketSize < 0.0) {
-                debugs(5, DBG_IMPORTANT, HERE << "drained too much"); // should not happen
-                clientInfo->bucketSize = 0;
-            }
-        }
-
-        // even if we wrote nothing, we were served; give others a chance
-        clientInfo->kickQuotaQueue();
+    if (bucket) {
+        /* drain the bytes we wrote (if any) from the bucket */
+        bucket->reduceBucket(len);
     }
 #endif /* USE_DELAY_POOLS */
 
index aa149d8d426ec1a630b6464a11f07e81a65c4913..ca3d9a02ff80941e7a3f22ea1164d1023697609f 100644 (file)
@@ -34,9 +34,6 @@ void Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &
 /// Cancel the write pending on FD. No action if none pending.
 void WriteCancel(const Comm::ConnectionPointer &conn, const char *reason);
 
-// callback handler to process an FD which is available for writing.
-extern PF HandleWrite;
-
 } // namespace Comm
 
 #endif /* _SQUID_COMM_IOWRITE_H */
index cb5553b49cf9b0474724a2415a81c9489be0de46..a0211860455b1e733d01894e158502c64e9d90d3 100644 (file)
 
 #include <vector>
 
+/// legacy CBDATA callback functions ABI definition for read or write I/O events
+/// \deprecated use CommCalls API instead where possible
+typedef void PF(int, void *);
+
 /// Abstraction layer for TCP, UDP, TLS, UDS and filedescriptor sockets.
 namespace Comm
 {
@@ -26,11 +30,13 @@ typedef std::vector<Comm::ConnectionPointer> ConnectionList;
 
 bool IsConnOpen(const Comm::ConnectionPointer &conn);
 
-}; // namespace Comm
+// callback handler to process an FD which is available for writing.
+PF HandleWrite;
 
-/// legacy CBDATA callback functions ABI definition for read or write I/O events
-/// \deprecated use CommCalls API instead where possible
-typedef void PF(int, void *);
+/// Mark an FD to be watched for its IO status.
+void SetSelect(int, unsigned int, PF *, void *, time_t);
+
+}; // namespace Comm
 
 #endif /* _SQUID_COMM_FORWARD_H */
 
index 89e978f36275b0c2197def11d187ee63b19f4623..f0040844d3cac2b8c36fb3ea3c786a33cc5472e2 100644 (file)
--- a/src/fde.h
+++ b/src/fde.h
@@ -17,6 +17,7 @@
 #include "typedefs.h" //DRCB, DWCB
 
 #if USE_DELAY_POOLS
+#include "MessageBucket.h"
 class ClientInfo;
 #endif
 class dwrite_q;
@@ -117,6 +118,7 @@ public:
 #if USE_DELAY_POOLS
     /// pointer to client info used in client write limiter or nullptr if not present
     ClientInfo * clientInfo = nullptr;
+    MessageBucket::Pointer writeQuotaHandler; ///< response write limiter, if configured
 #endif
     unsigned epoll_state = 0;
 
index 859eb51fb690242968e782250d1042128011514f..3c13c868fa167d0ac87735fa3d1ac14cfa162d5e 100644 (file)
 #include "HttpHeaderTools.h"
 #include "Store.h"
 #include "TimeOrTag.h"
+#if USE_DELAY_POOLS
+#include "acl/FilledChecklist.h"
+#include "ClientInfo.h"
+#include "fde.h"
+#include "MessageDelayPools.h"
+#endif
 
 Http::Stream::Stream(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) :
     clientConnection(aConn),
@@ -283,6 +289,24 @@ Http::Stream::sendStartOfMessage(HttpReply *rep, StoreIOBuffer bodyData)
             mb->append(bodyData.data, length);
         }
     }
+#if USE_DELAY_POOLS
+    for (const auto &pool: MessageDelayPools::Instance()->pools) {
+        if (pool->access) {
+            std::unique_ptr<ACLFilledChecklist> chl(clientAclChecklistCreate(pool->access, http));
+            chl->reply = rep;
+            HTTPMSGLOCK(chl->reply);
+            const allow_t answer = chl->fastCheck();
+            if (answer == ACCESS_ALLOWED) {
+                writeQuotaHandler = pool->createBucket();
+                fd_table[clientConnection->fd].writeQuotaHandler = writeQuotaHandler;
+                break;
+            } else {
+                debugs(83, 4, "Response delay pool " << pool->poolName <<
+                       " skipped because ACL " << answer);
+            }
+        }
+    }
+#endif
 
     getConn()->write(mb);
     delete mb;
index 1d933e427ee0a1a1bd04719cf488b8064eb18597..585ed04e5dfd89905747fc2e6a707803dfbcac74 100644 (file)
@@ -12,6 +12,9 @@
 #include "http/forward.h"
 #include "mem/forward.h"
 #include "StoreIOBuffer.h"
+#if USE_DELAY_POOLS
+#include "MessageBucket.h"
+#endif
 
 class clientStreamNode;
 class ClientHttpRequest;
@@ -161,6 +164,9 @@ private:
 
     bool mayUseConnection_; /* This request may use the connection. Don't read anymore requests for now */
     bool connRegistered_;
+#if USE_DELAY_POOLS
+    MessageBucket::Pointer writeQuotaHandler; ///< response write limiter, if configured
+#endif
 };
 
 } // namespace Http