#include "ipc/mem/Page.h"
#include "ipc/mem/PageStack.h"
-/* Ipc::Mem::PageStackStorageSlot */
-
-static_assert(sizeof(Ipc::Mem::PageStackStorageSlot::Pointer) ==
- sizeof(decltype(Ipc::Mem::PageId::number)), "page indexing types are consistent");
-
-void
-Ipc::Mem::PageStackStorageSlot::take()
-{
- const auto nxt = nextOrMarker.exchange(TakenPage);
- assert(nxt != TakenPage);
-}
-
-void
-Ipc::Mem::PageStackStorageSlot::put(const PointerOrMarker expected, const Pointer nxt)
-{
- assert(nxt != TakenPage);
- const auto old = nextOrMarker.exchange(nxt);
- assert(old == expected);
-}
-
-/* Ipc::Mem::PageStack */
-
-Ipc::Mem::PageStack::PageStack(const uint32_t aPoolId, const PageCount aCapacity, const size_t aPageSize):
- thePoolId(aPoolId), capacity_(aCapacity), thePageSize(aPageSize),
- size_(0),
- head_(Slot::NilPtr),
- slots_(aCapacity)
+/// used to mark a stack slot available for storing free page offsets
+const Ipc::Mem::PageStack::Value Writable = 0;
+
+Ipc::Mem::PageStack::PageStack(const uint32_t aPoolId, const unsigned int aCapacity, const size_t aPageSize):
+ thePoolId(aPoolId), theCapacity(aCapacity), thePageSize(aPageSize),
+ theSize(theCapacity),
+ theLastReadable(prev(theSize)), theFirstWritable(next(theLastReadable)),
+ theItems(aCapacity)
{
- assert(capacity_ < Slot::TakenPage);
- assert(capacity_ < Slot::NilPtr);
-
// initially, all pages are free
- if (capacity_) {
- const auto lastIndex = capacity_-1;
- // FlexibleArray cannot construct its phantom elements so, technically,
- // all slots (except the very first one) are uninitialized until now.
- for (Slot::Pointer i = 0; i < lastIndex; ++i)
- (void)new(&slots_[i])Slot(i+1);
- (void)new(&slots_[lastIndex])Slot(Slot::NilPtr);
- size_ = capacity_;
- head_ = 0;
- }
+ for (Offset i = 0; i < theSize; ++i)
+ theItems[i] = i + 1; // skip page number zero to keep numbers positive
}
+/*
+ * TODO: We currently rely on the theLastReadable hint during each
+ * loop iteration. We could also use hint just for the start position:
+ * (const Offset start = theLastReadable) and then scan the stack
+ * sequentially regardless of theLastReadable changes by others. Which
+ * approach is better? Same for push().
+ */
bool
Ipc::Mem::PageStack::pop(PageId &page)
{
- assert(!page);
-
- Slot::Pointer current = head_.load();
-
- auto nextFree = Slot::NilPtr;
- do {
- if (current == Slot::NilPtr)
- return false;
- nextFree = slots_[current].next();
- } while (!head_.compare_exchange_weak(current, nextFree));
-
- // must decrement after removing the page to avoid underflow
- const auto newSize = --size_;
- assert(newSize < capacity_);
-
- slots_[current].take();
- page.number = current + 1;
- page.pool = thePoolId;
- debugs(54, 8, page << " size: " << newSize);
- return true;
+ Must(!page);
+
+ // we may fail to dequeue, but be conservative to prevent long searches
+ --theSize;
+
+ // find a Readable slot, starting with theLastReadable and going left
+ while (theSize >= 0) {
+ Offset idx = theLastReadable;
+ // mark the slot at ids Writable while extracting its current value
+ const Value value = theItems[idx].fetch_and(0); // works if Writable is 0
+ const bool popped = value != Writable;
+ // theItems[idx] is probably not Readable [any more]
+
+ // Whether we popped a Readable value or not, we should try going left
+ // to maintain the index (and make progress).
+ // We may fail if others already updated the index, but that is OK.
+ theLastReadable.compare_exchange_weak(idx, prev(idx)); // may fail or lie
+
+ if (popped) {
+ // the slot we emptied may already be filled, but that is OK
+ theFirstWritable = idx; // may lie
+ page.pool = thePoolId;
+ page.number = value;
+ debugs(54, 9, page << " at " << idx << " size: " << theSize);
+ return true;
+ }
+ // TODO: report suspiciously long loops
+ }
+
+ ++theSize;
+ return false;
}
void
Ipc::Mem::PageStack::push(PageId &page)
{
- debugs(54, 8, page);
- assert(page);
- assert(pageIdIsValid(page));
-
- const auto pageIndex = page.number - 1;
- auto &slot = slots_[pageIndex];
-
- // must increment before inserting the page to avoid underflow in pop()
- const auto newSize = ++size_;
- assert(newSize <= capacity_);
-
- auto current = head_.load();
- auto expected = Slot::TakenPage;
- do {
- slot.put(expected, current);
- expected = current;
- } while (!head_.compare_exchange_weak(current, pageIndex));
-
- debugs(54, 8, page << " size: " << newSize);
- page = PageId();
+ debugs(54, 9, page);
+
+ if (!page)
+ return;
+
+ Must(pageIdIsValid(page));
+ // find a Writable slot, starting with theFirstWritable and going right
+ while (theSize < theCapacity) {
+ Offset idx = theFirstWritable;
+ auto isWritable = Writable;
+ const bool pushed = theItems[idx].compare_exchange_strong(isWritable, page.number);
+ // theItems[idx] is probably not Writable [any more];
+
+ // Whether we pushed the page number or not, we should try going right
+ // to maintain the index (and make progress).
+ // We may fail if others already updated the index, but that is OK.
+ theFirstWritable.compare_exchange_weak(idx, next(idx)); // may fail or lie
+
+ if (pushed) {
+ // the enqueued value may already by gone, but that is OK
+ theLastReadable = idx; // may lie
+ ++theSize;
+ debugs(54, 9, page << " at " << idx << " size: " << theSize);
+ page = PageId();
+ return;
+ }
+ // TODO: report suspiciously long loops
+ }
+ Must(false); // the number of pages cannot exceed theCapacity
}
bool
Ipc::Mem::PageStack::pageIdIsValid(const PageId &page) const
{
- return page.pool == thePoolId &&
- 0 < page.number && page.number <= capacity();
+ return page.pool == thePoolId && page.number != Writable &&
+ page.number <= capacity();
}
size_t
Ipc::Mem::PageStack::sharedMemorySize() const
{
- return SharedMemorySize(thePoolId, capacity_, thePageSize);
+ return SharedMemorySize(thePoolId, theCapacity, thePageSize);
}
size_t
-Ipc::Mem::PageStack::SharedMemorySize(const uint32_t, const PageCount capacity, const size_t pageSize)
+Ipc::Mem::PageStack::SharedMemorySize(const uint32_t, const unsigned int capacity, const size_t pageSize)
{
const auto levelsSize = PageId::maxPurpose * sizeof(Levels_t);
const size_t pagesDataSize = capacity * pageSize;
}
size_t
-Ipc::Mem::PageStack::StackSize(const PageCount capacity)
+Ipc::Mem::PageStack::StackSize(const unsigned int capacity)
{
- return sizeof(PageStack) + capacity * sizeof(Slot);
+ return sizeof(PageStack) + capacity * sizeof(Item);
}
size_t
Ipc::Mem::PageStack::stackSize() const
{
- return StackSize(capacity_);
+ return StackSize(theCapacity);
}
size_t
-Ipc::Mem::PageStack::LevelsPaddingSize(const PageCount capacity)
+Ipc::Mem::PageStack::LevelsPaddingSize(const unsigned int capacity)
{
const auto displacement = StackSize(capacity) % alignof(Levels_t);
return displacement ? alignof(Levels_t) - displacement : 0;
#include "ipc/mem/FlexibleArray.h"
#include <atomic>
-#include <limits>
namespace Ipc
{
class PageId;
-/// reflects the dual nature of PageStack storage:
-/// - for free pages, this is a pointer to the next free page
-/// - for used pages, this is a "used page" marker
-class PageStackStorageSlot
-{
-public:
- // We are using uint32_t for Pointer because PageId::number is uint32_t.
- // PageId::number should probably be uint64_t to accommodate caches with
- // page numbers exceeding UINT32_MAX.
- typedef uint32_t PointerOrMarker;
- typedef PointerOrMarker Pointer;
- typedef PointerOrMarker Marker;
-
- /// represents a nil next slot pointer
- static const Pointer NilPtr = std::numeric_limits<PointerOrMarker>::max();
- /// marks a slot of a used (i.e. take()n) page
- static const Marker TakenPage = std::numeric_limits<PointerOrMarker>::max() - 1;
- static_assert(TakenPage != NilPtr, "magic PointerOrMarker values do not clash");
-
- explicit PageStackStorageSlot(const Pointer nxt = NilPtr): nextOrMarker(nxt) {}
-
- /// returns a (possibly nil) pointer to the next free page
- Pointer next() const { return nextOrMarker.load(); }
-
- /// marks our page as used
- void take();
-
- /// marks our page as free, to be used before the given `nxt` page;
- /// also checks that the slot state matches the caller expectations
- void put(const PointerOrMarker expected, const Pointer nxt);
-
-private:
- std::atomic<PointerOrMarker> nextOrMarker;
-};
-
-/// Atomic container of "free" PageIds. Detects (some) double-free bugs.
-/// Assumptions: All page numbers are unique, positive, with a known maximum.
-/// A pushed page may not become available immediately but is never truly lost.
+/// Atomic container of "free" page numbers inside a single SharedMemory space.
+/// Assumptions: all page numbers are unique, positive, have an known maximum,
+/// and can be temporary unavailable as long as they are never trully lost.
class PageStack
{
public:
+ typedef uint32_t Value; ///< stack item type (a free page number)
typedef std::atomic<size_t> Levels_t;
- // XXX: The actual type may have been on PagePool::Init() but may conflict
- // with PageLimit(), StoreMapSliceId, Rock::SwapDirRr::slotLimitActual(),
- // Rock::SlotId, PageId::number, etc.
- /// the number of (free and/or used) pages in a stack
- typedef unsigned int PageCount;
-
- PageStack(const uint32_t aPoolId, const PageCount aCapacity, const size_t aPageSize);
+ PageStack(const uint32_t aPoolId, const unsigned int aCapacity, const size_t aPageSize);
- PageCount capacity() const { return capacity_; }
+ unsigned int capacity() const { return theCapacity; }
size_t pageSize() const { return thePageSize; }
- /// an approximate number of free pages
- PageCount size() const { return size_.load(); }
+ /// lower bound for the number of free pages
+ unsigned int size() const { return max(0, theSize.load()); }
/// sets value and returns true unless no free page numbers are found
bool pop(PageId &page);
bool pageIdIsValid(const PageId &page) const;
/// total shared memory size required to share
- static size_t SharedMemorySize(const uint32_t aPoolId, const PageCount capacity, const size_t pageSize);
+ static size_t SharedMemorySize(const uint32_t aPoolId, const unsigned int capacity, const size_t pageSize);
size_t sharedMemorySize() const;
/// shared memory size required only by PageStack, excluding
/// shared counters and page data
- static size_t StackSize(const PageCount capacity);
+ static size_t StackSize(const unsigned int capacity);
size_t stackSize() const;
/// \returns the number of padding bytes to align PagePool::theLevels array
- static size_t LevelsPaddingSize(const PageCount capacity);
- size_t levelsPaddingSize() const { return LevelsPaddingSize(capacity_); }
+ static size_t LevelsPaddingSize(const unsigned int capacity);
+ size_t levelsPaddingSize() const { return LevelsPaddingSize(theCapacity); }
private:
- using Slot = PageStackStorageSlot;
+ /// stack index and size type (may temporary go negative)
+ typedef int Offset;
+
+ // these help iterate the stack in search of a free spot or a page
+ Offset next(const Offset idx) const { return (idx + 1) % theCapacity; }
+ Offset prev(const Offset idx) const { return (theCapacity + idx - 1) % theCapacity; }
- // XXX: theFoo members look misplaced due to messy separation of PagePool
- // (which should support multiple Segments but does not) and PageStack
- // (which should not calculate the Segment size but does) duties.
const uint32_t thePoolId; ///< pool ID
- const PageCount capacity_; ///< the maximum number of pages
+ const Offset theCapacity; ///< stack capacity, i.e. theItems size
const size_t thePageSize; ///< page size, used to calculate shared memory size
- /// a lower bound for the number of free pages (for debugging purposes)
- std::atomic<PageCount> size_;
+ /// lower bound for the number of free pages (may get negative!)
+ std::atomic<Offset> theSize;
- /// the index of the first free stack element or nil
- std::atomic<Slot::Pointer> head_;
+ /// last readable item index; just a hint, not a guarantee
+ std::atomic<Offset> theLastReadable;
+ /// first writable item index; just a hint, not a guarantee
+ std::atomic<Offset> theFirstWritable;
- /// slots indexed using their page number
- Ipc::Mem::FlexibleArray<Slot> slots_;
- // No more data members should follow! See Ipc::Mem::FlexibleArray<> for details.
+ typedef std::atomic<Value> Item;
+ Ipc::Mem::FlexibleArray<Item> theItems; ///< page number storage
};
} // namespace Mem