if (!queue->peek(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
return true; // we cannot estimate expected wait time; assume it is OK
- const int expectedWait = tvSubMsec(oldestIo.start, current_time);
+ const int oldestWait = tvSubMsec(oldestIo.start, current_time);
+
+ int rateWait = -1; // time in millisecons
+ const Ipc::QueueReader::Rate::Value ioRate = queue->rateLimit(diskId);
+ if (ioRate > 0) {
+ // if there are N requests pending, the new one will wait at
+ // least N/max-swap-rate seconds
+ rateWait = 1e3 * queue->outSize(diskId) / ioRate;
+ // adjust N/max-swap-rate value based on the queue "balance"
+ // member, in case we have been borrowing time against future
+ // I/O already
+ rateWait += queue->balance(diskId);
+ }
+
+ const int expectedWait = max(oldestWait, rateWait);
if (expectedWait < 0 ||
static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
return true; // expected wait time is acceptible
return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
}
+/// incoming queue from a given remote process
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
+{
+ return oneToOneQueue(remoteGroup(), remoteProcessId,
+ theLocalGroup, theLocalProcessId);
+}
+
+/// outgoing queue to a given remote process
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
+{
+ return oneToOneQueue(theLocalGroup, theLocalProcessId,
+ remoteGroup(), remoteProcessId);
+}
+
int
Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
{
return r.balance;
}
+const Ipc::QueueReader::Balance &
+Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const
+{
+ const QueueReader &r = reader(remoteGroup(), remoteProcessId);
+ return r.balance;
+}
+
Ipc::QueueReader::Rate &
Ipc::FewToFewBiQueue::localRateLimit()
{
return r.rateLimit;
}
+const Ipc::QueueReader::Rate &
+Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const
+{
+ const QueueReader &r = reader(remoteGroup(), remoteProcessId);
+ return r.rateLimit;
+}
+
Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
/// returns local reader's balance
QueueReader::Balance &localBalance();
+ /// returns reader's balance for a given remote process
+ const QueueReader::Balance &balance(const int remoteProcessId) const;
+
/// returns local reader's rate limit
QueueReader::Rate &localRateLimit();
+ /// returns reader's rate limit for a given remote process
+ const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
+
+ /// number of items in incoming queue from a given remote process
+ int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
+
+ /// number of items in outgoing queue to a given remote process
+ int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
+
private:
bool validProcessId(const Group group, const int processId) const;
int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId);
+ const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
+ const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
QueueReader &reader(const Group group, const int processId);
const QueueReader &reader(const Group group, const int processId) const;
int readerIndex(const Group group, const int processId) const;
return false;
// we need the oldest value, so start with the incoming, them-to-us queue:
- const OneToOneUniQueue &inQueue = oneToOneQueue(remoteGroup(), remoteProcessId, theLocalGroup, theLocalProcessId);
- debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " << theLocalProcessId << " at " << inQueue.size());
- if (inQueue.peek(value))
+ const OneToOneUniQueue &in = inQueue(remoteProcessId);
+ debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " <<
+ theLocalProcessId << " at " << in.size());
+ if (in.peek(value))
return true;
// if the incoming queue is empty, check the outgoing, us-to-them queue:
- const OneToOneUniQueue &outQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
- debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " << remoteProcessId << " at " << outQueue.size());
- return outQueue.peek(value);
+ const OneToOneUniQueue &out = outQueue(remoteProcessId);
+ debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " <<
+ remoteProcessId << " at " << out.size());
+ return out.peek(value);
}
} // namespace Ipc