--- /dev/null
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+
+#if USE_DELAY_POOLS
+
+#include "BandwidthBucket.h"
+#include "ClientInfo.h"
+#include "comm/Connection.h"
+#include "Debug.h"
+#include "fde.h"
+
+/// \param speed write speed limit in bytes/second; negative means "no limit"
+/// \param initialLevelPercent initial bucket level, as a percentage of sizeLimit
+/// \param sizeLimit maximum bucket size, in bytes
+BandwidthBucket::BandwidthBucket(const int speed, const int initialLevelPercent, const double sizeLimit) :
+    bucketLevel( sizeLimit * (initialLevelPercent / 100.0)),
+    selectWaiting(false),
+    writeSpeedLimit(speed),
+    bucketSizeLimit(sizeLimit)
+{
+    getCurrentTime();
+    /* put current time to have something sensible here */
+    prevTime = current_dtime;
+}
+
+/// Adds the bytes earned since the last refill to bucketLevel, at
+/// writeSpeedLimit bytes/second, never exceeding bucketSizeLimit.
+/// No-op for unlimited buckets and for sub-byte time slices.
+void
+BandwidthBucket::refillBucket()
+{
+    if (noLimit())
+        return;
+    // all these times are in seconds, with double precision
+    const double currTime = current_dtime;
+    const double timePassed = currTime - prevTime;
+
+    // Calculate allowance for the time passed. Use double to avoid
+    // accumulating rounding errors for small intervals. For example, always
+    // adding 1 byte instead of 1.4 results in 29% bandwidth allocation error.
+    const double gain = timePassed * writeSpeedLimit;
+
+    // to further combat error accumulation during micro updates,
+    // quit before updating time if we cannot add at least one byte
+    if (gain < 1.0)
+        return;
+
+    prevTime = currTime;
+
+    // for "first" connections, drain initial fat before refilling but keep
+    // updating prevTime to avoid bursts after the fat is gone
+    if (bucketLevel > bucketSizeLimit) {
+        debugs(77, 4, "not refilling while draining initial fat");
+        return;
+    }
+
+    bucketLevel += gain;
+
+    // obey quota limits
+    if (bucketLevel > bucketSizeLimit)
+        bucketLevel = bucketSizeLimit;
+}
+
+/// Trims nleft down to the current quota() value, if needed.
+/// \param nleft bytes the caller wants to write; may be reduced here
+/// \returns false iff the quota is exhausted (nothing may be written now)
+bool
+BandwidthBucket::applyQuota(int &nleft, Comm::IoCallback *state)
+{
+    const int q = quota();
+    if (!q)
+        return false;
+    else if (q < 0)
+        return true; // no limitation by this bucket; nleft untouched
+    const int nleft_corrected = min(nleft, q);
+    if (nleft != nleft_corrected) {
+        debugs(77, 5, state->conn << " writes only " <<
+               nleft_corrected << " out of " << nleft);
+        nleft = nleft_corrected;
+    }
+    return true;
+}
+
+/// Subtracts len just-written bytes from the bucket level,
+/// clamping at zero (over-draining indicates an accounting bug).
+void
+BandwidthBucket::reduceBucket(const int len)
+{
+    if (len <= 0 || noLimit())
+        return;
+    bucketLevel -= len;
+    if (bucketLevel < 0.0) {
+        debugs(77, DBG_IMPORTANT, "drained too much"); // should not happen
+        bucketLevel = 0;
+    }
+}
+
+/// \returns the bucket responsible for the given descriptor: the per-fd
+/// write quota handler if set, otherwise the descriptor's write-limited
+/// ClientInfo (or nil when neither limiter applies)
+BandwidthBucket *
+BandwidthBucket::SelectBucket(fde *f)
+{
+    // a per-response (message) bucket takes precedence over per-client limits
+    BandwidthBucket *bucket = f->writeQuotaHandler.getRaw();
+    if (!bucket) {
+        ClientInfo *clientInfo = f->clientInfo;
+        if (clientInfo && clientInfo->writeLimitingActive)
+            bucket = clientInfo;
+    }
+    return bucket;
+}
+
+#endif /* USE_DELAY_POOLS */
+
--- /dev/null
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef BANDWIDTHBUCKET_H
+#define BANDWIDTHBUCKET_H
+
+#if USE_DELAY_POOLS
+
+#include "comm/IoCallback.h"
+
+class fde;
+
+/// Base class for Squid-to-client bandwidth limiting
+class BandwidthBucket
+{
+public:
+    BandwidthBucket(const int speed, const int initialLevelPercent, const double sizeLimit);
+    virtual ~BandwidthBucket() {}
+
+    /// \returns the bucket limiting writes on the given descriptor,
+    /// or nil if no limiter applies
+    static BandwidthBucket *SelectBucket(fde *f);
+
+    /// \returns the number of bytes this bucket allows to write,
+    /// also considering aggregates, if any. Negative quota means
+    /// no limitations by this bucket.
+    virtual int quota() = 0;
+    /// Adjusts nleft to not exceed the current bucket quota value,
+    /// if needed.
+    virtual bool applyQuota(int &nleft, Comm::IoCallback *state);
+    /// Will plan another write call.
+    virtual void scheduleWrite(Comm::IoCallback *state) = 0;
+    /// Performs cleanup when the related file descriptor becomes closed.
+    virtual void onFdClosed() { selectWaiting = false; }
+    /// Decreases the bucket level.
+    virtual void reduceBucket(const int len);
+    /// Whether this bucket will not do bandwidth limiting.
+    bool noLimit() const { return writeSpeedLimit < 0; }
+
+protected:
+    /// Increases the bucket level with the writeSpeedLimit speed.
+    void refillBucket();
+
+public:
+    double bucketLevel; ///< how much can be written now
+    bool selectWaiting; ///< is between commSetSelect and commHandleWrite
+
+protected:
+    double prevTime; ///< previous time when we checked
+    double writeSpeedLimit; ///< Write speed limit in bytes per second.
+    double bucketSizeLimit; ///< maximum bucket size
+};
+
+#endif /* USE_DELAY_POOLS */
+
+#endif
+
#include "Parsing.h"
#include "Store.h"
+ClientDelayPool::~ClientDelayPool()
+{
+    // release the client_delay_access ACL tree, if one was configured
+    if (access)
+        aclDestroyAccessList(&access);
+}
+
void ClientDelayPool::dump(StoreEntry * entry, unsigned int poolNumberMinusOne) const
{
LOCAL_ARRAY(char, nom, 32);
storeAppendPrintf(entry, "\n");
}
+/// \returns the process-wide singleton, constructed on first use
+ClientDelayPools *
+ClientDelayPools::Instance()
+{
+    static ClientDelayPools pools;
+    return &pools;
+}
+
+ClientDelayPools::~ClientDelayPools()
+{
+    // elements are refcounted Pointers; clearing drops our references
+    pools.clear();
+}
+
void
ClientDelayConfig::finalize()
{
- for (unsigned int i = 0; i < pools.size(); ++i) {
+ for (unsigned int i = 0; i < pools().size(); ++i) {
/* pools require explicit 'allow' to assign a client into them */
- if (!pools[i].access) {
- debugs(77, DBG_IMPORTANT, "client_delay_pool #" << (i+1) <<
+ if (!pool(i).access) {
+ debugs(77, DBG_IMPORTANT, "WARNING: client_delay_pool #" << (i+1) <<
" has no client_delay_access configured. " <<
"No client will ever use it.");
}
}
}
-void ClientDelayConfig::freePoolCount()
+void ClientDelayConfig::dumpPoolCount(StoreEntry * entry, const char *name) const
{
- pools.clear();
+ const auto &pools_ = ClientDelayPools::Instance()->pools;
+ if (pools_.size()) {
+ storeAppendPrintf(entry, "%s %d\n", name, static_cast<int>(pools_.size()));
+ for (unsigned int i = 0; i < pools_.size(); ++i)
+ pools_[i]->dump(entry, i);
+ }
}
-void ClientDelayConfig::dumpPoolCount(StoreEntry * entry, const char *name) const
+void
+ClientDelayConfig::freePools()
{
- if (pools.size()) {
- storeAppendPrintf(entry, "%s %d\n", name, (int)pools.size());
- for (unsigned int i = 0; i < pools.size(); ++i)
- pools[i].dump(entry, i);
- }
+ pools().clear();
}
void ClientDelayConfig::parsePoolCount()
{
- if (pools.size()) {
- debugs(3, DBG_CRITICAL, "parse_client_delay_pool_count: multiple client_delay_pools lines, aborting all previous client_delay_pools config");
- clean();
+ if (pools().size()) {
+ debugs(3, DBG_CRITICAL, "parse_client_delay_pool_count: multiple client_delay_pools lines, " <<
+ "aborting all previous client_delay_pools config");
+ freePools();
}
unsigned short pools_;
ConfigParser::ParseUShort(&pools_);
- for (int i = 0; i < pools_; ++i) {
- pools.push_back(ClientDelayPool());
- }
+ for (int i = 0; i < pools_; ++i)
+ pools().push_back(new ClientDelayPool());
}
void ClientDelayConfig::parsePoolRates()
{
- unsigned short pool;
- ConfigParser::ParseUShort(&pool);
-
- if (pool < 1 || pool > pools.size()) {
- debugs(3, DBG_CRITICAL, "parse_client_delay_pool_rates: Ignoring pool " << pool << " not in 1 .. " << pools.size());
- return;
+ if (unsigned short poolId = parsePoolId()) {
+ --poolId;
+ pool(poolId).rate = GetInteger();
+ pool(poolId).highwatermark = GetInteger64();
}
-
- --pool;
-
- pools[pool].rate = GetInteger();
- pools[pool].highwatermark = GetInteger64();
}
void ClientDelayConfig::parsePoolAccess(ConfigParser &parser)
{
- unsigned short pool;
-
- ConfigParser::ParseUShort(&pool);
-
- if (pool < 1 || pool > pools.size()) {
- debugs(3, DBG_CRITICAL, "parse_client_delay_pool_rates: Ignoring pool " << pool << " not in 1 .. " << pools.size());
- return;
- }
-
- --pool;
- aclParseAccessLine("client_delay_access", parser, &pools[pool].access);
+ if (const unsigned short poolId = parsePoolId())
+ aclParseAccessLine("client_delay_access", parser, &(pool(poolId-1).access));
}
-void ClientDelayConfig::clean()
+unsigned short
+ClientDelayConfig::parsePoolId()
{
- for (unsigned int i = 0; i < pools.size(); ++i) {
- aclDestroyAccessList(&pools[i].access);
+ unsigned short poolId = 0;
+ ConfigParser::ParseUShort(&poolId);
+ if (poolId < 1 || poolId > pools().size()) {
+ debugs(3, DBG_CRITICAL, "parse_client_delay_pool_rates: Ignoring pool " <<
+ poolId << " not in 1 .. " << pools().size());
+ return 0;
}
+ return poolId;
}
#define SQUID_CLIENTDELAYCONFIG_H
#include "acl/forward.h"
+#include "base/RefCount.h"
#include <vector>
/// \ingroup DelayPoolsAPI
/* represents one client write limiting delay 'pool' */
-class ClientDelayPool
+class ClientDelayPool : public RefCountable
{
public:
+ typedef RefCount<ClientDelayPool> Pointer;
+
ClientDelayPool()
- : access(NULL), rate(0), highwatermark(0) {}
+ : access(nullptr), rate(0), highwatermark(0) {}
+ ~ClientDelayPool();
+ ClientDelayPool(const ClientDelayPool &) = delete;
+ ClientDelayPool &operator=(const ClientDelayPool &) = delete;
+
void dump (StoreEntry * entry, unsigned int poolNumberMinusOne) const;
acl_access *access;
int rate;
int64_t highwatermark;
};
-typedef std::vector<ClientDelayPool> ClientDelayPools;
+class ClientDelayPools
+{
+public:
+ ClientDelayPools(const ClientDelayPools &) = delete;
+ ClientDelayPools &operator=(const ClientDelayPools &) = delete;
+ static ClientDelayPools *Instance();
+
+ std::vector<ClientDelayPool::Pointer> pools;
+private:
+ ClientDelayPools() {}
+ ~ClientDelayPools();
+};
/* represents configuration of client write limiting delay pools */
class ClientDelayConfig
public:
ClientDelayConfig()
: initial(50) {}
- void freePoolCount();
+ ClientDelayConfig(const ClientDelayConfig &) = delete;
+ ClientDelayConfig &operator=(const ClientDelayConfig &) = delete;
+
+ void freePools();
void dumpPoolCount(StoreEntry * entry, const char *name) const;
/* parsing of client_delay_pools - number of pools */
void parsePoolCount();
/* initial bucket level, how fill bucket at startup */
unsigned short initial;
- ClientDelayPools pools;
+
private:
- void clean();
+ unsigned short parsePoolId();
+ std::vector<ClientDelayPool::Pointer> &pools() { return ClientDelayPools::Instance()->pools; }
+ ClientDelayPool &pool(const int i) { return *(ClientDelayPools::Instance()->pools.at(i)); }
};
#endif // SQUID_CLIENTDELAYCONFIG_H
#ifndef SQUID__SRC_CLIENTINFO_H
#define SQUID__SRC_CLIENTINFO_H
+#if USE_DELAY_POOLS
+#include "BandwidthBucket.h"
+#endif
#include "base/ByteCounter.h"
#include "cbdata.h"
#include "enums.h"
class CommQuotaQueue;
#endif
-class ClientInfo
+class ClientInfo : public hash_link
+#if USE_DELAY_POOLS
+ , public BandwidthBucket
+#endif
{
MEMPROXY_CLASS(ClientInfo);
explicit ClientInfo(const Ip::Address &);
~ClientInfo();
- hash_link hash; /* must be first */
-
Ip::Address addr;
struct Protocol {
int n_established; /* number of current established connections */
time_t last_seen;
#if USE_DELAY_POOLS
- double writeSpeedLimit;///< Write speed limit in bytes per second, can be less than 1, if too close to zero this could result in timeouts from client
- double prevTime; ///< previous time when we checked
- double bucketSize; ///< how much can be written now
- double bucketSizeLimit; ///< maximum bucket size
bool writeLimitingActive; ///< Is write limiter active
bool firstTimeConnection;///< is this first time connection for this client
CommQuotaQueue *quotaQueue; ///< clients waiting for more write quota
int rationedQuota; ///< precomputed quota preserving fairness among clients
int rationedCount; ///< number of clients that will receive rationedQuota
- bool selectWaiting; ///< is between commSetSelect and commHandleWrite
bool eventWaiting; ///< waiting for commHandleWriteHelper event to fire
// all those functions access Comm fd_table and are defined in comm.cc
unsigned int quotaPeekReserv() const; ///< returns the next reserv. to pop
void quotaDequeue(); ///< pops queue head from queue
void kickQuotaQueue(); ///< schedule commHandleWriteHelper call
- int quotaForDequed(); ///< allocate quota for a just dequeued client
- void refillBucket(); ///< adds bytes to bucket based on rate and time
+
+ /* BandwidthBucket API */
+ virtual int quota() override; ///< allocate quota for a just dequeued client
+ virtual bool applyQuota(int &nleft, Comm::IoCallback *state) override;
+ virtual void scheduleWrite(Comm::IoCallback *state) override;
+ virtual void onFdClosed() override;
+ virtual void reduceBucket(int len) override;
void quotaDumpQueue(); ///< dumps quota queue for debugging
DIST_SUBDIRS += esi
DELAY_POOL_ALL_SOURCE = \
+ BandwidthBucket.cc \
+ BandwidthBucket.h \
CommonPool.h \
CompositePoolNode.h \
delay_pools.cc \
DelayUser.h \
DelayVector.cc \
DelayVector.h \
+ MessageBucket.cc \
+ MessageBucket.h \
+ MessageDelayPools.h \
+ MessageDelayPools.cc \
NullDelayId.h \
ClientDelayConfig.cc \
ClientDelayConfig.h
--- /dev/null
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+
+#if USE_DELAY_POOLS
+#include "comm/Connection.h"
+#include "DelayPools.h"
+#include "fde.h"
+#include "MessageBucket.h"
+
+/// \param speed individual write speed limit (bytes/s); negative means "no limit"
+/// \param initialLevelPercent initial bucket level, as a percentage of sizeLimit
+/// \param sizeLimit maximum individual bucket size
+/// \param pool the aggregate pool that also limits this bucket
+MessageBucket::MessageBucket(const int speed, const int initialLevelPercent,
+                             const double sizeLimit, MessageDelayPool::Pointer pool) :
+    BandwidthBucket(speed, initialLevelPercent, sizeLimit),
+    theAggregate(pool) {}
+
+/// \returns the byte allowance for the next write: the smaller of this
+/// individual bucket level and the shared aggregate level, with an
+/// unlimited side deferring to the limited one (truncated to whole bytes)
+int
+MessageBucket::quota()
+{
+    refillBucket();
+    theAggregate->refillBucket();
+    if (theAggregate->noLimit())
+        return static_cast<int>(bucketLevel); // only our own level limits
+    else if (noLimit())
+        return theAggregate->level(); // only the aggregate limits
+    else
+        return static_cast<int>(min(bucketLevel, static_cast<double>(theAggregate->level())));
+}
+
+/// Drains both this individual bucket and the shared aggregate bucket
+/// by the number of bytes just written.
+void
+MessageBucket::reduceBucket(int len)
+{
+    BandwidthBucket::reduceBucket(len);
+    theAggregate->bytesIn(len);
+}
+
+void
+MessageBucket::scheduleWrite(Comm::IoCallback *state)
+{
+    fde *F = &fd_table[state->conn->fd];
+    // register at most one pending write-select per descriptor
+    if (!F->writeQuotaHandler->selectWaiting) {
+        F->writeQuotaHandler->selectWaiting = true;
+        // message delay pools limit this write; see checkTimeouts()
+        SetSelect(state->conn->fd, COMM_SELECT_WRITE, Comm::HandleWrite, state, 0);
+    }
+}
+
+#endif /* USE_DELAY_POOLS */
+
--- /dev/null
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef MESSAGEBUCKET_H
+#define MESSAGEBUCKET_H
+
+#if USE_DELAY_POOLS
+
+#include "BandwidthBucket.h"
+#include "base/RefCount.h"
+#include "comm/forward.h"
+#include "MessageDelayPools.h"
+
+/// Limits Squid-to-client bandwidth for each matching response
+class MessageBucket : public RefCountable, public BandwidthBucket
+{
+    MEMPROXY_CLASS(MessageBucket);
+
+public:
+    typedef RefCount<MessageBucket> Pointer;
+
+    MessageBucket(const int speed, const int initialLevelPercent, const double sizeLimit, MessageDelayPool::Pointer pool);
+
+    /* BandwidthBucket API */
+    virtual int quota() override;
+    virtual void scheduleWrite(Comm::IoCallback *state) override;
+    virtual void reduceBucket(int len) override;
+
+private:
+    /// the aggregate pool that created this bucket and shares its limit
+    MessageDelayPool::Pointer theAggregate;
+};
+
+#endif /* USE_DELAY_POOLS */
+
+#endif
+
--- /dev/null
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+
+#if USE_DELAY_POOLS
+#include "acl/Gadgets.h"
+#include "cache_cf.h"
+#include "ConfigParser.h"
+#include "DelaySpec.h"
+#include "event.h"
+#include "MessageBucket.h"
+#include "MessageDelayPools.h"
+#include "Parsing.h"
+#include "SquidTime.h"
+#include "Store.h"
+
+#include <algorithm>
+#include <map>
+
+MessageDelayPools::~MessageDelayPools()
+{
+    // drop our references to all configured pools
+    freePools();
+}
+
+/// \returns the process-wide singleton, constructed on first use
+MessageDelayPools *
+MessageDelayPools::Instance()
+{
+    static MessageDelayPools pools;
+    return &pools;
+}
+
+/// \returns the configured pool with the given name, or nil if none matches
+MessageDelayPool::Pointer
+MessageDelayPools::pool(const SBuf &name)
+{
+    // take each Pointer by reference to avoid refcount churn per element
+    const auto it = std::find_if(pools.begin(), pools.end(),
+                                 [&name](const MessageDelayPool::Pointer &p) { return p->poolName == name; });
+    return it == pools.end() ? nullptr : *it;
+}
+
+/// Appends a single MessageDelayPool, created during configuration.
+/// Pools with a duplicate name are rejected (with a warning) and freed.
+void
+MessageDelayPools::add(MessageDelayPool *p)
+{
+    // take ownership immediately so a rejected pool is not leaked
+    const MessageDelayPool::Pointer incoming(p);
+    const auto it = std::find_if(pools.begin(), pools.end(),
+                                 [&p](const MessageDelayPool::Pointer &mp) { return mp->poolName == p->poolName; });
+    if (it != pools.end()) {
+        debugs(3, DBG_CRITICAL, "WARNING: Ignoring duplicate " << p->poolName << " response delay pool");
+        return; // incoming is destroyed here
+    }
+    pools.push_back(incoming);
+}
+
+void
+MessageDelayPools::freePools()
+{
+    // elements are refcounted Pointers; clearing drops our references
+    pools.clear();
+}
+
+/// \param name the response_delay_pool directive name for this pool
+/// \param bucketSpeed individual bucket refill speed (bytes/s); negative means "no limit"
+/// \param bucketSize maximum individual bucket size
+/// \param aggregateSpeed aggregate bucket refill speed (bytes/s); negative means "no limit"
+/// \param aggregateSize maximum aggregate bucket size
+/// \param initialBucketPercent initial individual bucket level, % of bucketSize
+MessageDelayPool::MessageDelayPool(const SBuf &name, int64_t bucketSpeed, int64_t bucketSize,
+        int64_t aggregateSpeed, int64_t aggregateSize, uint16_t initialBucketPercent):
+    access(0),
+    poolName(name),
+    individualRestore(bucketSpeed),
+    individualMaximum(bucketSize),
+    aggregateRestore(aggregateSpeed),
+    aggregateMaximum(aggregateSize),
+    initialBucketLevel(initialBucketPercent),
+    lastUpdate(squid_curtime)
+{
+    // the aggregate bucket starts full
+    theBucket.level() = aggregateMaximum;
+}
+
+MessageDelayPool::~MessageDelayPool()
+{
+    // release the response_delay_pool_access ACL tree, if configured
+    if (access)
+        aclDestroyAccessList(&access);
+}
+
+/// Refills the aggregate bucket at aggregateRestore bytes/second,
+/// with whole-second granularity (based on squid_curtime).
+void
+MessageDelayPool::refillBucket()
+{
+    if (noLimit())
+        return;
+    const int incr = squid_curtime - lastUpdate;
+    if (incr >= 1) {
+        lastUpdate = squid_curtime;
+        DelaySpec spec;
+        spec.restore_bps = aggregateRestore;
+        spec.max_bytes = aggregateMaximum;
+        theBucket.update(spec, incr);
+    }
+}
+
+/// Writes this pool's access rules and parameters to entry,
+/// in squid.conf directive format.
+void
+MessageDelayPool::dump(StoreEntry *entry) const
+{
+    SBuf name("response_delay_pool_access ");
+    name.append(poolName);
+    dump_acl_access(entry, name.c_str(), access);
+    storeAppendPrintf(entry, "response_delay_pool parameters %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %d\n",
+                      individualRestore, individualMaximum, aggregateRestore, aggregateMaximum, initialBucketLevel);
+    storeAppendPrintf(entry, "\n");
+}
+
+/// Creates an individual response bucket tied to this aggregate pool.
+MessageBucket::Pointer
+MessageDelayPool::createBucket()
+{
+    return new MessageBucket(individualRestore, initialBucketLevel, individualMaximum, this);
+}
+
+/// Parses one response_delay_pool directive and registers the resulting
+/// pool. Individual and aggregate speed/size options must come in pairs;
+/// a missing pair means "no limit" for that side.
+void
+MessageDelayConfig::parseResponseDelayPool()
+{
+    static const SBuf bucketSpeedLimit("individual-restore");
+    static const SBuf maxBucketSize("individual-maximum");
+    static const SBuf aggregateSpeedLimit("aggregate-restore");
+    static const SBuf maxAggregateSize("aggregate-maximum");
+    static const SBuf initialBucketPercent("initial-bucket-level");
+
+    // per-call option values; negative speed/size means "not configured"
+    std::map<SBuf, int64_t> params;
+    params[bucketSpeedLimit] = -1;
+    params[maxBucketSize] = -1;
+    params[aggregateSpeedLimit] = -1;
+    params[maxAggregateSize] = -1;
+    params[initialBucketPercent] = 50;
+
+    const SBuf name(ConfigParser::NextToken());
+    if (name.isEmpty()) {
+        debugs(3, DBG_CRITICAL, "FATAL: response_delay_pool missing required \"name\" parameter.");
+        self_destruct();
+        return; // do not continue if self_destruct() returns
+    }
+
+    char *key = nullptr;
+    char *value = nullptr;
+    while (ConfigParser::NextKvPair(key, value)) {
+        if (!value) {
+            debugs(3, DBG_CRITICAL, "FATAL: '" << key << "' option missing value");
+            self_destruct();
+            return;
+        }
+        auto it = params.find(SBuf(key));
+        if (it == params.end()) {
+            debugs(3, DBG_CRITICAL, "FATAL: response_delay_pool unknown option '" << key << "'");
+            self_destruct();
+            return; // 'it' is not dereferenceable here
+        }
+        it->second = (it->first == initialBucketPercent) ? xatos(value) : xatoll(value, 10);
+    }
+
+    // speed and size limits only make sense together
+    const char *fatalMsg = nullptr;
+    if ((params[bucketSpeedLimit] < 0) != (params[maxBucketSize] < 0))
+        fatalMsg = "'individual-restore' and 'individual-maximum'";
+    else if ((params[aggregateSpeedLimit] < 0) != (params[maxAggregateSize] < 0))
+        fatalMsg = "'aggregate-restore' and 'aggregate-maximum'";
+
+    if (fatalMsg) {
+        debugs(3, DBG_CRITICAL, "FATAL: must use " << fatalMsg << " options in conjunction");
+        self_destruct();
+        return;
+    }
+
+    MessageDelayPool *pool = new MessageDelayPool(name,
+                             params[bucketSpeedLimit],
+                             params[maxBucketSize],
+                             params[aggregateSpeedLimit],
+                             params[maxAggregateSize],
+                             static_cast<uint16_t>(params[initialBucketPercent])
+                                                 );
+    MessageDelayPools::Instance()->add(pool);
+}
+
+/// Parses one response_delay_pool_access line, attaching the ACL rule
+/// to the named pool. Lines naming an unconfigured pool are ignored.
+void
+MessageDelayConfig::parseResponseDelayPoolAccess() {
+    const char *token = ConfigParser::NextToken();
+    if (!token) {
+        debugs(3, DBG_CRITICAL, "ERROR: required pool_name option missing");
+        return;
+    }
+    MessageDelayPool::Pointer pool = MessageDelayPools::Instance()->pool(SBuf(token));
+    static ConfigParser parser;
+    if (pool)
+        aclParseAccessLine("response_delay_pool_access", parser, &pool->access);
+}
+
+/// Frees all configured response delay pools (reconfiguration helper).
+void
+MessageDelayConfig::freePools()
+{
+    MessageDelayPools::Instance()->freePools();
+}
+
+/// Dumps all configured response delay pools to entry.
+// NOTE(review): the 'name' parameter is unused because each pool prints
+// its own directive names in dump(); confirm this is intended.
+void
+MessageDelayConfig::dumpResponseDelayPoolParameters(StoreEntry *entry, const char *name)
+{
+    auto &pools = MessageDelayPools::Instance()->pools;
+    for (auto pool: pools)
+        pool->dump(entry);
+}
+
+#endif
+
--- /dev/null
+/*
+ * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef MESSAGEDELAYPOOLS_H
+#define MESSAGEDELAYPOOLS_H
+
+#if USE_DELAY_POOLS
+
+#include "acl/Acl.h"
+#include "base/RefCount.h"
+#include "DelayBucket.h"
+#include "DelayPools.h"
+
+class MessageBucket;
+typedef RefCount<MessageBucket> MessageBucketPointer;
+
+/// \ingroup DelayPoolsAPI
+/// Represents one 'response' delay pool, creates individual response
+/// buckets and performs aggregate limiting for them
+class MessageDelayPool : public RefCountable
+{
+public:
+    typedef RefCount<MessageDelayPool> Pointer;
+
+    MessageDelayPool(const SBuf &name, int64_t bucketSpeed, int64_t bucketSize,
+                     int64_t aggregateSpeed, int64_t aggregateSize, uint16_t initialBucketPercent);
+    ~MessageDelayPool();
+    MessageDelayPool(const MessageDelayPool &) = delete;
+    MessageDelayPool &operator=(const MessageDelayPool &) = delete;
+
+    /// Increases the aggregate bucket level with the aggregateRestore speed.
+    void refillBucket();
+    /// decreases the aggregate level
+    void bytesIn(int qty) { if (!noLimit()) theBucket.bytesIn(qty); }
+    /// current aggregate level
+    int level() { return theBucket.level(); }
+    /// creates an individual response bucket
+    MessageBucketPointer createBucket();
+    /// whether the aggregate bucket has no limit
+    bool noLimit () const { return aggregateRestore < 0; }
+
+    /// dumps the pool configuration in squid.conf format
+    void dump (StoreEntry * entry) const;
+
+    /// response_delay_pool_access rules for this pool (optional)
+    acl_access *access;
+    /// the response delay pool name
+    SBuf poolName;
+    /// the speed limit of an individual bucket (bytes/s)
+    int64_t individualRestore;
+    /// the maximum size of an individual bucket
+    int64_t individualMaximum;
+    /// the speed limit of the aggregate bucket (bytes/s)
+    int64_t aggregateRestore;
+    /// the maximum size of the aggregate bucket
+    int64_t aggregateMaximum;
+    /// the initial bucket size as a percentage of individualMaximum
+    uint16_t initialBucketLevel;
+    /// the aggregate bucket
+    DelayBucket theBucket;
+
+private:
+    /// Time the aggregate bucket level was last refilled.
+    time_t lastUpdate;
+};
+
+/// \ingroup DelayPoolsAPI
+/// represents all configured 'response' delay pools
+class MessageDelayPools
+{
+public:
+    MessageDelayPools(const MessageDelayPools &) = delete;
+    MessageDelayPools &operator=(const MessageDelayPools &) = delete;
+
+    /// \returns the process-wide singleton, constructed on first use
+    static MessageDelayPools *Instance();
+
+    /// returns a MessageDelayPool with a given name or null otherwise
+    MessageDelayPool::Pointer pool(const SBuf &name);
+    /// appends a single MessageDelayPool, created during configuration
+    void add(MessageDelayPool *pool);
+    /// memory cleanup, performed during reconfiguration
+    void freePools();
+
+    std::vector<MessageDelayPool::Pointer> pools;
+
+private:
+    MessageDelayPools() {}
+    ~MessageDelayPools();
+    void Stats() { } // TODO
+};
+
+/// represents configuration for response delay pools
+class MessageDelayConfig
+{
+public:
+    /// parses one response_delay_pool directive
+    void parseResponseDelayPool();
+    /// dumps all configured pools in squid.conf format
+    void dumpResponseDelayPoolParameters(StoreEntry *e, const char *name);
+    /// parses one response_delay_pool_access directive
+    void parseResponseDelayPoolAccess();
+    /// frees all pools (reconfiguration helper)
+    void freePools();
+};
+
+#define free_response_delay_pool_access(X)
+#define dump_response_delay_pool_access(X, Y, Z)
+
+/// cf.data glue: frees all response delay pools during reconfiguration
+inline void
+free_response_delay_pool_parameters(MessageDelayConfig * cfg)
+{
+    cfg->freePools();
+}
+
+/// cf.data glue: dumps response_delay_pool directives to entry
+inline void
+dump_response_delay_pool_parameters(StoreEntry *entry, const char *name, MessageDelayConfig &cfg)
+{
+    cfg.dumpResponseDelayPoolParameters(entry, name);
+}
+
+/// cf.data glue: parses one response_delay_pool directive
+inline void
+parse_response_delay_pool_parameters(MessageDelayConfig * cfg)
+{
+    cfg->parseResponseDelayPool();
+}
+
+/// cf.data glue: parses one response_delay_pool_access directive
+inline void
+parse_response_delay_pool_access(MessageDelayConfig * cfg)
+{
+    cfg->parseResponseDelayPoolAccess();
+}
+
+#endif
+#endif
+
#include "acl/forward.h"
#include "base/RefCount.h"
#include "base/YesNoNone.h"
+#if USE_DELAY_POOLS
#include "ClientDelayConfig.h"
#include "DelayConfig.h"
+#endif
#include "helper/ChildConfig.h"
#include "HttpHeaderTools.h"
#include "ip/Address.h"
+#if USE_DELAY_POOLS
+#include "MessageDelayPools.h"
+#endif
#include "Notes.h"
#include "security/forward.h"
#include "SquidTime.h"
DelayConfig Delay;
ClientDelayConfig ClientDelay;
+ MessageDelayConfig MessageDelay;
#endif
struct {
#include "log/Config.h"
#include "log/CustomLog.h"
#include "MemBuf.h"
+#include "MessageDelayPools.h"
#include "mgr/ActionPasswordList.h"
#include "mgr/Registration.h"
#include "neighbors.h"
static void
free_client_delay_pool_count(ClientDelayConfig * cfg)
{
- cfg->freePoolCount();
+ cfg->freePools();
}
static void
client_delay_pool_access acl
client_delay_pool_count
client_delay_pool_rates
+response_delay_pool_access acl
+response_delay_pool_parameters
denyinfo acl
eol
externalAclHelper auth_param
See also client_delay_parameters and client_delay_pools.
DOC_END
+NAME: response_delay_pool
+TYPE: response_delay_pool_parameters
+DEFAULT: none
+IFDEF: USE_DELAY_POOLS
+LOC: Config.MessageDelay
+DOC_START
+ This option configures client response bandwidth limits using the
+ following format:
+
+ response_delay_pool name [option=value] ...
+
+ name the response delay pool name
+
+ available options:
+
+ individual-restore The speed limit of an individual
+				bucket (bytes/s). To be used in conjunction
+ with 'individual-maximum'.
+
+ individual-maximum The maximum number of bytes which can
+ be placed into the individual bucket. To be used
+ in conjunction with 'individual-restore'.
+
+ aggregate-restore The speed limit for the aggregate
+				bucket (bytes/s). To be used in conjunction with
+ 'aggregate-maximum'.
+
+ aggregate-maximum The maximum number of bytes which can
+ be placed into the aggregate bucket. To be used
+ in conjunction with 'aggregate-restore'.
+
+ initial-bucket-level The initial bucket size as a percentage
+ of individual-maximum.
+
+	The individual and/or aggregate bucket options may be omitted,
+	meaning no individual and/or aggregate speed limitation.
+ See also response_delay_pool_access and delay_parameters for
+ terminology details.
+DOC_END
+
+NAME: response_delay_pool_access
+TYPE: response_delay_pool_access
+DEFAULT: none
+DEFAULT_DOC: Deny use of the pool, unless allow rules exist in squid.conf for the pool.
+IFDEF: USE_DELAY_POOLS
+LOC: Config.MessageDelay
+DOC_START
+ Determines whether a specific named response delay pool is used
+ for the transaction. The syntax for this directive is:
+
+ response_delay_pool_access pool_name allow|deny acl_name
+
+ All response_delay_pool_access options are checked in the order
+ they appear in this configuration file. The first rule with a
+ matching ACL wins. If (and only if) an "allow" rule won, Squid
+ assigns the response to the corresponding named delay pool.
+DOC_END
+
COMMENT_START
WCCPv1 AND WCCPv2 CONFIGURATION OPTIONS
-----------------------------------------------------------------------------
#endif
ClientInfo::ClientInfo(const Ip::Address &ip) :
+#if USE_DELAY_POOLS
+ BandwidthBucket(0, 0, 0),
+#endif
addr(ip),
n_established(0),
last_seen(0)
#if USE_DELAY_POOLS
- , writeSpeedLimit(0),
- prevTime(0),
- bucketSize(0),
- bucketSizeLimit(0),
- writeLimitingActive(false),
+ , writeLimitingActive(false),
firstTimeConnection(true),
quotaQueue(nullptr),
rationedQuota(0),
rationedCount(0),
- selectWaiting(false),
eventWaiting(false)
#endif
{
debugs(77, 9, "ClientInfo constructed, this=" << static_cast<void*>(this));
-
-#if USE_DELAY_POOLS
- getCurrentTime();
- /* put current time to have something sensible here */
- prevTime = current_dtime;
-#endif
-
char *buf = static_cast<char*>(xmalloc(MAX_IPSTRLEN)); // becomes hash.key
- hash.key = addr.toStr(buf,MAX_IPSTRLEN);
+ key = addr.toStr(buf,MAX_IPSTRLEN);
}
static ClientInfo *
clientdbAdd(const Ip::Address &addr)
{
ClientInfo *c = new ClientInfo(addr);
- hash_join(client_table, &c->hash);
+ hash_join(client_table, static_cast<hash_link*>(c));
++statCounter.client_http.clients;
if ((statCounter.client_http.clients > max_clients) && !cleanup_running && cleanup_scheduled < 2) {
clientdbDump(StoreEntry * sentry)
{
const char *name;
- ClientInfo *c;
int icp_total = 0;
int icp_hits = 0;
int http_total = 0;
storeAppendPrintf(sentry, "Cache Clients:\n");
hash_first(client_table);
- while ((c = (ClientInfo *) hash_next(client_table))) {
- storeAppendPrintf(sentry, "Address: %s\n", hashKeyStr(&c->hash));
+ while (hash_link *hash = hash_next(client_table)) {
+ const ClientInfo *c = reinterpret_cast<const ClientInfo *>(hash);
+ storeAppendPrintf(sentry, "Address: %s\n", hashKeyStr(hash));
if ( (name = fqdncache_gethostbyaddr(c->addr, 0)) ) {
storeAppendPrintf(sentry, "Name: %s\n", name);
}
ClientInfo::~ClientInfo()
{
- safe_free(hash.key);
+ safe_free(key);
#if USE_DELAY_POOLS
if (CommQuotaQueue *q = quotaQueue) {
if (age < 60)
continue;
- hash_remove_link(client_table, &c->hash);
+ hash_remove_link(client_table, static_cast<hash_link*>(c));
clientdbFreeItem(c);
Ip::Address *
client_entry(Ip::Address *current)
{
- ClientInfo *c = NULL;
char key[MAX_IPSTRLEN];
+ hash_first(client_table);
if (current) {
current->toStr(key,MAX_IPSTRLEN);
- hash_first(client_table);
- while ((c = (ClientInfo *) hash_next(client_table))) {
- if (!strcmp(key, hashKeyStr(&c->hash)))
+ while (hash_link *hash = hash_next(client_table)) {
+ if (!strcmp(key, hashKeyStr(hash)))
break;
}
-
- c = (ClientInfo *) hash_next(client_table);
- } else {
- hash_first(client_table);
- c = (ClientInfo *) hash_next(client_table);
}
- hash_last(client_table);
+ ClientInfo *c = reinterpret_cast<ClientInfo *>(hash_next(client_table));
- if (c)
- return (&c->addr);
- else
- return (NULL);
+ hash_last(client_table);
+ return c ? &c->addr : nullptr;
}
variable_list *
#endif
#if USE_DELAY_POOLS
#include "ClientInfo.h"
+#include "MessageDelayPools.h"
#endif
#if USE_OPENSSL
#include "ssl/bio.h"
if (Config.onoff.client_db) {
/* it was said several times that client write limiter does not work if client_db is disabled */
- ClientDelayPools& pools(Config.ClientDelay.pools);
+ auto &pools = ClientDelayPools::Instance()->pools;
ACLFilledChecklist ch(NULL, NULL, NULL);
// TODO: we check early to limit error response bandwith but we
for (unsigned int pool = 0; pool < pools.size(); ++pool) {
/* pools require explicit 'allow' to assign a client into them */
- if (pools[pool].access) {
- ch.changeAcl(pools[pool].access);
+ if (pools[pool]->access) {
+ ch.changeAcl(pools[pool]->access);
allow_t answer = ch.fastCheck();
if (answer == ACCESS_ALLOWED) {
/* setup write limiter for this request */
const double burst = floor(0.5 +
- (pools[pool].highwatermark * Config.ClientDelay.initial)/100.0);
- cli->setWriteLimiter(pools[pool].rate, burst, pools[pool].highwatermark);
+ (pools[pool]->highwatermark * Config.ClientDelay.initial)/100.0);
+ cli->setWriteLimiter(pools[pool]->rate, burst, pools[pool]->highwatermark);
break;
} else {
debugs(83, 4, HERE << "Delay pool " << pool << " skipped because ACL " << answer);
#include "security/Handshake.h"
#include "ssl/support.h"
#endif
+#if USE_DELAY_POOLS
+#include "MessageBucket.h"
+#endif
class ClientHttpRequest;
class HttpHdrRangeSpec;
}
#if USE_DELAY_POOLS
- if (ClientInfo *clientInfo = F->clientInfo) {
- if (clientInfo->selectWaiting) {
- clientInfo->selectWaiting = false;
- // kick queue or it will get stuck as commWriteHandle is not called
- clientInfo->kickQuotaQueue();
- }
+ if (BandwidthBucket *bucket = BandwidthBucket::SelectBucket(F)) {
+ if (bucket->selectWaiting)
+ bucket->onFdClosed();
}
#endif
{
if (!eventWaiting && !selectWaiting && hasQueue()) {
// wait at least a second if the bucket is empty
- const double delay = (bucketSize < 1.0) ? 1.0 : 0.0;
+ const double delay = (bucketLevel < 1.0) ? 1.0 : 0.0;
eventAdd("commHandleWriteHelper", &commHandleWriteHelper,
quotaQueue, delay, 0, true);
eventWaiting = true;
/// calculates how much to write for a single dequeued client
int
-ClientInfo::quotaForDequed()
+ClientInfo::quota()
{
/* If we have multiple clients and give full bucketSize to each client then
* clt1 may often get a lot more because clt1->clt2 time distance in the
// Rounding errors do not accumulate here, but we round down to avoid
// negative bucket sizes after write with rationedCount=1.
- rationedQuota = static_cast<int>(floor(bucketSize/rationedCount));
+ rationedQuota = static_cast<int>(floor(bucketLevel/rationedCount));
debugs(77,5, HERE << "new rationedQuota: " << rationedQuota <<
'*' << rationedCount);
}
return rationedQuota;
}
-///< adds bytes to the quota bucket based on the rate and passed time
+bool
+ClientInfo::applyQuota(int &nleft, Comm::IoCallback *state)
+{
+ assert(hasQueue());
+ assert(quotaPeekFd() == state->conn->fd);
+ quotaDequeue(); // we will write or requeue below
+ if (nleft > 0 && !BandwidthBucket::applyQuota(nleft, state)) {
+ state->quotaQueueReserv = quotaEnqueue(state->conn->fd);
+ kickQuotaQueue();
+ return false;
+ }
+ return true;
+}
+
void
-ClientInfo::refillBucket()
+ClientInfo::scheduleWrite(Comm::IoCallback *state)
{
- // all these times are in seconds, with double precision
- const double currTime = current_dtime;
- const double timePassed = currTime - prevTime;
-
- // Calculate allowance for the time passed. Use double to avoid
- // accumulating rounding errors for small intervals. For example, always
- // adding 1 byte instead of 1.4 results in 29% bandwidth allocation error.
- const double gain = timePassed * writeSpeedLimit;
-
- debugs(77,5, HERE << currTime << " clt" << (const char*)hash.key << ": " <<
- bucketSize << " + (" << timePassed << " * " << writeSpeedLimit <<
- " = " << gain << ')');
-
- // to further combat error accumulation during micro updates,
- // quit before updating time if we cannot add at least one byte
- if (gain < 1.0)
- return;
-
- prevTime = currTime;
-
- // for "first" connections, drain initial fat before refilling but keep
- // updating prevTime to avoid bursts after the fat is gone
- if (bucketSize > bucketSizeLimit) {
- debugs(77,4, HERE << "not refilling while draining initial fat");
- return;
+ if (writeLimitingActive) {
+ state->quotaQueueReserv = quotaEnqueue(state->conn->fd);
+ kickQuotaQueue();
}
+}
- bucketSize += gain;
+void
+ClientInfo::onFdClosed()
+{
+ BandwidthBucket::onFdClosed();
+ // kick queue or it will get stuck as commWriteHandle is not called
+ kickQuotaQueue();
+}
- // obey quota limits
- if (bucketSize > bucketSizeLimit)
- bucketSize = bucketSizeLimit;
+void
+ClientInfo::reduceBucket(const int len)
+{
+ if (len > 0)
+ BandwidthBucket::reduceBucket(len);
+ // even if we wrote nothing, we were served; give others a chance
+ kickQuotaQueue();
}
void
ClientInfo::setWriteLimiter(const int aWriteSpeedLimit, const double anInitialBurst, const double aHighWatermark)
{
- debugs(77,5, HERE << "Write limits for " << (const char*)hash.key <<
+ debugs(77,5, "Write limits for " << (const char*)key <<
" speed=" << aWriteSpeedLimit << " burst=" << anInitialBurst <<
" highwatermark=" << aHighWatermark);
assert(!quotaQueue);
quotaQueue = new CommQuotaQueue(this);
- bucketSize = anInitialBurst;
+ bucketLevel = anInitialBurst;
prevTime = current_dtime;
}
}
unsigned int
CommQuotaQueue::enqueue(int fd)
{
- debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key <<
+ debugs(77,5, "clt" << (const char*)clientInfo->key <<
": FD " << fd << " with qqid" << (ins+1) << ' ' << fds.size());
fds.push_back(fd);
return ++ins;
CommQuotaQueue::dequeue()
{
assert(!fds.empty());
- debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key <<
+ debugs(77,5, "clt" << (const char*)clientInfo->key <<
": FD " << fds.front() << " with qqid" << (outs+1) << ' ' <<
fds.size());
fds.pop_front();
++outs;
}
-#endif
+#endif /* USE_DELAY_POOLS */
/*
* hm, this might be too general-purpose for all the places we'd
debugs(5, 5, "checkTimeouts: FD " << fd << " auto write timeout");
Comm::SetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0);
COMMIO_FD_WRITECB(fd)->finish(Comm::COMM_ERROR, ETIMEDOUT);
- } else if (AlreadyTimedOut(F))
+#if USE_DELAY_POOLS
+ } else if (F->writeQuotaHandler != nullptr && COMMIO_FD_WRITECB(fd)->conn != nullptr) {
+ if (!F->writeQuotaHandler->selectWaiting && F->writeQuotaHandler->quota() && !F->closing()) {
+ F->writeQuotaHandler->selectWaiting = true;
+ Comm::SetSelect(fd, COMM_SELECT_WRITE, Comm::HandleWrite, COMMIO_FD_WRITECB(fd), 0);
+ }
+ continue;
+#endif
+ }
+ else if (AlreadyTimedOut(F))
continue;
debugs(5, 5, "checkTimeouts: FD " << fd << " Expired");
Comm::IoCallback::selectOrQueueWrite()
{
#if USE_DELAY_POOLS
- // stand in line if there is one
- if (ClientInfo *clientInfo = fd_table[conn->fd].clientInfo) {
- if (clientInfo->writeLimitingActive) {
- quotaQueueReserv = clientInfo->quotaEnqueue(conn->fd);
- clientInfo->kickQuotaQueue();
- return;
- }
+ if (BandwidthBucket *bucket = BandwidthBucket::SelectBucket(&fd_table[conn->fd])) {
+ bucket->scheduleWrite(this);
+ return;
}
#endif
/// Initialize the module on Squid startup
void SelectLoopInit(void);
-/// Mark an FD to be watched for its IO status.
-void SetSelect(int, unsigned int, PF *, void *, time_t);
-
/// reset/undo/unregister the watch for an FD which was set by Comm::SetSelect()
void ResetSelect(int);
*/
#include "squid.h"
+#include "cbdata.h"
#include "comm/Connection.h"
#include "comm/IoCallback.h"
+#include "comm/Loops.h"
#include "comm/Write.h"
#include "fd.h"
#include "fde.h"
int len = 0;
int nleft;
- assert(state->conn != NULL && state->conn->fd == fd);
+ assert(state->conn != NULL);
+ assert(state->conn->fd == fd);
PROF_start(commHandleWrite);
debugs(5, 5, HERE << state->conn << ": off " <<
nleft = state->size - state->offset;
#if USE_DELAY_POOLS
- ClientInfo * clientInfo=fd_table[fd].clientInfo;
-
- if (clientInfo && !clientInfo->writeLimitingActive)
- clientInfo = NULL; // we only care about quota limits here
-
- if (clientInfo) {
- assert(clientInfo->selectWaiting);
- clientInfo->selectWaiting = false;
-
- assert(clientInfo->hasQueue());
- assert(clientInfo->quotaPeekFd() == fd);
- clientInfo->quotaDequeue(); // we will write or requeue below
-
- if (nleft > 0) {
- const int quota = clientInfo->quotaForDequed();
- if (!quota) { // if no write quota left, queue this fd
- state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
- clientInfo->kickQuotaQueue();
- PROF_stop(commHandleWrite);
- return;
- }
-
- const int nleft_corrected = min(nleft, quota);
- if (nleft != nleft_corrected) {
- debugs(5, 5, HERE << state->conn << " writes only " <<
- nleft_corrected << " out of " << nleft);
- nleft = nleft_corrected;
- }
-
+ BandwidthBucket *bucket = BandwidthBucket::SelectBucket(&fd_table[fd]);
+ if (bucket) {
+ assert(bucket->selectWaiting);
+ bucket->selectWaiting = false;
+ if (nleft > 0 && !bucket->applyQuota(nleft, state)) {
+ PROF_stop(commHandleWrite);
+ return;
}
}
#endif /* USE_DELAY_POOLS */
debugs(5, 5, HERE << "write() returns " << len);
#if USE_DELAY_POOLS
- if (clientInfo) {
- if (len > 0) {
- /* we wrote data - drain them from bucket */
- clientInfo->bucketSize -= len;
- if (clientInfo->bucketSize < 0.0) {
- debugs(5, DBG_IMPORTANT, HERE << "drained too much"); // should not happen
- clientInfo->bucketSize = 0;
- }
- }
-
- // even if we wrote nothing, we were served; give others a chance
- clientInfo->kickQuotaQueue();
+ if (bucket) {
+ /* we wrote data - drain them from bucket */
+ bucket->reduceBucket(len);
}
#endif /* USE_DELAY_POOLS */
/// Cancel the write pending on FD. No action if none pending.
void WriteCancel(const Comm::ConnectionPointer &conn, const char *reason);
-// callback handler to process an FD which is available for writing.
-extern PF HandleWrite;
-
} // namespace Comm
#endif /* _SQUID_COMM_IOWRITE_H */
#include <vector>
+/// legacy CBDATA callback functions ABI definition for read or write I/O events
+/// \deprecated use CommCalls API instead where possible
+typedef void PF(int, void *);
+
/// Abstraction layer for TCP, UDP, TLS, UDS and filedescriptor sockets.
namespace Comm
{
bool IsConnOpen(const Comm::ConnectionPointer &conn);
-}; // namespace Comm
+// callback handler to process an FD which is available for writing.
+PF HandleWrite;
-/// legacy CBDATA callback functions ABI definition for read or write I/O events
-/// \deprecated use CommCalls API instead where possible
-typedef void PF(int, void *);
+/// Mark an FD to be watched for its IO status.
+void SetSelect(int, unsigned int, PF *, void *, time_t);
+
+}; // namespace Comm
#endif /* _SQUID_COMM_FORWARD_H */
#include "typedefs.h" //DRCB, DWCB
#if USE_DELAY_POOLS
+#include "MessageBucket.h"
class ClientInfo;
#endif
class dwrite_q;
#if USE_DELAY_POOLS
/// pointer to client info used in client write limiter or nullptr if not present
ClientInfo * clientInfo = nullptr;
+ MessageBucket::Pointer writeQuotaHandler; ///< response write limiter, if configured
#endif
unsigned epoll_state = 0;
#include "HttpHeaderTools.h"
#include "Store.h"
#include "TimeOrTag.h"
+#if USE_DELAY_POOLS
+#include "acl/FilledChecklist.h"
+#include "ClientInfo.h"
+#include "fde.h"
+#include "MessageDelayPools.h"
+#endif
Http::Stream::Stream(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) :
clientConnection(aConn),
mb->append(bodyData.data, length);
}
}
+#if USE_DELAY_POOLS
+ for (const auto &pool: MessageDelayPools::Instance()->pools) {
+ if (pool->access) {
+ std::unique_ptr<ACLFilledChecklist> chl(clientAclChecklistCreate(pool->access, http));
+ chl->reply = rep;
+ HTTPMSGLOCK(chl->reply);
+ const allow_t answer = chl->fastCheck();
+ if (answer == ACCESS_ALLOWED) {
+ writeQuotaHandler = pool->createBucket();
+ fd_table[clientConnection->fd].writeQuotaHandler = writeQuotaHandler;
+ break;
+ } else {
+ debugs(83, 4, "Response delay pool " << pool->poolName <<
+ " skipped because ACL " << answer);
+ }
+ }
+ }
+#endif
getConn()->write(mb);
delete mb;
#include "http/forward.h"
#include "mem/forward.h"
#include "StoreIOBuffer.h"
+#if USE_DELAY_POOLS
+#include "MessageBucket.h"
+#endif
class clientStreamNode;
class ClientHttpRequest;
bool mayUseConnection_; /* This request may use the connection. Don't read anymore requests for now */
bool connRegistered_;
+#if USE_DELAY_POOLS
+ MessageBucket::Pointer writeQuotaHandler; ///< response write limiter, if configured
+#endif
};
} // namespace Http