]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/ipc/Queue.h
Source Format Enforcement (#763)
[thirdparty/squid.git] / src / ipc / Queue.h
index 20986103d9aef628842aa38f5c868bb702e0cd58..f78577bdf021582cda9b32517fb7fc4dce831ab6 100644 (file)
@@ -1,18 +1,23 @@
 /*
- * $Id$
+ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
  *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef SQUID_IPC_QUEUE_H
 #define SQUID_IPC_QUEUE_H
 
-#include "Array.h"
-#include "Debug.h"
 #include "base/InstanceId.h"
-#include "ipc/AtomicWord.h"
+#include "Debug.h"
+#include "ipc/mem/FlexibleArray.h"
 #include "ipc/mem/Pointer.h"
 #include "util.h"
 
+#include <algorithm>
+#include <atomic>
+
 class String;
 
 namespace Ipc
@@ -26,31 +31,31 @@ public:
     QueueReader(); // the initial state is "blocked without a signal"
 
     /// whether the reader is waiting for a notification signal
-    bool blocked() const { return popBlocked == 1; }
+    bool blocked() const { return popBlocked.load(); }
 
     /// marks the reader as blocked, waiting for a notification signal
-    void block() { popBlocked.swap_if(0, 1); }
+    void block() { popBlocked.store(true); }
 
     /// removes the block() effects
-    void unblock() { popBlocked.swap_if(1, 0); }
+    void unblock() { popBlocked.store(false); }
 
     /// if reader is blocked and not notified, marks the notification signal
     /// as sent and not received, returning true; otherwise, returns false
-    bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); }
+    bool raiseSignal() { return blocked() && !popSignal.exchange(true); }
 
     /// marks sent reader notification as received (also removes pop blocking)
-    void clearSignal() { unblock(); popSignal.swap_if(1,0); }
+    void clearSignal() { unblock(); popSignal.store(false); }
 
 private:
-    AtomicWord popBlocked; ///< whether the reader is blocked on pop()
-    AtomicWord popSignal; ///< whether writer has sent and reader has not received notification
+    std::atomic<bool> popBlocked; ///< whether the reader is blocked on pop()
+    std::atomic<bool> popSignal; ///< whether writer has sent and reader has not received notification
 
 public:
-    typedef AtomicWord Rate; ///< pop()s per second
+    typedef std::atomic<int> Rate; ///< pop()s per second
     Rate rateLimit; ///< pop()s per second limit if positive
 
     // we need a signed atomic type because balance may get negative
-    typedef AtomicWordT<int> AtomicSignedMsec;
+    typedef std::atomic<int> AtomicSignedMsec;
     typedef AtomicSignedMsec Balance;
     /// how far ahead the reader is compared to a perfect read/sec event rate
     Balance balance;
@@ -68,7 +73,7 @@ public:
     static size_t SharedMemorySize(const int capacity);
 
     const int theCapacity; /// number of readers
-    QueueReader theReaders[]; /// readers
+    Ipc::Mem::FlexibleArray<QueueReader> theReaders; /// readers
 };
 
 /**
@@ -110,14 +115,25 @@ public:
     /// returns true iff the value was set; the value may be stale!
     template<class Value> bool peek(Value &value) const;
 
+    /// prints incoming queue state; suitable for cache manager reports
+    template<class Value> void statIn(std::ostream &, int localProcessId, int remoteProcessId) const;
+    /// prints outgoing queue state; suitable for cache manager reports
+    template<class Value> void statOut(std::ostream &, int localProcessId, int remoteProcessId) const;
+
 private:
+    void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const;
+    void statClose(std::ostream &) const;
+    template<class Value> void statSamples(std::ostream &, unsigned int start, uint32_t size) const;
+    template<class Value> void statRange(std::ostream &, unsigned int start, uint32_t n) const;
 
-    unsigned int theIn; ///< input index, used only in push()
-    unsigned int theOut; ///< output index, used only in pop()
+    // optimization: these non-std::atomic data members are in shared memory,
+    // but each is used only by one process (aside from obscured reporting)
+    unsigned int theIn; ///< current push() position; reporting aside, used only in push()
+    unsigned int theOut; ///< current pop() position; reporting aside, used only in pop()/peek()
 
-    AtomicWord theSize; ///< number of items in the queue
+    std::atomic<uint32_t> theSize; ///< number of items in the queue
     const unsigned int theMaxItemSize; ///< maximum item size
-    const int theCapacity; ///< maximum number of items, i.e. theBuffer size
+    const uint32_t theCapacity; ///< maximum number of items, i.e. theBuffer size
 
     char theBuffer[];
 };
@@ -141,6 +157,74 @@ public:
     const int theCapacity; /// number of OneToOneUniQueues
 };
 
+/**
+ * Base class for lockless fixed-capacity bidirectional queues for a
+ * limited number processes.
+ */
+class BaseMultiQueue
+{
+public:
+    BaseMultiQueue(const int aLocalProcessId);
+    virtual ~BaseMultiQueue() {}
+
+    /// clears the reader notification received by the local process from the remote process
+    void clearReaderSignal(const int remoteProcessId);
+
+    /// picks a process and calls OneToOneUniQueue::pop() using its queue
+    template <class Value> bool pop(int &remoteProcessId, Value &value);
+
+    /// calls OneToOneUniQueue::push() using the given process queue
+    template <class Value> bool push(const int remoteProcessId, const Value &value);
+
+    /// peeks at the item likely to be pop()ed next
+    template<class Value> bool peek(int &remoteProcessId, Value &value) const;
+
+    /// prints current state; suitable for cache manager reports
+    template<class Value> void stat(std::ostream &) const;
+
+    /// returns local reader's balance
+    QueueReader::Balance &localBalance() { return localReader().balance; }
+
+    /// returns reader's balance for a given remote process
+    const QueueReader::Balance &balance(const int remoteProcessId) const;
+
+    /// returns local reader's rate limit
+    QueueReader::Rate &localRateLimit() { return localReader().rateLimit; }
+
+    /// returns reader's rate limit for a given remote process
+    const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
+
+    /// number of items in incoming queue from a given remote process
+    int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
+
+    /// number of items in outgoing queue to a given remote process
+    int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
+
+protected:
+    /// incoming queue from a given remote process
+    virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const = 0;
+    OneToOneUniQueue &inQueue(const int remoteProcessId);
+
+    /// outgoing queue to a given remote process
+    virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const = 0;
+    OneToOneUniQueue &outQueue(const int remoteProcessId);
+
+    virtual const QueueReader &localReader() const = 0;
+    QueueReader &localReader();
+
+    virtual const QueueReader &remoteReader(const int remoteProcessId) const = 0;
+    QueueReader &remoteReader(const int remoteProcessId);
+
+    virtual int remotesCount() const = 0;
+    virtual int remotesIdOffset() const = 0;
+
+protected:
+    const int theLocalProcessId; ///< process ID of this queue
+
+private:
+    int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
+};
+
 /**
  * Lockless fixed-capacity bidirectional queue for a limited number
  * processes. Allows communication between two groups of processes:
@@ -149,7 +233,7 @@ public:
  * communicate. Process in each group has a unique integer ID in
  * [groupIdOffset, groupIdOffset + groupSize) range.
  */
-class FewToFewBiQueue
+class FewToFewBiQueue: public BaseMultiQueue
 {
 public:
     typedef OneToOneUniQueue::Full Full;
@@ -186,41 +270,28 @@ public:
     enum Group { groupA = 0, groupB = 1 };
     FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId);
 
-    Group localGroup() const { return theLocalGroup; }
-    Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
+    /// maximum number of items in the queue
+    static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity);
 
-    /// clears the reader notification received by the local process from the remote process
-    void clearReaderSignal(const int remoteProcessId);
+    /// finds the oldest item in incoming and outgoing queues between
+    /// us and the given remote process
+    template<class Value> bool findOldest(const int remoteProcessId, Value &value) const;
 
-    /// picks a process and calls OneToOneUniQueue::pop() using its queue
-    template <class Value> bool pop(int &remoteProcessId, Value &value);
-
-    /// calls OneToOneUniQueue::push() using the given process queue
-    template <class Value> bool push(const int remoteProcessId, const Value &value);
-
-    // TODO: rename to findOldest() or some such
-    /// calls OneToOneUniQueue::peek() using the given process queue
-    template<class Value> bool peek(const int remoteProcessId, Value &value) const;
-
-    /// returns true if pop() would have probably succeeded but does not pop()
-    bool popReady() const;
-
-    /// returns local reader's balance
-    QueueReader::Balance &localBalance();
-
-    /// returns local reader's rate limit
-    QueueReader::Rate &localRateLimit();
+protected:
+    virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
+    virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
+    virtual const QueueReader &localReader() const;
+    virtual const QueueReader &remoteReader(const int processId) const;
+    virtual int remotesCount() const;
+    virtual int remotesIdOffset() const;
 
 private:
     bool validProcessId(const Group group, const int processId) const;
     int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
     const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
-    OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId);
-    QueueReader &reader(const Group group, const int processId);
-    const QueueReader &reader(const Group group, const int processId) const;
     int readerIndex(const Group group, const int processId) const;
-    int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; }
-    int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; }
+    Group localGroup() const { return theLocalGroup; }
+    Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
 
 private:
     const Mem::Pointer<Metadata> metadata; ///< shared metadata
@@ -228,10 +299,66 @@ private:
     const Mem::Pointer<QueueReaders> readers; ///< readers array
 
     const Group theLocalGroup; ///< group of this queue
-    const int theLocalProcessId; ///< process ID of this queue
-    int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
 };
 
+/**
+ * Lockless fixed-capacity bidirectional queue for a limited number
+ * processes. Any process may send data to and receive from any other
+ * process (including itself). Each process has a unique integer ID in
+ * [processIdOffset, processIdOffset + processCount) range.
+ */
+class MultiQueue: public BaseMultiQueue
+{
+public:
+    typedef OneToOneUniQueue::Full Full;
+    typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
+
+private:
+    /// Shared metadata for MultiQueue
+    struct Metadata {
+        Metadata(const int aProcessCount, const int aProcessIdOffset);
+        size_t sharedMemorySize() const { return sizeof(*this); }
+        static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata); }
+
+        const int theProcessCount;
+        const int theProcessIdOffset;
+    };
+
+public:
+    class Owner
+    {
+    public:
+        Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
+        ~Owner();
+
+    private:
+        Mem::Owner<Metadata> *const metadataOwner;
+        Mem::Owner<OneToOneUniQueues> *const queuesOwner;
+        Mem::Owner<QueueReaders> *const readersOwner;
+    };
+
+    static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
+
+    MultiQueue(const String &id, const int localProcessId);
+
+protected:
+    virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
+    virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
+    virtual const QueueReader &localReader() const;
+    virtual const QueueReader &remoteReader(const int remoteProcessId) const;
+    virtual int remotesCount() const;
+    virtual int remotesIdOffset() const;
+
+private:
+    bool validProcessId(const int processId) const;
+    const OneToOneUniQueue &oneToOneQueue(const int fromProcessId, const int toProcessId) const;
+    const QueueReader &reader(const int processId) const;
+
+private:
+    const Mem::Pointer<Metadata> metadata; ///< shared metadata
+    const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
+    const Mem::Pointer<QueueReaders> readers; ///< readers array
+};
 
 // OneToOneUniQueue
 
@@ -291,14 +418,98 @@ OneToOneUniQueue::push(const Value &value, QueueReader *const reader)
     if (full())
         throw Full();
 
-    const bool wasEmpty = empty();
     const unsigned int pos = theIn++ % theCapacity * theMaxItemSize;
     memcpy(theBuffer + pos, &value, sizeof(value));
-    ++theSize;
+    const bool wasEmpty = !theSize++;
 
     return wasEmpty && (!reader || reader->raiseSignal());
 }
 
+template <class Value>
+void
+OneToOneUniQueue::statIn(std::ostream &os, const int localProcessId, const int remoteProcessId) const
+{
+    os << "  kid" << localProcessId << " receiving from kid" << remoteProcessId << ": ";
+    // Nobody can modify our theOut so, after capturing some valid theSize value
+    // in count, we can reliably report all [theOut, theOut+count) items that
+    // were queued at theSize capturing time. We will miss new items push()ed by
+    // the other side, but it is OK -- we report state at the capturing time.
+    const auto count = theSize.load();
+    statOpen(os, "other", "popIndex", count);
+    statSamples<Value>(os, theOut, count);
+    statClose(os);
+}
+
+template <class Value>
+void
+OneToOneUniQueue::statOut(std::ostream &os, const int localProcessId, const int remoteProcessId) const
+{
+    os << "  kid" << localProcessId << " sending to kid" << remoteProcessId << ": ";
+    // Nobody can modify our theIn so, after capturing some valid theSize value
+    // in count, we can reliably report all [theIn-count, theIn) items that were
+    // queued at theSize capturing time. We may report items already pop()ed by
+    // the other side, but that is OK because pop() does not modify items -- it
+    // only increments theOut.
+    const auto count = theSize.load();
+    statOpen(os, "pushIndex", "other", count);
+    statSamples<Value>(os, theIn - count, count); // unsigned offset underflow OK
+    statClose(os);
+}
+
+/// report a sample of [start, start + size) items
+template <class Value>
+void
+OneToOneUniQueue::statSamples(std::ostream &os, const unsigned int start, const uint32_t count) const
+{
+    if (!count) {
+        os << " ";
+        return;
+    }
+
+    os << ", items: [\n";
+    // report a few leading and trailing items, without repetitions
+    const auto sampleSize = std::min(3U, count); // leading (and max) sample
+    statRange<Value>(os, start, sampleSize);
+    if (sampleSize < count) { // the first sample did not show some items
+        // The `start` offset aside, the first sample reported all items
+        // below the sampleSize offset. The second sample needs to report
+        // the last sampleSize items (i.e. starting at count-sampleSize
+        // offset) except those already reported by the first sample.
+        const auto secondSampleOffset = std::max(sampleSize, count - sampleSize);
+        const auto secondSampleSize = std::min(sampleSize, count - sampleSize);
+
+        // but first we print a sample separator, unless there are no items
+        // between the samples or the separator hides the only unsampled item
+        const auto bothSamples = sampleSize + secondSampleSize;
+        if (bothSamples + 1U == count)
+            statRange<Value>(os, start + sampleSize, 1);
+        else if (count > bothSamples)
+            os << "    # ... " << (count - bothSamples) << " items not shown ...\n";
+
+        statRange<Value>(os, start + secondSampleOffset, secondSampleSize);
+    }
+    os << "  ]";
+}
+
+/// statSamples() helper that reports n items from start
+template <class Value>
+void
+OneToOneUniQueue::statRange(std::ostream &os, const unsigned int start, const uint32_t n) const
+{
+    assert(sizeof(Value) <= theMaxItemSize);
+    auto offset = start;
+    for (uint32_t i = 0; i < n; ++i) {
+        // XXX: Throughout this C++ header, these overflow wrapping tricks work
+        // only because theCapacity currently happens to be a power of 2 (e.g.,
+        // the highest offset (0xF...FFF) % 3 is 0 and so is the next offset).
+        const auto pos = (offset++ % theCapacity) * theMaxItemSize;
+        Value value;
+        memcpy(&value, theBuffer + pos, sizeof(value));
+        os << "    { ";
+        value.stat(os);
+        os << " },\n";
+    }
+}
 
 // OneToOneUniQueues
 
@@ -316,20 +527,18 @@ OneToOneUniQueues::front() const
     return *reinterpret_cast<const OneToOneUniQueue *>(queue);
 }
 
-
-// FewToFewBiQueue
+// BaseMultiQueue
 
 template <class Value>
 bool
-FewToFewBiQueue::pop(int &remoteProcessId, Value &value)
+BaseMultiQueue::pop(int &remoteProcessId, Value &value)
 {
-    // iterate all remote group processes, starting after the one we visited last
-    QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
-    for (int i = 0; i < remoteGroupSize(); ++i) {
-        if (++theLastPopProcessId >= remoteGroupIdOffset() + remoteGroupSize())
-            theLastPopProcessId = remoteGroupIdOffset();
-        OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), theLastPopProcessId, theLocalGroup, theLocalProcessId);
-        if (queue.pop(value, &localReader)) {
+    // iterate all remote processes, starting after the one we visited last
+    for (int i = 0; i < remotesCount(); ++i) {
+        if (++theLastPopProcessId >= remotesIdOffset() + remotesCount())
+            theLastPopProcessId = remotesIdOffset();
+        OneToOneUniQueue &queue = inQueue(theLastPopProcessId);
+        if (queue.pop(value, &localReader())) {
             remoteProcessId = theLastPopProcessId;
             debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
             return true;
@@ -340,34 +549,74 @@ FewToFewBiQueue::pop(int &remoteProcessId, Value &value)
 
 template <class Value>
 bool
-FewToFewBiQueue::push(const int remoteProcessId, const Value &value)
+BaseMultiQueue::push(const int remoteProcessId, const Value &value)
 {
-    OneToOneUniQueue &remoteQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
-    QueueReader &remoteReader = reader(remoteGroup(), remoteProcessId);
+    OneToOneUniQueue &remoteQueue = outQueue(remoteProcessId);
+    QueueReader &reader = remoteReader(remoteProcessId);
     debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
-    return remoteQueue.push(value, &remoteReader);
+    return remoteQueue.push(value, &reader);
 }
 
 template <class Value>
 bool
-FewToFewBiQueue::peek(const int remoteProcessId, Value &value) const
+BaseMultiQueue::peek(int &remoteProcessId, Value &value) const
+{
+    // mimic FewToFewBiQueue::pop() but quit just before popping
+    int popProcessId = theLastPopProcessId; // preserve for future pop()
+    for (int i = 0; i < remotesCount(); ++i) {
+        if (++popProcessId >= remotesIdOffset() + remotesCount())
+            popProcessId = remotesIdOffset();
+        const OneToOneUniQueue &queue = inQueue(popProcessId);
+        if (queue.peek(value)) {
+            remoteProcessId = popProcessId;
+            return true;
+        }
+    }
+    return false; // most likely, no process had anything to pop
+}
+
+template <class Value>
+void
+BaseMultiQueue::stat(std::ostream &os) const
+{
+    for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) {
+        const auto &queue = inQueue(processId);
+        queue.statIn<Value>(os, theLocalProcessId, processId);
+    }
+
+    os << "\n";
+
+    for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) {
+        const auto &queue = outQueue(processId);
+        queue.statOut<Value>(os, theLocalProcessId, processId);
+    }
+}
+
+// FewToFewBiQueue
+
+template <class Value>
+bool
+FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const
 {
     // we may be called before remote process configured its queue end
     if (!validProcessId(remoteGroup(), remoteProcessId))
         return false;
 
     // we need the oldest value, so start with the incoming, them-to-us queue:
-    const OneToOneUniQueue &inQueue = oneToOneQueue(remoteGroup(), remoteProcessId, theLocalGroup, theLocalProcessId);
-    debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " << theLocalProcessId << " at " << inQueue.size());
-    if (inQueue.peek(value))
+    const OneToOneUniQueue &in = inQueue(remoteProcessId);
+    debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " <<
+           theLocalProcessId << " at " << in.size());
+    if (in.peek(value))
         return true;
 
     // if the incoming queue is empty, check the outgoing, us-to-them queue:
-    const OneToOneUniQueue &outQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
-    debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " << remoteProcessId << " at " << outQueue.size());
-    return outQueue.peek(value);
+    const OneToOneUniQueue &out = outQueue(remoteProcessId);
+    debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " <<
+           remoteProcessId << " at " << out.size());
+    return out.peek(value);
 }
 
 } // namespace Ipc
 
 #endif // SQUID_IPC_QUEUE_H
+