int n_established; /* number of current established connections */
time_t last_seen;
#if DELAY_POOLS
- double writeSpeedLimit;///< Write speed limit in bytes per second, can be less than 1, if too close to zero this could result in timeouts from client
- double prevTime; ///< previous time when we checked
+ double writeSpeedLimit;///< Write speed limit in bytes per second; may be less than 1. If too close to zero, this could result in client timeouts.
+ double prevTime; ///< previous time when we checked
double bucketSize; ///< how much can be written now
double bucketSizeLimit; ///< maximum bucket size
- bool writeLimitingActive; ///< Is write limiter active
+ bool writeLimitingActive; ///< Is write limiter active
bool firstTimeConnection;///< is this first time connection for this client
CommQuotaQueue *quotaQueue; ///< clients waiting for more write quota
void refillBucket(); ///< adds bytes to bucket based on rate and time
void quotaDumpQueue(); ///< dumps quota queue for debugging
-
-/**
- * Configure client write limiting (note:"client" here means - IP). It is called
- * by httpAccept in client_side.cc, where the initial bucket size (anInitialBurst)
- * computed, using the configured maximum bucket vavlue and configured initial
- * bucket value(50% by default).
- *
- * \param writeSpeedLimit is speed limit configured in config for this pool
- * \param initialBurst is initial bucket size to use for this client(i.e. client can burst at first)
- * \param highWatermark is maximum bucket value
- */
+
+ /**
+ * Configure client write limiting (note: "client" here means an IP). It is called
+ * by httpAccept in client_side.cc, where the initial bucket size (anInitialBurst)
+ * is computed, using the configured maximum bucket value and the configured initial
+ * bucket value (50% by default).
+ *
+ * \param writeSpeedLimit is the speed limit configured in config for this pool
+ * \param initialBurst is the initial bucket size to use for this client (i.e. the client can burst at first)
+ * \param highWatermark is the maximum bucket value
+ */
void setWriteLimiter(const int aWriteSpeedLimit, const double anInitialBurst, const double aHighWatermark);
#endif
};
int front() const { return fds.front(); }
unsigned int enqueue(int fd);
void dequeue();
-
+
ClientInfo *clientInfo; ///< bucket responsible for quota maintenance
// these counters might overflow; that is OK because they are for IDs only
// kick queue or it will get stuck as commWriteHandle is not called
clientInfo->kickQuotaQueue();
}
- }
+ }
#endif
commCallCloseHandlers(fd);
ClientInfo *clientInfo = queue->clientInfo;
// ClientInfo invalidates queue if freed, so if we got here through,
// evenAdd cbdata protections, everything should be valid and consistent
- assert(clientInfo);
+ assert(clientInfo);
assert(clientInfo->hasQueue());
assert(clientInfo->hasQueue(queue));
assert(!clientInfo->selectWaiting);
comm_io_callback_t *ccb = COMMIO_FD_WRITECB(head);
if (fd_table[head].clientInfo == clientInfo &&
- clientInfo->quotaPeekReserv() == ccb->quotaQueueReserv &&
- !fd_table[head].closing()) {
+ clientInfo->quotaPeekReserv() == ccb->quotaQueueReserv &&
+ !fd_table[head].closing()) {
// wait for the head descriptor to become ready for writing
commSetSelect(head, COMM_SELECT_WRITE, commHandleWrite, ccb, 0);
clientInfo->selectWaiting = true;
return;
- }
+ }
- clientInfo->quotaDequeue(); // remove the no longer relevant descriptor
- // and continue looking for a relevant one
+ clientInfo->quotaDequeue(); // remove the no longer relevant descriptor
+ // and continue looking for a relevant one
} while (clientInfo->hasQueue());
debugs(77,3, HERE << "emptied queue");
// wait at least a second if the bucket is empty
const double delay = (bucketSize < 1.0) ? 1.0 : 0.0;
eventAdd("commHandleWriteHelper", &commHandleWriteHelper,
- quotaQueue, delay, 0, true);
+ quotaQueue, delay, 0, true);
eventWaiting = true;
}
}
// negative bucket sizes after write with rationedCount=1.
rationedQuota = static_cast<int>(floor(bucketSize/rationedCount));
debugs(77,5, HERE << "new rationedQuota: " << rationedQuota <<
- '*' << rationedCount);
+ '*' << rationedCount);
}
--rationedCount;
debugs(77,7, HERE << "rationedQuota: " << rationedQuota <<
- " rations remaining: " << rationedCount);
+ " rations remaining: " << rationedCount);
// update 'last seen' time to prevent clientdb GC from dropping us
last_seen = squid_curtime;
const double currTime = current_dtime;
const double timePassed = currTime - prevTime;
- // Calculate allowance for the time passed. Use double to avoid
+ // Calculate allowance for the time passed. Use double to avoid
// accumulating rounding errors for small intervals. For example, always
// adding 1 byte instead of 1.4 results in 29% bandwidth allocation error.
const double gain = timePassed * writeSpeedLimit;
debugs(77,5, HERE << currTime << " clt" << (const char*)hash.key << ": " <<
- bucketSize << " + (" << timePassed << " * " << writeSpeedLimit <<
- " = " << gain << ')');
+ bucketSize << " + (" << timePassed << " * " << writeSpeedLimit <<
+ " = " << gain << ')');
// to further combat error accumulation during micro updates,
// quit before updating time if we cannot add at least one byte
if (gain < 1.0)
- return;
+ return;
prevTime = currTime;
// for "first" connections, drain initial fat before refilling but keep
// updating prevTime to avoid bursts after the fat is gone
if (bucketSize > bucketSizeLimit) {
- debugs(77,4, HERE << "not refilling while draining initial fat");
- return;
+ debugs(77,4, HERE << "not refilling while draining initial fat");
+ return;
}
bucketSize += gain;
bucketSize = bucketSizeLimit;
}
-void
+void
ClientInfo::setWriteLimiter(const int aWriteSpeedLimit, const double anInitialBurst, const double aHighWatermark)
{
- debugs(77,5, HERE << "Write limits for " << (const char*)hash.key <<
- " speed=" << aWriteSpeedLimit << " burst=" << anInitialBurst <<
- " highwatermark=" << aHighWatermark);
+ debugs(77,5, HERE << "Write limits for " << (const char*)hash.key <<
+ " speed=" << aWriteSpeedLimit << " burst=" << anInitialBurst <<
+ " highwatermark=" << aHighWatermark);
// set or possibly update traffic shaping parameters
writeLimitingActive = true;
}
CommQuotaQueue::CommQuotaQueue(ClientInfo *info): clientInfo(info),
- ins(0), outs(0)
+ ins(0), outs(0)
{
assert(clientInfo);
}
CommQuotaQueue::enqueue(int fd)
{
debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key <<
- ": FD " << fd << " with qqid" << (ins+1) << ' ' << fds.size());
+ ": FD " << fd << " with qqid" << (ins+1) << ' ' << fds.size());
fds.push_back(fd);
return ++ins;
}
{
assert(!fds.empty());
debugs(77,5, HERE << "clt" << (const char*)clientInfo->hash.key <<
- ": FD " << fds.front() << " with qqid" << (outs+1) << ' ' <<
- fds.size());
+ ": FD " << fds.front() << " with qqid" << (outs+1) << ' ' <<
+ fds.size());
fds.pop_front();
++outs;
}
const int nleft_corrected = min(nleft, quota);
if (nleft != nleft_corrected) {
debugs(5, 5, HERE << "FD " << fd << " writes only " <<
- nleft_corrected << " out of " << nleft);
+ nleft_corrected << " out of " << nleft);
nleft = nleft_corrected;
}
if (len > 0) {
/* we wrote data - drain them from bucket */
clientInfo->bucketSize -= len;
- if (clientInfo->bucketSize < 0.0)
- {
+ if (clientInfo->bucketSize < 0.0) {
debugs(5,1, HERE << "drained too much"); // should not happen
clientInfo->bucketSize = 0;
}
- }
+ }
- // even if we wrote nothing, we were served; give others a chance
- clientInfo->kickQuotaQueue();
+ // even if we wrote nothing, we were served; give others a chance
+ clientInfo->kickQuotaQueue();
}
#endif