From: Dmitry Kurochkin Date: Mon, 25 Apr 2011 19:29:41 +0000 (+0400) Subject: Move shared queues to Ipc namespace. X-Git-Tag: take07~37 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=15cdbc7cd3261b5925f556a8901284ad13f58faa;p=thirdparty%2Fsquid.git Move shared queues to Ipc namespace. --- diff --git a/src/DiskIO/IpcIo/IpcIoFile.h b/src/DiskIO/IpcIo/IpcIoFile.h index fd0865e22b..273556c531 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.h +++ b/src/DiskIO/IpcIo/IpcIoFile.h @@ -92,8 +92,8 @@ private: static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo); private: - typedef FewToOneBiQueue DiskerQueue; - typedef OneToOneBiQueue WorkerQueue; + typedef Ipc::FewToOneBiQueue DiskerQueue; + typedef Ipc::OneToOneBiQueue WorkerQueue; const String dbName; ///< the name of the file we are managing int diskId; ///< the process ID of the disker we talk to diff --git a/src/ipc/Queue.cc b/src/ipc/Queue.cc index a4294be48e..993e9aa2f5 100644 --- a/src/ipc/Queue.cc +++ b/src/ipc/Queue.cc @@ -31,29 +31,29 @@ ReaderId(String id) /* QueueReader */ -InstanceIdDefinitions(QueueReader, "ipcQR"); +InstanceIdDefinitions(Ipc::QueueReader, "ipcQR"); -QueueReader::QueueReader(): popBlocked(1), popSignal(0) +Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0) { debugs(54, 7, HERE << "constructed " << id); } /* QueueReaders */ -QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity) +Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity) { Must(theCapacity > 0); new (theReaders) QueueReader[theCapacity]; } size_t -QueueReaders::sharedMemorySize() const +Ipc::QueueReaders::sharedMemorySize() const { return SharedMemorySize(theCapacity); } size_t -QueueReaders::SharedMemorySize(const int capacity) +Ipc::QueueReaders::SharedMemorySize(const int capacity) { return sizeof(QueueReaders) + sizeof(QueueReader) * capacity; } @@ -61,28 +61,28 @@ QueueReaders::SharedMemorySize(const int capacity) // OneToOneUniQueue -OneToOneUniQueue::Owner * -OneToOneUniQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity) +Ipc::OneToOneUniQueue::Owner * +Ipc::OneToOneUniQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity) { Must(maxItemSize > 0); Must(capacity > 0); return shm_new(Shared)(id.termedBuf(), maxItemSize, capacity); } -OneToOneUniQueue::OneToOneUniQueue(const String &id): +Ipc::OneToOneUniQueue::OneToOneUniQueue(const String &id): shared(shm_old(Shared)(id.termedBuf())), reader_(NULL) { } void -OneToOneUniQueue::reader(QueueReader *aReader) +Ipc::OneToOneUniQueue::reader(QueueReader *aReader) { Must(!reader_ && aReader); reader_ = aReader; } int -OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size) +Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size) { assert(maxItemSize > 0); size -= sizeof(Shared); @@ -90,33 +90,33 @@ OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size) } int -OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size) +Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size) { assert(size >= 0); return sizeof(Shared) + maxItemSize * size; } -QueueReader & -OneToOneUniQueue::reader() +Ipc::QueueReader & +Ipc::OneToOneUniQueue::reader() { Must(reader_); return *reader_; } -OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize, const int aCapacity): +Ipc::OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize, const int aCapacity): theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize), theCapacity(aCapacity) { } size_t -OneToOneUniQueue::Shared::sharedMemorySize() const +Ipc::OneToOneUniQueue::Shared::sharedMemorySize() const { return SharedMemorySize(theMaxItemSize, theCapacity); } size_t -OneToOneUniQueue::Shared::SharedMemorySize(const unsigned int maxItemSize, const int capacity) +Ipc::OneToOneUniQueue::Shared::SharedMemorySize(const unsigned int maxItemSize, const int capacity) { return Items2Bytes(maxItemSize, capacity); } @@ -124,8 +124,8 @@ OneToOneUniQueue::Shared::SharedMemorySize(const unsigned int maxItemSize, const // OneToOneBiQueue -OneToOneBiQueue::Owner * -OneToOneBiQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity) +Ipc::OneToOneBiQueue::Owner * +Ipc::OneToOneBiQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity) { UniQueueOwner owner1(OneToOneUniQueue::Init(QueueId(id, Side1), maxItemSize, capacity)); UniQueueOwner owner2(OneToOneUniQueue::Init(QueueId(id, Side2), maxItemSize, capacity)); @@ -135,7 +135,7 @@ OneToOneBiQueue::Init(const String &id, const unsigned int maxItemSize, const in return owner; } -OneToOneBiQueue::OneToOneBiQueue(const String &id, const Side side) +Ipc::OneToOneBiQueue::OneToOneBiQueue(const String &id, const Side side) { OneToOneUniQueue *const queue1 = new OneToOneUniQueue(QueueId(id, Side1)); OneToOneUniQueue *const queue2 = new OneToOneUniQueue(QueueId(id, Side2)); @@ -154,14 +154,14 @@ OneToOneBiQueue::OneToOneBiQueue(const String &id, const Side side) } void -OneToOneBiQueue::readers(QueueReader *r1, QueueReader *r2) +Ipc::OneToOneBiQueue::readers(QueueReader *r1, QueueReader *r2) { popQueue->reader(r1); pushQueue->reader(r2); } void -OneToOneBiQueue::clearReaderSignal() +Ipc::OneToOneBiQueue::clearReaderSignal() { debugs(54, 7, HERE << "reader: " << &popQueue->reader()); popQueue->reader().clearSignal(); @@ -170,13 +170,13 @@ OneToOneBiQueue::clearReaderSignal() // FewToOneBiQueue -FewToOneBiQueue::Owner * -FewToOneBiQueue::Init(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity) +Ipc::FewToOneBiQueue::Owner * +Ipc::FewToOneBiQueue::Init(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity) { return new Owner(id, workerCount, maxItemSize, capacity); } -FewToOneBiQueue::FewToOneBiQueue(const String &id): +Ipc::FewToOneBiQueue::FewToOneBiQueue(const String &id): theLastPopWorker(0), readers(shm_old(QueueReaders)(ReaderId(id).termedBuf())), reader(readers->theReaders) @@ -194,10 +194,10 @@ FewToOneBiQueue::FewToOneBiQueue(const String &id): } } -OneToOneBiQueue * -FewToOneBiQueue::Attach(const String &id, const int workerId) +Ipc::OneToOneBiQueue * +Ipc::FewToOneBiQueue::Attach(const String &id, const int workerId) { - Ipc::Mem::Pointer readers = shm_old(QueueReaders)(ReaderId(id).termedBuf()); + Mem::Pointer readers = shm_old(QueueReaders)(ReaderId(id).termedBuf()); Must(workerId >= WorkerIdOffset); Must(workerId < readers->theCapacity - 1 + WorkerIdOffset); QueueReader *const remoteReader = readers->theReaders; @@ -211,26 +211,27 @@ FewToOneBiQueue::Attach(const String &id, const int workerId) biQueue->readers(localReader, remoteReader); // XXX: remove this leak. By refcounting Ipc::Mem::Segments? By creating a global FewToOneBiQueue for each worker? - const Ipc::Mem::Pointer *const leakingReaders = new Ipc::Mem::Pointer(readers); + const Mem::Pointer *const leakingReaders = new Mem::Pointer(readers); Must(leakingReaders); // silence unused variable warning return biQueue; } -FewToOneBiQueue::~FewToOneBiQueue() +Ipc::FewToOneBiQueue::~FewToOneBiQueue() { for (int i = 0; i < workerCount(); ++i) delete biQueues[i]; } -bool FewToOneBiQueue::validWorkerId(const int workerId) const +bool +Ipc::FewToOneBiQueue::validWorkerId(const int workerId) const { return WorkerIdOffset <= workerId && workerId < WorkerIdOffset + workerCount(); } void -FewToOneBiQueue::clearReaderSignal(int workerId) +Ipc::FewToOneBiQueue::clearReaderSignal(int workerId) { debugs(54, 7, HERE << "reader: " << reader->id); @@ -243,7 +244,7 @@ FewToOneBiQueue::clearReaderSignal(int workerId) // theLastPopWorker = (workerId + workerCount() - 1) % workerCount(); } -FewToOneBiQueue::Owner::Owner(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity): +Ipc::FewToOneBiQueue::Owner::Owner(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity): readersOwner(shm_new(QueueReaders)(ReaderId(id).termedBuf(), workerCount + 1)) { biQueueOwners.reserve(workerCount); @@ -253,7 +254,7 @@ FewToOneBiQueue::Owner::Owner(const String &id, const int workerCount, const uns } } -FewToOneBiQueue::Owner::~Owner() +Ipc::FewToOneBiQueue::Owner::~Owner() { for (size_t i = 0; i < biQueueOwners.size(); ++i) delete biQueueOwners[i]; diff --git a/src/ipc/Queue.h b/src/ipc/Queue.h index 99698003a5..ca449e338e 100644 --- a/src/ipc/Queue.h +++ b/src/ipc/Queue.h @@ -16,6 +16,8 @@ class String; +namespace Ipc { + /// State of the reading end of a queue (i.e., of the code calling pop()). /// Multiple queues attached to one reader share this state. class QueueReader { @@ -90,7 +92,7 @@ public: class Full {}; class ItemTooLarge {}; - typedef Ipc::Mem::Owner Owner; + typedef Mem::Owner Owner; /// initialize shared memory static Owner *Init(const String &id, const unsigned int maxItemSize, const int capacity); @@ -118,7 +120,7 @@ public: private: - Ipc::Mem::Pointer shared; ///< pointer to shared memory + Mem::Pointer shared; ///< pointer to shared memory QueueReader *reader_; ///< the state of the code popping from this queue }; @@ -171,7 +173,7 @@ public: private: Vector biQueueOwners; - Ipc::Mem::Owner *const readersOwner; + Mem::Owner *const readersOwner; }; static Owner *Init(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity); @@ -196,7 +198,7 @@ public: int theLastPopWorker; ///< the ID of the last worker we tried to pop() from Vector biQueues; ///< worker queues indexed by worker ID - const Ipc::Mem::Pointer readers; ///< readers array + const Mem::Pointer readers; ///< readers array QueueReader *const reader; ///< the state of the code popping from all biQueues enum { WorkerIdOffset = 1 }; ///< worker ID offset, always 1 for now @@ -276,4 +278,6 @@ FewToOneBiQueue::push(const int workerId, const Value &value) return biQueues[workerId - WorkerIdOffset]->push(value); } +} // namespace Ipc + #endif // SQUID_IPC_QUEUE_H