/* QueueReader */
-InstanceIdDefinitions(QueueReader, "ipcQR");
+InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
-QueueReader::QueueReader(): popBlocked(1), popSignal(0)
+Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0)
{
debugs(54, 7, HERE << "constructed " << id);
}
/* QueueReaders */
-QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity)
+Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity)
{
Must(theCapacity > 0);
new (theReaders) QueueReader[theCapacity];
}
size_t
-QueueReaders::sharedMemorySize() const
+Ipc::QueueReaders::sharedMemorySize() const
{
return SharedMemorySize(theCapacity);
}
size_t
-QueueReaders::SharedMemorySize(const int capacity)
+Ipc::QueueReaders::SharedMemorySize(const int capacity)
{
return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
}
// OneToOneUniQueue
-OneToOneUniQueue::Owner *
-OneToOneUniQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity)
+Ipc::OneToOneUniQueue::Owner *
+Ipc::OneToOneUniQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity)
{
Must(maxItemSize > 0);
Must(capacity > 0);
return shm_new(Shared)(id.termedBuf(), maxItemSize, capacity);
}
-OneToOneUniQueue::OneToOneUniQueue(const String &id):
+Ipc::OneToOneUniQueue::OneToOneUniQueue(const String &id):
shared(shm_old(Shared)(id.termedBuf())), reader_(NULL)
{
}
void
-OneToOneUniQueue::reader(QueueReader *aReader)
+Ipc::OneToOneUniQueue::reader(QueueReader *aReader)
{
Must(!reader_ && aReader);
reader_ = aReader;
}
int
-OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
+Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
{
assert(maxItemSize > 0);
size -= sizeof(Shared);
}
int
-OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
+Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
{
assert(size >= 0);
return sizeof(Shared) + maxItemSize * size;
}
-QueueReader &
-OneToOneUniQueue::reader()
+Ipc::QueueReader &
+Ipc::OneToOneUniQueue::reader()
{
Must(reader_);
return *reader_;
}
-OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize, const int aCapacity):
+Ipc::OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize, const int aCapacity):
theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
theCapacity(aCapacity)
{
}
size_t
-OneToOneUniQueue::Shared::sharedMemorySize() const
+Ipc::OneToOneUniQueue::Shared::sharedMemorySize() const
{
return SharedMemorySize(theMaxItemSize, theCapacity);
}
size_t
-OneToOneUniQueue::Shared::SharedMemorySize(const unsigned int maxItemSize, const int capacity)
+Ipc::OneToOneUniQueue::Shared::SharedMemorySize(const unsigned int maxItemSize, const int capacity)
{
return Items2Bytes(maxItemSize, capacity);
}
// OneToOneBiQueue
-OneToOneBiQueue::Owner *
-OneToOneBiQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity)
+Ipc::OneToOneBiQueue::Owner *
+Ipc::OneToOneBiQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity)
{
UniQueueOwner owner1(OneToOneUniQueue::Init(QueueId(id, Side1), maxItemSize, capacity));
UniQueueOwner owner2(OneToOneUniQueue::Init(QueueId(id, Side2), maxItemSize, capacity));
return owner;
}
-OneToOneBiQueue::OneToOneBiQueue(const String &id, const Side side)
+Ipc::OneToOneBiQueue::OneToOneBiQueue(const String &id, const Side side)
{
OneToOneUniQueue *const queue1 = new OneToOneUniQueue(QueueId(id, Side1));
OneToOneUniQueue *const queue2 = new OneToOneUniQueue(QueueId(id, Side2));
}
void
-OneToOneBiQueue::readers(QueueReader *r1, QueueReader *r2)
+Ipc::OneToOneBiQueue::readers(QueueReader *r1, QueueReader *r2)
{
popQueue->reader(r1);
pushQueue->reader(r2);
}
void
-OneToOneBiQueue::clearReaderSignal()
+Ipc::OneToOneBiQueue::clearReaderSignal()
{
debugs(54, 7, HERE << "reader: " << &popQueue->reader());
popQueue->reader().clearSignal();
// FewToOneBiQueue
-FewToOneBiQueue::Owner *
-FewToOneBiQueue::Init(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity)
+Ipc::FewToOneBiQueue::Owner *
+Ipc::FewToOneBiQueue::Init(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity)
{
return new Owner(id, workerCount, maxItemSize, capacity);
}
-FewToOneBiQueue::FewToOneBiQueue(const String &id):
+Ipc::FewToOneBiQueue::FewToOneBiQueue(const String &id):
theLastPopWorker(0),
readers(shm_old(QueueReaders)(ReaderId(id).termedBuf())),
reader(readers->theReaders)
}
}
-OneToOneBiQueue *
-FewToOneBiQueue::Attach(const String &id, const int workerId)
+Ipc::OneToOneBiQueue *
+Ipc::FewToOneBiQueue::Attach(const String &id, const int workerId)
{
- Ipc::Mem::Pointer<QueueReaders> readers = shm_old(QueueReaders)(ReaderId(id).termedBuf());
+ Mem::Pointer<QueueReaders> readers = shm_old(QueueReaders)(ReaderId(id).termedBuf());
Must(workerId >= WorkerIdOffset);
Must(workerId < readers->theCapacity - 1 + WorkerIdOffset);
QueueReader *const remoteReader = readers->theReaders;
biQueue->readers(localReader, remoteReader);
// XXX: remove this leak. By refcounting Ipc::Mem::Segments? By creating a global FewToOneBiQueue for each worker?
- const Ipc::Mem::Pointer<QueueReaders> *const leakingReaders = new Ipc::Mem::Pointer<QueueReaders>(readers);
+ const Mem::Pointer<QueueReaders> *const leakingReaders = new Mem::Pointer<QueueReaders>(readers);
Must(leakingReaders); // silence unused variable warning
return biQueue;
}
-FewToOneBiQueue::~FewToOneBiQueue()
+Ipc::FewToOneBiQueue::~FewToOneBiQueue()
{
for (int i = 0; i < workerCount(); ++i)
delete biQueues[i];
}
-bool FewToOneBiQueue::validWorkerId(const int workerId) const
+bool
+Ipc::FewToOneBiQueue::validWorkerId(const int workerId) const
{
return WorkerIdOffset <= workerId &&
workerId < WorkerIdOffset + workerCount();
}
void
-FewToOneBiQueue::clearReaderSignal(int workerId)
+Ipc::FewToOneBiQueue::clearReaderSignal(int workerId)
{
debugs(54, 7, HERE << "reader: " << reader->id);
// theLastPopWorker = (workerId + workerCount() - 1) % workerCount();
}
-FewToOneBiQueue::Owner::Owner(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity):
+Ipc::FewToOneBiQueue::Owner::Owner(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity):
readersOwner(shm_new(QueueReaders)(ReaderId(id).termedBuf(), workerCount + 1))
{
biQueueOwners.reserve(workerCount);
}
}
-FewToOneBiQueue::Owner::~Owner()
+Ipc::FewToOneBiQueue::Owner::~Owner()
{
for (size_t i = 0; i < biQueueOwners.size(); ++i)
delete biQueueOwners[i];
class String;
+namespace Ipc {
+
/// State of the reading end of a queue (i.e., of the code calling pop()).
/// Multiple queues attached to one reader share this state.
class QueueReader {
class Full {};
class ItemTooLarge {};
- typedef Ipc::Mem::Owner<Shared> Owner;
+ typedef Mem::Owner<Shared> Owner;
/// initialize shared memory
static Owner *Init(const String &id, const unsigned int maxItemSize, const int capacity);
private:
- Ipc::Mem::Pointer<Shared> shared; ///< pointer to shared memory
+ Mem::Pointer<Shared> shared; ///< pointer to shared memory
QueueReader *reader_; ///< the state of the code popping from this queue
};
private:
Vector<OneToOneBiQueue::Owner *> biQueueOwners;
- Ipc::Mem::Owner<QueueReaders> *const readersOwner;
+ Mem::Owner<QueueReaders> *const readersOwner;
};
static Owner *Init(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity);
int theLastPopWorker; ///< the ID of the last worker we tried to pop() from
Vector<OneToOneBiQueue *> biQueues; ///< worker queues indexed by worker ID
- const Ipc::Mem::Pointer<QueueReaders> readers; ///< readers array
+ const Mem::Pointer<QueueReaders> readers; ///< readers array
QueueReader *const reader; ///< the state of the code popping from all biQueues
enum { WorkerIdOffset = 1 }; ///< worker ID offset, always 1 for now
return biQueues[workerId - WorkerIdOffset]->push(value);
}
+} // namespace Ipc
+
#endif // SQUID_IPC_QUEUE_H