From: Dmitry Kurochkin Date: Wed, 2 Mar 2011 01:08:08 +0000 (+0300) Subject: IpcIoFile atomic queue v4: X-Git-Tag: take06~57^2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=7a907247328fc608422087ea3f9c7efae0553073;p=thirdparty%2Fsquid.git IpcIoFile atomic queue v4: * change Queue push/pop interface * fix leak in IpcIoFile::push() --- diff --git a/src/DiskIO/IpcIo/IpcIoFile.cc b/src/DiskIO/IpcIo/IpcIoFile.cc index eb51a503f8..1de66ab9df 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.cc +++ b/src/DiskIO/IpcIo/IpcIoFile.cc @@ -173,7 +173,7 @@ IpcIoFile::read(ReadRequest *readRequest) IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this); pending->readRequest = readRequest; - push(*pending); + push(pending); } void @@ -214,7 +214,7 @@ IpcIoFile::write(WriteRequest *writeRequest) IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this); pending->writeRequest = writeRequest; - push(*pending); + push(pending); } void @@ -256,47 +256,48 @@ IpcIoFile::ioInProgress() const /// track a new pending request void -IpcIoFile::trackPendingRequest(IpcIoPendingRequest &pending) +IpcIoFile::trackPendingRequest(IpcIoPendingRequest *const pending) { - newerRequests->insert(std::make_pair(lastRequestId, &pending)); + newerRequests->insert(std::make_pair(lastRequestId, pending)); if (!timeoutCheckScheduled) scheduleTimeoutCheck(); } /// push an I/O request to disker void -IpcIoFile::push(IpcIoPendingRequest &pending) +IpcIoFile::push(IpcIoPendingRequest *const pending) { debugs(47, 7, HERE); Must(diskId >= 0); Must(workerQueue); - Must(pending.readRequest || pending.writeRequest); + Must(pending); + Must(pending->readRequest || pending->writeRequest); IpcIoMsg ipcIo; ipcIo.requestId = lastRequestId; - if (pending.readRequest) { + if (pending->readRequest) { ipcIo.command = IpcIo::cmdRead; - ipcIo.offset = pending.readRequest->offset; - ipcIo.len = pending.readRequest->len; + ipcIo.offset = pending->readRequest->offset; + ipcIo.len = pending->readRequest->len; assert(ipcIo.len <= sizeof(ipcIo.buf)); - memcpy(ipcIo.buf, pending.readRequest->buf, ipcIo.len); // optimize away - } else { // pending.writeRequest + memcpy(ipcIo.buf, pending->readRequest->buf, ipcIo.len); // optimize away + } else { // pending->writeRequest ipcIo.command = IpcIo::cmdWrite; - ipcIo.offset = pending.writeRequest->offset; - ipcIo.len = pending.writeRequest->len; + ipcIo.offset = pending->writeRequest->offset; + ipcIo.len = pending->writeRequest->len; assert(ipcIo.len <= sizeof(ipcIo.buf)); - memcpy(ipcIo.buf, pending.writeRequest->buf, ipcIo.len); // optimize away + memcpy(ipcIo.buf, pending->writeRequest->buf, ipcIo.len); // optimize away } - if (!workerQueue->push(ipcIo)) { - pending.completeIo(NULL); - return; + try { + if (workerQueue->push(ipcIo)) + Notify(diskId); // notify disker + trackPendingRequest(pending); + } catch (const WorkerQueue::Full &) { + // XXX: grow queue size? + pending->completeIo(NULL); + delete pending; } - - if (workerQueue->pushedSize() == 1) - Notify(diskId); // notify disker - - trackPendingRequest(pending); } /// called when coordinator responds to worker open request @@ -322,9 +323,13 @@ void IpcIoFile::handleResponses() { Must(workerQueue); - IpcIoMsg ipcIo; - while (workerQueue->pop(ipcIo)) - handleResponse(ipcIo); + try { + while (true) { + IpcIoMsg ipcIo; + workerQueue->pop(ipcIo); // XXX: notify disker? + handleResponse(ipcIo); + } + } catch (const WorkerQueue::Empty &) {} } void @@ -549,10 +554,14 @@ void IpcIoFile::DiskerHandleRequests() { Must(diskerQueue); - int workerId; - IpcIoMsg ipcIo; - while (diskerQueue->pop(workerId, ipcIo)) - DiskerHandleRequest(workerId, ipcIo); + try { + while (true) { + int workerId; + IpcIoMsg ipcIo; + diskerQueue->pop(workerId, ipcIo); // XXX: notify worker? + DiskerHandleRequest(workerId, ipcIo); + } + } catch (const DiskerQueue::Empty &) {} } /// called when disker receives an I/O request @@ -578,10 +587,12 @@ IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo) else // ipcIo.command == IpcIo::cmdWrite diskerWrite(ipcIo); - diskerQueue->push(workerId, ipcIo); // XXX: report warning - - if (diskerQueue->pushedSize(workerId) == 1) - Notify(workerId + 1); // notify worker + try { + if (diskerQueue->push(workerId, ipcIo)) + Notify(workerId + 1); // notify worker + } catch (const DiskerQueue::Full &) { + // XXX: grow queue size? + } } static bool diff --git a/src/DiskIO/IpcIo/IpcIoFile.h b/src/DiskIO/IpcIo/IpcIoFile.h index c92833396e..d4c24df794 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.h +++ b/src/DiskIO/IpcIo/IpcIoFile.h @@ -74,8 +74,8 @@ protected: void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response); private: - void trackPendingRequest(IpcIoPendingRequest &pending); - void push(IpcIoPendingRequest &pending); + void trackPendingRequest(IpcIoPendingRequest *const pending); + void push(IpcIoPendingRequest *const pending); IpcIoPendingRequest *dequeueRequest(const unsigned int requestId); static void Notify(const int peerId); diff --git a/src/ipc/Queue.h b/src/ipc/Queue.h index 25839851b6..c3b4b42913 100644 --- a/src/ipc/Queue.h +++ b/src/ipc/Queue.h @@ -16,6 +16,8 @@ class String; /// Lockless fixed-capacity queue for a single writer and a single reader. class OneToOneUniQueue { public: + class Empty {}; + class Full {}; class ItemTooLarge {}; OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity); @@ -32,9 +34,9 @@ public: static int Items2Bytes(const unsigned int maxItemSize, const int size); template - bool pop(Value &value); ///< returns false iff the queue is empty + bool pop(Value &value); ///< returns true iff the queue was full template - bool push(const Value &value); ///< returns false iff the queue is full + bool push(const Value &value); ///< returns true iff the queue was empty private: struct Shared { @@ -57,12 +59,14 @@ private: /// Lockless fixed-capacity bidirectional queue for two processes. class OneToOneBiQueue { public: + typedef OneToOneUniQueue::Empty Empty; + typedef OneToOneUniQueue::Full Full; + typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge; + /// Create a new shared queue. OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity); OneToOneBiQueue(const String &id); ///< Attach to existing shared queue. - int pushedSize() const { return pushQueue->size(); } - template bool pop(Value &value) { return popQueue->pop(value); } template @@ -83,18 +87,21 @@ private: */ class FewToOneBiQueue { public: + typedef OneToOneBiQueue::Empty Empty; + typedef OneToOneBiQueue::Full Full; + typedef OneToOneBiQueue::ItemTooLarge ItemTooLarge; + FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity); static OneToOneBiQueue *Attach(const String &id, const int workerId); ~FewToOneBiQueue(); bool validWorkerId(const int workerId) const; int workerCount() const { return theWorkerCount; } - int pushedSize(const int workerId) const; template - bool pop(int &workerId, Value &value); ///< returns false iff the queue is empty + bool pop(int &workerId, Value &value); ///< returns false iff the queue was full template - bool push(const int workerId, const Value &value); ///< returns false iff the queue is full + bool push(const int workerId, const Value &value); ///< returns false iff the queue was empty private: int theLastPopWorkerId; ///< the last worker ID we pop()ed from @@ -113,13 +120,14 @@ OneToOneUniQueue::pop(Value &value) throw ItemTooLarge(); if (empty()) - return false; + throw Empty(); + const bool wasFull = full(); const unsigned int pos = shared->theOut++ % shared->theCapacity * shared->theMaxItemSize; memcpy(&value, shared->theBuffer + pos, sizeof(value)); --shared->theSize; - return true; + return wasFull; } template @@ -130,13 +138,14 @@ OneToOneUniQueue::push(const Value &value) throw ItemTooLarge(); if (full()) - return false; + throw Full(); + const bool wasEmpty = empty(); const unsigned int pos = shared->theIn++ % shared->theCapacity * shared->theMaxItemSize; memcpy(shared->theBuffer + pos, &value, sizeof(value)); ++shared->theSize; - return true; + return wasEmpty; } @@ -149,12 +158,13 @@ FewToOneBiQueue::pop(int &workerId, Value &value) ++theLastPopWorkerId; for (int i = 0; i < theWorkerCount; ++i) { theLastPopWorkerId = (theLastPopWorkerId + 1) % theWorkerCount; - if (biQueues[theLastPopWorkerId]->pop(value)) { + try { + const bool wasFull = biQueues[theLastPopWorkerId]->pop(value); workerId = theLastPopWorkerId; - return true; - } + return wasFull; + } catch (const Empty &) {} } - return false; + throw Empty(); } template