From: Amit Kapila Date: Fri, 19 Sep 2025 05:38:40 +0000 (+0000) Subject: Add optional pid parameter to pg_replication_origin_session_setup(). X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=5b148706c5c8ffffe5662fe569a0f0bcef2351d9;p=thirdparty%2Fpostgresql.git Add optional pid parameter to pg_replication_origin_session_setup(). Commit 216a784829c introduced parallel apply workers, allowing multiple processes to share a replication origin. To support this, replorigin_session_setup() was extended to accept a pid argument identifying the process using the origin. This commit exposes that capability through the SQL interface function pg_replication_origin_session_setup() by adding an optional pid parameter. This enables multiple processes to coordinate replication using the same origin when using SQL-level replication functions. This change allows the non-builtin logical replication solutions to implement parallel apply for large transactions. Additionally, an existing internal error was made user-facing, as it can now be triggered via the exposed SQL API. 
Author: Doruk Yilmaz Author: Hayato Kuroda Reviewed-by: Amit Kapila Reviewed-by: Euler Taveira Discussion: https://postgr.es/m/CAMPB6wfe4zLjJL8jiZV5kjjpwBM2=rTRme0UCL7Ra4L8MTVdOg@mail.gmail.com Discussion: https://postgr.es/m/CAE2gYzyTSNvHY1+iWUwykaLETSuAZsCWyryokjP6rG46ZvRgQA@mail.gmail.com --- diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 02e961f4d31..acbcaed2feb 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ twophase_snapshot slot_creation_error catalog_change_snapshot \ - skip_snapshot_restore invalidation_distribution + skip_snapshot_restore invalidation_distribution parallel_session_origin REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/parallel_session_origin.out b/contrib/test_decoding/expected/parallel_session_origin.out new file mode 100644 index 00000000000..e515b39f7ce --- /dev/null +++ b/contrib/test_decoding/expected/parallel_session_origin.out @@ -0,0 +1,79 @@ +Parsed test spec with 2 sessions + +starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset +step s0_setup: SELECT pg_replication_origin_session_setup('origin'); +pg_replication_origin_session_setup +----------------------------------- + +(1 row) + +step s0_is_setup: SELECT pg_replication_origin_session_is_setup(); +pg_replication_origin_session_is_setup +-------------------------------------- +t +(1 row) + +step s1_setup: + SELECT pg_replication_origin_session_setup('origin', pid) + FROM pg_stat_activity + WHERE application_name = 
'isolation/parallel_session_origin/s0'; + +pg_replication_origin_session_setup +----------------------------------- + +(1 row) + +step s1_is_setup: SELECT pg_replication_origin_session_is_setup(); +pg_replication_origin_session_is_setup +-------------------------------------- +t +(1 row) + +step s0_add_message: + SELECT 1 + FROM pg_logical_emit_message(true, 'prefix', 'message on s0'); + +?column? +-------- + 1 +(1 row) + +step s0_store_lsn: + INSERT INTO local_lsn_store + SELECT 0, local_lsn FROM pg_replication_origin_status; + +step s1_add_message: + SELECT 1 + FROM pg_logical_emit_message(true, 'prefix', 'message on s1'); + +?column? +-------- + 1 +(1 row) + +step s1_store_lsn: + INSERT INTO local_lsn_store + SELECT 1, local_lsn FROM pg_replication_origin_status; + +step s0_compare: + SELECT s0.lsn < s1.lsn + FROM local_lsn_store as s0, local_lsn_store as s1 + WHERE s0.session = 0 AND s1.session = 1; + +?column? +-------- +t +(1 row) + +step s0_reset: SELECT pg_replication_origin_session_reset(); +pg_replication_origin_session_reset +----------------------------------- + +(1 row) + +step s1_reset: SELECT pg_replication_origin_session_reset(); +pg_replication_origin_session_reset +----------------------------------- + +(1 row) + diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out index c85e1a01b23..29a9630c900 100644 --- a/contrib/test_decoding/expected/replorigin.out +++ b/contrib/test_decoding/expected/replorigin.out @@ -41,6 +41,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot'); SELECT pg_replication_origin_create('regress_test_decoding: regression_slot'); ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index" DETAIL: Key (roname)=(regress_test_decoding: regression_slot) already exists. 
+-- ensure inactive origin cannot be set as session one if pid is specified +SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1); +ERROR: cannot use PID -1 for inactive replication origin with ID 1 --ensure deletions work (once) SELECT pg_replication_origin_create('regress_test_decoding: temp'); pg_replication_origin_create diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build index 25f6b8a9082..99310555e6c 100644 --- a/contrib/test_decoding/meson.build +++ b/contrib/test_decoding/meson.build @@ -64,6 +64,7 @@ tests += { 'slot_creation_error', 'skip_snapshot_restore', 'invalidation_distribution', + 'parallel_session_origin', ], 'regress_args': [ '--temp-config', files('logical.conf'), diff --git a/contrib/test_decoding/specs/parallel_session_origin.spec b/contrib/test_decoding/specs/parallel_session_origin.spec new file mode 100644 index 00000000000..c0e5fda0723 --- /dev/null +++ b/contrib/test_decoding/specs/parallel_session_origin.spec @@ -0,0 +1,56 @@ +# Test parallel replication origin manipulations; ensure local_lsn can be +# updated by all attached sessions. 
+ +setup +{ + SELECT pg_replication_origin_create('origin'); + CREATE UNLOGGED TABLE local_lsn_store (session int, lsn pg_lsn); +} + +teardown +{ + SELECT pg_replication_origin_drop('origin'); + DROP TABLE local_lsn_store; +} + +session "s0" +setup { SET synchronous_commit = on; } +step "s0_setup" { SELECT pg_replication_origin_session_setup('origin'); } +step "s0_is_setup" { SELECT pg_replication_origin_session_is_setup(); } +step "s0_add_message" { + SELECT 1 + FROM pg_logical_emit_message(true, 'prefix', 'message on s0'); +} +step "s0_store_lsn" { + INSERT INTO local_lsn_store + SELECT 0, local_lsn FROM pg_replication_origin_status; +} +step "s0_compare" { + SELECT s0.lsn < s1.lsn + FROM local_lsn_store as s0, local_lsn_store as s1 + WHERE s0.session = 0 AND s1.session = 1; +} +step "s0_reset" { SELECT pg_replication_origin_session_reset(); } + +session "s1" +setup { SET synchronous_commit = on; } +step "s1_setup" { + SELECT pg_replication_origin_session_setup('origin', pid) + FROM pg_stat_activity + WHERE application_name = 'isolation/parallel_session_origin/s0'; +} +step "s1_is_setup" { SELECT pg_replication_origin_session_is_setup(); } +step "s1_add_message" { + SELECT 1 + FROM pg_logical_emit_message(true, 'prefix', 'message on s1'); +} +step "s1_store_lsn" { + INSERT INTO local_lsn_store + SELECT 1, local_lsn FROM pg_replication_origin_status; +} +step "s1_reset" { SELECT pg_replication_origin_session_reset(); } + +# First, s0 attaches to an origin and s1 attaches to the same one. Both sessions +# commit a transaction and store the local_lsn of the replication origin. +# Compare the LSNs and expect the later transaction (done by s1) to have the larger local_lsn. 
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset" diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql index e71ee02d050..17f2b888238 100644 --- a/contrib/test_decoding/sql/replorigin.sql +++ b/contrib/test_decoding/sql/replorigin.sql @@ -26,6 +26,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot'); -- ensure duplicate creations fail SELECT pg_replication_origin_create('regress_test_decoding: regression_slot'); +-- ensure inactive origin cannot be set as session one if pid is specified +SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1); + --ensure deletions work (once) SELECT pg_replication_origin_create('regress_test_decoding: temp'); SELECT pg_replication_origin_drop('regress_test_decoding: temp'); diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml index 57ff333159f..1b465bc8ba7 100644 --- a/doc/src/sgml/func/func-admin.sgml +++ b/doc/src/sgml/func/func-admin.sgml @@ -1315,7 +1315,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset pg_replication_origin_session_setup - pg_replication_origin_session_setup ( node_name text ) + pg_replication_origin_session_setup ( node_name text , pid integer DEFAULT 0 ) void @@ -1323,7 +1323,26 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset origin, allowing replay progress to be tracked. Can only be used if no origin is currently selected. Use pg_replication_origin_session_reset to undo. - + If multiple processes can safely use the same replication origin (for + example, parallel apply processes), the optional pid + parameter can be used to specify the process ID of the first process. 
+ The first process must provide pid equal to + 0 and the other processes that share the same + replication origin should provide the process ID of the first process. + + + + When multiple processes share the same replication origin, it is critical + to maintain commit order to prevent data inconsistency. While processes + may send operations out of order, they must commit transactions in the + correct sequence to ensure proper replication consistency. The recommended workflow + for each worker is: set up the replication origin session with the first process's PID, + apply changes within transactions, call pg_replication_origin_xact_setup + with the LSN and commit timestamp before committing, then commit the + transaction only if everything succeeded. + + + diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 566f308e443..2d946d6d9e9 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -650,6 +650,13 @@ LANGUAGE INTERNAL CALLED ON NULL INPUT VOLATILE PARALLEL SAFE AS 'pg_stat_reset_slru'; +CREATE OR REPLACE FUNCTION + pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0) +RETURNS void +LANGUAGE INTERNAL +STRICT VOLATILE PARALLEL UNSAFE +AS 'pg_replication_origin_session_setup'; + -- -- The default permissions for functions mean that anyone can execute them. 
-- A number of functions shouldn't be executable by just anyone, but rather @@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public; -REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public; +REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public; REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public; diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 87f10e50dcc..bcd5d9aad62 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -1167,6 +1167,14 @@ replorigin_session_setup(RepOriginId node, int acquired_by) curstate->roident, curstate->acquired_by))); } + else if (curstate->acquired_by != acquired_by) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d", + node, acquired_by))); + } + /* ok, found slot */ session_replication_state = curstate; break; @@ -1181,6 +1189,12 @@ replorigin_session_setup(RepOriginId node, int acquired_by) errhint("Increase \"max_active_replication_origins\" and try again."))); else if (session_replication_state == NULL) { + if (acquired_by) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot use PID %d for inactive replication origin with ID %d", + acquired_by, node))); + /* initialize new slot */ session_replication_state = &replication_states[free_slot]; Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr); @@ -1193,9 +1207,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by) if (acquired_by == 0) session_replication_state->acquired_by = MyProcPid; - else if (session_replication_state->acquired_by != acquired_by) - elog(ERROR, "could not find replication state slot for 
replication origin with OID %u which was acquired by %d", - node, acquired_by); + else + Assert(session_replication_state->acquired_by == acquired_by); LWLockRelease(ReplicationOriginLock); @@ -1374,12 +1387,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS) { char *name; RepOriginId origin; + int pid; replorigin_check_prerequisites(true, false); name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); origin = replorigin_by_name(name, false); - replorigin_session_setup(origin, 0); + pid = PG_GETARG_INT32(1); + replorigin_session_setup(origin, pid); replorigin_session_origin = origin; diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index ef0d0f92165..62c21d3670d 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202509091 +#define CATALOG_VERSION_NO 202509191 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 03e82d28c87..01eba3b5a19 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -12235,7 +12235,7 @@ { oid => '6006', descr => 'configure session to maintain replication progress tracking for the passed in origin', proname => 'pg_replication_origin_session_setup', provolatile => 'v', - proparallel => 'u', prorettype => 'void', proargtypes => 'text', + proparallel => 'u', prorettype => 'void', proargtypes => 'text int4', prosrc => 'pg_replication_origin_session_setup' }, { oid => '6007', descr => 'teardown configured replication progress tracking',