WARNING: communication with ... may be too slow or disrupted...
WARNING: abandoning ... I/Os
ERROR: worker I/O push queue for ... overflow...
ERROR: Collapsed forwarding queue overflow...
SMP queues rely on the shared memory QueueReader::popSignal flag to
reduce the number of UDS messages that queue readers and writers need to
send each other. If the flag is true but there is no corresponding "wake
up, you have new queued items to read" UDS message for the reader, the
reader may stall. This happens when the reader restarts (e.g., after
hitting an assertion) while the flag is true. A stalled queue reader
leads to delays and queue overflows:
* When the problem affects worker-disker queues, disk I/O delays under
the hard-coded 7-second timeout are not reported to the admin but may
affect user experience. Larger delays trigger level-1 WARNINGs. Push
queue overflows trigger level-1 ERRORs.
* Transient worker-worker queue problems may stall concurrent
transactions that are reading from the cache entry being written by
another process. Overflows trigger level-1 ERRORs.
The restarted worker usually starts working just fine because it does
not expect any messages. A busy restarted worker may also appear to
continue working fine because workers always pop queued items before
pushing new ones -- as long as the worker queues new items, it will see
and pop responses to earlier requests, masking the problem. However, the
"stuck popSignal" problem never goes away: Squid only clears the flag
when receiving a notification, but sending new notifications is blocked
by that stuck flag.
Upon kid start, we now clear popSignal (to reflect the actual
communication state) and empty the queue (to reduce overflows). Since
commit
4c21861, any corresponding in-flight UDS queue notification is
ignored because it was sent to the previous process playing the same kid
role. The queue writer will see the false popSignal flag and send a new
notification when queuing a new item, preventing queue stalls.
Also properly ignore stale disker responses about cache_dirs we have not
opened yet, especially since we are now trying to empty the queues ASAP
after a restart, before Coordinator has a chance to inform us about
available diskers, populating the IpcIoFiles container. We already have
similar protection from stale UDS messages and from stale disker queue
messages about _opened_ cache_dirs ("LATE disker response to...").
Also report SMP queue flags in mgr:store_queues.
/* DEBUG: section 17 Request Forwarding */
#include "squid.h"
+#include "base/AsyncFunCalls.h"
#include "CollapsedForwarding.h"
#include "globals.h"
#include "ipc/mem/Segment.h"
CollapsedForwarding::Init()
{
Must(!queue.get());
- if (UsingSmp() && IamWorkerProcess())
+ if (UsingSmp() && IamWorkerProcess()) {
queue.reset(new Queue(ShmLabel, KidIdentifier));
+ AsyncCall::Pointer callback = asyncCall(17, 4, "CollapsedForwarding::HandleNewDataAtStart",
+ NullaryFunDialer(&CollapsedForwarding::HandleNewDataAtStart));
+ ScheduleCallHere(callback);
+ }
}
void
HandleNewData("after notification");
}
+/// Handle queued IPC messages for the first time in this process lifetime, when
+/// the queue may be reflecting the state of our killed predecessor.
+void
+CollapsedForwarding::HandleNewDataAtStart()
+{
+ /// \sa IpcIoFile::HandleMessagesAtStart() -- duplicates this logic
+ queue->clearAllReaderSignals();
+ HandleNewData("at start");
+}
+
void
CollapsedForwarding::StatQueue(std::ostream &os)
{
static void StatQueue(std::ostream &);
private:
+ static void HandleNewDataAtStart();
+
typedef Ipc::MultiQueue Queue;
static std::unique_ptr<Queue> queue; ///< IPC queue
};
/* DEBUG: section 47 Store Directory Routines */
#include "squid.h"
+#include "base/AsyncFunCalls.h"
#include "base/CodeContext.h"
#include "base/RunnersRegistry.h"
#include "base/TextException.h"
ioRequestor = callback;
Must(diskId < 0); // we do not know our disker yet
- if (!queue.get())
+ if (!queue.get()) {
queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier));
+ AsyncCall::Pointer call = asyncCall(79, 4, "IpcIoFile::HandleMessagesAtStart",
+ NullaryFunDialer(&IpcIoFile::HandleMessagesAtStart));
+ ScheduleCallHere(call);
+ }
if (IamDiskProcess()) {
error_ = !DiskerOpen(SBuf(dbName.termedBuf()), flags, mode);
int diskId;
while (queue->pop(diskId, ipcIo)) {
const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
- Must(i != IpcIoFiles.end()); // TODO: warn but continue
+ if (i == IpcIoFiles.end()) {
+ debugs(47, 5, "ignoring disk response " << SipcIo(KidIdentifier, ipcIo, diskId) << ": the file is not open");
+ continue;
+ }
i->second->handleResponse(ipcIo);
}
}
HandleResponses("after notification");
}
+/// \copydoc CollapsedForwarding::HandleNewDataAtStart()
+void
+IpcIoFile::HandleMessagesAtStart()
+{
+ /// \sa CollapsedForwarding::HandleNewDataAtStart() -- duplicates this logic
+ queue->clearAllReaderSignals();
+ if (IamDiskProcess())
+ DiskerHandleRequests();
+ else
+ HandleResponses("at start");
+}
+
void
IpcIoFile::StatQueue(std::ostream &os)
{
static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
static bool WaitBeforePop();
+ static void HandleMessagesAtStart();
+
private:
const String dbName; ///< the name of the file we are managing
const pid_t myPid; ///< optimization: cached process ID of our process
--- /dev/null
+/*
+ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef SQUID_BASE_ASYNCFUNCALLS_H
+#define SQUID_BASE_ASYNCFUNCALLS_H
+
+#include "base/AsyncCall.h"
+
+#include <iostream>
+
+/// Calls a function without arguments. See also: NullaryMemFunT.
+class NullaryFunDialer: public CallDialer
+{
+public:
+ using Handler = void ();
+
+ explicit NullaryFunDialer(Handler * const aHandler): handler(aHandler) {}
+
+ /* CallDialer API */
+ bool canDial(AsyncCall &) { return bool(handler); }
+ void dial(AsyncCall &) { handler(); }
+ virtual void print(std::ostream &os) const override { os << "()"; }
+
+private:
+ Handler *handler; ///< the function to call (or nil)
+};
+
+#endif /* SQUID_BASE_ASYNCFUNCALLS_H */
+
AsyncCallQueue.cc \
AsyncCallQueue.h \
AsyncCbdataCalls.h \
+ AsyncFunCalls.h \
AsyncJob.cc \
AsyncJob.h \
AsyncJobCalls.h \
InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
-Ipc::QueueReader::QueueReader(): popBlocked(true), popSignal(false),
+Ipc::QueueReader::QueueReader(): popBlocked(false), popSignal(false),
rateLimit(0), balance(0)
{
debugs(54, 7, "constructed " << id);
void
Ipc::BaseMultiQueue::clearReaderSignal(const int /*remoteProcessId*/)
+{
+ // Unused remoteProcessId may be useful for at least two optimizations:
+ // * TODO: After QueueReader::popSignal is moved to each OneToOneUniQueue,
+ // we could clear just the remoteProcessId popSignal, further reducing the
+ // number of UDS notifications writers have to send.
+ // * We could adjust theLastPopProcessId to try popping from the
+ // remoteProcessId queue first. That does not seem to help much and might
+ // introduce some bias, so we do not do that for now.
+ clearAllReaderSignals();
+}
+
+void
+Ipc::BaseMultiQueue::clearAllReaderSignals()
{
QueueReader &reader = localReader();
debugs(54, 7, "reader: " << reader.id);
-
reader.clearSignal();
-
- // we got a hint; we could reposition iteration to try popping from the
- // remoteProcessId queue first; but it does not seem to help much and might
- // introduce some bias so we do not do that for now:
- // theLastPopProcessId = remoteProcessId;
}
const Ipc::QueueReader::Balance &
/// whether the reader is waiting for a notification signal
bool blocked() const { return popBlocked.load(); }
+ /// \copydoc popSignal
+ bool signaled() const { return popSignal.load(); }
+
/// marks the reader as blocked, waiting for a notification signal
void block() { popBlocked.store(true); }
/// clears the reader notification received by the local process from the remote process
void clearReaderSignal(const int remoteProcessId);
+ /// clears all reader notifications received by the local process
+ void clearAllReaderSignals();
+
/// picks a process and calls OneToOneUniQueue::pop() using its queue
template <class Value> bool pop(int &remoteProcessId, Value &value);
const auto &queue = outQueue(processId);
queue.statOut<Value>(os, theLocalProcessId, processId);
}
+
+ os << "\n";
+
+ const auto &reader = localReader();
+ os << " kid" << theLocalProcessId << " reader flags: " <<
+ "{ blocked: " << reader.blocked() << ", signaled: " << reader.signaled() << " }\n";
}
// FewToFewBiQueue