From: Alex Rousskov Date: Thu, 3 Feb 2011 05:33:05 +0000 (-0700) Subject: Support IpcIO timeouts. X-Git-Tag: take02~20 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=caca86d7e35c502dbcf4d9a6d409fa19f066354f;p=thirdparty%2Fsquid.git Support IpcIO timeouts. Penging IpcIo requests are now stored in two alternating maps: "old" and "new". Every T seconds, any requests remaining in the "old" map are treated as timed out. After that check, the current "new" and (now empty) "old" map pointers are swapped so that the previously "new" requests can now age for T seconds. New requests are always added to the "new" map. Responses are always checked against both maps. This approach gives us access to pending request information and allows to report errors to the right I/O requestors without creating additional per-request state attached to a per-request timeout event. The price is (a) two instead of one map lookups when the response comes and (b) timeout precision decrease from "about T" to "anywhere from T to 2*T". --- diff --git a/src/DiskIO/IpcIo/IpcIoFile.cc b/src/DiskIO/IpcIo/IpcIoFile.cc index 300b746d35..0f7cd69f78 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.cc +++ b/src/DiskIO/IpcIo/IpcIoFile.cc @@ -17,7 +17,11 @@ CBDATA_CLASS_INIT(IpcIoFile); -IpcIoFile::RequestsMap IpcIoFile::TheRequestsMap; +IpcIoFile::RequestMap IpcIoFile::TheRequestMap1; +IpcIoFile::RequestMap IpcIoFile::TheRequestMap2; +IpcIoFile::RequestMap *IpcIoFile::TheOlderRequests = &IpcIoFile::TheRequestMap1; +IpcIoFile::RequestMap *IpcIoFile::TheNewerRequests = &IpcIoFile::TheRequestMap2; +bool IpcIoFile::TimeoutCheckScheduled = false; unsigned int IpcIoFile::LastRequestId = 0; static bool DiskerOpen(const String &path, int flags, mode_t mode); @@ -70,12 +74,16 @@ IpcIoFile::open(int flags, mode_t mode, RefCount callback) } void -IpcIoFile::openCompleted(const IpcIoResponse &ipcResponse) { - if (ipcResponse.xerrno) { - debugs(79,1, HERE << "error: " << xstrerr(ipcResponse.xerrno)); +IpcIoFile::openCompleted(const IpcIoResponse *ipcResponse) { + if (!ipcResponse) { + debugs(79,1, HERE << "error: timeout"); + error_ = true; + } else + if (ipcResponse->xerrno) { + debugs(79,1, HERE << "error: " << xstrerr(ipcResponse->xerrno)); error_ = true; } else { - diskId = ipcResponse.diskId; + diskId = ipcResponse->diskId; if (diskId < 0) { error_ = true; debugs(79,1, HERE << "error: no disker claimed " << dbName); @@ -154,17 +162,22 @@ IpcIoFile::read(ReadRequest *readRequest) void IpcIoFile::readCompleted(ReadRequest *readRequest, - const IpcIoResponse &ipcResponse) -{ - if (ipcResponse.xerrno) { - debugs(79,1, HERE << "error: " << xstrerr(ipcResponse.xerrno)); - error_ = true; + const IpcIoResponse *ipcResponse) +{ + bool ioError = false; + if (!ipcResponse) { + debugs(79,1, HERE << "error: timeout"); + ioError = true; // I/O timeout does not warrant setting error_? + } else + if (ipcResponse->xerrno) { + debugs(79,1, HERE << "error: " << xstrerr(ipcResponse->xerrno)); + ioError = error_ = true; } else { - memcpy(readRequest->buf, ipcResponse.buf, ipcResponse.len); + memcpy(readRequest->buf, ipcResponse->buf, ipcResponse->len); } - const ssize_t rlen = error_ ? -1 : (ssize_t)readRequest->len; - const int errflag = error_ ? DISK_ERROR : DISK_OK; + const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len; + const int errflag = ioError ? DISK_ERROR : DISK_OK; ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest); } @@ -198,27 +211,32 @@ IpcIoFile::write(WriteRequest *writeRequest) void IpcIoFile::writeCompleted(WriteRequest *writeRequest, - const IpcIoResponse &ipcResponse) -{ - if (ipcResponse.xerrno) { - debugs(79,1, HERE << "error: " << xstrerr(ipcResponse.xerrno)); + const IpcIoResponse *ipcResponse) +{ + bool ioError = false; + if (!ipcResponse) { + debugs(79,1, HERE << "error: timeout"); + ioError = true; // I/O timeout does not warrant setting error_? + } else + if (ipcResponse->xerrno) { + debugs(79,1, HERE << "error: " << xstrerr(ipcResponse->xerrno)); error_ = true; } else - if (ipcResponse.len != writeRequest->len) { - debugs(79,1, HERE << "problem: " << ipcResponse.len << " < " << writeRequest->len); + if (ipcResponse->len != writeRequest->len) { + debugs(79,1, HERE << "problem: " << ipcResponse->len << " < " << writeRequest->len); error_ = true; } if (writeRequest->free_func) (writeRequest->free_func)(const_cast(writeRequest->buf)); // broken API? - if (!error_) { + if (!ioError) { debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" << diskId << " at " << writeRequest->offset); } - const ssize_t rlen = error_ ? 0 : (ssize_t)writeRequest->len; - const int errflag = error_ ? DISK_ERROR : DISK_OK; + const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len; + const int errflag = ioError ? DISK_ERROR : DISK_OK; ioRequestor->writeCompleted(errflag, rlen, writeRequest); } @@ -235,7 +253,7 @@ IpcIoFile::send(IpcIoRequest &ipcIo, IpcIoPendingRequest *pending) if (++LastRequestId == 0) // don't use zero value as requestId ++LastRequestId; ipcIo.requestId = LastRequestId; - TheRequestsMap[ipcIo.requestId] = pending; + TheNewerRequests->insert(std::make_pair(ipcIo.requestId, pending)); Ipc::TypedMsgHdr message; ipcIo.pack(message); @@ -251,9 +269,8 @@ IpcIoFile::send(IpcIoRequest &ipcIo, IpcIoPendingRequest *pending) Ipc::SendMessage(addr, message); ++ioLevel; - const double timeout = 10; // in seconds - eventAdd("IpcIoFile::requestTimedOut", &IpcIoFile::RequestTimedOut, - this, timeout, 0, false); + if (!TimeoutCheckScheduled) + ScheduleTimeoutCheck(); } /// called when disker responds to our I/O request @@ -263,56 +280,56 @@ IpcIoFile::HandleResponse(const Ipc::TypedMsgHdr &raw) IpcIoResponse response(raw); const int requestId = response.requestId; - debugs(47, 7, HERE << "disker response to " << - response.command << "; ipcIo" << KidIdentifier << '.' << requestId); + debugs(47, 7, HERE << "disker response to " << response.command << + "; ipcIo" << KidIdentifier << '.' << requestId); Must(requestId != 0); - IpcIoPendingRequest *pending = DequeueRequest(requestId); - Must(pending); - - if (pending->readRequest) - pending->file->readCompleted(pending->readRequest, response); - else - if (pending->writeRequest) - pending->file->writeCompleted(pending->writeRequest, response); - else - pending->file->openCompleted(response); - - // XXX: leaking if throwinig - delete pending; + if (IpcIoPendingRequest *pending = DequeueRequest(requestId)) { + pending->completeIo(&response); + delete pending; // XXX: leaking if throwing + } else { + debugs(47, 4, HERE << "LATE disker response to " << response.command << + "; ipcIo" << KidIdentifier << '.' << requestId); + // nothing we can do about it; completeIo() has been called already + } } - -/// Mgr::IpcIoFile::requestTimedOut wrapper +/// Mgr::IpcIoFile::checkTimeous wrapper void -IpcIoFile::RequestTimedOut(void* param) +IpcIoFile::CheckTimeouts(void*) { - debugs(47, 1, HERE << "bug: request timedout and we cannot handle that"); - Must(param != NULL); - // XXX: cannot get to file because IpcIoFile is not cbdata-protected - // IpcIoFile* file = static_cast(param); + TimeoutCheckScheduled = false; - // TODO: notify the pending request (XXX: which one?) + // any old request would have timed out by now + typedef RequestMap::const_iterator RMCI; + RequestMap &elders = *TheOlderRequests; + for (RMCI i = elders.begin(); i != elders.end(); ++i) { + IpcIoPendingRequest *pending = i->second; - // use async call to enable job call protection that time events lack - // CallJobHere(47, 5, mgrFwdr, IpcIoFile, requestTimedOut); -} + const int requestId = i->first; + debugs(47, 7, HERE << "disker timeout; ipcIo" << + KidIdentifier << '.' << requestId); -/// Called when Coordinator fails to start processing the request [in time] -void -IpcIoFile::requestTimedOut() -{ - debugs(47, 3, HERE); - assert(false); // TODO: notify the pending request (XXX: which one?) + pending->completeIo(NULL); // no response + delete pending; // XXX: leaking if throwing + } + elders.clear(); + + swap(TheOlderRequests, TheNewerRequests); // switches pointers around + if (!TheOlderRequests->empty()) + ScheduleTimeoutCheck(); } -/// called when we are no longer waiting for Coordinator to respond +/// prepare to check for timeouts in a little while void -IpcIoFile::removeTimeoutEvent() +IpcIoFile::ScheduleTimeoutCheck() { - if (eventFind(&IpcIoFile::RequestTimedOut, this)) - eventDelete(&IpcIoFile::RequestTimedOut, this); + // we check all older requests at once so some may be wait for 2*timeout + const double timeout = 7; // in seconds + eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts, + NULL, timeout, 0, false); + TimeoutCheckScheduled = true; } /// returns and forgets the right IpcIoFile pending request @@ -321,13 +338,24 @@ IpcIoFile::DequeueRequest(unsigned int requestId) { debugs(47, 3, HERE); Must(requestId != 0); - RequestsMap::iterator i = TheRequestsMap.find(requestId); - if (i != TheRequestsMap.end()) { - IpcIoPendingRequest *pending = i->second; - TheRequestsMap.erase(i); - return pending; - } - return NULL; + + RequestMap *map = NULL; + RequestMap::iterator i = TheRequestMap1.find(requestId); + + if (i != TheRequestMap1.end()) + map = &TheRequestMap1; + else { + i = TheRequestMap2.find(requestId); + if (i != TheRequestMap2.end()) + map = &TheRequestMap2; + } + + if (!map) // not found in both maps + return NULL; + + IpcIoPendingRequest *pending = i->second; + map->erase(i); + return pending; } int @@ -415,13 +443,28 @@ IpcIoResponse::pack(Ipc::TypedMsgHdr& msg) const } -/* IpcIoPendingRequest: */ +/* IpcIoPendingRequest */ IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile): file(aFile), readRequest(NULL), writeRequest(NULL) { } +void +IpcIoPendingRequest::completeIo(IpcIoResponse *response) +{ + Must(file != NULL); + + if (readRequest) + file->readCompleted(readRequest, response); + else + if (writeRequest) + file->writeCompleted(writeRequest, response); + else + file->openCompleted(response); +} + + /* XXX: disker code that should probably be moved elsewhere */ diff --git a/src/DiskIO/IpcIo/IpcIoFile.h b/src/DiskIO/IpcIo/IpcIoFile.h index 3b23c40227..10181eae1a 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.h +++ b/src/DiskIO/IpcIo/IpcIoFile.h @@ -89,18 +89,20 @@ public: /// disker entry point for remote I/O requests static void HandleRequest(const IpcIoRequest &request); +protected: + friend class IpcIoPendingRequest; + void openCompleted(const IpcIoResponse *response); + void readCompleted(ReadRequest *readRequest, const IpcIoResponse *); + void writeCompleted(WriteRequest *writeRequest, const IpcIoResponse *); + private: void send(IpcIoRequest &request, IpcIoPendingRequest *pending); - void openCompleted(const IpcIoResponse &); - void readCompleted(ReadRequest *readRequest, const IpcIoResponse &); - void writeCompleted(WriteRequest *writeRequest, const IpcIoResponse &); - - static void RequestTimedOut(void* param); - void requestTimedOut(); - void removeTimeoutEvent(); static IpcIoPendingRequest *DequeueRequest(unsigned int requestId); + static void CheckTimeouts(void* param); + static void ScheduleTimeoutCheck(); + private: const String dbName; ///< the name of the file we are managing int diskId; ///< the process ID of the disker we talk to @@ -111,8 +113,12 @@ private: bool error_; ///< whether we have seen at least one I/O error (XXX) /// maps requestId to the handleResponse callback - typedef std::map RequestsMap; - static RequestsMap TheRequestsMap; ///< pending requests map + typedef std::map RequestMap; + static RequestMap TheRequestMap1; ///< older (or newer) pending requests + static RequestMap TheRequestMap2; ///< newer (or older) pending requests + static RequestMap *TheOlderRequests; ///< older requests (map1 or map2) + static RequestMap *TheNewerRequests; ///< newer requests (map2 or map1) + static bool TimeoutCheckScheduled; ///< we expect a CheckTimeouts() call static unsigned int LastRequestId; ///< last requestId used @@ -126,6 +132,9 @@ class IpcIoPendingRequest public: IpcIoPendingRequest(const IpcIoFile::Pointer &aFile); + /// called when response is received and, with a nil response, on timeouts + void completeIo(IpcIoResponse *response); + public: IpcIoFile::Pointer file; ///< the file object waiting for the response ReadRequest *readRequest; ///< set if this is a read requests