]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Add optimized functions for linear search within byte arrays
authorJohn Naylor <john.naylor@postgresql.org>
Sun, 21 Aug 2022 04:14:01 +0000 (21:14 -0700)
committerJohn Naylor <john.naylor@postgresql.org>
Fri, 26 Aug 2022 07:03:39 +0000 (14:03 +0700)
In similar vein to b6ef167564, add pg_lfind8() and pg_lfind8_le()
to search for bytes equal or less-than-or-equal to a given byte,
respectively. To abstract away platform details, add helper functions
and typedefs to simd.h.

John Naylor and Nathan Bossart, per suggestion from Andres Freund

Discussion: https://www.postgresql.org/message-id/CAFBsxsGzaaGLF%3DNuq61iRXTyspbO9rOjhSqFN%3DV6ozzmta5mXg%40mail.gmail.com

src/include/port/pg_lfind.h
src/include/port/simd.h
src/test/modules/test_lfind/expected/test_lfind.out
src/test/modules/test_lfind/sql/test_lfind.sql
src/test/modules/test_lfind/test_lfind--1.0.sql
src/test/modules/test_lfind/test_lfind.c

index fb125977b2eb06232310a469d971b43773220ea0..a4e13dffec0f6aeb63c0ec43b62880fba030237c 100644 (file)
@@ -1,7 +1,8 @@
 /*-------------------------------------------------------------------------
  *
  * pg_lfind.h
- *       Optimized linear search routines.
+ *       Optimized linear search routines using SIMD intrinsics where
+ *       available.
  *
  * Copyright (c) 2022, PostgreSQL Global Development Group
  *
 
 #include "port/simd.h"
 
+/*
+ * pg_lfind8
+ *
+ * Return true if there is an element in 'base' that equals 'key', otherwise
+ * return false.
+ */
+static inline bool
+pg_lfind8(uint8 key, uint8 *base, uint32 nelem)
+{
+       uint32          i;
+
+       /* round down to multiple of vector length */
+       uint32          tail_idx = nelem & ~(sizeof(Vector8) - 1);
+       Vector8         chunk;
+
+       for (i = 0; i < tail_idx; i += sizeof(Vector8))
+       {
+               vector8_load(&chunk, &base[i]);
+               if (vector8_has(chunk, key))
+                       return true;
+       }
+
+       /* Process the remaining elements one at a time. */
+       for (; i < nelem; i++)
+       {
+               if (key == base[i])
+                       return true;
+       }
+
+       return false;
+}
+
+/*
+ * pg_lfind8_le
+ *
+ * Return true if there is an element in 'base' that is less than or equal to
+ * 'key', otherwise return false.
+ */
+static inline bool
+pg_lfind8_le(uint8 key, uint8 *base, uint32 nelem)
+{
+       uint32          i;
+
+       /* round down to multiple of vector length */
+       uint32          tail_idx = nelem & ~(sizeof(Vector8) - 1);
+       Vector8         chunk;
+
+       for (i = 0; i < tail_idx; i += sizeof(Vector8))
+       {
+               vector8_load(&chunk, &base[i]);
+               if (vector8_has_le(chunk, key))
+                       return true;
+       }
+
+       /* Process the remaining elements one at a time. */
+       for (; i < nelem; i++)
+       {
+               if (base[i] <= key)
+                       return true;
+       }
+
+       return false;
+}
+
 /*
  * pg_lfind32
  *
@@ -26,7 +91,6 @@ pg_lfind32(uint32 key, uint32 *base, uint32 nelem)
 {
        uint32          i = 0;
 
-       /* Use SIMD intrinsics where available. */
 #ifdef USE_SSE2
 
        /*
index a571e79f57471a954081ba96a21f78857da6fdd3..61e4362258b5efa6602058b26d376269acbb152b 100644 (file)
@@ -8,11 +8,17 @@
  *
  * src/include/port/simd.h
  *
+ * NOTES
+ * - VectorN in this file refers to a register where the element operands
+ * are N bits wide. The vector width is platform-specific, so users that care
+ * about that will need to inspect "sizeof(VectorN)".
+ *
  *-------------------------------------------------------------------------
  */
 #ifndef SIMD_H
 #define SIMD_H
 
+#if (defined(__x86_64__) || defined(_M_AMD64))
 /*
  * SSE2 instructions are part of the spec for the 64-bit x86 ISA. We assume
  * that compilers targeting this architecture understand SSE2 intrinsics.
  * will allow the use of intrinsics that haven't been enabled at compile
  * time.
  */
-#if (defined(__x86_64__) || defined(_M_AMD64))
 #include <emmintrin.h>
 #define USE_SSE2
+typedef __m128i Vector8;
+
+#else
+/*
+ * If no SIMD instructions are available, we can in some cases emulate vector
+ * operations using bitwise operations on unsigned integers.
+ */
+#define USE_NO_SIMD
+typedef uint64 Vector8;
+#endif
+
+
+/* load/store operations */
+static inline void vector8_load(Vector8 *v, const uint8 *s);
+
+/* assignment operations */
+static inline Vector8 vector8_broadcast(const uint8 c);
+
+/* element-wise comparisons to a scalar */
+static inline bool vector8_has(const Vector8 v, const uint8 c);
+static inline bool vector8_has_zero(const Vector8 v);
+static inline bool vector8_has_le(const Vector8 v, const uint8 c);
+
+
+/*
+ * Load a chunk of memory into the given vector.
+ */
+static inline void
+vector8_load(Vector8 *v, const uint8 *s)
+{
+#if defined(USE_SSE2)
+       *v = _mm_loadu_si128((const __m128i *) s);
+#else
+       memcpy(v, s, sizeof(Vector8));
 #endif
+}
+
+
+/*
+ * Create a vector with all elements set to the same value.
+ */
+static inline Vector8
+vector8_broadcast(const uint8 c)
+{
+#if defined(USE_SSE2)
+       return _mm_set1_epi8(c);
+#else
+       return ~UINT64CONST(0) / 0xFF * c;
+#endif
+}
+
+/*
+ * Return true if any elements in the vector are equal to the given scalar.
+ */
+static inline bool
+vector8_has(const Vector8 v, const uint8 c)
+{
+       bool            result;
+
+       /* pre-compute the result for assert checking */
+#ifdef USE_ASSERT_CHECKING
+       bool            assert_result = false;
+
+       for (int i = 0; i < sizeof(Vector8); i++)
+       {
+               if (((const uint8 *) &v)[i] == c)
+               {
+                       assert_result = true;
+                       break;
+               }
+       }
+#endif                                                 /* USE_ASSERT_CHECKING */
+
+#if defined(USE_NO_SIMD)
+       /* any bytes in v equal to c will evaluate to zero via XOR */
+       result = vector8_has_zero(v ^ vector8_broadcast(c));
+#elif defined(USE_SSE2)
+       result = _mm_movemask_epi8(_mm_cmpeq_epi8(v, vector8_broadcast(c)));
+#endif
+
+       Assert(assert_result == result);
+       return result;
+}
+
+/*
+ * Convenience function equivalent to vector8_has(v, 0)
+ */
+static inline bool
+vector8_has_zero(const Vector8 v)
+{
+#if defined(USE_NO_SIMD)
+       /*
+        * We cannot call vector8_has() here, because that would lead to a circular
+        * definition.
+        */
+       return vector8_has_le(v, 0);
+#elif defined(USE_SSE2)
+       return vector8_has(v, 0);
+#endif
+}
+
+/*
+ * Return true if any elements in the vector are less than or equal to the
+ * given scalar.
+ */
+static inline bool
+vector8_has_le(const Vector8 v, const uint8 c)
+{
+       bool            result = false;
+#if defined(USE_SSE2)
+       __m128i         sub;
+#endif
+
+       /* pre-compute the result for assert checking */
+#ifdef USE_ASSERT_CHECKING
+       bool            assert_result = false;
+
+       for (int i = 0; i < sizeof(Vector8); i++)
+       {
+               if (((const uint8 *) &v)[i] <= c)
+               {
+                       assert_result = true;
+                       break;
+               }
+       }
+#endif                                                 /* USE_ASSERT_CHECKING */
+
+#if defined(USE_NO_SIMD)
+
+       /*
+        * To find bytes <= c, we can use bitwise operations to find bytes < c+1,
+        * but it only works if c+1 <= 128 and if the highest bit in v is not set.
+        * Adapted from
+        * https://graphics.stanford.edu/~seander/bithacks.html#HasLessInWord
+        */
+       if ((int64) v >= 0 && c < 0x80)
+               result = (v - vector8_broadcast(c + 1)) & ~v & vector8_broadcast(0x80);
+       else
+       {
+               /* one byte at a time */
+               for (int i = 0; i < sizeof(Vector8); i++)
+               {
+                       if (((const uint8 *) &v)[i] <= c)
+                       {
+                               result = true;
+                               break;
+                       }
+               }
+       }
+#elif defined(USE_SSE2)
+
+       /*
+        * Use saturating subtraction to find bytes <= c, which will present as
+        * NUL bytes in 'sub'.
+        */
+       sub = _mm_subs_epu8(v, vector8_broadcast(c));
+       result = vector8_has_zero(sub);
+#endif
+
+       Assert(assert_result == result);
+       return result;
+}
 
 #endif                                                 /* SIMD_H */
index 222c8fd7fff736d83014d8e25c859ff7243f2ca6..1d4b14e70321556ff974ada777b207d77c12abb1 100644 (file)
@@ -4,9 +4,21 @@ CREATE EXTENSION test_lfind;
 -- the operations complete without crashing or hanging and that none of their
 -- internal sanity tests fail.
 --
-SELECT test_lfind();
- test_lfind 
-------------
+SELECT test_lfind8();
+ test_lfind8 
+-------------
+(1 row)
+
+SELECT test_lfind8_le();
+ test_lfind8_le 
+----------------
+(1 row)
+
+SELECT test_lfind32();
+ test_lfind32 
+--------------
  
 (1 row)
 
index 899f1dd49bf82e8b600cc84dbbaa0253940a3592..766c640831f49905810db38fca6626bbe8656ee2 100644 (file)
@@ -5,4 +5,6 @@ CREATE EXTENSION test_lfind;
 -- the operations complete without crashing or hanging and that none of their
 -- internal sanity tests fail.
 --
-SELECT test_lfind();
+SELECT test_lfind8();
+SELECT test_lfind8_le();
+SELECT test_lfind32();
index d82ab0567effa25680d0e6a12bbb5ad7909369d5..81801926ae83cf32ae0e78f298f304e2ceda85da 100644 (file)
@@ -3,6 +3,14 @@
 -- complain if script is sourced in psql, rather than via CREATE EXTENSION
 \echo Use "CREATE EXTENSION test_lfind" to load this file. \quit
 
-CREATE FUNCTION test_lfind()
+CREATE FUNCTION test_lfind32()
+       RETURNS pg_catalog.void
+       AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION test_lfind8()
+       RETURNS pg_catalog.void
+       AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION test_lfind8_le()
        RETURNS pg_catalog.void
        AS 'MODULE_PATHNAME' LANGUAGE C;
index a000746fb830429752327d3db948d24d255597c6..82673d54c6ede046a1387305f8fae5d692896a28 100644 (file)
 #include "fmgr.h"
 #include "port/pg_lfind.h"
 
+/*
+ * Convenience macros for testing both vector and scalar operations. The 2x
+ * factor is to make sure iteration works
+ */
+#define LEN_NO_TAIL(vectortype) (2 * sizeof(vectortype))
+#define LEN_WITH_TAIL(vectortype) (LEN_NO_TAIL(vectortype) + 3)
+
 PG_MODULE_MAGIC;
 
-PG_FUNCTION_INFO_V1(test_lfind);
+/* workhorse for test_lfind8 */
+static void
+test_lfind8_internal(uint8 key)
+{
+       uint8           charbuf[LEN_WITH_TAIL(Vector8)];
+       const int       len_no_tail = LEN_NO_TAIL(Vector8);
+       const int       len_with_tail = LEN_WITH_TAIL(Vector8);
+
+       memset(charbuf, 0xFF, len_with_tail);
+       /* search tail to test one-byte-at-a-time path */
+       charbuf[len_with_tail - 1] = key;
+       if (key > 0x00 && pg_lfind8(key - 1, charbuf, len_with_tail))
+               elog(ERROR, "pg_lfind8() found nonexistent element '0x%x'", key - 1);
+       if (key < 0xFF && !pg_lfind8(key, charbuf, len_with_tail))
+               elog(ERROR, "pg_lfind8() did not find existing element '0x%x'", key);
+       if (key < 0xFE && pg_lfind8(key + 1, charbuf, len_with_tail))
+               elog(ERROR, "pg_lfind8() found nonexistent element '0x%x'", key + 1);
+
+       memset(charbuf, 0xFF, len_with_tail);
+       /* search with vector operations */
+       charbuf[len_no_tail - 1] = key;
+       if (key > 0x00 && pg_lfind8(key - 1, charbuf, len_no_tail))
+               elog(ERROR, "pg_lfind8() found nonexistent element '0x%x'", key - 1);
+       if (key < 0xFF && !pg_lfind8(key, charbuf, len_no_tail))
+               elog(ERROR, "pg_lfind8() did not find existing element '0x%x'", key);
+       if (key < 0xFE && pg_lfind8(key + 1, charbuf, len_no_tail))
+               elog(ERROR, "pg_lfind8() found nonexistent element '0x%x'", key + 1);
+}
+
+PG_FUNCTION_INFO_V1(test_lfind8);
+Datum
+test_lfind8(PG_FUNCTION_ARGS)
+{
+       test_lfind8_internal(0);
+       test_lfind8_internal(1);
+       test_lfind8_internal(0x7F);
+       test_lfind8_internal(0x80);
+       test_lfind8_internal(0x81);
+       test_lfind8_internal(0xFD);
+       test_lfind8_internal(0xFE);
+       test_lfind8_internal(0xFF);
+
+       PG_RETURN_VOID();
+}
+
+/* workhorse for test_lfind8_le */
+static void
+test_lfind8_le_internal(uint8 key)
+{
+       uint8           charbuf[LEN_WITH_TAIL(Vector8)];
+       const int       len_no_tail = LEN_NO_TAIL(Vector8);
+       const int       len_with_tail = LEN_WITH_TAIL(Vector8);
+
+       memset(charbuf, 0xFF, len_with_tail);
+       /* search tail to test one-byte-at-a-time path */
+       charbuf[len_with_tail - 1] = key;
+       if (key > 0x00 && pg_lfind8_le(key - 1, charbuf, len_with_tail))
+               elog(ERROR, "pg_lfind8_le() found nonexistent element <= '0x%x'", key - 1);
+       if (key < 0xFF && !pg_lfind8_le(key, charbuf, len_with_tail))
+               elog(ERROR, "pg_lfind8_le() did not find existing element <= '0x%x'", key);
+       if (key < 0xFE && !pg_lfind8_le(key + 1, charbuf, len_with_tail))
+               elog(ERROR, "pg_lfind8_le() did not find existing element <= '0x%x'", key + 1);
+
+       memset(charbuf, 0xFF, len_with_tail);
+       /* search with vector operations */
+       charbuf[len_no_tail - 1] = key;
+       if (key > 0x00 && pg_lfind8_le(key - 1, charbuf, len_no_tail))
+               elog(ERROR, "pg_lfind8_le() found nonexistent element <= '0x%x'", key - 1);
+       if (key < 0xFF && !pg_lfind8_le(key, charbuf, len_no_tail))
+               elog(ERROR, "pg_lfind8_le() did not find existing element <= '0x%x'", key);
+       if (key < 0xFE && !pg_lfind8_le(key + 1, charbuf, len_no_tail))
+               elog(ERROR, "pg_lfind8_le() did not find existing element <= '0x%x'", key + 1);
+}
+
+PG_FUNCTION_INFO_V1(test_lfind8_le);
+Datum
+test_lfind8_le(PG_FUNCTION_ARGS)
+{
+       test_lfind8_le_internal(0);
+       test_lfind8_le_internal(1);
+       test_lfind8_le_internal(0x7F);
+       test_lfind8_le_internal(0x80);
+       test_lfind8_le_internal(0x81);
+       test_lfind8_le_internal(0xFD);
+       test_lfind8_le_internal(0xFE);
+       test_lfind8_le_internal(0xFF);
+
+       PG_RETURN_VOID();
+}
 
+PG_FUNCTION_INFO_V1(test_lfind32);
 Datum
-test_lfind(PG_FUNCTION_ARGS)
+test_lfind32(PG_FUNCTION_ARGS)
 {
 #define TEST_ARRAY_SIZE 135
        uint32          test_array[TEST_ARRAY_SIZE] = {0};