From: Dmitry Kurochkin Date: Wed, 2 Mar 2011 18:04:38 +0000 (+0300) Subject: IpcIoFile atomic queue v5: X-Git-Tag: take06~56^2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=948230dec3eb62d434f79723455e3d070e214d85;p=thirdparty%2Fsquid.git IpcIoFile atomic queue v5: * add missing src/ipc/Queue.cc --- diff --git a/src/ipc/Queue.cc b/src/ipc/Queue.cc new file mode 100644 index 0000000000..e86fe37ac3 --- /dev/null +++ b/src/ipc/Queue.cc @@ -0,0 +1,104 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#include "config.h" +#include "ipc/Queue.h" + + +static String +QueueId(String id, const int idx) +{ + id.append("__"); + id.append(xitoa(idx)); + return id; +} + + +// OneToOneUniQueue + +OneToOneUniQueue::OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity): + shm(id.termedBuf()) +{ + shm.create(Items2Bytes(maxItemSize, capacity)); + assert(shm.mem()); + shared = new (shm.mem()) Shared(maxItemSize, capacity); +} + +OneToOneUniQueue::OneToOneUniQueue(const String &id): shm(id.termedBuf()) +{ + shm.open(); + shared = reinterpret_cast(shm.mem()); + assert(shared); +} + +int +OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size) +{ + assert(maxItemSize > 0); + size -= sizeof(Shared); + return size >= 0 ? size / maxItemSize : 0; +} + +int +OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size) +{ + assert(size >= 0); + return sizeof(Shared) + maxItemSize * size; +} + +OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize, const int aCapacity): + theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize), + theCapacity(aCapacity) +{ +} + + +// OneToOneBiQueue + +OneToOneBiQueue::OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity): + popQueue(new OneToOneUniQueue(QueueId(id, 1), maxItemSize, capacity)), + pushQueue(new OneToOneUniQueue(QueueId(id, 2), maxItemSize, capacity)) +{ +} + +OneToOneBiQueue::OneToOneBiQueue(const String &id): + popQueue(new OneToOneUniQueue(QueueId(id, 2))), + pushQueue(new OneToOneUniQueue(QueueId(id, 1))) +{ +} + + +// FewToOneBiQueue + +FewToOneBiQueue::FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity): + theLastPopWorkerId(-1), theWorkerCount(aWorkerCount) +{ + assert(theWorkerCount >= 0); + biQueues.reserve(theWorkerCount); + for (int i = 0; i < theWorkerCount; ++i) { + OneToOneBiQueue *const biQueue = + new OneToOneBiQueue(QueueId(id, i), maxItemSize, capacity); + biQueues.push_back(biQueue); + } +} + +OneToOneBiQueue * +FewToOneBiQueue::Attach(const String &id, const int workerId) +{ + return new OneToOneBiQueue(QueueId(id, workerId)); +} + +FewToOneBiQueue::~FewToOneBiQueue() +{ + for (int i = 0; i < theWorkerCount; ++i) + delete biQueues[i]; +} + +bool FewToOneBiQueue::validWorkerId(const int workerId) const +{ + return 0 <= workerId && workerId < theWorkerCount; +}