]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
IpcIoFile atomic queue v5:
authorDmitry Kurochkin <dmitry.kurochkin@measurement-factory.com>
Wed, 2 Mar 2011 18:04:38 +0000 (21:04 +0300)
committerDmitry Kurochkin <dmitry.kurochkin@measurement-factory.com>
Wed, 2 Mar 2011 18:04:38 +0000 (21:04 +0300)
* add missing src/ipc/Queue.cc

src/ipc/Queue.cc [new file with mode: 0644]

diff --git a/src/ipc/Queue.cc b/src/ipc/Queue.cc
new file mode 100644 (file)
index 0000000..e86fe37
--- /dev/null
@@ -0,0 +1,104 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#include "config.h"
+#include "ipc/Queue.h"
+
+
+static String
+QueueId(String id, const int idx)
+{
+    id.append("__");
+    id.append(xitoa(idx));
+    return id;
+}
+
+
+// OneToOneUniQueue
+
+OneToOneUniQueue::OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity):
+    shm(id.termedBuf())
+{
+    shm.create(Items2Bytes(maxItemSize, capacity));
+    assert(shm.mem());
+    shared = new (shm.mem()) Shared(maxItemSize, capacity);
+}
+
+OneToOneUniQueue::OneToOneUniQueue(const String &id): shm(id.termedBuf())
+{
+    shm.open();
+    shared = reinterpret_cast<Shared *>(shm.mem());
+    assert(shared);
+}
+
+int
+OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
+{
+    assert(maxItemSize > 0);
+    size -= sizeof(Shared);
+    return size >= 0 ? size / maxItemSize : 0;
+}
+
+int
+OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
+{
+    assert(size >= 0);
+    return sizeof(Shared) + maxItemSize * size;
+}
+
+OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize, const int aCapacity):
+    theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
+    theCapacity(aCapacity)
+{
+}
+
+
+// OneToOneBiQueue
+
+OneToOneBiQueue::OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity):
+    popQueue(new OneToOneUniQueue(QueueId(id, 1), maxItemSize, capacity)),
+    pushQueue(new OneToOneUniQueue(QueueId(id, 2), maxItemSize, capacity))
+{
+}
+
+OneToOneBiQueue::OneToOneBiQueue(const String &id):
+    popQueue(new OneToOneUniQueue(QueueId(id, 2))),
+    pushQueue(new OneToOneUniQueue(QueueId(id, 1)))
+{
+}
+
+
+// FewToOneBiQueue
+
+FewToOneBiQueue::FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity):
+    theLastPopWorkerId(-1), theWorkerCount(aWorkerCount)
+{
+    assert(theWorkerCount >= 0);
+    biQueues.reserve(theWorkerCount);
+    for (int i = 0; i < theWorkerCount; ++i) {
+        OneToOneBiQueue *const biQueue =
+            new OneToOneBiQueue(QueueId(id, i), maxItemSize, capacity);
+        biQueues.push_back(biQueue);
+    }
+}
+
+OneToOneBiQueue *
+FewToOneBiQueue::Attach(const String &id, const int workerId)
+{
+    return new OneToOneBiQueue(QueueId(id, workerId));
+}
+
+FewToOneBiQueue::~FewToOneBiQueue()
+{
+    for (int i = 0; i < theWorkerCount; ++i)
+        delete biQueues[i];
+}
+
+bool FewToOneBiQueue::validWorkerId(const int workerId) const
+{
+    return 0 <= workerId && workerId < theWorkerCount;
+}