/*
- * $Id$
- *
- * DEBUG: section 54 Interprocess Communication
+ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
*
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
*/
-#include "config.h"
+/* DEBUG: section 54 Interprocess Communication */
+
+#include "squid.h"
#include "base/TextException.h"
#include "Debug.h"
#include "globals.h"
#include "ipc/Queue.h"
+#include <limits>
+
/// constructs Metadata ID from parent queue ID
static String
MetadataId(String id)
return id;
}
-
/* QueueReader */
InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
-Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0)
+Ipc::QueueReader::QueueReader(): popBlocked(true), popSignal(false),
+ rateLimit(0), balance(0)
{
debugs(54, 7, HERE << "constructed " << id);
}
/* QueueReaders */
-Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity)
+Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity),
+ theReaders(theCapacity)
{
Must(theCapacity > 0);
- new (theReaders) QueueReader[theCapacity];
}
size_t
return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
}
-
// OneToOneUniQueue
Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
return sizeof(OneToOneUniQueue) + maxItemSize * size;
}
+/// start state reporting (by reporting queue parameters)
+/// The labels reflect whether the caller owns theIn or theOut data member and,
+/// hence, cannot report the other value reliably.
+void
+Ipc::OneToOneUniQueue::statOpen(std::ostream &os, const char *inLabel, const char *outLabel, const uint32_t count) const
+{
+ os << "{ size: " << count <<
+ ", capacity: " << theCapacity <<
+ ", " << inLabel << ": " << theIn <<
+ ", " << outLabel << ": " << theOut;
+}
+
+/// end state reporting started by statOpen()
+void
+Ipc::OneToOneUniQueue::statClose(std::ostream &os) const
+{
+ os << "}\n";
+}
/* OneToOneUniQueues */
return *reinterpret_cast<const OneToOneUniQueue *>(queue);
}
+// BaseMultiQueue
+
+Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId):
+ theLocalProcessId(aLocalProcessId),
+ theLastPopProcessId(std::numeric_limits<int>::max() - 1)
+{
+}
+
+void
+Ipc::BaseMultiQueue::clearReaderSignal(const int /*remoteProcessId*/)
+{
+ QueueReader &reader = localReader();
+ debugs(54, 7, "reader: " << reader.id);
+
+ reader.clearSignal();
+
+ // we got a hint; we could reposition iteration to try popping from the
+ // remoteProcessId queue first; but it does not seem to help much and might
+ // introduce some bias so we do not do that for now:
+ // theLastPopProcessId = remoteProcessId;
+}
+
+const Ipc::QueueReader::Balance &
+Ipc::BaseMultiQueue::balance(const int remoteProcessId) const
+{
+ const QueueReader &r = remoteReader(remoteProcessId);
+ return r.balance;
+}
+
+const Ipc::QueueReader::Rate &
+Ipc::BaseMultiQueue::rateLimit(const int remoteProcessId) const
+{
+ const QueueReader &r = remoteReader(remoteProcessId);
+ return r.rateLimit;
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::BaseMultiQueue::inQueue(const int remoteProcessId)
+{
+ const OneToOneUniQueue &queue =
+ const_cast<const BaseMultiQueue *>(this)->inQueue(remoteProcessId);
+ return const_cast<OneToOneUniQueue &>(queue);
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::BaseMultiQueue::outQueue(const int remoteProcessId)
+{
+ const OneToOneUniQueue &queue =
+ const_cast<const BaseMultiQueue *>(this)->outQueue(remoteProcessId);
+ return const_cast<OneToOneUniQueue &>(queue);
+}
+
+Ipc::QueueReader &
+Ipc::BaseMultiQueue::localReader()
+{
+ const QueueReader &reader =
+ const_cast<const BaseMultiQueue *>(this)->localReader();
+ return const_cast<QueueReader &>(reader);
+}
+
+Ipc::QueueReader &
+Ipc::BaseMultiQueue::remoteReader(const int remoteProcessId)
+{
+ const QueueReader &reader =
+ const_cast<const BaseMultiQueue *>(this)->remoteReader(remoteProcessId);
+ return const_cast<QueueReader &>(reader);
+}
// FewToFewBiQueue
}
Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
+ BaseMultiQueue(aLocalProcessId),
metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
- theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
- theLastPopProcessId(readers->theCapacity)
+ theLocalGroup(aLocalGroup)
{
Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
- const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
- debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id);
+ debugs(54, 7, "queue " << id << " reader: " << localReader().id);
+}
+
+int
+Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
+{
+ return capacity * groupASize * groupBSize * 2;
}
bool
switch (group) {
case groupA:
return metadata->theGroupAIdOffset <= processId &&
- processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
+ processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
case groupB:
return metadata->theGroupBIdOffset <= processId &&
- processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
+ processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
}
return false;
}
return index;
}
-Ipc::OneToOneUniQueue &
-Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
{
return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
}
const Ipc::OneToOneUniQueue &
-Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
+Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
{
- return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
+ return oneToOneQueue(remoteGroup(), remoteProcessId,
+ theLocalGroup, theLocalProcessId);
}
-Ipc::QueueReader &
-Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
+{
+ return oneToOneQueue(theLocalGroup, theLocalProcessId,
+ remoteGroup(), remoteProcessId);
+}
+
+int
+Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
{
Must(validProcessId(group, processId));
- const int index = group == groupA ?
- processId - metadata->theGroupAIdOffset :
- metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
- return readers->theReaders[index];
+ return group == groupA ?
+ processId - metadata->theGroupAIdOffset :
+ metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
}
-void
-Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
+const Ipc::QueueReader &
+Ipc::FewToFewBiQueue::localReader() const
+{
+ return readers->theReaders[readerIndex(theLocalGroup, theLocalProcessId)];
+}
+
+const Ipc::QueueReader &
+Ipc::FewToFewBiQueue::remoteReader(const int processId) const
{
- QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
- debugs(54, 7, HERE << "reader: " << localReader.id);
+ return readers->theReaders[readerIndex(remoteGroup(), processId)];
+}
- Must(validProcessId(remoteGroup(), remoteProcessId));
- localReader.clearSignal();
+int
+Ipc::FewToFewBiQueue::remotesCount() const
+{
+ return theLocalGroup == groupA ? metadata->theGroupBSize :
+ metadata->theGroupASize;
+}
- // we got a hint; we could reposition iteration to try popping from the
- // remoteProcessId queue first; but it does not seem to help much and might
- // introduce some bias so we do not do that for now:
- // theLastPopProcessId = remoteProcessId;
+int
+Ipc::FewToFewBiQueue::remotesIdOffset() const
+{
+ return theLocalGroup == groupA ? metadata->theGroupBIdOffset :
+ metadata->theGroupAIdOffset;
}
Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
delete queuesOwner;
delete readersOwner;
}
+
+// MultiQueue
+
+Ipc::MultiQueue::Owner *
+Ipc::MultiQueue::Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
+{
+ return new Owner(id, processCount, processIdOffset, maxItemSize, capacity);
+}
+
+Ipc::MultiQueue::MultiQueue(const String &id, const int localProcessId):
+ BaseMultiQueue(localProcessId),
+ metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
+ queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
+ readers(shm_old(QueueReaders)(ReadersId(id).termedBuf()))
+{
+ Must(queues->theCapacity == metadata->theProcessCount * metadata->theProcessCount);
+ Must(readers->theCapacity == metadata->theProcessCount);
+
+ debugs(54, 7, "queue " << id << " reader: " << localReader().id);
+}
+
+bool
+Ipc::MultiQueue::validProcessId(const int processId) const
+{
+ return metadata->theProcessIdOffset <= processId &&
+ processId < metadata->theProcessIdOffset + metadata->theProcessCount;
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::oneToOneQueue(const int fromProcessId, const int toProcessId) const
+{
+ assert(validProcessId(fromProcessId));
+ assert(validProcessId(toProcessId));
+ const int fromIndex = fromProcessId - metadata->theProcessIdOffset;
+ const int toIndex = toProcessId - metadata->theProcessIdOffset;
+ const int index = fromIndex * metadata->theProcessCount + toIndex;
+ return (*queues)[index];
+}
+
+const Ipc::QueueReader &
+Ipc::MultiQueue::reader(const int processId) const
+{
+ assert(validProcessId(processId));
+ const int index = processId - metadata->theProcessIdOffset;
+ return readers->theReaders[index];
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::inQueue(const int remoteProcessId) const
+{
+ return oneToOneQueue(remoteProcessId, theLocalProcessId);
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::outQueue(const int remoteProcessId) const
+{
+ return oneToOneQueue(theLocalProcessId, remoteProcessId);
+}
+
+const Ipc::QueueReader &
+Ipc::MultiQueue::localReader() const
+{
+ return reader(theLocalProcessId);
+}
+
+const Ipc::QueueReader &
+Ipc::MultiQueue::remoteReader(const int processId) const
+{
+ return reader(processId);
+}
+
+int
+Ipc::MultiQueue::remotesCount() const
+{
+ return metadata->theProcessCount;
+}
+
+int
+Ipc::MultiQueue::remotesIdOffset() const
+{
+ return metadata->theProcessIdOffset;
+}
+
+Ipc::MultiQueue::Metadata::Metadata(const int aProcessCount, const int aProcessIdOffset):
+ theProcessCount(aProcessCount), theProcessIdOffset(aProcessIdOffset)
+{
+ Must(theProcessCount > 0);
+}
+
+Ipc::MultiQueue::Owner::Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity):
+ metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), processCount, processIdOffset)),
+ queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), processCount*processCount, maxItemSize, capacity)),
+ readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), processCount))
+{
+}
+
+Ipc::MultiQueue::Owner::~Owner()
+{
+ delete metadataOwner;
+ delete queuesOwner;
+ delete readersOwner;
+}
+