/*
- * $Id$
+ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
*
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
*/
#ifndef SQUID_IPC_QUEUE_H
#define SQUID_IPC_QUEUE_H
-#include "Array.h"
#include "base/InstanceId.h"
-#include "ipc/AtomicWord.h"
-#include "ipc/mem/Segment.h"
+#include "Debug.h"
+#include "ipc/mem/FlexibleArray.h"
+#include "ipc/mem/Pointer.h"
#include "util.h"
+#include <algorithm>
+#include <atomic>
+
class String;
+namespace Ipc
+{
+
/// State of the reading end of a queue (i.e., of the code calling pop()).
/// Multiple queues attached to one reader share this state.
-class QueueReader {
+class QueueReader
+{
public:
QueueReader(); // the initial state is "blocked without a signal"
/// whether the reader is waiting for a notification signal
- bool blocked() const { return popBlocked == 1; }
+ bool blocked() const { return popBlocked.load(); }
/// marks the reader as blocked, waiting for a notification signal
- void block() { popBlocked.swap_if(0, 1); }
+ void block() { popBlocked.store(true); }
/// removes the block() effects
- void unblock() { popBlocked.swap_if(1, 0); }
+ void unblock() { popBlocked.store(false); }
/// if reader is blocked and not notified, marks the notification signal
/// as sent and not received, returning true; otherwise, returns false
- bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); }
+ bool raiseSignal() { return blocked() && !popSignal.exchange(true); }
/// marks sent reader notification as received (also removes pop blocking)
- void clearSignal() { unblock(); popSignal.swap_if(1,0); }
+ void clearSignal() { unblock(); popSignal.store(false); }
private:
- AtomicWord popBlocked; ///< whether the reader is blocked on pop()
- AtomicWord popSignal; ///< whether writer has sent and reader has not received notification
+ std::atomic<bool> popBlocked; ///< whether the reader is blocked on pop()
+ std::atomic<bool> popSignal; ///< whether writer has sent and reader has not received notification
public:
+ typedef std::atomic<int> Rate; ///< pop()s per second
+ Rate rateLimit; ///< pop()s per second limit if positive
+
+ // we need a signed atomic type because balance may get negative
+ typedef std::atomic<int> AtomicSignedMsec;
+ typedef AtomicSignedMsec Balance;
+ /// how far ahead the reader is compared to a perfect read/sec event rate
+ Balance balance;
+
/// unique ID for debugging which reader is used (works across processes)
const InstanceId<QueueReader> id;
};
+/// shared array of QueueReaders
+class QueueReaders
+{
+public:
+ QueueReaders(const int aCapacity);
+ size_t sharedMemorySize() const;
+ static size_t SharedMemorySize(const int capacity);
+
+ const int theCapacity; /// number of readers
+ Ipc::Mem::FlexibleArray<QueueReader> theReaders; /// readers
+};
/**
* Lockless fixed-capacity queue for a single writer and a single reader.
* queue is full, the writer will just not push and come back later (with a
* different value). We can add support for blocked writers if needed.
*/
-class OneToOneUniQueue {
+class OneToOneUniQueue
+{
public:
// pop() and push() exceptions; TODO: use TextException instead
class Full {};
class ItemTooLarge {};
- OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity);
- OneToOneUniQueue(const String &id);
+ OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity);
- unsigned int maxItemSize() const { return shared->theMaxItemSize; }
- int size() const { return shared->theSize; }
- int capacity() const { return shared->theCapacity; }
+ unsigned int maxItemSize() const { return theMaxItemSize; }
+ int size() const { return theSize; }
+ int capacity() const { return theCapacity; }
+ int sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); }
- bool empty() const { return !shared->theSize; }
- bool full() const { return shared->theSize == shared->theCapacity; }
+ bool empty() const { return !theSize; }
+ bool full() const { return theSize == theCapacity; }
static int Bytes2Items(const unsigned int maxItemSize, int size);
static int Items2Bytes(const unsigned int maxItemSize, const int size);
/// returns true iff the value was set; [un]blocks the reader as needed
- template<class Value> bool pop(Value &value);
+ template<class Value> bool pop(Value &value, QueueReader *const reader = NULL);
/// returns true iff the caller must notify the reader of the pushed item
- template<class Value> bool push(const Value &value);
+ template<class Value> bool push(const Value &value, QueueReader *const reader = NULL);
- QueueReader &reader();
- void reader(QueueReader *aReader);
+ /// returns true iff the value was set; the value may be stale!
+ template<class Value> bool peek(Value &value) const;
+
+ /// prints incoming queue state; suitable for cache manager reports
+ template<class Value> void statIn(std::ostream &, int localProcessId, int remoteProcessId) const;
+ /// prints outgoing queue state; suitable for cache manager reports
+ template<class Value> void statOut(std::ostream &, int localProcessId, int remoteProcessId) const;
private:
- struct Shared {
- Shared(const unsigned int aMaxItemSize, const int aCapacity);
+ void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const;
+ void statClose(std::ostream &) const;
+ template<class Value> void statSamples(std::ostream &, unsigned int start, uint32_t size) const;
+ template<class Value> void statRange(std::ostream &, unsigned int start, uint32_t n) const;
- unsigned int theIn; ///< input index, used only in push()
- unsigned int theOut; ///< output index, used only in pop()
+ // optimization: these non-std::atomic data members are in shared memory,
+ // but each is used only by one process (aside from obscured reporting)
+ unsigned int theIn; ///< current push() position; reporting aside, used only in push()
+ unsigned int theOut; ///< current pop() position; reporting aside, used only in pop()/peek()
- AtomicWord theSize; ///< number of items in the queue
- const unsigned int theMaxItemSize; ///< maximum item size
- const int theCapacity; ///< maximum number of items, i.e. theBuffer size
+ std::atomic<uint32_t> theSize; ///< number of items in the queue
+ const unsigned int theMaxItemSize; ///< maximum item size
+ const uint32_t theCapacity; ///< maximum number of items, i.e. theBuffer size
- char theBuffer[];
- };
+ char theBuffer[];
+};
+
+/// shared array of OneToOneUniQueues
+class OneToOneUniQueues
+{
+public:
+ OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity);
- Ipc::Mem::Segment shm; ///< shared memory segment
- Shared *shared; ///< pointer to shared memory
- QueueReader *reader_; ///< the state of the code popping from this queue
+ size_t sharedMemorySize() const;
+ static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity);
+
+ const OneToOneUniQueue &operator [](const int index) const;
+ inline OneToOneUniQueue &operator [](const int index);
+
+private:
+ inline const OneToOneUniQueue &front() const;
+
+public:
+ const int theCapacity; /// number of OneToOneUniQueues
};
-/// Lockless fixed-capacity bidirectional queue for two processes.
-class OneToOneBiQueue {
+/**
+ * Base class for lockless fixed-capacity bidirectional queues for a
+ * limited number processes.
+ */
+class BaseMultiQueue
+{
public:
- typedef OneToOneUniQueue::Full Full;
- typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
+ BaseMultiQueue(const int aLocalProcessId);
+ virtual ~BaseMultiQueue() {}
+
+ /// clears the reader notification received by the local process from the remote process
+ void clearReaderSignal(const int remoteProcessId);
+
+ /// picks a process and calls OneToOneUniQueue::pop() using its queue
+ template <class Value> bool pop(int &remoteProcessId, Value &value);
+
+ /// calls OneToOneUniQueue::push() using the given process queue
+ template <class Value> bool push(const int remoteProcessId, const Value &value);
+
+ /// peeks at the item likely to be pop()ed next
+ template<class Value> bool peek(int &remoteProcessId, Value &value) const;
+
+ /// prints current state; suitable for cache manager reports
+ template<class Value> void stat(std::ostream &) const;
+
+ /// returns local reader's balance
+ QueueReader::Balance &localBalance() { return localReader().balance; }
+
+ /// returns reader's balance for a given remote process
+ const QueueReader::Balance &balance(const int remoteProcessId) const;
- /// Create a new shared queue.
- OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity);
- OneToOneBiQueue(const String &id); ///< Attach to existing shared queue.
+ /// returns local reader's rate limit
+ QueueReader::Rate &localRateLimit() { return localReader().rateLimit; }
- void readers(QueueReader *r1, QueueReader *r2);
- void clearReaderSignal();
+ /// returns reader's rate limit for a given remote process
+ const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
- /* wrappers to call the right OneToOneUniQueue method for this process */
- template<class Value> bool pop(Value &value) { return popQueue->pop(value); }
- template<class Value> bool push(const Value &value) { return pushQueue->push(value); }
+ /// number of items in incoming queue from a given remote process
+ int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
-//private:
- OneToOneUniQueue *const popQueue; ///< queue to pop from for this process
- OneToOneUniQueue *const pushQueue; ///< queue to push to for this process
+ /// number of items in outgoing queue to a given remote process
+ int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
+
+protected:
+ /// incoming queue from a given remote process
+ virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const = 0;
+ OneToOneUniQueue &inQueue(const int remoteProcessId);
+
+ /// outgoing queue to a given remote process
+ virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const = 0;
+ OneToOneUniQueue &outQueue(const int remoteProcessId);
+
+ virtual const QueueReader &localReader() const = 0;
+ QueueReader &localReader();
+
+ virtual const QueueReader &remoteReader(const int remoteProcessId) const = 0;
+ QueueReader &remoteReader(const int remoteProcessId);
+
+ virtual int remotesCount() const = 0;
+ virtual int remotesIdOffset() const = 0;
+
+protected:
+ const int theLocalProcessId; ///< process ID of this queue
+
+private:
+ int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
};
/**
* Lockless fixed-capacity bidirectional queue for a limited number
- * pricesses. Implements a star topology: Many worker processes
- * communicate with the one central process. The central process uses
- * FewToOneBiQueue object, while workers use OneToOneBiQueue objects
- * created with the Attach() method. Each worker has a unique integer
- * ID in [1, workerCount] range.
+ * processes. Allows communication between two groups of processes:
+ * any process in one group may send data to and receive from any
+ * process in another group, but processes in the same group can not
+ * communicate. Process in each group has a unique integer ID in
+ * [groupIdOffset, groupIdOffset + groupSize) range.
*/
-class FewToOneBiQueue {
+class FewToFewBiQueue: public BaseMultiQueue
+{
public:
- typedef OneToOneBiQueue::Full Full;
- typedef OneToOneBiQueue::ItemTooLarge ItemTooLarge;
+ typedef OneToOneUniQueue::Full Full;
+ typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
+
+private:
+ /// Shared metadata for FewToFewBiQueue
+ struct Metadata {
+ Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset);
+ size_t sharedMemorySize() const { return sizeof(*this); }
+ static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); }
+
+ const int theGroupASize;
+ const int theGroupAIdOffset;
+ const int theGroupBSize;
+ const int theGroupBIdOffset;
+ };
+
+public:
+ class Owner
+ {
+ public:
+ Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
+ ~Owner();
+
+ private:
+ Mem::Owner<Metadata> *const metadataOwner;
+ Mem::Owner<OneToOneUniQueues> *const queuesOwner;
+ Mem::Owner<QueueReaders> *const readersOwner;
+ };
- FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity);
- static OneToOneBiQueue *Attach(const String &id, const int workerId);
- ~FewToOneBiQueue();
+ static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
- bool validWorkerId(const int workerId) const;
- int workerCount() const { return theWorkerCount; }
+ enum Group { groupA = 0, groupB = 1 };
+ FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId);
- /// clears the reader notification received by the disker from worker
- void clearReaderSignal(int workerId);
+ /// maximum number of items in the queue
+ static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity);
- /// picks a worker and calls OneToOneUniQueue::pop() using its queue
- template <class Value> bool pop(int &workerId, Value &value);
+ /// finds the oldest item in incoming and outgoing queues between
+ /// us and the given remote process
+ template<class Value> bool findOldest(const int remoteProcessId, Value &value) const;
- /// calls OneToOneUniQueue::push() using the given worker queue
- template <class Value> bool push(const int workerId, const Value &value);
+protected:
+ virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
+ virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
+ virtual const QueueReader &localReader() const;
+ virtual const QueueReader &remoteReader(const int processId) const;
+ virtual int remotesCount() const;
+ virtual int remotesIdOffset() const;
-//private: XXX: make private by moving pop/push debugging into pop/push
- int theLastPopWorker; ///< the ID of the last worker we tried to pop() from
- Vector<OneToOneBiQueue *> biQueues; ///< worker queues indexed by worker ID
- const int theWorkerCount; ///< the total number of workers
+private:
+ bool validProcessId(const Group group, const int processId) const;
+ int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
+ const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
+ int readerIndex(const Group group, const int processId) const;
+ Group localGroup() const { return theLocalGroup; }
+ Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
- Ipc::Mem::Segment shm; ///< shared memory segment to store the reader
- QueueReader *reader; ///< the state of the code popping from all biQueues
+private:
+ const Mem::Pointer<Metadata> metadata; ///< shared metadata
+ const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
+ const Mem::Pointer<QueueReaders> readers; ///< readers array
- enum { WorkerIdOffset = 1 }; ///< worker ID offset, always 1 for now
+ const Group theLocalGroup; ///< group of this queue
};
+/**
+ * Lockless fixed-capacity bidirectional queue for a limited number
+ * processes. Any process may send data to and receive from any other
+ * process (including itself). Each process has a unique integer ID in
+ * [processIdOffset, processIdOffset + processCount) range.
+ */
+class MultiQueue: public BaseMultiQueue
+{
+public:
+ typedef OneToOneUniQueue::Full Full;
+ typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
+
+private:
+ /// Shared metadata for MultiQueue
+ struct Metadata {
+ Metadata(const int aProcessCount, const int aProcessIdOffset);
+ size_t sharedMemorySize() const { return sizeof(*this); }
+ static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata); }
+
+ const int theProcessCount;
+ const int theProcessIdOffset;
+ };
+
+public:
+ class Owner
+ {
+ public:
+ Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
+ ~Owner();
+
+ private:
+ Mem::Owner<Metadata> *const metadataOwner;
+ Mem::Owner<OneToOneUniQueues> *const queuesOwner;
+ Mem::Owner<QueueReaders> *const readersOwner;
+ };
+
+ static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
+
+ MultiQueue(const String &id, const int localProcessId);
+
+protected:
+ virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
+ virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
+ virtual const QueueReader &localReader() const;
+ virtual const QueueReader &remoteReader(const int remoteProcessId) const;
+ virtual int remotesCount() const;
+ virtual int remotesIdOffset() const;
+
+private:
+ bool validProcessId(const int processId) const;
+ const OneToOneUniQueue &oneToOneQueue(const int fromProcessId, const int toProcessId) const;
+ const QueueReader &reader(const int processId) const;
+
+private:
+ const Mem::Pointer<Metadata> metadata; ///< shared metadata
+ const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
+ const Mem::Pointer<QueueReaders> readers; ///< readers array
+};
// OneToOneUniQueue
template <class Value>
bool
-OneToOneUniQueue::pop(Value &value)
+OneToOneUniQueue::pop(Value &value, QueueReader *const reader)
{
- if (sizeof(value) > shared->theMaxItemSize)
+ if (sizeof(value) > theMaxItemSize)
throw ItemTooLarge();
// A writer might push between the empty test and block() below, so we do
// not return false right after calling block(), but test again.
if (empty()) {
- reader().block();
+ if (!reader)
+ return false;
+
+ reader->block();
// A writer might push between the empty test and block() below,
// so we must test again as such a writer will not signal us.
if (empty())
return false;
}
- reader().unblock();
- const unsigned int pos =
- (shared->theOut++ % shared->theCapacity) * shared->theMaxItemSize;
- memcpy(&value, shared->theBuffer + pos, sizeof(value));
- --shared->theSize;
+ if (reader)
+ reader->unblock();
+
+ const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize;
+ memcpy(&value, theBuffer + pos, sizeof(value));
+ --theSize;
+
+ return true;
+}
+
+template <class Value>
+bool
+OneToOneUniQueue::peek(Value &value) const
+{
+ if (sizeof(value) > theMaxItemSize)
+ throw ItemTooLarge();
+
+ if (empty())
+ return false;
+ // the reader may pop() before we copy; making this method imprecise
+ const unsigned int pos = (theOut % theCapacity) * theMaxItemSize;
+ memcpy(&value, theBuffer + pos, sizeof(value));
return true;
}
template <class Value>
bool
-OneToOneUniQueue::push(const Value &value)
+OneToOneUniQueue::push(const Value &value, QueueReader *const reader)
{
- if (sizeof(value) > shared->theMaxItemSize)
+ if (sizeof(value) > theMaxItemSize)
throw ItemTooLarge();
if (full())
throw Full();
- const bool wasEmpty = empty();
- const unsigned int pos =
- shared->theIn++ % shared->theCapacity * shared->theMaxItemSize;
- memcpy(shared->theBuffer + pos, &value, sizeof(value));
- ++shared->theSize;
+ const unsigned int pos = theIn++ % theCapacity * theMaxItemSize;
+ memcpy(theBuffer + pos, &value, sizeof(value));
+ const bool wasEmpty = !theSize++;
+
+ return wasEmpty && (!reader || reader->raiseSignal());
+}
+
+template <class Value>
+void
+OneToOneUniQueue::statIn(std::ostream &os, const int localProcessId, const int remoteProcessId) const
+{
+ os << " kid" << localProcessId << " receiving from kid" << remoteProcessId << ": ";
+ // Nobody can modify our theOut so, after capturing some valid theSize value
+ // in count, we can reliably report all [theOut, theOut+count) items that
+ // were queued at theSize capturing time. We will miss new items push()ed by
+ // the other side, but it is OK -- we report state at the capturing time.
+ const auto count = theSize.load();
+ statOpen(os, "other", "popIndex", count);
+ statSamples<Value>(os, theOut, count);
+ statClose(os);
+}
+
+template <class Value>
+void
+OneToOneUniQueue::statOut(std::ostream &os, const int localProcessId, const int remoteProcessId) const
+{
+ os << " kid" << localProcessId << " sending to kid" << remoteProcessId << ": ";
+ // Nobody can modify our theIn so, after capturing some valid theSize value
+ // in count, we can reliably report all [theIn-count, theIn) items that were
+ // queued at theSize capturing time. We may report items already pop()ed by
+ // the other side, but that is OK because pop() does not modify items -- it
+ // only increments theOut.
+ const auto count = theSize.load();
+ statOpen(os, "pushIndex", "other", count);
+ statSamples<Value>(os, theIn - count, count); // unsigned offset underflow OK
+ statClose(os);
+}
+
+/// report a sample of [start, start + size) items
+template <class Value>
+void
+OneToOneUniQueue::statSamples(std::ostream &os, const unsigned int start, const uint32_t count) const
+{
+ if (!count) {
+ os << " ";
+ return;
+ }
+
+ os << ", items: [\n";
+ // report a few leading and trailing items, without repetitions
+ const auto sampleSize = std::min(3U, count); // leading (and max) sample
+ statRange<Value>(os, start, sampleSize);
+ if (sampleSize < count) { // the first sample did not show some items
+ // The `start` offset aside, the first sample reported all items
+ // below the sampleSize offset. The second sample needs to report
+ // the last sampleSize items (i.e. starting at count-sampleSize
+ // offset) except those already reported by the first sample.
+ const auto secondSampleOffset = std::max(sampleSize, count - sampleSize);
+ const auto secondSampleSize = std::min(sampleSize, count - sampleSize);
+
+ // but first we print a sample separator, unless there are no items
+ // between the samples or the separator hides the only unsampled item
+ const auto bothSamples = sampleSize + secondSampleSize;
+ if (bothSamples + 1U == count)
+ statRange<Value>(os, start + sampleSize, 1);
+ else if (count > bothSamples)
+ os << " # ... " << (count - bothSamples) << " items not shown ...\n";
+
+ statRange<Value>(os, start + secondSampleOffset, secondSampleSize);
+ }
+ os << " ]";
+}
+
+/// statSamples() helper that reports n items from start
+template <class Value>
+void
+OneToOneUniQueue::statRange(std::ostream &os, const unsigned int start, const uint32_t n) const
+{
+ assert(sizeof(Value) <= theMaxItemSize);
+ auto offset = start;
+ for (uint32_t i = 0; i < n; ++i) {
+ // XXX: Throughout this C++ header, these overflow wrapping tricks work
+ // only because theCapacity currently happens to be a power of 2 (e.g.,
+ // the highest offset (0xF...FFF) % 3 is 0 and so is the next offset).
+ const auto pos = (offset++ % theCapacity) * theMaxItemSize;
+ Value value;
+ memcpy(&value, theBuffer + pos, sizeof(value));
+ os << " { ";
+ value.stat(os);
+ os << " },\n";
+ }
+}
+
+// OneToOneUniQueues
- return wasEmpty && reader().raiseSignal();
+inline OneToOneUniQueue &
+OneToOneUniQueues::operator [](const int index)
+{
+ return const_cast<OneToOneUniQueue &>((*const_cast<const OneToOneUniQueues *>(this))[index]);
}
+inline const OneToOneUniQueue &
+OneToOneUniQueues::front() const
+{
+ const char *const queue =
+ reinterpret_cast<const char *>(this) + sizeof(*this);
+ return *reinterpret_cast<const OneToOneUniQueue *>(queue);
+}
+
+// BaseMultiQueue
+
+template <class Value>
+bool
+BaseMultiQueue::pop(int &remoteProcessId, Value &value)
+{
+ // iterate all remote processes, starting after the one we visited last
+ for (int i = 0; i < remotesCount(); ++i) {
+ if (++theLastPopProcessId >= remotesIdOffset() + remotesCount())
+ theLastPopProcessId = remotesIdOffset();
+ OneToOneUniQueue &queue = inQueue(theLastPopProcessId);
+ if (queue.pop(value, &localReader())) {
+ remoteProcessId = theLastPopProcessId;
+ debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
+ return true;
+ }
+ }
+ return false; // no process had anything to pop
+}
-// FewToOneBiQueue
+template <class Value>
+bool
+BaseMultiQueue::push(const int remoteProcessId, const Value &value)
+{
+ OneToOneUniQueue &remoteQueue = outQueue(remoteProcessId);
+ QueueReader &reader = remoteReader(remoteProcessId);
+ debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
+ return remoteQueue.push(value, &reader);
+}
template <class Value>
bool
-FewToOneBiQueue::pop(int &workerId, Value &value)
+BaseMultiQueue::peek(int &remoteProcessId, Value &value) const
{
- // iterate all workers, starting after the one we visited last
- for (int i = 0; i < theWorkerCount; ++i) {
- theLastPopWorker = (theLastPopWorker + 1) % theWorkerCount;
- if (biQueues[theLastPopWorker]->pop(value)) {
- workerId = theLastPopWorker + WorkerIdOffset;
+ // mimic FewToFewBiQueue::pop() but quit just before popping
+ int popProcessId = theLastPopProcessId; // preserve for future pop()
+ for (int i = 0; i < remotesCount(); ++i) {
+ if (++popProcessId >= remotesIdOffset() + remotesCount())
+ popProcessId = remotesIdOffset();
+ const OneToOneUniQueue &queue = inQueue(popProcessId);
+ if (queue.peek(value)) {
+ remoteProcessId = popProcessId;
return true;
}
}
- return false; // no worker had anything to pop
+ return false; // most likely, no process had anything to pop
}
+template <class Value>
+void
+BaseMultiQueue::stat(std::ostream &os) const
+{
+ for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) {
+ const auto &queue = inQueue(processId);
+ queue.statIn<Value>(os, theLocalProcessId, processId);
+ }
+
+ os << "\n";
+
+ for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) {
+ const auto &queue = outQueue(processId);
+ queue.statOut<Value>(os, theLocalProcessId, processId);
+ }
+}
+
+// FewToFewBiQueue
+
template <class Value>
bool
-FewToOneBiQueue::push(const int workerId, const Value &value)
+FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const
{
- assert(validWorkerId(workerId));
- return biQueues[workerId - WorkerIdOffset]->push(value);
+ // we may be called before remote process configured its queue end
+ if (!validProcessId(remoteGroup(), remoteProcessId))
+ return false;
+
+ // we need the oldest value, so start with the incoming, them-to-us queue:
+ const OneToOneUniQueue &in = inQueue(remoteProcessId);
+ debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " <<
+ theLocalProcessId << " at " << in.size());
+ if (in.peek(value))
+ return true;
+
+ // if the incoming queue is empty, check the outgoing, us-to-them queue:
+ const OneToOneUniQueue &out = outQueue(remoteProcessId);
+ debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " <<
+ remoteProcessId << " at " << out.size());
+ return out.peek(value);
}
+} // namespace Ipc
+
#endif // SQUID_IPC_QUEUE_H
+