]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/ipc/Queue.cc
Source Format Enforcement (#763)
[thirdparty/squid.git] / src / ipc / Queue.cc
index e86fe37ac31b7417ba7d3bd2b62f3f6ff85e6bf2..1fffc29908ff66b38cf022cc1f41ad35d4df7509 100644 (file)
 /*
- * $Id$
- *
- * DEBUG: section 54    Interprocess Communication
+ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
  *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
-#include "config.h"
+/* DEBUG: section 54    Interprocess Communication */
+
+#include "squid.h"
+#include "base/TextException.h"
+#include "Debug.h"
+#include "globals.h"
 #include "ipc/Queue.h"
 
+#include <limits>
 
+/// constructs Metadata ID from parent queue ID
 static String
-QueueId(String id, const int idx)
+MetadataId(String id)
 {
-    id.append("__");
-    id.append(xitoa(idx));
+    id.append("__metadata");
     return id;
 }
 
+/// constructs one-to-one queues ID from parent queue ID
+static String
+QueuesId(String id)
+{
+    id.append("__queues");
+    return id;
+}
 
-// OneToOneUniQueue
+/// constructs QueueReaders ID from parent queue ID
+static String
+ReadersId(String id)
+{
+    id.append("__readers");
+    return id;
+}
+
+/* QueueReader */
+
+InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
+
+Ipc::QueueReader::QueueReader(): popBlocked(true), popSignal(false),
+    rateLimit(0), balance(0)
+{
+    debugs(54, 7, HERE << "constructed " << id);
+}
+
+/* QueueReaders */
+
+Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity),
+    theReaders(theCapacity)
+{
+    Must(theCapacity > 0);
+}
 
-OneToOneUniQueue::OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity):
-    shm(id.termedBuf())
+size_t
+Ipc::QueueReaders::sharedMemorySize() const
 {
-    shm.create(Items2Bytes(maxItemSize, capacity));
-    assert(shm.mem());
-    shared = new (shm.mem()) Shared(maxItemSize, capacity);
+    return SharedMemorySize(theCapacity);
 }
 
-OneToOneUniQueue::OneToOneUniQueue(const String &id): shm(id.termedBuf())
+size_t
+Ipc::QueueReaders::SharedMemorySize(const int capacity)
 {
-    shm.open();
-    shared = reinterpret_cast<Shared *>(shm.mem());
-    assert(shared);
+    return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
+}
+
+// OneToOneUniQueue
+
+Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
+    theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
+    theCapacity(aCapacity)
+{
+    Must(theMaxItemSize > 0);
+    Must(theCapacity > 0);
 }
 
 int
-OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
+Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
 {
     assert(maxItemSize > 0);
-    size -= sizeof(Shared);
+    size -= sizeof(OneToOneUniQueue);
     return size >= 0 ? size / maxItemSize : 0;
 }
 
 int
-OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
+Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
 {
     assert(size >= 0);
-    return sizeof(Shared) + maxItemSize * size;
+    return sizeof(OneToOneUniQueue) + maxItemSize * size;
 }
 
-OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize, const int aCapacity):
-    theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
-    theCapacity(aCapacity)
+/// start state reporting (by reporting queue parameters)
+/// The labels reflect whether the caller owns theIn or theOut data member and,
+/// hence, cannot report the other value reliably.
+void
+Ipc::OneToOneUniQueue::statOpen(std::ostream &os, const char *inLabel, const char *outLabel, const uint32_t count) const
 {
+    os << "{ size: " << count <<
+       ", capacity: " << theCapacity <<
+       ", " << inLabel << ": " << theIn <<
+       ", " << outLabel << ": " << theOut;
 }
 
+/// end state reporting started by statOpen()
+void
+Ipc::OneToOneUniQueue::statClose(std::ostream &os) const
+{
+    os << "}\n";
+}
 
-// OneToOneBiQueue
+/* OneToOneUniQueues */
 
-OneToOneBiQueue::OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity):
-    popQueue(new OneToOneUniQueue(QueueId(id, 1), maxItemSize, capacity)),
-    pushQueue(new OneToOneUniQueue(QueueId(id, 2), maxItemSize, capacity))
+Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
 {
+    Must(theCapacity > 0);
+    for (int i = 0; i < theCapacity; ++i)
+        new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
 }
 
-OneToOneBiQueue::OneToOneBiQueue(const String &id):
-    popQueue(new OneToOneUniQueue(QueueId(id, 2))),
-    pushQueue(new OneToOneUniQueue(QueueId(id, 1)))
+size_t
+Ipc::OneToOneUniQueues::sharedMemorySize() const
 {
+    return sizeof(*this) + theCapacity * front().sharedMemorySize();
 }
 
+size_t
+Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
+{
+    const int queueSize =
+        OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
+    return sizeof(OneToOneUniQueues) + queueSize * capacity;
+}
 
-// FewToOneBiQueue
+const Ipc::OneToOneUniQueue &
+Ipc::OneToOneUniQueues::operator [](const int index) const
+{
+    Must(0 <= index && index < theCapacity);
+    const size_t queueSize = index ? front().sharedMemorySize() : 0;
+    const char *const queue =
+        reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
+    return *reinterpret_cast<const OneToOneUniQueue *>(queue);
+}
+
+// BaseMultiQueue
+
+Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId):
+    theLocalProcessId(aLocalProcessId),
+    theLastPopProcessId(std::numeric_limits<int>::max() - 1)
+{
+}
 
-FewToOneBiQueue::FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity):
-    theLastPopWorkerId(-1), theWorkerCount(aWorkerCount)
+void
+Ipc::BaseMultiQueue::clearReaderSignal(const int /*remoteProcessId*/)
 {
-    assert(theWorkerCount >= 0);
-    biQueues.reserve(theWorkerCount);
-    for (int i = 0; i < theWorkerCount; ++i) {
-        OneToOneBiQueue *const biQueue =
-            new OneToOneBiQueue(QueueId(id, i), maxItemSize, capacity);
-        biQueues.push_back(biQueue);
+    QueueReader &reader = localReader();
+    debugs(54, 7, "reader: " << reader.id);
+
+    reader.clearSignal();
+
+    // we got a hint; we could reposition iteration to try popping from the
+    // remoteProcessId queue first; but it does not seem to help much and might
+    // introduce some bias so we do not do that for now:
+    // theLastPopProcessId = remoteProcessId;
+}
+
+const Ipc::QueueReader::Balance &
+Ipc::BaseMultiQueue::balance(const int remoteProcessId) const
+{
+    const QueueReader &r = remoteReader(remoteProcessId);
+    return r.balance;
+}
+
+const Ipc::QueueReader::Rate &
+Ipc::BaseMultiQueue::rateLimit(const int remoteProcessId) const
+{
+    const QueueReader &r = remoteReader(remoteProcessId);
+    return r.rateLimit;
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::BaseMultiQueue::inQueue(const int remoteProcessId)
+{
+    const OneToOneUniQueue &queue =
+        const_cast<const BaseMultiQueue *>(this)->inQueue(remoteProcessId);
+    return const_cast<OneToOneUniQueue &>(queue);
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::BaseMultiQueue::outQueue(const int remoteProcessId)
+{
+    const OneToOneUniQueue &queue =
+        const_cast<const BaseMultiQueue *>(this)->outQueue(remoteProcessId);
+    return const_cast<OneToOneUniQueue &>(queue);
+}
+
+Ipc::QueueReader &
+Ipc::BaseMultiQueue::localReader()
+{
+    const QueueReader &reader =
+        const_cast<const BaseMultiQueue *>(this)->localReader();
+    return const_cast<QueueReader &>(reader);
+}
+
+Ipc::QueueReader &
+Ipc::BaseMultiQueue::remoteReader(const int remoteProcessId)
+{
+    const QueueReader &reader =
+        const_cast<const BaseMultiQueue *>(this)->remoteReader(remoteProcessId);
+    return const_cast<QueueReader &>(reader);
+}
+
+// FewToFewBiQueue
+
+Ipc::FewToFewBiQueue::Owner *
+Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
+{
+    return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
+}
+
+Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
+    BaseMultiQueue(aLocalProcessId),
+    metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
+    queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
+    readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
+    theLocalGroup(aLocalGroup)
+{
+    Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
+    Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
+
+    debugs(54, 7, "queue " << id << " reader: " << localReader().id);
+}
+
+int
+Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
+{
+    return capacity * groupASize * groupBSize * 2;
+}
+
+bool
+Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
+{
+    switch (group) {
+    case groupA:
+        return metadata->theGroupAIdOffset <= processId &&
+               processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
+    case groupB:
+        return metadata->theGroupBIdOffset <= processId &&
+               processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
     }
+    return false;
 }
 
-OneToOneBiQueue *
-FewToOneBiQueue::Attach(const String &id, const int workerId)
+int
+Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
 {
-    return new OneToOneBiQueue(QueueId(id, workerId));
+    Must(fromGroup != toGroup);
+    assert(validProcessId(fromGroup, fromProcessId));
+    assert(validProcessId(toGroup, toProcessId));
+    int index1;
+    int index2;
+    int offset;
+    if (fromGroup == groupA) {
+        index1 = fromProcessId - metadata->theGroupAIdOffset;
+        index2 = toProcessId - metadata->theGroupBIdOffset;
+        offset = 0;
+    } else {
+        index1 = toProcessId - metadata->theGroupAIdOffset;
+        index2 = fromProcessId - metadata->theGroupBIdOffset;
+        offset = metadata->theGroupASize * metadata->theGroupBSize;
+    }
+    const int index = offset + index1 * metadata->theGroupBSize + index2;
+    return index;
 }
 
-FewToOneBiQueue::~FewToOneBiQueue()
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
 {
-    for (int i = 0; i < theWorkerCount; ++i)
-        delete biQueues[i];
+    return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
 }
 
-bool FewToOneBiQueue::validWorkerId(const int workerId) const
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
 {
-    return 0 <= workerId && workerId < theWorkerCount;
+    return oneToOneQueue(remoteGroup(), remoteProcessId,
+                         theLocalGroup, theLocalProcessId);
 }
+
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
+{
+    return oneToOneQueue(theLocalGroup, theLocalProcessId,
+                         remoteGroup(), remoteProcessId);
+}
+
+int
+Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
+{
+    Must(validProcessId(group, processId));
+    return group == groupA ?
+           processId - metadata->theGroupAIdOffset :
+           metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
+}
+
+const Ipc::QueueReader &
+Ipc::FewToFewBiQueue::localReader() const
+{
+    return readers->theReaders[readerIndex(theLocalGroup, theLocalProcessId)];
+}
+
+const Ipc::QueueReader &
+Ipc::FewToFewBiQueue::remoteReader(const int processId) const
+{
+    return readers->theReaders[readerIndex(remoteGroup(), processId)];
+}
+
+int
+Ipc::FewToFewBiQueue::remotesCount() const
+{
+    return theLocalGroup == groupA ? metadata->theGroupBSize :
+           metadata->theGroupASize;
+}
+
+int
+Ipc::FewToFewBiQueue::remotesIdOffset() const
+{
+    return theLocalGroup == groupA ? metadata->theGroupBIdOffset :
+           metadata->theGroupAIdOffset;
+}
+
+Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
+    theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
+    theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
+{
+    Must(theGroupASize > 0);
+    Must(theGroupBSize > 0);
+}
+
+Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
+    metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
+    queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
+    readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
+{
+}
+
+Ipc::FewToFewBiQueue::Owner::~Owner()
+{
+    delete metadataOwner;
+    delete queuesOwner;
+    delete readersOwner;
+}
+
+// MultiQueue
+
+Ipc::MultiQueue::Owner *
+Ipc::MultiQueue::Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
+{
+    return new Owner(id, processCount, processIdOffset, maxItemSize, capacity);
+}
+
+Ipc::MultiQueue::MultiQueue(const String &id, const int localProcessId):
+    BaseMultiQueue(localProcessId),
+    metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
+    queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
+    readers(shm_old(QueueReaders)(ReadersId(id).termedBuf()))
+{
+    Must(queues->theCapacity == metadata->theProcessCount * metadata->theProcessCount);
+    Must(readers->theCapacity == metadata->theProcessCount);
+
+    debugs(54, 7, "queue " << id << " reader: " << localReader().id);
+}
+
+bool
+Ipc::MultiQueue::validProcessId(const int processId) const
+{
+    return metadata->theProcessIdOffset <= processId &&
+           processId < metadata->theProcessIdOffset + metadata->theProcessCount;
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::oneToOneQueue(const int fromProcessId, const int toProcessId) const
+{
+    assert(validProcessId(fromProcessId));
+    assert(validProcessId(toProcessId));
+    const int fromIndex = fromProcessId - metadata->theProcessIdOffset;
+    const int toIndex = toProcessId - metadata->theProcessIdOffset;
+    const int index = fromIndex * metadata->theProcessCount + toIndex;
+    return (*queues)[index];
+}
+
+const Ipc::QueueReader &
+Ipc::MultiQueue::reader(const int processId) const
+{
+    assert(validProcessId(processId));
+    const int index = processId - metadata->theProcessIdOffset;
+    return readers->theReaders[index];
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::inQueue(const int remoteProcessId) const
+{
+    return oneToOneQueue(remoteProcessId, theLocalProcessId);
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::outQueue(const int remoteProcessId) const
+{
+    return oneToOneQueue(theLocalProcessId, remoteProcessId);
+}
+
+const Ipc::QueueReader &
+Ipc::MultiQueue::localReader() const
+{
+    return reader(theLocalProcessId);
+}
+
+const Ipc::QueueReader &
+Ipc::MultiQueue::remoteReader(const int processId) const
+{
+    return reader(processId);
+}
+
+int
+Ipc::MultiQueue::remotesCount() const
+{
+    return metadata->theProcessCount;
+}
+
+int
+Ipc::MultiQueue::remotesIdOffset() const
+{
+    return metadata->theProcessIdOffset;
+}
+
+Ipc::MultiQueue::Metadata::Metadata(const int aProcessCount, const int aProcessIdOffset):
+    theProcessCount(aProcessCount), theProcessIdOffset(aProcessIdOffset)
+{
+    Must(theProcessCount > 0);
+}
+
+Ipc::MultiQueue::Owner::Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity):
+    metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), processCount, processIdOffset)),
+    queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), processCount*processCount, maxItemSize, capacity)),
+    readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), processCount))
+{
+}
+
+Ipc::MultiQueue::Owner::~Owner()
+{
+    delete metadataOwner;
+    delete queuesOwner;
+    delete readersOwner;
+}
+