]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/ipc/Queue.cc
Source Format Enforcement (#763)
[thirdparty/squid.git] / src / ipc / Queue.cc
index 4cba4bff26693e787e3040a25964e770c9539bac..1fffc29908ff66b38cf022cc1f41ad35d4df7509 100644 (file)
@@ -1,16 +1,21 @@
 /*
- * $Id$
- *
- * DEBUG: section 54    Interprocess Communication
+ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
  *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
-#include "config.h"
+/* DEBUG: section 54    Interprocess Communication */
+
+#include "squid.h"
 #include "base/TextException.h"
 #include "Debug.h"
 #include "globals.h"
 #include "ipc/Queue.h"
 
+#include <limits>
+
 /// constructs Metadata ID from parent queue ID
 static String
 MetadataId(String id)
@@ -35,23 +40,22 @@ ReadersId(String id)
     return id;
 }
 
-
 /* QueueReader */
 
 InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
 
-Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0),
-        rateLimit(0), balance(0)
+Ipc::QueueReader::QueueReader(): popBlocked(true), popSignal(false),
+    rateLimit(0), balance(0)
 {
     debugs(54, 7, HERE << "constructed " << id);
 }
 
 /* QueueReaders */
 
-Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity)
+Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity),
+    theReaders(theCapacity)
 {
     Must(theCapacity > 0);
-    new (theReaders) QueueReader[theCapacity];
 }
 
 size_t
@@ -66,12 +70,11 @@ Ipc::QueueReaders::SharedMemorySize(const int capacity)
     return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
 }
 
-
 // OneToOneUniQueue
 
 Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
-        theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
-        theCapacity(aCapacity)
+    theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
+    theCapacity(aCapacity)
 {
     Must(theMaxItemSize > 0);
     Must(theCapacity > 0);
@@ -92,6 +95,24 @@ Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int siz
     return sizeof(OneToOneUniQueue) + maxItemSize * size;
 }
 
+/// start state reporting (by reporting queue parameters)
+/// The labels reflect whether the caller owns theIn or theOut data member and,
+/// hence, cannot report the other value reliably.
+void
+Ipc::OneToOneUniQueue::statOpen(std::ostream &os, const char *inLabel, const char *outLabel, const uint32_t count) const
+{
+    os << "{ size: " << count <<
+       ", capacity: " << theCapacity <<
+       ", " << inLabel << ": " << theIn <<
+       ", " << outLabel << ": " << theOut;
+}
+
+/// end state reporting started by statOpen()
+void
+Ipc::OneToOneUniQueue::statClose(std::ostream &os) const
+{
+    os << "}\n";
+}
 
 /* OneToOneUniQueues */
 
@@ -126,6 +147,73 @@ Ipc::OneToOneUniQueues::operator [](const int index) const
     return *reinterpret_cast<const OneToOneUniQueue *>(queue);
 }
 
+// BaseMultiQueue
+
+Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId):
+    theLocalProcessId(aLocalProcessId),
+    theLastPopProcessId(std::numeric_limits<int>::max() - 1)
+{
+}
+
+void
+Ipc::BaseMultiQueue::clearReaderSignal(const int /*remoteProcessId*/)
+{
+    QueueReader &reader = localReader();
+    debugs(54, 7, "reader: " << reader.id);
+
+    reader.clearSignal();
+
+    // we got a hint; we could reposition iteration to try popping from the
+    // remoteProcessId queue first; but it does not seem to help much and might
+    // introduce some bias so we do not do that for now:
+    // theLastPopProcessId = remoteProcessId;
+}
+
+const Ipc::QueueReader::Balance &
+Ipc::BaseMultiQueue::balance(const int remoteProcessId) const
+{
+    const QueueReader &r = remoteReader(remoteProcessId);
+    return r.balance;
+}
+
+const Ipc::QueueReader::Rate &
+Ipc::BaseMultiQueue::rateLimit(const int remoteProcessId) const
+{
+    const QueueReader &r = remoteReader(remoteProcessId);
+    return r.rateLimit;
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::BaseMultiQueue::inQueue(const int remoteProcessId)
+{
+    const OneToOneUniQueue &queue =
+        const_cast<const BaseMultiQueue *>(this)->inQueue(remoteProcessId);
+    return const_cast<OneToOneUniQueue &>(queue);
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::BaseMultiQueue::outQueue(const int remoteProcessId)
+{
+    const OneToOneUniQueue &queue =
+        const_cast<const BaseMultiQueue *>(this)->outQueue(remoteProcessId);
+    return const_cast<OneToOneUniQueue &>(queue);
+}
+
+Ipc::QueueReader &
+Ipc::BaseMultiQueue::localReader()
+{
+    const QueueReader &reader =
+        const_cast<const BaseMultiQueue *>(this)->localReader();
+    return const_cast<QueueReader &>(reader);
+}
+
+Ipc::QueueReader &
+Ipc::BaseMultiQueue::remoteReader(const int remoteProcessId)
+{
+    const QueueReader &reader =
+        const_cast<const BaseMultiQueue *>(this)->remoteReader(remoteProcessId);
+    return const_cast<QueueReader &>(reader);
+}
 
 // FewToFewBiQueue
 
@@ -136,17 +224,16 @@ Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int gro
 }
 
 Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
-        metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
-        queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
-        readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
-        theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
-        theLastPopProcessId(readers->theCapacity)
+    BaseMultiQueue(aLocalProcessId),
+    metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
+    queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
+    readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
+    theLocalGroup(aLocalGroup)
 {
     Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
     Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
 
-    const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
-    debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id);
+    debugs(54, 7, "queue " << id << " reader: " << localReader().id);
 }
 
 int
@@ -191,19 +278,12 @@ Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromPr
     return index;
 }
 
-Ipc::OneToOneUniQueue &
-Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
-{
-    return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
-}
-
 const Ipc::OneToOneUniQueue &
 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
 {
     return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
 }
 
-/// incoming queue from a given remote process
 const Ipc::OneToOneUniQueue &
 Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
 {
@@ -211,7 +291,6 @@ Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
                          theLocalGroup, theLocalProcessId);
 }
 
-/// outgoing queue to a given remote process
 const Ipc::OneToOneUniQueue &
 Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
 {
@@ -228,94 +307,153 @@ Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
            metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
 }
 
-Ipc::QueueReader &
-Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
+const Ipc::QueueReader &
+Ipc::FewToFewBiQueue::localReader() const
 {
-    return readers->theReaders[readerIndex(group, processId)];
+    return readers->theReaders[readerIndex(theLocalGroup, theLocalProcessId)];
 }
 
 const Ipc::QueueReader &
-Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const
+Ipc::FewToFewBiQueue::remoteReader(const int processId) const
 {
-    return readers->theReaders[readerIndex(group, processId)];
+    return readers->theReaders[readerIndex(remoteGroup(), processId)];
 }
 
-void
-Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
+int
+Ipc::FewToFewBiQueue::remotesCount() const
 {
-    QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
-    debugs(54, 7, HERE << "reader: " << localReader.id);
+    return theLocalGroup == groupA ? metadata->theGroupBSize :
+           metadata->theGroupASize;
+}
 
-    Must(validProcessId(remoteGroup(), remoteProcessId));
-    localReader.clearSignal();
+int
+Ipc::FewToFewBiQueue::remotesIdOffset() const
+{
+    return theLocalGroup == groupA ? metadata->theGroupBIdOffset :
+           metadata->theGroupAIdOffset;
+}
 
-    // we got a hint; we could reposition iteration to try popping from the
-    // remoteProcessId queue first; but it does not seem to help much and might
-    // introduce some bias so we do not do that for now:
-    // theLastPopProcessId = remoteProcessId;
+Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
+    theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
+    theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
+{
+    Must(theGroupASize > 0);
+    Must(theGroupBSize > 0);
+}
+
+Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
+    metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
+    queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
+    readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
+{
+}
+
+Ipc::FewToFewBiQueue::Owner::~Owner()
+{
+    delete metadataOwner;
+    delete queuesOwner;
+    delete readersOwner;
+}
+
+// MultiQueue
+
+Ipc::MultiQueue::Owner *
+Ipc::MultiQueue::Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
+{
+    return new Owner(id, processCount, processIdOffset, maxItemSize, capacity);
+}
+
+Ipc::MultiQueue::MultiQueue(const String &id, const int localProcessId):
+    BaseMultiQueue(localProcessId),
+    metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
+    queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
+    readers(shm_old(QueueReaders)(ReadersId(id).termedBuf()))
+{
+    Must(queues->theCapacity == metadata->theProcessCount * metadata->theProcessCount);
+    Must(readers->theCapacity == metadata->theProcessCount);
+
+    debugs(54, 7, "queue " << id << " reader: " << localReader().id);
 }
 
 bool
-Ipc::FewToFewBiQueue::popReady() const
-{
-    // mimic FewToFewBiQueue::pop() but quit just before popping
-    int popProcessId = theLastPopProcessId; // preserve for future pop()
-    for (int i = 0; i < remoteGroupSize(); ++i) {
-        if (++popProcessId >= remoteGroupIdOffset() + remoteGroupSize())
-            popProcessId = remoteGroupIdOffset();
-        const OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), popProcessId, theLocalGroup, theLocalProcessId);
-        if (!queue.empty())
-            return true;
-    }
-    return false; // most likely, no process had anything to pop
+Ipc::MultiQueue::validProcessId(const int processId) const
+{
+    return metadata->theProcessIdOffset <= processId &&
+           processId < metadata->theProcessIdOffset + metadata->theProcessCount;
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::oneToOneQueue(const int fromProcessId, const int toProcessId) const
+{
+    assert(validProcessId(fromProcessId));
+    assert(validProcessId(toProcessId));
+    const int fromIndex = fromProcessId - metadata->theProcessIdOffset;
+    const int toIndex = toProcessId - metadata->theProcessIdOffset;
+    const int index = fromIndex * metadata->theProcessCount + toIndex;
+    return (*queues)[index];
 }
 
-Ipc::QueueReader::Balance &
-Ipc::FewToFewBiQueue::localBalance()
+const Ipc::QueueReader &
+Ipc::MultiQueue::reader(const int processId) const
 {
-    QueueReader &r = reader(theLocalGroup, theLocalProcessId);
-    return r.balance;
+    assert(validProcessId(processId));
+    const int index = processId - metadata->theProcessIdOffset;
+    return readers->theReaders[index];
 }
 
-const Ipc::QueueReader::Balance &
-Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::inQueue(const int remoteProcessId) const
 {
-    const QueueReader &r = reader(remoteGroup(), remoteProcessId);
-    return r.balance;
+    return oneToOneQueue(remoteProcessId, theLocalProcessId);
 }
 
-Ipc::QueueReader::Rate &
-Ipc::FewToFewBiQueue::localRateLimit()
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::outQueue(const int remoteProcessId) const
 {
-    QueueReader &r = reader(theLocalGroup, theLocalProcessId);
-    return r.rateLimit;
+    return oneToOneQueue(theLocalProcessId, remoteProcessId);
 }
 
-const Ipc::QueueReader::Rate &
-Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const
+const Ipc::QueueReader &
+Ipc::MultiQueue::localReader() const
 {
-    const QueueReader &r = reader(remoteGroup(), remoteProcessId);
-    return r.rateLimit;
+    return reader(theLocalProcessId);
 }
 
-Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
-        theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
-        theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
+const Ipc::QueueReader &
+Ipc::MultiQueue::remoteReader(const int processId) const
 {
-    Must(theGroupASize > 0);
-    Must(theGroupBSize > 0);
+    return reader(processId);
 }
 
-Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
-        metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
-        queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
-        readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
+int
+Ipc::MultiQueue::remotesCount() const
 {
+    return metadata->theProcessCount;
 }
 
-Ipc::FewToFewBiQueue::Owner::~Owner()
+int
+Ipc::MultiQueue::remotesIdOffset() const
+{
+    return metadata->theProcessIdOffset;
+}
+
+Ipc::MultiQueue::Metadata::Metadata(const int aProcessCount, const int aProcessIdOffset):
+    theProcessCount(aProcessCount), theProcessIdOffset(aProcessIdOffset)
+{
+    Must(theProcessCount > 0);
+}
+
+Ipc::MultiQueue::Owner::Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity):
+    metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), processCount, processIdOffset)),
+    queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), processCount*processCount, maxItemSize, capacity)),
+    readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), processCount))
+{
+}
+
+Ipc::MultiQueue::Owner::~Owner()
 {
     delete metadataOwner;
     delete queuesOwner;
     delete readersOwner;
 }
+