From: Eduard Bagdasaryan Date: Wed, 12 Aug 2020 22:47:14 +0000 (+0000) Subject: Ignore SMP queue responses made stale by worker restarts (#711) X-Git-Tag: 4.15-20210522-snapshot~71 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=30fe12bc8814768d8978c9c84771b08750c7f2fe;p=thirdparty%2Fsquid.git Ignore SMP queue responses made stale by worker restarts (#711) When a worker restarts (for any reason), the disker-to-worker queue may contain disker responses to I/O requests sent by the previous incarnation of the restarted worker process (the "previous generation" responses). Since the current response:request mapping mechanism relies on a 32-bit integer counter, and a worker process always starts counting from 0, there is a small chance that the restarted worker may see a previous generation response that accidentally matches the current generation request ID. For writing transactions, accepting a previous generation response may mean unlocking a cache entry too soon, making not yet written slots available to other workers that might read wrong content. For reading transactions, accepting a previous generation response may mean immediately serving wrong response content (that have been already overwritten on disk with the information that the restarted worker is now waiting for). To avoid these problems, each disk I/O request now stores the worker process ID. Workers ignore responses to requests originated by a different/mismatching worker ID. --- diff --git a/src/DiskIO/IpcIo/IpcIoFile.cc b/src/DiskIO/IpcIo/IpcIoFile.cc index e97487de59..95f1a96e9f 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.cc +++ b/src/DiskIO/IpcIo/IpcIoFile.cc @@ -92,10 +92,15 @@ operator <<(std::ostream &os, const IpcIo::Command command) /* IpcIoFile */ IpcIoFile::IpcIoFile(char const *aDb): - dbName(aDb), diskId(-1), error_(false), lastRequestId(0), + dbName(aDb), + myPid(getpid()), + diskId(-1), + error_(false), + lastRequestId(0), olderRequests(&requestMap1), newerRequests(&requestMap2), timeoutCheckScheduled(false) { + assert(myPid >= 0); } IpcIoFile::~IpcIoFile() @@ -138,7 +143,7 @@ IpcIoFile::open(int flags, mode_t mode, RefCount callback) queue->localRateLimit().store(config.ioRate); - Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid())); + Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, myPid)); ann.strand.tag = dbName; Ipc::TypedMsgHdr message; ann.pack(message); @@ -369,6 +374,7 @@ IpcIoFile::push(IpcIoPendingRequest *const pending) ++lastRequestId; ipcIo.requestId = lastRequestId; ipcIo.start = current_time; + ipcIo.workerPid = myPid; if (pending->readRequest) { ipcIo.command = IpcIo::cmdRead; ipcIo.offset = pending->readRequest->offset; @@ -489,7 +495,10 @@ IpcIoFile::handleResponse(IpcIoMsg &ipcIo) if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) { CallBack(pending->codeContext, [&] { debugs(47, 7, "popped disker response to " << SipcIo(KidIdentifier, ipcIo, diskId)); - pending->completeIo(&ipcIo); + if (myPid == ipcIo.workerPid) + pending->completeIo(&ipcIo); + else + debugs(47, 5, "ignoring response meant for our predecessor PID: " << ipcIo.workerPid); delete pending; // XXX: leaking if throwing }); } else { @@ -655,6 +664,7 @@ IpcIoMsg::IpcIoMsg(): requestId(0), offset(0), len(0), + workerPid(-1), // Unix-like systems use process IDs starting from 0 command(IpcIo::cmdNone), xerrno(0) { @@ -670,6 +680,7 @@ IpcIoMsg::stat(std::ostream &os) os << "id: " << requestId << ", offset: " << offset << ", size: " << len << + ", workerPid: " << workerPid << ", page: " << page << ", command: " << command << ", start: " << start << @@ -818,9 +829,9 @@ IpcIoFile::WaitBeforePop() return false; // is there an I/O request we could potentially delay? - int processId; + int kidId; IpcIoMsg ipcIo; - if (!queue->peek(processId, ipcIo)) { + if (!queue->peek(kidId, ipcIo)) { // unlike pop(), peek() is not reliable and does not block reader // so we must proceed with pop() even if it is likely to fail return false; @@ -926,11 +937,16 @@ IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo) ipcIo.len << " at " << ipcIo.offset << " ipcIo" << workerId << '.' << ipcIo.requestId); + const auto workerPid = ipcIo.workerPid; + assert(workerPid >= 0); + if (ipcIo.command == IpcIo::cmdRead) diskerRead(ipcIo); else // ipcIo.command == IpcIo::cmdWrite diskerWrite(ipcIo); + assert(ipcIo.workerPid == workerPid); + debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier)); try { diff --git a/src/DiskIO/IpcIo/IpcIoFile.h b/src/DiskIO/IpcIo/IpcIoFile.h index 1c5e5c4ae8..a15227c923 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.h +++ b/src/DiskIO/IpcIo/IpcIoFile.h @@ -51,6 +51,7 @@ public: off_t offset; size_t len; Ipc::Mem::PageId page; + pid_t workerPid; ///< the process ID of the I/O requestor IpcIo::Command command; ///< what disker is supposed to do or did struct timeval start; ///< when the I/O request was converted to IpcIoMsg @@ -129,7 +130,8 @@ private: private: const String dbName; ///< the name of the file we are managing - int diskId; ///< the process ID of the disker we talk to + const pid_t myPid; ///< optimization: cached process ID of our process + int diskId; ///< the kid ID of the disker we talk to RefCount ioRequestor; bool error_; ///< whether we have seen at least one I/O error (XXX)