/*
+ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
*/
#ifndef SQUID_IPC_QUEUE_H
#define SQUID_IPC_QUEUE_H
#include "base/InstanceId.h"
-#include "base/Vector.h"
#include "Debug.h"
-#include "ipc/AtomicWord.h"
#include "ipc/mem/FlexibleArray.h"
#include "ipc/mem/Pointer.h"
#include "util.h"
+#include <algorithm>
+#include <atomic>
+
class String;
namespace Ipc
QueueReader(); // the initial state is "blocked without a signal"
/// whether the reader is waiting for a notification signal
- bool blocked() const { return popBlocked == 1; }
+ bool blocked() const { return popBlocked.load(); }
/// marks the reader as blocked, waiting for a notification signal
- void block() { popBlocked.swap_if(0, 1); }
+ void block() { popBlocked.store(true); }
/// removes the block() effects
- void unblock() { popBlocked.swap_if(1, 0); }
+ void unblock() { popBlocked.store(false); }
/// if reader is blocked and not notified, marks the notification signal
/// as sent and not received, returning true; otherwise, returns false
- bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); }
+ bool raiseSignal() { return blocked() && !popSignal.exchange(true); }
/// marks sent reader notification as received (also removes pop blocking)
- void clearSignal() { unblock(); popSignal.swap_if(1,0); }
+ void clearSignal() { unblock(); popSignal.store(false); }
private:
- Atomic::Word popBlocked; ///< whether the reader is blocked on pop()
- Atomic::Word popSignal; ///< whether writer has sent and reader has not received notification
+ std::atomic<bool> popBlocked; ///< whether the reader is blocked on pop()
+ std::atomic<bool> popSignal; ///< whether writer has sent and reader has not received notification
public:
- typedef Atomic::Word Rate; ///< pop()s per second
+ typedef std::atomic<int> Rate; ///< pop()s per second
Rate rateLimit; ///< pop()s per second limit if positive
// we need a signed atomic type because balance may get negative
- typedef Atomic::WordT<int> AtomicSignedMsec;
+ typedef std::atomic<int> AtomicSignedMsec;
typedef AtomicSignedMsec Balance;
/// how far ahead the reader is compared to a perfect read/sec event rate
Balance balance;
/// returns true iff the value was set; the value may be stale!
template<class Value> bool peek(Value &value) const;
+ /// prints incoming queue state; suitable for cache manager reports
+ template<class Value> void statIn(std::ostream &, int localProcessId, int remoteProcessId) const;
+ /// prints outgoing queue state; suitable for cache manager reports
+ template<class Value> void statOut(std::ostream &, int localProcessId, int remoteProcessId) const;
+
private:
+ void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const;
+ void statClose(std::ostream &) const;
+ template<class Value> void statSamples(std::ostream &, unsigned int start, uint32_t size) const;
+ template<class Value> void statRange(std::ostream &, unsigned int start, uint32_t n) const;
- unsigned int theIn; ///< input index, used only in push()
- unsigned int theOut; ///< output index, used only in pop()
+ // optimization: these non-std::atomic data members are in shared memory,
+ // but each is used only by one process (aside from obscured reporting)
+ unsigned int theIn; ///< current push() position; reporting aside, used only in push()
+ unsigned int theOut; ///< current pop() position; reporting aside, used only in pop()/peek()
- Atomic::Word theSize; ///< number of items in the queue
+ std::atomic<uint32_t> theSize; ///< number of items in the queue
const unsigned int theMaxItemSize; ///< maximum item size
- const int theCapacity; ///< maximum number of items, i.e. theBuffer size
+ const uint32_t theCapacity; ///< maximum number of items, i.e. theBuffer size
char theBuffer[];
};
/// peeks at the item likely to be pop()ed next
template<class Value> bool peek(int &remoteProcessId, Value &value) const;
+ /// prints current state; suitable for cache manager reports
+ template<class Value> void stat(std::ostream &) const;
+
/// returns local reader's balance
QueueReader::Balance &localBalance() { return localReader().balance; }
return wasEmpty && (!reader || reader->raiseSignal());
}
+template <class Value>
+void
+OneToOneUniQueue::statIn(std::ostream &os, const int localProcessId, const int remoteProcessId) const
+{
+ os << " kid" << localProcessId << " receiving from kid" << remoteProcessId << ": ";
+ // Nobody can modify our theOut so, after capturing some valid theSize value
+ // in count, we can reliably report all [theOut, theOut+count) items that
+ // were queued at theSize capturing time. We will miss new items push()ed by
+ // the other side, but it is OK -- we report state at the capturing time.
+ const auto count = theSize.load();
+ statOpen(os, "other", "popIndex", count);
+ statSamples<Value>(os, theOut, count);
+ statClose(os);
+}
+
+template <class Value>
+void
+OneToOneUniQueue::statOut(std::ostream &os, const int localProcessId, const int remoteProcessId) const
+{
+ os << " kid" << localProcessId << " sending to kid" << remoteProcessId << ": ";
+ // Nobody can modify our theIn so, after capturing some valid theSize value
+ // in count, we can reliably report all [theIn-count, theIn) items that were
+ // queued at theSize capturing time. We may report items already pop()ed by
+ // the other side, but that is OK because pop() does not modify items -- it
+ // only increments theOut.
+ const auto count = theSize.load();
+ statOpen(os, "pushIndex", "other", count);
+ statSamples<Value>(os, theIn - count, count); // unsigned offset underflow OK
+ statClose(os);
+}
+
+/// report a sample of [start, start + size) items
+template <class Value>
+void
+OneToOneUniQueue::statSamples(std::ostream &os, const unsigned int start, const uint32_t count) const
+{
+ if (!count) {
+ os << " ";
+ return;
+ }
+
+ os << ", items: [\n";
+ // report a few leading and trailing items, without repetitions
+ const auto sampleSize = std::min(3U, count); // leading (and max) sample
+ statRange<Value>(os, start, sampleSize);
+ if (sampleSize < count) { // the first sample did not show some items
+ // The `start` offset aside, the first sample reported all items
+ // below the sampleSize offset. The second sample needs to report
+ // the last sampleSize items (i.e. starting at count-sampleSize
+ // offset) except those already reported by the first sample.
+ const auto secondSampleOffset = std::max(sampleSize, count - sampleSize);
+ const auto secondSampleSize = std::min(sampleSize, count - sampleSize);
+
+ // but first we print a sample separator, unless there are no items
+ // between the samples or the separator hides the only unsampled item
+ const auto bothSamples = sampleSize + secondSampleSize;
+ if (bothSamples + 1U == count)
+ statRange<Value>(os, start + sampleSize, 1);
+ else if (count > bothSamples)
+ os << " # ... " << (count - bothSamples) << " items not shown ...\n";
+
+ statRange<Value>(os, start + secondSampleOffset, secondSampleSize);
+ }
+ os << " ]";
+}
+
+/// statSamples() helper that reports n items from start
+template <class Value>
+void
+OneToOneUniQueue::statRange(std::ostream &os, const unsigned int start, const uint32_t n) const
+{
+ assert(sizeof(Value) <= theMaxItemSize);
+ auto offset = start;
+ for (uint32_t i = 0; i < n; ++i) {
+ // XXX: Throughout this C++ header, these overflow wrapping tricks work
+ // only because theCapacity currently happens to be a power of 2 (e.g.,
+ // the highest offset (0xF...FFF) % 3 is 0 and so is the next offset).
+ const auto pos = (offset++ % theCapacity) * theMaxItemSize;
+ Value value;
+ memcpy(&value, theBuffer + pos, sizeof(value));
+ os << " { ";
+ value.stat(os);
+ os << " },\n";
+ }
+}
+
// OneToOneUniQueues
inline OneToOneUniQueue &
return false; // most likely, no process had anything to pop
}
+template <class Value>
+void
+BaseMultiQueue::stat(std::ostream &os) const
+{
+ for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) {
+ const auto &queue = inQueue(processId);
+ queue.statIn<Value>(os, theLocalProcessId, processId);
+ }
+
+ os << "\n";
+
+ for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) {
+ const auto &queue = outQueue(processId);
+ queue.statOut<Value>(os, theLocalProcessId, processId);
+ }
+}
+
// FewToFewBiQueue
template <class Value>
} // namespace Ipc
#endif // SQUID_IPC_QUEUE_H
+