]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Support IpcIO timeouts.
authorAlex Rousskov <rousskov@measurement-factory.com>
Thu, 3 Feb 2011 05:33:05 +0000 (22:33 -0700)
committerAlex Rousskov <rousskov@measurement-factory.com>
Thu, 3 Feb 2011 05:33:05 +0000 (22:33 -0700)
Penging IpcIo requests are now stored in two alternating maps: "old" and
"new".  Every T seconds, any requests remaining in the "old" map are treated
as timed out.  After that check, the current "new" and (now empty) "old" map
pointers are swapped so that the previously "new" requests can now age for T
seconds.  New requests are always added to the "new" map. Responses are
always checked against both maps.

This approach gives us access to pending request information and allows to
report errors to the right I/O requestors without creating additional
per-request state attached to a per-request timeout event. The price is (a)
two instead of one map lookups when the response comes and (b) timeout
precision decrease from "about T" to "anywhere from T to 2*T".

src/DiskIO/IpcIo/IpcIoFile.cc
src/DiskIO/IpcIo/IpcIoFile.h

index 300b746d351548c3931e7fa90eee8ed6447cc281..0f7cd69f78740f84552289568ca5b1d0a683c17c 100644 (file)
 
 CBDATA_CLASS_INIT(IpcIoFile);
 
-IpcIoFile::RequestsMap IpcIoFile::TheRequestsMap;
+IpcIoFile::RequestMap IpcIoFile::TheRequestMap1;
+IpcIoFile::RequestMap IpcIoFile::TheRequestMap2;
+IpcIoFile::RequestMap *IpcIoFile::TheOlderRequests = &IpcIoFile::TheRequestMap1;
+IpcIoFile::RequestMap *IpcIoFile::TheNewerRequests = &IpcIoFile::TheRequestMap2;
+bool IpcIoFile::TimeoutCheckScheduled = false;
 unsigned int IpcIoFile::LastRequestId = 0;
 
 static bool DiskerOpen(const String &path, int flags, mode_t mode);
@@ -70,12 +74,16 @@ IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
 }
 
 void
-IpcIoFile::openCompleted(const IpcIoResponse &ipcResponse) {
-    if (ipcResponse.xerrno) {
-        debugs(79,1, HERE << "error: " << xstrerr(ipcResponse.xerrno));
+IpcIoFile::openCompleted(const IpcIoResponse *ipcResponse) {
+    if (!ipcResponse) {
+        debugs(79,1, HERE << "error: timeout");
+        error_ = true;
+       } else
+    if (ipcResponse->xerrno) {
+        debugs(79,1, HERE << "error: " << xstrerr(ipcResponse->xerrno));
         error_ = true;
        } else {
-        diskId = ipcResponse.diskId;
+        diskId = ipcResponse->diskId;
         if (diskId < 0) {
             error_ = true;
             debugs(79,1, HERE << "error: no disker claimed " << dbName);
@@ -154,17 +162,22 @@ IpcIoFile::read(ReadRequest *readRequest)
 
 void
 IpcIoFile::readCompleted(ReadRequest *readRequest,
-                         const IpcIoResponse &ipcResponse)
-{
-    if (ipcResponse.xerrno) {
-        debugs(79,1, HERE << "error: " << xstrerr(ipcResponse.xerrno));
-        error_ = true;
+                         const IpcIoResponse *ipcResponse)
+{
+    bool ioError = false;
+    if (!ipcResponse) {
+        debugs(79,1, HERE << "error: timeout");
+        ioError = true; // I/O timeout does not warrant setting error_?
+       } else
+    if (ipcResponse->xerrno) {
+        debugs(79,1, HERE << "error: " << xstrerr(ipcResponse->xerrno));
+        ioError = error_ = true;
        } else {
-        memcpy(readRequest->buf, ipcResponse.buf, ipcResponse.len);
+        memcpy(readRequest->buf, ipcResponse->buf, ipcResponse->len);
     }
 
-    const ssize_t rlen = error_ ? -1 : (ssize_t)readRequest->len;
-    const int errflag = error_ ? DISK_ERROR : DISK_OK;
+    const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
+    const int errflag = ioError ? DISK_ERROR : DISK_OK;
     ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
 }
 
@@ -198,27 +211,32 @@ IpcIoFile::write(WriteRequest *writeRequest)
 
 void
 IpcIoFile::writeCompleted(WriteRequest *writeRequest,
-                          const IpcIoResponse &ipcResponse)
-{
-    if (ipcResponse.xerrno) {
-        debugs(79,1, HERE << "error: " << xstrerr(ipcResponse.xerrno));
+                          const IpcIoResponse *ipcResponse)
+{
+    bool ioError = false;
+    if (!ipcResponse) {
+        debugs(79,1, HERE << "error: timeout");
+        ioError = true; // I/O timeout does not warrant setting error_?
+       } else
+    if (ipcResponse->xerrno) {
+        debugs(79,1, HERE << "error: " << xstrerr(ipcResponse->xerrno));
         error_ = true;
     } else
-    if (ipcResponse.len != writeRequest->len) {
-        debugs(79,1, HERE << "problem: " << ipcResponse.len << " < " << writeRequest->len);
+    if (ipcResponse->len != writeRequest->len) {
+        debugs(79,1, HERE << "problem: " << ipcResponse->len << " < " << writeRequest->len);
         error_ = true;
     }
 
     if (writeRequest->free_func)
         (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?
 
-    if (!error_) {
+    if (!ioError) {
         debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" <<
             diskId << " at " << writeRequest->offset);
        }
 
-    const ssize_t rlen = error_ ? 0 : (ssize_t)writeRequest->len;
-    const int errflag = error_ ? DISK_ERROR : DISK_OK;
+    const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
+    const int errflag = ioError ? DISK_ERROR : DISK_OK;
     ioRequestor->writeCompleted(errflag, rlen, writeRequest);
 }
 
@@ -235,7 +253,7 @@ IpcIoFile::send(IpcIoRequest &ipcIo, IpcIoPendingRequest *pending)
     if (++LastRequestId == 0) // don't use zero value as requestId
         ++LastRequestId;
     ipcIo.requestId = LastRequestId;
-    TheRequestsMap[ipcIo.requestId] = pending;
+    TheNewerRequests->insert(std::make_pair(ipcIo.requestId, pending));
 
     Ipc::TypedMsgHdr message;
     ipcIo.pack(message);
@@ -251,9 +269,8 @@ IpcIoFile::send(IpcIoRequest &ipcIo, IpcIoPendingRequest *pending)
     Ipc::SendMessage(addr, message);
     ++ioLevel;
 
-    const double timeout = 10; // in seconds
-    eventAdd("IpcIoFile::requestTimedOut", &IpcIoFile::RequestTimedOut,
-             this, timeout, 0, false);
+    if (!TimeoutCheckScheduled)
+        ScheduleTimeoutCheck();
 }
 
 /// called when disker responds to our I/O request
@@ -263,56 +280,56 @@ IpcIoFile::HandleResponse(const Ipc::TypedMsgHdr &raw)
     IpcIoResponse response(raw);
 
     const int requestId = response.requestId;
-    debugs(47, 7, HERE << "disker response to " <<
-        response.command << "; ipcIo" << KidIdentifier << '.' << requestId);
+    debugs(47, 7, HERE << "disker response to " << response.command <<
+        "; ipcIo" << KidIdentifier << '.' << requestId);
 
     Must(requestId != 0);
 
-    IpcIoPendingRequest *pending = DequeueRequest(requestId);
-    Must(pending);
-
-    if (pending->readRequest)
-        pending->file->readCompleted(pending->readRequest, response);
-    else
-    if (pending->writeRequest)
-        pending->file->writeCompleted(pending->writeRequest, response);
-    else
-        pending->file->openCompleted(response);
-
-    // XXX: leaking if throwinig
-    delete pending;
+    if (IpcIoPendingRequest *pending = DequeueRequest(requestId)) {
+        pending->completeIo(&response);
+        delete pending; // XXX: leaking if throwing
+    } else {
+        debugs(47, 4, HERE << "LATE disker response to " << response.command <<
+            "; ipcIo" << KidIdentifier << '.' << requestId);
+        // nothing we can do about it; completeIo() has been called already
+    }
 }
 
-
-/// Mgr::IpcIoFile::requestTimedOut wrapper
+/// Mgr::IpcIoFile::checkTimeous wrapper
 void
-IpcIoFile::RequestTimedOut(void* param)
+IpcIoFile::CheckTimeouts(void*)
 {
-    debugs(47, 1, HERE << "bug: request timedout and we cannot handle that");
-    Must(param != NULL);
-    // XXX: cannot get to file because IpcIoFile is not cbdata-protected
-    // IpcIoFile* file = static_cast<IpcIoFile*>(param);
+    TimeoutCheckScheduled = false;
 
-    // TODO: notify the pending request (XXX: which one?)
+    // any old request would have timed out by now
+    typedef RequestMap::const_iterator RMCI;
+    RequestMap &elders = *TheOlderRequests;
+    for (RMCI i = elders.begin(); i != elders.end(); ++i) {
+        IpcIoPendingRequest *pending = i->second;
 
-    // use async call to enable job call protection that time events lack
-    // CallJobHere(47, 5, mgrFwdr, IpcIoFile, requestTimedOut);
-}
+        const int requestId = i->first;
+        debugs(47, 7, HERE << "disker timeout; ipcIo" <<
+            KidIdentifier << '.' << requestId);
 
-/// Called when Coordinator fails to start processing the request [in time]
-void
-IpcIoFile::requestTimedOut()
-{
-    debugs(47, 3, HERE);
-    assert(false); // TODO: notify the pending request (XXX: which one?)
+        pending->completeIo(NULL); // no response
+        delete pending; // XXX: leaking if throwing
+       }
+    elders.clear();
+
+    swap(TheOlderRequests, TheNewerRequests); // switches pointers around
+    if (!TheOlderRequests->empty())
+        ScheduleTimeoutCheck();
 }
 
-/// called when we are no longer waiting for Coordinator to respond
+/// prepare to check for timeouts in a little while
 void
-IpcIoFile::removeTimeoutEvent()
+IpcIoFile::ScheduleTimeoutCheck()
 {
-    if (eventFind(&IpcIoFile::RequestTimedOut, this))
-        eventDelete(&IpcIoFile::RequestTimedOut, this);
+    // we check all older requests at once so some may be wait for 2*timeout
+    const double timeout = 7; // in seconds
+    eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
+             NULL, timeout, 0, false);
+    TimeoutCheckScheduled = true;
 }
 
 /// returns and forgets the right IpcIoFile pending request
@@ -321,13 +338,24 @@ IpcIoFile::DequeueRequest(unsigned int requestId)
 {
     debugs(47, 3, HERE);
     Must(requestId != 0);
-    RequestsMap::iterator i = TheRequestsMap.find(requestId);
-    if (i != TheRequestsMap.end()) {
-        IpcIoPendingRequest *pending = i->second;
-        TheRequestsMap.erase(i);
-        return pending;
-       }
-    return NULL;
+
+       RequestMap *map = NULL;
+    RequestMap::iterator i = TheRequestMap1.find(requestId);
+
+    if (i != TheRequestMap1.end())
+        map = &TheRequestMap1;
+    else {
+        i = TheRequestMap2.find(requestId);
+        if (i != TheRequestMap2.end())
+            map = &TheRequestMap2;
+    }
+
+    if (!map) // not found in both maps
+        return NULL;
+
+    IpcIoPendingRequest *pending = i->second;
+    map->erase(i);
+    return pending;
 }
 
 int
@@ -415,13 +443,28 @@ IpcIoResponse::pack(Ipc::TypedMsgHdr& msg) const
 }
 
 
-/* IpcIoPendingRequest: */
+/* IpcIoPendingRequest */
 
 IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
     file(aFile), readRequest(NULL), writeRequest(NULL)
 {
 }
 
+void
+IpcIoPendingRequest::completeIo(IpcIoResponse *response)
+{
+    Must(file != NULL);
+
+    if (readRequest)
+        file->readCompleted(readRequest, response);
+    else
+    if (writeRequest)
+        file->writeCompleted(writeRequest, response);
+    else
+        file->openCompleted(response);
+}
+
+
 
 /* XXX: disker code that should probably be moved elsewhere */
 
index 3b23c402276c8cec4a965fc2ee8b1e3cdfa556e9..10181eae1a00484d2a56c3b1b959d8c668ac4f1c 100644 (file)
@@ -89,18 +89,20 @@ public:
     /// disker entry point for remote I/O requests
     static void HandleRequest(const IpcIoRequest &request);
 
+protected:
+    friend class IpcIoPendingRequest;
+    void openCompleted(const IpcIoResponse *response);
+    void readCompleted(ReadRequest *readRequest, const IpcIoResponse *);
+    void writeCompleted(WriteRequest *writeRequest, const IpcIoResponse *);
+
 private:
     void send(IpcIoRequest &request, IpcIoPendingRequest *pending);
 
-    void openCompleted(const IpcIoResponse &);
-    void readCompleted(ReadRequest *readRequest, const IpcIoResponse &);
-    void writeCompleted(WriteRequest *writeRequest, const IpcIoResponse &);
-
-    static void RequestTimedOut(void* param);
-    void requestTimedOut();
-    void removeTimeoutEvent();
     static IpcIoPendingRequest *DequeueRequest(unsigned int requestId);
 
+    static void CheckTimeouts(void* param);
+    static void ScheduleTimeoutCheck();
+
 private:
     const String dbName; ///< the name of the file we are managing
     int diskId; ///< the process ID of the disker we talk to
@@ -111,8 +113,12 @@ private:
     bool error_; ///< whether we have seen at least one I/O error (XXX)
 
     /// maps requestId to the handleResponse callback
-    typedef std::map<unsigned int, IpcIoPendingRequest*> RequestsMap;
-    static RequestsMap TheRequestsMap; ///< pending requests map
+    typedef std::map<unsigned int, IpcIoPendingRequest*> RequestMap;
+    static RequestMap TheRequestMap1; ///< older (or newer) pending requests
+    static RequestMap TheRequestMap2; ///< newer (or older) pending requests
+    static RequestMap *TheOlderRequests; ///< older requests (map1 or map2)
+    static RequestMap *TheNewerRequests; ///< newer requests (map2 or map1)
+    static bool TimeoutCheckScheduled; ///< we expect a CheckTimeouts() call
 
     static unsigned int LastRequestId; ///< last requestId used
 
@@ -126,6 +132,9 @@ class IpcIoPendingRequest
 public:
     IpcIoPendingRequest(const IpcIoFile::Pointer &aFile);
 
+    /// called when response is received and, with a nil response, on timeouts
+    void completeIo(IpcIoResponse *response);
+
 public:
     IpcIoFile::Pointer file; ///< the file object waiting for the response
     ReadRequest *readRequest; ///< set if this is a read requests