From: Eduard Bagdasaryan Date: Sun, 19 Feb 2017 17:13:27 +0000 (+1300) Subject: Add response delay pools feature for Squid-to-client speed limiting. X-Git-Tag: M-staged-PR71~259 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b27668ec6cfce0b56ca6d8ef4b887207f4ecc6b7;p=thirdparty%2Fsquid.git Add response delay pools feature for Squid-to-client speed limiting. The feature restricts Squid-to-client bandwidth only. It applies to both cache hits and misses. * Rationale * This may be useful for specific response(s) bandwidth limiting. There are situations when doing this is hardly possible (or impossible) by means of netfilter/iptables operating with TCP/IP packets and IP addresses information for filtering. In other words, sometimes it is problematic to 'extract' a single response from TCP/IP data flow at system level. For example, a single Squid-to-client TCP connection can transmit multiple responses (persistent connections, pipelining or HTTP/2 connection multiplexing) or be encrypted (HTTPS proxy mode). * Description * When Squid starts delivering the final HTTP response to a client, Squid checks response_delay_pool_access rules (supporting fast ACLs only), in the order they were declared. The first rule with a matching ACL wins. If (and only if) an "allow" rule won, Squid assigns the response to the corresponding named delay pool. If a response is assigned to a delay pool, the response becomes subject to the configured bucket and aggregate bandwidth limits of that pool, similar to the current "class 2" server-side delay pools, but with a brand new, dedicated "individual" filled bucket assigned to the matched response. The new feature serves the same purpose as the existing client-side pools: both features limit Squid-to-client bandwidth. Their common interface was placed into a new base BandwidthBucket class. The difference is that client-side pools do not aggregate clients and always use one bucket per client IP. 
It is possible that a response becomes subject to both of these pools. In such situations, only the matched response delay pool will be used for Squid-to-client speed limiting. * Limitations * The accurate SMP support (with the aggregate bucket shared among workers) is outside this patch scope. In SMP configurations, Squid should automatically divide the aggregate_speed_limit and max_aggregate_size values among the configured number of Squid workers. * Also: * Fixed ClientDelayConfig which did not perform cleanup on destruction, causing memory problems detected by Valgrind. It was not possible to fix this with minimal changes because of linker problems with SquidConfig while checking with test-builds.sh. So I had to refactor the ClientDelayConfig module, separating configuration code (old ClientDelayConfig class) from configured data (a new ClientDelayPools class) and minimizing dependencies with SquidConfig. --- diff --git a/src/BandwidthBucket.cc b/src/BandwidthBucket.cc new file mode 100644 index 0000000000..d919a44415 --- /dev/null +++ b/src/BandwidthBucket.cc @@ -0,0 +1,107 @@ +/* + * Copyright (C) 1996-2017 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. 
+ */ + +#include "squid.h" + +#if USE_DELAY_POOLS + +#include "BandwidthBucket.h" +#include "ClientInfo.h" +#include "comm/Connection.h" +#include "Debug.h" +#include "fde.h" + +BandwidthBucket::BandwidthBucket(const int speed, const int initialLevelPercent, const double sizeLimit) : + bucketLevel( sizeLimit * (initialLevelPercent / 100.0)), + selectWaiting(false), + writeSpeedLimit(speed), + bucketSizeLimit(sizeLimit) +{ + getCurrentTime(); + /* put current time to have something sensible here */ + prevTime = current_dtime; +} + +void +BandwidthBucket::refillBucket() +{ + if (noLimit()) + return; + // all these times are in seconds, with double precision + const double currTime = current_dtime; + const double timePassed = currTime - prevTime; + + // Calculate allowance for the time passed. Use double to avoid + // accumulating rounding errors for small intervals. For example, always + // adding 1 byte instead of 1.4 results in 29% bandwidth allocation error. + const double gain = timePassed * writeSpeedLimit; + + // to further combat error accumulation during micro updates, + // quit before updating time if we cannot add at least one byte + if (gain < 1.0) + return; + + prevTime = currTime; + + // for "first" connections, drain initial fat before refilling but keep + // updating prevTime to avoid bursts after the fat is gone + if (bucketLevel > bucketSizeLimit) { + debugs(77, 4, "not refilling while draining initial fat"); + return; + } + + bucketLevel += gain; + + // obey quota limits + if (bucketLevel > bucketSizeLimit) + bucketLevel = bucketSizeLimit; +} + +bool +BandwidthBucket::applyQuota(int &nleft, Comm::IoCallback *state) +{ + const int q = quota(); + if (!q) + return false; + else if (q < 0) + return true; + const int nleft_corrected = min(nleft, q); + if (nleft != nleft_corrected) { + debugs(77, 5, state->conn << " writes only " << + nleft_corrected << " out of " << nleft); + nleft = nleft_corrected; + } + return true; +} + +void 
+BandwidthBucket::reduceBucket(const int len) +{ + if (len <= 0 || noLimit()) + return; + bucketLevel -= len; + if (bucketLevel < 0.0) { + debugs(77, DBG_IMPORTANT, "drained too much"); // should not happen + bucketLevel = 0; + } +} + +BandwidthBucket * +BandwidthBucket::SelectBucket(fde *f) +{ + BandwidthBucket *bucket = f->writeQuotaHandler.getRaw(); + if (!bucket) { + ClientInfo *clientInfo = f->clientInfo; + if (clientInfo && clientInfo->writeLimitingActive) + bucket = clientInfo; + } + return bucket; +} + +#endif /* USE_DELAY_POOLS */ + diff --git a/src/BandwidthBucket.h b/src/BandwidthBucket.h new file mode 100644 index 0000000000..c6ba7f3042 --- /dev/null +++ b/src/BandwidthBucket.h @@ -0,0 +1,60 @@ +/* + * Copyright (C) 1996-2017 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +#ifndef BANDWIDTHBUCKET_H +#define BANDWIDTHBUCKET_H + +#if USE_DELAY_POOLS + +#include "comm/IoCallback.h" + +class fde; + +/// Base class for Squid-to-client bandwidth limiting +class BandwidthBucket +{ +public: + BandwidthBucket(const int speed, const int initialLevelPercent, const double sizeLimit); + virtual ~BandwidthBucket() {} + + static BandwidthBucket *SelectBucket(fde *f); + + /// \returns the number of bytes this bucket allows to write, + /// also considering aggregates, if any. Negative quota means + /// no limitations by this bucket. + virtual int quota() = 0; + /// Adjusts nleft to not exceed the current bucket quota value, + /// if needed. + virtual bool applyQuota(int &nleft, Comm::IoCallback *state); + /// Will plan another write call. + virtual void scheduleWrite(Comm::IoCallback *state) = 0; + /// Performs cleanup when the related file descriptor becomes closed. + virtual void onFdClosed() { selectWaiting = false; } + /// Decreases the bucket level. 
+ virtual void reduceBucket(const int len); + /// Whether this bucket will not do bandwidth limiting. + bool noLimit() const { return writeSpeedLimit < 0; } + +protected: + /// Increases the bucket level with the writeSpeedLimit speed. + void refillBucket(); + +public: + double bucketLevel; ///< how much can be written now + bool selectWaiting; ///< is between commSetSelect and commHandleWrite + +protected: + double prevTime; ///< previous time when we checked + double writeSpeedLimit; ///< Write speed limit in bytes per second. + double bucketSizeLimit; ///< maximum bucket size +}; + +#endif /* USE_DELAY_POOLS */ + +#endif + diff --git a/src/ClientDelayConfig.cc b/src/ClientDelayConfig.cc index 1f20dfddd5..07e0a067af 100644 --- a/src/ClientDelayConfig.cc +++ b/src/ClientDelayConfig.cc @@ -14,6 +14,12 @@ #include "Parsing.h" #include "Store.h" +ClientDelayPool::~ClientDelayPool() +{ + if (access) + aclDestroyAccessList(&access); +} + void ClientDelayPool::dump(StoreEntry * entry, unsigned int poolNumberMinusOne) const { LOCAL_ARRAY(char, nom, 32); @@ -23,81 +29,85 @@ void ClientDelayPool::dump(StoreEntry * entry, unsigned int poolNumberMinusOne) storeAppendPrintf(entry, "\n"); } +ClientDelayPools * +ClientDelayPools::Instance() +{ + static ClientDelayPools pools; + return &pools; +} + +ClientDelayPools::~ClientDelayPools() +{ + pools.clear(); +} + void ClientDelayConfig::finalize() { - for (unsigned int i = 0; i < pools.size(); ++i) { + for (unsigned int i = 0; i < pools().size(); ++i) { /* pools require explicit 'allow' to assign a client into them */ - if (!pools[i].access) { - debugs(77, DBG_IMPORTANT, "client_delay_pool #" << (i+1) << + if (!pool(i).access) { + debugs(77, DBG_IMPORTANT, "WARNING: client_delay_pool #" << (i+1) << " has no client_delay_access configured. 
" << "No client will ever use it."); } } } -void ClientDelayConfig::freePoolCount() +void ClientDelayConfig::dumpPoolCount(StoreEntry * entry, const char *name) const { - pools.clear(); + const auto &pools_ = ClientDelayPools::Instance()->pools; + if (pools_.size()) { + storeAppendPrintf(entry, "%s %d\n", name, static_cast(pools_.size())); + for (unsigned int i = 0; i < pools_.size(); ++i) + pools_[i]->dump(entry, i); + } } -void ClientDelayConfig::dumpPoolCount(StoreEntry * entry, const char *name) const +void +ClientDelayConfig::freePools() { - if (pools.size()) { - storeAppendPrintf(entry, "%s %d\n", name, (int)pools.size()); - for (unsigned int i = 0; i < pools.size(); ++i) - pools[i].dump(entry, i); - } + pools().clear(); } void ClientDelayConfig::parsePoolCount() { - if (pools.size()) { - debugs(3, DBG_CRITICAL, "parse_client_delay_pool_count: multiple client_delay_pools lines, aborting all previous client_delay_pools config"); - clean(); + if (pools().size()) { + debugs(3, DBG_CRITICAL, "parse_client_delay_pool_count: multiple client_delay_pools lines, " << + "aborting all previous client_delay_pools config"); + freePools(); } unsigned short pools_; ConfigParser::ParseUShort(&pools_); - for (int i = 0; i < pools_; ++i) { - pools.push_back(ClientDelayPool()); - } + for (int i = 0; i < pools_; ++i) + pools().push_back(new ClientDelayPool()); } void ClientDelayConfig::parsePoolRates() { - unsigned short pool; - ConfigParser::ParseUShort(&pool); - - if (pool < 1 || pool > pools.size()) { - debugs(3, DBG_CRITICAL, "parse_client_delay_pool_rates: Ignoring pool " << pool << " not in 1 .. 
" << pools.size()); - return; + if (unsigned short poolId = parsePoolId()) { + --poolId; + pool(poolId).rate = GetInteger(); + pool(poolId).highwatermark = GetInteger64(); } - - --pool; - - pools[pool].rate = GetInteger(); - pools[pool].highwatermark = GetInteger64(); } void ClientDelayConfig::parsePoolAccess(ConfigParser &parser) { - unsigned short pool; - - ConfigParser::ParseUShort(&pool); - - if (pool < 1 || pool > pools.size()) { - debugs(3, DBG_CRITICAL, "parse_client_delay_pool_rates: Ignoring pool " << pool << " not in 1 .. " << pools.size()); - return; - } - - --pool; - aclParseAccessLine("client_delay_access", parser, &pools[pool].access); + if (const unsigned short poolId = parsePoolId()) + aclParseAccessLine("client_delay_access", parser, &(pool(poolId-1).access)); } -void ClientDelayConfig::clean() +unsigned short +ClientDelayConfig::parsePoolId() { - for (unsigned int i = 0; i < pools.size(); ++i) { - aclDestroyAccessList(&pools[i].access); + unsigned short poolId = 0; + ConfigParser::ParseUShort(&poolId); + if (poolId < 1 || poolId > pools().size()) { + debugs(3, DBG_CRITICAL, "parse_client_delay_pool_rates: Ignoring pool " << + poolId << " not in 1 .. 
" << pools().size()); + return 0; } + return poolId; } diff --git a/src/ClientDelayConfig.h b/src/ClientDelayConfig.h index 0217150d1e..5465780a87 100644 --- a/src/ClientDelayConfig.h +++ b/src/ClientDelayConfig.h @@ -10,6 +10,7 @@ #define SQUID_CLIENTDELAYCONFIG_H #include "acl/forward.h" +#include "base/RefCount.h" #include @@ -19,18 +20,35 @@ class ConfigParser; /// \ingroup DelayPoolsAPI /* represents one client write limiting delay 'pool' */ -class ClientDelayPool +class ClientDelayPool : public RefCountable { public: + typedef RefCount Pointer; + ClientDelayPool() - : access(NULL), rate(0), highwatermark(0) {} + : access(nullptr), rate(0), highwatermark(0) {} + ~ClientDelayPool(); + ClientDelayPool(const ClientDelayPool &) = delete; + ClientDelayPool &operator=(const ClientDelayPool &) = delete; + void dump (StoreEntry * entry, unsigned int poolNumberMinusOne) const; acl_access *access; int rate; int64_t highwatermark; }; -typedef std::vector ClientDelayPools; +class ClientDelayPools +{ +public: + ClientDelayPools(const ClientDelayPools &) = delete; + ClientDelayPools &operator=(const ClientDelayPools &) = delete; + static ClientDelayPools *Instance(); + + std::vector pools; +private: + ClientDelayPools() {} + ~ClientDelayPools(); +}; /* represents configuration of client write limiting delay pools */ class ClientDelayConfig @@ -38,7 +56,10 @@ class ClientDelayConfig public: ClientDelayConfig() : initial(50) {} - void freePoolCount(); + ClientDelayConfig(const ClientDelayConfig &) = delete; + ClientDelayConfig &operator=(const ClientDelayConfig &) = delete; + + void freePools(); void dumpPoolCount(StoreEntry * entry, const char *name) const; /* parsing of client_delay_pools - number of pools */ void parsePoolCount(); @@ -51,9 +72,11 @@ public: /* initial bucket level, how fill bucket at startup */ unsigned short initial; - ClientDelayPools pools; + private: - void clean(); + unsigned short parsePoolId(); + std::vector &pools() { return 
ClientDelayPools::Instance()->pools; } + ClientDelayPool &pool(const int i) { return *(ClientDelayPools::Instance()->pools.at(i)); } }; #endif // SQUID_CLIENTDELAYCONFIG_H diff --git a/src/ClientInfo.h b/src/ClientInfo.h index 9bc03c62b4..9458f52078 100644 --- a/src/ClientInfo.h +++ b/src/ClientInfo.h @@ -9,6 +9,9 @@ #ifndef SQUID__SRC_CLIENTINFO_H #define SQUID__SRC_CLIENTINFO_H +#if USE_DELAY_POOLS +#include "BandwidthBucket.h" +#endif #include "base/ByteCounter.h" #include "cbdata.h" #include "enums.h" @@ -24,7 +27,10 @@ class CommQuotaQueue; #endif -class ClientInfo +class ClientInfo : public hash_link +#if USE_DELAY_POOLS + , public BandwidthBucket +#endif { MEMPROXY_CLASS(ClientInfo); @@ -32,8 +38,6 @@ public: explicit ClientInfo(const Ip::Address &); ~ClientInfo(); - hash_link hash; /* must be first */ - Ip::Address addr; struct Protocol { @@ -58,17 +62,12 @@ public: int n_established; /* number of current established connections */ time_t last_seen; #if USE_DELAY_POOLS - double writeSpeedLimit;///< Write speed limit in bytes per second, can be less than 1, if too close to zero this could result in timeouts from client - double prevTime; ///< previous time when we checked - double bucketSize; ///< how much can be written now - double bucketSizeLimit; ///< maximum bucket size bool writeLimitingActive; ///< Is write limiter active bool firstTimeConnection;///< is this first time connection for this client CommQuotaQueue *quotaQueue; ///< clients waiting for more write quota int rationedQuota; ///< precomputed quota preserving fairness among clients int rationedCount; ///< number of clients that will receive rationedQuota - bool selectWaiting; ///< is between commSetSelect and commHandleWrite bool eventWaiting; ///< waiting for commHandleWriteHelper event to fire // all those functions access Comm fd_table and are defined in comm.cc @@ -79,8 +78,13 @@ public: unsigned int quotaPeekReserv() const; ///< returns the next reserv. 
to pop void quotaDequeue(); ///< pops queue head from queue void kickQuotaQueue(); ///< schedule commHandleWriteHelper call - int quotaForDequed(); ///< allocate quota for a just dequeued client - void refillBucket(); ///< adds bytes to bucket based on rate and time + + /* BandwidthBucket API */ + virtual int quota() override; ///< allocate quota for a just dequeued client + virtual bool applyQuota(int &nleft, Comm::IoCallback *state) override; + virtual void scheduleWrite(Comm::IoCallback *state) override; + virtual void onFdClosed() override; + virtual void reduceBucket(int len) override; void quotaDumpQueue(); ///< dumps quota queue for debugging diff --git a/src/Makefile.am b/src/Makefile.am index cc4ae2c2c9..c4ae896eec 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -88,6 +88,8 @@ endif DIST_SUBDIRS += esi DELAY_POOL_ALL_SOURCE = \ + BandwidthBucket.cc \ + BandwidthBucket.h \ CommonPool.h \ CompositePoolNode.h \ delay_pools.cc \ @@ -109,6 +111,10 @@ DELAY_POOL_ALL_SOURCE = \ DelayUser.h \ DelayVector.cc \ DelayVector.h \ + MessageBucket.cc \ + MessageBucket.h \ + MessageDelayPools.h \ + MessageDelayPools.cc \ NullDelayId.h \ ClientDelayConfig.cc \ ClientDelayConfig.h diff --git a/src/MessageBucket.cc b/src/MessageBucket.cc new file mode 100644 index 0000000000..b296e57598 --- /dev/null +++ b/src/MessageBucket.cc @@ -0,0 +1,54 @@ +/* + * Copyright (C) 1996-2017 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. 
+ */ + +#include "squid.h" + +#if USE_DELAY_POOLS +#include "comm/Connection.h" +#include "DelayPools.h" +#include "fde.h" +#include "MessageBucket.h" + +MessageBucket::MessageBucket(const int speed, const int initialLevelPercent, + const double sizeLimit, MessageDelayPool::Pointer pool) : + BandwidthBucket(speed, initialLevelPercent, sizeLimit), + theAggregate(pool) {} + +int +MessageBucket::quota() +{ + refillBucket(); + theAggregate->refillBucket(); + if (theAggregate->noLimit()) + return bucketLevel; + else if (noLimit()) + return theAggregate->level(); + else + return min(bucketLevel, static_cast(theAggregate->level())); +} + +void +MessageBucket::reduceBucket(int len) +{ + BandwidthBucket::reduceBucket(len); + theAggregate->bytesIn(len); +} + +void +MessageBucket::scheduleWrite(Comm::IoCallback *state) +{ + fde *F = &fd_table[state->conn->fd]; + if (!F->writeQuotaHandler->selectWaiting) { + F->writeQuotaHandler->selectWaiting = true; + // message delay pools limit this write; see checkTimeouts() + SetSelect(state->conn->fd, COMM_SELECT_WRITE, Comm::HandleWrite, state, 0); + } +} + +#endif /* USE_DELAY_POOLS */ + diff --git a/src/MessageBucket.h b/src/MessageBucket.h new file mode 100644 index 0000000000..b10129ccfe --- /dev/null +++ b/src/MessageBucket.h @@ -0,0 +1,41 @@ +/* + * Copyright (C) 1996-2017 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. 
+ */ + +#ifndef MESSAGEBUCKET_H +#define MESSAGEBUCKET_H + +#if USE_DELAY_POOLS + +#include "BandwidthBucket.h" +#include "base/RefCount.h" +#include "comm/forward.h" +#include "MessageDelayPools.h" + +/// Limits Squid-to-client bandwidth for each matching response +class MessageBucket : public RefCountable, public BandwidthBucket +{ + MEMPROXY_CLASS(MessageBucket); + +public: + typedef RefCount Pointer; + + MessageBucket(const int speed, const int initialLevelPercent, const double sizeLimit, MessageDelayPool::Pointer pool); + + /* BandwidthBucket API */ + virtual int quota() override; + virtual void scheduleWrite(Comm::IoCallback *state) override; + virtual void reduceBucket(int len); + +private: + MessageDelayPool::Pointer theAggregate; +}; + +#endif /* USE_DELAY_POOLS */ + +#endif + diff --git a/src/MessageDelayPools.cc b/src/MessageDelayPools.cc new file mode 100644 index 0000000000..36d9640b7c --- /dev/null +++ b/src/MessageDelayPools.cc @@ -0,0 +1,202 @@ +/* + * Copyright (C) 1996-2017 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +#include "squid.h" + +#if USE_DELAY_POOLS +#include "acl/Gadgets.h" +#include "cache_cf.h" +#include "ConfigParser.h" +#include "DelaySpec.h" +#include "event.h" +#include "MessageBucket.h" +#include "MessageDelayPools.h" +#include "Parsing.h" +#include "SquidTime.h" +#include "Store.h" + +#include +#include + +MessageDelayPools::~MessageDelayPools() +{ + freePools(); +} + +MessageDelayPools * +MessageDelayPools::Instance() +{ + static MessageDelayPools pools; + return &pools; +} + +MessageDelayPool::Pointer +MessageDelayPools::pool(const SBuf &name) +{ + auto it = std::find_if(pools.begin(), pools.end(), + [&name](const MessageDelayPool::Pointer p) { return p->poolName == name; }); + return it == pools.end() ? 
0 : *it; +} + +void +MessageDelayPools::add(MessageDelayPool *p) +{ + const auto it = std::find_if(pools.begin(), pools.end(), + [&p](const MessageDelayPool::Pointer mp) { return mp->poolName == p->poolName; }); + if (it != pools.end()) { + debugs(3, DBG_CRITICAL, "WARNING: Ignoring duplicate " << p->poolName << " response delay pool"); + return; + } + pools.push_back(p); +} + +void +MessageDelayPools::freePools() +{ + pools.clear(); +} + +MessageDelayPool::MessageDelayPool(const SBuf &name, int64_t bucketSpeed, int64_t bucketSize, + int64_t aggregateSpeed, int64_t aggregateSize, uint16_t initialBucketPercent): + access(0), + poolName(name), + individualRestore(bucketSpeed), + individualMaximum(bucketSize), + aggregateRestore(aggregateSpeed), + aggregateMaximum(aggregateSize), + initialBucketLevel(initialBucketPercent), + lastUpdate(squid_curtime) +{ + theBucket.level() = aggregateMaximum; +} + +MessageDelayPool::~MessageDelayPool() +{ + if (access) + aclDestroyAccessList(&access); +} + +void +MessageDelayPool::refillBucket() +{ + if (noLimit()) + return; + const int incr = squid_curtime - lastUpdate; + if (incr >= 1) { + lastUpdate = squid_curtime; + DelaySpec spec; + spec.restore_bps = aggregateRestore; + spec.max_bytes = aggregateMaximum; + theBucket.update(spec, incr); + } +} + +void +MessageDelayPool::dump(StoreEntry *entry) const +{ + SBuf name("response_delay_pool_access "); + name.append(poolName); + dump_acl_access(entry, name.c_str(), access); + storeAppendPrintf(entry, "response_delay_pool parameters %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %d\n", + individualRestore, individualMaximum, aggregateRestore, aggregateMaximum, initialBucketLevel); + storeAppendPrintf(entry, "\n"); +} + +MessageBucket::Pointer +MessageDelayPool::createBucket() +{ + return new MessageBucket(individualRestore, initialBucketLevel, individualMaximum, this); +} + +void +MessageDelayConfig::parseResponseDelayPool() +{ + static const SBuf 
bucketSpeedLimit("individual-restore"); + static const SBuf maxBucketSize("individual-maximum"); + static const SBuf aggregateSpeedLimit("aggregate-restore"); + static const SBuf maxAggregateSize("aggregate-maximum"); + static const SBuf initialBucketPercent("initial-bucket-level"); + + static std::map params; + params[bucketSpeedLimit] = -1; + params[maxBucketSize] = -1; + params[aggregateSpeedLimit] = -1; + params[maxAggregateSize] = -1; + params[initialBucketPercent] = 50; + + const SBuf name(ConfigParser::NextToken()); + if (name.isEmpty()) { + debugs(3, DBG_CRITICAL, "FATAL: response_delay_pool missing required \"name\" parameter."); + self_destruct(); + } + + char *key = nullptr; + char *value = nullptr; + while (ConfigParser::NextKvPair(key, value)) { + if (!value) { + debugs(3, DBG_CRITICAL, "FATAL: '" << key << "' option missing value"); + self_destruct(); + } + auto it = params.find(SBuf(key)); + if (it == params.end()) { + debugs(3, DBG_CRITICAL, "FATAL: response_delay_pool unknown option '" << key << "'"); + self_destruct(); + } + it->second = (it->first == initialBucketPercent) ? 
xatos(value) : xatoll(value, 10); + } + + const char *fatalMsg = nullptr; + if ((params[bucketSpeedLimit] < 0) != (params[maxBucketSize] < 0)) + fatalMsg = "'individual-restore' and 'individual-maximum'"; + else if ((params[aggregateSpeedLimit] < 0) != (params[maxAggregateSize] < 0)) + fatalMsg = "'aggregate-restore' and 'aggregate-maximum'"; + + if (fatalMsg) { + debugs(3, DBG_CRITICAL, "FATAL: must use " << fatalMsg << " options in conjunction"); + self_destruct(); + } + + MessageDelayPool *pool = new MessageDelayPool(name, + params[bucketSpeedLimit], + params[maxBucketSize], + params[aggregateSpeedLimit], + params[maxAggregateSize], + static_cast(params[initialBucketPercent]) + ); + MessageDelayPools::Instance()->add(pool); +} + +void +MessageDelayConfig::parseResponseDelayPoolAccess() { + const char *token = ConfigParser::NextToken(); + if (!token) { + debugs(3, DBG_CRITICAL, "ERROR: required pool_name option missing"); + return; + } + MessageDelayPool::Pointer pool = MessageDelayPools::Instance()->pool(SBuf(token)); + static ConfigParser parser; + if (pool) + aclParseAccessLine("response_delay_pool_access", parser, &pool->access); +} + +void +MessageDelayConfig::freePools() +{ + MessageDelayPools::Instance()->freePools(); +} + +void +MessageDelayConfig::dumpResponseDelayPoolParameters(StoreEntry *entry, const char *name) +{ + auto &pools = MessageDelayPools::Instance()->pools; + for (auto pool: pools) + pool->dump(entry); +} + +#endif + diff --git a/src/MessageDelayPools.h b/src/MessageDelayPools.h new file mode 100644 index 0000000000..73b499d023 --- /dev/null +++ b/src/MessageDelayPools.h @@ -0,0 +1,134 @@ +/* + * Copyright (C) 1996-2017 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. 
+ */ + +#ifndef MESSAGEDELAYPOOLS_H +#define MESSAGEDELAYPOOLS_H + +#if USE_DELAY_POOLS + +#include "acl/Acl.h" +#include "base/RefCount.h" +#include "DelayBucket.h" +#include "DelayPools.h" + +class MessageBucket; +typedef RefCount MessageBucketPointer; + +/// \ingroup DelayPoolsAPI +/// Represents one 'response' delay pool, creates individual response +/// buckets and performes aggregate limiting for them +class MessageDelayPool : public RefCountable +{ +public: + typedef RefCount Pointer; + + MessageDelayPool(const SBuf &name, int64_t bucketSpeed, int64_t bucketSize, + int64_t aggregateSpeed, int64_t aggregateSize, uint16_t initialBucketPercent); + ~MessageDelayPool(); + MessageDelayPool(const MessageDelayPool &) = delete; + MessageDelayPool &operator=(const MessageDelayPool &) = delete; + + /// Increases the aggregate bucket level with the aggregateRestore speed. + void refillBucket(); + /// decreases the aggregate level + void bytesIn(int qty) { if (!noLimit()) theBucket.bytesIn(qty); } + /// current aggregate level + int level() { return theBucket.level(); } + /// creates an individual response bucket + MessageBucketPointer createBucket(); + /// whether the aggregate bucket has no limit + bool noLimit () const { return aggregateRestore < 0; } + + void dump (StoreEntry * entry) const; + + acl_access *access; + /// the response delay pool name + SBuf poolName; + /// the speed limit of an individual bucket (bytes/s) + int64_t individualRestore; + /// the maximum size of an individual bucket + int64_t individualMaximum; + /// the speed limit of the aggregate bucket (bytes/s) + int64_t aggregateRestore; + /// the maximum size of the aggregate bucket + int64_t aggregateMaximum; + /// the initial bucket size as a percentage of individualMaximum + uint16_t initialBucketLevel; + /// the aggregate bucket + DelayBucket theBucket; + +private: + /// Time the aggregate bucket level was last refilled. 
+ time_t lastUpdate; +}; + +/// \ingroup DelayPoolsAPI +/// represents all configured 'response' delay pools +class MessageDelayPools +{ +public: + MessageDelayPools(const MessageDelayPools &) = delete; + MessageDelayPools &operator=(const MessageDelayPools &) = delete; + + static MessageDelayPools *Instance(); + + /// returns a MessageDelayPool with a given name or null otherwise + MessageDelayPool::Pointer pool(const SBuf &name); + /// appends a single MessageDelayPool, created during configuration + void add(MessageDelayPool *pool); + /// memory cleanup, performing during reconfiguration + void freePools(); + + std::vector pools; + +private: + MessageDelayPools() {} + ~MessageDelayPools(); + void Stats() { } // TODO +}; + +/// represents configuration for response delay pools +class MessageDelayConfig +{ +public: + void parseResponseDelayPool(); + void dumpResponseDelayPoolParameters(StoreEntry *e, const char *name); + void parseResponseDelayPoolAccess(); + void freePools(); +}; + +#define free_response_delay_pool_access(X) +#define dump_response_delay_pool_access(X, Y, Z) + +inline void +free_response_delay_pool_parameters(MessageDelayConfig * cfg) +{ + cfg->freePools(); +} + +inline void +dump_response_delay_pool_parameters(StoreEntry *entry, const char *name, MessageDelayConfig &cfg) +{ + cfg.dumpResponseDelayPoolParameters(entry, name); +} + +inline void +parse_response_delay_pool_parameters(MessageDelayConfig * cfg) +{ + cfg->parseResponseDelayPool(); +} + +inline void +parse_response_delay_pool_access(MessageDelayConfig * cfg) +{ + cfg->parseResponseDelayPoolAccess(); +} + +#endif +#endif + diff --git a/src/SquidConfig.h b/src/SquidConfig.h index 8770018f4d..f42acdaeb8 100644 --- a/src/SquidConfig.h +++ b/src/SquidConfig.h @@ -12,11 +12,16 @@ #include "acl/forward.h" #include "base/RefCount.h" #include "base/YesNoNone.h" +#if USE_DELAY_POOLS #include "ClientDelayConfig.h" #include "DelayConfig.h" +#endif #include "helper/ChildConfig.h" #include 
"HttpHeaderTools.h" #include "ip/Address.h" +#if USE_DELAY_POOLS +#include "MessageDelayPools.h" +#endif #include "Notes.h" #include "security/forward.h" #include "SquidTime.h" @@ -433,6 +438,7 @@ public: DelayConfig Delay; ClientDelayConfig ClientDelay; + MessageDelayConfig MessageDelay; #endif struct { diff --git a/src/cache_cf.cc b/src/cache_cf.cc index a37a29ba97..ae3dccd5a3 100644 --- a/src/cache_cf.cc +++ b/src/cache_cf.cc @@ -42,6 +42,7 @@ #include "log/Config.h" #include "log/CustomLog.h" #include "MemBuf.h" +#include "MessageDelayPools.h" #include "mgr/ActionPasswordList.h" #include "mgr/Registration.h" #include "neighbors.h" @@ -1680,7 +1681,7 @@ parse_delay_pool_access(DelayConfig * cfg) static void free_client_delay_pool_count(ClientDelayConfig * cfg) { - cfg->freePoolCount(); + cfg->freePools(); } static void diff --git a/src/cf.data.depend b/src/cf.data.depend index d11e7e2cd7..fb9ea2f569 100644 --- a/src/cf.data.depend +++ b/src/cf.data.depend @@ -33,6 +33,8 @@ delay_pool_rates delay_class client_delay_pool_access acl client_delay_pool_count client_delay_pool_rates +response_delay_pool_access acl +response_delay_pool_parameters denyinfo acl eol externalAclHelper auth_param diff --git a/src/cf.data.pre b/src/cf.data.pre index d9cc7f8c5d..d7a520aab8 100644 --- a/src/cf.data.pre +++ b/src/cf.data.pre @@ -7202,6 +7202,64 @@ DOC_START See also client_delay_parameters and client_delay_pools. DOC_END +NAME: response_delay_pool +TYPE: response_delay_pool_parameters +DEFAULT: none +IFDEF: USE_DELAY_POOLS +LOC: Config.MessageDelay +DOC_START + This option configures client response bandwidth limits using the + following format: + + response_delay_pool name [option=value] ... + + name the response delay pool name + + available options: + + individual-restore The speed limit of an individual + bucket(bytes/s). To be used in conjunction + with 'individual-maximum'. 
+ + individual-maximum The maximum number of bytes which can + be placed into the individual bucket. To be used + in conjunction with 'individual-restore'. + + aggregate-restore The speed limit for the aggregate + bucket(bytes/s). To be used in conjunction with + 'aggregate-maximum'. + + aggregate-maximum The maximum number of bytes which can + be placed into the aggregate bucket. To be used + in conjunction with 'aggregate-restore'. + + initial-bucket-level The initial bucket size as a percentage + of individual-maximum. + + Individual and(or) aggregate bucket options may not be specified, + meaning no individual and(or) aggregate speed limitation. + See also response_delay_pool_access and delay_parameters for + terminology details. +DOC_END + +NAME: response_delay_pool_access +TYPE: response_delay_pool_access +DEFAULT: none +DEFAULT_DOC: Deny use of the pool, unless allow rules exist in squid.conf for the pool. +IFDEF: USE_DELAY_POOLS +LOC: Config.MessageDelay +DOC_START + Determines whether a specific named response delay pool is used + for the transaction. The syntax for this directive is: + + response_delay_pool_access pool_name allow|deny acl_name + + All response_delay_pool_access options are checked in the order + they appear in this configuration file. The first rule with a + matching ACL wins. If (and only if) an "allow" rule won, Squid + assigns the response to the corresponding named delay pool. 
+DOC_END + COMMENT_START WCCPv1 AND WCCPv2 CONFIGURATION OPTIONS ----------------------------------------------------------------------------- diff --git a/src/client_db.cc b/src/client_db.cc index f528321d3b..bde5da619e 100644 --- a/src/client_db.cc +++ b/src/client_db.cc @@ -53,40 +53,31 @@ static int cleanup_removed; #endif ClientInfo::ClientInfo(const Ip::Address &ip) : +#if USE_DELAY_POOLS + BandwidthBucket(0, 0, 0), +#endif addr(ip), n_established(0), last_seen(0) #if USE_DELAY_POOLS - , writeSpeedLimit(0), - prevTime(0), - bucketSize(0), - bucketSizeLimit(0), - writeLimitingActive(false), + , writeLimitingActive(false), firstTimeConnection(true), quotaQueue(nullptr), rationedQuota(0), rationedCount(0), - selectWaiting(false), eventWaiting(false) #endif { debugs(77, 9, "ClientInfo constructed, this=" << static_cast(this)); - -#if USE_DELAY_POOLS - getCurrentTime(); - /* put current time to have something sensible here */ - prevTime = current_dtime; -#endif - char *buf = static_cast(xmalloc(MAX_IPSTRLEN)); // becomes hash.key - hash.key = addr.toStr(buf,MAX_IPSTRLEN); + key = addr.toStr(buf,MAX_IPSTRLEN); } static ClientInfo * clientdbAdd(const Ip::Address &addr) { ClientInfo *c = new ClientInfo(addr); - hash_join(client_table, &c->hash); + hash_join(client_table, static_cast(c)); ++statCounter.client_http.clients; if ((statCounter.client_http.clients > max_clients) && !cleanup_running && cleanup_scheduled < 2) { @@ -277,7 +268,6 @@ void clientdbDump(StoreEntry * sentry) { const char *name; - ClientInfo *c; int icp_total = 0; int icp_hits = 0; int http_total = 0; @@ -285,8 +275,9 @@ clientdbDump(StoreEntry * sentry) storeAppendPrintf(sentry, "Cache Clients:\n"); hash_first(client_table); - while ((c = (ClientInfo *) hash_next(client_table))) { - storeAppendPrintf(sentry, "Address: %s\n", hashKeyStr(&c->hash)); + while (hash_link *hash = hash_next(client_table)) { + const ClientInfo *c = reinterpret_cast(hash); + storeAppendPrintf(sentry, "Address: %s\n", 
hashKeyStr(hash)); if ( (name = fqdncache_gethostbyaddr(c->addr, 0)) ) { storeAppendPrintf(sentry, "Name: %s\n", name); } @@ -344,7 +335,7 @@ clientdbFreeItem(void *data) ClientInfo::~ClientInfo() { - safe_free(hash.key); + safe_free(key); #if USE_DELAY_POOLS if (CommQuotaQueue *q = quotaQueue) { @@ -399,7 +390,7 @@ clientdbGC(void *) if (age < 60) continue; - hash_remove_link(client_table, &c->hash); + hash_remove_link(client_table, static_cast(c)); clientdbFreeItem(c); @@ -438,30 +429,22 @@ clientdbStartGC(void) Ip::Address * client_entry(Ip::Address *current) { - ClientInfo *c = NULL; char key[MAX_IPSTRLEN]; + hash_first(client_table); if (current) { current->toStr(key,MAX_IPSTRLEN); - hash_first(client_table); - while ((c = (ClientInfo *) hash_next(client_table))) { - if (!strcmp(key, hashKeyStr(&c->hash))) + while (hash_link *hash = hash_next(client_table)) { + if (!strcmp(key, hashKeyStr(hash))) break; } - - c = (ClientInfo *) hash_next(client_table); - } else { - hash_first(client_table); - c = (ClientInfo *) hash_next(client_table); } - hash_last(client_table); + ClientInfo *c = reinterpret_cast(hash_next(client_table)); - if (c) - return (&c->addr); - else - return (NULL); + hash_last(client_table); + return c ? 
&c->addr : nullptr; } variable_list * diff --git a/src/client_side.cc b/src/client_side.cc index 4a52cf4870..19cff95024 100644 --- a/src/client_side.cc +++ b/src/client_side.cc @@ -120,6 +120,7 @@ #endif #if USE_DELAY_POOLS #include "ClientInfo.h" +#include "MessageDelayPools.h" #endif #if USE_OPENSSL #include "ssl/bio.h" @@ -2488,7 +2489,7 @@ ConnStateData::start() if (Config.onoff.client_db) { /* it was said several times that client write limiter does not work if client_db is disabled */ - ClientDelayPools& pools(Config.ClientDelay.pools); + auto &pools = ClientDelayPools::Instance()->pools; ACLFilledChecklist ch(NULL, NULL, NULL); // TODO: we check early to limit error response bandwith but we @@ -2500,8 +2501,8 @@ ConnStateData::start() for (unsigned int pool = 0; pool < pools.size(); ++pool) { /* pools require explicit 'allow' to assign a client into them */ - if (pools[pool].access) { - ch.changeAcl(pools[pool].access); + if (pools[pool]->access) { + ch.changeAcl(pools[pool]->access); allow_t answer = ch.fastCheck(); if (answer == ACCESS_ALLOWED) { @@ -2515,8 +2516,8 @@ ConnStateData::start() /* setup write limiter for this request */ const double burst = floor(0.5 + - (pools[pool].highwatermark * Config.ClientDelay.initial)/100.0); - cli->setWriteLimiter(pools[pool].rate, burst, pools[pool].highwatermark); + (pools[pool]->highwatermark * Config.ClientDelay.initial)/100.0); + cli->setWriteLimiter(pools[pool]->rate, burst, pools[pool]->highwatermark); break; } else { debugs(83, 4, HERE << "Delay pool " << pool << " skipped because ACL " << answer); diff --git a/src/client_side.h b/src/client_side.h index 8b0c1dd035..3876a87981 100644 --- a/src/client_side.h +++ b/src/client_side.h @@ -27,6 +27,9 @@ #include "security/Handshake.h" #include "ssl/support.h" #endif +#if USE_DELAY_POOLS +#include "MessageBucket.h" +#endif class ClientHttpRequest; class HttpHdrRangeSpec; diff --git a/src/comm.cc b/src/comm.cc index 83f781c6a6..10fa8f55c0 100644 --- a/src/comm.cc 
+++ b/src/comm.cc @@ -901,12 +901,9 @@ _comm_close(int fd, char const *file, int line) } #if USE_DELAY_POOLS - if (ClientInfo *clientInfo = F->clientInfo) { - if (clientInfo->selectWaiting) { - clientInfo->selectWaiting = false; - // kick queue or it will get stuck as commWriteHandle is not called - clientInfo->kickQuotaQueue(); - } + if (BandwidthBucket *bucket = BandwidthBucket::SelectBucket(F)) { + if (bucket->selectWaiting) + bucket->onFdClosed(); } #endif @@ -1334,7 +1331,7 @@ ClientInfo::kickQuotaQueue() { if (!eventWaiting && !selectWaiting && hasQueue()) { // wait at least a second if the bucket is empty - const double delay = (bucketSize < 1.0) ? 1.0 : 0.0; + const double delay = (bucketLevel < 1.0) ? 1.0 : 0.0; eventAdd("commHandleWriteHelper", &commHandleWriteHelper, quotaQueue, delay, 0, true); eventWaiting = true; @@ -1343,7 +1340,7 @@ ClientInfo::kickQuotaQueue() /// calculates how much to write for a single dequeued client int -ClientInfo::quotaForDequed() +ClientInfo::quota() { /* If we have multiple clients and give full bucketSize to each client then * clt1 may often get a lot more because clt1->clt2 time distance in the @@ -1359,7 +1356,7 @@ ClientInfo::quotaForDequed() // Rounding errors do not accumulate here, but we round down to avoid // negative bucket sizes after write with rationedCount=1. 
- rationedQuota = static_cast(floor(bucketSize/rationedCount)); + rationedQuota = static_cast(floor(bucketLevel/rationedCount)); debugs(77,5, HERE << "new rationedQuota: " << rationedQuota << '*' << rationedCount); } @@ -1373,48 +1370,50 @@ ClientInfo::quotaForDequed() return rationedQuota; } -///< adds bytes to the quota bucket based on the rate and passed time +bool +ClientInfo::applyQuota(int &nleft, Comm::IoCallback *state) +{ + assert(hasQueue()); + assert(quotaPeekFd() == state->conn->fd); + quotaDequeue(); // we will write or requeue below + if (nleft > 0 && !BandwidthBucket::applyQuota(nleft, state)) { + state->quotaQueueReserv = quotaEnqueue(state->conn->fd); + kickQuotaQueue(); + return false; + } + return true; +} + void -ClientInfo::refillBucket() +ClientInfo::scheduleWrite(Comm::IoCallback *state) { - // all these times are in seconds, with double precision - const double currTime = current_dtime; - const double timePassed = currTime - prevTime; - - // Calculate allowance for the time passed. Use double to avoid - // accumulating rounding errors for small intervals. For example, always - // adding 1 byte instead of 1.4 results in 29% bandwidth allocation error. 
- const double gain = timePassed * writeSpeedLimit; - - debugs(77,5, HERE << currTime << " clt" << (const char*)hash.key << ": " << - bucketSize << " + (" << timePassed << " * " << writeSpeedLimit << - " = " << gain << ')'); - - // to further combat error accumulation during micro updates, - // quit before updating time if we cannot add at least one byte - if (gain < 1.0) - return; - - prevTime = currTime; - - // for "first" connections, drain initial fat before refilling but keep - // updating prevTime to avoid bursts after the fat is gone - if (bucketSize > bucketSizeLimit) { - debugs(77,4, HERE << "not refilling while draining initial fat"); - return; + if (writeLimitingActive) { + state->quotaQueueReserv = quotaEnqueue(state->conn->fd); + kickQuotaQueue(); } +} - bucketSize += gain; +void +ClientInfo::onFdClosed() +{ + BandwidthBucket::onFdClosed(); + // kick queue or it will get stuck as commWriteHandle is not called + kickQuotaQueue(); +} - // obey quota limits - if (bucketSize > bucketSizeLimit) - bucketSize = bucketSizeLimit; +void +ClientInfo::reduceBucket(const int len) +{ + if (len > 0) + BandwidthBucket::reduceBucket(len); + // even if we wrote nothing, we were served; give others a chance + kickQuotaQueue(); } void ClientInfo::setWriteLimiter(const int aWriteSpeedLimit, const double anInitialBurst, const double aHighWatermark) { - debugs(77,5, HERE << "Write limits for " << (const char*)hash.key << + debugs(77,5, "Write limits for " << (const char*)key << " speed=" << aWriteSpeedLimit << " burst=" << anInitialBurst << " highwatermark=" << aHighWatermark); @@ -1431,7 +1430,7 @@ ClientInfo::setWriteLimiter(const int aWriteSpeedLimit, const double anInitialBu assert(!quotaQueue); quotaQueue = new CommQuotaQueue(this); - bucketSize = anInitialBurst; + bucketLevel = anInitialBurst; prevTime = current_dtime; } } @@ -1451,7 +1450,7 @@ CommQuotaQueue::~CommQuotaQueue() unsigned int CommQuotaQueue::enqueue(int fd) { - debugs(77,5, HERE << "clt" << (const 
char*)clientInfo->hash.key << + debugs(77,5, "clt" << (const char*)clientInfo->key << ": FD " << fd << " with qqid" << (ins+1) << ' ' << fds.size()); fds.push_back(fd); return ++ins; @@ -1462,13 +1461,13 @@ void CommQuotaQueue::dequeue() { assert(!fds.empty()); - debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key << + debugs(77,5, "clt" << (const char*)clientInfo->key << ": FD " << fds.front() << " with qqid" << (outs+1) << ' ' << fds.size()); fds.pop_front(); ++outs; } -#endif +#endif /* USE_DELAY_POOLS */ /* * hm, this might be too general-purpose for all the places we'd @@ -1576,7 +1575,16 @@ checkTimeouts(void) debugs(5, 5, "checkTimeouts: FD " << fd << " auto write timeout"); Comm::SetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0); COMMIO_FD_WRITECB(fd)->finish(Comm::COMM_ERROR, ETIMEDOUT); - } else if (AlreadyTimedOut(F)) +#if USE_DELAY_POOLS + } else if (F->writeQuotaHandler != nullptr && COMMIO_FD_WRITECB(fd)->conn != nullptr) { + if (!F->writeQuotaHandler->selectWaiting && F->writeQuotaHandler->quota() && !F->closing()) { + F->writeQuotaHandler->selectWaiting = true; + Comm::SetSelect(fd, COMM_SELECT_WRITE, Comm::HandleWrite, COMMIO_FD_WRITECB(fd), 0); + } + continue; +#endif + } + else if (AlreadyTimedOut(F)) continue; debugs(5, 5, "checkTimeouts: FD " << fd << " Expired"); diff --git a/src/comm/IoCallback.cc b/src/comm/IoCallback.cc index 3ba2bd3ad6..92666fc1e5 100644 --- a/src/comm/IoCallback.cc +++ b/src/comm/IoCallback.cc @@ -69,13 +69,9 @@ void Comm::IoCallback::selectOrQueueWrite() { #if USE_DELAY_POOLS - // stand in line if there is one - if (ClientInfo *clientInfo = fd_table[conn->fd].clientInfo) { - if (clientInfo->writeLimitingActive) { - quotaQueueReserv = clientInfo->quotaEnqueue(conn->fd); - clientInfo->kickQuotaQueue(); - return; - } + if (BandwidthBucket *bucket = BandwidthBucket::SelectBucket(&fd_table[conn->fd])) { + bucket->scheduleWrite(this); + return; } #endif diff --git a/src/comm/Loops.h b/src/comm/Loops.h index 
5d62bf2f8c..37f3e06f82 100644 --- a/src/comm/Loops.h +++ b/src/comm/Loops.h @@ -24,9 +24,6 @@ namespace Comm /// Initialize the module on Squid startup void SelectLoopInit(void); -/// Mark an FD to be watched for its IO status. -void SetSelect(int, unsigned int, PF *, void *, time_t); - /// reset/undo/unregister the watch for an FD which was set by Comm::SetSelect() void ResetSelect(int); diff --git a/src/comm/Write.cc b/src/comm/Write.cc index 010ef86237..b8cc11e603 100644 --- a/src/comm/Write.cc +++ b/src/comm/Write.cc @@ -7,8 +7,10 @@ */ #include "squid.h" +#include "cbdata.h" #include "comm/Connection.h" #include "comm/IoCallback.h" +#include "comm/Loops.h" #include "comm/Write.h" #include "fd.h" #include "fde.h" @@ -59,7 +61,8 @@ Comm::HandleWrite(int fd, void *data) int len = 0; int nleft; - assert(state->conn != NULL && state->conn->fd == fd); + assert(state->conn != NULL); + assert(state->conn->fd == fd); PROF_start(commHandleWrite); debugs(5, 5, HERE << state->conn << ": off " << @@ -68,35 +71,13 @@ Comm::HandleWrite(int fd, void *data) nleft = state->size - state->offset; #if USE_DELAY_POOLS - ClientInfo * clientInfo=fd_table[fd].clientInfo; - - if (clientInfo && !clientInfo->writeLimitingActive) - clientInfo = NULL; // we only care about quota limits here - - if (clientInfo) { - assert(clientInfo->selectWaiting); - clientInfo->selectWaiting = false; - - assert(clientInfo->hasQueue()); - assert(clientInfo->quotaPeekFd() == fd); - clientInfo->quotaDequeue(); // we will write or requeue below - - if (nleft > 0) { - const int quota = clientInfo->quotaForDequed(); - if (!quota) { // if no write quota left, queue this fd - state->quotaQueueReserv = clientInfo->quotaEnqueue(fd); - clientInfo->kickQuotaQueue(); - PROF_stop(commHandleWrite); - return; - } - - const int nleft_corrected = min(nleft, quota); - if (nleft != nleft_corrected) { - debugs(5, 5, HERE << state->conn << " writes only " << - nleft_corrected << " out of " << nleft); - nleft = nleft_corrected; 
- } - + BandwidthBucket *bucket = BandwidthBucket::SelectBucket(&fd_table[fd]); + if (bucket) { + assert(bucket->selectWaiting); + bucket->selectWaiting = false; + if (nleft > 0 && !bucket->applyQuota(nleft, state)) { + PROF_stop(commHandleWrite); + return; } } #endif /* USE_DELAY_POOLS */ @@ -108,18 +89,9 @@ Comm::HandleWrite(int fd, void *data) debugs(5, 5, HERE << "write() returns " << len); #if USE_DELAY_POOLS - if (clientInfo) { - if (len > 0) { - /* we wrote data - drain them from bucket */ - clientInfo->bucketSize -= len; - if (clientInfo->bucketSize < 0.0) { - debugs(5, DBG_IMPORTANT, HERE << "drained too much"); // should not happen - clientInfo->bucketSize = 0; - } - } - - // even if we wrote nothing, we were served; give others a chance - clientInfo->kickQuotaQueue(); + if (bucket) { + /* we wrote data - drain them from bucket */ + bucket->reduceBucket(len); } #endif /* USE_DELAY_POOLS */ diff --git a/src/comm/Write.h b/src/comm/Write.h index aa149d8d42..ca3d9a02ff 100644 --- a/src/comm/Write.h +++ b/src/comm/Write.h @@ -34,9 +34,6 @@ void Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer & /// Cancel the write pending on FD. No action if none pending. void WriteCancel(const Comm::ConnectionPointer &conn, const char *reason); -// callback handler to process an FD which is available for writing. -extern PF HandleWrite; - } // namespace Comm #endif /* _SQUID_COMM_IOWRITE_H */ diff --git a/src/comm/forward.h b/src/comm/forward.h index cb5553b49c..a021186045 100644 --- a/src/comm/forward.h +++ b/src/comm/forward.h @@ -13,6 +13,10 @@ #include +/// legacy CBDATA callback functions ABI definition for read or write I/O events +/// \deprecated use CommCalls API instead where possible +typedef void PF(int, void *); + /// Abstraction layer for TCP, UDP, TLS, UDS and filedescriptor sockets. 
namespace Comm { @@ -26,11 +30,13 @@ typedef std::vector ConnectionList; bool IsConnOpen(const Comm::ConnectionPointer &conn); -}; // namespace Comm +// callback handler to process an FD which is available for writing. +PF HandleWrite; -/// legacy CBDATA callback functions ABI definition for read or write I/O events -/// \deprecated use CommCalls API instead where possible -typedef void PF(int, void *); +/// Mark an FD to be watched for its IO status. +void SetSelect(int, unsigned int, PF *, void *, time_t); + +}; // namespace Comm #endif /* _SQUID_COMM_FORWARD_H */ diff --git a/src/fde.h b/src/fde.h index 89e978f362..f0040844d3 100644 --- a/src/fde.h +++ b/src/fde.h @@ -17,6 +17,7 @@ #include "typedefs.h" //DRCB, DWCB #if USE_DELAY_POOLS +#include "MessageBucket.h" class ClientInfo; #endif class dwrite_q; @@ -117,6 +118,7 @@ public: #if USE_DELAY_POOLS /// pointer to client info used in client write limiter or nullptr if not present ClientInfo * clientInfo = nullptr; + MessageBucket::Pointer writeQuotaHandler; ///< response write limiter, if configured #endif unsigned epoll_state = 0; diff --git a/src/http/Stream.cc b/src/http/Stream.cc index 859eb51fb6..3c13c868fa 100644 --- a/src/http/Stream.cc +++ b/src/http/Stream.cc @@ -13,6 +13,12 @@ #include "HttpHeaderTools.h" #include "Store.h" #include "TimeOrTag.h" +#if USE_DELAY_POOLS +#include "acl/FilledChecklist.h" +#include "ClientInfo.h" +#include "fde.h" +#include "MessageDelayPools.h" +#endif Http::Stream::Stream(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) : clientConnection(aConn), @@ -283,6 +289,24 @@ Http::Stream::sendStartOfMessage(HttpReply *rep, StoreIOBuffer bodyData) mb->append(bodyData.data, length); } } +#if USE_DELAY_POOLS + for (const auto &pool: MessageDelayPools::Instance()->pools) { + if (pool->access) { + std::unique_ptr chl(clientAclChecklistCreate(pool->access, http)); + chl->reply = rep; + HTTPMSGLOCK(chl->reply); + const allow_t answer = chl->fastCheck(); + if (answer == 
ACCESS_ALLOWED) { + writeQuotaHandler = pool->createBucket(); + fd_table[clientConnection->fd].writeQuotaHandler = writeQuotaHandler; + break; + } else { + debugs(83, 4, "Response delay pool " << pool->poolName << + " skipped because ACL " << answer); + } + } + } +#endif getConn()->write(mb); delete mb; diff --git a/src/http/Stream.h b/src/http/Stream.h index 1d933e427e..585ed04e5d 100644 --- a/src/http/Stream.h +++ b/src/http/Stream.h @@ -12,6 +12,9 @@ #include "http/forward.h" #include "mem/forward.h" #include "StoreIOBuffer.h" +#if USE_DELAY_POOLS +#include "MessageBucket.h" +#endif class clientStreamNode; class ClientHttpRequest; @@ -161,6 +164,9 @@ private: bool mayUseConnection_; /* This request may use the connection. Don't read anymore requests for now */ bool connRegistered_; +#if USE_DELAY_POOLS + MessageBucket::Pointer writeQuotaHandler; ///< response write limiter, if configured +#endif }; } // namespace Http