IpcIoFiles.insert(std::make_pair(diskId, this)).second;
Must(inserted);
- queue->localRateLimit() =
- static_cast<Ipc::QueueReader::Rate::Value>(config.ioRate);
+ queue->localRateLimit().store(config.ioRate);
Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
ann.strand.tag = dbName;
const int oldestWait = tvSubMsec(oldestIo.start, current_time);
int rateWait = -1; // time in millisecons
- const Ipc::QueueReader::Rate::Value ioRate = queue->rateLimit(diskId);
+ const int ioRate = queue->rateLimit(diskId).load();
if (ioRate > 0) {
// if there are N requests pending, the new one will wait at
// least N/max-swap-rate seconds
bool
IpcIoFile::WaitBeforePop()
{
- const Ipc::QueueReader::Rate::Value ioRate = queue->localRateLimit();
+ const int ioRate = queue->localRateLimit().load();
const double maxRate = ioRate/1e3; // req/ms
// do we need to enforce configured I/O rate?
#include "base/InstanceId.h"
#include "Debug.h"
-#include "ipc/AtomicWord.h"
#include "ipc/mem/FlexibleArray.h"
#include "ipc/mem/Pointer.h"
#include "util.h"
+#include <atomic>
+
class String;
namespace Ipc
QueueReader(); // the initial state is "blocked without a signal"
/// whether the reader is waiting for a notification signal
- bool blocked() const { return popBlocked == 1; }
+ bool blocked() const { return popBlocked.load(); }
/// marks the reader as blocked, waiting for a notification signal
- void block() { popBlocked.swap_if(0, 1); }
+ void block() { popBlocked.store(true); }
/// removes the block() effects
- void unblock() { popBlocked.swap_if(1, 0); }
+ void unblock() { popBlocked.store(false); }
/// if reader is blocked and not notified, marks the notification signal
/// as sent and not received, returning true; otherwise, returns false
- bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); }
+ bool raiseSignal() { return blocked() && !popSignal.exchange(true); }
/// marks sent reader notification as received (also removes pop blocking)
- void clearSignal() { unblock(); popSignal.swap_if(1,0); }
+ void clearSignal() { unblock(); popSignal.store(false); }
private:
- Atomic::Word popBlocked; ///< whether the reader is blocked on pop()
- Atomic::Word popSignal; ///< whether writer has sent and reader has not received notification
+ std::atomic<bool> popBlocked; ///< whether the reader is blocked on pop()
+ std::atomic<bool> popSignal; ///< whether writer has sent and reader has not received notification
public:
- typedef Atomic::Word Rate; ///< pop()s per second
+ typedef std::atomic<int> Rate; ///< pop()s per second
Rate rateLimit; ///< pop()s per second limit if positive
// we need a signed atomic type because balance may get negative
- typedef Atomic::WordT<int> AtomicSignedMsec;
+ typedef std::atomic<int> AtomicSignedMsec;
typedef AtomicSignedMsec Balance;
/// how far ahead the reader is compared to a perfect read/sec event rate
Balance balance;
unsigned int theIn; ///< input index, used only in push()
unsigned int theOut; ///< output index, used only in pop()
- Atomic::Word theSize; ///< number of items in the queue
+ std::atomic<uint32_t> theSize; ///< number of items in the queue
const unsigned int theMaxItemSize; ///< maximum item size
- const int theCapacity; ///< maximum number of items, i.e. theBuffer size
+ const uint32_t theCapacity; ///< maximum number of items, i.e. theBuffer size
char theBuffer[];
};