]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/ipc/Queue.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / ipc / Queue.cc
index ad1ee0d41bc6310f501ed13a9e3bf37c955e31f0..98dd765f28a5e8d4c31606146ddfba583e2cc067 100644 (file)
@@ -5,7 +5,7 @@
  *
  */
 
-#include "config.h"
+#include "squid.h"
 #include "base/TextException.h"
 #include "Debug.h"
 #include "globals.h"
@@ -35,12 +35,12 @@ ReadersId(String id)
     return id;
 }
 
-
 /* QueueReader */
 
 InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
 
-Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0)
+Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0),
+        rateLimit(0), balance(0)
 {
     debugs(54, 7, HERE << "constructed " << id);
 }
@@ -50,7 +50,12 @@ Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0)
 Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity)
 {
     Must(theCapacity > 0);
-    new (theReaders) QueueReader[theCapacity];
+    theReaders=new QueueReader[theCapacity];
+}
+
+Ipc::QueueReaders::~QueueReaders()
+{
+    delete[] theReaders;
 }
 
 size_t
@@ -65,12 +70,11 @@ Ipc::QueueReaders::SharedMemorySize(const int capacity)
     return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
 }
 
-
 // OneToOneUniQueue
 
 Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
-    theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
-    theCapacity(aCapacity)
+        theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
+        theCapacity(aCapacity)
 {
     Must(theMaxItemSize > 0);
     Must(theCapacity > 0);
@@ -91,7 +95,6 @@ Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int siz
     return sizeof(OneToOneUniQueue) + maxItemSize * size;
 }
 
-
 /* OneToOneUniQueues */
 
 Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
@@ -125,7 +128,6 @@ Ipc::OneToOneUniQueues::operator [](const int index) const
     return *reinterpret_cast<const OneToOneUniQueue *>(queue);
 }
 
-
 // FewToFewBiQueue
 
 Ipc::FewToFewBiQueue::Owner *
@@ -135,11 +137,11 @@ Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int gro
 }
 
 Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
-    metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
-    queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
-    readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
-    theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
-    theLastPopProcessId(readers->theCapacity)
+        metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
+        queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
+        readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
+        theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
+        theLastPopProcessId(readers->theCapacity)
 {
     Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
     Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
@@ -148,26 +150,32 @@ Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup,
     debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id);
 }
 
+int
+Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
+{
+    return capacity * groupASize * groupBSize * 2;
+}
+
 bool
 Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
 {
     switch (group) {
     case groupA:
         return metadata->theGroupAIdOffset <= processId &&
-            processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
+               processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
     case groupB:
         return metadata->theGroupBIdOffset <= processId &&
-            processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
+               processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
     }
     return false;
 }
 
-Ipc::OneToOneUniQueue &
-Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
+int
+Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
 {
     Must(fromGroup != toGroup);
-    Must(validProcessId(fromGroup, fromProcessId));
-    Must(validProcessId(toGroup, toProcessId));
+    assert(validProcessId(fromGroup, fromProcessId));
+    assert(validProcessId(toGroup, toProcessId));
     int index1;
     int index2;
     int offset;
@@ -181,17 +189,56 @@ Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcess
         offset = metadata->theGroupASize * metadata->theGroupBSize;
     }
     const int index = offset + index1 * metadata->theGroupBSize + index2;
-    return (*queues)[index];
+    return index;
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
+{
+    return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
+{
+    return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
+}
+
+/// incoming queue from a given remote process
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
+{
+    return oneToOneQueue(remoteGroup(), remoteProcessId,
+                         theLocalGroup, theLocalProcessId);
+}
+
+/// outgoing queue to a given remote process
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
+{
+    return oneToOneQueue(theLocalGroup, theLocalProcessId,
+                         remoteGroup(), remoteProcessId);
+}
+
+int
+Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
+{
+    Must(validProcessId(group, processId));
+    return group == groupA ?
+           processId - metadata->theGroupAIdOffset :
+           metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
 }
 
 Ipc::QueueReader &
 Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
 {
-    Must(validProcessId(group, processId));
-    const int index =  group == groupA ?
-        processId - metadata->theGroupAIdOffset :
-        metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
-    return readers->theReaders[index];
+    return readers->theReaders[readerIndex(group, processId)];
+}
+
+const Ipc::QueueReader &
+Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const
+{
+    return readers->theReaders[readerIndex(group, processId)];
 }
 
 void
@@ -209,18 +256,46 @@ Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
     // theLastPopProcessId = remoteProcessId;
 }
 
+Ipc::QueueReader::Balance &
+Ipc::FewToFewBiQueue::localBalance()
+{
+    QueueReader &r = reader(theLocalGroup, theLocalProcessId);
+    return r.balance;
+}
+
+const Ipc::QueueReader::Balance &
+Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const
+{
+    const QueueReader &r = reader(remoteGroup(), remoteProcessId);
+    return r.balance;
+}
+
+Ipc::QueueReader::Rate &
+Ipc::FewToFewBiQueue::localRateLimit()
+{
+    QueueReader &r = reader(theLocalGroup, theLocalProcessId);
+    return r.rateLimit;
+}
+
+const Ipc::QueueReader::Rate &
+Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const
+{
+    const QueueReader &r = reader(remoteGroup(), remoteProcessId);
+    return r.rateLimit;
+}
+
 Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
-    theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
-    theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
+        theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
+        theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
 {
     Must(theGroupASize > 0);
     Must(theGroupBSize > 0);
 }
 
 Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
-    metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
-    queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
-    readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
+        metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
+        queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
+        readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
 {
 }