From: Alex Rousskov Date: Thu, 15 Sep 2011 17:51:23 +0000 (-0600) Subject: Added max-swap-rate=swaps/sec option to Rock cache_dir. X-Git-Tag: take08~1 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=df881a0f9a049b3853ff8d6d0bbc67666cecd6c6;p=thirdparty%2Fsquid.git Added max-swap-rate=swaps/sec option to Rock cache_dir. The option limits disk access to smooth out OS disk commit activity and to avoid blocking Rock diskers (or even other processes) on I/O. Should be used when swap demand exceeds disk performance limits but the underlying file system does not slow down incoming I/Os until the situation gets out of control. TODO: Account for the I/O rate limit when estimating whether a future I/O will complete in time (for swap-timeout). TODO: Consider allowing the next swap in (i.e., read) through regardless of the limit because, unlike writes, reads do not usually accumulate in OS buffers. --- diff --git a/src/DiskIO/DiskFile.h b/src/DiskIO/DiskFile.h index 612badc748..506525fd9f 100644 --- a/src/DiskIO/DiskFile.h +++ b/src/DiskIO/DiskFile.h @@ -51,10 +51,13 @@ public: /// generally useful configuration options supported by some children class Config { public: - Config(): ioTimeout(0) {} + Config(): ioTimeout(0), ioRate(-1) {} /// canRead/Write should return false if expected I/O delay exceeds it time_msec_t ioTimeout; // not enforced if zero, which is the default + + /// shape I/O request stream to approach that many per second + int ioRate; // not enforced if negative, which is the default }; typedef RefCount Pointer; diff --git a/src/DiskIO/IpcIo/IpcIoFile.cc b/src/DiskIO/IpcIo/IpcIoFile.cc index 34ebdb90f3..3bc587e51c 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.cc +++ b/src/DiskIO/IpcIo/IpcIoFile.cc @@ -96,6 +96,9 @@ IpcIoFile::open(int flags, mode_t mode, RefCount callback) IpcIoFiles.insert(std::make_pair(diskId, this)).second; Must(inserted); + queue->localRateLimit() = + static_cast(config.ioRate); + Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid())); ann.strand.tag = dbName; Ipc::TypedMsgHdr message; @@ -651,13 +654,67 @@ diskerWrite(IpcIoMsg &ipcIo) void -IpcIoFile::DiskerHandleMoreRequests(void*) +IpcIoFile::DiskerHandleMoreRequests(void *source) { - debugs(47, 7, HERE << "resuming handling requests"); + debugs(47, 7, HERE << "resuming handling requests after " << + static_cast(source)); DiskerHandleMoreRequestsScheduled = false; IpcIoFile::DiskerHandleRequests(); } +bool +IpcIoFile::WaitBeforePop() +{ + const Ipc::QueueReader::Rate::Value ioRate = queue->localRateLimit(); + const double maxRate = ioRate/1e3; // req/ms + + // do we need to enforce configured I/O rate? + if (maxRate <= 0) + return false; + + // is there an I/O request we could potentially delay? + if (!queue->popReady()) { + // unlike pop(), popReady() is not reliable and does not block reader + // so we must proceed with pop() even if it is likely to fail + return false; + } + + static timeval LastIo; + + const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os + // do not accumulate more than 100ms or 100 I/Os, whichever is smaller + const int64_t maxImbalance = min(static_cast(100), static_cast(100 * ioDuration)); + + const double credit = ioDuration; // what the last I/O should have cost us + const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O + LastIo = current_time; + + Ipc::QueueReader::Balance &balance = queue->localBalance(); + balance += static_cast(credit - debit); + + debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit); + + if (balance > maxImbalance) { + // if we accumulated too much time for future slow I/Os, + // then shed accumulated time to keep just half of the excess + const int64_t toSpend = balance - maxImbalance/2; + debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get" << + (1e3*maxRate) << "/sec rate"); + eventAdd("IpcIoFile::DiskerHandleMoreRequests", + &IpcIoFile::DiskerHandleMoreRequests, + const_cast("rate limiting"), + toSpend/1e3, 0, false); + DiskerHandleMoreRequestsScheduled = true; + return true; + } else + if (balance < -maxImbalance) { + // do not owe "too much" to avoid "too large" bursts of I/O + balance = -maxImbalance; + } + + return false; +} + void IpcIoFile::DiskerHandleRequests() { @@ -670,7 +727,7 @@ IpcIoFile::DiskerHandleRequests() int popped = 0; int workerId = 0; IpcIoMsg ipcIo; - while (queue->pop(workerId, ipcIo)) { + while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) { ++popped; // at least one I/O per call is guaranteed if the queue is not empty @@ -684,7 +741,8 @@ IpcIoFile::DiskerHandleRequests() const double minBreakSecs = 0.001; eventAdd("IpcIoFile::DiskerHandleMoreRequests", &IpcIoFile::DiskerHandleMoreRequests, - NULL, minBreakSecs, 0, false); + const_cast("long I/O loop"), + minBreakSecs, 0, false); DiskerHandleMoreRequestsScheduled = true; } debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " << diff --git a/src/DiskIO/IpcIo/IpcIoFile.h b/src/DiskIO/IpcIo/IpcIoFile.h index 3a43d59365..091743601e 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.h +++ b/src/DiskIO/IpcIo/IpcIoFile.h @@ -102,6 +102,7 @@ private: static void DiskerHandleMoreRequests(void*); static void DiskerHandleRequests(); static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo); + static bool WaitBeforePop(); private: const String dbName; ///< the name of the file we are managing diff --git a/src/cf.data.pre b/src/cf.data.pre index 95842e7dac..e079ecb515 100644 --- a/src/cf.data.pre +++ b/src/cf.data.pre @@ -2761,7 +2761,7 @@ DOC_START The rock store type: - cache_dir rock Directory-Name Mbytes + cache_dir rock Directory-Name Mbytes [options] The Rock Store type is a database-style storage. All cached entries are stored in a "database" file, using fixed-size slots, @@ -2777,6 +2777,18 @@ DOC_START blocking synchronous I/O does not allow Squid to estimate the expected swap wait time. + max-swap-rate=swaps/sec: Artificially limits disk access using + the specified I/O rate limit. Swap in and swap out requests that + would cause the average I/O rate to exceed the limit are + delayed. This is necessary on file systems that buffer "too + many" writes and then start blocking Squid and other processes + while committing those writes to disk. Usually used together + with swap-timeout to avoid excessive delays and queue overflows + when disk demand exceeds available disk "bandwidth". By default + and when set to zero, disables the disk I/O rate limit + enforcement. Currently supported by IpcIo module only. + + The coss store type: NP: COSS filesystem in Squid-3 has been deemed too unstable for diff --git a/src/fs/rock/RockSwapDir.cc b/src/fs/rock/RockSwapDir.cc index f1e3daf79c..a8897c5e10 100644 --- a/src/fs/rock/RockSwapDir.cc +++ b/src/fs/rock/RockSwapDir.cc @@ -285,6 +285,7 @@ Rock::SwapDir::getOptionTree() const ConfigOptionVector *vector = dynamic_cast(::SwapDir::getOptionTree()); assert(vector); vector->options.push_back(new ConfigOptionAdapter(*const_cast(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption)); + vector->options.push_back(new ConfigOptionAdapter(*const_cast(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption)); return vector; } @@ -304,13 +305,15 @@ Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconf if (!value) self_destruct(); - const time_msec_t newTime = strtoll(value, NULL, 10); - - if (newTime < 0) { - debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newTime); + // TODO: handle time units and detect parsing errors better + const int64_t parsedValue = strtoll(value, NULL, 10); + if (parsedValue < 0) { + debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue); self_destruct(); } + const time_msec_t newTime = static_cast(parsedValue); + if (reconfiguring && *storedTime != newTime) debugs(3, DBG_IMPORTANT, "cache_dir " << path << ' ' << option << " is now " << newTime); @@ -328,6 +331,49 @@ Rock::SwapDir::dumpTimeOption(StoreEntry * e) const static_cast(fileConfig.ioTimeout)); } +/// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse() +bool +Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig) +{ + int *storedRate; + if (strcmp(option, "max-swap-rate") == 0) + storedRate = &fileConfig.ioRate; + else + return false; + + if (!value) + self_destruct(); + + // TODO: handle time units and detect parsing errors better + const int64_t parsedValue = strtoll(value, NULL, 10); + if (parsedValue < 0) { + debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue); + self_destruct(); + } + + const int newRate = static_cast(parsedValue); + + if (newRate < 0) { + debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate); + self_destruct(); + } + + if (isaReconfig && *storedRate != newRate) + debugs(3, DBG_IMPORTANT, "cache_dir " << path << ' ' << option << " is now " << newRate); + + *storedRate = newRate; + + return true; +} + +/// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump() +void +Rock::SwapDir::dumpRateOption(StoreEntry * e) const +{ + if (fileConfig.ioRate >= 0) + storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate); +} + /// check the results of the configuration; only level-0 debugging works here void Rock::SwapDir::validateOptions() diff --git a/src/fs/rock/RockSwapDir.h b/src/fs/rock/RockSwapDir.h index 08895d5138..4b5f5d8a39 100644 --- a/src/fs/rock/RockSwapDir.h +++ b/src/fs/rock/RockSwapDir.h @@ -65,6 +65,8 @@ protected: void validateOptions(); ///< warns of configuration problems; may quit bool parseTimeOption(char const *option, const char *value, int reconfiguring); void dumpTimeOption(StoreEntry * e) const; + bool parseRateOption(char const *option, const char *value, int reconfiguring); + void dumpRateOption(StoreEntry * e) const; void rebuild(); ///< starts loading and validating stored entry metadata ///< used to add entries successfully loaded during rebuild diff --git a/src/ipc/Queue.cc b/src/ipc/Queue.cc index 12eb5e6b2f..24e67066be 100644 --- a/src/ipc/Queue.cc +++ b/src/ipc/Queue.cc @@ -40,7 +40,8 @@ ReadersId(String id) InstanceIdDefinitions(Ipc::QueueReader, "ipcQR"); -Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0) +Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0), + rateLimit(0), balance(0) { debugs(54, 7, HERE << "constructed " << id); } @@ -196,14 +197,25 @@ Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcess return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)]; } +int +Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const +{ + Must(validProcessId(group, processId)); + return group == groupA ? + processId - metadata->theGroupAIdOffset : + metadata->theGroupASize + processId - metadata->theGroupBIdOffset; +} + Ipc::QueueReader & Ipc::FewToFewBiQueue::reader(const Group group, const int processId) { - Must(validProcessId(group, processId)); - const int index = group == groupA ? - processId - metadata->theGroupAIdOffset : - metadata->theGroupASize + processId - metadata->theGroupBIdOffset; - return readers->theReaders[index]; + return readers->theReaders[readerIndex(group, processId)]; +} + +const Ipc::QueueReader & +Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const +{ + return readers->theReaders[readerIndex(group, processId)]; } void @@ -221,6 +233,35 @@ Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId) // theLastPopProcessId = remoteProcessId; } +bool +Ipc::FewToFewBiQueue::popReady() const +{ + // mimic FewToFewBiQueue::pop() but quit just before popping + int popProcessId = theLastPopProcessId; // preserve for future pop() + for (int i = 0; i < remoteGroupSize(); ++i) { + if (++popProcessId >= remoteGroupIdOffset() + remoteGroupSize()) + popProcessId = remoteGroupIdOffset(); + const OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), popProcessId, theLocalGroup, theLocalProcessId); + if (!queue.empty()) + return true; + } + return false; // most likely, no process had anything to pop +} + +Ipc::QueueReader::Balance & +Ipc::FewToFewBiQueue::localBalance() +{ + QueueReader &r = reader(theLocalGroup, theLocalProcessId); + return r.balance; +} + +Ipc::QueueReader::Rate & +Ipc::FewToFewBiQueue::localRateLimit() +{ + QueueReader &r = reader(theLocalGroup, theLocalProcessId); + return r.rateLimit; +} + Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset): theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset), theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset) diff --git a/src/ipc/Queue.h b/src/ipc/Queue.h index 225d1de00c..021b87a43a 100644 --- a/src/ipc/Queue.h +++ b/src/ipc/Queue.h @@ -46,6 +46,14 @@ private: AtomicWord popSignal; ///< whether writer has sent and reader has not received notification public: + typedef AtomicWord Rate; ///< pop()s per second + Rate rateLimit; ///< pop()s per second limit if positive + + typedef AtomicWordT AtomicSignedMsec; + typedef AtomicSignedMsec Balance; + /// how far ahead the reader is compared to a perfect read/sec event rate + Balance balance; + /// unique ID for debugging which reader is used (works across processes) const InstanceId id; }; @@ -189,15 +197,27 @@ public: /// calls OneToOneUniQueue::push() using the given process queue template bool push(const int remoteProcessId, const Value &value); + // TODO: rename to findOldest() or some such /// calls OneToOneUniQueue::peek() using the given process queue template bool peek(const int remoteProcessId, Value &value) const; + /// returns true if pop() would have probably succeeded but does not pop() + bool popReady() const; + + /// returns local reader's balance + QueueReader::Balance &localBalance(); + + /// returns local reader's rate limit + QueueReader::Rate &localRateLimit(); + private: bool validProcessId(const Group group, const int processId) const; int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const; const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const; OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId); QueueReader &reader(const Group group, const int processId); + const QueueReader &reader(const Group group, const int processId) const; + int readerIndex(const Group group, const int processId) const; int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; } int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; }