/// generally useful configuration options supported by some children
class Config {
public:
- Config(): ioTimeout(0) {}
+ Config(): ioTimeout(0), ioRate(-1) {}
/// canRead/Write should return false if expected I/O delay exceeds it
time_msec_t ioTimeout; // not enforced if zero, which is the default
+
+ /// shape I/O request stream to approach that many per second
+ int ioRate; // not enforced if negative, which is the default
};
typedef RefCount<DiskFile> Pointer;
const bool inserted = IpcIoFiles.insert(std::make_pair(diskId, this)).second;
Must(inserted);
+ queue->localRateLimit() =
+ static_cast<Ipc::QueueReader::Rate::Value>(config.ioRate);
+
Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
ann.strand.tag = dbName;
Ipc::TypedMsgHdr message;
void
-IpcIoFile::DiskerHandleMoreRequests(void*)
+IpcIoFile::DiskerHandleMoreRequests(void *source)
{
- debugs(47, 7, HERE << "resuming handling requests");
+ debugs(47, 7, HERE << "resuming handling requests after " <<
+ static_cast<const char *>(source));
DiskerHandleMoreRequestsScheduled = false;
IpcIoFile::DiskerHandleRequests();
}
+bool
+IpcIoFile::WaitBeforePop()
+{
+ const Ipc::QueueReader::Rate::Value ioRate = queue->localRateLimit();
+ const double maxRate = ioRate/1e3; // req/ms
+
+ // do we need to enforce configured I/O rate?
+ if (maxRate <= 0)
+ return false;
+
+ // is there an I/O request we could potentially delay?
+ if (!queue->popReady()) {
+ // unlike pop(), popReady() is not reliable and does not block the
+ // reader, so we must proceed with pop() even if it is likely to fail
+ return false;
+ }
+
+ static timeval LastIo; // when we last accounted for a ready I/O request
+
+ const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os
+ // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
+ const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration));
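+ // e.g., at max-swap-rate=300, ioDuration is ~3.33 ms and maxImbalance is 100 ms;
+ // at max-swap-rate=2000, ioDuration is 0.5 ms and maxImbalance is 50 ms (100 I/Os)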
+
+ const double credit = ioDuration; // what the last I/O should have cost us
+ const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O
+ LastIo = current_time;
+
+ Ipc::QueueReader::Balance &balance = queue->localBalance();
+ balance += static_cast<int64_t>(credit - debit);
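+ // a positive balance means I/Os have been arriving faster than the
+ // configured rate allows; we delay once it exceeds maxImbalance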
+
+ debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit);
+
+ if (balance > maxImbalance) {
+ // if we accumulated too much time for future slow I/Os, then shed
+ // accumulated time down to half of the maximum allowed imbalance
+ const int64_t toSpend = balance - maxImbalance/2;
+ debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get " <<
+ (1e3*maxRate) << "/sec rate");
+ eventAdd("IpcIoFile::DiskerHandleMoreRequests",
+ &IpcIoFile::DiskerHandleMoreRequests,
+ const_cast<char*>("rate limiting"),
+ toSpend/1e3, 0, false);
+ DiskerHandleMoreRequestsScheduled = true;
+ return true;
+ } else
+ if (balance < -maxImbalance) {
+ // do not owe "too much" to avoid "too large" bursts of I/O
+ balance = -maxImbalance;
+ }
+
+ return false;
+}
+
void
IpcIoFile::DiskerHandleRequests()
{
int popped = 0;
int workerId = 0;
IpcIoMsg ipcIo;
- while (queue->pop(workerId, ipcIo)) {
+ while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
++popped;
// at least one I/O per call is guaranteed if the queue is not empty
const double minBreakSecs = 0.001;
eventAdd("IpcIoFile::DiskerHandleMoreRequests",
&IpcIoFile::DiskerHandleMoreRequests,
- NULL, minBreakSecs, 0, false);
+ const_cast<char*>("long I/O loop"),
+ minBreakSecs, 0, false);
DiskerHandleMoreRequestsScheduled = true;
}
debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " <<
static void DiskerHandleMoreRequests(void*);
static void DiskerHandleRequests();
static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
+ static bool WaitBeforePop();
private:
const String dbName; ///< the name of the file we are managing
The rock store type:
- cache_dir rock Directory-Name Mbytes <max-size=bytes>
+ cache_dir rock Directory-Name Mbytes <max-size=bytes> [options]
The Rock Store type is a database-style storage. All cached
entries are stored in a "database" file, using fixed-size slots,
blocking synchronous I/O does not allow Squid to estimate the
expected swap wait time.
+ max-swap-rate=swaps/sec: Artificially limits disk access using
+ the specified I/O rate limit. Swap in and swap out requests that
+ would cause the average I/O rate to exceed the limit are
+ delayed. This is necessary on file systems that buffer "too
+ many" writes and then start blocking Squid and other processes
+ while committing those writes to disk. Usually used together
+ with swap-timeout to avoid excessive delays and queue overflows
+ when disk demand exceeds available disk "bandwidth". By default
+ and when set to zero, the I/O rate limit is not enforced.
+ Currently supported by the IpcIo module only.
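+
+ For example, to cap disk I/O at roughly 300 swaps per second
+ (illustrative values; adjust them to your disks and load):
+
+ cache_dir rock /var/cache/squid/rock 4096 max-swap-rate=300 swap-timeout=300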
+
+
The coss store type:
NP: COSS filesystem in Squid-3 has been deemed too unstable for
ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(::SwapDir::getOptionTree());
assert(vector);
vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
+ vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
return vector;
}
if (!value)
self_destruct();
- const time_msec_t newTime = strtoll(value, NULL, 10);
-
- if (newTime < 0) {
- debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newTime);
+ // TODO: handle time units and detect parsing errors better
+ const int64_t parsedValue = strtoll(value, NULL, 10);
+ if (parsedValue < 0) {
+ debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
self_destruct();
}
+ const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);
+
if (reconfiguring && *storedTime != newTime)
debugs(3, DBG_IMPORTANT, "cache_dir " << path << ' ' << option << " is now " << newTime);
static_cast<int64_t>(fileConfig.ioTimeout));
}
+/// parses rate-specific options; mimics ::SwapDir::optionObjectSizeParse()
+bool
+Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaReconfig)
+{
+ int *storedRate;
+ if (strcmp(option, "max-swap-rate") == 0)
+ storedRate = &fileConfig.ioRate;
+ else
+ return false;
+
+ if (!value)
+ self_destruct();
+
+ // TODO: handle time units and detect parsing errors better
+ const int64_t parsedValue = strtoll(value, NULL, 10);
+ if (parsedValue < 0) {
+ debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
+ self_destruct();
+ }
+
+ const int newRate = static_cast<int>(parsedValue);
+
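+ // also catches values that overflowed to a negative int in the cast above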
+ if (newRate < 0) {
+ debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
+ self_destruct();
+ }
+
+ if (isaReconfig && *storedRate != newRate)
+ debugs(3, DBG_IMPORTANT, "cache_dir " << path << ' ' << option << " is now " << newRate);
+
+ *storedRate = newRate;
+
+ return true;
+}
+
+/// reports rate-specific options; mimics ::SwapDir::optionObjectSizeDump()
+void
+Rock::SwapDir::dumpRateOption(StoreEntry * e) const
+{
+ if (fileConfig.ioRate >= 0)
+ storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
+}
+
/// check the results of the configuration; only level-0 debugging works here
void
Rock::SwapDir::validateOptions()
void validateOptions(); ///< warns of configuration problems; may quit
bool parseTimeOption(char const *option, const char *value, int reconfiguring);
void dumpTimeOption(StoreEntry * e) const;
+ bool parseRateOption(char const *option, const char *value, int reconfiguring);
+ void dumpRateOption(StoreEntry * e) const;
void rebuild(); ///< starts loading and validating stored entry metadata
///< used to add entries successfully loaded during rebuild
InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
-Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0)
+Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0),
+ rateLimit(0), balance(0)
{
debugs(54, 7, HERE << "constructed " << id);
}
return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
}
+int
+Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
+{
+ Must(validProcessId(group, processId));
+ return group == groupA ?
+ processId - metadata->theGroupAIdOffset :
+ metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
+}
+
Ipc::QueueReader &
Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
{
- Must(validProcessId(group, processId));
- const int index = group == groupA ?
- processId - metadata->theGroupAIdOffset :
- metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
- return readers->theReaders[index];
+ return readers->theReaders[readerIndex(group, processId)];
+}
+
+const Ipc::QueueReader &
+Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const
+{
+ return readers->theReaders[readerIndex(group, processId)];
}
void
// theLastPopProcessId = remoteProcessId;
}
+bool
+Ipc::FewToFewBiQueue::popReady() const
+{
+ // mimic FewToFewBiQueue::pop() but quit just before popping
+ int popProcessId = theLastPopProcessId; // preserve for future pop()
+ for (int i = 0; i < remoteGroupSize(); ++i) {
+ if (++popProcessId >= remoteGroupIdOffset() + remoteGroupSize())
+ popProcessId = remoteGroupIdOffset();
+ const OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), popProcessId, theLocalGroup, theLocalProcessId);
+ if (!queue.empty())
+ return true;
+ }
+ return false; // most likely, no process had anything to pop
+}
+
+Ipc::QueueReader::Balance &
+Ipc::FewToFewBiQueue::localBalance()
+{
+ QueueReader &r = reader(theLocalGroup, theLocalProcessId);
+ return r.balance;
+}
+
+Ipc::QueueReader::Rate &
+Ipc::FewToFewBiQueue::localRateLimit()
+{
+ QueueReader &r = reader(theLocalGroup, theLocalProcessId);
+ return r.rateLimit;
+}
+
Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
AtomicWord popSignal; ///< whether writer has sent and reader has not received notification
public:
+ typedef AtomicWord Rate; ///< pop()s per second
+ Rate rateLimit; ///< pop()s per second limit if positive
+
+ typedef AtomicWordT<int64_t> AtomicSignedMsec;
+ typedef AtomicSignedMsec Balance;
+ /// how far ahead the reader is compared to a perfect read/sec event rate
+ Balance balance;
+
/// unique ID for debugging which reader is used (works across processes)
const InstanceId<QueueReader> id;
};
/// calls OneToOneUniQueue::push() using the given process queue
template <class Value> bool push(const int remoteProcessId, const Value &value);
+ // TODO: rename to findOldest() or some such
/// calls OneToOneUniQueue::peek() using the given process queue
template<class Value> bool peek(const int remoteProcessId, Value &value) const;
+ /// returns true if pop() would have probably succeeded but does not pop()
+ bool popReady() const;
+
+ /// returns local reader's balance
+ QueueReader::Balance &localBalance();
+
+ /// returns local reader's rate limit
+ QueueReader::Rate &localRateLimit();
+
private:
bool validProcessId(const Group group, const int processId) const;
int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId);
QueueReader &reader(const Group group, const int processId);
+ const QueueReader &reader(const Group group, const int processId) const;
+ int readerIndex(const Group group, const int processId) const;
int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; }
int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; }