]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Improve sift up/down code in binaryheap.c and logtape.c.
authorTom Lane <tgl@sss.pgh.pa.us>
Tue, 14 Dec 2021 18:35:22 +0000 (13:35 -0500)
committerTom Lane <tgl@sss.pgh.pa.us>
Tue, 14 Dec 2021 18:35:22 +0000 (13:35 -0500)
Borrow the logic that's long been used in tuplesort.c: instead
of physically swapping the data in two heap entries, keep the
value that's being sifted up or down in a local variable, and
just move the other values as necessary.  This makes the code
shorter as well as faster.  It's not clear that any current
callers are really time-critical enough to notice, but we
might as well code heap maintenance the same way everywhere.

Ma Liangzhu and Tom Lane

Discussion: https://postgr.es/m/17336-fc4e522d26a750fd@postgresql.org

src/backend/lib/binaryheap.c
src/backend/utils/sort/logtape.c

index d54e24529919483e6e7e13dd2ad1f6dd57523ab6..4198d9fcd9d13d4beb460650630e02a96c9212c8 100644 (file)
@@ -19,7 +19,6 @@
 
 static void sift_down(binaryheap *heap, int node_off);
 static void sift_up(binaryheap *heap, int node_off);
-static inline void swap_nodes(binaryheap *heap, int a, int b);
 
 /*
  * binaryheap_allocate
@@ -173,24 +172,28 @@ binaryheap_first(binaryheap *heap)
 Datum
 binaryheap_remove_first(binaryheap *heap)
 {
+       Datum           result;
+
        Assert(!binaryheap_empty(heap) && heap->bh_has_heap_property);
 
+       /* extract the root node, which will be the result */
+       result = heap->bh_nodes[0];
+
+       /* easy if heap contains one element */
        if (heap->bh_size == 1)
        {
                heap->bh_size--;
-               return heap->bh_nodes[0];
+               return result;
        }
 
        /*
-        * Swap the root and last nodes, decrease the size of the heap (i.e.
-        * remove the former root node) and sift the new root node down to its
-        * correct position.
+        * Remove the last node, placing it in the vacated root entry, and sift
+        * the new root node down to its correct position.
         */
-       swap_nodes(heap, 0, heap->bh_size - 1);
-       heap->bh_size--;
+       heap->bh_nodes[0] = heap->bh_nodes[--heap->bh_size];
        sift_down(heap, 0);
 
-       return heap->bh_nodes[heap->bh_size];
+       return result;
 }
 
 /*
@@ -211,19 +214,6 @@ binaryheap_replace_first(binaryheap *heap, Datum d)
                sift_down(heap, 0);
 }
 
-/*
- * Swap the contents of two nodes.
- */
-static inline void
-swap_nodes(binaryheap *heap, int a, int b)
-{
-       Datum           swap;
-
-       swap = heap->bh_nodes[a];
-       heap->bh_nodes[a] = heap->bh_nodes[b];
-       heap->bh_nodes[b] = swap;
-}
-
 /*
  * Sift a node up to the highest position it can hold according to the
  * comparator.
@@ -231,29 +221,40 @@ swap_nodes(binaryheap *heap, int a, int b)
 static void
 sift_up(binaryheap *heap, int node_off)
 {
+       Datum           node_val = heap->bh_nodes[node_off];
+
+       /*
+        * Within the loop, the node_off'th array entry is a "hole" that
+        * notionally holds node_val, but we don't actually store node_val there
+        * till the end, saving some unnecessary data copying steps.
+        */
        while (node_off != 0)
        {
                int                     cmp;
                int                     parent_off;
+               Datum           parent_val;
 
                /*
                 * If this node is smaller than its parent, the heap condition is
                 * satisfied, and we're done.
                 */
                parent_off = parent_offset(node_off);
-               cmp = heap->bh_compare(heap->bh_nodes[node_off],
-                                                          heap->bh_nodes[parent_off],
+               parent_val = heap->bh_nodes[parent_off];
+               cmp = heap->bh_compare(node_val,
+                                                          parent_val,
                                                           heap->bh_arg);
                if (cmp <= 0)
                        break;
 
                /*
-                * Otherwise, swap the node and its parent and go on to check the
-                * node's new parent.
+                * Otherwise, swap the parent value with the hole, and go on to check
+                * the node's new parent.
                 */
-               swap_nodes(heap, node_off, parent_off);
+               heap->bh_nodes[node_off] = parent_val;
                node_off = parent_off;
        }
+       /* Re-fill the hole */
+       heap->bh_nodes[node_off] = node_val;
 }
 
 /*
@@ -263,6 +264,13 @@ sift_up(binaryheap *heap, int node_off)
 static void
 sift_down(binaryheap *heap, int node_off)
 {
+       Datum           node_val = heap->bh_nodes[node_off];
+
+       /*
+        * Within the loop, the node_off'th array entry is a "hole" that
+        * notionally holds node_val, but we don't actually store node_val there
+        * till the end, saving some unnecessary data copying steps.
+        */
        while (true)
        {
                int                     left_off = left_offset(node_off);
@@ -271,14 +279,14 @@ sift_down(binaryheap *heap, int node_off)
 
                /* Is the left child larger than the parent? */
                if (left_off < heap->bh_size &&
-                       heap->bh_compare(heap->bh_nodes[node_off],
+                       heap->bh_compare(node_val,
                                                         heap->bh_nodes[left_off],
                                                         heap->bh_arg) < 0)
                        swap_off = left_off;
 
                /* Is the right child larger than the parent? */
                if (right_off < heap->bh_size &&
-                       heap->bh_compare(heap->bh_nodes[node_off],
+                       heap->bh_compare(node_val,
                                                         heap->bh_nodes[right_off],
                                                         heap->bh_arg) < 0)
                {
@@ -298,10 +306,12 @@ sift_down(binaryheap *heap, int node_off)
                        break;
 
                /*
-                * Otherwise, swap the node with the child that violates the heap
+                * Otherwise, swap the hole with the child that violates the heap
                 * property; then go on to check its children.
                 */
-               swap_nodes(heap, swap_off, node_off);
+               heap->bh_nodes[node_off] = heap->bh_nodes[swap_off];
                node_off = swap_off;
        }
+       /* Re-fill the hole */
+       heap->bh_nodes[node_off] = node_val;
 }
index 722708237b1cb504bb67eafba2c7f0d760b3d147..9366cb7e252e51f985a7dc11cc2dcb2f192ffd6b 100644 (file)
@@ -340,16 +340,6 @@ ltsReadFillBuffer(LogicalTape *lt)
        return (lt->nbytes > 0);
 }
 
-static inline void
-swap_nodes(long *heap, unsigned long a, unsigned long b)
-{
-       long            swap;
-
-       swap = heap[a];
-       heap[a] = heap[b];
-       heap[b] = swap;
-}
-
 static inline unsigned long
 left_offset(unsigned long i)
 {
@@ -390,31 +380,33 @@ ltsGetFreeBlock(LogicalTapeSet *lts)
        long       *heap = lts->freeBlocks;
        long            blocknum;
        int                     heapsize;
-       unsigned long pos;
+       long            holeval;
+       unsigned long holepos;
 
        /* freelist empty; allocate a new block */
        if (lts->nFreeBlocks == 0)
                return lts->nBlocksAllocated++;
 
+       /* easy if heap contains one element */
        if (lts->nFreeBlocks == 1)
        {
                lts->nFreeBlocks--;
                return lts->freeBlocks[0];
        }
 
-       /* take top of minheap */
+       /* remove top of minheap */
        blocknum = heap[0];
 
-       /* replace with end of minheap array */
-       heap[0] = heap[--lts->nFreeBlocks];
+       /* we'll replace it with end of minheap array */
+       holeval = heap[--lts->nFreeBlocks];
 
        /* sift down */
-       pos = 0;
+       holepos = 0;                            /* holepos is where the "hole" is */
        heapsize = lts->nFreeBlocks;
        while (true)
        {
-               unsigned long left = left_offset(pos);
-               unsigned long right = right_offset(pos);
+               unsigned long left = left_offset(holepos);
+               unsigned long right = right_offset(holepos);
                unsigned long min_child;
 
                if (left < heapsize && right < heapsize)
@@ -426,12 +418,13 @@ ltsGetFreeBlock(LogicalTapeSet *lts)
                else
                        break;
 
-               if (heap[min_child] >= heap[pos])
+               if (heap[min_child] >= holeval)
                        break;
 
-               swap_nodes(heap, min_child, pos);
-               pos = min_child;
+               heap[holepos] = heap[min_child];
+               holepos = min_child;
        }
+       heap[holepos] = holeval;
 
        return blocknum;
 }
@@ -483,7 +476,7 @@ static void
 ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
 {
        long       *heap;
-       unsigned long pos;
+       unsigned long holepos;
 
        /*
         * Do nothing if we're no longer interested in remembering free space.
@@ -508,24 +501,23 @@ ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
                                                                                        lts->freeBlocksLen * sizeof(long));
        }
 
+       /* create a "hole" at end of minheap array */
        heap = lts->freeBlocks;
-       pos = lts->nFreeBlocks;
-
-       /* place entry at end of minheap array */
-       heap[pos] = blocknum;
+       holepos = lts->nFreeBlocks;
        lts->nFreeBlocks++;
 
-       /* sift up */
-       while (pos != 0)
+       /* sift up to insert blocknum */
+       while (holepos != 0)
        {
-               unsigned long parent = parent_offset(pos);
+               unsigned long parent = parent_offset(holepos);
 
-               if (heap[parent] < heap[pos])
+               if (heap[parent] < blocknum)
                        break;
 
-               swap_nodes(heap, parent, pos);
-               pos = parent;
+               heap[holepos] = heap[parent];
+               holepos = parent;
        }
+       heap[holepos] = blocknum;
 }
 
 /*