From a58608546341c5d2f53f7a4798ae464921961d9f Mon Sep 17 00:00:00 2001
From: Alex Rousskov <rousskov@measurement-factory.com>
Date: Wed, 13 May 2020 06:37:31 +0000
Subject: [PATCH] Fix the ABA problem with Ipc::Mem::PageStack::pop() in
 c3408a3 (#587)

Suspected symptoms:

    assertion failed: mem/PageStack.cc:35: "old == expected"
    assertion failed: mem/PageStack.cc:27: "nxt != TakenPage"
    assertion failed: mem/PageStack.cc:33: "nxt != TakenPage"
    assertion failed: StoreMap.cc:337: "validSlice(sliceId)"

Replaced the list-based PageStack implementation with a tree-based one.

The new code uses a deterministic binary tree. Inner nodes count the
number of available IDs in their child subtrees. Leaf nodes store IDs
using bitmasks. The root node tells the pop() method whether it is
going to find a free page. The method then adjusts counters in 1-23
nodes (depending on the tree height) on the path to the leaf containing
a page. The push() method adds a bit to the leaf node and adjusts the
counters of all the inner nodes (1-23) on the way up to the root one.
All the adjustments are lockless. Push may also be wait-free. No ABA
problems.

An alternative fix that preserved the list-based implementation was
implemented but ultimately rejected: Avoiding ABA problems required
complex code, and that complexity prevented meaningful validation using
Rust's Loom. Also, dirty performance tests of outside-of-Squid code
showed unexplained, significant response time growth of the list-based
implementation when concurrency levels were increased beyond a few
threads. While these validation and performance concerns could be red
herrings, their existence decreased our confidence in a list-based
algorithm that already had a history of fooling developers.

The tree-based PageStack implementation needs 8-16x less RAM. Roughly:

* list-based: sizeof(uint32_t) * capacity, or 4*capacity
* tree-based: sizeof(uint64_t) * 2 * rcapacity/64, or rcapacity/4

where the rounded-up capacity rcapacity is somewhere between capacity
and 2*capacity.

The absolute RAM savings are minor for most environments, but the
footprint reduction might be enough to fit a significant part of some
hot index in a modern L1 CPU cache: e.g., a 32GB rock cache_dir with
default 16KB slots has 32GB/16KB = 2M slot IDs, yielding a 512KB tree.

The tree-based structure may make future support for caches with more
than 2^25 entries easier because it does not heavily rely on combining
a cache entry ID and an ABA version/nonce in a single 64-bit atomic.

TODO: While absolute tree- and list-based operation costs are all small
(under 1 microsecond), tree-based implementation costs are higher.
Since rock code pops all disk slots at startup (a known performance
bug), rock startup costs increased significantly. For example, a 200 GB
disk cache test shows ~18 seconds of startup time for the tree-based
implementation versus ~4 seconds for the list-based one. This will be
addressed by fixing that known performance bug; the fix will not alter
the tree algorithm.

TODO: The PageStack class should be renamed. Ideally, that renaming
should coincide with refactoring the PagePool/PageStack split, an old
XXX that also currently exposes a lot of internal PageStack code.
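For illustration, here is a single-threaded sketch of the descent that
pop() performs. The names (ToyIdSet, inner, leaves) are invented for
this sketch only; the committed code stores both node kinds in one
shared-memory array and replaces every plain read-modify-write below
with a lockless std::atomic operation (fetch_add, fetch_or, or a
compare_exchange loop):

    #include <cassert>
    #include <cstdint>
    #include <vector>

    // Hypothetical non-atomic model of the IdSet tree. Inner nodes pack
    // (left, right) availability counters into one 64-bit value; each
    // leaf holds one bit per available ID, 64 IDs per leaf. Assumes a
    // perfect tree in heap layout: inner.size() == leaves.size() - 1.
    struct ToyIdSet {
        std::vector<uint64_t> inner;  // heap order: root at index 0
        std::vector<uint64_t> leaves; // leaf i covers IDs [64*i, 64*i+63]

        bool pop(uint32_t &id) {
            if (!inner[0])
                return false; // the root says there are no free IDs
            size_t node = 0;
            while (node < inner.size()) {
                const uint32_t left = uint32_t(inner[node] >> 32);
                // decrement the counter of the subtree we descend into;
                // consistent counters imply that subtree is not empty
                inner[node] -= left ? (uint64_t(1) << 32) : 1;
                node = 2*node + (left ? 1 : 2); // heap-style child index
            }
            const size_t leaf = node - inner.size();
            assert(leaves[leaf]); // implied by consistent counters
            const int bit = __builtin_ctzll(leaves[leaf]); // GCC/Clang;
            // the patch uses a portable trailingZeros() loop instead
            leaves[leaf] &= leaves[leaf] - 1; // clear the rightmost 1
            id = uint32_t(leaf*64 + bit);
            return true;
        }
    };

push() is the mirror image: set the ID bit in its leaf and then
increment the matching left or right counter in each inner node on the
way up to the root.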
See also: https://en.wikipedia.org/wiki/ABA_problem
---
 src/Makefile.am          |   1 -
 src/ipc/Makefile.am      |   3 +
 src/ipc/mem/PageStack.cc | 467 +++++++++++++++++++++++++++++++++++----
 src/ipc/mem/PageStack.h  | 102 ++++++---
 4 files changed, 491 insertions(+), 82 deletions(-)

diff --git a/src/Makefile.am b/src/Makefile.am
index 238306133f..cda8aa8415 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -576,7 +576,6 @@ squid_LDADD = \
 	$(top_builddir)/lib/libmisccontainers.la \
 	$(top_builddir)/lib/libmiscencoding.la \
 	$(top_builddir)/lib/libmiscutil.la \
-	$(ATOMICLIB) \
 	$(SSLLIB) \
 	$(EPOLL_LIBS) \
 	$(MINGW_LIBS) \
diff --git a/src/ipc/Makefile.am b/src/ipc/Makefile.am
index 2c0f9a36a8..5c57427e49 100644
--- a/src/ipc/Makefile.am
+++ b/src/ipc/Makefile.am
@@ -71,3 +71,6 @@ DEFS += -DDEFAULT_STATEDIR=\"$(localstatedir)/run/squid\"
 
 install-data-local:
 	$(mkinstalldirs) $(DESTDIR)$(localstatedir)/run/squid;
+
+libipc_la_LIBADD = $(ATOMICLIB)
+
diff --git a/src/ipc/mem/PageStack.cc b/src/ipc/mem/PageStack.cc
index 56314709b6..a2557f7002 100644
--- a/src/ipc/mem/PageStack.cc
+++ b/src/ipc/mem/PageStack.cc
@@ -10,29 +10,420 @@
 
 #include "squid.h"
-#include "base/TextException.h"
 #include "Debug.h"
 #include "ipc/mem/Page.h"
 #include "ipc/mem/PageStack.h"
 
-/* Ipc::Mem::PageStackStorageSlot */
+#include <algorithm>
+#include <limits>
 
-static_assert(sizeof(Ipc::Mem::PageStackStorageSlot::Pointer) ==
-              sizeof(decltype(Ipc::Mem::PageId::number)), "page indexing types are consistent");
+/*
+
+Ipc::Mem::IdSet and related code maintains a perfect full binary tree structure:
+
+          (l,r)
+          /   \
+    (ll,lr)   (rl,rr)
+     /  \      /  \
+    L1  L2    L3  L4
+
+where
+
+ * (l,r) is an always-present root node;
+
+ * inner nodes, including the root one, count the total number of available
+   IDs in the leaf nodes of the left and right subtrees (e.g., r = rl + rr);
+
+ * leaf nodes are bitsets of available IDs (e.g., rl = number of 1s in L3);
+   all leaf nodes are always present.
+
+The above sample tree would be stored as seven 64-bit atomic integers:
+    (l,r), (ll,lr), (rl,rr), L1, L2, L3, L4
+
+*/
+
+namespace Ipc
+{
+
+namespace Mem
+{
+
+/// the maximum number of pages that a leaf node can store
+static const IdSet::size_type BitsPerLeaf = 64;
+
+class IdSetPosition
+{
+public:
+    using size_type = IdSet::size_type;
+
+    IdSetPosition() = default; ///< root node position
+    IdSetPosition(size_type aLevel, size_type anOffset);
+
+    /// whether we are at the top of the tree
+    bool atRoot() const { return !level && !offset; }
+
+    /// the direction of this position relative to its parent node
+    IdSetNavigationDirection ascendDirection() const;
+
+    /// the number of levels above us (e.g., zero for the root node)
+    IdSet::size_type level = 0;
+    /// the number of nodes (at our level) to the left of us
+    IdSet::size_type offset = 0;
+};
+
+/// a helper class to perform inner node manipulation for IdSet
+class IdSetInnerNode
+{
+public:
+    using size_type = IdSet::size_type;
+    typedef uint64_t Packed; ///< (atomically) stored serialized value
+
+    /// de-serializes a given value
+    static IdSetInnerNode Unpack(Packed packed);
+
+    IdSetInnerNode() = default;
+    IdSetInnerNode(size_type left, size_type right);
+
+    /// returns a serialized value suitable for shared memory storage
+    Packed pack() const { return (static_cast<Packed>(left) << 32) | right; }
+
+    size_type left = 0; ///< the number of available IDs in the left subtree
+    size_type right = 0; ///< the number of available IDs in the right subtree
+};
+
+} // namespace Mem
+
+} // namespace Ipc
+
+/* Ipc::Mem::IdSetPosition */
+
+Ipc::Mem::IdSetPosition::IdSetPosition(size_type aLevel, size_type anOffset):
+    level(aLevel),
+    offset(anOffset)
+{
+}
+
+Ipc::Mem::IdSetNavigationDirection
+Ipc::Mem::IdSetPosition::ascendDirection() const
+{
+    return (offset % 2 == 0) ? dirLeft : dirRight;
+}
+
+/* Ipc::Mem::IdSetMeasurements */
+
+Ipc::Mem::IdSetMeasurements::IdSetMeasurements(const size_type aCapacity)
+{
+    capacity = aCapacity;
+
+    // For simplicity, we want a perfect full binary tree with root and leaves.
+    // We could compute all this with log2() calls, but rounding and honoring
+    // root+leaves minimums make that approach more complex than this fast loop.
+    requestedLeafNodeCount = (capacity + (BitsPerLeaf-1))/BitsPerLeaf;
+    treeHeight = 1+1; // the root level plus the leaf nodes level
+    leafNodeCount = 2; // the root node can have only two leaf nodes
+    while (leafNodeCount < requestedLeafNodeCount) {
+        leafNodeCount *= 2;
+        ++treeHeight;
+    }
+    innerLevelCount = treeHeight - 1;
+
+    debugs(54, 5, "rounded capacity up from " << capacity << " to " << (leafNodeCount*BitsPerLeaf));
+
+    // we do (1 << level) when computing 32-bit IdSetInnerNode::left
+    assert(treeHeight < 32);
+}
+
+/* Ipc::Mem::IdSetInnerNode */
+
+Ipc::Mem::IdSetInnerNode::IdSetInnerNode(size_type aLeft, size_type aRight):
+    left(aLeft),
+    right(aRight)
+{
+}
+
+Ipc::Mem::IdSetInnerNode
+Ipc::Mem::IdSetInnerNode::Unpack(Packed packed)
+{
+    // truncation during the cast is intentional here
+    return IdSetInnerNode(packed >> 32, static_cast<size_type>(packed));
+}
+
+/* Ipc::Mem::IdSet */
+
+Ipc::Mem::IdSet::IdSet(const size_type capacity):
+    measurements(capacity),
+    nodes_(measurements.nodeCount())
+{
+    // For valueAddress() to be able to return a raw uint64_t pointer, the
+    // atomic wrappers in nodes_ must be zero-size. Check the best we can. Once.
+    static_assert(sizeof(StoredNode) == sizeof(Node), "atomic locks use no storage");
+    assert(StoredNode().is_lock_free());
+
+    makeFullBeforeSharing();
+}
+
+void
+Ipc::Mem::IdSet::makeFullBeforeSharing()
+{
+    // initially, all IDs are marked as available
+    fillAllNodes();
+
+    // ... but IDs beyond the requested capacity should not be available
+    if (measurements.capacity != measurements.leafNodeCount*BitsPerLeaf)
+        truncateExtras();
+}
+
+/// populates the entire allocated tree with available IDs
+/// may exceed the requested capacity; \see truncateExtras()
+void
+Ipc::Mem::IdSet::fillAllNodes()
+{
+    // leaf nodes
+    auto pos = Position(measurements.treeHeight-1, 0);
+    const auto allOnes = ~uint64_t(0);
+    std::fill_n(valueAddress(pos), measurements.leafNodeCount, allOnes);
+
+    // inner nodes, starting from the bottom of the tree
+    auto nodesAtLevel = measurements.leafNodeCount/2;
+    auto pagesBelow = BitsPerLeaf;
+    do {
+        pos = ascend(pos);
+        const auto value = IdSetInnerNode(pagesBelow, pagesBelow).pack();
+        std::fill_n(valueAddress(pos), nodesAtLevel, value);
+        nodesAtLevel /= 2;
+        pagesBelow *= 2;
+    } while (!pos.atRoot());
+}
+
+/// effectively removes IDs that exceed the requested capacity after fillAllNodes()
+void
+Ipc::Mem::IdSet::truncateExtras()
+{
+    // leaf nodes
+    // start with the left-most leaf that should have some 0s; it may even have
+    // no 1s at all (i.e. be completely unused)
+    auto pos = Position(measurements.treeHeight-1, measurements.capacity/BitsPerLeaf);
+    leafTruncate(pos, measurements.capacity % BitsPerLeaf);
+    const auto rightLeaves = measurements.leafNodeCount - measurements.requestedLeafNodeCount;
+    // this zeroing of the leaf nodes to the right from pos is only necessary to
+    // trigger asserts if the code dealing with the inner node counters is buggy
+    if (rightLeaves > 1)
+        std::fill_n(valueAddress(pos) + 1, rightLeaves-1, 0);
+
+    // inner nodes, starting from the bottom of the tree; optimization: only
+    // adjusting nodes on the way up from the first leaf-with-0s position
+    auto toSubtract = BitsPerLeaf - (measurements.capacity % BitsPerLeaf);
+    do {
+        const auto direction = pos.ascendDirection();
+        pos = ascend(pos);
+        toSubtract = innerTruncate(pos, direction, toSubtract);
+    } while (!pos.atRoot());
+}
+
+/// fill the leaf node at a given position with 0s, leaving only idsToKeep IDs
 void
-Ipc::Mem::PageStackStorageSlot::take()
+Ipc::Mem::IdSet::leafTruncate(const Position pos, const size_type idsToKeep)
 {
-    const auto nxt = nextOrMarker.exchange(TakenPage);
-    assert(nxt != TakenPage);
+    Node &node = *valueAddress(pos); // no auto to simplify the asserts() below
+    assert(node == std::numeric_limits<Node>::max()); // all 1s
+
+    static_assert(std::is_unsigned<Node>::value, "right shift prepends 0s");
+    node >>= BitsPerLeaf - idsToKeep;
+
+    // node can be anything here, including all 0s and all 1s
+}
+
+/// accounts for the removal of toSubtract IDs from the subtree in the given
+/// direction of the given position
+/// \returns the number of IDs to subtract from the parent node
+Ipc::Mem::IdSet::size_type
+Ipc::Mem::IdSet::innerTruncate(const Position pos, const NavigationDirection dir, const size_type toSubtract)
+{
+    auto *valuePtr = valueAddress(pos);
+    auto value = IdSetInnerNode::Unpack(*valuePtr);
+
+    size_type toSubtractNext = 0;
+    if (dir == dirLeft) {
+        toSubtractNext = toSubtract + value.right;
+        assert(value.left >= toSubtract);
+        value.left -= toSubtract;
+        value.right = 0;
+    } else {
+        assert(dir == dirRight);
+        toSubtractNext = toSubtract;
+        assert(value.right >= toSubtract);
+        // value.left is unchanged; we have only adjusted the right branch
+        value.right -= toSubtract;
+    }
+
+    *valuePtr = value.pack();
+
+    return toSubtractNext;
+}
+
+/// accounts for an ID added to the subtree in the given dir of the given position
+void
+Ipc::Mem::IdSet::innerPush(const Position pos, const NavigationDirection dir)
+{
+    // either left or right component will be true/1; the other will be false/0
+    const auto increment = IdSetInnerNode(dir == dirLeft, dir == dirRight).pack();
+    const auto previousValue = nodeAt(pos).fetch_add(increment);
+
+    // no overflows
+    assert(previousValue <= std::numeric_limits<Node>::max() - increment);
+}
+
+/// accounts for future ID removal from a subtree of the given position
+/// \returns the direction of the subtree chosen to relinquish the ID
+Ipc::Mem::IdSet::NavigationDirection
+Ipc::Mem::IdSet::innerPop(const Position pos)
+{
+    NavigationDirection direction = dirNone;
+
+    auto &node = nodeAt(pos);
+    auto oldValue = node.load();
+    IdSetInnerNode newValue;
+    do {
+        newValue = IdSetInnerNode::Unpack(oldValue);
+        if (newValue.left) {
+            --newValue.left;
+            direction = dirLeft;
+        } else if (newValue.right) {
+            --newValue.right;
+            direction = dirRight;
+        } else {
+            return dirEnd;
+        }
+    } while (!node.compare_exchange_weak(oldValue, newValue.pack()));
+
+    assert(direction == dirLeft || direction == dirRight);
+    return direction;
 }
 
+/// adds the given ID to the leaf node at the given position
 void
-Ipc::Mem::PageStackStorageSlot::put(const PointerOrMarker expected, const Pointer nxt)
+Ipc::Mem::IdSet::leafPush(const Position pos, const size_type id)
 {
-    assert(nxt != TakenPage);
-    const auto old = nextOrMarker.exchange(nxt);
-    assert(old == expected);
+    const auto mask = Node(1) << (id % BitsPerLeaf);
+    const auto oldValue = nodeAt(pos).fetch_or(mask);
+
+    // this was a new entry
+    assert((oldValue & mask) == 0);
+}
+
+// TODO: After switching to C++20, use countr_zero() which may compile to a
+// single TZCNT assembly instruction on modern CPUs.
+/// a temporary C++20 countr_zero() replacement
+static inline
+int trailingZeros(uint64_t x)
+{
+    if (!x)
+        return 64;
+    int count = 0;
+    for (uint64_t mask = 1; !(x & mask); mask <<= 1)
+        ++count;
+    return count;
+}
+
+/// extracts and returns an ID from the leaf node at the given position
+Ipc::Mem::IdSet::size_type
+Ipc::Mem::IdSet::leafPop(const Position pos)
+{
+    auto &node = nodeAt(pos);
+    auto oldValue = node.load();
+    Node newValue;
+    do {
+        assert(oldValue > 0);
+        const auto mask = oldValue - 1; // flips the rightmost 1 and trailing 0s
+        newValue = oldValue & mask; // clears the rightmost 1
+    } while (!node.compare_exchange_weak(oldValue, newValue));
+
+    return pos.offset*BitsPerLeaf + trailingZeros(oldValue);
+}
+
+/// \returns the position of the parent node of the node at the given position
+Ipc::Mem::IdSet::Position
+Ipc::Mem::IdSet::ascend(Position pos)
+{
+    assert(pos.level > 0);
+    --pos.level;
+    pos.offset /= 2;
+    return pos;
+}
+
+/// \returns the position of a child node in the given direction of the parent
+/// node at the given position
+Ipc::Mem::IdSet::Position
+Ipc::Mem::IdSet::descend(Position pos, const NavigationDirection direction)
+{
+    assert(pos.level < measurements.treeHeight);
+    ++pos.level;
+
+    pos.offset *= 2;
+    if (direction == dirRight)
+        ++pos.offset;
+    else
+        assert(direction == dirLeft);
+
+    return pos;
+}
+
+/// \returns the atomic node (either inner or leaf) at the given position
+Ipc::Mem::IdSet::StoredNode &
+Ipc::Mem::IdSet::nodeAt(const Position pos)
+{
+    assert(pos.level < measurements.treeHeight);
+    // n = 2^(h+1) - 1 with h = level-1
+    const auto nodesAbove = (1U << pos.level) - 1;
+
+    // the second clause is for the special case of the root node
+    assert(pos.offset < nodesAbove*2 || (pos.atRoot() && nodesAbove == 0));
+    const auto nodesToTheLeft = pos.offset;
+
+    const size_t nodesBefore = nodesAbove + nodesToTheLeft;
+    assert(nodesBefore < measurements.nodeCount());
+    return nodes_[nodesBefore];
+}
+
+/// \returns the location of the raw (inner or leaf) node at the given position
+Ipc::Mem::IdSet::Node *
+Ipc::Mem::IdSet::valueAddress(const Position pos)
+{
+    // IdSet() constructor asserts that this frequent reinterpret_cast is safe
+    return &reinterpret_cast<Node&>(nodeAt(pos));
+}
+
+bool
+Ipc::Mem::IdSet::pop(size_type &id)
+{
+    Position rootPos;
+    const auto directionFromRoot = innerPop(rootPos);
+    if (directionFromRoot == dirEnd)
+        return false; // an empty tree
+
+    auto pos = descend(rootPos, directionFromRoot);
+    for (size_t level = 1; level < measurements.innerLevelCount; ++level) {
+        const auto direction = innerPop(pos);
+        pos = descend(pos, direction);
+    }
+
+    id = leafPop(pos);
+    return true;
+}
+
+void
+Ipc::Mem::IdSet::push(const size_type id)
+{
+    const auto offsetAtLeafLevel = id/BitsPerLeaf;
+    auto pos = Position(measurements.innerLevelCount, offsetAtLeafLevel);
+    leafPush(pos, id);
+
+    do {
+        const auto direction = pos.ascendDirection();
+        pos = ascend(pos);
+        innerPush(pos, direction);
+    } while (!pos.atRoot());
+}
+
+size_t
+Ipc::Mem::IdSet::MemorySize(const size_type capacity)
+{
+    const IdSetMeasurements measurements(capacity);
+    // Adding sizeof(IdSet) double-counts the first node but it is better to
+    // overestimate (a little) than to underestimate our memory needs due to
+    // padding, new data members, etc.
+    return sizeof(IdSet) + measurements.nodeCount() * sizeof(StoredNode);
 }
 
 /* Ipc::Mem::PageStack */
 
@@ -40,25 +431,9 @@ Ipc::Mem::PageStackStorageSlot::put(const PointerOrMarker expected, const Pointe
 Ipc::Mem::PageStack::PageStack(const PoolId aPoolId, const PageCount aCapacity, const size_t aPageSize):
     thePoolId(aPoolId), capacity_(aCapacity), thePageSize(aPageSize),
     size_(0),
-    head_(Slot::NilPtr),
-    slots_(aCapacity)
-{
-    assert(thePoolId);
-
-    assert(capacity_ < Slot::TakenPage);
-    assert(capacity_ < Slot::NilPtr);
-
-    // initially, all pages are free
-    if (capacity_) {
-        const auto lastIndex = capacity_-1;
-        // FlexibleArray cannot construct its phantom elements so, technically,
-        // all slots (except the very first one) are uninitialized until now.
-        for (Slot::Pointer i = 0; i < lastIndex; ++i)
-            (void)new(&slots_[i])Slot(i+1);
-        (void)new(&slots_[lastIndex])Slot(Slot::NilPtr);
-        size_ = capacity_;
-        head_ = 0;
-    }
+    ids_(capacity_)
+{
+    size_ = capacity_;
 }
 
 bool
@@ -66,23 +441,21 @@ Ipc::Mem::PageStack::pop(PageId &page)
 {
     assert(!page);
 
-    Slot::Pointer current = head_.load();
+    if (!capacity_)
+        return false;
 
-    auto nextFree = Slot::NilPtr;
-    do {
-        if (current == Slot::NilPtr)
-            return false;
-        nextFree = slots_[current].next();
-    } while (!head_.compare_exchange_weak(current, nextFree));
+    IdSet::size_type pageIndex = 0;
+    if (!ids_.pop(pageIndex))
+        return false;
 
     // must decrement after removing the page to avoid underflow
     const auto newSize = --size_;
     assert(newSize < capacity_);
 
-    slots_[current].take();
-    page.number = current + 1;
+    page.number = pageIndex + 1;
     page.pool = thePoolId;
     debugs(54, 8, page << " size: " << newSize);
+    assert(pageIdIsValid(page));
     return true;
 }
 
@@ -93,19 +466,12 @@ Ipc::Mem::PageStack::push(PageId &page)
     assert(page);
     assert(pageIdIsValid(page));
 
-    const auto pageIndex = page.number - 1;
-    auto &slot = slots_[pageIndex];
-
     // must increment before inserting the page to avoid underflow in pop()
     const auto newSize = ++size_;
     assert(newSize <= capacity_);
 
-    auto current = head_.load();
-    auto expected = Slot::TakenPage;
-    do {
-        slot.put(expected, current);
-        expected = current;
-    } while (!head_.compare_exchange_weak(current, pageIndex));
+    const auto pageIndex = page.number - 1;
+    ids_.push(pageIndex);
 
     debugs(54, 8, page << " size: " << newSize);
     page = PageId();
@@ -135,7 +501,10 @@ Ipc::Mem::PageStack::SharedMemorySize(const PoolId, const PageCount capacity, co
 size_t
 Ipc::Mem::PageStack::StackSize(const PageCount capacity)
 {
-    return sizeof(PageStack) + capacity * sizeof(Slot);
+    // Adding sizeof(PageStack) double-counts the fixed portion of the ids_ data
+    // member but it is better to overestimate (a little) than to underestimate
+    // our memory needs due to padding, new data members, etc.
+    return sizeof(PageStack) + IdSet::MemorySize(capacity);
 }
 
 size_t
diff --git a/src/ipc/mem/PageStack.h b/src/ipc/mem/PageStack.h
index a9b051e280..46a3822e06 100644
--- a/src/ipc/mem/PageStack.h
+++ b/src/ipc/mem/PageStack.h
@@ -23,39 +23,83 @@ namespace Mem
 
 class PageId;
 
-/// reflects the dual nature of PageStack storage:
-/// - for free pages, this is a pointer to the next free page
-/// - for used pages, this is a "used page" marker
-class PageStackStorageSlot
+class IdSetPosition;
+typedef enum { dirNone, dirLeft, dirRight, dirEnd } IdSetNavigationDirection;
+
+/// basic IdSet storage parameters, extracted here to keep them constant
+class IdSetMeasurements
+{
+public:
+    /// we need to fit two size_type counters into one 64-bit lockless atomic
+    typedef uint32_t size_type;
+
+    explicit IdSetMeasurements(size_type capacity);
+
+    /// the maximum number of pages our tree is allowed to store
+    size_type capacity = 0;
+
+    /// the number of leaf nodes that satisfy capacity requirements
+    size_type requestedLeafNodeCount = 0;
+
+    size_type treeHeight = 0; ///< total number of levels, including the leaf level
+    size_type leafNodeCount = 0; ///< the number of nodes at the leaf level
+    size_type innerLevelCount = 0; ///< all levels except the leaf level
+
+    /// the total number of nodes at all levels
+    size_type nodeCount() const { return leafNodeCount ? leafNodeCount*2 - 1 : 0; }
+};
+
+/// a shareable set of positive uint32_t IDs with O(1) insertion/removal ops
+class IdSet
 {
 public:
-    // We are using uint32_t for Pointer because PageId::number is uint32_t.
-    // PageId::number should probably be uint64_t to accommodate caches with
-    // page numbers exceeding UINT32_MAX.
-    typedef uint32_t PointerOrMarker;
-    typedef PointerOrMarker Pointer;
-    typedef PointerOrMarker Marker;
+    using size_type = IdSetMeasurements::size_type;
+    using Position = IdSetPosition;
+    using NavigationDirection = IdSetNavigationDirection;
 
-    /// represents a nil next slot pointer
-    static const Pointer NilPtr = std::numeric_limits<PointerOrMarker>::max();
-    /// marks a slot of a used (i.e. take()n) page
-    static const Marker TakenPage = std::numeric_limits<PointerOrMarker>::max() - 1;
-    static_assert(TakenPage != NilPtr, "magic PointerOrMarker values do not clash");
+    /// memory size required to store a tree with the given capacity
+    static size_t MemorySize(size_type capacity);
 
-    explicit PageStackStorageSlot(const Pointer nxt = NilPtr): nextOrMarker(nxt) {}
+    explicit IdSet(size_type capacity);
 
-    /// returns a (possibly nil) pointer to the next free page
-    Pointer next() const { return nextOrMarker.load(); }
+    /// populates the allocated tree with the requested capacity IDs
+    /// optimized to run without atomic protection
+    void makeFullBeforeSharing();
 
-    /// marks our page as used
-    void take();
+    /// finds/extracts (into the given `id`) an ID value and returns true
+    /// \retval false no IDs are left
+    bool pop(size_type &id);
 
-    /// marks our page as free, to be used before the given `nxt` page;
-    /// also checks that the slot state matches the caller expectations
-    void put(const PointerOrMarker expected, const Pointer nxt);
+    /// makes `id` value available to future pop() callers
+    void push(size_type id);
+
+    const IdSetMeasurements measurements;
 
 private:
-    std::atomic<PointerOrMarker> nextOrMarker;
+    typedef uint64_t Node; ///< either leaf or intermediate node
+    typedef std::atomic<Node> StoredNode; ///< a Node stored in shared memory
+
+    /* optimization: these initialization methods bypass atomic protections */
+    void fillAllNodes();
+    void truncateExtras();
+    Node *valueAddress(Position);
+    size_type innerTruncate(Position pos, NavigationDirection dir, size_type toSubtract);
+    void leafTruncate(Position pos, size_type idsToKeep);
+
+    void innerPush(Position, NavigationDirection);
+    NavigationDirection innerPop(Position);
+
+    void leafPush(Position, size_type id);
+    size_type leafPop(Position);
+
+    Position ascend(Position);
+    Position descend(Position, NavigationDirection);
+
+    StoredNode &nodeAt(Position);
+
+    /// the entire binary tree flattened into an array
+    FlexibleArray<StoredNode> nodes_;
+    // No more data members should follow! See FlexibleArray<> for details.
 };
 
 /// Atomic container of "free" PageIds. Detects (some) double-free bugs.
@@ -113,8 +157,6 @@ public:
     static PoolId IdForSwapDirSpace(const int dirIdx) { return 900 + dirIdx + 1; }
 
 private:
-    using Slot = PageStackStorageSlot;
-
     // XXX: theFoo members look misplaced due to messy separation of PagePool
     // (which should support multiple Segments but does not) and PageStack
     // (which should not calculate the Segment size but does) duties.
@@ -124,12 +166,8 @@ private:
     /// a lower bound for the number of free pages (for debugging purposes)
     std::atomic<size_t> size_;
 
-    /// the index of the first free stack element or nil
-    std::atomic<Slot::Pointer> head_;
-
-    /// slots indexed using their page number
-    Ipc::Mem::FlexibleArray<Slot> slots_;
-    // No more data members should follow! See Ipc::Mem::FlexibleArray<> for details.
+    IdSet ids_; ///< free pages (valid with positive capacity_)
+    // No more data members should follow! See IdSet for details.
 };
 
 } // namespace Mem
-- 
2.39.5