#include "ipc/mem/Page.h"
#include "ipc/mem/PageStack.h"
-/// used to mark a stack slot available for storing free page offsets
-const Ipc::Mem::PageStack::Value Writable = 0;
-
-Ipc::Mem::PageStack::PageStack(const uint32_t aPoolId, const unsigned int aCapacity, const size_t aPageSize):
- thePoolId(aPoolId), theCapacity(aCapacity), thePageSize(aPageSize),
- theSize(theCapacity),
- theLastReadable(prev(theSize)), theFirstWritable(next(theLastReadable)),
- theItems(aCapacity)
+/* Ipc::Mem::PageStackStorageSlot */
+
+static_assert(sizeof(Ipc::Mem::PageStackStorageSlot::Pointer) ==
+ sizeof(decltype(Ipc::Mem::PageId::number)), "page indexing types are consistent");
+
+void
+Ipc::Mem::PageStackStorageSlot::take()
+{
+ const auto nxt = nextOrMarker.exchange(TakenPage);
+ assert(nxt != TakenPage);
+}
+
+void
+Ipc::Mem::PageStackStorageSlot::put(const PointerOrMarker expected, const Pointer nxt)
+{
+ assert(nxt != TakenPage);
+ const auto old = nextOrMarker.exchange(nxt);
+ assert(old == expected);
+}
+
+/* Ipc::Mem::PageStack */
+
+Ipc::Mem::PageStack::PageStack(const uint32_t aPoolId, const PageCount aCapacity, const size_t aPageSize):
+ thePoolId(aPoolId), capacity_(aCapacity), thePageSize(aPageSize),
+ size_(0),
+ head_(Slot::NilPtr),
+ slots_(aCapacity)
{
+ assert(capacity_ < Slot::TakenPage);
+ assert(capacity_ < Slot::NilPtr);
+
// initially, all pages are free
- for (Offset i = 0; i < theSize; ++i)
- theItems[i] = i + 1; // skip page number zero to keep numbers positive
+ if (capacity_) {
+ const auto lastIndex = capacity_-1;
+ // FlexibleArray cannot construct its phantom elements so, technically,
+ // all slots (except the very first one) are uninitialized until now.
+ for (Slot::Pointer i = 0; i < lastIndex; ++i)
+ (void)new(&slots_[i])Slot(i+1);
+ (void)new(&slots_[lastIndex])Slot(Slot::NilPtr);
+ size_ = capacity_;
+ head_ = 0;
+ }
}
-/*
- * TODO: We currently rely on the theLastReadable hint during each
- * loop iteration. We could also use hint just for the start position:
- * (const Offset start = theLastReadable) and then scan the stack
- * sequentially regardless of theLastReadable changes by others. Which
- * approach is better? Same for push().
- */
bool
Ipc::Mem::PageStack::pop(PageId &page)
{
- Must(!page);
-
- // we may fail to dequeue, but be conservative to prevent long searches
- --theSize;
-
- // find a Readable slot, starting with theLastReadable and going left
- while (theSize >= 0) {
- Offset idx = theLastReadable;
- // mark the slot at ids Writable while extracting its current value
- const Value value = theItems[idx].fetch_and(0); // works if Writable is 0
- const bool popped = value != Writable;
- // theItems[idx] is probably not Readable [any more]
-
- // Whether we popped a Readable value or not, we should try going left
- // to maintain the index (and make progress).
- // We may fail if others already updated the index, but that is OK.
- theLastReadable.compare_exchange_weak(idx, prev(idx)); // may fail or lie
-
- if (popped) {
- // the slot we emptied may already be filled, but that is OK
- theFirstWritable = idx; // may lie
- page.pool = thePoolId;
- page.number = value;
- debugs(54, 9, page << " at " << idx << " size: " << theSize);
- return true;
- }
- // TODO: report suspiciously long loops
- }
-
- ++theSize;
- return false;
+ assert(!page);
+
+ Slot::Pointer current = head_.load();
+
+ auto nextFree = Slot::NilPtr;
+ do {
+ if (current == Slot::NilPtr)
+ return false;
+ nextFree = slots_[current].next();
+ } while (!head_.compare_exchange_weak(current, nextFree));
+
+ // must decrement after removing the page to avoid underflow
+ const auto newSize = --size_;
+ assert(newSize < capacity_);
+
+ slots_[current].take();
+ page.number = current + 1;
+ page.pool = thePoolId;
+ debugs(54, 8, page << " size: " << newSize);
+ return true;
}
/// returns the given page to the head of the lock-free free-page list
void
Ipc::Mem::PageStack::push(PageId &page)
{
    debugs(54, 8, page);
    assert(page);
    assert(pageIdIsValid(page));

    const auto slotIndex = page.number - 1; // page numbers are 1-based
    auto &freedSlot = slots_[slotIndex];

    // must increment before inserting the page to avoid underflow in pop()
    const auto newSize = ++size_;
    assert(newSize <= capacity_);

    auto observedHead = head_.load();
    auto expectedState = Slot::TakenPage; // first put() must see a taken slot
    do {
        freedSlot.put(expectedState, observedHead);
        // on CAS failure, the slot already stores the last observed head
        expectedState = observedHead;
    } while (!head_.compare_exchange_weak(observedHead, slotIndex));

    debugs(54, 8, page << " size: " << newSize);
    page = PageId();
}
bool
Ipc::Mem::PageStack::pageIdIsValid(const PageId &page) const
{
- return page.pool == thePoolId && page.number != Writable &&
- page.number <= capacity();
+ return page.pool == thePoolId &&
+ 0 < page.number && page.number <= capacity();
}
/// convenience wrapper around SharedMemorySize() using our own configuration
size_t
Ipc::Mem::PageStack::sharedMemorySize() const
{
    return SharedMemorySize(thePoolId, capacity_, thePageSize);
}
/// total shared memory size required to store the stack, the page-level
/// counters (with alignment padding), and the page data itself.
/// Bug fix: the original computed levelsSize and pagesDataSize but never
/// returned anything — falling off the end of a non-void function is UB.
size_t
Ipc::Mem::PageStack::SharedMemorySize(const uint32_t, const PageCount capacity, const size_t pageSize)
{
    const auto levelsSize = PageId::maxPurpose * sizeof(Levels_t);
    const size_t pagesDataSize = capacity * pageSize;
    return StackSize(capacity) + LevelsPaddingSize(capacity) + levelsSize + pagesDataSize;
}
/// shared memory bytes needed by the PageStack object itself,
/// including one Slot per page in the flexible slots_ array
size_t
Ipc::Mem::PageStack::StackSize(const PageCount capacity)
{
    const auto slotsBytes = capacity * sizeof(Slot);
    return sizeof(PageStack) + slotsBytes;
}
/// convenience wrapper around StackSize() using our own capacity
size_t
Ipc::Mem::PageStack::stackSize() const
{
    return StackSize(capacity_);
}
/// \returns the number of padding bytes to align PagePool::theLevels array
/// Fix: the visible text was truncated mid-definition — the function body
/// lacked its closing brace; the complete definition is restored here.
size_t
Ipc::Mem::PageStack::LevelsPaddingSize(const PageCount capacity)
{
    // distance from the end of the stack to the next Levels_t boundary
    const auto displacement = StackSize(capacity) % alignof(Levels_t);
    return displacement ? alignof(Levels_t) - displacement : 0;
}
#include "ipc/mem/FlexibleArray.h"
#include <atomic>
+#include <limits>
namespace Ipc
{
class PageId;
/// reflects the dual nature of PageStack storage:
/// - for free pages, this is a pointer to the next free page
/// - for used pages, this is a "used page" marker
class PageStackStorageSlot
{
public:
    // We are using uint32_t for Pointer because PageId::number is uint32_t.
    // PageId::number should probably be uint64_t to accommodate caches with
    // page numbers exceeding UINT32_MAX.
    using PointerOrMarker = uint32_t;
    using Pointer = PointerOrMarker;
    using Marker = PointerOrMarker;

    /// represents a nil next slot pointer
    static const Pointer NilPtr = std::numeric_limits<PointerOrMarker>::max();
    /// marks a slot of a used (i.e. take()n) page
    static const Marker TakenPage = std::numeric_limits<PointerOrMarker>::max() - 1;
    static_assert(TakenPage != NilPtr, "magic PointerOrMarker values do not clash");

    explicit PageStackStorageSlot(const Pointer nxt = NilPtr): nextOrMarker(nxt) {}

    /// returns a (possibly nil) pointer to the next free page
    Pointer next() const { return nextOrMarker.load(); }

    /// marks our page as used
    void take();

    /// marks our page as free, to be used before the given `nxt` page;
    /// also checks that the slot state matches the caller expectations
    void put(const PointerOrMarker expected, const Pointer nxt);

private:
    /// either the index of the next free slot or the TakenPage marker
    std::atomic<PointerOrMarker> nextOrMarker;
};
+
+/// Atomic container of "free" PageIds. Detects (some) double-free bugs.
+/// Assumptions: All page numbers are unique, positive, with a known maximum.
+/// A pushed page may not become available immediately but is never truly lost.
class PageStack
{
public:
- typedef uint32_t Value; ///< stack item type (a free page number)
typedef std::atomic<size_t> Levels_t;
- PageStack(const uint32_t aPoolId, const unsigned int aCapacity, const size_t aPageSize);
+ // XXX: The actual type may have been on PagePool::Init() but may conflict
+ // with PageLimit(), StoreMapSliceId, Rock::SwapDirRr::slotLimitActual(),
+ // Rock::SlotId, PageId::number, etc.
+ /// the number of (free and/or used) pages in a stack
+ typedef unsigned int PageCount;
+
+ PageStack(const uint32_t aPoolId, const PageCount aCapacity, const size_t aPageSize);
- unsigned int capacity() const { return theCapacity; }
+ PageCount capacity() const { return capacity_; }
size_t pageSize() const { return thePageSize; }
- /// lower bound for the number of free pages
- unsigned int size() const { return max(0, theSize.load()); }
+ /// an approximate number of free pages
+ PageCount size() const { return size_.load(); }
/// sets value and returns true unless no free page numbers are found
bool pop(PageId &page);
bool pageIdIsValid(const PageId &page) const;
/// total shared memory size required to share
- static size_t SharedMemorySize(const uint32_t aPoolId, const unsigned int capacity, const size_t pageSize);
+ static size_t SharedMemorySize(const uint32_t aPoolId, const PageCount capacity, const size_t pageSize);
size_t sharedMemorySize() const;
/// shared memory size required only by PageStack, excluding
/// shared counters and page data
- static size_t StackSize(const unsigned int capacity);
+ static size_t StackSize(const PageCount capacity);
size_t stackSize() const;
/// \returns the number of padding bytes to align PagePool::theLevels array
- static size_t LevelsPaddingSize(const unsigned int capacity);
- size_t levelsPaddingSize() const { return LevelsPaddingSize(theCapacity); }
+ static size_t LevelsPaddingSize(const PageCount capacity);
+ size_t levelsPaddingSize() const { return LevelsPaddingSize(capacity_); }
private:
- /// stack index and size type (may temporary go negative)
- typedef int Offset;
-
- // these help iterate the stack in search of a free spot or a page
- Offset next(const Offset idx) const { return (idx + 1) % theCapacity; }
- Offset prev(const Offset idx) const { return (theCapacity + idx - 1) % theCapacity; }
+ using Slot = PageStackStorageSlot;
+ // XXX: theFoo members look misplaced due to messy separation of PagePool
+ // (which should support multiple Segments but does not) and PageStack
+ // (which should not calculate the Segment size but does) duties.
const uint32_t thePoolId; ///< pool ID
- const Offset theCapacity; ///< stack capacity, i.e. theItems size
+ const PageCount capacity_; ///< the maximum number of pages
const size_t thePageSize; ///< page size, used to calculate shared memory size
- /// lower bound for the number of free pages (may get negative!)
- std::atomic<Offset> theSize;
+ /// a lower bound for the number of free pages (for debugging purposes)
+ std::atomic<PageCount> size_;
- /// last readable item index; just a hint, not a guarantee
- std::atomic<Offset> theLastReadable;
- /// first writable item index; just a hint, not a guarantee
- std::atomic<Offset> theFirstWritable;
+ /// the index of the first free stack element or nil
+ std::atomic<Slot::Pointer> head_;
- typedef std::atomic<Value> Item;
- Ipc::Mem::FlexibleArray<Item> theItems; ///< page number storage
+ /// slots indexed using their page number
+ Ipc::Mem::FlexibleArray<Slot> slots_;
+ // No more data members should follow! See Ipc::Mem::FlexibleArray<> for details.
};
} // namespace Mem