From: Eduard Bagdasaryan Date: Mon, 28 Mar 2022 04:51:53 +0000 (+0000) Subject: Kid restart leads to persistent queue overflows, delays/timeouts (#706) X-Git-Tag: SQUID_6_0_1~220 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=5faec1a1a8c8deca4e72f073eb9092390ebf66c7;p=thirdparty%2Fsquid.git Kid restart leads to persistent queue overflows, delays/timeouts (#706) WARNING: communication with ... may be too slow or disrupted... WARNING: abandoning ... I/Os ERROR: worker I/O push queue for ... overflow... ERROR: Collapsed forwarding queue overflow... SMP queues rely on the shared memory QueueReader::popSignal flag to reduce the number of UDS messages that queue readers and writers need to send each other. If the flag is true but there is no corresponding "wake up, you have new queued items to read" UDS message for the reader, the reader may stall. This happens when the reader restarts (e.g., after hitting an assertion) while the flag is true. A stalled queue reader leads to delays and queue overflows: * When the problem affects worker-disker queues, disk I/O delays under the hard-coded 7-second timeout are not reported to the admin but may affect user experience. Larger delays trigger level-1 WARNINGs. Push queue overflows trigger level-1 ERRORs. * Transient worker-worker queue problems may stall concurrent transactions that are reading from the cache entry being written by another process. Overflows trigger level-1 ERRORs. The restarted worker usually starts working just fine because it does not expect any messages. A busy restarted worker may also appear to continue working fine because workers always pop queued items before pushing new ones -- as long as the worker queues new items, it will see and pop responses to earlier requests, masking the problem. However, the "stuck popSignal" problem never goes away: Squid only clears the flag when receiving a notification, but sending new notifications is blocked by that stuck flag. Upon kid start, we now clear popSignal (to reflect the actual communication state) and empty the queue (to reduce overflows). Since commit 4c21861, any corresponding in-flight UDS queue notification is ignored because it was sent to the previous process playing the same kid role. The queue writer will see the false popSignal flag and send a new notification when queuing a new item, preventing queue stalls. Also properly ignore stale disker responses about cache_dirs we have not opened yet, especially since we are now trying to empty the queues ASAP after a restart, before Coordinator has a chance to inform us about available diskers, populating the IpcIoFiles container. We already have similar protection from stale UDS messages and from stale disker queue messages about _opened_ cache_dirs ("LATE disker response to..."). Also report SMP queue flags in mgr:store_queues. --- diff --git a/src/CollapsedForwarding.cc b/src/CollapsedForwarding.cc index 5ad397c0ee..2c002d9549 100644 --- a/src/CollapsedForwarding.cc +++ b/src/CollapsedForwarding.cc @@ -9,6 +9,7 @@ /* DEBUG: section 17 Request Forwarding */ #include "squid.h" +#include "base/AsyncFunCalls.h" #include "CollapsedForwarding.h" #include "globals.h" #include "ipc/mem/Segment.h" @@ -57,8 +58,12 @@ void CollapsedForwarding::Init() { Must(!queue.get()); - if (UsingSmp() && IamWorkerProcess()) + if (UsingSmp() && IamWorkerProcess()) { queue.reset(new Queue(ShmLabel, KidIdentifier)); + AsyncCall::Pointer callback = asyncCall(17, 4, "CollapsedForwarding::HandleNewDataAtStart", + NullaryFunDialer(&CollapsedForwarding::HandleNewDataAtStart)); + ScheduleCallHere(callback); + } } void @@ -146,6 +151,16 @@ CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg) HandleNewData("after notification"); } +/// Handle queued IPC messages for the first time in this process lifetime, when +/// the queue may be reflecting the state of our killed predecessor. +void +CollapsedForwarding::HandleNewDataAtStart() +{ + /// \sa IpcIoFile::HandleMessagesAtStart() -- duplicates this logic + queue->clearAllReaderSignals(); + HandleNewData("at start"); +} + void CollapsedForwarding::StatQueue(std::ostream &os) { diff --git a/src/CollapsedForwarding.h b/src/CollapsedForwarding.h index 7e2acfcc94..0e1760b4fc 100644 --- a/src/CollapsedForwarding.h +++ b/src/CollapsedForwarding.h @@ -46,6 +46,8 @@ public: static void StatQueue(std::ostream &); private: + static void HandleNewDataAtStart(); + typedef Ipc::MultiQueue Queue; static std::unique_ptr queue; ///< IPC queue }; diff --git a/src/DiskIO/IpcIo/IpcIoFile.cc b/src/DiskIO/IpcIo/IpcIoFile.cc index 54e6527549..e9f54a3a25 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.cc +++ b/src/DiskIO/IpcIo/IpcIoFile.cc @@ -9,6 +9,7 @@ /* DEBUG: section 47 Store Directory Routines */ #include "squid.h" +#include "base/AsyncFunCalls.h" #include "base/CodeContext.h" #include "base/RunnersRegistry.h" #include "base/TextException.h" @@ -129,8 +130,12 @@ IpcIoFile::open(int flags, mode_t mode, RefCount callback) ioRequestor = callback; Must(diskId < 0); // we do not know our disker yet - if (!queue.get()) + if (!queue.get()) { queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier)); + AsyncCall::Pointer call = asyncCall(79, 4, "IpcIoFile::HandleMessagesAtStart", + NullaryFunDialer(&IpcIoFile::HandleMessagesAtStart)); + ScheduleCallHere(call); + } if (IamDiskProcess()) { error_ = !DiskerOpen(SBuf(dbName.termedBuf()), flags, mode); @@ -475,7 +480,10 @@ IpcIoFile::HandleResponses(const char *const when) int diskId; while (queue->pop(diskId, ipcIo)) { const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId); - Must(i != IpcIoFiles.end()); // TODO: warn but continue + if (i == IpcIoFiles.end()) { + debugs(47, 5, "ignoring disk response " << SipcIo(KidIdentifier, ipcIo, diskId) << ": the file is not open"); + continue; + } i->second->handleResponse(ipcIo); } } @@ -525,6 +533,18 @@ IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg) HandleResponses("after notification"); } +/// \copydoc CollapsedForwarding::HandleNewDataAtStart() +void +IpcIoFile::HandleMessagesAtStart() +{ + /// \sa CollapsedForwarding::HandleNewDataAtStart() -- duplicates this logic + queue->clearAllReaderSignals(); + if (IamDiskProcess()) + DiskerHandleRequests(); + else + HandleResponses("at start"); +} + void IpcIoFile::StatQueue(std::ostream &os) { diff --git a/src/DiskIO/IpcIo/IpcIoFile.h b/src/DiskIO/IpcIo/IpcIoFile.h index ddd2f70ead..665f0d33e1 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.h +++ b/src/DiskIO/IpcIo/IpcIoFile.h @@ -128,6 +128,8 @@ private: static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo); static bool WaitBeforePop(); + static void HandleMessagesAtStart(); + private: const String dbName; ///< the name of the file we are managing const pid_t myPid; ///< optimization: cached process ID of our process diff --git a/src/base/AsyncFunCalls.h b/src/base/AsyncFunCalls.h new file mode 100644 index 0000000000..42af64ce74 --- /dev/null +++ b/src/base/AsyncFunCalls.h @@ -0,0 +1,34 @@ +/* + * Copyright (C) 1996-2021 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +#ifndef SQUID_BASE_ASYNCFUNCALLS_H +#define SQUID_BASE_ASYNCFUNCALLS_H + +#include "base/AsyncCall.h" + +#include + +/// Calls a function without arguments. See also: NullaryMemFunT. +class NullaryFunDialer: public CallDialer +{ +public: + using Handler = void (); + + explicit NullaryFunDialer(Handler * const aHandler): handler(aHandler) {} + + /* CallDialer API */ + bool canDial(AsyncCall &) { return bool(handler); } + void dial(AsyncCall &) { handler(); } + virtual void print(std::ostream &os) const override { os << "()"; } + +private: + Handler *handler; ///< the function to call (or nil) +}; + +#endif /* SQUID_BASE_ASYNCFUNCALLS_H */ + diff --git a/src/base/Makefile.am b/src/base/Makefile.am index 3dd32c1341..a5d35bd26c 100644 --- a/src/base/Makefile.am +++ b/src/base/Makefile.am @@ -16,6 +16,7 @@ libbase_la_SOURCES = \ AsyncCallQueue.cc \ AsyncCallQueue.h \ AsyncCbdataCalls.h \ + AsyncFunCalls.h \ AsyncJob.cc \ AsyncJob.h \ AsyncJobCalls.h \ diff --git a/src/ipc/Queue.cc b/src/ipc/Queue.cc index 71f715ff95..97c5d5414e 100644 --- a/src/ipc/Queue.cc +++ b/src/ipc/Queue.cc @@ -44,7 +44,7 @@ ReadersId(String id) InstanceIdDefinitions(Ipc::QueueReader, "ipcQR"); -Ipc::QueueReader::QueueReader(): popBlocked(true), popSignal(false), +Ipc::QueueReader::QueueReader(): popBlocked(false), popSignal(false), rateLimit(0), balance(0) { debugs(54, 7, "constructed " << id); @@ -157,16 +157,23 @@ Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId): void Ipc::BaseMultiQueue::clearReaderSignal(const int /*remoteProcessId*/) +{ + // Unused remoteProcessId may be useful for at least two optimizations: + // * TODO: After QueueReader::popSignal is moved to each OneToOneUniQueue, + // we could clear just the remoteProcessId popSignal, further reducing the + // number of UDS notifications writers have to send. + // * We could adjust theLastPopProcessId to try popping from the + // remoteProcessId queue first. That does not seem to help much and might + // introduce some bias, so we do not do that for now. + clearAllReaderSignals(); +} + +void +Ipc::BaseMultiQueue::clearAllReaderSignals() { QueueReader &reader = localReader(); debugs(54, 7, "reader: " << reader.id); - reader.clearSignal(); - - // we got a hint; we could reposition iteration to try popping from the - // remoteProcessId queue first; but it does not seem to help much and might - // introduce some bias so we do not do that for now: - // theLastPopProcessId = remoteProcessId; } const Ipc::QueueReader::Balance & diff --git a/src/ipc/Queue.h b/src/ipc/Queue.h index 37a76bf2ac..6e5763293a 100644 --- a/src/ipc/Queue.h +++ b/src/ipc/Queue.h @@ -33,6 +33,9 @@ public: /// whether the reader is waiting for a notification signal bool blocked() const { return popBlocked.load(); } + /// \copydoc popSignal + bool signaled() const { return popSignal.load(); } + /// marks the reader as blocked, waiting for a notification signal void block() { popBlocked.store(true); } @@ -170,6 +173,9 @@ public: /// clears the reader notification received by the local process from the remote process void clearReaderSignal(const int remoteProcessId); + /// clears all reader notifications received by the local process + void clearAllReaderSignals(); + /// picks a process and calls OneToOneUniQueue::pop() using its queue template bool pop(int &remoteProcessId, Value &value); @@ -590,6 +596,12 @@ BaseMultiQueue::stat(std::ostream &os) const const auto &queue = outQueue(processId); queue.statOut(os, theLocalProcessId, processId); } + + os << "\n"; + + const auto &reader = localReader(); + os << " kid" << theLocalProcessId << " reader flags: " << + "{ blocked: " << reader.blocked() << ", signaled: " << reader.signaled() << " }\n"; } // FewToFewBiQueue