From 807feb1d580d72cdc3d0847e30e474702078da18 Mon Sep 17 00:00:00 2001
From: Dmitry Kurochkin
Date: Wed, 29 May 2013 10:04:40 -0600
Subject: [PATCH] Added BaseMultiQueue class, a common base of the old
 FewToFewBiQueue class and the new MultiQueue class.

Added MultiQueue, a lockless fixed-capacity bidirectional queue for a
limited number of processes. Any process may send data to and receive
from any other process (including itself). Used for collapsed
forwarding notifications.

Added CollapsedForwarding class to send and handle received collapsed
forwarding notifications using MultiQueue.

Write partial Rock pages to disk in order to propagate data from the
hit writer to the collapsed hit readers. Send a collapsed forwarding
notification after the data has been written to disk.

Still missing: code to share locked StoreMap entries, to kick collapsed
hit readers, and to disable notifications in no-daemon mode.
---
 src/CollapsedForwarding.cc | 142 ++++++++++++++++++++++
 src/CollapsedForwarding.h  |  40 +++++++
 src/Makefile.am            |   2 +
 src/fs/rock/RockIoState.cc |  25 ++--
 src/fs/rock/RockIoState.h  |   1 +
 src/fs/rock/RockSwapDir.cc |   4 +
 src/ipc/Messages.h         |   2 +-
 src/ipc/Queue.cc           | 236 ++++++++++++++++++++++++++++---------
 src/ipc/Queue.h            | 236 +++++++++++++++++++++++++------------
 src/ipc/Strand.cc          |   5 +
 10 files changed, 558 insertions(+), 135 deletions(-)
 create mode 100644 src/CollapsedForwarding.cc
 create mode 100644 src/CollapsedForwarding.h
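Note on the new MultiQueue API, reconstructed from the code below: a minimal
usage sketch, not compilable outside Squid. Msg, toKidId, and the "demo"
label are illustrative; Config.workers and KidIdentifier are existing Squid
globals; push() semantics follow the CollapsedForwarding::NewData() usage in
this patch.

    struct Msg { int payload; }; // any fixed-size, trivially copyable type

    // master process, before the kids are forked
    // (cf. CollapsedForwardingRr::create below):
    Ipc::MultiQueue::Owner *owner =
        Ipc::MultiQueue::Init("demo", Config.workers, 1, sizeof(Msg), 1024);

    // every kid process (cf. CollapsedForwarding::Init below):
    Ipc::MultiQueue queue("demo", KidIdentifier);

    // sender side: push() returns true when the receiver looks idle and
    // needs a wakeup; it throws MultiQueue::Full when that queue is full
    Msg msg;
    try {
        if (queue.push(toKidId, msg)) {
            // receiver was idle: send a notification, e.g. via Ipc::SendMessage()
        }
    } catch (const Ipc::MultiQueue::Full &) {
        // drop the message or retry later
    }

    // receiver side: drain messages from all kids, round-robin
    int fromKidId;
    while (queue.pop(fromKidId, msg)) {
        // handle msg sent by kid fromKidId
    }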
diff --git a/src/CollapsedForwarding.cc b/src/CollapsedForwarding.cc
new file mode 100644
index 0000000000..657e6d6c09
--- /dev/null
+++ b/src/CollapsedForwarding.cc
@@ -0,0 +1,142 @@
+/*
+ * DEBUG: section 17    Request Forwarding
+ *
+ */
+
+#include "squid.h"
+#include "ipc/mem/Segment.h"
+#include "ipc/Messages.h"
+#include "ipc/Port.h"
+#include "ipc/TypedMsgHdr.h"
+#include "CollapsedForwarding.h"
+#include "SquidConfig.h"
+#include "globals.h"
+#include "tools.h"
+
+/// shared memory segment path to use for CollapsedForwarding queue
+static const char *const ShmLabel = "cf";
+/// a single worker-to-worker queue capacity
+// TODO: make configurable or compute from squid.conf settings if possible
+static const int QueueCapacity = 1024;
+
+std::auto_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue;
+
+/// IPC queue message
+class CollapsedForwardingMsg
+{
+public:
+    CollapsedForwardingMsg(): processId(-1) {}
+
+public:
+    int processId; ///< ID of the sending process
+    // XXX: add entry info
+};
+
+// CollapsedForwarding
+
+void
+CollapsedForwarding::Init()
+{
+    Must(!queue.get());
+    queue.reset(new Queue(ShmLabel, KidIdentifier));
+}
+
+void
+CollapsedForwarding::NewData(const StoreIOState &sio)
+{
+    CollapsedForwardingMsg msg;
+    msg.processId = KidIdentifier;
+    // XXX: copy data from sio
+
+    // TODO: send only to workers who are waiting for data
+    // XXX: does not work for non-daemon mode?
+    for (int workerId = 1; workerId <= Config.workers; ++workerId) {
+        try {
+            if (queue->push(workerId, msg))
+                Notify(workerId);
+        } catch (const Queue::Full &) {
+            debugs(17, DBG_IMPORTANT, "Worker collapsed forwarding push queue "
+                   "overflow: " << workerId); // TODO: report queue len
+            // TODO: grow queue size
+        }
+    }
+}
+
+void
+CollapsedForwarding::Notify(const int workerId)
+{
+    // TODO: Count and report the total number of notifications, pops, pushes.
+    debugs(17, 7, HERE << "kid" << workerId);
+    Ipc::TypedMsgHdr msg;
+    // TODO: add proper message type?
+    msg.setType(Ipc::mtCollapsedForwardingNotification);
+    msg.putInt(KidIdentifier);
+    const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrPfx, workerId);
+    Ipc::SendMessage(addr, msg);
+}
+
+void
+CollapsedForwarding::HandleNewData(const char *const when)
+{
+    debugs(17, 4, HERE << "popping all " << when);
+    CollapsedForwardingMsg msg;
+    int workerId;
+    int poppedCount = 0;
+    while (queue->pop(workerId, msg)) {
+        debugs(17, 3, HERE << "collapsed forwarding data message from " <<
+               workerId);
+        if (workerId != msg.processId) {
+            debugs(17, DBG_IMPORTANT, HERE << "mismatching IDs: " << workerId <<
+                   " != " << msg.processId);
+        }
+
+        // XXX: stop and schedule an async call to continue
+        assert(++poppedCount < SQUID_MAXFD);
+    }
+}
+
+void
+CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg)
+{
+    const int from = msg.getInt();
+    debugs(17, 7, HERE << "from " << from);
+    queue->clearReaderSignal(from);
+    HandleNewData("after notification");
+}
+
+/// initializes shared queue used by CollapsedForwarding
+class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner
+{
+public:
+    /* RegisteredRunner API */
+    CollapsedForwardingRr(): owner(NULL) {}
+    virtual ~CollapsedForwardingRr();
+
+protected:
+    virtual void create(const RunnerRegistry &);
+    virtual void open(const RunnerRegistry &);
+
+private:
+    Ipc::MultiQueue::Owner *owner;
+};
+
+RunnerRegistrationEntry(rrAfterConfig, CollapsedForwardingRr);
+
+void CollapsedForwardingRr::create(const RunnerRegistry &)
+{
+    Must(!owner);
+    owner = Ipc::MultiQueue::Init(ShmLabel, Config.workers, 1,
+                                  sizeof(CollapsedForwardingMsg),
+                                  QueueCapacity);
+}
+
+void CollapsedForwardingRr::open(const RunnerRegistry &)
+{
+    if (IamWorkerProcess())
+        CollapsedForwarding::Init();
+}
+
+CollapsedForwardingRr::~CollapsedForwardingRr()
+{
+    delete owner;
+}
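Note on the ordering inside HandleNotification() above: the reader signal is
cleared before the queues are drained, so a push() that races with the drain
either lands early enough to be popped, or finds the signal already cleared
and raises a fresh notification; no wakeup can be lost. A simplified,
self-contained model of that protocol (plain C++11 atomics standing in for
QueueReader's shared-memory signal; the ring-buffer operations are elided):

    #include <atomic>

    static std::atomic<bool> signaled(false); // models the QueueReader signal

    void pushSide()
    {
        // enqueue the item into the shared ring buffer here, then:
        if (!signaled.exchange(true)) {
            // receiver was idle before this push: send the wakeup message
        }
    }

    void notifiedSide()
    {
        signaled.store(false); // clear first (clearReaderSignal)...
        // ...then drain all queues (HandleNewData); an item pushed after
        // the drain sees signaled == false and triggers a new notification
    }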
diff --git a/src/CollapsedForwarding.h b/src/CollapsedForwarding.h
new file mode 100644
index 0000000000..e183802f44
--- /dev/null
+++ b/src/CollapsedForwarding.h
@@ -0,0 +1,40 @@
+/*
+ * DEBUG: section 17    Request Forwarding
+ *
+ */
+
+#ifndef SQUID_COLLAPSED_FORWARDING_H
+#define SQUID_COLLAPSED_FORWARDING_H
+
+#include "ipc/Queue.h"
+#include "ipc/forward.h"
+
+#include <memory>
+
+class StoreIOState;
+
+/// Sends and handles collapsed forwarding notifications.
+class CollapsedForwarding
+{
+public:
+    /// open shared memory segment
+    static void Init();
+
+    /// notify other workers that new data is available
+    static void NewData(const StoreIOState &sio);
+
+    /// kick worker with empty IPC queue
+    static void Notify(const int workerId);
+
+    /// handle new data messages in IPC queue
+    static void HandleNewData(const char *const when);
+
+    /// handle queue push notifications from worker or disker
+    static void HandleNotification(const Ipc::TypedMsgHdr &msg);
+
+private:
+    typedef Ipc::MultiQueue Queue;
+    static std::auto_ptr<Queue> queue; ///< IPC queue
+};
+
+#endif /* SQUID_COLLAPSED_FORWARDING_H */
diff --git a/src/Makefile.am b/src/Makefile.am
index 22a24e6dd1..fae409a8d5 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -302,6 +302,8 @@ squid_SOURCES = \
 	ClientRequestContext.h \
 	clientStream.cc \
 	clientStream.h \
+	CollapsedForwarding.cc \
+	CollapsedForwarding.h \
 	CompletionDispatcher.cc \
 	CompletionDispatcher.h \
 	CommRead.h \
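Note on the Queue typedef above: OneToOneUniQueue moves items through shared
memory as raw bytes (the owner sizes the ring slots with maxItemSize, here
sizeof(CollapsedForwardingMsg)), so a message must remain a fixed-size,
trivially copyable struct without pointers into process-local memory. A
compile-time guard that could accompany the future "entry info" fields; the
C++11 trait is shown for illustration only, since the tree is still C++03 at
this point:

    #include <type_traits>

    struct CollapsedForwardingMsg {
        int processId; // ID of the sending kid
        // future entry identification fields must stay POD-like, too
    };

    static_assert(std::is_trivially_copyable<CollapsedForwardingMsg>::value,
                  "IPC queue items are memcpy-ed between processes");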
diff --git a/src/fs/rock/RockIoState.cc b/src/fs/rock/RockIoState.cc
index 4eab56931c..f8e461eb83 100644
--- a/src/fs/rock/RockIoState.cc
+++ b/src/fs/rock/RockIoState.cc
@@ -177,6 +177,9 @@ Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
             writeToDisk(sidNext);
         }
     }
+
+    // XXX: check that there are workers waiting for data, i.e. readers > 0
+    writeBufToDisk();
 }
 
 /// Buffers incoming data for the current slot.
@@ -215,10 +218,6 @@ Rock::IoState::writeToDisk(const SlotId sidNext)
     // TODO: if DiskIO module is mmap-based, we should be writing whole pages
     // to avoid triggering read-page;new_head+old_tail;write-page overheads
 
-    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
-    debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
-           theBuf.size);
-
     // finalize map slice
     Ipc::StoreMap::Slice &slice =
         dir->map->writeableSlice(swap_filen, sidCurrent);
@@ -237,6 +236,15 @@ Rock::IoState::writeToDisk(const SlotId sidNext)
     // copy finalized db cell header into buffer
     memcpy(theBuf.mem, &header, sizeof(DbCellHeader));
 
+    writeBufToDisk(sidNext < 0);
+    theBuf.clear();
+
+    sidCurrent = sidNext;
+}
+
+void
+Rock::IoState::writeBufToDisk(const bool last)
+{
     // and now allocate another buffer for the WriteRequest so that
     // we can support concurrent WriteRequests (and to ease cleaning)
     // TODO: should we limit the number of outstanding requests?
@@ -244,12 +252,13 @@
     void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
     memcpy(wBuf, theBuf.mem, theBuf.size);
 
+    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
+    debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
+           theBuf.size);
+
     WriteRequest *const r = new WriteRequest(
         ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
-                       memFreeBufFunc(wBufCap)), this, sidNext < 0);
-    theBuf.clear();
-
-    sidCurrent = sidNext;
+                       memFreeBufFunc(wBufCap)), this, last);
 
     // theFile->write may call writeCompleted immediatelly
     theFile->write(r);
diff --git a/src/fs/rock/RockIoState.h b/src/fs/rock/RockIoState.h
index 8c4ff5313f..ee6f6a3dfc 100644
--- a/src/fs/rock/RockIoState.h
+++ b/src/fs/rock/RockIoState.h
@@ -50,6 +50,7 @@ private:
     void tryWrite(char const *buf, size_t size, off_t offset);
     size_t writeToBuffer(char const *buf, size_t size);
     void writeToDisk(const SlotId nextSlot);
+    void writeBufToDisk(const bool last = false);
    SlotId reserveSlotForWriting();
 
     void callBack(int errflag);
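Note on writeBufToDisk() above: tryWrite() now calls it after every buffering
step, so a partially filled page may be written several times at the same
disk offset, each write propagating fresher bytes to would-be collapsed
readers; and because theBuf is not cleared by a partial write, every
WriteRequest needs its own copy of the data. The generic shape of that
copy-for-async-write pattern, reduced to standard C++ (Squid itself uses the
memAllocBuf()/memFreeBufFunc() pool API rather than malloc/free):

    #include <cstdlib>
    #include <cstring>

    // the request owns a private copy of the staging buffer
    struct AsyncWrite {
        char *buf;
        size_t size;
        ~AsyncWrite() { free(buf); }
    };

    AsyncWrite *makeRequest(const char *staging, size_t size)
    {
        AsyncWrite *const r = new AsyncWrite;
        r->buf = static_cast<char *>(malloc(size)); // error checks elided
        memcpy(r->buf, staging, size);
        r->size = size;
        return r; // the caller may refill the staging buffer immediately
    }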
diff --git a/src/fs/rock/RockSwapDir.cc b/src/fs/rock/RockSwapDir.cc
index a0c3151668..5b08b23839 100644
--- a/src/fs/rock/RockSwapDir.cc
+++ b/src/fs/rock/RockSwapDir.cc
@@ -4,6 +4,7 @@
 #include "squid.h"
 #include "cache_cf.h"
+#include "CollapsedForwarding.h"
 #include "ConfigOption.h"
 #include "DiskIO/DiskIOModule.h"
 #include "DiskIO/DiskIOStrategy.h"
@@ -747,6 +748,9 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest
         return;
     }
 
+    // XXX: check that there are workers waiting for data, i.e. readers > 0
+    CollapsedForwarding::NewData(sio);
+
     if (errflag == DISK_OK) {
         // do not increment sio.offset_ because we do it in sio->write()
         if (request->isLast) {
diff --git a/src/ipc/Messages.h b/src/ipc/Messages.h
index 05f8fd1ee5..14837ff81b 100644
--- a/src/ipc/Messages.h
+++ b/src/ipc/Messages.h
@@ -15,7 +15,7 @@ namespace Ipc
 typedef enum { mtNone = 0, mtRegistration,
                mtStrandSearchRequest, mtStrandSearchResponse,
                mtSharedListenRequest, mtSharedListenResponse,
-               mtIpcIoNotification,
+               mtIpcIoNotification, mtCollapsedForwardingNotification,
                mtCacheMgrRequest, mtCacheMgrResponse
 #if SQUID_SNMP
                ,
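Note on the two XXX comments above (here and in RockIoState.cc): both ask for
the same missing guard, skipping the partial-page writes and the broadcast
when no other worker is reading the entry. Once StoreMap entry locks are
shared, the guard might take roughly this shape; readersWaiting() is a
hypothetical accessor, not an API that exists in this patch:

    // hypothetical: broadcast only when some collapsed hit reader exists
    if (dir->map->readersWaiting(sio.swap_filen) > 0) // hypothetical API
        CollapsedForwarding::NewData(sio);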
diff --git a/src/ipc/Queue.cc b/src/ipc/Queue.cc
index 06e01c61b0..120cc5a849 100644
--- a/src/ipc/Queue.cc
+++ b/src/ipc/Queue.cc
@@ -9,6 +9,8 @@
 #include "globals.h"
 #include "ipc/Queue.h"
+#include <limits>
+
 /// constructs Metadata ID from parent queue ID
 static String
 MetadataId(String id)
@@ -121,6 +123,70 @@ Ipc::OneToOneUniQueues::operator [](const int index) const
     return *reinterpret_cast<const OneToOneUniQueue *>(queue);
 }
 
+// BaseMultiQueue
+
+Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId):
+    theLocalProcessId(aLocalProcessId),
+    theLastPopProcessId(std::numeric_limits<int>::max() - 1)
+{
+}
+
+void
+Ipc::BaseMultiQueue::clearReaderSignal(const int remoteProcessId)
+{
+    QueueReader &reader = localReader();
+    debugs(54, 7, HERE << "reader: " << reader.id);
+
+    reader.clearSignal();
+
+    // we got a hint; we could reposition iteration to try popping from the
+    // remoteProcessId queue first; but it does not seem to help much and might
+    // introduce some bias so we do not do that for now:
+    // theLastPopProcessId = remoteProcessId;
+}
+
+const Ipc::QueueReader::Balance &
+Ipc::BaseMultiQueue::balance(const int remoteProcessId) const
+{
+    const QueueReader &r = remoteReader(remoteProcessId);
+    return r.balance;
+}
+
+const Ipc::QueueReader::Rate &
+Ipc::BaseMultiQueue::rateLimit(const int remoteProcessId) const
+{
+    const QueueReader &r = remoteReader(remoteProcessId);
+    return r.rateLimit;
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::BaseMultiQueue::inQueue(const int remoteProcessId)
+{
+    const OneToOneUniQueue &queue =
+        const_cast<const BaseMultiQueue *>(this)->inQueue(remoteProcessId);
+    return const_cast<OneToOneUniQueue &>(queue);
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::BaseMultiQueue::outQueue(const int remoteProcessId)
+{
+    const OneToOneUniQueue &queue =
+        const_cast<const BaseMultiQueue *>(this)->outQueue(remoteProcessId);
+    return const_cast<OneToOneUniQueue &>(queue);
+}
+
+Ipc::QueueReader &
+Ipc::BaseMultiQueue::localReader()
+{
+    const QueueReader &reader =
+        const_cast<const BaseMultiQueue *>(this)->localReader();
+    return const_cast<QueueReader &>(reader);
+}
+
+Ipc::QueueReader &
+Ipc::BaseMultiQueue::remoteReader(const int remoteProcessId)
+{
+    const QueueReader &reader =
+        const_cast<const BaseMultiQueue *>(this)->remoteReader(remoteProcessId);
+    return const_cast<QueueReader &>(reader);
+}
+
 // FewToFewBiQueue
 
 Ipc::FewToFewBiQueue::Owner *
@@ -130,17 +196,16 @@ Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int gro
 }
 
 Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
+    BaseMultiQueue(aLocalProcessId),
     metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
     queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
     readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
-    theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
-    theLastPopProcessId(readers->theCapacity)
+    theLocalGroup(aLocalGroup)
 {
     Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
     Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
-    const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
-    debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id);
+    debugs(54, 7, HERE << "queue " << id << " reader: " << localReader().id);
 }
 
 int
@@ -185,19 +250,12 @@ Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromPr
     return index;
 }
 
-Ipc::OneToOneUniQueue &
-Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
-{
-    return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
-}
-
 const Ipc::OneToOneUniQueue &
 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
 {
     return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
 }
 
-/// incoming queue from a given remote process
 const Ipc::OneToOneUniQueue &
 Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
 {
@@ -205,7 +263,6 @@ Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
     theLocalGroup, theLocalProcessId);
 }
 
-/// outgoing queue to a given remote process
 const Ipc::OneToOneUniQueue &
 Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
 {
@@ -222,77 +279,150 @@ Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
     metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
 }
 
-Ipc::QueueReader &
-Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
+const Ipc::QueueReader &
+Ipc::FewToFewBiQueue::localReader() const
 {
-    return readers->theReaders[readerIndex(group, processId)];
+    return readers->theReaders[readerIndex(theLocalGroup, theLocalProcessId)];
 }
 
 const Ipc::QueueReader &
-Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const
+Ipc::FewToFewBiQueue::remoteReader(const int processId) const
 {
-    return readers->theReaders[readerIndex(group, processId)];
+    return readers->theReaders[readerIndex(remoteGroup(), processId)];
 }
 
-void
-Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
+int
+Ipc::FewToFewBiQueue::remotesCount() const
 {
-    QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
-    debugs(54, 7, HERE << "reader: " << localReader.id);
+    return theLocalGroup == groupA ? metadata->theGroupBSize :
+        metadata->theGroupASize;
+}
 
-    Must(validProcessId(remoteGroup(), remoteProcessId));
-    localReader.clearSignal();
+int
+Ipc::FewToFewBiQueue::remotesIdOffset() const
+{
+    return theLocalGroup == groupA ? metadata->theGroupBIdOffset :
+        metadata->theGroupAIdOffset;
+}
 
-    // we got a hint; we could reposition iteration to try popping from the
-    // remoteProcessId queue first; but it does not seem to help much and might
-    // introduce some bias so we do not do that for now:
-    // theLastPopProcessId = remoteProcessId;
+Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
+    theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
+    theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
+{
+    Must(theGroupASize > 0);
+    Must(theGroupBSize > 0);
+}
+
+Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
+    metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
+    queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
+    readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
+{
 }
 
-Ipc::QueueReader::Balance &
-Ipc::FewToFewBiQueue::localBalance()
+Ipc::FewToFewBiQueue::Owner::~Owner()
 {
-    QueueReader &r = reader(theLocalGroup, theLocalProcessId);
-    return r.balance;
+    delete metadataOwner;
+    delete queuesOwner;
+    delete readersOwner;
 }
 
-const Ipc::QueueReader::Balance &
-Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const
+// MultiQueue
+
+Ipc::MultiQueue::Owner *
+Ipc::MultiQueue::Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
 {
-    const QueueReader &r = reader(remoteGroup(), remoteProcessId);
-    return r.balance;
+    return new Owner(id, processCount, processIdOffset, maxItemSize, capacity);
 }
 
-Ipc::QueueReader::Rate &
-Ipc::FewToFewBiQueue::localRateLimit()
+Ipc::MultiQueue::MultiQueue(const String &id, const int localProcessId):
+    BaseMultiQueue(localProcessId),
+    metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
+    queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
+    readers(shm_old(QueueReaders)(ReadersId(id).termedBuf()))
 {
-    QueueReader &r = reader(theLocalGroup, theLocalProcessId);
-    return r.rateLimit;
+    Must(queues->theCapacity == metadata->theProcessCount * metadata->theProcessCount);
+    Must(readers->theCapacity == metadata->theProcessCount);
+
+    debugs(54, 7, HERE << "queue " << id << " reader: " << localReader().id);
 }
 
-const Ipc::QueueReader::Rate &
-Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const
+bool
+Ipc::MultiQueue::validProcessId(const int processId) const
 {
-    const QueueReader &r = reader(remoteGroup(), remoteProcessId);
-    return r.rateLimit;
+    return metadata->theProcessIdOffset <= processId &&
+        processId < metadata->theProcessIdOffset + metadata->theProcessCount;
 }
 
-Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
-    theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
-    theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::oneToOneQueue(const int fromProcessId, const int toProcessId) const
 {
-    Must(theGroupASize > 0);
-    Must(theGroupBSize > 0);
+    assert(validProcessId(fromProcessId));
+    assert(validProcessId(toProcessId));
+    const int fromIndex = fromProcessId - metadata->theProcessIdOffset;
+    const int toIndex = toProcessId - metadata->theProcessIdOffset;
+    const int index = fromIndex * metadata->theProcessCount + toIndex;
+    return (*queues)[index];
 }
 
-Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
-    metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
-    queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
-    readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
+const Ipc::QueueReader &
+Ipc::MultiQueue::reader(const int processId) const
 {
+    assert(validProcessId(processId));
+    const int index = processId - metadata->theProcessIdOffset;
+    return readers->theReaders[index];
 }
 
-Ipc::FewToFewBiQueue::Owner::~Owner()
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::inQueue(const int remoteProcessId) const
+{
+    return oneToOneQueue(remoteProcessId, theLocalProcessId);
+}
+
+const Ipc::OneToOneUniQueue &
+Ipc::MultiQueue::outQueue(const int remoteProcessId) const
+{
+    return oneToOneQueue(theLocalProcessId, remoteProcessId);
+}
+
+const Ipc::QueueReader &
+Ipc::MultiQueue::localReader() const
+{
+    return reader(theLocalProcessId);
+}
+
+const Ipc::QueueReader &
+Ipc::MultiQueue::remoteReader(const int processId) const
+{
+    return reader(processId);
+}
+
+int
+Ipc::MultiQueue::remotesCount() const
+{
+    return metadata->theProcessCount;
+}
+
+int
+Ipc::MultiQueue::remotesIdOffset() const
+{
+    return metadata->theProcessIdOffset;
+}
+
+Ipc::MultiQueue::Metadata::Metadata(const int aProcessCount, const int aProcessIdOffset):
+    theProcessCount(aProcessCount), theProcessIdOffset(aProcessIdOffset)
+{
+    Must(theProcessCount > 0);
+}
+
+Ipc::MultiQueue::Owner::Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity):
+    metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), processCount, processIdOffset)),
+    queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), processCount*processCount, maxItemSize, capacity)),
+    readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), processCount))
+{
+}
+
+Ipc::MultiQueue::Owner::~Owner()
 {
     delete metadataOwner;
     delete queuesOwner;
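Note on MultiQueue::oneToOneQueue() above: the queues form a flattened
N-by-N matrix, one row per sender, so every ordered pair of kids (a kid
paired with itself included) gets its own one-way queue. A standalone check
of the index arithmetic, mirroring the formula in the code (values are
illustrative):

    #include <cassert>

    // index = (from - offset) * processCount + (to - offset)
    static int queueIndex(int from, int to, int processCount, int idOffset)
    {
        return (from - idOffset) * processCount + (to - idOffset);
    }

    int main()
    {
        // three kids with IDs 1..3 (processIdOffset = 1): 9 queues in total
        assert(queueIndex(2, 3, 3, 1) == 5); // kid2 -> kid3
        assert(queueIndex(3, 2, 3, 1) == 7); // kid3 -> kid2 differs: one-way
        assert(queueIndex(1, 1, 3, 1) == 0); // the self-queue is legal
        return 0;
    }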
diff --git a/src/ipc/Queue.h b/src/ipc/Queue.h
index 8311a13a35..47e0de6185 100644
--- a/src/ipc/Queue.h
+++ b/src/ipc/Queue.h
@@ -140,6 +140,70 @@ public:
     const int theCapacity; /// number of OneToOneUniQueues
 };
 
+/**
+ * Base class for lockless fixed-capacity bidirectional queues for a
+ * limited number of processes.
+ */
+class BaseMultiQueue
+{
+public:
+    BaseMultiQueue(const int aLocalProcessId);
+
+    /// clears the reader notification received by the local process from the remote process
+    void clearReaderSignal(const int remoteProcessId);
+
+    /// picks a process and calls OneToOneUniQueue::pop() using its queue
+    template <class Value> bool pop(int &remoteProcessId, Value &value);
+
+    /// calls OneToOneUniQueue::push() using the given process queue
+    template <class Value> bool push(const int remoteProcessId, const Value &value);
+
+    /// peeks at the item likely to be pop()ed next
+    template <class Value> bool peek(int &remoteProcessId, Value &value) const;
+
+    /// returns local reader's balance
+    QueueReader::Balance &localBalance() { return localReader().balance; }
+
+    /// returns reader's balance for a given remote process
+    const QueueReader::Balance &balance(const int remoteProcessId) const;
+
+    /// returns local reader's rate limit
+    QueueReader::Rate &localRateLimit() { return localReader().rateLimit; }
+
+    /// returns reader's rate limit for a given remote process
+    const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
+
+    /// number of items in incoming queue from a given remote process
+    int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
+
+    /// number of items in outgoing queue to a given remote process
+    int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
+
+protected:
+    /// incoming queue from a given remote process
+    virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const = 0;
+    OneToOneUniQueue &inQueue(const int remoteProcessId);
+
+    /// outgoing queue to a given remote process
+    virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const = 0;
+    OneToOneUniQueue &outQueue(const int remoteProcessId);
+
+    virtual const QueueReader &localReader() const = 0;
+    QueueReader &localReader();
+
+    virtual const QueueReader &remoteReader(const int remoteProcessId) const = 0;
+    QueueReader &remoteReader(const int remoteProcessId);
+
+    virtual int remotesCount() const = 0;
+    virtual int remotesIdOffset() const = 0;
+
+protected:
+    const int theLocalProcessId; ///< process ID of this queue
+
+private:
+    int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
+};
+
 /**
  * Lockless fixed-capacity bidirectional queue for a limited number
  * processes. Allows communication between two groups of processes:
@@ -148,7 +212,7 @@ public:
  * communicate. Process in each group has a unique integer ID in
 * [groupIdOffset, groupIdOffset + groupSize) range.
  */
-class FewToFewBiQueue
+class FewToFewBiQueue: public BaseMultiQueue
 {
 public:
     typedef OneToOneUniQueue::Full Full;
@@ -188,64 +252,91 @@ public:
     /// maximum number of items in the queue
     static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity);
 
+    /// finds the oldest item in incoming and outgoing queues between
+    /// us and the given remote process
+    template <class Value> bool findOldest(const int remoteProcessId, Value &value) const;
+
+protected:
+    virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
+    virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
+    virtual const QueueReader &localReader() const;
+    virtual const QueueReader &remoteReader(const int processId) const;
+    virtual int remotesCount() const;
+    virtual int remotesIdOffset() const;
+
+private:
+    bool validProcessId(const Group group, const int processId) const;
+    int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
+    const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
+    int readerIndex(const Group group, const int processId) const;
     Group localGroup() const { return theLocalGroup; }
     Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
 
-    /// clears the reader notification received by the local process from the remote process
-    void clearReaderSignal(const int remoteProcessId);
-
-    /// picks a process and calls OneToOneUniQueue::pop() using its queue
-    template <class Value> bool pop(int &remoteProcessId, Value &value);
+private:
+    const Mem::Pointer<Metadata> metadata; ///< shared metadata
+    const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirectional one-to-one queues
+    const Mem::Pointer<QueueReaders> readers; ///< readers array
 
-    /// calls OneToOneUniQueue::push() using the given process queue
-    template <class Value> bool push(const int remoteProcessId, const Value &value);
+    const Group theLocalGroup; ///< group of this queue
+};
 
-    /// finds the oldest item in incoming and outgoing queues between
-    /// us and the given remote process
-    template <class Value> bool findOldest(const int remoteProcessId, Value &value) const;
+/**
+ * Lockless fixed-capacity bidirectional queue for a limited number of
+ * processes. Any process may send data to and receive from any other
+ * process (including itself). Each process has a unique integer ID in
+ * [processIdOffset, processIdOffset + processCount) range.
+ */
+class MultiQueue: public BaseMultiQueue
+{
+public:
+    typedef OneToOneUniQueue::Full Full;
+    typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
 
-    /// peeks at the item likely to be pop()ed next
-    template <class Value> bool peek(int &remoteProcessId, Value &value) const;
+private:
+    /// Shared metadata for MultiQueue
+    struct Metadata {
+        Metadata(const int aProcessCount, const int aProcessIdOffset);
+        size_t sharedMemorySize() const { return sizeof(*this); }
+        static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata); }
 
-    /// returns local reader's balance
-    QueueReader::Balance &localBalance();
+        const int theProcessCount;
+        const int theProcessIdOffset;
+    };
 
-    /// returns reader's balance for a given remote process
-    const QueueReader::Balance &balance(const int remoteProcessId) const;
+public:
+    class Owner
+    {
+    public:
+        Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
+        ~Owner();
 
-    /// returns local reader's rate limit
-    QueueReader::Rate &localRateLimit();
+    private:
+        Mem::Owner<Metadata> *const metadataOwner;
+        Mem::Owner<OneToOneUniQueues> *const queuesOwner;
+        Mem::Owner<QueueReaders> *const readersOwner;
+    };
 
-    /// returns reader's rate limit for a given remote process
-    const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
+    static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
 
-    /// number of items in incoming queue from a given remote process
-    int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
+    MultiQueue(const String &id, const int localProcessId);
 
-    /// number of items in outgoing queue to a given remote process
-    int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
+protected:
+    virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
+    virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
+    virtual const QueueReader &localReader() const;
+    virtual const QueueReader &remoteReader(const int remoteProcessId) const;
+    virtual int remotesCount() const;
+    virtual int remotesIdOffset() const;
 
 private:
-    bool validProcessId(const Group group, const int processId) const;
-    int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
-    const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
-    OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId);
-    const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
-    const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
-    QueueReader &reader(const Group group, const int processId);
-    const QueueReader &reader(const Group group, const int processId) const;
-    int readerIndex(const Group group, const int processId) const;
-    int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; }
-    int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; }
+    bool validProcessId(const int processId) const;
+    const OneToOneUniQueue &oneToOneQueue(const int fromProcessId, const int toProcessId) const;
+    const QueueReader &reader(const int processId) const;
 
 private:
     const Mem::Pointer<Metadata> metadata; ///< shared metadata
     const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
     const Mem::Pointer<QueueReaders> readers; ///< readers array
-
-    const Group theLocalGroup; ///< group of this queue
-    const int theLocalProcessId; ///< process ID of this queue
-    int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
 };
 
 // OneToOneUniQueue
@@ -330,19 +421,18 @@ OneToOneUniQueues::front() const
     return *reinterpret_cast<const OneToOneUniQueue *>(queue);
 }
 
-// FewToFewBiQueue
+// BaseMultiQueue
 
 template <class Value>
 bool
-FewToFewBiQueue::pop(int &remoteProcessId, Value &value)
+BaseMultiQueue::pop(int &remoteProcessId, Value &value)
 {
-    // iterate all remote group processes, starting after the one we visited last
-    QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
-    for (int i = 0; i < remoteGroupSize(); ++i) {
-        if (++theLastPopProcessId >= remoteGroupIdOffset() + remoteGroupSize())
-            theLastPopProcessId = remoteGroupIdOffset();
-        OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), theLastPopProcessId, theLocalGroup, theLocalProcessId);
-        if (queue.pop(value, &localReader)) {
+    // iterate all remote processes, starting after the one we visited last
+    for (int i = 0; i < remotesCount(); ++i) {
+        if (++theLastPopProcessId >= remotesIdOffset() + remotesCount())
+            theLastPopProcessId = remotesIdOffset();
+        OneToOneUniQueue &queue = inQueue(theLastPopProcessId);
+        if (queue.pop(value, &localReader())) {
             remoteProcessId = theLastPopProcessId;
             debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
             return true;
@@ -353,14 +443,34 @@
 
 template <class Value>
 bool
-FewToFewBiQueue::push(const int remoteProcessId, const Value &value)
+BaseMultiQueue::push(const int remoteProcessId, const Value &value)
 {
-    OneToOneUniQueue &remoteQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
-    QueueReader &remoteReader = reader(remoteGroup(), remoteProcessId);
+    OneToOneUniQueue &remoteQueue = outQueue(remoteProcessId);
+    QueueReader &reader = remoteReader(remoteProcessId);
     debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
-    return remoteQueue.push(value, &remoteReader);
+    return remoteQueue.push(value, &reader);
 }
 
+template <class Value>
+bool
+BaseMultiQueue::peek(int &remoteProcessId, Value &value) const
+{
+    // mimic BaseMultiQueue::pop() but quit just before popping
+    int popProcessId = theLastPopProcessId; // preserve for future pop()
+    for (int i = 0; i < remotesCount(); ++i) {
+        if (++popProcessId >= remotesIdOffset() + remotesCount())
+            popProcessId = remotesIdOffset();
+        const OneToOneUniQueue &queue = inQueue(popProcessId);
+        if (queue.peek(value)) {
+            remoteProcessId = popProcessId;
+            return true;
+        }
+    }
+    return false; // most likely, no process had anything to pop
+}
+
+// FewToFewBiQueue
+
 template <class Value>
 bool
 FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const
@@ -383,26 +493,6 @@ FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const
     return out.peek(value);
 }
 
-template <class Value>
-bool
-FewToFewBiQueue::peek(int &remoteProcessId, Value &value) const
-{
-    // mimic FewToFewBiQueue::pop() but quit just before popping
-    int popProcessId = theLastPopProcessId; // preserve for future pop()
-    for (int i = 0; i < remoteGroupSize(); ++i) {
-        if (++popProcessId >= remoteGroupIdOffset() + remoteGroupSize())
-            popProcessId = remoteGroupIdOffset();
-        const OneToOneUniQueue &queue =
-            oneToOneQueue(remoteGroup(), popProcessId,
-                          theLocalGroup, theLocalProcessId);
-        if (queue.peek(value)) {
-            remoteProcessId = popProcessId;
-            return true;
-        }
-    }
-    return false; // most likely, no process had anything to pop
-}
-
 } // namespace Ipc
 
 #endif // SQUID_IPC_QUEUE_H
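Note on theLastPopProcessId above: BaseMultiQueue presumably cannot ask the
derived class for remotesIdOffset() while still constructing (virtual calls
do not dispatch to the derived class in a base constructor), so it starts
the cursor at INT_MAX - 1, a value guaranteed to wrap: the first ++ reaches
INT_MAX without signed overflow, the >= test fires, and the scan begins at
the first remote process. A standalone model of the wrap-around step shared
by pop() and peek():

    #include <cassert>
    #include <limits>

    // models the ++/wrap step at the top of BaseMultiQueue::pop()
    static int nextToPop(int last, int idOffset, int count)
    {
        if (++last >= idOffset + count)
            last = idOffset;
        return last;
    }

    int main()
    {
        const int offset = 1, count = 3; // kids 1..3
        int cursor = std::numeric_limits<int>::max() - 1; // constructor value
        cursor = nextToPop(cursor, offset, count);
        assert(cursor == 1); // a fresh queue starts with the first kid
        assert(nextToPop(1, offset, count) == 2);
        assert(nextToPop(3, offset, count) == 1); // wraps past the last kid
        return 0;
    }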
diff --git a/src/ipc/Strand.cc b/src/ipc/Strand.cc
index 363e706e0c..10dc1befd1 100644
--- a/src/ipc/Strand.cc
+++ b/src/ipc/Strand.cc
@@ -19,6 +19,7 @@
 #include "mgr/Forwarder.h"
 #include "SwapDir.h" /* XXX: scope boundary violation */
 #include "CacheManager.h"
+#include "CollapsedForwarding.h"
 #if USE_DISKIO_IPCIO
 #include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: scope boundary violation */
 #endif
@@ -89,6 +90,10 @@ void Ipc::Strand::receive(const TypedMsgHdr &message)
     }
     break;
 
+    case mtCollapsedForwardingNotification:
+        CollapsedForwarding::HandleNotification(message);
+        break;
+
 #if SQUID_SNMP
     case mtSnmpRequest: {
         const Snmp::Request req(message);
-- 
2.39.5