]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Added max-swap-rate=swaps/sec option to Rock cache_dir.
authorAlex Rousskov <rousskov@measurement-factory.com>
Thu, 15 Sep 2011 17:51:23 +0000 (11:51 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Thu, 15 Sep 2011 17:51:23 +0000 (11:51 -0600)
The option limits disk access to smooth out OS disk commit activity and to
avoid blocking Rock diskers (or even other processes) on I/O. Should be used
when swap demand exceeds disk performance limits but the underlying file
system does not slow down incoming I/Os until the situation gets out of
control.

TODO: Account for the I/O rate limit when estimating whether a future I/O
will complete in time (for swap-timeout).

TODO: Consider allowing the next swap in (i.e., read) through regardless of
the limit because, unlike writes, reads do not usually accumulate in OS
buffers.

src/DiskIO/DiskFile.h
src/DiskIO/IpcIo/IpcIoFile.cc
src/DiskIO/IpcIo/IpcIoFile.h
src/cf.data.pre
src/fs/rock/RockSwapDir.cc
src/fs/rock/RockSwapDir.h
src/ipc/Queue.cc
src/ipc/Queue.h

index 612badc748300d528b41ff2e420de998eb71a658..506525fd9f3fe310709a9ad19c86236720d66633 100644 (file)
@@ -51,10 +51,13 @@ public:
     /// generally useful configuration options supported by some children
     class Config {
     public:
-        Config(): ioTimeout(0) {}
+        Config(): ioTimeout(0), ioRate(-1) {}
 
         /// canRead/Write should return false if expected I/O delay exceeds it
         time_msec_t ioTimeout; // not enforced if zero, which is the default
+
+        /// shape I/O request stream to approach that many per second
+        int ioRate; // not enforced if negative, which is the default
     };
 
     typedef RefCount<DiskFile> Pointer;
index 34ebdb90f352b220dfa799a09ccfedbe68671a75..3bc587e51c8e11a39b87d8b215c780dccc7dc25d 100644 (file)
@@ -96,6 +96,9 @@ IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
             IpcIoFiles.insert(std::make_pair(diskId, this)).second;
         Must(inserted);
 
+        queue->localRateLimit() =
+            static_cast<Ipc::QueueReader::Rate::Value>(config.ioRate);
+
         Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
         ann.strand.tag = dbName;
         Ipc::TypedMsgHdr message;
@@ -651,13 +654,67 @@ diskerWrite(IpcIoMsg &ipcIo)
 
 
 void
-IpcIoFile::DiskerHandleMoreRequests(void*)
+IpcIoFile::DiskerHandleMoreRequests(void *source)
 {
-    debugs(47, 7, HERE << "resuming handling requests");
+    debugs(47, 7, HERE << "resuming handling requests after " <<
+           static_cast<const char *>(source));
     DiskerHandleMoreRequestsScheduled = false;
     IpcIoFile::DiskerHandleRequests();
 }
 
+bool
+IpcIoFile::WaitBeforePop()
+{
+    const Ipc::QueueReader::Rate::Value ioRate = queue->localRateLimit();
+    const double maxRate = ioRate/1e3; // req/ms
+
+    // do we need to enforce configured I/O rate?
+    if (maxRate <= 0)
+        return false;
+
+    // is there an I/O request we could potentially delay?
+    if (!queue->popReady()) {
+        // unlike pop(), popReady() is not reliable and does not block reader
+        // so we must proceed with pop() even if it is likely to fail
+        return false;
+    }
+
+    static timeval LastIo;
+
+    const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os
+    // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
+    const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration));
+
+    const double credit = ioDuration; // what the last I/O should have cost us
+    const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O
+    LastIo = current_time;
+
+    Ipc::QueueReader::Balance &balance = queue->localBalance();
+    balance += static_cast<int64_t>(credit - debit);
+
+    debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit);
+
+    if (balance > maxImbalance) {
+        // if we accumulated too much time for future slow I/Os,
+        // then shed accumulated time to keep just half of the excess
+        const int64_t toSpend = balance - maxImbalance/2;
+        debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get" <<
+               (1e3*maxRate) << "/sec rate");
+        eventAdd("IpcIoFile::DiskerHandleMoreRequests",
+                 &IpcIoFile::DiskerHandleMoreRequests,
+                 const_cast<char*>("rate limiting"),
+                 toSpend/1e3, 0, false);
+        DiskerHandleMoreRequestsScheduled = true;
+        return true;
+    } else
+    if (balance < -maxImbalance) {
+        // do not owe "too much" to avoid "too large" bursts of I/O
+        balance = -maxImbalance;
+    }
+
+    return false;
+}
+
 void
 IpcIoFile::DiskerHandleRequests()
 {
@@ -670,7 +727,7 @@ IpcIoFile::DiskerHandleRequests()
     int popped = 0;
     int workerId = 0;
     IpcIoMsg ipcIo;
-    while (queue->pop(workerId, ipcIo)) {
+    while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
         ++popped;
 
         // at least one I/O per call is guaranteed if the queue is not empty
@@ -684,7 +741,8 @@ IpcIoFile::DiskerHandleRequests()
                 const double minBreakSecs = 0.001;
                 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
                          &IpcIoFile::DiskerHandleMoreRequests,
-                         NULL, minBreakSecs, 0, false);
+                         const_cast<char*>("long I/O loop"),
+                         minBreakSecs, 0, false);
                 DiskerHandleMoreRequestsScheduled = true;
             }
             debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " <<
index 3a43d593650118f3bd1c8ee329e30b13312ecd91..091743601e74e552e8fed8d250c374f3a870ba48 100644 (file)
@@ -102,6 +102,7 @@ private:
     static void DiskerHandleMoreRequests(void*);
     static void DiskerHandleRequests();
     static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
+    static bool WaitBeforePop();
 
 private:
     const String dbName; ///< the name of the file we are managing
index 95842e7dac85bb5bd5c3e582ab242ac9b806690c..e079ecb5153e0ea7dc17b0ff55e65f3f43c83a91 100644 (file)
@@ -2761,7 +2761,7 @@ DOC_START
 
        The rock store type:
 
-           cache_dir rock Directory-Name Mbytes <max-size=bytes>
+           cache_dir rock Directory-Name Mbytes <max-size=bytes> [options]
 
        The Rock Store type is a database-style storage. All cached
        entries are stored in a "database" file, using fixed-size slots,
@@ -2777,6 +2777,18 @@ DOC_START
        blocking synchronous I/O does not allow Squid to estimate the
        expected swap wait time.
 
+       max-swap-rate=swaps/sec: Artificially limits disk access using
+       the specified I/O rate limit. Swap in and swap out requests that
+       would cause the average I/O rate to exceed the limit are
+       delayed. This is necessary on file systems that buffer "too
+       many" writes and then start blocking Squid and other processes
+       while committing those writes to disk.  Usually used together
+       with swap-timeout to avoid excessive delays and queue overflows
+       when disk demand exceeds available disk "bandwidth". By default
+       and when set to zero, disables the disk I/O rate limit
+       enforcement. Currently supported by IpcIo module only.
+
+
        The coss store type:
 
        NP: COSS filesystem in Squid-3 has been deemed too unstable for
index f1e3daf79cc9176b31423a097c73e11250850368..a8897c5e10316982bc4c1367ff0386e132980f38 100644 (file)
@@ -285,6 +285,7 @@ Rock::SwapDir::getOptionTree() const
     ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(::SwapDir::getOptionTree());
     assert(vector);
     vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
+    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
     return vector;
 }
 
@@ -304,13 +305,15 @@ Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconf
     if (!value)
         self_destruct();
 
-    const time_msec_t newTime = strtoll(value, NULL, 10);
-
-    if (newTime < 0) {
-        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newTime);
+    // TODO: handle time units and detect parsing errors better
+    const int64_t parsedValue = strtoll(value, NULL, 10);
+    if (parsedValue < 0) {
+        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
         self_destruct();
     }
 
+    const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);
+
     if (reconfiguring && *storedTime != newTime)
         debugs(3, DBG_IMPORTANT, "cache_dir " << path << ' ' << option << " is now " << newTime);
 
@@ -328,6 +331,49 @@ Rock::SwapDir::dumpTimeOption(StoreEntry * e) const
                           static_cast<int64_t>(fileConfig.ioTimeout));
 }
 
+/// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
+bool
+Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
+{
+    int *storedRate;
+    if (strcmp(option, "max-swap-rate") == 0)
+        storedRate = &fileConfig.ioRate;
+    else
+        return false;
+
+    if (!value)
+        self_destruct();
+
+    // TODO: handle time units and detect parsing errors better
+    const int64_t parsedValue = strtoll(value, NULL, 10);
+    if (parsedValue < 0) {
+        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
+        self_destruct();
+    }
+
+    const int newRate = static_cast<int>(parsedValue);
+
+    if (newRate < 0) {
+        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
+        self_destruct();
+    }
+
+    if (isaReconfig && *storedRate != newRate)
+        debugs(3, DBG_IMPORTANT, "cache_dir " << path << ' ' << option << " is now " << newRate);
+
+    *storedRate = newRate;
+
+    return true;
+}
+
+/// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
+void
+Rock::SwapDir::dumpRateOption(StoreEntry * e) const
+{
+    if (fileConfig.ioRate >= 0)
+        storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
+}
+
 /// check the results of the configuration; only level-0 debugging works here
 void
 Rock::SwapDir::validateOptions()
index 08895d513897eba9ec2d4b75c5298c08492ac597..4b5f5d8a3987c705fc014fec367701f6eab76d56 100644 (file)
@@ -65,6 +65,8 @@ protected:
     void validateOptions(); ///< warns of configuration problems; may quit
     bool parseTimeOption(char const *option, const char *value, int reconfiguring);
     void dumpTimeOption(StoreEntry * e) const;
+    bool parseRateOption(char const *option, const char *value, int reconfiguring);
+    void dumpRateOption(StoreEntry * e) const;
 
     void rebuild(); ///< starts loading and validating stored entry metadata
     ///< used to add entries successfully loaded during rebuild
index 12eb5e6b2f5b3e957184d92e5fa55cede026c547..24e67066be66929f16f2089d8ef456cb6b668d39 100644 (file)
@@ -40,7 +40,8 @@ ReadersId(String id)
 
 InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
 
-Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0)
+Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0),
+        rateLimit(0), balance(0)
 {
     debugs(54, 7, HERE << "constructed " << id);
 }
@@ -196,14 +197,25 @@ Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcess
     return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
 }
 
+int
+Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
+{
+    Must(validProcessId(group, processId));
+    return group == groupA ?
+           processId - metadata->theGroupAIdOffset :
+           metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
+}
+
 Ipc::QueueReader &
 Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
 {
-    Must(validProcessId(group, processId));
-    const int index =  group == groupA ?
-                       processId - metadata->theGroupAIdOffset :
-                       metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
-    return readers->theReaders[index];
+    return readers->theReaders[readerIndex(group, processId)];
+}
+
+const Ipc::QueueReader &
+Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const
+{
+    return readers->theReaders[readerIndex(group, processId)];
 }
 
 void
@@ -221,6 +233,35 @@ Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
     // theLastPopProcessId = remoteProcessId;
 }
 
+bool
+Ipc::FewToFewBiQueue::popReady() const
+{
+    // mimic FewToFewBiQueue::pop() but quit just before popping
+    int popProcessId = theLastPopProcessId; // preserve for future pop()
+    for (int i = 0; i < remoteGroupSize(); ++i) {
+        if (++popProcessId >= remoteGroupIdOffset() + remoteGroupSize())
+            popProcessId = remoteGroupIdOffset();
+        const OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), popProcessId, theLocalGroup, theLocalProcessId);
+        if (!queue.empty())
+            return true;
+    }
+    return false; // most likely, no process had anything to pop
+}
+
+Ipc::QueueReader::Balance &
+Ipc::FewToFewBiQueue::localBalance()
+{
+    QueueReader &r = reader(theLocalGroup, theLocalProcessId);
+    return r.balance;
+}
+
+Ipc::QueueReader::Rate &
+Ipc::FewToFewBiQueue::localRateLimit()
+{
+    QueueReader &r = reader(theLocalGroup, theLocalProcessId);
+    return r.rateLimit;
+}
+
 Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
         theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
         theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
index 225d1de00c550da923366e3fa9eea2a798803c1f..021b87a43ac1545ae9c281a7e7191fa2605b7ab1 100644 (file)
@@ -46,6 +46,14 @@ private:
     AtomicWord popSignal; ///< whether writer has sent and reader has not received notification
 
 public:
+    typedef AtomicWord Rate; ///< pop()s per second
+    Rate rateLimit; ///< pop()s per second limit if positive
+
+    typedef AtomicWordT<int64_t> AtomicSignedMsec;
+    typedef AtomicSignedMsec Balance;
+    /// how far ahead the reader is compared to a perfect read/sec event rate
+    Balance balance;
+
     /// unique ID for debugging which reader is used (works across processes)
     const InstanceId<QueueReader> id;
 };
@@ -189,15 +197,27 @@ public:
     /// calls OneToOneUniQueue::push() using the given process queue
     template <class Value> bool push(const int remoteProcessId, const Value &value);
 
+    // TODO: rename to findOldest() or some such
     /// calls OneToOneUniQueue::peek() using the given process queue
     template<class Value> bool peek(const int remoteProcessId, Value &value) const;
 
+    /// returns true if pop() would have probably succeeded but does not pop()
+    bool popReady() const;
+
+    /// returns local reader's balance
+    QueueReader::Balance &localBalance();
+
+    /// returns local reader's rate limit
+    QueueReader::Rate &localRateLimit();
+
 private:
     bool validProcessId(const Group group, const int processId) const;
     int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
     const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
     OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId);
     QueueReader &reader(const Group group, const int processId);
+    const QueueReader &reader(const Group group, const int processId) const;
+    int readerIndex(const Group group, const int processId) const;
     int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; }
     int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; }