]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/ipc/Queue.cc
Source Format Enforcement (#763)
[thirdparty/squid.git] / src / ipc / Queue.cc
index 2492cf7457096448da1aed714bd9ebc352c47e17..1fffc29908ff66b38cf022cc1f41ad35d4df7509 100644 (file)
 /*
- * $Id$
- *
- * DEBUG: section 54    Interprocess Communication
+ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
  *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
-#include "config.h"
+/* DEBUG: section 54    Interprocess Communication */
+
+#include "squid.h"
 #include "base/TextException.h"
 #include "Debug.h"
 #include "globals.h"
 #include "ipc/Queue.h"
 
-/// constructs shared segment ID from parent queue ID and child queue index
+#include <limits>
+
+/// constructs Metadata ID from parent queue ID
 static String
-QueueId(String id, const int idx)
+MetadataId(String id)
 {
-    id.append("__");
-    id.append(xitoa(idx));
+    id.append("__metadata");
     return id;
 }
 
-/// constructs QueueReader ID from parent queue ID
+/// constructs one-to-one queues ID from parent queue ID
 static String
-ReaderId(String id)
+QueuesId(String id)
 {
-    id.append("__readers");
+    id.append("__queues");
     return id;
 }
 
+/// constructs QueueReaders ID from parent queue ID
+static String
+ReadersId(String id)
+{
+    id.append("__readers");
+    return id;
+}
 
 /* QueueReader */
 
-InstanceIdDefinitions(QueueReader, "ipcQR");
+InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
 
-QueueReader::QueueReader(): popBlocked(1), popSignal(0)
+Ipc::QueueReader::QueueReader(): popBlocked(true), popSignal(false),
+    rateLimit(0), balance(0)
 {
     debugs(54, 7, HERE << "constructed " << id);
 }
 
+/* QueueReaders */
 
-// OneToOneUniQueue
+Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity),
+    theReaders(theCapacity)
+{
+    Must(theCapacity > 0);
+}
 
-OneToOneUniQueue::OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity):
-    shm(id.termedBuf()), reader_(NULL)
+size_t
+Ipc::QueueReaders::sharedMemorySize() const
 {
-    const int sharedSize = Items2Bytes(maxItemSize, capacity);
-    shm.create(sharedSize);
-    shared = new (shm.reserve(sharedSize)) Shared(maxItemSize, capacity);
+    return SharedMemorySize(theCapacity);
 }
 
-OneToOneUniQueue::OneToOneUniQueue(const String &id): shm(id.termedBuf()),
-    reader_(NULL)
+size_t
+Ipc::QueueReaders::SharedMemorySize(const int capacity)
 {
-    shm.open();
-    shared = reinterpret_cast<Shared *>(shm.mem());
-    assert(shared);
-    const int sharedSize =
-        Items2Bytes(shared->theMaxItemSize, shared->theCapacity);
-    assert(shared == reinterpret_cast<Shared *>(shm.reserve(sharedSize)));
+    return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
 }
 
-void
-OneToOneUniQueue::reader(QueueReader *aReader)
+// OneToOneUniQueue
+
+Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
+    theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
+    theCapacity(aCapacity)
 {
-    Must(!reader_ && aReader);
-    reader_ = aReader;
+    Must(theMaxItemSize > 0);
+    Must(theCapacity > 0);
 }
 
 int
-OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
+Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
 {
     assert(maxItemSize > 0);
-    size -= sizeof(Shared);
+    size -= sizeof(OneToOneUniQueue);
     return size >= 0 ? size / maxItemSize : 0;
 }
 
 int
-OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
+Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
 {
     assert(size >= 0);
-    return sizeof(Shared) + maxItemSize * size;
+    return sizeof(OneToOneUniQueue) + maxItemSize * size;
 }
 
-OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize, const int aCapacity):
-    theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
-    theCapacity(aCapacity)
+/// start state reporting (by reporting queue parameters)
+/// The labels reflect whether the caller owns theIn or theOut data member and,
+/// hence, cannot report the other value reliably.
+void
+Ipc::OneToOneUniQueue::statOpen(std::ostream &os, const char *inLabel, const char *outLabel, const uint32_t count) const
 {
+    os << "{ size: " << count <<
+       ", capacity: " << theCapacity <<
+       ", " << inLabel << ": " << theIn <<
+       ", " << outLabel << ": " << theOut;
 }
 
-QueueReader &
-OneToOneUniQueue::reader()
+/// end state reporting started by statOpen()
+void
+Ipc::OneToOneUniQueue::statClose(std::ostream &os) const
 {
-    Must(reader_);
-    return *reader_;
+    os << "}\n";
 }
 
+/* OneToOneUniQueues */
 
-// OneToOneBiQueue
+Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
+{
+    Must(theCapacity > 0);
+    for (int i = 0; i < theCapacity; ++i)
+        new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
+}
 
-OneToOneBiQueue::OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity):
-    popQueue(new OneToOneUniQueue(QueueId(id, 1), maxItemSize, capacity)),
-    pushQueue(new OneToOneUniQueue(QueueId(id, 2), maxItemSize, capacity))
+size_t
+Ipc::OneToOneUniQueues::sharedMemorySize() const
 {
+    return sizeof(*this) + theCapacity * front().sharedMemorySize();
 }
 
-OneToOneBiQueue::OneToOneBiQueue(const String &id):
-    popQueue(new OneToOneUniQueue(QueueId(id, 2))),
-    pushQueue(new OneToOneUniQueue(QueueId(id, 1)))
+size_t
+Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
 {
+    const int queueSize =
+        OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
+    return sizeof(OneToOneUniQueues) + queueSize * capacity;
 }
 
-void
-OneToOneBiQueue::readers(QueueReader *r1, QueueReader *r2)
+const Ipc::OneToOneUniQueue &
+Ipc::OneToOneUniQueues::operator [](const int index) const
+{
+    Must(0 <= index && index < theCapacity);
+    const size_t queueSize = index ? front().sharedMemorySize() : 0;
+    const char *const queue =
+        reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
+    return *reinterpret_cast<const OneToOneUniQueue *>(queue);
+}
+
+// BaseMultiQueue
+
+Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId):
+    theLocalProcessId(aLocalProcessId),
+    theLastPopProcessId(std::numeric_limits<int>::max() - 1)
 {
-    popQueue->reader(r1);
-    pushQueue->reader(r2);
 }
 
 void
-OneToOneBiQueue::clearReaderSignal()
+Ipc::BaseMultiQueue::clearReaderSignal(const int /*remoteProcessId*/)
 {
-    debugs(54, 7, HERE << "reader: " << &popQueue->reader());
-    popQueue->reader().clearSignal();
+    QueueReader &reader = localReader();
+    debugs(54, 7, "reader: " << reader.id);
+
+    reader.clearSignal();
+
+    // we got a hint; we could reposition iteration to try popping from the
+    // remoteProcessId queue first; but it does not seem to help much and might
+    // introduce some bias so we do not do that for now:
+    // theLastPopProcessId = remoteProcessId;
 }
 
+const Ipc::QueueReader::Balance &
+Ipc::BaseMultiQueue::balance(const int remoteProcessId) const
+{
+    const QueueReader &r = remoteReader(remoteProcessId);
+    return r.balance;
+}
 
-// FewToOneBiQueue
+const Ipc::QueueReader::Rate &
+Ipc::BaseMultiQueue::rateLimit(const int remoteProcessId) const
+{
+    const QueueReader &r = remoteReader(remoteProcessId);
+    return r.rateLimit;
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::BaseMultiQueue::inQueue(const int remoteProcessId)
+{
+    const OneToOneUniQueue &queue =
+        const_cast<const BaseMultiQueue *>(this)->inQueue(remoteProcessId);
+    return const_cast<OneToOneUniQueue &>(queue);
+}
 
-FewToOneBiQueue::FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity):
-    theLastPopWorker(0), theWorkerCount(aWorkerCount),
-    shm(ReaderId(id).termedBuf()),
-    reader(NULL)
+Ipc::OneToOneUniQueue &
+Ipc::BaseMultiQueue::outQueue(const int remoteProcessId)
 {
-    // create a new segment for the local and remote queue readers
-    // TODO: all our queues and readers should use a single segment
-    shm.create((theWorkerCount+1)*sizeof(QueueReader));
-    reader = new (shm.reserve(sizeof(QueueReader))) QueueReader;
-    debugs(54, 7, HERE << "disker " << id << " reader: " << reader->id);
+    const OneToOneUniQueue &queue =
+        const_cast<const BaseMultiQueue *>(this)->outQueue(remoteProcessId);
+    return const_cast<OneToOneUniQueue &>(queue);
+}
+
+Ipc::QueueReader &
+Ipc::BaseMultiQueue::localReader()
+{
+    const QueueReader &reader =
+        const_cast<const BaseMultiQueue *>(this)->localReader();
+    return const_cast<QueueReader &>(reader);
+}
 
-    assert(theWorkerCount >= 0);
-    biQueues.reserve(theWorkerCount);
-    for (int i = 0; i < theWorkerCount; ++i) {
-        OneToOneBiQueue *const biQueue =
-            new OneToOneBiQueue(QueueId(id, i + WorkerIdOffset), maxItemSize, capacity);
-        QueueReader *remoteReader =
-            new (shm.reserve(sizeof(QueueReader))) QueueReader;
-        biQueue->readers(reader, remoteReader);
-        biQueues.push_back(biQueue);
+Ipc::QueueReader &
+Ipc::BaseMultiQueue::remoteReader(const int remoteProcessId)
+{
+    const QueueReader &reader =
+        const_cast<const BaseMultiQueue *>(this)->remoteReader(remoteProcessId);
+    return const_cast<QueueReader &>(reader);
+}
+
+// FewToFewBiQueue
+
+Ipc::FewToFewBiQueue::Owner *
+Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
+{
+    return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
+}
+
+Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
+    BaseMultiQueue(aLocalProcessId),
+    metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
+    queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
+    readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
+    theLocalGroup(aLocalGroup)
+{
+    Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
+    Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
+
+    debugs(54, 7, "queue " << id << " reader: " << localReader().id);
+}
+
+int
+Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
+{
+    return capacity * groupASize * groupBSize * 2;
+}
+
+bool
+Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
+{
+    switch (group) {
+    case groupA:
+        return metadata->theGroupAIdOffset <= processId &&
+               processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
+    case groupB:
+        return metadata->theGroupBIdOffset <= processId &&
+               processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
     }
+    return false;
 }
 
-OneToOneBiQueue *
-FewToOneBiQueue::Attach(const String &id, const int workerId)
+int
+Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
 {
-    // XXX: remove this leak. By refcounting Ipc::Mem::Segments? By creating a global FewToOneBiQueue for each worker?
-    Ipc::Mem::Segment *shmPtr = new Ipc::Mem::Segment(ReaderId(id).termedBuf());
+    Must(fromGroup != toGroup);
+    assert(validProcessId(fromGroup, fromProcessId));
+    assert(validProcessId(toGroup, toProcessId));
+    int index1;
+    int index2;
+    int offset;
+    if (fromGroup == groupA) {
+        index1 = fromProcessId - metadata->theGroupAIdOffset;
+        index2 = toProcessId - metadata->theGroupBIdOffset;
+        offset = 0;
+    } else {
+        index1 = toProcessId - metadata->theGroupAIdOffset;
+        index2 = fromProcessId - metadata->theGroupBIdOffset;
+        offset = metadata->theGroupASize * metadata->theGroupBSize;
+    }
+    const int index = offset + index1 * metadata->theGroupBSize + index2;
+    return index;
+}
 
-    Ipc::Mem::Segment &shm = *shmPtr;
-    shm.open();
-    assert(shm.size() >= static_cast<off_t>((1 + workerId+1 - WorkerIdOffset)*sizeof(QueueReader)));
-    QueueReader *readers = reinterpret_cast<QueueReader*>(shm.mem());
-    QueueReader *remoteReader = &readers[0];
-    debugs(54, 7, HERE << "disker " << id << " reader: " << remoteReader->id);
-    QueueReader *localReader = &readers[workerId+1 - WorkerIdOffset];
-    debugs(54, 7, HERE << "local " << id << " reader: " << localReader->id);
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
+{
+    return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
+}
 
-    OneToOneBiQueue *const biQueue =
-        new OneToOneBiQueue(QueueId(id, workerId));
-    biQueue->readers(localReader, remoteReader);
-    return biQueue;
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
+{
+    return oneToOneQueue(remoteGroup(), remoteProcessId,
+                         theLocalGroup, theLocalProcessId);
 }
 
-FewToOneBiQueue::~FewToOneBiQueue()
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
 {
-    for (int i = 0; i < theWorkerCount; ++i)
-        delete biQueues[i];
+    return oneToOneQueue(theLocalGroup, theLocalProcessId,
+                         remoteGroup(), remoteProcessId);
 }
 
-bool FewToOneBiQueue::validWorkerId(const int workerId) const
+int
+Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
 {
-    return WorkerIdOffset <= workerId &&
-        workerId < WorkerIdOffset + theWorkerCount;
+    Must(validProcessId(group, processId));
+    return group == groupA ?
+           processId - metadata->theGroupAIdOffset :
+           metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
 }
 
-void
-FewToOneBiQueue::clearReaderSignal(int workerId)
+const Ipc::QueueReader &
+Ipc::FewToFewBiQueue::localReader() const
 {
-    debugs(54, 7, HERE << "reader: " << reader->id);
+    return readers->theReaders[readerIndex(theLocalGroup, theLocalProcessId)];
+}
 
-    assert(validWorkerId(workerId));
-    reader->clearSignal();
+const Ipc::QueueReader &
+Ipc::FewToFewBiQueue::remoteReader(const int processId) const
+{
+    return readers->theReaders[readerIndex(remoteGroup(), processId)];
+}
 
-    // we got a hint; we could reposition iteration to try popping from the
-    // workerId queue first; but it does not seem to help much and might
-    // introduce some bias so we do not do that for now:
-    // theLastPopWorker = (workerId + theWorkerCount - 1) % theWorkerCount;
+int
+Ipc::FewToFewBiQueue::remotesCount() const
+{
+    return theLocalGroup == groupA ? metadata->theGroupBSize :
+           metadata->theGroupASize;
+}
+
+int
+Ipc::FewToFewBiQueue::remotesIdOffset() const
+{
+    return theLocalGroup == groupA ? metadata->theGroupBIdOffset :
+           metadata->theGroupAIdOffset;
+}
+
+Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
+    theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
+    theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
+{
+    Must(theGroupASize > 0);
+    Must(theGroupBSize > 0);
+}
+
+Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
+    metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
+    queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
+    readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
+{
+}
+
+Ipc::FewToFewBiQueue::Owner::~Owner()
+{
+    delete metadataOwner;
+    delete queuesOwner;
+    delete readersOwner;
+}
+
+// MultiQueue
+
+Ipc::MultiQueue::Owner *
+Ipc::MultiQueue::Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
+{
+    return new Owner(id, processCount, processIdOffset, maxItemSize, capacity);
+}
+
+Ipc::MultiQueue::MultiQueue(const String &id, const int localProcessId):
+    BaseMultiQueue(localProcessId),
+    metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
+    queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
+    readers(shm_old(QueueReaders)(ReadersId(id).termedBuf()))
+{
+    Must(queues->theCapacity == metadata->theProcessCount * metadata->theProcessCount);
+    Must(readers->theCapacity == metadata->theProcessCount);
+
+    debugs(54, 7, "queue " << id << " reader: " << localReader().id);
+}
+
+bool
+Ipc::MultiQueue::validProcessId(const int processId) const
+{
+    return metadata->theProcessIdOffset <= processId &&
+           processId < metadata->theProcessIdOffset + metadata->theProcessCount;
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::oneToOneQueue(const int fromProcessId, const int toProcessId) const
+{
+    assert(validProcessId(fromProcessId));
+    assert(validProcessId(toProcessId));
+    const int fromIndex = fromProcessId - metadata->theProcessIdOffset;
+    const int toIndex = toProcessId - metadata->theProcessIdOffset;
+    const int index = fromIndex * metadata->theProcessCount + toIndex;
+    return (*queues)[index];
 }
+
+const Ipc::QueueReader &
+Ipc::MultiQueue::reader(const int processId) const
+{
+    assert(validProcessId(processId));
+    const int index = processId - metadata->theProcessIdOffset;
+    return readers->theReaders[index];
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::inQueue(const int remoteProcessId) const
+{
+    return oneToOneQueue(remoteProcessId, theLocalProcessId);
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::outQueue(const int remoteProcessId) const
+{
+    return oneToOneQueue(theLocalProcessId, remoteProcessId);
+}
+
+const Ipc::QueueReader &
+Ipc::MultiQueue::localReader() const
+{
+    return reader(theLocalProcessId);
+}
+
+const Ipc::QueueReader &
+Ipc::MultiQueue::remoteReader(const int processId) const
+{
+    return reader(processId);
+}
+
+int
+Ipc::MultiQueue::remotesCount() const
+{
+    return metadata->theProcessCount;
+}
+
+int
+Ipc::MultiQueue::remotesIdOffset() const
+{
+    return metadata->theProcessIdOffset;
+}
+
+Ipc::MultiQueue::Metadata::Metadata(const int aProcessCount, const int aProcessIdOffset):
+    theProcessCount(aProcessCount), theProcessIdOffset(aProcessIdOffset)
+{
+    Must(theProcessCount > 0);
+}
+
+Ipc::MultiQueue::Owner::Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity):
+    metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), processCount, processIdOffset)),
+    queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), processCount*processCount, maxItemSize, capacity)),
+    readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), processCount))
+{
+}
+
+Ipc::MultiQueue::Owner::~Owner()
+{
+    delete metadataOwner;
+    delete queuesOwner;
+    delete readersOwner;
+}
+