]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Account for max-swap-rate in swap-timeout handling for Rock.
authorAlex Rousskov <rousskov@measurement-factory.com>
Fri, 14 Oct 2011 16:00:08 +0000 (10:00 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Fri, 14 Oct 2011 16:00:08 +0000 (10:00 -0600)
Current swap-timeout code does not know about max-swap-rate.  It
simply finds the longest-waiting I/O in disker queues (incoming and
outgoing) and then assumes that the new I/O will wait at least that
long.  The assumption is likely to be wrong when the queue contains
lots of freshly queued requests to disker: Those requests have not
waited long yet, but a max-swap-rate limit will slow them down
shortly.

The patch changes the swap-timeout code to account for max-swap-rate
when dealing with the workers-to-disker queue: If there are N requests
pending, the new one will wait at least N/max-swap-rate seconds.  Also
expected wait time is adjusted based on the queue "balance" member, in
case we have been borrowing time against future I/O already.

src/DiskIO/IpcIo/IpcIoFile.cc
src/ipc/Queue.cc
src/ipc/Queue.h

index e0b3e34ee733b2ea65c8ed99841d0581b2b9dae0..3f6a889e5df309f7b2b24ae4b34d5699b47777a9 100644 (file)
@@ -360,7 +360,21 @@ IpcIoFile::canWait() const
     if (!queue->peek(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
         return true; // we cannot estimate expected wait time; assume it is OK
 
-    const int expectedWait = tvSubMsec(oldestIo.start, current_time);
+    const int oldestWait = tvSubMsec(oldestIo.start, current_time);
+
+    int rateWait = -1; // time in millisecons
+    const Ipc::QueueReader::Rate::Value ioRate = queue->rateLimit(diskId);
+    if (ioRate > 0) {
+        // if there are N requests pending, the new one will wait at
+        // least N/max-swap-rate seconds
+        rateWait = 1e3 * queue->outSize(diskId) / ioRate;
+        // adjust N/max-swap-rate value based on the queue "balance"
+        // member, in case we have been borrowing time against future
+        // I/O already
+        rateWait += queue->balance(diskId);
+    }
+
+    const int expectedWait = max(oldestWait, rateWait);
     if (expectedWait < 0 ||
             static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
         return true; // expected wait time is acceptible
index 24e67066be66929f16f2089d8ef456cb6b668d39..76e266b68418b71c9ccf94478ab46abc409ddd19 100644 (file)
@@ -197,6 +197,22 @@ Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcess
     return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
 }
 
+/// incoming queue from a given remote process
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
+{
+    return oneToOneQueue(remoteGroup(), remoteProcessId,
+                         theLocalGroup, theLocalProcessId);
+}
+
+/// outgoing queue to a given remote process
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
+{
+    return oneToOneQueue(theLocalGroup, theLocalProcessId,
+                         remoteGroup(), remoteProcessId);
+}
+
 int
 Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
 {
@@ -255,6 +271,13 @@ Ipc::FewToFewBiQueue::localBalance()
     return r.balance;
 }
 
+const Ipc::QueueReader::Balance &
+Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const
+{
+    const QueueReader &r = reader(remoteGroup(), remoteProcessId);
+    return r.balance;
+}
+
 Ipc::QueueReader::Rate &
 Ipc::FewToFewBiQueue::localRateLimit()
 {
@@ -262,6 +285,13 @@ Ipc::FewToFewBiQueue::localRateLimit()
     return r.rateLimit;
 }
 
+const Ipc::QueueReader::Rate &
+Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const
+{
+    const QueueReader &r = reader(remoteGroup(), remoteProcessId);
+    return r.rateLimit;
+}
+
 Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
         theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
         theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
index 20986103d9aef628842aa38f5c868bb702e0cd58..640342d9739a7087bb3b88fc482676c0b1f39d41 100644 (file)
@@ -208,14 +208,28 @@ public:
     /// returns local reader's balance
     QueueReader::Balance &localBalance();
 
+    /// returns reader's balance for a given remote process
+    const QueueReader::Balance &balance(const int remoteProcessId) const;
+
     /// returns local reader's rate limit
     QueueReader::Rate &localRateLimit();
 
+    /// returns reader's rate limit for a given remote process
+    const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
+
+    /// number of items in incoming queue from a given remote process
+    int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
+
+    /// number of items in outgoing queue to a given remote process
+    int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
+
 private:
     bool validProcessId(const Group group, const int processId) const;
     int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
     const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
     OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId);
+    const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
+    const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
     QueueReader &reader(const Group group, const int processId);
     const QueueReader &reader(const Group group, const int processId) const;
     int readerIndex(const Group group, const int processId) const;
@@ -357,15 +371,17 @@ FewToFewBiQueue::peek(const int remoteProcessId, Value &value) const
         return false;
 
     // we need the oldest value, so start with the incoming, them-to-us queue:
-    const OneToOneUniQueue &inQueue = oneToOneQueue(remoteGroup(), remoteProcessId, theLocalGroup, theLocalProcessId);
-    debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " << theLocalProcessId << " at " << inQueue.size());
-    if (inQueue.peek(value))
+    const OneToOneUniQueue &in = inQueue(remoteProcessId);
+    debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " <<
+           theLocalProcessId << " at " << in.size());
+    if (in.peek(value))
         return true;
 
     // if the incoming queue is empty, check the outgoing, us-to-them queue:
-    const OneToOneUniQueue &outQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
-    debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " << remoteProcessId << " at " << outQueue.size());
-    return outQueue.peek(value);
+    const OneToOneUniQueue &out = outQueue(remoteProcessId);
+    debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " <<
+           remoteProcessId << " at " << out.size());
+    return out.peek(value);
 }
 
 } // namespace Ipc