the new MultiQueue class.
Added MultiQueue, a lockless fixed-capacity bidirectional queue for a limited
number processes. Any process may send data to and receive from any other
process (including itself). Used for collapsed forwarding notifications.
Added CollapsedForwarding class to send and handle received collapsed
forwarding notifications using MultiQueue.
Write partial Rock pages to disk in order to propagate data from the hit
writer to the collapsed hit readers. Send collapsed forwarding notification
after data was written to disk.
Missing code to share locked StoreMap entries, kick collapsed hit readers, and
to disable notifications in no-daemon mode.
--- /dev/null
+/*
+ * DEBUG: section 17 Request Forwarding
+ *
+ */
+
+#include "squid.h"
+#include "ipc/mem/Segment.h"
+#include "ipc/Messages.h"
+#include "ipc/Port.h"
+#include "ipc/TypedMsgHdr.h"
+#include "CollapsedForwarding.h"
+#include "SquidConfig.h"
+#include "globals.h"
+#include "tools.h"
+
+/// shared memory segment path to use for CollapsedForwarding queue
+static const char *const ShmLabel = "cf";
+/// a single worker-to-worker queue capacity
+// TODO: make configurable or compute from squid.conf settings if possible
+static const int QueueCapacity = 1024;
+
+std::auto_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue;
+
+/// IPC queue message
+class CollapsedForwardingMsg
+{
+public:
+ CollapsedForwardingMsg(): processId(-1) {}
+
+public:
+ int processId; /// ID of sending process
+ // XXX: add entry info
+};
+
+// CollapsedForwarding
+
+void
+CollapsedForwarding::Init()
+{
+ Must(!queue.get());
+ queue.reset(new Queue(ShmLabel, KidIdentifier));
+}
+
+void
+CollapsedForwarding::NewData(const StoreIOState &sio)
+{
+ CollapsedForwardingMsg msg;
+ msg.processId = KidIdentifier;
+ // XXX: copy data from sio
+
+ // TODO: send only to workers who are waiting for data
+ // XXX: does not work for non-daemon mode?
+ for (int workerId = 1; workerId <= Config.workers; ++workerId) {
+ try {
+ if (queue->push(workerId, msg))
+ Notify(workerId);
+ } catch (const Queue::Full &) {
+ debugs(17, DBG_IMPORTANT, "Worker collapsed forwarding push queue "
+ "overflow: " << workerId); // TODO: report queue len
+ // TODO: grow queue size
+ }
+ }
+}
+
+void
+CollapsedForwarding::Notify(const int workerId)
+{
+ // TODO: Count and report the total number of notifications, pops, pushes.
+ debugs(17, 7, HERE << "kid" << workerId);
+ Ipc::TypedMsgHdr msg;
+ // TODO: add proper message type?
+ msg.setType(Ipc::mtCollapsedForwardingNotification);
+ msg.putInt(KidIdentifier);
+ const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrPfx, workerId);
+ Ipc::SendMessage(addr, msg);
+}
+
+void
+CollapsedForwarding::HandleNewData(const char *const when)
+{
+ debugs(17, 4, HERE << "popping all " << when);
+ CollapsedForwardingMsg msg;
+ int workerId;
+ int poppedCount = 0;
+ while (queue->pop(workerId, msg)) {
+ debugs(17, 3, HERE << "collapsed forwarding data message from " <<
+ workerId);
+ if (workerId != msg.processId) {
+ debugs(17, DBG_IMPORTANT, HERE << "mismatching IDs: " << workerId <<
+ " != " << msg.processId);
+ }
+
+ // XXX: stop and schedule an async call to continue
+ assert(++poppedCount < SQUID_MAXFD);
+ }
+}
+
+void
+CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg)
+{
+ const int from = msg.getInt();
+ debugs(17, 7, HERE << "from " << from);
+ queue->clearReaderSignal(from);
+ HandleNewData("after notification");
+}
+
+/// initializes shared queue used by CollapsedForwarding
+class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner
+{
+public:
+ /* RegisteredRunner API */
+ CollapsedForwardingRr(): owner(NULL) {}
+ virtual ~CollapsedForwardingRr();
+
+protected:
+ virtual void create(const RunnerRegistry &);
+ virtual void open(const RunnerRegistry &);
+
+private:
+ Ipc::MultiQueue::Owner *owner;
+};
+
+RunnerRegistrationEntry(rrAfterConfig, CollapsedForwardingRr);
+
+void CollapsedForwardingRr::create(const RunnerRegistry &)
+{
+ Must(!owner);
+ owner = Ipc::MultiQueue::Init(ShmLabel, Config.workers, 1,
+ sizeof(CollapsedForwardingMsg),
+ QueueCapacity);
+}
+
+void CollapsedForwardingRr::open(const RunnerRegistry &)
+{
+ if (IamWorkerProcess())
+ CollapsedForwarding::Init();
+}
+
+CollapsedForwardingRr::~CollapsedForwardingRr()
+{
+ delete owner;
+}
--- /dev/null
+/*
+ * DEBUG: section 17 Request Forwarding
+ *
+ */
+
+#ifndef SQUID_COLLAPSED_FORWARDING_H
+#define SQUID_COLLAPSED_FORWARDING_H
+
+#include "ipc/Queue.h"
+#include "ipc/forward.h"
+
+#include <memory>
+
+class StoreIOState;
+
+/// Sends and handles collapsed forwarding notifications.
+class CollapsedForwarding
+{
+public:
+ /// open shared memory segment
+ static void Init();
+
+ /// notify other workers that new data is available
+ static void NewData(const StoreIOState &sio);
+
+ /// kick worker with empty IPC queue
+ static void Notify(const int workerId);
+
+ /// handle new data messages in IPC queue
+ static void HandleNewData(const char *const when);
+
+ /// handle queue push notifications from worker or disker
+ static void HandleNotification(const Ipc::TypedMsgHdr &msg);
+
+private:
+ typedef Ipc::MultiQueue Queue;
+ static std::auto_ptr<Queue> queue; ///< IPC queue
+};
+
+#endif /* SQUID_COLLAPSED_FORWARDING_H */
ClientRequestContext.h \
clientStream.cc \
clientStream.h \
+ CollapsedForwarding.cc \
+ CollapsedForwarding.h \
CompletionDispatcher.cc \
CompletionDispatcher.h \
CommRead.h \
writeToDisk(sidNext);
}
}
+
+ // XXX: check that there are workers waiting for data, i.e. readers > 0
+ writeBufToDisk();
}
/// Buffers incoming data for the current slot.
// TODO: if DiskIO module is mmap-based, we should be writing whole pages
// to avoid triggering read-page;new_head+old_tail;write-page overheads
- const uint64_t diskOffset = dir->diskOffset(sidCurrent);
- debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
- theBuf.size);
-
// finalize map slice
Ipc::StoreMap::Slice &slice =
dir->map->writeableSlice(swap_filen, sidCurrent);
// copy finalized db cell header into buffer
memcpy(theBuf.mem, &header, sizeof(DbCellHeader));
+ writeBufToDisk(sidNext < 0);
+ theBuf.clear();
+
+ sidCurrent = sidNext;
+}
+
+void
+Rock::IoState::writeBufToDisk(const bool last)
+{
// and now allocate another buffer for the WriteRequest so that
// we can support concurrent WriteRequests (and to ease cleaning)
// TODO: should we limit the number of outstanding requests?
void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
memcpy(wBuf, theBuf.mem, theBuf.size);
+ const uint64_t diskOffset = dir->diskOffset(sidCurrent);
+ debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
+ theBuf.size);
+
WriteRequest *const r = new WriteRequest(
::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
- memFreeBufFunc(wBufCap)), this, sidNext < 0);
- theBuf.clear();
-
- sidCurrent = sidNext;
+ memFreeBufFunc(wBufCap)), this, last);
// theFile->write may call writeCompleted immediatelly
theFile->write(r);
void tryWrite(char const *buf, size_t size, off_t offset);
size_t writeToBuffer(char const *buf, size_t size);
void writeToDisk(const SlotId nextSlot);
+ void writeBufToDisk(const bool last = false);
SlotId reserveSlotForWriting();
void callBack(int errflag);
#include "squid.h"
#include "cache_cf.h"
+#include "CollapsedForwarding.h"
#include "ConfigOption.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
return;
}
+ // XXX: check that there are workers waiting for data, i.e. readers > 0
+ CollapsedForwarding::NewData(sio);
+
if (errflag == DISK_OK) {
// do not increment sio.offset_ because we do it in sio->write()
if (request->isLast) {
typedef enum { mtNone = 0, mtRegistration,
mtStrandSearchRequest, mtStrandSearchResponse,
mtSharedListenRequest, mtSharedListenResponse,
- mtIpcIoNotification,
+ mtIpcIoNotification, mtCollapsedForwardingNotification,
mtCacheMgrRequest, mtCacheMgrResponse
#if SQUID_SNMP
,
#include "globals.h"
#include "ipc/Queue.h"
+#include <limits>
+
/// constructs Metadata ID from parent queue ID
static String
MetadataId(String id)
return *reinterpret_cast<const OneToOneUniQueue *>(queue);
}
+// BaseMultiQueue
+
+Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId):
+ theLocalProcessId(aLocalProcessId),
+ theLastPopProcessId(std::numeric_limits<int>::max() - 1)
+{
+}
+
+void
+Ipc::BaseMultiQueue::clearReaderSignal(const int remoteProcessId)
+{
+ QueueReader &reader = localReader();
+ debugs(54, 7, HERE << "reader: " << reader.id);
+
+ reader.clearSignal();
+
+ // we got a hint; we could reposition iteration to try popping from the
+ // remoteProcessId queue first; but it does not seem to help much and might
+ // introduce some bias so we do not do that for now:
+ // theLastPopProcessId = remoteProcessId;
+}
+
+const Ipc::QueueReader::Balance &
+Ipc::BaseMultiQueue::balance(const int remoteProcessId) const
+{
+ const QueueReader &r = remoteReader(remoteProcessId);
+ return r.balance;
+}
+
+const Ipc::QueueReader::Rate &
+Ipc::BaseMultiQueue::rateLimit(const int remoteProcessId) const
+{
+ const QueueReader &r = remoteReader(remoteProcessId);
+ return r.rateLimit;
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::BaseMultiQueue::inQueue(const int remoteProcessId) {
+ const OneToOneUniQueue &queue =
+ const_cast<const BaseMultiQueue *>(this)->inQueue(remoteProcessId);
+ return const_cast<OneToOneUniQueue &>(queue);
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::BaseMultiQueue::outQueue(const int remoteProcessId) {
+ const OneToOneUniQueue &queue =
+ const_cast<const BaseMultiQueue *>(this)->outQueue(remoteProcessId);
+ return const_cast<OneToOneUniQueue &>(queue);
+}
+
+Ipc::QueueReader &
+Ipc::BaseMultiQueue::localReader() {
+ const QueueReader &reader =
+ const_cast<const BaseMultiQueue *>(this)->localReader();
+ return const_cast<QueueReader &>(reader);
+}
+
+Ipc::QueueReader &
+Ipc::BaseMultiQueue::remoteReader(const int remoteProcessId) {
+ const QueueReader &reader =
+ const_cast<const BaseMultiQueue *>(this)->remoteReader(remoteProcessId);
+ return const_cast<QueueReader &>(reader);
+}
+
// FewToFewBiQueue
Ipc::FewToFewBiQueue::Owner *
}
Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
+ BaseMultiQueue(aLocalProcessId),
metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
- theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
- theLastPopProcessId(readers->theCapacity)
+ theLocalGroup(aLocalGroup)
{
Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
- const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
- debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id);
+ debugs(54, 7, HERE << "queue " << id << " reader: " << localReader().id);
}
int
return index;
}
-Ipc::OneToOneUniQueue &
-Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
-{
- return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
-}
-
const Ipc::OneToOneUniQueue &
Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
{
return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
}
-/// incoming queue from a given remote process
const Ipc::OneToOneUniQueue &
Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
{
theLocalGroup, theLocalProcessId);
}
-/// outgoing queue to a given remote process
const Ipc::OneToOneUniQueue &
Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
{
metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
}
-Ipc::QueueReader &
-Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
+const Ipc::QueueReader &
+Ipc::FewToFewBiQueue::localReader() const
{
- return readers->theReaders[readerIndex(group, processId)];
+ return readers->theReaders[readerIndex(theLocalGroup, theLocalProcessId)];
}
const Ipc::QueueReader &
-Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const
+Ipc::FewToFewBiQueue::remoteReader(const int processId) const
{
- return readers->theReaders[readerIndex(group, processId)];
+ return readers->theReaders[readerIndex(remoteGroup(), processId)];
}
-void
-Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
+int
+Ipc::FewToFewBiQueue::remotesCount() const
{
- QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
- debugs(54, 7, HERE << "reader: " << localReader.id);
+ return theLocalGroup == groupA ? metadata->theGroupBSize :
+ metadata->theGroupASize;
+}
- Must(validProcessId(remoteGroup(), remoteProcessId));
- localReader.clearSignal();
+int
+Ipc::FewToFewBiQueue::remotesIdOffset() const
+{
+ return theLocalGroup == groupA ? metadata->theGroupBIdOffset :
+ metadata->theGroupAIdOffset;
+}
- // we got a hint; we could reposition iteration to try popping from the
- // remoteProcessId queue first; but it does not seem to help much and might
- // introduce some bias so we do not do that for now:
- // theLastPopProcessId = remoteProcessId;
+Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
+ theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
+ theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
+{
+ Must(theGroupASize > 0);
+ Must(theGroupBSize > 0);
+}
+
+Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
+ metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
+ queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
+ readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
+{
}
-Ipc::QueueReader::Balance &
-Ipc::FewToFewBiQueue::localBalance()
+Ipc::FewToFewBiQueue::Owner::~Owner()
{
- QueueReader &r = reader(theLocalGroup, theLocalProcessId);
- return r.balance;
+ delete metadataOwner;
+ delete queuesOwner;
+ delete readersOwner;
}
-const Ipc::QueueReader::Balance &
-Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const
+// MultiQueue
+
+Ipc::MultiQueue::Owner *
+Ipc::MultiQueue::Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
{
- const QueueReader &r = reader(remoteGroup(), remoteProcessId);
- return r.balance;
+ return new Owner(id, processCount, processIdOffset, maxItemSize, capacity);
}
-Ipc::QueueReader::Rate &
-Ipc::FewToFewBiQueue::localRateLimit()
+Ipc::MultiQueue::MultiQueue(const String &id, const int localProcessId):
+ BaseMultiQueue(localProcessId),
+ metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
+ queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
+ readers(shm_old(QueueReaders)(ReadersId(id).termedBuf()))
{
- QueueReader &r = reader(theLocalGroup, theLocalProcessId);
- return r.rateLimit;
+ Must(queues->theCapacity == metadata->theProcessCount * metadata->theProcessCount);
+ Must(readers->theCapacity == metadata->theProcessCount);
+
+ debugs(54, 7, HERE << "queue " << id << " reader: " << localReader().id);
}
-const Ipc::QueueReader::Rate &
-Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const
+bool
+Ipc::MultiQueue::validProcessId(const int processId) const
{
- const QueueReader &r = reader(remoteGroup(), remoteProcessId);
- return r.rateLimit;
+ return metadata->theProcessIdOffset <= processId &&
+ processId < metadata->theProcessIdOffset + metadata->theProcessCount;
}
-Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
- theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
- theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::oneToOneQueue(const int fromProcessId, const int toProcessId) const
{
- Must(theGroupASize > 0);
- Must(theGroupBSize > 0);
+ assert(validProcessId(fromProcessId));
+ assert(validProcessId(toProcessId));
+ const int fromIndex = fromProcessId - metadata->theProcessIdOffset;
+ const int toIndex = toProcessId - metadata->theProcessIdOffset;
+ const int index = fromIndex * metadata->theProcessCount + toIndex;
+ return (*queues)[index];
}
-Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
- metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
- queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
- readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
+const Ipc::QueueReader &
+Ipc::MultiQueue::reader(const int processId) const
{
+ assert(validProcessId(processId));
+ const int index = processId - metadata->theProcessIdOffset;
+ return readers->theReaders[index];
}
-Ipc::FewToFewBiQueue::Owner::~Owner()
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::inQueue(const int remoteProcessId) const
+{
+ return oneToOneQueue(remoteProcessId, theLocalProcessId);
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::outQueue(const int remoteProcessId) const
+{
+ return oneToOneQueue(theLocalProcessId, remoteProcessId);
+}
+
+const Ipc::QueueReader &
+Ipc::MultiQueue::localReader() const
+{
+ return reader(theLocalProcessId);
+}
+
+const Ipc::QueueReader &
+Ipc::MultiQueue::remoteReader(const int processId) const
+{
+ return reader(processId);
+}
+
+int
+Ipc::MultiQueue::remotesCount() const
+{
+ return metadata->theProcessCount;
+}
+
+int
+Ipc::MultiQueue::remotesIdOffset() const
+{
+ return metadata->theProcessIdOffset;
+}
+
+Ipc::MultiQueue::Metadata::Metadata(const int aProcessCount, const int aProcessIdOffset):
+ theProcessCount(aProcessCount), theProcessIdOffset(aProcessIdOffset)
+{
+ Must(theProcessCount > 0);
+}
+
+Ipc::MultiQueue::Owner::Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity):
+ metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), processCount, processIdOffset)),
+ queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), processCount*processCount, maxItemSize, capacity)),
+ readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), processCount))
+{
+}
+
+Ipc::MultiQueue::Owner::~Owner()
{
delete metadataOwner;
delete queuesOwner;
const int theCapacity; /// number of OneToOneUniQueues
};
+/**
+ * Base class for lockless fixed-capacity bidirectional queues for a
+ * limited number processes.
+ */
+class BaseMultiQueue
+{
+public:
+ BaseMultiQueue(const int aLocalProcessId);
+
+ /// clears the reader notification received by the local process from the remote process
+ void clearReaderSignal(const int remoteProcessId);
+
+ /// picks a process and calls OneToOneUniQueue::pop() using its queue
+ template <class Value> bool pop(int &remoteProcessId, Value &value);
+
+ /// calls OneToOneUniQueue::push() using the given process queue
+ template <class Value> bool push(const int remoteProcessId, const Value &value);
+
+ /// peeks at the item likely to be pop()ed next
+ template<class Value> bool peek(int &remoteProcessId, Value &value) const;
+
+ /// returns local reader's balance
+ QueueReader::Balance &localBalance() { return localReader().balance; }
+
+ /// returns reader's balance for a given remote process
+ const QueueReader::Balance &balance(const int remoteProcessId) const;
+
+ /// returns local reader's rate limit
+ QueueReader::Rate &localRateLimit() { return localReader().rateLimit; }
+
+ /// returns reader's rate limit for a given remote process
+ const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
+
+ /// number of items in incoming queue from a given remote process
+ int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
+
+ /// number of items in outgoing queue to a given remote process
+ int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
+
+protected:
+ /// incoming queue from a given remote process
+ virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const = 0;
+ OneToOneUniQueue &inQueue(const int remoteProcessId);
+
+ /// outgoing queue to a given remote process
+ virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const = 0;
+ OneToOneUniQueue &outQueue(const int remoteProcessId);
+
+ virtual const QueueReader &localReader() const = 0;
+ QueueReader &localReader();
+
+ virtual const QueueReader &remoteReader(const int remoteProcessId) const = 0;
+ QueueReader &remoteReader(const int remoteProcessId);
+
+ virtual int remotesCount() const = 0;
+ virtual int remotesIdOffset() const = 0;
+
+protected:
+ const int theLocalProcessId; ///< process ID of this queue
+
+private:
+ int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
+};
+
/**
* Lockless fixed-capacity bidirectional queue for a limited number
* processes. Allows communication between two groups of processes:
* communicate. Process in each group has a unique integer ID in
* [groupIdOffset, groupIdOffset + groupSize) range.
*/
-class FewToFewBiQueue
+class FewToFewBiQueue: public BaseMultiQueue
{
public:
typedef OneToOneUniQueue::Full Full;
/// maximum number of items in the queue
static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity);
+ /// finds the oldest item in incoming and outgoing queues between
+ /// us and the given remote process
+ template<class Value> bool findOldest(const int remoteProcessId, Value &value) const;
+
+protected:
+ virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
+ virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
+ virtual const QueueReader &localReader() const;
+ virtual const QueueReader &remoteReader(const int processId) const;
+ virtual int remotesCount() const;
+ virtual int remotesIdOffset() const;
+
+private:
+ bool validProcessId(const Group group, const int processId) const;
+ int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
+ const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
+ int readerIndex(const Group group, const int processId) const;
Group localGroup() const { return theLocalGroup; }
Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
- /// clears the reader notification received by the local process from the remote process
- void clearReaderSignal(const int remoteProcessId);
-
- /// picks a process and calls OneToOneUniQueue::pop() using its queue
- template <class Value> bool pop(int &remoteProcessId, Value &value);
+private:
+ const Mem::Pointer<Metadata> metadata; ///< shared metadata
+ const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
+ const Mem::Pointer<QueueReaders> readers; ///< readers array
- /// calls OneToOneUniQueue::push() using the given process queue
- template <class Value> bool push(const int remoteProcessId, const Value &value);
+ const Group theLocalGroup; ///< group of this queue
+};
- /// finds the oldest item in incoming and outgoing queues between
- /// us and the given remote process
- template<class Value> bool findOldest(const int remoteProcessId, Value &value) const;
+/**
+ * Lockless fixed-capacity bidirectional queue for a limited number
+ * processes. Any process may send data to and receive from any other
+ * process (including itself). Each process has a unique integer ID in
+ * [processIdOffset, processIdOffset + processCount) range.
+ */
+class MultiQueue: public BaseMultiQueue
+{
+public:
+ typedef OneToOneUniQueue::Full Full;
+ typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
- /// peeks at the item likely to be pop()ed next
- template<class Value> bool peek(int &remoteProcessId, Value &value) const;
+private:
+ /// Shared metadata for MultiQueue
+ struct Metadata {
+ Metadata(const int aProcessCount, const int aProcessIdOffset);
+ size_t sharedMemorySize() const { return sizeof(*this); }
+ static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata); }
- /// returns local reader's balance
- QueueReader::Balance &localBalance();
+ const int theProcessCount;
+ const int theProcessIdOffset;
+ };
- /// returns reader's balance for a given remote process
- const QueueReader::Balance &balance(const int remoteProcessId) const;
+public:
+ class Owner
+ {
+ public:
+ Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
+ ~Owner();
- /// returns local reader's rate limit
- QueueReader::Rate &localRateLimit();
+ private:
+ Mem::Owner<Metadata> *const metadataOwner;
+ Mem::Owner<OneToOneUniQueues> *const queuesOwner;
+ Mem::Owner<QueueReaders> *const readersOwner;
+ };
- /// returns reader's rate limit for a given remote process
- const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
+ static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
- /// number of items in incoming queue from a given remote process
- int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
+ MultiQueue(const String &id, const int localProcessId);
- /// number of items in outgoing queue to a given remote process
- int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
+protected:
+ virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
+ virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
+ virtual const QueueReader &localReader() const;
+ virtual const QueueReader &remoteReader(const int remoteProcessId) const;
+ virtual int remotesCount() const;
+ virtual int remotesIdOffset() const;
private:
- bool validProcessId(const Group group, const int processId) const;
- int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
- const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
- OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId);
- const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
- const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
- QueueReader &reader(const Group group, const int processId);
- const QueueReader &reader(const Group group, const int processId) const;
- int readerIndex(const Group group, const int processId) const;
- int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; }
- int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; }
+ bool validProcessId(const int processId) const;
+ const OneToOneUniQueue &oneToOneQueue(const int fromProcessId, const int toProcessId) const;
+ const QueueReader &reader(const int processId) const;
private:
const Mem::Pointer<Metadata> metadata; ///< shared metadata
const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
const Mem::Pointer<QueueReaders> readers; ///< readers array
-
- const Group theLocalGroup; ///< group of this queue
- const int theLocalProcessId; ///< process ID of this queue
- int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
};
// OneToOneUniQueue
return *reinterpret_cast<const OneToOneUniQueue *>(queue);
}
-// FewToFewBiQueue
+// BaseMultiQueue
template <class Value>
bool
-FewToFewBiQueue::pop(int &remoteProcessId, Value &value)
+BaseMultiQueue::pop(int &remoteProcessId, Value &value)
{
- // iterate all remote group processes, starting after the one we visited last
- QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
- for (int i = 0; i < remoteGroupSize(); ++i) {
- if (++theLastPopProcessId >= remoteGroupIdOffset() + remoteGroupSize())
- theLastPopProcessId = remoteGroupIdOffset();
- OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), theLastPopProcessId, theLocalGroup, theLocalProcessId);
- if (queue.pop(value, &localReader)) {
+ // iterate all remote processes, starting after the one we visited last
+ for (int i = 0; i < remotesCount(); ++i) {
+ if (++theLastPopProcessId >= remotesIdOffset() + remotesCount())
+ theLastPopProcessId = remotesIdOffset();
+ OneToOneUniQueue &queue = inQueue(theLastPopProcessId);
+ if (queue.pop(value, &localReader())) {
remoteProcessId = theLastPopProcessId;
debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
return true;
template <class Value>
bool
-FewToFewBiQueue::push(const int remoteProcessId, const Value &value)
+BaseMultiQueue::push(const int remoteProcessId, const Value &value)
{
- OneToOneUniQueue &remoteQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
- QueueReader &remoteReader = reader(remoteGroup(), remoteProcessId);
+ OneToOneUniQueue &remoteQueue = outQueue(remoteProcessId);
+ QueueReader &reader = remoteReader(remoteProcessId);
debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
- return remoteQueue.push(value, &remoteReader);
+ return remoteQueue.push(value, &reader);
}
+template <class Value>
+bool
+BaseMultiQueue::peek(int &remoteProcessId, Value &value) const
+{
+ // mimic FewToFewBiQueue::pop() but quit just before popping
+ int popProcessId = theLastPopProcessId; // preserve for future pop()
+ for (int i = 0; i < remotesCount(); ++i) {
+ if (++popProcessId >= remotesIdOffset() + remotesCount())
+ popProcessId = remotesIdOffset();
+ const OneToOneUniQueue &queue = inQueue(popProcessId);
+ if (queue.peek(value)) {
+ remoteProcessId = popProcessId;
+ return true;
+ }
+ }
+ return false; // most likely, no process had anything to pop
+}
+
+// FewToFewBiQueue
+
template <class Value>
bool
FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const
return out.peek(value);
}
-template <class Value>
-bool
-FewToFewBiQueue::peek(int &remoteProcessId, Value &value) const
-{
- // mimic FewToFewBiQueue::pop() but quit just before popping
- int popProcessId = theLastPopProcessId; // preserve for future pop()
- for (int i = 0; i < remoteGroupSize(); ++i) {
- if (++popProcessId >= remoteGroupIdOffset() + remoteGroupSize())
- popProcessId = remoteGroupIdOffset();
- const OneToOneUniQueue &queue =
- oneToOneQueue(remoteGroup(), popProcessId,
- theLocalGroup, theLocalProcessId);
- if (queue.peek(value)) {
- remoteProcessId = popProcessId;
- return true;
- }
- }
- return false; // most likely, no process had anything to pop
-}
-
} // namespace Ipc
#endif // SQUID_IPC_QUEUE_H
#include "mgr/Forwarder.h"
#include "SwapDir.h" /* XXX: scope boundary violation */
#include "CacheManager.h"
+#include "CollapsedForwarding.h"
#if USE_DISKIO_IPCIO
#include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: scope boundary violation */
#endif
}
break;
+ case mtCollapsedForwardingNotification:
+ CollapsedForwarding::HandleNotification(message);
+ break;
+
#if SQUID_SNMP
case mtSnmpRequest: {
const Snmp::Request req(message);