]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Added BaseMultiQueue class, a common base of the old FewToFewBiQueue class and
authorDmitry Kurochkin <dmitry.kurochkin@measurement-factory.com>
Wed, 29 May 2013 16:04:40 +0000 (10:04 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Wed, 29 May 2013 16:04:40 +0000 (10:04 -0600)
the new MultiQueue class.

Added MultiQueue, a lockless fixed-capacity bidirectional queue for a limited
number processes. Any process may send data to and receive from any other
process (including itself). Used for collapsed forwarding notifications.

Added CollapsedForwarding class to send and handle received collapsed
forwarding notifications using MultiQueue.

Write partial Rock pages to disk in order to propagate data from the hit
writer to the collapsed hit readers. Send collapsed forwarding notification
after data was written to disk.

Missing code to share locked StoreMap entries, kick collapsed hit readers, and
to disable notifications in no-daemon mode.

src/CollapsedForwarding.cc [new file with mode: 0644]
src/CollapsedForwarding.h [new file with mode: 0644]
src/Makefile.am
src/fs/rock/RockIoState.cc
src/fs/rock/RockIoState.h
src/fs/rock/RockSwapDir.cc
src/ipc/Messages.h
src/ipc/Queue.cc
src/ipc/Queue.h
src/ipc/Strand.cc

diff --git a/src/CollapsedForwarding.cc b/src/CollapsedForwarding.cc
new file mode 100644 (file)
index 0000000..657e6d6
--- /dev/null
@@ -0,0 +1,142 @@
+/*
+ * DEBUG: section 17    Request Forwarding
+ *
+ */
+
+#include "squid.h"
+#include "ipc/mem/Segment.h"
+#include "ipc/Messages.h"
+#include "ipc/Port.h"
+#include "ipc/TypedMsgHdr.h"
+#include "CollapsedForwarding.h"
+#include "SquidConfig.h"
+#include "globals.h"
+#include "tools.h"
+
+/// shared memory segment path to use for CollapsedForwarding queue
+static const char *const ShmLabel = "cf";
+/// a single worker-to-worker queue capacity
+// TODO: make configurable or compute from squid.conf settings if possible
+static const int QueueCapacity = 1024;
+
+std::auto_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue;
+
+/// IPC queue message
+class CollapsedForwardingMsg
+{
+public:
+    CollapsedForwardingMsg(): processId(-1) {}
+
+public:
+    int processId; /// ID of sending process
+    // XXX: add entry info
+};
+
+// CollapsedForwarding
+
+void
+CollapsedForwarding::Init()
+{
+    Must(!queue.get());
+    queue.reset(new Queue(ShmLabel, KidIdentifier));
+}
+
+void
+CollapsedForwarding::NewData(const StoreIOState &sio)
+{
+    CollapsedForwardingMsg msg;
+    msg.processId = KidIdentifier;
+    // XXX: copy data from sio
+
+    // TODO: send only to workers who are waiting for data
+    // XXX: does not work for non-daemon mode?
+    for (int workerId = 1; workerId <= Config.workers; ++workerId) {
+        try {
+            if (queue->push(workerId, msg))
+                Notify(workerId);
+        } catch (const Queue::Full &) {
+            debugs(17, DBG_IMPORTANT, "Worker collapsed forwarding push queue "
+                   "overflow: " << workerId); // TODO: report queue len
+            // TODO: grow queue size
+        }
+    }
+}
+
+void
+CollapsedForwarding::Notify(const int workerId)
+{
+    // TODO: Count and report the total number of notifications, pops, pushes.
+    debugs(17, 7, HERE << "kid" << workerId);
+    Ipc::TypedMsgHdr msg;
+    // TODO: add proper message type?
+    msg.setType(Ipc::mtCollapsedForwardingNotification);
+    msg.putInt(KidIdentifier);
+    const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrPfx, workerId);
+    Ipc::SendMessage(addr, msg);
+}
+
+void
+CollapsedForwarding::HandleNewData(const char *const when)
+{
+    debugs(17, 4, HERE << "popping all " << when);
+    CollapsedForwardingMsg msg;
+    int workerId;
+    int poppedCount = 0;
+    while (queue->pop(workerId, msg)) {
+        debugs(17, 3, HERE << "collapsed forwarding data message from " <<
+               workerId);
+        if (workerId != msg.processId) {
+            debugs(17, DBG_IMPORTANT, HERE << "mismatching IDs: " << workerId <<
+                   " != " << msg.processId);
+        }
+
+        // XXX: stop and schedule an async call to continue
+        assert(++poppedCount < SQUID_MAXFD);
+    }
+}
+
+void
+CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg)
+{
+    const int from = msg.getInt();
+    debugs(17, 7, HERE << "from " << from);
+    queue->clearReaderSignal(from);
+    HandleNewData("after notification");
+}
+
+/// initializes shared queue used by CollapsedForwarding
+class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner
+{
+public:
+    /* RegisteredRunner API */
+    CollapsedForwardingRr(): owner(NULL) {}
+    virtual ~CollapsedForwardingRr();
+
+protected:
+    virtual void create(const RunnerRegistry &);
+    virtual void open(const RunnerRegistry &);
+
+private:
+    Ipc::MultiQueue::Owner *owner;
+};
+
+RunnerRegistrationEntry(rrAfterConfig, CollapsedForwardingRr);
+
+void CollapsedForwardingRr::create(const RunnerRegistry &)
+{
+    Must(!owner);
+    owner = Ipc::MultiQueue::Init(ShmLabel, Config.workers, 1,
+                                  sizeof(CollapsedForwardingMsg),
+                                  QueueCapacity);
+}
+
+void CollapsedForwardingRr::open(const RunnerRegistry &)
+{
+    if (IamWorkerProcess())
+        CollapsedForwarding::Init();
+}
+
+CollapsedForwardingRr::~CollapsedForwardingRr()
+{
+    delete owner;
+}
diff --git a/src/CollapsedForwarding.h b/src/CollapsedForwarding.h
new file mode 100644 (file)
index 0000000..e183802
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * DEBUG: section 17    Request Forwarding
+ *
+ */
+
+#ifndef SQUID_COLLAPSED_FORWARDING_H
+#define SQUID_COLLAPSED_FORWARDING_H
+
+#include "ipc/Queue.h"
+#include "ipc/forward.h"
+
+#include <memory>
+
+class StoreIOState;
+
+/// Sends and handles collapsed forwarding notifications.
+class CollapsedForwarding
+{
+public:
+    /// open shared memory segment
+    static void Init();
+
+    /// notify other workers that new data is available
+    static void NewData(const StoreIOState &sio);
+
+    /// kick worker with empty IPC queue
+    static void Notify(const int workerId);
+
+    /// handle new data messages in IPC queue
+    static void HandleNewData(const char *const when);
+
+    /// handle queue push notifications from worker or disker
+    static void HandleNotification(const Ipc::TypedMsgHdr &msg);
+
+private:
+    typedef Ipc::MultiQueue Queue;
+    static std::auto_ptr<Queue> queue; ///< IPC queue
+};
+
+#endif /* SQUID_COLLAPSED_FORWARDING_H */
index 22a24e6dd1cc8fc3f7b3bee843075bcb9aee2154..fae409a8d5e92623782776a5af76adc537a08e4d 100644 (file)
@@ -302,6 +302,8 @@ squid_SOURCES = \
        ClientRequestContext.h \
        clientStream.cc \
        clientStream.h \
+       CollapsedForwarding.cc \
+       CollapsedForwarding.h \
        CompletionDispatcher.cc \
        CompletionDispatcher.h \
        CommRead.h \
index 4eab56931c429346f773681b3b9fa2161f952262..f8e461eb832ecc85f8f90f96e01d5ccddb176986 100644 (file)
@@ -177,6 +177,9 @@ Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
             writeToDisk(sidNext);
         }
     }
+
+    // XXX: check that there are workers waiting for data, i.e. readers > 0
+    writeBufToDisk();
 }
 
 /// Buffers incoming data for the current slot.
@@ -215,10 +218,6 @@ Rock::IoState::writeToDisk(const SlotId sidNext)
     // TODO: if DiskIO module is mmap-based, we should be writing whole pages
     // to avoid triggering read-page;new_head+old_tail;write-page overheads
 
-    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
-    debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
-           theBuf.size);
-
     // finalize map slice
     Ipc::StoreMap::Slice &slice =
         dir->map->writeableSlice(swap_filen, sidCurrent);
@@ -237,6 +236,15 @@ Rock::IoState::writeToDisk(const SlotId sidNext)
     // copy finalized db cell header into buffer
     memcpy(theBuf.mem, &header, sizeof(DbCellHeader));
 
+    writeBufToDisk(sidNext < 0);
+    theBuf.clear();
+
+    sidCurrent = sidNext;
+}
+
+void
+Rock::IoState::writeBufToDisk(const bool last)
+{
     // and now allocate another buffer for the WriteRequest so that
     // we can support concurrent WriteRequests (and to ease cleaning)
     // TODO: should we limit the number of outstanding requests?
@@ -244,12 +252,13 @@ Rock::IoState::writeToDisk(const SlotId sidNext)
     void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
     memcpy(wBuf, theBuf.mem, theBuf.size);
 
+    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
+    debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
+           theBuf.size);
+
     WriteRequest *const r = new WriteRequest(
         ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
-            memFreeBufFunc(wBufCap)), this, sidNext < 0);
-    theBuf.clear();
-
-    sidCurrent = sidNext;
+            memFreeBufFunc(wBufCap)), this, last);
 
     // theFile->write may call writeCompleted immediatelly
     theFile->write(r);
index 8c4ff5313fb6f5887e9bdc1c706c020016d658f3..ee6f6a3dfc3a944732815c2ca15d59167339aa11 100644 (file)
@@ -50,6 +50,7 @@ private:
     void tryWrite(char const *buf, size_t size, off_t offset);
     size_t writeToBuffer(char const *buf, size_t size);
     void writeToDisk(const SlotId nextSlot);
+    void writeBufToDisk(const bool last = false);
     SlotId reserveSlotForWriting();
     
     void callBack(int errflag);
index a0c31516689164a847a6104462fd79b1a5c08f58..5b08b23839281ee330165eef36da75ce27c6f7e8 100644 (file)
@@ -4,6 +4,7 @@
 
 #include "squid.h"
 #include "cache_cf.h"
+#include "CollapsedForwarding.h"
 #include "ConfigOption.h"
 #include "DiskIO/DiskIOModule.h"
 #include "DiskIO/DiskIOStrategy.h"
@@ -747,6 +748,9 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest
         return;
     }
 
+    // XXX: check that there are workers waiting for data, i.e. readers > 0
+    CollapsedForwarding::NewData(sio);
+
     if (errflag == DISK_OK) {
         // do not increment sio.offset_ because we do it in sio->write()
         if (request->isLast) {
index 05f8fd1ee5a05c9d511b4493d6c55ecac29f4069..14837ff81be86126ebad24e64fa030e043a855bc 100644 (file)
@@ -15,7 +15,7 @@ namespace Ipc
 typedef enum { mtNone = 0, mtRegistration,
                mtStrandSearchRequest, mtStrandSearchResponse,
                mtSharedListenRequest, mtSharedListenResponse,
-               mtIpcIoNotification,
+               mtIpcIoNotification, mtCollapsedForwardingNotification,
                mtCacheMgrRequest, mtCacheMgrResponse
 #if SQUID_SNMP
                ,
index 06e01c61b02776091155a573514bafc7de2e47cd..120cc5a8498a784bed905b479edd3d871d6ef255 100644 (file)
@@ -9,6 +9,8 @@
 #include "globals.h"
 #include "ipc/Queue.h"
 
+#include <limits>
+
 /// constructs Metadata ID from parent queue ID
 static String
 MetadataId(String id)
@@ -121,6 +123,70 @@ Ipc::OneToOneUniQueues::operator [](const int index) const
     return *reinterpret_cast<const OneToOneUniQueue *>(queue);
 }
 
+// BaseMultiQueue
+
+Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId):
+        theLocalProcessId(aLocalProcessId),
+        theLastPopProcessId(std::numeric_limits<int>::max() - 1)
+{
+}
+
+void
+Ipc::BaseMultiQueue::clearReaderSignal(const int remoteProcessId)
+{
+    QueueReader &reader = localReader();
+    debugs(54, 7, HERE << "reader: " << reader.id);
+
+    reader.clearSignal();
+
+    // we got a hint; we could reposition iteration to try popping from the
+    // remoteProcessId queue first; but it does not seem to help much and might
+    // introduce some bias so we do not do that for now:
+    // theLastPopProcessId = remoteProcessId;
+}
+
+const Ipc::QueueReader::Balance &
+Ipc::BaseMultiQueue::balance(const int remoteProcessId) const
+{
+    const QueueReader &r = remoteReader(remoteProcessId);
+    return r.balance;
+}
+
+const Ipc::QueueReader::Rate &
+Ipc::BaseMultiQueue::rateLimit(const int remoteProcessId) const
+{
+    const QueueReader &r = remoteReader(remoteProcessId);
+    return r.rateLimit;
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::BaseMultiQueue::inQueue(const int remoteProcessId) {
+    const OneToOneUniQueue &queue =
+        const_cast<const BaseMultiQueue *>(this)->inQueue(remoteProcessId);
+    return const_cast<OneToOneUniQueue &>(queue);
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::BaseMultiQueue::outQueue(const int remoteProcessId) {
+    const OneToOneUniQueue &queue =
+        const_cast<const BaseMultiQueue *>(this)->outQueue(remoteProcessId);
+    return const_cast<OneToOneUniQueue &>(queue);
+}
+
+Ipc::QueueReader &
+Ipc::BaseMultiQueue::localReader() {
+    const QueueReader &reader =
+        const_cast<const BaseMultiQueue *>(this)->localReader();
+    return const_cast<QueueReader &>(reader);
+}
+
+Ipc::QueueReader &
+Ipc::BaseMultiQueue::remoteReader(const int remoteProcessId) {
+    const QueueReader &reader =
+        const_cast<const BaseMultiQueue *>(this)->remoteReader(remoteProcessId);
+    return const_cast<QueueReader &>(reader);
+}
+
 // FewToFewBiQueue
 
 Ipc::FewToFewBiQueue::Owner *
@@ -130,17 +196,16 @@ Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int gro
 }
 
 Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
+        BaseMultiQueue(aLocalProcessId),
         metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
         queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
         readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
-        theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
-        theLastPopProcessId(readers->theCapacity)
+        theLocalGroup(aLocalGroup)
 {
     Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
     Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
 
-    const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
-    debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id);
+    debugs(54, 7, HERE << "queue " << id << " reader: " << localReader().id);
 }
 
 int
@@ -185,19 +250,12 @@ Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromPr
     return index;
 }
 
-Ipc::OneToOneUniQueue &
-Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
-{
-    return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
-}
-
 const Ipc::OneToOneUniQueue &
 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
 {
     return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
 }
 
-/// incoming queue from a given remote process
 const Ipc::OneToOneUniQueue &
 Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
 {
@@ -205,7 +263,6 @@ Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
                          theLocalGroup, theLocalProcessId);
 }
 
-/// outgoing queue to a given remote process
 const Ipc::OneToOneUniQueue &
 Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
 {
@@ -222,77 +279,150 @@ Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
            metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
 }
 
-Ipc::QueueReader &
-Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
+const Ipc::QueueReader &
+Ipc::FewToFewBiQueue::localReader() const
 {
-    return readers->theReaders[readerIndex(group, processId)];
+    return readers->theReaders[readerIndex(theLocalGroup, theLocalProcessId)];
 }
 
 const Ipc::QueueReader &
-Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const
+Ipc::FewToFewBiQueue::remoteReader(const int processId) const
 {
-    return readers->theReaders[readerIndex(group, processId)];
+    return readers->theReaders[readerIndex(remoteGroup(), processId)];
 }
 
-void
-Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
+int
+Ipc::FewToFewBiQueue::remotesCount() const
 {
-    QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
-    debugs(54, 7, HERE << "reader: " << localReader.id);
+    return theLocalGroup == groupA ? metadata->theGroupBSize :
+        metadata->theGroupASize;
+}
 
-    Must(validProcessId(remoteGroup(), remoteProcessId));
-    localReader.clearSignal();
+int
+Ipc::FewToFewBiQueue::remotesIdOffset() const
+{
+    return theLocalGroup == groupA ? metadata->theGroupBIdOffset :
+        metadata->theGroupAIdOffset;
+}
 
-    // we got a hint; we could reposition iteration to try popping from the
-    // remoteProcessId queue first; but it does not seem to help much and might
-    // introduce some bias so we do not do that for now:
-    // theLastPopProcessId = remoteProcessId;
+Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
+        theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
+        theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
+{
+    Must(theGroupASize > 0);
+    Must(theGroupBSize > 0);
+}
+
+Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
+        metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
+        queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
+        readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
+{
 }
 
-Ipc::QueueReader::Balance &
-Ipc::FewToFewBiQueue::localBalance()
+Ipc::FewToFewBiQueue::Owner::~Owner()
 {
-    QueueReader &r = reader(theLocalGroup, theLocalProcessId);
-    return r.balance;
+    delete metadataOwner;
+    delete queuesOwner;
+    delete readersOwner;
 }
 
-const Ipc::QueueReader::Balance &
-Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const
+// MultiQueue
+
+Ipc::MultiQueue::Owner *
+Ipc::MultiQueue::Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
 {
-    const QueueReader &r = reader(remoteGroup(), remoteProcessId);
-    return r.balance;
+    return new Owner(id, processCount, processIdOffset, maxItemSize, capacity);
 }
 
-Ipc::QueueReader::Rate &
-Ipc::FewToFewBiQueue::localRateLimit()
+Ipc::MultiQueue::MultiQueue(const String &id, const int localProcessId):
+        BaseMultiQueue(localProcessId),
+        metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
+        queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
+        readers(shm_old(QueueReaders)(ReadersId(id).termedBuf()))
 {
-    QueueReader &r = reader(theLocalGroup, theLocalProcessId);
-    return r.rateLimit;
+    Must(queues->theCapacity == metadata->theProcessCount * metadata->theProcessCount);
+    Must(readers->theCapacity == metadata->theProcessCount);
+
+    debugs(54, 7, HERE << "queue " << id << " reader: " << localReader().id);
 }
 
-const Ipc::QueueReader::Rate &
-Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const
+bool
+Ipc::MultiQueue::validProcessId(const int processId) const
 {
-    const QueueReader &r = reader(remoteGroup(), remoteProcessId);
-    return r.rateLimit;
+    return metadata->theProcessIdOffset <= processId &&
+        processId < metadata->theProcessIdOffset + metadata->theProcessCount;
 }
 
-Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
-        theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
-        theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::oneToOneQueue(const int fromProcessId, const int toProcessId) const
 {
-    Must(theGroupASize > 0);
-    Must(theGroupBSize > 0);
+    assert(validProcessId(fromProcessId));
+    assert(validProcessId(toProcessId));
+    const int fromIndex = fromProcessId - metadata->theProcessIdOffset;
+    const int toIndex = toProcessId - metadata->theProcessIdOffset;
+    const int index = fromIndex * metadata->theProcessCount + toIndex;
+    return (*queues)[index];
 }
 
-Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
-        metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
-        queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
-        readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
+const Ipc::QueueReader &
+Ipc::MultiQueue::reader(const int processId) const
 {
+    assert(validProcessId(processId));
+    const int index = processId - metadata->theProcessIdOffset;
+    return readers->theReaders[index];
 }
 
-Ipc::FewToFewBiQueue::Owner::~Owner()
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::inQueue(const int remoteProcessId) const
+{
+    return oneToOneQueue(remoteProcessId, theLocalProcessId);
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::outQueue(const int remoteProcessId) const
+{
+    return oneToOneQueue(theLocalProcessId, remoteProcessId);
+}
+
+const Ipc::QueueReader &
+Ipc::MultiQueue::localReader() const
+{
+    return reader(theLocalProcessId);
+}
+
+const Ipc::QueueReader &
+Ipc::MultiQueue::remoteReader(const int processId) const
+{
+    return reader(processId);
+}
+
+int
+Ipc::MultiQueue::remotesCount() const
+{
+    return metadata->theProcessCount;
+}
+
+int
+Ipc::MultiQueue::remotesIdOffset() const
+{
+    return metadata->theProcessIdOffset;
+}
+
+Ipc::MultiQueue::Metadata::Metadata(const int aProcessCount, const int aProcessIdOffset):
+        theProcessCount(aProcessCount), theProcessIdOffset(aProcessIdOffset)
+{
+    Must(theProcessCount > 0);
+}
+
+Ipc::MultiQueue::Owner::Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity):
+        metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), processCount, processIdOffset)),
+        queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), processCount*processCount, maxItemSize, capacity)),
+        readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), processCount))
+{
+}
+
+Ipc::MultiQueue::Owner::~Owner()
 {
     delete metadataOwner;
     delete queuesOwner;
index 8311a13a3550598fe6195e8cc93d2986de5fcf17..47e0de618527dc919c0b4a8023c34baad87445f4 100644 (file)
@@ -140,6 +140,70 @@ public:
     const int theCapacity; /// number of OneToOneUniQueues
 };
 
+/**
+ * Base class for lockless fixed-capacity bidirectional queues for a
+ * limited number processes.
+ */
+class BaseMultiQueue
+{
+public:
+    BaseMultiQueue(const int aLocalProcessId);
+
+    /// clears the reader notification received by the local process from the remote process
+    void clearReaderSignal(const int remoteProcessId);
+
+    /// picks a process and calls OneToOneUniQueue::pop() using its queue
+    template <class Value> bool pop(int &remoteProcessId, Value &value);
+
+    /// calls OneToOneUniQueue::push() using the given process queue
+    template <class Value> bool push(const int remoteProcessId, const Value &value);
+
+    /// peeks at the item likely to be pop()ed next
+    template<class Value> bool peek(int &remoteProcessId, Value &value) const;
+
+    /// returns local reader's balance
+    QueueReader::Balance &localBalance() { return localReader().balance; }
+
+    /// returns reader's balance for a given remote process
+    const QueueReader::Balance &balance(const int remoteProcessId) const;
+
+    /// returns local reader's rate limit
+    QueueReader::Rate &localRateLimit() { return localReader().rateLimit; }
+
+    /// returns reader's rate limit for a given remote process
+    const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
+
+    /// number of items in incoming queue from a given remote process
+    int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
+
+    /// number of items in outgoing queue to a given remote process
+    int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
+
+protected:
+    /// incoming queue from a given remote process
+    virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const = 0;
+    OneToOneUniQueue &inQueue(const int remoteProcessId);
+
+    /// outgoing queue to a given remote process
+    virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const = 0;
+    OneToOneUniQueue &outQueue(const int remoteProcessId);
+
+    virtual const QueueReader &localReader() const = 0;
+    QueueReader &localReader();
+
+    virtual const QueueReader &remoteReader(const int remoteProcessId) const = 0;
+    QueueReader &remoteReader(const int remoteProcessId);
+
+    virtual int remotesCount() const = 0;
+    virtual int remotesIdOffset() const = 0;
+
+protected:
+    const int theLocalProcessId; ///< process ID of this queue
+
+private:
+    int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
+};
+
 /**
  * Lockless fixed-capacity bidirectional queue for a limited number
  * processes. Allows communication between two groups of processes:
@@ -148,7 +212,7 @@ public:
  * communicate. Process in each group has a unique integer ID in
  * [groupIdOffset, groupIdOffset + groupSize) range.
  */
-class FewToFewBiQueue
+class FewToFewBiQueue: public BaseMultiQueue
 {
 public:
     typedef OneToOneUniQueue::Full Full;
@@ -188,64 +252,91 @@ public:
     /// maximum number of items in the queue
     static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity);
 
+    /// finds the oldest item in incoming and outgoing queues between
+    /// us and the given remote process
+    template<class Value> bool findOldest(const int remoteProcessId, Value &value) const;
+
+protected:
+    virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
+    virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
+    virtual const QueueReader &localReader() const;
+    virtual const QueueReader &remoteReader(const int processId) const;
+    virtual int remotesCount() const;
+    virtual int remotesIdOffset() const;
+
+private:
+    bool validProcessId(const Group group, const int processId) const;
+    int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
+    const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
+    int readerIndex(const Group group, const int processId) const;
     Group localGroup() const { return theLocalGroup; }
     Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
 
-    /// clears the reader notification received by the local process from the remote process
-    void clearReaderSignal(const int remoteProcessId);
-
-    /// picks a process and calls OneToOneUniQueue::pop() using its queue
-    template <class Value> bool pop(int &remoteProcessId, Value &value);
+private:
+    const Mem::Pointer<Metadata> metadata; ///< shared metadata
+    const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
+    const Mem::Pointer<QueueReaders> readers; ///< readers array
 
-    /// calls OneToOneUniQueue::push() using the given process queue
-    template <class Value> bool push(const int remoteProcessId, const Value &value);
+    const Group theLocalGroup; ///< group of this queue
+};
 
-    /// finds the oldest item in incoming and outgoing queues between
-    /// us and the given remote process
-    template<class Value> bool findOldest(const int remoteProcessId, Value &value) const;
+/**
+ * Lockless fixed-capacity bidirectional queue for a limited number
+ * processes. Any process may send data to and receive from any other
+ * process (including itself). Each process has a unique integer ID in
+ * [processIdOffset, processIdOffset + processCount) range.
+ */
+class MultiQueue: public BaseMultiQueue
+{
+public:
+    typedef OneToOneUniQueue::Full Full;
+    typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
 
-    /// peeks at the item likely to be pop()ed next
-    template<class Value> bool peek(int &remoteProcessId, Value &value) const;
+private:
+    /// Shared metadata for MultiQueue
+    struct Metadata {
+        Metadata(const int aProcessCount, const int aProcessIdOffset);
+        size_t sharedMemorySize() const { return sizeof(*this); }
+        static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata); }
 
-    /// returns local reader's balance
-    QueueReader::Balance &localBalance();
+        const int theProcessCount;
+        const int theProcessIdOffset;
+    };
 
-    /// returns reader's balance for a given remote process
-    const QueueReader::Balance &balance(const int remoteProcessId) const;
+public:
+    class Owner
+    {
+    public:
+        Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
+        ~Owner();
 
-    /// returns local reader's rate limit
-    QueueReader::Rate &localRateLimit();
+    private:
+        Mem::Owner<Metadata> *const metadataOwner;
+        Mem::Owner<OneToOneUniQueues> *const queuesOwner;
+        Mem::Owner<QueueReaders> *const readersOwner;
+    };
 
-    /// returns reader's rate limit for a given remote process
-    const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
+    static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
 
-    /// number of items in incoming queue from a given remote process
-    int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
+    MultiQueue(const String &id, const int localProcessId);
 
-    /// number of items in outgoing queue to a given remote process
-    int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
+protected:
+    virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
+    virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
+    virtual const QueueReader &localReader() const;
+    virtual const QueueReader &remoteReader(const int remoteProcessId) const;
+    virtual int remotesCount() const;
+    virtual int remotesIdOffset() const;
 
 private:
-    bool validProcessId(const Group group, const int processId) const;
-    int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
-    const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
-    OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId);
-    const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
-    const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
-    QueueReader &reader(const Group group, const int processId);
-    const QueueReader &reader(const Group group, const int processId) const;
-    int readerIndex(const Group group, const int processId) const;
-    int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; }
-    int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; }
+    bool validProcessId(const int processId) const;
+    const OneToOneUniQueue &oneToOneQueue(const int fromProcessId, const int toProcessId) const;
+    const QueueReader &reader(const int processId) const;
 
 private:
     const Mem::Pointer<Metadata> metadata; ///< shared metadata
     const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
     const Mem::Pointer<QueueReaders> readers; ///< readers array
-
-    const Group theLocalGroup; ///< group of this queue
-    const int theLocalProcessId; ///< process ID of this queue
-    int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
 };
 
 // OneToOneUniQueue
@@ -330,19 +421,18 @@ OneToOneUniQueues::front() const
     return *reinterpret_cast<const OneToOneUniQueue *>(queue);
 }
 
-// FewToFewBiQueue
+// BaseMultiQueue
 
 template <class Value>
 bool
-FewToFewBiQueue::pop(int &remoteProcessId, Value &value)
+BaseMultiQueue::pop(int &remoteProcessId, Value &value)
 {
-    // iterate all remote group processes, starting after the one we visited last
-    QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
-    for (int i = 0; i < remoteGroupSize(); ++i) {
-        if (++theLastPopProcessId >= remoteGroupIdOffset() + remoteGroupSize())
-            theLastPopProcessId = remoteGroupIdOffset();
-        OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), theLastPopProcessId, theLocalGroup, theLocalProcessId);
-        if (queue.pop(value, &localReader)) {
+    // iterate all remote processes, starting after the one we visited last
+    for (int i = 0; i < remotesCount(); ++i) {
+        if (++theLastPopProcessId >= remotesIdOffset() + remotesCount())
+            theLastPopProcessId = remotesIdOffset();
+        OneToOneUniQueue &queue = inQueue(theLastPopProcessId);
+        if (queue.pop(value, &localReader())) {
             remoteProcessId = theLastPopProcessId;
             debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
             return true;
@@ -353,14 +443,34 @@ FewToFewBiQueue::pop(int &remoteProcessId, Value &value)
 
 template <class Value>
 bool
-FewToFewBiQueue::push(const int remoteProcessId, const Value &value)
+BaseMultiQueue::push(const int remoteProcessId, const Value &value)
 {
-    OneToOneUniQueue &remoteQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
-    QueueReader &remoteReader = reader(remoteGroup(), remoteProcessId);
+    OneToOneUniQueue &remoteQueue = outQueue(remoteProcessId);
+    QueueReader &reader = remoteReader(remoteProcessId);
     debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
-    return remoteQueue.push(value, &remoteReader);
+    return remoteQueue.push(value, &reader);
 }
 
+template <class Value>
+bool
+BaseMultiQueue::peek(int &remoteProcessId, Value &value) const
+{
+    // mimic FewToFewBiQueue::pop() but quit just before popping
+    int popProcessId = theLastPopProcessId; // preserve for future pop()
+    for (int i = 0; i < remotesCount(); ++i) {
+        if (++popProcessId >= remotesIdOffset() + remotesCount())
+            popProcessId = remotesIdOffset();
+        const OneToOneUniQueue &queue = inQueue(popProcessId);
+        if (queue.peek(value)) {
+            remoteProcessId = popProcessId;
+            return true;
+        }
+    }
+    return false; // most likely, no process had anything to pop
+}
+
+// FewToFewBiQueue
+
 template <class Value>
 bool
 FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const
@@ -383,26 +493,6 @@ FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const
     return out.peek(value);
 }
 
-template <class Value>
-bool
-FewToFewBiQueue::peek(int &remoteProcessId, Value &value) const
-{
-    // mimic FewToFewBiQueue::pop() but quit just before popping
-    int popProcessId = theLastPopProcessId; // preserve for future pop()
-    for (int i = 0; i < remoteGroupSize(); ++i) {
-        if (++popProcessId >= remoteGroupIdOffset() + remoteGroupSize())
-            popProcessId = remoteGroupIdOffset();
-        const OneToOneUniQueue &queue =
-            oneToOneQueue(remoteGroup(), popProcessId,
-                          theLocalGroup, theLocalProcessId);
-        if (queue.peek(value)) {
-            remoteProcessId = popProcessId;
-            return true;
-        }
-    }
-    return false; // most likely, no process had anything to pop
-}
-
 } // namespace Ipc
 
 #endif // SQUID_IPC_QUEUE_H
index 363e706e0c0d74fb37a2648e84a8006deb5cddb3..10dc1befd1e5d396c755767f534b9879229740ce 100644 (file)
@@ -19,6 +19,7 @@
 #include "mgr/Forwarder.h"
 #include "SwapDir.h" /* XXX: scope boundary violation */
 #include "CacheManager.h"
+#include "CollapsedForwarding.h"
 #if USE_DISKIO_IPCIO
 #include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: scope boundary violation */
 #endif
@@ -89,6 +90,10 @@ void Ipc::Strand::receive(const TypedMsgHdr &message)
     }
     break;
 
+    case mtCollapsedForwardingNotification:
+        CollapsedForwarding::HandleNotification(message);
+        break;
+
 #if SQUID_SNMP
     case mtSnmpRequest: {
         const Snmp::Request req(message);