]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Author: Alex Rousskov <rousskov@measurement-factory.com>
authorChristos Tsantilas <chtsanti@users.sourceforge.net>
Mon, 25 Oct 2010 18:25:19 +0000 (21:25 +0300)
committerChristos Tsantilas <chtsanti@users.sourceforge.net>
Mon, 25 Oct 2010 18:25:19 +0000 (21:25 +0300)
Client-side bandwidth limit (a.k.a., quota or delay pool) implementation.

In mobile environments, Squid may need to limit Squid-to-client bandwidth
available to individual users, identified by their IP addresses. The IP
address pool can be as large as a /10 IPv4 network (4 million unique IP
addresses) and even larger in IPv6 environments. On the other hand, the code
should support thousands of connections coming from a single IP (e.g., a child
proxy).

The implementation is based on storing bandwidth-related "bucket" information
in the existing "client database" hash (client_db.cc). The old code already
assigned each client IP a single ClientInfo object, which satisfies the
client-side IP-based bandwidth pooling requirements. The old hash size is
increased to support up to 32K concurrent clients if needed.

Client-side pools are configured similarly to server-side ones, but there is
only one pool class. See client_delay_pools,
client_delay_initial_bucket_level, client_delay_parameters, and
client_delay_access in squid.conf. The client_delay_access matches the client
with delay parameters. It does not pool clients from different IP addresses
together.

Special care is taken to provide fair distribution of bandwidth among clients
sharing the same bucket (i.e., clients coming from the same IP address).
Multiple same-IP clients competing for bandwidth are queued using FIFO
algorithm. If a bucket becomes empty, the first client among those sharing the
bucket is delayed by 1 second before it can attempt to receive more response
data from Squid.  This delay may need to be lowered in high-bandwidth
environments.

This feature has been documented at
http://wiki.squid-cache.org/Features/ClientBandwidthLimit

13 files changed:
doc/release-notes/release-3.3.sgml
src/ClientInfo.h
src/Makefile.am
src/cache_cf.cc
src/cf.data.depend
src/cf.data.pre
src/client_db.cc
src/client_side.cc
src/comm.cc
src/fde.h
src/main.cc
src/protos.h
src/structs.h

index 0f4f158e9bc7224e53069b4a55e7048563d8ff8b..6e2c26914dcb778ef3c102b5e1343670f1b0cbf3 100644 (file)
@@ -34,14 +34,40 @@ The 3.3 change history can be <url url="http://www.squid-cache.org/Versions/v3/3
 
 <p>The most important of these new features are:
 <itemize>
-       <item>...
+       <item>Client Bandwidth Limits
 </itemize>
 
 Most user-facing changes are reflected in squid.conf (see below).
 
-<sect1>...
-<p>...
+<sect1> Client Bandwidth Limits
+<p>In mobile environments, Squid may need to limit Squid-to-client bandwidth
+   available to individual users, identified by their IP addresses. The IP
+   address pool can be as large as a /10 IPv4 network (4 million unique IP
+   addresses) and even larger in IPv6 environments. On the other hand, the code
+   should support thousands of connections coming from a single IP (e.g.,
+   a child proxy).
+
+<p>The implementation is based on storing bandwidth-related "bucket" information
+   in the existing "client database" hash (client_db.cc). The old code already
+   assigned each client IP a single ClientInfo object, which satisfies the
+   client-side IP-based bandwidth pooling requirements. The old hash size is
+   increased to support up to 32K concurrent clients if needed.
+
+<p>Client-side pools are configured similarly to server-side ones, but there is
+   only one pool class. See client_delay_pools,
+   client_delay_initial_bucket_level, client_delay_parameters, and
+   client_delay_access in squid.conf. The client_delay_access matches the client
+   with delay parameters. It does not pool clients from different IP addresses
+   together.
 
+<p>
+   Special care is taken to provide fair distribution of bandwidth among clients
+   sharing the same bucket (i.e., clients coming from the same IP address).
+   Multiple same-IP clients competing for bandwidth are queued using FIFO
+   algorithm. If a bucket becomes empty, the first client among those sharing 
+   the bucket is delayed by 1 second before it can attempt to receive more
+   response data from Squid.  This delay may need to be lowered in 
+   high-bandwidth environments.
 
 <sect>Changes to squid.conf since Squid-3.2
 <p>
@@ -59,7 +85,22 @@ This section gives a thorough account of those changes in three categories:
 <sect1>New tags<label id="newtags">
 <p>
 <descrip>
-
+            <tag>client_delay_pools</tag>
+            <p> New setting for client bandwith limits to specifies the number 
+              of client delay pools used.
+
+            <tag>client_delay_initial_bucket_level</tag>
+            <p> New setting for client bandwith limits to determine the initial 
+              bucket size as a percentage of  max_bucket_size from 
+              client_delay_parameters.
+
+            <tag>client_delay_parameters</tag>
+            <p> New setting for client bandwith limits to configures client-side 
+              bandwidth limits.
+
+            <tag>client_delay_access</tag>
+            <p> New setting for client bandwith limits to determines the 
+              client-side delay pool for the request.
 </descrip>
 
 <sect1>Changes to existing tags<label id="modifiedtags">
index 0a44d955743f8c60febc54f34a7977eb4f40b582..e9a75963c085150fe8dcd77698260676354bfcca 100644 (file)
@@ -5,6 +5,12 @@
 #include "hash.h"
 #include "enums.h"
 #include "typedefs.h"
+#include "cbdata.h"
+#include <deque>
+
+#if DELAY_POOLS
+class CommQuotaQueue;
+#endif
 
 class ClientInfo
 {
@@ -28,6 +34,74 @@ public:
     } cutoff;
     int n_established;          /* number of current established connections */
     time_t last_seen;
+#if DELAY_POOLS
+    double writeSpeedLimit;///< Write speed limit in bytes per second, can be less than 1, if too close to zero this could result in timeouts from client 
+    double prevTime; ///< previous time when we checked 
+    double bucketSize; ///< how much can be written now
+    double bucketSizeLimit;  ///< maximum bucket size
+    bool writeLimitingActive; ///< Is write limiter active 
+    bool firstTimeConnection;///< is this first time connection for this client
+
+    CommQuotaQueue *quotaQueue; ///< clients waiting for more write quota
+    int rationedQuota; ///< precomputed quota preserving fairness among clients
+    int rationedCount; ///< number of clients that will receive rationedQuota
+    bool selectWaiting; ///< is between commSetSelect and commHandleWrite
+    bool eventWaiting; ///< waiting for commHandleWriteHelper event to fire
+
+    // all those functions access Comm fd_table and are defined in comm.cc
+    bool hasQueue() const;  ///< whether any clients are waiting for write quota
+    bool hasQueue(const CommQuotaQueue*) const;  ///< has a given queue
+    unsigned int quotaEnqueue(int fd); ///< client starts waiting in queue; create the queue if necessary
+    int quotaPeekFd() const; ///< retuns the next fd reservation
+    unsigned int quotaPeekReserv() const; ///< returns the next reserv. to pop
+    void quotaDequeue(); ///< pops queue head from queue
+    void kickQuotaQueue(); ///< schedule commHandleWriteHelper call
+    int quotaForDequed(); ///< allocate quota for a just dequeued client
+    void refillBucket(); ///< adds bytes to bucket based on rate and time
+
+    void quotaDumpQueue(); ///< dumps quota queue for debugging
+   
+/**
+ * Configure client write limiting (note:"client" here means - IP). It is called
+ * by httpAccept in client_side.cc, where the initial bucket size (anInitialBurst)
+ * computed, using the configured maximum bucket vavlue and configured initial 
+ * bucket value(50% by default).
+ *
+ * \param  writeSpeedLimit is speed limit configured in config for this pool
+ * \param  initialBurst is initial bucket size to use for this client(i.e. client can burst at first)
+ *  \param highWatermark is maximum bucket value
+ */
+    void setWriteLimiter(const int aWriteSpeedLimit, const double anInitialBurst, const double aHighWatermark);
+#endif
+};
+
+#if DELAY_POOLS
+// a queue of Comm clients waiting for I/O quota controlled by delay pools
+class CommQuotaQueue
+{
+public:
+    CommQuotaQueue(ClientInfo *info);
+    ~CommQuotaQueue();
+
+    bool empty() const { return fds.empty(); }
+    size_t size() const { return fds.size(); }
+    int front() const { return fds.front(); }
+    unsigned int enqueue(int fd);
+    void dequeue();
+    
+    ClientInfo *clientInfo; ///< bucket responsible for quota maintenance
+
+    // these counters might overflow; that is OK because they are for IDs only
+    int ins; ///< number of enqueue calls, used to generate a "reservation" ID
+    int outs; ///< number of dequeue calls, used to check the "reservation" ID
+
+private:
+    // TODO: optimize using a Ring- or List-based store?
+    typedef std::deque<int> Store;
+    Store fds; ///< descriptor queue
+
+    CBDATA_CLASS2(CommQuotaQueue);
 };
+#endif /* DELAY_POOLS */
 
 #endif
index b0df536bcaf365cb6230fc6130cd8c7a6009f7ce..b993a61aef92576e03c0f749ca1d9941079031fe 100644 (file)
@@ -75,7 +75,10 @@ DELAY_POOL_ALL_SOURCE = \
        DelayVector.cc \
        DelayVector.h \
        NullDelayId.cc \
-       NullDelayId.h
+       NullDelayId.h \
+       ClientDelayConfig.cc \
+       ClientDelayConfig.h
+       
 if USE_DELAY_POOLS
 DELAY_POOL_SOURCE = $(DELAY_POOL_ALL_SOURCE)
 else
index 49f600408d9301640a3ba519d5b1be131ef18b2b..03602a75d859d6261d4723f0c16a5b153fb672e6 100644 (file)
@@ -1550,6 +1550,48 @@ parse_delay_pool_access(DelayConfig * cfg)
 
 #endif
 
+#if DELAY_POOLS
+#include "ClientDelayConfig.h"
+/* do nothing - free_client_delay_pool_count is the magic free function.
+ * this is why client_delay_pool_count isn't just marked TYPE: ushort
+ */
+
+#define free_client_delay_pool_access(X)
+#define free_client_delay_pool_rates(X)
+#define dump_client_delay_pool_access(X, Y, Z)
+#define dump_client_delay_pool_rates(X, Y, Z)
+
+static void
+free_client_delay_pool_count(ClientDelayConfig * cfg)
+{
+    cfg->freePoolCount();
+}
+
+static void
+dump_client_delay_pool_count(StoreEntry * entry, const char *name, ClientDelayConfig &cfg)
+{
+    cfg.dumpPoolCount (entry, name);
+}
+
+static void
+parse_client_delay_pool_count(ClientDelayConfig * cfg)
+{
+    cfg->parsePoolCount();
+}
+
+static void
+parse_client_delay_pool_rates(ClientDelayConfig * cfg)
+{
+    cfg->parsePoolRates();
+}
+
+static void
+parse_client_delay_pool_access(ClientDelayConfig * cfg)
+{
+    cfg->parsePoolAccess(LegacyParser);
+}
+#endif
+
 #if USE_HTTP_VIOLATIONS
 static void
 dump_http_header_access(StoreEntry * entry, const char *name, header_mangler header[])
index 92ff6af37648d29721b8b7496ec8faed7616c4d4..b1496baf5888a22472d81f0de2fe589d0ec74bcb 100644 (file)
@@ -20,6 +20,9 @@ delay_pool_access     acl     delay_class
 delay_pool_class       delay_pools
 delay_pool_count
 delay_pool_rates       delay_class
+client_delay_pool_access       acl
+client_delay_pool_count
+client_delay_pool_rates
 denyinfo               acl
 eol
 externalAclHelper      auth_param
index 32f57864ba936fbddec10acc9ff0041b503ec2b3..74d640f88b412317766e6f588b2a3b4bd7069eec 100644 (file)
@@ -4870,6 +4870,101 @@ DOC_START
        "seen" by squid).
 DOC_END
 
+COMMENT_START
+ CLIENT DELAY POOL PARAMETERS
+ -----------------------------------------------------------------------------
+COMMENT_END
+
+NAME: client_delay_pools
+TYPE: client_delay_pool_count
+DEFAULT: 0
+IFDEF: DELAY_POOLS
+LOC: Config.ClientDelay
+DOC_START
+       This option specifies the number of client delay pools used. It must
+       preceed other client_delay_* options.
+
+Example:
+ client_delay_pools 2
+DOC_END
+
+NAME: client_delay_initial_bucket_level
+COMMENT: (percent, 0-no_limit)
+TYPE: ushort
+DEFAULT: 50
+IFDEF: DELAY_POOLS
+LOC: Config.ClientDelay.initial
+DOC_START
+       This option determines the initial bucket size as a percentage of
+       max_bucket_size from client_delay_parameters. Buckets are created
+       at the time of the "first" connection from the matching IP. Idle
+       buckets are periodically deleted up.
+
+       You can specify more than 100 percent but note that such "oversized"
+       buckets are not refilled until their size goes down to max_bucket_size
+       from client_delay_parameters.
+
+Example:
+ client_delay_initial_bucket_level 50
+DOC_END
+
+NAME: client_delay_parameters
+TYPE: client_delay_pool_rates
+DEFAULT: none
+IFDEF: DELAY_POOLS
+LOC: Config.ClientDelay
+DOC_START
+
+       This option configures client-side bandwidth limits using the
+       following format:
+
+           client_delay_parameters pool speed_limit max_bucket_size
+
+       pool is an integer ID used for client_delay_access matching.
+
+       speed_limit is bytes added to the bucket per second.
+
+       max_bucket_size is the maximum size of a bucket, enforced after any
+       speed_limit additions.
+
+       Please see the delay_parameters option for more information and
+       examples.
+
+Example:
+ client_delay_parameters 1 1024 2048
+ client_delay_parameters 2 51200 16384
+DOC_END
+
+NAME: client_delay_access
+TYPE: client_delay_pool_access
+DEFAULT: none
+IFDEF: DELAY_POOLS
+LOC: Config.ClientDelay
+DOC_START
+
+       This option determines the client-side delay pool for the
+       request:
+
+           client_delay_access pool_ID allow|deny acl_name
+
+       All client_delay_access options are checked in their pool ID
+       order, starting with pool 1. The first checked pool with allowed
+       request is selected for the request. If no ACL matches or there
+       are no client_delay_access options, the request bandwidth is not
+       limited.
+
+       The ACL-selected pool is then used to find the
+       client_delay_parameters for the request. Client-side pools are
+       not used to aggregate clients. Clients are always aggregated
+       based on their source IP addresses (one bucket per source IP).
+
+       Please see delay_access for more examples.
+
+Example:
+ client_delay_access 1 allow low_rate_network
+ client_delay_access 2 allow vips_network
+DOC_END
+
 COMMENT_START
  WCCPv1 AND WCCPv2 CONFIGURATION OPTIONS
  -----------------------------------------------------------------------------
index 8e05adec2cac639c3fc04c28c64090a4c7afbd44..10312c9eda53375e918f49b5c577c95e426a4124 100644 (file)
@@ -49,12 +49,21 @@ static FREE clientdbFreeItem;
 static void clientdbStartGC(void);
 static void clientdbScheduledGC(void *);
 
+#if DELAY_POOLS
+static int max_clients = 32768;
+#else
 static int max_clients = 32;
+#endif
+
 static int cleanup_running = 0;
 static int cleanup_scheduled = 0;
 static int cleanup_removed;
 
+#if DELAY_POOLS
+#define CLIENT_DB_HASH_SIZE 65357
+#else
 #define CLIENT_DB_HASH_SIZE 467
+#endif
 
 static ClientInfo *
 
@@ -65,6 +74,22 @@ clientdbAdd(const Ip::Address &addr)
     c = (ClientInfo *)memAllocate(MEM_CLIENT_INFO);
     c->hash.key = addr.NtoA(buf,MAX_IPSTRLEN);
     c->addr = addr;
+#if DELAY_POOLS
+    /* setup default values for client write limiter */
+    c->writeLimitingActive=false;
+    c->writeSpeedLimit=0;
+    c->bucketSize = 0;
+    c->firstTimeConnection=true;
+    c->quotaQueue = NULL;
+    c->rationedQuota = 0;
+    c->rationedCount = 0;
+    c->selectWaiting = false;
+    c->eventWaiting = false;
+
+    /* get current time */
+    getCurrentTime();
+    c->prevTime=current_dtime;/* put current time to have something sensible here */
+#endif
     hash_join(client_table, &c->hash);
     statCounter.client_http.clients++;
 
@@ -92,9 +117,32 @@ clientdbInit(void)
         return;
 
     client_table = hash_create((HASHCMP *) strcmp, CLIENT_DB_HASH_SIZE, hash_string);
-
 }
 
+#if DELAY_POOLS
+/* returns ClientInfo for given IP addr
+   Returns NULL if no such client (or clientdb turned off)
+   (it is assumed that clientdbEstablished will be called before and create client record if needed)
+*/
+ClientInfo * clientdbGetInfo(const Ip::Address &addr)
+{
+    char key[MAX_IPSTRLEN];
+    ClientInfo *c;
+
+    if (!Config.onoff.client_db)
+        return NULL;
+
+    addr.NtoA(key,MAX_IPSTRLEN);
+
+    c = (ClientInfo *) hash_lookup(client_table, key);
+    if (c==NULL)
+    {
+        debugs(77,1,"Client db does not contain information for given IP address "<<(const char*)key);
+        return NULL;
+    }
+    return c;
+}
+#endif
 void
 clientdbUpdate(const Ip::Address &addr, log_type ltype, protocol_t p, size_t size)
 {
@@ -299,6 +347,15 @@ clientdbFreeItem(void *data)
 {
     ClientInfo *c = (ClientInfo *)data;
     safe_free(c->hash.key);
+
+#if DELAY_POOLS
+    if (CommQuotaQueue *q = c->quotaQueue) {
+        q->clientInfo = NULL;
+        delete q; // invalidates cbdata, cancelling any pending kicks
+        cbdataReferenceDone(q);
+       }
+#endif
+
     memFree(c, MEM_CLIENT_INFO);
 }
 
index 0724e448770dbbbc4e4e1ca4ea014e034d4d6d43..a738bf47e68b5926abdb31d8bfa56dc12f7f048e 100644 (file)
 #include "SquidTime.h"
 #include "Store.h"
 
+#if DELAY_POOLS
+#include "ClientInfo.h"
+#endif
+
 #if LINGERING_CLOSE
 #define comm_close comm_lingering_close
 #endif
@@ -3148,6 +3152,47 @@ httpAccept(int sock, int newfd, ConnectionDetail *details,
 
     clientdbEstablished(details->peer, 1);
 
+#if DELAY_POOLS
+    fd_table[newfd].clientInfo = NULL;
+
+    if (Config.onoff.client_db)
+    {
+        /* it was said several times that client write limiter does not work if client_db is disabled */
+
+        ClientDelayPools& pools(Config.ClientDelay.pools);
+        for (unsigned int pool = 0; pool < pools.size(); pool++) {
+
+            /* pools require explicit 'allow' to assign a client into them */
+            if (!pools[pool].access)
+                continue; // warned in ClientDelayConfig::Finalize()
+
+            ACLFilledChecklist ch(pools[pool].access, NULL, NULL);
+
+            // TODO: we check early to limit error response bandwith but we 
+            // should recheck when we can honor delay_pool_uses_indirect
+
+            ch.src_addr = details->peer;
+            ch.my_addr = details->me;
+
+            if (ch.fastCheck()) {
+
+                /*  request client information from db after we did all checks
+                    this will save hash lookup if client failed checks */
+                ClientInfo * cli = clientdbGetInfo(details->peer);
+                assert(cli);
+
+                /* put client info in FDE */
+                fd_table[newfd].clientInfo = cli;
+
+                /* setup write limiter for this request */
+                const double burst = floor(0.5 +
+                    (pools[pool].highwatermark * Config.ClientDelay.initial)/100.0);
+                cli->setWriteLimiter(pools[pool].rate, burst, pools[pool].highwatermark);
+                break;
+            }
+        }
+    }
+#endif
     incoming_sockets_accepted++;
 }
 
index a5a4a7002ae939d6370b8f2472a7605f624b6e2b..60eade943529b8274c8a796096cb782123e21018 100644 (file)
@@ -53,7 +53,9 @@
 #include "ip/Intercept.h"
 #include "ip/QosConfig.h"
 #include "ip/tools.h"
+#include "ClientInfo.h"
 
+#include "cbdata.h"
 #if defined(_SQUID_CYGWIN_)
 #include <sys/ioctl.h>
 #endif
@@ -76,6 +78,13 @@ static IOCB commHalfClosedReader;
 static void comm_init_opened(int new_socket, Ip::Address &addr, tos_t tos, nfmark_t nfmark, const char *note, struct addrinfo *AI);
 static int comm_apply_flags(int new_socket, Ip::Address &addr, int flags, struct addrinfo *AI);
 
+#if DELAY_POOLS
+CBDATA_CLASS_INIT(CommQuotaQueue);
+
+static void commHandleWriteHelper(void * data);
+#endif
+
+static void commSelectOrQueueWrite(const int fd);
 
 struct comm_io_callback_t {
     iocb_type type;
@@ -87,6 +96,10 @@ struct comm_io_callback_t {
     int offset;
     comm_err_t errcode;
     int xerrno;
+#if DELAY_POOLS
+    unsigned int quotaQueueReserv; ///< reservation ID from CommQuotaQueue
+#endif
+
 
     bool active() const { return callback != NULL; }
 };
@@ -146,6 +159,10 @@ commio_finish_callback(int fd, comm_io_callback_t *ccb, comm_err_t code, int xer
     ccb->errcode = code;
     ccb->xerrno = xerrno;
 
+#if DELAY_POOLS
+    ccb->quotaQueueReserv = 0;
+#endif
+
     comm_io_callback_t cb = *ccb;
 
     /* We've got a copy; blow away the real one */
@@ -187,6 +204,10 @@ commio_cancel_callback(int fd, comm_io_callback_t *ccb)
 
     ccb->xerrno = 0;
     ccb->callback = NULL;
+
+#if DELAY_POOLS
+    ccb->quotaQueueReserv = 0;
+#endif
 }
 
 /*
@@ -1589,6 +1610,16 @@ _comm_close(int fd, char const *file, int line)
         commio_finish_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno);
     }
 
+#if DELAY_POOLS
+    if (ClientInfo *clientInfo = F->clientInfo) {
+        if (clientInfo->selectWaiting) {
+            clientInfo->selectWaiting = false;
+            // kick queue or it will get stuck as commWriteHandle is not called
+            clientInfo->kickQuotaQueue();
+        }
+     }
+#endif
+
     commCallCloseHandlers(fd);
 
     if (F->pconn.uses)
@@ -1935,6 +1966,235 @@ comm_exit(void)
     safe_free(commfd_table);
 }
 
+#if DELAY_POOLS
+// called when the queue is done waiting for the client bucket to fill
+static void
+commHandleWriteHelper(void * data)
+{
+    CommQuotaQueue *queue = static_cast<CommQuotaQueue*>(data);
+    assert(queue);
+
+    ClientInfo *clientInfo = queue->clientInfo;
+    // ClientInfo invalidates queue if freed, so if we got here through,
+    // evenAdd cbdata protections, everything should be valid and consistent
+    assert(clientInfo); 
+    assert(clientInfo->hasQueue());
+    assert(clientInfo->hasQueue(queue));
+    assert(!clientInfo->selectWaiting);
+    assert(clientInfo->eventWaiting);
+    clientInfo->eventWaiting = false;
+
+    do {
+        // check that the head descriptor is still relevant
+        const int head = clientInfo->quotaPeekFd();
+        comm_io_callback_t *ccb = COMMIO_FD_WRITECB(head);
+
+        if (fd_table[head].clientInfo == clientInfo &&
+            clientInfo->quotaPeekReserv() == ccb->quotaQueueReserv &&
+            !fd_table[head].closing()) {
+
+            // wait for the head descriptor to become ready for writing
+            commSetSelect(head, COMM_SELECT_WRITE, commHandleWrite, ccb, 0);
+            clientInfo->selectWaiting = true;
+            return;
+         }
+
+         clientInfo->quotaDequeue(); // remove the no longer relevant descriptor
+         // and continue looking for a relevant one
+    } while (clientInfo->hasQueue());
+
+    debugs(77,3, HERE << "emptied queue");
+}
+
+bool
+ClientInfo::hasQueue() const
+{
+    assert(quotaQueue);
+    return !quotaQueue->empty();
+}
+
+bool
+ClientInfo::hasQueue(const CommQuotaQueue *q) const
+{
+    assert(quotaQueue);
+    return quotaQueue == q;
+}
+
+/// returns the first descriptor to be dequeued
+int
+ClientInfo::quotaPeekFd() const
+{
+    assert(quotaQueue);
+    return quotaQueue->front();
+}
+
+/// returns the reservation ID of the first descriptor to be dequeued
+unsigned int
+ClientInfo::quotaPeekReserv() const
+{
+    assert(quotaQueue);
+    return quotaQueue->outs + 1;
+}
+
+/// queues a given fd, creating the queue if necessary; returns reservation ID
+unsigned int
+ClientInfo::quotaEnqueue(int fd)
+{
+    assert(quotaQueue);
+    return quotaQueue->enqueue(fd);
+}
+
+/// removes queue head
+void
+ClientInfo::quotaDequeue()
+{
+    assert(quotaQueue);
+    quotaQueue->dequeue();
+}
+
+void
+ClientInfo::kickQuotaQueue()
+{
+    if (!eventWaiting && !selectWaiting && hasQueue()) {
+        // wait at least a second if the bucket is empty
+        const double delay = (bucketSize < 1.0) ? 1.0 : 0.0;
+        eventAdd("commHandleWriteHelper", &commHandleWriteHelper,
+            quotaQueue, delay, 0, true);
+        eventWaiting = true;
+    }
+}
+
+/// calculates how much to write for a single dequeued client
+int
+ClientInfo::quotaForDequed()
+{
+    /* If we have multiple clients and give full bucketSize to each client then
+     * clt1 may often get a lot more because clt1->clt2 time distance in the
+     * select(2) callback order may be a lot smaller than cltN->clt1 distance.
+     * We divide quota evenly to be more fair. */
+
+    if (!rationedCount) {
+        rationedCount = quotaQueue->size() + 1;
+
+        // The delay in ration recalculation _temporary_ deprives clients from
+        // bytes that should have trickled in while rationedCount was positive.
+        refillBucket();
+
+        // Rounding errors do not accumulate here, but we round down to avoid
+        // negative bucket sizes after write with rationedCount=1.
+        rationedQuota = static_cast<int>(floor(bucketSize/rationedCount));
+        debugs(77,5, HERE << "new rationedQuota: " << rationedQuota <<
+            '*' << rationedCount);
+    }
+
+    --rationedCount;
+    debugs(77,7, HERE << "rationedQuota: " << rationedQuota <<
+        " rations remaining: " << rationedCount);
+
+    // update 'last seen' time to prevent clientdb GC from dropping us
+    last_seen = squid_curtime;
+    return rationedQuota;
+}
+
+///< adds bytes to the quota bucket based on the rate and passed time
+void
+ClientInfo::refillBucket()
+{
+    // all these times are in seconds, with double precision
+    const double currTime = current_dtime;
+    const double timePassed = currTime - prevTime;
+
+    // Calculate allowance for the time passed. Use double to avoid 
+    // accumulating rounding errors for small intervals. For example, always
+    // adding 1 byte instead of 1.4 results in 29% bandwidth allocation error.
+    const double gain = timePassed * writeSpeedLimit;
+
+    debugs(77,5, HERE << currTime << " clt" << (const char*)hash.key << ": " <<
+        bucketSize << " + (" << timePassed << " * " << writeSpeedLimit <<
+        " = " << gain << ')');
+
+    // to further combat error accumulation during micro updates,
+    // quit before updating time if we cannot add at least one byte
+    if (gain < 1.0)
+       return;
+
+    prevTime = currTime;
+
+    // for "first" connections, drain initial fat before refilling but keep
+    // updating prevTime to avoid bursts after the fat is gone
+    if (bucketSize > bucketSizeLimit) {
+       debugs(77,4, HERE << "not refilling while draining initial fat");
+       return;
+    }
+
+    bucketSize += gain;
+
+    // obey quota limits
+    if (bucketSize > bucketSizeLimit)
+        bucketSize = bucketSizeLimit;
+}
+
+void 
+ClientInfo::setWriteLimiter(const int aWriteSpeedLimit, const double anInitialBurst, const double aHighWatermark)
+{
+    debugs(77,5, HERE << "Write limits for " << (const char*)hash.key << 
+        " speed=" << aWriteSpeedLimit << " burst=" << anInitialBurst <<
+        " highwatermark=" << aHighWatermark);
+
+    // set or possibly update traffic shaping parameters
+    writeLimitingActive = true;
+    writeSpeedLimit = aWriteSpeedLimit;
+    bucketSizeLimit = aHighWatermark;
+
+    // but some members should only be set once for a newly activated bucket
+    if (firstTimeConnection) {
+        firstTimeConnection = false;
+
+        assert(!selectWaiting);
+        assert(!quotaQueue);
+        quotaQueue = new CommQuotaQueue(this);
+        cbdataReference(quotaQueue);
+
+        bucketSize = anInitialBurst;
+        prevTime = current_dtime;
+    }
+}
+
+CommQuotaQueue::CommQuotaQueue(ClientInfo *info): clientInfo(info),
+    ins(0), outs(0)
+{
+    assert(clientInfo);
+}
+
+CommQuotaQueue::~CommQuotaQueue()
+{
+    assert(!clientInfo); // ClientInfo should clear this before destroying us
+}
+
+/// places the given fd at the end of the queue; returns reservation ID
+unsigned int
+CommQuotaQueue::enqueue(int fd)
+{
+    debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key <<
+        ": FD " << fd << " with qqid" << (ins+1) << ' ' << fds.size());
+    fds.push_back(fd);
+    return ++ins;
+}
+
+/// removes queue head
+void
+CommQuotaQueue::dequeue()
+{
+    assert(!fds.empty());
+    debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key <<
+        ": FD " << fds.front() << " with qqid" << (outs+1) << ' ' <<
+        fds.size());
+    fds.pop_front();
+    ++outs;
+}
+
+#endif
+
 /* Write to FD. */
 static void
 commHandleWrite(int fd, void *data)
@@ -1950,8 +2210,63 @@ commHandleWrite(int fd, void *data)
            (long int) state->offset << ", sz " << (long int) state->size << ".");
 
     nleft = state->size - state->offset;
+
+#if DELAY_POOLS
+    ClientInfo * clientInfo=fd_table[fd].clientInfo;
+
+    if (clientInfo && !clientInfo->writeLimitingActive)
+        clientInfo = NULL; // we only care about quota limits here
+
+    if (clientInfo) {
+        assert(clientInfo->selectWaiting);
+        clientInfo->selectWaiting = false;
+
+        assert(clientInfo->hasQueue());
+        assert(clientInfo->quotaPeekFd() == fd);
+        clientInfo->quotaDequeue(); // we will write or requeue below
+
+        if (nleft > 0) {
+            const int quota = clientInfo->quotaForDequed();
+            if (!quota) {  // if no write quota left, queue this fd
+                state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
+                clientInfo->kickQuotaQueue();
+                PROF_stop(commHandleWrite);
+                return;
+            }
+
+            const int nleft_corrected = min(nleft, quota);
+            if (nleft != nleft_corrected) {
+                debugs(5, 5, HERE << "FD " << fd << " writes only " <<
+                    nleft_corrected << " out of " << nleft);
+                nleft = nleft_corrected;
+            }
+
+        }
+    }
+
+#endif
+
+    /* actually WRITE data */
     len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
     debugs(5, 5, "commHandleWrite: write() returns " << len);
+
+#if DELAY_POOLS
+    if (clientInfo) {
+        if (len > 0) {
+            /* we wrote data - drain them from bucket */
+            clientInfo->bucketSize -= len;
+            if (clientInfo->bucketSize < 0.0)
+            {
+                debugs(5,1, HERE << "drained too much"); // should not happen
+                clientInfo->bucketSize = 0;
+            }
+         }
+
+         // even if we wrote nothing, we were served; give others a chance
+         clientInfo->kickQuotaQueue();
+    }
+#endif
+
     fd_bytes(fd, len, FD_WRITE);
     statCounter.syscalls.sock.writes++;
     // After each successful partial write,
@@ -1974,11 +2289,7 @@ commHandleWrite(int fd, void *data)
             commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
         } else if (ignoreErrno(errno)) {
             debugs(50, 10, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
-            commSetSelect(fd,
-                          COMM_SELECT_WRITE,
-                          commHandleWrite,
-                          state,
-                          0);
+            commSelectOrQueueWrite(fd);
         } else {
             debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
             commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
@@ -1989,11 +2300,7 @@ commHandleWrite(int fd, void *data)
 
         if (state->offset < state->size) {
             /* Not done, reinstall the write handler and write some more */
-            commSetSelect(fd,
-                          COMM_SELECT_WRITE,
-                          commHandleWrite,
-                          state,
-                          0);
+            commSelectOrQueueWrite(fd);
         } else {
             commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_OK : COMM_ERROR, errno);
         }
@@ -2032,6 +2339,27 @@ comm_write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE
     /* Queue the write */
     commio_set_callback(fd, IOCB_WRITE, ccb, callback,
                         (char *)buf, free_func, size);
+
+    commSelectOrQueueWrite(fd);
+}
+
+// called when fd needs to write but may need to wait in line for its quota
+static void
+commSelectOrQueueWrite(const int fd)
+{
+    comm_io_callback_t *ccb = COMMIO_FD_WRITECB(fd);
+
+#if DELAY_POOLS
+    // stand in line if there is one
+    if (ClientInfo *clientInfo = fd_table[fd].clientInfo) {
+        if (clientInfo->writeLimitingActive) {
+            ccb->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
+            clientInfo->kickQuotaQueue();
+            return;
+        }
+    }
+#endif
+
     commSetSelect(fd, COMM_SELECT_WRITE, commHandleWrite, ccb, 0);
 }
 
index 089ad24c17baf5f491cd75f3ebf1160c1def5d0b..52535d1445eaafae9045d9143b7dac9484ccb626 100644 (file)
--- a/src/fde.h
+++ b/src/fde.h
@@ -33,6 +33,9 @@
 #include "comm.h"
 #include "ip/Address.h"
 
+#if DELAY_POOLS
+class ClientInfo;
+#endif
 class PconnPool;
 
 class fde
@@ -89,6 +92,9 @@ public:
         PconnPool *pool;
     } pconn;
 
+#if DELAY_POOLS
+    ClientInfo * clientInfo;/* pointer to client info used in client write limiter or NULL if not present */
+#endif
     unsigned epoll_state;
 
     struct _fde_disk disk;
@@ -140,6 +146,7 @@ private:
         bytes_written = 0;
         pconn.uses = 0;
         pconn.pool = NULL;
+        clientInfo = NULL;
         epoll_state = 0;
         memset(&disk, 0, sizeof(_fde_disk));
         read_handler = NULL;
index a326df7686ba8265a41368d9e72d47f57b0c46a7..e8bc8b0d4ee0d9972050bed3f1d068a4bae0a3a5 100644 (file)
 #include "icmp/IcmpSquid.h"
 #include "icmp/net_db.h"
 
+#if DELAY_POOLS
+#include "ClientDelayConfig.h"
+#endif
+
 #if USE_LOADABLE_MODULES
 #include "LoadableModules.h"
 #endif
@@ -838,6 +842,10 @@ mainReconfigureFinish(void *)
 
     mimeInit(Config.mimeTablePathname);
 
+#if DELAY_POOLS
+    Config.ClientDelay.finalize();
+#endif
+
     if (Config.onoff.announce) {
         if (!eventFind(start_announce, NULL))
             eventAdd("start_announce", start_announce, NULL, 3600.0, 1);
@@ -1160,6 +1168,10 @@ mainInitialize(void)
     Esi::Init();
 #endif
 
+#if DELAY_POOLS
+    Config.ClientDelay.finalize();
+#endif
+
     debugs(1, 1, "Ready to serve requests.");
 
     if (!configured_once) {
index 2d284779dcb594d936095bed78051c27e72edfc1..9b71b3d4968a8e004c70c0975c2590d9d6b80c9a 100644 (file)
@@ -45,6 +45,9 @@
 
 
 class HttpRequestMethod;
+#if DELAY_POOLS
+class ClientInfo;
+#endif
 
 
 #if USE_FORW_VIA_DB
@@ -89,6 +92,10 @@ void clientdbDump(StoreEntry *);
 SQUIDCEXTERN void clientdbFreeMemory(void);
 
 SQUIDCEXTERN int clientdbEstablished(const Ip::Address &, int);
+#if DELAY_POOLS
+SQUIDCEXTERN void clientdbSetWriteLimiter(ClientInfo * info, const int writeSpeedLimit,const double initialBurst,const double highWatermark);
+SQUIDCEXTERN ClientInfo * clientdbGetInfo(const Ip::Address &addr);
+#endif
 SQUIDCEXTERN void clientOpenListenSockets(void);
 SQUIDCEXTERN void clientHttpConnectionsClose(void);
 SQUIDCEXTERN void httpRequestFree(void *);
index b0e51a2d7eaf9ead2ff2f243f029824ab0c4937b..ee814a69fce192783f2c637cd08999800bc5560a 100644 (file)
@@ -126,6 +126,7 @@ struct relist {
 
 #if DELAY_POOLS
 #include "DelayConfig.h"
+#include "ClientDelayConfig.h"
 #endif
 
 #if USE_ICMP
@@ -535,6 +536,7 @@ struct SquidConfig {
 #if DELAY_POOLS
 
     DelayConfig Delay;
+    ClientDelayConfig ClientDelay;
 #endif
 
     struct {