]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
IpcIoFile atomic queue v4:
authorDmitry Kurochkin <dmitry.kurochkin@measurement-factory.com>
Wed, 2 Mar 2011 01:08:08 +0000 (04:08 +0300)
committerDmitry Kurochkin <dmitry.kurochkin@measurement-factory.com>
Wed, 2 Mar 2011 01:08:08 +0000 (04:08 +0300)
* change Queue push/pop interface
* fix leak in IpcIoFile::push()

src/DiskIO/IpcIo/IpcIoFile.cc
src/DiskIO/IpcIo/IpcIoFile.h
src/ipc/Queue.h

index eb51a503f886458fcf8ec19534eaa1eb1dc7025b..1de66ab9dfb04ada59782e908f469e4568729847 100644 (file)
@@ -173,7 +173,7 @@ IpcIoFile::read(ReadRequest *readRequest)
 
     IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
     pending->readRequest = readRequest;
-    push(*pending);
+    push(pending);
 }
 
 void
@@ -214,7 +214,7 @@ IpcIoFile::write(WriteRequest *writeRequest)
 
     IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
     pending->writeRequest = writeRequest;
-    push(*pending);
+    push(pending);
 }
 
 void
@@ -256,47 +256,48 @@ IpcIoFile::ioInProgress() const
 
 /// track a new pending request
 void
-IpcIoFile::trackPendingRequest(IpcIoPendingRequest &pending)
+IpcIoFile::trackPendingRequest(IpcIoPendingRequest *const pending)
 {
-    newerRequests->insert(std::make_pair(lastRequestId, &pending));
+    newerRequests->insert(std::make_pair(lastRequestId, pending));
     if (!timeoutCheckScheduled)
         scheduleTimeoutCheck();
 }
 
 /// push an I/O request to disker
 void
-IpcIoFile::push(IpcIoPendingRequest &pending)
+IpcIoFile::push(IpcIoPendingRequest *const pending)
 {
     debugs(47, 7, HERE);
     Must(diskId >= 0);
     Must(workerQueue);
-    Must(pending.readRequest || pending.writeRequest);
+    Must(pending);
+    Must(pending->readRequest || pending->writeRequest);
 
     IpcIoMsg ipcIo;
     ipcIo.requestId = lastRequestId;
-    if (pending.readRequest) {
+    if (pending->readRequest) {
         ipcIo.command = IpcIo::cmdRead;
-        ipcIo.offset = pending.readRequest->offset;
-        ipcIo.len = pending.readRequest->len;
+        ipcIo.offset = pending->readRequest->offset;
+        ipcIo.len = pending->readRequest->len;
         assert(ipcIo.len <= sizeof(ipcIo.buf));
-        memcpy(ipcIo.buf, pending.readRequest->buf, ipcIo.len); // optimize away
-    } else { // pending.writeRequest
+        memcpy(ipcIo.buf, pending->readRequest->buf, ipcIo.len); // optimize away
+    } else { // pending->writeRequest
         ipcIo.command = IpcIo::cmdWrite;
-        ipcIo.offset = pending.writeRequest->offset;
-        ipcIo.len = pending.writeRequest->len;
+        ipcIo.offset = pending->writeRequest->offset;
+        ipcIo.len = pending->writeRequest->len;
         assert(ipcIo.len <= sizeof(ipcIo.buf));
-        memcpy(ipcIo.buf, pending.writeRequest->buf, ipcIo.len); // optimize away
+        memcpy(ipcIo.buf, pending->writeRequest->buf, ipcIo.len); // optimize away
     }
 
-    if (!workerQueue->push(ipcIo)) {
-        pending.completeIo(NULL);
-        return;
+    try {
+        if (workerQueue->push(ipcIo))
+            Notify(diskId); // notify disker
+        trackPendingRequest(pending);
+    } catch (const WorkerQueue::Full &) {
+        // XXX: grow queue size?
+        pending->completeIo(NULL);
+        delete pending;
     }
-
-    if (workerQueue->pushedSize() == 1)
-        Notify(diskId); // notify disker
-
-    trackPendingRequest(pending);
 }
 
 /// called when coordinator responds to worker open request
@@ -322,9 +323,13 @@ void
 IpcIoFile::handleResponses()
 {
     Must(workerQueue);
-    IpcIoMsg ipcIo;
-    while (workerQueue->pop(ipcIo))
-        handleResponse(ipcIo);
+    try {
+        while (true) {
+            IpcIoMsg ipcIo;
+            workerQueue->pop(ipcIo); // XXX: notify disker?
+            handleResponse(ipcIo);
+        }
+    } catch (const WorkerQueue::Empty &) {}
 }
 
 void
@@ -549,10 +554,14 @@ void
 IpcIoFile::DiskerHandleRequests()
 {
     Must(diskerQueue);
-    int workerId;
-    IpcIoMsg ipcIo;
-    while (diskerQueue->pop(workerId, ipcIo))
-        DiskerHandleRequest(workerId, ipcIo);
+    try {
+        while (true) {
+            int workerId;
+            IpcIoMsg ipcIo;
+            diskerQueue->pop(workerId, ipcIo); // XXX: notify worker?
+            DiskerHandleRequest(workerId, ipcIo);
+        }
+    } catch (const DiskerQueue::Empty &) {}
 }
 
 /// called when disker receives an I/O request
@@ -578,10 +587,12 @@ IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
     else // ipcIo.command == IpcIo::cmdWrite
         diskerWrite(ipcIo);
 
-    diskerQueue->push(workerId, ipcIo); // XXX: report warning
-
-    if (diskerQueue->pushedSize(workerId) == 1)
-        Notify(workerId + 1); // notify worker
+    try {
+        if (diskerQueue->push(workerId, ipcIo))
+            Notify(workerId + 1); // notify worker
+    } catch (const DiskerQueue::Full &) {
+        // XXX: grow queue size?
+    }
 }
 
 static bool
index c92833396e4b93b5da37744133fb676946a33a5c..d4c24df794097694a0b445fbc25924f881e3cf7e 100644 (file)
@@ -74,8 +74,8 @@ protected:
     void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response);
 
 private:
-    void trackPendingRequest(IpcIoPendingRequest &pending);
-    void push(IpcIoPendingRequest &pending);
+    void trackPendingRequest(IpcIoPendingRequest *const pending);
+    void push(IpcIoPendingRequest *const pending);
     IpcIoPendingRequest *dequeueRequest(const unsigned int requestId);
 
     static void Notify(const int peerId);
index 25839851b6ca6ac73122fe4403b0e8f6a1f01937..c3b4b42913352fa481afdf225c257a6347093e6d 100644 (file)
@@ -16,6 +16,8 @@ class String;
 /// Lockless fixed-capacity queue for a single writer and a single reader.
 class OneToOneUniQueue {
 public:
+    class Empty {};
+    class Full {};
     class ItemTooLarge {};
 
     OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity);
@@ -32,9 +34,9 @@ public:
     static int Items2Bytes(const unsigned int maxItemSize, const int size);
 
     template <class Value>
-    bool pop(Value &value); ///< returns false iff the queue is empty
+    bool pop(Value &value); ///< returns true iff the queue was full
     template <class Value>
-    bool push(const Value &value); ///< returns false iff the queue is full
+    bool push(const Value &value); ///< returns true iff the queue was empty
 
 private:
     struct Shared {
@@ -57,12 +59,14 @@ private:
 /// Lockless fixed-capacity bidirectional queue for two processes.
 class OneToOneBiQueue {
 public:
+    typedef OneToOneUniQueue::Empty Empty;
+    typedef OneToOneUniQueue::Full Full;
+    typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
+
     /// Create a new shared queue.
     OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity);
     OneToOneBiQueue(const String &id); ///< Attach to existing shared queue.
 
-    int pushedSize() const { return pushQueue->size(); }
-
     template <class Value>
     bool pop(Value &value) { return popQueue->pop(value); }
     template <class Value>
@@ -83,18 +87,21 @@ private:
  */
 class FewToOneBiQueue {
 public:
+    typedef OneToOneBiQueue::Empty Empty;
+    typedef OneToOneBiQueue::Full Full;
+    typedef OneToOneBiQueue::ItemTooLarge ItemTooLarge;
+
     FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity);
     static OneToOneBiQueue *Attach(const String &id, const int workerId);
     ~FewToOneBiQueue();
 
     bool validWorkerId(const int workerId) const;
     int workerCount() const { return theWorkerCount; }
-    int pushedSize(const int workerId) const;
 
     template <class Value>
-    bool pop(int &workerId, Value &value); ///< returns false iff the queue is empty
+    bool pop(int &workerId, Value &value); ///< returns false iff the queue was full
     template <class Value>
-    bool push(const int workerId, const Value &value); ///< returns false iff the queue is full
+    bool push(const int workerId, const Value &value); ///< returns false iff the queue was empty
 
 private:
     int theLastPopWorkerId; ///< the last worker ID we pop()ed from
@@ -113,13 +120,14 @@ OneToOneUniQueue::pop(Value &value)
         throw ItemTooLarge();
 
     if (empty())
-        return false;
+        throw Empty();
 
+    const bool wasFull = full();
     const unsigned int pos =
         shared->theOut++ % shared->theCapacity * shared->theMaxItemSize;
     memcpy(&value, shared->theBuffer + pos, sizeof(value));
     --shared->theSize;
-    return true;
+    return wasFull;
 }
 
 template <class Value>
@@ -130,13 +138,14 @@ OneToOneUniQueue::push(const Value &value)
         throw ItemTooLarge();
 
     if (full())
-        return false;
+        throw Full();
 
+    const bool wasEmpty = empty();
     const unsigned int pos =
         shared->theIn++ % shared->theCapacity * shared->theMaxItemSize;
     memcpy(shared->theBuffer + pos, &value, sizeof(value));
     ++shared->theSize;
-    return true;
+    return wasEmpty;
 }
 
 
@@ -149,12 +158,13 @@ FewToOneBiQueue::pop(int &workerId, Value &value)
     ++theLastPopWorkerId;
     for (int i = 0; i < theWorkerCount; ++i) {
         theLastPopWorkerId = (theLastPopWorkerId + 1) % theWorkerCount;
-        if (biQueues[theLastPopWorkerId]->pop(value)) {
+        try {
+            const bool wasFull = biQueues[theLastPopWorkerId]->pop(value);
             workerId = theLastPopWorkerId;
-            return true;
-        }
+            return wasFull;
+        } catch (const Empty &) {}
     }
-    return false;
+    throw Empty();
 }
 
 template <class Value>