From: Amos Jeffries Date: Tue, 2 Jun 2015 15:28:56 +0000 (-0700) Subject: Convert Ipc::Queue to std::atomic X-Git-Tag: merge-candidate-3-v1~72^2~3 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=6c6a656afd32a3b3293ec87ce7ad42045d83c610;p=thirdparty%2Fsquid.git Convert Ipc::Queue to std::atomic --- diff --git a/src/DiskIO/IpcIo/IpcIoFile.cc b/src/DiskIO/IpcIo/IpcIoFile.cc index 4b650ecd75..0297e06e63 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.cc +++ b/src/DiskIO/IpcIo/IpcIoFile.cc @@ -113,8 +113,7 @@ IpcIoFile::open(int flags, mode_t mode, RefCount callback) IpcIoFiles.insert(std::make_pair(diskId, this)).second; Must(inserted); - queue->localRateLimit() = - static_cast(config.ioRate); + queue->localRateLimit().store(config.ioRate); Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid())); ann.strand.tag = dbName; @@ -396,7 +395,7 @@ IpcIoFile::canWait() const const int oldestWait = tvSubMsec(oldestIo.start, current_time); int rateWait = -1; // time in millisecons - const Ipc::QueueReader::Rate::Value ioRate = queue->rateLimit(diskId); + const int ioRate = queue->rateLimit(diskId).load(); if (ioRate > 0) { // if there are N requests pending, the new one will wait at // least N/max-swap-rate seconds @@ -750,7 +749,7 @@ IpcIoFile::DiskerHandleMoreRequests(void *source) bool IpcIoFile::WaitBeforePop() { - const Ipc::QueueReader::Rate::Value ioRate = queue->localRateLimit(); + const int ioRate = queue->localRateLimit().load(); const double maxRate = ioRate/1e3; // req/ms // do we need to enforce configured I/O rate? diff --git a/src/MemStore.cc b/src/MemStore.cc index 47c908e867..e9142c52f3 100644 --- a/src/MemStore.cc +++ b/src/MemStore.cc @@ -12,6 +12,7 @@ #include "base/RunnersRegistry.h" #include "CollapsedForwarding.h" #include "HttpReply.h" +#include "ipc/AtomicWord.h" #include "ipc/mem/Page.h" #include "ipc/mem/Pages.h" #include "MemObject.h" diff --git a/src/ipc/Queue.cc b/src/ipc/Queue.cc index 9e24870d07..853374e4de 100644 --- a/src/ipc/Queue.cc +++ b/src/ipc/Queue.cc @@ -44,7 +44,7 @@ ReadersId(String id) InstanceIdDefinitions(Ipc::QueueReader, "ipcQR"); -Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0), +Ipc::QueueReader::QueueReader(): popBlocked(true), popSignal(false), rateLimit(0), balance(0) { debugs(54, 7, HERE << "constructed " << id); diff --git a/src/ipc/Queue.h b/src/ipc/Queue.h index 310ba21d15..b6128de515 100644 --- a/src/ipc/Queue.h +++ b/src/ipc/Queue.h @@ -11,11 +11,12 @@ #include "base/InstanceId.h" #include "Debug.h" -#include "ipc/AtomicWord.h" #include "ipc/mem/FlexibleArray.h" #include "ipc/mem/Pointer.h" #include "util.h" +#include + class String; namespace Ipc @@ -29,31 +30,31 @@ public: QueueReader(); // the initial state is "blocked without a signal" /// whether the reader is waiting for a notification signal - bool blocked() const { return popBlocked == 1; } + bool blocked() const { return popBlocked.load(); } /// marks the reader as blocked, waiting for a notification signal - void block() { popBlocked.swap_if(0, 1); } + void block() { popBlocked.store(true); } /// removes the block() effects - void unblock() { popBlocked.swap_if(1, 0); } + void unblock() { popBlocked.store(false); } /// if reader is blocked and not notified, marks the notification signal /// as sent and not received, returning true; otherwise, returns false - bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); } + bool raiseSignal() { return blocked() && !popSignal.exchange(true); } /// marks sent reader notification as received (also removes pop blocking) - void clearSignal() { unblock(); popSignal.swap_if(1,0); } + void clearSignal() { unblock(); popSignal.store(false); } private: - Atomic::Word popBlocked; ///< whether the reader is blocked on pop() - Atomic::Word popSignal; ///< whether writer has sent and reader has not received notification + std::atomic popBlocked; ///< whether the reader is blocked on pop() + std::atomic popSignal; ///< whether writer has sent and reader has not received notification public: - typedef Atomic::Word Rate; ///< pop()s per second + typedef std::atomic Rate; ///< pop()s per second Rate rateLimit; ///< pop()s per second limit if positive // we need a signed atomic type because balance may get negative - typedef Atomic::WordT AtomicSignedMsec; + typedef std::atomic AtomicSignedMsec; typedef AtomicSignedMsec Balance; /// how far ahead the reader is compared to a perfect read/sec event rate Balance balance; @@ -118,9 +119,9 @@ private: unsigned int theIn; ///< input index, used only in push() unsigned int theOut; ///< output index, used only in pop() - Atomic::Word theSize; ///< number of items in the queue + std::atomic theSize; ///< number of items in the queue const unsigned int theMaxItemSize; ///< maximum item size - const int theCapacity; ///< maximum number of items, i.e. theBuffer size + const uint32_t theCapacity; ///< maximum number of items, i.e. theBuffer size char theBuffer[]; };