#include "ipc/StrandSearch.h"
#include "ipc/UdsOp.h"
#include "ipc/mem/Pages.h"
+#include "SquidTime.h"
CBDATA_CLASS_INIT(IpcIoFile);
bool
IpcIoFile::canRead() const
{
- return diskId >= 0;
+ return diskId >= 0 && canWait();
}
bool
IpcIoFile::canWrite() const
{
- return diskId >= 0;
+ return diskId >= 0 && canWait();
}
bool
IpcIoMsg ipcIo;
try {
ipcIo.requestId = lastRequestId;
+ ipcIo.start = current_time;
if (pending->readRequest) {
ipcIo.command = IpcIo::cmdRead;
ipcIo.offset = pending->readRequest->offset;
}
}
+/// whether we think there is enough time to complete the I/O
+bool
+IpcIoFile::canWait() const {
+ if (!Config.Timeout.disk_io)
+ return true; // no timeout specified
+
+ IpcIoMsg oldestIo;
+ if (!queue->peek(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
+ return true; // we cannot estimate expected wait time; assume it is OK
+
+ const int expectedWait = tvSubMsec(oldestIo.start, current_time);
+ if (expectedWait < 0 ||
+ static_cast<time_msec_t>(expectedWait) < Config.Timeout.disk_io)
+ return true; // expected wait time is acceptible
+
+ debugs(47,2, HERE << "cannot wait: " << expectedWait);
+ return false; // do not want to wait that long
+}
+
/// called when coordinator responds to worker open request
void
IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
IpcIoMsg::IpcIoMsg():
requestId(0), offset(0), len(0), command(IpcIo::cmdNone), xerrno(0)
{
+ start.tv_sec = 0;
}
/* IpcIoPendingRequest */
Ipc::Mem::PageId page;
IpcIo::Command command; ///< what disker is supposed to do or did
+ struct timeval start; ///< when the I/O request was converted to IpcIoMsg
int xerrno; ///< I/O error code or zero
};
void openCompleted(const Ipc::StrandSearchResponse *const response);
void readCompleted(ReadRequest *readRequest, IpcIoMsg *const response);
void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response);
+ bool canWait() const;
private:
void trackPendingRequest(IpcIoPendingRequest *const pending);
many ident requests going at once.
DOC_END
+NAME: disk_io_timeout
+TYPE: time_msec
+DEFAULT: 0 milliseconds
+LOC: Config.Timeout.disk_io
+DOC_START
+ Squid will not start a new disk I/O operation if it estimates that the
+ operation will take more than the specified disk I/O timeout.
+ Only Rock Store supports this timeout for now.
+
+ By default and when set to zero, disables the disk I/O time limit
+ enforcement.
+DOC_END
+
+
+
NAME: shutdown_lifetime
COMMENT: time-units
TYPE: time_t
Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
{
assert(theFile != NULL);
- assert(theFile->canRead());
assert(coreOff >= 0);
offset_ = coreOff;
Rock::IoState::startWriting()
{
assert(theFile != NULL);
- assert(theFile->canWrite());
assert(!theBuf.isNull());
// TODO: if DiskIO module is mmap-based, we should be writing whole pages
StoreEntry *
Rock::SwapDir::get(const cache_key *key)
{
- if (!map)
+ if (!map || !theFile || !theFile->canRead())
return NULL;
sfileno fileno;
return false;
}
-Ipc::OneToOneUniQueue &
-Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
+int
+Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
{
Must(fromGroup != toGroup);
- Must(validProcessId(fromGroup, fromProcessId));
- Must(validProcessId(toGroup, toProcessId));
+ assert(validProcessId(fromGroup, fromProcessId));
+ assert(validProcessId(toGroup, toProcessId));
int index1;
int index2;
int offset;
offset = metadata->theGroupASize * metadata->theGroupBSize;
}
const int index = offset + index1 * metadata->theGroupBSize + index2;
- return (*queues)[index];
+ return index;
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
+{
+ return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
+{
+ return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
}
Ipc::QueueReader &
/// returns true iff the caller must notify the reader of the pushed item
template<class Value> bool push(const Value &value, QueueReader *const reader = NULL);
+ /// returns true iff the value was set; the value may be stale!
+ template<class Value> bool peek(Value &value) const;
+
private:
unsigned int theIn; ///< input index, used only in push()
/// calls OneToOneUniQueue::push() using the given process queue
template <class Value> bool push(const int remoteProcessId, const Value &value);
+ /// calls OneToOneUniQueue::peek() using the given process queue
+ template<class Value> bool peek(const int remoteProcessId, Value &value) const;
+
private:
bool validProcessId(const Group group, const int processId) const;
+ int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
+ const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId);
QueueReader &reader(const Group group, const int processId);
int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; }
return true;
}
+template <class Value>
+bool
+OneToOneUniQueue::peek(Value &value) const
+{
+ if (sizeof(value) > theMaxItemSize)
+ throw ItemTooLarge();
+
+ if (empty())
+ return false;
+
+ // the reader may pop() before we copy; making this method imprecise
+ const unsigned int pos = (theOut % theCapacity) * theMaxItemSize;
+ memcpy(&value, theBuffer + pos, sizeof(value));
+ return true;
+}
+
template <class Value>
bool
OneToOneUniQueue::push(const Value &value, QueueReader *const reader)
return remoteQueue.push(value, &remoteReader);
}
+template <class Value>
+bool
+FewToFewBiQueue::peek(const int remoteProcessId, Value &value) const
+{
+ // we may be called before remote process configured its queue end
+ if (!validProcessId(remoteGroup(), remoteProcessId))
+ return false;
+
+ const OneToOneUniQueue &remoteQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
+ debugs(54, 7, HERE << "peeking from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
+ return remoteQueue.peek(value);
+}
+
} // namespace Ipc
#endif // SQUID_IPC_QUEUE_H
return Config.memMaxSize > 0 ? Config.memMaxSize / PageSize() : 0;
case PageId::ioPage:
// XXX: this should be independent from memory cache pages
- return PageLimit(PageId::cachePage) * 0.5;
+ return PageLimit(PageId::cachePage)/2;
default:
Must(false);
}
int icp_query_max; /* msec */
int icp_query_min; /* msec */
int mcast_icp_query; /* msec */
+ time_msec_t disk_io;
#if !USE_DNSSERVERS