Client-side bandwidth limit (a.k.a. quota or delay pool) implementation.
In mobile environments, Squid may need to limit Squid-to-client bandwidth
available to individual users, identified by their IP addresses. The IP
address pool can be as large as a /10 IPv4 network (4 million unique IP
addresses) and even larger in IPv6 environments. On the other hand, the code
should support thousands of connections coming from a single IP (e.g., a child
proxy).
The implementation is based on storing bandwidth-related "bucket" information
in the existing "client database" hash (client_db.cc). The old code already
assigned each client IP a single ClientInfo object, which satisfies the
client-side IP-based bandwidth pooling requirements. The old hash size is
increased to support up to 32K concurrent clients if needed.
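
For orientation, the following is a simplified sketch of the per-IP bucket
arithmetic kept in ClientInfo (illustrative only; the struct and method names
below are hypothetical, and the authoritative logic is in
ClientInfo::refillBucket() and commHandleWrite() in the patch):

    // sketch of the token-bucket bookkeeping stored per client IP
    #include <algorithm>

    struct BucketSketch {
        double bucketSize;      // bytes currently available for writing
        double bucketSizeLimit; // max_bucket_size from client_delay_parameters
        double writeSpeedLimit; // speed_limit, in bytes per second
        double prevTime;        // time of the last refill, in seconds

        // add rate * elapsed-time bytes, capped at the configured maximum
        void refill(double now) {
            const double gain = (now - prevTime) * writeSpeedLimit;
            if (gain < 1.0)
                return; // skip tiny updates to avoid accumulating rounding errors
            prevTime = now;
            if (bucketSize > bucketSizeLimit)
                return; // still draining an oversized initial burst; do not refill
            bucketSize = std::min(bucketSize + gain, bucketSizeLimit);
        }

        // subtract what was actually written to the client socket
        void drain(int bytesWritten) {
            bucketSize = std::max(0.0, bucketSize - bytesWritten);
        }
    };
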
Client-side pools are configured similarly to server-side ones, but there is
only one pool class. See client_delay_pools,
client_delay_initial_bucket_level, client_delay_parameters, and
client_delay_access in squid.conf. The client_delay_access directive matches a
client to a delay pool and its parameters. It does not pool clients from
different IP addresses together.
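
For illustration, a minimal squid.conf sketch (the "low_rate_network" ACL
definition below is only an example; any src ACL will do):

    acl low_rate_network src 10.0.0.0/10
    client_delay_pools 1
    client_delay_initial_bucket_level 50
    client_delay_parameters 1 1024 2048
    client_delay_access 1 allow low_rate_network

With these settings, each client IP matching low_rate_network receives roughly
1024 bytes/second of Squid-to-client bandwidth, with bursts of up to 2048 bytes.
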
Special care is taken to provide fair distribution of bandwidth among clients
sharing the same bucket (i.e., clients coming from the same IP address).
Multiple same-IP clients competing for bandwidth are queued using a FIFO
algorithm. If a bucket becomes empty, the first client among those sharing the
bucket is delayed by 1 second before it can attempt to receive more response
data from Squid. This delay may need to be lowered in high-bandwidth
environments.
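
As an illustration of the rationing arithmetic (numbers hypothetical): if three
connections from the same IP compete for a bucket currently holding 1200 bytes,
the first dequeued connection computes a ration of floor(1200/3) = 400 bytes,
and the other two connections receive the same precomputed ration before a new
one is calculated, so no single connection can drain the shared bucket in one
pass.
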
This feature has been documented at
http://wiki.squid-cache.org/Features/ClientBandwidthLimit
<p>The most important of these new features are:
<itemize>
- <item>...
+ <item>Client Bandwidth Limits
</itemize>
Most user-facing changes are reflected in squid.conf (see below).
-<sect1>...
-<p>...
+<sect1> Client Bandwidth Limits
+<p>In mobile environments, Squid may need to limit Squid-to-client bandwidth
+ available to individual users, identified by their IP addresses. The IP
+ address pool can be as large as a /10 IPv4 network (4 million unique IP
+ addresses) and even larger in IPv6 environments. On the other hand, the code
+ should support thousands of connections coming from a single IP (e.g.,
+ a child proxy).
+
+<p>The implementation is based on storing bandwidth-related "bucket" information
+ in the existing "client database" hash (client_db.cc). The old code already
+ assigned each client IP a single ClientInfo object, which satisfies the
+ client-side IP-based bandwidth pooling requirements. The old hash size is
+ increased to support up to 32K concurrent clients if needed.
+
+<p>Client-side pools are configured similarly to server-side ones, but there is
+ only one pool class. See client_delay_pools,
+ client_delay_initial_bucket_level, client_delay_parameters, and
+ client_delay_access in squid.conf. The client_delay_access directive matches a
+ client to a delay pool and its parameters. It does not pool clients from
+ different IP addresses together.
+
+<p>Special care is taken to provide fair distribution of bandwidth among clients
+ sharing the same bucket (i.e., clients coming from the same IP address).
+ Multiple same-IP clients competing for bandwidth are queued using a FIFO
+ algorithm. If a bucket becomes empty, the first client among those sharing
+ the bucket is delayed by 1 second before it can attempt to receive more
+ response data from Squid. This delay may need to be lowered in
+ high-bandwidth environments.
<sect>Changes to squid.conf since Squid-3.2
<p>
<sect1>New tags<label id="newtags">
<p>
<descrip>
-
+ <tag>client_delay_pools</tag>
+ <p> New setting for client bandwidth limits, specifying the number
+ of client delay pools used.
+
+ <tag>client_delay_initial_bucket_level</tag>
+ <p> New setting for client bandwidth limits, determining the initial
+ bucket size as a percentage of max_bucket_size from
+ client_delay_parameters.
+
+ <tag>client_delay_parameters</tag>
+ <p> New setting for client bandwidth limits, configuring the per-pool
+ speed limit and maximum bucket size.
+
+ <tag>client_delay_access</tag>
+ <p> New setting for client bandwidth limits, determining the
+ client-side delay pool for a request.
</descrip>
<sect1>Changes to existing tags<label id="modifiedtags">
#include "hash.h"
#include "enums.h"
#include "typedefs.h"
+#include "cbdata.h"
+#include <deque>
+
+#if DELAY_POOLS
+class CommQuotaQueue;
+#endif
class ClientInfo
{
} cutoff;
int n_established; /* number of current established connections */
time_t last_seen;
+#if DELAY_POOLS
+ double writeSpeedLimit; ///< write speed limit in bytes per second; may be below 1, but values too close to zero can cause client timeouts
+ double prevTime; ///< previous time when we checked
+ double bucketSize; ///< how much can be written now
+ double bucketSizeLimit; ///< maximum bucket size
+ bool writeLimitingActive; ///< Is write limiter active
+ bool firstTimeConnection; ///< is this the first connection from this client
+
+ CommQuotaQueue *quotaQueue; ///< clients waiting for more write quota
+ int rationedQuota; ///< precomputed quota preserving fairness among clients
+ int rationedCount; ///< number of clients that will receive rationedQuota
+ bool selectWaiting; ///< is between commSetSelect and commHandleWrite
+ bool eventWaiting; ///< waiting for commHandleWriteHelper event to fire
+
+ // all those functions access Comm fd_table and are defined in comm.cc
+ bool hasQueue() const; ///< whether any clients are waiting for write quota
+ bool hasQueue(const CommQuotaQueue*) const; ///< has a given queue
+ unsigned int quotaEnqueue(int fd); ///< client starts waiting in queue; create the queue if necessary
+ int quotaPeekFd() const; ///< returns the fd of the next reservation
+ unsigned int quotaPeekReserv() const; ///< returns the next reserv. to pop
+ void quotaDequeue(); ///< pops queue head from queue
+ void kickQuotaQueue(); ///< schedule commHandleWriteHelper call
+ int quotaForDequed(); ///< allocate quota for a just dequeued client
+ void refillBucket(); ///< adds bytes to bucket based on rate and time
+
+ void quotaDumpQueue(); ///< dumps quota queue for debugging
+
+/**
+ * Configure client write limiting (note: "client" here means an IP address).
+ * It is called by httpAccept in client_side.cc, where the initial bucket size
+ * (anInitialBurst) is computed from the configured maximum bucket value and
+ * the configured initial bucket level (50% by default).
+ *
+ * \param aWriteSpeedLimit is the speed limit configured in squid.conf for this pool
+ * \param anInitialBurst is the initial bucket size to use for this client (i.e., the client can burst at first)
+ * \param aHighWatermark is the maximum bucket size
+ */
+ void setWriteLimiter(const int aWriteSpeedLimit, const double anInitialBurst, const double aHighWatermark);
+#endif
+};
+
+#if DELAY_POOLS
+// a queue of Comm clients waiting for I/O quota controlled by delay pools
+class CommQuotaQueue
+{
+public:
+ CommQuotaQueue(ClientInfo *info);
+ ~CommQuotaQueue();
+
+ bool empty() const { return fds.empty(); }
+ size_t size() const { return fds.size(); }
+ int front() const { return fds.front(); }
+ unsigned int enqueue(int fd);
+ void dequeue();
+
+ ClientInfo *clientInfo; ///< bucket responsible for quota maintenance
+
+ // these counters might overflow; that is OK because they are for IDs only
+ int ins; ///< number of enqueue calls, used to generate a "reservation" ID
+ int outs; ///< number of dequeue calls, used to check the "reservation" ID
+
+private:
+ // TODO: optimize using a Ring- or List-based store?
+ typedef std::deque<int> Store;
+ Store fds; ///< descriptor queue
+
+ CBDATA_CLASS2(CommQuotaQueue);
};
+#endif /* DELAY_POOLS */
#endif
DelayVector.cc \
DelayVector.h \
NullDelayId.cc \
- NullDelayId.h
+ NullDelayId.h \
+ ClientDelayConfig.cc \
+ ClientDelayConfig.h
+
if USE_DELAY_POOLS
DELAY_POOL_SOURCE = $(DELAY_POOL_ALL_SOURCE)
else
#endif
+#if DELAY_POOLS
+#include "ClientDelayConfig.h"
+/* do nothing - free_client_delay_pool_count is the magic free function.
+ * this is why client_delay_pool_count isn't just marked TYPE: ushort
+ */
+
+#define free_client_delay_pool_access(X)
+#define free_client_delay_pool_rates(X)
+#define dump_client_delay_pool_access(X, Y, Z)
+#define dump_client_delay_pool_rates(X, Y, Z)
+
+static void
+free_client_delay_pool_count(ClientDelayConfig * cfg)
+{
+ cfg->freePoolCount();
+}
+
+static void
+dump_client_delay_pool_count(StoreEntry * entry, const char *name, ClientDelayConfig &cfg)
+{
+ cfg.dumpPoolCount (entry, name);
+}
+
+static void
+parse_client_delay_pool_count(ClientDelayConfig * cfg)
+{
+ cfg->parsePoolCount();
+}
+
+static void
+parse_client_delay_pool_rates(ClientDelayConfig * cfg)
+{
+ cfg->parsePoolRates();
+}
+
+static void
+parse_client_delay_pool_access(ClientDelayConfig * cfg)
+{
+ cfg->parsePoolAccess(LegacyParser);
+}
+#endif
+
#if USE_HTTP_VIOLATIONS
static void
dump_http_header_access(StoreEntry * entry, const char *name, header_mangler header[])
delay_pool_class delay_pools
delay_pool_count
delay_pool_rates delay_class
+client_delay_pool_access acl
+client_delay_pool_count
+client_delay_pool_rates
denyinfo acl
eol
externalAclHelper auth_param
"seen" by squid).
DOC_END
+COMMENT_START
+ CLIENT DELAY POOL PARAMETERS
+ -----------------------------------------------------------------------------
+COMMENT_END
+
+NAME: client_delay_pools
+TYPE: client_delay_pool_count
+DEFAULT: 0
+IFDEF: DELAY_POOLS
+LOC: Config.ClientDelay
+DOC_START
+ This option specifies the number of client delay pools used. It must
+ precede other client_delay_* options.
+
+Example:
+ client_delay_pools 2
+DOC_END
+
+NAME: client_delay_initial_bucket_level
+COMMENT: (percent, 0-no_limit)
+TYPE: ushort
+DEFAULT: 50
+IFDEF: DELAY_POOLS
+LOC: Config.ClientDelay.initial
+DOC_START
+ This option determines the initial bucket size as a percentage of
+ max_bucket_size from client_delay_parameters. Buckets are created
+ at the time of the "first" connection from the matching IP. Idle
+ buckets are periodically deleted.
+
+ You can specify more than 100 percent but note that such "oversized"
+ buckets are not refilled until their size goes down to max_bucket_size
+ from client_delay_parameters.
+
+Example:
+ client_delay_initial_bucket_level 50
+DOC_END
+
+NAME: client_delay_parameters
+TYPE: client_delay_pool_rates
+DEFAULT: none
+IFDEF: DELAY_POOLS
+LOC: Config.ClientDelay
+DOC_START
+
+ This option configures client-side bandwidth limits using the
+ following format:
+
+ client_delay_parameters pool speed_limit max_bucket_size
+
+ pool is an integer ID used for client_delay_access matching.
+
+ speed_limit is bytes added to the bucket per second.
+
+ max_bucket_size is the maximum size of a bucket, enforced after any
+ speed_limit additions.
+
+ Please see the delay_parameters option for more information and
+ examples.
+
+Example:
+ client_delay_parameters 1 1024 2048
+ client_delay_parameters 2 51200 16384
+DOC_END
+
+NAME: client_delay_access
+TYPE: client_delay_pool_access
+DEFAULT: none
+IFDEF: DELAY_POOLS
+LOC: Config.ClientDelay
+DOC_START
+
+ This option determines the client-side delay pool for the
+ request:
+
+ client_delay_access pool_ID allow|deny acl_name
+
+ All client_delay_access options are checked in their pool ID
+ order, starting with pool 1. The first pool that allows the
+ request is selected for the request. If no ACL matches or there
+ are no client_delay_access options, the request bandwidth is not
+ limited.
+
+ The ACL-selected pool is then used to find the
+ client_delay_parameters for the request. Client-side pools are
+ not used to aggregate clients. Clients are always aggregated
+ based on their source IP addresses (one bucket per source IP).
+
+ Please see delay_access for more examples.
+
+Example:
+ client_delay_access 1 allow low_rate_network
+ client_delay_access 2 allow vips_network
+DOC_END
+
COMMENT_START
WCCPv1 AND WCCPv2 CONFIGURATION OPTIONS
-----------------------------------------------------------------------------
static void clientdbStartGC(void);
static void clientdbScheduledGC(void *);
+#if DELAY_POOLS
+static int max_clients = 32768;
+#else
static int max_clients = 32;
+#endif
+
static int cleanup_running = 0;
static int cleanup_scheduled = 0;
static int cleanup_removed;
+#if DELAY_POOLS
+#define CLIENT_DB_HASH_SIZE 65357
+#else
#define CLIENT_DB_HASH_SIZE 467
+#endif
static ClientInfo *
c = (ClientInfo *)memAllocate(MEM_CLIENT_INFO);
c->hash.key = addr.NtoA(buf,MAX_IPSTRLEN);
c->addr = addr;
+#if DELAY_POOLS
+ /* setup default values for client write limiter */
+ c->writeLimitingActive=false;
+ c->writeSpeedLimit=0;
+ c->bucketSize = 0;
+ c->firstTimeConnection=true;
+ c->quotaQueue = NULL;
+ c->rationedQuota = 0;
+ c->rationedCount = 0;
+ c->selectWaiting = false;
+ c->eventWaiting = false;
+
+ /* get current time */
+ getCurrentTime();
+ c->prevTime = current_dtime; /* start from the current time so the first refill interval is sensible */
+#endif
hash_join(client_table, &c->hash);
statCounter.client_http.clients++;
return;
client_table = hash_create((HASHCMP *) strcmp, CLIENT_DB_HASH_SIZE, hash_string);
-
}
+#if DELAY_POOLS
+/* Returns the ClientInfo for the given IP address.
+   Returns NULL if there is no such client (or if the client db is turned off).
+   It is assumed that clientdbEstablished was called earlier and created the
+   client record if needed.
+*/
+ClientInfo * clientdbGetInfo(const Ip::Address &addr)
+{
+ char key[MAX_IPSTRLEN];
+ ClientInfo *c;
+
+ if (!Config.onoff.client_db)
+ return NULL;
+
+ addr.NtoA(key,MAX_IPSTRLEN);
+
+ c = (ClientInfo *) hash_lookup(client_table, key);
+ if (c==NULL)
+ {
+ debugs(77,1,"Client db does not contain information for given IP address "<<(const char*)key);
+ return NULL;
+ }
+ return c;
+}
+#endif
void
clientdbUpdate(const Ip::Address &addr, log_type ltype, protocol_t p, size_t size)
{
{
ClientInfo *c = (ClientInfo *)data;
safe_free(c->hash.key);
+
+#if DELAY_POOLS
+ if (CommQuotaQueue *q = c->quotaQueue) {
+ q->clientInfo = NULL;
+ delete q; // invalidates cbdata, cancelling any pending kicks
+ cbdataReferenceDone(q);
+ }
+#endif
+
memFree(c, MEM_CLIENT_INFO);
}
#include "SquidTime.h"
#include "Store.h"
+#if DELAY_POOLS
+#include "ClientInfo.h"
+#endif
+
#if LINGERING_CLOSE
#define comm_close comm_lingering_close
#endif
clientdbEstablished(details->peer, 1);
+#if DELAY_POOLS
+ fd_table[newfd].clientInfo = NULL;
+
+ if (Config.onoff.client_db)
+ {
+ /* the client write limiter requires the client database; it does nothing if client_db is disabled */
+
+ ClientDelayPools& pools(Config.ClientDelay.pools);
+ for (unsigned int pool = 0; pool < pools.size(); pool++) {
+
+ /* pools require explicit 'allow' to assign a client into them */
+ if (!pools[pool].access)
+ continue; // warned in ClientDelayConfig::Finalize()
+
+ ACLFilledChecklist ch(pools[pool].access, NULL, NULL);
+
+ // TODO: we check early to limit error response bandwidth but we
+ // should recheck when we can honor delay_pool_uses_indirect
+
+ ch.src_addr = details->peer;
+ ch.my_addr = details->me;
+
+ if (ch.fastCheck()) {
+
+ /* request client information from db after we did all checks
+ this will save hash lookup if client failed checks */
+ ClientInfo * cli = clientdbGetInfo(details->peer);
+ assert(cli);
+
+ /* put client info in FDE */
+ fd_table[newfd].clientInfo = cli;
+
+ /* setup write limiter for this request */
+ const double burst = floor(0.5 +
+ (pools[pool].highwatermark * Config.ClientDelay.initial)/100.0);
+ cli->setWriteLimiter(pools[pool].rate, burst, pools[pool].highwatermark);
+ break;
+ }
+ }
+ }
+#endif
incoming_sockets_accepted++;
}
#include "ip/Intercept.h"
#include "ip/QosConfig.h"
#include "ip/tools.h"
+#include "ClientInfo.h"
+#include "cbdata.h"
#if defined(_SQUID_CYGWIN_)
#include <sys/ioctl.h>
#endif
static void comm_init_opened(int new_socket, Ip::Address &addr, tos_t tos, nfmark_t nfmark, const char *note, struct addrinfo *AI);
static int comm_apply_flags(int new_socket, Ip::Address &addr, int flags, struct addrinfo *AI);
+#if DELAY_POOLS
+CBDATA_CLASS_INIT(CommQuotaQueue);
+
+static void commHandleWriteHelper(void * data);
+#endif
+
+static void commSelectOrQueueWrite(const int fd);
struct comm_io_callback_t {
iocb_type type;
int offset;
comm_err_t errcode;
int xerrno;
+#if DELAY_POOLS
+ unsigned int quotaQueueReserv; ///< reservation ID from CommQuotaQueue
+#endif
+
bool active() const { return callback != NULL; }
};
ccb->errcode = code;
ccb->xerrno = xerrno;
+#if DELAY_POOLS
+ ccb->quotaQueueReserv = 0;
+#endif
+
comm_io_callback_t cb = *ccb;
/* We've got a copy; blow away the real one */
ccb->xerrno = 0;
ccb->callback = NULL;
+
+#if DELAY_POOLS
+ ccb->quotaQueueReserv = 0;
+#endif
}
/*
commio_finish_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno);
}
+#if DELAY_POOLS
+ if (ClientInfo *clientInfo = F->clientInfo) {
+ if (clientInfo->selectWaiting) {
+ clientInfo->selectWaiting = false;
+ // kick the queue or it will get stuck as commHandleWrite is not called
+ clientInfo->kickQuotaQueue();
+ }
+ }
+#endif
+
commCallCloseHandlers(fd);
if (F->pconn.uses)
safe_free(commfd_table);
}
+#if DELAY_POOLS
+// called when the queue is done waiting for the client bucket to fill
+static void
+commHandleWriteHelper(void * data)
+{
+ CommQuotaQueue *queue = static_cast<CommQuotaQueue*>(data);
+ assert(queue);
+
+ ClientInfo *clientInfo = queue->clientInfo;
+ // ClientInfo invalidates the queue if freed, so if we got here through
+ // eventAdd's cbdata protections, everything should be valid and consistent
+ assert(clientInfo);
+ assert(clientInfo->hasQueue());
+ assert(clientInfo->hasQueue(queue));
+ assert(!clientInfo->selectWaiting);
+ assert(clientInfo->eventWaiting);
+ clientInfo->eventWaiting = false;
+
+ do {
+ // check that the head descriptor is still relevant
+ const int head = clientInfo->quotaPeekFd();
+ comm_io_callback_t *ccb = COMMIO_FD_WRITECB(head);
+
+ if (fd_table[head].clientInfo == clientInfo &&
+ clientInfo->quotaPeekReserv() == ccb->quotaQueueReserv &&
+ !fd_table[head].closing()) {
+
+ // wait for the head descriptor to become ready for writing
+ commSetSelect(head, COMM_SELECT_WRITE, commHandleWrite, ccb, 0);
+ clientInfo->selectWaiting = true;
+ return;
+ }
+
+ clientInfo->quotaDequeue(); // remove the no longer relevant descriptor
+ // and continue looking for a relevant one
+ } while (clientInfo->hasQueue());
+
+ debugs(77,3, HERE << "emptied queue");
+}
+
+bool
+ClientInfo::hasQueue() const
+{
+ assert(quotaQueue);
+ return !quotaQueue->empty();
+}
+
+bool
+ClientInfo::hasQueue(const CommQuotaQueue *q) const
+{
+ assert(quotaQueue);
+ return quotaQueue == q;
+}
+
+/// returns the first descriptor to be dequeued
+int
+ClientInfo::quotaPeekFd() const
+{
+ assert(quotaQueue);
+ return quotaQueue->front();
+}
+
+/// returns the reservation ID of the first descriptor to be dequeued
+unsigned int
+ClientInfo::quotaPeekReserv() const
+{
+ assert(quotaQueue);
+ return quotaQueue->outs + 1;
+}
+
+/// queues a given fd, creating the queue if necessary; returns reservation ID
+unsigned int
+ClientInfo::quotaEnqueue(int fd)
+{
+ assert(quotaQueue);
+ return quotaQueue->enqueue(fd);
+}
+
+/// removes queue head
+void
+ClientInfo::quotaDequeue()
+{
+ assert(quotaQueue);
+ quotaQueue->dequeue();
+}
+
+void
+ClientInfo::kickQuotaQueue()
+{
+ if (!eventWaiting && !selectWaiting && hasQueue()) {
+ // wait at least a second if the bucket is empty
+ const double delay = (bucketSize < 1.0) ? 1.0 : 0.0;
+ eventAdd("commHandleWriteHelper", &commHandleWriteHelper,
+ quotaQueue, delay, 0, true);
+ eventWaiting = true;
+ }
+}
+
+/// calculates how much to write for a single dequeued client
+int
+ClientInfo::quotaForDequed()
+{
+ /* If we have multiple clients and give full bucketSize to each client then
+ * clt1 may often get a lot more because clt1->clt2 time distance in the
+ * select(2) callback order may be a lot smaller than cltN->clt1 distance.
+ * We divide quota evenly to be more fair. */
+
+ if (!rationedCount) {
+ rationedCount = quotaQueue->size() + 1;
+
+ // The delay in ration recalculation _temporarily_ deprives clients of
+ // bytes that should have trickled in while rationedCount was positive.
+ refillBucket();
+
+ // Rounding errors do not accumulate here, but we round down to avoid
+ // negative bucket sizes after write with rationedCount=1.
+ rationedQuota = static_cast<int>(floor(bucketSize/rationedCount));
+ debugs(77,5, HERE << "new rationedQuota: " << rationedQuota <<
+ '*' << rationedCount);
+ }
+
+ --rationedCount;
+ debugs(77,7, HERE << "rationedQuota: " << rationedQuota <<
+ " rations remaining: " << rationedCount);
+
+ // update 'last seen' time to prevent clientdb GC from dropping us
+ last_seen = squid_curtime;
+ return rationedQuota;
+}
+
+///< adds bytes to the quota bucket based on the rate and passed time
+void
+ClientInfo::refillBucket()
+{
+ // all these times are in seconds, with double precision
+ const double currTime = current_dtime;
+ const double timePassed = currTime - prevTime;
+
+ // Calculate allowance for the time passed. Use double to avoid
+ // accumulating rounding errors for small intervals. For example, always
+ // adding 1 byte instead of 1.4 results in 29% bandwidth allocation error.
+ const double gain = timePassed * writeSpeedLimit;
+
+ debugs(77,5, HERE << currTime << " clt" << (const char*)hash.key << ": " <<
+ bucketSize << " + (" << timePassed << " * " << writeSpeedLimit <<
+ " = " << gain << ')');
+
+ // to further combat error accumulation during micro updates,
+ // quit before updating time if we cannot add at least one byte
+ if (gain < 1.0)
+ return;
+
+ prevTime = currTime;
+
+ // for "first" connections, drain initial fat before refilling but keep
+ // updating prevTime to avoid bursts after the fat is gone
+ if (bucketSize > bucketSizeLimit) {
+ debugs(77,4, HERE << "not refilling while draining initial fat");
+ return;
+ }
+
+ bucketSize += gain;
+
+ // obey quota limits
+ if (bucketSize > bucketSizeLimit)
+ bucketSize = bucketSizeLimit;
+}
+
+void
+ClientInfo::setWriteLimiter(const int aWriteSpeedLimit, const double anInitialBurst, const double aHighWatermark)
+{
+ debugs(77,5, HERE << "Write limits for " << (const char*)hash.key <<
+ " speed=" << aWriteSpeedLimit << " burst=" << anInitialBurst <<
+ " highwatermark=" << aHighWatermark);
+
+ // set or possibly update traffic shaping parameters
+ writeLimitingActive = true;
+ writeSpeedLimit = aWriteSpeedLimit;
+ bucketSizeLimit = aHighWatermark;
+
+ // but some members should only be set once for a newly activated bucket
+ if (firstTimeConnection) {
+ firstTimeConnection = false;
+
+ assert(!selectWaiting);
+ assert(!quotaQueue);
+ quotaQueue = new CommQuotaQueue(this);
+ cbdataReference(quotaQueue);
+
+ bucketSize = anInitialBurst;
+ prevTime = current_dtime;
+ }
+}
+
+CommQuotaQueue::CommQuotaQueue(ClientInfo *info): clientInfo(info),
+ ins(0), outs(0)
+{
+ assert(clientInfo);
+}
+
+CommQuotaQueue::~CommQuotaQueue()
+{
+ assert(!clientInfo); // ClientInfo should clear this before destroying us
+}
+
+/// places the given fd at the end of the queue; returns reservation ID
+unsigned int
+CommQuotaQueue::enqueue(int fd)
+{
+ debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key <<
+ ": FD " << fd << " with qqid" << (ins+1) << ' ' << fds.size());
+ fds.push_back(fd);
+ return ++ins;
+}
+
+/// removes queue head
+void
+CommQuotaQueue::dequeue()
+{
+ assert(!fds.empty());
+ debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key <<
+ ": FD " << fds.front() << " with qqid" << (outs+1) << ' ' <<
+ fds.size());
+ fds.pop_front();
+ ++outs;
+}
+
+#endif
+
/* Write to FD. */
static void
commHandleWrite(int fd, void *data)
(long int) state->offset << ", sz " << (long int) state->size << ".");
nleft = state->size - state->offset;
+
+#if DELAY_POOLS
+ ClientInfo * clientInfo=fd_table[fd].clientInfo;
+
+ if (clientInfo && !clientInfo->writeLimitingActive)
+ clientInfo = NULL; // we only care about quota limits here
+
+ if (clientInfo) {
+ assert(clientInfo->selectWaiting);
+ clientInfo->selectWaiting = false;
+
+ assert(clientInfo->hasQueue());
+ assert(clientInfo->quotaPeekFd() == fd);
+ clientInfo->quotaDequeue(); // we will write or requeue below
+
+ if (nleft > 0) {
+ const int quota = clientInfo->quotaForDequed();
+ if (!quota) { // if no write quota left, queue this fd
+ state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
+ clientInfo->kickQuotaQueue();
+ PROF_stop(commHandleWrite);
+ return;
+ }
+
+ const int nleft_corrected = min(nleft, quota);
+ if (nleft != nleft_corrected) {
+ debugs(5, 5, HERE << "FD " << fd << " writes only " <<
+ nleft_corrected << " out of " << nleft);
+ nleft = nleft_corrected;
+ }
+
+ }
+ }
+
+#endif
+
+ /* actually WRITE data */
len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
debugs(5, 5, "commHandleWrite: write() returns " << len);
+
+#if DELAY_POOLS
+ if (clientInfo) {
+ if (len > 0) {
+ /* we wrote data - drain it from the bucket */
+ clientInfo->bucketSize -= len;
+ if (clientInfo->bucketSize < 0.0)
+ {
+ debugs(5,1, HERE << "drained too much"); // should not happen
+ clientInfo->bucketSize = 0;
+ }
+ }
+
+ // even if we wrote nothing, we were served; give others a chance
+ clientInfo->kickQuotaQueue();
+ }
+#endif
+
fd_bytes(fd, len, FD_WRITE);
statCounter.syscalls.sock.writes++;
// After each successful partial write,
commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
} else if (ignoreErrno(errno)) {
debugs(50, 10, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
- commSetSelect(fd,
- COMM_SELECT_WRITE,
- commHandleWrite,
- state,
- 0);
+ commSelectOrQueueWrite(fd);
} else {
debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
if (state->offset < state->size) {
/* Not done, reinstall the write handler and write some more */
- commSetSelect(fd,
- COMM_SELECT_WRITE,
- commHandleWrite,
- state,
- 0);
+ commSelectOrQueueWrite(fd);
} else {
commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_OK : COMM_ERROR, errno);
}
/* Queue the write */
commio_set_callback(fd, IOCB_WRITE, ccb, callback,
(char *)buf, free_func, size);
+
+ commSelectOrQueueWrite(fd);
+}
+
+// called when fd needs to write but may need to wait in line for its quota
+static void
+commSelectOrQueueWrite(const int fd)
+{
+ comm_io_callback_t *ccb = COMMIO_FD_WRITECB(fd);
+
+#if DELAY_POOLS
+ // stand in line if there is one
+ if (ClientInfo *clientInfo = fd_table[fd].clientInfo) {
+ if (clientInfo->writeLimitingActive) {
+ ccb->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
+ clientInfo->kickQuotaQueue();
+ return;
+ }
+ }
+#endif
+
commSetSelect(fd, COMM_SELECT_WRITE, commHandleWrite, ccb, 0);
}
#include "comm.h"
#include "ip/Address.h"
+#if DELAY_POOLS
+class ClientInfo;
+#endif
class PconnPool;
class fde
PconnPool *pool;
} pconn;
+#if DELAY_POOLS
+ ClientInfo * clientInfo;/* pointer to client info used in client write limiter or NULL if not present */
+#endif
unsigned epoll_state;
struct _fde_disk disk;
bytes_written = 0;
pconn.uses = 0;
pconn.pool = NULL;
+ clientInfo = NULL;
epoll_state = 0;
memset(&disk, 0, sizeof(_fde_disk));
read_handler = NULL;
#include "icmp/IcmpSquid.h"
#include "icmp/net_db.h"
+#if DELAY_POOLS
+#include "ClientDelayConfig.h"
+#endif
+
#if USE_LOADABLE_MODULES
#include "LoadableModules.h"
#endif
mimeInit(Config.mimeTablePathname);
+#if DELAY_POOLS
+ Config.ClientDelay.finalize();
+#endif
+
if (Config.onoff.announce) {
if (!eventFind(start_announce, NULL))
eventAdd("start_announce", start_announce, NULL, 3600.0, 1);
Esi::Init();
#endif
+#if DELAY_POOLS
+ Config.ClientDelay.finalize();
+#endif
+
debugs(1, 1, "Ready to serve requests.");
if (!configured_once) {
class HttpRequestMethod;
+#if DELAY_POOLS
+class ClientInfo;
+#endif
#if USE_FORW_VIA_DB
SQUIDCEXTERN void clientdbFreeMemory(void);
SQUIDCEXTERN int clientdbEstablished(const Ip::Address &, int);
+#if DELAY_POOLS
+SQUIDCEXTERN void clientdbSetWriteLimiter(ClientInfo * info, const int writeSpeedLimit,const double initialBurst,const double highWatermark);
+SQUIDCEXTERN ClientInfo * clientdbGetInfo(const Ip::Address &addr);
+#endif
SQUIDCEXTERN void clientOpenListenSockets(void);
SQUIDCEXTERN void clientHttpConnectionsClose(void);
SQUIDCEXTERN void httpRequestFree(void *);
#if DELAY_POOLS
#include "DelayConfig.h"
+#include "ClientDelayConfig.h"
#endif
#if USE_ICMP
#if DELAY_POOLS
DelayConfig Delay;
+ ClientDelayConfig ClientDelay;
#endif
struct {