From: Dmitry Kurochkin
Date: Tue, 26 Apr 2011 20:39:59 +0000 (+0400)
Subject: Rework shared queue for IpcIoFile, further optimize IpcIo notifications.
X-Git-Tag: take07~36
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=f5591061d55ed2cc7964f82124b4961237be7890;p=thirdparty%2Fsquid.git

Rework shared queue for IpcIoFile, further optimize IpcIo notifications.

The patch implements a FewToFewBiQueue class that allows communication between two groups of processes. The queue is used in IpcIoFile and allows each process (both diskers and workers) to have a single shared queue reader state. This continues the optimization started in r11279; see that commit log for more details.

The patch also decreases the number of shared memory segments used by queues. Before the change, FewToOneBiQueue used (2*workerCount + 1) segments. Now FewToFewBiQueue uses just three: one for shared metadata, one for the array of one-to-one queues, and one for the array of queue readers.

---
diff --git a/src/DiskIO/IpcIo/IpcIoFile.cc b/src/DiskIO/IpcIo/IpcIoFile.cc index 1cc4a81f78..a9a2bd497c 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.cc +++ b/src/DiskIO/IpcIo/IpcIoFile.cc @@ -5,6 +5,7 @@ */ #include "config.h" +#include "base/RunnersRegistry.h" #include "base/TextException.h" #include "DiskIO/IORequestor.h" #include "DiskIO/IpcIo/IpcIoFile.h" @@ -12,17 +13,20 @@ #include "DiskIO/WriteRequest.h" #include "ipc/Messages.h" #include "ipc/Port.h" +#include "ipc/Queue.h" #include "ipc/StrandSearch.h" #include "ipc/UdsOp.h" #include "ipc/mem/Pages.h" CBDATA_CLASS_INIT(IpcIoFile); -IpcIoFile::DiskerQueue::Owner *IpcIoFile::diskerQueueOwner = NULL; -IpcIoFile::DiskerQueue *IpcIoFile::diskerQueue = NULL; +/// shared memory segment path to use for IpcIoFile maps +static const char *const ShmLabel = "io_file"; + const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen; IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles; +std::auto_ptr IpcIoFile::queue; static bool DiskerOpen(const String &path, int flags, mode_t mode); static void DiskerClose(const String &path); @@ -46,7 +50,7 @@ operator <<(std::ostream &os, const SipcIo &sio) IpcIoFile::IpcIoFile(char const *aDb): - dbName(aDb), diskId(-1), workerQueue(NULL), error_(false), lastRequestId(0), + dbName(aDb), diskId(-1), error_(false), lastRequestId(0), olderRequests(&requestMap1), newerRequests(&requestMap2), timeoutCheckScheduled(false) { @@ -68,17 +72,15 @@ IpcIoFile::open(int flags, mode_t mode, RefCount callback) { ioRequestor = callback; Must(diskId < 0); // we do not know our disker yet - Must(!diskerQueueOwner && !diskerQueue && !workerQueue); + + if (!queue.get()) + queue.reset(new Queue(ShmLabel, IamWorkerProcess() ?
Queue::groupA : Queue::groupB, KidIdentifier)); if (IamDiskProcess()) { error_ = !DiskerOpen(dbName, flags, mode); if (error_) return; - // XXX: make capacity configurable - diskerQueueOwner = - DiskerQueue::Init(dbName, Config.workers, sizeof(IpcIoMsg), 1024); - diskerQueue = new DiskerQueue(dbName); diskId = KidIdentifier; const bool inserted = IpcIoFiles.insert(std::make_pair(diskId, this)).second; @@ -111,7 +113,6 @@ IpcIoFile::open(int flags, mode_t mode, RefCount callback) void IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) { Must(diskId < 0); // we do not know our disker yet - Must(!workerQueue); if (!response) { debugs(79,1, HERE << "error: timeout"); @@ -119,7 +120,6 @@ IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) { } else { diskId = response->strand.kidId; if (diskId >= 0) { - workerQueue = DiskerQueue::Attach(dbName, KidIdentifier); const bool inserted = IpcIoFiles.insert(std::make_pair(diskId, this)).second; Must(inserted); @@ -149,13 +149,6 @@ IpcIoFile::close() { assert(ioRequestor != NULL); - delete diskerQueueOwner; - diskerQueueOwner = NULL; - delete diskerQueue; - diskerQueue = NULL; - delete workerQueue; - workerQueue = NULL; - if (IamDiskProcess()) DiskerClose(dbName); // XXX: else nothing to do? @@ -300,11 +293,10 @@ void IpcIoFile::push(IpcIoPendingRequest *const pending) { // prevent queue overflows: check for responses to earlier requests - handleResponses("before push"); + HandleResponses("before push"); debugs(47, 7, HERE); Must(diskId >= 0); - Must(workerQueue); Must(pending); Must(pending->readRequest || pending->writeRequest); @@ -328,12 +320,12 @@ IpcIoFile::push(IpcIoPendingRequest *const pending) memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away } - debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId) << " at " << workerQueue->pushQueue->size()); + debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId)); - if (workerQueue->push(ipcIo)) + if (queue->push(diskId, ipcIo)) Notify(diskId); // must notify disker trackPendingRequest(pending); - } catch (const WorkerQueue::Full &) { + } catch (const Queue::Full &) { debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " << SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len // TODO: grow queue size @@ -367,22 +359,17 @@ IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response) } void -IpcIoFile::handleNotification() -{ - debugs(47, 4, HERE << "notified"); - workerQueue->clearReaderSignal(); - handleResponses("after notification"); -} - -void -IpcIoFile::handleResponses(const char *when) +IpcIoFile::HandleResponses(const char *const when) { debugs(47, 4, HERE << "popping all " << when); - Must(workerQueue); IpcIoMsg ipcIo; // get all responses we can: since we are not pushing, this will stop - while (workerQueue->pop(ipcIo)) - handleResponse(ipcIo); + int diskId; + while (queue->pop(diskId, ipcIo)) { + const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId); + Must(i != IpcIoFiles.end()); // TODO: warn but continue + i->second->handleResponse(ipcIo); + } } void @@ -390,7 +377,7 @@ IpcIoFile::handleResponse(IpcIoMsg &ipcIo) { const int requestId = ipcIo.requestId; debugs(47, 7, HERE << "popped disker response: " << - SipcIo(KidIdentifier, ipcIo, diskId) << " at " << workerQueue->popQueue->size()); + SipcIo(KidIdentifier, ipcIo, diskId)); Must(requestId); if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) { @@ -420,15 +407,11 @@ 
IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg) { const int from = msg.getInt(); debugs(47, 7, HERE << "from " << from); - if (IamDiskProcess()) { - const int workerId = from; - DiskerHandleRequests(workerId); - } else { - const int diskId = from; - const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId); - Must(i != IpcIoFiles.end()); // TODO: warn but continue - i->second->handleNotification(); - } + queue->clearReaderSignal(from); + if (IamDiskProcess()) + DiskerHandleRequests(); + else + HandleResponses("after notification"); } /// handles open request timeout @@ -618,14 +601,11 @@ diskerWrite(IpcIoMsg &ipcIo) } void -IpcIoFile::DiskerHandleRequests(const int workerWhoNotified) +IpcIoFile::DiskerHandleRequests() { - Must(diskerQueue); - diskerQueue->clearReaderSignal(workerWhoNotified); - int workerId = 0; IpcIoMsg ipcIo; - while (diskerQueue->pop(workerId, ipcIo)) + while (queue->pop(workerId, ipcIo)) DiskerHandleRequest(workerId, ipcIo); // TODO: If the loop keeps on looping, we probably should take a break @@ -640,8 +620,6 @@ IpcIoFile::DiskerHandleRequests(const int workerWhoNotified) void IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo) { - Must(diskerQueue); - if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) { debugs(0,0, HERE << "disker" << KidIdentifier << " should not receive " << ipcIo.command << @@ -659,12 +637,12 @@ IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo) else // ipcIo.command == IpcIo::cmdWrite diskerWrite(ipcIo); - debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier) << " at " << diskerQueue->biQueues[workerId-1]->pushQueue->size()); + debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier)); try { - if (diskerQueue->push(workerId, ipcIo)) + if (queue->push(workerId, ipcIo)) Notify(workerId); // must notify worker - } catch (const DiskerQueue::Full &) { + } catch (const Queue::Full &) { // The worker queue should not overflow because the worker should pop() // before push()ing and because if disker pops N requests at a time, // we should make sure the worker pop() queue length is the worker @@ -705,3 +683,37 @@ DiskerClose(const String &path) store_open_disk_fd--; } } + + +/// initializes shared memory segments used by IpcIoFile +class IpcIoRr: public RegisteredRunner +{ +public: + /* RegisteredRunner API */ + IpcIoRr(): owner(NULL) {} + virtual void run(const RunnerRegistry &); + virtual ~IpcIoRr(); + +private: + Ipc::FewToFewBiQueue::Owner *owner; +}; + +RunnerRegistrationEntry(rrAfterConfig, IpcIoRr); + + +void IpcIoRr::run(const RunnerRegistry &) +{ + if (!UsingSmp()) + return; + + if (IamMasterProcess()) { + Must(!owner); + // XXX: make capacity configurable + owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1, Config.cacheSwap.n_configured, 1 + Config.workers, sizeof(IpcIoMsg), 1024); + } +} + +IpcIoRr::~IpcIoRr() +{ + delete owner; +} diff --git a/src/DiskIO/IpcIo/IpcIoFile.h b/src/DiskIO/IpcIo/IpcIoFile.h index 273556c531..ba1043133e 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.h +++ b/src/DiskIO/IpcIo/IpcIoFile.h @@ -6,10 +6,14 @@ #include "DiskIO/DiskFile.h" #include "DiskIO/IORequestor.h" #include "ipc/forward.h" -#include "ipc/Queue.h" #include "ipc/mem/Page.h" #include #include +#include + +namespace Ipc { +class FewToFewBiQueue; +} // Ipc // TODO: expand to all classes namespace IpcIo { @@ -84,22 +88,15 @@ private: void checkTimeouts(); void scheduleTimeoutCheck(); - void handleNotification(); - void handleResponses(const 
char *when); + static void HandleResponses(const char *const when); void handleResponse(IpcIoMsg &ipcIo); - static void DiskerHandleRequests(const int workerId); + static void DiskerHandleRequests(); static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo); private: - typedef Ipc::FewToOneBiQueue DiskerQueue; - typedef Ipc::OneToOneBiQueue WorkerQueue; - const String dbName; ///< the name of the file we are managing int diskId; ///< the process ID of the disker we talk to - static DiskerQueue::Owner *diskerQueueOwner; ///< IPC queue owner for disker - static DiskerQueue *diskerQueue; ///< IPC queue for disker - WorkerQueue *workerQueue; ///< IPC queue for worker RefCount ioRequestor; bool error_; ///< whether we have seen at least one I/O error (XXX) @@ -123,6 +120,9 @@ private: typedef std::map IpcIoFilesMap; static IpcIoFilesMap IpcIoFiles; + typedef Ipc::FewToFewBiQueue Queue; + static std::auto_ptr queue; ///< IPC queue + CBDATA_CLASS2(IpcIoFile); }; diff --git a/src/ipc/Queue.cc b/src/ipc/Queue.cc index 993e9aa2f5..ad1ee0d41b 100644 --- a/src/ipc/Queue.cc +++ b/src/ipc/Queue.cc @@ -11,18 +11,25 @@ #include "globals.h" #include "ipc/Queue.h" -/// constructs shared segment ID from parent queue ID and child queue index +/// constructs Metadata ID from parent queue ID static String -QueueId(String id, const int idx) +MetadataId(String id) { - id.append("__"); - id.append(xitoa(idx)); + id.append("__metadata"); return id; } -/// constructs QueueReader ID from parent queue ID +/// constructs one-to-one queues ID from parent queue ID static String -ReaderId(String id) +QueuesId(String id) +{ + id.append("__queues"); + return id; +} + +/// constructs QueueReaders ID from parent queue ID +static String +ReadersId(String id) { id.append("__readers"); return id; @@ -61,31 +68,19 @@ Ipc::QueueReaders::SharedMemorySize(const int capacity) // OneToOneUniQueue -Ipc::OneToOneUniQueue::Owner * -Ipc::OneToOneUniQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity) -{ - Must(maxItemSize > 0); - Must(capacity > 0); - return shm_new(Shared)(id.termedBuf(), maxItemSize, capacity); -} - -Ipc::OneToOneUniQueue::OneToOneUniQueue(const String &id): - shared(shm_old(Shared)(id.termedBuf())), reader_(NULL) -{ -} - -void -Ipc::OneToOneUniQueue::reader(QueueReader *aReader) +Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity): + theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize), + theCapacity(aCapacity) { - Must(!reader_ && aReader); - reader_ = aReader; + Must(theMaxItemSize > 0); + Must(theCapacity > 0); } int Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size) { assert(maxItemSize > 0); - size -= sizeof(Shared); + size -= sizeof(OneToOneUniQueue); return size >= 0 ? 
size / maxItemSize : 0; } @@ -93,170 +88,145 @@ int Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size) { assert(size >= 0); - return sizeof(Shared) + maxItemSize * size; + return sizeof(OneToOneUniQueue) + maxItemSize * size; } -Ipc::QueueReader & -Ipc::OneToOneUniQueue::reader() -{ - Must(reader_); - return *reader_; -} -Ipc::OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize, const int aCapacity): - theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize), - theCapacity(aCapacity) -{ -} +/* OneToOneUniQueues */ -size_t -Ipc::OneToOneUniQueue::Shared::sharedMemorySize() const +Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity) { - return SharedMemorySize(theMaxItemSize, theCapacity); + Must(theCapacity > 0); + for (int i = 0; i < theCapacity; ++i) + new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity); } size_t -Ipc::OneToOneUniQueue::Shared::SharedMemorySize(const unsigned int maxItemSize, const int capacity) +Ipc::OneToOneUniQueues::sharedMemorySize() const { - return Items2Bytes(maxItemSize, capacity); + return sizeof(*this) + theCapacity * front().sharedMemorySize(); } - -// OneToOneBiQueue - -Ipc::OneToOneBiQueue::Owner * -Ipc::OneToOneBiQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity) -{ - UniQueueOwner owner1(OneToOneUniQueue::Init(QueueId(id, Side1), maxItemSize, capacity)); - UniQueueOwner owner2(OneToOneUniQueue::Init(QueueId(id, Side2), maxItemSize, capacity)); - Owner *const owner = new Owner; - owner->first = owner1; - owner->second = owner2; - return owner; -} - -Ipc::OneToOneBiQueue::OneToOneBiQueue(const String &id, const Side side) -{ - OneToOneUniQueue *const queue1 = new OneToOneUniQueue(QueueId(id, Side1)); - OneToOneUniQueue *const queue2 = new OneToOneUniQueue(QueueId(id, Side2)); - switch (side) { - case Side1: - popQueue.reset(queue1); - pushQueue.reset(queue2); - break; - case Side2: - popQueue.reset(queue2); - pushQueue.reset(queue1); - break; - default: - Must(false); - } -} - -void -Ipc::OneToOneBiQueue::readers(QueueReader *r1, QueueReader *r2) +size_t +Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity) { - popQueue->reader(r1); - pushQueue->reader(r2); + const int queueSize = + OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity); + return sizeof(OneToOneUniQueues) + queueSize * capacity; } -void -Ipc::OneToOneBiQueue::clearReaderSignal() +const Ipc::OneToOneUniQueue & +Ipc::OneToOneUniQueues::operator [](const int index) const { - debugs(54, 7, HERE << "reader: " << &popQueue->reader()); - popQueue->reader().clearSignal(); + Must(0 <= index && index < theCapacity); + const size_t queueSize = index ? 
front().sharedMemorySize() : 0; + const char *const queue = + reinterpret_cast(this) + sizeof(*this) + index * queueSize; + return *reinterpret_cast(queue); } -// FewToOneBiQueue +// FewToFewBiQueue -Ipc::FewToOneBiQueue::Owner * -Ipc::FewToOneBiQueue::Init(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity) +Ipc::FewToFewBiQueue::Owner * +Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity) { - return new Owner(id, workerCount, maxItemSize, capacity); + return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity); } -Ipc::FewToOneBiQueue::FewToOneBiQueue(const String &id): - theLastPopWorker(0), - readers(shm_old(QueueReaders)(ReaderId(id).termedBuf())), - reader(readers->theReaders) +Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId): + metadata(shm_old(Metadata)(MetadataId(id).termedBuf())), + queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())), + readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())), + theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId), + theLastPopProcessId(readers->theCapacity) { - Must(readers->theCapacity > 1); + Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2); + Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize); - debugs(54, 7, HERE << "disker " << id << " reader: " << reader->id); - - biQueues.reserve(workerCount()); - for (int i = 0; i < workerCount(); ++i) { - OneToOneBiQueue *const biQueue = new OneToOneBiQueue(QueueId(id, i + WorkerIdOffset), OneToOneBiQueue::Side1); - QueueReader *const remoteReader = readers->theReaders + i + 1; - biQueue->readers(reader, remoteReader); - biQueues.push_back(biQueue); - } + const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId); + debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id); } -Ipc::OneToOneBiQueue * -Ipc::FewToOneBiQueue::Attach(const String &id, const int workerId) -{ - Mem::Pointer readers = shm_old(QueueReaders)(ReaderId(id).termedBuf()); - Must(workerId >= WorkerIdOffset); - Must(workerId < readers->theCapacity - 1 + WorkerIdOffset); - QueueReader *const remoteReader = readers->theReaders; - debugs(54, 7, HERE << "disker " << id << " reader: " << remoteReader->id); - QueueReader *const localReader = - readers->theReaders + workerId - WorkerIdOffset + 1; - debugs(54, 7, HERE << "local " << id << " reader: " << localReader->id); - - OneToOneBiQueue *const biQueue = - new OneToOneBiQueue(QueueId(id, workerId), OneToOneBiQueue::Side2); - biQueue->readers(localReader, remoteReader); - - // XXX: remove this leak. By refcounting Ipc::Mem::Segments? By creating a global FewToOneBiQueue for each worker? 
- const Mem::Pointer *const leakingReaders = new Mem::Pointer(readers); - Must(leakingReaders); // silence unused variable warning - - return biQueue; -} - -Ipc::FewToOneBiQueue::~FewToOneBiQueue() -{ - for (int i = 0; i < workerCount(); ++i) - delete biQueues[i]; +bool +Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const +{ + switch (group) { + case groupA: + return metadata->theGroupAIdOffset <= processId && + processId < metadata->theGroupAIdOffset + metadata->theGroupASize; + case groupB: + return metadata->theGroupBIdOffset <= processId && + processId < metadata->theGroupBIdOffset + metadata->theGroupBSize; + } + return false; +} + +Ipc::OneToOneUniQueue & +Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) +{ + Must(fromGroup != toGroup); + Must(validProcessId(fromGroup, fromProcessId)); + Must(validProcessId(toGroup, toProcessId)); + int index1; + int index2; + int offset; + if (fromGroup == groupA) { + index1 = fromProcessId - metadata->theGroupAIdOffset; + index2 = toProcessId - metadata->theGroupBIdOffset; + offset = 0; + } else { + index1 = toProcessId - metadata->theGroupAIdOffset; + index2 = fromProcessId - metadata->theGroupBIdOffset; + offset = metadata->theGroupASize * metadata->theGroupBSize; + } + const int index = offset + index1 * metadata->theGroupBSize + index2; + return (*queues)[index]; } -bool -Ipc::FewToOneBiQueue::validWorkerId(const int workerId) const +Ipc::QueueReader & +Ipc::FewToFewBiQueue::reader(const Group group, const int processId) { - return WorkerIdOffset <= workerId && - workerId < WorkerIdOffset + workerCount(); + Must(validProcessId(group, processId)); + const int index = group == groupA ? + processId - metadata->theGroupAIdOffset : + metadata->theGroupASize + processId - metadata->theGroupBIdOffset; + return readers->theReaders[index]; } void -Ipc::FewToOneBiQueue::clearReaderSignal(int workerId) +Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId) { - debugs(54, 7, HERE << "reader: " << reader->id); + QueueReader &localReader = reader(theLocalGroup, theLocalProcessId); + debugs(54, 7, HERE << "reader: " << localReader.id); - assert(validWorkerId(workerId)); - reader->clearSignal(); + Must(validProcessId(remoteGroup(), remoteProcessId)); + localReader.clearSignal(); // we got a hint; we could reposition iteration to try popping from the - // workerId queue first; but it does not seem to help much and might + // remoteProcessId queue first; but it does not seem to help much and might // introduce some bias so we do not do that for now: - // theLastPopWorker = (workerId + workerCount() - 1) % workerCount(); + // theLastPopProcessId = remoteProcessId; } -Ipc::FewToOneBiQueue::Owner::Owner(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity): - readersOwner(shm_new(QueueReaders)(ReaderId(id).termedBuf(), workerCount + 1)) +Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset): + theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset), + theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset) +{ + Must(theGroupASize > 0); + Must(theGroupBSize > 0); +} + +Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity): + 
metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)), + queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)), + readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize)) { - biQueueOwners.reserve(workerCount); - for (int i = 0; i < workerCount; ++i) { - OneToOneBiQueue::Owner *const queueOwner = OneToOneBiQueue::Init(QueueId(id, i + WorkerIdOffset), maxItemSize, capacity); - biQueueOwners.push_back(queueOwner); - } } -Ipc::FewToOneBiQueue::Owner::~Owner() +Ipc::FewToFewBiQueue::Owner::~Owner() { - for (size_t i = 0; i < biQueueOwners.size(); ++i) - delete biQueueOwners[i]; + delete metadataOwner; + delete queuesOwner; delete readersOwner; } diff --git a/src/ipc/Queue.h b/src/ipc/Queue.h index ca449e338e..f87259c064 100644 --- a/src/ipc/Queue.h +++ b/src/ipc/Queue.h @@ -12,8 +12,6 @@ #include "ipc/mem/Pointer.h" #include "util.h" -#include - class String; namespace Ipc { @@ -71,137 +69,130 @@ public: * different value). We can add support for blocked writers if needed. */ class OneToOneUniQueue { -private: - struct Shared { - Shared(const unsigned int aMaxItemSize, const int aCapacity); - size_t sharedMemorySize() const; - static size_t SharedMemorySize(const unsigned int maxItemSize, const int capacity); - - unsigned int theIn; ///< input index, used only in push() - unsigned int theOut; ///< output index, used only in pop() - - AtomicWord theSize; ///< number of items in the queue - const unsigned int theMaxItemSize; ///< maximum item size - const int theCapacity; ///< maximum number of items, i.e. theBuffer size - - char theBuffer[]; - }; - public: // pop() and push() exceptions; TODO: use TextException instead class Full {}; class ItemTooLarge {}; - typedef Mem::Owner Owner; - - /// initialize shared memory - static Owner *Init(const String &id, const unsigned int maxItemSize, const int capacity); + OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity); - OneToOneUniQueue(const String &id); + unsigned int maxItemSize() const { return theMaxItemSize; } + int size() const { return theSize; } + int capacity() const { return theCapacity; } + int sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); } - unsigned int maxItemSize() const { return shared->theMaxItemSize; } - int size() const { return shared->theSize; } - int capacity() const { return shared->theCapacity; } - - bool empty() const { return !shared->theSize; } - bool full() const { return shared->theSize == shared->theCapacity; } + bool empty() const { return !theSize; } + bool full() const { return theSize == theCapacity; } static int Bytes2Items(const unsigned int maxItemSize, int size); static int Items2Bytes(const unsigned int maxItemSize, const int size); /// returns true iff the value was set; [un]blocks the reader as needed - template bool pop(Value &value); + template bool pop(Value &value, QueueReader *const reader = NULL); /// returns true iff the caller must notify the reader of the pushed item - template bool push(const Value &value); - - QueueReader &reader(); - void reader(QueueReader *aReader); + template bool push(const Value &value, QueueReader *const reader = NULL); private: - Mem::Pointer shared; ///< pointer to shared memory - QueueReader *reader_; ///< the state of the code popping from this queue -}; - -/// Lockless fixed-capacity bidirectional queue for two processes. 
-class OneToOneBiQueue { -private: - typedef std::auto_ptr UniQueueOwner; + unsigned int theIn; ///< input index, used only in push() + unsigned int theOut; ///< output index, used only in pop() -public: - typedef OneToOneUniQueue::Full Full; - typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge; + AtomicWord theSize; ///< number of items in the queue + const unsigned int theMaxItemSize; ///< maximum item size + const int theCapacity; ///< maximum number of items, i.e. theBuffer size - typedef std::pair Owner; + char theBuffer[]; +}; - /// initialize shared memory - static Owner *Init(const String &id, const unsigned int maxItemSize, const int capacity); +/// shared array of OneToOneUniQueues +class OneToOneUniQueues { +public: + OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity); - enum Side { Side1 = 1, Side2 = 2 }; - OneToOneBiQueue(const String &id, const Side side); + size_t sharedMemorySize() const; + static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity); - void readers(QueueReader *r1, QueueReader *r2); - void clearReaderSignal(); + const OneToOneUniQueue &operator [](const int index) const; + inline OneToOneUniQueue &operator [](const int index); - /* wrappers to call the right OneToOneUniQueue method for this process */ - template bool pop(Value &value) { return popQueue->pop(value); } - template bool push(const Value &value) { return pushQueue->push(value); } +private: + inline const OneToOneUniQueue &front() const; -//private: - std::auto_ptr popQueue; ///< queue to pop from for this process - std::auto_ptr pushQueue; ///< queue to push to for this process +public: + const int theCapacity; /// number of OneToOneUniQueues }; /** * Lockless fixed-capacity bidirectional queue for a limited number - * pricesses. Implements a star topology: Many worker processes - * communicate with the one central process. The central process uses - * FewToOneBiQueue object, while workers use OneToOneBiQueue objects - * created with the Attach() method. Each worker has a unique integer - * ID in [1, workerCount] range. + * processes. Allows communication between two groups of processes: + * any process in one group may send data to and receive from any + * process in another group, but processes in the same group can not + * communicate. Process in each group has a unique integer ID in + * [groupIdOffset, groupIdOffset + groupSize) range. 
*/ -class FewToOneBiQueue { +class FewToFewBiQueue { public: - typedef OneToOneBiQueue::Full Full; - typedef OneToOneBiQueue::ItemTooLarge ItemTooLarge; + typedef OneToOneUniQueue::Full Full; + typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge; + +private: + /// Shared metadata for FewToFewBiQueue + struct Metadata { + Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset); + size_t sharedMemorySize() const { return sizeof(*this); } + static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); } + + const int theGroupASize; + const int theGroupAIdOffset; + const int theGroupBSize; + const int theGroupBIdOffset; + }; +public: class Owner { public: - Owner(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity); + Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); ~Owner(); private: - Vector biQueueOwners; + Mem::Owner *const metadataOwner; + Mem::Owner *const queuesOwner; Mem::Owner *const readersOwner; }; - static Owner *Init(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity); + static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); - FewToOneBiQueue(const String &id); - static OneToOneBiQueue *Attach(const String &id, const int workerId); - ~FewToOneBiQueue(); + enum Group { groupA = 0, groupB = 1 }; + FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId); - bool validWorkerId(const int workerId) const; - int workerCount() const { return readers->theCapacity - 1; } + Group localGroup() const { return theLocalGroup; } + Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; } - /// clears the reader notification received by the disker from worker - void clearReaderSignal(int workerId); + /// clears the reader notification received by the local process from the remote process + void clearReaderSignal(const int remoteProcessId); - /// picks a worker and calls OneToOneUniQueue::pop() using its queue - template bool pop(int &workerId, Value &value); + /// picks a process and calls OneToOneUniQueue::pop() using its queue + template bool pop(int &remoteProcessId, Value &value); - /// calls OneToOneUniQueue::push() using the given worker queue - template bool push(const int workerId, const Value &value); + /// calls OneToOneUniQueue::push() using the given process queue + template bool push(const int remoteProcessId, const Value &value); -//private: XXX: make private by moving pop/push debugging into pop/push - int theLastPopWorker; ///< the ID of the last worker we tried to pop() from - Vector biQueues; ///< worker queues indexed by worker ID +private: + bool validProcessId(const Group group, const int processId) const; + OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId); + QueueReader &reader(const Group group, const int processId); + int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; } + int remoteGroupIdOffset() const { return theLocalGroup == groupA ? 
metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; } +private: + const Mem::Pointer metadata; ///< shared metadata + const Mem::Pointer queues; ///< unidirection one-to-one queues const Mem::Pointer readers; ///< readers array - QueueReader *const reader; ///< the state of the code popping from all biQueues - enum { WorkerIdOffset = 1 }; ///< worker ID offset, always 1 for now + const Group theLocalGroup; ///< group of this queue + const int theLocalProcessId; ///< process ID of this queue + int theLastPopProcessId; ///< the ID of the last process we tried to pop() from }; @@ -209,73 +200,99 @@ public: template bool -OneToOneUniQueue::pop(Value &value) +OneToOneUniQueue::pop(Value &value, QueueReader *const reader) { - if (sizeof(value) > shared->theMaxItemSize) + if (sizeof(value) > theMaxItemSize) throw ItemTooLarge(); // A writer might push between the empty test and block() below, so we do // not return false right after calling block(), but test again. if (empty()) { - reader().block(); + if (!reader) + return false; + + reader->block(); // A writer might push between the empty test and block() below, // so we must test again as such a writer will not signal us. if (empty()) return false; } - reader().unblock(); - const unsigned int pos = - (shared->theOut++ % shared->theCapacity) * shared->theMaxItemSize; - memcpy(&value, shared->theBuffer + pos, sizeof(value)); - --shared->theSize; + if (reader) + reader->unblock(); + + const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize; + memcpy(&value, theBuffer + pos, sizeof(value)); + --theSize; return true; } template bool -OneToOneUniQueue::push(const Value &value) +OneToOneUniQueue::push(const Value &value, QueueReader *const reader) { - if (sizeof(value) > shared->theMaxItemSize) + if (sizeof(value) > theMaxItemSize) throw ItemTooLarge(); if (full()) throw Full(); const bool wasEmpty = empty(); - const unsigned int pos = - shared->theIn++ % shared->theCapacity * shared->theMaxItemSize; - memcpy(shared->theBuffer + pos, &value, sizeof(value)); - ++shared->theSize; + const unsigned int pos = theIn++ % theCapacity * theMaxItemSize; + memcpy(theBuffer + pos, &value, sizeof(value)); + ++theSize; + + return wasEmpty && (!reader || reader->raiseSignal()); +} + - return wasEmpty && reader().raiseSignal(); +// OneToOneUniQueues + +inline OneToOneUniQueue & +OneToOneUniQueues::operator [](const int index) +{ + return const_cast((*const_cast(this))[index]); +} + +inline const OneToOneUniQueue & +OneToOneUniQueues::front() const +{ + const char *const queue = + reinterpret_cast(this) + sizeof(*this); + return *reinterpret_cast(queue); } -// FewToOneBiQueue +// FewToFewBiQueue template bool -FewToOneBiQueue::pop(int &workerId, Value &value) +FewToFewBiQueue::pop(int &remoteProcessId, Value &value) { - // iterate all workers, starting after the one we visited last - for (int i = 0; i < workerCount(); ++i) { - theLastPopWorker = (theLastPopWorker + 1) % workerCount(); - if (biQueues[theLastPopWorker]->pop(value)) { - workerId = theLastPopWorker + WorkerIdOffset; + // iterate all remote group processes, starting after the one we visited last + QueueReader &localReader = reader(theLocalGroup, theLocalProcessId); + for (int i = 0; i < remoteGroupSize(); ++i) { + if (++theLastPopProcessId >= remoteGroupIdOffset() + remoteGroupSize()) + theLastPopProcessId = remoteGroupIdOffset(); + OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), theLastPopProcessId, theLocalGroup, theLocalProcessId); + if (queue.pop(value, &localReader)) { + 
remoteProcessId = theLastPopProcessId; + debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size()); return true; } } - return false; // no worker had anything to pop + return false; // no process had anything to pop } template bool -FewToOneBiQueue::push(const int workerId, const Value &value) +FewToFewBiQueue::push(const int remoteProcessId, const Value &value) { - assert(validWorkerId(workerId)); - return biQueues[workerId - WorkerIdOffset]->push(value); + OneToOneUniQueue &remoteQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId); + QueueReader &remoteReader = reader(remoteGroup(), remoteProcessId); + debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size()); + return remoteQueue.push(value, &remoteReader); } } // namespace Ipc diff --git a/src/ipc/mem/Pointer.h b/src/ipc/mem/Pointer.h index 3726b97807..01a606d746 100644 --- a/src/ipc/mem/Pointer.h +++ b/src/ipc/mem/Pointer.h @@ -27,6 +27,8 @@ public: static Owner *New(const char *const id, const P1 &p1, const P2 &p2); template static Owner *New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3); + template + static Owner *New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3, const P4 &p4); ~Owner(); @@ -72,7 +74,7 @@ private: typedef RefCount< Object > Base; public: - explicit Pointer(Object *const anObject): Base(anObject) {} + explicit Pointer(Object *const anObject = NULL): Base(anObject) {} Class *operator ->() const { return Base::operator ->()->theObject; } Class &operator *() const { return *Base::operator *().theObject; } @@ -137,6 +139,16 @@ Owner::New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3 return owner; } +template template +Owner * +Owner::New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3, const P4 &p4) +{ + const off_t sharedSize = Class::SharedMemorySize(p1, p2, p3, p4); + Owner *const owner = new Owner(id, sharedSize); + owner->theObject = new (owner->theSegment.reserve(sharedSize)) Class(p1, p2, p3, p4); + return owner; +} + // Object implementation template
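
A note on the new addressing scheme may help when reading the FewToFewBiQueue hunks above: all 2 * groupASize * groupBSize unidirectional queues live in one shared array, with the A-to-B queues in the first half and the B-to-A queues in the second, and each process's QueueReader sits in one shared readers array (group A entries first, then group B). The sketch below restates just that index math outside of Squid; the Metadata struct is a stand-in, and the four-workers-plus-one-disker layout with kid IDs 1..5 is only an example (it matches what IpcIoRr::run() would pass for a one-cache_dir, four-worker configuration).

    // Standalone sketch of FewToFewBiQueue's flat index math; not Squid code.
    #include <cassert>
    #include <cstdio>

    enum Group { groupA = 0, groupB = 1 };

    struct Metadata {            // mirrors the shared Metadata fields
        int theGroupASize, theGroupAIdOffset;
        int theGroupBSize, theGroupBIdOffset;
    };

    // index of the one-to-one queue carrying items from one process to another
    static int queueIndex(const Metadata &m, const Group fromGroup, const int fromId, const int toId)
    {
        // A-to-B queues fill the first ASize*BSize slots, B-to-A queues the second half
        if (fromGroup == groupA)
            return (fromId - m.theGroupAIdOffset) * m.theGroupBSize +
                   (toId - m.theGroupBIdOffset);
        return m.theGroupASize * m.theGroupBSize +
               (toId - m.theGroupAIdOffset) * m.theGroupBSize +
               (fromId - m.theGroupBIdOffset);
    }

    // index of a process's QueueReader in the shared readers array (A first, then B)
    static int readerIndex(const Metadata &m, const Group group, const int processId)
    {
        return group == groupA ? processId - m.theGroupAIdOffset :
               m.theGroupASize + processId - m.theGroupBIdOffset;
    }

    int main()
    {
        // hypothetical SMP layout: 4 workers (kids 1..4) and 1 disker (kid 5)
        const Metadata m = { 4, 1, 1, 5 };

        std::printf("worker3 -> disker5 queue #%d\n", queueIndex(m, groupA, 3, 5));
        std::printf("disker5 -> worker3 queue #%d\n", queueIndex(m, groupB, 5, 3));
        std::printf("disker5 reader #%d\n", readerIndex(m, groupB, 5));

        assert(queueIndex(m, groupA, 3, 5) == 2);     // (3-1)*1 + (5-5)
        assert(queueIndex(m, groupB, 5, 3) == 4 + 2); // second half, same pair
        assert(readerIndex(m, groupB, 5) == 4);       // after the 4 worker readers
        return 0;
    }

Because every (worker, disker) pair gets its own pair of slots, the number of queues grows with the configuration, but the number of shared memory segments stays at three.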
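The OneToOneUniQueues hunks rely on every queue occupying the same number of bytes (header plus item buffer), so operator[] can locate element i by plain pointer arithmetic from the array header, and the constructor can placement-new each queue into its slot. Below is a standalone model of that layout trick with a heap buffer standing in for the shared memory segment; MiniQueue/MiniQueues and the sizes in main() are invented for illustration.

    // Standalone model of the "array of same-size queues in one memory block" layout;
    // a heap buffer stands in for the shared memory segment, and the queue header
    // is reduced to the fields that determine its size.
    #include <cassert>
    #include <cstddef>
    #include <cstdlib>
    #include <new>

    struct MiniQueue {                        // stands in for OneToOneUniQueue
        unsigned int theIn, theOut;
        int theSize;
        const unsigned int theMaxItemSize;    // maximum item size, fixed per queue
        const int theCapacity;                // maximum number of items
        // item bytes are stored immediately after this header in memory

        MiniQueue(const unsigned int maxItemSize, const int capacity):
            theIn(0), theOut(0), theSize(0),
            theMaxItemSize(maxItemSize), theCapacity(capacity) {}

        static size_t Items2Bytes(const unsigned int maxItemSize, const int capacity)
        { return sizeof(MiniQueue) + maxItemSize * capacity; }

        size_t sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); }
    };

    struct MiniQueues {                       // stands in for OneToOneUniQueues
        const int theCapacity;                // number of queues following this header

        MiniQueues(const int n, const unsigned int maxItemSize, const int queueCapacity):
            theCapacity(n) {
            for (int i = 0; i < n; ++i)       // construct each queue in place
                new (&(*this)[i]) MiniQueue(maxItemSize, queueCapacity);
        }

        static size_t SharedMemorySize(const int n, const unsigned int maxItemSize, const int queueCapacity)
        { return sizeof(MiniQueues) + n * MiniQueue::Items2Bytes(maxItemSize, queueCapacity); }

        MiniQueue &front() {
            return *reinterpret_cast<MiniQueue*>(reinterpret_cast<char*>(this) + sizeof(*this));
        }

        MiniQueue &operator [](const int i) {
            assert(0 <= i && i < theCapacity);
            // for i == 0, front() may not be constructed yet, so avoid reading it
            const size_t queueSize = i ? front().sharedMemorySize() : 0;
            char *const raw = reinterpret_cast<char*>(this) + sizeof(*this) + i * queueSize;
            return *reinterpret_cast<MiniQueue*>(raw);
        }
    };

    int main()
    {
        const int queues = 8;                 // e.g., 2 * workers * diskers slots
        const unsigned int maxItemSize = 96;  // pretend sizeof(IpcIoMsg)
        const int queueCapacity = 1024;       // items per queue, as in the patch

        void *const block = std::malloc(MiniQueues::SharedMemorySize(queues, maxItemSize, queueCapacity));
        MiniQueues *const array = new (block) MiniQueues(queues, maxItemSize, queueCapacity);

        assert((*array)[5].theCapacity == queueCapacity);
        std::free(block);
        return 0;
    }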
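The reworked pop()/push() take the relevant QueueReader as a parameter instead of storing one pointer per queue, and the return value of push() still tells the caller whether to send a notification: only a push into an empty queue whose reader has declared itself blocked needs a kick. QueueReader itself is not part of this diff, so the following is a simplified single-process model of that handshake (plain bools and a std::deque instead of shared memory and atomic words), not Squid's implementation.

    // Simplified, single-process model of the queue/reader signalling protocol.
    #include <deque>
    #include <iostream>

    struct Reader {                  // stands in for Ipc::QueueReader
        bool blocked;                // reader is about to go idle on empty queues
        bool signaled;               // a notification is already in flight
        Reader(): blocked(false), signaled(false) {}

        void block() { blocked = true; }
        void unblock() { blocked = false; }
        void clearSignal() { signaled = false; unblock(); }
        // true iff the pusher must actually send a notification
        bool raiseSignal() {
            if (!blocked || signaled)
                return false;        // reader is polling or already notified
            signaled = true;
            return true;
        }
    };

    struct Queue {                   // stands in for one OneToOneUniQueue
        std::deque<int> items;

        bool pop(int &value, Reader &reader) {
            if (items.empty()) {
                reader.block();      // tell pushers we are about to go idle
                if (items.empty())   // re-check: a pusher may have raced with us
                    return false;
            }
            reader.unblock();
            value = items.front();
            items.pop_front();
            return true;
        }

        // returns true iff the caller must notify the reader
        bool push(const int value, Reader &reader) {
            const bool wasEmpty = items.empty();
            items.push_back(value);
            return wasEmpty && reader.raiseSignal();
        }
    };

    int main()
    {
        Queue q;
        Reader diskerReader;

        int v;
        if (!q.pop(v, diskerReader))                  // disker finds nothing and blocks
            std::cout << "disker going idle\n";

        if (q.push(1, diskerReader))                  // first push into an empty queue...
            std::cout << "notify the disker\n";       // ...is the one that sends a kick
        if (!q.push(2, diskerReader))
            std::cout << "second push stays quiet\n"; // queue was not empty any more

        diskerReader.clearSignal();                   // disker wakes up on the notification
        while (q.pop(v, diskerReader))                // ...and drains everything it finds
            std::cout << "popped " << v << "\n";
        return 0;
    }

The same idea explains the "check for responses to earlier requests" call before each push and the drain loops in HandleResponses()/DiskerHandleRequests(): one notification is enough because the receiver keeps popping until its queues run dry.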
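FewToFewBiQueue::pop() scans the remote group's queues round-robin, resuming after the process it last popped from (theLastPopProcessId) so that one busy peer cannot starve the others. Here is a standalone sketch of just that iteration order, with plain vectors standing in for the per-process queues and an invented three-worker layout.

    // Standalone sketch of the round-robin pop order used by FewToFewBiQueue::pop().
    #include <iostream>
    #include <vector>

    struct RoundRobinPopper {
        int idOffset;                             // first remote process ID
        std::vector< std::vector<int> > queues;   // one queue per remote process
        int theLastPopProcessId;                  // where the previous scan stopped

        RoundRobinPopper(const int offset, const int count):
            idOffset(offset), queues(count),
            theLastPopProcessId(offset + count - 1) {} // so the first scan starts at offset

        // returns true and sets remoteId/value if any remote queue had an item
        bool pop(int &remoteId, int &value) {
            const int count = static_cast<int>(queues.size());
            for (int i = 0; i < count; ++i) {
                if (++theLastPopProcessId >= idOffset + count)
                    theLastPopProcessId = idOffset;        // wrap around the group
                std::vector<int> &q = queues[theLastPopProcessId - idOffset];
                if (!q.empty()) {
                    remoteId = theLastPopProcessId;
                    value = q.front();
                    q.erase(q.begin());
                    return true;
                }
            }
            return false;                         // no remote process had anything queued
        }
    };

    int main()
    {
        RoundRobinPopper disker(1, 3);     // e.g., a disker popping from workers 1..3
        disker.queues[0].push_back(10);    // worker 1 queued one request
        disker.queues[2].push_back(30);    // worker 3 queued one request

        int id, v;
        while (disker.pop(id, v))
            std::cout << "popped " << v << " from worker " << id << "\n";
        return 0;
    }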
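On the worker side, handleNotification()/handleResponses() collapse into a static HandleResponses() because one Queue now carries responses from every disker: the worker pops (diskerId, message) pairs and routes each message to the right IpcIoFile through the IpcIoFiles map. Below is a stripped-down model of that dispatch loop, with a std::queue standing in for the shared queue and invented disker and request IDs.

    // Stripped-down model of the static HandleResponses() dispatch: pop
    // (diskerId, message) pairs from one queue and route each message to the
    // per-disk object registered in a map.
    #include <iostream>
    #include <map>
    #include <queue>
    #include <utility>

    struct IoMsg { int requestId; };

    struct DiskFile {                          // stands in for one IpcIoFile
        int diskId;
        explicit DiskFile(const int id): diskId(id) {}
        void handleResponse(const IoMsg &msg) {
            std::cout << "disk " << diskId << " completed request " << msg.requestId << "\n";
        }
    };

    typedef std::map<int, DiskFile*> DiskFileMap;
    typedef std::queue< std::pair<int, IoMsg> > ResponseQueue;

    // stand-in for FewToFewBiQueue::pop(diskId, msg)
    static bool popResponse(ResponseQueue &q, int &diskId, IoMsg &msg)
    {
        if (q.empty())
            return false;
        diskId = q.front().first;
        msg = q.front().second;
        q.pop();
        return true;
    }

    static void HandleResponses(ResponseQueue &q, DiskFileMap &files)
    {
        int diskId;
        IoMsg msg;
        while (popResponse(q, diskId, msg)) {
            const DiskFileMap::const_iterator i = files.find(diskId);
            if (i != files.end())              // the real code Must()s; its TODO: warn but continue
                i->second->handleResponse(msg);
        }
    }

    int main()
    {
        DiskFile disk5(5), disk6(6);
        DiskFileMap files;
        files[5] = &disk5;
        files[6] = &disk6;

        ResponseQueue responses;
        IoMsg m1 = { 101 }, m2 = { 102 };
        responses.push(std::make_pair(5, m1));
        responses.push(std::make_pair(6, m2));

        HandleResponses(responses, files);     // drains everything that is ready
        return 0;
    }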
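Finally, the ipc/mem/Pointer.h hunk only adds a four-argument overload of Owner::New(), following the pattern of the existing overloads: ask the class how much shared memory those constructor arguments require, reserve that much in the segment, and construct the object there with placement new. This is what lets shm_new(Metadata)(...) forward the four group-geometry numbers. The fragment below restates the pattern in isolation; Segment here is a trivial malloc-backed stand-in for Ipc::Mem::Segment, and the id parameter of the real API is omitted.

    // Isolated restatement of the Owner::New() construction pattern; not the Squid template.
    #include <cassert>
    #include <cstddef>
    #include <cstdlib>
    #include <new>

    class Segment {                            // stand-in for Ipc::Mem::Segment
    public:
        Segment(): mem_(0) {}
        ~Segment() { std::free(mem_); }
        void *reserve(const size_t bytes) { assert(!mem_); return mem_ = std::malloc(bytes); }
    private:
        void *mem_;
    };

    template <class Class>
    class Owner {
    public:
        // the new four-parameter overload follows the same recipe as the others:
        // size the object from its constructor arguments, reserve, placement-new
        template <class P1, class P2, class P3, class P4>
        static Owner *New(const P1 &p1, const P2 &p2, const P3 &p3, const P4 &p4) {
            Owner *const owner = new Owner;
            const size_t sharedSize = Class::SharedMemorySize(p1, p2, p3, p4);
            owner->theObject = new (owner->theSegment.reserve(sharedSize)) Class(p1, p2, p3, p4);
            return owner;
        }
        ~Owner() { if (theObject) theObject->~Class(); }

    private:
        Owner(): theObject(0) {}
        Segment theSegment;
        Class *theObject;
    };

    // a toy shared class honouring the SharedMemorySize() contract Owner::New() expects
    struct Metadata {
        Metadata(const int a, const int b, const int c, const int d): a(a), b(b), c(c), d(d) {}
        static size_t SharedMemorySize(int, int, int, int) { return sizeof(Metadata); }
        int a, b, c, d;
    };

    int main()
    {
        Owner<Metadata> *const owner = Owner<Metadata>::New(4, 1, 1, 5);
        delete owner;
        return 0;
    }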