From: Alex Rousskov Date: Fri, 14 Oct 2011 16:00:08 +0000 (-0600) Subject: Account for max-swap-rate in swap-timeout handling for Rock. X-Git-Tag: BumpSslServerFirst.take01~97 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=55939a017eefbe9df6f225395000cbc106d9854f;p=thirdparty%2Fsquid.git Account for max-swap-rate in swap-timeout handling for Rock. Current swap-timeout code does not know about max-swap-rate. It simply finds the longest-waiting I/O in disker queues (incoming and outgoing) and then assumes that the new I/O will wait at least that long. The assumption is likely to be wrong when the queue contains lots of freshly queued requests to disker: Those requests have not waited long yet, but a max-swap-rate limit will slow them down shortly. The patch changes the swap-timeout code to account for max-swap-rate when dealing with the workers-to-disker queue: If there are N requests pending, the new one will wait at least N/max-swap-rate seconds. Also expected wait time is adjusted based on the queue "balance" member, in case we have been borrowing time against future I/O already. --- diff --git a/src/DiskIO/IpcIo/IpcIoFile.cc b/src/DiskIO/IpcIo/IpcIoFile.cc index e0b3e34ee7..3f6a889e5d 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.cc +++ b/src/DiskIO/IpcIo/IpcIoFile.cc @@ -360,7 +360,21 @@ IpcIoFile::canWait() const if (!queue->peek(diskId, oldestIo) || oldestIo.start.tv_sec <= 0) return true; // we cannot estimate expected wait time; assume it is OK - const int expectedWait = tvSubMsec(oldestIo.start, current_time); + const int oldestWait = tvSubMsec(oldestIo.start, current_time); + + int rateWait = -1; // time in millisecons + const Ipc::QueueReader::Rate::Value ioRate = queue->rateLimit(diskId); + if (ioRate > 0) { + // if there are N requests pending, the new one will wait at + // least N/max-swap-rate seconds + rateWait = 1e3 * queue->outSize(diskId) / ioRate; + // adjust N/max-swap-rate value based on the queue "balance" + // member, in case we have been borrowing time against future + // I/O already + rateWait += queue->balance(diskId); + } + + const int expectedWait = max(oldestWait, rateWait); if (expectedWait < 0 || static_cast(expectedWait) < config.ioTimeout) return true; // expected wait time is acceptible diff --git a/src/ipc/Queue.cc b/src/ipc/Queue.cc index 24e67066be..76e266b684 100644 --- a/src/ipc/Queue.cc +++ b/src/ipc/Queue.cc @@ -197,6 +197,22 @@ Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcess return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)]; } +/// incoming queue from a given remote process +const Ipc::OneToOneUniQueue & +Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const +{ + return oneToOneQueue(remoteGroup(), remoteProcessId, + theLocalGroup, theLocalProcessId); +} + +/// outgoing queue to a given remote process +const Ipc::OneToOneUniQueue & +Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const +{ + return oneToOneQueue(theLocalGroup, theLocalProcessId, + remoteGroup(), remoteProcessId); +} + int Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const { @@ -255,6 +271,13 @@ Ipc::FewToFewBiQueue::localBalance() return r.balance; } +const Ipc::QueueReader::Balance & +Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const +{ + const QueueReader &r = reader(remoteGroup(), remoteProcessId); + return r.balance; +} + Ipc::QueueReader::Rate & Ipc::FewToFewBiQueue::localRateLimit() { @@ -262,6 +285,13 @@ Ipc::FewToFewBiQueue::localRateLimit() return r.rateLimit; } +const Ipc::QueueReader::Rate & +Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const +{ + const QueueReader &r = reader(remoteGroup(), remoteProcessId); + return r.rateLimit; +} + Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset): theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset), theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset) diff --git a/src/ipc/Queue.h b/src/ipc/Queue.h index 20986103d9..640342d973 100644 --- a/src/ipc/Queue.h +++ b/src/ipc/Queue.h @@ -208,14 +208,28 @@ public: /// returns local reader's balance QueueReader::Balance &localBalance(); + /// returns reader's balance for a given remote process + const QueueReader::Balance &balance(const int remoteProcessId) const; + /// returns local reader's rate limit QueueReader::Rate &localRateLimit(); + /// returns reader's rate limit for a given remote process + const QueueReader::Rate &rateLimit(const int remoteProcessId) const; + + /// number of items in incoming queue from a given remote process + int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); } + + /// number of items in outgoing queue to a given remote process + int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); } + private: bool validProcessId(const Group group, const int processId) const; int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const; const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const; OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId); + const OneToOneUniQueue &inQueue(const int remoteProcessId) const; + const OneToOneUniQueue &outQueue(const int remoteProcessId) const; QueueReader &reader(const Group group, const int processId); const QueueReader &reader(const Group group, const int processId) const; int readerIndex(const Group group, const int processId) const; @@ -357,15 +371,17 @@ FewToFewBiQueue::peek(const int remoteProcessId, Value &value) const return false; // we need the oldest value, so start with the incoming, them-to-us queue: - const OneToOneUniQueue &inQueue = oneToOneQueue(remoteGroup(), remoteProcessId, theLocalGroup, theLocalProcessId); - debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " << theLocalProcessId << " at " << inQueue.size()); - if (inQueue.peek(value)) + const OneToOneUniQueue &in = inQueue(remoteProcessId); + debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " << + theLocalProcessId << " at " << in.size()); + if (in.peek(value)) return true; // if the incoming queue is empty, check the outgoing, us-to-them queue: - const OneToOneUniQueue &outQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId); - debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " << remoteProcessId << " at " << outQueue.size()); - return outQueue.peek(value); + const OneToOneUniQueue &out = outQueue(remoteProcessId); + debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " << + remoteProcessId << " at " << out.size()); + return out.peek(value); } } // namespace Ipc