*
*/
-#include "config.h"
+#include "squid.h"
#include "base/TextException.h"
#include "Debug.h"
#include "globals.h"
return id;
}
-
/* QueueReader */
InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
-Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0)
+Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0),
+ rateLimit(0), balance(0)
{
debugs(54, 7, HERE << "constructed " << id);
}
Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity)
{
Must(theCapacity > 0);
- new (theReaders) QueueReader[theCapacity];
+ theReaders=new QueueReader[theCapacity];
+}
+
+Ipc::QueueReaders::~QueueReaders()
+{
+ delete[] theReaders;
}
size_t
return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
}
-
// OneToOneUniQueue
Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
- theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
- theCapacity(aCapacity)
+ theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
+ theCapacity(aCapacity)
{
Must(theMaxItemSize > 0);
Must(theCapacity > 0);
return sizeof(OneToOneUniQueue) + maxItemSize * size;
}
-
/* OneToOneUniQueues */
Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
return *reinterpret_cast<const OneToOneUniQueue *>(queue);
}
-
// FewToFewBiQueue
Ipc::FewToFewBiQueue::Owner *
}
Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
- metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
- queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
- readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
- theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
- theLastPopProcessId(readers->theCapacity)
+ metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
+ queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
+ readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
+ theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
+ theLastPopProcessId(readers->theCapacity)
{
Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id);
}
+int
+Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
+{
+ return capacity * groupASize * groupBSize * 2;
+}
+
bool
Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
{
switch (group) {
case groupA:
return metadata->theGroupAIdOffset <= processId &&
- processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
+ processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
case groupB:
return metadata->theGroupBIdOffset <= processId &&
- processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
+ processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
}
return false;
}
-Ipc::OneToOneUniQueue &
-Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
+int
+Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
{
Must(fromGroup != toGroup);
- Must(validProcessId(fromGroup, fromProcessId));
- Must(validProcessId(toGroup, toProcessId));
+ assert(validProcessId(fromGroup, fromProcessId));
+ assert(validProcessId(toGroup, toProcessId));
int index1;
int index2;
int offset;
offset = metadata->theGroupASize * metadata->theGroupBSize;
}
const int index = offset + index1 * metadata->theGroupBSize + index2;
- return (*queues)[index];
+ return index;
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
+{
+ return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
+{
+ return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
+}
+
+/// incoming queue from a given remote process
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
+{
+ return oneToOneQueue(remoteGroup(), remoteProcessId,
+ theLocalGroup, theLocalProcessId);
+}
+
+/// outgoing queue to a given remote process
+const Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
+{
+ return oneToOneQueue(theLocalGroup, theLocalProcessId,
+ remoteGroup(), remoteProcessId);
+}
+
+int
+Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
+{
+ Must(validProcessId(group, processId));
+ return group == groupA ?
+ processId - metadata->theGroupAIdOffset :
+ metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
}
Ipc::QueueReader &
Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
{
- Must(validProcessId(group, processId));
- const int index = group == groupA ?
- processId - metadata->theGroupAIdOffset :
- metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
- return readers->theReaders[index];
+ return readers->theReaders[readerIndex(group, processId)];
+}
+
+const Ipc::QueueReader &
+Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const
+{
+ return readers->theReaders[readerIndex(group, processId)];
}
void
// theLastPopProcessId = remoteProcessId;
}
+Ipc::QueueReader::Balance &
+Ipc::FewToFewBiQueue::localBalance()
+{
+ QueueReader &r = reader(theLocalGroup, theLocalProcessId);
+ return r.balance;
+}
+
+const Ipc::QueueReader::Balance &
+Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const
+{
+ const QueueReader &r = reader(remoteGroup(), remoteProcessId);
+ return r.balance;
+}
+
+Ipc::QueueReader::Rate &
+Ipc::FewToFewBiQueue::localRateLimit()
+{
+ QueueReader &r = reader(theLocalGroup, theLocalProcessId);
+ return r.rateLimit;
+}
+
+const Ipc::QueueReader::Rate &
+Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const
+{
+ const QueueReader &r = reader(remoteGroup(), remoteProcessId);
+ return r.rateLimit;
+}
+
Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
- theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
- theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
+ theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
+ theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
{
Must(theGroupASize > 0);
Must(theGroupBSize > 0);
}
Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
- metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
- queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
- readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
+ metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
+ queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
+ readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
{
}