]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Optimize popcount functions with ARM Neon intrinsics.
authorNathan Bossart <nathan@postgresql.org>
Fri, 28 Mar 2025 19:49:35 +0000 (14:49 -0500)
committerNathan Bossart <nathan@postgresql.org>
Fri, 28 Mar 2025 19:49:35 +0000 (14:49 -0500)
This commit introduces Neon implementations of pg_popcount{32,64},
pg_popcount(), and pg_popcount_masked().  As in simd.h, we assume
that all available AArch64 hardware supports Neon, so we don't need
any new configure-time or runtime checks.  Some compilers already
emit Neon instructions for these functions, but our hand-rolled
implementations for pg_popcount() and pg_popcount_masked()
performed better in testing, likely due to better instruction-level
parallelism.

Author: "Chiranmoy.Bhattacharya@fujitsu.com" <Chiranmoy.Bhattacharya@fujitsu.com>
Reviewed-by: John Naylor <johncnaylorls@gmail.com>
Discussion: https://postgr.es/m/010101936e4aaa70-b474ab9e-b9ce-474d-a3ba-a3dc223d295c-000000%40us-west-2.amazonses.com

src/include/port/pg_bitutils.h
src/port/Makefile
src/port/meson.build
src/port/pg_bitutils.c
src/port/pg_popcount_aarch64.c [new file with mode: 0644]

index 3067ff402bad23dbf7d0dc84725a2960e22a29f7..a387f77c2c005a4954db782e6bb6667e565df258 100644 (file)
@@ -298,6 +298,15 @@ pg_ceil_log2_64(uint64 num)
 #endif
 #endif
 
+/*
+ * On AArch64, we can use Neon instructions if the compiler provides access to
+ * them (as indicated by __ARM_NEON).  As in simd.h, we assume that all
+ * available 64-bit hardware has Neon support.
+ */
+#if defined(__aarch64__) && defined(__ARM_NEON)
+#define POPCNT_AARCH64 1
+#endif
+
 #ifdef TRY_POPCNT_X86_64
 /* Attempt to use the POPCNT instruction, but perform a runtime check first */
 extern PGDLLIMPORT int (*pg_popcount32) (uint32 word);
index 3bf51659ee25db70861d8af966e11234390cae40..3e51bf1bd653e56b7081b16aa647a87c03819cf5 100644 (file)
@@ -46,6 +46,7 @@ OBJS = \
        path.o \
        pg_bitutils.o \
        pg_localeconv_r.o \
+       pg_popcount_aarch64.o \
        pg_popcount_avx512.o \
        pg_strong_random.o \
        pgcheckdir.o \
index 06809692ad0b1dc3a4b1757d5cf50890973a2fa4..45954dd2808da31091c9fa0a2cedf5f3ab57dd99 100644 (file)
@@ -9,6 +9,7 @@ pgport_sources = [
   'path.c',
   'pg_bitutils.c',
   'pg_localeconv_r.c',
+  'pg_popcount_aarch64.c',
   'pg_popcount_avx512.c',
   'pg_strong_random.c',
   'pgcheckdir.c',
index 82be40e2fb40c628e157c8028cb709284008a5f7..61c7388f474778da78ff5e02d58f680a9f201008 100644 (file)
@@ -103,10 +103,15 @@ const uint8 pg_number_of_ones[256] = {
        4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8
 };
 
+/*
+ * If we are building the Neon versions, we don't need the "slow" fallbacks.
+ */
+#ifndef POPCNT_AARCH64
 static inline int pg_popcount32_slow(uint32 word);
 static inline int pg_popcount64_slow(uint64 word);
 static uint64 pg_popcount_slow(const char *buf, int bytes);
 static uint64 pg_popcount_masked_slow(const char *buf, int bytes, bits8 mask);
+#endif
 
 #ifdef TRY_POPCNT_X86_64
 static bool pg_popcount_available(void);
@@ -339,6 +344,10 @@ pg_popcount_masked_fast(const char *buf, int bytes, bits8 mask)
 
 #endif                                                 /* TRY_POPCNT_X86_64 */
 
+/*
+ * If we are building the Neon versions, we don't need the "slow" fallbacks.
+ */
+#ifndef POPCNT_AARCH64
 
 /*
  * pg_popcount32_slow
@@ -486,14 +495,15 @@ pg_popcount_masked_slow(const char *buf, int bytes, bits8 mask)
        return popcnt;
 }
 
-#ifndef TRY_POPCNT_X86_64
+#endif                                                 /* ! POPCNT_AARCH64 */
+
+#if !defined(TRY_POPCNT_X86_64) && !defined(POPCNT_AARCH64)
 
 /*
- * When the POPCNT instruction is not available, there's no point in using
+ * When special CPU instructions are not available, there's no point in using
  * function pointers to vary the implementation between the fast and slow
- * method.  We instead just make these actual external functions when
- * TRY_POPCNT_X86_64 is not defined.  The compiler should be able to inline
- * the slow versions here.
+ * method.  We instead just make these actual external functions.  The compiler
+ * should be able to inline the slow versions here.
  */
 int
 pg_popcount32(uint32 word)
@@ -527,4 +537,4 @@ pg_popcount_masked_optimized(const char *buf, int bytes, bits8 mask)
        return pg_popcount_masked_slow(buf, bytes, mask);
 }
 
-#endif                                                 /* !TRY_POPCNT_X86_64 */
+#endif                                                 /* ! TRY_POPCNT_X86_64 && ! POPCNT_AARCH64 */
diff --git a/src/port/pg_popcount_aarch64.c b/src/port/pg_popcount_aarch64.c
new file mode 100644 (file)
index 0000000..29b8c06
--- /dev/null
@@ -0,0 +1,208 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_popcount_aarch64.c
+ *       Holds the AArch64 popcount implementations.
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *       src/port/pg_popcount_aarch64.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "c.h"
+
+#include "port/pg_bitutils.h"
+
+#ifdef POPCNT_AARCH64
+
+#include <arm_neon.h>
+
+/*
+ * pg_popcount32
+ *             Return number of 1 bits in word
+ */
+int
+pg_popcount32(uint32 word)
+{
+       return pg_popcount64((uint64) word);
+}
+
+/*
+ * pg_popcount64
+ *             Return number of 1 bits in word
+ */
+int
+pg_popcount64(uint64 word)
+{
+       /*
+        * For some compilers, __builtin_popcountl() already emits Neon
+        * instructions.  The line below should compile to the same code on those
+        * systems.
+        */
+       return vaddv_u8(vcnt_u8(vld1_u8((const uint8 *) &word)));
+}
+
+/*
+ * pg_popcount_optimized
+ *             Returns number of 1 bits in buf
+ */
+uint64
+pg_popcount_optimized(const char *buf, int bytes)
+{
+       uint8x16_t      vec;
+       uint64x2_t      accum1 = vdupq_n_u64(0),
+                               accum2 = vdupq_n_u64(0),
+                               accum3 = vdupq_n_u64(0),
+                               accum4 = vdupq_n_u64(0);
+       uint32          bytes_per_iteration = 4 * sizeof(uint8x16_t);
+       uint64          popcnt = 0;
+
+       /*
+        * For better instruction-level parallelism, each loop iteration operates
+        * on a block of four registers.
+        */
+       for (; bytes >= bytes_per_iteration; bytes -= bytes_per_iteration)
+       {
+               vec = vld1q_u8((const uint8 *) buf);
+               accum1 = vpadalq_u32(accum1, vpaddlq_u16(vpaddlq_u8(vcntq_u8(vec))));
+               buf += sizeof(uint8x16_t);
+
+               vec = vld1q_u8((const uint8 *) buf);
+               accum2 = vpadalq_u32(accum2, vpaddlq_u16(vpaddlq_u8(vcntq_u8(vec))));
+               buf += sizeof(uint8x16_t);
+
+               vec = vld1q_u8((const uint8 *) buf);
+               accum3 = vpadalq_u32(accum3, vpaddlq_u16(vpaddlq_u8(vcntq_u8(vec))));
+               buf += sizeof(uint8x16_t);
+
+               vec = vld1q_u8((const uint8 *) buf);
+               accum4 = vpadalq_u32(accum4, vpaddlq_u16(vpaddlq_u8(vcntq_u8(vec))));
+               buf += sizeof(uint8x16_t);
+       }
+
+       /*
+        * If enough data remains, do another iteration on a block of two
+        * registers.
+        */
+       bytes_per_iteration = 2 * sizeof(uint8x16_t);
+       if (bytes >= bytes_per_iteration)
+       {
+               vec = vld1q_u8((const uint8 *) buf);
+               accum1 = vpadalq_u32(accum1, vpaddlq_u16(vpaddlq_u8(vcntq_u8(vec))));
+               buf += sizeof(uint8x16_t);
+
+               vec = vld1q_u8((const uint8 *) buf);
+               accum2 = vpadalq_u32(accum2, vpaddlq_u16(vpaddlq_u8(vcntq_u8(vec))));
+               buf += sizeof(uint8x16_t);
+
+               bytes -= bytes_per_iteration;
+       }
+
+       /*
+        * Add the accumulators.
+        */
+       popcnt += vaddvq_u64(vaddq_u64(accum1, accum2));
+       popcnt += vaddvq_u64(vaddq_u64(accum3, accum4));
+
+       /*
+        * Process remaining 8-byte blocks.
+        */
+       for (; bytes >= sizeof(uint64); bytes -= sizeof(uint64))
+       {
+               popcnt += pg_popcount64(*((uint64 *) buf));
+               buf += sizeof(uint64);
+       }
+
+       /*
+        * Process any remaining data byte-by-byte.
+        */
+       while (bytes--)
+               popcnt += pg_number_of_ones[(unsigned char) *buf++];
+
+       return popcnt;
+}
+
+/*
+ * pg_popcount_masked_optimized
+ *             Returns number of 1 bits in buf after applying the mask to each byte
+ */
+uint64
+pg_popcount_masked_optimized(const char *buf, int bytes, bits8 mask)
+{
+       uint8x16_t      vec,
+                               maskv = vdupq_n_u8(mask);
+       uint64x2_t      accum1 = vdupq_n_u64(0),
+                               accum2 = vdupq_n_u64(0),
+                               accum3 = vdupq_n_u64(0),
+                               accum4 = vdupq_n_u64(0);
+       uint32          bytes_per_iteration = 4 * sizeof(uint8x16_t);
+       uint64          popcnt = 0,
+                               mask64 = ~UINT64CONST(0) / 0xFF * mask;
+
+       /*
+        * For better instruction-level parallelism, each loop iteration operates
+        * on a block of four registers.
+        */
+       for (; bytes >= bytes_per_iteration; bytes -= bytes_per_iteration)
+       {
+               vec = vandq_u8(vld1q_u8((const uint8 *) buf), maskv);
+               accum1 = vpadalq_u32(accum1, vpaddlq_u16(vpaddlq_u8(vcntq_u8(vec))));
+               buf += sizeof(uint8x16_t);
+
+               vec = vandq_u8(vld1q_u8((const uint8 *) buf), maskv);
+               accum2 = vpadalq_u32(accum2, vpaddlq_u16(vpaddlq_u8(vcntq_u8(vec))));
+               buf += sizeof(uint8x16_t);
+
+               vec = vandq_u8(vld1q_u8((const uint8 *) buf), maskv);
+               accum3 = vpadalq_u32(accum3, vpaddlq_u16(vpaddlq_u8(vcntq_u8(vec))));
+               buf += sizeof(uint8x16_t);
+
+               vec = vandq_u8(vld1q_u8((const uint8 *) buf), maskv);
+               accum4 = vpadalq_u32(accum4, vpaddlq_u16(vpaddlq_u8(vcntq_u8(vec))));
+               buf += sizeof(uint8x16_t);
+       }
+
+       /*
+        * If enough data remains, do another iteration on a block of two
+        * registers.
+        */
+       bytes_per_iteration = 2 * sizeof(uint8x16_t);
+       if (bytes >= bytes_per_iteration)
+       {
+               vec = vandq_u8(vld1q_u8((const uint8 *) buf), maskv);
+               accum1 = vpadalq_u32(accum1, vpaddlq_u16(vpaddlq_u8(vcntq_u8(vec))));
+               buf += sizeof(uint8x16_t);
+
+               vec = vandq_u8(vld1q_u8((const uint8 *) buf), maskv);
+               accum2 = vpadalq_u32(accum2, vpaddlq_u16(vpaddlq_u8(vcntq_u8(vec))));
+               buf += sizeof(uint8x16_t);
+
+               bytes -= bytes_per_iteration;
+       }
+
+       /*
+        * Add the accumulators.
+        */
+       popcnt += vaddvq_u64(vaddq_u64(accum1, accum2));
+       popcnt += vaddvq_u64(vaddq_u64(accum3, accum4));
+
+       /*
+        * Process remining 8-byte blocks.
+        */
+       for (; bytes >= sizeof(uint64); bytes -= sizeof(uint64))
+       {
+               popcnt += pg_popcount64(*((uint64 *) buf) & mask64);
+               buf += sizeof(uint64);
+       }
+
+       /*
+        * Process any remaining data byte-by-byte.
+        */
+       while (bytes--)
+               popcnt += pg_number_of_ones[(unsigned char) *buf++ & mask];
+
+       return popcnt;
+}
+
+#endif                                                 /* POPCNT_AARCH64 */