CBDATA_CLASS_INIT(IpcIoFile);
-IpcIoFile::RequestsMap IpcIoFile::TheRequestsMap;
+IpcIoFile::RequestMap IpcIoFile::TheRequestMap1;
+IpcIoFile::RequestMap IpcIoFile::TheRequestMap2;
+IpcIoFile::RequestMap *IpcIoFile::TheOlderRequests = &IpcIoFile::TheRequestMap1;
+IpcIoFile::RequestMap *IpcIoFile::TheNewerRequests = &IpcIoFile::TheRequestMap2;
+bool IpcIoFile::TimeoutCheckScheduled = false;
unsigned int IpcIoFile::LastRequestId = 0;
static bool DiskerOpen(const String &path, int flags, mode_t mode);
}
void
-IpcIoFile::openCompleted(const IpcIoResponse &ipcResponse) {
- if (ipcResponse.xerrno) {
- debugs(79,1, HERE << "error: " << xstrerr(ipcResponse.xerrno));
+IpcIoFile::openCompleted(const IpcIoResponse *ipcResponse) {
+ if (!ipcResponse) {
+ debugs(79,1, HERE << "error: timeout");
+ error_ = true;
+ } else
+ if (ipcResponse->xerrno) {
+ debugs(79,1, HERE << "error: " << xstrerr(ipcResponse->xerrno));
error_ = true;
} else {
- diskId = ipcResponse.diskId;
+ diskId = ipcResponse->diskId;
if (diskId < 0) {
error_ = true;
debugs(79,1, HERE << "error: no disker claimed " << dbName);
void
IpcIoFile::readCompleted(ReadRequest *readRequest,
- const IpcIoResponse &ipcResponse)
-{
- if (ipcResponse.xerrno) {
- debugs(79,1, HERE << "error: " << xstrerr(ipcResponse.xerrno));
- error_ = true;
+ const IpcIoResponse *ipcResponse)
+{
+ bool ioError = false;
+ if (!ipcResponse) {
+ debugs(79,1, HERE << "error: timeout");
+ ioError = true; // I/O timeout does not warrant setting error_?
+ } else
+ if (ipcResponse->xerrno) {
+ debugs(79,1, HERE << "error: " << xstrerr(ipcResponse->xerrno));
+ ioError = error_ = true;
} else {
- memcpy(readRequest->buf, ipcResponse.buf, ipcResponse.len);
+ memcpy(readRequest->buf, ipcResponse->buf, ipcResponse->len);
}
- const ssize_t rlen = error_ ? -1 : (ssize_t)readRequest->len;
- const int errflag = error_ ? DISK_ERROR : DISK_OK;
+ const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
+ const int errflag = ioError ? DISK_ERROR : DISK_OK;
ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
}
void
IpcIoFile::writeCompleted(WriteRequest *writeRequest,
- const IpcIoResponse &ipcResponse)
-{
- if (ipcResponse.xerrno) {
- debugs(79,1, HERE << "error: " << xstrerr(ipcResponse.xerrno));
+ const IpcIoResponse *ipcResponse)
+{
+ bool ioError = false;
+ if (!ipcResponse) {
+ debugs(79,1, HERE << "error: timeout");
+ ioError = true; // I/O timeout does not warrant setting error_?
+ } else
+ if (ipcResponse->xerrno) {
+ debugs(79,1, HERE << "error: " << xstrerr(ipcResponse->xerrno));
error_ = true;
} else
- if (ipcResponse.len != writeRequest->len) {
- debugs(79,1, HERE << "problem: " << ipcResponse.len << " < " << writeRequest->len);
+ if (ipcResponse->len != writeRequest->len) {
+ debugs(79,1, HERE << "problem: " << ipcResponse->len << " < " << writeRequest->len);
error_ = true;
}
if (writeRequest->free_func)
(writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?
- if (!error_) {
+ if (!ioError) {
debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" <<
diskId << " at " << writeRequest->offset);
}
- const ssize_t rlen = error_ ? 0 : (ssize_t)writeRequest->len;
- const int errflag = error_ ? DISK_ERROR : DISK_OK;
+ const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
+ const int errflag = ioError ? DISK_ERROR : DISK_OK;
ioRequestor->writeCompleted(errflag, rlen, writeRequest);
}
if (++LastRequestId == 0) // don't use zero value as requestId
++LastRequestId;
ipcIo.requestId = LastRequestId;
- TheRequestsMap[ipcIo.requestId] = pending;
+ TheNewerRequests->insert(std::make_pair(ipcIo.requestId, pending));
Ipc::TypedMsgHdr message;
ipcIo.pack(message);
Ipc::SendMessage(addr, message);
++ioLevel;
- const double timeout = 10; // in seconds
- eventAdd("IpcIoFile::requestTimedOut", &IpcIoFile::RequestTimedOut,
- this, timeout, 0, false);
+ if (!TimeoutCheckScheduled)
+ ScheduleTimeoutCheck();
}
/// called when disker responds to our I/O request
IpcIoResponse response(raw);
const int requestId = response.requestId;
- debugs(47, 7, HERE << "disker response to " <<
- response.command << "; ipcIo" << KidIdentifier << '.' << requestId);
+ debugs(47, 7, HERE << "disker response to " << response.command <<
+ "; ipcIo" << KidIdentifier << '.' << requestId);
Must(requestId != 0);
- IpcIoPendingRequest *pending = DequeueRequest(requestId);
- Must(pending);
-
- if (pending->readRequest)
- pending->file->readCompleted(pending->readRequest, response);
- else
- if (pending->writeRequest)
- pending->file->writeCompleted(pending->writeRequest, response);
- else
- pending->file->openCompleted(response);
-
- // XXX: leaking if throwinig
- delete pending;
+ if (IpcIoPendingRequest *pending = DequeueRequest(requestId)) {
+ pending->completeIo(&response);
+ delete pending; // XXX: leaking if throwing
+ } else {
+ debugs(47, 4, HERE << "LATE disker response to " << response.command <<
+ "; ipcIo" << KidIdentifier << '.' << requestId);
+ // nothing we can do about it; completeIo() has been called already
+ }
}
-
-/// Mgr::IpcIoFile::requestTimedOut wrapper
+/// Mgr::IpcIoFile::checkTimeous wrapper
void
-IpcIoFile::RequestTimedOut(void* param)
+IpcIoFile::CheckTimeouts(void*)
{
- debugs(47, 1, HERE << "bug: request timedout and we cannot handle that");
- Must(param != NULL);
- // XXX: cannot get to file because IpcIoFile is not cbdata-protected
- // IpcIoFile* file = static_cast<IpcIoFile*>(param);
+ TimeoutCheckScheduled = false;
- // TODO: notify the pending request (XXX: which one?)
+ // any old request would have timed out by now
+ typedef RequestMap::const_iterator RMCI;
+ RequestMap &elders = *TheOlderRequests;
+ for (RMCI i = elders.begin(); i != elders.end(); ++i) {
+ IpcIoPendingRequest *pending = i->second;
- // use async call to enable job call protection that time events lack
- // CallJobHere(47, 5, mgrFwdr, IpcIoFile, requestTimedOut);
-}
+ const int requestId = i->first;
+ debugs(47, 7, HERE << "disker timeout; ipcIo" <<
+ KidIdentifier << '.' << requestId);
-/// Called when Coordinator fails to start processing the request [in time]
-void
-IpcIoFile::requestTimedOut()
-{
- debugs(47, 3, HERE);
- assert(false); // TODO: notify the pending request (XXX: which one?)
+ pending->completeIo(NULL); // no response
+ delete pending; // XXX: leaking if throwing
+ }
+ elders.clear();
+
+ swap(TheOlderRequests, TheNewerRequests); // switches pointers around
+ if (!TheOlderRequests->empty())
+ ScheduleTimeoutCheck();
}
-/// called when we are no longer waiting for Coordinator to respond
+/// prepare to check for timeouts in a little while
void
-IpcIoFile::removeTimeoutEvent()
+IpcIoFile::ScheduleTimeoutCheck()
{
- if (eventFind(&IpcIoFile::RequestTimedOut, this))
- eventDelete(&IpcIoFile::RequestTimedOut, this);
+ // we check all older requests at once so some may be wait for 2*timeout
+ const double timeout = 7; // in seconds
+ eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
+ NULL, timeout, 0, false);
+ TimeoutCheckScheduled = true;
}
/// returns and forgets the right IpcIoFile pending request
{
debugs(47, 3, HERE);
Must(requestId != 0);
- RequestsMap::iterator i = TheRequestsMap.find(requestId);
- if (i != TheRequestsMap.end()) {
- IpcIoPendingRequest *pending = i->second;
- TheRequestsMap.erase(i);
- return pending;
- }
- return NULL;
+
+ RequestMap *map = NULL;
+ RequestMap::iterator i = TheRequestMap1.find(requestId);
+
+ if (i != TheRequestMap1.end())
+ map = &TheRequestMap1;
+ else {
+ i = TheRequestMap2.find(requestId);
+ if (i != TheRequestMap2.end())
+ map = &TheRequestMap2;
+ }
+
+ if (!map) // not found in both maps
+ return NULL;
+
+ IpcIoPendingRequest *pending = i->second;
+ map->erase(i);
+ return pending;
}
int
}
-/* IpcIoPendingRequest: */
+/* IpcIoPendingRequest */
IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
file(aFile), readRequest(NULL), writeRequest(NULL)
{
}
+void
+IpcIoPendingRequest::completeIo(IpcIoResponse *response)
+{
+ Must(file != NULL);
+
+ if (readRequest)
+ file->readCompleted(readRequest, response);
+ else
+ if (writeRequest)
+ file->writeCompleted(writeRequest, response);
+ else
+ file->openCompleted(response);
+}
+
+
/* XXX: disker code that should probably be moved elsewhere */
/// disker entry point for remote I/O requests
static void HandleRequest(const IpcIoRequest &request);
+protected:
+ friend class IpcIoPendingRequest;
+ void openCompleted(const IpcIoResponse *response);
+ void readCompleted(ReadRequest *readRequest, const IpcIoResponse *);
+ void writeCompleted(WriteRequest *writeRequest, const IpcIoResponse *);
+
private:
void send(IpcIoRequest &request, IpcIoPendingRequest *pending);
- void openCompleted(const IpcIoResponse &);
- void readCompleted(ReadRequest *readRequest, const IpcIoResponse &);
- void writeCompleted(WriteRequest *writeRequest, const IpcIoResponse &);
-
- static void RequestTimedOut(void* param);
- void requestTimedOut();
- void removeTimeoutEvent();
static IpcIoPendingRequest *DequeueRequest(unsigned int requestId);
+ static void CheckTimeouts(void* param);
+ static void ScheduleTimeoutCheck();
+
private:
const String dbName; ///< the name of the file we are managing
int diskId; ///< the process ID of the disker we talk to
bool error_; ///< whether we have seen at least one I/O error (XXX)
/// maps requestId to the handleResponse callback
- typedef std::map<unsigned int, IpcIoPendingRequest*> RequestsMap;
- static RequestsMap TheRequestsMap; ///< pending requests map
+ typedef std::map<unsigned int, IpcIoPendingRequest*> RequestMap;
+ static RequestMap TheRequestMap1; ///< older (or newer) pending requests
+ static RequestMap TheRequestMap2; ///< newer (or older) pending requests
+ static RequestMap *TheOlderRequests; ///< older requests (map1 or map2)
+ static RequestMap *TheNewerRequests; ///< newer requests (map2 or map1)
+ static bool TimeoutCheckScheduled; ///< we expect a CheckTimeouts() call
static unsigned int LastRequestId; ///< last requestId used
public:
IpcIoPendingRequest(const IpcIoFile::Pointer &aFile);
+ /// called when response is received and, with a nil response, on timeouts
+ void completeIo(IpcIoResponse *response);
+
public:
IpcIoFile::Pointer file; ///< the file object waiting for the response
ReadRequest *readRequest; ///< set if this is a read requests