]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Fix orphaned origin in shared memory after DROP SUBSCRIPTION
authorMichael Paquier <michael@paquier.xyz>
Tue, 23 Dec 2025 05:32:14 +0000 (14:32 +0900)
committerMichael Paquier <michael@paquier.xyz>
Tue, 23 Dec 2025 05:32:14 +0000 (14:32 +0900)
Since ce0fdbfe9722, a replication slot and an origin are created by each
tablesync worker, whose information is stored in both a catalog and
shared memory (once the origin is set up in the latter case).  The
transaction where the origin is created is the same as the one that runs
the initial COPY, with the catalog state of the origin becoming visible
for other sessions only once the COPY transaction has committed.  The
catalog state is coupled with a state in shared memory, initialized at
the same time as the origin created in the catalogs.  Note that the
transaction doing the initial data sync can take a long time, time that
depends on the amount of data to transfer from a publication node to its
subscriber node.

Now, when a DROP SUBSCRIPTION is executed, all its workers are stopped
with the origins removed.  The removal of each origin relies on a
catalog lookup.  A worker still running the initial COPY would fail its
transaction, with the catalog state of the origin rolled back while the
shared memory state remains around.  The session running the DROP
SUBSCRIPTION should be in charge of cleaning up the catalog and the
shared memory state, but as there is no data in the catalogs the shared
memory state is not removed.  This issue would leave orphaned origin
data in shared memory, leading to a confusing state as it would still
show up in pg_replication_origin_status.  Note that this shared memory
data is sticky, being flushed on disk in replorigin_checkpoint at
checkpoint.  This prevents other origins from reusing a slot position
in the shared memory data.

To address this problem, the commit moves the creation of the origin at
the end of the transaction that precedes the one executing the initial
COPY, making the origin immediately visible in the catalogs for other
sessions, giving DROP SUBSCRIPTION a way to know about it.  A different
solution would have been to clean up the shared memory state using an
abort callback within the tablesync worker.  The solution of this commit
is more consistent with the apply worker that creates an origin in a
short transaction.

A test is added in the subscription test 004_sync.pl, which was able to
display the problem.  The test fails when this commit is reverted.

Reported-by: Tenglong Gu <brucegu@amazon.com>
Reported-by: Daisuke Higuchi <higudai@amazon.com>
Analyzed-by: Michael Paquier <michael@paquier.xyz>
Author: Hou Zhijie <houzj.fnst@fujitsu.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Discussion: https://postgr.es/m/aUTekQTg4OYnw-Co@paquier.xyz
Backpatch-through: 14

src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/tablesync.c
src/test/subscription/t/004_sync.pl

index abbcaff0838b941c42cdf0b8ac883388efefaee2..921cd9674f1019becb6a48a55f78e2660766030d 100644 (file)
@@ -1099,10 +1099,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
                                         *
                                         * It is possible that the origin is not yet created for
                                         * tablesync worker, this can happen for the states before
-                                        * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
-                                        * apply worker can also concurrently try to drop the
-                                        * origin and by this time the origin might be already
-                                        * removed. For these reasons, passing missing_ok = true.
+                                        * SUBREL_STATE_DATASYNC. The tablesync worker or apply
+                                        * worker can also concurrently try to drop the origin and
+                                        * by this time the origin might be already removed. For
+                                        * these reasons, passing missing_ok = true.
                                         */
                                        ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
                                                                                                           sizeof(originname));
@@ -2174,7 +2174,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
                 *
                 * It is possible that the origin is not yet created for tablesync
                 * worker so passing missing_ok = true. This can happen for the states
-                * before SUBREL_STATE_FINISHEDCOPY.
+                * before SUBREL_STATE_DATASYNC.
                 */
                ReplicationOriginNameForLogicalRep(subid, relid, originname,
                                                                                   sizeof(originname));
index 6bb0cbeedad8e98624ce74852a8d21880b236128..2522e372036ffc34fe30afbff7972b54dba48a98 100644 (file)
@@ -1333,13 +1333,27 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
        MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
        SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-       /* Update the state and make it visible to others. */
+       /*
+        * Update the state, create the replication origin, and make them visible
+        * to others.
+        */
        StartTransactionCommand();
        UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                                                           MyLogicalRepWorker->relid,
                                                           MyLogicalRepWorker->relstate,
                                                           MyLogicalRepWorker->relstate_lsn,
                                                           false);
+
+       /*
+        * Create the replication origin in a separate transaction from the one
+        * that sets up the origin in shared memory. This prevents the risk that
+        * changes to the origin in shared memory cannot be rolled back if the
+        * transaction aborts.
+        */
+       originid = replorigin_by_name(originname, true);
+       if (!OidIsValid(originid))
+               originid = replorigin_create(originname);
+
        CommitTransactionCommand();
        pgstat_report_stat(true);
 
@@ -1379,37 +1393,21 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                                           CRS_USE_SNAPSHOT, origin_startpos);
 
        /*
-        * Setup replication origin tracking. The purpose of doing this before the
-        * copy is to avoid doing the copy again due to any error in setting up
-        * origin tracking.
+        * Advance the origin to the LSN got from walrcv_create_slot and then set
+        * up the origin. The advancement is WAL logged for the purpose of
+        * recovery. Locks are to prevent the replication origin from vanishing
+        * while advancing.
+        *
+        * The purpose of doing these before the copy is to avoid doing the copy
+        * again due to any error in advancing or setting up origin tracking.
         */
-       originid = replorigin_by_name(originname, true);
-       if (!OidIsValid(originid))
-       {
-               /*
-                * Origin tracking does not exist, so create it now.
-                *
-                * Then advance to the LSN got from walrcv_create_slot. This is WAL
-                * logged for the purpose of recovery. Locks are to prevent the
-                * replication origin from vanishing while advancing.
-                */
-               originid = replorigin_create(originname);
-
-               LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
-               replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
-                                                  true /* go backward */ , true /* WAL log */ );
-               UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+       LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+       replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr,
+                                          true /* go backward */ , true /* WAL log */ );
+       UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
 
-               replorigin_session_setup(originid, 0);
-               replorigin_session_origin = originid;
-       }
-       else
-       {
-               ereport(ERROR,
-                               (errcode(ERRCODE_DUPLICATE_OBJECT),
-                                errmsg("replication origin \"%s\" already exists",
-                                               originname)));
-       }
+       replorigin_session_setup(originid, 0);
+       replorigin_session_origin = originid;
 
        /*
         * If the user did not opt to run as the owner of the subscription
index d5eac05a3b309ea10a187b800023ea2696876d04..2c6ae2d023a0ec65d9218e7c6062c019d53dfe3e 100644 (file)
@@ -172,6 +172,12 @@ ok( $node_publisher->poll_query_until(
                'postgres', 'SELECT count(*) = 0 FROM pg_replication_slots'),
        'DROP SUBSCRIPTION during error can clean up the slots on the publisher');
 
+# After dropping the subscription, all replication origins, whether created by
+# an apply worker or table sync worker, should have been cleaned up.
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT count(*) FROM pg_replication_origin_status");
+is($result, qq(0), 'all replication origins have been cleaned up');
+
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');