]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Support text position search functions with nondeterministic collations
authorPeter Eisentraut <peter@eisentraut.org>
Fri, 21 Feb 2025 11:21:17 +0000 (12:21 +0100)
committerPeter Eisentraut <peter@eisentraut.org>
Fri, 21 Feb 2025 11:21:17 +0000 (12:21 +0100)
This allows using text position search functions with nondeterministic
collations.  These functions are

- position, strpos
- replace
- split_part
- string_to_array
- string_to_table

which all use common internal infrastructure.

There was previously no internal implementation of this, so it was met
with a not-supported error.  This adds the internal implementation and
removes the error.

Unlike with deterministic collations, the search cannot use any
byte-by-byte optimized techniques but has to go substring by
substring.  We also need to consider that the found match could have a
different length than the needle and that there could be substrings of
different length matching at a position.  In most cases, we need to
find the longest such substring (greedy semantics), but this can be
configured by each caller.

Reviewed-by: Euler Taveira <euler@eulerto.com>
Discussion: https://www.postgresql.org/message-id/flat/582b2613-0900-48ca-8b0d-340c06f4d400@eisentraut.org

src/backend/utils/adt/varlena.c
src/test/regress/expected/collate.icu.utf8.out
src/test/regress/sql/collate.icu.utf8.sql

index 34796f2e27ce4e23ba215ffbf15a8a70c5dc85d1..e455657170300522f5da22d02de326f6cd30ff97 100644 (file)
@@ -54,7 +54,9 @@ typedef struct varlena VarString;
  */
 typedef struct
 {
+       pg_locale_t locale;                     /* collation used for substring matching */
        bool            is_multibyte_char_in_char;      /* need to check char boundaries? */
+       bool            greedy;                 /* find longest possible substring? */
 
        char       *str1;                       /* haystack string */
        char       *str2;                       /* needle string */
@@ -65,7 +67,13 @@ typedef struct
        int                     skiptablemask;  /* mask for ANDing with skiptable subscripts */
        int                     skiptable[256]; /* skip distance for given mismatched char */
 
+       /*
+        * Note that with nondeterministic collations, the length of the last
+        * match is not necessarily equal to the length of the "needle" passed in.
+        */
        char       *last_match;         /* pointer to last match in 'str1' */
+       int                     last_match_len; /* length of last match */
+       int                     last_match_len_tmp; /* same but for internal use */
 
        /*
         * Sometimes we need to convert the byte position of a match to a
@@ -1178,15 +1186,21 @@ text_position(text *t1, text *t2, Oid collid)
        TextPositionState state;
        int                     result;
 
+       check_collation_set(collid);
+
        /* Empty needle always matches at position 1 */
        if (VARSIZE_ANY_EXHDR(t2) < 1)
                return 1;
 
        /* Otherwise, can't match if haystack is shorter than needle */
-       if (VARSIZE_ANY_EXHDR(t1) < VARSIZE_ANY_EXHDR(t2))
+       if (VARSIZE_ANY_EXHDR(t1) < VARSIZE_ANY_EXHDR(t2) &&
+               pg_newlocale_from_collation(collid)->deterministic)
                return 0;
 
        text_position_setup(t1, t2, collid, &state);
+       /* don't need greedy mode here */
+       state.greedy = false;
+
        if (!text_position_next(&state))
                result = 0;
        else
@@ -1217,18 +1231,17 @@ text_position_setup(text *t1, text *t2, Oid collid, TextPositionState *state)
 {
        int                     len1 = VARSIZE_ANY_EXHDR(t1);
        int                     len2 = VARSIZE_ANY_EXHDR(t2);
-       pg_locale_t mylocale;
 
        check_collation_set(collid);
 
-       mylocale = pg_newlocale_from_collation(collid);
+       state->locale = pg_newlocale_from_collation(collid);
 
-       if (!mylocale->deterministic)
-               ereport(ERROR,
-                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                errmsg("nondeterministic collations are not supported for substring searches")));
+       /*
+        * Most callers need greedy mode, but some might want to unset this to
+        * optimize.
+        */
+       state->greedy = true;
 
-       Assert(len1 > 0);
        Assert(len2 > 0);
 
        /*
@@ -1264,8 +1277,11 @@ text_position_setup(text *t1, text *t2, Oid collid, TextPositionState *state)
         * point in wasting cycles initializing the table.  We also choose not to
         * use B-M-H for needles of length 1, since the skip table can't possibly
         * save anything in that case.
+        *
+        * (With nondeterministic collations, the search is already
+        * multibyte-aware, so we don't need this.)
         */
-       if (len1 >= len2 && len2 > 1)
+       if (len1 >= len2 && len2 > 1 && state->locale->deterministic)
        {
                int                     searchlength = len1 - len2;
                int                     skiptablemask;
@@ -1343,7 +1359,7 @@ text_position_next(TextPositionState *state)
 
        /* Start from the point right after the previous match. */
        if (state->last_match)
-               start_ptr = state->last_match + needle_len;
+               start_ptr = state->last_match + state->last_match_len;
        else
                start_ptr = state->str1;
 
@@ -1359,7 +1375,7 @@ retry:
         * multi-byte character, we need to verify that the match was at a
         * character boundary, not in the middle of a multi-byte character.
         */
-       if (state->is_multibyte_char_in_char)
+       if (state->is_multibyte_char_in_char && state->locale->deterministic)
        {
                /* Walk one character at a time, until we reach the match. */
 
@@ -1387,6 +1403,7 @@ retry:
        }
 
        state->last_match = matchptr;
+       state->last_match_len = state->last_match_len_tmp;
        return true;
 }
 
@@ -1408,7 +1425,62 @@ text_position_next_internal(char *start_ptr, TextPositionState *state)
 
        Assert(start_ptr >= haystack && start_ptr <= haystack_end);
 
-       if (needle_len == 1)
+       state->last_match_len_tmp = needle_len;
+
+       if (!state->locale->deterministic)
+       {
+               /*
+                * With a nondeterministic collation, we have to use an unoptimized
+                * route.  We walk through the haystack and see if at each position
+                * there is a substring of the remaining string that is equal to the
+                * needle under the given collation.
+                *
+                * Note, the found substring could have a different length than the
+                * needle, including being empty.  Callers that want to skip over the
+                * found string need to read the length of the found substring from
+                * last_match_len rather than just using the length of their needle.
+                *
+                * Most callers will require "greedy" semantics, meaning that we need
+                * to find the longest such substring, not the shortest.  For callers
+                * that don't need greedy semantics, we can finish on the first match.
+                */
+               const char *result_hptr = NULL;
+
+               hptr = start_ptr;
+               while (hptr < haystack_end)
+               {
+                       /*
+                        * First check the common case that there is a match in the
+                        * haystack of exactly the length of the needle.
+                        */
+                       if (!state->greedy &&
+                               haystack_end - hptr >= needle_len &&
+                               pg_strncoll(hptr, needle_len, needle, needle_len, state->locale) == 0)
+                               return (char *) hptr;
+
+                       /*
+                        * Else check if any of the possible substrings starting at hptr
+                        * are equal to the needle.
+                        */
+                       for (const char *test_end = hptr; test_end < haystack_end; test_end += pg_mblen(test_end))
+                       {
+                               if (pg_strncoll(hptr, (test_end - hptr), needle, needle_len, state->locale) == 0)
+                               {
+                                       state->last_match_len_tmp = (test_end - hptr);
+                                       result_hptr = hptr;
+                                       if (!state->greedy)
+                                               break;
+                               }
+                       }
+                       if (result_hptr)
+                               break;
+
+                       hptr += pg_mblen(hptr);
+               }
+
+               return (char *) result_hptr;
+       }
+       else if (needle_len == 1)
        {
                /* No point in using B-M-H for a one-character needle */
                char            nchar = *needle;
@@ -4055,7 +4127,7 @@ replace_text(PG_FUNCTION_ARGS)
 
                appendStringInfoText(&str, to_sub_text);
 
-               start_ptr = curr_ptr + from_sub_text_len;
+               start_ptr = curr_ptr + state.last_match_len;
 
                found = text_position_next(&state);
                if (found)
@@ -4445,7 +4517,7 @@ split_part(PG_FUNCTION_ARGS)
                /* special case of last field does not require an extra pass */
                if (fldnum == -1)
                {
-                       start_ptr = text_position_get_match_ptr(&state) + fldsep_len;
+                       start_ptr = text_position_get_match_ptr(&state) + state.last_match_len;
                        end_ptr = VARDATA_ANY(inputstring) + inputstring_len;
                        text_position_cleanup(&state);
                        PG_RETURN_TEXT_P(cstring_to_text_with_len(start_ptr,
@@ -4475,7 +4547,7 @@ split_part(PG_FUNCTION_ARGS)
        while (found && --fldnum > 0)
        {
                /* identify bounds of next field */
-               start_ptr = end_ptr + fldsep_len;
+               start_ptr = end_ptr + state.last_match_len;
                found = text_position_next(&state);
                if (found)
                        end_ptr = text_position_get_match_ptr(&state);
@@ -4691,7 +4763,7 @@ split_text(FunctionCallInfo fcinfo, SplitTextOutputData *tstate)
                        if (!found)
                                break;
 
-                       start_ptr = end_ptr + fldsep_len;
+                       start_ptr = end_ptr + state.last_match_len;
                }
 
                text_position_cleanup(&state);
index 96a134d156169eb5200617905f581aca801ef838..aee4755c08349e3a38412f54a43c33aa5a7b4e4e 100644 (file)
@@ -1326,39 +1326,109 @@ SELECT 'abc' LIKE 'a\bc' COLLATE ctest_nondet;
 
 CREATE TABLE test6 (a int, b text);
 -- same string in different normal forms
-INSERT INTO test6 VALUES (1, U&'\00E4bc');
-INSERT INTO test6 VALUES (2, U&'\0061\0308bc');
+INSERT INTO test6 VALUES (1, U&'zy\00E4bc');
+INSERT INTO test6 VALUES (2, U&'zy\0061\0308bc');
 SELECT * FROM test6;
- a |  b  
----+-----
- 1 | äbc
- 2 | äbc
+ a |   b   
+---+-------
+ 1 | zyäbc
+ 2 | zyäbc
 (2 rows)
 
-SELECT * FROM test6 WHERE b = 'äbc' COLLATE ctest_det;
- a |  b  
----+-----
- 1 | äbc
+SELECT * FROM test6 WHERE b = 'zyäbc' COLLATE ctest_det;
+ a |   b   
+---+-------
+ 1 | zyäbc
 (1 row)
 
-SELECT * FROM test6 WHERE b = 'äbc' COLLATE ctest_nondet;
- a |  b  
----+-----
- 1 | äbc
- 2 | äbc
+SELECT * FROM test6 WHERE b = 'zyäbc' COLLATE ctest_nondet;
+ a |   b   
+---+-------
+ 1 | zyäbc
+ 2 | zyäbc
 (2 rows)
 
-SELECT * FROM test6 WHERE b LIKE 'äbc' COLLATE ctest_det;
- a |  b  
----+-----
- 1 | äbc
+SELECT strpos(b COLLATE ctest_det, 'bc') FROM test6;
+ strpos 
+--------
+      4
+      5
+(2 rows)
+
+SELECT strpos(b COLLATE ctest_nondet, 'bc') FROM test6;
+ strpos 
+--------
+      4
+      5
+(2 rows)
+
+SELECT replace(b COLLATE ctest_det, U&'\00E4b', 'X') FROM test6;
+ replace 
+---------
+ zyXc
+ zyäbc
+(2 rows)
+
+SELECT replace(b COLLATE ctest_nondet, U&'\00E4b', 'X') FROM test6;
+ replace 
+---------
+ zyXc
+ zyXc
+(2 rows)
+
+SELECT a, split_part(b COLLATE ctest_det, U&'\00E4b', 2) FROM test6;
+ a | split_part 
+---+------------
+ 1 | c
+ 2 | 
+(2 rows)
+
+SELECT a, split_part(b COLLATE ctest_nondet, U&'\00E4b', 2) FROM test6;
+ a | split_part 
+---+------------
+ 1 | c
+ 2 | c
+(2 rows)
+
+SELECT a, split_part(b COLLATE ctest_det, U&'\00E4b', -1) FROM test6;
+ a | split_part 
+---+------------
+ 1 | c
+ 2 | zyäbc
+(2 rows)
+
+SELECT a, split_part(b COLLATE ctest_nondet, U&'\00E4b', -1) FROM test6;
+ a | split_part 
+---+------------
+ 1 | c
+ 2 | c
+(2 rows)
+
+SELECT a, string_to_array(b COLLATE ctest_det, U&'\00E4b') FROM test6;
+ a | string_to_array 
+---+-----------------
+ 1 | {zy,c}
+ 2 | {zyäbc}
+(2 rows)
+
+SELECT a, string_to_array(b COLLATE ctest_nondet, U&'\00E4b') FROM test6;
+ a | string_to_array 
+---+-----------------
+ 1 | {zy,c}
+ 2 | {zy,c}
+(2 rows)
+
+SELECT * FROM test6 WHERE b LIKE 'zyäbc' COLLATE ctest_det;
+ a |   b   
+---+-------
+ 1 | zyäbc
 (1 row)
 
-SELECT * FROM test6 WHERE b LIKE 'äbc' COLLATE ctest_nondet;
- a |  b  
----+-----
- 1 | äbc
- 2 | äbc
+SELECT * FROM test6 WHERE b LIKE 'zyäbc' COLLATE ctest_nondet;
+ a |   b   
+---+-------
+ 1 | zyäbc
+ 2 | zyäbc
 (2 rows)
 
 -- same with arrays
@@ -1669,7 +1739,11 @@ CREATE UNIQUE INDEX ON test3ci (x);  -- error
 ERROR:  could not create unique index "test3ci_x_idx"
 DETAIL:  Key (x)=(abc) is duplicated.
 SELECT string_to_array('ABC,DEF,GHI' COLLATE case_insensitive, ',', 'abc');
-ERROR:  nondeterministic collations are not supported for substring searches
+ string_to_array 
+-----------------
+ {NULL,DEF,GHI}
+(1 row)
+
 SELECT string_to_array('ABCDEFGHI' COLLATE case_insensitive, NULL, 'b');
     string_to_array     
 ------------------------
@@ -1792,7 +1866,11 @@ CREATE UNIQUE INDEX ON test3bpci (x);  -- error
 ERROR:  could not create unique index "test3bpci_x_idx"
 DETAIL:  Key (x)=(abc) is duplicated.
 SELECT string_to_array('ABC,DEF,GHI'::char(11) COLLATE case_insensitive, ',', 'abc');
-ERROR:  nondeterministic collations are not supported for substring searches
+ string_to_array 
+-----------------
+ {NULL,DEF,GHI}
+(1 row)
+
 SELECT string_to_array('ABCDEFGHI'::char(9) COLLATE case_insensitive, NULL, 'b');
     string_to_array     
 ------------------------
@@ -1921,6 +1999,30 @@ SELECT * FROM test4 WHERE b = 'Cote' COLLATE case_insensitive;
  1 | cote
 (1 row)
 
+CREATE TABLE test4nfd (a int, b text);
+INSERT INTO test4nfd VALUES (1, 'cote'), (2, 'côte'), (3, 'coté'), (4, 'côté');
+UPDATE test4nfd SET b = normalize(b, nfd);
+-- This shows why replace should be greedy.  Otherwise, in the NFD
+-- case, the match would stop before the decomposed accents, which
+-- would leave the accents in the results.
+SELECT a, b, replace(b COLLATE ignore_accents, 'co', 'ma') FROM test4;
+ a |  b   | replace 
+---+------+---------
+ 1 | cote | mate
+ 2 | côte | mate
+ 3 | coté | maté
+ 4 | côté | maté
+(4 rows)
+
+SELECT a, b, replace(b COLLATE ignore_accents, 'co', 'ma') FROM test4nfd;
+ a |  b   | replace 
+---+------+---------
+ 1 | cote | mate
+ 2 | côte | mate
+ 3 | coté | maté
+ 4 | côté | maté
+(4 rows)
+
 -- This is a tricky one.  A naive implementation would first test
 -- \00E4 matches \0061, which is true under ignore_accents, but then
 -- the rest of the string won't match anymore.  Therefore, the
index eea50e34c2d2f0d32443fa0e6d374b844e79ced0..38ebcd99508422a20d44480abffb389ca5908e27 100644 (file)
@@ -527,14 +527,28 @@ SELECT 'abc' LIKE 'a\bc' COLLATE ctest_nondet;
 
 CREATE TABLE test6 (a int, b text);
 -- same string in different normal forms
-INSERT INTO test6 VALUES (1, U&'\00E4bc');
-INSERT INTO test6 VALUES (2, U&'\0061\0308bc');
+INSERT INTO test6 VALUES (1, U&'zy\00E4bc');
+INSERT INTO test6 VALUES (2, U&'zy\0061\0308bc');
 SELECT * FROM test6;
-SELECT * FROM test6 WHERE b = 'äbc' COLLATE ctest_det;
-SELECT * FROM test6 WHERE b = 'äbc' COLLATE ctest_nondet;
+SELECT * FROM test6 WHERE b = 'zyäbc' COLLATE ctest_det;
+SELECT * FROM test6 WHERE b = 'zyäbc' COLLATE ctest_nondet;
 
-SELECT * FROM test6 WHERE b LIKE 'äbc' COLLATE ctest_det;
-SELECT * FROM test6 WHERE b LIKE 'äbc' COLLATE ctest_nondet;
+SELECT strpos(b COLLATE ctest_det, 'bc') FROM test6;
+SELECT strpos(b COLLATE ctest_nondet, 'bc') FROM test6;
+
+SELECT replace(b COLLATE ctest_det, U&'\00E4b', 'X') FROM test6;
+SELECT replace(b COLLATE ctest_nondet, U&'\00E4b', 'X') FROM test6;
+
+SELECT a, split_part(b COLLATE ctest_det, U&'\00E4b', 2) FROM test6;
+SELECT a, split_part(b COLLATE ctest_nondet, U&'\00E4b', 2) FROM test6;
+SELECT a, split_part(b COLLATE ctest_det, U&'\00E4b', -1) FROM test6;
+SELECT a, split_part(b COLLATE ctest_nondet, U&'\00E4b', -1) FROM test6;
+
+SELECT a, string_to_array(b COLLATE ctest_det, U&'\00E4b') FROM test6;
+SELECT a, string_to_array(b COLLATE ctest_nondet, U&'\00E4b') FROM test6;
+
+SELECT * FROM test6 WHERE b LIKE 'zyäbc' COLLATE ctest_det;
+SELECT * FROM test6 WHERE b LIKE 'zyäbc' COLLATE ctest_nondet;
 
 -- same with arrays
 CREATE TABLE test6a (a int, b text[]);
@@ -701,6 +715,16 @@ SELECT * FROM test4 WHERE b = 'cote' COLLATE ignore_accents;
 SELECT * FROM test4 WHERE b = 'Cote' COLLATE ignore_accents;  -- still case-sensitive
 SELECT * FROM test4 WHERE b = 'Cote' COLLATE case_insensitive;
 
+CREATE TABLE test4nfd (a int, b text);
+INSERT INTO test4nfd VALUES (1, 'cote'), (2, 'côte'), (3, 'coté'), (4, 'côté');
+UPDATE test4nfd SET b = normalize(b, nfd);
+
+-- This shows why replace should be greedy.  Otherwise, in the NFD
+-- case, the match would stop before the decomposed accents, which
+-- would leave the accents in the results.
+SELECT a, b, replace(b COLLATE ignore_accents, 'co', 'ma') FROM test4;
+SELECT a, b, replace(b COLLATE ignore_accents, 'co', 'ma') FROM test4nfd;
+
 -- This is a tricky one.  A naive implementation would first test
 -- \00E4 matches \0061, which is true under ignore_accents, but then
 -- the rest of the string won't match anymore.  Therefore, the