]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Ignore SMP queue responses made stale by worker restarts (#711)
authorEduard Bagdasaryan <eduard.bagdasaryan@measurement-factory.com>
Wed, 12 Aug 2020 22:47:14 +0000 (22:47 +0000)
committerAmos Jeffries <yadij@users.noreply.github.com>
Tue, 17 Nov 2020 01:10:07 +0000 (14:10 +1300)
When a worker restarts (for any reason), the disker-to-worker queue may
contain disker responses to I/O requests sent by the previous
incarnation of the restarted worker process (the "previous generation"
responses). Since the current response:request mapping mechanism relies
on a 32-bit integer counter, and a worker process always starts counting
from 0, there is a small chance that the restarted worker may see a
previous generation response that accidentally matches the current
generation request ID.

For writing transactions, accepting a previous generation response may
mean unlocking a cache entry too soon, making not yet written slots
available to other workers that might read wrong content. For reading
transactions, accepting a previous generation response may mean
immediately serving wrong response content (that have been already
overwritten on disk with the information that the restarted worker is
now waiting for).

To avoid these problems, each disk I/O request now stores the worker
process ID. Workers ignore responses to requests originated by a
different/mismatching worker ID.

src/DiskIO/IpcIo/IpcIoFile.cc
src/DiskIO/IpcIo/IpcIoFile.h

index 0dab628b371854fbb5036c5ed4b49747fc1864d8..71c5b01556b5ee5f0529c737fac8403cb806a8e5 100644 (file)
@@ -71,10 +71,15 @@ operator <<(std::ostream &os, const SipcIo &sio)
 }
 
 IpcIoFile::IpcIoFile(char const *aDb):
-    dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
+    dbName(aDb),
+    myPid(getpid()),
+    diskId(-1),
+    error_(false),
+    lastRequestId(0),
     olderRequests(&requestMap1), newerRequests(&requestMap2),
     timeoutCheckScheduled(false)
 {
+    assert(myPid >= 0);
 }
 
 IpcIoFile::~IpcIoFile()
@@ -117,7 +122,7 @@ IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
 
         queue->localRateLimit().store(config.ioRate);
 
-        Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
+        Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, myPid));
         ann.strand.tag = dbName;
         Ipc::TypedMsgHdr message;
         ann.pack(message);
@@ -348,6 +353,7 @@ IpcIoFile::push(IpcIoPendingRequest *const pending)
             ++lastRequestId;
         ipcIo.requestId = lastRequestId;
         ipcIo.start = current_time;
+        ipcIo.workerPid = myPid;
         if (pending->readRequest) {
             ipcIo.command = IpcIo::cmdRead;
             ipcIo.offset = pending->readRequest->offset;
@@ -468,7 +474,10 @@ IpcIoFile::handleResponse(IpcIoMsg &ipcIo)
     if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
         CallBack(pending->codeContext, [&] {
             debugs(47, 7, "popped disker response to " << SipcIo(KidIdentifier, ipcIo, diskId));
-            pending->completeIo(&ipcIo);
+            if (myPid == ipcIo.workerPid)
+                pending->completeIo(&ipcIo);
+            else
+                debugs(47, 5, "ignoring response meant for our predecessor PID: " << ipcIo.workerPid);
             delete pending; // XXX: leaking if throwing
         });
     } else {
@@ -625,6 +634,7 @@ IpcIoMsg::IpcIoMsg():
     requestId(0),
     offset(0),
     len(0),
+    workerPid(-1), // Unix-like systems use process IDs starting from 0
     command(IpcIo::cmdNone),
     xerrno(0)
 {
@@ -773,9 +783,9 @@ IpcIoFile::WaitBeforePop()
         return false;
 
     // is there an I/O request we could potentially delay?
-    int processId;
+    int kidId;
     IpcIoMsg ipcIo;
-    if (!queue->peek(processId, ipcIo)) {
+    if (!queue->peek(kidId, ipcIo)) {
         // unlike pop(), peek() is not reliable and does not block reader
         // so we must proceed with pop() even if it is likely to fail
         return false;
@@ -881,11 +891,16 @@ IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
            ipcIo.len << " at " << ipcIo.offset <<
            " ipcIo" << workerId << '.' << ipcIo.requestId);
 
+    const auto workerPid = ipcIo.workerPid;
+    assert(workerPid >= 0);
+
     if (ipcIo.command == IpcIo::cmdRead)
         diskerRead(ipcIo);
     else // ipcIo.command == IpcIo::cmdWrite
         diskerWrite(ipcIo);
 
+    assert(ipcIo.workerPid == workerPid);
+
     debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));
 
     try {
index 2589f292632f1b83c4d96c45107263d76eb26886..d56b0af633696a3b6b386a322190aac5878c3398 100644 (file)
@@ -46,6 +46,7 @@ public:
     off_t offset;
     size_t len;
     Ipc::Mem::PageId page;
+    pid_t workerPid; ///< the process ID of the I/O requestor
 
     IpcIo::Command command; ///< what disker is supposed to do or did
     struct timeval start; ///< when the I/O request was converted to IpcIoMsg
@@ -121,7 +122,8 @@ private:
 
 private:
     const String dbName; ///< the name of the file we are managing
-    int diskId; ///< the process ID of the disker we talk to
+    const pid_t myPid; ///< optimization: cached process ID of our process
+    int diskId; ///< the kid ID of the disker we talk to
     RefCount<IORequestor> ioRequestor;
 
     bool error_; ///< whether we have seen at least one I/O error (XXX)