]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Ignore SMP queue responses made stale by worker restarts (#711)
authorEduard Bagdasaryan <eduard.bagdasaryan@measurement-factory.com>
Wed, 12 Aug 2020 22:47:14 +0000 (22:47 +0000)
committerSquid Anubis <squid-anubis@squid-cache.org>
Thu, 13 Aug 2020 11:39:17 +0000 (11:39 +0000)
When a worker restarts (for any reason), the disker-to-worker queue may
contain disker responses to I/O requests sent by the previous
incarnation of the restarted worker process (the "previous generation"
responses). Since the current response:request mapping mechanism relies
on a 32-bit integer counter, and a worker process always starts counting
from 0, there is a small chance that the restarted worker may see a
previous generation response that accidentally matches the current
generation request ID.

For writing transactions, accepting a previous generation response may
mean unlocking a cache entry too soon, making not yet written slots
available to other workers that might read wrong content. For reading
transactions, accepting a previous generation response may mean
immediately serving wrong response content (that have been already
overwritten on disk with the information that the restarted worker is
now waiting for).

To avoid these problems, each disk I/O request now stores the worker
process ID. Workers ignore responses to requests originated by a
different/mismatching worker ID.

src/DiskIO/IpcIo/IpcIoFile.cc
src/DiskIO/IpcIo/IpcIoFile.h

index e97487de59583f485de7e262a5a95c232f62f2eb..95f1a96e9f48c6ef345abb4f28e84a65386ca21b 100644 (file)
@@ -92,10 +92,15 @@ operator <<(std::ostream &os, const IpcIo::Command command)
 /* IpcIoFile */
 
 IpcIoFile::IpcIoFile(char const *aDb):
-    dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
+    dbName(aDb),
+    myPid(getpid()),
+    diskId(-1),
+    error_(false),
+    lastRequestId(0),
     olderRequests(&requestMap1), newerRequests(&requestMap2),
     timeoutCheckScheduled(false)
 {
+    assert(myPid >= 0);
 }
 
 IpcIoFile::~IpcIoFile()
@@ -138,7 +143,7 @@ IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
 
         queue->localRateLimit().store(config.ioRate);
 
-        Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
+        Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, myPid));
         ann.strand.tag = dbName;
         Ipc::TypedMsgHdr message;
         ann.pack(message);
@@ -369,6 +374,7 @@ IpcIoFile::push(IpcIoPendingRequest *const pending)
             ++lastRequestId;
         ipcIo.requestId = lastRequestId;
         ipcIo.start = current_time;
+        ipcIo.workerPid = myPid;
         if (pending->readRequest) {
             ipcIo.command = IpcIo::cmdRead;
             ipcIo.offset = pending->readRequest->offset;
@@ -489,7 +495,10 @@ IpcIoFile::handleResponse(IpcIoMsg &ipcIo)
     if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
         CallBack(pending->codeContext, [&] {
             debugs(47, 7, "popped disker response to " << SipcIo(KidIdentifier, ipcIo, diskId));
-            pending->completeIo(&ipcIo);
+            if (myPid == ipcIo.workerPid)
+                pending->completeIo(&ipcIo);
+            else
+                debugs(47, 5, "ignoring response meant for our predecessor PID: " << ipcIo.workerPid);
             delete pending; // XXX: leaking if throwing
         });
     } else {
@@ -655,6 +664,7 @@ IpcIoMsg::IpcIoMsg():
     requestId(0),
     offset(0),
     len(0),
+    workerPid(-1), // Unix-like systems use process IDs starting from 0
     command(IpcIo::cmdNone),
     xerrno(0)
 {
@@ -670,6 +680,7 @@ IpcIoMsg::stat(std::ostream &os)
     os << "id: " << requestId <<
         ", offset: " << offset <<
         ", size: " << len <<
+        ", workerPid: " << workerPid <<
         ", page: " << page <<
         ", command: " << command <<
         ", start: " << start <<
@@ -818,9 +829,9 @@ IpcIoFile::WaitBeforePop()
         return false;
 
     // is there an I/O request we could potentially delay?
-    int processId;
+    int kidId;
     IpcIoMsg ipcIo;
-    if (!queue->peek(processId, ipcIo)) {
+    if (!queue->peek(kidId, ipcIo)) {
         // unlike pop(), peek() is not reliable and does not block reader
         // so we must proceed with pop() even if it is likely to fail
         return false;
@@ -926,11 +937,16 @@ IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
            ipcIo.len << " at " << ipcIo.offset <<
            " ipcIo" << workerId << '.' << ipcIo.requestId);
 
+    const auto workerPid = ipcIo.workerPid;
+    assert(workerPid >= 0);
+
     if (ipcIo.command == IpcIo::cmdRead)
         diskerRead(ipcIo);
     else // ipcIo.command == IpcIo::cmdWrite
         diskerWrite(ipcIo);
 
+    assert(ipcIo.workerPid == workerPid);
+
     debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));
 
     try {
index 1c5e5c4ae869159d487f2aa9960c666b7b5297c0..a15227c9234b3c8ee56708c5408ea362b5732297 100644 (file)
@@ -51,6 +51,7 @@ public:
     off_t offset;
     size_t len;
     Ipc::Mem::PageId page;
+    pid_t workerPid; ///< the process ID of the I/O requestor
 
     IpcIo::Command command; ///< what disker is supposed to do or did
     struct timeval start; ///< when the I/O request was converted to IpcIoMsg
@@ -129,7 +130,8 @@ private:
 
 private:
     const String dbName; ///< the name of the file we are managing
-    int diskId; ///< the process ID of the disker we talk to
+    const pid_t myPid; ///< optimization: cached process ID of our process
+    int diskId; ///< the kid ID of the disker we talk to
     RefCount<IORequestor> ioRequestor;
 
     bool error_; ///< whether we have seen at least one I/O error (XXX)