]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Rework shared queue for IpcIoFile, further optimize IpcIo notifications.
authorDmitry Kurochkin <dmitry.kurochkin@measurement-factory.com>
Tue, 26 Apr 2011 20:39:59 +0000 (00:39 +0400)
committerDmitry Kurochkin <dmitry.kurochkin@measurement-factory.com>
Tue, 26 Apr 2011 20:39:59 +0000 (00:39 +0400)
The patch implements a FewToFewBiQueue class that allows
communication between two group of processes.  The queue is used
in IpcIoFile and allows to have a single shared queue reader
state for each process (both diskers and workers).  This
continues the optimization started in r11279, see commit log for
more details.

The patch also decreases the number of shared memory segment used
by queues.  Before the change, FewToOneBiQueue used
(2*workerCount + 1) number of segments. Now FewToFewBiQueue uses
just three: for shared metadata, for array of one-to-one queues
and for array of queue readers.

src/DiskIO/IpcIo/IpcIoFile.cc
src/DiskIO/IpcIo/IpcIoFile.h
src/ipc/Queue.cc
src/ipc/Queue.h
src/ipc/mem/Pointer.h

index 1cc4a81f78b002083f49fdd1a48351776c8f3c2c..a9a2bd497c5e8a65953425c0d9df98c007cb827d 100644 (file)
@@ -5,6 +5,7 @@
  */
 
 #include "config.h"
+#include "base/RunnersRegistry.h"
 #include "base/TextException.h"
 #include "DiskIO/IORequestor.h"
 #include "DiskIO/IpcIo/IpcIoFile.h"
 #include "DiskIO/WriteRequest.h"
 #include "ipc/Messages.h"
 #include "ipc/Port.h"
+#include "ipc/Queue.h"
 #include "ipc/StrandSearch.h"
 #include "ipc/UdsOp.h"
 #include "ipc/mem/Pages.h"
 
 CBDATA_CLASS_INIT(IpcIoFile);
 
-IpcIoFile::DiskerQueue::Owner *IpcIoFile::diskerQueueOwner = NULL;
-IpcIoFile::DiskerQueue *IpcIoFile::diskerQueue = NULL;
+/// shared memory segment path to use for IpcIoFile maps
+static const char *const ShmLabel = "io_file";
+
 const double IpcIoFile::Timeout = 7; // seconds;  XXX: ALL,9 may require more
 IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
 IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
+std::auto_ptr<IpcIoFile::Queue> IpcIoFile::queue;
 
 static bool DiskerOpen(const String &path, int flags, mode_t mode);
 static void DiskerClose(const String &path);
@@ -46,7 +50,7 @@ operator <<(std::ostream &os, const SipcIo &sio)
 
 
 IpcIoFile::IpcIoFile(char const *aDb):
-    dbName(aDb), diskId(-1), workerQueue(NULL), error_(false), lastRequestId(0),
+    dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
     olderRequests(&requestMap1), newerRequests(&requestMap2),
     timeoutCheckScheduled(false)
 {
@@ -68,17 +72,15 @@ IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
 {
     ioRequestor = callback;
     Must(diskId < 0); // we do not know our disker yet
-    Must(!diskerQueueOwner && !diskerQueue && !workerQueue);
+
+    if (!queue.get())
+        queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier));
 
     if (IamDiskProcess()) {
         error_ = !DiskerOpen(dbName, flags, mode);
         if (error_)
             return;
 
-        // XXX: make capacity configurable
-        diskerQueueOwner =
-            DiskerQueue::Init(dbName, Config.workers, sizeof(IpcIoMsg), 1024);
-        diskerQueue = new DiskerQueue(dbName);
         diskId = KidIdentifier;
         const bool inserted =
             IpcIoFiles.insert(std::make_pair(diskId, this)).second;
@@ -111,7 +113,6 @@ IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
 void
 IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) {
     Must(diskId < 0); // we do not know our disker yet
-    Must(!workerQueue);
 
     if (!response) {
         debugs(79,1, HERE << "error: timeout");
@@ -119,7 +120,6 @@ IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) {
     } else {
         diskId = response->strand.kidId;
         if (diskId >= 0) {
-            workerQueue = DiskerQueue::Attach(dbName, KidIdentifier);
             const bool inserted =
                 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
             Must(inserted);
@@ -149,13 +149,6 @@ IpcIoFile::close()
 {
     assert(ioRequestor != NULL);
 
-    delete diskerQueueOwner;
-    diskerQueueOwner = NULL;
-    delete diskerQueue;
-    diskerQueue = NULL;
-    delete workerQueue;
-    workerQueue = NULL;
-
     if (IamDiskProcess())
         DiskerClose(dbName);
     // XXX: else nothing to do?
@@ -300,11 +293,10 @@ void
 IpcIoFile::push(IpcIoPendingRequest *const pending)
 {
     // prevent queue overflows: check for responses to earlier requests
-    handleResponses("before push");
+    HandleResponses("before push");
 
     debugs(47, 7, HERE);
     Must(diskId >= 0);
-    Must(workerQueue);
     Must(pending);
     Must(pending->readRequest || pending->writeRequest);
 
@@ -328,12 +320,12 @@ IpcIoFile::push(IpcIoPendingRequest *const pending)
             memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
         }
 
-        debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId) << " at " << workerQueue->pushQueue->size());
+        debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));
 
-        if (workerQueue->push(ipcIo))
+        if (queue->push(diskId, ipcIo))
             Notify(diskId); // must notify disker
         trackPendingRequest(pending);
-    } catch (const WorkerQueue::Full &) {
+    } catch (const Queue::Full &) {
         debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " <<
                SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
         // TODO: grow queue size
@@ -367,22 +359,17 @@ IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
 }
 
 void
-IpcIoFile::handleNotification()
-{
-    debugs(47, 4, HERE << "notified");
-    workerQueue->clearReaderSignal();
-    handleResponses("after notification");
-}
-
-void
-IpcIoFile::handleResponses(const char *when)
+IpcIoFile::HandleResponses(const char *const when)
 {
     debugs(47, 4, HERE << "popping all " << when);
-    Must(workerQueue);
     IpcIoMsg ipcIo;
     // get all responses we can: since we are not pushing, this will stop
-    while (workerQueue->pop(ipcIo))
-        handleResponse(ipcIo);
+    int diskId;
+    while (queue->pop(diskId, ipcIo)) {
+        const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
+        Must(i != IpcIoFiles.end()); // TODO: warn but continue
+        i->second->handleResponse(ipcIo);
+    }
 }
 
 void
@@ -390,7 +377,7 @@ IpcIoFile::handleResponse(IpcIoMsg &ipcIo)
 {
     const int requestId = ipcIo.requestId;
     debugs(47, 7, HERE << "popped disker response: " <<
-        SipcIo(KidIdentifier, ipcIo, diskId)  << " at " << workerQueue->popQueue->size());
+        SipcIo(KidIdentifier, ipcIo, diskId));
 
     Must(requestId);
     if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
@@ -420,15 +407,11 @@ IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
 {
     const int from = msg.getInt();
     debugs(47, 7, HERE << "from " << from);
-    if (IamDiskProcess()) {
-        const int workerId = from;
-        DiskerHandleRequests(workerId);
-    } else {
-        const int diskId = from;
-        const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
-        Must(i != IpcIoFiles.end()); // TODO: warn but continue
-        i->second->handleNotification();
-    }
+    queue->clearReaderSignal(from);
+    if (IamDiskProcess())
+        DiskerHandleRequests();
+    else
+        HandleResponses("after notification");
 }
 
 /// handles open request timeout
@@ -618,14 +601,11 @@ diskerWrite(IpcIoMsg &ipcIo)
 }
 
 void
-IpcIoFile::DiskerHandleRequests(const int workerWhoNotified)
+IpcIoFile::DiskerHandleRequests()
 {
-    Must(diskerQueue);
-    diskerQueue->clearReaderSignal(workerWhoNotified);
-
     int workerId = 0;
     IpcIoMsg ipcIo;
-    while (diskerQueue->pop(workerId, ipcIo))
+    while (queue->pop(workerId, ipcIo))
         DiskerHandleRequest(workerId, ipcIo);
 
     // TODO: If the loop keeps on looping, we probably should take a break
@@ -640,8 +620,6 @@ IpcIoFile::DiskerHandleRequests(const int workerWhoNotified)
 void
 IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
 {
-    Must(diskerQueue);
-
     if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
         debugs(0,0, HERE << "disker" << KidIdentifier <<
                " should not receive " << ipcIo.command <<
@@ -659,12 +637,12 @@ IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
     else // ipcIo.command == IpcIo::cmdWrite
         diskerWrite(ipcIo);
 
-    debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier) << " at " << diskerQueue->biQueues[workerId-1]->pushQueue->size());
+    debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));
 
     try {
-        if (diskerQueue->push(workerId, ipcIo))
+        if (queue->push(workerId, ipcIo))
             Notify(workerId); // must notify worker
-    } catch (const DiskerQueue::Full &) {
+    } catch (const Queue::Full &) {
         // The worker queue should not overflow because the worker should pop()
         // before push()ing and because if disker pops N requests at a time,
         // we should make sure the worker pop() queue length is the worker
@@ -705,3 +683,37 @@ DiskerClose(const String &path)
         store_open_disk_fd--;
     }
 }
+
+
+/// initializes shared memory segments used by IpcIoFile
+class IpcIoRr: public RegisteredRunner
+{
+public:
+    /* RegisteredRunner API */
+    IpcIoRr(): owner(NULL) {}
+    virtual void run(const RunnerRegistry &);
+    virtual ~IpcIoRr();
+
+private:
+    Ipc::FewToFewBiQueue::Owner *owner;
+};
+
+RunnerRegistrationEntry(rrAfterConfig, IpcIoRr);
+
+
+void IpcIoRr::run(const RunnerRegistry &)
+{
+    if (!UsingSmp())
+        return;
+
+    if (IamMasterProcess()) {
+        Must(!owner);
+        // XXX: make capacity configurable
+        owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1, Config.cacheSwap.n_configured, 1 + Config.workers, sizeof(IpcIoMsg), 1024);
+    }
+}
+
+IpcIoRr::~IpcIoRr()
+{
+    delete owner;
+}
index 273556c5316cfd9dd5dd7faef78be36ab182ea4a..ba1043133e960b5b41d73b026ac2e32fdcd445c6 100644 (file)
@@ -6,10 +6,14 @@
 #include "DiskIO/DiskFile.h"
 #include "DiskIO/IORequestor.h"
 #include "ipc/forward.h"
-#include "ipc/Queue.h"
 #include "ipc/mem/Page.h"
 #include <list>
 #include <map>
+#include <memory>
+
+namespace Ipc {
+class FewToFewBiQueue;
+} // Ipc
 
 // TODO: expand to all classes
 namespace IpcIo {
@@ -84,22 +88,15 @@ private:
     void checkTimeouts();
     void scheduleTimeoutCheck();
 
-    void handleNotification();
-    void handleResponses(const char *when);
+    static void HandleResponses(const char *const when);
     void handleResponse(IpcIoMsg &ipcIo);
 
-    static void DiskerHandleRequests(const int workerId);
+    static void DiskerHandleRequests();
     static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
 
 private:
-    typedef Ipc::FewToOneBiQueue DiskerQueue;
-    typedef Ipc::OneToOneBiQueue WorkerQueue;
-
     const String dbName; ///< the name of the file we are managing
     int diskId; ///< the process ID of the disker we talk to
-    static DiskerQueue::Owner *diskerQueueOwner; ///< IPC queue owner for disker
-    static DiskerQueue *diskerQueue; ///< IPC queue for disker
-    WorkerQueue *workerQueue; ///< IPC queue for worker
     RefCount<IORequestor> ioRequestor;
 
     bool error_; ///< whether we have seen at least one I/O error (XXX)
@@ -123,6 +120,9 @@ private:
     typedef std::map<int, IpcIoFile*> IpcIoFilesMap;
     static IpcIoFilesMap IpcIoFiles;
 
+    typedef Ipc::FewToFewBiQueue Queue;
+    static std::auto_ptr<Queue> queue; ///< IPC queue
+
     CBDATA_CLASS2(IpcIoFile);
 };
 
index 993e9aa2f533c811edcb2ca29d6137f7085530e1..ad1ee0d41bc6310f501ed13a9e3bf37c955e31f0 100644 (file)
 #include "globals.h"
 #include "ipc/Queue.h"
 
-/// constructs shared segment ID from parent queue ID and child queue index
+/// constructs Metadata ID from parent queue ID
 static String
-QueueId(String id, const int idx)
+MetadataId(String id)
 {
-    id.append("__");
-    id.append(xitoa(idx));
+    id.append("__metadata");
     return id;
 }
 
-/// constructs QueueReader ID from parent queue ID
+/// constructs one-to-one queues ID from parent queue ID
 static String
-ReaderId(String id)
+QueuesId(String id)
+{
+    id.append("__queues");
+    return id;
+}
+
+/// constructs QueueReaders ID from parent queue ID
+static String
+ReadersId(String id)
 {
     id.append("__readers");
     return id;
@@ -61,31 +68,19 @@ Ipc::QueueReaders::SharedMemorySize(const int capacity)
 
 // OneToOneUniQueue
 
-Ipc::OneToOneUniQueue::Owner *
-Ipc::OneToOneUniQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity)
-{
-    Must(maxItemSize > 0);
-    Must(capacity > 0);
-    return shm_new(Shared)(id.termedBuf(), maxItemSize, capacity);
-}
-
-Ipc::OneToOneUniQueue::OneToOneUniQueue(const String &id):
-    shared(shm_old(Shared)(id.termedBuf())), reader_(NULL)
-{
-}
-
-void
-Ipc::OneToOneUniQueue::reader(QueueReader *aReader)
+Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
+    theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
+    theCapacity(aCapacity)
 {
-    Must(!reader_ && aReader);
-    reader_ = aReader;
+    Must(theMaxItemSize > 0);
+    Must(theCapacity > 0);
 }
 
 int
 Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
 {
     assert(maxItemSize > 0);
-    size -= sizeof(Shared);
+    size -= sizeof(OneToOneUniQueue);
     return size >= 0 ? size / maxItemSize : 0;
 }
 
@@ -93,170 +88,145 @@ int
 Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
 {
     assert(size >= 0);
-    return sizeof(Shared) + maxItemSize * size;
+    return sizeof(OneToOneUniQueue) + maxItemSize * size;
 }
 
-Ipc::QueueReader &
-Ipc::OneToOneUniQueue::reader()
-{
-    Must(reader_);
-    return *reader_;
-}
 
-Ipc::OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize, const int aCapacity):
-    theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
-    theCapacity(aCapacity)
-{
-}
+/* OneToOneUniQueues */
 
-size_t
-Ipc::OneToOneUniQueue::Shared::sharedMemorySize() const
+Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
 {
-    return SharedMemorySize(theMaxItemSize, theCapacity);
+    Must(theCapacity > 0);
+    for (int i = 0; i < theCapacity; ++i)
+        new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
 }
 
 size_t
-Ipc::OneToOneUniQueue::Shared::SharedMemorySize(const unsigned int maxItemSize, const int capacity)
+Ipc::OneToOneUniQueues::sharedMemorySize() const
 {
-    return Items2Bytes(maxItemSize, capacity);
+    return sizeof(*this) + theCapacity * front().sharedMemorySize();
 }
 
-
-// OneToOneBiQueue
-
-Ipc::OneToOneBiQueue::Owner *
-Ipc::OneToOneBiQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity)
-{
-    UniQueueOwner owner1(OneToOneUniQueue::Init(QueueId(id, Side1), maxItemSize, capacity));
-    UniQueueOwner owner2(OneToOneUniQueue::Init(QueueId(id, Side2), maxItemSize, capacity));
-    Owner *const owner = new Owner;
-    owner->first = owner1;
-    owner->second = owner2;
-    return owner;
-}
-
-Ipc::OneToOneBiQueue::OneToOneBiQueue(const String &id, const Side side)
-{
-    OneToOneUniQueue *const queue1 = new OneToOneUniQueue(QueueId(id, Side1));
-    OneToOneUniQueue *const queue2 = new OneToOneUniQueue(QueueId(id, Side2));
-    switch (side) {
-    case Side1:
-        popQueue.reset(queue1);
-        pushQueue.reset(queue2);
-        break;
-    case Side2:
-        popQueue.reset(queue2);
-        pushQueue.reset(queue1);
-        break;
-    default:
-        Must(false);
-    }
-}
-
-void
-Ipc::OneToOneBiQueue::readers(QueueReader *r1, QueueReader *r2)
+size_t
+Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
 {
-    popQueue->reader(r1);
-    pushQueue->reader(r2);
+    const int queueSize =
+        OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
+    return sizeof(OneToOneUniQueues) + queueSize * capacity;
 }
 
-void
-Ipc::OneToOneBiQueue::clearReaderSignal()
+const Ipc::OneToOneUniQueue &
+Ipc::OneToOneUniQueues::operator [](const int index) const
 {
-    debugs(54, 7, HERE << "reader: " << &popQueue->reader());
-    popQueue->reader().clearSignal();
+    Must(0 <= index && index < theCapacity);
+    const size_t queueSize = index ? front().sharedMemorySize() : 0;
+    const char *const queue =
+        reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
+    return *reinterpret_cast<const OneToOneUniQueue *>(queue);
 }
 
 
-// FewToOneBiQueue
+// FewToFewBiQueue
 
-Ipc::FewToOneBiQueue::Owner *
-Ipc::FewToOneBiQueue::Init(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity)
+Ipc::FewToFewBiQueue::Owner *
+Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
 {
-    return new Owner(id, workerCount, maxItemSize, capacity);
+    return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
 }
 
-Ipc::FewToOneBiQueue::FewToOneBiQueue(const String &id):
-    theLastPopWorker(0),
-    readers(shm_old(QueueReaders)(ReaderId(id).termedBuf())),
-    reader(readers->theReaders)
+Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
+    metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
+    queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
+    readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
+    theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
+    theLastPopProcessId(readers->theCapacity)
 {
-    Must(readers->theCapacity > 1);
+    Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
+    Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
 
-    debugs(54, 7, HERE << "disker " << id << " reader: " << reader->id);
-
-    biQueues.reserve(workerCount());
-    for (int i = 0; i < workerCount(); ++i) {
-        OneToOneBiQueue *const biQueue = new OneToOneBiQueue(QueueId(id, i + WorkerIdOffset), OneToOneBiQueue::Side1);
-        QueueReader *const remoteReader = readers->theReaders + i + 1;
-        biQueue->readers(reader, remoteReader);
-        biQueues.push_back(biQueue);
-    }
+    const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
+    debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id);
 }
 
-Ipc::OneToOneBiQueue *
-Ipc::FewToOneBiQueue::Attach(const String &id, const int workerId)
-{
-    Mem::Pointer<QueueReaders> readers = shm_old(QueueReaders)(ReaderId(id).termedBuf());
-    Must(workerId >= WorkerIdOffset);
-    Must(workerId < readers->theCapacity - 1 + WorkerIdOffset);
-    QueueReader *const remoteReader = readers->theReaders;
-    debugs(54, 7, HERE << "disker " << id << " reader: " << remoteReader->id);
-    QueueReader *const localReader =
-        readers->theReaders + workerId - WorkerIdOffset + 1;
-    debugs(54, 7, HERE << "local " << id << " reader: " << localReader->id);
-
-    OneToOneBiQueue *const biQueue =
-        new OneToOneBiQueue(QueueId(id, workerId), OneToOneBiQueue::Side2);
-    biQueue->readers(localReader, remoteReader);
-
-    // XXX: remove this leak. By refcounting Ipc::Mem::Segments? By creating a global FewToOneBiQueue for each worker?
-    const Mem::Pointer<QueueReaders> *const leakingReaders = new Mem::Pointer<QueueReaders>(readers);
-    Must(leakingReaders); // silence unused variable warning
-
-    return biQueue;
-}
-
-Ipc::FewToOneBiQueue::~FewToOneBiQueue()
-{
-    for (int i = 0; i < workerCount(); ++i)
-        delete biQueues[i];
+bool
+Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
+{
+    switch (group) {
+    case groupA:
+        return metadata->theGroupAIdOffset <= processId &&
+            processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
+    case groupB:
+        return metadata->theGroupBIdOffset <= processId &&
+            processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
+    }
+    return false;
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
+{
+    Must(fromGroup != toGroup);
+    Must(validProcessId(fromGroup, fromProcessId));
+    Must(validProcessId(toGroup, toProcessId));
+    int index1;
+    int index2;
+    int offset;
+    if (fromGroup == groupA) {
+        index1 = fromProcessId - metadata->theGroupAIdOffset;
+        index2 = toProcessId - metadata->theGroupBIdOffset;
+        offset = 0;
+    } else {
+        index1 = toProcessId - metadata->theGroupAIdOffset;
+        index2 = fromProcessId - metadata->theGroupBIdOffset;
+        offset = metadata->theGroupASize * metadata->theGroupBSize;
+    }
+    const int index = offset + index1 * metadata->theGroupBSize + index2;
+    return (*queues)[index];
 }
 
-bool
-Ipc::FewToOneBiQueue::validWorkerId(const int workerId) const
+Ipc::QueueReader &
+Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
 {
-    return WorkerIdOffset <= workerId &&
-        workerId < WorkerIdOffset + workerCount();
+    Must(validProcessId(group, processId));
+    const int index =  group == groupA ?
+        processId - metadata->theGroupAIdOffset :
+        metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
+    return readers->theReaders[index];
 }
 
 void
-Ipc::FewToOneBiQueue::clearReaderSignal(int workerId)
+Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
 {
-    debugs(54, 7, HERE << "reader: " << reader->id);
+    QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
+    debugs(54, 7, HERE << "reader: " << localReader.id);
 
-    assert(validWorkerId(workerId));
-    reader->clearSignal();
+    Must(validProcessId(remoteGroup(), remoteProcessId));
+    localReader.clearSignal();
 
     // we got a hint; we could reposition iteration to try popping from the
-    // workerId queue first; but it does not seem to help much and might
+    // remoteProcessId queue first; but it does not seem to help much and might
     // introduce some bias so we do not do that for now:
-    // theLastPopWorker = (workerId + workerCount() - 1) % workerCount();
+    // theLastPopProcessId = remoteProcessId;
 }
 
-Ipc::FewToOneBiQueue::Owner::Owner(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity):
-    readersOwner(shm_new(QueueReaders)(ReaderId(id).termedBuf(), workerCount + 1))
+Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
+    theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
+    theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
+{
+    Must(theGroupASize > 0);
+    Must(theGroupBSize > 0);
+}
+
+Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
+    metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
+    queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
+    readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
 {
-    biQueueOwners.reserve(workerCount);
-    for (int i = 0; i < workerCount; ++i) {
-        OneToOneBiQueue::Owner *const queueOwner = OneToOneBiQueue::Init(QueueId(id, i + WorkerIdOffset), maxItemSize, capacity);
-        biQueueOwners.push_back(queueOwner);
-    }
 }
 
-Ipc::FewToOneBiQueue::Owner::~Owner()
+Ipc::FewToFewBiQueue::Owner::~Owner()
 {
-    for (size_t i = 0; i < biQueueOwners.size(); ++i)
-        delete biQueueOwners[i];
+    delete metadataOwner;
+    delete queuesOwner;
     delete readersOwner;
 }
index ca449e338ec8572fcf417c2911926696e31fe6e9..f87259c06483000476501c7e0c8465786d304962 100644 (file)
@@ -12,8 +12,6 @@
 #include "ipc/mem/Pointer.h"
 #include "util.h"
 
-#include <memory>
-
 class String;
 
 namespace Ipc {
@@ -71,137 +69,130 @@ public:
  * different value). We can add support for blocked writers if needed.
  */
 class OneToOneUniQueue {
-private:
-    struct Shared {
-        Shared(const unsigned int aMaxItemSize, const int aCapacity);
-        size_t sharedMemorySize() const;
-        static size_t SharedMemorySize(const unsigned int maxItemSize, const int capacity);
-
-        unsigned int theIn; ///< input index, used only in push()
-        unsigned int theOut; ///< output index, used only in pop()
-
-        AtomicWord theSize; ///< number of items in the queue
-        const unsigned int theMaxItemSize; ///< maximum item size
-        const int theCapacity; ///< maximum number of items, i.e. theBuffer size
-
-        char theBuffer[];
-    };
-
 public:
     // pop() and push() exceptions; TODO: use TextException instead
     class Full {};
     class ItemTooLarge {};
 
-    typedef Mem::Owner<Shared> Owner;
-
-    /// initialize shared memory
-    static Owner *Init(const String &id, const unsigned int maxItemSize, const int capacity);
+    OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity);
 
-    OneToOneUniQueue(const String &id);
+    unsigned int maxItemSize() const { return theMaxItemSize; }
+    int size() const { return theSize; }
+    int capacity() const { return theCapacity; }
+    int sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); }
 
-    unsigned int maxItemSize() const { return shared->theMaxItemSize; }
-    int size() const { return shared->theSize; }
-    int capacity() const { return shared->theCapacity; }
-
-    bool empty() const { return !shared->theSize; }
-    bool full() const { return shared->theSize == shared->theCapacity; }
+    bool empty() const { return !theSize; }
+    bool full() const { return theSize == theCapacity; }
 
     static int Bytes2Items(const unsigned int maxItemSize, int size);
     static int Items2Bytes(const unsigned int maxItemSize, const int size);
 
     /// returns true iff the value was set; [un]blocks the reader as needed
-    template<class Value> bool pop(Value &value);
+    template<class Value> bool pop(Value &value, QueueReader *const reader = NULL);
 
     /// returns true iff the caller must notify the reader of the pushed item
-    template<class Value> bool push(const Value &value);
-
-    QueueReader &reader();
-    void reader(QueueReader *aReader);
+    template<class Value> bool push(const Value &value, QueueReader *const reader = NULL);
 
 private:
 
-    Mem::Pointer<Shared> shared; ///< pointer to shared memory
-    QueueReader *reader_; ///< the state of the code popping from this queue
-};
-
-/// Lockless fixed-capacity bidirectional queue for two processes.
-class OneToOneBiQueue {
-private:
-    typedef std::auto_ptr<OneToOneUniQueue::Owner> UniQueueOwner;
+    unsigned int theIn; ///< input index, used only in push()
+    unsigned int theOut; ///< output index, used only in pop()
 
-public:
-    typedef OneToOneUniQueue::Full Full;
-    typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
+    AtomicWord theSize; ///< number of items in the queue
+    const unsigned int theMaxItemSize; ///< maximum item size
+    const int theCapacity; ///< maximum number of items, i.e. theBuffer size
 
-    typedef std::pair<UniQueueOwner, UniQueueOwner> Owner;
+    char theBuffer[];
+};
 
-    /// initialize shared memory
-    static Owner *Init(const String &id, const unsigned int maxItemSize, const int capacity);
+/// shared array of OneToOneUniQueues
+class OneToOneUniQueues {
+public:
+    OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity);
 
-    enum Side { Side1 = 1, Side2 = 2 };
-    OneToOneBiQueue(const String &id, const Side side);
+    size_t sharedMemorySize() const;
+    static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity);
 
-    void readers(QueueReader *r1, QueueReader *r2);
-    void clearReaderSignal();
+    const OneToOneUniQueue &operator [](const int index) const;
+    inline OneToOneUniQueue &operator [](const int index);
 
-    /* wrappers to call the right OneToOneUniQueue method for this process */
-    template<class Value> bool pop(Value &value) { return popQueue->pop(value); }
-    template<class Value> bool push(const Value &value) { return pushQueue->push(value); }
+private:
+    inline const OneToOneUniQueue &front() const;
 
-//private:
-    std::auto_ptr<OneToOneUniQueue> popQueue; ///< queue to pop from for this process
-    std::auto_ptr<OneToOneUniQueue> pushQueue; ///< queue to push to for this process
+public:
+    const int theCapacity; /// number of OneToOneUniQueues
 };
 
 /**
  * Lockless fixed-capacity bidirectional queue for a limited number
- * pricesses. Implements a star topology: Many worker processes
- * communicate with the one central process. The central process uses
- * FewToOneBiQueue object, while workers use OneToOneBiQueue objects
- * created with the Attach() method. Each worker has a unique integer
- * ID in [1, workerCount] range.
+ * processes. Allows communication between two groups of processes:
+ * any process in one group may send data to and receive from any
+ * process in another group, but processes in the same group can not
+ * communicate. Process in each group has a unique integer ID in
+ * [groupIdOffset, groupIdOffset + groupSize) range.
  */
-class FewToOneBiQueue {
+class FewToFewBiQueue {
 public:
-    typedef OneToOneBiQueue::Full Full;
-    typedef OneToOneBiQueue::ItemTooLarge ItemTooLarge;
+    typedef OneToOneUniQueue::Full Full;
+    typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
+
+private:
+    /// Shared metadata for FewToFewBiQueue
+    struct Metadata {
+        Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset);
+        size_t sharedMemorySize() const { return sizeof(*this); }
+        static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); }
+
+        const int theGroupASize;
+        const int theGroupAIdOffset;
+        const int theGroupBSize;
+        const int theGroupBIdOffset;
+    };
 
+public:
     class Owner {
     public:
-        Owner(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity);
+        Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
         ~Owner();
 
     private:
-        Vector<OneToOneBiQueue::Owner *> biQueueOwners;
+        Mem::Owner<Metadata> *const metadataOwner;
+        Mem::Owner<OneToOneUniQueues> *const queuesOwner;
         Mem::Owner<QueueReaders> *const readersOwner;
     };
 
-    static Owner *Init(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity);
+    static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
 
-    FewToOneBiQueue(const String &id);
-    static OneToOneBiQueue *Attach(const String &id, const int workerId);
-    ~FewToOneBiQueue();
+    enum Group { groupA = 0, groupB = 1 };
+    FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId);
 
-    bool validWorkerId(const int workerId) const;
-    int workerCount() const { return readers->theCapacity - 1; }
+    Group localGroup() const { return theLocalGroup; }
+    Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
 
-    /// clears the reader notification received by the disker from worker
-    void clearReaderSignal(int workerId);
+    /// clears the reader notification received by the local process from the remote process
+    void clearReaderSignal(const int remoteProcessId);
 
-    /// picks a worker and calls OneToOneUniQueue::pop() using its queue
-    template <class Value> bool pop(int &workerId, Value &value);
+    /// picks a process and calls OneToOneUniQueue::pop() using its queue
+    template <class Value> bool pop(int &remoteProcessId, Value &value);
 
-    /// calls OneToOneUniQueue::push() using the given worker queue
-    template <class Value> bool push(const int workerId, const Value &value);
+    /// calls OneToOneUniQueue::push() using the given process queue
+    template <class Value> bool push(const int remoteProcessId, const Value &value);
 
-//private: XXX: make private by moving pop/push debugging into pop/push
-    int theLastPopWorker; ///< the ID of the last worker we tried to pop() from
-    Vector<OneToOneBiQueue *> biQueues; ///< worker queues indexed by worker ID
+private:
+    bool validProcessId(const Group group, const int processId) const;
+    OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId);
+    QueueReader &reader(const Group group, const int processId);
+    int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; }
+    int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; }
 
+private:
+    const Mem::Pointer<Metadata> metadata; ///< shared metadata
+    const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
     const Mem::Pointer<QueueReaders> readers; ///< readers array
-    QueueReader *const reader; ///< the state of the code popping from all biQueues
 
-    enum { WorkerIdOffset = 1 }; ///< worker ID offset, always 1 for now
+    const Group theLocalGroup; ///< group of this queue
+    const int theLocalProcessId; ///< process ID of this queue
+    int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
 };
 
 
@@ -209,73 +200,99 @@ public:
 
 template <class Value>
 bool
-OneToOneUniQueue::pop(Value &value)
+OneToOneUniQueue::pop(Value &value, QueueReader *const reader)
 {
-    if (sizeof(value) > shared->theMaxItemSize)
+    if (sizeof(value) > theMaxItemSize)
         throw ItemTooLarge();
 
     // A writer might push between the empty test and block() below, so we do
     // not return false right after calling block(), but test again.
     if (empty()) {
-        reader().block();
+        if (!reader)
+            return false;
+
+        reader->block();
         // A writer might push between the empty test and block() below,
         // so we must test again as such a writer will not signal us.
         if (empty())
             return false;
     }
 
-    reader().unblock();
-    const unsigned int pos =
-        (shared->theOut++ % shared->theCapacity) * shared->theMaxItemSize;
-    memcpy(&value, shared->theBuffer + pos, sizeof(value));
-    --shared->theSize;
+    if (reader)
+        reader->unblock();
+
+    const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize;
+    memcpy(&value, theBuffer + pos, sizeof(value));
+    --theSize;
 
     return true;
 }
 
 template <class Value>
 bool
-OneToOneUniQueue::push(const Value &value)
+OneToOneUniQueue::push(const Value &value, QueueReader *const reader)
 {
-    if (sizeof(value) > shared->theMaxItemSize)
+    if (sizeof(value) > theMaxItemSize)
         throw ItemTooLarge();
 
     if (full())
         throw Full();
 
     const bool wasEmpty = empty();
-    const unsigned int pos =
-        shared->theIn++ % shared->theCapacity * shared->theMaxItemSize;
-    memcpy(shared->theBuffer + pos, &value, sizeof(value));
-    ++shared->theSize;
+    const unsigned int pos = theIn++ % theCapacity * theMaxItemSize;
+    memcpy(theBuffer + pos, &value, sizeof(value));
+    ++theSize;
+
+    return wasEmpty && (!reader || reader->raiseSignal());
+}
+
 
-    return wasEmpty && reader().raiseSignal();
+// OneToOneUniQueues
+
+inline OneToOneUniQueue &
+OneToOneUniQueues::operator [](const int index)
+{
+    return const_cast<OneToOneUniQueue &>((*const_cast<const OneToOneUniQueues *>(this))[index]);
+}
+
+inline const OneToOneUniQueue &
+OneToOneUniQueues::front() const
+{
+    const char *const queue =
+        reinterpret_cast<const char *>(this) + sizeof(*this);
+    return *reinterpret_cast<const OneToOneUniQueue *>(queue);
 }
 
 
-// FewToOneBiQueue
+// FewToFewBiQueue
 
 template <class Value>
 bool
-FewToOneBiQueue::pop(int &workerId, Value &value)
+FewToFewBiQueue::pop(int &remoteProcessId, Value &value)
 {
-    // iterate all workers, starting after the one we visited last
-    for (int i = 0; i < workerCount(); ++i) {
-        theLastPopWorker = (theLastPopWorker + 1) % workerCount();
-        if (biQueues[theLastPopWorker]->pop(value)) {
-            workerId = theLastPopWorker + WorkerIdOffset;
+    // iterate all remote group processes, starting after the one we visited last
+    QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
+    for (int i = 0; i < remoteGroupSize(); ++i) {
+        if (++theLastPopProcessId >= remoteGroupIdOffset() + remoteGroupSize())
+            theLastPopProcessId = remoteGroupIdOffset();
+        OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), theLastPopProcessId, theLocalGroup, theLocalProcessId);
+        if (queue.pop(value, &localReader)) {
+            remoteProcessId = theLastPopProcessId;
+            debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
             return true;
         }
     }
-    return false; // no worker had anything to pop
+    return false; // no process had anything to pop
 }
 
 template <class Value>
 bool
-FewToOneBiQueue::push(const int workerId, const Value &value)
+FewToFewBiQueue::push(const int remoteProcessId, const Value &value)
 {
-    assert(validWorkerId(workerId));
-    return biQueues[workerId - WorkerIdOffset]->push(value);
+    OneToOneUniQueue &remoteQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
+    QueueReader &remoteReader = reader(remoteGroup(), remoteProcessId);
+    debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
+    return remoteQueue.push(value, &remoteReader);
 }
 
 } // namespace Ipc
index 3726b978076576931c6929fbba8b022856f479c9..01a606d746332d0953f0ef47efc4c86e70fd621c 100644 (file)
@@ -27,6 +27,8 @@ public:
     static Owner *New(const char *const id, const P1 &p1, const P2 &p2);
     template <class P1, class P2, class P3>
     static Owner *New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3);
+    template <class P1, class P2, class P3, class P4>
+    static Owner *New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3, const P4 &p4);
 
     ~Owner();
 
@@ -72,7 +74,7 @@ private:
     typedef RefCount< Object<Class> > Base;
 
 public:
-    explicit Pointer(Object<Class> *const anObject): Base(anObject) {}
+    explicit Pointer(Object<Class> *const anObject = NULL): Base(anObject) {}
 
     Class *operator ->() const { return Base::operator ->()->theObject; }
     Class &operator *() const { return *Base::operator *().theObject; }
@@ -137,6 +139,16 @@ Owner<Class>::New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3
     return owner;
 }
 
+template <class Class> template <class P1, class P2, class P3, class P4>
+Owner<Class> *
+Owner<Class>::New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3, const P4 &p4)
+{
+    const off_t sharedSize = Class::SharedMemorySize(p1, p2, p3, p4);
+    Owner *const owner = new Owner(id, sharedSize);
+    owner->theObject = new (owner->theSegment.reserve(sharedSize)) Class(p1, p2, p3, p4);
+    return owner;
+}
+
 // Object implementation
 
 template <class Class>