/*
- * $Id$
- *
- * DEBUG: section 54 Interprocess Communication
+ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
*
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
*/
-#include "config.h"
+/* DEBUG: section 54 Interprocess Communication */
+
+#include "squid.h"
#include "base/TextException.h"
#include "Debug.h"
#include "globals.h"
#include "ipc/Queue.h"
-/// constructs shared segment ID from parent queue ID and child queue index
+#include <limits>
+
+/// constructs Metadata ID from parent queue ID
static String
-QueueId(String id, const int idx)
+MetadataId(String id)
{
- id.append("__");
- id.append(xitoa(idx));
+ id.append("__metadata");
return id;
}
-/// constructs QueueReader ID from parent queue ID
+/// constructs one-to-one queues ID from parent queue ID
static String
-ReaderId(String id)
+QueuesId(String id)
{
- id.append("__readers");
+ id.append("__queues");
return id;
}
+/// constructs QueueReaders ID from parent queue ID
+static String
+ReadersId(String id)
+{
+ id.append("__readers");
+ return id;
+}
/* QueueReader */
-InstanceIdDefinitions(QueueReader, "ipcQR");
+InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
-QueueReader::QueueReader(): popBlocked(1), popSignal(0)
+Ipc::QueueReader::QueueReader(): popBlocked(true), popSignal(false),
+ rateLimit(0), balance(0)
{
debugs(54, 7, HERE << "constructed " << id);
}
+/* QueueReaders */
-// OneToOneUniQueue
+Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity),
+ theReaders(theCapacity)
+{
+ Must(theCapacity > 0);
+}
-OneToOneUniQueue::OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity):
- shm(id.termedBuf()), reader_(NULL)
+size_t
+Ipc::QueueReaders::sharedMemorySize() const
{
- const int sharedSize = Items2Bytes(maxItemSize, capacity);
- shm.create(sharedSize);
- shared = new (shm.reserve(sharedSize)) Shared(maxItemSize, capacity);
+ return SharedMemorySize(theCapacity);
}
-OneToOneUniQueue::OneToOneUniQueue(const String &id): shm(id.termedBuf()),
- reader_(NULL)
+size_t
+Ipc::QueueReaders::SharedMemorySize(const int capacity)
{
- shm.open();
- shared = reinterpret_cast<Shared *>(shm.mem());
- assert(shared);
+ return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
}
-void
-OneToOneUniQueue::reader(QueueReader *aReader)
+// OneToOneUniQueue
+
+Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
+ theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
+ theCapacity(aCapacity)
{
- Must(!reader_ && aReader);
- reader_ = aReader;
+ Must(theMaxItemSize > 0);
+ Must(theCapacity > 0);
}
int
-OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
+Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
{
assert(maxItemSize > 0);
- size -= sizeof(Shared);
+ size -= sizeof(OneToOneUniQueue);
return size >= 0 ? size / maxItemSize : 0;
}
int
-OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
+Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
{
assert(size >= 0);
- return sizeof(Shared) + maxItemSize * size;
+ return sizeof(OneToOneUniQueue) + maxItemSize * size;
}
-OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize, const int aCapacity):
- theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
- theCapacity(aCapacity)
+/// start state reporting (by reporting queue parameters)
+/// The labels reflect whether the caller owns theIn or theOut data member and,
+/// hence, cannot report the other value reliably.
+void
+Ipc::OneToOneUniQueue::statOpen(std::ostream &os, const char *inLabel, const char *outLabel, const uint32_t count) const
{
+ os << "{ size: " << count <<
+ ", capacity: " << theCapacity <<
+ ", " << inLabel << ": " << theIn <<
+ ", " << outLabel << ": " << theOut;
}
-QueueReader &
-OneToOneUniQueue::reader()
+/// end state reporting started by statOpen()
+void
+Ipc::OneToOneUniQueue::statClose(std::ostream &os) const
{
- Must(reader_);
- return *reader_;
+ os << "}\n";
}
+/* OneToOneUniQueues */
-// OneToOneBiQueue
+Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
+{
+ Must(theCapacity > 0);
+ for (int i = 0; i < theCapacity; ++i)
+ new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
+}
-OneToOneBiQueue::OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity):
- popQueue(new OneToOneUniQueue(QueueId(id, 1), maxItemSize, capacity)),
- pushQueue(new OneToOneUniQueue(QueueId(id, 2), maxItemSize, capacity))
+size_t
+Ipc::OneToOneUniQueues::sharedMemorySize() const
{
+ return sizeof(*this) + theCapacity * front().sharedMemorySize();
}
-OneToOneBiQueue::OneToOneBiQueue(const String &id):
- popQueue(new OneToOneUniQueue(QueueId(id, 2))),
- pushQueue(new OneToOneUniQueue(QueueId(id, 1)))
+size_t
+Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
{
+ const int queueSize =
+ OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
+ return sizeof(OneToOneUniQueues) + queueSize * capacity;
}
-void
-OneToOneBiQueue::readers(QueueReader *r1, QueueReader *r2)
+const Ipc::OneToOneUniQueue &
+Ipc::OneToOneUniQueues::operator [](const int index) const
+{
+ Must(0 <= index && index < theCapacity);
+ const size_t queueSize = index ? front().sharedMemorySize() : 0;
+ const char *const queue =
+ reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
+ return *reinterpret_cast<const OneToOneUniQueue *>(queue);
+}
+
+// BaseMultiQueue
+
+Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId):
+ theLocalProcessId(aLocalProcessId),
+ theLastPopProcessId(std::numeric_limits<int>::max() - 1)
{
- popQueue->reader(r1);
- pushQueue->reader(r2);
}
void
-OneToOneBiQueue::clearReaderSignal()
+Ipc::BaseMultiQueue::clearReaderSignal(const int /*remoteProcessId*/)
{
- debugs(54, 7, HERE << "reader: " << &popQueue->reader());
- popQueue->reader().clearSignal();
+ QueueReader &reader = localReader();
+ debugs(54, 7, "reader: " << reader.id);
+
+ reader.clearSignal();
+
+ // we got a hint; we could reposition iteration to try popping from the
+ // remoteProcessId queue first; but it does not seem to help much and might
+ // introduce some bias so we do not do that for now:
+ // theLastPopProcessId = remoteProcessId;
}
+const Ipc::QueueReader::Balance &
+Ipc::BaseMultiQueue::balance(const int remoteProcessId) const
+{
+ const QueueReader &r = remoteReader(remoteProcessId);
+ return r.balance;
+}
-// FewToOneBiQueue
+const Ipc::QueueReader::Rate &
+Ipc::BaseMultiQueue::rateLimit(const int remoteProcessId) const
+{
+ const QueueReader &r = remoteReader(remoteProcessId);
+ return r.rateLimit;
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::BaseMultiQueue::inQueue(const int remoteProcessId)
+{
+ const OneToOneUniQueue &queue =
+ const_cast<const BaseMultiQueue *>(this)->inQueue(remoteProcessId);
+ return const_cast<OneToOneUniQueue &>(queue);
+}
-FewToOneBiQueue::FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity):
- theLastPopWorker(0), theWorkerCount(aWorkerCount),
- shm(ReaderId(id).termedBuf()),
- reader(NULL)
+Ipc::OneToOneUniQueue &
+Ipc::BaseMultiQueue::outQueue(const int remoteProcessId)
{
- // create a new segment for the local and remote queue readers
- // TODO: all our queues and readers should use a single segment
- shm.create((theWorkerCount+1)*sizeof(QueueReader));
- reader = new (shm.reserve(sizeof(QueueReader))) QueueReader;
- debugs(54, 7, HERE << "disker " << id << " reader: " << reader->id);
+ const OneToOneUniQueue &queue =
+ const_cast<const BaseMultiQueue *>(this)->outQueue(remoteProcessId);
+ return const_cast<OneToOneUniQueue &>(queue);
+}
+
+Ipc::QueueReader &
+Ipc::BaseMultiQueue::localReader()
+{
+ const QueueReader &reader =
+ const_cast<const BaseMultiQueue *>(this)->localReader();
+ return const_cast<QueueReader &>(reader);
+}
- assert(theWorkerCount >= 0);
- biQueues.reserve(theWorkerCount);
- for (int i = 0; i < theWorkerCount; ++i) {
- OneToOneBiQueue *const biQueue =
- new OneToOneBiQueue(QueueId(id, i), maxItemSize, capacity);
- QueueReader *remoteReader =
- new (shm.reserve(sizeof(QueueReader))) QueueReader;
- biQueue->readers(reader, remoteReader);
- biQueues.push_back(biQueue);
+Ipc::QueueReader &
+Ipc::BaseMultiQueue::remoteReader(const int remoteProcessId)
+{
+ const QueueReader &reader =
+ const_cast<const BaseMultiQueue *>(this)->remoteReader(remoteProcessId);
+ return const_cast<QueueReader &>(reader);
+}
+
+// FewToFewBiQueue
+
+Ipc::FewToFewBiQueue::Owner *
+Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
+{
+ return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
+}
+
+Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
+ BaseMultiQueue(aLocalProcessId),
+ metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
+ queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
+ readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
+ theLocalGroup(aLocalGroup)
+{
+ Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
+ Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
+
+ debugs(54, 7, "queue " << id << " reader: " << localReader().id);
+}
+
+int
+Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
+{
+ return capacity * groupASize * groupBSize * 2;
+}
+
+bool
+Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
+{
+ switch (group) {
+ case groupA:
+ return metadata->theGroupAIdOffset <= processId &&
+ processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
+ case groupB:
+ return metadata->theGroupBIdOffset <= processId &&
+ processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
}
+ return false;
}
-OneToOneBiQueue *
-FewToOneBiQueue::Attach(const String &id, const int workerId)
+int
+Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
{
- // XXX: remove this leak. By refcounting Ipc::Mem::Segments? By creating a global FewToOneBiQueue for each worker?
- Ipc::Mem::Segment *shmPtr = new Ipc::Mem::Segment(ReaderId(id).termedBuf());
+ Must(fromGroup != toGroup);
+ assert(validProcessId(fromGroup, fromProcessId));
+ assert(validProcessId(toGroup, toProcessId));
+ int index1;
+ int index2;
+ int offset;
+ if (fromGroup == groupA) {
+ index1 = fromProcessId - metadata->theGroupAIdOffset;
+ index2 = toProcessId - metadata->theGroupBIdOffset;
+ offset = 0;
+ } else {
+ index1 = toProcessId - metadata->theGroupAIdOffset;
+ index2 = fromProcessId - metadata->theGroupBIdOffset;
+ offset = metadata->theGroupASize * metadata->theGroupBSize;
+ }
+ const int index = offset + index1 * metadata->theGroupBSize + index2;
+ return index;
+}
- Ipc::Mem::Segment &shm = *shmPtr;
- shm.open();
- assert(shm.size() >= static_cast<off_t>((1 + workerId+1)*sizeof(QueueReader)));
- QueueReader *readers = reinterpret_cast<QueueReader*>(shm.mem());
- QueueReader *remoteReader = &readers[0];
- debugs(54, 7, HERE << "disker " << id << " reader: " << remoteReader->id);
- QueueReader *localReader = &readers[workerId+1];
- debugs(54, 7, HERE << "local " << id << " reader: " << localReader->id);
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
+{
+ return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
+}
- OneToOneBiQueue *const biQueue =
- new OneToOneBiQueue(QueueId(id, workerId));
- biQueue->readers(localReader, remoteReader);
- return biQueue;
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
+{
+ return oneToOneQueue(remoteGroup(), remoteProcessId,
+ theLocalGroup, theLocalProcessId);
}
-FewToOneBiQueue::~FewToOneBiQueue()
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
{
- for (int i = 0; i < theWorkerCount; ++i)
- delete biQueues[i];
+ return oneToOneQueue(theLocalGroup, theLocalProcessId,
+ remoteGroup(), remoteProcessId);
}
-bool FewToOneBiQueue::validWorkerId(const int workerId) const
+int
+Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
{
- return 0 <= workerId && workerId < theWorkerCount;
+ Must(validProcessId(group, processId));
+ return group == groupA ?
+ processId - metadata->theGroupAIdOffset :
+ metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
}
-void
-FewToOneBiQueue::clearReaderSignal(int workerId)
+const Ipc::QueueReader &
+Ipc::FewToFewBiQueue::localReader() const
{
- debugs(54, 7, HERE << "reader: " << reader->id);
+ return readers->theReaders[readerIndex(theLocalGroup, theLocalProcessId)];
+}
- assert(validWorkerId(workerId));
- reader->clearSignal();
+const Ipc::QueueReader &
+Ipc::FewToFewBiQueue::remoteReader(const int processId) const
+{
+ return readers->theReaders[readerIndex(remoteGroup(), processId)];
+}
- // we got a hint; we could reposition iteration to try popping from the
- // workerId queue first; but it does not seem to help much and might
- // introduce some bias so we do not do that for now:
- // theLastPopWorker = (workerId + theWorkerCount - 1) % theWorkerCount;
+int
+Ipc::FewToFewBiQueue::remotesCount() const
+{
+ return theLocalGroup == groupA ? metadata->theGroupBSize :
+ metadata->theGroupASize;
+}
+
+int
+Ipc::FewToFewBiQueue::remotesIdOffset() const
+{
+ return theLocalGroup == groupA ? metadata->theGroupBIdOffset :
+ metadata->theGroupAIdOffset;
+}
+
+Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
+ theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
+ theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
+{
+ Must(theGroupASize > 0);
+ Must(theGroupBSize > 0);
+}
+
+Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
+ metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
+ queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
+ readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
+{
+}
+
+Ipc::FewToFewBiQueue::Owner::~Owner()
+{
+ delete metadataOwner;
+ delete queuesOwner;
+ delete readersOwner;
+}
+
+// MultiQueue
+
+Ipc::MultiQueue::Owner *
+Ipc::MultiQueue::Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
+{
+ return new Owner(id, processCount, processIdOffset, maxItemSize, capacity);
+}
+
+Ipc::MultiQueue::MultiQueue(const String &id, const int localProcessId):
+ BaseMultiQueue(localProcessId),
+ metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
+ queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
+ readers(shm_old(QueueReaders)(ReadersId(id).termedBuf()))
+{
+ Must(queues->theCapacity == metadata->theProcessCount * metadata->theProcessCount);
+ Must(readers->theCapacity == metadata->theProcessCount);
+
+ debugs(54, 7, "queue " << id << " reader: " << localReader().id);
+}
+
+bool
+Ipc::MultiQueue::validProcessId(const int processId) const
+{
+ return metadata->theProcessIdOffset <= processId &&
+ processId < metadata->theProcessIdOffset + metadata->theProcessCount;
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::oneToOneQueue(const int fromProcessId, const int toProcessId) const
+{
+ assert(validProcessId(fromProcessId));
+ assert(validProcessId(toProcessId));
+ const int fromIndex = fromProcessId - metadata->theProcessIdOffset;
+ const int toIndex = toProcessId - metadata->theProcessIdOffset;
+ const int index = fromIndex * metadata->theProcessCount + toIndex;
+ return (*queues)[index];
}
+
+const Ipc::QueueReader &
+Ipc::MultiQueue::reader(const int processId) const
+{
+ assert(validProcessId(processId));
+ const int index = processId - metadata->theProcessIdOffset;
+ return readers->theReaders[index];
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::inQueue(const int remoteProcessId) const
+{
+ return oneToOneQueue(remoteProcessId, theLocalProcessId);
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::outQueue(const int remoteProcessId) const
+{
+ return oneToOneQueue(theLocalProcessId, remoteProcessId);
+}
+
+const Ipc::QueueReader &
+Ipc::MultiQueue::localReader() const
+{
+ return reader(theLocalProcessId);
+}
+
+const Ipc::QueueReader &
+Ipc::MultiQueue::remoteReader(const int processId) const
+{
+ return reader(processId);
+}
+
+int
+Ipc::MultiQueue::remotesCount() const
+{
+ return metadata->theProcessCount;
+}
+
+int
+Ipc::MultiQueue::remotesIdOffset() const
+{
+ return metadata->theProcessIdOffset;
+}
+
+Ipc::MultiQueue::Metadata::Metadata(const int aProcessCount, const int aProcessIdOffset):
+ theProcessCount(aProcessCount), theProcessIdOffset(aProcessIdOffset)
+{
+ Must(theProcessCount > 0);
+}
+
+Ipc::MultiQueue::Owner::Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity):
+ metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), processCount, processIdOffset)),
+ queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), processCount*processCount, maxItemSize, capacity)),
+ readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), processCount))
+{
+}
+
+Ipc::MultiQueue::Owner::~Owner()
+{
+ delete metadataOwner;
+ delete queuesOwner;
+ delete readersOwner;
+}
+