]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Fix the ABA problem with Ipc::Mem::PageStack::pop() in c3408a3 (#587)
authorAlex Rousskov <rousskov@measurement-factory.com>
Wed, 13 May 2020 06:37:31 +0000 (06:37 +0000)
committerSquid Anubis <squid-anubis@squid-cache.org>
Wed, 13 May 2020 10:23:53 +0000 (10:23 +0000)
Suspected symptoms:

    assertion failed: mem/PageStack.cc:35: "old == expected"
    assertion failed: mem/PageStack.cc:27: "nxt != TakenPage"
    assertion failed: mem/PageStack.cc:33: "nxt != TakenPage"
    assertion failed: StoreMap.cc:337: "validSlice(sliceId)"

Replaced a list-based PageStack implementation with a tree-based one.
The new code uses a deterministic binary tree. Inner nodes count the
number of available IDs in their child subtrees. Leaf nodes store IDs
using bitmasks. The root node tells the pop() method whether it is going
to find a free page. The method then adjusts counters in 1-23 nodes
(depending on the tree hight) on the path to the leaf containing a page.
The push() method adds a bit to the leaf node and adjusts the counters
of all the inner nodes (1-23) on the way up to the root one. All the
adjustments are lockless. Push may also be wait-free. No ABA problems.

An alternative fix that preserved list-based implementation was
implemented but ultimately rejected: Avoiding ABA problems required
complex code, and that complexity prevented meaningful validation using
Rust's Loom. Also, dirty performance tests of outside-of-Squid code
showed unexplained significant response time growth of the list-based
implementation when concurrency levels were being increased beyond a few
threads. While these validation and performance concerns could be red
herrings, their existence decreased our confidence in the list-based
algorithm that already had a history of fooling developers.

The tree-based PageStack implementation needs 8-16x less RAM. Roughly:
* list-based: sizeof(uint32_t) * capacity          or  4*capacity
* tree-based: sizeof(uint64_t) * 2 * rcapacity/64  or  rcapacity/4
  where rounded capacity is somewhere between capacity and 2*capacity

The absolute RAM savings are minor for most environments, but the
footprint reduction might be enough to fit a significant part of some
hot index in a modern L1 CPU cache: (e.g., a 32GB rock cache_dir may
have 16GB/16KB = 1M slot IDs = 512KB tree size).

The tree-based structure may make future support for caches with more
than 2^25 entries easier because it does not heavily rely on combining a
cache entry ID and an ABA version/nonce in a single 64-bit atomic.

TODO: While absolute tree- and list-based operation costs are all small
(under 1 microsecond), tree-based implementation cost are higher. Since
rock code pops all disk slots at startup (a known performance bug), rock
startup costs increased significantly. For example, a 200 GB disk cache
test shows ~18 seconds startup time for the tree-based implementation
versus ~4 seconds for list-based. This will be addressed by fixing that
known performance bug. The fix will not alter the tree algorithm.

TODO: The PageStack class should be renamed. Ideally, that renaming
should coincide with refactoring the PagePool/PageStack split, which is
an old XXX also currently exposes a lot of internal PageStack code.

See also: https://en.wikipedia.org/wiki/ABA_problem

src/Makefile.am
src/ipc/Makefile.am
src/ipc/mem/PageStack.cc
src/ipc/mem/PageStack.h

index 238306133f0103fb3aa2425cdf6159828d96378a..cda8aa8415aa6ec4159795843e2ee4b91d430823 100644 (file)
@@ -576,7 +576,6 @@ squid_LDADD = \
        $(top_builddir)/lib/libmisccontainers.la \
        $(top_builddir)/lib/libmiscencoding.la \
        $(top_builddir)/lib/libmiscutil.la \
-       $(ATOMICLIB) \
        $(SSLLIB) \
        $(EPOLL_LIBS) \
        $(MINGW_LIBS) \
index 2c0f9a36a897da10f646b8297f9de25fa971cc10..5c57427e490a6968ccfdf40588928467c33e488f 100644 (file)
@@ -71,3 +71,6 @@ DEFS += -DDEFAULT_STATEDIR=\"$(localstatedir)/run/squid\"
 
 install-data-local:
        $(mkinstalldirs) $(DESTDIR)$(localstatedir)/run/squid;
+
+libipc_la_LIBADD = $(ATOMICLIB)
+
index 56314709b612d04b33f874828a2804203ac1553e..a2557f700203c1a0193d7b39a5db20bcb11fc320 100644 (file)
 
 #include "squid.h"
 
-#include "base/TextException.h"
 #include "Debug.h"
 #include "ipc/mem/Page.h"
 #include "ipc/mem/PageStack.h"
 
-/* Ipc::Mem::PageStackStorageSlot */
+#include <cmath>
+#include <algorithm>
 
-static_assert(sizeof(Ipc::Mem::PageStackStorageSlot::Pointer) ==
-              sizeof(decltype(Ipc::Mem::PageId::number)), "page indexing types are consistent");
+/*
+
+Ipc::Mem::IdSet and related code maintains a perfect full binary tree structure:
+
+         (l,r)
+           /\
+    (ll,lr)  (rl,rr)
+       /\      /\
+      L1 L2   L3 L4
+
+where
+
+    * (l,r) is an always-present root node;
+    * inner nodes, including the root one, count the total number of available
+      IDs in the leaf nodes of the left and right subtrees (e.g., r = rl + rr);
+    * leaf nodes are bitsets of available IDs (e.g., rl = number of 1s in L3);
+      all leaf nodes are always present.
+
+The above sample tree would be stored as seven 64-bit atomic integers:
+    (l,r), (ll,lr), (rl,rr), L1, L2, L3, L4
+
+*/
+
+namespace Ipc
+{
+
+namespace Mem
+{
+
+/// the maximum number of pages that a leaf node can store
+static const IdSet::size_type BitsPerLeaf = 64;
+
+class IdSetPosition
+{
+public:
+    using size_type = IdSet::size_type;
+
+    IdSetPosition() = default; ///< root node position
+    IdSetPosition(size_type aLevel, size_type anOffset);
+
+    /// whether we are at the top of the tree
+    bool atRoot() const { return !level && !offset; }
+
+    /// which direction is this position from our parent node
+    IdSetNavigationDirection ascendDirection() const;
+
+    /// the number of levels above us (e.g., zero for the root node)
+    IdSet::size_type level = 0;
+    /// the number of nodes (at our level) to the left of us
+    IdSet::size_type offset = 0;
+};
+
+/// a helper class to perform inner node manipulation for IdSet
+class IdSetInnerNode
+{
+public:
+    using size_type = IdSet::size_type;
+    typedef uint64_t Packed; ///< (atomically) stored serialized value
+
+    /// de-serializes a given value
+    static IdSetInnerNode Unpack(Packed packed);
+
+    IdSetInnerNode() = default;
+    IdSetInnerNode(size_type left, size_type right);
+
+    /// returns a serializes value suitable for shared memory storage
+    Packed pack() const { return (static_cast<Packed>(left) << 32) | right; }
+
+    size_type left = 0; ///< the number of available IDs in the left subtree
+    size_type right = 0; ///< the number of available IDs in the right subtree
+};
+
+} // namespace Mem
+
+} // namespace Ipc
+
+/* Ipc::Mem::IdSetPosition */
+
+Ipc::Mem::IdSetPosition::IdSetPosition(size_type aLevel, size_type anOffset):
+    level(aLevel),
+    offset(anOffset)
+{
+}
+
+Ipc::Mem::IdSetNavigationDirection
+Ipc::Mem::IdSetPosition::ascendDirection() const
+{
+    return (offset % 2 == 0) ? dirLeft : dirRight;
+}
+
+/* Ipc::Mem::IdSetMeasurements */
+
+Ipc::Mem::IdSetMeasurements::IdSetMeasurements(const size_type aCapacity)
+{
+    capacity = aCapacity;
+
+    // For simplicity, we want a perfect full binary tree with root and leaves.
+    // We could compute all this with log2() calls, but rounding and honoring
+    // root+leaves minimums make that approach more complex than this fast loop.
+    requestedLeafNodeCount = (capacity + (BitsPerLeaf-1))/BitsPerLeaf;
+    treeHeight = 1+1; // the root level plus the leaf nodes level
+    leafNodeCount = 2; // the root node can have only two leaf nodes
+    while (leafNodeCount < requestedLeafNodeCount) {
+        leafNodeCount *= 2;
+        ++treeHeight;
+    }
+    innerLevelCount = treeHeight - 1;
+
+    debugs(54, 5, "rounded capacity up from " << capacity << " to " << (leafNodeCount*BitsPerLeaf));
+
+    // we do (1 << level) when computing 32-bit IdSetInnerNode::left
+    assert(treeHeight < 32);
+}
+
+/* Ipc::Mem::IdSetInnerNode */
+
+Ipc::Mem::IdSetInnerNode::IdSetInnerNode(size_type aLeft, size_type aRight):
+    left(aLeft),
+    right(aRight)
+{
+}
+
+Ipc::Mem::IdSetInnerNode
+Ipc::Mem::IdSetInnerNode::Unpack(Packed packed)
+{
+    // truncation during the cast is intentional here
+    return IdSetInnerNode(packed >> 32, static_cast<uint32_t>(packed));
+}
+
+/* Ipc::Mem::IdSet */
+
+Ipc::Mem::IdSet::IdSet(const size_type capacity):
+    measurements(capacity),
+    nodes_(capacity)
+{
+    // For valueAddress() to be able to return a raw uint64_t pointer, the
+    // atomic wrappers in nodes_ must be zero-size. Check the best we can. Once.
+    static_assert(sizeof(StoredNode) == sizeof(Node), "atomic locks use no storage");
+    assert(StoredNode().is_lock_free());
+
+    makeFullBeforeSharing();
+}
+
+void
+Ipc::Mem::IdSet::makeFullBeforeSharing()
+{
+    // initially, all IDs are marked as available
+    fillAllNodes();
+
+    // ... but IDs beyond the requested capacity should not be available
+    if (measurements.capacity != measurements.leafNodeCount*BitsPerLeaf)
+        truncateExtras();
+}
+
+/// populates the entire allocated tree with available IDs
+/// may exceed the requested capacity; \see truncateExtras()
+void
+Ipc::Mem::IdSet::fillAllNodes()
+{
+    // leaf nodes
+    auto pos = Position(measurements.treeHeight-1, 0);
+    const auto allOnes = ~uint64_t(0);
+    std::fill_n(valueAddress(pos), measurements.leafNodeCount, allOnes);
+
+    // inner nodes, starting from the bottom of the tree
+    auto nodesAtLevel = measurements.leafNodeCount/2;
+    auto pagesBelow = BitsPerLeaf;
+    do {
+        pos = ascend(pos);
+        const auto value = IdSetInnerNode(pagesBelow, pagesBelow).pack();
+        std::fill_n(valueAddress(pos), nodesAtLevel, value);
+        nodesAtLevel /= 2;
+        pagesBelow *= 2;
+    } while (!pos.atRoot());
+}
+
+/// effectively removes IDs that exceed the requested capacity after makeFull()
+void
+Ipc::Mem::IdSet::truncateExtras()
+{
+    // leaf nodes
+    // start with the left-most leaf that should have some 0s; it may even have
+    // no 1s at all (i.e. be completely unused)
+    auto pos = Position(measurements.treeHeight-1, measurements.capacity/BitsPerLeaf);
+    leafTruncate(pos, measurements.capacity % BitsPerLeaf);
+    const auto rightLeaves = measurements.leafNodeCount - measurements.requestedLeafNodeCount;
+    // this zeroing of the leaf nodes to the right from pos is only necessary to
+    // trigger asserts if the code dealing with the inner node counters is buggy
+    if (rightLeaves > 1)
+        std::fill_n(valueAddress(pos) + 1, rightLeaves-1, 0);
 
+    // inner nodes, starting from the bottom of the tree; optimization: only
+    // adjusting nodes on the way up from the first leaf-with-0s position
+    auto toSubtract = BitsPerLeaf - (measurements.capacity % BitsPerLeaf);
+    do {
+        const auto direction = pos.ascendDirection();
+        pos = ascend(pos);
+        toSubtract = innerTruncate(pos, direction, toSubtract);
+    } while (!pos.atRoot());
+}
+
+/// fill the leaf node at a given position with 0s, leaving only idsToKeep IDs
 void
-Ipc::Mem::PageStackStorageSlot::take()
+Ipc::Mem::IdSet::leafTruncate(const Position pos, const size_type idsToKeep)
 {
-    const auto nxt = nextOrMarker.exchange(TakenPage);
-    assert(nxt != TakenPage);
+    Node &node = *valueAddress(pos); // no auto to simplify the asserts() below
+    assert(node == std::numeric_limits<Node>::max()); // all 1s
+    static_assert(std::is_unsigned<Node>::value, "right shift prepends 0s");
+    node >>= BitsPerLeaf - idsToKeep;
+    // node can be anything here, including all 0s and all 1s
+}
+
+/// accounts for toSubtract IDs removal from a subtree in the given direction of
+/// the given position
+/// \returns the number of IDs to subtract from the parent node
+Ipc::Mem::IdSet::size_type
+Ipc::Mem::IdSet::innerTruncate(const Position pos, const NavigationDirection dir, const size_type toSubtract)
+{
+    auto *valuePtr = valueAddress(pos);
+    auto value = IdSetInnerNode::Unpack(*valuePtr);
+    size_type toSubtractNext = 0;
+    if (dir == dirLeft) {
+        toSubtractNext = toSubtract + value.right;
+        assert(value.left >= toSubtract);
+        value.left -= toSubtract;
+        value.right = 0;
+    } else {
+        assert(dir == dirRight);
+        toSubtractNext = toSubtract;
+        assert(value.right >= toSubtract);
+        // value.left is unchanged; we have only adjusted the right branch
+        value.right -= toSubtract;
+    }
+    *valuePtr = value.pack();
+    return toSubtractNext;
+}
+
+/// accounts for an ID added to subtree in the given dir from the given position
+void
+Ipc::Mem::IdSet::innerPush(const Position pos, const NavigationDirection dir)
+{
+    // either left or right component will be true/1; the other will be false/0
+    const auto increment = IdSetInnerNode(dir == dirLeft, dir == dirRight).pack();
+    const auto previousValue = nodeAt(pos).fetch_add(increment);
+    // no overflows
+    assert(previousValue <= std::numeric_limits<Node>::max() - increment);
+}
+
+/// accounts for future ID removal from a subtree of the given position
+/// \returns the direction of the subtree chosen to relinquish the ID
+Ipc::Mem::IdSet::NavigationDirection
+Ipc::Mem::IdSet::innerPop(const Position pos)
+{
+    NavigationDirection direction = dirNone;
+
+    auto &node = nodeAt(pos);
+    auto oldValue = node.load();
+    IdSetInnerNode newValue;
+    do {
+        newValue = IdSetInnerNode::Unpack(oldValue);
+        if (newValue.left) {
+            --newValue.left;
+            direction = dirLeft;
+        } else if (newValue.right) {
+            --newValue.right;
+            direction = dirRight;
+        } else {
+            return dirEnd;
+        }
+    } while (!node.compare_exchange_weak(oldValue, newValue.pack()));
+
+    assert(direction == dirLeft || direction == dirRight);
+    return direction;
 }
 
+/// adds the given ID to the leaf node at the given position
 void
-Ipc::Mem::PageStackStorageSlot::put(const PointerOrMarker expected, const Pointer nxt)
+Ipc::Mem::IdSet::leafPush(const Position pos, const size_type id)
 {
-    assert(nxt != TakenPage);
-    const auto old = nextOrMarker.exchange(nxt);
-    assert(old == expected);
+    const auto mask = Node(1) << (id % BitsPerLeaf);
+    const auto oldValue = nodeAt(pos).fetch_or(mask);
+    // this was a new entry
+    assert((oldValue & mask) == 0);
+}
+
+// TODO: After switching to C++20, use countr_zero() which may compile to a
+// single TZCNT assembly instruction on modern CPUs.
+/// a temporary C++20 countr_zero() replacement
+static inline
+int trailingZeros(uint64_t x)
+{
+    if (!x)
+        return 64;
+    int count = 0;
+    for (uint64_t mask = 1; !(x & mask); mask <<= 1)
+        ++count;
+    return count;
+}
+
+/// extracts and returns an ID from the leaf node at the given position
+Ipc::Mem::IdSet::size_type
+Ipc::Mem::IdSet::leafPop(const Position pos)
+{
+    auto &node = nodeAt(pos);
+    auto oldValue = node.load();
+    Node newValue;
+    do {
+        assert(oldValue > 0);
+        const auto mask = oldValue - 1; // flips the rightmost 1 and trailing 0s
+        newValue = oldValue & mask; // clears the rightmost 1
+    } while (!node.compare_exchange_weak(oldValue, newValue));
+
+    return pos.offset*BitsPerLeaf + trailingZeros(oldValue);
+}
+
+/// \returns the position of a parent node of the node at the given position
+Ipc::Mem::IdSet::Position
+Ipc::Mem::IdSet::ascend(Position pos)
+{
+    assert(pos.level > 0);
+    --pos.level;
+    pos.offset /= 2;
+    return pos;
+}
+
+/// \returns the position of a child node in the given direction of the parent
+/// node at the given position
+Ipc::Mem::IdSet::Position
+Ipc::Mem::IdSet::descend(Position pos, const NavigationDirection direction)
+{
+    assert(pos.level < measurements.treeHeight);
+    ++pos.level;
+
+    pos.offset *= 2;
+    if (direction == dirRight)
+        ++pos.offset;
+    else
+        assert(direction == dirLeft);
+
+    return pos;
+}
+
+/// \returns the atomic node (either inner or leaf) at the given position
+Ipc::Mem::IdSet::StoredNode &
+Ipc::Mem::IdSet::nodeAt(const Position pos)
+{
+    assert(pos.level < measurements.treeHeight);
+    // n = 2^(h+1) - 1 with h = level-1
+    const auto nodesAbove = (1U << pos.level) - 1;
+
+    // the second clause is for the special case of a root node
+    assert(pos.offset < nodesAbove*2 || (pos.atRoot() && nodesAbove == 0));
+    const auto nodesToTheLeft = pos.offset;
+
+    const size_t nodesBefore = nodesAbove + nodesToTheLeft;
+    assert(nodesBefore < measurements.nodeCount());
+    return nodes_[nodesBefore];
+}
+
+/// \returns the location of the raw (inner or leaf) node at the given position
+Ipc::Mem::IdSet::Node *
+Ipc::Mem::IdSet::valueAddress(const Position pos)
+{
+    // IdSet() constructor asserts that this frequent reinterpret_cast is safe
+    return &reinterpret_cast<Node&>(nodeAt(pos));
+}
+
+bool
+Ipc::Mem::IdSet::pop(size_type &id)
+{
+    Position rootPos;
+    const auto directionFromRoot = innerPop(rootPos);
+    if (directionFromRoot == dirEnd)
+        return false; // an empty tree
+
+    auto pos = descend(rootPos, directionFromRoot);
+    for (size_t level = 1; level < measurements.innerLevelCount; ++level) {
+        const auto direction = innerPop(pos);
+        pos = descend(pos, direction);
+    }
+
+    id = leafPop(pos);
+    return true;
+}
+
+void
+Ipc::Mem::IdSet::push(const size_type id)
+{
+    const auto offsetAtLeafLevel = id/BitsPerLeaf;
+    auto pos = Position(measurements.innerLevelCount, offsetAtLeafLevel);
+    leafPush(pos, id);
+
+    do {
+        const auto direction = pos.ascendDirection();
+        pos = ascend(pos);
+        innerPush(pos, direction);
+    } while (!pos.atRoot());
+}
+
+size_t
+Ipc::Mem::IdSet::MemorySize(const size_type capacity)
+{
+    const IdSetMeasurements measurements(capacity);
+    // Adding sizeof(IdSet) double-counts the first node but it is better to
+    // overestimate (a little) than to underestimate our memory needs due to
+    // padding, new data members, etc.
+    return sizeof(IdSet) + measurements.nodeCount() * sizeof(StoredNode);
 }
 
 /* Ipc::Mem::PageStack */
@@ -40,25 +431,9 @@ Ipc::Mem::PageStackStorageSlot::put(const PointerOrMarker expected, const Pointe
 Ipc::Mem::PageStack::PageStack(const PoolId aPoolId, const PageCount aCapacity, const size_t aPageSize):
     thePoolId(aPoolId), capacity_(aCapacity), thePageSize(aPageSize),
     size_(0),
-    head_(Slot::NilPtr),
-    slots_(aCapacity)
-{
-    assert(thePoolId);
-
-    assert(capacity_ < Slot::TakenPage);
-    assert(capacity_ < Slot::NilPtr);
-
-    // initially, all pages are free
-    if (capacity_) {
-        const auto lastIndex = capacity_-1;
-        // FlexibleArray cannot construct its phantom elements so, technically,
-        // all slots (except the very first one) are uninitialized until now.
-        for (Slot::Pointer i = 0; i < lastIndex; ++i)
-            (void)new(&slots_[i])Slot(i+1);
-        (void)new(&slots_[lastIndex])Slot(Slot::NilPtr);
-        size_ = capacity_;
-        head_ = 0;
-    }
+    ids_(capacity_)
+{
+    size_ = capacity_;
 }
 
 bool
@@ -66,23 +441,21 @@ Ipc::Mem::PageStack::pop(PageId &page)
 {
     assert(!page);
 
-    Slot::Pointer current = head_.load();
+    if (!capacity_)
+        return false;
 
-    auto nextFree = Slot::NilPtr;
-    do {
-        if (current == Slot::NilPtr)
-            return false;
-        nextFree = slots_[current].next();
-    } while (!head_.compare_exchange_weak(current, nextFree));
+    IdSet::size_type pageIndex = 0;
+    if (!ids_.pop(pageIndex))
+        return false;
 
     // must decrement after removing the page to avoid underflow
     const auto newSize = --size_;
     assert(newSize < capacity_);
 
-    slots_[current].take();
-    page.number = current + 1;
+    page.number = pageIndex + 1;
     page.pool = thePoolId;
     debugs(54, 8, page << " size: " << newSize);
+    assert(pageIdIsValid(page));
     return true;
 }
 
@@ -93,19 +466,12 @@ Ipc::Mem::PageStack::push(PageId &page)
     assert(page);
     assert(pageIdIsValid(page));
 
-    const auto pageIndex = page.number - 1;
-    auto &slot = slots_[pageIndex];
-
     // must increment before inserting the page to avoid underflow in pop()
     const auto newSize = ++size_;
     assert(newSize <= capacity_);
 
-    auto current = head_.load();
-    auto expected = Slot::TakenPage;
-    do {
-        slot.put(expected, current);
-        expected = current;
-    } while (!head_.compare_exchange_weak(current, pageIndex));
+    const auto pageIndex = page.number - 1;
+    ids_.push(pageIndex);
 
     debugs(54, 8, page << " size: " << newSize);
     page = PageId();
@@ -135,7 +501,10 @@ Ipc::Mem::PageStack::SharedMemorySize(const PoolId, const PageCount capacity, co
 size_t
 Ipc::Mem::PageStack::StackSize(const PageCount capacity)
 {
-    return sizeof(PageStack) + capacity * sizeof(Slot);
+    // Adding sizeof(PageStack) double-counts the fixed portion of the ids_ data
+    // member but it is better to overestimate (a little) than to underestimate
+    // our memory needs due to padding, new data members, etc.
+    return sizeof(PageStack) + IdSet::MemorySize(capacity);
 }
 
 size_t
index a9b051e2804df558a13540e188d7630442e1173e..46a3822e068b1703f99cfc98c8f8b40b7c7e3c12 100644 (file)
@@ -23,39 +23,83 @@ namespace Mem
 
 class PageId;
 
-/// reflects the dual nature of PageStack storage:
-/// - for free pages, this is a pointer to the next free page
-/// - for used pages, this is a "used page" marker
-class PageStackStorageSlot
+class IdSetPosition;
+typedef enum { dirNone, dirLeft, dirRight, dirEnd } IdSetNavigationDirection;
+
+/// basic IdSet storage parameters, extracted here to keep them constant
+class IdSetMeasurements
+{
+public:
+    /// we need to fit two size_type counters into one 64-bit lockless atomic
+    typedef uint32_t size_type;
+
+    explicit IdSetMeasurements(size_type capacity);
+
+    /// the maximum number of pages our tree is allowed to store
+    size_type capacity = 0;
+
+    /// the number of leaf nodes that satisfy capacity requirements
+    size_type requestedLeafNodeCount = 0;
+
+    size_type treeHeight = 0; ///< total number of levels, including the leaf level
+    size_type leafNodeCount = 0; ///< the number of nodes at the leaf level
+    size_type innerLevelCount = 0; ///< all levels except the leaf level
+
+    /// the total number of nodes at all levels
+    size_type nodeCount() const { return leafNodeCount ? leafNodeCount*2 -1 : 0; }
+};
+
+/// a shareable set of positive uint32_t IDs with O(1) insertion/removal ops
+class IdSet
 {
 public:
-    // We are using uint32_t for Pointer because PageId::number is uint32_t.
-    // PageId::number should probably be uint64_t to accommodate caches with
-    // page numbers exceeding UINT32_MAX.
-    typedef uint32_t PointerOrMarker;
-    typedef PointerOrMarker Pointer;
-    typedef PointerOrMarker Marker;
+    using size_type = IdSetMeasurements::size_type;
+    using Position = IdSetPosition;
+    using NavigationDirection = IdSetNavigationDirection;
 
-    /// represents a nil next slot pointer
-    static const Pointer NilPtr = std::numeric_limits<PointerOrMarker>::max();
-    /// marks a slot of a used (i.e. take()n) page
-    static const Marker TakenPage = std::numeric_limits<PointerOrMarker>::max() - 1;
-    static_assert(TakenPage != NilPtr, "magic PointerOrMarker values do not clash");
+    /// memory size required to store a tree with the given capacity
+    static size_t MemorySize(size_type capacity);
 
-    explicit PageStackStorageSlot(const Pointer nxt = NilPtr): nextOrMarker(nxt) {}
+    explicit IdSet(size_type capacity);
 
-    /// returns a (possibly nil) pointer to the next free page
-    Pointer next() const { return nextOrMarker.load(); }
+    /// populates the allocated tree with the requested capacity IDs
+    /// optimized to run without atomic protection
+    void makeFullBeforeSharing();
 
-    /// marks our page as used
-    void take();
+    /// finds/extracts (into the given `id`) an ID value and returns true
+    /// \retval false no IDs are left
+    bool pop(size_type &id);
 
-    /// marks our page as free, to be used before the given `nxt` page;
-    /// also checks that the slot state matches the caller expectations
-    void put(const PointerOrMarker expected, const Pointer nxt);
+    /// makes `id` value available to future pop() callers
+    void push(size_type id);
+
+    const IdSetMeasurements measurements;
 
 private:
-    std::atomic<PointerOrMarker> nextOrMarker;
+    typedef uint64_t Node; ///< either leaf or intermediate node
+    typedef std::atomic<Node> StoredNode; ///< a Node stored in shared memory
+
+    /* optimization: these initialization methods bypass atomic protections */
+    void fillAllNodes();
+    void truncateExtras();
+    Node *valueAddress(Position);
+    size_type innerTruncate(Position pos, NavigationDirection dir, size_type toSubtract);
+    void leafTruncate(Position pos, size_type idsToKeep);
+
+    void innerPush(Position, NavigationDirection);
+    NavigationDirection innerPop(Position);
+
+    void leafPush(Position, size_type id);
+    size_type leafPop(Position);
+
+    Position ascend(Position);
+    Position descend(Position, NavigationDirection);
+
+    StoredNode &nodeAt(Position);
+
+    /// the entire binary tree flattened into an array
+    FlexibleArray<StoredNode> nodes_;
+    // No more data members should follow! See FlexibleArray<> for details.
 };
 
 /// Atomic container of "free" PageIds. Detects (some) double-free bugs.
@@ -113,8 +157,6 @@ public:
     static PoolId IdForSwapDirSpace(const int dirIdx) { return 900 + dirIdx + 1; }
 
 private:
-    using Slot = PageStackStorageSlot;
-
     // XXX: theFoo members look misplaced due to messy separation of PagePool
     // (which should support multiple Segments but does not) and PageStack
     // (which should not calculate the Segment size but does) duties.
@@ -124,12 +166,8 @@ private:
     /// a lower bound for the number of free pages (for debugging purposes)
     std::atomic<PageCount> size_;
 
-    /// the index of the first free stack element or nil
-    std::atomic<Slot::Pointer> head_;
-
-    /// slots indexed using their page number
-    Ipc::Mem::FlexibleArray<Slot> slots_;
-    // No more data members should follow! See Ipc::Mem::FlexibleArray<> for details.
+    IdSet ids_; ///< free pages (valid with positive capacity_)
+    // No more data members should follow! See IdSet for details.
 };
 
 } // namespace Mem