]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Kid restart leads to persistent queue overflows, delays/timeouts (#706)
authorEduard Bagdasaryan <eduard.bagdasaryan@measurement-factory.com>
Mon, 28 Mar 2022 04:51:53 +0000 (04:51 +0000)
committerSquid Anubis <squid-anubis@squid-cache.org>
Mon, 28 Mar 2022 05:05:31 +0000 (05:05 +0000)
    WARNING: communication with ... may be too slow or disrupted...
    WARNING: abandoning ... I/Os
    ERROR: worker I/O push queue for ... overflow...
    ERROR: Collapsed forwarding queue overflow...

SMP queues rely on the shared memory QueueReader::popSignal flag to
reduce the number of UDS messages that queue readers and writers need to
send each other. If the flag is true but there is no corresponding "wake
up, you have new queued items to read" UDS message for the reader, the
reader may stall. This happens when the reader restarts (e.g., after
hitting an assertion) while the flag is true. A stalled queue reader
leads to delays and queue overflows:

* When the problem affects worker-disker queues, disk I/O delays under
  the hard-coded 7-second timeout are not reported to the admin but may
  affect user experience. Larger delays trigger level-1 WARNINGs. Push
  queue overflows trigger level-1 ERRORs.

* Transient worker-worker queue problems may stall concurrent
  transactions that are reading from the cache entry being written by
  another process. Overflows trigger level-1 ERRORs.

The restarted worker usually starts working just fine because it does
not expect any messages. A busy restarted worker may also appear to
continue working fine because workers always pop queued items before
pushing new ones -- as long as the worker queues new items, it will see
and pop responses to earlier requests, masking the problem. However, the
"stuck popSignal" problem never goes away: Squid only clears the flag
when receiving a notification, but sending new notifications is blocked
by that stuck flag.

Upon kid start, we now clear popSignal (to reflect the actual
communication state) and empty the queue (to reduce overflows). Since
commit 4c21861, any corresponding in-flight UDS queue notification is
ignored because it was sent to the previous process playing the same kid
role. The queue writer will see the false popSignal flag and send a new
notification when queuing a new item, preventing queue stalls.

Also properly ignore stale disker responses about cache_dirs we have not
opened yet, especially since we are now trying to empty the queues ASAP
after a restart, before Coordinator has a chance to inform us about
available diskers, populating the IpcIoFiles container. We already have
similar protection from stale UDS messages and from stale disker queue
messages about _opened_ cache_dirs ("LATE disker response to...").

Also report SMP queue flags in mgr:store_queues.

src/CollapsedForwarding.cc
src/CollapsedForwarding.h
src/DiskIO/IpcIo/IpcIoFile.cc
src/DiskIO/IpcIo/IpcIoFile.h
src/base/AsyncFunCalls.h [new file with mode: 0644]
src/base/Makefile.am
src/ipc/Queue.cc
src/ipc/Queue.h

index 5ad397c0ee6e474f76c88327a3d108e58a48d6ab..2c002d9549fa2149eef72d050da5ba70a304125c 100644 (file)
@@ -9,6 +9,7 @@
 /* DEBUG: section 17    Request Forwarding */
 
 #include "squid.h"
+#include "base/AsyncFunCalls.h"
 #include "CollapsedForwarding.h"
 #include "globals.h"
 #include "ipc/mem/Segment.h"
@@ -57,8 +58,12 @@ void
 CollapsedForwarding::Init()
 {
     Must(!queue.get());
-    if (UsingSmp() && IamWorkerProcess())
+    if (UsingSmp() && IamWorkerProcess()) {
         queue.reset(new Queue(ShmLabel, KidIdentifier));
+        AsyncCall::Pointer callback = asyncCall(17, 4, "CollapsedForwarding::HandleNewDataAtStart",
+                                                NullaryFunDialer(&CollapsedForwarding::HandleNewDataAtStart));
+        ScheduleCallHere(callback);
+    }
 }
 
 void
@@ -146,6 +151,16 @@ CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg)
     HandleNewData("after notification");
 }
 
+/// Handle queued IPC messages for the first time in this process lifetime, when
+/// the queue may be reflecting the state of our killed predecessor.
+void
+CollapsedForwarding::HandleNewDataAtStart()
+{
+    /// \sa IpcIoFile::HandleMessagesAtStart() -- duplicates this logic
+    queue->clearAllReaderSignals();
+    HandleNewData("at start");
+}
+
 void
 CollapsedForwarding::StatQueue(std::ostream &os)
 {
index 7e2acfcc94a28bd2a4f67954ba7130f93a824ccb..0e1760b4fc15d475f5bc397b03778f7010a86632 100644 (file)
@@ -46,6 +46,8 @@ public:
     static void StatQueue(std::ostream &);
 
 private:
+    static void HandleNewDataAtStart();
+
     typedef Ipc::MultiQueue Queue;
     static std::unique_ptr<Queue> queue; ///< IPC queue
 };
index 54e65275499304569693322118ca4822da49c16b..e9f54a3a25fe2490eb6503446ad8c251122ca39e 100644 (file)
@@ -9,6 +9,7 @@
 /* DEBUG: section 47    Store Directory Routines */
 
 #include "squid.h"
+#include "base/AsyncFunCalls.h"
 #include "base/CodeContext.h"
 #include "base/RunnersRegistry.h"
 #include "base/TextException.h"
@@ -129,8 +130,12 @@ IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
     ioRequestor = callback;
     Must(diskId < 0); // we do not know our disker yet
 
-    if (!queue.get())
+    if (!queue.get()) {
         queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier));
+        AsyncCall::Pointer call = asyncCall(79, 4, "IpcIoFile::HandleMessagesAtStart",
+                                            NullaryFunDialer(&IpcIoFile::HandleMessagesAtStart));
+        ScheduleCallHere(call);
+    }
 
     if (IamDiskProcess()) {
         error_ = !DiskerOpen(SBuf(dbName.termedBuf()), flags, mode);
@@ -475,7 +480,10 @@ IpcIoFile::HandleResponses(const char *const when)
     int diskId;
     while (queue->pop(diskId, ipcIo)) {
         const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
-        Must(i != IpcIoFiles.end()); // TODO: warn but continue
+        if (i == IpcIoFiles.end()) {
+            debugs(47, 5, "ignoring disk response " << SipcIo(KidIdentifier, ipcIo, diskId) << ": the file is not open");
+            continue;
+        }
         i->second->handleResponse(ipcIo);
     }
 }
@@ -525,6 +533,18 @@ IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
         HandleResponses("after notification");
 }
 
+/// \copydoc CollapsedForwarding::HandleNewDataAtStart()
+void
+IpcIoFile::HandleMessagesAtStart()
+{
+    /// \sa CollapsedForwarding::HandleNewDataAtStart() -- duplicates this logic
+    queue->clearAllReaderSignals();
+    if (IamDiskProcess())
+        DiskerHandleRequests();
+    else
+        HandleResponses("at start");
+}
+
 void
 IpcIoFile::StatQueue(std::ostream &os)
 {
index ddd2f70eadd1ec7cd07e298da9ccd4ddebb4bc5a..665f0d33e127929017d2cf8625da9c32c2a53963 100644 (file)
@@ -128,6 +128,8 @@ private:
     static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
     static bool WaitBeforePop();
 
+    static void HandleMessagesAtStart();
+
 private:
     const String dbName; ///< the name of the file we are managing
     const pid_t myPid; ///< optimization: cached process ID of our process
diff --git a/src/base/AsyncFunCalls.h b/src/base/AsyncFunCalls.h
new file mode 100644 (file)
index 0000000..42af64c
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef SQUID_BASE_ASYNCFUNCALLS_H
+#define SQUID_BASE_ASYNCFUNCALLS_H
+
+#include "base/AsyncCall.h"
+
+#include <iostream>
+
+/// Calls a function without arguments. See also: NullaryMemFunT.
+class NullaryFunDialer: public CallDialer
+{
+public:
+    using Handler = void ();
+
+    explicit NullaryFunDialer(Handler * const aHandler): handler(aHandler) {}
+
+    /* CallDialer API */
+    bool canDial(AsyncCall &) { return bool(handler); }
+    void dial(AsyncCall &) { handler(); }
+    virtual void print(std::ostream &os) const override { os << "()"; }
+
+private:
+    Handler *handler; ///< the function to call (or nil)
+};
+
+#endif /* SQUID_BASE_ASYNCFUNCALLS_H */
+
index 3dd32c134153fa7ad0fce674c7f480647cceb98f..a5d35bd26cc4fa9eca00468580d9d91458126e8b 100644 (file)
@@ -16,6 +16,7 @@ libbase_la_SOURCES = \
        AsyncCallQueue.cc \
        AsyncCallQueue.h \
        AsyncCbdataCalls.h \
+       AsyncFunCalls.h \
        AsyncJob.cc \
        AsyncJob.h \
        AsyncJobCalls.h \
index 71f715ff9572c472463907bdbc012017744668a6..97c5d5414eb789792986afa1741837bb71b28b73 100644 (file)
@@ -44,7 +44,7 @@ ReadersId(String id)
 
 InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
 
-Ipc::QueueReader::QueueReader(): popBlocked(true), popSignal(false),
+Ipc::QueueReader::QueueReader(): popBlocked(false), popSignal(false),
     rateLimit(0), balance(0)
 {
     debugs(54, 7, "constructed " << id);
@@ -157,16 +157,23 @@ Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId):
 
 void
 Ipc::BaseMultiQueue::clearReaderSignal(const int /*remoteProcessId*/)
+{
+    // Unused remoteProcessId may be useful for at least two optimizations:
+    // * TODO: After QueueReader::popSignal is moved to each OneToOneUniQueue,
+    //   we could clear just the remoteProcessId popSignal, further reducing the
+    //   number of UDS notifications writers have to send.
+    // * We could adjust theLastPopProcessId to try popping from the
+    //   remoteProcessId queue first. That does not seem to help much and might
+    //   introduce some bias, so we do not do that for now.
+    clearAllReaderSignals();
+}
+
+void
+Ipc::BaseMultiQueue::clearAllReaderSignals()
 {
     QueueReader &reader = localReader();
     debugs(54, 7, "reader: " << reader.id);
-
     reader.clearSignal();
-
-    // we got a hint; we could reposition iteration to try popping from the
-    // remoteProcessId queue first; but it does not seem to help much and might
-    // introduce some bias so we do not do that for now:
-    // theLastPopProcessId = remoteProcessId;
 }
 
 const Ipc::QueueReader::Balance &
index 37a76bf2ac939f7214d42b5b1a07dae32e41dcff..6e5763293a772e9419a08b5cdcc78cf1c0740064 100644 (file)
@@ -33,6 +33,9 @@ public:
     /// whether the reader is waiting for a notification signal
     bool blocked() const { return popBlocked.load(); }
 
+    /// \copydoc popSignal
+    bool signaled() const { return popSignal.load(); }
+
     /// marks the reader as blocked, waiting for a notification signal
     void block() { popBlocked.store(true); }
 
@@ -170,6 +173,9 @@ public:
     /// clears the reader notification received by the local process from the remote process
     void clearReaderSignal(const int remoteProcessId);
 
+    /// clears all reader notifications received by the local process
+    void clearAllReaderSignals();
+
     /// picks a process and calls OneToOneUniQueue::pop() using its queue
     template <class Value> bool pop(int &remoteProcessId, Value &value);
 
@@ -590,6 +596,12 @@ BaseMultiQueue::stat(std::ostream &os) const
         const auto &queue = outQueue(processId);
         queue.statOut<Value>(os, theLocalProcessId, processId);
     }
+
+    os << "\n";
+
+    const auto &reader = localReader();
+    os << "  kid" << theLocalProcessId << " reader flags: " <<
+       "{ blocked: " << reader.blocked() << ", signaled: " << reader.signaled() << " }\n";
 }
 
 // FewToFewBiQueue